Skip to content

H-3848, H-6154: Handle larger Flow payloads, historical flight data Flows#8356

Open
CiaranMn wants to merge 5 commits intomainfrom
cm/historical-flights-and-large-flow-improvements
Open

H-3848, H-6154: Handle larger Flow payloads, historical flight data Flows#8356
CiaranMn wants to merge 5 commits intomainfrom
cm/historical-flights-and-large-flow-improvements

Conversation

@CiaranMn
Copy link
Member

@CiaranMn CiaranMn commented Feb 4, 2026

🌟 What is the purpose of this PR?

This PR:

  1. Adds the ability to retrieve historical flight arrival information over a time span for a given airport
  2. Given that this can result in a lot of data (~30k entities for 1 month of London Gatwick arrivals), the PR also improves handling of large Flows by:
  • Offloading payloads to S3 for both persisted and proposed entities – activities store data in S3 and output object paths, and consuming activities download the object.
  • Rate limiting handling for the flight APIs we're using.
  • Better timeout / heartbeat configuration for affected Flow activities.
  1. Drive-bys H-3848, where enum data types were not being generated correctly.

Pre-Merge Checklist 🚀

🚢 Has this modified a publishable library?

This PR:

  • does not modify any publishable blocks or libraries, or modifications do not need publishing

📜 Does this require a change to the docs?

The changes in this PR:

  • are internal and do not require a docs change

🕸️ Does this require a change to the Turbo Graph?

The changes in this PR:

  • do not affect the execution graph

🐾 Next steps

  • Custom dashboards including LLM generation assistance, with flight data as a test case.

🛡 What tests cover this?

  • None.

@CiaranMn CiaranMn requested a review from Copilot February 4, 2026 18:18
@CiaranMn CiaranMn self-assigned this Feb 4, 2026
@vercel
Copy link

vercel bot commented Feb 4, 2026

The latest updates on your projects. Learn more about Vercel for GitHub.

4 Skipped Deployments
Project Deployment Actions Updated (UTC)
hash Ignored Ignored Preview Feb 5, 2026 8:47am
hashdotdesign Ignored Ignored Preview Feb 5, 2026 8:47am
hashdotdesign-tokens Ignored Ignored Preview Feb 5, 2026 8:47am
petrinaut Skipped Skipped Feb 5, 2026 8:47am

@cursor
Copy link

cursor bot commented Feb 4, 2026

PR Summary

High Risk
Changes core Flow execution/data plumbing by replacing in-Temporal payload passing with stored references resolved via S3/local storage, plus new integration activities that batch-create/patch entities and call external rate-limited APIs. Risk is mainly around payload resolution correctness, storage permissions/config, and Temporal timeouts/heartbeating affecting long-running workflows.

Overview
Enables large Flow steps to offload heavy outputs/inputs to external storage (S3/local) by introducing StoredPayloadRef handling: activities now storePayload/resolvePayloadValue instead of passing full ProposedEntity/PersistedEntitiesMetadata payloads through Temporal, and the GraphQL/API layer resolves stored refs when returning Flow run details.

Updates storage providers to support flow-output keys plus direct upload/download, refactors flow-context lookup to query FlowRun entities by workflowId (with caching/backoff), and adjusts Temporal workflow timeouts/heartbeats (e.g. persistEntitiesAction, persistIntegrationEntitiesAction) and guards against parallelizing/aggregating stored refs.

Adds new aviation integration capabilities: a historical-arrivals flow (getHistoricalFlightArrivals) and live-position updates (getLiveFlightPositions), extends scheduled-flights flow to persist position updates, adds API rate limiting/retry utilities for AeroAPI/FlightRadar24, and improves integration entity persistence with batching (createMultiple), non-destructive patch generation, and better error reporting.

Written by Cursor Bugbot for commit 02a7f30. This will update automatically on new commits. Configure here.

@github-actions github-actions bot added area/apps > hash* Affects HASH (a `hash-*` app) area/apps > hash-api Affects the HASH API (app) area/libs Relates to first-party libraries/crates/packages (area) type/eng > backend Owned by the @backend team area/tests New or updated tests area/tests > integration New or updated integration tests area/apps labels Feb 4, 2026
@codecov
Copy link

codecov bot commented Feb 4, 2026

Codecov Report

❌ Patch coverage is 0% with 69 lines in your changes missing coverage. Please review.
✅ Project coverage is 60.10%. Comparing base (1492bce) to head (02a7f30).
⚠️ Report is 1 commits behind head on main.

Files with missing lines Patch % Lines
...orker-ts/src/activities/shared/get-flow-context.ts 0.00% 13 Missing ⚠️
apps/hash-api/src/storage/local-file-storage.ts 0.00% 12 Missing ⚠️
...ities/flow-activities/write-google-sheet-action.ts 0.00% 8 Missing ⚠️
...ivities/flow-activities/persist-entities-action.ts 0.00% 7 Missing ⚠️
...ctivities/flow-activities/persist-entity-action.ts 0.00% 7 Missing ⚠️
...bs/@local/hash-isomorphic-utils/src/flows/types.ts 0.00% 6 Missing ⚠️
...ies/research-entities-action/coordinating-agent.ts 0.00% 4 Missing ⚠️
...w-activities/shared/create-file-entity-from-url.ts 0.00% 3 Missing ⚠️
apps/hash-api/src/storage/index.ts 0.00% 2 Missing ⚠️
...tivities/flow-activities/answer-question-action.ts 0.00% 1 Missing ⚠️
... and 6 more
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #8356      +/-   ##
==========================================
- Coverage   60.10%   60.10%   -0.01%     
==========================================
  Files        1235     1234       -1     
  Lines      118201   118217      +16     
  Branches     5180     5184       +4     
==========================================
  Hits        71050    71050              
- Misses      46324    46340      +16     
  Partials      827      827              
Flag Coverage Δ
apps.hash-ai-worker-ts 1.41% <0.00%> (-0.01%) ⬇️
apps.hash-api 0.00% <0.00%> (ø)

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@augmentcode
Copy link

augmentcode bot commented Feb 4, 2026

🤖 Augment PR Summary

Summary: This PR adds support for larger Flow payloads by offloading certain activity inputs/outputs to storage, and introduces new aviation integration flows for historical flight arrivals and live flight position updates.

Changes:

  • Added integration actions/flow definitions for fetching historical flight arrivals (AeroAPI) and live flight positions (FlightRadar24).
  • Introduced StoredPayloadRef and a new payload-storage utility to store/retrieve large payloads (e.g. ProposedEntities, PersistedEntitiesMetadata) outside Temporal.
  • Updated multiple Flow activities to store large outputs and resolve stored refs when consuming inputs, reducing Temporal payload size.
  • Enhanced persistence activities with batching and more frequent Temporal heartbeats/timeouts for long-running work.
  • Updated Flow run detail retrieval to resolve stored payload refs in step outputs when reading Temporal history.
  • Extended the file storage provider interface to support direct upload/download and flow-output storage key generation (S3 + local FS implementations updated).
  • Fixed TypeScript codegen for enum-constrained data types by removing redundant inheritance in schema preprocessing.

Technical Notes: Stored payload kinds are now represented as references in Flow I/O types and must be resolved by activities (and by backend APIs when returning Flow run details to clients).

🤖 Was this summary useful? React with 👍 or 👎

Copy link

@augmentcode augmentcode bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review completed. 7 suggestions posted.

Fix All in Augment

Comment augment review to trigger a new review at any time.

arrivesAtLink.properties,
)
) {
const primaryKeyPropertyMetadata = entity.propertyMetadata([
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

primaryKeyPropertyMetadata is fetched only for flightNumber but then reused (via casts) for flightDate too; if this metadata includes a property-specific dataTypeId, this can make the proposed flight-date metadata invalid and cause downstream validation/patch failures.

Severity: high

Fix This in Augment

🤖 Was this useful? React with 👍 or 👎, or 🚀 if it prevented an incident/outage.

step.logs.sort((a, b) => a.recordedAt.localeCompare(b.recordedAt));
}

// Resolve any stored payload references in step outputs
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This resolves stored payload refs inside per-step outputs, but workflowOutputs parsed from the workflow completion event are still returned unchanged; if those outputs include StoredPayloadRefs, GraphQL clients expecting resolved values may still receive refs here.

Severity: medium

Fix This in Augment

🤖 Was this useful? React with 👍 or 👎, or 🚀 if it prevented an incident/outage.

*/
query += ` ORDER BY StartTime DESC`;

const workflowIterable = temporalClient.workflow.list({ query });
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With the explicit ORDER BY StartTime DESC removed, the reset de-dupe logic later in this function can become order-dependent; if Temporal returns runs out of order, callers may see multiple runs for the same workflowId.

Severity: medium

Fix This in Augment

🤖 Was this useful? React with 👍 or 👎, or 🚀 if it prevented an incident/outage.

* Get a singleton instance of the S3 storage provider.
* This is shared across all activities in a worker.
*/
export const getStorageProvider = (): FileStorageProvider => {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getStorageProvider() always instantiates AwsS3StorageProvider (and requires AWS config); if a deployment uses LOCAL_FILE_SYSTEM/other storage backends, flow payload offloading will still try S3 and may fail unexpectedly.

Severity: medium

Fix This in Augment

🤖 Was this useful? React with 👍 or 👎, or 🚀 if it prevented an incident/outage.

});

// Handle rate limiting with retry
if (response.status === 429) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The 429 handler retries via recursion with no max attempts/backoff escalation, so a sustained rate-limit condition could loop indefinitely (and potentially blow the call stack) rather than failing fast with a clearer error.

Severity: medium

Fix This in Augment

🤖 Was this useful? React with 👍 or 👎, or 🚀 if it prevented an incident/outage.

/**
* If set when proxying an activity, the period of time without a heartbeat after which the activity is considered to have failed.
*/
export const heartbeatTimeoutSeconds = 10;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A heartbeatTimeoutSeconds of 10s seems very tight given some operations (e.g., batch Graph API calls) can plausibly take >10s between Context.current().heartbeat() calls, which could cause spurious activity timeouts.

Severity: medium

Fix This in Augment

🤖 Was this useful? React with 👍 or 👎, or 🚀 if it prevented an incident/outage.

body: string | Buffer;
contentType?: string;
}): Promise<void> {
const filePath = path.join(this.fileUploadPath, path.normalize(key));
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The path traversal guard uses filePath.startsWith(this.fileUploadPath), which can be bypassed by prefix collisions (e.g. base /tmp/uploads vs resolved /tmp/uploads2/...); using a resolved path comparison with a path separator boundary would be safer.

Severity: medium

Fix This in Augment

🤖 Was this useful? React with 👍 or 👎, or 🚀 if it prevented an incident/outage.

Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR adds support for retrieving historical flight arrival information over time spans and improves handling of large Flow payloads by offloading them to S3 storage. It also fixes enum data type generation issues (H-3848).

Changes:

  • Introduced a StoredPayloadRef system that stores large payloads (ProposedEntity, ProposedEntityWithResolvedLinks, PersistedEntitiesMetadata) in S3 instead of passing them through Temporal activities
  • Added historical flight data retrieval capabilities with automatic 24-hour chunking to handle API limitations
  • Implemented rate limiting for AeroAPI (5 requests/second) and retry logic for 429 errors
  • Added live flight position tracking via FlightRadar24 integration
  • Fixed enum data type code generation by removing redundant allOf inheritance
  • Extended activity timeouts and added heartbeat support for long-running batch operations

Reviewed changes

Copilot reviewed 56 out of 56 changed files in this pull request and generated 12 comments.

Show a summary per file
File Description
libs/@local/hash-isomorphic-utils/src/flows/types.ts Core type system changes introducing StoredPayloadRef, StoredPayloadKind, and separated Payload/ResolvedPayload types
libs/@local/hash-backend-utils/src/flows/payload-storage.ts New payload storage infrastructure for S3 offloading with store/retrieve/resolve functions
libs/@local/hash-backend-utils/src/integrations/aviation/aero-api/client.ts Added historical arrivals API integration with date chunking and rate limiting
apps/hash-integration-worker/src/activities/flow-activities/integration-activities/persist-integration-entities-action.ts Refactored to batch operations and resolve stored payloads, with heartbeat support
apps/hash-integration-worker/src/activities/flow-activities/aviation-activities/get-historical-flight-arrivals-action.ts New action for fetching historical flight data over date ranges
apps/hash-integration-worker/src/activities/flow-activities/aviation-activities/get-live-flight-positions-action.ts New action for fetching live flight positions from FlightRadar24
libs/@blockprotocol/graph/src/codegen/preprocess/remove-redundant-data-type-inheritance.ts New preprocessing step to fix enum data type generation
libs/@local/hash-backend-utils/src/flows/get-flow-run-details.ts Added StoredPayloadRef resolution for step outputs before returning to GraphQL clients
apps/hash-integration-worker/src/workflows/run-flow-workflow.ts Extended activity timeouts to 10 hours with 10-second heartbeat timeouts for batch operations
libs/@local/hash-backend-utils/src/file-storage.ts Unified FileStorageProvider interface with direct upload/download methods
apps/hash-ai-worker-ts/src/activities/shared/get-flow-context.ts Changed flow entity lookup from UUID-based to property query-based

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines 94 to 98
// Handle rate limiting with retry
if (response.status === 429) {
await sleep(RATE_LIMIT_RETRY_DELAY_MS);
return makeRequest<T>(url);
}
Copy link

Copilot AI Feb 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The 429 retry logic performs unbounded recursive retries without any exponential backoff or maximum retry limit. If the API continues to return 429s, this will cause infinite recursion until the call stack overflows.

Add:

  1. A maximum retry count
  2. Exponential backoff for retries
  3. Return an error after max retries exceeded

Copilot uses AI. Check for mistakes.
Comment on lines 325 to +335
} catch (error) {
// If batch creation fails, add all entities in this batch to failed proposals
const errorMessage =
error instanceof Error ? error.message : "Unknown error";
failedEntityProposals.push({
proposedEntity,
message: `Failed to persist entity: ${errorMessage}`,
});
for (const { proposedEntity } of batch) {
failedEntityProposals.push({
proposedEntity,
message: `Failed to create entity in batch: ${errorMessage}. ${stringifyError(error)}`,
});
}
}
Copy link

Copilot AI Feb 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When a batch creation fails, all entities in that batch are marked as failed, even though some may have been successfully created before the error occurred. The HashEntity.createMultiple method may perform partial creation before failing.

This could lead to:

  1. Entities being created in the database but marked as failed in the flow output
  2. Retry attempts creating duplicate entities

Consider checking which entities were actually created (if the API provides that info) or handling partial batch failures more granularly.

Copilot uses AI. Check for mistakes.
Comment on lines 61 to 119
const getFlowEntityInfo = async (params: {
workflowId: string;
userAuthentication: { actorId: UserId };
}): Promise<FlowEntityInfo> => {
const { workflowId, userAuthentication } = params;

const cache = await getCache();
const cacheKey = `flowEntity-${workflowId}`;

const cachedInfo = await cache.get<FlowEntityInfo>(cacheKey);
if (cachedInfo) {
return cachedInfo;
}

// Query for the flow entity using the workflowId property
const {
entities: [flowEntity],
} = await queryEntities<FlowRunEntity>(
{ graphApi: graphApiClient },
userAuthentication,
{
filter: {
all: [
{
equal: [
{
path: [
"properties",
systemPropertyTypes.workflowId.propertyTypeBaseUrl,
],
},
{ parameter: workflowId },
],
},
generateVersionedUrlMatchingFilter(
systemEntityTypes.flowRun.entityTypeId,
{ ignoreParents: true },
),
],
},
temporalAxes: currentTimeInstantTemporalAxes,
includeDrafts: false,
includePermissions: false,
},
);

if (!flowEntity) {
throw new Error(
`Flow entity not found for workflowId ${workflowId}. The flow entity may not have been persisted yet.`,
);
}

const flowEntityInfo: FlowEntityInfo = {
flowEntityId: flowEntity.metadata.recordId.entityId,
};

await cache.set(cacheKey, flowEntityInfo);
return flowEntityInfo;
};
Copy link

Copilot AI Feb 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new getFlowEntityInfo function queries the database on every cache miss to find the flow entity by workflowId. However, there's a race condition: if an activity starts executing before the FlowRun entity has been persisted to the database (which happens asynchronously), this will throw an error.

The error message acknowledges this ("The flow entity may not have been persisted yet"), but the function still throws rather than retrying or waiting. This could cause spurious failures for fast-executing flows.

Consider:

  1. Adding retry logic with backoff for the initial entity query
  2. Or ensuring the FlowRun entity is persisted before any activities start
  3. Or falling back to the old entityIdFromComponents approach as a secondary method

Copilot uses AI. Check for mistakes.
Comment on lines 39 to 41
return (
payload.value as PayloadKindValues[(typeof browserInferenceFlowOutput)["payloadKind"]]
payload.value as unknown as PayloadKindValues[(typeof browserInferenceFlowOutput)["payloadKind"]]
).persistedEntities;
Copy link

Copilot AI Feb 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The cast to unknown before casting to the final type suggests that the type system changes (with StoredPayloadRef) are creating type incompatibilities in existing code. This workaround using as unknown as bypasses TypeScript's type checking and could hide real type errors.

The comment on line 39-41 indicates this is needed because payload.value might now be a StoredPayloadRef in the type system, but at runtime in the browser context it's expected to be the resolved value (since the GraphQL layer should resolve stored refs before returning to clients).

Verify that:

  1. The GraphQL layer properly resolves all StoredPayloadRef values before returning them to clients
  2. The ResolvedStepRunOutput type is used correctly in the GraphQL schema
  3. Consider if this cast can be removed by properly typing the GraphQL response

Copilot uses AI. Check for mistakes.
Comment on lines +202 to +218
const getAllHistoricalArrivals = async (
params: Omit<HistoricalArrivalsRequestParams, "cursor">,
): Promise<AeroApiScheduledFlight[]> => {
const allFlights: AeroApiScheduledFlight[] = [];

let response = await getHistoricalArrivals(params);
allFlights.push(...response.arrivals);

while (response.links?.next) {
response = await makeRequest<AeroApiHistoricalArrivalsResponse>(
`${baseUrl}${response.links.next}`,
);
allFlights.push(...response.arrivals);
}

return allFlights;
};
Copy link

Copilot AI Feb 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The pagination logic in getAllHistoricalArrivals doesn't have any safety limit on the number of pages to fetch. If the API returns malformed pagination data (e.g., a circular links.next reference), this will loop infinitely and potentially accumulate unbounded data in the allFlights array.

Add a safety limit on the maximum number of pages to fetch (e.g., 1000 pages) to prevent infinite loops and memory exhaustion.

Copilot uses AI. Check for mistakes.
return {
failureMessage: workflowFailureMessage,
inputs: workflowInputs,
outputs: workflowOutputs,
Copy link

Copilot AI Feb 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The workflowOutputs on line 693 are returned directly without resolving any potential StoredPayloadRef values. While workflow outputs may not currently contain stored payload kinds, this creates an inconsistency with step outputs (which are resolved on lines 649-688).

If a workflow ever returns a stored payload kind as its final output, it will not be resolved for the GraphQL client. Consider either:

  1. Also resolving stored refs in workflow outputs for consistency
  2. Adding a comment explaining why workflow outputs don't need resolution
  3. Adding a type constraint that prevents workflow outputs from being stored payload kinds

Copilot uses AI. Check for mistakes.
Comment on lines +92 to +107
const executeInBatches = async <T, R>(
items: T[],
batchSize: number,
operation: (item: T) => Promise<R>,
): Promise<R[]> => {
const results: R[] = [];

for (let i = 0; i < items.length; i += batchSize) {
Context.current().heartbeat();
const batch = items.slice(i, i + batchSize);
const batchResults = await Promise.all(batch.map(operation));
results.push(...batchResults);
}

return results;
};
Copy link

Copilot AI Feb 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The executeInBatches function heartbeats at the start of each batch but doesn't handle potential errors in individual batch operations. If one promise in the batch fails, it will reject the entire Promise.all, potentially losing progress on successfully completed operations within that batch.

Consider wrapping each operation in a try-catch within the map to allow partial batch success, or document that batch atomicity is the intended behavior.

Copilot uses AI. Check for mistakes.
Comment on lines 79 to 98
// Throttle requests to once per second
const now = Date.now();
const timeSinceLastRequest = now - lastRequestTime;
if (timeSinceLastRequest < REQUEST_INTERVAL_MS) {
await sleep(REQUEST_INTERVAL_MS - timeSinceLastRequest);
}
lastRequestTime = Date.now();

const response = await fetch(url, {
headers: {
Accept: "application/json",
"x-apikey": apiKey,
},
});

// Handle rate limiting with retry
if (response.status === 429) {
await sleep(RATE_LIMIT_RETRY_DELAY_MS);
return makeRequest<T>(url);
}
Copy link

Copilot AI Feb 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The rate limiting implementation using a module-level lastRequestTime variable is not thread-safe in Node.js's async environment. While JavaScript is single-threaded, the async nature means multiple concurrent calls to makeRequest can interleave.

For example:

  1. Request A checks timeSinceLastRequest at T=0, sleeps for 1000ms
  2. Request B checks timeSinceLastRequest at T=10ms, also sleeps for ~990ms
  3. Both requests wake up around T=1000ms and fire nearly simultaneously

This defeats the rate limiting purpose. Consider using a proper queue or mutex to serialize requests, or use a library like bottleneck for rate limiting.

Copilot uses AI. Check for mistakes.
Copy link

@cursor cursor bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cursor Bugbot has reviewed your changes and found 6 potential issues.

Bugbot Autofix is OFF. To automatically fix reported issues with Cloud Agents, enable Autofix in the Cursor dashboard.

@vercel vercel bot temporarily deployed to Preview – petrinaut February 4, 2026 19:14 Inactive
@vercel vercel bot temporarily deployed to Preview – petrinaut February 5, 2026 08:46 Inactive
@github-actions github-actions bot added the area/deps Relates to third-party dependencies (area) label Feb 5, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area/apps > hash* Affects HASH (a `hash-*` app) area/apps > hash-api Affects the HASH API (app) area/apps area/deps Relates to third-party dependencies (area) area/libs Relates to first-party libraries/crates/packages (area) area/tests > integration New or updated integration tests area/tests New or updated tests type/eng > backend Owned by the @backend team

Development

Successfully merging this pull request may close these issues.

1 participant