Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion .github/workflows/api-deployer.yml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,14 @@ on:
description: Oauth client id part of the authorization for the operations API
required: true
type: string
WEB_APP_REVALIDATE_URL:
description: URL of the web app revalidation endpoint
required: false
type: string
WEB_APP_REVALIDATE_SECRET_1PASSWORD:
description: 1Password reference for the web app revalidation secret token
required: false
type: string
SKIP_TESTS:
description: The skip test parameter is useful for DEV environment deployments, not advised for QA and PROD.
required: true
Expand Down Expand Up @@ -289,6 +297,7 @@ jobs:
echo "OAUTH2_CLIENT_SECRET=${{ secrets.OAUTH2_CLIENT_SECRET }}" >> $GITHUB_ENV
echo "GLOBAL_RATE_LIMIT_REQ_PER_MINUTE=${{ inputs.GLOBAL_RATE_LIMIT_REQ_PER_MINUTE }}" >> $GITHUB_ENV
echo "VALIDATOR_ENDPOINT=${{ inputs.VALIDATOR_ENDPOINT }}" >> $GITHUB_ENV
echo "WEB_APP_REVALIDATE_URL=${{ inputs.WEB_APP_REVALIDATE_URL }}" >> $GITHUB_ENV

- name: Load secret from 1Password
uses: 1password/load-secrets-action@v2
Expand All @@ -300,10 +309,19 @@ jobs:
TDG_API_TOKEN: "op://rbiv7rvkkrsdlpcrz3bmv7nmcu/Transport.data.gouv.fr API Token/credential"
OPERATIONS_OAUTH2_CLIENT_ID: ${{ inputs.OPERATIONS_OAUTH2_CLIENT_ID_1PASSWORD }}

- name: Load web app revalidation secret from 1Password
if: ${{ inputs.WEB_APP_REVALIDATE_SECRET_1PASSWORD != '' }}
uses: 1password/load-secrets-action@v2
with:
export-env: true
env:
OP_SERVICE_ACCOUNT_TOKEN: ${{ secrets.OP_SERVICE_ACCOUNT_TOKEN }}
WEB_APP_REVALIDATE_SECRET: ${{ inputs.WEB_APP_REVALIDATE_SECRET_1PASSWORD }}

- name: Populate Variables
run: |
scripts/replace-variables.sh -in_file infra/backend.conf.rename_me -out_file infra/backend.conf -variables BUCKET_NAME,OBJECT_PREFIX
scripts/replace-variables.sh -in_file infra/vars.tfvars.rename_me -out_file infra/vars.tfvars -variables PROJECT_ID,REGION,ENVIRONMENT,DEPLOYER_SERVICE_ACCOUNT,FEED_API_IMAGE_VERSION,OAUTH2_CLIENT_ID,OAUTH2_CLIENT_SECRET,GLOBAL_RATE_LIMIT_REQ_PER_MINUTE,ARTIFACT_REPO_NAME,VALIDATOR_ENDPOINT,TRANSITLAND_API_KEY,OPERATIONS_OAUTH2_CLIENT_ID,TDG_API_TOKEN
scripts/replace-variables.sh -in_file infra/vars.tfvars.rename_me -out_file infra/vars.tfvars -variables PROJECT_ID,REGION,ENVIRONMENT,DEPLOYER_SERVICE_ACCOUNT,FEED_API_IMAGE_VERSION,OAUTH2_CLIENT_ID,OAUTH2_CLIENT_SECRET,GLOBAL_RATE_LIMIT_REQ_PER_MINUTE,ARTIFACT_REPO_NAME,VALIDATOR_ENDPOINT,TRANSITLAND_API_KEY,OPERATIONS_OAUTH2_CLIENT_ID,TDG_API_TOKEN,WEB_APP_REVALIDATE_URL,WEB_APP_REVALIDATE_SECRET

- uses: hashicorp/setup-terraform@v3
with:
Expand Down
2 changes: 2 additions & 0 deletions .github/workflows/api-dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ jobs:
SKIP_TESTS: true
VALIDATOR_ENDPOINT: https://stg-gtfs-validator-web-mbzoxaljzq-ue.a.run.app
OPERATIONS_OAUTH2_CLIENT_ID_1PASSWORD: "op://rbiv7rvkkrsdlpcrz3bmv7nmcu/GCP_RETOOL_OAUTH2_CREDS/username"
WEB_APP_REVALIDATE_URL: "https://staging.mobilitydatabase.org/api/revalidate"
WEB_APP_REVALIDATE_SECRET_1PASSWORD: "op://rbiv7rvkkrsdlpcrz3bmv7nmcu/MobilityDatabase Vercel Deployment/REVALIDATE_SECRET_QA"
secrets:
GCP_MOBILITY_FEEDS_SA_KEY: ${{ secrets.DEV_GCP_MOBILITY_FEEDS_SA_KEY }}
OAUTH2_CLIENT_ID: ${{ secrets.DEV_MOBILITY_FEEDS_OAUTH2_CLIENT_ID}}
Expand Down
2 changes: 2 additions & 0 deletions .github/workflows/api-prod.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ jobs:
SKIP_TESTS: false
VALIDATOR_ENDPOINT: https://gtfs-validator-web-mbzoxaljzq-ue.a.run.app
OPERATIONS_OAUTH2_CLIENT_ID_1PASSWORD: "op://rbiv7rvkkrsdlpcrz3bmv7nmcu/GCP_RETOOL_OAUTH2_CREDS/username"
WEB_APP_REVALIDATE_URL: "https://mobilitydatabase.org/api/revalidate"
WEB_APP_REVALIDATE_SECRET_1PASSWORD: "op://rbiv7rvkkrsdlpcrz3bmv7nmcu/MobilityDatabase Vercel Deployment/REVALIDATE_SECRET"
secrets:
GCP_MOBILITY_FEEDS_SA_KEY: ${{ secrets.PROD_GCP_MOBILITY_FEEDS_SA_KEY }}
OAUTH2_CLIENT_ID: ${{ secrets.PROD_MOBILITY_FEEDS_OAUTH2_CLIENT_ID}}
Expand Down
2 changes: 2 additions & 0 deletions .github/workflows/api-qa.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ jobs:
GLOBAL_RATE_LIMIT_REQ_PER_MINUTE: ${{ vars.GLOBAL_RATE_LIMIT_REQ_PER_MINUTE }}
VALIDATOR_ENDPOINT: https://stg-gtfs-validator-web-mbzoxaljzq-ue.a.run.app
OPERATIONS_OAUTH2_CLIENT_ID_1PASSWORD: "op://rbiv7rvkkrsdlpcrz3bmv7nmcu/GCP_RETOOL_OAUTH2_CREDS/username"
WEB_APP_REVALIDATE_URL: "https://staging.mobilitydatabase.org/api/revalidate"
WEB_APP_REVALIDATE_SECRET_1PASSWORD: "op://rbiv7rvkkrsdlpcrz3bmv7nmcu/MobilityDatabase Vercel Deployment/REVALIDATE_SECRET_QA"
secrets:
GCP_MOBILITY_FEEDS_SA_KEY: ${{ secrets.QA_GCP_MOBILITY_FEEDS_SA_KEY }}
OAUTH2_CLIENT_ID: ${{ secrets.DEV_MOBILITY_FEEDS_OAUTH2_CLIENT_ID}}
Expand Down
3 changes: 2 additions & 1 deletion api/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,5 @@ PyJWT
shapely
google-cloud-pubsub
pycountry
pytz
pytz
google-cloud-tasks
89 changes: 89 additions & 0 deletions api/src/shared/common/gcp_utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import json
import logging
import os
from typing import List

REFRESH_VIEW_TASK_EXECUTOR_BODY = json.dumps(
{"task": "refresh_materialized_view", "payload": {"dry_run": False}}
Expand Down Expand Up @@ -69,6 +70,94 @@ def create_refresh_materialized_view_task():
return {"error": "Error enqueuing task: %s" % error}, 500


def create_web_revalidation_task(feed_stable_ids: List[str]) -> None:
    """
    Enqueue Cloud Tasks that revalidate the web app cache for specific feed pages.

    One task is created per distinct feed stable ID. Each task name embeds the
    feed ID and the next half-hour boundary (:00 or :30, computed in UTC), so
    repeated calls for the same feed within the same window map to the same task
    name. An ALREADY_EXISTS error reported for such a name is logged at info
    level and ignored.

    Reads the PROJECT_ID, WEB_REVALIDATION_QUEUE, GCP_REGION and ENVIRONMENT
    environment variables. When WEB_REVALIDATION_QUEUE is unset or empty a
    warning is logged and nothing is enqueued.

    Errors raised while preparing or creating tasks are logged, not raised.
    Only a failure to import the Google client libraries can propagate.

    Args:
        feed_stable_ids: List of feed stable IDs whose pages should be revalidated.
            An empty list is a no-op; duplicate IDs are only enqueued once.
    """
    if not feed_stable_ids:
        return

    queue = os.getenv("WEB_REVALIDATION_QUEUE")
    if not queue:
        logging.warning(
            "WEB_REVALIDATION_QUEUE not set; skipping revalidation for %s",
            feed_stable_ids,
        )
        return

    # Imported lazily and only after the cheap guards above, so callers that
    # have nothing to enqueue never pay for (or fail on) the client libraries.
    from datetime import datetime, timedelta, timezone

    from google.cloud import tasks_v2
    from google.protobuf import timestamp_pb2

    try:
        # Timestamp.FromDatetime() interprets naive datetimes as UTC, so the
        # bucket is derived from UTC wall-clock time rather than host-local time.
        now = datetime.now(timezone.utc).replace(tzinfo=None)

        # BOUNCE WINDOW: next :00 or :30 (same pattern as materialized view refresh)
        window_start = now.replace(
            minute=0 if now.minute < 30 else 30,
            second=0,
            microsecond=0,
        )
        bucket_time = window_start + timedelta(minutes=30)

        proto_time = timestamp_pb2.Timestamp()
        proto_time.FromDatetime(bucket_time)

        project = os.getenv("PROJECT_ID")
        gcp_region = os.getenv("GCP_REGION")
        environment_name = os.getenv("ENVIRONMENT")
        url = f"https://{gcp_region}-{project}.cloudfunctions.net/tasks_executor-{environment_name}"

        client = tasks_v2.CloudTasksClient()
        timestamp_str = bucket_time.strftime("%Y-%m-%d-%H-%M")

        # dict.fromkeys() drops duplicate IDs while keeping the caller's order;
        # a duplicate would only produce an ALREADY_EXISTS error for the same name.
        for feed_stable_id in dict.fromkeys(feed_stable_ids):
            task_name = f"revalidate-{feed_stable_id}-{timestamp_str}"
            body = json.dumps(
                {
                    "task": "revalidate_feed",
                    "payload": {"feed_stable_id": feed_stable_id},
                }
            ).encode()

            # Each feed is attempted independently: one failure must not
            # prevent the remaining feeds from being scheduled.
            try:
                create_http_task_with_name(
                    client=client,
                    body=body,
                    url=url,
                    project_id=project,
                    gcp_region=gcp_region,
                    queue_name=queue,
                    task_name=task_name,
                    task_time=proto_time,
                    http_method=tasks_v2.HttpMethod.POST,
                )
            except Exception as e:
                if "ALREADY_EXISTS" in str(e):
                    # Expected when the feed was already scheduled in this window.
                    logging.info(
                        "Revalidation task already exists for %s, skipping.",
                        task_name,
                    )
                else:
                    logging.error(
                        "Error creating revalidation task for %s: %s",
                        feed_stable_id,
                        e,
                    )
            else:
                logging.info(
                    "Scheduled web revalidation task for feed %s (%s)",
                    feed_stable_id,
                    task_name,
                )

    except Exception as error:
        logging.error("Error enqueuing revalidation tasks: %s", error)


def create_http_task_with_name(
client: any, # tasks_v2.CloudTasksClient
body: bytes,
Expand Down
145 changes: 145 additions & 0 deletions api/tests/test_web_revalidation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
#
# MobilityData 2025
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import sys
import unittest
from unittest.mock import MagicMock, patch


class TestCreateWebRevalidationTask(unittest.TestCase):
    """Unit tests for shared.common.gcp_utils.create_web_revalidation_task."""

    # Environment shared by the tests that are expected to reach task creation.
    _TASK_ENV = {
        "PROJECT_ID": "test-project",
        "WEB_REVALIDATION_QUEUE": "web-revalidation-queue",
        "GCP_REGION": "us-central1",
        "ENVIRONMENT": "dev",
        "SERVICE_ACCOUNT_EMAIL": "test@test.iam.gserviceaccount.com",
    }

    def setUp(self):
        # google-cloud-tasks may not be installed in the test environment.
        # Provide a MagicMock so `from google.cloud import tasks_v2` succeeds
        # for tests that proceed past the early-return guards.
        self._mock_tasks_v2 = MagicMock()
        self._sys_modules_patcher = patch.dict(sys.modules, {"google.cloud.tasks_v2": self._mock_tasks_v2})
        self._sys_modules_patcher.start()
        # addCleanup guarantees sys.modules is restored even if a later
        # setUp step or the test itself fails.
        self.addCleanup(self._sys_modules_patcher.stop)

    @patch("shared.common.gcp_utils.create_http_task_with_name")
    def test_empty_feed_ids(self, mock_create_task):
        """Should return early without creating any tasks."""
        from shared.common.gcp_utils import create_web_revalidation_task

        # Should not raise
        create_web_revalidation_task([])

        mock_create_task.assert_not_called()

    @patch("shared.common.gcp_utils.create_http_task_with_name")
    @patch.dict("os.environ", {**_TASK_ENV, "WEB_REVALIDATION_QUEUE": ""})
    def test_missing_queue_env_var(self, mock_create_task):
        """Should log a warning and return without creating tasks."""
        from shared.common.gcp_utils import create_web_revalidation_task

        # Should not raise
        create_web_revalidation_task(["mdb-123"])

        mock_create_task.assert_not_called()

    @patch("shared.common.gcp_utils.create_http_task_with_name")
    @patch.dict("os.environ", _TASK_ENV)
    def test_creates_tasks_for_each_feed(self, mock_create_task):
        """Should create one Cloud Task per feed stable ID."""
        from shared.common.gcp_utils import create_web_revalidation_task

        create_web_revalidation_task(["mdb-100", "mdb-200"])

        self.assertEqual(mock_create_task.call_count, 2)

        # Verify the task bodies contain the correct feed IDs, in call order
        first_call, second_call = mock_create_task.call_args_list

        self.assertIn(b"mdb-100", first_call.kwargs.get("body", b""))
        self.assertIn(b"mdb-200", second_call.kwargs.get("body", b""))

    @patch("shared.common.gcp_utils.create_http_task_with_name")
    @patch.dict("os.environ", _TASK_ENV)
    def test_dedup_task_name_contains_feed_id(self, mock_create_task):
        """Task name should include the feed stable ID and the half-hour bucket."""
        from shared.common.gcp_utils import create_web_revalidation_task

        create_web_revalidation_task(["mdb-42"])

        self.assertEqual(mock_create_task.call_count, 1)
        task_name = mock_create_task.call_args.kwargs.get("task_name", "")
        self.assertTrue(task_name.startswith("revalidate-mdb-42-"))
        # The suffix is the bucket time formatted as YYYY-MM-DD-HH-MM and the
        # bucket always lands on :00 or :30.
        self.assertRegex(task_name, r"^revalidate-mdb-42-\d{4}-\d{2}-\d{2}-\d{2}-(00|30)$")

    @patch("shared.common.gcp_utils.create_http_task_with_name")
    @patch.dict("os.environ", _TASK_ENV)
    def test_already_exists_is_handled_gracefully(self, mock_create_task):
        """ALREADY_EXISTS errors should be caught and logged, not raised."""
        mock_create_task.side_effect = Exception("409 ALREADY_EXISTS: task already exists")
        from shared.common.gcp_utils import create_web_revalidation_task

        # Should not raise
        create_web_revalidation_task(["mdb-123"])

        # The creation must actually have been attempted for the error path to run.
        mock_create_task.assert_called_once()

    @patch("shared.common.gcp_utils.create_http_task_with_name")
    @patch.dict("os.environ", _TASK_ENV)
    def test_other_errors_do_not_stop_remaining_feeds(self, mock_create_task):
        """A non-ALREADY_EXISTS failure for one feed should not abort the others."""
        mock_create_task.side_effect = Exception("500 INTERNAL: backend unavailable")
        from shared.common.gcp_utils import create_web_revalidation_task

        # Should not raise
        create_web_revalidation_task(["mdb-1", "mdb-2"])

        self.assertEqual(mock_create_task.call_count, 2)

    @patch("shared.common.gcp_utils.create_http_task_with_name")
    @patch.dict("os.environ", _TASK_ENV)
    def test_targets_tasks_executor_url(self, mock_create_task):
        """Tasks should target the tasks_executor Cloud Function URL."""
        from shared.common.gcp_utils import create_web_revalidation_task

        create_web_revalidation_task(["mdb-1"])

        url = mock_create_task.call_args.kwargs.get("url", "")
        self.assertEqual(
            url,
            "https://us-central1-test-project.cloudfunctions.net/tasks_executor-dev",
        )


if __name__ == "__main__":
unittest.main()
15 changes: 14 additions & 1 deletion functions-python/batch_process_dataset/src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@
from sqlalchemy.orm import Session

from shared.common.gcp_memory_utils import limit_gcp_memory
from shared.common.gcp_utils import create_refresh_materialized_view_task
from shared.common.gcp_utils import (
create_refresh_materialized_view_task,
create_web_revalidation_task,
)
from shared.database.database import with_db_session
from shared.database_gen.sqlacodegen_models import Gtfsdataset, Gtfsfile, Gtfsfeed
from shared.dataset_service.main import DatasetTraceService, DatasetTrace, Status
Expand Down Expand Up @@ -631,6 +634,16 @@ def process_dataset(cloud_event: CloudEvent):
dataset_file = processor.process_from_bucket()
else:
dataset_file = processor.process_from_producer_url(json_payload["feed_id"])
# Trigger web app cache revalidation for the updated feed
if dataset_file is not None:
try:
create_web_revalidation_task([stable_id])
except Exception as revalidation_error:
logger.warning(
"Failed to enqueue web revalidation task for %s: %s",
stable_id,
revalidation_error,
)
except Exception as e:
# This makes sure the logger is initialized
logger = get_logger("process_dataset", stable_id if stable_id else "UNKNOWN")
Expand Down
10 changes: 10 additions & 0 deletions functions-python/pmtiles_builder/src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
from shared.helpers.logger import get_logger, init_logger
from shared.helpers.runtime_metrics import track_metrics
from shared.database.database import with_db_session
from shared.common.gcp_utils import create_web_revalidation_task
from ephemeral_workdir import EphemeralOrDebugWorkdir
import flask
import functions_framework
Expand Down Expand Up @@ -112,6 +113,15 @@ def build_pmtiles_handler(request: flask.Request) -> dict:
result["warning"] = message
else:
result["message"] = "Successfully built pmtiles."
# Trigger web app cache revalidation for the feed
try:
create_web_revalidation_task([feed_stable_id])
except Exception as revalidation_error:
logging.warning(
"Failed to enqueue web revalidation task for %s: %s",
feed_stable_id,
revalidation_error,
)

return result

Expand Down
Loading
Loading