Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions app/assets/database/queries/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from app.assets.database.queries.asset import (
asset_exists_by_hash,
bulk_insert_assets,
create_stub_asset,
get_asset_by_hash,
get_existing_asset_ids,
reassign_asset_references,
Expand All @@ -12,6 +13,7 @@
UnenrichedReferenceRow,
bulk_insert_references_ignore_conflicts,
bulk_update_enrichment_level,
count_active_siblings,
bulk_update_is_missing,
bulk_update_needs_verify,
convert_metadata_to_rows,
Expand Down Expand Up @@ -80,6 +82,8 @@
"bulk_insert_references_ignore_conflicts",
"bulk_insert_tags_and_meta",
"bulk_update_enrichment_level",
"count_active_siblings",
"create_stub_asset",
"bulk_update_is_missing",
"bulk_update_needs_verify",
"convert_metadata_to_rows",
Expand Down
12 changes: 12 additions & 0 deletions app/assets/database/queries/asset.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,18 @@ def upsert_asset(
return asset, created, updated


def create_stub_asset(
session: Session,
size_bytes: int,
mime_type: str | None = None,
) -> Asset:
"""Create a new asset with no hash (stub for later enrichment)."""
asset = Asset(size_bytes=size_bytes, mime_type=mime_type, hash=None)
session.add(asset)
session.flush()
return asset


def bulk_insert_assets(
session: Session,
rows: list[dict],
Expand Down
17 changes: 17 additions & 0 deletions app/assets/database/queries/asset_reference.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,23 @@ def get_reference_by_file_path(
)


def count_active_siblings(
session: Session,
asset_id: str,
exclude_reference_id: str,
) -> int:
"""Count active (non-deleted) references to an asset, excluding one reference."""
return (
session.query(AssetReference)
.filter(
AssetReference.asset_id == asset_id,
AssetReference.id != exclude_reference_id,
AssetReference.deleted_at.is_(None),
)
.count()
)


def reference_exists_for_asset_id(
session: Session,
asset_id: str,
Expand Down
15 changes: 15 additions & 0 deletions app/assets/scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
delete_references_by_ids,
ensure_tags_exist,
get_asset_by_hash,
get_reference_by_id,
get_references_for_prefixes,
get_unenriched_references,
mark_references_missing_outside_prefixes,
Expand Down Expand Up @@ -338,6 +339,7 @@ def build_asset_specs(
"metadata": metadata,
"hash": asset_hash,
"mime_type": mime_type,
"job_id": None,
}
)
tag_pool.update(tags)
Expand Down Expand Up @@ -426,6 +428,7 @@ def enrich_asset(
except OSError:
return new_level

initial_mtime_ns = get_mtime_ns(stat_p)
rel_fname = compute_relative_filename(file_path)
mime_type: str | None = None
metadata = None
Expand Down Expand Up @@ -489,6 +492,18 @@ def enrich_asset(
except Exception as e:
logging.warning("Failed to hash %s: %s", file_path, e)

# Optimistic guard: if the reference's mtime_ns changed since we
# started (e.g. ingest_existing_file updated it), our results are
# stale — discard them to avoid overwriting fresh registration data.
ref = get_reference_by_id(session, reference_id)
if ref is None or ref.mtime_ns != initial_mtime_ns:
session.rollback()
logging.info(
"Ref %s mtime changed during enrichment, discarding stale result",
reference_id,
)
return ENRICHMENT_STUB

if extract_metadata and metadata:
system_metadata = metadata.to_user_metadata()
set_reference_system_metadata(session, reference_id, system_metadata)
Expand Down
66 changes: 59 additions & 7 deletions app/assets/seeder.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,9 @@ class _AssetSeeder:
"""

def __init__(self) -> None:
self._lock = threading.Lock()
# RLock is required because _run_scan() drains pending work while
# holding _lock and re-enters start() which also acquires _lock.
self._lock = threading.RLock()
self._state = State.IDLE
self._progress: Progress | None = None
self._last_progress: Progress | None = None
Expand All @@ -92,6 +94,7 @@ def __init__(self) -> None:
self._prune_first: bool = False
self._progress_callback: ProgressCallback | None = None
self._disabled: bool = False
self._pending_enrich: dict | None = None

def disable(self) -> None:
"""Disable the asset seeder, preventing any scans from starting."""
Expand Down Expand Up @@ -196,6 +199,42 @@ def start_enrich(
compute_hashes=compute_hashes,
)

def enqueue_enrich(
self,
roots: tuple[RootType, ...] = ("models", "input", "output"),
compute_hashes: bool = False,
) -> bool:
"""Start an enrichment scan now, or queue it for after the current scan.
If the seeder is idle, starts immediately. Otherwise, the enrich
request is stored and will run automatically when the current scan
finishes.
Args:
roots: Tuple of root types to scan
compute_hashes: If True, compute blake3 hashes
Returns:
True if started immediately, False if queued for later
"""
with self._lock:
if self.start_enrich(roots=roots, compute_hashes=compute_hashes):
return True
if self._pending_enrich is not None:
existing_roots = set(self._pending_enrich["roots"])
existing_roots.update(roots)
self._pending_enrich["roots"] = tuple(existing_roots)
self._pending_enrich["compute_hashes"] = (
self._pending_enrich["compute_hashes"] or compute_hashes
)
else:
self._pending_enrich = {
"roots": roots,
"compute_hashes": compute_hashes,
}
logging.info("Enrich scan queued (roots=%s)", self._pending_enrich["roots"])
return False

def cancel(self) -> bool:
"""Request cancellation of the current scan.
Expand Down Expand Up @@ -381,9 +420,13 @@ def mark_missing_outside_prefixes(self) -> int:
return marked
finally:
with self._lock:
self._last_progress = self._progress
self._state = State.IDLE
self._progress = None
self._reset_to_idle()

def _reset_to_idle(self) -> None:
"""Reset state to IDLE, preserving last progress. Caller must hold _lock."""
self._last_progress = self._progress
self._state = State.IDLE
self._progress = None

def _is_cancelled(self) -> bool:
"""Check if cancellation has been requested."""
Expand Down Expand Up @@ -594,9 +637,18 @@ def _run_scan(self) -> None:
},
)
with self._lock:
self._last_progress = self._progress
self._state = State.IDLE
self._progress = None
self._reset_to_idle()
pending = self._pending_enrich
if pending is not None:
self._pending_enrich = None
if not self.start_enrich(
roots=pending["roots"],
compute_hashes=pending["compute_hashes"],
):
logging.warning(
"Pending enrich scan could not start (roots=%s)",
pending["roots"],
)

def _run_fast_phase(self, roots: tuple[RootType, ...]) -> tuple[int, int, int]:
"""Run phase 1: fast scan to create stub records.
Expand Down
4 changes: 4 additions & 0 deletions app/assets/services/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
DependencyMissingError,
HashMismatchError,
create_from_hash,
ingest_existing_file,
register_output_files,
upload_from_temp_path,
)
from app.assets.database.queries import (
Expand Down Expand Up @@ -72,6 +74,8 @@
"delete_asset_reference",
"get_asset_by_hash",
"get_asset_detail",
"ingest_existing_file",
"register_output_files",
"get_mtime_ns",
"get_size_and_mtime_ns",
"list_assets_page",
Expand Down
3 changes: 3 additions & 0 deletions app/assets/services/bulk_ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ class SeedAssetSpec(TypedDict):
metadata: ExtractedMetadata | None
hash: str | None
mime_type: str | None
job_id: str | None


class AssetRow(TypedDict):
Expand All @@ -60,6 +61,7 @@ class ReferenceRow(TypedDict):
name: str
preview_id: str | None
user_metadata: dict[str, Any] | None
job_id: str | None
created_at: datetime
updated_at: datetime
last_access_time: datetime
Expand Down Expand Up @@ -167,6 +169,7 @@ def batch_insert_seed_assets(
"name": spec["info_name"],
"preview_id": None,
"user_metadata": user_metadata,
"job_id": spec.get("job_id"),
"created_at": current_time,
"updated_at": current_time,
"last_access_time": current_time,
Expand Down
102 changes: 101 additions & 1 deletion app/assets/services/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
import app.assets.services.hashing as hashing
from app.assets.database.queries import (
add_tags_to_reference,
count_active_siblings,
create_stub_asset,
ensure_tags_exist,
fetch_reference_and_asset,
get_asset_by_hash,
get_reference_by_file_path,
Expand All @@ -23,7 +26,8 @@
upsert_reference,
validate_tags_exist,
)
from app.assets.helpers import normalize_tags
from app.assets.helpers import get_utc_now, normalize_tags
from app.assets.services.bulk_ingest import batch_insert_seed_assets
from app.assets.services.file_utils import get_size_and_mtime_ns
from app.assets.services.path_utils import (
compute_relative_filename,
Expand Down Expand Up @@ -130,6 +134,102 @@ def _ingest_file_from_path(
)


def register_output_files(
file_paths: Sequence[str],
user_metadata: UserMetadata = None,
job_id: str | None = None,
) -> int:
"""Register a batch of output file paths as assets.
Returns the number of files successfully registered.
"""
registered = 0
for abs_path in file_paths:
if not os.path.isfile(abs_path):
continue
try:
if ingest_existing_file(
abs_path, user_metadata=user_metadata, job_id=job_id
):
registered += 1
except Exception:
logging.exception("Failed to register output: %s", abs_path)
return registered


def ingest_existing_file(
abs_path: str,
user_metadata: UserMetadata = None,
extra_tags: Sequence[str] = (),
owner_id: str = "",
job_id: str | None = None,
) -> bool:
"""Register an existing on-disk file as an asset stub.
If a reference already exists for this path, updates mtime_ns, job_id,
size_bytes, and resets enrichment so the enricher will re-hash it.
For brand-new paths, inserts a stub record (hash=NULL) for immediate
UX visibility.
Returns True if a row was inserted or updated, False otherwise.
"""
locator = os.path.abspath(abs_path)
size_bytes, mtime_ns = get_size_and_mtime_ns(abs_path)
mime_type = mimetypes.guess_type(abs_path, strict=False)[0]
name, path_tags = get_name_and_tags_from_asset_path(abs_path)
tags = list(dict.fromkeys(path_tags + list(extra_tags)))

with create_session() as session:
existing_ref = get_reference_by_file_path(session, locator)
if existing_ref is not None:
now = get_utc_now()
existing_ref.mtime_ns = mtime_ns
existing_ref.job_id = job_id
existing_ref.is_missing = False
existing_ref.deleted_at = None
existing_ref.updated_at = now
existing_ref.enrichment_level = 0

asset = existing_ref.asset
if asset:
# If other refs share this asset, detach to a new stub
# instead of mutating the shared row.
siblings = count_active_siblings(session, asset.id, existing_ref.id)
if siblings > 0:
new_asset = create_stub_asset(
session,
size_bytes=size_bytes,
mime_type=mime_type or asset.mime_type,
)
existing_ref.asset_id = new_asset.id
else:
asset.hash = None
asset.size_bytes = size_bytes
if mime_type:
asset.mime_type = mime_type
session.commit()
return True

spec = {
"abs_path": abs_path,
"size_bytes": size_bytes,
"mtime_ns": mtime_ns,
"info_name": name,
"tags": tags,
"fname": os.path.basename(abs_path),
"metadata": None,
"hash": None,
"mime_type": mime_type,
"job_id": job_id,
}
if tags:
ensure_tags_exist(session, tags)
result = batch_insert_seed_assets(session, [spec], owner_id=owner_id)
session.commit()
return result.won_paths > 0


def _register_existing_asset(
asset_hash: str,
name: str,
Expand Down
1 change: 1 addition & 0 deletions comfy/model_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class CPUState(Enum):

# Training Related State
in_training = False
training_fp8_bwd = False


def get_supported_float8_types():
Expand Down
Loading
Loading