…ebuild task

- Add task_run and task_execution_log DB tables (Liquibase migration) for generic, reusable execution tracking across tasks and functions. Compatible with the DatasetTraceService pattern for future migration.
- Add TaskExecutionTracker helper (functions-python/helpers/task_execution/) with start_run, is_triggered, mark_triggered, mark_completed, mark_failed, and get_summary. Designed for reuse across any function or task.
- Extend the rebuild_missing_validation_reports task with:
  - validator_endpoint override (enables the staging URL for pre-release runs)
  - validator_version auto-fetch from the endpoint
  - Updated DB query that also matches datasets with a stale validator version
  - force_update param
  - GCS blob existence check (ported from update_validation_report)
  - limit param for chunked/end-to-end testing
  - bypass_db_update set automatically for non-default endpoints
  - TaskExecutionTracker integration for idempotent, resumable execution
- Integrate TaskExecutionTracker into the execute_workflows() helper: skips already-triggered datasets and stores the GCP Workflow execution_ref
- Add get_validation_run_status task in tasks_executor for end-to-end monitoring; supports sync_workflow_status=true to poll the GCP Workflows API for pre-release (bypass_db_update=true) runs
- Update process_validation_report to mark the tracker completed after the DB write

100 tests pass, 82% branch coverage.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
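The idempotent, resumable flow described above can be sketched with a minimal in-memory stand-in for the tracker (the real helper persists state to the task_run and task_execution_log tables; the class body and summary shape here are illustrative assumptions, only the method names come from the commit):

```python
from dataclasses import dataclass, field
from uuid import uuid4


@dataclass
class TaskExecutionTracker:
    """In-memory sketch of the tracker API; the real helper writes to
    task_run / task_execution_log instead of a dict."""

    task_name: str
    run_id: str = ""
    _statuses: dict = field(default_factory=dict)  # entity_id -> status

    def start_run(self, total_count: int) -> str:
        self.run_id = str(uuid4())
        self.total_count = total_count
        return self.run_id

    def is_triggered(self, entity_id: str) -> bool:
        return self._statuses.get(entity_id) in ("triggered", "completed")

    def mark_triggered(self, entity_id: str, execution_ref: str = "") -> None:
        self._statuses[entity_id] = "triggered"

    def mark_completed(self, entity_id: str) -> None:
        self._statuses[entity_id] = "completed"

    def mark_failed(self, entity_id: str, error: str = "") -> None:
        self._statuses[entity_id] = "failed"

    def get_summary(self) -> dict:
        counts = {"triggered": 0, "completed": 0, "failed": 0}
        for status in self._statuses.values():
            counts[status] += 1
        counts["pending"] = self.total_count - sum(counts.values())
        return counts


# Idempotent dispatch loop: re-running the task skips anything already tracked.
tracker = TaskExecutionTracker(task_name="rebuild_missing_validation_reports")
tracker.start_run(total_count=3)
for dataset_id in ["ds-1", "ds-2", "ds-3"]:
    if tracker.is_triggered(dataset_id):
        continue  # resumable: never re-trigger on a retry
    tracker.mark_triggered(dataset_id, execution_ref=f"wf-{dataset_id}")
tracker.mark_completed("ds-1")
print(tracker.get_summary())
# {'triggered': 2, 'completed': 1, 'failed': 0, 'pending': 0}
```

The key property is that the dispatch loop consults is_triggered() before doing any work, so re-invoking the task after a timeout only processes the remainder.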
- Fix query_side_effect to accept *args (session.query is called with multiple columns for the TaskExecutionLog query)
- Remove TestGetValidationRunStatus from helpers tests — the tasks.* module is not on PYTHONPATH there; coverage already exists in the tasks_executor tests

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
…n_reports

Remove the implicit derivation of bypass_db_update from the validator_endpoint comparison. Callers now pass bypass_db_update explicitly (default: False). This decouples the routing concern (which URL to call) from the DB-write concern (whether to surface results in the API).

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
When omitted, all datasets are considered regardless of download date. Previously defaulted to 7 days which conflicted with the limit param use case (end-to-end testing across all feeds). Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
- Move limit-slicing before start_run so total_count reflects the intended trigger count for this call (not all candidates). pending = total_count - (triggered + completed + failed); if pending > 0, the dispatch loop timed out and rebuild should be called again.
- Store total_candidates in task_run.params for full-picture reporting.
- Add dispatch_complete and total_candidates to the get_validation_run_status response so callers can distinguish a timeout from normal in-progress.
- Expose params from get_summary() for downstream consumers.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
…e runbook

- Document all new parameters: bypass_db_update, limit, force_update, and the updated filter_after_in_days (now optional, no default)
- Add get_validation_run_status task documentation with a response field glossary
- Add a step-by-step pre-release validator analytics runbook (dry run → test batch → full run → monitor → BigQuery trigger)
- Update the top-level tasks_executor README with new task examples

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Use the correct curl invocation with identity token auth instead of the gcloud functions call command. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
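The shape of such an invocation looks roughly like the following (the URL and payload here are placeholders, not the project's actual endpoint; `gcloud auth print-identity-token` is the standard way to mint an identity token for an authenticated Cloud Function call):

```shell
# Hypothetical example; substitute your deployment's function URL and payload.
curl -X POST "https://<region>-<project>.cloudfunctions.net/tasks_executor" \
  -H "Authorization: Bearer $(gcloud auth print-identity-token)" \
  -H "Content-Type: application/json" \
  -d '{"task": "rebuild_missing_validation_reports", "payload": {"dry_run": true}}'
```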
Filters datasets by Feed.operational_status. When omitted, defaults to ["published"] so only live feeds are validated by default. Callers can pass ["published", "unpublished", "wip"] to include all feeds. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
…_reports

Previously, _filter_datasets_with_existing_blob checked GCS for all candidate datasets (up to ~5000), even when limit=10. Now the limit slice is applied first so only the intended datasets are checked. total_candidates in the result still reflects the full DB query count.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Pass limit into _filter_datasets_with_existing_blob so it stops iterating as soon as it has collected enough valid datasets. Candidates with no blob are skipped and the next one is checked, ensuring we always return up to limit valid datasets without unnecessary GCS API calls. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
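The early-stop behavior can be sketched with plain Python (the function and predicate names here are illustrative, not the real helper's signature; in the actual code the predicate is a GCS blob-existence check):

```python
def filter_with_limit(candidates, blob_exists, limit):
    """Collect up to `limit` candidates whose blob exists, stopping
    existence checks as soon as enough valid datasets are found."""
    selected = []
    for dataset_id in candidates:
        if len(selected) >= limit:
            break  # enough valid datasets: stop issuing checks entirely
        if not blob_exists(dataset_id):
            continue  # no blob: skip this one and try the next candidate
        selected.append(dataset_id)
    return selected


# Candidates ds-2 and ds-4 lack a blob; we still return `limit` valid
# datasets, and candidates past the cutoff are never checked at all.
checked = []

def blob_exists(ds):
    checked.append(ds)  # record which candidates incurred a check
    return ds not in ("ds-2", "ds-4")

result = filter_with_limit([f"ds-{i}" for i in range(1, 8)], blob_exists, limit=3)
print(result)   # ['ds-1', 'ds-3', 'ds-5']
print(checked)  # ['ds-1', 'ds-2', 'ds-3', 'ds-4', 'ds-5'] (ds-6/ds-7 never checked)
```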
The set_ dict in PostgreSQL upserts requires the actual DB column name. 'metadata_' is only the SQLAlchemy ORM attribute alias (reserved name); the real column is 'metadata', so the DO UPDATE clause was failing. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
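The distinction can be illustrated with a simplified Core table (the table shape is a stand-in, and the SQLite dialect is used here only because it exposes the same `on_conflict_do_update(set_=...)` API as the PostgreSQL dialect used in the real code):

```python
from sqlalchemy import Column, Integer, MetaData, String, Table, create_engine, select
from sqlalchemy.dialects.sqlite import insert  # postgresql.insert has the same set_ API

metadata_obj = MetaData()
# Simplified stand-in: in the real ORM model this column is reached via a
# `metadata_` attribute, since `metadata` is reserved on declarative classes,
# but the actual DB column name is "metadata".
task_run = Table(
    "task_run",
    metadata_obj,
    Column("id", Integer, primary_key=True),
    Column("metadata", String),
)

engine = create_engine("sqlite://")
metadata_obj.create_all(engine)

def upsert(conn, run_id, value):
    stmt = insert(task_run).values(id=run_id, metadata=value)
    # set_ keys must be real DB column names ("metadata"), not the ORM
    # attribute alias ("metadata_"); that mismatch was the bug.
    stmt = stmt.on_conflict_do_update(
        index_elements=["id"],
        set_={"metadata": value},
    )
    conn.execute(stmt)

with engine.begin() as conn:
    upsert(conn, 1, "first")
    upsert(conn, 1, "second")  # id conflict: DO UPDATE overwrites metadata
    final = conn.execute(select(task_run.c.metadata)).scalar()

print(final)  # second
```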
This reverts commit 2a88475.
…sync_task_run_status

- New generic sync_task_run_status task:
  - Works for any task_run (not GTFS-specific)
  - Polls the GCP Workflows API for triggered entries with an execution_ref
  - Updates task_execution_log statuses (triggered → completed/failed)
  - Marks task_run.status = 'completed' when all entries have settled
  - Re-schedules itself via a Cloud Task every 10 minutes until complete
- TaskExecutionTracker.schedule_status_sync():
  - Enqueues a Cloud Task to tasks_executor using TASK_RUN_SYNC_QUEUE
  - 10-minute epoch bucket for idempotent task naming
  - No-op when env vars are missing (safe in tests/local)
- rebuild_missing_validation_reports calls tracker.schedule_status_sync() after start_run() on every non-dry run
- Terraform: new task-run-sync-queue plus a TASK_RUN_SYNC_QUEUE env var in tasks_executor
- Remove get_validation_run_status_task.py (superseded)
- Update the README and pre-release runbook

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
…ync_task_run_status

Replace the self-scheduling pattern with the proper Cloud Tasks retry mechanism:

- TaskExecutionTracker.schedule_status_sync(): stable task name derived from task_name + run_id only (no time bucket) — idempotent; ALREADY_EXISTS is silently swallowed so repeated calls to rebuild don't create duplicate polling loops
- sync_task_run_status no longer re-schedules itself. It returns normally (→ HTTP 200) when complete and raises TaskInProgressError (→ HTTP 503) when still in progress; Cloud Tasks retries on 503 per the queue retry_config.
- tasks_executor main.py catches TaskInProgressError and returns 503
- Terraform queue retry_config: min_backoff = max_backoff = 600s, max_doublings = 0 → a constant 10-minute polling interval; max_attempts = 100 (covers ~16 hours of validation runs)
- Add TaskInProgressError to the task_execution_tracker module

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
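The return-vs-raise contract described above can be sketched as follows (the summary dict shape and handler wiring are illustrative assumptions; the real task reads task_execution_log and polls GCP Workflows before deciding):

```python
class TaskInProgressError(Exception):
    """Raised while a run still has unsettled entries; the HTTP layer
    converts it to a 503 so Cloud Tasks retries per the queue retry_config."""


def sync_task_run_status(summary: dict) -> dict:
    # ...in the real task: poll GCP Workflows, update task_execution_log...
    if summary["pending"] > 0 or summary["triggered"] > 0:
        raise TaskInProgressError(
            f"{summary['triggered']} triggered, {summary['pending']} pending"
        )
    return summary  # all settled → HTTP 200, Cloud Tasks stops retrying


def handle(summary: dict) -> int:
    """Sketch of the tasks_executor wiring: map the outcome to an HTTP status."""
    try:
        sync_task_run_status(summary)
        return 200
    except TaskInProgressError:
        return 503  # Cloud Tasks retries after min_backoff (600s)


print(handle({"triggered": 2, "completed": 5, "failed": 0, "pending": 0}))  # 503
print(handle({"triggered": 0, "completed": 6, "failed": 1, "pending": 0}))  # 200
```

Leaning on the queue's retry_config this way means there is exactly one polling loop per run, with backoff and attempt limits managed declaratively in Terraform rather than in application code.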
The UUID is returned from start_run() and part of the class's public API. Using a private name (_) was inconsistent with that behavior. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Returns the current DB state for any (task_name, run_id) without side effects:

- No GCP Workflows API calls
- No status transitions
- No TaskInProgressError / 503 — always returns HTTP 200

The response includes all get_summary() fields plus dispatch_complete (pending == 0) to indicate whether the dispatch loop finished.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Passing delay_seconds=0 caused the Cloud Task to fire immediately, before the dispatch loop had run: a guaranteed 503 and a wasted call. Use delay_seconds=600 to match the queue retry interval so the first check happens after the dispatch loop has had meaningful time to run.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
In dry-run mode, query task_execution_log to count how many of the candidate datasets are already tracked (triggered/completed) for the current run_id. This tells you how many would be skipped if the real run were executed.

New response field: total_already_tracked (int, always present; 0 in non-dry runs)
New tracker method: count_already_tracked(entity_ids) — a single bulk COUNT query

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
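A single bulk COUNT of this shape might look like the following (simplified stand-in schema; SQLite is used here only as a runnable substitute for the PostgreSQL backend):

```python
from sqlalchemy import (
    Column, Integer, MetaData, String, Table,
    create_engine, func, insert, select,
)

metadata_obj = MetaData()
# Simplified stand-in for the task_execution_log table.
log = Table(
    "task_execution_log",
    metadata_obj,
    Column("id", Integer, primary_key=True),
    Column("run_id", String),
    Column("entity_id", String),
    Column("status", String),
)

engine = create_engine("sqlite://")
metadata_obj.create_all(engine)

def count_already_tracked(conn, run_id, entity_ids):
    """One bulk COUNT instead of one existence query per candidate."""
    return conn.execute(
        select(func.count()).where(
            log.c.run_id == run_id,
            log.c.entity_id.in_(entity_ids),
            log.c.status.in_(["triggered", "completed"]),
        )
    ).scalar()

with engine.begin() as conn:
    conn.execute(insert(log), [
        {"run_id": "r1", "entity_id": "ds-1", "status": "completed"},
        {"run_id": "r1", "entity_id": "ds-2", "status": "triggered"},
        {"run_id": "r1", "entity_id": "ds-3", "status": "failed"},     # not counted
        {"run_id": "r2", "entity_id": "ds-4", "status": "triggered"},  # other run
    ])
    tracked = count_already_tracked(conn, "r1", ["ds-1", "ds-2", "ds-3", "ds-4"])

print(tracked)  # 2
```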
…g_validation_reports

The update_validation_report Cloud Function is fully covered by the rebuild_missing_validation_reports task in tasks_executor, which adds:

- TaskExecutionTracker integration (idempotent, resumable)
- dry_run, limit, filter_after_in_days, and filter_op_statuses params
- A dispatch_complete indicator and Cloud Tasks status sync

Removed:

- functions-python/update_validation_report/ (entire directory)
- Terraform: google_cloudfunctions2_function.update_validation_report
- Terraform: google_storage_bucket_object.update_validation_report_zip
- Terraform: google_cloud_tasks_queue.update_validation_report_task_queue
- Terraform locals: function_update_validation_report_config / _zip
- Terraform locals: all_secret_dicts entry for update_validation_report

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
davidgamez
commented
Apr 7, 2026
```diff
  # Convert the JSON data to a single NDJSON record (one line)
  storage_client = storage.Client(project=project_id)
- bucket = storage_client.get_bucket(bucket_name)
+ bucket = storage_client.bucket(bucket_name)
```
This fixes an issue with generating the ndjson files caused by a missing listing permission on the bucket.
From our AI friend
This pull request introduces a new workflow execution tracking system, adds a read-only status query endpoint, and integrates idempotent execution tracking into the validation report update process. The main changes include the addition of the TaskExecutionTracker with comprehensive tests, new API endpoints for querying and syncing task run status, and updates to existing workflow logic to ensure robust and observable execution.

Execution Tracking and Status Querying

- TaskExecutionTracker utility with full unit tests to track workflow execution status, including methods for marking runs as triggered, completed, or failed, and for summarizing run status.
- get_task_run_status API endpoint and handler, which provides a read-only snapshot of a tracked workflow run, including counts of triggered, completed, failed, and pending entities. [1] [2] [3] [4]

Integration with Validation Workflows

- execute_workflows in validation_report_update.py now supports idempotent execution using TaskExecutionTracker, skipping already triggered/completed datasets and logging new triggers and failures. [1] [2] [3]
- create_validation_report_entities always updates the execution tracker upon completion, ensuring monitoring works for all validation runs. [1] [2]

API and Error Handling Enhancements

- New task handlers (get_task_run_status, sync_task_run_status) in the main task executor, including documentation and handler wiring. [1] [2]

These changes provide robust, observable, and idempotent workflow execution tracking, making it easier to monitor and debug large-scale batch processes.
Expected behavior:

The rebuild missing validation report task supports rebuilding all reports from the specified version. The caller can track progress using the get_task_run_status task. Progress is updated every 10 minutes by the sync_task_run_status task, which is invoked via a Cloud Tasks queue.

Testing tips:
This can be tested by initiating the process, calling the task in dev with the following parameters:

{
  "task": "rebuild_missing_validation_reports",
  "payload": {
    "dry_run": true,
    "bypass_db_update": true,
    "filter_after_in_days": null,
    "filter_statuses": [],
    "force_update": false,
    "validator_endpoint": "https://stg-gtfs-validator-web-mbzoxaljzq-ue.a.run.app",
    "limit": 1
  }
}

Please make sure these boxes are checked before submitting your pull request - thanks!
./scripts/api-tests.sh to make sure you didn't break anything