Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions docs/components/endpoints.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,29 @@ export type BaseEndpointTypes = {

Transport files extend BaseEndpointTypes and additionally provide DP specific information like DP request/response type or websocket message types are defined in corresponding transport files.

## Multiple transports and fallback

For endpoints registered with `transportRoutes`, routing still picks a **primary** transport using `customRouter`, the request `transport` field, or `defaultTransport`.

Optional **fallback** transports are configured per primary route name with `fallbackTransport`, a map from primary transport name to fallback transport name (both must exist on `transportRoutes`). This is **off by default** until the adapter setting `TRANSPORT_FALLBACK_ENABLED` is set to `true` (see [EA settings](../reference-tables/ea-settings.md)).

When enabled, the framework may start work on the fallback route in parallel with the primary, but the **response still prefers primary data**: the client waits for the primary cache / foreground / polling path to finish or time out before using a successful fallback result. Separate cache keys are used per transport so cached values do not collide.

Fallback is not supported together with a custom `cacheKeyGenerator` on the same endpoint; use the default cache key behavior if you need fallback.

```typescript
new AdapterEndpoint({
name: 'price',
inputParameters,
transportRoutes: new TransportRoutes<BaseEndpointTypes>()
.register('rest', httpTransport)
.register('ws', wsTransport),
defaultTransport: 'rest',
// When primary is `rest`, allow falling back to `ws`
fallbackTransport: { rest: 'ws' },
})
```

## Cache Key Generator

**Only use if absolutely necessary**
Expand Down
1 change: 1 addition & 0 deletions docs/reference-tables/ea-settings.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
| TLS_PASSPHRASE | string | | Password to be used to generate an encryption key | | |
| TLS_PRIVATE_KEY | string | undefined | Base64 Private Key of TSL/SSL certificate | - Value must be a valid base64 string | |
| TLS_PUBLIC_KEY | string | undefined | Base64 Public Key of TSL/SSL certificate | - Value must be a valid base64 string | |
| TRANSPORT_FALLBACK_ENABLED | boolean | false | Flag to enable endpoint fallback transports when configured | | |
| WARMUP_SUBSCRIPTION_TTL | number | 300000 | TTL for batch warmer subscriptions | - Value must be an integer<br> - Value must be above the minimum<br> - Value must be below the maximum | 0 | 3600000 |
| WS_CONNECTION_OPEN_TIMEOUT | number | 10000 | The maximum amount of time in milliseconds to wait for the websocket connection to open (including custom open handler) | - Value must be an integer<br> - Value must be above the minimum<br> - Value must be below the maximum | 500 | 30000 |
| WS_HEARTBEAT_INTERVAL_MS | number | 10000 | The number of ms between each hearbeat message that EA sends to server, only works if heartbeat handler is provided | - Value must be an integer<br> - Value must be above the minimum<br> - Value must be below the maximum | 5000 | 300000 |
Expand Down
39 changes: 39 additions & 0 deletions src/adapter/basic.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import {
highestRateLimitTiers,
} from '../rate-limiting'
import { RateLimiterFactory, RateLimitingStrategy } from '../rate-limiting/factory'
import { Transport } from '../transports'
import {
AdapterRequest,
AdapterResponse,
Expand Down Expand Up @@ -449,6 +450,44 @@ export class Adapter<
const endpoint = this.endpointsMap[req.requestContext.endpointName]
const transport = endpoint.transportRoutes.get(req.requestContext.transportName)

const { fallback } = req.requestContext
if (!fallback) {
return this.handleSingleTransportRequest(req, replySent, transport)
}

const fallbackReq = {
...req,
requestContext: {
...req.requestContext,
transportName: fallback.transportName,
cacheKey: fallback.cacheKey,
},
} as AdapterRequest<EmptyInputParameters>

const fallbackResponsePromise = this.handleSingleTransportRequest(
Comment thread
mxiao-cll marked this conversation as resolved.
fallbackReq,
replySent,
endpoint.transportRoutes.get(fallback.transportName),
)
.then((response) => ({ response }))
.catch((error) => ({ error }))

try {
return await this.handleSingleTransportRequest(req, replySent, transport)
} catch (primaryError) {
const fallbackResponse = await fallbackResponsePromise
if ('response' in fallbackResponse) {
return fallbackResponse.response
}
throw primaryError
}
Comment thread
mxiao-cll marked this conversation as resolved.
}

private async handleSingleTransportRequest(
Comment thread
mxiao-cll marked this conversation as resolved.
req: AdapterRequest<EmptyInputParameters>,
replySent: Promise<unknown>,
transport: Transport<EndpointGenerics>,
): Promise<Readonly<AdapterResponse>> {
// First try to find the response in our cache, keep it ready
const cachedResponse = await this.findResponseInCache(req)

Expand Down
63 changes: 63 additions & 0 deletions src/adapter/endpoint.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ export class AdapterEndpoint<T extends EndpointGenerics> implements AdapterEndpo
settings: T['Settings'],
) => string
defaultTransport?: string
fallbackTransport?: Record<string, string>

constructor(params: AdapterEndpointParams<T>) {
this.name = params.name
Expand All @@ -55,6 +56,14 @@ export class AdapterEndpoint<T extends EndpointGenerics> implements AdapterEndpo
this.transportRoutes = params.transportRoutes
this.customRouter = params.customRouter
this.defaultTransport = params.defaultTransport
this.fallbackTransport = params.fallbackTransport
? Object.fromEntries(
Object.entries(params.fallbackTransport).map(([k, v]) => [
k.toLowerCase(),
Comment thread
mxiao-cll marked this conversation as resolved.
v.toLowerCase(),
]),
)
: undefined
} else {
this.transportRoutes = new TransportRoutes<T>().register(
DEFAULT_TRANSPORT_NAME,
Expand All @@ -69,6 +78,8 @@ export class AdapterEndpoint<T extends EndpointGenerics> implements AdapterEndpo
this.customOutputValidation = params.customOutputValidation
this.overrides = params.overrides
this.requestTransforms = [this.symbolOverrider.bind(this), ...(params.requestTransforms || [])]

this.validateFallbackTransport()
}

/**
Expand Down Expand Up @@ -186,6 +197,22 @@ export class AdapterEndpoint<T extends EndpointGenerics> implements AdapterEndpo
return transportName
}

getFallbackTransportNameForRequest(primaryTransportName: string, settings: T['Settings']) {
if (!settings.TRANSPORT_FALLBACK_ENABLED || !this.fallbackTransport) {
Comment thread
dskloetc marked this conversation as resolved.
logger.trace('TRANSPORT_FALLBACK_ENABLED is false or fallbackTransport is not set')
return
}

const fallbackTransportName = this.fallbackTransport[primaryTransportName.toLowerCase()]

if (fallbackTransportName) {
logger.debug(`Request can fall back to transport "${fallbackTransportName}"`)
return fallbackTransportName
} else {
logger.trace(`No fallback transport defined for "${primaryTransportName}"`)
}
}

/**
* Default routing strategy. Will try and use the transport override if present
* or transport input parameter in the request body.
Expand All @@ -205,4 +232,40 @@ export class AdapterEndpoint<T extends EndpointGenerics> implements AdapterEndpo
}
return rawRequestBody.data?.transport
}

private validateFallbackTransport() {
if (this.cacheKeyGenerator && this.fallbackTransport) {
throw new AdapterError({
message: 'fallbackTransport not allowed for endpoints with cacheKeyGenerator',
statusCode: 404,
})
}

Object.entries(this.fallbackTransport || {}).forEach(([primary, fallback]) => {
if (primary === fallback) {
throw new AdapterError({
statusCode: 400,
message: `Fallback transport "${fallback}" cannot be the same as primary transport.`,
})
}

if (!this.transportRoutes.get(primary)) {
throw new AdapterError({
statusCode: 400,
message: `No primary transport found for key "${primary}", must be one of ${JSON.stringify(
this.transportRoutes.routeNames(),
)}`,
})
}

if (!this.transportRoutes.get(fallback)) {
throw new AdapterError({
statusCode: 400,
message: `No fallback transport found for key "${fallback}", must be one of ${JSON.stringify(
this.transportRoutes.routeNames(),
)}`,
})
}
})
}
}
3 changes: 3 additions & 0 deletions src/adapter/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,9 @@ type MultiTransportAdapterEndpointParams<T extends EndpointGenerics> = {

/** If no value is returned from the custom router or the default (transport param), which transport to use */
defaultTransport?: string

/** Primary transport mapped to backup transport to use when primary is unable to return data */
fallbackTransport?: Record<string, string>
}

/**
Expand Down
1 change: 1 addition & 0 deletions src/cache/response.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ export class ResponseCache<
) {
response.meta = {
adapterName: calculateAdapterName(this.adapterName, r.params),
transportName,
Comment thread
mxiao-cll marked this conversation as resolved.
metrics: {
feedId: calculateFeedId(
{
Expand Down
5 changes: 5 additions & 0 deletions src/config/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,11 @@ export const BaseSettingsDefinition = {
default: 200,
validate: validator.integer({ min: 10, max: 1000 }),
},
TRANSPORT_FALLBACK_ENABLED: {
description: 'Flag to enable endpoint fallback transports when configured',
type: 'boolean',
default: false,
},
DEFAULT_CACHE_KEY: {
description: 'Default key to be used when one cannot be determined from request parameters',
type: 'string',
Expand Down
8 changes: 8 additions & 0 deletions src/util/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@ export type AdapterRequestContext<T> = {
/** Precalculated cache key used to get and set corresponding values from the cache and subscription sets */
cacheKey: string

/** Fallback transport context to use if the primary transport is unable to return data */
fallback?: {
Comment thread
mxiao-cll marked this conversation as resolved.
transportName: string
cacheKey: string
}

/** Normalized and validated data coming from the request body */
data: T

Expand Down Expand Up @@ -86,6 +92,8 @@ export interface AdapterRequestMeta {
export interface AdapterResponseMeta extends AdapterRequestMeta {
/** Name of the adapter */
adapterName: string
/** Name of the transport */
transportName: string
}

/**
Expand Down
23 changes: 20 additions & 3 deletions src/validation/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -125,14 +125,31 @@ export const validatorMiddleware: AdapterMiddlewareBuilder =

req.requestContext.cacheKey = `${cachePrefix}${cacheKey}`
} else {
const transportName = req.requestContext.transportName
req.requestContext.cacheKey = calculateCacheKey({
const commonCacheKeyParams = {
data: req.requestContext.data,
adapterName: adapter.name,
endpointName: endpoint.name,
transportName,
adapterSettings: adapter.config.settings,
}

req.requestContext.cacheKey = calculateCacheKey({
...commonCacheKeyParams,
transportName: req.requestContext.transportName,
})

const fallbackTransportName = endpoint.getFallbackTransportNameForRequest(
req.requestContext.transportName,
adapter.config.settings,
)
if (fallbackTransportName) {
req.requestContext.fallback = {
transportName: fallbackTransportName,
cacheKey: calculateCacheKey({
...commonCacheKeyParams,
transportName: fallbackTransportName,
}),
}
}
}

done()
Expand Down
Loading
Loading