Skip to content

Commit fa045fd

Browse files
committed
feat: add client support for skipped step events
- Add FlowStepStatus.Skipped to enum - Add SkipReason type (condition_unmet|handler_failed|dependency_skipped) - Add BroadcastStepSkippedEvent type - Add skipped event handling to FlowStep class - Add skipped_at and skip_reason getters - Update waitForStatus to accept Skipped - Treat Skipped as terminal state - Update eventAdapters for skipped events - Add unit tests, type tests, and E2E tests - All 206 client tests pass
1 parent 2e12137 commit fa045fd

11 files changed

Lines changed: 894 additions & 109 deletions

File tree

Lines changed: 259 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,259 @@
1+
import { describe, it, expect } from 'vitest';
2+
import { withPgNoTransaction } from '../helpers/db.js';
3+
import { createTestSupabaseClient } from '../helpers/setup.js';
4+
import { createTestFlow } from '../helpers/fixtures.js';
5+
import { grantMinimalPgflowPermissions } from '../helpers/permissions.js';
6+
import { PgflowClient } from '../../src/lib/PgflowClient.js';
7+
import { FlowStepStatus } from '../../src/lib/types.js';
8+
import { PgflowSqlClient } from '@pgflow/core';
9+
import { readAndStart } from '../helpers/polling.js';
10+
import { cleanupFlow } from '../helpers/cleanup.js';
11+
import { createEventTracker } from '../helpers/test-utils.js';
12+
import { skipStep } from '../helpers/skip-step.js';
13+
14+
/**
15+
* Tests for skipped step event handling in the client.
16+
*
17+
* Skipped steps can occur when:
18+
* - A step's condition evaluates to false (condition_unmet)
19+
* - A dependency was skipped, causing cascading skips (dependency_skipped)
20+
* - A handler fails during evaluation (handler_failed)
21+
*
22+
* These tests verify the client correctly:
23+
* - Receives and processes skipped broadcast events
24+
* - Updates step state with skipped_at and skip_reason
25+
* - Treats skipped as a terminal state
26+
* - Handles waitForStatus(Skipped) correctly
27+
*/
28+
describe('Skipped Step Handling', () => {
29+
it(
30+
'client handles skipped step state from database snapshot',
31+
withPgNoTransaction(async (sql) => {
32+
// This test verifies the client correctly handles skipped step state
33+
// when fetched from the database (e.g., on reconnect or late join)
34+
35+
const testFlow = createTestFlow('skip_snap');
36+
await cleanupFlow(sql, testFlow.slug);
37+
await grantMinimalPgflowPermissions(sql);
38+
39+
await sql`SELECT pgflow.create_flow(${testFlow.slug})`;
40+
await sql`SELECT pgflow.add_step(${testFlow.slug}, 'will_skip_step')`;
41+
42+
const supabaseClient = createTestSupabaseClient();
43+
const pgflowClient = new PgflowClient(supabaseClient, {
44+
realtimeStabilizationDelayMs: 1000,
45+
});
46+
47+
// Start the flow
48+
const run = await pgflowClient.startFlow(testFlow.slug, { test: 'data' });
49+
const step = run.step('will_skip_step');
50+
51+
// Verify initial state is Started (root step)
52+
expect(step.status).toBe(FlowStepStatus.Started);
53+
54+
// Directly call pgflow.skip_step to simulate the step being skipped
55+
// This mimics what would happen when a condition evaluates to false
56+
await skipStep(sql, run.run_id, 'will_skip_step', 'condition_unmet');
57+
58+
// Wait for the skipped event to be received
59+
await step.waitForStatus(FlowStepStatus.Skipped, { timeoutMs: 10000 });
60+
61+
// Verify skipped state
62+
expect(step.status).toBe(FlowStepStatus.Skipped);
63+
expect(step.skipped_at).toBeInstanceOf(Date);
64+
expect(step.skip_reason).toBe('condition_unmet');
65+
66+
// Verify output is null for skipped steps (per design decision Q1)
67+
expect(step.output).toBeNull();
68+
69+
await supabaseClient.removeAllChannels();
70+
}),
71+
{ timeout: 15000 }
72+
);
73+
74+
it(
75+
'receives skipped broadcast event and updates step state',
76+
withPgNoTransaction(async (sql) => {
77+
// This test verifies the client receives and processes skipped events
78+
// broadcast via Supabase realtime
79+
80+
const testFlow = createTestFlow('skip_broadcast');
81+
await cleanupFlow(sql, testFlow.slug);
82+
await grantMinimalPgflowPermissions(sql);
83+
84+
await sql`SELECT pgflow.create_flow(${testFlow.slug})`;
85+
await sql`SELECT pgflow.add_step(${testFlow.slug}, 'skipped_step')`;
86+
87+
const supabaseClient = createTestSupabaseClient();
88+
const pgflowClient = new PgflowClient(supabaseClient, {
89+
realtimeStabilizationDelayMs: 1000,
90+
});
91+
92+
const run = await pgflowClient.startFlow(testFlow.slug, { test: 'data' });
93+
const step = run.step('skipped_step');
94+
95+
// Set up event tracking BEFORE the skip happens
96+
const tracker = createEventTracker();
97+
step.on('*', tracker.callback);
98+
99+
// Skip the step
100+
await skipStep(sql, run.run_id, 'skipped_step', 'handler_failed');
101+
102+
// Wait for the skipped status
103+
await step.waitForStatus(FlowStepStatus.Skipped, { timeoutMs: 10000 });
104+
105+
// Verify we received the skipped event
106+
expect(tracker).toHaveReceivedEvent('step:skipped');
107+
expect(tracker).toHaveReceivedEvent('step:skipped', {
108+
run_id: run.run_id,
109+
step_slug: 'skipped_step',
110+
status: FlowStepStatus.Skipped,
111+
skip_reason: 'handler_failed',
112+
});
113+
114+
// Verify step state
115+
expect(step.status).toBe(FlowStepStatus.Skipped);
116+
expect(step.skip_reason).toBe('handler_failed');
117+
118+
await supabaseClient.removeAllChannels();
119+
}),
120+
{ timeout: 15000 }
121+
);
122+
123+
it(
124+
'skipped is a terminal state - no further status changes',
125+
withPgNoTransaction(async (sql) => {
126+
// Verify that once a step is skipped, it cannot transition to other states
127+
128+
const testFlow = createTestFlow('skip_terminal');
129+
await cleanupFlow(sql, testFlow.slug);
130+
await grantMinimalPgflowPermissions(sql);
131+
132+
await sql`SELECT pgflow.create_flow(${testFlow.slug})`;
133+
await sql`SELECT pgflow.add_step(${testFlow.slug}, 'terminal_step')`;
134+
135+
const supabaseClient = createTestSupabaseClient();
136+
const pgflowClient = new PgflowClient(supabaseClient, {
137+
realtimeStabilizationDelayMs: 1000,
138+
});
139+
140+
const run = await pgflowClient.startFlow(testFlow.slug, { test: 'data' });
141+
const step = run.step('terminal_step');
142+
143+
// Skip the step
144+
await skipStep(sql, run.run_id, 'terminal_step', 'dependency_skipped');
145+
146+
await step.waitForStatus(FlowStepStatus.Skipped, { timeoutMs: 10000 });
147+
148+
// Store original skipped_at
149+
const originalSkippedAt = step.skipped_at;
150+
151+
// Set up tracker for any subsequent events
152+
const tracker = createEventTracker();
153+
step.on('*', tracker.callback);
154+
155+
// Verify skipped steps don't produce tasks (nothing to read from queue)
156+
const sqlClient = new PgflowSqlClient(sql);
157+
const tasks = await readAndStart(sql, sqlClient, testFlow.slug, 1, 1);
158+
expect(tasks).toHaveLength(0);
159+
160+
// Give time for any potential events
161+
await new Promise((resolve) => setTimeout(resolve, 1000));
162+
163+
// Verify step is still skipped with same timestamp
164+
expect(step.status).toBe(FlowStepStatus.Skipped);
165+
expect(step.skipped_at).toEqual(originalSkippedAt);
166+
167+
// Verify no additional events were processed
168+
expect(tracker).toHaveReceivedTotalEvents(0);
169+
170+
await supabaseClient.removeAllChannels();
171+
}),
172+
{ timeout: 15000 }
173+
);
174+
175+
it(
176+
'waitForStatus(Skipped) resolves when step is skipped',
177+
withPgNoTransaction(async (sql) => {
178+
// Verify waitForStatus works correctly with Skipped status
179+
180+
const testFlow = createTestFlow('wait_skip');
181+
await cleanupFlow(sql, testFlow.slug);
182+
await grantMinimalPgflowPermissions(sql);
183+
184+
await sql`SELECT pgflow.create_flow(${testFlow.slug})`;
185+
await sql`SELECT pgflow.add_step(${testFlow.slug}, 'wait_step')`;
186+
187+
const supabaseClient = createTestSupabaseClient();
188+
const pgflowClient = new PgflowClient(supabaseClient, {
189+
realtimeStabilizationDelayMs: 1000,
190+
});
191+
192+
const run = await pgflowClient.startFlow(testFlow.slug, { test: 'data' });
193+
const step = run.step('wait_step');
194+
195+
// Start waiting for skipped BEFORE the skip happens
196+
const waitPromise = step.waitForStatus(FlowStepStatus.Skipped, {
197+
timeoutMs: 10000,
198+
});
199+
200+
// Skip the step after a small delay
201+
setTimeout(async () => {
202+
await skipStep(sql, run.run_id, 'wait_step', 'condition_unmet');
203+
}, 100);
204+
205+
// Wait should resolve with the step
206+
const result = await waitPromise;
207+
expect(result).toBe(step);
208+
expect(result.status).toBe(FlowStepStatus.Skipped);
209+
expect(result.skip_reason).toBe('condition_unmet');
210+
211+
await supabaseClient.removeAllChannels();
212+
}),
213+
{ timeout: 15000 }
214+
);
215+
216+
it(
217+
'handles all skip reasons correctly',
218+
withPgNoTransaction(async (sql) => {
219+
// Verify all three skip reasons are handled correctly
220+
221+
const skipReasons = [
222+
'condition_unmet',
223+
'handler_failed',
224+
'dependency_skipped',
225+
] as const;
226+
227+
for (const skipReason of skipReasons) {
228+
const testFlow = createTestFlow(`skip_${skipReason}`);
229+
await cleanupFlow(sql, testFlow.slug);
230+
await grantMinimalPgflowPermissions(sql);
231+
232+
await sql`SELECT pgflow.create_flow(${testFlow.slug})`;
233+
await sql`SELECT pgflow.add_step(${testFlow.slug}, 'reason_step')`;
234+
235+
const supabaseClient = createTestSupabaseClient();
236+
const pgflowClient = new PgflowClient(supabaseClient, {
237+
realtimeStabilizationDelayMs: 1000,
238+
});
239+
240+
const run = await pgflowClient.startFlow(testFlow.slug, {
241+
test: 'data',
242+
});
243+
const step = run.step('reason_step');
244+
245+
// Skip with specific reason
246+
await skipStep(sql, run.run_id, 'reason_step', skipReason);
247+
248+
await step.waitForStatus(FlowStepStatus.Skipped, { timeoutMs: 10000 });
249+
250+
// Verify the skip reason was captured correctly
251+
expect(step.status).toBe(FlowStepStatus.Skipped);
252+
expect(step.skip_reason).toBe(skipReason);
253+
254+
await supabaseClient.removeAllChannels();
255+
}
256+
}),
257+
{ timeout: 45000 }
258+
);
259+
});

pkgs/client/__tests__/helpers/event-factories.ts

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import type {
77
BroadcastStepStartedEvent,
88
BroadcastStepCompletedEvent,
99
BroadcastStepFailedEvent,
10+
BroadcastStepSkippedEvent,
1011
} from '../../src/lib/types';
1112

1213
/**
@@ -98,3 +99,17 @@ export function createStepFailedEvent(
9899
...overrides,
99100
};
100101
}
102+
103+
export function createStepSkippedEvent(
104+
overrides: Partial<BroadcastStepSkippedEvent> = {}
105+
): BroadcastStepSkippedEvent {
106+
return {
107+
event_type: 'step:skipped',
108+
run_id: '123e4567-e89b-12d3-a456-426614174000',
109+
step_slug: 'test-step',
110+
status: FlowStepStatus.Skipped,
111+
skipped_at: new Date().toISOString(),
112+
skip_reason: 'condition_unmet',
113+
...overrides,
114+
};
115+
}

pkgs/client/__tests__/helpers/fixtures.ts

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,12 @@ export function createTestFlow(flowSlug?: string) {
44
const uniqueSuffix = `${Date.now()}_${Math.random()
55
.toString(36)
66
.substr(2, 5)}`;
7+
8+
const maxBaseLength = 48 - uniqueSuffix.length - 1;
9+
const baseSlug = flowSlug ? flowSlug.slice(0, maxBaseLength) : 'test_flow';
10+
711
return {
8-
slug: flowSlug
9-
? `${flowSlug}_${uniqueSuffix}`
10-
: `test_flow_${uniqueSuffix}`,
12+
slug: `${baseSlug}_${uniqueSuffix}`,
1113
options: {},
1214
};
1315
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
import type postgres from 'postgres';
2+
3+
/**
4+
* Skip a step using the internal _cascade_force_skip_steps function.
5+
* This is a test helper that wraps the internal function.
6+
* If pgflow.skip_step() is exposed publicly later, swap implementation here.
7+
*/
8+
export async function skipStep(
9+
sql: postgres.Sql,
10+
runId: string,
11+
stepSlug: string,
12+
skipReason: 'condition_unmet' | 'handler_failed' | 'dependency_skipped'
13+
): Promise<void> {
14+
await sql`SELECT pgflow._cascade_force_skip_steps(
15+
${runId}::uuid,
16+
${stepSlug}::text,
17+
${skipReason}::text
18+
)`;
19+
}

0 commit comments

Comments
 (0)