Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .devcontainer/devcontainer.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
},
"mounts": [
"source=${localEnv:HOME}/.claude,target=/home/vscode/.claude,type=bind",
"source=${localEnv:HOME}/.gitconfig,target=/home/vscode/.gitconfig,type=bind,readonly",
"source=fileglancer-pixi,target=${containerWorkspaceFolder}/.pixi,type=volume"
],
"remoteEnv": {
Expand Down
124 changes: 124 additions & 0 deletions fileglancer/alembic/versions/c4e8a7d92b15_add_user_apps_table.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
"""add user_apps table

Revision ID: c4e8a7d92b15
Revises: 20b763c28c4f
Create Date: 2026-05-24 00:00:00.000000

"""
from datetime import datetime, UTC

from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = 'c4e8a7d92b15'
down_revision = '20b763c28c4f'
branch_labels = None
depends_on = None


def _parse_iso(value):
"""Parse an ISO 8601 timestamp string into a naive UTC datetime.

Returns None if value is falsy or cannot be parsed.
"""
if not value:
return None
if isinstance(value, datetime):
return value
try:
dt = datetime.fromisoformat(str(value).replace("Z", "+00:00"))
except ValueError:
return None
if dt.tzinfo is not None:
dt = dt.astimezone(UTC).replace(tzinfo=None)
return dt


def upgrade() -> None:
op.create_table(
'user_apps',
sa.Column('id', sa.Integer(), primary_key=True, autoincrement=True),
sa.Column('username', sa.String(), nullable=False),
sa.Column('url', sa.String(), nullable=False),
sa.Column('manifest_path', sa.String(), nullable=False, server_default=''),
sa.Column('name', sa.String(), nullable=False),
sa.Column('description', sa.String(), nullable=True),
sa.Column('branch', sa.String(), nullable=True),
sa.Column('manifest', sa.JSON(), nullable=True),
sa.Column('added_at', sa.DateTime(), nullable=False),
sa.Column('updated_at', sa.DateTime(), nullable=True),
sa.UniqueConstraint('username', 'url', 'manifest_path', name='uq_user_app'),
)
op.create_index('ix_user_apps_username', 'user_apps', ['username'])

# Data migration: move user_preferences['apps'] into user_apps rows.
user_preferences = sa.table(
'user_preferences',
sa.column('id', sa.Integer),
sa.column('username', sa.String),
sa.column('key', sa.String),
sa.column('value', sa.JSON),
)
user_apps = sa.table(
'user_apps',
sa.column('username', sa.String),
sa.column('url', sa.String),
sa.column('manifest_path', sa.String),
sa.column('name', sa.String),
sa.column('description', sa.String),
sa.column('branch', sa.String),
sa.column('manifest', sa.JSON),
sa.column('added_at', sa.DateTime),
sa.column('updated_at', sa.DateTime),
)

conn = op.get_bind()
rows = conn.execute(
sa.select(
user_preferences.c.id,
user_preferences.c.username,
user_preferences.c.value,
).where(user_preferences.c.key == 'apps')
).fetchall()

now = datetime.now(UTC).replace(tzinfo=None)
seen: set[tuple[str, str, str]] = set()
inserts = []
for _pref_id, username, value in rows:
app_list = (value or {}).get('apps', []) if isinstance(value, dict) else []
for entry in app_list:
if not isinstance(entry, dict):
continue
url = entry.get('url')
if not url:
continue
manifest_path = entry.get('manifest_path') or ''
key = (username, url, manifest_path)
if key in seen:
continue
seen.add(key)
inserts.append({
'username': username,
'url': url,
'manifest_path': manifest_path,
'name': entry.get('name') or 'Unknown',
'description': entry.get('description'),
'branch': None,
'manifest': None,
'added_at': _parse_iso(entry.get('added_at')) or now,
'updated_at': _parse_iso(entry.get('updated_at')),
})

if inserts:
conn.execute(user_apps.insert(), inserts)

conn.execute(
user_preferences.delete().where(user_preferences.c.key == 'apps')
)


def downgrade() -> None:
op.drop_index('ix_user_apps_username', table_name='user_apps')
op.drop_table('user_apps')
2 changes: 2 additions & 0 deletions fileglancer/apps/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
discover_app_manifests,
fetch_app_manifest,
get_app_branch,
get_or_load_manifest,
refresh_cached_manifest,
get_job_file_content,
get_job_file_paths,
get_service_url,
Expand Down
112 changes: 104 additions & 8 deletions fileglancer/apps/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,86 @@ async def fetch_app_manifest(url: str, manifest_path: str = "",
return _read_manifest_file(target_dir)


async def get_or_load_manifest(username: str, url: str,
manifest_path: str = "") -> AppManifest:
"""Return the manifest for an app, preferring the DB cache.

Hot path: a single SELECT plus model_validate — no disk I/O,
no worker dispatch.

If the cached manifest is missing (NULL) or fails validation
(schema drift), falls back to reading from disk via
fetch_app_manifest and writes the fresh value back to the row.

If no row exists for (username, url, manifest_path), reads from
disk and returns the manifest without creating a row (preview
semantics for not-yet-installed apps).
"""
from pydantic import ValidationError

settings = get_settings()

with db.get_db_session(settings.db_url) as session:
row = db.get_user_app(session, username, url, manifest_path)
stored = row.manifest if row else None
row_exists = row is not None

if stored is not None:
try:
return AppManifest(**stored)
except ValidationError as e:
logger.warning(f"Stored manifest schema mismatch for {url}: {e}")

manifest = await fetch_app_manifest(url, manifest_path, username=username)

if row_exists:
branch = await get_app_branch(url)
with db.get_db_session(settings.db_url) as session:
db.upsert_user_app(
session, username,
url=url, manifest_path=manifest_path,
name=manifest.name, description=manifest.description,
branch=branch,
manifest=manifest.model_dump(mode="json"),
bump_updated_at=False,
)

return manifest


async def refresh_cached_manifest(username: str, url: str,
manifest_path: str = "",
bump_updated_at: bool = False
) -> tuple[AppManifest, str]:
"""Re-read the manifest from disk and sync the cache.

Call this after any operation that mutates the on-disk YAML
(clone or git pull) so the DB cache stays in lockstep with disk.

No-op on the DB if (username, url, manifest_path) has no row —
callers that need to insert a new row should use upsert_user_app
directly.

Returns (manifest, branch).
"""
manifest = await fetch_app_manifest(url, manifest_path, username=username)
branch = await get_app_branch(url)

settings = get_settings()
with db.get_db_session(settings.db_url) as session:
if db.get_user_app(session, username, url, manifest_path) is not None:
db.upsert_user_app(
session, username,
url=url, manifest_path=manifest_path,
name=manifest.name, description=manifest.description,
branch=branch,
manifest=manifest.model_dump(mode="json"),
bump_updated_at=bump_updated_at,
)

return manifest, branch


async def get_app_branch(url: str) -> str:
"""Return the branch name for a GitHub app URL.

Expand Down Expand Up @@ -688,8 +768,11 @@ def build_command(entry_point: AppEntryPoint, parameters: dict, session=None) ->
# (bjobs, bsub, bkill) due to HPC root-squash policy. All LSF
# operations go through the persistent per-user worker pool.
#
# The poll loop picks any user with active jobs and dispatches ``bjobs
# -u all`` through that user's worker to get statuses for ALL users' jobs.
# The poll loop picks any user with active jobs and dispatches a ``poll``
# action through that user's worker, passing the explicit list of
# cluster_job_ids to query. py-cluster-api's executor then runs ``bjobs``
# for just those IDs. LSF normally allows querying jobs by ID across
# users, so one worker's call returns statuses for all users' jobs.

_poll_task = None
_POLL_LOCK_PATH = os.path.join(tempfile.gettempdir(), "fileglancer_poll.lock")
Expand Down Expand Up @@ -767,8 +850,10 @@ def _get_any_active_username(settings) -> str | None:
async def _reconnect_as_any_user(settings):
"""Reconnect to existing cluster jobs via the persistent worker.

Picks any user with active jobs to run bjobs as. If no active jobs
exist, reconnection is skipped (nothing to reconnect to).
Picks any user with active jobs in the DB and dispatches a ``reconnect``
action through their worker; py-cluster-api re-attaches to the jobs it
finds. If no active jobs exist in the DB, reconnection is skipped
(nothing to reconnect to).
"""
username = _get_any_active_username(settings)
if not username:
Expand Down Expand Up @@ -881,7 +966,9 @@ async def _poll_jobs(settings):
if settings.cluster.executor == "local":
return _poll_local_jobs(session, jobs_to_poll)

# Pick any user to run bjobs as (bjobs -u all sees all users' jobs)
# Pick any user to run the poll through. py-cluster-api will query
# each cluster_job_id explicitly; LSF allows querying jobs by ID
# across users, so one worker's call covers everyone's jobs.
poll_username = jobs_to_poll[0].username
# Pass current known statuses so stubs are seeded correctly.
# Without this, stubs default to PENDING and jobs whose status
Expand Down Expand Up @@ -1102,8 +1189,8 @@ async def submit_job(
"""
settings = get_settings()

# Fetch and validate manifest (clones repo into user's cache)
manifest = await fetch_app_manifest(app_url, manifest_path, username=username)
# Read manifest from the cache when available; fall back to disk.
manifest = await get_or_load_manifest(username, app_url, manifest_path)

# Find entry point
entry_point = None
Expand Down Expand Up @@ -1183,14 +1270,23 @@ async def submit_job(
session.commit()

# Clone/pull repo into the user's cache (~username/.fileglancer/apps).
if manifest.repo_url:
if manifest.repo_url and manifest.repo_url != app_url:
cached_repo_dir = await _ensure_repo_cache(manifest.repo_url, pull=pull_latest,
username=username)
cd_suffix = "repo"
pulled_manifest_repo = False
else:
cached_repo_dir = await _ensure_repo_cache(app_url, pull=pull_latest,
username=username)
cd_suffix = f"repo/{manifest_path}" if manifest_path else "repo"
pulled_manifest_repo = pull_latest

# If the pull just changed the YAML on disk, sync the cache.
if pulled_manifest_repo:
try:
await refresh_cached_manifest(username, app_url, manifest_path)
except Exception as e:
logger.warning(f"Failed to refresh cached manifest after pull: {e}")

# Build environment variable export lines
env_lines = ""
Expand Down
Loading
Loading