Skip to content

Commit ac13a5f

Browse files
committed
Retry transient network errors in HttpRetryPolicy (fix #260)
`HttpRetryPolicy.invokeWithRetry` only consulted `shouldRetry` after the fetch resolved, so when `fetch()` itself rejected with a network error (`socket hang up`, `ECONNRESET`, etc.) the rejection propagated on attempt #1 with no retry — the exact failure pattern reported in issue #260, including the cloud-storage variant haggholm and eranga-acerta reported. A related issue: `response.buffer()` (ThriftHttpConnection) and `response.arrayBuffer()` (CloudFetchResultHandler) sat outside the retried block, so node-fetch's "Premature close" body-stream errors were also not retried. Changes: * `HttpRetryPolicy.invokeWithRetry` catches operation rejections, classifies them via `isRetryableNetworkError` (FetchError `system` / `body-timeout`, OS errnos `ECONNRESET`/`ECONNREFUSED`/`ETIMEDOUT`/ `EHOSTUNREACH`/`ENETUNREACH`/`EPIPE`/`ENOTFOUND`/`EAI_AGAIN`, and message patterns `socket hang up`/`premature close`/`aborted`), and retries with the same attempt-budget/backoff used for HTTP retries. Aligns with what Python (urllib3 default OSError retry) and JDBC (Apache HttpClient retry handler) already do. * `ThriftHttpConnection.write` reads `response.buffer()` inside the retried block, so body-stream failures are retried too. * `CloudFetchResultHandler.fetch` reads `response.arrayBuffer()` inside the retried block, with a guarded fallback so existing test stubs that pre-bake `Response` objects keep working. Idempotency is preserved: `ExecuteStatement` and other write-y Thrift methods continue to use `NullRetryPolicy`. Network-error retry only applies where `HttpRetryPolicy` was already in use (idempotent Thrift methods + CloudFetch GETs). Verified end-to-end against the pecotesting Azure warehouse with TLS- layer RST injection on `*.blob.core.windows.net` mid-response: base crashes with `FetchError: socket hang up` (matches issue #260 exactly); with this fix the same fault injection passes transparently with correct row counts. Co-authored-by: Isaac
1 parent e200a1b commit ac13a5f

4 files changed

Lines changed: 297 additions & 41 deletions

File tree

lib/connection/connections/HttpRetryPolicy.ts

Lines changed: 136 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,28 @@ function delay(milliseconds: number): Promise<void> {
99
});
1010
}
1111

12+
// Transient network error codes worth retrying. Aligned with the OS-level errno set
13+
// surfaced by Node's `http`/`https` (and `node-fetch` via `system` FetchError type)
14+
// when an in-flight request fails before/while delivering a response. Matches the
15+
// classes of errors that the Python (urllib3) and JDBC (Apache HttpClient) drivers
16+
// retry by default at the connection layer.
17+
const RETRYABLE_NETWORK_ERROR_CODES = new Set([
18+
'ECONNRESET',
19+
'ECONNREFUSED',
20+
'ETIMEDOUT',
21+
'EHOSTUNREACH',
22+
'ENETUNREACH',
23+
'EPIPE',
24+
'ENOTFOUND',
25+
'EAI_AGAIN',
26+
]);
27+
28+
// Fallback message patterns for errors that don't carry an errno. node-fetch surfaces
29+
// "socket hang up" as a generic FetchError, and "Premature close" when the response
30+
// body stream closes before all data is received — both occur regularly when a
31+
// keep-alive TCP connection is silently dropped by an intermediate load balancer.
32+
const RETRYABLE_NETWORK_ERROR_MESSAGE_RE = /socket hang up|premature close|aborted/i;
33+
1234
export default class HttpRetryPolicy implements IRetryPolicy<HttpTransactionDetails> {
1335
private context: IClientContext;
1436

@@ -24,45 +46,131 @@ export default class HttpRetryPolicy implements IRetryPolicy<HttpTransactionDeta
2446

2547
public async shouldRetry(details: HttpTransactionDetails): Promise<ShouldRetryResult> {
2648
if (this.isRetryable(details)) {
27-
const clientConfig = this.context.getConfig();
49+
return this.computeRetry(details);
50+
}
2851

29-
// Don't retry if overall retry timeout exceeded
30-
const timeoutExceeded = Date.now() - this.startTime >= clientConfig.retriesTimeout;
31-
if (timeoutExceeded) {
32-
throw new RetryError(RetryErrorCode.TimeoutExceeded, details);
33-
}
52+
return { shouldRetry: false };
53+
}
3454

35-
this.attempt += 1;
55+
public async invokeWithRetry(operation: RetryableOperation<HttpTransactionDetails>): Promise<HttpTransactionDetails> {
56+
for (;;) {
57+
// Capture either the resolved response or the thrown error so the
58+
// retry-decision logic below can flow without an early `continue` and
59+
// share one backoff site between both paths.
60+
let outcome: { ok: true; details: HttpTransactionDetails } | { ok: false; error: unknown };
61+
try {
62+
// eslint-disable-next-line no-await-in-loop
63+
const details = await operation();
64+
outcome = { ok: true, details };
65+
} catch (error) {
66+
outcome = { ok: false, error };
67+
}
3668

37-
// Don't retry if max attempts count reached
38-
const attemptsExceeded = this.attempt >= clientConfig.retryMaxAttempts;
39-
if (attemptsExceeded) {
40-
throw new RetryError(RetryErrorCode.AttemptsExceeded, details);
69+
if (outcome.ok) {
70+
// eslint-disable-next-line no-await-in-loop
71+
const status = await this.shouldRetry(outcome.details);
72+
if (!status.shouldRetry) {
73+
return outcome.details;
74+
}
75+
// eslint-disable-next-line no-await-in-loop
76+
await delay(status.retryAfter);
77+
} else {
78+
// The operation threw before producing a response. This is typically a
79+
// transient network failure (stale keep-alive socket reset by a load
80+
// balancer, DNS hiccup, truncated response body, etc.). The status-code-
81+
// driven `shouldRetry` path can't see these because there's no `Response`
82+
// to inspect, so we have a separate decision point here. Non-network
83+
// errors (programmer errors, config errors, RetryError raised by our
84+
// own attempts/timeout budget) are re-thrown unchanged.
85+
if (!this.isRetryableNetworkError(outcome.error)) {
86+
throw outcome.error;
87+
}
88+
// eslint-disable-next-line no-await-in-loop
89+
const status = await this.computeNetworkErrorRetry(outcome.error);
90+
if (!status.shouldRetry) {
91+
throw outcome.error;
92+
}
93+
// eslint-disable-next-line no-await-in-loop
94+
await delay(status.retryAfter);
4195
}
96+
}
97+
}
98+
99+
// Shared budgeting logic — bumps the attempt counter, enforces overall retries
100+
// timeout/max attempts, and computes the next backoff. Used by both the HTTP
101+
// status-code path (`shouldRetry`) and the network-error path
102+
// (`computeNetworkErrorRetry`) so they share a single attempt budget.
103+
private computeRetry(details: HttpTransactionDetails): ShouldRetryResult {
104+
const clientConfig = this.context.getConfig();
105+
106+
// Don't retry if overall retry timeout exceeded
107+
const timeoutExceeded = Date.now() - this.startTime >= clientConfig.retriesTimeout;
108+
if (timeoutExceeded) {
109+
throw new RetryError(RetryErrorCode.TimeoutExceeded, details);
110+
}
42111

43-
// If possible, use `Retry-After` header as a floor for a backoff algorithm
44-
const retryAfterHeader = this.getRetryAfterHeader(details, clientConfig.retryDelayMin);
45-
const retryAfter = this.getBackoffDelay(
46-
this.attempt,
47-
retryAfterHeader ?? clientConfig.retryDelayMin,
48-
clientConfig.retryDelayMax,
49-
);
112+
this.attempt += 1;
50113

51-
return { shouldRetry: true, retryAfter };
114+
// Don't retry if max attempts count reached
115+
const attemptsExceeded = this.attempt >= clientConfig.retryMaxAttempts;
116+
if (attemptsExceeded) {
117+
throw new RetryError(RetryErrorCode.AttemptsExceeded, details);
52118
}
53119

54-
return { shouldRetry: false };
120+
// If possible, use `Retry-After` header as a floor for a backoff algorithm
121+
const retryAfterHeader = this.getRetryAfterHeader(details, clientConfig.retryDelayMin);
122+
const retryAfter = this.getBackoffDelay(
123+
this.attempt,
124+
retryAfterHeader ?? clientConfig.retryDelayMin,
125+
clientConfig.retryDelayMax,
126+
);
127+
128+
return { shouldRetry: true, retryAfter };
55129
}
56130

57-
public async invokeWithRetry(operation: RetryableOperation<HttpTransactionDetails>): Promise<HttpTransactionDetails> {
58-
for (;;) {
59-
const details = await operation(); // eslint-disable-line no-await-in-loop
60-
const status = await this.shouldRetry(details); // eslint-disable-line no-await-in-loop
61-
if (!status.shouldRetry) {
62-
return details;
63-
}
64-
await delay(status.retryAfter); // eslint-disable-line no-await-in-loop
131+
private async computeNetworkErrorRetry(error: unknown): Promise<ShouldRetryResult> {
132+
const clientConfig = this.context.getConfig();
133+
134+
const timeoutExceeded = Date.now() - this.startTime >= clientConfig.retriesTimeout;
135+
if (timeoutExceeded) {
136+
throw new RetryError(RetryErrorCode.TimeoutExceeded, error);
137+
}
138+
139+
this.attempt += 1;
140+
141+
const attemptsExceeded = this.attempt >= clientConfig.retryMaxAttempts;
142+
if (attemptsExceeded) {
143+
throw new RetryError(RetryErrorCode.AttemptsExceeded, error);
144+
}
145+
146+
const retryAfter = this.getBackoffDelay(this.attempt, clientConfig.retryDelayMin, clientConfig.retryDelayMax);
147+
148+
return { shouldRetry: true, retryAfter };
149+
}
150+
151+
protected isRetryableNetworkError(error: unknown): boolean {
152+
if (!error || typeof error !== 'object') {
153+
return false;
154+
}
155+
const candidate = error as { code?: string; type?: string; message?: string };
156+
157+
// node-fetch FetchError surfaces low-level network failures with `type: 'system'`
158+
// and a body-stream timeout with `type: 'body-timeout'`. Both should be retried;
159+
// `request-timeout` is converted to a Thrift TApplicationException upstream so
160+
// we don't need to retry it here.
161+
if (candidate.type === 'system' || candidate.type === 'body-timeout') {
162+
return true;
163+
}
164+
165+
if (typeof candidate.code === 'string' && RETRYABLE_NETWORK_ERROR_CODES.has(candidate.code)) {
166+
return true;
167+
}
168+
169+
if (typeof candidate.message === 'string' && RETRYABLE_NETWORK_ERROR_MESSAGE_RE.test(candidate.message)) {
170+
return true;
65171
}
172+
173+
return false;
66174
}
67175

68176
protected isRetryable({ response }: HttpTransactionDetails): boolean {

lib/connection/connections/ThriftHttpConnection.ts

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -121,21 +121,36 @@ export default class ThriftHttpConnection extends EventEmitter {
121121
body: data,
122122
};
123123

124+
// Consume the response body inside the retried block. node-fetch surfaces
125+
// late-stage failures (TCP RST after headers, "Premature close") as rejections
126+
// from `response.buffer()`, not from `fetch()`. Reading the body here means
127+
// the retry policy sees those failures and can retry them like any other
128+
// transient network error — the body Buffer is captured via closure so the
129+
// post-retry caller still gets it.
130+
let responseBuffer: Buffer | undefined;
131+
124132
this.getThriftMethodName(data)
125133
.then((thriftMethod) => this.getRetryPolicy(thriftMethod))
126134
.then((retryPolicy) => {
127-
const makeRequest = () => {
135+
const makeRequest = async () => {
136+
responseBuffer = undefined;
128137
const request = new Request(this.url, requestConfig);
129-
return fetch(request).then((response) => ({ request, response }));
138+
const response = await fetch(request);
139+
if (response.status === 200) {
140+
responseBuffer = await response.buffer();
141+
}
142+
return { request, response };
130143
};
131144
return retryPolicy.invokeWithRetry(makeRequest);
132145
})
133146
.then(({ response }) => {
134147
if (response.status !== 200) {
135148
throw new THTTPException(response);
136149
}
137-
138-
return response.buffer();
150+
// `responseBuffer` is always set when status is 200, since `makeRequest`
151+
// assigns it before resolving in that branch and the retry loop only
152+
// returns a fulfilled response (failures are thrown).
153+
return responseBuffer as Buffer;
139154
})
140155
.then((buffer) => {
141156
this.transport.receiver((transportWithData) => this.handleThriftResponse(transportWithData), seqId)(buffer);

lib/result/CloudFetchResultHandler.ts

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import fetch, { RequestInfo, RequestInit, Request } from 'node-fetch';
1+
import fetch, { RequestInfo, RequestInit, Request, Response } from 'node-fetch';
22
import { TGetResultSetMetadataResp, TRowSet, TSparkArrowResultLink } from '../../thrift/TCLIService_types';
33
import HiveDriverError from '../errors/HiveDriverError';
44
import IClientContext from '../contracts/IClientContext';
@@ -103,12 +103,13 @@ export default class CloudFetchResultHandler implements IResultsProvider<ArrowBa
103103
}
104104

105105
const startTime = Date.now();
106-
const response = await this.fetch(link.fileLink, { headers: link.httpHeaders });
106+
const { response, body } = await this.fetch(link.fileLink, { headers: link.httpHeaders });
107107
if (!response.ok) {
108108
throw new Error(`CloudFetch HTTP error ${response.status} ${response.statusText}`);
109109
}
110-
111-
const result = await response.arrayBuffer();
110+
// `body` is always set when `response.ok` is true — `fetch` reads it inside
111+
// the retried block on success.
112+
const result = body!;
112113
const downloadTimeMs = Date.now() - startTime;
113114

114115
this.logDownloadMetrics(link.fileLink, result.byteLength, downloadTimeMs);
@@ -123,17 +124,36 @@ export default class CloudFetchResultHandler implements IResultsProvider<ArrowBa
123124
};
124125
}
125126

126-
private async fetch(url: RequestInfo, init?: RequestInit) {
127+
private async fetch(url: RequestInfo, init?: RequestInit): Promise<{ response: Response; body?: ArrayBuffer }> {
127128
const connectionProvider = await this.context.getConnectionProvider();
128129
const agent = await connectionProvider.getAgent();
129130
const retryPolicy = await connectionProvider.getRetryPolicy();
130131

131132
const requestConfig: RequestInit = { agent, ...init };
132-
const result = await retryPolicy.invokeWithRetry(() => {
133+
// Read the body inside the retried block. CloudFetch downloads are large
134+
// GETs against pre-signed cloud-storage URLs that frequently surface
135+
// `socket hang up` / "Premature close" once the stream is mid-transfer.
136+
// Pulling `arrayBuffer()` in here lets the retry policy treat those
137+
// body-stream failures the same as connect-time failures.
138+
let downloaded: ArrayBuffer | undefined;
139+
const result = await retryPolicy.invokeWithRetry(async () => {
140+
downloaded = undefined;
133141
const request = new Request(url, requestConfig);
134-
return fetch(request).then((response) => ({ request, response }));
142+
const response = await fetch(request);
143+
if (response.ok) {
144+
downloaded = await response.arrayBuffer();
145+
}
146+
return { request, response };
135147
});
136-
return result.response;
148+
// Fall back to reading the body here if the retry policy returned a
149+
// response without consuming it via our operation (e.g. unit-test stubs
150+
// that hand back a pre-baked Response without invoking the operation
151+
// callback). In production the body is always read inside the retried
152+
// block above, so this path is a no-op.
153+
if (downloaded === undefined && result.response.ok) {
154+
downloaded = await result.response.arrayBuffer();
155+
}
156+
return { response: result.response, body: downloaded };
137157
}
138158

139159
/**

0 commit comments

Comments
 (0)