Skip to content

Commit 7d5534d

Browse files
feat(assets): register output files as assets after prompt execution (Comfy-Org#12812)
1 parent 5ebb0c2 commit 7d5534d

File tree

14 files changed

+764
-14
lines changed

14 files changed

+764
-14
lines changed

app/assets/database/queries/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from app.assets.database.queries.asset import (
22
asset_exists_by_hash,
33
bulk_insert_assets,
4+
create_stub_asset,
45
get_asset_by_hash,
56
get_existing_asset_ids,
67
reassign_asset_references,
@@ -12,6 +13,7 @@
1213
UnenrichedReferenceRow,
1314
bulk_insert_references_ignore_conflicts,
1415
bulk_update_enrichment_level,
16+
count_active_siblings,
1517
bulk_update_is_missing,
1618
bulk_update_needs_verify,
1719
convert_metadata_to_rows,
@@ -80,6 +82,8 @@
8082
"bulk_insert_references_ignore_conflicts",
8183
"bulk_insert_tags_and_meta",
8284
"bulk_update_enrichment_level",
85+
"count_active_siblings",
86+
"create_stub_asset",
8387
"bulk_update_is_missing",
8488
"bulk_update_needs_verify",
8589
"convert_metadata_to_rows",

app/assets/database/queries/asset.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,18 @@ def upsert_asset(
7878
return asset, created, updated
7979

8080

81+
def create_stub_asset(
    session: Session,
    size_bytes: int,
    mime_type: str | None = None,
) -> Asset:
    """Add a hash-less placeholder asset to the session and return it.

    The row is created with ``hash=None`` so that it can be enriched
    (hashed) later.

    Args:
        session: Session the new row is added to. It is flushed but not
            committed here.
        size_bytes: Size recorded on the new asset row.
        mime_type: Optional MIME type recorded on the new asset row.

    Returns:
        The newly added ``Asset`` instance.
    """
    stub = Asset(hash=None, size_bytes=size_bytes, mime_type=mime_type)
    session.add(stub)
    # Flush (no commit) before returning; presumably so that generated
    # columns such as the primary key are available to the caller.
    session.flush()
    return stub
91+
92+
8193
def bulk_insert_assets(
8294
session: Session,
8395
rows: list[dict],

app/assets/database/queries/asset_reference.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,23 @@ def get_reference_by_file_path(
114114
)
115115

116116

117+
def count_active_siblings(
    session: Session,
    asset_id: str,
    exclude_reference_id: str,
) -> int:
    """Return how many other live references point at the given asset.

    A reference is counted when it belongs to ``asset_id``, is not the
    reference identified by ``exclude_reference_id``, and has no
    ``deleted_at`` value.

    Args:
        session: Session used to run the query.
        asset_id: Asset whose references are counted.
        exclude_reference_id: Reference id left out of the count.

    Returns:
        Number of matching references.
    """
    same_asset = AssetReference.asset_id == asset_id
    is_other_reference = AssetReference.id != exclude_reference_id
    not_deleted = AssetReference.deleted_at.is_(None)

    matching = session.query(AssetReference).filter(
        same_asset,
        is_other_reference,
        not_deleted,
    )
    return matching.count()
132+
133+
117134
def reference_exists_for_asset_id(
118135
session: Session,
119136
asset_id: str,

app/assets/scanner.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
delete_references_by_ids,
1414
ensure_tags_exist,
1515
get_asset_by_hash,
16+
get_reference_by_id,
1617
get_references_for_prefixes,
1718
get_unenriched_references,
1819
mark_references_missing_outside_prefixes,
@@ -338,6 +339,7 @@ def build_asset_specs(
338339
"metadata": metadata,
339340
"hash": asset_hash,
340341
"mime_type": mime_type,
342+
"job_id": None,
341343
}
342344
)
343345
tag_pool.update(tags)
@@ -426,6 +428,7 @@ def enrich_asset(
426428
except OSError:
427429
return new_level
428430

431+
initial_mtime_ns = get_mtime_ns(stat_p)
429432
rel_fname = compute_relative_filename(file_path)
430433
mime_type: str | None = None
431434
metadata = None
@@ -489,6 +492,18 @@ def enrich_asset(
489492
except Exception as e:
490493
logging.warning("Failed to hash %s: %s", file_path, e)
491494

495+
# Optimistic guard: if the reference's mtime_ns changed since we
496+
# started (e.g. ingest_existing_file updated it), our results are
497+
# stale — discard them to avoid overwriting fresh registration data.
498+
ref = get_reference_by_id(session, reference_id)
499+
if ref is None or ref.mtime_ns != initial_mtime_ns:
500+
session.rollback()
501+
logging.info(
502+
"Ref %s mtime changed during enrichment, discarding stale result",
503+
reference_id,
504+
)
505+
return ENRICHMENT_STUB
506+
492507
if extract_metadata and metadata:
493508
system_metadata = metadata.to_user_metadata()
494509
set_reference_system_metadata(session, reference_id, system_metadata)

app/assets/seeder.py

Lines changed: 59 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,9 @@ class _AssetSeeder:
7777
"""
7878

7979
def __init__(self) -> None:
80-
self._lock = threading.Lock()
80+
# RLock is required because _run_scan() drains pending work while
81+
# holding _lock and re-enters start() which also acquires _lock.
82+
self._lock = threading.RLock()
8183
self._state = State.IDLE
8284
self._progress: Progress | None = None
8385
self._last_progress: Progress | None = None
@@ -92,6 +94,7 @@ def __init__(self) -> None:
9294
self._prune_first: bool = False
9395
self._progress_callback: ProgressCallback | None = None
9496
self._disabled: bool = False
97+
self._pending_enrich: dict | None = None
9598

9699
def disable(self) -> None:
97100
"""Disable the asset seeder, preventing any scans from starting."""
@@ -196,6 +199,42 @@ def start_enrich(
196199
compute_hashes=compute_hashes,
197200
)
198201

202+
def enqueue_enrich(
    self,
    roots: tuple[RootType, ...] = ("models", "input", "output"),
    compute_hashes: bool = False,
) -> bool:
    """Start an enrichment scan now, or queue it for after the current scan.

    ``start_enrich`` is tried first. If it declines, the request is stored
    in ``_pending_enrich``. Repeated requests are merged into the single
    pending entry: roots are unioned, keeping first-seen order so that the
    queued scan order and the log output are deterministic, and
    ``compute_hashes`` becomes True as soon as any request asks for it.

    Args:
        roots: Tuple of root types to scan
        compute_hashes: If True, compute blake3 hashes

    Returns:
        True if started immediately, False if queued for later
    """
    with self._lock:
        if self.start_enrich(roots=roots, compute_hashes=compute_hashes):
            return True

        pending = self._pending_enrich
        if pending is None:
            self._pending_enrich = {
                "roots": roots,
                "compute_hashes": compute_hashes,
            }
        else:
            # dict.fromkeys de-duplicates while preserving insertion order;
            # going through a set would yield an arbitrary order that can
            # differ between interpreter runs.
            pending["roots"] = tuple(dict.fromkeys((*pending["roots"], *roots)))
            pending["compute_hashes"] = pending["compute_hashes"] or compute_hashes

        logging.info("Enrich scan queued (roots=%s)", self._pending_enrich["roots"])
        return False
237+
199238
def cancel(self) -> bool:
200239
"""Request cancellation of the current scan.
201240
@@ -381,9 +420,13 @@ def mark_missing_outside_prefixes(self) -> int:
381420
return marked
382421
finally:
383422
with self._lock:
384-
self._last_progress = self._progress
385-
self._state = State.IDLE
386-
self._progress = None
423+
self._reset_to_idle()
424+
425+
def _reset_to_idle(self) -> None:
    """Return the seeder to IDLE while remembering the progress just finished.

    The current ``_progress`` value is moved into ``_last_progress`` and
    ``_progress`` is cleared. Caller must hold _lock.
    """
    finished_progress = self._progress
    self._progress = None
    self._state = State.IDLE
    self._last_progress = finished_progress
387430

388431
def _is_cancelled(self) -> bool:
389432
"""Check if cancellation has been requested."""
@@ -594,9 +637,18 @@ def _run_scan(self) -> None:
594637
},
595638
)
596639
with self._lock:
597-
self._last_progress = self._progress
598-
self._state = State.IDLE
599-
self._progress = None
640+
self._reset_to_idle()
641+
pending = self._pending_enrich
642+
if pending is not None:
643+
self._pending_enrich = None
644+
if not self.start_enrich(
645+
roots=pending["roots"],
646+
compute_hashes=pending["compute_hashes"],
647+
):
648+
logging.warning(
649+
"Pending enrich scan could not start (roots=%s)",
650+
pending["roots"],
651+
)
600652

601653
def _run_fast_phase(self, roots: tuple[RootType, ...]) -> tuple[int, int, int]:
602654
"""Run phase 1: fast scan to create stub records.

app/assets/services/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
DependencyMissingError,
2424
HashMismatchError,
2525
create_from_hash,
26+
ingest_existing_file,
27+
register_output_files,
2628
upload_from_temp_path,
2729
)
2830
from app.assets.database.queries import (
@@ -72,6 +74,8 @@
7274
"delete_asset_reference",
7375
"get_asset_by_hash",
7476
"get_asset_detail",
77+
"ingest_existing_file",
78+
"register_output_files",
7579
"get_mtime_ns",
7680
"get_size_and_mtime_ns",
7781
"list_assets_page",

app/assets/services/bulk_ingest.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ class SeedAssetSpec(TypedDict):
3737
metadata: ExtractedMetadata | None
3838
hash: str | None
3939
mime_type: str | None
40+
job_id: str | None
4041

4142

4243
class AssetRow(TypedDict):
@@ -60,6 +61,7 @@ class ReferenceRow(TypedDict):
6061
name: str
6162
preview_id: str | None
6263
user_metadata: dict[str, Any] | None
64+
job_id: str | None
6365
created_at: datetime
6466
updated_at: datetime
6567
last_access_time: datetime
@@ -167,6 +169,7 @@ def batch_insert_seed_assets(
167169
"name": spec["info_name"],
168170
"preview_id": None,
169171
"user_metadata": user_metadata,
172+
"job_id": spec.get("job_id"),
170173
"created_at": current_time,
171174
"updated_at": current_time,
172175
"last_access_time": current_time,

app/assets/services/ingest.py

Lines changed: 101 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@
99
import app.assets.services.hashing as hashing
1010
from app.assets.database.queries import (
1111
add_tags_to_reference,
12+
count_active_siblings,
13+
create_stub_asset,
14+
ensure_tags_exist,
1215
fetch_reference_and_asset,
1316
get_asset_by_hash,
1417
get_reference_by_file_path,
@@ -23,7 +26,8 @@
2326
upsert_reference,
2427
validate_tags_exist,
2528
)
26-
from app.assets.helpers import normalize_tags
29+
from app.assets.helpers import get_utc_now, normalize_tags
30+
from app.assets.services.bulk_ingest import batch_insert_seed_assets
2731
from app.assets.services.file_utils import get_size_and_mtime_ns
2832
from app.assets.services.path_utils import (
2933
compute_relative_filename,
@@ -130,6 +134,102 @@ def _ingest_file_from_path(
130134
)
131135

132136

137+
def register_output_files(
    file_paths: Sequence[str],
    user_metadata: UserMetadata = None,
    job_id: str | None = None,
) -> int:
    """Register each existing output file as an asset, one path at a time.

    Paths that are not regular files are skipped. A failure while
    registering one path is logged with its traceback and does not stop
    the remaining paths from being processed.

    Args:
        file_paths: Paths of the files to register.
        user_metadata: Forwarded unchanged to ``ingest_existing_file``.
        job_id: Forwarded unchanged to ``ingest_existing_file``.

    Returns:
        The number of files for which ``ingest_existing_file`` returned a
        truthy result.
    """
    success_count = 0
    for path in file_paths:
        if os.path.isfile(path):
            try:
                was_registered = ingest_existing_file(
                    path, user_metadata=user_metadata, job_id=job_id
                )
            except Exception:
                # Best effort: keep going with the remaining files.
                logging.exception("Failed to register output: %s", path)
            else:
                if was_registered:
                    success_count += 1
    return success_count
158+
159+
160+
def ingest_existing_file(
    abs_path: str,
    user_metadata: UserMetadata = None,
    extra_tags: Sequence[str] = (),
    owner_id: str = "",
    job_id: str | None = None,
) -> bool:
    """Register an existing on-disk file as an asset stub.

    When a reference already exists for the path, the reference is
    refreshed in place: mtime_ns and job_id are overwritten, the missing
    and deleted markers are cleared, and enrichment_level is reset to 0.
    Its asset is then either replaced by a fresh stub (when other active
    references share it) or has its hash cleared and size/MIME type
    updated.

    When no reference exists, a single stub spec (hash=None) is inserted
    through ``batch_insert_seed_assets``.

    NOTE(review): ``user_metadata`` is accepted but is not read anywhere
    in this function - confirm whether it should be stored.

    Args:
        abs_path: Path of the file to register.
        user_metadata: Currently unused (see note above).
        extra_tags: Tags appended to the tags derived from the path;
            duplicates are dropped, keeping first occurrence order.
        owner_id: Forwarded to ``batch_insert_seed_assets`` for new paths.
        job_id: Stored on the reference (new or existing).

    Returns:
        True if a row was inserted or updated, False otherwise.
    """
    resolved_path = os.path.abspath(abs_path)
    file_size, file_mtime_ns = get_size_and_mtime_ns(abs_path)
    guessed_mime = mimetypes.guess_type(abs_path, strict=False)[0]
    display_name, path_tags = get_name_and_tags_from_asset_path(abs_path)
    merged_tags = list(dict.fromkeys(path_tags + list(extra_tags)))

    with create_session() as session:
        ref = get_reference_by_file_path(session, resolved_path)

        if ref is None:
            # Brand-new path: insert one hash-less stub record.
            stub_spec = {
                "abs_path": abs_path,
                "size_bytes": file_size,
                "mtime_ns": file_mtime_ns,
                "info_name": display_name,
                "tags": merged_tags,
                "fname": os.path.basename(abs_path),
                "metadata": None,
                "hash": None,
                "mime_type": guessed_mime,
                "job_id": job_id,
            }
            if merged_tags:
                ensure_tags_exist(session, merged_tags)
            outcome = batch_insert_seed_assets(
                session, [stub_spec], owner_id=owner_id
            )
            session.commit()
            return outcome.won_paths > 0

        # Known path: refresh the reference and reset its enrichment level.
        ref.mtime_ns = file_mtime_ns
        ref.job_id = job_id
        ref.is_missing = False
        ref.deleted_at = None
        ref.updated_at = get_utc_now()
        ref.enrichment_level = 0

        current_asset = ref.asset
        if current_asset:
            shared_by_others = (
                count_active_siblings(session, current_asset.id, ref.id) > 0
            )
            if shared_by_others:
                # Other references still use this asset row, so point this
                # reference at a new stub instead of mutating the shared row.
                replacement = create_stub_asset(
                    session,
                    size_bytes=file_size,
                    mime_type=guessed_mime or current_asset.mime_type,
                )
                ref.asset_id = replacement.id
            else:
                current_asset.hash = None
                current_asset.size_bytes = file_size
                if guessed_mime:
                    current_asset.mime_type = guessed_mime

        session.commit()
        return True
231+
232+
133233
def _register_existing_asset(
134234
asset_hash: str,
135235
name: str,

0 commit comments

Comments
 (0)