Feat/cal itp import #1670
Pull request overview
Adds a new Cal-ITP data import pipeline to the tasks executor, including the import implementation, CKAN query, tests, and a scheduled monthly execution in GCP.
Changes:
- Introduces Cal-ITP import handler + CKAN SQL query for retrieving feed records.
- Registers the new `cal_itp_import` task in the tasks executor and adds unit/e2e tests.
- Adds a monthly Cloud Scheduler job to invoke the Cal-ITP import task.
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 8 comments.
| File | Description |
|---|---|
| infra/functions-python/main.tf | Adds a monthly Cloud Scheduler job to call the tasks executor with cal_itp_import. |
| functions-python/tasks_executor/src/main.py | Registers the new cal_itp_import task and handler. |
| functions-python/tasks_executor/src/tasks/data_import/cal_itp/import_cal_itp_feeds.py | Implements Cal-ITP dataset retrieval, filtering, upsert logic, and orchestration/commit hooks. |
| functions-python/tasks_executor/src/tasks/data_import/cal_itp/ckan_query.sql | Provides the CKAN datastore SQL used to retrieve Cal-ITP feed records. |
| functions-python/tasks_executor/tests/tasks/data_import/test_cal_itp_import.py | Adds helper/unit tests and an end-to-end DB test for the Cal-ITP import flow. |
Recent PR commit changes and the comments they address: PR Comments/Concerns
Dry Run rollback
I noticed that we don't perform a rollback when `dry_run = True` with tdg commit. I think one might be necessary because the `@with_db_session` decorator does not roll back on its own: it commits if no exception is thrown. It closes the session with `session.close()` in its `finally` block, but by then it has already committed. We don't currently commit changes on a dry run anywhere except in one location. In the per-dataset processing, on the first run with brand-new data, the feed is created in `get_or_create_feed()`, which adds and flushes on lines 105 and 106. So the first dry run with brand-new data writes the feed to the DB, which is the scenario I encountered. Adding the rollback on dry run from this point onwards may not be necessary because the case only occurs once...
Pagination
The SQL search endpoint itself doesn't support pagination, so we would need to use `LIMIT` directly in the SQL query or switch to the other endpoint, `datastore_search`. If we switch, we will need to perform the JOIN logic ourselves in Python, whereas with the `datastore_search_sql` endpoint the JOIN is done on the CKAN server side. I added a debug log statement to record the response time and size so we can decide whether it's necessary to move to pagination. Currently we receive ~600 records from the SQL query in ~349 ms.
Reverse Geo-location concern
The fingerprint comparison between DB and API feeds doesn't include the locations, so reverse geolocation overwriting the location won't result in a feed being considered an update.
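
To make the dry-run concern above concrete, here is a minimal sketch using the standard-library `sqlite3` module rather than the project's SQLAlchemy session. The decorator `with_connection`, the `feed` table, and the functions `make_db`, `import_feeds`, and `count_feeds` are all hypothetical stand-ins; the only behaviour borrowed from the discussion is that the wrapper commits whenever no exception is raised, so a dry run has to roll back explicitly before returning.

```python
import functools
import sqlite3


def with_connection(func):
    """Hypothetical stand-in for a decorator that commits when no exception is raised."""

    @functools.wraps(func)
    def wrapper(conn, *args, **kwargs):
        try:
            result = func(conn, *args, **kwargs)
            conn.commit()  # commits whatever is still pending at this point
            return result
        except Exception:
            conn.rollback()
            raise

    return wrapper


@with_connection
def import_feeds(conn, stable_ids, dry_run=True):
    """Insert feeds; on a dry run, discard the pending writes before returning."""
    for stable_id in stable_ids:
        conn.execute("INSERT INTO feed (stable_id) VALUES (?)", (stable_id,))
    if dry_run:
        # Without this rollback the wrapper's commit would persist the rows.
        conn.rollback()
    return len(stable_ids)


def make_db():
    """Create an in-memory database with a single hypothetical feed table."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE feed (stable_id TEXT PRIMARY KEY)")
    return conn


def count_feeds(conn):
    return conn.execute("SELECT COUNT(*) FROM feed").fetchone()[0]
```

Calling `import_feeds(conn, ids, dry_run=True)` leaves the table empty, while the same call with `dry_run=False` persists the rows. Removing the explicit rollback reproduces the behaviour described in the comment.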
Summary:
Closes #1642
This pull request adds support for importing data from Cal-ITP into the system, including an import handler and its tests.
Cal-ITP Import Feature:
- `functions-python/tasks_executor/src/tasks/data_import/cal_itp/import_cal_itp_feeds.py` contains the import handler and associated import logic
- `functions-python/tasks_executor/src/tasks/data_import/cal_itp/ckan_query.sql` is the CKAN API SQL query to retrieve feeds from Cal-ITP
- `functions-python/tasks_executor/tests/tasks/data_import/cal_itp/test_cal_itp_import.py` contains the associated unit and e2e tests
- `infra/functions-python/main.tf` includes the Google Cloud Scheduler job to run the import monthly
- `functions-python/tasks_executor/src/main.py` adds the handler to the task list
Out of scope:
- Redirecting MDB feeds to new Cal-ITP: the redirect and the CSV defining redirect links will be included in a follow-up PR
- Include licensing for Cal-ITP feeds (follow-up PR after confirmation with Cal-ITP)
Cal-ITP Import — Execution Flow & Design Doc
Overview
The Cal-ITP import pipeline fetches GTFS schedule and real-time feeds from the California Integrated Travel Project (Cal-ITP) CKAN API and upserts them into the Mobility Feed API database. It runs as a scheduled HTTP Cloud Function (tasks_executor), triggered monthly by Cloud Scheduler, and fans out to dataset download and web revalidation tasks on completion.
Architecture Diagram
Step-by-Step Execution Flow
1. Cloud Scheduler Trigger
Terraform resource: `google_cloud_scheduler_job.cal_itp_import_schedule` (`infra/functions-python/main.tf`, ~line 564)
- `0 0 3 * *`: 3 AM UTC, monthly (1st of each month)
- `tasks_executor` Cloud Function URL
- `functions_service_account`
- `{"task": "cal_itp_import", "payload": {"dry_run": false}}`
2. Cloud Function - `tasks_executor`
Terraform resource: `google_cloudfunctions2_function.tasks_executor` (`infra/functions-python/main.tf`, ~line 1090)
- `tasks_executor` (in `main.py`)
- `FEEDS_DATABASE_URL`, `FEEDS_CREDENTIALS`, `WEB_APP_REVALIDATE_SECRET`
Key env vars set by Terraform:
- `DATASET_PROCESSING_TOPIC_NAME` → `datasets-batch-topic-{env}` (Pub/Sub)
- `WEB_REVALIDATION_QUEUE` → Cloud Tasks queue name
- `WEB_APP_REVALIDATE_URL` → web app revalidation endpoint
- `PROJECT_ID`, `ENVIRONMENT`, `SERVICE_ACCOUNT_EMAIL`
3. HTTP Router - `main.py:tasks_executor()`
The function parses `request.get_json()` for a `"task"` key and dispatches to the registered handler.
For unknown tasks → HTTP 400. For handler exceptions → HTTP 500.
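
The routing rules above can be sketched without any web framework. This is an illustration only: `TASKS`, `dispatch`, and the placeholder handler body are hypothetical and are not taken from `main.py`. The sketch assumes the parsed JSON body is passed in as a plain dict and that the status codes follow the description (400 for unknown tasks, 500 for handler exceptions).

```python
from __future__ import annotations

from typing import Any, Callable


def import_cal_itp_handler(payload: dict) -> dict:
    # Placeholder handler: only echoes the effective dry_run flag (default True).
    dry_run = payload.get("dry_run", True)
    return {
        "message": "Cal-ITP import executed successfully.",
        "params": {"dry_run": dry_run},
    }


# Hypothetical registry mapping task names to handlers.
TASKS: dict[str, Callable[[dict], dict]] = {"cal_itp_import": import_cal_itp_handler}


def dispatch(body: Any) -> tuple[dict, int]:
    """Route a parsed JSON body to its handler and return (response, status)."""
    body = body if isinstance(body, dict) else {}
    task = body.get("task")
    handler = TASKS.get(task)
    if handler is None:
        return {"error": f"Unknown task: {task!r}"}, 400
    try:
        return handler(body.get("payload") or {}), 200
    except Exception as exc:  # any handler failure maps to HTTP 500
        return {"error": str(exc)}, 500
```

For example, `dispatch({"task": "cal_itp_import"})` returns a 200 response with `dry_run` defaulting to `True`, while `dispatch({"task": "nope"})` returns a 400.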
4. `import_cal_itp_handler(payload)`
File: `import_cal_itp_feeds.py`
- `dry_run` from payload (default: `True`)
- `_import_cal_itp(dry_run=dry_run)`
- `{ "message": "Cal-ITP import executed successfully.", "created_gtfs": 12, "updated_gtfs": 5, "created_rt": 8, "total_processed_items": 120, "params": {"dry_run": false} }`
5. `_import_cal_itp(db_session, dry_run)` - Orchestrator
Decorated with `@with_db_session` (manages SQLAlchemy session lifecycle).
Batch size is controlled by the `COMMIT_BATCH_SIZE` env var (default: 5).
6. Data Fetching - `_fetch_cal_itp_datasets()`
- `https://data.ca.gov/api/3/action/datastore_search_sql?sql=<encoded>`
- `ckan_query.sql`: joins 4 CKAN datasets:
  - `gtfs_datasets`: `e4ca5bd4-...`
  - `services`: `dbacfa9f-...`
  - `provider_gtfs_data`: `ebe116fb-...`
  - `organizations`: `677e1271-...`
- `is_public = 'Yes'` AND at least one feed URL present
7. Record Filtering - `_filter_cal_itp_records()`
Records are grouped by `service_source_record_id`. For each group:
Bay Area 511 services (detected by "Bay Area 511 Regional" in any name column):
- `Regional Precursor Feed` (preferred)
- `Regional Subfeed`
- `Combined Regional Feed`
All other services:
- `gtfs_service_data_customer_facing == true/yes/1`
8. Per-Dataset Processing - `_process_cal_itp_dataset()`
For each filtered dataset record:
a. Resource Expansion
Expand one dataset dict into 1–4 resource dicts:
- Schedule resource (if `schedule_dataset_url` present)
- RT resources, each with `"entity_type": ["{rt_type}"]` (a single-element list)
Resources are sorted: schedule first, then RT feeds.
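
A rough sketch of the expansion step follows. The function name `expand_resources`, the `kind` and `url` keys, and the assumption that each RT URL lives under `{rt_type}_dataset_url` are illustrative guesses based on the field names listed in this document, not the actual implementation.

```python
from __future__ import annotations

# RT feed types, in the order they should follow the schedule feed.
RT_TYPES = ("trip_updates", "vehicle_positions", "service_alerts")


def expand_resources(dataset: dict) -> list[dict]:
    """Expand one dataset record into up to four resource dicts, schedule first."""
    resources = []
    if dataset.get("schedule_dataset_url"):
        resources.append({"kind": "schedule", "url": dataset["schedule_dataset_url"]})
    for rt_type in RT_TYPES:
        url = dataset.get(f"{rt_type}_dataset_url")  # assumed key naming
        if url:
            # entity_type is a single-element list holding the RT type string.
            resources.append({"kind": "rt", "url": url, "entity_type": [rt_type]})
    # Stable sort: schedule resources first, RT feeds keep their RT_TYPES order.
    resources.sort(key=lambda r: r["kind"] != "schedule")
    return resources
```

Sorting the schedule resource first matters if RT feeds need to reference a schedule feed that was created earlier in the same run.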
b. Validation - `_validate_required_cal_itp_fields()`
For each resource, validates required fields exist and are non-empty:
- `schedule_source_record_id`, `schedule_gtfs_dataset_name`, `schedule_dataset_url`
- `{type}_source_record_id`, `{type}_gtfs_dataset_name`, `{type}_dataset_url`
Raises `InvalidCalItpFeedError` on failure; resource is skipped.
c. Stable ID Generation
Type codes: `s` (schedule), `tu` (trip updates), `vp` (vehicle positions), `sa` (service alerts)
d. Location Mapping - `_get_cal_itp_locations()`
Maps `caltrans_district_name` → `Location` DB row:
- `United States` (hardcoded)
- `California` (hardcoded)
- `caltrans_district_name`
e. GTFS Schedule Feed Processing
- `_probe_head_format()`: verify URL returns a ZIP
- `_delete_and_recreate_feed_if_type_changed()`: handles type conflicts (delete + flush + recreate)
- `(stable_id, feed_name, provider, producer_url)`
- `feed_name`, `provider`, `producer_url`, `operational_status`, `locations`
- `_ensure_cal_itp_external_id()`: ensure `Externalid` row exists for this feed
f. GTFS-RT Feed Processing
- `_get_entity_types_from_resource()`: maps RT type string to entity type codes
- `ENTITY_TYPES_MAP`: `{"trip_updates": "tu", "vehicle_positions": "vp", "service_alerts": "sa"}`
- `get_or_create_entity_type(db_session, et)`: upserts `Entitytype` rows
- `static_current_feed` reference
- `static_refs` and `entity_types`
g. Error Handling Per Resource
- `IntegrityError` → rollback to savepoint, log, continue
- `Exception` → rollback to savepoint, log, continue
9. Stale Feed Deprecation - `_deprecate_stale_feeds()`
After all datasets are processed:
- `Feed` rows where `stable_id LIKE 'cal_itp-%'`
- `processed_stable_ids` → set `status = "deprecated"`
10. Commit & Downstream Triggers - `commit_changes()`
On `IntegrityError`: rollback, log, re-raise (propagates to caller).
11. Dry Run Mode
When `dry_run=True` (the default when called without a payload):
Key Design Decisions
- Stable ID format: `cal_itp-{id}-{type}`
Entity Types Map
| RT type | Code |
|---|---|
| trip_updates | tu |
| vehicle_positions | vp |
| service_alerts | sa |
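
As an illustration of how the entity types map and the `cal_itp-{id}-{type}` pattern fit together, here is a small sketch. The helper `build_stable_id`, the constant `SCHEDULE_CODE`, and the assumption that `{id}` is the resource's source record ID are hypothetical. Only the map contents and the type codes come from this document.

```python
# Mapping of RT type strings to entity type codes, as listed in the table above.
ENTITY_TYPES_MAP = {
    "trip_updates": "tu",
    "vehicle_positions": "vp",
    "service_alerts": "sa",
}

# Type code for schedule feeds (hypothetical constant name).
SCHEDULE_CODE = "s"


def build_stable_id(record_id: str, feed_type: str) -> str:
    """Build a stable ID of the form cal_itp-{id}-{type}.

    Raises KeyError for a feed type that is neither "schedule" nor a known RT type.
    """
    code = SCHEDULE_CODE if feed_type == "schedule" else ENTITY_TYPES_MAP[feed_type]
    return f"cal_itp-{record_id}-{code}"
```

With these assumptions, `build_stable_id("rec123", "trip_updates")` yields `cal_itp-rec123-tu`, which matches the `cal_itp-%` prefix used by the stale feed query.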