Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -169,3 +169,4 @@ cython_debug/
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
test_example/**
.mcp.json
86 changes: 77 additions & 9 deletions qualytics/cli/checks.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,16 +77,30 @@ def checks_create(
failed = 0
total = len(items)

# Build reverse lookup (id → name) for validation when container_id is provided
id_to_name = {v: k for k, v in table_ids.items()}

for i, check in enumerate(items, 1):
container_name = check.get("container", "")
container_id = table_ids.get(container_name)
if container_id is None:
print(
f"[red]({i}/{total}) Container '{container_name}' not found "
f"in datastore {datastore_id}. Skipping.[/red]"
)
failed += 1
continue
# Support both container name (portable format) and container_id (API format)
container_id = check.get("container_id")
if container_id is not None:
if container_id not in id_to_name:
print(
f"[red]({i}/{total}) Container ID {container_id} not found "
f"in datastore {datastore_id}. Skipping.[/red]"
)
failed += 1
continue
else:
container_name = check.get("container", "")
container_id = table_ids.get(container_name)
if container_id is None:
print(
f"[red]({i}/{total}) Container '{container_name}' not found "
f"in datastore {datastore_id}. Skipping.[/red]"
)
failed += 1
continue
try:
payload = _build_create_payload(check, container_id)
result = create_quality_check(client, payload)
Expand Down Expand Up @@ -245,6 +259,60 @@ def checks_delete(
print(f"[green]{action} {len(id_list)} quality checks.[/green]")


# ── Activate (restore archived checks) ────────────────────────────────────


@checks_app.command("activate")
def checks_activate(
check_id: int | None = typer.Option(
None, "--id", help="Single quality check ID to activate"
),
ids: str | None = typer.Option(
None, "--ids", help='Comma-separated check IDs. Example: "1,2,3"'
),
):
"""Activate (unarchive) quality check(s)."""
if not check_id and not ids:
print("[red]Must specify --id or --ids.[/red]")
raise typer.Exit(code=1)

client = get_client()

id_list: list[int] = []
if check_id:
id_list.append(check_id)
if ids:
id_list.extend(int(x) for x in _parse_comma_list(ids))

activated = 0
failed = 0
total = len(id_list)

for i, cid in enumerate(id_list, 1):
try:
existing = get_quality_check(client, cid)
payload = {
"description": existing.get("description", ""),
"fields": [f["name"] for f in existing.get("fields", [])],
"coverage": existing.get("coverage"),
"filter": existing.get("filter"),
"properties": existing.get("properties") or {},
"tags": [t["name"] for t in existing.get("global_tags", [])],
"additional_metadata": existing.get("additional_metadata") or {},
"status": "Active",
}
update_quality_check(client, cid, payload)
print(f"[green]({i}/{total}) Activated quality check {cid}.[/green]")
activated += 1
except QualyticsAPIError as e:
print(
f"[red]({i}/{total}) Failed to activate check {cid}: {e.message}[/red]"
)
failed += 1

print(f"\n[bold]Activated {activated}, failed {failed} of {total} checks.[/bold]")


# ── Export (git-friendly, directory-based) ────────────────────────────────


Expand Down
13 changes: 13 additions & 0 deletions qualytics/cli/computed_tables.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
create_container as api_create_container,
get_field_profiles as api_get_field_profiles,
list_containers_listing,
update_container as api_update_container,
)
from ..services.containers import build_update_container_payload
from ..api.operations import get_operation, list_operations
from ..api.quality_checks import create_quality_check
from ..config import BASE_PATH
Expand Down Expand Up @@ -496,6 +498,17 @@ def _create_computed_table(

_debug_log(f"Created computed table: {name} (ID: {result.get('id')})")

# The create endpoint ignores description — apply via a follow-up PUT
if description:
try:
update_payload = build_update_container_payload(
result, description=description
)
result = api_update_container(client, result["id"], update_payload)
_debug_log(f"Applied description to computed table: {name}")
except QualyticsAPIError as e:
_debug_log(f"Warning: Failed to apply description to '{name}': {e}")

log_file = _write_debug_log(
log_type="computed_table",
name=name,
Expand Down
14 changes: 13 additions & 1 deletion qualytics/cli/containers.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,20 @@ def containers_create(

client = get_client()
result = create_container(client, payload)

# The create endpoint ignores description and tags — apply via a follow-up PUT
container_id = result["id"]
needs_update = {}
if description is not None:
needs_update["description"] = description
if tag_list is not None:
needs_update["tags"] = tag_list
if needs_update:
update_payload = build_update_container_payload(result, **needs_update)
result = update_container(client, container_id, update_payload)

print("[green]Container created successfully![/green]")
print(f"[green]Container ID: {result.get('id')}[/green]")
print(f"[green]Container ID: {container_id}[/green]")
print(f"[green]Container Name: {result.get('name')}[/green]")
print(f"[green]Container Type: {result.get('container_type')}[/green]")

Expand Down
16 changes: 13 additions & 3 deletions qualytics/cli/datastores.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from ..services.datastores import (
build_create_datastore_payload,
build_update_datastore_payload,
flatten_datastore_for_put,
get_datastore_by,
)
from ..utils import OutputFormat, format_for_display, redact_payload
Expand Down Expand Up @@ -50,10 +51,12 @@ def datastores_create(
None,
"--connection-name",
"-cn",
help="Name of an existing connection in Qualytics",
help="Name of an existing connection in Qualytics (required if --connection-id is not set)",
),
connection_id: int | None = typer.Option(
None, "--connection-id", help="Existing connection id to reference"
None,
"--connection-id",
help="Existing connection ID (required if --connection-name is not set)",
),
database: str = typer.Option(
...,
Expand Down Expand Up @@ -248,7 +251,14 @@ def datastores_update(
raise typer.Exit(code=1)

client = get_client()
result = update_datastore(client, datastore_id, payload)

with status("[bold cyan]Fetching current datastore...[/bold cyan]"):
current = get_datastore(client, datastore_id)

# PUT requires a full payload — merge user changes on top of current state
full_payload = {**flatten_datastore_for_put(current), **payload}

result = update_datastore(client, datastore_id, full_payload)
print(f"[green]Datastore {datastore_id} updated successfully.[/green]")
print(format_for_display(result, fmt))

Expand Down
8 changes: 6 additions & 2 deletions qualytics/cli/export_import.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,13 +206,17 @@ def config_import(
for resource in (
"connections",
"datastores",
"catalog",
"containers",
"computed_fields",
"checks",
):
if include_set is None or resource in include_set:
# Catalog/sync is always shown (it's automatic), others depend on include
if resource == "catalog" or include_set is None or resource in include_set:
r = result[resource]
label = resource.replace("_", " ").title()
label = (
"Sync" if resource == "catalog" else resource.replace("_", " ").title()
)
table.add_row(
label,
str(r["created"]),
Expand Down
116 changes: 64 additions & 52 deletions qualytics/mcp/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -633,21 +633,24 @@ def run_catalog(
datastore_ids: list[int],
prune: bool = False,
recreate: bool = False,
) -> dict:
) -> list[dict]:
"""Trigger a catalog operation to discover containers in datastores.

Returns the operation details. Use get_operation to check progress.
Returns the operation details for each datastore. Use get_operation to check progress.
"""
from ..api.operations import run_operation

client = _client()
payload = {
"type": "catalog",
"datastore_ids": datastore_ids,
"prune": prune,
"recreate": recreate,
}
return _api_call(run_operation, client, payload)
results = []
for ds_id in datastore_ids:
payload = {
"type": "catalog",
"datastore_id": ds_id,
"prune": prune,
"recreate": recreate,
}
results.append(_api_call(run_operation, client, payload))
return results


@mcp.tool
Expand All @@ -656,27 +659,30 @@ def run_profile(
container_names: list[str] | None = None,
container_tags: list[str] | None = None,
max_records_analyzed_per_partition: int | None = None,
) -> dict:
) -> list[dict]:
"""Trigger a profile operation to infer quality checks.

Returns the operation details. Use get_operation to check progress.
Returns the operation details for each datastore. Use get_operation to check progress.
"""
from ..api.operations import run_operation

client = _client()
payload: dict = {
"type": "profile",
"datastore_ids": datastore_ids,
}
if container_names:
payload["container_names"] = container_names
if container_tags:
payload["container_tags"] = container_tags
if max_records_analyzed_per_partition is not None:
payload["max_records_analyzed_per_partition"] = (
max_records_analyzed_per_partition
)
return _api_call(run_operation, client, payload)
results = []
for ds_id in datastore_ids:
payload: dict = {
"type": "profile",
"datastore_id": ds_id,
}
if container_names:
payload["container_names"] = container_names
if container_tags:
payload["container_tags"] = container_tags
if max_records_analyzed_per_partition is not None:
payload["max_records_analyzed_per_partition"] = (
max_records_analyzed_per_partition
)
results.append(_api_call(run_operation, client, payload))
return results


@mcp.tool
Expand All @@ -686,53 +692,59 @@ def run_scan(
container_tags: list[str] | None = None,
incremental: bool | None = None,
max_records_analyzed_per_partition: int | None = None,
) -> dict:
) -> list[dict]:
"""Trigger a scan operation to detect anomalies.

Returns the operation details. Use get_operation to check progress.
Returns the operation details for each datastore. Use get_operation to check progress.
"""
from ..api.operations import run_operation

client = _client()
payload: dict = {
"type": "scan",
"datastore_ids": datastore_ids,
}
if container_names:
payload["container_names"] = container_names
if container_tags:
payload["container_tags"] = container_tags
if incremental is not None:
payload["incremental"] = incremental
if max_records_analyzed_per_partition is not None:
payload["max_records_analyzed_per_partition"] = (
max_records_analyzed_per_partition
)
return _api_call(run_operation, client, payload)
results = []
for ds_id in datastore_ids:
payload: dict = {
"type": "scan",
"datastore_id": ds_id,
}
if container_names:
payload["container_names"] = container_names
if container_tags:
payload["container_tags"] = container_tags
if incremental is not None:
payload["incremental"] = incremental
if max_records_analyzed_per_partition is not None:
payload["max_records_analyzed_per_partition"] = (
max_records_analyzed_per_partition
)
results.append(_api_call(run_operation, client, payload))
return results


@mcp.tool
def run_materialize(
datastore_ids: list[int],
container_names: list[str] | None = None,
container_tags: list[str] | None = None,
) -> dict:
) -> list[dict]:
"""Trigger a materialize operation for computed containers.

Returns the operation details. Use get_operation to check progress.
Returns the operation details for each datastore. Use get_operation to check progress.
"""
from ..api.operations import run_operation

client = _client()
payload: dict = {
"type": "materialize",
"datastore_ids": datastore_ids,
}
if container_names:
payload["container_names"] = container_names
if container_tags:
payload["container_tags"] = container_tags
return _api_call(run_operation, client, payload)
results = []
for ds_id in datastore_ids:
payload: dict = {
"type": "materialize",
"datastore_id": ds_id,
}
if container_names:
payload["container_names"] = container_names
if container_tags:
payload["container_tags"] = container_tags
results.append(_api_call(run_operation, client, payload))
return results


@mcp.tool
Expand Down
5 changes: 5 additions & 0 deletions qualytics/services/containers.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,11 @@ def build_create_container_payload(
if group_by_clause is not None:
payload["group_by_clause"] = group_by_clause

if description is not None:
payload["description"] = description
if tags is not None:
payload["tags"] = tags

return payload


Expand Down
Loading
Loading