[globalindex] Support multi-column GlobalIndex framework#7933
Conversation
3ba3a58 to
394d600
Compare
Extend the GlobalIndex SPI, build path, and query path to support one index builder handling multiple columns (e.g. Lucene indexing title + content + tags together). Key changes: - GlobalIndexerFactory/GlobalIndexer: add List<DataField> create overloads - GlobalIndexMultiColumnWriter: new interface for multi-column writes - GlobalIndexBuilderUtils: toIndexFileMetas/createIndexWriter accept List<DataField> - GlobalIndexScanner: route extraFieldIds to same reader group - VectorScanImpl/FullTextScanImpl: match against extraFieldIds - GenericIndexTopoBuilder (Flink): multi-column projection and writer dispatch - DefaultGlobalIndexBuilder/TopoBuilder (Spark): multi-column support - All single-column APIs preserved for backward compatibility
394d600 to
682e613
Compare
JingsongLi
left a comment
There was a problem hiding this comment.
Review: [globalindex] Support multi-column GlobalIndex framework
Overall this is a well-structured change that cleanly extends the single-column SPI to support multi-column indexes. The backward compatibility is handled well via default methods. I have a few concerns about correctness and robustness:
1. [Bug] Flink BuildIndexOperator: Multi-column writer receives row with extra _ROW_ID field
File: paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilder.java
The projected row type is built as:
List<String> readColumns = new ArrayList<>(indexColumns);
readColumns.add(SpecialFields.ROW_ID.name());
RowType projectedRowType = SpecialFields.rowTypeWithRowId(rowType).project(readColumns);So the InternalRow read from data has the layout [indexCol1, indexCol2, ..., _ROW_ID]. In the multi-column branch:
if (multiColumn) {
((GlobalIndexMultiColumnWriter) indexWriter).write(row);
}The entire row (including the trailing _ROW_ID field) is passed to the writer. But the writer was created with only the index fields via createIndexWriter(table, indexType, indexFields, mergedOptions), and the Javadoc on GlobalIndexMultiColumnWriter.write() states: "The row layout matches the fields order passed to GlobalIndexerFactory#create(List, Options)".
This is a contract violation. Writer implementations that iterate over all row fields or check row.getFieldCount() will see an unexpected extra column. You should either:
- Create a sub-projection of the row that excludes
_ROW_IDbefore passing it to the writer, or - Clearly document that the row may contain trailing fields beyond the indexed columns (and update the Javadoc accordingly).
2. [Correctness] No null-field handling in multi-column mode (Flink)
In single-column mode, if the indexed field is null, the shard stops early:
Object fieldData = indexFieldGetters[0].getFieldOrNull(row);
if (fieldData == null) {
LOG.info("Null value at rowId={}, stopping shard [{}, {}].", ...);
break;
}In multi-column mode, the row is passed directly without any null check on individual fields. If one of the indexed columns is null in a multi-column row, the behavior depends entirely on the writer implementation. This asymmetry could lead to:
- Writer failures (NPE inside Lucene, for example)
- Silent corruption of the index
Consider at minimum documenting this contract difference, or adding validation that checks indexed columns for null before passing to the multi-column writer.
3. [Robustness] Unsafe cast without instanceof check
Files: GenericIndexTopoBuilder.java (Flink), DefaultGlobalIndexBuilder.java (Spark)
Both paths cast the writer based solely on the multiColumn flag:
((GlobalIndexMultiColumnWriter) indexWriter).write(row);However, GlobalIndexerFactory.create(List<DataField>, Options) has a default implementation that falls back to single-column:
default GlobalIndexer create(List<DataField> fields, Options options) {
return create(fields.get(0), options);
}If an existing factory has not been updated to support multi-column (and relies on this default), it will return a GlobalIndexSingletonWriter, and the cast will fail with a ClassCastException at runtime.
Suggestion: Add a validation check after writer creation:
if (multiColumn && !(indexWriter instanceof GlobalIndexMultiColumnWriter)) {
throw new UnsupportedOperationException(
"Index type '" + indexType + "' does not support multi-column indexing. " +
"The factory must override create(List<DataField>, Options) and return a GlobalIndexMultiColumnWriter.");
}4. [Design] Interface default method creates silent data-loss path
File: paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexTopologyBuilder.java
The new default method on the interface:
default List<CommitMessage> buildIndex(..., List<DataField> indexFields, ...) {
return buildIndex(..., indexFields.get(0), options);
}This silently discards all fields beyond the first for any implementation that has not been updated to override the multi-field method. Combined with DefaultGlobalIndexTopoBuilder overriding the single-field method to delegate to multi-field, this creates an unusual delegation pattern that is correct for DefaultGlobalIndexTopoBuilder but could be a trap for other implementations.
Consider adding a log warning in the default method when indexFields.size() > 1, or throwing UnsupportedOperationException instead of silently dropping fields.
5. [Minor] resolveFields assumes metadata consistency across range groups
File: paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexScanner.java
private List<DataField> resolveFields(Map<Range, List<IndexFileMeta>> metas, RowType rowType) {
GlobalIndexMeta firstMeta =
checkNotNull(metas.values().iterator().next().get(0).globalIndexMeta());
...
}This takes the first index file's metadata from an arbitrary range to determine the field list. If different ranges have inconsistent metadata (e.g., during a transition where an old single-column index coexists with a new multi-column index for the same type), this could resolve incorrect fields. A defensive check that all ranges share the same field list would prevent hard-to-diagnose query errors.
Summary
The most critical issue is #1 (the _ROW_ID field leaking into the multi-column writer). Issues #2 and #3 are important for production robustness. Issues #4 and #5 are lower priority but worth addressing for long-term maintainability.
Extend the GlobalIndex SPI, build path, and query path to support one index builder handling multiple columns (e.g. Lucene indexing title + content + tags together). Key changes:
Purpose
Some index engines (e.g. Lucene) can build a single index over multiple columns — full-text on title and vector on embedding in the same index file. Previously the GlobalIndex SPI only supported one column
per indexer: GlobalIndexerFactory.create(DataField, Options) and GlobalIndexSingletonWriter.write(Object). This meant multi-column engines had to create separate index files per column, losing co-located
search benefits and doubling I/O.
This PR adds a multi-column path through the entire stack:
accepts InternalRow with all indexed columns projected in field order.
field list from metadata and passes it to the factory. Extra field IDs are registered in the indexMetas map so queries against any column in the group find the same reader.
includes embedding as an extra field.
findMinNonIndexableRowId checks containsAll(indexColumns) for schema evolution safety.
All existing single-column callers are unchanged — new APIs have default implementations that delegate to the original single-column methods.
Tests
queries on same index file; also tests SPI discovery path via GlobalIndexer.create("lucene", ...) (2 tests)
VectorSearchBuilder/FullTextSearchBuilder → ReadBuilder.newScan().withGlobalIndexResult(), reads back rows and asserts correctness (3 tests)