157 changes: 147 additions & 10 deletions ingestion/src/metadata/utils/source_hash.py
@@ -11,11 +11,22 @@

"""
Source hash utils module

This module provides utilities for generating stable, deterministic hashes
from create request models. The hash is used to detect changes in metadata
between ingestion runs.

To ensure hash stability, this module:
1. Sorts lists by deterministic keys (e.g., columns by ordinalPosition or name)
2. Excludes volatile fields that may change between runs (e.g., href, deleted, inherited)
3. Normalizes DDL/SQL whitespace in schemaDefinition
"""

import hashlib
import json
import re
import traceback
from typing import Any, Dict, List, Optional, Union

from metadata.ingestion.ometa.ometa_api import C
from metadata.utils.logger import utils_logger
@@ -27,24 +38,150 @@
"sourceHash": True,
}

VOLATILE_ENTITY_REFERENCE_FIELDS = {"href", "deleted", "inherited"}


def _normalize_whitespace(text: Optional[str]) -> Optional[str]:
"""
Normalize whitespace in SQL/DDL text to ensure consistent hashing.
- Collapses multiple whitespace characters into a single space
- Trims leading/trailing whitespace
"""
if text is None:
return None
return re.sub(r"\s+", " ", text.strip())


def _get_column_sort_key(column: Dict[str, Any]) -> tuple:
"""
Get a sort key for a column dict.
Prioritizes ordinalPosition if present, otherwise uses name.
"""
ordinal = column.get("ordinalPosition")
name = column.get("name", "")
if isinstance(name, dict):
name = name.get("root", name.get("__root__", ""))
return (ordinal if ordinal is not None else float("inf"), str(name))


def _get_tag_sort_key(tag: Dict[str, Any]) -> str:
"""Get a sort key for a tag dict based on tagFQN."""
tag_fqn = tag.get("tagFQN", "")
if isinstance(tag_fqn, dict):
tag_fqn = tag_fqn.get("root", tag_fqn.get("__root__", ""))
return str(tag_fqn)


def _get_constraint_sort_key(constraint: Dict[str, Any]) -> tuple:
"""Get a sort key for a table constraint dict."""
constraint_type = constraint.get("constraintType", "")
columns = constraint.get("columns", [])
columns_str = ",".join(sorted(columns)) if columns else ""
return (str(constraint_type), columns_str)


def _get_entity_reference_sort_key(ref: Dict[str, Any]) -> str:
"""Get a sort key for an entity reference dict."""
return str(ref.get("fullyQualifiedName") or ref.get("name") or ref.get("id") or "")


def _remove_volatile_fields(obj: Union[Dict, List, Any]) -> Union[Dict, List, Any]:
"""
    Recursively remove volatile fields from entity references so that
    attributes like href, deleted, and inherited do not affect the hash.
"""
if isinstance(obj, dict): # pylint: disable=no-else-return
result = {}
for key, value in obj.items():
if key in VOLATILE_ENTITY_REFERENCE_FIELDS:
continue
result[key] = _remove_volatile_fields(value)
return result
elif isinstance(obj, list):
return [_remove_volatile_fields(item) for item in obj]
return obj


def _sort_columns(columns: List[Any]) -> List[Any]:
"""
Sort columns by ordinalPosition (if present) then by name.
Also recursively sorts nested children columns.

Handles both Column dict structures and simple string lists for backward compatibility.
"""
if not columns:
return columns

if not isinstance(columns[0], dict):
return sorted(columns, key=str)

sorted_columns = sorted(columns, key=_get_column_sort_key)
for col in sorted_columns:
if col.get("children"):
col["children"] = _sort_columns(col["children"])
if col.get("tags"):
col["tags"] = sorted(col["tags"], key=_get_tag_sort_key)
return sorted_columns


def _normalize_for_hash(data: Dict[str, Any]) -> Dict[str, Any]:
"""
Normalize a create request dict to ensure deterministic hashing.

This function:
1. Sorts columns by ordinalPosition/name
2. Sorts tags by tagFQN
3. Sorts tableConstraints by type and columns
4. Sorts owners by FQN/name/id
5. Removes volatile EntityReference fields (href, deleted, inherited)
6. Normalizes schemaDefinition whitespace
"""
result = _remove_volatile_fields(data)

if "columns" in result and isinstance(result["columns"], list):
result["columns"] = _sort_columns(result["columns"])

if "tags" in result and isinstance(result["tags"], list):
result["tags"] = sorted(result["tags"], key=_get_tag_sort_key)

if "tableConstraints" in result and isinstance(result["tableConstraints"], list):
result["tableConstraints"] = sorted(
result["tableConstraints"], key=_get_constraint_sort_key
)

if "owners" in result and isinstance(result["owners"], list):
result["owners"] = sorted(result["owners"], key=_get_entity_reference_sort_key)

if "schemaDefinition" in result and result["schemaDefinition"]:
result["schemaDefinition"] = _normalize_whitespace(result["schemaDefinition"])

return result


def generate_source_hash(
create_request: C, exclude_fields: Optional[Dict] = None
) -> Optional[str]:
"""
    Given a create_request model convert it to a normalized json string
    and generate a stable hash value.

The normalization process ensures hash stability by:
- Sorting lists (columns, tags, constraints, owners) by deterministic keys
- Removing volatile fields (href, deleted, inherited) from entity references
- Normalizing whitespace in DDL/SQL definitions
"""
try:
        # We always want to exclude the sourceHash when generating the fingerprint
        final_exclude = dict(SOURCE_HASH_EXCLUDE_FIELDS)
        if exclude_fields:
            final_exclude.update(exclude_fields)

request_dict = create_request.model_dump(exclude=final_exclude)

normalized_dict = _normalize_for_hash(request_dict)

        normalized_json = json.dumps(normalized_dict, sort_keys=True, default=str)

        json_bytes = normalized_json.encode("utf-8")
return hashlib.md5(json_bytes).hexdigest()

except Exception as exc:
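Not part of the PR, but to illustrate what the normalization above buys: a minimal sketch that hashes two payloads describing the same table, assuming the ingestion package is importable as `metadata.utils.source_hash` and that calling the private `_normalize_for_hash` directly mirrors the tail of `generate_source_hash`.

```python
import hashlib
import json

# Private helper from the module under review; used directly here only to
# demonstrate the normalization step in isolation.
from metadata.utils.source_hash import _normalize_for_hash

# Two payloads for the same table: columns arrive in a different order, the
# owner reference carries a different volatile href, and the DDL differs only
# in whitespace.
table_a = {
    "name": "orders",
    "columns": [
        {"name": "id", "ordinalPosition": 1},
        {"name": "total", "ordinalPosition": 2},
    ],
    "owners": [{"name": "data-team", "href": "http://host-a/api/v1/teams/1"}],
    "schemaDefinition": "CREATE TABLE orders (\n  id INT,\n  total FLOAT\n)",
}
table_b = {
    "name": "orders",
    "columns": [
        {"name": "total", "ordinalPosition": 2},
        {"name": "id", "ordinalPosition": 1},
    ],
    "owners": [{"name": "data-team", "href": "http://host-b/api/v1/teams/1"}],
    "schemaDefinition": "CREATE TABLE orders ( id INT, total FLOAT )",
}


def fingerprint(data: dict) -> str:
    # Same steps as the tail of generate_source_hash: normalize, serialize
    # with sorted keys, then take the MD5 digest.
    normalized = _normalize_for_hash(data)
    payload = json.dumps(normalized, sort_keys=True, default=str)
    return hashlib.md5(payload.encode("utf-8")).hexdigest()


# Column order, hrefs, and DDL whitespace are all normalized away, so the
# fingerprints match.
assert fingerprint(table_a) == fingerprint(table_b)
```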
26 changes: 13 additions & 13 deletions ingestion/tests/unit/topology/test_runner.py
@@ -113,7 +113,7 @@ def test_source_hash(self):
"""Check it works with generic models"""

mock_table = MockTable(name="name", columns=["a", "b", "c"])
        real_fingerprint = "03a6bd999d83f6e8fe25659bb6f8ac90"

self.assertEqual(real_fingerprint, generate_source_hash(mock_table))

@@ -144,29 +144,29 @@ def test_node_and_stage(self):
],
[
MockSchema(
name="schema1", sourceHash="81598ade7c7fa8f5383a7578e2cb6242"
name="schema1", sourceHash="ddb43c9d34ccbe2363a37db746211fcb"
),
MockTable(
name="table1",
sourceHash="a5987f1e4ce03538f69af0b02f9e024c",
sourceHash="384ee4341cf5c1ac5658f9310ea8868c",
columns=["c1", "c2"],
),
MockTable(
name="table2",
sourceHash="b7779efcbd2c01636a104121d5abced4",
sourceHash="3b3c6ad507d2bbf24a68451d2bef38dd",
columns=["c1", "c2"],
),
MockSchema(
name="schema2", sourceHash="5aa190009346267c1fdc3da529e35c69"
name="schema2", sourceHash="18e4768ea591108c38e6b24a861cb3d2"
),
MockTable(
name="table1",
sourceHash="a5987f1e4ce03538f69af0b02f9e024c",
sourceHash="384ee4341cf5c1ac5658f9310ea8868c",
columns=["c1", "c2"],
),
MockTable(
name="table2",
sourceHash="b7779efcbd2c01636a104121d5abced4",
sourceHash="3b3c6ad507d2bbf24a68451d2bef38dd",
columns=["c1", "c2"],
),
"hello",
@@ -196,29 +196,29 @@ def test_multithread_node_and_stage(self):
],
[
MockSchema(
name="schema1", sourceHash="81598ade7c7fa8f5383a7578e2cb6242"
name="schema1", sourceHash="ddb43c9d34ccbe2363a37db746211fcb"
),
MockTable(
name="table1",
sourceHash="a5987f1e4ce03538f69af0b02f9e024c",
sourceHash="384ee4341cf5c1ac5658f9310ea8868c",
columns=["c1", "c2"],
),
MockTable(
name="table2",
sourceHash="b7779efcbd2c01636a104121d5abced4",
sourceHash="3b3c6ad507d2bbf24a68451d2bef38dd",
columns=["c1", "c2"],
),
MockSchema(
name="schema2", sourceHash="5aa190009346267c1fdc3da529e35c69"
name="schema2", sourceHash="18e4768ea591108c38e6b24a861cb3d2"
),
MockTable(
name="table1",
sourceHash="a5987f1e4ce03538f69af0b02f9e024c",
sourceHash="384ee4341cf5c1ac5658f9310ea8868c",
columns=["c1", "c2"],
),
MockTable(
name="table2",
sourceHash="b7779efcbd2c01636a104121d5abced4",
sourceHash="3b3c6ad507d2bbf24a68451d2bef38dd",
columns=["c1", "c2"],
),
"hello",
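For context on how the updated fingerprints above are produced: a hedged usage sketch of `generate_source_hash`. This `MockTable` is a hypothetical stand-in for the one defined in `test_runner.py`; if the real model carries extra fields, the digest of this minimal model may differ from the asserted values.

```python
from typing import List, Optional

from pydantic import BaseModel

from metadata.utils.source_hash import generate_source_hash


# Hypothetical stand-in for the test suite's MockTable; any pydantic model
# works, since generate_source_hash only relies on model_dump().
class MockTable(BaseModel):
    name: str
    columns: List[str]
    sourceHash: Optional[str] = None  # always excluded from the fingerprint


table = MockTable(name="name", columns=["a", "b", "c"])

# The digest is deterministic: the same model yields the same value on every run.
assert generate_source_hash(table) == generate_source_hash(table)
print(generate_source_hash(table))
```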