
feat: GTFS validator task sync#1650

Open
davidgamez wants to merge 31 commits into main from
feat/gtfs-validators-orchestrator

Conversation


davidgamez (Member) commented Mar 31, 2026

From our AI friend

This pull request introduces a new workflow execution tracking system, adds a read-only status query endpoint, and integrates idempotent execution tracking into the validation report update process. The main changes include the addition of the TaskExecutionTracker with comprehensive tests, new API endpoints for querying and syncing task run status, and updates to existing workflow logic to ensure robust and observable execution.

Execution Tracking and Status Querying

  • Added a new TaskExecutionTracker utility with full unit tests to track workflow execution status, including methods for marking runs as triggered, completed, or failed, and for summarizing run status.
  • Introduced a new get_task_run_status API endpoint and handler, which provides a read-only snapshot of a tracked workflow run, including counts of triggered, completed, failed, and pending entities.
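The tracker's surface area (start_run, is_triggered, mark_triggered, mark_completed, mark_failed, get_summary, per the commit notes below) can be sketched with a minimal in-memory stand-in. This is purely illustrative: the real TaskExecutionTracker persists state to the task_run and task_execution_log tables.

```python
# Hypothetical in-memory sketch of the TaskExecutionTracker interface
# described in this PR; the real class persists to task_run and
# task_execution_log via SQLAlchemy, not a dict.
import uuid


class InMemoryTracker:
    def __init__(self, task_name):
        self.task_name = task_name
        self.run_id = None
        self.status = {}  # entity_id -> "triggered" | "completed" | "failed"
        self.total_count = 0

    def start_run(self, total_count):
        self.run_id = str(uuid.uuid4())
        self.total_count = total_count
        return self.run_id

    def is_triggered(self, entity_id):
        return entity_id in self.status

    def mark_triggered(self, entity_id):
        self.status[entity_id] = "triggered"

    def mark_completed(self, entity_id):
        self.status[entity_id] = "completed"

    def mark_failed(self, entity_id):
        self.status[entity_id] = "failed"

    def get_summary(self):
        counts = {"triggered": 0, "completed": 0, "failed": 0}
        for state in self.status.values():
            counts[state] += 1
        counts["pending"] = self.total_count - len(self.status)
        return counts
```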

Integration with Validation Workflows

  • Updated execute_workflows in validation_report_update.py to support idempotent execution using TaskExecutionTracker, skipping already triggered/completed datasets and logging new triggers and failures.
  • Modified create_validation_report_entities to always update the execution tracker upon completion, ensuring monitoring works for all validation runs.
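The idempotent dispatch pattern described above can be sketched as follows. The function name and the trigger_workflow callable are hypothetical; the real execute_workflows starts GCP Workflow executions and stores the execution_ref.

```python
# Hypothetical sketch of the idempotent dispatch loop: datasets already
# triggered or completed for this run are skipped, so re-running the
# task resumes where the previous call left off.
def dispatch_datasets(datasets, tracker, trigger_workflow):
    """trigger_workflow(dataset) starts a workflow and may raise."""
    triggered, skipped, failed = [], [], []
    for dataset in datasets:
        if tracker.is_triggered(dataset):  # already triggered/completed
            skipped.append(dataset)
            continue
        try:
            trigger_workflow(dataset)
            tracker.mark_triggered(dataset)
            triggered.append(dataset)
        except Exception:
            tracker.mark_failed(dataset)
            failed.append(dataset)
    return {"triggered": triggered, "skipped": skipped, "failed": failed}
```

Because the skip check runs before each trigger, calling the rebuild task twice with the same run never double-dispatches a dataset.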

API and Error Handling Enhancements

  • Registered new tasks (get_task_run_status, sync_task_run_status) in the main task executor, including documentation and handler wiring.
  • Improved error handling in the main task executor (functions-python/tasks_executor/src/main.py) to return HTTP 503 for in-progress runs, allowing Cloud Tasks to retry appropriately.
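The 503 pattern can be sketched like this; the handler shape is a simplified assumption, but the exception name TaskInProgressError and the status mapping come from the commits below.

```python
# Sketch of the 503 retry pattern: a sync task raises
# TaskInProgressError while the run is unsettled, and the executor maps
# it to HTTP 503 so Cloud Tasks retries per the queue's retry_config.
class TaskInProgressError(Exception):
    pass


def handle_task(handler, payload):
    """Return an (HTTP status, body) tuple, mirroring the executor."""
    try:
        return 200, handler(payload)
    except TaskInProgressError as exc:
        return 503, {"status": "in_progress", "detail": str(exc)}
    except Exception as exc:
        return 500, {"status": "error", "detail": str(exc)}
```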

These changes provide robust, observable, and idempotent workflow execution tracking, making it easier to monitor and debug large-scale batch processes.

Expected behavior:

The rebuild missing validation reports task supports rebuilding all reports from the specified version. The caller can track progress using the get_task_run_status task. Progress is updated every 10 minutes by the sync_task_run_status task, which is invoked via a Cloud Tasks queue.

Testing tips:

This can be tested by initiating the process in dev, calling the task with the following parameters:

{
  "task": "rebuild_missing_validation_reports",
  "payload": {
    "dry_run": true,
    "bypass_db_update": true,
    "filter_after_in_days": null,
    "filter_statuses": [],
    "force_update": false,
    "validator_endpoint": "https://stg-gtfs-validator-web-mbzoxaljzq-ue.a.run.app",
    "limit": 1
  }
}

Please make sure these boxes are checked before submitting your pull request - thanks!

  • Run the unit tests with ./scripts/api-tests.sh to make sure you didn't break anything
  • Add or update any needed documentation to the repo
  • Format the title like "feat: [new feature short description]". The title must follow the Conventional Commits specification (https://www.conventionalcommits.org/en/v1.0.0/).
  • Linked all relevant issues
  • Include screenshot(s) showing how this pull request works and fixes the issue(s)

davidgamez and others added 29 commits March 31, 2026 12:42
…ebuild task

- Add task_run and task_execution_log DB tables (Liquibase migration) for
  generic, reusable execution tracking across tasks and functions.
  Compatible with DatasetTraceService pattern for future migration.

- Add TaskExecutionTracker helper (functions-python/helpers/task_execution/)
  with start_run, is_triggered, mark_triggered, mark_completed, mark_failed,
  get_summary. Designed for reuse across any function or task.

- Extend rebuild_missing_validation_reports task with:
  - validator_endpoint override (enables staging URL for pre-release runs)
  - validator_version auto-fetch from endpoint
  - Updated DB query: also matches datasets with stale validator version
  - force_update param
  - GCS blob existence check (ported from update_validation_report)
  - limit param for chunked/end-to-end testing
  - bypass_db_update set automatically for non-default endpoints
  - TaskExecutionTracker integration for idempotent, resumable execution

- Integrate TaskExecutionTracker into execute_workflows() helper:
  skips already-triggered datasets, stores GCP Workflow execution_ref

- Add get_validation_run_status task in tasks_executor for end-to-end
  monitoring; supports sync_workflow_status=true to poll GCP Workflows API
  for pre-release (bypass_db_update=true) runs

- Update process_validation_report to mark tracker completed after DB write

- 100 tests pass, 82% branch coverage

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
- Fix query_side_effect to accept *args (session.query called with
  multiple columns for TaskExecutionLog query)
- Remove TestGetValidationRunStatus from helpers tests — tasks.*
  module not on PYTHONPATH there; coverage already in tasks_executor tests

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
…n_reports

Remove implicit derivation of bypass_db_update from validator_endpoint
comparison. Callers now pass bypass_db_update explicitly (default: False).
This decouples the routing concern (which URL to call) from the DB-write
concern (whether to surface results in the API).

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
When omitted, all datasets are considered regardless of download date.
Previously defaulted to 7 days which conflicted with the limit param
use case (end-to-end testing across all feeds).

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
- Move limit-slicing before start_run so total_count reflects the
  intended trigger count for this call (not all candidates).
  pending = total_count - (triggered + completed + failed); if > 0
  the dispatch loop timed out and rebuild should be called again.
- Store total_candidates in task_run.params for full-picture reporting.
- Add dispatch_complete and total_candidates to get_validation_run_status
  response so callers can distinguish timeout from normal in-progress.
- Expose params from get_summary() for downstream consumers.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
…e runbook

- Document all new parameters: bypass_db_update, limit, force_update,
  updated filter_after_in_days (now optional, no default)
- Add get_validation_run_status task documentation with response field glossary
- Add step-by-step pre-release validator analytics runbook (dry run →
  test batch → full run → monitor → BigQuery trigger)
- Update top-level tasks_executor README with new task examples

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Use the correct curl invocation with identity token auth instead of
the gcloud functions call command.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Filters datasets by Feed.operational_status. When omitted, defaults to
["published"] so only live feeds are validated by default. Callers can
pass ["published", "unpublished", "wip"] to include all feeds.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
…_reports

Previously _filter_datasets_with_existing_blob checked GCS for all
candidate datasets (up to ~5000), even when limit=10. Now the limit
slice is applied first so only the intended datasets are checked.

total_candidates in the result still reflects the full DB query count.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Pass limit into _filter_datasets_with_existing_blob so it stops
iterating as soon as it has collected enough valid datasets. Candidates
with no blob are skipped and the next one is checked, ensuring we always
return up to limit valid datasets without unnecessary GCS API calls.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
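The limit-aware filtering described in this commit can be sketched as below; the function and parameter names echo the commit message, and blob_exists is an injected stand-in for the real GCS existence check.

```python
# Sketch of limit-aware blob filtering: candidates without a report
# blob are skipped, and the scan stops as soon as `limit` valid
# datasets are collected, avoiding unnecessary GCS API calls.
def filter_datasets_with_existing_blob(candidates, blob_exists, limit=None):
    selected = []
    for dataset in candidates:
        if limit is not None and len(selected) >= limit:
            break  # enough valid datasets; skip remaining GCS checks
        if blob_exists(dataset):
            selected.append(dataset)
    return selected
```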
The set_ dict in PostgreSQL upserts requires the actual DB column name.
'metadata_' is only the SQLAlchemy ORM attribute alias (reserved name);
the real column is 'metadata', so the DO UPDATE clause was failing.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
…sync_task_run_status

- New generic task sync_task_run_status:
  - Works for any task_run (not GTFS-specific)
  - Polls GCP Workflows API for triggered entries with execution_ref
  - Updates task_execution_log statuses (triggered → completed/failed)
  - Marks task_run.status = 'completed' when all settled
  - Re-schedules itself via Cloud Task every 10 minutes until complete

- TaskExecutionTracker.schedule_status_sync():
  - Enqueues a Cloud Task to tasks_executor using TASK_RUN_SYNC_QUEUE
  - 10-min epoch bucket for idempotent task naming
  - No-op when env vars missing (safe in tests/local)

- rebuild_missing_validation_reports calls tracker.schedule_status_sync()
  after start_run() on every non-dry run

- Terraform: new task-run-sync-queue + TASK_RUN_SYNC_QUEUE env var in tasks_executor
- Remove get_validation_run_status_task.py (superseded)
- Update README and pre-release runbook

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
…ync_task_run_status

Replace self-scheduling pattern with proper Cloud Tasks retry mechanism:

- TaskExecutionTracker.schedule_status_sync(): stable task name derived
  from task_name+run_id only (no time bucket) — idempotent, ALREADY_EXISTS
  is silently swallowed so repeated calls to rebuild don't create duplicate
  polling loops

- sync_task_run_status: no longer re-schedules itself. Returns normally
  (→ HTTP 200) when complete; raises TaskInProgressError (→ HTTP 503) when
  still in progress. Cloud Tasks retries on 503 per queue retry_config.

- tasks_executor main.py: catches TaskInProgressError and returns 503

- Terraform queue retry_config: min_backoff=max_backoff=600s,
  max_doublings=0 → constant 10-min polling interval, max_attempts=100
  (covers ~16 hours of validation runs)

- Add TaskInProgressError to task_execution_tracker module

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
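The stable-name idempotency described in this commit could look roughly like the sketch below. The hashing scheme and name format are assumptions for illustration; the key property from the commit is that the name depends only on task_name and run_id, so repeated schedule_status_sync() calls collide with ALREADY_EXISTS and no duplicate polling loop is created.

```python
# Hypothetical sketch of deterministic Cloud Task naming: the same
# (task_name, run_id) pair always yields the same task name, so Cloud
# Tasks deduplicates repeated scheduling attempts with ALREADY_EXISTS,
# which the caller silently swallows.
import hashlib


def sync_task_name(task_name, run_id):
    digest = hashlib.sha256(f"{task_name}:{run_id}".encode()).hexdigest()[:16]
    return f"sync-{task_name}-{digest}"
```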
The UUID is returned from start_run() and part of the class's public API.
Using a private name (_) was inconsistent with that behavior.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Returns current DB state for any (task_name, run_id) without side effects:
- No GCP Workflows API calls
- No status transitions
- No TaskInProgressError / 503 — always returns HTTP 200

Response includes all get_summary() fields plus dispatch_complete
(pending == 0) to indicate whether the dispatch loop finished.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
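The read-only status computation above reduces to simple arithmetic over the tracked counts; a minimal sketch (function name hypothetical, field names from this PR):

```python
# Sketch of the get_task_run_status response math: pending is whatever
# the dispatch loop never handed out, and dispatch_complete signals
# whether dispatch finished (pending == 0).
def task_run_status(total_count, triggered, completed, failed):
    pending = total_count - (triggered + completed + failed)
    return {
        "triggered": triggered,
        "completed": completed,
        "failed": failed,
        "pending": pending,
        "dispatch_complete": pending == 0,
    }
```

A nonzero pending with no in-flight work indicates the dispatch loop timed out and the rebuild task should be called again.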
Passing delay_seconds=0 caused the Cloud Task to fire immediately,
before the dispatch loop had run — guaranteed 503 wasted call.

Use delay_seconds=600 to match the queue retry interval so the first
check happens after the dispatch loop has had meaningful time to run.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
In dry run mode, query task_execution_log to count how many of the
candidate datasets are already tracked (triggered/completed) for the
current run_id. This tells you how many would be skipped if the real
run were executed.

New response field: total_already_tracked (int, always present; 0 in non-dry runs)
New tracker method: count_already_tracked(entity_ids) — single bulk COUNT query

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
…g_validation_reports

The update_validation_report Cloud Function is fully covered by the
rebuild_missing_validation_reports task in tasks_executor, which adds:
- TaskExecutionTracker integration (idempotent, resumable)
- dry_run, limit, filter_after_in_days, filter_op_statuses params
- dispatch_complete indicator and Cloud Tasks status sync

Removed:
- functions-python/update_validation_report/ (entire directory)
- Terraform: google_cloudfunctions2_function.update_validation_report
- Terraform: google_storage_bucket_object.update_validation_report_zip
- Terraform: google_cloud_tasks_queue.update_validation_report_task_queue
- Terraform locals: function_update_validation_report_config / _zip
- Terraform locals: all_secret_dicts entry for update_validation_report

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
davidgamez changed the title from "feat: GTFS validator orchestrator — generic task tracker + extended r…" to "feat: GTFS validator task sync" on Apr 7, 2026
davidgamez marked this pull request as ready for review April 7, 2026 17:35
The following change in the NDJSON export code was discussed in review:

# Convert the JSON data to a single NDJSON record (one line)
storage_client = storage.Client(project=project_id)
- bucket = storage_client.get_bucket(bucket_name)
+ bucket = storage_client.bucket(bucket_name)
davidgamez (Member, Author) commented:
This fixes an issue with generating the ndjson files caused by a missing listing permission on the bucket.

davidgamez linked an issue Apr 7, 2026 that may be closed by this pull request
davidgamez requested review from cka-y and jcpitre April 7, 2026 17:44

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Review and fix generate analytics workflow

1 participant