Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
164 changes: 136 additions & 28 deletions lib/connection/connections/HttpRetryPolicy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,28 @@ function delay(milliseconds: number): Promise<void> {
});
}

// Transient network error codes worth retrying. Aligned with the OS-level errno set
// surfaced by Node's `http`/`https` (and `node-fetch` via `system` FetchError type)
// when an in-flight request fails before/while delivering a response. Matches the
// classes of errors that the Python (urllib3) and JDBC (Apache HttpClient) drivers
// retry by default at the connection layer.
const RETRYABLE_NETWORK_ERROR_CODES = new Set([
'ECONNRESET',
'ECONNREFUSED',
'ETIMEDOUT',
'EHOSTUNREACH',
'ENETUNREACH',
'EPIPE',
'ENOTFOUND',
'EAI_AGAIN',
]);

// Fallback message patterns for errors that don't carry an errno. node-fetch surfaces
// "socket hang up" as a generic FetchError, and "Premature close" when the response
// body stream closes before all data is received — both occur regularly when a
// keep-alive TCP connection is silently dropped by an intermediate load balancer.
const RETRYABLE_NETWORK_ERROR_MESSAGE_RE = /socket hang up|premature close|aborted/i;

export default class HttpRetryPolicy implements IRetryPolicy<HttpTransactionDetails> {
private context: IClientContext;

Expand All @@ -24,45 +46,131 @@ export default class HttpRetryPolicy implements IRetryPolicy<HttpTransactionDeta

public async shouldRetry(details: HttpTransactionDetails): Promise<ShouldRetryResult> {
if (this.isRetryable(details)) {
const clientConfig = this.context.getConfig();
return this.computeRetry(details);
}

// Don't retry if overall retry timeout exceeded
const timeoutExceeded = Date.now() - this.startTime >= clientConfig.retriesTimeout;
if (timeoutExceeded) {
throw new RetryError(RetryErrorCode.TimeoutExceeded, details);
}
return { shouldRetry: false };
}

this.attempt += 1;
public async invokeWithRetry(operation: RetryableOperation<HttpTransactionDetails>): Promise<HttpTransactionDetails> {
for (;;) {
// Capture either the resolved response or the thrown error so the
// retry-decision logic below can flow without an early `continue` and
// share one backoff site between both paths.
let outcome: { ok: true; details: HttpTransactionDetails } | { ok: false; error: unknown };
try {
// eslint-disable-next-line no-await-in-loop
const details = await operation();
outcome = { ok: true, details };
} catch (error) {
outcome = { ok: false, error };
}

// Don't retry if max attempts count reached
const attemptsExceeded = this.attempt >= clientConfig.retryMaxAttempts;
if (attemptsExceeded) {
throw new RetryError(RetryErrorCode.AttemptsExceeded, details);
if (outcome.ok) {
// eslint-disable-next-line no-await-in-loop
const status = await this.shouldRetry(outcome.details);
if (!status.shouldRetry) {
return outcome.details;
}
// eslint-disable-next-line no-await-in-loop
await delay(status.retryAfter);
} else {
// The operation threw before producing a response. This is typically a
// transient network failure (stale keep-alive socket reset by a load
// balancer, DNS hiccup, truncated response body, etc.). The status-code-
// driven `shouldRetry` path can't see these because there's no `Response`
// to inspect, so we have a separate decision point here. Non-network
// errors (programmer errors, config errors, RetryError raised by our
// own attempts/timeout budget) are re-thrown unchanged.
if (!this.isRetryableNetworkError(outcome.error)) {
throw outcome.error;
}
// eslint-disable-next-line no-await-in-loop
const status = await this.computeNetworkErrorRetry(outcome.error);
if (!status.shouldRetry) {
throw outcome.error;
}
// eslint-disable-next-line no-await-in-loop
await delay(status.retryAfter);
}
}
}

// Shared budgeting logic — bumps the attempt counter, enforces overall retries
// timeout/max attempts, and computes the next backoff. Used by both the HTTP
// status-code path (`shouldRetry`) and the network-error path
// (`computeNetworkErrorRetry`) so they share a single attempt budget.
private computeRetry(details: HttpTransactionDetails): ShouldRetryResult {
const clientConfig = this.context.getConfig();

// Don't retry if overall retry timeout exceeded
const timeoutExceeded = Date.now() - this.startTime >= clientConfig.retriesTimeout;
if (timeoutExceeded) {
throw new RetryError(RetryErrorCode.TimeoutExceeded, details);
}

// If possible, use `Retry-After` header as a floor for a backoff algorithm
const retryAfterHeader = this.getRetryAfterHeader(details, clientConfig.retryDelayMin);
const retryAfter = this.getBackoffDelay(
this.attempt,
retryAfterHeader ?? clientConfig.retryDelayMin,
clientConfig.retryDelayMax,
);
this.attempt += 1;

return { shouldRetry: true, retryAfter };
// Don't retry if max attempts count reached
const attemptsExceeded = this.attempt >= clientConfig.retryMaxAttempts;
if (attemptsExceeded) {
throw new RetryError(RetryErrorCode.AttemptsExceeded, details);
}

return { shouldRetry: false };
// If possible, use `Retry-After` header as a floor for a backoff algorithm
const retryAfterHeader = this.getRetryAfterHeader(details, clientConfig.retryDelayMin);
const retryAfter = this.getBackoffDelay(
this.attempt,
retryAfterHeader ?? clientConfig.retryDelayMin,
clientConfig.retryDelayMax,
);

return { shouldRetry: true, retryAfter };
}

public async invokeWithRetry(operation: RetryableOperation<HttpTransactionDetails>): Promise<HttpTransactionDetails> {
for (;;) {
const details = await operation(); // eslint-disable-line no-await-in-loop
const status = await this.shouldRetry(details); // eslint-disable-line no-await-in-loop
if (!status.shouldRetry) {
return details;
}
await delay(status.retryAfter); // eslint-disable-line no-await-in-loop
private async computeNetworkErrorRetry(error: unknown): Promise<ShouldRetryResult> {
const clientConfig = this.context.getConfig();

const timeoutExceeded = Date.now() - this.startTime >= clientConfig.retriesTimeout;
if (timeoutExceeded) {
throw new RetryError(RetryErrorCode.TimeoutExceeded, error);
}

this.attempt += 1;

const attemptsExceeded = this.attempt >= clientConfig.retryMaxAttempts;
if (attemptsExceeded) {
throw new RetryError(RetryErrorCode.AttemptsExceeded, error);
}

const retryAfter = this.getBackoffDelay(this.attempt, clientConfig.retryDelayMin, clientConfig.retryDelayMax);

return { shouldRetry: true, retryAfter };
}

protected isRetryableNetworkError(error: unknown): boolean {
if (!error || typeof error !== 'object') {
return false;
}
const candidate = error as { code?: string; type?: string; message?: string };

// node-fetch FetchError surfaces low-level network failures with `type: 'system'`
// and a body-stream timeout with `type: 'body-timeout'`. Both should be retried;
// `request-timeout` is converted to a Thrift TApplicationException upstream so
// we don't need to retry it here.
if (candidate.type === 'system' || candidate.type === 'body-timeout') {
return true;
}

if (typeof candidate.code === 'string' && RETRYABLE_NETWORK_ERROR_CODES.has(candidate.code)) {
return true;
}

if (typeof candidate.message === 'string' && RETRYABLE_NETWORK_ERROR_MESSAGE_RE.test(candidate.message)) {
return true;
}

return false;
}

protected isRetryable({ response }: HttpTransactionDetails): boolean {
Expand Down
23 changes: 19 additions & 4 deletions lib/connection/connections/ThriftHttpConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -121,21 +121,36 @@ export default class ThriftHttpConnection extends EventEmitter {
body: data,
};

// Consume the response body inside the retried block. node-fetch surfaces
// late-stage failures (TCP RST after headers, "Premature close") as rejections
// from `response.buffer()`, not from `fetch()`. Reading the body here means
// the retry policy sees those failures and can retry them like any other
// transient network error — the body Buffer is captured via closure so the
// post-retry caller still gets it.
let responseBuffer: Buffer | undefined;

this.getThriftMethodName(data)
.then((thriftMethod) => this.getRetryPolicy(thriftMethod))
.then((retryPolicy) => {
const makeRequest = () => {
const makeRequest = async () => {
responseBuffer = undefined;
const request = new Request(this.url, requestConfig);
return fetch(request).then((response) => ({ request, response }));
const response = await fetch(request);
if (response.status === 200) {
responseBuffer = await response.buffer();
}
return { request, response };
};
return retryPolicy.invokeWithRetry(makeRequest);
})
.then(({ response }) => {
if (response.status !== 200) {
throw new THTTPException(response);
}

return response.buffer();
// `responseBuffer` is always set when status is 200, since `makeRequest`
// assigns it before resolving in that branch and the retry loop only
// returns a fulfilled response (failures are thrown).
return responseBuffer as Buffer;
})
.then((buffer) => {
this.transport.receiver((transportWithData) => this.handleThriftResponse(transportWithData), seqId)(buffer);
Expand Down
36 changes: 28 additions & 8 deletions lib/result/CloudFetchResultHandler.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import fetch, { RequestInfo, RequestInit, Request } from 'node-fetch';
import fetch, { RequestInfo, RequestInit, Request, Response } from 'node-fetch';
import { TGetResultSetMetadataResp, TRowSet, TSparkArrowResultLink } from '../../thrift/TCLIService_types';
import HiveDriverError from '../errors/HiveDriverError';
import IClientContext from '../contracts/IClientContext';
Expand Down Expand Up @@ -103,12 +103,13 @@ export default class CloudFetchResultHandler implements IResultsProvider<ArrowBa
}

const startTime = Date.now();
const response = await this.fetch(link.fileLink, { headers: link.httpHeaders });
const { response, body } = await this.fetch(link.fileLink, { headers: link.httpHeaders });
if (!response.ok) {
throw new Error(`CloudFetch HTTP error ${response.status} ${response.statusText}`);
}

const result = await response.arrayBuffer();
// `body` is always set when `response.ok` is true — `fetch` reads it inside
// the retried block on success.
const result = body!;
const downloadTimeMs = Date.now() - startTime;

this.logDownloadMetrics(link.fileLink, result.byteLength, downloadTimeMs);
Expand All @@ -123,17 +124,36 @@ export default class CloudFetchResultHandler implements IResultsProvider<ArrowBa
};
}

private async fetch(url: RequestInfo, init?: RequestInit) {
private async fetch(url: RequestInfo, init?: RequestInit): Promise<{ response: Response; body?: ArrayBuffer }> {
const connectionProvider = await this.context.getConnectionProvider();
const agent = await connectionProvider.getAgent();
const retryPolicy = await connectionProvider.getRetryPolicy();

const requestConfig: RequestInit = { agent, ...init };
const result = await retryPolicy.invokeWithRetry(() => {
// Read the body inside the retried block. CloudFetch downloads are large
// GETs against pre-signed cloud-storage URLs that frequently surface
// `socket hang up` / "Premature close" once the stream is mid-transfer.
// Pulling `arrayBuffer()` in here lets the retry policy treat those
// body-stream failures the same as connect-time failures.
let downloaded: ArrayBuffer | undefined;
const result = await retryPolicy.invokeWithRetry(async () => {
downloaded = undefined;
const request = new Request(url, requestConfig);
return fetch(request).then((response) => ({ request, response }));
const response = await fetch(request);
if (response.ok) {
downloaded = await response.arrayBuffer();
}
return { request, response };
});
return result.response;
// Fall back to reading the body here if the retry policy returned a
// response without consuming it via our operation (e.g. unit-test stubs
// that hand back a pre-baked Response without invoking the operation
// callback). In production the body is always read inside the retried
// block above, so this path is a no-op.
if (downloaded === undefined && result.response.ok) {
downloaded = await result.response.arrayBuffer();
}
return { response: result.response, body: downloaded };
}

/**
Expand Down
Loading
Loading