Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,47 @@ The application will be available at `http://localhost:8000`

For the full documentation site, run `mkdocs serve` and visit http://localhost:8000

## Demo: full run lifecycle

End-to-end demo of a complete run lifecycle driven by MQTT telemetry from the simulator.

```bash
# Terminal 1 — start the full databus stack
cd databus && bash scripts/dev.sh

# Terminal 2 — load GTFS feed
docker compose -f compose.dev.yml exec orchestrator \
uv run python manage.py loaddata gtfs.json

# Terminal 3 — start the simulator (wired to databus broker)
# The simulator's scheduler posts to /api/create-run on each schedule entry's
# start_time. The UI's Operator tab handles confirmation. No databus-side
# bootstrap command is required.
cd ../simulator && docker compose up simulator web

# Terminal 4 — observe (optional)
open http://localhost:8080 # live map
watch ls backend/feed/files/ # GTFS-RT outputs (refresh every 15 s)
```

Within ~30 s of starting the simulator:

- Every run advances `CONFIRMED → TRACKING → IN_PROGRESS`
- `backend/feed/files/vehicle_positions.pb` contains one `FeedEntity` per active run
- `backend/feed/files/trip_updates.pb` contains stop-time predictions

Killing the simulator triggers `RUN_TRACKING_LOST` after 60 s and
`RUN_TRACKING_EXPIRED → CANCELLED` after 300 s.

Verify the protobuf output:

```python
from google.transit import gtfs_realtime_pb2
msg = gtfs_realtime_pb2.FeedMessage()
msg.ParseFromString(open("backend/feed/files/vehicle_positions.pb", "rb").read())
print(len(msg.entity)) # should equal the number of active runs
```

## 🛣️ Roadmap

Where is this going? Check SIMOVI's [roadmap](https://github.com/simovilab/context/blob/main/roadmap.md).
Expand Down
7 changes: 3 additions & 4 deletions backend/api/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
Progression,
Occupancy,
)
from runs.domain.events import RunLifecycleEvents
from runs.domain import RunLifecycleEvents
from feed.models import *
from django.contrib.auth.models import User
from rest_framework import serializers
Expand Down Expand Up @@ -130,10 +130,9 @@ class CreateRunSerializer(serializers.Serializer):
)


class UpdateRunSerializer(serializers.Serializer):
run_id = serializers.CharField(max_length=100)
class RunUpdateSerializer(serializers.Serializer):
event = serializers.ChoiceField(choices=RunLifecycleEvents)
details = serializers.JSONField()
details = serializers.JSONField(required=False, default=dict)


class PositionSerializer(serializers.HyperlinkedModelSerializer):
Expand Down
14 changes: 13 additions & 1 deletion backend/api/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,19 @@
path("login/", views.LoginView.as_view(), name="login"),
# path("route-stops/", views.RouteStopView.as_view(), name="route_stops"),
path("create-run/", views.CreateRunViewSet.as_view(), name="create_run"),
path("update-run/", views.UpdateRunViewSet.as_view(), name="update_run"),
path(
"runs/<uuid:run_id>/state/", views.RunStateViewSet.as_view(), name="run_state"
),
path(
"runs/<uuid:run_id>/update/",
views.RunUpdateViewSet.as_view(),
name="run_update",
),
path(
"runs/<uuid:run_id>/history/",
views.RunHistoryView.as_view(),
name="run_history",
),
path("service-today/", views.ServiceTodayView.as_view(), name="service_today"),
path("which-shapes/", views.WhichShapesView.as_view(), name="which_shapes"),
path("find-trips/", views.FindTripsView.as_view(), name="find_trips"),
Expand Down
113 changes: 97 additions & 16 deletions backend/api/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
from realtime_engine.tasks import run_lifecycle_event
from runs.services.exceptions import RunLifecycleError
from runs.services.lifecycle import RunLifecycleService
from runs.domain.events import RunLifecycleEvents
from runs.domain.states import RunLifecycleStates
from runs.domain import RunLifecycleEvents
from runs.domain import RunLifecycleStates
from operations.models import (
Vehicle,
Operator,
Expand All @@ -26,6 +26,7 @@
)
from runs.models import (
Run,
RunLifecycleTransition,
Position,
Progression,
Occupancy,
Expand Down Expand Up @@ -55,7 +56,7 @@
EquipmentLogSerializer,
OperatorSerializer,
CreateRunSerializer,
UpdateRunSerializer,
RunUpdateSerializer,
PositionSerializer,
ProgressionSerializer,
OccupancySerializer,
Expand Down Expand Up @@ -206,12 +207,14 @@ def post(self, request):
{"status": "error", "step": "operational_validation", "errors": errors},
status=status.HTTP_400_BAD_REQUEST,
)
# Registration of the run (event: RUN_REQUESTED, state: REQUESTED)
# Record creation puts the run in REQUESTED state (run_requested event)
try:
run = Run.objects.create(**payload)
run.vehicle.set([vehicle])
run.operator.set([operator_obj])
payload["run_id"] = run.id
payload["vehicle_id"] = vehicle_id
payload["operator_id"] = operator_id
except Exception as e:
return Response(
{
Expand All @@ -221,26 +224,23 @@ def post(self, request):
},
status=status.HTTP_500_INTERNAL_SERVER_ERROR,
)
# First transition: GTFS validation (run_lifecycle_state = REQUESTED)
# REQUESTED → VALIDATED: GTFS consistency check
try:
service.process_event(RunLifecycleEvents.RUN_REQUESTED, payload)
service.process_event(RunLifecycleEvents.VALIDATE_RUN, payload)
except RunLifecycleError as e:
payload["guards"] = e.errors.attempts.guards
payload["actions"] = e.errors.attempts.actions
service.process_event(RunLifecycleEvents.RUN_REJECTED, payload)
return Response(
{"status": "error", "step": "gtfs_validation", "errors": e.errors},
status=status.HTTP_422_UNPROCESSABLE_ENTITY,
)
# System initialization (run_lifecycle_state = VALIDATED)
# VALIDATED → INITIALIZED: write system state
try:
service.process_event(RunLifecycleEvents.VALIDATE_RUN, payload)
service.process_event(RunLifecycleEvents.INITIALIZE_RUN, payload)
except RunLifecycleError as e:
return Response(
{"status": "error", "step": "initialization", "errors": e.errors},
status=status.HTTP_500_INTERNAL_SERVER_ERROR,
)
# If successful, give a 200 OK response (run_lifecycle_state = INITIALIZED)
return Response(
{
"status": "success",
Expand All @@ -251,32 +251,76 @@ def post(self, request):
)


class UpdateRunViewSet(APIView):
class RunStateViewSet(APIView):
"""
Endpoint to get the current lifecycle state of a run.

It only allows the GET method with the run_id as path parameter.
"""

def get(self, request, run_id):
run = Run.objects.filter(id=run_id).first()
if not run:
return Response(
{"status": "error", "errors": {"run_id": "Run not found"}},
status=status.HTTP_404_NOT_FOUND,
)
return Response(
{"status": "success", "run_lifecycle_state": run.run_lifecycle_state},
status=status.HTTP_200_OK,
)


class RunUpdateViewSet(APIView):
"""
Endpoint to request an update of the lifecycle state of an existing run.

It only allows the POST method with the event to process.
"""

def post(self, request):
def post(self, request, run_id):
service = RunLifecycleService()
serializer = UpdateRunSerializer(data=request.data)
serializer = RunUpdateSerializer(data=request.data)
if not serializer.is_valid():
return Response(
{"status": "error", "errors": serializer.errors},
status=status.HTTP_400_BAD_REQUEST,
)
payload = dict(serializer.validated_data)
run_id = payload.get("run_id")
# Flatten `details` into payload so guards/actions can read fields like
# `actor_role` at the top level — matches the convention used by the
# internal Celery path (realtime_engine/tasks.py).
details = payload.pop("details", {}) or {}
if isinstance(details, dict):
payload.update(details)
payload["run_id"] = run_id
run = Run.objects.filter(id=run_id).first()
if not run:
return Response(
{"status": "error", "errors": {"run_id": "Run not found"}},
status=status.HTTP_404_NOT_FOUND,
)
# Ensure the effective event (after payload normalization) is valid.
event = payload.get("event")
event_value = (
event.value if isinstance(event, RunLifecycleEvents) else str(event)
)
allowed_events = {e.value for e in RunLifecycleEvents}
if event_value not in allowed_events:
return Response(
{
"status": "error",
"errors": {
"event": f"Invalid event '{event_value}'. Allowed values: {sorted(allowed_events)}"
},
},
status=status.HTTP_400_BAD_REQUEST,
)
payload["event"] = event_value
try:
new_run_lifecycle_state = service.process_event(event, payload)
new_run_lifecycle_state, _guards, _actions = service.process_event(
event_value, payload
)
except RunLifecycleError as e:
return Response(
{"status": "error", "errors": e.errors},
Expand All @@ -288,6 +332,43 @@ def post(self, request):
)


class RunHistoryView(APIView):
"""
Return the ordered list of FSM transitions for a run.

Read-only audit log derived from RunLifecycleTransition, which the
lifecycle service writes before any external side-effect (so the log
is authoritative even if a downstream action later fails).
"""

def get(self, request, run_id):
if not Run.objects.filter(id=run_id).exists():
return Response(
{"status": "error", "errors": {"detail": f"run {run_id} not found"}},
status=status.HTTP_404_NOT_FOUND,
)
transitions = RunLifecycleTransition.objects.filter(run_id=run_id).order_by(
"timestamp", "created_at"
)
return Response(
{
"run_id": str(run_id),
"transitions": [
{
"event": t.event_name,
"from_state": t.from_state,
"to_state": t.to_state,
"timestamp": t.timestamp.isoformat(),
"actions": t.actions or {},
"guards": t.guards or {},
}
for t in transitions
],
},
status=status.HTTP_200_OK,
)


class PositionViewSet(viewsets.ModelViewSet):
queryset = Position.objects.all()
serializer_class = PositionSerializer
Expand Down
10 changes: 10 additions & 0 deletions backend/databus/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@
# Load task modules from all registered Django apps.
app.autodiscover_tasks()

# Register the MQTT consumer bootstep. The class is gated internally by
# MQTT_CONSUMER_ENABLED so only the realtime-engine worker actually starts it.
from realtime_engine.mqtt import MQTTConsumerStep # noqa: E402

app.steps["worker"].add(MQTTConsumerStep)


@app.task(bind=True, ignore_result=True)
def debug_task(self):
Expand All @@ -40,4 +46,8 @@ def debug_task(self):
"task": "schedule_engine.tasks.build_alerts",
"schedule": timedelta(seconds=10),
},
"scan-stale-runs-every-30s": {
"task": "realtime_engine.tasks.scan_stale_runs",
"schedule": timedelta(seconds=30),
},
}
1 change: 1 addition & 0 deletions backend/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ dependencies = [
"gunicorn>=23.0.0",
"kombu>=5.6.2",
"mkdocs-material>=9.6.18",
"paho-mqtt>=2.1.0",
"pandas>=2.3.3",
"pillow>=11.3.0",
"prefect>=3.6.12",
Expand Down
61 changes: 61 additions & 0 deletions backend/realtime_engine/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# realtime_engine

Celery worker that processes lifecycle events and hosts the MQTT telemetry
consumer as an in-process bootstep.

## MQTT consumer: Celery bootstep approach

The MQTT consumer (`mqtt.py`) runs inside the realtime-engine worker as a
Celery `bootsteps.StartStopStep`. It starts when the worker boots and shuts
down when the worker shuts down. paho's `loop_start()` runs the network loop
in its own background thread, so it does not block Celery task execution.

The bootstep is registered globally in `databus/celery.py` but gated by the
`MQTT_CONSUMER_ENABLED` env var. Only the `realtime-engine` compose service
sets it to `true`; other workers (schedule-engine, beat) skip the bootstep so
the broker doesn't see duplicate subscribers.

**Why a bootstep instead of a separate service?**
The dedicated `realtime-consumer` service was an early choice that traded
operational simplicity for separation-of-concerns at a scale the demo doesn't
need. paho's network loop already runs in its own thread, so colocating it
with the Celery worker doesn't starve task execution. The bootstep avoids an
extra container, Dockerfile target, and bind-mount on the backend tree.

## Topic subscriptions

```
transit/vehicle/+/position QoS 0
transit/vehicle/+/progression QoS 0
transit/vehicle/+/occupancy QoS 0
```

The `data` leaf (static metadata) is not subscribed here — vehicle metadata is
written to Redis by `RunLifecycleActions.update_system_state` when a run is
initialized.

## Lifecycle events fired by the consumer

| Run state | Trigger condition | Event fired |
|--------------|-----------------------------------------------------------|-------------------------|
| `Confirmed` | Any valid ping received | `run_tracking_started` |
| `Tracking` | `position.speed > 0.5` m/s | `run_started` |
| `No Signal` | Any valid ping received | `run_tracking_restored` |
| `In Progress`| `progression.current_status == STOPPED_AT` with `stop_id` | `complete_run` |

## Stale run scanning

`scan_stale_runs` runs every 30 s via Celery Beat:

- `IN_PROGRESS` + staleness > 60 s → `run_tracking_lost`
- `NO_SIGNAL` + staleness > 300 s → `run_tracking_expired`

## Environment variables

| Var | Default | Purpose |
|-------------------------|--------------------|---------------------------------------------------------------|
| `MQTT_CONSUMER_ENABLED` | `false` | Master switch; set `true` only on the realtime-engine worker. |
| `MQTT_HOST` | `telemetry-broker` | Broker hostname (resolved inside compose). |
| `MQTT_PORT` | `1883` | Broker port. |
| `REDIS_HOST` | `state` | Redis hostname. |
| `REDIS_PORT` | `6379` | Redis port. |
Loading