Skip to content

Commit 7a25036

Browse files
committed
Added preventMultipleWaits to prod
1 parent 60da979 commit 7a25036

File tree

1 file changed

+52
-43
lines changed

1 file changed

+52
-43
lines changed

packages/core/src/v3/runtime/prodRuntimeManager.ts

Lines changed: 52 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import {
66
} from "../schemas/index.js";
77
import { ExecutorToWorkerProcessConnection } from "../zodIpc.js";
88
import { RuntimeManager } from "./manager.js";
9+
import { preventMultipleWaits } from "./preventMultipleWaits.js";
910

1011
export type ProdRuntimeManagerOptions = {
1112
waitThresholdInMs?: number;
@@ -21,6 +22,8 @@ export class ProdRuntimeManager implements RuntimeManager {
2122

2223
_waitForDuration: { resolve: (value: void) => void; reject: (err?: any) => void } | undefined;
2324

25+
_preventMultipleWaits = preventMultipleWaits();
26+
2427
constructor(
2528
private ipc: ExecutorToWorkerProcessConnection,
2629
private options: ProdRuntimeManagerOptions = {}
@@ -31,19 +34,21 @@ export class ProdRuntimeManager implements RuntimeManager {
3134
}
3235

3336
async waitForDuration(ms: number): Promise<void> {
34-
const now = Date.now();
37+
return this._preventMultipleWaits(async () => {
38+
const now = Date.now();
3539

36-
const resume = new Promise<void>((resolve, reject) => {
37-
this._waitForDuration = { resolve, reject };
38-
});
40+
const resume = new Promise<void>((resolve, reject) => {
41+
this._waitForDuration = { resolve, reject };
42+
});
3943

40-
await this.ipc.send("WAIT_FOR_DURATION", {
41-
ms,
42-
now,
43-
waitThresholdInMs: this.waitThresholdInMs,
44-
});
44+
await this.ipc.send("WAIT_FOR_DURATION", {
45+
ms,
46+
now,
47+
waitThresholdInMs: this.waitThresholdInMs,
48+
});
4549

46-
await resume;
50+
await resume;
51+
});
4752
}
4853

4954
resumeAfterDuration(): void {
@@ -63,51 +68,55 @@ export class ProdRuntimeManager implements RuntimeManager {
6368
}
6469

6570
async waitForTask(params: { id: string; ctx: TaskRunContext }): Promise<TaskRunExecutionResult> {
66-
const promise = new Promise<TaskRunExecutionResult>((resolve) => {
67-
this._taskWaits.set(params.id, { resolve });
68-
});
71+
return this._preventMultipleWaits(async () => {
72+
const promise = new Promise<TaskRunExecutionResult>((resolve) => {
73+
this._taskWaits.set(params.id, { resolve });
74+
});
6975

70-
await this.ipc.send("WAIT_FOR_TASK", {
71-
friendlyId: params.id,
72-
});
76+
await this.ipc.send("WAIT_FOR_TASK", {
77+
friendlyId: params.id,
78+
});
7379

74-
const result = await promise;
80+
const result = await promise;
7581

76-
clock.reset();
82+
clock.reset();
7783

78-
return result;
84+
return result;
85+
});
7986
}
8087

8188
async waitForBatch(params: {
8289
id: string;
8390
runs: string[];
8491
ctx: TaskRunContext;
8592
}): Promise<BatchTaskRunExecutionResult> {
86-
if (!params.runs.length) {
87-
return Promise.resolve({ id: params.id, items: [] });
88-
}
89-
90-
const promise = Promise.all(
91-
params.runs.map((runId) => {
92-
return new Promise<TaskRunExecutionResult>((resolve, reject) => {
93-
this._taskWaits.set(runId, { resolve });
94-
});
95-
})
96-
);
97-
98-
await this.ipc.send("WAIT_FOR_BATCH", {
99-
batchFriendlyId: params.id,
100-
runFriendlyIds: params.runs,
93+
return this._preventMultipleWaits(async () => {
94+
if (!params.runs.length) {
95+
return Promise.resolve({ id: params.id, items: [] });
96+
}
97+
98+
const promise = Promise.all(
99+
params.runs.map((runId) => {
100+
return new Promise<TaskRunExecutionResult>((resolve, reject) => {
101+
this._taskWaits.set(runId, { resolve });
102+
});
103+
})
104+
);
105+
106+
await this.ipc.send("WAIT_FOR_BATCH", {
107+
batchFriendlyId: params.id,
108+
runFriendlyIds: params.runs,
109+
});
110+
111+
const results = await promise;
112+
113+
clock.reset();
114+
115+
return {
116+
id: params.id,
117+
items: results,
118+
};
101119
});
102-
103-
const results = await promise;
104-
105-
clock.reset();
106-
107-
return {
108-
id: params.id,
109-
items: results,
110-
};
111120
}
112121

113122
resumeTask(completion: TaskRunExecutionResult): void {

0 commit comments

Comments
 (0)