2 changes: 1 addition & 1 deletion backend/app/service_credential/models/personal.py
@@ -25,4 +25,4 @@ class Github(models.Model):
     credential = fields.OneToOneField("models.Credential", related_name="github", on_delete=fields.CASCADE)
     id = fields.UUIDField(primary_key=True, default=uuid.uuid4)
     username = fields.CharField(max_length=50, null=False)
-    token = fields.CharField(max_length=200, null=False)
+    token = EncryptedTextField(max_length=200, null=False)

Copilot AI Jan 7, 2026


The max_length parameter on an EncryptedTextField may not behave as expected: encrypted data is larger than the original plaintext because of encryption overhead. Consider removing max_length or ensuring the field can accommodate the encrypted size.

Suggested change:
-    token = EncryptedTextField(max_length=200, null=False)
+    token = EncryptedTextField(null=False)

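To make the reviewer's point concrete, here is a minimal sketch of ciphertext expansion, assuming a Fernet-style scheme (the actual EncryptedTextField implementation is not part of this diff and may differ):

# Hypothetical demonstration of ciphertext growth, assuming a Fernet-like field.
# The field in this PR may use a different scheme; lengths are illustrative.
from cryptography.fernet import Fernet

fernet = Fernet(Fernet.generate_key())
token = "ghp_" + "x" * 36                   # a 40-character GitHub-style token
ciphertext = fernet.encrypt(token.encode())
print(len(token), len(ciphertext))          # 40 -> 140: version byte, timestamp, IV,
                                            # padding, HMAC and base64 all add overhead

Under that assumption, a 40-character token already consumes most of max_length=200, and a 200-character plaintext would not fit at all, which supports dropping the cap.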
16 changes: 10 additions & 6 deletions backend/app/service_job/controller.py
@@ -54,10 +54,11 @@ async def create_job(self, project_id: UUID, job: JobSerializerIn, request: Requ
         project = await Project.get_or_none(id=project_id)
         params["bucket_name"] = project.bucket_name
         await project.fetch_related("storage")

+        # Validate that storage credentials are configured (don't generate temp creds here)
         storage_manager = await get_storage_manager(project.storage)
-        temp_secrets = storage_manager.get_temp_secret(session_name=user.email, duration=job["time"])
-        if not temp_secrets:
-            return bad_request({"message": "The ARN failed to create"})
+        if not storage_manager.is_valid():
+            return bad_request({"message": "Storage credentials are not valid"})

Copilot AI Jan 7, 2026


The error message 'Storage credentials are not valid' is vague and doesn't help users understand what action to take. Consider providing a more specific message, such as 'Storage credentials are missing or expired. Please reconfigure storage settings.'

Suggested change:
-        return bad_request({"message": "Storage credentials are not valid"})
+        return bad_request({"message": "Storage credentials are missing, invalid, or expired. Please review and reconfigure the project's storage settings."})


         if analysis.allow_access:
             job = await WebJob.create(**job)
@@ -74,9 +75,12 @@ async def create_job(self, project_id: UUID, job: JobSerializerIn, request: Requ
         job_out = job_out.model_dump()
         user = await request.identity.get_user()
         job_out["created_by"] = user.email
-        # Submit to monitor job
-        params.update(temp_secrets)
-        submit_job.apply_async((job.id, analysis.allow_access, params))
+
+        # SECURITY: Pass only storage_id and user_email, not credentials
+        # Worker will regenerate credentials just-in-time to avoid Redis exposure
+        submit_job.apply_async(
+            (str(job.id), analysis.allow_access, params, str(project.storage.pk), user.email, job.time)
+        )
         return created(job_out)

     @auth("viewer")
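The security rationale is easy to see from the broker's side: Celery serializes task arguments into Redis, so with the old call any temp_secrets in params sat in the broker until a worker picked them up. A hedged sketch of inspecting those pending messages (the broker URL and the default "celery" queue key are assumptions):

# Hypothetical inspection of pending Celery task messages in a Redis broker.
import json
import redis

broker = redis.Redis.from_url("redis://localhost:6379/0")
for raw in broker.lrange("celery", 0, -1):   # default queue is a plain Redis list
    message = json.loads(raw)
    print(message["body"])                   # base64-encoded task args; with the old
                                             # code, AWS credentials appeared here

Passing only storage_id and user_email keeps secrets out of this payload entirely.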
2 changes: 1 addition & 1 deletion backend/app/service_job/models.py
@@ -12,7 +12,7 @@ class JobStatus(str, Enum):
PENDING = "PENDING"
FAILED = "FAILED"
COMPLETED = "COMPLETED"
SERVER_ERROR = "SERVER_ERROR"
HPC_DISCONNECTED = "HPC_DISCONNECTED"
CANCELLED = "CANCELLED"
CANCELLING = "CANCELLED+"
TIMEOUT = "TIMEOUT"
51 changes: 48 additions & 3 deletions backend/app/service_job/tasks.py
@@ -6,8 +6,24 @@


 @shared_task
-def submit_job(job_id: str, is_web_job: bool, params: dict):
+def submit_job(job_id: str, is_web_job: bool, params: dict, storage_id: str, user_email: str, duration: int):
+    """
+    Submit job to compute executor.
+
+    SECURITY: Regenerates AWS credentials just-in-time in the worker to avoid
+    passing credentials through Redis/Celery broker.
+
+    Args:
+        job_id: Job UUID
+        is_web_job: Whether this is a web job
+        params: Job parameters (without credentials)
+        storage_id: Storage credential ID to fetch from database
+        user_email: User email for STS session naming
+        duration: Duration in seconds for temporary credentials
+    """
     from app.utils.executor.manager import get_compute_executor
+    from app.utils.storage.manager import get_storage_manager
+    from app.service_credential.models.base import Credential
     import logging

     logger = logging.getLogger(__name__)
@@ -23,13 +39,36 @@ async def _handle():
             return

         try:
+            # SECURITY: Fetch storage credential from DB and generate temp credentials
+            # This happens in the worker, so credentials never pass through Redis
+            storage_credential = await Credential.get_or_none(id=storage_id)
+            if not storage_credential:
+                logger.error(f"Storage credential {storage_id} not found for job {job_id}")
+                job.status = JobStatus.FAILED
+                await job.save()
+                return
+
+            storage_manager = await get_storage_manager(storage_credential)
+            temp_secrets = storage_manager.get_temp_secret(session_name=user_email, duration=duration)
+
+            if not temp_secrets:
+                logger.error(f"Failed to generate temporary credentials for job {job_id}")
+                job.status = JobStatus.FAILED
+                await job.save()
+                return
+
+            # Add temp credentials to params
+            params.update(temp_secrets)
+            logger.info(f"Generated temporary credentials for job {job_id} (duration: {duration}s)")
+
+            # Submit job with credentials
             await job.fetch_related("compute")
             executor = await get_compute_executor(job.compute)
             result = executor.submit(job, params)

             job.external_id = result.get("job_id")
             if job.external_id is None:
-                job.status = JobStatus.SERVER_ERROR
+                job.status = JobStatus.HPC_DISCONNECTED
             else:
                 job.status = JobStatus.SUBMITTED
             await job.save()
@@ -77,13 +116,19 @@ async def _handle():
                 JobStatus.CANCELLED,
                 JobStatus.CANCELLING,
                 JobStatus.TIMEOUT,
-                JobStatus.SERVER_ERROR,
             ]:
                 logger.info(f"Rescheduling monitoring for job {job.id}, current status: {job.status}")
                 monitor_job.apply_async(args=[job.id, is_web_job], countdown=5)
             else:
                 logger.info(f"Monitoring finished for job {job.id}, status: {job.status}")

+            # SECURITY: Cleanup params.json from HPC in case job failed before it could delete it
+            try:
+                await executor.cleanup_credentials(job)
+                logger.info(f"Cleaned up credential files for job {job.id}")
+            except Exception as e:
+                logger.warning(f"Failed to cleanup credentials for job {job.id}: {e}")

         except Exception:
             logger.exception(f"Monitoring failed for WebJob {job_id}")
             job.status = JobStatus.FAILED
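For context, get_temp_secret itself is not shown in this PR; here is a plausible sketch of what such a helper does, assuming it wraps AWS STS AssumeRole (the role ARN and returned key names are illustrative assumptions):

# Hypothetical just-in-time credential generation via AWS STS.
import boto3

def get_temp_secret(role_arn: str, session_name: str, duration: int) -> dict:
    sts = boto3.client("sts")
    response = sts.assume_role(
        RoleArn=role_arn,
        RoleSessionName=session_name,  # e.g. the requesting user's email
        DurationSeconds=duration,      # STS accepts 900 to 43200 seconds
    )
    creds = response["Credentials"]
    return {
        "aws_access_key_id": creds["AccessKeyId"],
        "aws_secret_access_key": creds["SecretAccessKey"],
        "aws_session_token": creds["SessionToken"],
    }

Because the worker calls this just-in-time, the credentials live only in worker memory and in the params file that the cleanup step above removes from the HPC side.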
28 changes: 20 additions & 8 deletions backend/app/utils/executor/ssh.py
@@ -201,13 +201,20 @@ def _create_tunnel(job: WebJob):

 def _delete_tunnel(job: WebJob):
     try:
+        killed = False
         for proc in psutil.process_iter(["pid", "name", "connections"]):
-            for conn in proc.info.get("connections", []):
-                if conn.laddr and conn.laddr.port == job.local_port:
-                    logger.info(f"[INFO] Killing process {proc.pid} on port {job.local_port}")
-                    proc.kill()
+            try:
+                for conn in proc.info.get("connections", []):
+                    if conn.laddr and conn.laddr.port == job.local_port:
+                        logger.info(f"[INFO] Killing process {proc.pid} on port {job.local_port}")
+                        proc.kill()
+                        killed = True
+            except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
+                continue
+        if not killed:
+            logger.info(f"[INFO] No process found on port {job.local_port}")
     except Exception as e:
-        logger.error(f"[ERROR] Failed to kill tunnel: {e}")
+        logger.error(f"Failed to kill tunnel: {e}")

 def _create_traefik_config(job: WebJob):
     config_path = os.path.join(TRAEFIK_DYNAMIC_FOLDER, f"{job.id}.yaml")
@@ -306,9 +313,11 @@ def _create_traefik_config(job: WebJob):
         # Monitor job
         STAGING_STATUS = [
             JobStatus.SUBMITTED,
+            JobStatus.SUBMITTING,
             JobStatus.PENDING,
             JobStatus.RUNNING,
             JobStatus.CANCELLING,
+            JobStatus.HPC_DISCONNECTED,
         ]
         if job.status in STAGING_STATUS:
             update_info = {}
@@ -317,9 +326,12 @@
             out, error, exit_code = self._run_cmd(
                 f"squeue --job {job.external_id} --format='%.18i %.9T %.10M %.20V %R' --noheader | head -n1"
             )
-            logger.info(f"squeue output: {out.strip() if out else ''}, error: {error.strip()}, exit_code: {exit_code}")
-
-            if exit_code == 0 and out.strip():
+            logger.info(
+                f"squeue output: {out.strip() if out else ''}, error: {error.strip() if error else ''}, exit_code: {exit_code}"
+            )
+            if exit_code is None:
+                update_info["status"] = JobStatus.HPC_DISCONNECTED
+            elif exit_code == 0 and out.strip():
                 _get_update_info(update_info, out, job, method="squeue")
             else:
                 # Fallback to sacct (for completed, failed, or recently finished jobs)
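A side note on the tunnel cleanup above: iterating every process works, but a hedged alternative sketch uses psutil.net_connections to query sockets globally (visibility of other users' sockets depends on platform and privileges):

# Hypothetical alternative: find the listener on a port without scanning all processes.
import psutil

def kill_listener(port: int) -> bool:
    for conn in psutil.net_connections(kind="inet"):
        if conn.laddr and conn.laddr.port == port and conn.pid:
            try:
                psutil.Process(conn.pid).kill()
                return True
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                pass
    return False

Either way, swallowing NoSuchProcess and AccessDenied per process, as the PR now does, is the right call: processes can exit between enumeration and kill.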
49 changes: 49 additions & 0 deletions backend/migrations/models/5_20260105205918_update.py
@@ -0,0 +1,49 @@
from tortoise import BaseDBAsyncClient


async def upgrade(db: BaseDBAsyncClient) -> str:
    return """
COMMENT ON COLUMN "batchjob"."status" IS 'SUBMITTING: SUBMITTING
SUBMITTED: SUBMITTED
RUNNING: RUNNING
PENDING: PENDING
FAILED: FAILED
COMPLETED: COMPLETED
HPC_DISCONNECTED: HPC_DISCONNECTED
CANCELLED: CANCELLED
CANCELLING: CANCELLED+
TIMEOUT: TIMEOUT';
COMMENT ON COLUMN "webjob"."status" IS 'SUBMITTING: SUBMITTING
SUBMITTED: SUBMITTED
RUNNING: RUNNING
PENDING: PENDING
FAILED: FAILED
COMPLETED: COMPLETED
HPC_DISCONNECTED: HPC_DISCONNECTED
CANCELLED: CANCELLED
CANCELLING: CANCELLED+
TIMEOUT: TIMEOUT';"""


async def downgrade(db: BaseDBAsyncClient) -> str:
    return """
COMMENT ON COLUMN "webjob"."status" IS 'SUBMITTING: SUBMITTING
SUBMITTED: SUBMITTED
RUNNING: RUNNING
PENDING: PENDING
FAILED: FAILED
COMPLETED: COMPLETED
SERVER_ERROR: SERVER_ERROR
CANCELLED: CANCELLED
CANCELLING: CANCELLED+
TIMEOUT: TIMEOUT';
COMMENT ON COLUMN "batchjob"."status" IS 'SUBMITTING: SUBMITTING
SUBMITTED: SUBMITTED
RUNNING: RUNNING
PENDING: PENDING
FAILED: FAILED
COMPLETED: COMPLETED
SERVER_ERROR: SERVER_ERROR
CANCELLED: CANCELLED
CANCELLING: CANCELLED+
TIMEOUT: TIMEOUT';"""
11 changes: 11 additions & 0 deletions backend/migrations/models/6_20260105224743_update.py
@@ -0,0 +1,11 @@
from tortoise import BaseDBAsyncClient


async def upgrade(db: BaseDBAsyncClient) -> str:
    return """
        ALTER TABLE "github" ALTER COLUMN "token" TYPE TEXT USING "token"::TEXT;"""


async def downgrade(db: BaseDBAsyncClient) -> str:
    return """
        ALTER TABLE "github" ALTER COLUMN "token" TYPE VARCHAR(200) USING "token"::VARCHAR(200);"""