Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 132 additions & 33 deletions ingestion/src/metadata/ingestion/models/patch_request.py
Original file line number Diff line number Diff line change
Expand Up @@ -374,44 +374,103 @@
source=source,
destination=destination,
array_entity_fields=array_entity_fields,
restrict_update_fields=restrict_update_fields,
override_metadata=override_metadata,
)

# special handler for tableConstraints
_table_constraints_handler(source, destination)

# Get the difference between source and destination
# Determine which array entity fields are present in allowed_fields
active_array_fields = set()
if array_entity_fields:
for field in array_entity_fields:
if allowed_fields is None or field in allowed_fields:
active_array_fields.add(field)

# Exclude array entity fields from the position-based jsonpatch diff.
# They are handled via full "replace" operations to preserve correct
# ordering when columns are added/removed/reordered.
if allowed_fields:
non_array_allowed = {
k: v for k, v in allowed_fields.items() if k not in active_array_fields
}
if non_array_allowed:
patch = jsonpatch.make_patch(
json.loads(
source.model_dump_json(
exclude_unset=True,
exclude_none=True,
include=non_array_allowed,
)
),
json.loads(
destination.model_dump_json(
exclude_unset=True,
exclude_none=True,
include=non_array_allowed,
)
),
)
else:
patch = jsonpatch.JsonPatch([])
else:
array_exclude = (
{f: True for f in active_array_fields} if active_array_fields else None

Check warning on line 419 in ingestion/src/metadata/ingestion/models/patch_request.py

View check run for this annotation

SonarQubeCloud / [open-metadata-ingestion] SonarCloud Code Analysis

Replace with dict fromkeys method call

See more on https://sonarcloud.io/project/issues?id=open-metadata-ingestion&issues=AZ0cPf4uZXezoAn1Y1My&open=AZ0cPf4uZXezoAn1Y1My&pullRequest=26708
)
patch = jsonpatch.make_patch(
json.loads(
source.model_dump_json(
exclude_unset=True,
exclude_none=True,
include=allowed_fields,
exclude=array_exclude,
)
),
json.loads(
destination.model_dump_json(
exclude_unset=True,
exclude_none=True,
include=allowed_fields,
exclude=array_exclude,
)
),
)
else:
patch: jsonpatch.JsonPatch = jsonpatch.make_patch(
json.loads(
source.model_dump_json(exclude_unset=True, exclude_none=True)
),
json.loads(
destination.model_dump_json(exclude_unset=True, exclude_none=True)
),
)
if not patch:

# Add full "replace" operations for array entity fields whose
# content changed. The merge in _sort_array_entity_fields already
# applied the restrict_update_fields / override_metadata logic, so
# the replacement value is ready to use as-is.
for field in active_array_fields:
if hasattr(source, field) and hasattr(destination, field):
src_json = json.loads(
source.model_dump_json(
exclude_unset=True,
exclude_none=True,
include={field: True},
)
)
dst_json = json.loads(
destination.model_dump_json(
exclude_unset=True,
exclude_none=True,
include={field: True},
)
)
if src_json.get(field) != dst_json.get(field):
patch.patch.append(
{
"op": "replace",
"path": f"/{field}",
"value": dst_json.get(field, []),
}
)
Comment on lines +442 to +465
Copy link

Copilot AI Mar 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

build_patch() builds full-array replace ops by dumping include={field: True} for array entity fields. This ignores the allowed_fields filter contract (e.g., ALLOWED_COMMON_PATCH_FIELDS['columns'] only allows a subset of column properties), so the generated /columns (or /tasks, /fields) replacement can unintentionally patch disallowed subfields or send payload the backend doesn't accept. Consider using the per-field include spec from allowed_fields when available (and the same spec for src/dst comparison + replacement value).

Copilot uses AI. Check for mistakes.

if not patch.patch:
return None

# For a user editable fields like descriptions, tags we only want to support "add" operation in patch
# we will remove the other operations.
# This will only be applicable if the override_metadata field is set to False.
# For user-editable fields like descriptions and tags we only want
# to support "add" operations in the patch. Array entity field
# "replace" operations (e.g. /columns) pass through because their
# paths do not contain restricted field names.
if restrict_update_fields:
updated_operations = JsonPatchUpdater.from_restrict_update_fields(
restrict_update_fields
Expand Down Expand Up @@ -515,42 +574,82 @@
setattr(destination, "tableConstraints", rearranged_constraints)


def _should_update_restricted_field(
source_value, dest_value, override_metadata: bool
) -> bool:
"""Decide whether a restricted field should be updated from destination.

Mirrors the restrict_update_fields filter semantics:
- ADD (source empty → dest has value): always allowed
- REPLACE (both have values): only with override
- REMOVE (source has value → dest empty): never allowed
"""
source_empty = source_value is None or (
isinstance(source_value, list) and len(source_value) == 0
)
dest_empty = dest_value is None or (
isinstance(dest_value, list) and len(dest_value) == 0
)
if dest_empty:
return False
if source_empty:
return True
return override_metadata


def _sort_array_entity_fields(

Check failure on line 600 in ingestion/src/metadata/ingestion/models/patch_request.py

View check run for this annotation

SonarQubeCloud / [open-metadata-ingestion] SonarCloud Code Analysis

Refactor this function to reduce its Cognitive Complexity from 46 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=open-metadata-ingestion&issues=AZ0cPf4uZXezoAn1Y1Mz&open=AZ0cPf4uZXezoAn1Y1Mz&pullRequest=26708
source: T,
destination: T,
array_entity_fields: Optional[List] = None,
restrict_update_fields: Optional[List] = None,
override_metadata: Optional[bool] = False,
):
"""
Sort the array entity fields to make sure the order is consistent
Reorder array entity fields to match the destination order (the actual
source database column order), while merging metadata from source
(the existing entity in OpenMetadata) for columns that already exist.

The merge respects restrict_update_fields / override_metadata so the
resulting array can be used as a full replacement value without further
filtering.
"""
restrict_set = set(restrict_update_fields or [])

for field in array_entity_fields or []:
if hasattr(destination, field) and hasattr(source, field):
destination_attributes = getattr(destination, field)
source_attributes = getattr(source, field)

# Create a dictionary of destination attributes for easy lookup
destination_dict = {
_get_attribute_name(attr): attr for attr in destination_attributes
source_dict = {
_get_attribute_name(attr): attr for attr in source_attributes
}

updated_attributes = []
for source_attr in source_attributes or []:
# Update the destination attribute with the source attribute
destination_attr = destination_dict.get(
_get_attribute_name(source_attr)
)
if destination_attr:
for dest_attr in destination_attributes or []:
source_attr = source_dict.get(_get_attribute_name(dest_attr))
if source_attr:
update_dict = {}
for k, v in dest_attr.__dict__.items():
if k not in dest_attr.model_fields_set:
continue
if k in restrict_set:
src_val = getattr(source_attr, k, None)
if not _should_update_restricted_field(
src_val, v, override_metadata
):
continue
update_dict[k] = v
updated_attributes.append(
source_attr.model_copy(update=destination_attr.__dict__)
source_attr.model_copy(update=update_dict)
)
# Remove the updated attribute from the destination dictionary
del destination_dict[_get_attribute_name(source_attr)]
else:
updated_attributes.append(None)
updated_attributes.append(dest_attr)

for idx, attr in enumerate(updated_attributes):
if hasattr(attr, "ordinalPosition"):
attr.ordinalPosition = idx + 1

# Combine the updated attributes with the remaining destination attributes
final_attributes = updated_attributes + list(destination_dict.values())
setattr(destination, field, final_attributes)
setattr(destination, field, updated_attributes)


def _remove_change_description(entity: T) -> T:
Expand Down
3 changes: 1 addition & 2 deletions ingestion/tests/integration/mysql/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,7 @@ def create_service_request(mysql_container):
"config": {
"username": mysql_container.username,
"authType": {"password": mysql_container.password},
"hostPort": "localhost:"
+ mysql_container.get_exposed_port(mysql_container.port),
"hostPort": f"localhost:{mysql_container.get_exposed_port(mysql_container.port)}",
"databaseSchema": mysql_container.dbname,
}
},
Expand Down
73 changes: 73 additions & 0 deletions ingestion/tests/integration/mysql/test_column_order.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import pytest
from sqlalchemy import create_engine, text

from metadata.generated.schema.entity.data.table import Table
from metadata.workflow.metadata import MetadataWorkflow


@pytest.fixture(scope="module")
def mysql_engine(mysql_container):
    """Yield a module-scoped SQLAlchemy engine for the MySQL test container.

    The engine (and its connection pool) is shared by all tests in the
    module and disposed once the module's tests finish.
    """
    engine = create_engine(mysql_container.get_connection_url())
    try:
        yield engine
    finally:
        # Guarantee pool disposal even if the generator is closed early
        # (e.g. GeneratorExit) instead of being resumed normally.
        engine.dispose()


@pytest.fixture()
def column_order_table(mysql_engine):
    """Create employees.column_order_test for a test and drop it afterwards."""

    def _exec(conn, statement: str) -> None:
        # Run a single DDL statement on an already-open connection.
        conn.execute(text(statement))

    # Setup: ensure a clean three-column table exists.
    with mysql_engine.connect() as conn:
        _exec(conn, "DROP TABLE IF EXISTS employees.column_order_test")
        _exec(
            conn,
            "CREATE TABLE employees.column_order_test ("
            " id INT NOT NULL AUTO_INCREMENT PRIMARY KEY,"
            " name VARCHAR(100),"
            " created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP"
            ")",
        )
        conn.commit()
    yield
    # Teardown: remove the table so later tests start from scratch.
    with mysql_engine.connect() as conn:
        _exec(conn, "DROP TABLE IF EXISTS employees.column_order_test")
        conn.commit()


def test_column_order_preserved_after_adding_column_in_middle(
    patch_passwords_for_db_services,
    run_workflow,
    ingestion_config,
    metadata,
    db_service,
    mysql_engine,
    column_order_table,
):
    """Columns must keep the database's physical order after re-ingestion.

    A column added in the middle of the table (AFTER name) should appear
    in that same position in OpenMetadata, not be appended at the end.
    """

    def _ingested_column_names():
        # Re-fetch the table entity and return its column names in order.
        fqn = (
            f"{db_service.fullyQualifiedName.root}.default.employees.column_order_test"
        )
        table = metadata.get_by_name(entity=Table, fqn=fqn)
        assert table is not None
        return [column.name.root for column in table.columns]

    # First ingestion: baseline three-column table.
    run_workflow(MetadataWorkflow, ingestion_config)
    assert _ingested_column_names() == ["id", "name", "created_at"]

    # Add a column in the middle directly in MySQL.
    with mysql_engine.connect() as conn:
        conn.execute(
            text(
                "ALTER TABLE employees.column_order_test "
                "ADD COLUMN email VARCHAR(255) AFTER name"
            )
        )
        conn.commit()

    # Second ingestion must reflect the new physical column order.
    run_workflow(MetadataWorkflow, ingestion_config)
    assert _ingested_column_names() == ["id", "name", "email", "created_at"]
Loading
Loading