Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion integration_tests/dbt_project/dbt_project.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,4 @@ models:
+file_format: "{{ 'delta' if target.type in ['spark', 'fabricspark'] else none }}"

flags:
require_batched_execution_for_custom_microbatch_strategy: True
require_batched_execution_for_custom_microbatch_strategy: True
23 changes: 18 additions & 5 deletions integration_tests/dbt_project/macros/get_anomaly_config.sql
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
{% macro get_anomaly_config(model_config, config) %}
{% macro get_anomaly_config(model_config, config, source_meta_config=none) %}
{{
return(
adapter.dispatch("get_anomaly_config", "elementary")(model_config, config)
adapter.dispatch("get_anomaly_config", "elementary")(
model_config, config, source_meta_config
)
)
}}
{% endmacro %}

{% macro default__get_anomaly_config(model_config, config) %}
{% macro default__get_anomaly_config(model_config, config, source_meta_config=none) %}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
{% set mock_model = {
"alias": "mock_model",
"config": {"elementary": model_config},
} %}
{% if source_meta_config is not none %}
{% do mock_model.update({"source_meta": {"elementary": source_meta_config}}) %}
{% endif %}
{# trick elementary into thinking this is the running model #}
{% do context.update(
{
Expand All @@ -25,11 +30,14 @@
) %}
{% endmacro %}

{% macro spark__get_anomaly_config(model_config, config) %}
{% macro spark__get_anomaly_config(model_config, config, source_meta_config=none) %}
{% set mock_model = {
"alias": "mock_model",
"config": {"elementary": model_config},
} %}
{% if source_meta_config is not none %}
{% do mock_model.update({"source_meta": {"elementary": source_meta_config}}) %}
{% endif %}
{# trick elementary into thinking this is the running model #}
{% do context.update(
{
Expand All @@ -44,11 +52,16 @@
) %}
{% endmacro %}

{% macro clickhouse__get_anomaly_config(model_config, config) %}
{% macro clickhouse__get_anomaly_config(
model_config, config, source_meta_config=none
) %}
{% set mock_model = {
"alias": "mock_model",
"config": {"elementary": model_config},
} %}
{% if source_meta_config is not none %}
{% do mock_model.update({"source_meta": {"elementary": source_meta_config}}) %}
{% endif %}
{# trick elementary into thinking this is the running model #}
{% do context.update(
{
Expand Down
34 changes: 34 additions & 0 deletions integration_tests/tests/test_anomaly_test_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,3 +131,37 @@ def test_anomaly_test_configuration(
)
adapted_config = json.loads(result[0])
assert adapted_config == expected_config


@pytest.mark.skip_for_dbt_fusion
def test_source_meta_config_is_picked_up(dbt_project: DbtProject):
    """With empty model/test-level configs, values from source-level
    meta.elementary must appear unchanged in the resolved anomaly config."""
    # Only the source-level meta carries settings; model_config and config stay empty.
    source_meta = {
        "timestamp_column": "source_ts",
        "where_expression": "is_deleted = false",
    }
    raw_output = dbt_project.dbt_runner.run_operation(
        "elementary_tests.get_anomaly_config",
        macro_args={
            "model_config": {},
            "config": {},
            "source_meta_config": source_meta,
        },
    )
    # run_operation returns the macro output as a JSON string in the first slot.
    resolved = json.loads(raw_output[0])
    assert resolved["timestamp_column"] == "source_ts"
    assert resolved["where_expression"] == "is_deleted = false"


@pytest.mark.skip_for_dbt_fusion
def test_model_meta_overrides_source_meta_config(dbt_project: DbtProject):
    """When both levels define timestamp_column, the table-level
    meta.elementary value wins over the source-level one."""
    raw_output = dbt_project.dbt_runner.run_operation(
        "elementary_tests.get_anomaly_config",
        macro_args={
            # Table-level setting that should take precedence.
            "model_config": {"timestamp_column": "model_ts"},
            "config": {},
            # Conflicting source-level setting that should be shadowed.
            "source_meta_config": {"timestamp_column": "source_ts"},
        },
    )
    resolved = json.loads(raw_output[0])
    assert resolved["timestamp_column"] == "model_ts"
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from contextlib import contextmanager

import pytest

from dbt_project import DbtProject


Expand Down Expand Up @@ -44,20 +43,30 @@ def _microbatch_model_sql(source_model_name: str) -> str:
amount,
order_date
from {{ ref('__MICROBATCH_SOURCE_MODEL__') }}
""".replace("__MICROBATCH_SOURCE_MODEL__", source_model_name)
""".replace(
"__MICROBATCH_SOURCE_MODEL__", source_model_name
)


@contextmanager
def _with_microbatch_test_models(dbt_project: DbtProject, model_suffix: str):
source_model_name = f"mb_src_{model_suffix}"
target_model_name = f"mb_tgt_{model_suffix}"
source_model_path = dbt_project.tmp_models_dir_path.joinpath(f"{source_model_name}.sql")
target_model_path = dbt_project.tmp_models_dir_path.joinpath(f"{target_model_name}.sql")
source_model_path = dbt_project.tmp_models_dir_path.joinpath(
f"{source_model_name}.sql"
)
target_model_path = dbt_project.tmp_models_dir_path.joinpath(
f"{target_model_name}.sql"
)

source_model_path.write_text(_microbatch_source_model_sql())
target_model_path.write_text(_microbatch_model_sql(source_model_name))
relative_source_model_path = source_model_path.relative_to(dbt_project.project_dir_path)
relative_target_model_path = target_model_path.relative_to(dbt_project.project_dir_path)
relative_source_model_path = source_model_path.relative_to(
dbt_project.project_dir_path
)
relative_target_model_path = target_model_path.relative_to(
dbt_project.project_dir_path
)
try:
yield relative_source_model_path, relative_target_model_path, target_model_name
finally:
Expand All @@ -75,9 +84,7 @@ def _run_microbatch_model_and_get_latest_success_result(
model_path,
target_model_name,
):
dbt_project.dbt_runner.run(
select=f"{source_model_path} {model_path}"
)
dbt_project.dbt_runner.run(select=f"{source_model_path} {model_path}")

unique_id = f"model.elementary_tests.{target_model_name}"
run_results = dbt_project.read_table(
Expand All @@ -91,14 +98,14 @@ def _run_microbatch_model_and_get_latest_success_result(

@contextmanager
def _with_microbatch_macro_file(dbt_project: DbtProject, macro_name: str):
macro_path = (
dbt_project.project_dir_path / "macros" / "microbatch.sql"
)
macro_path = dbt_project.project_dir_path / "macros" / "microbatch.sql"
macro_sql = """
{% macro __MACRO_NAME__(arg_dict) %}
{{ return(elementary.get_incremental_microbatch_sql(arg_dict)) }}
{% endmacro %}
""".replace("__MACRO_NAME__", macro_name)
""".replace(
"__MACRO_NAME__", macro_name
)
if macro_path.exists():
raise FileExistsError(f"Expected no macro file at {macro_path}")

Expand All @@ -110,7 +117,9 @@ def _with_microbatch_macro_file(dbt_project: DbtProject, macro_name: str):
macro_path.unlink()


@pytest.mark.skip_targets(["spark", "vertica", "bigquery", "athena", "clickhouse", "dremio"])
@pytest.mark.skip_targets(
["spark", "vertica", "bigquery", "athena", "clickhouse", "dremio"]
)
@pytest.mark.skip_for_dbt_fusion
@pytest.mark.parametrize(
"macro_name,expected_compiled_code,model_suffix",
Expand All @@ -134,10 +143,10 @@ def test_microbatch_run_results_compiled_code_behavior(
)
assert run_results, "Expected a successful run result row for microbatch model"
if expected_compiled_code:
assert run_results[0]["compiled_code"], (
"Expected compiled_code to be populated when override macro is present"
)
assert run_results[0][
"compiled_code"
], "Expected compiled_code to be populated when override macro is present"
else:
assert not run_results[0]["compiled_code"], (
"Expected compiled_code to stay empty when override macro is absent"
)
assert not run_results[0][
"compiled_code"
], "Expected compiled_code to stay empty when override macro is absent"
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,12 @@
model.get("unique_id") if model is mapping else model.unique_id
) | default(none, true) %}
{% set model_compiled_code = (
model.get("compiled_code") if model is mapping else model.compiled_code
model.get("compiled_code")
if model is mapping
else model.compiled_code
) | default(none, true) %}
{% if model_unique_id is none %}
{{ return(none) }}
{% endif %}
{% if not model_compiled_code %}
{{ return(none) }}
{% endif %}
{% if model_unique_id is none %} {{ return(none) }} {% endif %}
{% if not model_compiled_code %} {{ return(none) }} {% endif %}

{% set compiled_code_by_unique_id = elementary.get_cache(
"microbatch_compiled_code_by_unique_id"
Expand Down
4 changes: 3 additions & 1 deletion macros/utils/graph/get_compiled_code.sql
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
"microbatch_compiled_code_by_unique_id", {}
).get(node.get("unique_id")) %}
{% endif %}
{% set compiled_code = adapter.dispatch("format_compiled_code", "elementary")(compiled_code) %}
{% set compiled_code = adapter.dispatch("format_compiled_code", "elementary")(
compiled_code
) %}

{% set max_column_size = elementary.get_column_size() %}
{% if as_column_value and max_column_size and compiled_code and compiled_code | length > max_column_size %}
Expand Down
33 changes: 16 additions & 17 deletions macros/utils/graph/get_elementary_config_from_node.sql
Original file line number Diff line number Diff line change
@@ -1,25 +1,24 @@
{% macro get_elementary_config_from_node(node) %}
{% set res = {} %}
{% set node_config = node.get("config") %}
{% if node_config %}
{% set elementary_config = node.config.get("elementary") %}
{% macro _merge_elementary_from_meta(res, meta_dict) %}
{% if meta_dict and meta_dict is mapping %}
{% set elementary_config = meta_dict.get("elementary") %}
{% if elementary_config and elementary_config is mapping %}
{% do res.update(elementary_config) %}
{% endif %}
{% set config_meta = node.config.get("meta") %}
{% if config_meta and config_meta is mapping %}
{% set elementary_config = config_meta.get("elementary") %}
{% if elementary_config and elementary_config is mapping %}
{% do res.update(elementary_config) %}
{% endif %}
{% endif %}
{% endif %}
{% set node_meta = node.get("meta") %}
{% if node_meta and node_meta is mapping %}
{% set elementary_config = node_meta.get("elementary") %}
{% if elementary_config and elementary_config is mapping %}
{% do res.update(elementary_config) %}
{% endmacro %}

{% macro get_elementary_config_from_node(node) %}
{% set res = {} %}
{% set node_config = node.get("config") %}
{% if node_config %}
{# config.elementary is already the elementary config dict itself (not nested) #}
{% set config_elementary = node.config.get("elementary") %}
{% if config_elementary and config_elementary is mapping %}
{% do res.update(config_elementary) %}
{% endif %}
{% do elementary._merge_elementary_from_meta(res, node.config.get("meta")) %}
{% endif %}
{% do elementary._merge_elementary_from_meta(res, node.get("source_meta")) %}
{% do elementary._merge_elementary_from_meta(res, node.get("meta")) %}
{{ return(res) }}
{% endmacro %}
Loading