Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions js/packages/ui/src/api/flag.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import type { AxiosInstance, AxiosResponse } from "axios";
export interface RecceServerFlags {
single_env_onboarding: boolean;
show_relaunch_hint: boolean;
// True when the server was launched with `--disable-cll-cache` (or the
// DISABLE_CLL_CACHE env var): the backend then skips the pre-cached full
// column-level lineage map and computes lineage per node instead.
disable_cll_cache: boolean;
}

/**
Expand Down
216 changes: 171 additions & 45 deletions recce/adapter/dbt_adapter/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,7 @@ class DbtAdapter(BaseAdapter):
manifest: Manifest = None
previous_state: PreviousState = None
target_path: str = None
_full_cll_map: Optional["CllData"] = None
curr_manifest: WritableManifest = None
curr_catalog: CatalogArtifact = None
base_path: str = None
Expand Down Expand Up @@ -535,6 +536,9 @@ def load_artifacts(self):
self.base_manifest = base_manifest
self.base_catalog = base_catalog

# clear cached CLL data
self._full_cll_map = None

# set the manifest
self.manifest = as_manifest(curr_manifest)
self.previous_state = previous_state(
Expand Down Expand Up @@ -985,6 +989,75 @@ def _get_schema(lineage):
node_diff.change = change
return node_diff

def build_full_cll_map(self) -> CllData:
    """Build the column-level lineage (CLL) map for every node in the current manifest.

    The result is memoized on ``self._full_cll_map``; later calls return the
    same object until that attribute is reset to ``None``. Callers that intend
    to mutate the result should copy it first.

    Change-analysis metadata (``change_status`` / ``change_category`` on nodes,
    ``change_status`` on columns) is always attached, for every node that
    ``get_change_analysis_cached`` returns a diff for.

    Returns:
        A ``CllData`` holding ``nodes``, ``columns``, ``parent_map`` and the
        ``child_map`` derived by inverting ``parent_map``.
    """
    if self._full_cll_map is not None:
        return self._full_cll_map

    manifest = self.curr_manifest

    # Collect the IDs of every resource type that can take part in lineage.
    all_node_ids = set()
    for key in ("sources", "nodes", "exposures", "metrics"):
        all_node_ids.update(getattr(manifest, key).keys())
    if hasattr(manifest, "semantic_models"):
        all_node_ids.update(manifest.semantic_models.keys())

    nodes = {}
    columns = {}
    parent_map = {}

    # Iterate in sorted order so the merged dicts (and any "last write wins"
    # overlap between per-node results) do not depend on set iteration order.
    for node_id in sorted(all_node_ids):
        # Deep-copy because the change annotations below mutate these objects
        # and must not leak into the per-node cache.
        cll_data_one = deepcopy(self.get_cll_cached(node_id, base=False))
        if cll_data_one is None:
            continue

        node = cll_data_one.nodes.get(node_id)
        if node is not None:
            # Only register real node objects: a None entry would raise
            # AttributeError in the annotation loop below and in consumers
            # that read ``node.columns``.
            nodes[node_id] = node
        columns.update(cll_data_one.columns)
        parent_map.update(cll_data_one.parent_map)

    # Invert parent_map: for each parent, the set of IDs that list it as a parent.
    child_map = {}
    for child_id, parents in parent_map.items():
        for parent in parents:
            child_map.setdefault(parent, set()).add(child_id)

    # Attach change analysis to every node and to its columns.
    for node_id, node in nodes.items():
        node_diff = self.get_change_analysis_cached(node_id)
        if node_diff is None:
            continue

        node.change_status = node_diff.change_status
        change = node_diff.change
        if change is not None:
            node.change_category = change.category

        for c_name in node.columns:
            col_obj = columns.get(f"{node_id}_{c_name}")
            if col_obj is None:
                continue
            if node_diff.change_status == "added":
                # Every column of an added node is itself added.
                col_obj.change_status = "added"
            elif node_diff.change_status == "removed":
                col_obj.change_status = "removed"
            elif change is not None and change.columns is not None:
                column_diff = change.columns.get(c_name)
                if column_diff:
                    col_obj.change_status = column_diff

    self._full_cll_map = CllData(
        nodes=nodes,
        columns=columns,
        parent_map=parent_map,
        child_map=child_map,
    )
    return self._full_cll_map

def get_cll(
self,
node_id: Optional[str] = None,
Expand All @@ -994,6 +1067,8 @@ def get_cll(
no_upstream: Optional[bool] = False,
no_downstream: Optional[bool] = False,
no_filter: Optional[bool] = False,
full_map: Optional[bool] = False,
disable_cll_cache: Optional[bool] = False,
) -> CllData:
cll_tracker = LineagePerfTracker()
cll_tracker.set_params(
Expand All @@ -1006,6 +1081,17 @@ def get_cll(
)
cll_tracker.start_column_lineage()

# Full map mode: return the complete pre-computed CLL map.
# Note: change_analysis flag is ignored — full map always includes
# change metadata since build_full_cll_map() computes it unconditionally.
if full_map:
result = deepcopy(self.build_full_cll_map())
cll_tracker.end_column_lineage()
cll_tracker.set_total_nodes(len(result.nodes) + len(result.columns))
log_performance("column level lineage [full_map]", cll_tracker.to_dict())
cll_tracker.reset()
return result

manifest = self.curr_manifest
manifest_dict = manifest.to_dict()

Expand All @@ -1029,44 +1115,75 @@ def get_cll(
cll_node_ids = cll_node_ids.union(find_downstream(cll_node_ids, manifest_dict.get("child_map")))

if not no_cll:
allowed_related_nodes = set()
for key in ["sources", "nodes", "exposures", "metrics"]:
attr = getattr(manifest, key)
allowed_related_nodes.update(set(attr.keys()))
if hasattr(manifest, "semantic_models"):
attr = getattr(manifest, "semantic_models")
allowed_related_nodes.update(set(attr.keys()))
for cll_node_id in cll_node_ids:
if cll_node_id not in allowed_related_nodes:
continue
cll_data_one = deepcopy(self.get_cll_cached(cll_node_id, base=False))
cll_tracker.increment_cll_nodes()
if cll_data_one is None:
continue
if not disable_cll_cache:
# Full map path: build entire CLL map once (cached), then slice
full_map_data = self.build_full_cll_map()

nodes[cll_node_id] = cll_data_one.nodes.get(cll_node_id)
node_diff = None
if change_analysis:
node_diff = self.get_change_analysis_cached(cll_node_id)
cll_tracker.increment_change_analysis_nodes()
if node_diff is not None:
nodes[cll_node_id].change_status = node_diff.change_status
if node_diff.change is not None:
nodes[cll_node_id].change_category = node_diff.change.category
for c_id, c in cll_data_one.columns.items():
columns[c_id] = c
for cll_node_id in cll_node_ids:
if cll_node_id not in full_map_data.nodes:
continue
cll_tracker.increment_cll_nodes()
nodes[cll_node_id] = deepcopy(full_map_data.nodes[cll_node_id])

for c_name in nodes[cll_node_id].columns:
col_id = f"{cll_node_id}_{c_name}"
if col_id in full_map_data.columns:
columns[col_id] = deepcopy(full_map_data.columns[col_id])

for key in list(nodes.keys()) + list(columns.keys()):
if key in full_map_data.parent_map:
parent_map[key] = set(full_map_data.parent_map[key])

if not change_analysis:
for node in nodes.values():
node.change_status = None
node.change_category = None
node.impacted = None
for col in columns.values():
col.change_status = None
else:
for _ in nodes:
cll_tracker.increment_change_analysis_nodes()
else:
# Per-node path: compute CLL individually per node (original behavior)
allowed_related_nodes = set()
for key in ["sources", "nodes", "exposures", "metrics"]:
attr = getattr(manifest, key)
allowed_related_nodes.update(set(attr.keys()))
if hasattr(manifest, "semantic_models"):
attr = getattr(manifest, "semantic_models")
allowed_related_nodes.update(set(attr.keys()))
for cll_node_id in cll_node_ids:
if cll_node_id not in allowed_related_nodes:
continue
cll_data_one = deepcopy(self.get_cll_cached(cll_node_id, base=False))
cll_tracker.increment_cll_nodes()
if cll_data_one is None:
continue

nodes[cll_node_id] = cll_data_one.nodes.get(cll_node_id)
node_diff = None
if change_analysis:
node_diff = self.get_change_analysis_cached(cll_node_id)
cll_tracker.increment_change_analysis_nodes()
if node_diff is not None:
if node_diff.change_status == "added":
c.change_status = "added"
elif node_diff.change_status == "removed":
c.change_status = "removed"
elif node_diff.change is not None and node_diff.change.columns is not None:
column_diff = node_diff.change.columns.get(c.name)
if column_diff:
c.change_status = column_diff

for p_id, parents in cll_data_one.parent_map.items():
parent_map[p_id] = parents
nodes[cll_node_id].change_status = node_diff.change_status
if node_diff.change is not None:
nodes[cll_node_id].change_category = node_diff.change.category
for c_id, c in cll_data_one.columns.items():
columns[c_id] = c
if node_diff is not None:
if node_diff.change_status == "added":
c.change_status = "added"
elif node_diff.change_status == "removed":
c.change_status = "removed"
elif node_diff.change is not None and node_diff.change.columns is not None:
column_diff = node_diff.change.columns.get(c.name)
if column_diff:
c.change_status = column_diff

for p_id, parents in cll_data_one.parent_map.items():
parent_map[p_id] = parents
else:
for cll_node_id in cll_node_ids:
cll_node = None
Expand Down Expand Up @@ -1216,7 +1333,10 @@ def get_cll(

cll_tracker.end_column_lineage()
cll_tracker.set_total_nodes(len(nodes) + len(columns))
log_performance("column level lineage", cll_tracker.to_dict())
if not disable_cll_cache:
log_performance("column level lineage [cached_full_map]", cll_tracker.to_dict())
else:
log_performance("column level lineage", cll_tracker.to_dict())
cll_tracker.reset()

return CllData(
Expand Down Expand Up @@ -1580,23 +1700,29 @@ def refresh(self, refresh_file_path: str = None):
# In single environment mode (target_path is equal to base_path),
# we capture the original manifest as base and only update the current
target_type = os.path.basename(os.path.dirname(refresh_file_path))
if self.target_path and target_type == os.path.basename(self.target_path):
is_curr = self.target_path and target_type == os.path.basename(self.target_path)
is_base = self.base_path and target_type == os.path.basename(self.base_path)

if not is_curr and not is_base:
return

if is_curr:
if refresh_file_path.endswith("manifest.json"):
self.curr_manifest = load_manifest(path=refresh_file_path)
self.manifest = as_manifest(self.curr_manifest)
self.get_cll_cached.cache_clear()
self.get_change_analysis_cached.cache_clear()
elif refresh_file_path.endswith("catalog.json"):
self.curr_catalog = load_catalog(path=refresh_file_path)
self.get_cll_cached.cache_clear()
self.get_change_analysis_cached.cache_clear()
elif self.base_path and target_type == os.path.basename(self.base_path):
# Current artifact changes invalidate per-node CLL cache
self.get_cll_cached.cache_clear()
elif is_base:
if refresh_file_path.endswith("manifest.json"):
self.base_manifest = load_manifest(path=refresh_file_path)
self.get_change_analysis_cached.cache_clear()
elif refresh_file_path.endswith("catalog.json"):
self.base_catalog = load_catalog(path=refresh_file_path)
self.get_change_analysis_cached.cache_clear()

# Any artifact change invalidates change analysis and full map
self.get_change_analysis_cached.cache_clear()
self._full_cll_map = None

def create_relation(self, model, base=False):
node = self.find_node_by_name(model, base)
Expand Down
5 changes: 5 additions & 0 deletions recce/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,7 @@ def diff(sql, primary_keys: List[str] = None, keep_shape: bool = False, keep_equ
)
@click.option("--review", is_flag=True, help="Open the state file in the review mode.")
@click.option("--single-env", is_flag=True, help="Launch in single environment mode directly.")
@click.option("--disable-cll-cache", is_flag=True, help="Disable the pre-cached full column-level lineage map.", envvar="DISABLE_CLL_CACHE")
@add_options(dbt_related_options)
@add_options(sqlmesh_related_options)
@add_options(recce_options)
Expand Down Expand Up @@ -555,6 +556,7 @@ def server(host, port, lifetime, idle_timeout=0, state_file=None, **kwargs):
"show_relaunch_hint": False,
"preview": False,
"read_only": False,
"disable_cll_cache": False,
}
console = Console()

Expand Down Expand Up @@ -591,6 +593,9 @@ def server(host, port, lifetime, idle_timeout=0, state_file=None, **kwargs):
elif server_mode == RecceServerMode.read_only:
flag["read_only"] = True

if kwargs.get("disable_cll_cache", False):
flag["disable_cll_cache"] = True

# Create state loader using shared function
from recce.util.startup_perf import get_startup_tracker

Expand Down
1 change: 1 addition & 0 deletions recce/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
RecceStateLoader,
RecceStateMetadata,
)

logger = logging.getLogger("uvicorn")


Expand Down
6 changes: 6 additions & 0 deletions recce/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,7 @@ class CllIn(BaseModel):
no_cll: Optional[bool] = False
no_upstream: Optional[bool] = False
no_downstream: Optional[bool] = False
full_map: Optional[bool] = False


class CllOutput(BaseModel):
Expand All @@ -640,6 +641,9 @@ class CllOutput(BaseModel):
async def column_level_lineage_by_node(cll_input: CllIn):
from recce.adapter.dbt_adapter import DbtAdapter

app_state: AppState = app.state
disable_cll_cache = app_state.flag.get("disable_cll_cache", False) if app_state.flag else False

dbt_adapter: DbtAdapter = default_context().adapter
cll = dbt_adapter.get_cll(
node_id=cll_input.node_id,
Expand All @@ -648,6 +652,8 @@ async def column_level_lineage_by_node(cll_input: CllIn):
no_upstream=cll_input.no_upstream,
no_downstream=cll_input.no_downstream,
no_cll=cll_input.no_cll,
full_map=cll_input.full_map,
disable_cll_cache=disable_cll_cache,
)

return CllOutput(current=cll)
Expand Down
18 changes: 18 additions & 0 deletions tests/adapter/dbt_adapter/conftest.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from functools import wraps
from unittest.mock import patch

import pytest
Expand All @@ -15,3 +16,20 @@ def dbt_test_helper():
set_default_context(context)
yield helper
helper.cleanup()


@pytest.fixture(params=[False, True], ids=["cll_cached", "cll_no_cache"])
def disable_cll_cache(request, dbt_test_helper):
    """Run the requesting test twice: with the CLL cache enabled, then disabled.

    For the duration of the test, the adapter's ``get_cll`` is replaced by a
    wrapper that supplies ``disable_cll_cache`` whenever the caller did not
    pass it explicitly. The fixture yields the parametrized flag value.
    """
    cache_disabled = request.param
    target = dbt_test_helper.context.adapter
    adapter_cls = type(target)
    # Grab the plain function so it can be re-bound on teardown.
    unbound_get_cll = target.get_cll.__func__

    @wraps(unbound_get_cll)
    def get_cll_with_flag(self, *args, **kwargs):
        # An explicit value from the test always wins over the fixture's.
        if "disable_cll_cache" not in kwargs:
            kwargs["disable_cll_cache"] = cache_disabled
        return unbound_get_cll(self, *args, **kwargs)

    target.get_cll = get_cll_with_flag.__get__(target, adapter_cls)
    yield cache_disabled
    target.get_cll = unbound_get_cll.__get__(target, adapter_cls)
Loading
Loading