[WIP][format] Add paimon-mosaic module with reader and writer #7917
JingsongLi wants to merge 11 commits into
…xtraction and tests

Introduces the Mosaic file format integration for Paimon with:
- MosaicRecordsReader: row-group level predicate filtering using statistics, column projection, and correct returnedPosition tracking
- MosaicRecordsWriter: BundleFormatWriter with writerMetadata() support for in-memory stats capture (avoids re-reading files on object stores)
- MosaicSimpleStatsExtractor: stats extraction from file or writerMetadata, with SimpleColStatsCollector integration
- MosaicObjects: byte[] to Paimon object conversion for all supported types
- Comprehensive test suite (6 test classes covering unit and integration tests)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
leaves12138 left a comment
Thanks for adding the Mosaic format integration. I found several blockers that need to be fixed before this can be merged:
- paimon-mosaic-format/pom.xml adds org.apache.paimon.mosaic:mosaic-writer:0.1.0-SNAPSHOT, and pom.xml adds the module to the root reactor. This makes normal Paimon CI/release depend on an external SNAPSHOT artifact. The current PR checks are already failing, and Paimon releases should not depend on SNAPSHOT artifacts. Please switch to a released Mosaic artifact / a release-compatible dependency strategy before adding the module to the root reactor.
- MosaicRecordsWriter.close() calls collectMetadata() before nativeWriter.close(). In paimon-mosaic, MosaicWriter.numRowGroups() and getRowGroupStatistics() throw IllegalStateException("writer is not closed yet") until MosaicWriter.close() has collected the stats. As written, closing any non-test-skipped writer will record that exception and rethrow at the end. Please close the native writer first, then collect metadata from the Java wrapper's cached stats.
- MosaicRecordsWriter creates new WriterOptions().zstdLevel(...) but never configures statsColumns. In paimon-mosaic, an empty statsColumns disables stats collection, so MosaicSimpleStatsExtractor always sees empty row-group stats and predicate pruning in MosaicRecordsReader is effectively disabled. This also means file stats/null counts cannot match Paimon's SimpleStatsExtractor contract. Please enable stats for the supported Paimon columns, or otherwise make the extractor fall back to a correct row scan.
- MosaicObjects.convertStatsValue() decodes numeric stats with ByteOrder.LITTLE_ENDIAN, but paimon-mosaic exposes stats using Value::to_be_bytes() / big-endian bytes. This reverses integer/float/decimal/timestamp min/max values and can lead to wrong file statistics and wrong row-group pruning. The current MosaicObjectsTest also constructs little-endian fixtures, so it does not catch this mismatch; please update both the conversion and tests to match the Mosaic API.
- MosaicSimpleStatsExtractor.extractWithFileInfo() computes row count by reading every row group (reader.readRowGroup(...).getRowCount()) and does not close the returned VectorSchemaRoots. Since Mosaic now exposes rowGroupNumRows, this should sum reader.rowGroupNumRows(rg) instead. That avoids full data reads during stats extraction and prevents Arrow buffer/resource leaks.
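To illustrate the byte-order point, here is a minimal sketch that decodes a fixed-width integer statistic with both byte orders. It assumes, as the review states, that the stats bytes are big-endian; the class and method names (`StatsBytesSketch`, `decodeInt`, `decodeIntLittleEndian`) are hypothetical and are not part of the PR or of the Mosaic API.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

/** Hypothetical helper for illustration only; not the PR's MosaicObjects code. */
final class StatsBytesSketch {
    private StatsBytesSketch() {}

    /** Decodes a 4-byte statistic, assuming the producer wrote big-endian bytes. */
    static int decodeInt(byte[] bytes) {
        return ByteBuffer.wrap(bytes).order(ByteOrder.BIG_ENDIAN).getInt();
    }

    /** Decodes the same bytes with the mismatched byte order, for comparison. */
    static int decodeIntLittleEndian(byte[] bytes) {
        return ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN).getInt();
    }

    public static void main(String[] args) {
        byte[] one = {0, 0, 0, 1}; // big-endian encoding of the value 1
        System.out.println(decodeInt(one));             // prints 1
        System.out.println(decodeIntLittleEndian(one)); // prints 16777216
    }
}
```

A test fixture built with the same wrong byte order as the decoder round-trips cleanly, which is why the mismatch can go unnoticed until real Mosaic stats are read.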
I did not approve because these are correctness/build issues rather than nits.
1. Remove paimon-mosaic-format from root reactor to avoid CI/release dependency on external SNAPSHOT artifact
2. Fix writer close order: close native writer before collecting metadata (MosaicWriter requires close() before stats are accessible)
3. Configure statsColumns with all column indices to enable stats collection
4. Fix byte order: use big-endian (matching Mosaic's Value::to_be_bytes()) instead of little-endian for numeric stats decoding
5. Fix extractWithFileInfo() to use rowGroupNumRows() instead of reading full row groups (avoids data reads and Arrow buffer leaks)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
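The close-order fix in item 2 can be sketched roughly as follows. `FakeNativeWriter` and `RecordsWriterSketch` are hypothetical stand-ins written for this example; they only mimic the behavior described in the review (stats accessors throw until `close()` has run) and do not reproduce the real MosaicWriter or MosaicRecordsWriter.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a native writer whose stats are readable only after close(). */
final class FakeNativeWriter implements AutoCloseable {
    private final List<Long> rowGroupRowCounts = new ArrayList<>();
    private boolean closed;

    void writeRowGroup(long rows) {
        rowGroupRowCounts.add(rows);
    }

    int numRowGroups() {
        if (!closed) {
            throw new IllegalStateException("writer is not closed yet");
        }
        return rowGroupRowCounts.size();
    }

    @Override
    public void close() {
        closed = true;
    }
}

/** Hypothetical wrapper showing the ordering: close the native writer, then collect. */
final class RecordsWriterSketch implements AutoCloseable {
    private final FakeNativeWriter nativeWriter = new FakeNativeWriter();
    private int capturedRowGroups = -1; // -1 means metadata has not been collected

    void write(long rows) {
        nativeWriter.writeRowGroup(rows);
    }

    @Override
    public void close() {
        nativeWriter.close();                            // 1. finalize the file and its stats
        capturedRowGroups = nativeWriter.numRowGroups(); // 2. now safe to read cached metadata
    }

    int capturedRowGroups() {
        return capturedRowGroups;
    }
}
```

Swapping the two statements in `close()` reproduces the original failure in this sketch: the accessor throws before the native writer has been closed.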
leaves12138 left a comment
Thanks for the quick fixes. The close-ordering, endian conversion, rowGroupNumRows row-count path, and the configured stats-columns wiring look much better now. I still have to request changes because the new root reactor module still depends on an external unreleased SNAPSHOT and does not compile from a clean checkout. I reproduced it with mvn -U -pl paimon-mosaic -am -Pfast-build -Dmaven.test.skip=true -DskipTests compile, which fails with Could not find artifact org.apache.paimon:mosaic:jar:0.1.0-SNAPSHOT. I also found one stats correctness issue on the file-only fallback path.
    <dependency>
        <groupId>org.apache.paimon</groupId>
        <artifactId>mosaic</artifactId>
        <version>0.1.0-SNAPSHOT</version>
This still makes the default Paimon reactor depend on an external SNAPSHOT artifact. Since paimon-mosaic is listed in the root <modules>, a clean mvn -pl paimon-mosaic -am ... compile fails with Could not find artifact org.apache.paimon:mosaic:jar:0.1.0-SNAPSHOT. Please use a released artifact, build/include this dependency in the same releasable reactor, or keep this module out of the default root reactor (or profile-gated) until the Mosaic Java artifact is released.
    MosaicInputFileAdapter inputFile = new MosaicInputFileAdapter(fileIO, path);
    try (BufferAllocator allocator = new RootAllocator();
            MosaicReader reader = MosaicReader.open(inputFile, length, allocator)) {
        return extractFromStats(reader.numRowGroups(), reader::getRowGroupStatistics, null);
The file-only extraction path loses the configured stats-column set. For a file written with only a subset of mosaic.stats-columns (or the default empty set), columns missing from Mosaic's ColumnStatistics keep nullCounts[colIdx] == 0, so untracked columns are returned as SimpleColStats(null, null, 0) instead of SimpleColStats.NONE. That can persist an exact zero-null count on fallback/migration paths. Please track which columns actually appear in the Mosaic stats and return NONE for unseen columns, or persist/recover the collected stats-column set from the file metadata. The same issue applies to extractWithFileInfo.
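One way to picture the "track which columns actually appear" suggestion is the sketch below. Every type here is a hypothetical simplification: `ColStats` stands in for Paimon's per-column stats holder, and each row group is modeled as a map from column index to null count that contains only the columns Mosaic tracked.

```java
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of aggregating null counts while remembering which columns were seen. */
final class FallbackStatsSketch {
    private FallbackStatsSketch() {}

    /** Simplified stand-in for a per-column stats holder; NONE means "no stats collected". */
    static final class ColStats {
        static final ColStats NONE = new ColStats(null);
        final Long nullCount; // null means unknown

        ColStats(Long nullCount) {
            this.nullCount = nullCount;
        }
    }

    /**
     * Aggregates per-row-group null counts. Columns that never appear in any
     * row group's stats are reported as NONE instead of an exact zero.
     */
    static ColStats[] aggregate(int numColumns, List<Map<Integer, Long>> rowGroups) {
        long[] nullCounts = new long[numColumns];
        boolean[] seen = new boolean[numColumns];
        for (Map<Integer, Long> rowGroup : rowGroups) {
            for (Map.Entry<Integer, Long> entry : rowGroup.entrySet()) {
                int col = entry.getKey();
                seen[col] = true;
                nullCounts[col] += entry.getValue();
            }
        }
        ColStats[] result = new ColStats[numColumns];
        for (int col = 0; col < numColumns; col++) {
            result[col] = seen[col] ? new ColStats(nullCounts[col]) : ColStats.NONE;
        }
        return result;
    }
}
```

The `seen` array is the whole point: without it, an untracked column is indistinguishable from a tracked column that happens to contain no nulls.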
leaves12138 left a comment
Besides the existing SNAPSHOT dependency issue, I found two more issues that should be fixed before this module is merged.
    if (minValues[colIdx] == null) {
        minValues[colIdx] = min;
    } else {
        if (((Comparable<Object>) min).compareTo(minValues[colIdx]) < 0) {
MosaicObjects.convertStatsValue returns byte[] for BINARY/VARBINARY stats, but this aggregation path assumes every min/max value is Comparable. If a binary stats column spans more than one row group, the second row group will hit ClassCastException here. Please either compare through Paimon's byte-array-aware comparator (for example CompareUtils.compareLiteral(dataType, ...)) or do not expose min/max for binary columns, and add a multi-row-group binary stats test.
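A rough sketch of a min-merge that tolerates byte[] values is shown below. It is not the PR's fix and does not use Paimon's comparator: the unsigned lexicographic ordering is an assumption made for the example, and `BinaryMinMaxSketch` is a hypothetical name. The real code should follow whatever ordering Paimon defines for BINARY/VARBINARY.

```java
/** Hypothetical sketch: merging min values when some stats are byte[] rather than Comparable. */
final class BinaryMinMaxSketch {
    private BinaryMinMaxSketch() {}

    /** Lexicographic comparison treating each byte as unsigned (assumed ordering). */
    static int compareUnsigned(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(a.length, b.length);
    }

    /** Returns the smaller of the running min and a new candidate from another row group. */
    @SuppressWarnings("unchecked")
    static Object min(Object current, Object candidate) {
        if (current == null) {
            return candidate;
        }
        int cmp;
        if (current instanceof byte[] && candidate instanceof byte[]) {
            // byte[] does not implement Comparable, so casting would throw ClassCastException.
            cmp = compareUnsigned((byte[]) candidate, (byte[]) current);
        } else {
            cmp = ((Comparable<Object>) candidate).compareTo(current);
        }
        return cmp < 0 ? candidate : current;
    }
}
```

The first row group never reaches the comparison because the running min is still null, which is why the failure only shows up once a binary column spans a second row group.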
    }

    @Override
    public FormatWriter create(PositionOutputStream out, String compression) throws IOException {
The compression argument from Paimon's writer path is currently ignored. MosaicRecordsWriter always uses WriterOptions' default compression (zstd) and only forwards the zstd level, so table options like file.compression = none or file.compression.per.level silently have no effect for Mosaic files. Please map supported Paimon compression values to Mosaic WriterOptions.compression(...) (or reject unsupported ones explicitly) and add coverage for at least zstd and none.
leaves12138 left a comment
I re-reviewed the latest revision ignoring the SNAPSHOT dependency. The previous binary-stats and file-only fallback issues look addressed, but I still found two non-SNAPSHOT issues around option semantics.
    // return WriterOptions.COMPRESSION_NONE;
    return 0;
    case "zstd":
    default:
Thanks for wiring the compression argument. This still silently maps every value other than exact lower-case none and zstd to ZSTD. Paimon accepts values such as lz4, snappy, gzip, and options are generally case-insensitive; with file.suffix.include.compression = true, this can even create a .lz4.mosaic file whose Mosaic footer says ZSTD. Please normalize the value and fail fast for unsupported compressions instead of falling through to ZSTD, and add coverage for none plus an unsupported value. It would also be better for the Mosaic Java API to expose COMPRESSION_NONE instead of relying on magic 0.
    .compression(resolveCompression(compression))
    .zstdLevel(formatContext.zstdLevel())
    .numBuckets(numBuckets)
    .rowGroupMaxSize(writeBatchMemory);
rowGroupMaxSize is currently derived from write.batch-memory. That option controls the Arrow writer buffer memory, while Paimon exposes file.block-size through FormatContext.blockSize() for format block / row-group sizing. As a result, Mosaic ignores file.block-size, and tuning write.batch-memory unexpectedly changes the Mosaic row-group layout. Please wire formatContext.blockSize() when it is configured, and otherwise leave Mosaic's own default (or document the intended default explicitly).
leaves12138 left a comment
The file.block-size part of the fix looks good to me. However, the compression handling is still not safe if Mosaic intentionally supports only zstd for now.
    @Override
    public FormatWriter create(PositionOutputStream out, String compression) {
        // only support zstd, ignore compression
If Mosaic only supports ZSTD for now, silently ignoring this argument is still not safe. Users can configure file.compression = none/lz4/snappy/..., and Paimon may also put the configured compression into file names when file.suffix.include.compression is enabled, while the actual Mosaic footer remains ZSTD. Please fail fast unless the value is null or zstd (case-insensitive), or wait for the Mosaic Java API to expose and wire other supported compression constants explicitly.
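The fail-fast check requested here could look roughly like the sketch below. `CompressionCheckSketch` and `checkCompression` are hypothetical names, and the choice of `UnsupportedOperationException` is an assumption; the review only asks that anything other than null or zstd (case-insensitive) be rejected.

```java
import java.util.Locale;

/** Hypothetical sketch of a fail-fast compression check; not the PR's actual code. */
final class CompressionCheckSketch {
    private CompressionCheckSketch() {}

    /** Accepts null or "zstd" in any letter case and rejects every other value. */
    static void checkCompression(String compression) {
        if (compression == null) {
            return; // no explicit setting: use the format's own default
        }
        String normalized = compression.trim().toLowerCase(Locale.ROOT);
        if (!"zstd".equals(normalized)) {
            throw new UnsupportedOperationException(
                    "Mosaic format currently supports only zstd compression, but got: "
                            + compression);
        }
    }
}
```

Calling such a check at the top of `create(...)` would surface a misconfigured `file.compression` at write time instead of producing a file whose name and footer disagree.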
leaves12138 left a comment
Re-reviewed the latest revision. The non-SNAPSHOT issues I raised are addressed: unsupported non-zstd compression now fails fast, file.block-size is wired to Mosaic row group sizing, and the stats fallback fixes are still in place. Assuming the external Mosaic artifact is released before merge, no further comments from me.
Purpose
See https://paimon.apache.org/docs/mosaic/
Introduces the Mosaic file format integration for Paimon with:
Tests