Skip to content

Commit c376d70

Browse files
committed
pgf-3hs.13: Unify Flow Compilation Paths via FlowShape
1 parent 1889d80 commit c376d70

19 files changed

Lines changed: 749 additions & 140 deletions

pkgs/core/schemas/0100_function_compare_flow_shapes.sql

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,32 @@ BEGIN
107107
)
108108
);
109109
END IF;
110+
111+
-- Compare whenUnmet (structural - affects DAG execution semantics)
112+
IF v_local_step->>'whenUnmet' != v_db_step->>'whenUnmet' THEN
113+
v_differences := array_append(
114+
v_differences,
115+
format(
116+
$$Step at index %s: whenUnmet differs '%s' vs '%s'$$,
117+
v_idx,
118+
v_local_step->>'whenUnmet',
119+
v_db_step->>'whenUnmet'
120+
)
121+
);
122+
END IF;
123+
124+
-- Compare whenFailed (structural - affects DAG execution semantics)
125+
IF v_local_step->>'whenFailed' != v_db_step->>'whenFailed' THEN
126+
v_differences := array_append(
127+
v_differences,
128+
format(
129+
$$Step at index %s: whenFailed differs '%s' vs '%s'$$,
130+
v_idx,
131+
v_local_step->>'whenFailed',
132+
v_db_step->>'whenFailed'
133+
)
134+
);
135+
END IF;
110136
END IF;
111137
END LOOP;
112138

pkgs/core/schemas/0100_function_create_flow_from_shape.sql

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,9 @@ BEGIN
4747
base_delay => (v_step_options->>'baseDelay')::int,
4848
timeout => (v_step_options->>'timeout')::int,
4949
start_delay => (v_step_options->>'startDelay')::int,
50-
step_type => v_step->>'stepType'
50+
step_type => v_step->>'stepType',
51+
when_unmet => v_step->>'whenUnmet',
52+
when_failed => v_step->>'whenFailed'
5153
);
5254
END LOOP;
5355
END;

pkgs/core/schemas/0100_function_get_flow_shape.sql

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@ as $$
2222
AND dep.step_slug = step.step_slug
2323
),
2424
'[]'::jsonb
25-
)
25+
),
26+
'whenUnmet', step.when_unmet,
27+
'whenFailed', step.when_failed
2628
)
2729
ORDER BY step.step_index
2830
),

pkgs/core/supabase/migrations/20260108131350_pgflow_step_conditions.sql

Lines changed: 264 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,270 @@ END) <= 1), ADD CONSTRAINT "skip_reason_matches_status" CHECK (((status = 'skipp
1616
CREATE INDEX "idx_step_states_skipped" ON "pgflow"."step_states" ("run_id", "step_slug") WHERE (status = 'skipped'::text);
1717
-- Modify "steps" table
1818
ALTER TABLE "pgflow"."steps" ADD CONSTRAINT "when_failed_is_valid" CHECK (when_failed = ANY (ARRAY['fail'::text, 'skip'::text, 'skip-cascade'::text])), ADD CONSTRAINT "when_unmet_is_valid" CHECK (when_unmet = ANY (ARRAY['fail'::text, 'skip'::text, 'skip-cascade'::text])), ADD COLUMN "condition_pattern" jsonb NULL, ADD COLUMN "condition_not_pattern" jsonb NULL, ADD COLUMN "when_unmet" text NOT NULL DEFAULT 'skip', ADD COLUMN "when_failed" text NOT NULL DEFAULT 'fail';
19+
-- Modify "_compare_flow_shapes" function
20+
CREATE OR REPLACE FUNCTION "pgflow"."_compare_flow_shapes" ("p_local" jsonb, "p_db" jsonb) RETURNS text[] LANGUAGE plpgsql STABLE SET "search_path" = '' AS $BODY$
21+
DECLARE
22+
v_differences text[] := '{}';
23+
v_local_steps jsonb;
24+
v_db_steps jsonb;
25+
v_local_count int;
26+
v_db_count int;
27+
v_max_count int;
28+
v_idx int;
29+
v_local_step jsonb;
30+
v_db_step jsonb;
31+
v_local_deps text;
32+
v_db_deps text;
33+
BEGIN
34+
v_local_steps := p_local->'steps';
35+
v_db_steps := p_db->'steps';
36+
v_local_count := jsonb_array_length(COALESCE(v_local_steps, '[]'::jsonb));
37+
v_db_count := jsonb_array_length(COALESCE(v_db_steps, '[]'::jsonb));
38+
39+
-- Compare step counts
40+
IF v_local_count != v_db_count THEN
41+
v_differences := array_append(
42+
v_differences,
43+
format('Step count differs: %s vs %s', v_local_count, v_db_count)
44+
);
45+
END IF;
46+
47+
-- Compare steps by index
48+
v_max_count := GREATEST(v_local_count, v_db_count);
49+
50+
FOR v_idx IN 0..(v_max_count - 1) LOOP
51+
v_local_step := v_local_steps->v_idx;
52+
v_db_step := v_db_steps->v_idx;
53+
54+
IF v_local_step IS NULL THEN
55+
v_differences := array_append(
56+
v_differences,
57+
format(
58+
$$Step at index %s: missing in first shape (second has '%s')$$,
59+
v_idx,
60+
v_db_step->>'slug'
61+
)
62+
);
63+
ELSIF v_db_step IS NULL THEN
64+
v_differences := array_append(
65+
v_differences,
66+
format(
67+
$$Step at index %s: missing in second shape (first has '%s')$$,
68+
v_idx,
69+
v_local_step->>'slug'
70+
)
71+
);
72+
ELSE
73+
-- Compare slug
74+
IF v_local_step->>'slug' != v_db_step->>'slug' THEN
75+
v_differences := array_append(
76+
v_differences,
77+
format(
78+
$$Step at index %s: slug differs '%s' vs '%s'$$,
79+
v_idx,
80+
v_local_step->>'slug',
81+
v_db_step->>'slug'
82+
)
83+
);
84+
END IF;
85+
86+
-- Compare step type
87+
IF v_local_step->>'stepType' != v_db_step->>'stepType' THEN
88+
v_differences := array_append(
89+
v_differences,
90+
format(
91+
$$Step at index %s: type differs '%s' vs '%s'$$,
92+
v_idx,
93+
v_local_step->>'stepType',
94+
v_db_step->>'stepType'
95+
)
96+
);
97+
END IF;
98+
99+
-- Compare dependencies (convert arrays to comma-separated strings)
100+
SELECT string_agg(dep, ', ' ORDER BY dep)
101+
INTO v_local_deps
102+
FROM jsonb_array_elements_text(COALESCE(v_local_step->'dependencies', '[]'::jsonb)) AS dep;
103+
104+
SELECT string_agg(dep, ', ' ORDER BY dep)
105+
INTO v_db_deps
106+
FROM jsonb_array_elements_text(COALESCE(v_db_step->'dependencies', '[]'::jsonb)) AS dep;
107+
108+
IF COALESCE(v_local_deps, '') != COALESCE(v_db_deps, '') THEN
109+
v_differences := array_append(
110+
v_differences,
111+
format(
112+
$$Step at index %s: dependencies differ [%s] vs [%s]$$,
113+
v_idx,
114+
COALESCE(v_local_deps, ''),
115+
COALESCE(v_db_deps, '')
116+
)
117+
);
118+
END IF;
119+
120+
-- Compare whenUnmet (structural - affects DAG execution semantics)
121+
IF v_local_step->>'whenUnmet' != v_db_step->>'whenUnmet' THEN
122+
v_differences := array_append(
123+
v_differences,
124+
format(
125+
$$Step at index %s: whenUnmet differs '%s' vs '%s'$$,
126+
v_idx,
127+
v_local_step->>'whenUnmet',
128+
v_db_step->>'whenUnmet'
129+
)
130+
);
131+
END IF;
132+
133+
-- Compare whenFailed (structural - affects DAG execution semantics)
134+
IF v_local_step->>'whenFailed' != v_db_step->>'whenFailed' THEN
135+
v_differences := array_append(
136+
v_differences,
137+
format(
138+
$$Step at index %s: whenFailed differs '%s' vs '%s'$$,
139+
v_idx,
140+
v_local_step->>'whenFailed',
141+
v_db_step->>'whenFailed'
142+
)
143+
);
144+
END IF;
145+
END IF;
146+
END LOOP;
147+
148+
RETURN v_differences;
149+
END;
150+
$BODY$;
151+
-- Create "add_step" function
152+
CREATE FUNCTION "pgflow"."add_step" ("flow_slug" text, "step_slug" text, "deps_slugs" text[] DEFAULT '{}', "max_attempts" integer DEFAULT NULL::integer, "base_delay" integer DEFAULT NULL::integer, "timeout" integer DEFAULT NULL::integer, "start_delay" integer DEFAULT NULL::integer, "step_type" text DEFAULT 'single', "condition_pattern" jsonb DEFAULT NULL::jsonb, "condition_not_pattern" jsonb DEFAULT NULL::jsonb, "when_unmet" text DEFAULT 'skip', "when_failed" text DEFAULT 'fail') RETURNS "pgflow"."steps" LANGUAGE plpgsql SET "search_path" = '' AS $$
153+
DECLARE
154+
result_step pgflow.steps;
155+
next_idx int;
156+
BEGIN
157+
-- Validate map step constraints
158+
-- Map steps can have either:
159+
-- 0 dependencies (root map - maps over flow input array)
160+
-- 1 dependency (dependent map - maps over dependency output array)
161+
IF COALESCE(add_step.step_type, 'single') = 'map' AND COALESCE(array_length(add_step.deps_slugs, 1), 0) > 1 THEN
162+
RAISE EXCEPTION 'Map step "%" can have at most one dependency, but % were provided: %',
163+
add_step.step_slug,
164+
COALESCE(array_length(add_step.deps_slugs, 1), 0),
165+
array_to_string(add_step.deps_slugs, ', ');
166+
END IF;
167+
168+
-- Get next step index
169+
SELECT COALESCE(MAX(s.step_index) + 1, 0) INTO next_idx
170+
FROM pgflow.steps s
171+
WHERE s.flow_slug = add_step.flow_slug;
172+
173+
-- Create the step
174+
INSERT INTO pgflow.steps (
175+
flow_slug, step_slug, step_type, step_index, deps_count,
176+
opt_max_attempts, opt_base_delay, opt_timeout, opt_start_delay,
177+
condition_pattern, condition_not_pattern, when_unmet, when_failed
178+
)
179+
VALUES (
180+
add_step.flow_slug,
181+
add_step.step_slug,
182+
COALESCE(add_step.step_type, 'single'),
183+
next_idx,
184+
COALESCE(array_length(add_step.deps_slugs, 1), 0),
185+
add_step.max_attempts,
186+
add_step.base_delay,
187+
add_step.timeout,
188+
add_step.start_delay,
189+
add_step.condition_pattern,
190+
add_step.condition_not_pattern,
191+
add_step.when_unmet,
192+
add_step.when_failed
193+
)
194+
ON CONFLICT ON CONSTRAINT steps_pkey
195+
DO UPDATE SET step_slug = EXCLUDED.step_slug
196+
RETURNING * INTO result_step;
197+
198+
-- Insert dependencies
199+
INSERT INTO pgflow.deps (flow_slug, dep_slug, step_slug)
200+
SELECT add_step.flow_slug, d.dep_slug, add_step.step_slug
201+
FROM unnest(COALESCE(add_step.deps_slugs, '{}')) AS d(dep_slug)
202+
WHERE add_step.deps_slugs IS NOT NULL AND array_length(add_step.deps_slugs, 1) > 0
203+
ON CONFLICT ON CONSTRAINT deps_pkey DO NOTHING;
204+
205+
RETURN result_step;
206+
END;
207+
$$;
208+
-- Modify "_create_flow_from_shape" function
209+
CREATE OR REPLACE FUNCTION "pgflow"."_create_flow_from_shape" ("p_flow_slug" text, "p_shape" jsonb) RETURNS void LANGUAGE plpgsql SET "search_path" = '' AS $$
210+
DECLARE
211+
v_step jsonb;
212+
v_deps text[];
213+
v_flow_options jsonb;
214+
v_step_options jsonb;
215+
BEGIN
216+
-- Extract flow-level options (may be null)
217+
v_flow_options := p_shape->'options';
218+
219+
-- Create the flow with options (NULL = use default)
220+
PERFORM pgflow.create_flow(
221+
p_flow_slug,
222+
(v_flow_options->>'maxAttempts')::int,
223+
(v_flow_options->>'baseDelay')::int,
224+
(v_flow_options->>'timeout')::int
225+
);
226+
227+
-- Iterate over steps in order and add each one
228+
FOR v_step IN SELECT * FROM jsonb_array_elements(p_shape->'steps')
229+
LOOP
230+
-- Convert dependencies jsonb array to text array
231+
SELECT COALESCE(array_agg(dep), '{}')
232+
INTO v_deps
233+
FROM jsonb_array_elements_text(COALESCE(v_step->'dependencies', '[]'::jsonb)) AS dep;
234+
235+
-- Extract step options (may be null)
236+
v_step_options := v_step->'options';
237+
238+
-- Add the step with options (NULL = use default/inherit)
239+
PERFORM pgflow.add_step(
240+
flow_slug => p_flow_slug,
241+
step_slug => v_step->>'slug',
242+
deps_slugs => v_deps,
243+
max_attempts => (v_step_options->>'maxAttempts')::int,
244+
base_delay => (v_step_options->>'baseDelay')::int,
245+
timeout => (v_step_options->>'timeout')::int,
246+
start_delay => (v_step_options->>'startDelay')::int,
247+
step_type => v_step->>'stepType',
248+
when_unmet => v_step->>'whenUnmet',
249+
when_failed => v_step->>'whenFailed'
250+
);
251+
END LOOP;
252+
END;
253+
$$;
254+
-- Modify "_get_flow_shape" function
255+
CREATE OR REPLACE FUNCTION "pgflow"."_get_flow_shape" ("p_flow_slug" text) RETURNS jsonb LANGUAGE sql STABLE SET "search_path" = '' AS $$
256+
SELECT jsonb_build_object(
257+
'steps',
258+
COALESCE(
259+
jsonb_agg(
260+
jsonb_build_object(
261+
'slug', step.step_slug,
262+
'stepType', step.step_type,
263+
'dependencies', COALESCE(
264+
(
265+
SELECT jsonb_agg(dep.dep_slug ORDER BY dep.dep_slug)
266+
FROM pgflow.deps AS dep
267+
WHERE dep.flow_slug = step.flow_slug
268+
AND dep.step_slug = step.step_slug
269+
),
270+
'[]'::jsonb
271+
),
272+
'whenUnmet', step.when_unmet,
273+
'whenFailed', step.when_failed
274+
)
275+
ORDER BY step.step_index
276+
),
277+
'[]'::jsonb
278+
)
279+
)
280+
FROM pgflow.steps AS step
281+
WHERE step.flow_slug = p_flow_slug;
282+
$$;
19283
-- Create "_cascade_force_skip_steps" function
20284
CREATE FUNCTION "pgflow"."_cascade_force_skip_steps" ("run_id" uuid, "step_slug" text, "skip_reason" text) RETURNS integer LANGUAGE plpgsql AS $$
21285
DECLARE
@@ -1454,62 +1718,5 @@ with tasks as (
14541718
dep_out.run_id = st.run_id and
14551719
dep_out.step_slug = st.step_slug
14561720
$$;
1457-
-- Create "add_step" function
1458-
CREATE FUNCTION "pgflow"."add_step" ("flow_slug" text, "step_slug" text, "deps_slugs" text[] DEFAULT '{}', "max_attempts" integer DEFAULT NULL::integer, "base_delay" integer DEFAULT NULL::integer, "timeout" integer DEFAULT NULL::integer, "start_delay" integer DEFAULT NULL::integer, "step_type" text DEFAULT 'single', "condition_pattern" jsonb DEFAULT NULL::jsonb, "condition_not_pattern" jsonb DEFAULT NULL::jsonb, "when_unmet" text DEFAULT 'skip', "when_failed" text DEFAULT 'fail') RETURNS "pgflow"."steps" LANGUAGE plpgsql SET "search_path" = '' AS $$
1459-
DECLARE
1460-
result_step pgflow.steps;
1461-
next_idx int;
1462-
BEGIN
1463-
-- Validate map step constraints
1464-
-- Map steps can have either:
1465-
-- 0 dependencies (root map - maps over flow input array)
1466-
-- 1 dependency (dependent map - maps over dependency output array)
1467-
IF COALESCE(add_step.step_type, 'single') = 'map' AND COALESCE(array_length(add_step.deps_slugs, 1), 0) > 1 THEN
1468-
RAISE EXCEPTION 'Map step "%" can have at most one dependency, but % were provided: %',
1469-
add_step.step_slug,
1470-
COALESCE(array_length(add_step.deps_slugs, 1), 0),
1471-
array_to_string(add_step.deps_slugs, ', ');
1472-
END IF;
1473-
1474-
-- Get next step index
1475-
SELECT COALESCE(MAX(s.step_index) + 1, 0) INTO next_idx
1476-
FROM pgflow.steps s
1477-
WHERE s.flow_slug = add_step.flow_slug;
1478-
1479-
-- Create the step
1480-
INSERT INTO pgflow.steps (
1481-
flow_slug, step_slug, step_type, step_index, deps_count,
1482-
opt_max_attempts, opt_base_delay, opt_timeout, opt_start_delay,
1483-
condition_pattern, condition_not_pattern, when_unmet, when_failed
1484-
)
1485-
VALUES (
1486-
add_step.flow_slug,
1487-
add_step.step_slug,
1488-
COALESCE(add_step.step_type, 'single'),
1489-
next_idx,
1490-
COALESCE(array_length(add_step.deps_slugs, 1), 0),
1491-
add_step.max_attempts,
1492-
add_step.base_delay,
1493-
add_step.timeout,
1494-
add_step.start_delay,
1495-
add_step.condition_pattern,
1496-
add_step.condition_not_pattern,
1497-
add_step.when_unmet,
1498-
add_step.when_failed
1499-
)
1500-
ON CONFLICT ON CONSTRAINT steps_pkey
1501-
DO UPDATE SET step_slug = EXCLUDED.step_slug
1502-
RETURNING * INTO result_step;
1503-
1504-
-- Insert dependencies
1505-
INSERT INTO pgflow.deps (flow_slug, dep_slug, step_slug)
1506-
SELECT add_step.flow_slug, d.dep_slug, add_step.step_slug
1507-
FROM unnest(COALESCE(add_step.deps_slugs, '{}')) AS d(dep_slug)
1508-
WHERE add_step.deps_slugs IS NOT NULL AND array_length(add_step.deps_slugs, 1) > 0
1509-
ON CONFLICT ON CONSTRAINT deps_pkey DO NOTHING;
1510-
1511-
RETURN result_step;
1512-
END;
1513-
$$;
15141721
-- Drop "add_step" function
15151722
DROP FUNCTION "pgflow"."add_step" (text, text, text[], integer, integer, integer, integer, text);

0 commit comments

Comments
 (0)