Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,14 @@ jobs:
run: |
.venv/bin/pytest tests/test_saved_searches_sync.py -v

- name: Run saved searches bundle test
run: |
.venv/bin/pytest tests/test_saved_searches_bundle.py -v

- name: Run Cypher syntax tests
run: |
.venv/bin/pytest tests/test_cypher_syntax.py -v

- name: Run extension validation tests
run: |
.venv/bin/pytest tests/test_extensions_format.py -v
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -79,4 +79,5 @@ dev = [
"types-pyyaml>=6.0.12.20250915",
"types-requests>=2.33.0.20260408",
"httpx>=0.28.1",
"antlr4-python3-runtime>=4.13.2",
]
90 changes: 69 additions & 21 deletions src/openhound/cli/saved_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
from typing import Annotated

import typer
from rich.console import Console

from openhound.core.models.saved_search import QueryBundle
from openhound.core.progress import Progress
from openhound.core.saved_searches import SavedSearches, Strategy

Expand All @@ -15,30 +17,35 @@ class Format(str, Enum):
yaml = "yaml"


class OutputFormat(str, Enum):
json = "json"
zip = "zip"


@saved_searches.command(help="Upload saved searches to BloodHound")
def upload(
path: Annotated[
Path,
typer.Argument(
exists=True,
file_okay=False,
dir_okay=True,
readable=True,
resolve_path=True,
help="Directory where saved searches are located",
path: Annotated[
Path,
typer.Argument(
exists=True,
file_okay=False,
dir_okay=True,
readable=True,
resolve_path=True,
help="Directory where saved searches are located",
),
],
file_format: Format = typer.Option(
default=Format.json,
help="File format of the saved searches (json or yaml)",
),
strategy: Strategy = typer.Option(
default=Strategy.skip,
help="Skip or overwrite saved search if the name already exists",
),
progress: Progress = typer.Option(
Progress.tqdm, help="Select progress tracker option"
),
],
file_format: Format = typer.Option(
default=Format.json,
help="File format of the saved searches (json or yaml)",
),
strategy: Strategy = typer.Option(
default=Strategy.skip,
help="Skip or overwrite saved search if the name already exists",
),
progress: Progress = typer.Option(
Progress.tqdm, help="Select progress tracker option"
),
):
search_files = (
path.rglob("**/*.json")
Expand All @@ -48,3 +55,44 @@ def upload(
pipeline = SavedSearches(progress=progress, strategy=strategy)
results = pipeline.run(list(search_files), file_format=file_format)
return results


@saved_searches.command(help="Create a single saved searches json/zip bundle")
def bundle(
path: Annotated[
Path,
typer.Argument(
exists=True,
file_okay=False,
dir_okay=True,
readable=True,
resolve_path=True,
help="Directory where saved searches are located",
),
],
output_path: Annotated[typer.FileTextWrite, typer.Argument(
help="Output file path for the generated saved-search bundle (including filename)")],
file_format: Format = typer.Option(
default=Format.json,
help="File format of the saved searches (json or yaml)",
),
output_format: OutputFormat = typer.Option(
default=OutputFormat.json,
help="File format for the saved searches bundle (json or zip)",
)
):
search_files = list(
path.rglob("**/*.json")
if file_format == Format.json
else path.rglob("**/*.yaml")
)

bundle_object = QueryBundle.from_paths(search_files, file_format=file_format)
bundle_object.save(output_path, output_format=output_format)

console = Console()
console.print("[bold green]Saved-search bundle created[/bold green]")
console.print(f"[bold magenta]Saved searches:[/bold magenta] {len(bundle_object.queries)}")
console.print(
f"[bold magenta]Output path:[/bold magenta] [italic]{Path(output_path.name).resolve()}[/italic]"
)
59 changes: 57 additions & 2 deletions src/openhound/core/models/saved_search.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,24 @@
import json
import zipfile
from enum import Enum
from io import TextIOWrapper
from pathlib import Path
from typing import Optional, Union

from pydantic import BaseModel, ConfigDict, field_validator
from yaml import safe_load


class Format(str, Enum):
json = "json"
yaml = "yaml"


class OutputFormat(str, Enum):
json = "json"
zip = "zip"


class SavedSearch(BaseModel):
model_config = ConfigDict(extra="forbid")
# Required for an extension
Expand All @@ -14,7 +27,7 @@ class SavedSearch(BaseModel):
query: str

@classmethod
def from_json(cls, file_path: Path) -> "SavedSearch":
def from_file(cls, file_path: Path) -> "SavedSearch":
with open(file_path, "r") as file_object:
json_object = json.loads(file_object.read())
return cls(**json_object)
Expand Down Expand Up @@ -52,7 +65,49 @@ def acknowledgementsis_list(cls, value: str | list[str]) -> list[str]:
return value if isinstance(value, list) else [value]

@classmethod
def from_yaml(cls, file_path: Path) -> "SavedSearchExtended":
def from_file(cls, file_path: Path) -> "SavedSearchExtended":
with open(file_path, "r") as file_object:
yaml_object = safe_load(file_object.read())
return cls(**yaml_object)


class QueryBundle:
def __init__(self, queries: list[SavedSearchExtended | SavedSearch], file_format: Format = Format.json) -> None:
self.queries = queries
self.file_format = file_format

@classmethod
def from_paths(cls, all_files: list[Path], file_format: Format = Format.json) -> "QueryBundle":
model_choices = {
'yaml': SavedSearchExtended,
'json': SavedSearch,
}

queries = [
model_choices[file_format].from_file(cypher_query) for cypher_query in
all_files
]
return cls(queries, file_format)

def _to_json(self, output_file: TextIOWrapper) -> None:
all_objects = [query.model_dump() for query in self.queries]
output_file.write(json.dumps(all_objects, indent=2))

def _to_zip(self, output_file: TextIOWrapper) -> None:
with zipfile.ZipFile(
file=output_file.name,
mode="w",
compression=zipfile.ZIP_DEFLATED,
compresslevel=9,
) as archive:
for query in self.queries:
archive.writestr(
zinfo_or_arcname=f"{query.name}.json",
data=query.model_dump_json().encode(),
)

def save(self, output_file: TextIOWrapper, output_format: OutputFormat = OutputFormat.json) -> None:
if output_format == OutputFormat.json:
self._to_json(output_file)
elif output_format == OutputFormat.zip:
self._to_zip(output_file)
Loading