Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions TODO.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ Discovered during CDC/SCD design review (see `docs/superpowers/specs/2026-04-13-
- Implemented in PR #61: self-read node detection via AST node identity, cycle-safe dependency resolution, query-scoped `{query_id}:self_read:{table}.{col}` naming, column-granular cross-query wiring, edge role/order annotations.
- Design: `docs/superpowers/specs/2026-04-13-gap4-self-referencing-target-design.md`

- [ ] **Gap 7. JOIN ON predicate columns not recorded in column lineage**
- Today: JOIN ON predicates produce **zero** column-lineage edges (no handling in `lineage_builder` for ON clause columns beyond the equi-join's identity resolution).
- Symptom: point-in-time joins like `o.order_ts BETWEEN d.start_time AND d.end_time` leave `start_time`/`end_time` invisible as influences on downstream columns.
- Needs its own design doc — new edge semantic for "predicate-conditional" columns.
- [x] **Gap 7. JOIN ON predicate columns not recorded in column lineage**
- Implemented: tagged `is_join_predicate=True` edges from ON-clause columns to right-side projected output columns. Supports equi-joins, range/BETWEEN, function-wrapped, multi-join chains, and Gap 4 self-read interaction.
- Design: `docs/superpowers/specs/2026-04-13-gap7-join-predicate-columns-design.md`
244 changes: 244 additions & 0 deletions examples/join_predicate_lineage.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,244 @@
{
"cells": [
{
"cell_type": "markdown",
"id": "a1b2c3d4",
"metadata": {},
"source": [
"# JOIN Predicate Column Lineage\n",
"\n",
"**Example: Tracking JOIN ON Predicate Columns in Column Lineage (Gap 7)**\n",
"\n",
"\n",
"This example demonstrates how clgraph tracks JOIN ON predicate columns as\n",
"lineage edges. Before Gap 7, only value-flow columns (columns in SELECT)\n",
"appeared in the lineage graph. Now, columns used in JOIN ON clauses are\n",
"tracked as predicate edges, making previously invisible dependencies\n",
"visible for impact analysis.\n",
"\n",
"Key features demonstrated:\n",
"1. Basic equi-join predicate edges with metadata\n",
"2. Point-in-time / range join (BETWEEN) with 5 predicate columns\n",
"3. Multi-join chain with per-join scoped predicate edges\n",
"4. Impact analysis using predicate edges with SQLColumnTracer"
]
},
{
"cell_type": "markdown",
"id": "b2c3d4e5",
"metadata": {},
"source": [
"### Imports"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "c3d4e5f6",
"metadata": {},
"outputs": [],
"source": [
"from clgraph import Pipeline, RecursiveLineageBuilder, SQLColumnTracer\n",
"\n",
"\n",
"def predicate_edges(graph):\n",
" \"\"\"Return only edges where is_join_predicate is True.\"\"\"\n",
" return [e for e in graph.edges if e.is_join_predicate]\n",
"\n",
"\n",
"def predicate_edges_to(graph, target):\n",
" \"\"\"Return predicate edges targeting a specific output column.\"\"\"\n",
" return [e for e in graph.edges if e.is_join_predicate and e.to_node.full_name == target]\n",
"\n",
"\n",
"# ============================================================\n",
"# Example 1: Basic Equi-Join Predicate Edges\n",
"# ============================================================\n",
"print(\"=\" * 60)\n",
"print(\"Example 1: Basic Equi-Join Predicate Edges\")\n",
"print(\"=\" * 60)\n",
"\n",
"sql_1 = \"\"\"\n",
"SELECT o.order_id, o.amount, d.city AS customer_city\n",
"FROM raw_orders o\n",
"LEFT JOIN dim_customer d ON o.customer_id = d.id\n",
"\"\"\"\n",
"\n",
"builder_1 = RecursiveLineageBuilder(sql_1, dialect=\"bigquery\")\n",
"graph_1 = builder_1.build()\n",
"\n",
"print(f\"\\nQuery:{sql_1}\")\n",
"print(\"1a. Value edges (standard lineage):\")\n",
"for edge in graph_1.edges:\n",
" if not edge.is_join_predicate:\n",
" print(f\" {edge.from_node.full_name} -> {edge.to_node.full_name}\")\n",
"\n",
"print(\"\\n1b. Predicate edges (NEW \\u2014 from JOIN ON clause):\")\n",
"for edge in predicate_edges(graph_1):\n",
" print(f\" {edge.from_node.full_name} -> {edge.to_node.full_name}\")\n",
" print(f\" \\u2022 edge_type = {edge.edge_type}\")\n",
" print(f\" \\u2022 join_side = {edge.join_side}\")\n",
" print(f\" \\u2022 join_condition = {edge.join_condition}\")\n",
"\n",
"print(\"\\n1c. Compare: d.city \\u2192 output.customer_city is a VALUE edge:\")\n",
"for edge in graph_1.edges:\n",
" if edge.from_node.full_name == \"dim_customer.city\":\n",
" print(f\" is_join_predicate = {edge.is_join_predicate} \\u2713 (value flow, not predicate)\")\n",
"\n",
"\n",
"# ============================================================\n",
"# Example 2: Point-in-Time / Range Join (BETWEEN)\n",
"# ============================================================\n",
"print(\"\\n\" + \"=\" * 60)\n",
"print(\"Example 2: Point-in-Time Join (BETWEEN)\")\n",
"print(\"=\" * 60)\n",
"\n",
"sql_2 = \"\"\"\n",
"SELECT o.order_id, o.customer_id, o.order_ts, o.amount,\n",
" d.city AS customer_city_at_order\n",
"FROM raw_orders o\n",
"LEFT JOIN dim_customer d\n",
" ON o.customer_id = d.id\n",
" AND o.order_ts BETWEEN d.start_time AND d.end_time\n",
"\"\"\"\n",
"\n",
"builder_2 = RecursiveLineageBuilder(sql_2, dialect=\"bigquery\")\n",
"graph_2 = builder_2.build()\n",
"\n",
"print(f\"\\nQuery:{sql_2}\")\n",
"print(\"2a. All predicate edges \\u2192 customer_city_at_order:\")\n",
"pred_edges_2 = predicate_edges_to(graph_2, \"output.customer_city_at_order\")\n",
"for edge in pred_edges_2:\n",
" print(f\" {edge.from_node.full_name:30s} (join_side={edge.join_side})\")\n",
"\n",
"print(f\"\\n \\u2192 {len(pred_edges_2)} predicate columns detected\")\n",
"print(\" \\u2192 d.start_time and d.end_time were previously INVISIBLE in lineage\")\n",
"\n",
"print(\"\\n2b. Value edge is unchanged:\")\n",
"for edge in graph_2.edges:\n",
" if edge.from_node.full_name == \"dim_customer.city\" and not edge.is_join_predicate:\n",
" print(f\" {edge.from_node.full_name} -> {edge.to_node.full_name} (value edge \\u2713)\")\n",
"\n",
"\n",
"# ============================================================\n",
"# Example 3: Multi-Join Chain with Per-Join Scoping\n",
"# ============================================================\n",
"print(\"\\n\" + \"=\" * 60)\n",
"print(\"Example 3: Multi-Join Chain \\u2014 Per-Join Scoping\")\n",
"print(\"=\" * 60)\n",
"\n",
"sql_3 = \"\"\"\n",
"SELECT a.id, b.val, c.label\n",
"FROM table_a a\n",
"INNER JOIN table_b b ON a.id = b.a_id\n",
"INNER JOIN table_c c ON b.id = c.b_id AND b.category = c.category\n",
"\"\"\"\n",
"\n",
"builder_3 = RecursiveLineageBuilder(sql_3, dialect=\"bigquery\")\n",
"graph_3 = builder_3.build()\n",
"\n",
"print(f\"\\nQuery:{sql_3}\")\n",
"print(\"3a. First join predicates (a.id = b.a_id) \\u2192 output.val ONLY:\")\n",
"for edge in predicate_edges_to(graph_3, \"output.val\"):\n",
" print(f\" {edge.from_node.full_name} -> output.val\")\n",
"\n",
"print(\n",
" \"\\n3b. Second join predicates (b.id = c.b_id AND b.category = c.category) \\u2192 output.label ONLY:\"\n",
")\n",
"for edge in predicate_edges_to(graph_3, \"output.label\"):\n",
" print(f\" {edge.from_node.full_name} -> output.label\")\n",
"\n",
"print(\"\\n3c. No cross-join leakage \\u2014 first join predicates do NOT target output.label:\")\n",
"label_pred_sources = {e.from_node.full_name for e in predicate_edges_to(graph_3, \"output.label\")}\n",
"assert \"table_a.id\" not in label_pred_sources, \"Cross-join leakage detected!\"\n",
"assert \"table_b.a_id\" not in label_pred_sources, \"Cross-join leakage detected!\"\n",
"print(\" \\u2713 table_a.id NOT in output.label predicates\")\n",
"print(\" \\u2713 table_b.a_id NOT in output.label predicates\")\n",
"\n",
"\n",
"# ============================================================\n",
"# Example 4: Impact Analysis with Predicate Edges\n",
"# ============================================================\n",
"print(\"\\n\" + \"=\" * 60)\n",
"print(\"Example 4: Impact Analysis with Predicate Edges\")\n",
"print(\"=\" * 60)\n",
"\n",
"print(\"\\nUsing the point-in-time join from Example 2.\")\n",
"\n",
"print(\"\\n4a. Forward trace from dim_customer.start_time (SQLColumnTracer):\")\n",
"tracer = SQLColumnTracer(sql_2, dialect=\"bigquery\")\n",
"forward = tracer.get_forward_lineage([\"dim_customer.start_time\"])\n",
"print(f\" Impacted outputs: {forward['impacted_outputs']}\")\n",
"print(\" \\u2192 customer_city_at_order is now reachable (was invisible before Gap 7)\")\n",
"\n",
"print(\"\\n4b. Filter predicate vs value edges using is_join_predicate:\")\n",
"value_edges_2 = [e for e in graph_2.edges if not e.is_join_predicate]\n",
"pred_edges_all_2 = [e for e in graph_2.edges if e.is_join_predicate]\n",
"print(f\" Value edges: {len(value_edges_2)}\")\n",
"print(f\" Predicate edges: {len(pred_edges_all_2)}\")\n",
"print(f\" Total edges: {len(graph_2.edges)}\")\n",
"\n",
"print(\"\\n4c. Existing value lineage is unchanged:\")\n",
"for edge in value_edges_2:\n",
" print(f\" {edge.from_node.full_name} -> {edge.to_node.full_name}\")\n",
"\n",
"print(\"\\n\" + \"=\" * 60)\n",
"print(\"JOIN Predicate Lineage Examples Complete!\")\n",
"print(\"=\" * 60)"
]
},
{
"cell_type": "markdown",
"id": "d4e5f6a7",
"metadata": {},
"source": [
"### Visualize Point-in-Time Join Lineage\n",
"\n",
"Display the column lineage graph for the point-in-time join (Example 2),\n",
"showing both value and predicate edges."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "e5f6a7b8",
"metadata": {},
"outputs": [],
"source": [
"import shutil\n",
"\n",
"from clgraph import visualize_pipeline_lineage\n",
"\n",
"# Create pipeline for visualization using the point-in-time join\n",
"sql_pit = \"\"\"\n",
"SELECT o.order_id, o.customer_id, o.order_ts, o.amount,\n",
" d.city AS customer_city_at_order\n",
"FROM raw_orders o\n",
"LEFT JOIN dim_customer d\n",
" ON o.customer_id = d.id\n",
" AND o.order_ts BETWEEN d.start_time AND d.end_time\n",
"\"\"\"\n",
"pit_pipeline = Pipeline([(\"pit_join\", sql_pit)], dialect=\"bigquery\")\n",
"\n",
"if shutil.which(\"dot\") is None:\n",
" print(\"\\u26a0\\ufe0f Graphviz not installed. Install with: brew install graphviz\")\n",
"else:\n",
" print(\"Point-in-Time Join \\u2014 Column Lineage (value + predicate edges):\")\n",
" display(visualize_pipeline_lineage(pit_pipeline.column_graph.to_simplified()))"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"name": "python",
"version": "3.12.0"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
Loading
Loading