Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions airbyte_cdk/sources/declarative/expanders/record_expander.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import copy
from dataclasses import InitVar, dataclass
from enum import Enum
from typing import Any, Iterable, Mapping, MutableMapping, Sequence
Expand Down Expand Up @@ -59,7 +58,9 @@ class RecordExpander:
Items from this array will be extracted and emitted as separate records.
Supports wildcards (*).
remain_original_record: If True, each expanded record will include the original
parent record in an "original_record" field. Defaults to False.
parent record in an "original_record" field. The parent record is shared
(not deep-copied) across all expanded siblings, so treat it as read-only.
Defaults to False.
on_no_records: Behavior when expansion produces no records. "skip" (default)
emits nothing. "emit_parent" emits the original parent record unchanged.
config: The user-provided configuration as specified by the source's spec.
Expand Down Expand Up @@ -111,7 +112,7 @@ def expand_record(self, record: MutableMapping[Any, Any]) -> Iterable[MutableMap
if self.remain_original_record:
yield {
"value": item,
"original_record": copy.deepcopy(parent_record),
"original_record": parent_record,
}
else:
yield item
Expand All @@ -125,4 +126,4 @@ def _apply_parent_context(
) -> None:
"""Apply parent context to a child record."""
if self.remain_original_record:
child_record["original_record"] = copy.deepcopy(parent_record)
child_record["original_record"] = parent_record
98 changes: 98 additions & 0 deletions unit_tests/sources/declarative/extractors/test_dpath_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,104 @@ def test_dpath_extractor_with_expansion(
assert actual_records == expected_records


def test_record_expander_shares_parent_reference_for_dict_items():
"""Expanded dict records share the same parent reference instead of deep-copying.

This prevents O(N²) memory usage when a parent record contains a large nested
array (e.g., a Stripe invoice event with many line items). Each expanded child
record's `original_record` should be the exact same object as all siblings.
"""
parent = {
"id": "evt_123",
"type": "invoice.updated",
"data": {
"object": {
"id": "in_456",
"lines": {
"data": [
{"id": "il_1", "amount": 100},
{"id": "il_2", "amount": 200},
{"id": "il_3", "amount": 300},
]
},
}
},
}

expander = RecordExpander(
expand_records_from_field=["data", "object", "lines", "data"],
config={},
parameters={},
remain_original_record=True,
)

records = list(expander.expand_record(parent))
assert len(records) == 3

# All expanded records should share the same parent reference (identity, not just equality)
for rec in records:
assert rec["original_record"] is parent

# Values are still correct
assert records[0]["id"] == "il_1"
assert records[1]["id"] == "il_2"
assert records[2]["id"] == "il_3"


def test_record_expander_shares_parent_reference_for_non_dict_items():
"""Non-dict expanded records also share the same parent reference."""
parent = {"items": [1, "a", 3.14], "meta": "info"}

expander = RecordExpander(
expand_records_from_field=["items"],
config={},
parameters={},
remain_original_record=True,
)

records = list(expander.expand_record(parent))
assert len(records) == 3

for rec in records:
assert rec["original_record"] is parent


def test_record_expander_large_expansion_memory_efficient():
"""Expanding a record with many children should not multiply parent memory.

Simulates a Stripe-like scenario: an event with 500 line items. With deep
copy this would create 500 copies of the full event (O(N²) memory). With
shared references, memory usage stays O(N).
"""
n_items = 500
parent = {
"id": "evt_large",
"type": "invoice.updated",
"data": {
"object": {
"id": "in_big",
"lines": {"data": [{"id": f"il_{i}", "amount": i * 100} for i in range(n_items)]},
}
},
}

expander = RecordExpander(
expand_records_from_field=["data", "object", "lines", "data"],
config={},
parameters={},
remain_original_record=True,
)

records = list(expander.expand_record(parent))
assert len(records) == n_items

# All share the same parent object — no deep copies
for rec in records:
assert rec["original_record"] is parent
assert records[0]["id"] == "il_0"
assert records[-1]["id"] == f"il_{n_items - 1}"


@pytest.mark.parametrize(
"field_path, expand_records_from_field, on_no_records, body, expected_records",
[
Expand Down
Loading