
[python] Add compaction module with Ray distributed executor #7771

Draft

TheR1sing3un wants to merge 12 commits into apache:master from TheR1sing3un:feat/python-compaction-phase1-4

Conversation

TheR1sing3un (Member) commented May 6, 2026

Summary

Brings Apache Paimon's compaction story to pypaimon end-to-end:

  • Phase 1: Extend CommitMessage with compact_before / compact_after; FileStoreCommit emits ADD + DELETE manifest entries, and a new commit_compact() helper produces snapshots with commit_kind=COMPACT. DataFileMeta gains JSON-friendly to_dict / from_dict, and a new CommitMessageSerializer handles cross-process transport.
  • Phase 2: Append-only compaction end-to-end via table.new_compact_job(...).execute(), plumbed through a Coordinator → Task → Executor → Driver-commit shape that mirrors Spark CompactProcedure. Ships a LocalExecutor for in-process / test usage.
  • Phase 3: Primary-key (merge-tree) compaction. Direct port of Levels + UniversalCompaction (size-amp / size-ratio / file-num three-stage decision). New abstract MergeFunction + factory; DeduplicateMergeFunction migrated, PartialUpdate / Aggregate / FirstRow stubbed so configured tables fail loudly with a Phase 6 message.
  • Phase 4: RayExecutor wires the same CompactJob to Ray. Driver serializes each CompactTask to JSON (table + payload + catalog loader spec), workers rebuild their FileStoreTable via the catalog and run the rewriter, driver collects messages for one atomic commit.
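For orientation, a hedged sketch of the driver-side API the phases above add. The `executor`, `catalog_options`, `table_identifier`, and `num_cpus_per_task` names come from this description; module paths, option values, and the `table` handle are illustrative:

```python
from pypaimon.compact.executor.local_executor import LocalExecutor

# Phase 2/3: in-process compaction: plan, rewrite, commit, all locally.
table.new_compact_job(executor=LocalExecutor()).execute()

# Phase 4: the same job fanned out over Ray. Workers rebuild the
# FileStoreTable from the catalog loader spec; the driver commits one
# atomic snapshot with commit_kind=COMPACT.
from pypaimon.compact.executor.ray_executor import RayExecutor  # module path assumed

table.new_compact_job(
    executor=RayExecutor(num_cpus_per_task=1),
    catalog_options=catalog_options,   # same options used to open the catalog
    table_identifier="db.tbl",         # defaults to Identifier.get_full_name()
).execute()
```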

Each phase landed as a separate commit, with a follow-up *-fixup commit addressing the review findings inline. Eight commits total — a single PR keeps the design coherent for review while commits stay small enough to walk through.

Out of scope (later PRs):

  • Phase 5 — CLI + python -m pypaimon.compact.entrypoint for ray job submit
  • Phase 6 — Real PartialUpdate / Aggregate / FirstRow MergeFunction bodies
  • Phase 7 — Sort compact (zorder/hilbert), Deletion Vectors, Changelog producer, streaming Ray-Actor coordinator

Plan / design doc: `/Users/lcy/.claude/plans/paimon-compaction-java-spark-python-com-cached-cosmos.md` (local).

Test plan

  • Unit: `Levels` semantics, `UniversalCompaction` three-stage decision (size-amp, size-ratio, file-num, force-pick-L0), `MergeFunction` registry + Phase-6 stubs, `CompactOptions` validation, `CommitMessageSerializer` JSON round-trip, `CompactTask` serde with non-JSON-native partitions
  • Append-only e2e: file-count drops, data identity preserved, `snapshot.commit_kind == "COMPACT"`, partitioned per-partition messages, no-op on below-trigger
  • Primary-key e2e: full-compaction dedup keeps latest, level promotion to >0, no-op below trigger
  • Rewriter contracts: does not mutate manifest-owned `DataFileMeta`, aborts partial output on failure
  • Ray e2e (skipped if `ray` not installed): `ray.init(local_mode=True)` runs Append-only compaction end-to-end through real `ray.remote` task dispatch + `ray.get` collection
  • Integration with a Java-written table (manual; left for follow-up before un-drafting)

Lay the protocol-level groundwork for upcoming compaction work:
- CommitMessage gains compact_before / compact_after fields so a single
  message can carry both deletion and addition of files in a compact
  result.
- FileStoreCommit emits ADD entries for compact_after and DELETE entries
  for compact_before; commit() auto-selects COMPACT kind when no
  new_files are present, and a dedicated commit_compact() helper enforces
  COMPACT-only semantics with no row-id assignment.
- DataFileMeta exposes to_dict / from_dict round-trip plus tagged-value
  encoding (bytes, decimal, datetime, date, time, Timestamp) so file
  metas can be shipped JSON-safely between processes.
- New CommitMessageSerializer wraps the JSON form for use as a
  CompactTask payload (Phase 4 will consume it from the Ray executor).

No write/read behavior changes for existing callers.
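A minimal sketch of that commit path. The flat compact fields match this commit (a later commit in this PR repackages them into CompactIncrement); `file_store_commit` and the exact constructor are illustrative:

```python
# compact_before files become DELETE manifest entries,
# compact_after files become ADD manifest entries.
msg = CommitMessage(
    partition=partition,
    bucket=bucket,
    compact_before=rewritten_inputs,    # List[DataFileMeta]
    compact_after=rewritten_outputs,    # List[DataFileMeta]
)
file_store_commit.commit_compact([msg])  # snapshot with commit_kind=COMPACT,
                                         # no row-id assignment
```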
… values

Promote encode_value/decode_value to the public DataFileMeta API and
reuse them for CommitMessage.partition. Without this, partitions
containing DATE/DECIMAL/bytes/Timestamp would crash json.dumps once
Phase 4 ships CommitMessage payloads through Ray workers.

Tests: round-trip date/Decimal/bytes and Timestamp partition tuples.
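A sketch of what the tagged-value encoding might look like; the tag names and the exact shape of pypaimon's encode_value / decode_value may differ:

```python
import base64
import datetime
import json
from decimal import Decimal

def encode_value(v):
    # Order matters: datetime is a subclass of date, so check it first.
    if isinstance(v, bytes):
        return {"type": "bytes", "value": base64.b64encode(v).decode("ascii")}
    if isinstance(v, Decimal):
        return {"type": "decimal", "value": str(v)}
    if isinstance(v, datetime.datetime):
        return {"type": "datetime", "value": v.isoformat()}
    if isinstance(v, datetime.date):
        return {"type": "date", "value": v.isoformat()}
    return v  # int/str/float/None pass through as JSON natives

# Without tagging, json.dumps would raise TypeError on all three of these:
json.dumps([encode_value(v) for v in (Decimal("9.5"), datetime.date(2024, 1, 1), b"k")])
```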
End-to-end Append-only compaction in-process, exposed as
table.new_compact_job(...).execute(). The plumbing follows the same
Coordinator → Executor → Driver-commit shape Spark uses, so plugging
in a Ray backend in Phase 4 only swaps out the executor.

New compact package layout (kept stable so Phase 3 can plug in PK compaction):
  pypaimon/compact/
    options.py
    coordinator/{coordinator.py, append_compact_coordinator.py}
    task/{compact_task.py, append_compact_task.py}
    rewriter/{rewriter.py, append_compact_rewriter.py}
    executor/{executor.py, local_executor.py}
    job/compact_job.py

Behavior:
- Coordinator scans the latest snapshot via FileScanner.plan_files,
  groups by (partition, bucket), filters out files already at or above
  target_file_size, and chunks each bucket at max_file_num (sketched
  after this list). full_compaction=True rewrites every file regardless
  of size or count.
- Rewriter feeds files batch-by-batch into AppendOnlyDataWriter so the
  writer's existing target_file_size rolling produces correctly sized
  output without a separate rolling layer.
- AppendCompactTask captures the in-process FileStoreTable directly;
  to_dict/from_dict are stubbed and raise — Phase 4 will fill them in
  once Ray needs serialization.
- CompactJob assembles CommitMessage(compact_before, compact_after)
  from each task and calls FileStoreCommit.commit_compact for a single
  atomic snapshot tagged COMPACT.
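A sketch of that planning loop (attribute names illustrative; this count-based chunking is replaced by size-based packing in a later commit of this PR):

```python
from collections import defaultdict

def plan_append_tasks(files, target_file_size, min_file_num, max_file_num,
                      full_compaction=False):
    by_bucket = defaultdict(list)
    for f in files:  # output of FileScanner.plan_files on the latest snapshot
        by_bucket[(f.partition, f.bucket)].append(f)
    tasks = []
    for key, metas in by_bucket.items():
        if not full_compaction:
            metas = [m for m in metas if m.file_size < target_file_size]
        for i in range(0, len(metas), max_file_num):
            chunk = metas[i:i + max_file_num]
            if full_compaction or len(chunk) >= min_file_num:
                tasks.append((key, chunk))
    return tasks
```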

Tests cover threshold behavior, full_compaction override, max_file_num
chunking, PK rejection, partitioned/unpartitioned e2e (file count
shrinks, data identity preserved, snapshot kind=COMPACT), and the
no-op path.
- Rewriter: stop mutating manifest-owned DataFileMeta. Resolve the
  read path locally each iteration, preferring external_path (matches
  SplitRead.file_reader_supplier) over file_path, and never write back.
- Rewriter: seed sequence_generator per bucket_mode — 0 for
  BUCKET_UNAWARE and max(input.max_seq) for HASH_FIXED — matching
  FileStoreWrite._create_data_writer instead of always using max.
- Rewriter: abort the AppendOnlyDataWriter on failure so partial
  output files don't leak when an executor raises mid-rewrite.
- CompactOptions: validate min_file_num >= 1 and max_file_num >=
  min_file_num at construction so misconfiguration fails loudly
  instead of being silently rounded up (sketched after this list).
- AppendCompactCoordinator: drop the silent max(min, max) rescue and
  document that the trailing chunk below min_file_num is intentionally
  dropped (deferred to a future change).
- CompactTask: align docstring with reality — JSON serialization is
  declared on the base class but concrete subclasses may defer it
  until distributed execution arrives in Phase 4.
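A sketch of that construction-time validation (defaults illustrative; a later commit in this PR removes max_file_num again):

```python
from dataclasses import dataclass

@dataclass
class CompactOptions:
    target_file_size: int = 128 * 1024 * 1024
    min_file_num: int = 5
    max_file_num: int = 50

    def __post_init__(self):
        # Fail loudly at construction instead of silently rounding up later.
        if self.min_file_num < 1:
            raise ValueError(f"min_file_num must be >= 1, got {self.min_file_num}")
        if self.max_file_num < self.min_file_num:
            raise ValueError(
                f"max_file_num ({self.max_file_num}) must be >= "
                f"min_file_num ({self.min_file_num})")
```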

Tests: rewriter must not mutate input metadata; rewriter must abort
output on failure; CompactOptions validation. All 15 compact tests
plus 60 commit/manifest/scanner regression tests pass.
…strategy

End-to-end primary-key compaction in-process. table.new_compact_job(...)
on a PK table now plans a MergeTreeCompactTask per (partition, bucket)
that is eligible under UniversalCompaction's three-stage decision
(size-amp / size-ratio / file-num), rewrites it via SortMergeReader +
MergeFunction, and commits the result with snapshot kind=COMPACT.

New modules:
  pypaimon/compact/levels.py
    Direct port of Java mergetree.Levels — L0 ordered by maxSeq DESC,
    L1..N hold one SortedRun each, update() routes per-level.
  pypaimon/compact/strategy/
    compact_unit.py + strategy.py + universal_compaction.py — full
    Universal Compaction algorithm (size-amp, size-ratio, file-num,
    force-pick-L0; the size-amp stage is sketched after this list).
    EarlyFullCompaction / OffPeak left for later.
  pypaimon/compact/rewriter/merge_tree_rolling_writer.py
    Subclass of DataWriter that consumes pre-merged KV batches; rewrites
    each appended file's metadata with the strategy's output_level, the
    actual min/max sequence numbers and retract count.
  pypaimon/compact/rewriter/merge_tree_compact_rewriter.py
    Drives IntervalPartition → per-section ConcatRecordReader →
    SortMergeReader (with the table's MergeFunction) → optional
    DropDeleteRecordReader → buffered RecordBatch → rolling writer.
  pypaimon/compact/coordinator/merge_tree_compact_coordinator.py
    Per-(partition, bucket) Levels build + strategy.pick + drop_delete
    rule (output_level >= non_empty_highest_level).
  pypaimon/compact/task/merge_tree_compact_task.py
    Carries one CompactUnit; assembles CommitMessage(compact_before,
    compact_after) for the driver to commit atomically.
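For the flavor of the three-stage decision, a sketch of its first stage (size amplification), following the RocksDB-style rule UniversalCompaction uses; run/attribute names are illustrative:

```python
def pick_by_size_amplification(sorted_runs, max_size_amp_percent):
    """If everything above the oldest run is already large relative to
    that run, a full rewrite amortizes well: pick every run."""
    if len(sorted_runs) < 2:
        return None
    candidate_size = sum(r.total_size for r in sorted_runs[:-1])
    earliest_size = sorted_runs[-1].total_size
    if candidate_size * 100 > max_size_amp_percent * earliest_size:
        return list(sorted_runs)  # full compaction
    return None  # fall through to the size-ratio, then file-num stages
```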

Read path:
  pypaimon/read/reader/merge_function.py
    Abstract MergeFunction + Factory; DeduplicateMergeFunction migrated
    from sort_merge_reader.py. PartialUpdate / Aggregate / FirstRow are
    stubbed so configured tables fail loudly with a Phase 6 message.
  SortMergeReaderWithMinHeap accepts an optional merge_function
    (default DeduplicateMergeFunction → existing read path unchanged).
  KeyValue.row_tuple exposes the underlying physical tuple so the
    rewriter can buffer KVs back into a RecordBatch.

CompactJob now routes PK tables to MergeTreeCompactCoordinator.

Tests: 19 unit (Levels semantics, UniversalCompaction trigger
algorithm, MergeFunction registry + stubs) + 2 PK e2e (full-compaction
dedup keeps latest values & promotes level; below-trigger no-op).
99-test combined regression on commit/manifest/scanner/reader paths.
- Rewriter: count_retract_rows now matches RowKind.is_add_byte (only
  UPDATE_BEFORE=1 and DELETE=3 are retracts; see the snippet after this
  list). The previous != 0 check wrongly inflated delete_row_count by
  counting UPDATE_AFTER rows, which would skew downstream
  size-amplification estimates and metrics.
- Levels.update: reject out-of-range levels with a clear ValueError
  instead of letting an IndexError leak from _update_level when a
  buggy strategy hands back an output_level above number_of_levels().
- Extract build_kv_file_fields() to split_read.py and consume it from
  both SplitRead._create_key_value_fields and the merge-tree rewriter,
  so the on-disk KV file schema layout (key cols / seq / kind / value)
  cannot drift between read and compact paths.
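The corrected retract predicate, concretely (RowKind byte values per Java: INSERT=0, UPDATE_BEFORE=1, UPDATE_AFTER=2, DELETE=3):

```python
RETRACT_ROW_KINDS = {1, 3}  # UPDATE_BEFORE, DELETE

def count_retract_rows(row_kind_bytes):
    return sum(1 for k in row_kind_bytes if k in RETRACT_ROW_KINDS)

# The old `kind != 0` check would have returned 3 here by also counting
# the UPDATE_AFTER (=2) row; only -U and -D are retracts.
assert count_retract_rows([0, 1, 2, 3]) == 2
```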
Compact jobs can now run their work on Ray. table.new_compact_job(...,
executor=RayExecutor(), catalog_options=..., table_identifier=...).execute()
plans on the driver, ships JSON-serialized CompactTask payloads through
ray.remote, rebuilds the FileStoreTable inside each worker via the
configured catalog, runs the rewriter, and returns CommitMessages back
to the driver for one atomic commit.

CompactTask base class:
- with_table_loader(catalog_options, table_identifier) attaches the
  spec a worker uses to rebuild its table.
- to_dict / from_dict are now concrete: a base envelope holding type +
  loader spec + payload, with subclasses owning _to_payload /
  _from_payload. CompactTask.deserialize(payload) returns the right
  subclass via the registry.
- _resolve_table_via_loader() centralizes catalog rebuild so subclasses
  share a single in-process-vs-distributed branch.
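A sketch of that envelope and registry dispatch; key names and internals are illustrative:

```python
class CompactTask:
    _registry = {}  # filled in by @register_compact_task on each subclass

    def to_dict(self):
        return {
            "type": type(self).__name__,
            "loader": {"catalog_options": self._catalog_options,
                       "table_identifier": self._table_identifier},
            "payload": self._to_payload(),  # subclass-owned encoding
        }

    @classmethod
    def deserialize(cls, envelope):
        try:
            subclass = cls._registry[envelope["type"]]
        except KeyError:
            raise ValueError(f"Unknown CompactTask type: {envelope['type']}")
        task = subclass._from_payload(envelope["payload"])
        task._catalog_options = envelope["loader"]["catalog_options"]
        task._table_identifier = envelope["loader"]["table_identifier"]
        return task
```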

AppendCompactTask / MergeTreeCompactTask:
- replace the Phase 3 NotImplementedError stubs with real payload
  encoders that round-trip files via DataFileMeta.to_dict and
  partition tuples via encode_value/decode_value (handles DATE /
  Decimal / Timestamp partition columns).
- _resolve_table prefers the in-process table when LocalExecutor
  attached one and falls back to the loader otherwise.

CompactJob:
- Accepts catalog_options + table_identifier and propagates them onto
  every task before dispatch when present. LocalExecutor path
  unchanged.

RayExecutor:
- Top-level _run_task_payload worker so Ray pickling stays cheap and
  worker code can't capture driver state.
- ray.init only when not already initialized; respects ray_init_args.
- num_cpus_per_task + ray_remote_args expose the usual Ray knobs.
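A sketch of the dispatch shape using real Ray primitives (ray.remote, .options, ray.get); the payload/return encoding and module path are illustrative:

```python
import json
import ray

@ray.remote
def _run_task_payload(payload: str):
    # Top-level function: Ray pickles only this, never driver state.
    from pypaimon.compact.task.compact_task import CompactTask  # path assumed
    task = CompactTask.deserialize(json.loads(payload))  # rebuild table via loader
    return task.run()  # CommitMessages shipped back to the driver

class RayExecutor:
    def __init__(self, num_cpus_per_task=1, ray_init_args=None, ray_remote_args=None):
        self._num_cpus = num_cpus_per_task
        self._init_args = ray_init_args or {}
        self._remote_args = ray_remote_args or {}

    def execute(self, tasks):
        if not ray.is_initialized():
            ray.init(**self._init_args)
        refs = [
            _run_task_payload.options(num_cpus=self._num_cpus, **self._remote_args)
                             .remote(json.dumps(task.to_dict()))
            for task in tasks
        ]
        return ray.get(refs)  # driver commits all messages in one snapshot
```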

DataFileMeta serialization:
- Tolerate manifest-side BinaryRow (lazy-decoded) in addition to
  GenericRow, and pyarrow Array-like null_counts. Without this the
  Ray round trip fails on files that were just produced by the writer.

setup.py already declared ray as an optional extra (pip install
pypaimon[ray]); no packaging changes required.

Tests:
- compact_task_serde_test (5 tests): round-trip Append + MergeTree
  payloads with loader spec and non-JSON-native partitions; clear
  error when neither table nor loader was attached; unknown-type
  rejection in the registry.
- ray_executor_test (1 test): end-to-end Append-only compaction via a
  real ray.init(local_mode=True), asserting commit_kind=COMPACT and
  data identity. Skipped automatically if ray isn't installed.
- CompactJob.table_identifier default uses Identifier.get_full_name()
  instead of str(identifier). Identifier is a dataclass with no custom
  __str__, so str(...) returns its repr ("Identifier(database='db',
  ...)") and Identifier.from_string would refuse to parse that on the
  worker side. The default path was untested in Phase 4 (e2e passed
  only because the test explicitly passed table_identifier=...) — this
  fixup also drops that explicit kwarg from the e2e so the default is
  exercised.
- RayExecutor module imports the AppendCompactTask / MergeTreeCompactTask
  modules at the top level so their @register_compact_task side effects
  populate the task registry inside Ray worker processes. Without this,
  a real (non-local_mode) Ray cluster would unpickle _run_task_payload
  in a fresh process whose registry is empty and CompactTask.deserialize
  would raise "Unknown CompactTask type".
- MergeTreeCompactTask docstring updated — it no longer says
  "Phase 4 will plumb the loader fields" since Phase 4 already did.
…ctIncrement

Restructure CommitMessage to mirror org.apache.paimon.table.sink.CommitMessageImpl
exactly: instead of dropping new_files / compact_before / compact_after onto
CommitMessage as flat fields, package them inside DataIncrement and
CompactIncrement value objects that match their Java counterparts
field-for-field. This makes Python and Java messages structurally identical
and gives later phases a single, unambiguous slot to plug deletion vectors,
changelog files, and global index deltas into without inventing parallel
field names.

New value objects:
- DataIncrement(new_files, deleted_files, changelog_files,
  new_index_files, deleted_index_files) — direct port of
  org.apache.paimon.io.DataIncrement.
- CompactIncrement(compact_before, compact_after, changelog_files,
  new_index_files, deleted_index_files) — direct port of
  org.apache.paimon.io.CompactIncrement.

CommitMessage now holds (partition, bucket, total_buckets,
data_increment, compact_increment, check_from_snapshot). Convenience
properties (new_files, compact_before, compact_after, changelog_files,
...) keep call-sites readable without leaking the increment shape.
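A sketch of the resulting shape (element types elided; only two convenience properties shown):

```python
from dataclasses import dataclass, field
from typing import List, Optional

@dataclass
class DataIncrement:  # mirrors org.apache.paimon.io.DataIncrement
    new_files: List = field(default_factory=list)
    deleted_files: List = field(default_factory=list)
    changelog_files: List = field(default_factory=list)
    new_index_files: List = field(default_factory=list)
    deleted_index_files: List = field(default_factory=list)

@dataclass
class CompactIncrement:  # mirrors org.apache.paimon.io.CompactIncrement
    compact_before: List = field(default_factory=list)
    compact_after: List = field(default_factory=list)
    changelog_files: List = field(default_factory=list)
    new_index_files: List = field(default_factory=list)
    deleted_index_files: List = field(default_factory=list)

@dataclass
class CommitMessage:
    partition: tuple
    bucket: int
    total_buckets: Optional[int]
    data_increment: DataIncrement
    compact_increment: CompactIncrement
    check_from_snapshot: Optional[int] = None

    @property
    def new_files(self):  # call-sites keep reading message.new_files
        return self.data_increment.new_files

    @property
    def compact_before(self):
        return self.compact_increment.compact_before
```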

Migration:
- FileStoreWrite.prepare_commit, TableUpdate.prepare_commit,
  AppendCompactTask.run, MergeTreeCompactTask.run all build their
  CommitMessage through DataIncrement / CompactIncrement and now also
  populate total_buckets the way Java does.
- CommitMessageSerializer wire format bumps to version=2 and round-trips
  the full increment shape, including index file lists. IndexFileMeta
  serialization covers identity fields only — dv_ranges /
  global_index_meta will be wired up alongside the deletion-vector and
  changelog phases. Tests updated to construct messages via increments.

No behavior changes for the existing commit / read paths: FileStoreCommit
still reads message.new_files / compact_before / compact_after through the
new convenience properties.
This PR has not landed yet, so there is no on-disk / cross-process
payload from a prior version to stay compatible with — VERSION still
denotes "first shipped wire format". Bump it once we actually need to
break compat with a released version.
Replace the count-based chunking in _pick_files_for_bucket with the
size-based bin-packing algorithm Java's
AppendCompactCoordinator.SubCoordinator.pack uses, so plans produced by
the Python coordinator match Java's task shape on the same input (a
sketch follows the list):

- Sort candidates by file_size ascending instead of by sequence number,
  so smaller files lead and the packer has the most room to grow each
  bin before overshooting.
- Drain a bin as soon as it has >1 file AND its weighted size hits
  target_file_size * 2. The hardcoded ×2 is Java's "each task should
  yield ~2 target-sized output files" constant.
- Account for source.split.open-file-cost in bin size, matching Java's
  per-file IO weight: a bucket of many tiny files now fans out into
  several tasks instead of being packed into one giant task.
- Trailing bin emits only when it has at least min_file_num files;
  shorter tails wait for company on the next plan. full_compaction=True
  drops that minimum to 1 so a "rewrite this bucket" intent always
  produces at least one task.
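A sketch of that packer under the stated constants; treating per-file weight as max(file_size, open_file_cost) is an assumption about how Java charges per-file IO:

```python
def pack(files, target_file_size, open_file_cost, min_file_num, full_compaction):
    weight = lambda f: max(f.file_size, open_file_cost)  # per-file IO floor
    bins, current, current_weight = [], [], 0
    for f in sorted(files, key=lambda f: f.file_size):   # smallest files lead
        current.append(f)
        current_weight += weight(f)
        # Drain once the bin can yield ~2 target-sized output files.
        if len(current) > 1 and current_weight >= target_file_size * 2:
            bins.append(current)
            current, current_weight = [], 0
    # Trailing bin waits for company unless a full rewrite was requested.
    if len(current) >= (1 if full_compaction else min_file_num):
        bins.append(current)
    return bins
```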

CompactOptions:
- Drop max_file_num — Java has no such concept and size-based packing
  caps each task at ~2x target naturally.
- Drop the now-irrelevant max>=min check; the only invariant left is
  min_file_num >= 1.

Tests:
- New append_compact_packing_test (9 cases) drives the algorithm
  directly with hand-built DataFileMeta lists, mirroring the kind of
  coverage Java's AppendCompactCoordinatorTest has for pack().
- E2E coordinator/rewriter/Ray tests now zero source.split.open-file-cost
  on their tiny test tables (default 4 MB would dominate the 1 KB
  parquet files and trigger spurious mid-loop drains).
- Drop test_chunks_when_exceeding_max_file_num (max_file_num is gone)
  in favor of test_many_small_files_pack_into_single_task which
  documents the realistic tiny-file behavior.
… Java

Java BaseAppendFileStoreWrite.compactRewrite seeds its
RowDataRollingFileWriter with LongCounter(toCompact[0].minSequenceNumber())
and increments per row written. Each rolled output file therefore carries
a precise [first_row_seq, last_row_seq] range and the union across all
output files is contiguous: [seed, seed + total_input_rows - 1].

The previous Python rewriter:
- seeded the writer with bucket-mode-dependent values (UNAWARE→0,
  HASH_FIXED→max(input.max_seq)) which had no Java analog;
- never advanced sequence_generator.current — so every committed file
  ended up with min_seq == max_seq, i.e. compact output threw away the
  per-row seq information Java preserves.

This commit introduces AppendCompactRollingWriter, an AppendOnlyDataWriter
subclass that:
- treats sequence_generator.start as Java's "next-to-assign" counter,
  so a slice of N rows is laid out as [seq_start, seq_start + N - 1];
- works around the base SequenceGenerator's off-by-one quirk by setting
  current = seq_end before super()._write_data_to_file (so the parent
  reads min/max as seq_start/seq_end exactly) and bumping both fields to
  seq_end + 1 afterwards (so the next slice picks up where this one ended);
- stamps file_source = COMPACT on the just-appended DataFileMeta, the
  same shape MergeTreeRollingWriter uses on the PK side, instead of
  mutating it back in the rewriter.

Rewriter:
- seed_seq = files[0].min_sequence_number, matching Java's
  toCompact.get(0).minSequenceNumber();
- drops the bucket-mode-dependent _initial_max_seq helper.
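The sequence contract in miniature; the seed and row counts are illustrative:

```python
def assign_seq_ranges(seed_seq, rows_per_output_file):
    """Lay slices out contiguously: a slice of N rows spans
    [next, next + N - 1], and the next slice starts at next + N."""
    ranges, next_seq = [], seed_seq
    for n in rows_per_output_file:
        ranges.append((next_seq, next_seq + n - 1))
        next_seq += n
    return ranges

# Seeded at files[0].min_sequence_number == 100, three rolled files of
# 4, 3, and 5 rows cover [100, 111] with no gaps or overlaps:
assert assign_seq_ranges(100, [4, 3, 5]) == [(100, 103), (104, 106), (107, 111)]
```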

Tests:
- new test_output_seq_range_starts_at_input0_min_seq_and_spans_total_rows
  enforces the Java contract: per-file (max - min + 1 == row_count) and
  cross-file (no gaps/overlaps, range = [seed, seed+total-1]);
- new test_output_files_tagged_compact_source verifies file_source is set
  by the writer, not the rewriter.

Out of scope (still NOTEd in the rewriter docstring): Java's reader path
runs through ReadForCompact for schema-evolution + DV awareness; pypaimon
still reads parquet directly. Both will be wired up alongside the broader
schema-evolution / deletion-vector phases.
