6 changes: 6 additions & 0 deletions datajunction-server/datajunction_server/config.py
@@ -163,6 +163,12 @@ class Settings(BaseSettings):  # pragma: no cover
    # $dj_logical_timestamp
    dj_logical_timestamp_format: Optional[str] = "${dj_logical_timestamp}"

    # Prefix applied to Druid datasource names built by ``build_druid_spec``.
    # All DJ envs share a single Druid cluster; the prefix env-tags datasources
    # so test/prod cubes with the same definition don't collide. Default is the
    # prod value; set ``DRUID_DATASOURCE_PREFIX=dj_test__`` in the test deploy.
    druid_datasource_prefix: str = "dj__"

    # DJ UI host, used for OAuth redirection
    frontend_host: Optional[str] = "http://localhost:3000"

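Since ``Settings`` extends pydantic's ``BaseSettings``, the new field can be overridden per deployment through the environment, as the comment above describes. A minimal sketch of that behavior (illustrative, not part of the diff) — assuming the ``pydantic-settings`` package (on pydantic v1 the import would be ``from pydantic import BaseSettings``), with ``DemoSettings`` as a stand-in for DJ's real ``Settings``:

import os

from pydantic_settings import BaseSettings


class DemoSettings(BaseSettings):
    # Same field name and default as the diff above.
    druid_datasource_prefix: str = "dj__"


os.environ["DRUID_DATASOURCE_PREFIX"] = "dj_test__"  # as set in the test deploy
assert DemoSettings().druid_datasource_prefix == "dj_test__"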
@@ -1,11 +1,17 @@
"""Helper functions related to cube materializations."""

import itertools
from types import SimpleNamespace

from sqlalchemy.ext.asyncio import AsyncSession

from datajunction_server.sql.parsing.backends.antlr4 import parse
from datajunction_server.internal.sql import get_measures_query
from datajunction_server.construction.build_v3.builder import build_measures_sql
from datajunction_server.construction.build_v3.types import (
    BuildContext,
    DecomposedMetricInfo,
    GrainGroupSQL,
)
from datajunction_server.database.node import Column, NodeRevision
from datajunction_server.errors import DJInvalidInputException
from datajunction_server.models.column import SemanticType
@@ -17,9 +23,13 @@
    MeasuresMaterialization,
    UpsertCubeMaterialization,
)
from datajunction_server.models.dialect import Dialect
from datajunction_server.models.materialization import MaterializationStrategy
from datajunction_server.models.node_type import NodeNameVersion
from datajunction_server.models.partition import Granularity
from datajunction_server.models.query import ColumnMetadata
from datajunction_server.sql.parsing import ast
from datajunction_server.utils import SEPARATOR


def generate_partition_filter_sql(
@@ -152,6 +162,172 @@ def _extract_expression(metric_query: str) -> str:
    )


def _v3_col_to_model_column(col) -> ColumnMetadata:
    """
    Convert a v3 ``ColumnMetadata`` (a dataclass keyed on ``semantic_name``) into
    the persisted ``models.query.ColumnMetadata`` (a BaseModel keyed on
    ``semantic_entity``) that ``MeasuresMaterialization.from_measures_query``
    consumes. v3's ``"metric"``, ``"metric_component"``, and ``"metric_input"``
    semantic types all collapse to v2's ``"measure"`` so the downstream
    ``SemanticType.DIMENSION`` / ``"measure"`` branching keeps working.
    """
    semantic_entity = col.semantic_name
    column_name = None
    node_name = None
    if semantic_entity and SEPARATOR in semantic_entity:
        # ``rsplit`` with maxsplit=1 yields exactly two parts here.
        node_name, column_name = semantic_entity.rsplit(SEPARATOR, 1)
    v3_type = col.semantic_type
    semantic_type = (
        "measure"
        if v3_type in ("metric", "metric_component", "metric_input")
        else v3_type
    )
    # v3's ``col.name`` is already the SQL alias in the generated query.
    # Downstream ``MeasuresMaterialization.from_measures_query`` matches partition
    # columns by ``semantic_entity`` (not by ``name``), so we just pass the v3
    # alias through unchanged for both dims and measures.
    return ColumnMetadata(
        name=col.name,
        type=col.type,
        column=column_name,
        node=node_name,
        semantic_entity=semantic_entity,
        semantic_type=semantic_type,
    )

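For reference, a sketch of what ``_v3_col_to_model_column`` produces for a metric-component column (illustrative, not part of the diff; the v3 dataclass is stubbed and the sample names are made up — only the mapping rules mirror the code above, assuming ``SEPARATOR`` is ``"."``):

from dataclasses import dataclass


@dataclass
class StubV3Column:  # stand-in for v3's ColumnMetadata dataclass
    name: str
    type: str
    semantic_name: str
    semantic_type: str


col = StubV3Column(
    name="repair_orders_count",
    type="bigint",
    semantic_name="default.repair_orders.repair_orders_count",
    semantic_type="metric_component",
)
# _v3_col_to_model_column(col) would return a ColumnMetadata with:
#   node="default.repair_orders", column="repair_orders_count",
#   semantic_entity="default.repair_orders.repair_orders_count",
#   semantic_type="measure"   # metric / metric_component / metric_input -> measure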

async def _v3_grain_group_to_measures_query(
    session: AsyncSession,
    gg: GrainGroupSQL,
    ctx: BuildContext,
    decomposed_metrics: dict[str, DecomposedMetricInfo],
) -> SimpleNamespace:
    """
    Adapt a v3 ``GrainGroupSQL`` into the v2 measures-query shape so the
    rest of ``build_cube_materialization`` (and
    ``MeasuresMaterialization.from_measures_query``) can stay unchanged.
    Async because we may need to refresh expired ORM attributes on the parent
    fact / source nodes that v3 left lazily loaded.
    """
    from datajunction_server.models.node_type import NodeType  # noqa: PLC0415
    from datajunction_server.utils import refresh_if_needed  # noqa: PLC0415

    parent_node = ctx.nodes.get(gg.parent_name)
    if not parent_node or not parent_node.current:  # pragma: no cover
        raise DJInvalidInputException(
            f"Parent node `{gg.parent_name}` not found in build context",
        )
    rev = parent_node.current
    # v3's ``ctx.nodes`` may carry NodeRevisions whose scalar attrs got
    # expired by an earlier ``session.commit()`` in the calling endpoint.
    # Pull what we need before reading.
    await refresh_if_needed(session, rev, ["name", "version", "display_name"])

    # v3 doesn't expose the per-grain-group source set on the public
    # ``GrainGroupSQL`` (it's tracked internally for scan-estimation only).
    # Walk ``ctx.parent_map`` from this grain group's parent fact and
    # collect any ``SOURCE`` ancestors so the cube workflow gets the same
    # ``catalog.schema.table`` upstream list v2 produced.
    upstream_tables: list[str] = []
    seen_sources: set[str] = set()
    visited: set[str] = set()

    async def _walk(node_name: str) -> None:
        if node_name in visited:
            return
        visited.add(node_name)
        node = ctx.nodes.get(node_name)
        if not node or not node.current:
            return
        if node.type == NodeType.SOURCE:
            srev = node.current
            await refresh_if_needed(
                session,
                srev,
                ["catalog", "schema_", "table"],
            )
            if (
                node.name not in seen_sources
                and srev.catalog
                and srev.schema_
                and srev.table
            ):
                seen_sources.add(node.name)
                upstream_tables.append(
                    f"{srev.catalog.name}.{srev.schema_}.{srev.table}",
                )
            return
        for parent_name in ctx.parent_map.get(node_name, []):
            await _walk(parent_name)

    await _walk(gg.parent_name)
    # Dimension columns are joined in via dimension links rather than parent
    # edges, so their source tables don't appear under ``gg.parent_name``'s
    # ``parent_map``. Walk each dimension column's owning node too so the
    # upstream-table list reflects everything the materialization SQL reads.
    for col in gg.columns:
        if col.semantic_type != "dimension":
            continue
        sem = col.semantic_name
        if not sem or SEPARATOR not in sem:
            continue
        # Strip the optional ``[role]`` suffix to get the bare ``node.column``.
        bare_sem = sem.split("[", 1)[0]
        dim_node_name = bare_sem.rsplit(SEPARATOR, 1)[0]
        await _walk(dim_node_name)

    metrics: dict[str, tuple[list, str]] = {}
    for metric_name in gg.metrics:
        info = decomposed_metrics.get(metric_name)
        if info is None:  # pragma: no cover
            continue
        # v2's "combiner" is the full derived SELECT statement (e.g. ``SELECT
        # SUM(...) FROM ...``); ``_extract_expression`` and the V1 cube config's
        # ``derived_expression`` field both rely on that shape. v3 exposes the
        # full query AST separately on ``derived_ast``; stringify it so the
        # downstream parser-based extraction keeps working.
        metrics[metric_name] = (
            info.components,
            " ".join(str(info.derived_ast).split()),
        )

    columns_raw = [_v3_col_to_model_column(c) for c in gg.columns]
    # For measure columns, v3's semantic_name is "namespace.metric:component",
    # which gives the wrong node/column when split on ".". Normalize them to the
    # v2 shape so downstream consumers see "parent_fact_node.component_name".
    parent_node_name = gg.parent_name
    columns = []
    for col in columns_raw:
        if col.semantic_type == "measure":
            columns.append(
                ColumnMetadata(
                    name=col.name,
                    type=col.type,
                    column=col.name,
                    node=parent_node_name,
                    semantic_entity=f"{parent_node_name}.{col.name}",
                    semantic_type=col.semantic_type,
                ),
            )
        else:
            columns.append(col)
    grain = list(gg.grain)
    return SimpleNamespace(
        node=NodeNameVersion(
            name=rev.name,
            version=rev.version,
            display_name=rev.display_name,
        ),
        grain=grain,
        columns=columns,
        metrics=metrics,
        sql=gg.sql,
        spark_conf=None,
        upstream_tables=sorted(upstream_tables),
    )

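The ``[role]`` stripping in the dimension walk above is subtle enough to deserve a standalone sketch (illustrative, not part of the diff; assumes DJ's ``SEPARATOR`` is ``"."`` and uses made-up node names):

SEPARATOR = "."  # assumed value of datajunction_server.utils.SEPARATOR

sem = "default.date_dim.dateint[order_date]"      # dimension attribute with a role
bare_sem = sem.split("[", 1)[0]                   # -> "default.date_dim.dateint"
dim_node_name = bare_sem.rsplit(SEPARATOR, 1)[0]
assert dim_node_name == "default.date_dim"        # node whose SOURCE ancestors get walked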

async def build_cube_materialization(
    session: AsyncSession,
    current_revision: NodeRevision,
@@ -162,20 +338,36 @@ async def build_cube_materialization(
"""
temporal_partitions = current_revision.temporal_partition_columns()
temporal_partition = temporal_partitions[0]
measures_queries = await get_measures_query(
session=session,
metrics=current_revision.cube_node_metrics,
dimensions=current_revision.cube_node_dimensions,
filters=(current_revision.cube_filters or [])
+ [
incremental = upsert_input.strategy == MaterializationStrategy.INCREMENTAL_TIME
extra_filters = (
[
generate_partition_filter_sql(
temporal_partition,
upsert_input.lookback_window, # type: ignore
),
],
include_all_columns=False,
]
if incremental
else []
)
result = await build_measures_sql(
session=session,
metrics=current_revision.cube_node_metrics,
dimensions=current_revision.cube_node_dimensions,
filters=(current_revision.cube_filters or []) + extra_filters,
dialect=Dialect.SPARK,
use_materialized=True,
preagg_requested=True,
)
measures_queries = sorted(
[
await _v3_grain_group_to_measures_query(
session,
gg,
result.ctx,
result.decomposed_metrics,
)
for gg in result.grain_groups
],
key=lambda q: (-len(q.metrics), q.node.name),
)
query_grains = {
k: [q.node.name for q in queries]
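The ``key=lambda q: (-len(q.metrics), q.node.name)`` above is what makes the grain-group ordering deterministic: groups covering more metrics sort first, with ties broken alphabetically by parent node name. A self-contained sketch with stub objects (illustrative, not part of the diff):

from types import SimpleNamespace

queries = [
    SimpleNamespace(metrics={"m1": ()}, node=SimpleNamespace(name="b.fact")),
    SimpleNamespace(metrics={"m1": (), "m2": ()}, node=SimpleNamespace(name="a.fact")),
    SimpleNamespace(metrics={"m3": ()}, node=SimpleNamespace(name="a.fact")),
]
ordered = sorted(queries, key=lambda q: (-len(q.metrics), q.node.name))
assert [q.node.name for q in ordered] == ["a.fact", "a.fact", "b.fact"]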
@@ -127,12 +127,13 @@ def from_measures_query(cls, measures_query, temporal_partition):
         """
         Builds a MeasuresMaterialization object from a measures query.
         """
-        metric_components = list(
+        metric_components = sorted(
             {
                 component.name: component
                 for metric, (components, combiner) in measures_query.metrics.items()
                 for component in components
             }.values(),
+            key=lambda c: c.name,
         )
         dimensional_metric_components = [
             component.name
@@ -345,7 +346,11 @@ def build_druid_spec(self):
" on this cube or it cannot be materialized to Druid.",
)

druid_datasource_name = f"dj__{self.output_table_name}"
from datajunction_server.utils import get_settings # noqa: PLC0415

druid_datasource_name = (
f"{get_settings().druid_datasource_prefix}{self.output_table_name}"
)

# if there are categorical partitions, we can additionally include one of them
# in the partitionDimension field under partitionsSpec
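Taken together with the config default, the change above means the same cube lands in env-tagged Druid datasources. A sketch of the resulting names (illustrative, not part of the diff; the ``output_table_name`` value is made up):

def druid_datasource_name(prefix: str, output_table_name: str) -> str:
    # Mirrors the f-string in build_druid_spec above.
    return f"{prefix}{output_table_name}"


# prod (default prefix) vs. test (DRUID_DATASOURCE_PREFIX=dj_test__)
assert druid_datasource_name("dj__", "repair_cube") == "dj__repair_cube"
assert druid_datasource_name("dj_test__", "repair_cube") == "dj_test__repair_cube"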