Skip to content
Draft
129 changes: 129 additions & 0 deletions paimon-python/pypaimon/common/merge_engine_dispatch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

"""Centralised merge-engine dispatch.

Both the read path (``MergeFileSplitRead``) and the write path
(``KeyValueDataWriter``'s in-memory merge buffer) need to pick a
``MergeFunction`` based on the table's ``merge-engine`` option. This
module is the single source of truth so the two sides cannot drift.

Mirrors Java ``MergeFunctionFactory`` (paimon-core/.../mergetree/
compact/MergeFunctionFactory.java).
"""

from typing import List

from pypaimon.common.options.core_options import MergeEngine
from pypaimon.read.reader.deduplicate_merge_function import \
DeduplicateMergeFunction
from pypaimon.read.reader.partial_update_merge_function import \
PartialUpdateMergeFunction


# Boolean-valued options that, when truthy, opt the table into
# behaviour the Python PartialUpdateMergeFunction does not implement.
# Mirrors org.apache.paimon.CoreOptions and the fallback keys in
# PartialUpdateMergeFunction.java.
# NOTE: these are only flagged when set to a truthy value (see
# _option_is_truthy); an explicit "false" is harmless and ignored.
_PARTIAL_UPDATE_UNSUPPORTED_BOOLEAN_OPTIONS = (
    "ignore-delete",
    "partial-update.ignore-delete",
    "first-row.ignore-delete",
    "deduplicate.ignore-delete",
    "partial-update.remove-record-on-delete",
    "partial-update.remove-record-on-sequence-group",
)
# Pieces of the per-field option key pattern "fields.<name>.<suffix>".
# Any "fields.*.sequence-group" or "fields.*.aggregate-function" key,
# as well as the default-aggregate-function key, marks a feature the
# Python partial-update merge does not implement (flagged regardless
# of the option's value).
_FIELDS_PREFIX = "fields."
_FIELD_SEQUENCE_GROUP_SUFFIX = ".sequence-group"
_FIELD_AGGREGATE_FUNCTION_SUFFIX = ".aggregate-function"
_DEFAULT_AGGREGATE_FUNCTION_KEY = "fields.default-aggregate-function"


def build_merge_function(
    *,
    engine: MergeEngine,
    raw_options: dict,
    key_arity: int,
    value_arity: int,
    value_field_nullables: List[bool],
):
    """Resolve the MergeFunction implementation for ``merge-engine``.

    ``engine`` and ``raw_options`` come from the table's ``CoreOptions``
    (typically ``table.options.merge_engine()`` and
    ``table.options.options.to_map()``). ``key_arity`` /
    ``value_arity`` / ``value_field_nullables`` describe the value-side
    schema the merge function operates on: the projected read schema on
    the read path, the full table schema minus primary keys on the
    write path.

    Raises ``NotImplementedError`` for engines or option combinations
    pypaimon does not support yet.
    """
    if engine == MergeEngine.DEDUPLICATE:
        return DeduplicateMergeFunction()

    if engine != MergeEngine.PARTIAL_UPDATE:
        # Anything beyond the two engines above is not ported yet.
        raise NotImplementedError(
            "merge-engine '{}' is not implemented in pypaimon yet "
            "(supported: deduplicate, partial-update). Use the Java "
            "client or open an issue to track support.".format(engine.value)
        )

    # partial-update: refuse loudly if the table opts into Java-only
    # features, rather than silently merging with wrong semantics.
    blocked = partial_update_unsupported_options(raw_options)
    if blocked:
        raise NotImplementedError(
            "merge-engine 'partial-update' is enabled together with "
            "options that pypaimon does not yet implement: {}. The "
            "supported subset is per-key last-non-null merge with "
            "no sequence-group, no per-field aggregator override, "
            "no ignore-delete and no partial-update.remove-record-on-* "
            "flags. Use the Java client for the full feature set, or "
            "open an issue to track Python support.".format(
                ", ".join(sorted(blocked))
            )
        )
    return PartialUpdateMergeFunction(
        key_arity=key_arity,
        value_arity=value_arity,
        nullables=list(value_field_nullables),
    )


def partial_update_unsupported_options(raw_options: dict):
    """Return the set of option keys this table sets that
    ``PartialUpdateMergeFunction`` does not yet support. An empty set
    means the simple last-non-null merge is safe to run.
    """
    def _blocks_partial_update(key, value):
        # Boolean opt-ins only count when actually enabled; the
        # per-field / default aggregator keys count by mere presence.
        if key in _PARTIAL_UPDATE_UNSUPPORTED_BOOLEAN_OPTIONS:
            return _option_is_truthy(value)
        if key == _DEFAULT_AGGREGATE_FUNCTION_KEY:
            return True
        return key.startswith(_FIELDS_PREFIX) and key.endswith(
            (_FIELD_SEQUENCE_GROUP_SUFFIX, _FIELD_AGGREGATE_FUNCTION_SUFFIX))

    return {key for key, value in raw_options.items()
            if _blocks_partial_update(key, value)}


def _option_is_truthy(raw):
if raw is None:
return False
if isinstance(raw, bool):
return raw
if isinstance(raw, str):
return raw.strip().lower() in ("true", "1", "yes", "on")
return bool(raw)
50 changes: 50 additions & 0 deletions paimon-python/pypaimon/read/reader/deduplicate_merge_function.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

"""Default merge function for primary-key tables.

Mirrors Java ``DeduplicateMergeFunction`` -- for a run of KVs sharing
the same primary key, keep only the one with the highest sequence
number (by virtue of ``add`` being called in sequence-number order).
"""

from typing import Optional

from pypaimon.table.row.key_value import KeyValue


class DeduplicateMergeFunction:
    """Merge function that resolves each primary key to its newest row.

    ``add`` is called in ascending sequence-number order for a run of
    same-key rows, so simply remembering the most recent ``KeyValue``
    yields the winner. Shared by the sort-merge read path
    (``SortMergeReaderWithMinHeap``) and the write path's in-memory
    merge buffer (``KeyValueDataWriter``), where it enforces the LSM
    "PK unique within a file" invariant on flush.
    """

    def __init__(self):
        # Newest row of the current key run; None until the first
        # add() after construction or reset().
        self.latest_kv: Optional[KeyValue] = None

    def reset(self) -> None:
        """Drop state before starting the next same-key run."""
        self.latest_kv = None

    def add(self, kv: KeyValue) -> None:
        """Record kv; each call overwrites the previous one."""
        self.latest_kv = kv

    def get_result(self) -> Optional[KeyValue]:
        """Return the latest row seen since reset(), or None."""
        return self.latest_kv
134 changes: 134 additions & 0 deletions paimon-python/pypaimon/read/reader/partial_update_merge_function.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

"""
Python port of Java's ``PartialUpdateMergeFunction``
(``paimon-core/src/main/java/org/apache/paimon/mergetree/compact/
PartialUpdateMergeFunction.java``).

The merge function used by the ``partial-update`` merge engine on PK
tables: rows sharing a primary key are merged left-to-right, taking the
latest non-null value per non-PK field. ``DeduplicateMergeFunction``
keeps only the latest row; ``PartialUpdateMergeFunction`` instead lets
later writes "fill in" fields the earlier writes left null, so users
can write the same logical record across multiple commits with
different sets of non-null columns.

This is the **core merge semantics only**. The Java implementation also
supports per-field aggregator overrides (``fields.<name>.aggregate-
function``), sequence groups (``fields.<name>.sequence-group``),
``ignore-delete``, and ``partial-update.remove-record-on-*`` options.
None of those are implemented yet; non-INSERT row kinds raise
``NotImplementedError`` at ``add`` time so we never silently corrupt
data with a half-implemented contract.
"""

from typing import Any, List, Optional

from pypaimon.table.row.key_value import KeyValue
from pypaimon.table.row.row_kind import RowKind


class PartialUpdateMergeFunction:
    """Merge engine ``partial-update``: later rows fill in null fields.

    Rows sharing a primary key arrive oldest-to-newest through
    ``add``; each non-null value field overwrites the accumulated
    value while nulls leave it untouched, so one logical record can be
    assembled across several commits with different non-null columns.
    Follows the ``reset`` / ``add`` / ``get_result`` protocol used by
    ``SortMergeReader``.

    Only the core last-non-null semantics of Java's
    ``PartialUpdateMergeFunction`` are ported. Sequence groups,
    per-field aggregators, ignore-delete and the
    ``partial-update.remove-record-on-*`` options are rejected either
    up-front (dispatch) or at ``add`` time, never silently ignored.
    """

    def __init__(self, key_arity: int, value_arity: int,
                 nullables: Optional[List[bool]] = None):
        # Per-value-field nullable flags, parallel to value indices.
        # None disables the nullability check entirely (for callers
        # without schema info); otherwise a null input on a NOT NULL
        # field raises, mirroring Java's updateNonNullFields() check.
        if nullables is not None and len(nullables) != value_arity:
            raise ValueError(
                "nullables length {} does not match value_arity {}".format(
                    len(nullables), value_arity))
        self._key_arity = key_arity
        self._value_arity = value_arity
        self._nullables = nullables
        # Lazily allocated on the first add(); None means "no rows yet".
        self._accumulator: Optional[List[Any]] = None
        # Newest kv of the run. Only its key and sequence_number are
        # read, and both are copied into a fresh tuple in get_result()
        # so the result never aliases upstream's reused KeyValue.
        self._latest_kv: Optional[KeyValue] = None

    def reset(self) -> None:
        """Clear all state before the next group of same-key rows."""
        self._accumulator = None
        self._latest_kv = None

    def add(self, kv: KeyValue) -> None:
        """Fold one row (oldest-to-newest order) into the accumulator.

        Raises ``NotImplementedError`` for DELETE / UPDATE_BEFORE rows
        (requires ignore-delete or remove-record-on-delete in Java,
        neither of which is wired up here) and ``ValueError`` when a
        NOT NULL field receives a null input.
        """
        kind_byte = kv.value_row_kind_byte
        if not RowKind.is_add_byte(kind_byte):
            # Refuse retraction rows rather than silently swallow them.
            raise NotImplementedError(
                "PartialUpdateMergeFunction received a {} row; this "
                "Python port does not yet implement the ignore-delete / "
                "partial-update.remove-record-on-delete options. Use the "
                "Java client for tables that produce DELETE / "
                "UPDATE_BEFORE rows.".format(RowKind(kind_byte).to_string())
            )

        # First row of the run: start from all-null, equivalent to
        # Java's `new GenericRow(arity)` after reset().
        merged = self._accumulator
        if merged is None:
            merged = self._accumulator = [None] * self._value_arity

        nullables = self._nullables
        for idx in range(self._value_arity):
            field = kv.value.get_field(idx)
            if field is None:
                # Nulls are absorbed unless the schema forbids them.
                if nullables is not None and not nullables[idx]:
                    raise ValueError("Field {} can not be null".format(idx))
            else:
                merged[idx] = field
        self._latest_kv = kv

    def get_result(self) -> Optional[KeyValue]:
        """Build the merged KeyValue for the run, or None if empty.

        The key and sequence number are snapshotted out of the most
        recent kv because upstream readers (e.g. KeyValueWrapReader)
        reuse and mutate a single KeyValue instance between calls.
        """
        if self._accumulator is None or self._latest_kv is None:
            return None

        latest = self._latest_kv
        key_fields = tuple(latest.key.get_field(i)
                           for i in range(self._key_arity))
        row = key_fields + (latest.sequence_number, RowKind.INSERT.value)
        row += tuple(self._accumulator)

        result = KeyValue(self._key_arity, self._value_arity)
        result.replace(row)
        return result
28 changes: 10 additions & 18 deletions paimon-python/pypaimon/read/reader/sort_merge_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import heapq
from typing import Any, Callable, List, Optional

from pypaimon.read.reader.deduplicate_merge_function import \
DeduplicateMergeFunction
from pypaimon.read.reader.iface.record_iterator import RecordIterator
from pypaimon.read.reader.iface.record_reader import RecordReader
from pypaimon.schema.data_types import DataField, Keyword
Expand All @@ -30,9 +32,15 @@
class SortMergeReaderWithMinHeap(RecordReader):
"""SortMergeReader implemented with min-heap."""

def __init__(self, readers: List[RecordReader[KeyValue]], schema: TableSchema):
def __init__(self, readers: List[RecordReader[KeyValue]], schema: TableSchema,
merge_function: Optional[Any] = None):
self.next_batch_readers = list(readers)
self.merge_function = DeduplicateMergeFunction()
# Default to dedupe so callers that don't pass a merge_function
# keep their old behaviour. The merge engine dispatch lives in
# ``MergeFileSplitRead.section_reader_supplier`` for the read
# path; tests or other ad-hoc callers can pass a different
# implementation here.
self.merge_function = merge_function if merge_function is not None else DeduplicateMergeFunction()

if schema.partition_keys:
trimmed_primary_keys = [pk for pk in schema.primary_keys if pk not in schema.partition_keys]
Expand Down Expand Up @@ -124,22 +132,6 @@ def _next_impl(self):
return True


class DeduplicateMergeFunction:
"""A MergeFunction where key is primary key (unique) and value is the full record, only keep the latest one."""

def __init__(self):
self.latest_kv = None

def reset(self) -> None:
self.latest_kv = None

def add(self, kv: KeyValue):
self.latest_kv = kv

def get_result(self) -> Optional[KeyValue]:
return self.latest_kv


class Element:
def __init__(self, kv: KeyValue, iterator: RecordIterator[KeyValue], reader: RecordReader[KeyValue]):
self.kv = kv
Expand Down
Loading
Loading