Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.globalindex;

import org.apache.paimon.data.InternalRow;

import javax.annotation.Nullable;

/**
* Index writer for global index that accepts multiple column values per row.
*
* <p>Extends {@link GlobalIndexWriter} with a row-based {@link #write(InternalRow)} so that all
* indexed columns of one record are handed over together.
*/
public interface GlobalIndexMultiColumnWriter extends GlobalIndexWriter {

/**
* Write a projected row containing all indexed columns for one record. The row layout matches
* the fields order passed to {@link GlobalIndexerFactory#create(java.util.List,
* org.apache.paimon.options.Options)}.
*
* @param row the projected row for one record; it is declared {@code @Nullable}, so
*     implementations must be prepared to receive {@code null} (NOTE(review): how a null row
*     should be treated is not specified here; confirm against the implementations)
*/
void write(@Nullable InternalRow row);
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,9 @@ static GlobalIndexer create(String type, DataField dataField, Options options) {
GlobalIndexerFactory globalIndexerFactory = GlobalIndexerFactoryUtils.load(type);
return globalIndexerFactory.create(dataField, options);
}

/**
 * Creates a {@link GlobalIndexer} of the given index type over one or more fields.
 *
 * <p>The factory is looked up with {@link GlobalIndexerFactoryUtils#load(String)} and the
 * construction is delegated to its list-based {@code create} method.
 *
 * @param type index type used to look up the factory
 * @param fields fields the index is built on
 * @param options options forwarded to the factory
 * @return the indexer produced by the loaded factory
 */
static GlobalIndexer create(String type, List<DataField> fields, Options options) {
    return GlobalIndexerFactoryUtils.load(type).create(fields, options);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,22 @@
import org.apache.paimon.options.Options;
import org.apache.paimon.types.DataField;

import java.util.List;

/** Global index factory to construct {@link GlobalIndexer}. */
public interface GlobalIndexerFactory {

    /**
     * Returns the identifier of this factory. It is reported as the index type in the error
     * raised by {@link #create(List, Options)}.
     */
    String identifier();

    /**
     * Creates a {@link GlobalIndexer} for a single indexed field.
     *
     * @param dataField the indexed field
     * @param options options for the indexer
     * @return the indexer for the given field
     */
    GlobalIndexer create(DataField dataField, Options options);

    /**
     * Creates a {@link GlobalIndexer} for one or more indexed fields.
     *
     * <p>This default implementation only supports a single field and delegates to {@link
     * #create(DataField, Options)}. Factories that support multi-column indexes must override
     * it.
     *
     * @param fields the indexed fields, in index order; must contain at least one field
     * @param options options for the indexer
     * @return the indexer for the given fields
     * @throws IllegalArgumentException if {@code fields} is null or empty
     * @throws UnsupportedOperationException if more than one field is given
     */
    default GlobalIndexer create(List<DataField> fields, Options options) {
        // Fail with a descriptive message instead of an opaque IndexOutOfBoundsException.
        if (fields == null || fields.isEmpty()) {
            throw new IllegalArgumentException(
                    String.format(
                            "Index type '%s' requires at least one index column.",
                            identifier()));
        }
        if (fields.size() > 1) {
            throw new UnsupportedOperationException(
                    String.format(
                            "Index type '%s' does not support multi-column index, got columns: %s",
                            identifier(), fields));
        }
        return create(fields.get(0), options);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,31 @@
import org.apache.paimon.index.GlobalIndexMeta;
import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.index.IndexPathFactory;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.DataField;
import org.apache.paimon.utils.Range;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

High risk — MERGE path crash: MULTI_COLUMN_INDEX_FIELD_ID = -1 breaks existing code that calls rowType.getField(globalIndexMeta.indexFieldId()) without guarding against -1:

  1. MergeIntoUpdateChecker.java:104 (Flink): scans index manifest entries and does rowType.getField(globalIndexMeta.indexFieldId()) — will throw when encountering a multi-column index.
  2. MergeIntoPaimonDataEvolutionTable.scala:514 (Spark): same pattern — rowType.getField(globalIndexMeta.indexFieldId()).name().

Once a table has a multi-column global index, any MERGE INTO that touches indexed columns will crash with "Cannot find field by field id: -1".

Fix: these callers need to handle MULTI_COLUMN_INDEX_FIELD_ID by reading extraFieldIds() to get the actual column list.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fix:

Added getIndexedFieldNames helper in both Flink and Spark paths:

  • When indexFieldId == MULTI_COLUMN_INDEX_FIELD_ID (-1): resolve column names from extraFieldIds()
  • Otherwise: use the original single-column logic (rowType.getField(indexFieldId) + optional extraFieldIds)

Both the index filter (which entries are affected) and the error reporting (conflicted column names) now correctly handle multi-column indexes.

Affected files:

  • paimon-flink/.../dataevolution/MergeIntoUpdateChecker.java
  • paimon-spark/paimon-spark-common/.../MergeIntoPaimonDataEvolutionTable.scala
  • paimon-spark/paimon-spark-4.0/.../MergeIntoPaimonDataEvolutionTable.scala


/** Utils for global index build. */
public class GlobalIndexBuilderUtils {

private static final Logger LOG = LoggerFactory.getLogger(GlobalIndexBuilderUtils.class);

public static final int MULTI_COLUMN_INDEX_FIELD_ID = -1;

public static List<IndexFileMeta> toIndexFileMetas(
FileIO fileIO,
IndexPathFactory indexPathFactory,
Expand All @@ -45,12 +58,56 @@ public static List<IndexFileMeta> toIndexFileMetas(
String indexType,
List<ResultEntry> entries)
throws IOException {
return toIndexFileMetas(
fileIO, indexPathFactory, options, range, indexFieldId, null, indexType, entries);
}

/**
 * Converts index build results into {@link IndexFileMeta}s for an index over the given fields.
 *
 * <p>With more than one field, the index field id is {@link #MULTI_COLUMN_INDEX_FIELD_ID} and
 * the ids of all fields are recorded, in order, as extra field ids. With a single field, that
 * field's id is the index field id and no extra field ids are recorded.
 *
 * @param fields the indexed fields; the first element is read when there is at most one
 * @throws IOException propagated from the delegate overload
 */
public static List<IndexFileMeta> toIndexFileMetas(
        FileIO fileIO,
        IndexPathFactory indexPathFactory,
        CoreOptions options,
        Range range,
        List<DataField> fields,
        String indexType,
        List<ResultEntry> entries)
        throws IOException {
    boolean multiColumn = fields.size() > 1;
    int indexFieldId = multiColumn ? MULTI_COLUMN_INDEX_FIELD_ID : fields.get(0).id();
    int[] extraFieldIds =
            multiColumn ? fields.stream().mapToInt(DataField::id).toArray() : null;
    return toIndexFileMetas(
            fileIO,
            indexPathFactory,
            options,
            range,
            indexFieldId,
            extraFieldIds,
            indexType,
            entries);
}

private static List<IndexFileMeta> toIndexFileMetas(
FileIO fileIO,
IndexPathFactory indexPathFactory,
CoreOptions options,
Range range,
int indexFieldId,
@Nullable int[] extraFieldIds,
String indexType,
List<ResultEntry> entries)
throws IOException {
List<IndexFileMeta> results = new ArrayList<>();
for (ResultEntry entry : entries) {
String fileName = entry.fileName();
long fileSize = fileIO.getFileSize(indexPathFactory.toPath(fileName));
GlobalIndexMeta globalIndexMeta =
new GlobalIndexMeta(range.from, range.to, indexFieldId, null, entry.meta());
new GlobalIndexMeta(
range.from, range.to, indexFieldId, extraFieldIds, entry.meta());

Path externalPathDir = options.globalIndexExternalPath();
String externalPathString = null;
Expand Down Expand Up @@ -78,6 +135,77 @@ public static GlobalIndexWriter createIndexWriter(
return globalIndexer.createWriter(createGlobalIndexFileReadWrite(table));
}

/**
 * Creates a {@link GlobalIndexWriter} for an index of the given type over the given fields.
 *
 * <p>The indexer is obtained from {@code GlobalIndexer.create(indexType, fields, options)} and
 * its writer is created on top of the table's global index file read/write helper.
 */
public static GlobalIndexWriter createIndexWriter(
        FileStoreTable table, String indexType, List<DataField> fields, Options options)
        throws IOException {
    return GlobalIndexer.create(indexType, fields, options)
            .createWriter(createGlobalIndexFileReadWrite(table));
}

/**
 * Find the minimum firstRowId among files whose schema does not contain all index columns.
 * Files at or beyond this rowId cannot be indexed because the column was added later via ALTER
 * TABLE.
 *
 * <p>The "schema contains all index columns" answer is cached per schema id, so each schema is
 * looked up once during the scan. Files without a firstRowId are ignored. When a boundary is
 * found, the schema id, the missing columns and the boundary are logged at INFO level.
 *
 * @param schemaManager used to look up the field names of a schema id
 * @param entries manifest entries to inspect
 * @param indexColumns names of the columns the index is built on
 * @return the boundary rowId, or {@link Long#MAX_VALUE} if no file with a firstRowId lacks any
 *     of the columns
 */
public static long findMinNonIndexableRowId(
        SchemaManager schemaManager, List<ManifestEntry> entries, List<String> indexColumns) {
    Map<Long, Boolean> coveredBySchema = new HashMap<>();
    long boundaryRowId = Long.MAX_VALUE;
    long boundarySchemaId = -1;

    for (ManifestEntry manifestEntry : entries) {
        long schemaId = manifestEntry.file().schemaId();
        boolean covered =
                coveredBySchema.computeIfAbsent(
                        schemaId,
                        id -> schemaManager.schema(id).fieldNames().containsAll(indexColumns));
        if (covered || manifestEntry.file().firstRowId() == null) {
            continue;
        }
        long candidate = manifestEntry.file().nonNullFirstRowId();
        if (candidate < boundaryRowId) {
            boundaryRowId = candidate;
            boundarySchemaId = schemaId;
        }
    }

    if (boundaryRowId == Long.MAX_VALUE) {
        // Nothing to report: no file with a row id is missing an index column.
        return boundaryRowId;
    }

    // Only for the log line: list which index columns the boundary schema lacks.
    List<String> presentFields = schemaManager.schema(boundarySchemaId).fieldNames();
    List<String> absentColumns = new ArrayList<>();
    for (String column : indexColumns) {
        if (!presentFields.contains(column)) {
            absentColumns.add(column);
        }
    }
    LOG.info(
            "Found non-indexable files: schemaId={} missing columns {}, boundaryRowId={}.",
            boundarySchemaId,
            absentColumns,
            boundaryRowId);
    return boundaryRowId;
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: the old filterEntriesBefore in GenericIndexTopoBuilder had a LOG.info("Filtered {} files ...") line for observability. This was lost during extraction since GlobalIndexBuilderUtils has no logger. Consider adding one — this log is useful for debugging index build issues in production.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done: added a logger to GlobalIndexBuilderUtils and restored the log line.

/**
 * Keep only entries whose firstRowId is strictly less than the given boundary.
 *
 * <p>When {@code boundaryRowId} is {@link Long#MAX_VALUE} the input list itself is returned
 * without filtering or logging. Otherwise a new list is built, entries without a firstRowId are
 * dropped, and the before/after counts are logged at INFO level.
 *
 * @param entries manifest entries to filter
 * @param boundaryRowId exclusive upper bound for the firstRowId of kept entries
 * @return the input list, or a new list with the entries below the boundary
 */
public static List<ManifestEntry> filterEntriesBefore(
        List<ManifestEntry> entries, long boundaryRowId) {
    if (boundaryRowId != Long.MAX_VALUE) {
        List<ManifestEntry> indexable = new ArrayList<>();
        for (ManifestEntry candidate : entries) {
            boolean hasRowId = candidate.file().firstRowId() != null;
            if (hasRowId && candidate.file().nonNullFirstRowId() < boundaryRowId) {
                indexable.add(candidate);
            }
        }
        LOG.info(
                "Filtered {} files to {} indexable files (boundaryRowId={}).",
                entries.size(),
                indexable.size(),
                boundaryRowId);
        return indexable;
    }
    return entries;
}

private static GlobalIndexFileReadWrite createGlobalIndexFileReadWrite(FileStoreTable table) {
IndexPathFactory indexPathFactory = table.store().pathFactory().globalIndexFileFactory();
return new GlobalIndexFileReadWrite(table.fileIO(), indexPathFactory);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,10 @@
import java.util.stream.Collectors;

import static org.apache.paimon.CoreOptions.GLOBAL_INDEX_THREAD_NUM;
import static org.apache.paimon.globalindex.GlobalIndexBuilderUtils.MULTI_COLUMN_INDEX_FIELD_ID;
import static org.apache.paimon.predicate.PredicateVisitor.collectFieldNames;
import static org.apache.paimon.table.source.snapshot.TimeTravelUtil.tryTravelOrLatest;
import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.Preconditions.checkNotNull;
import static org.apache.paimon.utils.ThreadPoolUtils.createCachedThreadPool;

Expand Down Expand Up @@ -81,21 +83,28 @@ public GlobalIndexScanner(
GlobalIndexMeta meta = checkNotNull(indexFile.globalIndexMeta());
int fieldId = meta.indexFieldId();
String indexType = indexFile.indexType();
indexMetas
.computeIfAbsent(fieldId, k -> new HashMap<>())
.computeIfAbsent(indexType, k -> new HashMap<>())
.computeIfAbsent(
new Range(meta.rowRangeStart(), meta.rowRangeEnd()),
k -> new ArrayList<>())
.add(indexFile);
Range range = new Range(meta.rowRangeStart(), meta.rowRangeEnd());
if (fieldId != MULTI_COLUMN_INDEX_FIELD_ID) {
indexMetas
.computeIfAbsent(fieldId, k -> new HashMap<>())
.computeIfAbsent(indexType, k -> new HashMap<>())
.computeIfAbsent(range, k -> new ArrayList<>())
.add(indexFile);
}

if (meta.extraFieldIds() != null) {
for (int extraId : meta.extraFieldIds()) {
indexMetas
.computeIfAbsent(extraId, k -> new HashMap<>())
.computeIfAbsent(indexType, k -> new HashMap<>())
.computeIfAbsent(range, k -> new ArrayList<>())
.add(indexFile);
}
}
}

IntFunction<Collection<GlobalIndexReader>> readersFunction =
fieldId ->
createReaders(
indexFileReader,
indexMetas.get(fieldId),
rowType.getField(fieldId));
fieldId -> createReaders(indexFileReader, indexMetas.get(fieldId), rowType);
this.globalIndexEvaluator = new GlobalIndexEvaluator(rowType, readersFunction, executor);
}

Expand Down Expand Up @@ -129,7 +138,17 @@ public static Optional<GlobalIndexScanner> create(
if (globalIndex == null) {
return false;
}
return filterFieldIds.contains(globalIndex.indexFieldId());
if (filterFieldIds.contains(globalIndex.indexFieldId())) {
return true;
}
if (globalIndex.extraFieldIds() != null) {
for (int id : globalIndex.extraFieldIds()) {
if (filterFieldIds.contains(id)) {
return true;
}
}
}
return false;
};

List<IndexFileMeta> indexFiles =
Expand All @@ -147,7 +166,7 @@ public Optional<GlobalIndexResult> scan(Predicate predicate) {
private Collection<GlobalIndexReader> createReaders(
GlobalIndexFileReader indexFileReadWrite,
Map<String, Map<Range, List<IndexFileMeta>>> indexMetas,
DataField dataField) {
RowType rowType) {
if (indexMetas == null) {
return Collections.emptyList();
}
Expand All @@ -159,7 +178,8 @@ private Collection<GlobalIndexReader> createReaders(
Map<Range, List<IndexFileMeta>> metas = entry.getValue();
GlobalIndexerFactory globalIndexerFactory =
GlobalIndexerFactoryUtils.load(indexType);
GlobalIndexer globalIndexer = globalIndexerFactory.create(dataField, options);
List<DataField> fields = resolveFields(metas, rowType);
GlobalIndexer globalIndexer = globalIndexerFactory.create(fields, options);

List<GlobalIndexReader> unionReader = new ArrayList<>();
for (Map.Entry<Range, List<IndexFileMeta>> rangeMetas : metas.entrySet()) {
Expand Down Expand Up @@ -187,6 +207,46 @@ private Collection<GlobalIndexReader> createReaders(
return readers;
}

/**
 * Resolves the {@link DataField}s an index group was built on, based on the meta of the first
 * index file in {@code metas}.
 *
 * <p>For a multi-column index (index field id equal to {@code MULTI_COLUMN_INDEX_FIELD_ID}) every
 * file in every range must carry the same marker id and the same extra field ids; the fields
 * are then the extra field ids in order. Otherwise the result is the index field followed by
 * any extra fields of the first meta.
 *
 * @param metas index files grouped by row range; the first file of the first group is read
 * @param rowType row type used to look fields up by id
 * @return the resolved fields in index order
 */
private List<DataField> resolveFields(Map<Range, List<IndexFileMeta>> metas, RowType rowType) {
    GlobalIndexMeta reference =
            checkNotNull(metas.values().iterator().next().get(0).globalIndexMeta());
    int primaryFieldId = reference.indexFieldId();
    int[] extraIds = reference.extraFieldIds();
    List<DataField> resolved = new ArrayList<>();

    if (primaryFieldId == MULTI_COLUMN_INDEX_FIELD_ID) {
        checkNotNull(extraIds, "Multi-column index must have extraFieldIds.");
        // All files of a multi-column index must agree on the column list before it is used.
        for (List<IndexFileMeta> filesOfRange : metas.values()) {
            for (IndexFileMeta file : filesOfRange) {
                GlobalIndexMeta current = checkNotNull(file.globalIndexMeta());
                checkArgument(
                        current.indexFieldId() == MULTI_COLUMN_INDEX_FIELD_ID,
                        "Inconsistent indexFieldId across range groups: expected %s but found %s.",
                        MULTI_COLUMN_INDEX_FIELD_ID,
                        current.indexFieldId());
                checkArgument(
                        java.util.Arrays.equals(current.extraFieldIds(), extraIds),
                        "Inconsistent extraFieldIds across range groups.");
            }
        }
    } else {
        // Single-column layout: the index field itself comes first.
        resolved.add(rowType.getField(primaryFieldId));
    }

    if (extraIds != null) {
        for (int extraId : extraIds) {
            resolved.add(rowType.getField(extraId));
        }
    }
    return resolved;
}

private GlobalIndexIOMeta toGlobalMeta(IndexFileMeta meta) {
GlobalIndexMeta globalIndex = meta.globalIndexMeta();
checkNotNull(globalIndex);
Expand Down
Loading