H-3848, H-6154: Handle larger Flow payloads, historical flight data Flows #8356
Conversation
PR Summary (High Risk)
Overview: Updates storage providers to support flow-output keys plus direct upload/download, refactors flow-context lookup to query FlowRun entities by their workflowId property, and adds new aviation integration capabilities: a historical-arrivals flow and a live flight-positions flow.
Written by Cursor Bugbot for commit 02a7f30.
Codecov Report
❌ Patch coverage is
Additional details and impacted files:
@@ Coverage Diff @@
## main #8356 +/- ##
==========================================
- Coverage 60.10% 60.10% -0.01%
==========================================
Files 1235 1234 -1
Lines 118201 118217 +16
Branches 5180 5184 +4
==========================================
Hits 71050 71050
- Misses 46324 46340 +16
Partials 827 827
🤖 Augment PR Summary
Summary: This PR adds support for larger Flow payloads by offloading certain activity inputs/outputs to storage, and introduces new aviation integration flows for historical flight arrivals and live flight position updates.
Changes:
Technical Notes: Stored payload kinds are now represented as references in Flow I/O types and must be resolved by activities (and by backend APIs when returning Flow run details to clients).
      arrivesAtLink.properties,
    )
  ) {
    const primaryKeyPropertyMetadata = entity.propertyMetadata([
primaryKeyPropertyMetadata is fetched only for flightNumber but then reused (via casts) for flightDate too; if this metadata includes a property-specific dataTypeId, this can make the proposed flight-date metadata invalid and cause downstream validation/patch failures.
Severity: high
  step.logs.sort((a, b) => a.recordedAt.localeCompare(b.recordedAt));
}

// Resolve any stored payload references in step outputs
This resolves stored payload refs inside per-step outputs, but workflowOutputs parsed from the workflow completion event are still returned unchanged; if those outputs include StoredPayloadRefs, GraphQL clients expecting resolved values may still receive refs here.
Severity: medium
 */
query += ` ORDER BY StartTime DESC`;

const workflowIterable = temporalClient.workflow.list({ query });
With the explicit ORDER BY StartTime DESC removed, the reset de-dupe logic later in this function can become order-dependent; if Temporal returns runs out of order, callers may see multiple runs for the same workflowId.
Severity: medium
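If the explicit ordering stays removed, one option is to make the de-dupe order-independent. The sketch below keeps only the most recently started run per workflowId regardless of return order; the `workflowId`/`startTime` field names are assumptions about the shape of the listed executions, not the exact Temporal client types.

```ts
// Sketch: order-independent de-dupe, keeping the latest run per workflowId.
// Field names are illustrative placeholders.
interface RunInfo {
  workflowId: string;
  startTime: Date;
}

const latestRunPerWorkflowId = (runs: RunInfo[]): RunInfo[] => {
  const latest = new Map<string, RunInfo>();
  for (const run of runs) {
    const existing = latest.get(run.workflowId);
    if (!existing || run.startTime > existing.startTime) {
      latest.set(run.workflowId, run);
    }
  }
  return [...latest.values()];
};
```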
 * Get a singleton instance of the S3 storage provider.
 * This is shared across all activities in a worker.
 */
export const getStorageProvider = (): FileStorageProvider => {
getStorageProvider() always instantiates AwsS3StorageProvider (and requires AWS config); if a deployment uses LOCAL_FILE_SYSTEM/other storage backends, flow payload offloading will still try S3 and may fail unexpectedly.
Severity: medium
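A hedged sketch of selecting the provider from configuration instead of always constructing the S3 provider. The `FILE_UPLOAD_PROVIDER` env var, the `LocalFileSystemStorageProvider` class, and the zero-argument constructors are assumptions for illustration, not the repo's actual API.

```ts
// Sketch only: choose the storage backend from configuration. The env var
// name, class names, and constructor signatures below are hypothetical.
let storageProvider: FileStorageProvider | undefined;

export const getStorageProvider = (): FileStorageProvider => {
  if (storageProvider) {
    return storageProvider;
  }

  storageProvider =
    process.env.FILE_UPLOAD_PROVIDER === "LOCAL_FILE_SYSTEM"
      ? new LocalFileSystemStorageProvider()
      : new AwsS3StorageProvider();

  return storageProvider;
};
```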
});

// Handle rate limiting with retry
if (response.status === 429) {
The 429 handler retries via recursion with no max attempts/backoff escalation, so a sustained rate-limit condition could loop indefinitely (and potentially blow the call stack) rather than failing fast with a clearer error.
Severity: medium
/**
 * If set when proxying an activity, the period of time without a heartbeat after which the activity is considered to have failed.
 */
export const heartbeatTimeoutSeconds = 10;
A heartbeatTimeoutSeconds of 10s seems very tight given some operations (e.g., batch Graph API calls) can plausibly take >10s between Context.current().heartbeat() calls, which could cause spurious activity timeouts.
Severity: medium
  body: string | Buffer;
  contentType?: string;
}): Promise<void> {
  const filePath = path.join(this.fileUploadPath, path.normalize(key));
The path traversal guard uses filePath.startsWith(this.fileUploadPath), which can be bypassed by prefix collisions (e.g. base /tmp/uploads vs resolved /tmp/uploads2/...); using a resolved path comparison with a path separator boundary would be safer.
Severity: medium
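A minimal sketch of the separator-boundary check suggested above; `fileUploadPath` and `key` are taken from the excerpt, while the helper itself is illustrative.

```ts
import path from "node:path";

// Sketch: resolve both paths and require the target to be the base directory
// itself or strictly inside it (prefix match up to a path separator), so a
// base of "/tmp/uploads" does not accept "/tmp/uploads2/...".
const resolveWithinUploadPath = (
  fileUploadPath: string,
  key: string,
): string => {
  const base = path.resolve(fileUploadPath);
  const target = path.resolve(base, key);

  if (target !== base && !target.startsWith(base + path.sep)) {
    throw new Error(
      `Refusing to access a path outside the upload directory: ${key}`,
    );
  }

  return target;
};
```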
Pull request overview
This PR adds support for retrieving historical flight arrival information over time spans and improves handling of large Flow payloads by offloading them to S3 storage. It also fixes enum data type generation issues (H-3848).
Changes:
- Introduced a StoredPayloadRef system that stores large payloads (ProposedEntity, ProposedEntityWithResolvedLinks, PersistedEntitiesMetadata) in S3 instead of passing them through Temporal activities
- Added historical flight data retrieval capabilities with automatic 24-hour chunking to handle API limitations
- Implemented rate limiting for AeroAPI (5 requests/second) and retry logic for 429 errors
- Added live flight position tracking via FlightRadar24 integration
- Fixed enum data type code generation by removing redundant allOf inheritance
- Extended activity timeouts and added heartbeat support for long-running batch operations
Reviewed changes
Copilot reviewed 56 out of 56 changed files in this pull request and generated 12 comments.
| File | Description |
|---|---|
| libs/@local/hash-isomorphic-utils/src/flows/types.ts | Core type system changes introducing StoredPayloadRef, StoredPayloadKind, and separated Payload/ResolvedPayload types |
| libs/@local/hash-backend-utils/src/flows/payload-storage.ts | New payload storage infrastructure for S3 offloading with store/retrieve/resolve functions |
| libs/@local/hash-backend-utils/src/integrations/aviation/aero-api/client.ts | Added historical arrivals API integration with date chunking and rate limiting |
| apps/hash-integration-worker/src/activities/flow-activities/integration-activities/persist-integration-entities-action.ts | Refactored to batch operations and resolve stored payloads, with heartbeat support |
| apps/hash-integration-worker/src/activities/flow-activities/aviation-activities/get-historical-flight-arrivals-action.ts | New action for fetching historical flight data over date ranges |
| apps/hash-integration-worker/src/activities/flow-activities/aviation-activities/get-live-flight-positions-action.ts | New action for fetching live flight positions from FlightRadar24 |
| libs/@blockprotocol/graph/src/codegen/preprocess/remove-redundant-data-type-inheritance.ts | New preprocessing step to fix enum data type generation |
| libs/@local/hash-backend-utils/src/flows/get-flow-run-details.ts | Added StoredPayloadRef resolution for step outputs before returning to GraphQL clients |
| apps/hash-integration-worker/src/workflows/run-flow-workflow.ts | Extended activity timeouts to 10 hours with 10-second heartbeat timeouts for batch operations |
| libs/@local/hash-backend-utils/src/file-storage.ts | Unified FileStorageProvider interface with direct upload/download methods |
| apps/hash-ai-worker-ts/src/activities/shared/get-flow-context.ts | Changed flow entity lookup from UUID-based to property query-based |
// Handle rate limiting with retry
if (response.status === 429) {
  await sleep(RATE_LIMIT_RETRY_DELAY_MS);
  return makeRequest<T>(url);
}
The 429 retry logic performs unbounded recursive retries without any exponential backoff or maximum retry limit. If the API continues to return 429s, this will cause infinite recursion until the call stack overflows.
Add:
- A maximum retry count
- Exponential backoff for retries
- Return an error after max retries exceeded (see the sketch below)
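A sketch of what a bounded retry with exponential backoff could look like. `apiKey`, `sleep`, and `RATE_LIMIT_RETRY_DELAY_MS` are taken from the excerpt above, while the attempt limit and the non-429 handling are illustrative assumptions.

```ts
const MAX_RATE_LIMIT_RETRIES = 5;

const makeRequestWithRetry = async <T>(url: string, attempt = 0): Promise<T> => {
  const response = await fetch(url, {
    headers: { Accept: "application/json", "x-apikey": apiKey },
  });

  if (response.status === 429) {
    if (attempt >= MAX_RATE_LIMIT_RETRIES) {
      throw new Error(
        `Rate limit still in effect after ${MAX_RATE_LIMIT_RETRIES} retries for ${url}`,
      );
    }
    // Exponential backoff: the base delay doubles on each attempt.
    await sleep(RATE_LIMIT_RETRY_DELAY_MS * 2 ** attempt);
    return makeRequestWithRetry<T>(url, attempt + 1);
  }

  return (await response.json()) as T;
};
```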
} catch (error) {
  // If batch creation fails, add all entities in this batch to failed proposals
  const errorMessage =
    error instanceof Error ? error.message : "Unknown error";
  failedEntityProposals.push({
    proposedEntity,
    message: `Failed to persist entity: ${errorMessage}`,
  });
  for (const { proposedEntity } of batch) {
    failedEntityProposals.push({
      proposedEntity,
      message: `Failed to create entity in batch: ${errorMessage}. ${stringifyError(error)}`,
    });
  }
}
When a batch creation fails, all entities in that batch are marked as failed, even though some may have been successfully created before the error occurred. The HashEntity.createMultiple method may perform partial creation before failing.
This could lead to:
- Entities being created in the database but marked as failed in the flow output
- Retry attempts creating duplicate entities
Consider checking which entities were actually created (if the API provides that info) or handling partial batch failures more granularly.
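One hedged option, assuming the batch call is all-or-nothing: fall back to creating the items one at a time so per-entity outcomes are recorded accurately. `createBatch` and `createSingle` are hypothetical stand-ins for the actual creation calls; if the batch call can partially succeed, the API would instead need to report which entities were created.

```ts
// Sketch: on batch failure, retry items individually so successes are not
// misreported as failures. `createBatch`/`createSingle` are hypothetical.
try {
  await createBatch(batch.map(({ proposedEntity }) => proposedEntity));
} catch {
  for (const { proposedEntity } of batch) {
    try {
      await createSingle(proposedEntity);
    } catch (itemError) {
      failedEntityProposals.push({
        proposedEntity,
        message: `Failed to create entity: ${
          itemError instanceof Error ? itemError.message : "Unknown error"
        }`,
      });
    }
  }
}
```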
const getFlowEntityInfo = async (params: {
  workflowId: string;
  userAuthentication: { actorId: UserId };
}): Promise<FlowEntityInfo> => {
  const { workflowId, userAuthentication } = params;

  const cache = await getCache();
  const cacheKey = `flowEntity-${workflowId}`;

  const cachedInfo = await cache.get<FlowEntityInfo>(cacheKey);
  if (cachedInfo) {
    return cachedInfo;
  }

  // Query for the flow entity using the workflowId property
  const {
    entities: [flowEntity],
  } = await queryEntities<FlowRunEntity>(
    { graphApi: graphApiClient },
    userAuthentication,
    {
      filter: {
        all: [
          {
            equal: [
              {
                path: [
                  "properties",
                  systemPropertyTypes.workflowId.propertyTypeBaseUrl,
                ],
              },
              { parameter: workflowId },
            ],
          },
          generateVersionedUrlMatchingFilter(
            systemEntityTypes.flowRun.entityTypeId,
            { ignoreParents: true },
          ),
        ],
      },
      temporalAxes: currentTimeInstantTemporalAxes,
      includeDrafts: false,
      includePermissions: false,
    },
  );

  if (!flowEntity) {
    throw new Error(
      `Flow entity not found for workflowId ${workflowId}. The flow entity may not have been persisted yet.`,
    );
  }

  const flowEntityInfo: FlowEntityInfo = {
    flowEntityId: flowEntity.metadata.recordId.entityId,
  };

  await cache.set(cacheKey, flowEntityInfo);
  return flowEntityInfo;
};
The new getFlowEntityInfo function queries the database on every cache miss to find the flow entity by workflowId. However, there's a race condition: if an activity starts executing before the FlowRun entity has been persisted to the database (which happens asynchronously), this will throw an error.
The error message acknowledges this ("The flow entity may not have been persisted yet"), but the function still throws rather than retrying or waiting. This could cause spurious failures for fast-executing flows.
Consider:
- Adding retry logic with backoff for the initial entity query (a sketch follows below)
- Or ensuring the FlowRun entity is persisted before any activities start
- Or falling back to the old entityIdFromComponents approach as a secondary method
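A minimal sketch of the first option; `queryFlowEntity` stands in for the `queryEntities` call shown above, and the attempt count and delays are illustrative.

```ts
// Sketch: retry the flow-entity lookup with backoff to tolerate the window
// in which the FlowRun entity has not been persisted yet.
const findFlowEntityWithRetry = async (
  workflowId: string,
  maxAttempts = 5,
  initialDelayMs = 500,
) => {
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    // `queryFlowEntity` is a placeholder for the queryEntities call above.
    const flowEntity = await queryFlowEntity(workflowId);
    if (flowEntity) {
      return flowEntity;
    }
    // Exponential backoff between attempts: 500ms, 1s, 2s, ...
    await new Promise((resolve) => {
      setTimeout(resolve, initialDelayMs * 2 ** attempt);
    });
  }
  throw new Error(
    `Flow entity not found for workflowId ${workflowId} after ${maxAttempts} attempts. It may not have been persisted yet.`,
  );
};
```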
return (
  // before: payload.value as PayloadKindValues[(typeof browserInferenceFlowOutput)["payloadKind"]]
  // after:
  payload.value as unknown as PayloadKindValues[(typeof browserInferenceFlowOutput)["payloadKind"]]
).persistedEntities;
The cast to unknown before casting to the final type suggests that the type system changes (with StoredPayloadRef) are creating type incompatibilities in existing code. This workaround using as unknown as bypasses TypeScript's type checking and could hide real type errors.
The comment on line 39-41 indicates this is needed because payload.value might now be a StoredPayloadRef in the type system, but at runtime in the browser context it's expected to be the resolved value (since the GraphQL layer should resolve stored refs before returning to clients).
Verify that:
- The GraphQL layer properly resolves all StoredPayloadRef values before returning them to clients
- The ResolvedStepRunOutput type is used correctly in the GraphQL schema
- Consider whether this cast can be removed by properly typing the GraphQL response
const getAllHistoricalArrivals = async (
  params: Omit<HistoricalArrivalsRequestParams, "cursor">,
): Promise<AeroApiScheduledFlight[]> => {
  const allFlights: AeroApiScheduledFlight[] = [];

  let response = await getHistoricalArrivals(params);
  allFlights.push(...response.arrivals);

  while (response.links?.next) {
    response = await makeRequest<AeroApiHistoricalArrivalsResponse>(
      `${baseUrl}${response.links.next}`,
    );
    allFlights.push(...response.arrivals);
  }

  return allFlights;
};
The pagination logic in getAllHistoricalArrivals doesn't have any safety limit on the number of pages to fetch. If the API returns malformed pagination data (e.g., a circular links.next reference), this will loop infinitely and potentially accumulate unbounded data in the allFlights array.
Add a safety limit on the maximum number of pages to fetch (e.g., 1000 pages) to prevent infinite loops and memory exhaustion.
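A sketch of the suggested guard applied to the loop above; the cap value is arbitrary.

```ts
// Sketch: cap the number of pages fetched to guard against circular or
// malformed `links.next` values. The 1000-page cap is illustrative.
const MAX_PAGES = 1000;

let pagesFetched = 1;
while (response.links?.next) {
  if (pagesFetched >= MAX_PAGES) {
    throw new Error(
      `Aborting historical-arrivals pagination after ${MAX_PAGES} pages - possible circular "next" link`,
    );
  }
  response = await makeRequest<AeroApiHistoricalArrivalsResponse>(
    `${baseUrl}${response.links.next}`,
  );
  allFlights.push(...response.arrivals);
  pagesFetched += 1;
}
```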
return {
  failureMessage: workflowFailureMessage,
  inputs: workflowInputs,
  outputs: workflowOutputs,
The workflowOutputs on line 693 are returned directly without resolving any potential StoredPayloadRef values. While workflow outputs may not currently contain stored payload kinds, this creates an inconsistency with step outputs (which are resolved on lines 649-688).
If a workflow ever returns a stored payload kind as its final output, it will not be resolved for the GraphQL client. Consider either:
- Also resolving stored refs in workflow outputs for consistency (a sketch follows below)
- Adding a comment explaining why workflow outputs don't need resolution
- Adding a type constraint that prevents workflow outputs from being stored payload kinds
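A hedged sketch of the first option; `resolveStoredPayloadsInOutputs` is a hypothetical name for whatever resolver the PR already applies to per-step outputs.

```ts
// Sketch: resolve stored payload refs in workflow outputs the same way the
// per-step outputs are resolved. The resolver name below is hypothetical.
const resolvedWorkflowOutputs = workflowOutputs
  ? await resolveStoredPayloadsInOutputs(workflowOutputs)
  : workflowOutputs;

return {
  failureMessage: workflowFailureMessage,
  inputs: workflowInputs,
  outputs: resolvedWorkflowOutputs,
  // ...remaining fields unchanged
};
```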
const executeInBatches = async <T, R>(
  items: T[],
  batchSize: number,
  operation: (item: T) => Promise<R>,
): Promise<R[]> => {
  const results: R[] = [];

  for (let i = 0; i < items.length; i += batchSize) {
    Context.current().heartbeat();
    const batch = items.slice(i, i + batchSize);
    const batchResults = await Promise.all(batch.map(operation));
    results.push(...batchResults);
  }

  return results;
};
The executeInBatches function heartbeats at the start of each batch but doesn't handle potential errors in individual batch operations. If one promise in the batch fails, it will reject the entire Promise.all, potentially losing progress on successfully completed operations within that batch.
Consider wrapping each operation in a try-catch within the map to allow partial batch success, or document that batch atomicity is the intended behavior.
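If partial batch success is the desired behavior, `Promise.allSettled` is one option. The sketch below returns per-item outcomes instead of rejecting the whole batch and reuses the `Context.current().heartbeat()` call from the excerpt; the result shape is illustrative.

```ts
// Sketch: collect per-item outcomes rather than failing the whole batch.
const executeInBatchesSettled = async <T, R>(
  items: T[],
  batchSize: number,
  operation: (item: T) => Promise<R>,
): Promise<PromiseSettledResult<R>[]> => {
  const results: PromiseSettledResult<R>[] = [];

  for (let i = 0; i < items.length; i += batchSize) {
    Context.current().heartbeat();
    const batch = items.slice(i, i + batchSize);
    // allSettled never rejects; each entry records a value or a reason.
    const batchResults = await Promise.allSettled(batch.map(operation));
    results.push(...batchResults);
  }

  return results;
};
```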
// Throttle requests to once per second
const now = Date.now();
const timeSinceLastRequest = now - lastRequestTime;
if (timeSinceLastRequest < REQUEST_INTERVAL_MS) {
  await sleep(REQUEST_INTERVAL_MS - timeSinceLastRequest);
}
lastRequestTime = Date.now();

const response = await fetch(url, {
  headers: {
    Accept: "application/json",
    "x-apikey": apiKey,
  },
});

// Handle rate limiting with retry
if (response.status === 429) {
  await sleep(RATE_LIMIT_RETRY_DELAY_MS);
  return makeRequest<T>(url);
}
The rate limiting implementation using a module-level lastRequestTime variable is not thread-safe in Node.js's async environment. While JavaScript is single-threaded, the async nature means multiple concurrent calls to makeRequest can interleave.
For example:
- Request A checks timeSinceLastRequest at T=0 and sleeps for 1000ms
- Request B checks timeSinceLastRequest at T=10ms and also sleeps for ~990ms
- Both requests wake up around T=1000ms and fire nearly simultaneously

This defeats the rate limiting purpose. Consider using a proper queue or mutex to serialize requests (a sketch follows below), or use a library like bottleneck for rate limiting.
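A minimal sketch of serializing requests through a shared promise chain so only one caller at a time evaluates the throttle and updates the timestamp; `sleep`, `apiKey`, `lastRequestTime`, and `REQUEST_INTERVAL_MS` follow the excerpt above, and error handling is simplified.

```ts
// Sketch: serialize all requests so the throttle and lastRequestTime update
// happen for one request at a time, preserving the one-request-per-interval budget.
let requestQueue: Promise<unknown> = Promise.resolve();

const makeRequestSerialized = <T>(url: string): Promise<T> => {
  const run = requestQueue.then(async () => {
    const timeSinceLastRequest = Date.now() - lastRequestTime;
    if (timeSinceLastRequest < REQUEST_INTERVAL_MS) {
      await sleep(REQUEST_INTERVAL_MS - timeSinceLastRequest);
    }
    lastRequestTime = Date.now();

    const response = await fetch(url, {
      headers: { Accept: "application/json", "x-apikey": apiKey },
    });
    return (await response.json()) as T;
  });

  // Keep the chain alive even if this request fails.
  requestQueue = run.catch(() => undefined);
  return run;
};
```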
Cursor Bugbot has reviewed your changes and found 6 potential issues.
🌟 What is the purpose of this PR?
This PR:
Pre-Merge Checklist 🚀
🚢 Has this modified a publishable library?
This PR:
📜 Does this require a change to the docs?
The changes in this PR:
🕸️ Does this require a change to the Turbo Graph?
The changes in this PR:
🐾 Next steps
🛡 What tests cover this?