Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
c4dc7e5
Remove dbt-fusion version pin to use latest version
devin-ai-integration[bot] Jan 27, 2026
dce9786
Restore .dbt-fusion-version with 'latest' to use latest dbt-fusion ve…
devin-ai-integration[bot] Jan 27, 2026
ba6ad18
Update CI to use Python 3.10 and latest dbt-fusion version
devin-ai-integration[bot] Jan 27, 2026
f3a028c
Restore .dbt-fusion-version with version 2.0.0-preview.102
devin-ai-integration[bot] Jan 27, 2026
aee2e14
Fix: Handle None ignore_small_changes in dbt-fusion
devin-ai-integration[bot] Jan 27, 2026
8e97e3e
Add debug logs to investigate dbt-fusion test failures
devin-ai-integration[bot] Jan 27, 2026
c8f162c
Skip dbt-fusion incompatible tests and remove debug logs
devin-ai-integration[bot] Jan 27, 2026
026941a
Add debug log to find_normalized_data_type_for_column to investigate …
devin-ai-integration[bot] Jan 27, 2026
32ee916
Fix timestamp_ltz/timestamp_ntz recognition for Databricks/Spark
devin-ai-integration[bot] Jan 27, 2026
63b4f01
Fix test_sample_count_unlimited: put meta at top level for dbt-fusion…
devin-ai-integration[bot] Jan 28, 2026
0695ebc
Revert meta top-level change: dbt-fusion says it's deprecated, re-add…
devin-ai-integration[bot] Jan 28, 2026
fe287e2
fix test_override_samples_config and allow -1 for unlimited samples
haritamar Jan 29, 2026
395a1d1
dbt invocations fusion fixes
haritamar Jan 29, 2026
65b478a
update dbt fusion version
haritamar Jan 29, 2026
21ce063
exposure schema validity - works in fusion
haritamar Jan 29, 2026
0103674
error statuses now properly supported
haritamar Jan 29, 2026
9dbc59f
failed row count now works in fusion
haritamar Jan 29, 2026
1ab423e
Fix test_schema_changes: clear dbt-fusion schema cache after seeding
devin-ai-integration[bot] Jan 30, 2026
e8f8dcd
Add 'numeric' to data type lists for targets missing it (spark, athen…
devin-ai-integration[bot] Jan 30, 2026
abb8112
Fix race condition: remove global cache clearing that caused parallel…
devin-ai-integration[bot] Jan 30, 2026
62b1122
Workaround dbt-fusion 2.0.0-preview.104 Redshift temp table bug
haritamar Feb 4, 2026
378a175
Complete dbt-fusion Redshift temp table workaround
haritamar Feb 9, 2026
d01e198
Fix dbt-fusion Redshift temp table panic with comprehensive workaround
haritamar Feb 9, 2026
dd797c6
Fix dbt-fusion Redshift: temp tables incompatible with connection poo…
haritamar Feb 9, 2026
8d113bd
Remove unused .dbt-fusion-version file
haritamar Feb 10, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .dbt-fusion-version

This file was deleted.

3 changes: 1 addition & 2 deletions .github/workflows/test-warehouse.yml
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,7 @@ jobs:
- name: Install dbt-fusion
if: inputs.dbt-version == 'fusion'
run: |
FUSION_VERSION=$(cat dbt-data-reliability/.dbt-fusion-version)
curl -fsSL https://public.cdn.getdbt.com/fs/install/install.sh | sh -s -- --version "$FUSION_VERSION"
curl -fsSL https://public.cdn.getdbt.com/fs/install/install.sh | sh -s --

- name: Install Elementary
run: pip install "./elementary[${{ (inputs.warehouse-type == 'databricks_catalog' && 'databricks') || inputs.warehouse-type }}]"
Expand Down
1 change: 0 additions & 1 deletion integration_tests/tests/test_collect_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,6 @@ def test_collect_group_by_metrics(test_id: str, dbt_project: DbtProject):

# Anomalies currently not supported on ClickHouse
@pytest.mark.skip_targets(["clickhouse"])
@pytest.mark.skip_for_dbt_fusion
def test_collect_metrics_unique_metric_name(test_id: str, dbt_project: DbtProject):
args = DBT_TEST_ARGS.copy()
args["metrics"].append(args["metrics"][0])
Expand Down
68 changes: 52 additions & 16 deletions integration_tests/tests/test_exposure_schema_validity.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,31 @@
import json

import pytest
from dbt_project import DbtProject

DBT_TEST_NAME = "elementary.exposure_schema_validity"


INVALID_EXPOSURES_QUERY = """
with latest_elementary_test_result as (
select id
from {{{{ ref("elementary_test_results") }}}}
where lower(table_name) = lower('{test_id}')
order by created_at desc
limit 1
)

select result_row
from {{{{ ref("test_result_rows") }}}}
where elementary_test_results_id in (select * from latest_elementary_test_result)
"""


def seed(dbt_project: DbtProject):
seed_result = dbt_project.dbt_runner.seed(full_refresh=True)
assert seed_result is True


@pytest.mark.skip_for_dbt_fusion
def test_exposure_schema_validity_existing_exposure_yml_invalid(
test_id: str, dbt_project: DbtProject
):
Expand All @@ -28,7 +44,6 @@ def test_exposure_schema_validity_existing_exposure_yml_invalid(
assert test_result.success is False


@pytest.mark.skip_for_dbt_fusion
def test_exposure_schema_validity_existing_exposure_yml_valid(
test_id: str, dbt_project: DbtProject
):
Expand All @@ -47,15 +62,13 @@ def test_exposure_schema_validity_existing_exposure_yml_valid(


@pytest.mark.skip_targets(["spark"])
@pytest.mark.skip_for_dbt_fusion
def test_exposure_schema_validity_no_exposures(test_id: str, dbt_project: DbtProject):
test_result = dbt_project.test(test_id, DBT_TEST_NAME)
assert test_result["status"] == "pass"


# Schema validity currently not supported on ClickHouse
@pytest.mark.skip_targets(["spark", "clickhouse"])
@pytest.mark.skip_for_dbt_fusion
def test_exposure_schema_validity_correct_columns_and_types(
test_id: str, dbt_project: DbtProject
):
Expand Down Expand Up @@ -88,7 +101,6 @@ def test_exposure_schema_validity_correct_columns_and_types(


@pytest.mark.skip_targets(["spark"])
@pytest.mark.skip_for_dbt_fusion
def test_exposure_schema_validity_correct_columns_and_invalid_type(
test_id: str, dbt_project: DbtProject
):
Expand All @@ -111,17 +123,25 @@ def test_exposure_schema_validity_correct_columns_and_invalid_type(
test_result = dbt_project.test(
test_id, DBT_TEST_NAME, DBT_TEST_ARGS, columns=[dict(name="bla")], as_model=True
)
assert test_result["status"] == "fail"

invalid_exposures = [
json.loads(row["result_row"])
for row in dbt_project.run_query(
INVALID_EXPOSURES_QUERY.format(test_id=test_id)
)
]
assert len(invalid_exposures) == 1
assert invalid_exposures[0]["exposure"] == "ZOMG"
assert invalid_exposures[0]["url"] == "http://bla.com"
assert (
"different data type for the column order_id string vs"
in test_result["test_results_query"]
invalid_exposures[0]["error"]
== "different data type for the column order_id string vs numeric"
)
assert test_result["status"] == "fail"


# Schema validity currently not supported on ClickHouse
@pytest.mark.skip_targets(["spark", "clickhouse"])
@pytest.mark.skip_for_dbt_fusion
def test_exposure_schema_validity_invalid_type_name_present_in_error(
test_id: str, dbt_project: DbtProject
):
Expand Down Expand Up @@ -155,16 +175,24 @@ def test_exposure_schema_validity_invalid_type_name_present_in_error(
test_result = dbt_project.test(
test_id, DBT_TEST_NAME, DBT_TEST_ARGS, columns=[dict(name="bla")], as_model=True
)
assert test_result["status"] == "fail"

invalid_exposures = [
json.loads(row["result_row"])
for row in dbt_project.run_query(
INVALID_EXPOSURES_QUERY.format(test_id=test_id)
)
]
assert len(invalid_exposures) == 1
assert invalid_exposures[0]["exposure"] == "ZOMG"
assert invalid_exposures[0]["url"] == "http://bla.com"
assert (
"different data type for the column order_id string vs numeric"
in test_result["test_results_query"]
invalid_exposures[0]["error"]
== "different data type for the column order_id string vs numeric"
)
assert test_result["status"] == "fail"


@pytest.mark.skip_targets(["spark"])
@pytest.mark.skip_for_dbt_fusion
def test_exposure_schema_validity_correct_columns_and_missing_type(
test_id: str, dbt_project: DbtProject
):
Expand All @@ -188,7 +216,6 @@ def test_exposure_schema_validity_correct_columns_and_missing_type(


@pytest.mark.skip_targets(["spark"])
@pytest.mark.skip_for_dbt_fusion
def test_exposure_schema_validity_missing_columns(
test_id: str, dbt_project: DbtProject
):
Expand All @@ -211,6 +238,15 @@ def test_exposure_schema_validity_missing_columns(
test_result = dbt_project.test(
test_id, DBT_TEST_NAME, DBT_TEST_ARGS, columns=[dict(name="bla")], as_model=True
)

assert "order_id column missing in the model" in test_result["test_results_query"]
assert test_result["status"] == "fail"

invalid_exposures = [
json.loads(row["result_row"])
for row in dbt_project.run_query(
INVALID_EXPOSURES_QUERY.format(test_id=test_id)
)
]
assert len(invalid_exposures) == 1
assert invalid_exposures[0]["exposure"] == "ZOMG"
assert invalid_exposures[0]["url"] == "http://bla.com"
assert invalid_exposures[0]["error"] == "order_id column missing in the model"
4 changes: 0 additions & 4 deletions integration_tests/tests/test_failed_row_count.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

# Failed row count currently not supported on ClickHouse
@pytest.mark.skip_targets(["clickhouse"])
@pytest.mark.skip_for_dbt_fusion
def test_count_failed_row_count(test_id: str, dbt_project: DbtProject):
null_count = 50
data = [{COLUMN_NAME: None} for _ in range(null_count)]
Expand All @@ -24,7 +23,6 @@ def test_count_failed_row_count(test_id: str, dbt_project: DbtProject):
) # when the failed_row_count_calc is count(*), these should be equal


@pytest.mark.skip_for_dbt_fusion
def test_sum_failed_row_count(test_id: str, dbt_project: DbtProject):
non_unique_count = 50
data = [{COLUMN_NAME: 5} for _ in range(non_unique_count)]
Expand All @@ -44,7 +42,6 @@ def test_sum_failed_row_count(test_id: str, dbt_project: DbtProject):

# Failed row count currently not supported on ClickHouse
@pytest.mark.skip_targets(["clickhouse"])
@pytest.mark.skip_for_dbt_fusion
def test_custom_failed_row_count(test_id: str, dbt_project: DbtProject):
null_count = 50
overwrite_failed_row_count = 5
Expand All @@ -64,7 +61,6 @@ def test_custom_failed_row_count(test_id: str, dbt_project: DbtProject):
assert test_result["failed_row_count"] == overwrite_failed_row_count


@pytest.mark.skip_for_dbt_fusion
def test_warn_if_0(test_id: str, dbt_project: DbtProject):
# Edge case that we want to verify

Expand Down
2 changes: 1 addition & 1 deletion integration_tests/tests/test_override_samples_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def test_sample_count_unlimited(test_id: str, dbt_project: DbtProject):
"enable_elementary_test_materialization": True,
"test_sample_row_count": 5,
},
test_config={"meta": {"test_sample_row_count": None}},
test_config={"meta": {"test_sample_row_count": -1}},
)
assert test_result["status"] == "fail"

Expand Down
2 changes: 2 additions & 0 deletions integration_tests/tests/test_schema_changes.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ def assert_test_results(test_results: List[dict]):


# Schema changes currently not supported on targets
# dbt-fusion caches column information and doesn't refresh when tables are recreated
@pytest.mark.skip_targets(["databricks", "spark", "athena", "trino", "clickhouse"])
@pytest.mark.skip_for_dbt_fusion
def test_schema_changes(test_id: str, dbt_project: DbtProject):
dbt_test_name = "elementary.schema_changes"
test_result = dbt_project.test(test_id, dbt_test_name, data=DATASET1)
Expand Down
14 changes: 4 additions & 10 deletions macros/edr/dbt_artifacts/upload_dbt_invocation.sql
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,8 @@

{%- macro get_invocation_select_filter() -%}
{% set config = elementary.get_runtime_config() %}
{%- if invocation_args_dict -%}
{%- if invocation_args_dict.select -%}
{{- return(invocation_args_dict.select) -}}
{%- elif invocation_args_dict.SELECT -%}
{{- return(invocation_args_dict.SELECT) -}}
{%- endif -%}
{%- if invocation_args_dict and invocation_args_dict.select -%}
{{- return(invocation_args_dict.select) -}}
{%- elif config.args and config.args.select -%}
{{- return(config.args.select) -}}
{%- else -%}
Expand All @@ -106,10 +102,10 @@
{% do return(invocation_args_dict.selector) %}
{% elif invocation_args_dict and invocation_args_dict.selector_name %}
{% do return(invocation_args_dict.selector_name) %}
{% elif invocation_args_dict and invocation_args_dict.INVOCATION_COMMAND %}
{% elif invocation_args_dict and invocation_args_dict.invocation_command %}
{% set match = modules.re.search(
"--selector(?:\s+|=)(\S+)",
invocation_args_dict.INVOCATION_COMMAND
invocation_args_dict.invocation_command
) %}
{% if match %}
{% do return(match.group(1)) %}
Expand All @@ -132,8 +128,6 @@
{% else %}
{% set invocation_vars = fromyaml(invocation_args_dict.vars) %}
{% endif %}
{% elif invocation_args_dict and invocation_args_dict.VARS %}
{% set invocation_vars = invocation_args_dict.VARS %}
{% elif config.cli_vars %}
{% set invocation_vars = config.cli_vars %}
{% endif %}
Expand Down
17 changes: 6 additions & 11 deletions macros/edr/dbt_artifacts/upload_dbt_tests.sql
Original file line number Diff line number Diff line change
Expand Up @@ -55,25 +55,20 @@

{% set default_description = elementary.get_default_description(test_original_name, test_namespace) %}

{% set config_meta_dict = elementary.safe_get_with_default(config_dict, 'meta', {}) %}
{% set meta_dict = elementary.safe_get_with_default(node_dict, 'meta', {}) %}

{% set unified_meta = {} %}
{% do unified_meta.update(config_meta_dict) %}
{% do unified_meta.update(meta_dict) %}
{% set meta = elementary.get_node_meta(node_dict) %}

{% set description = none %}
{% if dbt_version >= '1.9.0' and node_dict.get('description') %}
{% set description = node_dict.get('description') %}
{% elif unified_meta.get('description') %}
{% set description = unified_meta.pop('description') %}
{% elif meta.get('description') %}
{% set description = meta.pop('description') %}
{% elif default_description %}
{% set description = default_description %}
{% endif %}

{% set config_tags = elementary.safe_get_with_default(config_dict, 'tags', []) %}
{% set global_tags = elementary.safe_get_with_default(node_dict, 'tags', []) %}
{% set meta_tags = elementary.safe_get_with_default(unified_meta, 'tags', []) %}
{% set meta_tags = elementary.safe_get_with_default(meta, 'tags', []) %}
{% set tags = elementary.union_lists(config_tags, global_tags) %}
{% set tags = elementary.union_lists(tags, meta_tags) %}

Expand Down Expand Up @@ -167,7 +162,7 @@
'tags': elementary.filter_none_and_sort(tags),
'model_tags': elementary.filter_none_and_sort(test_models_tags),
'model_owners': elementary.filter_none_and_sort(test_models_owners),
'meta': unified_meta,
'meta': meta,
'database_name': primary_test_model_database,
'schema_name': primary_test_model_schema,
'depends_on_macros': elementary.filter_none_and_sort(depends_on_dict.get('macros', [])),
Expand All @@ -181,7 +176,7 @@
'compiled_code': elementary.get_compiled_code(node_dict),
'path': node_dict.get('path'),
'generated_at': elementary.datetime_now_utc_as_string(),
'quality_dimension': unified_meta.get('quality_dimension') or elementary.get_quality_dimension(test_original_name, test_namespace),
'quality_dimension': meta.get('quality_dimension') or elementary.get_quality_dimension(test_original_name, test_namespace),
'group_name': group_name,
}%}
{% do flatten_test_metadata_dict.update({"metadata_hash": elementary.get_artifact_metadata_hash(flatten_test_metadata_dict)}) %}
Expand Down
6 changes: 6 additions & 0 deletions macros/edr/materializations/test/test.sql
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,12 @@
{% if sample_limit == 0 %} {# performance: no need to run a sql query that we know returns an empty list #}
{% do return([]) %}
{% endif %}

{# Allow setting -1 for unlimited, as none values are stripped from meta in dbt-fusion #}
{% if sample_limit == -1 %}
{% set sample_limit = none %}
{% endif %}

{% if ignore_passed_tests and elementary.did_test_pass() %}
{% do elementary.debug_log("Skipping sample query because the test passed.") %}
{% do return([]) %}
Expand Down
10 changes: 0 additions & 10 deletions macros/edr/tests/on_run_end/handle_tests_results.sql
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,6 @@
{% set status = "pass" %}
{% endif %}

{% if elementary.is_dbt_fusion() %}
{% if status == 'error' %}
{# dbt-fusion currently doesn't distinguish between failure and error #}
{% set status = "fail" %}
{% elif status == 'success' %}
{# dbt-fusion seems to sometime return 'pass' and sometimes 'success', so we normalize to 'pass' #}
{% set status = "pass" %}
{% endif %}
{% endif %}

{% do return(status) %}
{% endmacro %}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,10 @@
{%- set detection_delay = elementary.get_detection_delay(detection_delay, model_graph_node) %}

{%- set ignore_small_changes = elementary.get_test_argument('ignore_small_changes', ignore_small_changes, model_graph_node) %}
{# Validate ignore_small_changes #}
{# Normalize ignore_small_changes to ensure it's always a dict with expected keys #}
{%- if ignore_small_changes is none %}
{%- set ignore_small_changes = {"spike_failure_percent_threshold": none, "drop_failure_percent_threshold": none} %}
{%- endif %}

{% set anomaly_exclude_metrics = elementary.get_test_argument('anomaly_exclude_metrics', anomaly_exclude_metrics, model_graph_node) %}
{% set exclude_final_results = elementary.get_exclude_final_results(exclude_final_results) %}
Expand Down Expand Up @@ -109,6 +112,9 @@
{% endmacro %}

{% macro validate_ignore_small_changes(test_configuration) %}
{% if test_configuration.ignore_small_changes is none %}
{% do return(none) %}
{% endif %}
{% for key, value in test_configuration.ignore_small_changes.items() %}
{% if key not in ['spike_failure_percent_threshold', 'drop_failure_percent_threshold'] %}
{% do exceptions.raise_compiler_error('Illegal configuration key: {}'.format(key)) %}
Expand Down
Loading