Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions agentex/src/api/schemas/authorization_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ class AgentexResourceType(StrEnum):
agent = "agent"
task = "task"
api_key = "api_key"
schedule = "schedule"


# Resources that inherit permissions from their parent task
Expand Down Expand Up @@ -42,6 +43,10 @@ def task(cls, selector: str) -> "AgentexResource":
def api_key(cls, selector: str) -> "AgentexResource":
return cls(type=AgentexResourceType.api_key, selector=selector)

@classmethod
def schedule(cls, selector: str) -> "AgentexResource":
return cls(type=AgentexResourceType.schedule, selector=selector)


class AgentexResourceOptionalSelector(BaseModel):
type: AgentexResourceType
Expand All @@ -58,3 +63,9 @@ def task(cls, selector: str | None = None) -> "AgentexResourceOptionalSelector":
@classmethod
def api_key(cls, selector: str | None = None) -> "AgentexResourceOptionalSelector":
return cls(type=AgentexResourceType.api_key, selector=selector)

@classmethod
def schedule(
cls, selector: str | None = None
) -> "AgentexResourceOptionalSelector":
return cls(type=AgentexResourceType.schedule, selector=selector)
113 changes: 101 additions & 12 deletions agentex/src/domain/services/schedule_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from temporalio.client import ScheduleActionStartWorkflow, ScheduleDescription

from src.adapters.temporal.adapter_temporal import DTemporalAdapter
from src.api.schemas.authorization_types import AgentexResource
from src.api.schemas.schedules import (
CreateScheduleRequest,
ScheduleActionInfo,
Expand All @@ -15,6 +16,7 @@
ScheduleState,
)
from src.domain.entities.agents import AgentEntity
from src.domain.services.authorization_service import DAuthorizationService
from src.utils.logging import make_logger

logger = make_logger(__name__)
Expand Down Expand Up @@ -44,8 +46,10 @@ class ScheduleService:
def __init__(
self,
temporal_adapter: DTemporalAdapter,
authorization_service: DAuthorizationService,
):
self.temporal_adapter = temporal_adapter
self.authorization_service = authorization_service

async def create_schedule(
self,
Expand Down Expand Up @@ -80,23 +84,105 @@ async def create_schedule(
else None
)

await self.temporal_adapter.create_schedule(
schedule_id=schedule_id,
workflow=request.workflow_name,
workflow_id=workflow_id_prefix,
args=args,
task_queue=request.task_queue,
cron_expressions=cron_expressions,
interval_seconds=request.interval_seconds,
execution_timeout=execution_timeout,
start_at=request.start_at,
end_at=request.end_at,
paused=request.paused,
# Schedules have no Postgres row — Temporal is the store and the auth
# selector is the schedule id ({agent_id}--{schedule_name}). Register
# the auth tuple (with the parent_agent edge) BEFORE the Temporal write
# (fail-closed), and compensate if the Temporal create fails so we never
# leave an orphan tuple. The read-back below is intentionally OUTSIDE
# the compensation scope: a describe failure must not deregister a
# schedule that was actually created.
registered = await self._register_schedule_in_auth(
schedule_id=schedule_id, agent_id=agent.id
)
try:
await self.temporal_adapter.create_schedule(
schedule_id=schedule_id,
workflow=request.workflow_name,
workflow_id=workflow_id_prefix,
args=args,
task_queue=request.task_queue,
cron_expressions=cron_expressions,
interval_seconds=request.interval_seconds,
execution_timeout=execution_timeout,
start_at=request.start_at,
end_at=request.end_at,
paused=request.paused,
)
except Exception:
# Orphan-tuple guard: the tuple was written but the schedule never
# landed in Temporal. Best-effort compensating deregister, then
# re-raise the original error.
if registered:
await self._deregister_schedule_from_auth(schedule_id=schedule_id)
raise

# Fetch and return the created schedule
return await self.get_schedule(agent.id, request.name)

async def _register_schedule_in_auth(
self, *, schedule_id: str, agent_id: str
) -> bool:
"""Register a new agent_schedule with the auth service, including the
parent_agent edge so permissions cascade from the owning agent.

Called BEFORE the Temporal write — a failure raises and prevents the
schedule from being created. Skipped with a warning when no usable
creator identity is available on the principal context (e.g.
agent-bypass / internal paths without an authenticated user); this is
the interim behavior until on-behalf-of-user identity is threaded.

Returns True when a tuple was actually registered (so the caller knows
whether a compensating deregister is warranted), False when skipped.
"""
principal_context = self.authorization_service.principal_context
user_id = getattr(principal_context, "user_id", None)
service_account_id = getattr(principal_context, "service_account_id", None)
if user_id is None and service_account_id is None:
logger.warning(
"Skipping auth registration for schedule: no creator resolvable",
extra={"schedule_id": schedule_id, "agent_id": agent_id},
)
return False
try:
await self.authorization_service.register_resource(
resource=AgentexResource.schedule(schedule_id),
parent=AgentexResource.agent(agent_id),
)
except Exception as exc:
# Fail closed: log + re-raise so the schedule is never created.
logger.exception(
"Auth register_resource failed for agent_schedule; aborting create",
extra={
"schedule_id": schedule_id,
"agent_id": agent_id,
"error_type": type(exc).__name__,
},
)
raise
return True

async def _deregister_schedule_from_auth(self, *, schedule_id: str) -> None:
"""Best-effort deregistration of a schedule's auth tuples.

``deregister_resource`` removes the resource and all of its
relationships (owner, parent, grantees) atomically. Used both on delete
and as the compensating action when a Temporal create fails. Failures
are logged but never propagate.
"""
try:
await self.authorization_service.deregister_resource(
resource=AgentexResource.schedule(schedule_id),
)
except Exception as exc:
logger.warning(
"Auth deregister failed for agent_schedule; tuple may be orphaned",
extra={
"schedule_id": schedule_id,
"error_type": type(exc).__name__,
},
exc_info=True,
)

async def get_schedule(self, agent_id: str, schedule_name: str) -> ScheduleResponse:
"""
Get details of a schedule.
Expand Down Expand Up @@ -237,6 +323,9 @@ async def delete_schedule(self, agent_id: str, schedule_name: str) -> None:
"""
schedule_id = build_schedule_id(agent_id, schedule_name)
await self.temporal_adapter.delete_schedule(schedule_id)
# Best-effort: drop the auth tuple after the Temporal delete. A failure
# here is logged but never blocks the delete.
await self._deregister_schedule_from_auth(schedule_id=schedule_id)

def _description_to_response(
self, schedule_id: str, description: ScheduleDescription
Expand Down
Loading
Loading