[python] Extend commit protocol for compaction (DataIncrement/CompactIncrement)#7873
TheR1sing3un wants to merge 2 commits
…Increment)
Lay the protocol-level groundwork for upcoming compaction work in pypaimon
by aligning CommitMessage with Java's CommitMessageImpl shape and adding a
JSON-safe wire format for cross-process transport.
Structural changes:
- New DataIncrement (write side) and CompactIncrement (compaction side)
value objects, direct ports of org.apache.paimon.io.DataIncrement and
CompactIncrement. Each holds (new_files, deleted_files, changelog_files,
new_index_files, deleted_index_files) so future deletion-vector /
changelog work has an unambiguous slot.
- CommitMessage refactored to (partition, bucket, total_buckets,
data_increment, compact_increment, check_from_snapshot). Convenience
properties (new_files, compact_before, compact_after, ...) preserve
read-site ergonomics.
- FileStoreCommit emits ADD entries for compact_after, DELETE entries for
compact_before, and auto-selects commit_kind=COMPACT when a message
carries only compact increments. A dedicated commit_compact() helper
enforces COMPACT-only semantics with no row-id assignment.
- FileStoreWrite / TableUpdate construct CommitMessage via DataIncrement on
the existing write path, with no behavior change for current callers.
DataFileMeta serde:
- to_dict / from_dict round-trip with tagged-value encoding for bytes,
Decimal, datetime, date, time, and Timestamp so file metas can ship
through JSON-only transports (e.g. Ray task payloads later).
- Public encode_value / decode_value helpers reused by CommitMessage's
partition tuples (DATE / DECIMAL / bytes / Timestamp partitions).
- Tolerates manifest-side BinaryRow (lazy-decoded) and pyarrow Array-like
null_counts so round-tripping a freshly-produced file meta doesn't fail.
CommitMessageSerializer:
- VERSION=1 wire format covering full DataIncrement + CompactIncrement
shape (including IndexFileMeta identity fields). dv_ranges /
global_index_meta will be wired up alongside deletion-vector phases.
No observable behavior change for read / write / commit today; this is
foundation for the compaction module, append-only compaction job, PK LSM
compaction, and Ray distributed executor that land in follow-up PRs
(apache#7771 originally bundled all of them).
Test plan:
- New commit_message_serializer_test: round-trip CommitMessage with
DataIncrement / CompactIncrement / index files / non-JSON-native
partition tuples (DATE, Decimal, bytes, Timestamp); IndexFileMeta
round-trip; unknown-version rejection.
- New file_store_commit_compact_test: protocol-level coverage of
compact_before -> DELETE entry, compact_after -> ADD entry, and
auto-COMPACT kind selection (full e2e covered when the compactor lands).
- Existing file_store_commit_test / partition_predicate_test /
table_commit_test updated to construct CommitMessage via DataIncrement
instead of the legacy new_files= kwarg.
Refs: split from apache#7771 to ease incremental community review.
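The tagged-value encoding described for DataFileMeta and partition tuples can be sketched roughly as follows. This is a minimal illustration using only the standard library, not the pypaimon implementation: the tag key `__type__`, the tag names, and the base64 choice for bytes are all assumptions, and the pypaimon-specific Timestamp type is left out.

```python
import base64
import datetime as dt
import json
from decimal import Decimal

# Hypothetical tag key; the real wire format is not shown in this PR text.
_TAG = "__type__"


def encode_value(value):
    """Pass JSON-native scalars through; wrap everything else in a tagged dict."""
    if value is None or isinstance(value, (bool, int, float, str)):
        return value
    if isinstance(value, bytes):
        return {_TAG: "bytes", "v": base64.b64encode(value).decode("ascii")}
    if isinstance(value, Decimal):
        return {_TAG: "decimal", "v": str(value)}
    # datetime must be tested before date because datetime subclasses date.
    if isinstance(value, dt.datetime):
        return {_TAG: "datetime", "v": value.isoformat()}
    if isinstance(value, dt.date):
        return {_TAG: "date", "v": value.isoformat()}
    if isinstance(value, dt.time):
        return {_TAG: "time", "v": value.isoformat()}
    raise TypeError(f"cannot encode value of type {type(value).__name__}")


_DECODERS = {
    "bytes": base64.b64decode,
    "decimal": Decimal,
    "datetime": dt.datetime.fromisoformat,
    "date": dt.date.fromisoformat,
    "time": dt.time.fromisoformat,
}


def decode_value(obj):
    """Inverse of encode_value: unwrap tagged dicts, return other values as-is."""
    if isinstance(obj, dict) and _TAG in obj:
        return _DECODERS[obj[_TAG]](obj["v"])
    return obj


# Round-trip a partition tuple with non-JSON-native members through plain JSON.
partition = (dt.date(2024, 1, 31), Decimal("12.50"), b"\x00\xff", "eu")
wire = json.dumps([encode_value(v) for v in partition])
restored = tuple(decode_value(v) for v in json.loads(wire))
```

Encoding Decimal as a string rather than a float keeps the scale (`12.50` stays `12.50`), which matters if the partition value is later compared or re-serialized.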
81e00a8 to c1fc089
Correctness:
- FileStoreCommit._build_commit_entries now rejects DataIncrement.{deleted_files,
changelog_files, new_index_files, deleted_index_files} and
CompactIncrement.{changelog_files, new_index_files, deleted_index_files}.
These slots used to be silently dropped at commit; raising loudly turns
a future correctness foot-gun into a NotImplementedError so later
changelog-producer / deletion-vector / row-level-delete work has to wire
the new path through commit explicitly.
- ManifestEntry now uses msg.total_buckets when set, falling back to
self.table.total_buckets otherwise. Without this, a message planned
before a bucket rescale would have its planned bucket count silently
overwritten with the table's new value.
- FileStoreCommit.commit() now rejects messages carrying compact_increment.
commit() is the write-side entry (always APPEND, OVERWRITE if conflict
detection demands it); compact_increment must go through commit_compact().
The previous 'auto-pick COMPACT when no new_files' branch was unreachable
(FileStoreWrite.prepare_commit() only fills new_files; CompactJob calls
commit_compact() directly) and would have produced the wrong snapshot
shape for a mixed message anyway.
Docs / naming:
- CommitMessageSerializer: docstring trimmed to its job (JSON wire format
for pypaimon ↔ pypaimon transport, e.g. Ray driver ↔ workers).
- commit_compact: docstring trimmed to its behavior.
- DataIncrement.empty() / CompactIncrement.empty() renamed to
empty_increment() for a more specific name; no callers in-tree yet.
- Trim cross-language commentary from class docstrings on CommitMessage,
DataIncrement, CompactIncrement.
Tests (file_store_commit_compact_test):
- New: NotImplementedError raised for unsupported DataIncrement /
CompactIncrement fields.
- New: msg.total_buckets wins over table.total_buckets when set; fallback
otherwise.
- New: commit() rejects compact_increment messages.
- Removed: two cases that exercised the old auto-COMPACT branch (now
unreachable).
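The fail-loud check and the total_buckets fallback from the commit message above can be sketched as below. This is an illustration under assumptions, not the code in FileStoreCommit: the helper names `reject_unwired_slots` and `resolve_total_buckets` are hypothetical, "when set" is taken to mean "not None", and increments are stand-in objects rather than the real value classes.

```python
from types import SimpleNamespace
from typing import Optional, Sequence

# Slots named in the commit message as not yet wired through commit.
UNWIRED_DATA_SLOTS = (
    "deleted_files",
    "changelog_files",
    "new_index_files",
    "deleted_index_files",
)


def reject_unwired_slots(increment, slots: Sequence[str]) -> None:
    """Raise instead of silently dropping files the commit path cannot handle yet."""
    populated = [name for name in slots if getattr(increment, name, None)]
    if populated:
        raise NotImplementedError(
            f"commit does not support these increment slots yet: {populated}"
        )


def resolve_total_buckets(message_total: Optional[int], table_total: int) -> int:
    """Prefer the bucket count captured at plan time over the table's current one."""
    return message_total if message_total is not None else table_total


# A plain write-side increment passes; one carrying changelog files does not.
ok = SimpleNamespace(new_files=["data-1"], deleted_files=[], changelog_files=[],
                     new_index_files=[], deleted_index_files=[])
reject_unwired_slots(ok, UNWIRED_DATA_SLOTS)

planned_before_rescale = resolve_total_buckets(4, table_total=8)
no_plan_value = resolve_total_buckets(None, table_total=8)
```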
c1fc089 to 4ee0654
Ready for review, 1st PR of the entire compaction feature.
JingsongLi left a comment
Review: [python] Extend commit protocol for compaction (DataIncrement/CompactIncrement)
Overall this is a well-structured PR that cleanly separates write-side and compaction-side semantics by introducing DataIncrement / CompactIncrement value objects, aligning pypaimon's CommitMessage with Java's CommitMessageImpl. The code is clean, well-documented, and has good test coverage. A few observations:
Correctness
- encode_value does not handle list / tuple values. If a GenericRow or partition column ever carries an ARRAY-typed value, serialization will raise TypeError. This may be fine for the current scope (partition keys are typically scalar), but it is worth a brief comment or a graceful error message mentioning "ARRAY" if it's intentionally out of scope.
- _index_file_to_dict drops dv_ranges and global_index_meta; the comment explains this is deferred to Phase 6/7. Just want to flag: if any code path populates those fields before Phase 6/7 lands, a round-trip through the serializer will silently lose them. IndexFileMeta.__eq__ only checks the four scalar fields, so tests won't catch this. Consider adding an assertion/warning in _index_file_to_dict if those fields are non-None, similar to how _build_commit_entries rejects un-wired increment slots.
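To make the second point concrete, here is a small sketch of how the loss can hide behind equality and what a guard could look like. The dataclass below is a stand-in, not pypaimon's IndexFileMeta: the four scalar field names are assumptions, and `compare=False` is used only to mimic an `__eq__` that ignores the two deferred fields.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Optional


@dataclass
class IndexFileMeta:
    # Hypothetical names for the four scalar fields that take part in equality.
    index_type: str
    file_name: str
    file_size: int
    row_count: int
    # Deferred fields, excluded from __eq__ to mirror the behavior under review.
    dv_ranges: Optional[Any] = field(default=None, compare=False)
    global_index_meta: Optional[Any] = field(default=None, compare=False)


def index_file_to_dict(meta: IndexFileMeta) -> Dict[str, Any]:
    """Serialize the scalar fields; refuse to drop the deferred ones silently."""
    dropped = [
        name for name in ("dv_ranges", "global_index_meta")
        if getattr(meta, name) is not None
    ]
    if dropped:
        raise NotImplementedError(f"serializer would drop populated fields: {dropped}")
    return {
        "index_type": meta.index_type,
        "file_name": meta.file_name,
        "file_size": meta.file_size,
        "row_count": meta.row_count,
    }


plain = IndexFileMeta("DELETION_VECTORS", "index-0", 128, 3)
with_ranges = IndexFileMeta("DELETION_VECTORS", "index-0", 128, 3,
                            dv_ranges={"data-1": (0, 16)})

# Equality alone cannot tell the two apart, so a round-trip test would pass
# even if dv_ranges were lost on the way.
equal_despite_difference = plain == with_ranges
```

Raising is the stricter option; a warning would be a softer alternative if some caller is already known to populate these fields.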
Design
- _build_commit_entries entry ordering. For a message with both new_files and compact_before / compact_after (currently rejected at the API level but structurally possible), the method emits ADD(new_files), then DELETE(compact_before), then ADD(compact_after). The explicit separation of commit() vs commit_compact() makes this a non-issue today, but the ordering assumption should be documented in a comment on _build_commit_entries for future maintainers who may relax the validation.
- total_buckets fallback logic is good. Using the message's total_buckets (captured at plan time) over the table's current value correctly handles bucket rescale races. The test test_build_entries_uses_message_total_buckets_when_set explicitly covers this, which is great.
- Convenience properties on CommitMessage (new_files, compact_before, etc.) preserve ergonomics nicely. These delegate cleanly and avoid breaking existing call sites.
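The ordering mentioned in the first Design point could be pinned down with a comment along these lines. This is a toy version of the entry builder, assuming file names as plain strings and a hypothetical `Entry` tuple; the real method works on manifest entries.

```python
from typing import List, NamedTuple, Sequence

ADD, DELETE = "ADD", "DELETE"


class Entry(NamedTuple):
    kind: str
    file_name: str


def build_commit_entries(
    new_files: Sequence[str],
    compact_before: Sequence[str],
    compact_after: Sequence[str],
) -> List[Entry]:
    # Ordering contract: ADD(new_files), then DELETE(compact_before),
    # then ADD(compact_after). Mixed messages are rejected upstream today,
    # so this only matters if that validation is ever relaxed.
    entries = [Entry(ADD, name) for name in new_files]
    entries += [Entry(DELETE, name) for name in compact_before]
    entries += [Entry(ADD, name) for name in compact_after]
    return entries


entries = build_commit_entries(["data-3"], ["data-1", "data-2"], ["data-4"])
kinds = [entry.kind for entry in entries]
```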
Minor / Nits
- In _generic_row_to_dict, the fallback path [row.get_field(i) for i in range(len(row))] assumes the row implements __len__. A brief type annotation or protocol check (e.g., InternalRow) would improve clarity for readers unfamiliar with the row hierarchy.
- The CommitMessageSerializer uses separators=(",", ":") for compact JSON output, a good choice for a wire format since it minimizes payload size without sacrificing readability when pretty-printed for debugging.
- DataIncrement.empty_increment() and CompactIncrement.empty_increment() class methods duplicate what DataIncrement() / CompactIncrement() already do with default factories. Unless these are intended as semantic markers for readability in call sites, they could be removed to reduce API surface.
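For reference, the effect of `separators=(",", ":")` and a versioned envelope can be seen with the standard `json` module alone. The envelope layout (`version` / `payload` keys) and the function names here are assumptions for illustration, not the actual CommitMessageSerializer format.

```python
import json

VERSION = 1


def serialize(payload: dict) -> str:
    """Wrap the payload in a versioned envelope and emit compact JSON."""
    envelope = {"version": VERSION, "payload": payload}
    return json.dumps(envelope, separators=(",", ":"))


def deserialize(text: str) -> dict:
    """Reject envelopes written by an unknown version instead of guessing."""
    envelope = json.loads(text)
    if envelope.get("version") != VERSION:
        raise ValueError(f"unsupported version: {envelope.get('version')!r}")
    return envelope["payload"]


message = {"bucket": 0, "new_files": ["data-1"]}
compact = serialize(message)
# Default separators are (", ", ": "), which add a space after every comma and colon.
default = json.dumps({"version": VERSION, "payload": message})
saved_bytes = len(default) - len(compact)
```

The compact form can still be pretty-printed for debugging with `json.dumps(json.loads(compact), indent=2)`.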
Summary
Solid foundation work. The separation of concerns between write and compaction paths is well-motivated, the serializer is defensively versioned, and the fail-loud approach for un-wired slots (NotImplementedError) is the right call for incremental development. The test suite covers the key round-trip and rejection scenarios thoroughly.
Purpose
Align CommitMessage with Java's CommitMessageImpl shape and add a JSON-safe wire format, so later compaction work has somewhere to plug compact_before / compact_after files and a serializer to ship them through Ray workers.
Foundation only: read / write / commit produce the same snapshots.
Split from #7771.
Changes
- DataIncrement / CompactIncrement value objects; CommitMessage now holds (partition, bucket, total_buckets, data_increment, compact_increment, check_from_snapshot). Convenience properties preserve msg.new_files / msg.compact_before ergonomics.
- FileStoreCommit emits ADD for compact_after, DELETE for compact_before, auto-picks commit_kind=COMPACT when only compact increments are present. New commit_compact() skips row-id assignment.
- DataFileMeta.to_dict / from_dict with tagged encoding for bytes / Decimal / datetime / date / time / Timestamp; encode_value / decode_value public for CommitMessage.partition round-trip.
- CommitMessageSerializer (VERSION=1) covers the full DataIncrement + CompactIncrement shape including IndexFileMeta.
Tests
- commit_message_serializer_test: round-trip with non-JSON-native partition values + index files + version rejection.
- file_store_commit_compact_test: compact_before → DELETE, compact_after → ADD, auto-COMPACT kind.
- file_store_commit_test / partition_predicate_test / table_commit_test adapted to the new CommitMessage signature.
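The CommitMessage shape listed under Changes can be pictured roughly as below. This is a sketch under assumptions rather than the classes in the PR: file metas are reduced to strings, the CompactIncrement field list follows the slots mentioned in the commit messages, and defaults are guesses.

```python
from dataclasses import dataclass, field
from typing import List, Optional, Tuple


@dataclass
class DataIncrement:
    """Write-side changes for one (partition, bucket)."""
    new_files: List[str] = field(default_factory=list)
    deleted_files: List[str] = field(default_factory=list)
    changelog_files: List[str] = field(default_factory=list)
    new_index_files: List[str] = field(default_factory=list)
    deleted_index_files: List[str] = field(default_factory=list)


@dataclass
class CompactIncrement:
    """Compaction-side changes: inputs that go away and outputs that replace them."""
    compact_before: List[str] = field(default_factory=list)
    compact_after: List[str] = field(default_factory=list)
    changelog_files: List[str] = field(default_factory=list)
    new_index_files: List[str] = field(default_factory=list)
    deleted_index_files: List[str] = field(default_factory=list)


@dataclass
class CommitMessage:
    partition: Tuple
    bucket: int
    total_buckets: Optional[int] = None
    data_increment: DataIncrement = field(default_factory=DataIncrement)
    compact_increment: CompactIncrement = field(default_factory=CompactIncrement)
    check_from_snapshot: Optional[int] = None

    # Convenience properties keep existing read sites such as msg.new_files working.
    @property
    def new_files(self) -> List[str]:
        return self.data_increment.new_files

    @property
    def compact_before(self) -> List[str]:
        return self.compact_increment.compact_before

    @property
    def compact_after(self) -> List[str]:
        return self.compact_increment.compact_after


msg = CommitMessage(
    partition=("2024-01-31",),
    bucket=0,
    data_increment=DataIncrement(new_files=["data-1"]),
)
```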