Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 41 additions & 72 deletions async/unstable_circuit_breaker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ interface CircuitBreakerStateBase {
readonly halfOpenInFlight: number;
}

/** Internal state managed by the circuit breaker (discriminated union). */
/** Internal state managed by the circuit breaker */
type CircuitBreakerState =
| (CircuitBreakerStateBase & {
readonly state: "closed";
Expand Down Expand Up @@ -236,6 +236,18 @@ function pruneOldFailures(
return timestamps.filter((ts) => ts > cutoff);
}

/** Validates a numeric option for the circuit breaker constructor. */
function validateOption(name: string, value: number, min: number): void {
if (!Number.isFinite(value) || value < min) {
const constraint = min === 0
? "a finite non-negative number"
: `a finite number >= ${min}`;
throw new RangeError(
`Cannot create circuit breaker as '${name}' must be ${constraint}: received ${value}`,
);
}
}

/**
* A circuit breaker that wraps async operations to prevent cascading failures.
*
Expand Down Expand Up @@ -363,31 +375,11 @@ export class CircuitBreaker<T = unknown> {
onClose,
} = options;

if (!Number.isFinite(failureThreshold) || failureThreshold < 1) {
throw new RangeError(
`Cannot create circuit breaker as 'failureThreshold' must be a finite number >= 1: received ${failureThreshold}`,
);
}
if (!Number.isFinite(cooldownMs) || cooldownMs < 0) {
throw new RangeError(
`Cannot create circuit breaker as 'cooldownMs' must be a finite non-negative number: received ${cooldownMs}`,
);
}
if (!Number.isFinite(successThreshold) || successThreshold < 1) {
throw new RangeError(
`Cannot create circuit breaker as 'successThreshold' must be a finite number >= 1: received ${successThreshold}`,
);
}
if (!Number.isFinite(halfOpenMaxConcurrent) || halfOpenMaxConcurrent < 1) {
throw new RangeError(
`Cannot create circuit breaker as 'halfOpenMaxConcurrent' must be a finite number >= 1: received ${halfOpenMaxConcurrent}`,
);
}
if (!Number.isFinite(failureWindowMs) || failureWindowMs < 0) {
throw new RangeError(
`Cannot create circuit breaker as 'failureWindowMs' must be a finite non-negative number: received ${failureWindowMs}`,
);
}
validateOption("failureThreshold", failureThreshold, 1);
validateOption("cooldownMs", cooldownMs, 0);
validateOption("successThreshold", successThreshold, 1);
validateOption("halfOpenMaxConcurrent", halfOpenMaxConcurrent, 1);
validateOption("failureWindowMs", failureWindowMs, 0);

this.#failureThreshold = failureThreshold;
this.#cooldownMs = cooldownMs;
Expand Down Expand Up @@ -454,16 +446,6 @@ export class CircuitBreaker<T = unknown> {
* );
* ```
*
* @example Usage with sync function
* ```ts
* import { CircuitBreaker } from "@std/async/unstable-circuit-breaker";
* import { assertEquals } from "@std/assert";
*
* const breaker = new CircuitBreaker({ failureThreshold: 5 });
* const result = await breaker.execute(() => "sync result");
* assertEquals(result, "sync result");
* ```
*
* @typeParam R The return type of the function, must extend T.
* @param fn The function to execute (sync or async).
* @param options Optional execution options including an abort signal.
Expand Down Expand Up @@ -543,18 +525,19 @@ export class CircuitBreaker<T = unknown> {
forceOpen(): void {
const previous = this.#state.state;
const failureTimestamps = this.#state.failureTimestamps;
const now = Date.now();
this.#state = {
...this.#state,
state: "open",
openedAt: Date.now(),
openedAt: now,
consecutiveSuccesses: 0,
};
if (previous !== "open") {
this.#onStateChange?.(previous, "open");
const failureCount = pruneOldFailures(
failureTimestamps,
this.#failureWindowMs,
Date.now(),
now,
).length;
this.#onOpen?.(failureCount);
}
Expand Down Expand Up @@ -650,65 +633,51 @@ export class CircuitBreaker<T = unknown> {
previousState: CircuitState,
now: number,
): void {
// Check if this error should count as a failure
if (!this.#isFailure(error)) {
return;
}
if (!this.#isFailure(error)) return;

// Prune old failures and add new one
const prunedFailures = pruneOldFailures(
this.#state.failureTimestamps,
this.#failureWindowMs,
const newFailures = [
...pruneOldFailures(
this.#state.failureTimestamps,
this.#failureWindowMs,
now,
),
now,
);
const newFailures = [...prunedFailures, now];
];

this.#onFailure?.(error, newFailures.length);
const shouldOpen = previousState === "half_open" ||
newFailures.length >= this.#failureThreshold;

// In half-open, any failure reopens the circuit
if (previousState === "half_open") {
// State mutations first (before any callbacks)
if (shouldOpen) {
this.#state = {
...this.#state,
state: "open",
failureTimestamps: newFailures,
openedAt: now,
consecutiveSuccesses: 0,
};
this.#onStateChange?.("half_open", "open");
this.#onOpen?.(newFailures.length);
return;
}

// In closed state, check threshold
if (newFailures.length >= this.#failureThreshold) {
this.#state = {
...this.#state,
state: "open",
failureTimestamps: newFailures,
openedAt: now,
consecutiveSuccesses: 0,
};
this.#onStateChange?.("closed", "open");
this.#onOpen?.(newFailures.length);
} else {
this.#state = {
...this.#state,
failureTimestamps: newFailures,
consecutiveSuccesses: 0,
};
}

// Callbacks last (state is consistent even if these throw)
this.#onFailure?.(error, newFailures.length);
if (shouldOpen) {
this.#onStateChange?.(previousState, "open");
this.#onOpen?.(newFailures.length);
}
}

/** Records a success and potentially closes the circuit from half-open. */
#handleSuccess(previousState: CircuitState): void {
if (previousState === "closed") {
this.#state = { ...this.#state, consecutiveSuccesses: 0 };
return;
}
if (previousState === "closed") return;

const newSuccessCount = this.#state.consecutiveSuccesses + 1;
if (newSuccessCount >= this.#successThreshold) {
// Recovered! Close the circuit
this.#state = createInitialState();
this.#onStateChange?.("half_open", "closed");
this.#onClose?.();
Expand Down
Loading