Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 70 additions & 37 deletions python/api/routers/hfp.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
route_id_pattern = re.compile(r"^[A-Za-z0-9]+$")
CHUNK_SIZE = 10000 # Adjust to optimize if needed


class GzippedFileResponse(Response):
media_type = "application/gzip"

Expand Down Expand Up @@ -359,19 +360,26 @@ async def get_tlp_raw_data(
)
return response


@router.get(
"/delay_analytics",
summary="Get delay analytics data.",
description="Returns delay analytics as packaged zip file. Initial request will start the analysis. Following requests will return the status of the analysis or the data.",
description=(
"Returns delay analytics as packaged zip file. Initial request will start the analysis. "
"Following requests will return the status of the analysis or the data. "
"Re-execute the analysis to refresh the status of the analysis if needed."
),
responses={
200: {
"description": "The data is returned as an attachment in the response.",
"content": {"application/gzip": {"schema": None, "example": None}}
"content": {"application/gzip": {"schema": None, "example": None}},
},
202: {
"description": "Status message returned. Analysis queued, running or created, check again later."
},
202: {"description": "Status message returned. Analysis queued, running or created, check again later."},
204: {"description": "Query returned no data with the given parameters."},
422: {"description": "Query had invalid parameters."}
}
422: {"description": "Query had invalid parameters."},
},
)
async def get_delay_analytics_data_durable(
route_id: Optional[str] = Query(
Expand All @@ -384,30 +392,35 @@ async def get_delay_analytics_data_durable(
default=None,
title="From oday (YYYY-MM-DD)",
description=(
"The oday from which the preprocessed clusters and departures will be used.",
"If same oday is used for from_oday and to_oday the analysis for that day will be returned.",
"If no date given the default value will be used (five days prior)."
"The oday from which the preprocessed clusters and departures will be used. ",
"If same oday is used for from_oday and to_oday the analysis for that day will be returned. ",
"If no date given the default value will be used (five days prior). ",
"The database contains data from first of May 2025. ",
"Format `yyyy-MM-dd`. ",
),
example="2025-02-10"
example="2025-04-01",
),
to_oday: Optional[date] = Query(
default=None,
title="To oday (YYYY-MM-DD)",
description=(
"The oday to which the preprocessed clusters and departures will be used.",
"If same oday is used for from_oday and to_oday the analysis for that day will be returned.",
"If no date given the default value will be used (yesterday)."
"The oday to which the preprocessed clusters and departures will be used. ",
"If same oday is used for from_oday and to_oday the analysis for that day will be returned. ",
"If no date given the default value will be used (yesterday). ",
"The database contains data from first of May 2025. ",
"Format `yyyy-MM-dd`. ",
),
example="2025-02-10"
example="2025-04-07",
),
exclude_dates: Optional[str] = Query(
default=None,
title="Days to exclude (YYYY-MM-DD)",
description=(
"The days to be excluded from the analysis."
"Provide valid date or dates separated with a comma."
"The days to be excluded from the analysis. "
"Provide valid date or dates separated with a comma. "
"Format `yyyy-MM-dd`. "
),
example="2025-02-10,2025-02-11"
example="2025-04-02,2025-04-03",
),
) -> Response:
"""
Expand All @@ -418,7 +431,7 @@ async def get_delay_analytics_data_durable(
"""

default_from_oday = get_target_oday(15)
default_to_oday = get_target_oday()
default_to_oday = get_target_oday()
if not from_oday:
from_oday = default_from_oday
if not to_oday:
Expand All @@ -428,8 +441,10 @@ async def get_delay_analytics_data_durable(
from_oday=from_oday, to_oday=to_oday
)
if not is_date_range_valid_:
raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=date_range_validity_message)

raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=date_range_validity_message,
)

if route_id is None or not route_id.strip():
route_ids = []
Expand All @@ -447,10 +462,9 @@ async def get_delay_analytics_data_durable(
"msg": f"Invalid route ID: {rid}. Only letters and digits allowed.",
"input": rid,
}
]
],
)


if exclude_dates is not None:
raw_dates = [r.strip() for r in exclude_dates.split(",") if r.strip()]
valid_dates = []
Expand All @@ -467,7 +481,7 @@ async def get_delay_analytics_data_durable(
"msg": f"Invalid date: {d}. Expected format is YYYY-MM-DD.",
"input": d,
}
]
],
)
valid_dates.sort()
exclude_dates = valid_dates
Expand All @@ -478,7 +492,7 @@ async def get_delay_analytics_data_durable(
"route_ids": route_ids,
"from_oday": str(from_oday),
"to_oday": str(to_oday),
"days_excluded": exclude_dates
"days_excluded": exclude_dates,
}

try:
Expand All @@ -489,7 +503,7 @@ async def get_delay_analytics_data_durable(
return Response(
status_code=status.HTTP_202_ACCEPTED,
content=resp.content,
media_type=resp.headers.get("Content-Type", "application/json")
media_type=resp.headers.get("Content-Type", "application/json"),
)
resp.raise_for_status()
try:
Expand All @@ -501,10 +515,13 @@ async def get_delay_analytics_data_durable(
media_type=resp.headers.get("Content-Type", "application/zip"),
headers={
"Content-Disposition": 'attachment; filename="clusters.zip"'
}
},
)
except Exception as e:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Could not start Durable function: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Could not start Durable function: {e}",
)


@router.post(
Expand All @@ -522,21 +539,37 @@ async def add_preprocess_data_from_blob_to_db(
preprocess_type: Literal["clusters", "departures"], response: Response
) -> dict[str, List[str]]:
with CustomDbLogHandler("api"):
client = FlowAnalyticsContainerClient()

blob_data = await client.get_existing_blob_data_from_previous_2_months(preprocess_type=preprocess_type)
logger.debug(f'Found {len(blob_data)} blobs in blob storage')
client = FlowAnalyticsContainerClient()

blob_data = await client.get_existing_blob_data_from_previous_2_months(
preprocess_type=preprocess_type
)
logger.debug(f"Found {len(blob_data)} blobs in blob storage")

db_data = await get_existing_date_and_route_id_from_preprocess_table(preprocess_type=preprocess_type)
db_data = await get_existing_date_and_route_id_from_preprocess_table(
preprocess_type=preprocess_type
)
logger.debug(f"Found {len(db_data)} rows in database")

missing_data = await find_missing_preprocess_data_in_db_compared_to_blob_storage(db_data=db_data, blobs_data=blob_data)
logger.debug(f"Found { len(missing_data)} blobs which are not in the database yet.")

missing_data = (
await find_missing_preprocess_data_in_db_compared_to_blob_storage(
db_data=db_data, blobs_data=blob_data
)
)
logger.debug(
f"Found {len(missing_data)} blobs which are not in the database yet."
)

if len(missing_data) > 0:
await upload_missing_preprocess_data_to_db(client=client, missing_blobs=missing_data, preprocess_type=preprocess_type)
logger.debug(f"Successfully imported {len(missing_data)} blobs from blob storage to database")
await upload_missing_preprocess_data_to_db(
client=client,
missing_blobs=missing_data,
preprocess_type=preprocess_type,
)
logger.debug(
f"Successfully imported {len(missing_data)} blobs from blob storage to database"
)
else:
logger.debug("There are 0 blobs to be added from blob storage to database")
response.status_code = status.HTTP_201_CREATED
return {'imported data': [blob.blob_path for blob in missing_data]}
return {"imported data": [blob.blob_path for blob in missing_data]}
Loading
Loading