Skip to content

Commit ab7fb4f

Browse files
committed
feat: Search pipeline name in pipeline run API
1 parent 5a2958c commit ab7fb4f

6 files changed

Lines changed: 1119 additions & 116 deletions

File tree

cloud_pipelines_backend/api_server_sql.py

Lines changed: 59 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
from . import backend_types_sql as bts
1111
from . import component_structures as structures
12+
from . import database_ops
1213
from . import errors
1314
from . import filter_query_sql
1415

@@ -32,6 +33,27 @@ def _get_current_time() -> datetime.datetime:
3233
return datetime.datetime.now(tz=datetime.timezone.utc)
3334

3435

36+
def _get_pipeline_name_from_task_spec(
37+
*,
38+
task_spec_dict: dict[str, Any],
39+
) -> str | None:
40+
"""Extract pipeline name from a task_spec dict via component_ref.spec.name.
41+
42+
Traversal path:
43+
task_spec_dict -> TaskSpec -> component_ref -> spec -> name
44+
45+
Returns None if any step in the chain is missing or parsing fails.
46+
"""
47+
try:
48+
task_spec = structures.TaskSpec.from_json_dict(task_spec_dict)
49+
except Exception:
50+
return None
51+
spec = task_spec.component_ref.spec
52+
if spec is None:
53+
return None
54+
return spec.name or None
55+
56+
3557
# ==== PipelineJobService
3658
@dataclasses.dataclass(kw_only=True)
3759
class PipelineRunResponse:
@@ -113,19 +135,15 @@ def create(
113135
},
114136
)
115137
session.add(pipeline_run)
116-
# Mirror created_by into the annotations table so it's searchable
117-
# via filter_query like any other annotation.
118-
if created_by is not None:
119-
# Flush to populate pipeline_run.id (server-generated) before inserting the annotation FK.
120-
# TODO: Use ORM relationship instead of explicit flush + manual FK assignment.
121-
session.flush()
122-
session.add(
123-
bts.PipelineRunAnnotation(
124-
pipeline_run_id=pipeline_run.id,
125-
key=filter_query_sql.PipelineRunAnnotationSystemKey.CREATED_BY,
126-
value=created_by,
127-
)
128-
)
138+
# Flush to populate pipeline_run.id (server-generated) before inserting annotation FKs.
139+
# TODO: Use ORM relationship instead of explicit flush + manual FK assignment.
140+
session.flush()
141+
_mirror_system_annotations(
142+
session=session,
143+
pipeline_run_id=pipeline_run.id,
144+
created_by=created_by,
145+
pipeline_name=pipeline_name,
146+
)
129147
session.commit()
130148

131149
session.refresh(pipeline_run)
@@ -244,12 +262,9 @@ def _create_pipeline_run_response(
244262
bts.ExecutionNode, pipeline_run.root_execution_id
245263
)
246264
if execution_node:
247-
task_spec = structures.TaskSpec.from_json_dict(
248-
execution_node.task_spec
265+
pipeline_name = _get_pipeline_name_from_task_spec(
266+
task_spec_dict=execution_node.task_spec
249267
)
250-
component_spec = task_spec.component_ref.spec
251-
if component_spec:
252-
pipeline_name = component_spec.name
253268
response.pipeline_name = pipeline_name
254269
if include_execution_stats:
255270
execution_status_stats = self._calculate_execution_status_stats(
@@ -1153,6 +1168,32 @@ def list_secrets(
11531168
]
11541169

11551170

1171+
def _mirror_system_annotations(
1172+
*,
1173+
session: orm.Session,
1174+
pipeline_run_id: bts.IdType,
1175+
created_by: str | None,
1176+
pipeline_name: str | None,
1177+
) -> None:
1178+
"""Mirror pipeline run fields as system annotations for filter_query search."""
1179+
if created_by:
1180+
session.add(
1181+
bts.PipelineRunAnnotation(
1182+
pipeline_run_id=pipeline_run_id,
1183+
key=filter_query_sql.PipelineRunAnnotationSystemKey.CREATED_BY,
1184+
value=created_by,
1185+
)
1186+
)
1187+
if pipeline_name:
1188+
session.add(
1189+
bts.PipelineRunAnnotation(
1190+
pipeline_run_id=pipeline_run_id,
1191+
key=filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME,
1192+
value=pipeline_name,
1193+
)
1194+
)
1195+
1196+
11561197
def _recursively_create_all_executions_and_artifacts_root(
11571198
session: orm.Session,
11581199
root_task_spec: structures.TaskSpec,

cloud_pipelines_backend/database_ops.py

Lines changed: 232 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
from typing import Any
2+
13
import sqlalchemy
24
from sqlalchemy import orm
35

@@ -87,6 +89,7 @@ def migrate_db(db_engine: sqlalchemy.Engine):
8789
break
8890

8991
_backfill_pipeline_run_created_by_annotations(db_engine=db_engine)
92+
_backfill_pipeline_run_name_annotations(db_engine=db_engine)
9093

9194

9295
def _is_pipeline_run_annotation_key_already_backfilled(
@@ -142,3 +145,232 @@ def _backfill_pipeline_run_created_by_annotations(
142145
)
143146
session.execute(stmt)
144147
session.commit()
148+
149+
150+
def _backfill_pipeline_names_from_extra_data(
151+
*,
152+
session: orm.Session,
153+
) -> None:
154+
"""Phase 1: bulk SQL backfill from extra_data['pipeline_name'].
155+
156+
INSERT INTO pipeline_run_annotation
157+
SELECT id, key, json_extract(extra_data, '$.pipeline_name')
158+
FROM pipeline_run
159+
WHERE json_extract(...) IS NOT NULL
160+
161+
Valid (creates annotation row):
162+
extra_data = {"pipeline_name": "my-pipeline"} -> value = "my-pipeline"
163+
extra_data = {"pipeline_name": ""} -> value = ""
164+
165+
Skipped (no annotation row):
166+
extra_data = NULL -> JSON_EXTRACT = NULL
167+
extra_data = {} -> key absent, NULL
168+
extra_data = {"pipeline_name": null} -> JSON_EXTRACT = NULL
169+
170+
SQLAlchemy's JSON path extraction is NULL-safe: returns SQL NULL
171+
when extra_data is NULL or the key is absent (no Python error).
172+
"""
173+
pipeline_name_expr = bts.PipelineRun.extra_data["pipeline_name"].as_string()
174+
stmt = sqlalchemy.insert(bts.PipelineRunAnnotation).from_select(
175+
["pipeline_run_id", "key", "value"],
176+
sqlalchemy.select(
177+
bts.PipelineRun.id,
178+
sqlalchemy.literal(
179+
filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME,
180+
),
181+
pipeline_name_expr,
182+
).where(
183+
pipeline_name_expr.isnot(None),
184+
),
185+
)
186+
session.execute(stmt)
187+
188+
189+
def _backfill_pipeline_names_from_component_spec(
190+
*,
191+
session: orm.Session,
192+
) -> None:
193+
"""Phase 2: Bulk SQL fallback for runs still missing a name annotation.
194+
195+
Extracts the pipeline name from each run's ExecutionNode via the
196+
JSON path:
197+
198+
task_spec -> 'componentRef' -> 'spec' ->> 'name'
199+
200+
Starting tables:
201+
202+
pipeline_run execution_node
203+
+----+------------------+ +--------+-------------------------------------------+
204+
| id | root_execution_id| | id | task_spec (JSON) |
205+
+----+------------------+ +--------+-------------------------------------------+
206+
| 1 | exec_1 | | exec_1 | {"componentRef":{"spec":{"name":"A"}}} |
207+
| 2 | exec_2 | | exec_2 | {"componentRef":{"spec":null}} |
208+
| 3 | exec_3 | | exec_3 | {"componentRef":{"spec":{"name":""}}} |
209+
| 4 | exec_4 | | exec_4 | {"componentRef":{"spec":{"name":"B"}}} |
210+
| 5 | exec_99 | +--------+-------------------------------------------+
211+
+----+------------------+ (no exec_99 row)
212+
213+
pipeline_run_annotation (pre-existing)
214+
+--------+---------------------------+-------+
215+
| run_id | key | value |
216+
+--------+---------------------------+-------+
217+
| 1 | system/pipeline_run.name | A |
218+
| 3 | user/custom_tag | hello |
219+
+--------+---------------------------+-------+
220+
221+
Step 1 -- JOIN execution_node (INNER JOIN):
222+
Attaches task_spec to each run. Drops runs with no execution_node.
223+
224+
FROM pipeline_run pr
225+
JOIN execution_node en ON en.id = pr.root_execution_id
226+
227+
+----+--------+-------------------------------------------+
228+
| id | en.id | en.task_spec |
229+
+----+--------+-------------------------------------------+
230+
| 1 | exec_1 | {"componentRef":{"spec":{"name":"A"}}} |
231+
| 2 | exec_2 | {"componentRef":{"spec":null}} |
232+
| 3 | exec_3 | {"componentRef":{"spec":{"name":""}}} |
233+
| 4 | exec_4 | {"componentRef":{"spec":{"name":"B"}}} |
234+
+----+--------+-------------------------------------------+
235+
(run 5 dropped -- exec_99 doesn't exist)
236+
237+
Step 2a -- LEFT JOIN annotation:
238+
Attempts to match each run to an existing name annotation.
239+
240+
LEFT JOIN pipeline_run_annotation ann
241+
ON ann.pipeline_run_id = pr.id
242+
AND ann.key = 'system/pipeline_run.name'
243+
244+
+----+--------+------------------------------------------+------------------+----------+
245+
| id | en.id | en.task_spec | ann.run_id | ann.key |
246+
+----+--------+------------------------------------------+------------------+----------+
247+
| 1 | exec_1 | {"componentRef":{"spec":{"name":"A"}}} | 1 | sys/name |
248+
| 2 | exec_2 | {"componentRef":{"spec":null}} | NULL | NULL |
249+
| 3 | exec_3 | {"componentRef":{"spec":{"name":""}}} | NULL | NULL |
250+
| 4 | exec_4 | {"componentRef":{"spec":{"name":"B"}}} | NULL | NULL |
251+
+----+--------+------------------------------------------+------------------+----------+
252+
(run 1 matched -- has 'system/pipeline_run.name' annotation)
253+
(run 3 NULL -- has 'user/custom_tag' but ON requires key = 'system/pipeline_run.name')
254+
255+
Step 2b -- WHERE ann.pipeline_run_id IS NULL (anti-join filter):
256+
Keeps only runs where the LEFT JOIN found no match.
257+
258+
WHERE ann.pipeline_run_id IS NULL
259+
260+
+----+--------+-------------------------------------------+
261+
| id | en.id | en.task_spec |
262+
+----+--------+-------------------------------------------+
263+
| 2 | exec_2 | {"componentRef":{"spec":null}} |
264+
| 3 | exec_3 | {"componentRef":{"spec":{"name":""}}} |
265+
| 4 | exec_4 | {"componentRef":{"spec":{"name":"B"}}} |
266+
+----+--------+-------------------------------------------+
267+
(run 1 dropped -- ann.run_id was 1, not NULL)
268+
269+
Step 3 -- JSON extraction + NULL filter:
270+
Extracts name from JSON path, keeps only non-null (empty string is allowed).
271+
272+
WHERE task_spec->'componentRef'->'spec'->>'name' IS NOT NULL
273+
274+
+----+-------------------------------------------+-----------+
275+
| id | en.task_spec | name_expr |
276+
+----+-------------------------------------------+-----------+
277+
| 2 | {"componentRef":{"spec":null}} | NULL | <- dropped
278+
| 3 | {"componentRef":{"spec":{"name":""}}} | "" | <- kept (empty string OK)
279+
| 4 | {"componentRef":{"spec":{"name":"B"}}} | "B" | <- kept
280+
+----+-------------------------------------------+-----------+
281+
282+
Step 4 -- INSERT INTO pipeline_run_annotation:
283+
Inserts one row per surviving run.
284+
285+
INSERT INTO pipeline_run_annotation (pipeline_run_id, key, value)
286+
+--------+---------------------------+-------+
287+
| run_id | key | value |
288+
+--------+---------------------------+-------+
289+
| 3 | system/pipeline_run.name | |
290+
| 4 | system/pipeline_run.name | B |
291+
+--------+---------------------------+-------+
292+
293+
The JSON path is portable across databases via SQLAlchemy:
294+
- SQLite: JSON_EXTRACT(task_spec, '$.componentRef.spec.name')
295+
- MySQL: JSON_UNQUOTE(JSON_EXTRACT(...))
296+
- PostgreSQL: task_spec -> 'componentRef' -> 'spec' ->> 'name'
297+
298+
Any null at any depth (task_spec NULL, componentRef missing,
299+
spec null, name missing) produces SQL NULL, filtered out by
300+
IS NOT NULL. Empty string is allowed and will be inserted.
301+
"""
302+
key = filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME
303+
name_expr = bts.ExecutionNode.task_spec[
304+
("componentRef", "spec", "name")
305+
].as_string()
306+
existing_ann = orm.aliased(bts.PipelineRunAnnotation)
307+
308+
# Step 4: INSERT INTO pipeline_run_annotation
309+
stmt = sqlalchemy.insert(bts.PipelineRunAnnotation).from_select(
310+
["pipeline_run_id", "key", "value"],
311+
sqlalchemy.select(
312+
bts.PipelineRun.id,
313+
sqlalchemy.literal(str(key)),
314+
name_expr,
315+
)
316+
# Step 1: INNER JOIN execution_node
317+
.join(
318+
bts.ExecutionNode,
319+
bts.ExecutionNode.id == bts.PipelineRun.root_execution_id,
320+
)
321+
# Step 2a: LEFT JOIN existing annotation
322+
.outerjoin(
323+
existing_ann,
324+
sqlalchemy.and_(
325+
existing_ann.pipeline_run_id == bts.PipelineRun.id,
326+
existing_ann.key == key,
327+
),
328+
).where(
329+
# Step 2b: Anti-join — keep only runs with no existing annotation
330+
existing_ann.pipeline_run_id.is_(None),
331+
# Step 3: JSON extraction — keep only non-NULL names
332+
name_expr.isnot(None),
333+
),
334+
)
335+
session.execute(stmt)
336+
337+
338+
def _backfill_pipeline_run_name_annotations(
339+
*,
340+
db_engine: sqlalchemy.Engine,
341+
) -> None:
342+
"""Backfill pipeline_run_annotation with pipeline names.
343+
344+
The check and both inserts run in a single session/transaction to
345+
avoid TOCTOU races between concurrent startup processes. If anything
346+
fails, the entire transaction rolls back automatically.
347+
348+
Skips entirely if any name annotation already exists (i.e. the
349+
write-path is populating them, so the backfill has already run or is
350+
no longer needed).
351+
352+
Phase 1 -- _backfill_pipeline_names_from_extra_data:
353+
Bulk SQL insert from extra_data['pipeline_name'].
354+
355+
Phase 2 -- _backfill_pipeline_names_from_component_spec:
356+
Bulk SQL fallback for runs Phase 1 missed (extra_data is NULL or
357+
missing the key). Extracts name via JSON path
358+
task_spec -> componentRef -> spec -> name.
359+
360+
Annotation creation rules (same for both phases):
361+
Creates row: any non-NULL string, including empty string ""
362+
Skips row: NULL at any depth in the JSON path
363+
"""
364+
with orm.Session(db_engine) as session:
365+
if _is_pipeline_run_annotation_key_already_backfilled(
366+
session=session,
367+
key=filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME,
368+
):
369+
return
370+
371+
# execute() - rows in DB buffer for Phase 2
372+
_backfill_pipeline_names_from_extra_data(session=session)
373+
# Phase 2 sees Phase 1's rows via the shared transaction buffer.
374+
_backfill_pipeline_names_from_component_spec(session=session)
375+
# Both phases become permanent atomically.
376+
session.commit()

0 commit comments

Comments
 (0)