2 changes: 1 addition & 1 deletion dev
Submodule dev updated from 48be8a to 1a282a
160 changes: 143 additions & 17 deletions scidk/app.py
@@ -972,6 +972,12 @@ def api_scan():
folders = []
files_skipped = 0
files_hashed = 0
# Cache variables (initialized for all providers)
use_cache = False
prev_scan_id = None
cache_hits = 0
cache_misses = 0

if provider_id in ('local_fs', 'mounted_fs'):
# Local/Mounted: enumerate filesystem and ingest into SQLite index
base = Path(path)
@@ -991,25 +997,98 @@ def api_scan():
fs.last_scan_source = 'python'
except Exception:
fs.last_scan_source = 'python'

# Cache-aware traversal optimization: check for previous scan and reuse if unchanged
use_cache = os.environ.get('SCIDK_CACHE_SCAN', '1').strip().lower() in ('1', 'true', 'yes', 'on')

if use_cache:
try:
prev_scan_id = pix.get_previous_scan_for_path(str(base))
except Exception:
prev_scan_id = None

def _dir_unchanged(dir_path: Path, prev_sid: Optional[str]) -> bool:
"""Check if directory listing hasn't changed since previous scan."""
if not prev_sid:
return False
try:
cached_children = pix.get_cached_directory(prev_sid, str(dir_path))
if cached_children is None:
return False
# Check if current directory listing matches cache
current_children = set()
for child in dir_path.iterdir():
current_children.add(child.name)
return set(cached_children) == current_children
except Exception:
return False

try:
if recursive:
-    for p in base.rglob('*'):
-        try:
-            if p.is_dir():
-                items_dirs.add(p)
-            else:
-                items_files.append(p)
-                # ensure parent chain exists in dirs set
-                parent = p.parent
-                while parent and parent != parent.parent and str(parent).startswith(str(base)):
-                    items_dirs.add(parent)
-                    if parent == base:
-                        break
-                    parent = parent.parent
-        except Exception:
-            continue
-    # include base itself as a folder
-    items_dirs.add(base)
+    # For recursive scans, use cache-aware traversal
+    dirs_to_scan = [base]
+    visited = set()
+
+    while dirs_to_scan:
+        current_dir = dirs_to_scan.pop(0)
+        if str(current_dir) in visited:
+            continue
+        visited.add(str(current_dir))
+        items_dirs.add(current_dir)

# Check if we can use cached data for this directory
if use_cache and prev_scan_id and _dir_unchanged(current_dir, prev_scan_id):
# Directory listing unchanged: reuse the cached child names
# (re-fetched here, although _dir_unchanged already read them once)
cache_hits += 1
try:
cached_children = pix.get_cached_directory(prev_scan_id, str(current_dir))
if cached_children:
for child_name in cached_children:
child_path = current_dir / child_name
if child_path.exists():
if child_path.is_dir():
dirs_to_scan.append(child_path)
else:
items_files.append(child_path)
# ensure parent chain exists
parent = current_dir.parent
while parent and parent != parent.parent and str(parent).startswith(str(base)):
items_dirs.add(parent)
if parent == base:
break
parent = parent.parent
except Exception:
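# Note: cache_hits was already incremented above and is not rolled
# back here, so a listing that fails to load counts as both a hit
# and a miss in the reported stats.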
cache_misses += 1
# Fallback to filesystem scan for this directory
for child in current_dir.iterdir():
try:
if child.is_dir():
dirs_to_scan.append(child)
else:
items_files.append(child)
except Exception:
continue
else:
# Filesystem scan for this directory
cache_misses += 1
try:
for child in current_dir.iterdir():
try:
if child.is_dir():
dirs_to_scan.append(child)
else:
items_files.append(child)
# ensure parent chain exists
parent = child.parent
while parent and parent != parent.parent and str(parent).startswith(str(base)):
items_dirs.add(parent)
if parent == base:
break
parent = parent.parent
except Exception:
continue
except Exception:
continue
else:
for p in base.iterdir():
try:
@@ -1074,6 +1153,31 @@ def _row_from_local(pth: Path, typ: str) -> tuple:
for fpath in items_files:
rows.append(_row_from_local(fpath, 'file'))
ingested = pix.batch_insert_files(rows)

# Populate scan_items and directory_cache for selective scanning optimization
try:
scan_item_rows = []
dir_cache_map = {} # path -> list of children names
for row in rows:
# row format: (path, parent, name, depth, type, size, mtime, ext, mime, etag, hash, remote, scan_id, extra)
full_path, parent, name, depth, typ, size, mtime, ext, mime, etag, ahash, remote, _, extra = row
# Build scan_items row: (path, type, size, modified_time, file_extension, mime_type, etag, hash, extra_json)
scan_item_rows.append((full_path, typ, size, mtime, ext, mime, etag, ahash, extra))
# Build directory cache: track children per parent directory
if parent:
dir_cache_map.setdefault(parent, []).append(name)

# Insert scan_items
if scan_item_rows:
pix.record_scan_items(scan_id, scan_item_rows)

# Insert directory_cache for each directory
for dir_path, children_names in dir_cache_map.items():
pix.cache_directory_listing(scan_id, dir_path, children_names)
except Exception as cache_err:
# Non-fatal: log but continue
app.extensions['scidk'].setdefault('telemetry', {})['last_cache_error'] = str(cache_err)

# Also create in-memory datasets (keep legacy behavior)
count = 0
for fpath in items_files:
@@ -1354,6 +1458,12 @@ def _add_folder(full_path: str, name: str, parent: str):
'source': app.extensions['scidk'].get('interpreters', {}).get('source', 'default'),
}
},
'cache_stats': {
'enabled': use_cache,
'prev_scan_id': prev_scan_id,
'cache_hits': cache_hits,
'cache_misses': cache_misses,
},
}
scans = app.extensions['scidk'].setdefault('scans', {})
scans[scan_id] = scan
@@ -1400,7 +1510,23 @@ def _add_folder(full_path: str, name: str, parent: str):
'root_label': root_label,
})
drec.setdefault('scan_ids', []).append(scan_id)
return jsonify({"status": "ok", "scan_id": scan_id, "scanned": count, "folder_count": len(folders), "ingested_rows": int(ingested), "duration_sec": duration, "path": str(path), "recursive": bool(recursive), "provider_id": provider_id}), 200
return jsonify({
"status": "ok",
"scan_id": scan_id,
"scanned": count,
"folder_count": len(folders),
"ingested_rows": int(ingested),
"duration_sec": duration,
"path": str(path),
"recursive": bool(recursive),
"provider_id": provider_id,
"cache_stats": {
'enabled': use_cache,
'prev_scan_id': prev_scan_id,
'cache_hits': cache_hits,
'cache_misses': cache_misses,
}
}), 200
except Exception as e:
return jsonify({"status": "error", "error": str(e)}), 400

147 changes: 147 additions & 0 deletions scidk/core/path_index_sqlite.py
@@ -338,3 +338,150 @@ def apply_basic_change_history(scan_id: str, target_root: str) -> dict:
return {"created": int(created), "modified": int(modified), "deleted": int(deleted)}
finally:
conn.close()


def record_scan_items(scan_id: str, rows: Iterable[Tuple], batch_size: int = 10000) -> int:
"""
Record scan items into scan_items table for caching.
Rows: (path, type, size, modified_time, file_extension, mime_type, etag, hash, extra_json)
Returns total inserted.
"""
from .migrations import migrate
conn = connect()
migrate(conn)
total = 0
try:
cur = conn.cursor()
buf: List[Tuple] = []
for r in rows:
# Expand row to match scan_items schema
buf.append((scan_id,) + r)
if len(buf) >= batch_size:
cur.executemany(
"""INSERT INTO scan_items(scan_id, path, type, size, modified_time, file_extension, mime_type, etag, hash, extra_json)
VALUES (?,?,?,?,?,?,?,?,?,?)""",
buf,
)
conn.commit()
total += len(buf)
buf.clear()
if buf:
cur.executemany(
"""INSERT INTO scan_items(scan_id, path, type, size, modified_time, file_extension, mime_type, etag, hash, extra_json)
VALUES (?,?,?,?,?,?,?,?,?,?)""",
buf,
)
conn.commit()
total += len(buf)
return total
finally:
conn.close()
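
A minimal usage sketch for `record_scan_items`. The paths, sizes, and the `'directory'` type label are illustrative assumptions; the diff itself only shows `'file'` rows being built.

rows = [
    # (path, type, size, modified_time, file_extension, mime_type, etag, hash, extra_json)
    ("/data/project/a.csv", "file", 1024, 1700000000.0, ".csv", "text/csv", None, "abc123", "{}"),
    ("/data/project/sub", "directory", 0, 1700000000.0, "", None, None, None, "{}"),
]
inserted = record_scan_items("scan-001", rows)
assert inserted == 2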


def cache_directory_listing(scan_id: str, dir_path: str, children: List[str]) -> None:
"""
Cache directory listing in directory_cache table.
children: list of child file/folder names (not full paths)
"""
import json
import time
from .migrations import migrate
conn = connect()
migrate(conn)
try:
children_json = json.dumps(children)
created = time.time()
conn.execute(
"""INSERT OR REPLACE INTO directory_cache(scan_id, path, children_json, created)
VALUES (?,?,?,?)""",
(scan_id, dir_path, children_json, created)
)
conn.commit()
finally:
conn.close()


def get_cached_directory(scan_id: str, dir_path: str) -> Optional[List[str]]:
"""
Retrieve cached directory listing from directory_cache.
Returns list of child names or None if not cached.
"""
import json
from .migrations import migrate
conn = connect()
migrate(conn)
try:
cur = conn.cursor()
cur.execute(
"SELECT children_json FROM directory_cache WHERE scan_id=? AND path=?",
(scan_id, dir_path)
)
row = cur.fetchone()
if not row:
return None
try:
return json.loads(row[0] or "[]")
except Exception:
return None
finally:
conn.close()
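
A round-trip sketch of the two directory-cache helpers; the scan id and paths are made up.

cache_directory_listing("scan-001", "/data/project", ["a.csv", "sub"])
children = get_cached_directory("scan-001", "/data/project")
# children == ["a.csv", "sub"]
# An uncached path returns None, not an empty list:
assert get_cached_directory("scan-001", "/data/elsewhere") is None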


def get_previous_scan_for_path(path: str) -> Optional[str]:
"""
Find the most recent scan_id that includes this path.
Returns scan_id or None.
"""
from .migrations import migrate
conn = connect()
migrate(conn)  # consistent with the other cache helpers; scan_items may not exist after init_db alone
try:
cur = conn.cursor()
# Try scan_items first (more structured)
cur.execute(
"SELECT scan_id FROM scan_items WHERE path=? ORDER BY rowid DESC LIMIT 1",
(path,)
)
row = cur.fetchone()
if row:
return row[0]
# Fallback to files table
cur.execute(
"SELECT scan_id FROM files WHERE path=? ORDER BY rowid DESC LIMIT 1",
(path,)
)
row = cur.fetchone()
return row[0] if row else None
finally:
conn.close()
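
Putting the lookups together, a sketch of the per-directory validity check the scanner performs. This mirrors `_dir_unchanged` in `scidk/app.py`; the target path is illustrative.

from pathlib import Path

target = Path("/data/project")  # hypothetical scan root
prev = get_previous_scan_for_path(str(target))
cached = get_cached_directory(prev, str(target)) if prev else None
if cached is not None and set(cached) == {c.name for c in target.iterdir()}:
    print(f"listing unchanged since scan {prev}; cached children can be reused")
else:
    print("no usable cache; fall back to a filesystem walk")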


def get_scan_item(scan_id: str, path: str) -> Optional[Dict]:
"""
Retrieve scan item metadata from scan_items table.
Returns dict with path, type, size, modified_time, hash, etc. or None.
"""
from .migrations import migrate
conn = connect()
migrate(conn)
try:
cur = conn.cursor()
cur.execute(
"""SELECT path, type, size, modified_time, file_extension, mime_type, etag, hash
FROM scan_items WHERE scan_id=? AND path=?""",
(scan_id, path)
)
row = cur.fetchone()
if not row:
return None
return {
'path': row[0],
'type': row[1],
'size': row[2],
'modified_time': row[3],
'file_extension': row[4],
'mime_type': row[5],
'etag': row[6],
'hash': row[7],
}
finally:
conn.close()
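
These helpers assume `scan_items` and `directory_cache` tables created by `scidk/core/migrations.py`, which is not part of this diff. One plausible shape is sketched below; note that the `INSERT OR REPLACE` in `cache_directory_listing` only overwrites an earlier row if a uniqueness constraint exists on `(scan_id, path)`.

import sqlite3

# Assumed schema, for illustration; the authoritative DDL lives in migrations.
conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE IF NOT EXISTS scan_items (
        scan_id TEXT NOT NULL,
        path TEXT NOT NULL,
        type TEXT,
        size INTEGER,
        modified_time REAL,
        file_extension TEXT,
        mime_type TEXT,
        etag TEXT,
        hash TEXT,
        extra_json TEXT
    );
    CREATE TABLE IF NOT EXISTS directory_cache (
        scan_id TEXT NOT NULL,
        path TEXT NOT NULL,
        children_json TEXT,
        created REAL,
        PRIMARY KEY (scan_id, path)  -- lets INSERT OR REPLACE overwrite per scan+path
    );
    """
)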