Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions paimon-python/pypaimon/common/options/core_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,16 @@ class CoreOptions:
)
)

DYNAMIC_PARTITION_OVERWRITE: ConfigOption[bool] = (
ConfigOptions.key("dynamic-partition-overwrite")
.boolean_type()
.default_value(True)
.with_description(
"Whether only overwrite dynamic partition when overwriting a partitioned table "
"with dynamic partition columns. Works only when the table has partition keys."
)
)

def __init__(self, options: Options):
self.options = options

Expand Down Expand Up @@ -622,3 +632,6 @@ def read_batch_size(self, default=None) -> int:

def add_column_before_partition(self) -> bool:
return self.options.get(CoreOptions.ADD_COLUMN_BEFORE_PARTITION, False)

def dynamic_partition_overwrite(self) -> bool:
return self.options.get(CoreOptions.DYNAMIC_PARTITION_OVERWRITE)
100 changes: 98 additions & 2 deletions paimon-python/pypaimon/tests/partition_predicate_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ def _mock_table():
table.fields = TABLE_FIELDS
table.partition_keys = ['dt', 'region']
table.partition_keys_fields = PARTITION_FIELDS
table.options.options.get = Mock(return_value="__DEFAULT_PARTITION__")
return table


Expand Down Expand Up @@ -183,9 +184,9 @@ def _extract_partition_predicate(self, commit):
return mock_cls.call_args[1]['partition_predicate']

def test_overwrite_rejects_mismatched_partition(self, *_):
commit = self._create_commit(stub_commit=False)
commit = self._create_commit()
with self.assertRaises(RuntimeError) as ctx:
commit.overwrite(self._TARGET, [self._msg(('2024-01-15', 'us-west-2'))], 1)
commit._create_static_partition_filter(self._TARGET, [self._msg(('2024-01-15', 'us-west-2'))])
self.assertIn('does not belong to this partition', str(ctx.exception))

def test_overwrite_passes_partition_scoped_predicate(self, *_):
Expand All @@ -208,6 +209,89 @@ def test_drop_partitions_passes_or_predicate(self, *_):
self.assertTrue(pred.test(OffsetRow(('2024-01-16', 'us-west-2'), 0, 2)))
self.assertFalse(pred.test(OffsetRow(('2024-01-17', 'eu-west-1'), 0, 2)))

def test_overwrite_null_partition_value(self, *_):
"""Test that overwrite with None partition value uses isNull predicate."""
commit = self._create_commit()
target = {'dt': None, 'region': 'us-east-1'}
commit.overwrite(target, [self._msg((None, 'us-east-1'))], 1)

pred = self._extract_partition_predicate(commit)
# Should match rows where dt is None and region is 'us-east-1'
self.assertTrue(pred.test(OffsetRow((None, 'us-east-1'), 0, 2)))
# Should not match rows where dt has a value
self.assertFalse(pred.test(OffsetRow(('2024-01-15', 'us-east-1'), 0, 2)))
# Should not match rows where region differs
self.assertFalse(pred.test(OffsetRow((None, 'us-west-2'), 0, 2)))

def test_overwrite_default_partition_name_treated_as_null(self, *_):
"""Test that overwrite with default partition name string is treated as null."""
commit = self._create_commit()
target = {'dt': '__DEFAULT_PARTITION__', 'region': 'us-east-1'}
commit.overwrite(target, [self._msg((None, 'us-east-1'))], 1)

pred = self._extract_partition_predicate(commit)
# __DEFAULT_PARTITION__ should be treated like None (isNull)
self.assertTrue(pred.test(OffsetRow((None, 'us-east-1'), 0, 2)))
self.assertFalse(pred.test(OffsetRow(('2024-01-15', 'us-east-1'), 0, 2)))

def test_overwrite_all_null_partition_values(self, *_):
"""Test overwrite where all partition values are None."""
commit = self._create_commit()
target = {'dt': None, 'region': None}
commit.overwrite(target, [self._msg((None, None))], 1)

pred = self._extract_partition_predicate(commit)
self.assertTrue(pred.test(OffsetRow((None, None), 0, 2)))
self.assertFalse(pred.test(OffsetRow((None, 'us-east-1'), 0, 2)))
self.assertFalse(pred.test(OffsetRow(('2024-01-15', None), 0, 2)))

def test_overwrite_null_partition_rejects_mismatched(self, *_):
"""Test that overwrite with null partition rejects rows that don't match."""
commit = self._create_commit()
target = {'dt': None, 'region': 'us-east-1'}
# Trying to overwrite null dt partition with data that has a non-null dt
with self.assertRaises(RuntimeError) as ctx:
commit._create_static_partition_filter(target, [self._msg(('2024-01-15', 'us-east-1'))])
self.assertIn('does not belong to this partition', str(ctx.exception))

def test_dynamic_overwrite_null_partition_value(self, *_):
"""Test dynamic partition overwrite with None partition values."""
commit = self._create_commit()
self.table.options.dynamic_partition_overwrite.return_value = True
commit.overwrite({}, [self._msg((None, 'us-east-1'))], 1)

pred = self._extract_partition_predicate(commit)
self.assertTrue(pred.test(OffsetRow((None, 'us-east-1'), 0, 2)))
self.assertFalse(pred.test(OffsetRow(('2024-01-15', 'us-east-1'), 0, 2)))

def test_dynamic_overwrite_mixed_null_and_nonnull(self, *_):
"""Test dynamic partition overwrite with both null and non-null partitions."""
commit = self._create_commit()
self.table.options.dynamic_partition_overwrite.return_value = True
commit.overwrite({}, [
self._msg(('2024-01-15', 'us-east-1')),
self._msg((None, 'us-west-2')),
], 1)

pred = self._extract_partition_predicate(commit)
self.assertTrue(pred.test(OffsetRow(('2024-01-15', 'us-east-1'), 0, 2)))
self.assertTrue(pred.test(OffsetRow((None, 'us-west-2'), 0, 2)))
self.assertFalse(pred.test(OffsetRow(('2024-01-16', 'eu-west-1'), 0, 2)))

def test_drop_partitions_null_partition_value(self, *_):
"""Test drop_partitions with default partition name string representing null."""
commit = self._create_commit()
commit.drop_partitions([
{'dt': '__DEFAULT_PARTITION__', 'region': 'us-east-1'},
{'dt': '2024-01-16', 'region': 'us-west-2'},
], 1)

pred = self._extract_partition_predicate(commit)
self.assertTrue(pred.test(OffsetRow((None, 'us-east-1'), 0, 2)))
self.assertTrue(pred.test(OffsetRow(('2024-01-16', 'us-west-2'), 0, 2)))
self.assertFalse(pred.test(OffsetRow(('2024-01-15', 'us-east-1'), 0, 2)))
self.assertFalse(pred.test(OffsetRow((None, 'us-west-2'), 0, 2)))


class TestCommitScannerPartitionPredicate(unittest.TestCase):

Expand All @@ -225,6 +309,18 @@ def test_filter_uses_partition_key_index(self):
self.assertTrue(pred.test(GenericRow(['2024-01-16', 'us-west-2'], PARTITION_FIELDS)))
self.assertFalse(pred.test(GenericRow(['2024-01-17', 'eu-west-1'], PARTITION_FIELDS)))

def test_filter_handles_null_partition_values(self):
scanner = self._scanner()
pred = scanner._build_partition_filter_from_entries([
_manifest_entry([None, 'us-east-1']),
_manifest_entry(['2024-01-16', 'us-west-2']),
])

self.assertTrue(pred.test(GenericRow([None, 'us-east-1'], PARTITION_FIELDS)))
self.assertTrue(pred.test(GenericRow(['2024-01-16', 'us-west-2'], PARTITION_FIELDS)))
self.assertFalse(pred.test(GenericRow(['2024-01-15', 'us-east-1'], PARTITION_FIELDS)))
self.assertFalse(pred.test(GenericRow([None, 'us-west-2'], PARTITION_FIELDS)))

def test_filter_none_without_partition_keys(self):
scanner = CommitScanner(Mock(partition_keys=[]), Mock())

Expand Down
65 changes: 65 additions & 0 deletions paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,71 @@ def test_overwrite(self):
pd.testing.assert_frame_equal(
actual_df2.reset_index(drop=True), df2.reset_index(drop=True))

def test_dynamic_partition_overwrite(self):
pa_schema = pa.schema([
('f0', pa.string()),
('f1', pa.string())
])
schema = Schema.from_pyarrow_schema(pa_schema, partition_keys=['f0'])
self.rest_catalog.create_table('default.test_dynamic_overwrite', schema, False)
table = self.rest_catalog.get_table('default.test_dynamic_overwrite')
read_builder = table.new_read_builder()

# Write initial non-null and null partitions
self._batch_write(table, pd.DataFrame({
'f0': ['a', 'b', None],
'f1': ['apple', 'banana', 'cherry'],
}))

# Dynamic overwrite partition f0='a' only; 'b' and null untouched
self._batch_overwrite(table, pd.DataFrame({
'f0': ['a'],
'f1': ['watermelon'],
}))

self._assert_table_equals(read_builder, pd.DataFrame({
'f0': ['a', 'b', None],
'f1': ['watermelon', 'banana', 'cherry'],
}), sort_by='f0')

# Dynamic overwrite partitions f0='a' and f0=None; 'b' untouched
self._batch_overwrite(table, pd.DataFrame({
'f0': ['a', None],
'f1': ['mango', 'grape'],
}))

self._assert_table_equals(read_builder, pd.DataFrame({
'f0': ['a', 'b', None],
'f1': ['mango', 'banana', 'grape'],
}), sort_by='f0')

def _batch_write(self, table, df):
write_builder = table.new_batch_write_builder()
table_write = write_builder.new_write()
table_commit = write_builder.new_commit()
table_write.write_pandas(df)
table_commit.commit(table_write.prepare_commit())
table_write.close()
table_commit.close()

def _batch_overwrite(self, table, df, partition=None):
write_builder = table.new_batch_write_builder().overwrite(partition)
table_write = write_builder.new_write()
table_commit = write_builder.new_commit()
table_write.write_pandas(df)
table_commit.commit(table_write.prepare_commit())
table_write.close()
table_commit.close()

def _assert_table_equals(self, read_builder, expected_df, sort_by=None):
table_scan = read_builder.new_scan()
table_read = read_builder.new_read()
actual_df = table_read.to_pandas(table_scan.plan().splits())
if sort_by:
actual_df = actual_df.sort_values(by=sort_by)
pd.testing.assert_frame_equal(
actual_df.reset_index(drop=True), expected_df.reset_index(drop=True))

def test_full_data_types(self):
simple_pa_schema = pa.schema([
('f0', pa.int8()),
Expand Down
65 changes: 65 additions & 0 deletions paimon-python/pypaimon/tests/reader_base_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,71 @@ def test_overwrite(self):
pd.testing.assert_frame_equal(
actual_df2.reset_index(drop=True), df2.reset_index(drop=True))

def test_dynamic_partition_overwrite(self):
pa_schema = pa.schema([
('f0', pa.string()),
('f1', pa.string())
])
schema = Schema.from_pyarrow_schema(pa_schema, partition_keys=['f0'])
self.catalog.create_table('default.test_dynamic_overwrite', schema, False)
table = self.catalog.get_table('default.test_dynamic_overwrite')
read_builder = table.new_read_builder()

# Write initial non-null and null partitions
self._batch_write(table, pd.DataFrame({
'f0': ['a', 'b', None],
'f1': ['apple', 'banana', 'cherry'],
}))

# Dynamic overwrite partition f0='a' only; 'b' and null untouched
self._batch_overwrite(table, pd.DataFrame({
'f0': ['a'],
'f1': ['watermelon'],
}))

self._assert_table_equals(read_builder, pd.DataFrame({
'f0': ['a', 'b', None],
'f1': ['watermelon', 'banana', 'cherry'],
}), sort_by='f0')

# Dynamic overwrite partitions f0='a' and f0=None; 'b' untouched
self._batch_overwrite(table, pd.DataFrame({
'f0': ['a', None],
'f1': ['mango', 'grape'],
}))

self._assert_table_equals(read_builder, pd.DataFrame({
'f0': ['a', 'b', None],
'f1': ['mango', 'banana', 'grape'],
}), sort_by='f0')

def _batch_write(self, table, df):
write_builder = table.new_batch_write_builder()
table_write = write_builder.new_write()
table_commit = write_builder.new_commit()
table_write.write_pandas(df)
table_commit.commit(table_write.prepare_commit())
table_write.close()
table_commit.close()

def _batch_overwrite(self, table, df, partition=None):
write_builder = table.new_batch_write_builder().overwrite(partition)
table_write = write_builder.new_write()
table_commit = write_builder.new_commit()
table_write.write_pandas(df)
table_commit.commit(table_write.prepare_commit())
table_write.close()
table_commit.close()

def _assert_table_equals(self, read_builder, expected_df, sort_by=None):
table_scan = read_builder.new_scan()
table_read = read_builder.new_read()
actual_df = table_read.to_pandas(table_scan.plan().splits())
if sort_by:
actual_df = actual_df.sort_values(by=sort_by)
pd.testing.assert_frame_equal(
actual_df.reset_index(drop=True), expected_df.reset_index(drop=True))

def test_full_data_types(self):
simple_pa_schema = pa.schema([
('f0', pa.int8()),
Expand Down
65 changes: 65 additions & 0 deletions paimon-python/pypaimon/tests/rest/rest_read_write_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,71 @@ def test_overwrite(self):
pd.testing.assert_frame_equal(
actual_df2.reset_index(drop=True), df2.reset_index(drop=True))

def test_dynamic_partition_overwrite(self):
pa_schema = pa.schema([
('f0', pa.string()),
('f1', pa.string())
])
schema = Schema.from_pyarrow_schema(pa_schema, partition_keys=['f0'])
self.rest_catalog.create_table('default.test_dynamic_overwrite', schema, False)
table = self.rest_catalog.get_table('default.test_dynamic_overwrite')
read_builder = table.new_read_builder()

# Write initial non-null and null partitions
self._batch_write(table, pd.DataFrame({
'f0': ['a', 'b', None],
'f1': ['apple', 'banana', 'cherry'],
}))

# Dynamic overwrite partition f0='a' only; 'b' and null untouched
self._batch_overwrite(table, pd.DataFrame({
'f0': ['a'],
'f1': ['watermelon'],
}))

self._assert_table_equals(read_builder, pd.DataFrame({
'f0': ['a', 'b', None],
'f1': ['watermelon', 'banana', 'cherry'],
}), sort_by='f0')

# Dynamic overwrite partitions f0='a' and f0=None; 'b' untouched
self._batch_overwrite(table, pd.DataFrame({
'f0': ['a', None],
'f1': ['mango', 'grape'],
}))

self._assert_table_equals(read_builder, pd.DataFrame({
'f0': ['a', 'b', None],
'f1': ['mango', 'banana', 'grape'],
}), sort_by='f0')

def _batch_write(self, table, df):
write_builder = table.new_batch_write_builder()
table_write = write_builder.new_write()
table_commit = write_builder.new_commit()
table_write.write_pandas(df)
table_commit.commit(table_write.prepare_commit())
table_write.close()
table_commit.close()

def _batch_overwrite(self, table, df, partition=None):
write_builder = table.new_batch_write_builder().overwrite(partition)
table_write = write_builder.new_write()
table_commit = write_builder.new_commit()
table_write.write_pandas(df)
table_commit.commit(table_write.prepare_commit())
table_write.close()
table_commit.close()

def _assert_table_equals(self, read_builder, expected_df, sort_by=None):
table_scan = read_builder.new_scan()
table_read = read_builder.new_read()
actual_df = table_read.to_pandas(table_scan.plan().splits())
if sort_by:
actual_df = actual_df.sort_values(by=sort_by)
pd.testing.assert_frame_equal(
actual_df.reset_index(drop=True), expected_df.reset_index(drop=True))

def test_parquet_ao_reader(self):
schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'])
self.rest_catalog.create_table('default.test_append_only_parquet', schema, False)
Expand Down
5 changes: 4 additions & 1 deletion paimon-python/pypaimon/write/commit/commit_scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,10 @@ def _build_partition_filter_from_entries(self, entries: List[ManifestEntry]):
for partition_values in changed_partitions:
sub_predicates = []
for i, key in enumerate(partition_keys):
sub_predicates.append(predicate_builder.equal(key, partition_values[i]))
if partition_values[i] is None:
sub_predicates.append(predicate_builder.is_null(key))
else:
sub_predicates.append(predicate_builder.equal(key, partition_values[i]))
partition_predicates.append(predicate_builder.and_predicates(sub_predicates))

return predicate_builder.or_predicates(partition_predicates)
Loading
Loading