Skip to content

Commit bb25340

Browse files
ericallam and claude authored
perf(runs-replication): Improve the CPU efficiency and throughput of the runs replication to clickhouse (#2866)
## Summary

Optimizes the runs replication service for better CPU efficiency and throughput when inserting task runs into ClickHouse.

### Key Changes

- **Switch to compact array format** - Uses `JSONCompactEachRowWithNames` instead of `JSONEachRow` for ClickHouse inserts, reducing JSON serialization overhead
- **Type-safe tuple arrays** - Introduces `TaskRunInsertArray` and `PayloadInsertArray` tuple types with compile-time column order validation
- **Pre-sorted batch inserts** - Sorts inserts by primary key before flushing for better ClickHouse insert performance
- **Programmatic index generation** - `TASK_RUN_INDEX` and `PAYLOAD_INDEX` are generated from column arrays to prevent manual synchronization errors

### Files Changed

- `runsReplicationService.server.ts` - Core optimization to use compact array inserts
- `@internal/clickhouse` - Added `insertCompactRaw` method and tuple types
- `taskRuns.ts` - Column definitions, index constants, and insert functions

---------

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
1 parent c8686b5 commit bb25340

File tree

15 files changed

+1630
-493
lines changed

15 files changed

+1630
-493
lines changed

apps/webapp/app/services/runsReplicationService.server.ts

Lines changed: 148 additions & 156 deletions
Large diffs are not rendered by default.
Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
import { ConcurrentFlushScheduler } from "~/services/runsReplicationService.server";
2+
3+
vi.setConfig({ testTimeout: 10_000 });
4+
5+
type TestItem = {
6+
id: string;
7+
event: "insert" | "update";
8+
version: number;
9+
};
10+
11+
describe("ConcurrentFlushScheduler", () => {
12+
it("should deduplicate items by key, keeping the latest version", async () => {
13+
const flushedBatches: TestItem[][] = [];
14+
15+
const scheduler = new ConcurrentFlushScheduler<TestItem>({
16+
batchSize: 100,
17+
flushInterval: 50,
18+
maxConcurrency: 1,
19+
callback: async (_flushId, batch) => {
20+
flushedBatches.push([...batch]);
21+
},
22+
getKey: (item) => `${item.event}_${item.id}`,
23+
shouldReplace: (existing, incoming) => incoming.version >= existing.version,
24+
});
25+
26+
scheduler.start();
27+
28+
// Add items with duplicate keys but different versions
29+
scheduler.addToBatch([
30+
{ id: "run_1", event: "insert", version: 1 },
31+
{ id: "run_1", event: "update", version: 2 },
32+
{ id: "run_2", event: "insert", version: 1 },
33+
]);
34+
35+
// Add more items - should merge with existing
36+
scheduler.addToBatch([
37+
{ id: "run_1", event: "insert", version: 3 }, // Higher version, should replace
38+
{ id: "run_1", event: "update", version: 1 }, // Lower version, should NOT replace
39+
{ id: "run_2", event: "update", version: 4 },
40+
]);
41+
42+
// Wait for flush
43+
await new Promise((resolve) => setTimeout(resolve, 100));
44+
45+
scheduler.shutdown();
46+
47+
// Should have flushed once with deduplicated items
48+
expect(flushedBatches.length).toBeGreaterThanOrEqual(1);
49+
50+
const allFlushed = flushedBatches.flat();
51+
52+
// Find items by their key
53+
const insertRun1 = allFlushed.find((i) => i.id === "run_1" && i.event === "insert");
54+
const updateRun1 = allFlushed.find((i) => i.id === "run_1" && i.event === "update");
55+
const insertRun2 = allFlushed.find((i) => i.id === "run_2" && i.event === "insert");
56+
const updateRun2 = allFlushed.find((i) => i.id === "run_2" && i.event === "update");
57+
58+
// Verify correct versions were kept
59+
expect(insertRun1?.version).toBe(3); // Latest version for insert_run_1
60+
expect(updateRun1?.version).toBe(2); // Original update_run_1 (v1 didn't replace v2)
61+
expect(insertRun2?.version).toBe(1); // Only version for insert_run_2
62+
expect(updateRun2?.version).toBe(4); // Only version for update_run_2
63+
});
64+
65+
it("should skip items where getKey returns null", async () => {
66+
const flushedBatches: TestItem[][] = [];
67+
68+
const scheduler = new ConcurrentFlushScheduler<TestItem>({
69+
batchSize: 100,
70+
flushInterval: 50,
71+
maxConcurrency: 1,
72+
callback: async (_flushId, batch) => {
73+
flushedBatches.push([...batch]);
74+
},
75+
getKey: (item) => {
76+
if (!item.id) {
77+
return null;
78+
}
79+
return `${item.event}_${item.id}`;
80+
},
81+
shouldReplace: (existing, incoming) => incoming.version >= existing.version,
82+
});
83+
84+
scheduler.start();
85+
86+
scheduler.addToBatch([
87+
{ id: "run_1", event: "insert", version: 1 },
88+
{ id: "", event: "insert", version: 2 }, // Should be skipped (null key)
89+
{ id: "run_2", event: "insert", version: 1 },
90+
]);
91+
92+
await new Promise((resolve) => setTimeout(resolve, 100));
93+
94+
scheduler.shutdown();
95+
96+
const allFlushed = flushedBatches.flat();
97+
expect(allFlushed).toHaveLength(2);
98+
expect(allFlushed.map((i) => i.id).sort()).toEqual(["run_1", "run_2"]);
99+
});
100+
101+
it("should flush when batch size threshold is reached", async () => {
102+
const flushedBatches: TestItem[][] = [];
103+
104+
const scheduler = new ConcurrentFlushScheduler<TestItem>({
105+
batchSize: 3,
106+
flushInterval: 10000, // Long interval so timer doesn't trigger
107+
maxConcurrency: 1,
108+
callback: async (_flushId, batch) => {
109+
flushedBatches.push([...batch]);
110+
},
111+
getKey: (item) => `${item.event}_${item.id}`,
112+
shouldReplace: (existing, incoming) => incoming.version >= existing.version,
113+
});
114+
115+
scheduler.start();
116+
117+
// Add 3 unique items - should trigger flush
118+
scheduler.addToBatch([
119+
{ id: "run_1", event: "insert", version: 1 },
120+
{ id: "run_2", event: "insert", version: 1 },
121+
{ id: "run_3", event: "insert", version: 1 },
122+
]);
123+
124+
await new Promise((resolve) => setTimeout(resolve, 50));
125+
126+
expect(flushedBatches.length).toBe(1);
127+
expect(flushedBatches[0]).toHaveLength(3);
128+
129+
scheduler.shutdown();
130+
});
131+
132+
it("should respect shouldReplace returning false", async () => {
133+
const flushedBatches: TestItem[][] = [];
134+
135+
const scheduler = new ConcurrentFlushScheduler<TestItem>({
136+
batchSize: 100,
137+
flushInterval: 50,
138+
maxConcurrency: 1,
139+
callback: async (_flushId, batch) => {
140+
flushedBatches.push([...batch]);
141+
},
142+
getKey: (item) => `${item.event}_${item.id}`,
143+
// Never replace - first item wins
144+
shouldReplace: () => false,
145+
});
146+
147+
scheduler.start();
148+
149+
scheduler.addToBatch([{ id: "run_1", event: "insert", version: 10 }]);
150+
151+
scheduler.addToBatch([{ id: "run_1", event: "insert", version: 999 }]);
152+
153+
await new Promise((resolve) => setTimeout(resolve, 100));
154+
155+
scheduler.shutdown();
156+
157+
const allFlushed = flushedBatches.flat();
158+
const insertRun1 = allFlushed.find((i) => i.id === "run_1" && i.event === "insert");
159+
expect(insertRun1?.version).toBe(10); // First one wins
160+
});
161+
});

apps/webapp/test/runsReplicationService.part1.test.ts

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import { ClickHouse } from "@internal/clickhouse";
22
import { containerTest } from "@internal/testcontainers";
3-
import { Logger } from "@trigger.dev/core/logger";
43
import { setTimeout } from "node:timers/promises";
54
import { z } from "zod";
65
import { TaskRunStatus } from "~/database-types";
@@ -22,6 +21,7 @@ describe("RunsReplicationService (part 1/2)", () => {
2221
compression: {
2322
request: true,
2423
},
24+
logLevel: "warn",
2525
});
2626

2727
const { tracer, exporter } = createInMemoryTracing();
@@ -40,6 +40,7 @@ describe("RunsReplicationService (part 1/2)", () => {
4040
leaderLockExtendIntervalMs: 1000,
4141
ackIntervalSeconds: 5,
4242
tracer,
43+
logLevel: "warn",
4344
});
4445

4546
await runsReplicationService.start();
@@ -135,6 +136,7 @@ describe("RunsReplicationService (part 1/2)", () => {
135136
compression: {
136137
request: true,
137138
},
139+
logLevel: "warn",
138140
});
139141

140142
const { tracer, exporter } = createInMemoryTracing();
@@ -153,6 +155,7 @@ describe("RunsReplicationService (part 1/2)", () => {
153155
leaderLockExtendIntervalMs: 1000,
154156
ackIntervalSeconds: 5,
155157
tracer,
158+
logLevel: "warn",
156159
});
157160

158161
await runsReplicationService.start();
@@ -271,6 +274,7 @@ describe("RunsReplicationService (part 1/2)", () => {
271274
const clickhouse = new ClickHouse({
272275
url: clickhouseContainer.getConnectionUrl(),
273276
name: "runs-replication",
277+
logLevel: "warn",
274278
});
275279

276280
const { tracer, exporter } = createInMemoryTracing();
@@ -289,6 +293,7 @@ describe("RunsReplicationService (part 1/2)", () => {
289293
leaderLockExtendIntervalMs: 1000,
290294
ackIntervalSeconds: 5,
291295
tracer,
296+
logLevel: "warn",
292297
});
293298

294299
await runsReplicationService.start();
@@ -341,6 +346,7 @@ describe("RunsReplicationService (part 1/2)", () => {
341346
const clickhouse = new ClickHouse({
342347
url: clickhouseContainer.getConnectionUrl(),
343348
name: "runs-replication-batching",
349+
logLevel: "warn",
344350
});
345351

346352
const runsReplicationService = new RunsReplicationService({
@@ -356,6 +362,7 @@ describe("RunsReplicationService (part 1/2)", () => {
356362
leaderLockTimeoutMs: 5000,
357363
leaderLockExtendIntervalMs: 1000,
358364
ackIntervalSeconds: 5,
365+
logLevel: "warn",
359366
});
360367

361368
await runsReplicationService.start();
@@ -443,6 +450,7 @@ describe("RunsReplicationService (part 1/2)", () => {
443450
const clickhouse = new ClickHouse({
444451
url: clickhouseContainer.getConnectionUrl(),
445452
name: "runs-replication-payload",
453+
logLevel: "warn",
446454
});
447455

448456
const runsReplicationService = new RunsReplicationService({
@@ -458,6 +466,7 @@ describe("RunsReplicationService (part 1/2)", () => {
458466
leaderLockTimeoutMs: 5000,
459467
leaderLockExtendIntervalMs: 1000,
460468
ackIntervalSeconds: 5,
469+
logLevel: "warn",
461470
});
462471

463472
await runsReplicationService.start();
@@ -542,6 +551,7 @@ describe("RunsReplicationService (part 1/2)", () => {
542551
const clickhouse = new ClickHouse({
543552
url: clickhouseContainer.getConnectionUrl(),
544553
name: "runs-replication-payload",
554+
logLevel: "warn",
545555
});
546556

547557
const runsReplicationService = new RunsReplicationService({
@@ -557,6 +567,7 @@ describe("RunsReplicationService (part 1/2)", () => {
557567
leaderLockTimeoutMs: 5000,
558568
leaderLockExtendIntervalMs: 1000,
559569
ackIntervalSeconds: 5,
570+
logLevel: "warn",
560571
});
561572

562573
await runsReplicationService.start();
@@ -646,6 +657,7 @@ describe("RunsReplicationService (part 1/2)", () => {
646657
const clickhouse = new ClickHouse({
647658
url: clickhouseContainer.getConnectionUrl(),
648659
name: "runs-replication-update",
660+
logLevel: "warn",
649661
});
650662

651663
const runsReplicationService = new RunsReplicationService({
@@ -661,6 +673,7 @@ describe("RunsReplicationService (part 1/2)", () => {
661673
leaderLockTimeoutMs: 5000,
662674
leaderLockExtendIntervalMs: 1000,
663675
ackIntervalSeconds: 5,
676+
logLevel: "warn",
664677
});
665678

666679
await runsReplicationService.start();
@@ -751,6 +764,7 @@ describe("RunsReplicationService (part 1/2)", () => {
751764
const clickhouse = new ClickHouse({
752765
url: clickhouseContainer.getConnectionUrl(),
753766
name: "runs-replication-delete",
767+
logLevel: "warn",
754768
});
755769

756770
const runsReplicationService = new RunsReplicationService({
@@ -766,6 +780,7 @@ describe("RunsReplicationService (part 1/2)", () => {
766780
leaderLockTimeoutMs: 5000,
767781
leaderLockExtendIntervalMs: 1000,
768782
ackIntervalSeconds: 5,
783+
logLevel: "warn",
769784
});
770785

771786
await runsReplicationService.start();
@@ -849,6 +864,7 @@ describe("RunsReplicationService (part 1/2)", () => {
849864
const clickhouse = new ClickHouse({
850865
url: clickhouseContainer.getConnectionUrl(),
851866
name: "runs-replication-shutdown-handover",
867+
logLevel: "warn",
852868
});
853869

854870
// Service A
@@ -865,6 +881,7 @@ describe("RunsReplicationService (part 1/2)", () => {
865881
leaderLockTimeoutMs: 5000,
866882
leaderLockExtendIntervalMs: 1000,
867883
ackIntervalSeconds: 5,
884+
logLevel: "warn",
868885
});
869886

870887
await runsReplicationServiceA.start();
@@ -968,6 +985,7 @@ describe("RunsReplicationService (part 1/2)", () => {
968985
leaderLockTimeoutMs: 5000,
969986
leaderLockExtendIntervalMs: 1000,
970987
ackIntervalSeconds: 5,
988+
logLevel: "warn",
971989
});
972990

973991
await runsReplicationServiceB.start();
@@ -997,6 +1015,7 @@ describe("RunsReplicationService (part 1/2)", () => {
9971015
const clickhouse = new ClickHouse({
9981016
url: clickhouseContainer.getConnectionUrl(),
9991017
name: "runs-replication-shutdown-after-processed",
1018+
logLevel: "warn",
10001019
});
10011020

10021021
// Service A
@@ -1013,6 +1032,7 @@ describe("RunsReplicationService (part 1/2)", () => {
10131032
leaderLockTimeoutMs: 5000,
10141033
leaderLockExtendIntervalMs: 1000,
10151034
ackIntervalSeconds: 5,
1035+
logLevel: "warn",
10161036
});
10171037

10181038
await runsReplicationServiceA.start();
@@ -1114,6 +1134,7 @@ describe("RunsReplicationService (part 1/2)", () => {
11141134
leaderLockTimeoutMs: 5000,
11151135
leaderLockExtendIntervalMs: 1000,
11161136
ackIntervalSeconds: 5,
1137+
logLevel: "warn",
11171138
});
11181139

11191140
await runsReplicationServiceB.start();
@@ -1137,6 +1158,7 @@ describe("RunsReplicationService (part 1/2)", () => {
11371158
const clickhouse = new ClickHouse({
11381159
url: clickhouseContainer.getConnectionUrl(),
11391160
name: "runs-replication-metrics",
1161+
logLevel: "warn",
11401162
});
11411163

11421164
const { tracer } = createInMemoryTracing();
@@ -1157,6 +1179,7 @@ describe("RunsReplicationService (part 1/2)", () => {
11571179
ackIntervalSeconds: 5,
11581180
tracer,
11591181
meter: metricsHelper.meter,
1182+
logLevel: "warn",
11601183
});
11611184

11621185
await runsReplicationService.start();

0 commit comments

Comments (0)