diff --git a/.gitignore b/.gitignore index f533fd540d20..e62797c76052 100644 --- a/.gitignore +++ b/.gitignore @@ -40,6 +40,7 @@ paimon-python/dist/ paimon-python/*.egg-info/ paimon-python/dev/log paimon-spark/paimon-spark-ut/PaimonLambdaFunctionfunction_test.java +paimon-python/build/ ### Misc ### *.swp diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/RowRangeIndex.java b/paimon-common/src/main/java/org/apache/paimon/utils/RowRangeIndex.java index a068d04f728d..ffd83eddd227 100644 --- a/paimon-common/src/main/java/org/apache/paimon/utils/RowRangeIndex.java +++ b/paimon-common/src/main/java/org/apache/paimon/utils/RowRangeIndex.java @@ -56,6 +56,13 @@ public boolean intersects(long start, long end) { return candidate < starts.length && starts[candidate] <= end; } + public boolean contains(Range range) { + int candidate = lowerBound(ends, range.from); + return candidate < starts.length + && starts[candidate] <= range.from + && ends[candidate] >= range.to; + } + public List intersectedRanges(long start, long end) { int left = lowerBound(ends, start); if (left >= ranges.size()) { diff --git a/paimon-common/src/test/java/org/apache/paimon/utils/RowRangeIndexTest.java b/paimon-common/src/test/java/org/apache/paimon/utils/RowRangeIndexTest.java new file mode 100644 index 000000000000..7c4a9b155013 --- /dev/null +++ b/paimon-common/src/test/java/org/apache/paimon/utils/RowRangeIndexTest.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.utils; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link RowRangeIndex}. */ +class RowRangeIndexTest { + + @Test + void testContains() { + RowRangeIndex index = + RowRangeIndex.create( + Arrays.asList(new Range(0, 99), new Range(100, 149), new Range(200, 299))); + + assertThat(index.contains(new Range(0, 149))).isTrue(); + assertThat(index.contains(new Range(50, 120))).isTrue(); + assertThat(index.contains(new Range(150, 199))).isFalse(); + assertThat(index.contains(new Range(100, 200))).isFalse(); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java index 3f9fdb9f1c0c..f7d9ba11545c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java @@ -325,6 +325,13 @@ public int commit(ManifestCommittable committable, boolean checkAppendFiles) { checkAppendFiles = true; allowRollback = true; } + if (changes.appendIndexFiles.stream() + .anyMatch( + entry -> + entry.kind() == FileKind.ADD + && entry.indexFile().globalIndexMeta() != null)) { + checkAppendFiles = true; + } attempts += tryCommit( diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java index 493d13d88dee..1f73b705f6f2 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java @@ -23,6 +23,7 @@ import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.index.DeletionVectorMeta; +import org.apache.paimon.index.GlobalIndexMeta; import org.apache.paimon.index.IndexFileHandler; import org.apache.paimon.index.IndexFileMeta; import org.apache.paimon.io.DataFileMeta; @@ -37,7 +38,9 @@ import org.apache.paimon.types.RowType; import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.Pair; +import org.apache.paimon.utils.Range; import org.apache.paimon.utils.RangeHelper; +import org.apache.paimon.utils.RowRangeIndex; import org.apache.paimon.utils.SnapshotManager; import org.slf4j.Logger; @@ -237,6 +240,11 @@ public Optional checkConflicts( return exception; } + exception = checkGlobalIndexRowIdExistence(baseEntries, deltaIndexEntries); + if (exception.isPresent()) { + return exception; + } + return checkForRowIdFromSnapshot( latestSnapshot, deltaEntries, deltaIndexEntries, rowIdColumnConflictChecker); } @@ -544,6 +552,55 @@ private Optional checkForRowIdFromSnapshot( return Optional.empty(); } + private Optional checkGlobalIndexRowIdExistence( + List baseEntries, List deltaIndexEntries) { + if (!dataEvolutionEnabled) { + return Optional.empty(); + } + + List indexesToCheck = globalIndexFileAdditions(deltaIndexEntries); + if (indexesToCheck.isEmpty()) { + return Optional.empty(); + } + + List dataRanges = new ArrayList<>(); + for (SimpleFileEntry entry : baseEntries) { + if (entry.kind() == FileKind.ADD && entry.firstRowId() != null) { + dataRanges.add(entry.nonNullRowIdRange()); + } + } + RowRangeIndex rowRangeIndex = RowRangeIndex.create(dataRanges); + + for (IndexManifestEntry indexEntry : indexesToCheck) { + GlobalIndexMeta globalIndex = indexEntry.indexFile().globalIndexMeta(); + checkState(globalIndex != null, "Global index meta must not be null."); + Range indexRange = globalIndex.rowRange(); + if (!rowRangeIndex.contains(indexRange)) { + return Optional.of( + new RuntimeException( + String.format( + "Global index row ID existence conflict: index file '%s' " + + "references row range %s, but this range " + + "is not fully covered by current data " + + "files. The referenced row IDs may have been " + + "reassigned or removed by a concurrent commit.", + indexEntry.indexFile().fileName(), indexRange))); + } + } + return Optional.empty(); + } + + private List globalIndexFileAdditions( + List indexFileChanges) { + List result = new ArrayList<>(); + for (IndexManifestEntry entry : indexFileChanges) { + if (entry.kind() == FileKind.ADD && entry.indexFile().globalIndexMeta() != null) { + result.add(entry); + } + } + return result; + } + Optional checkRowIdExistence( List baseEntries, List deltaEntries, diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/commit/ManifestEntryChanges.java b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ManifestEntryChanges.java index f32f6c9ef5ff..faa65bba4750 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/commit/ManifestEntryChanges.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ManifestEntryChanges.java @@ -170,7 +170,8 @@ public static List changedPartitions( changedPartitions.add(file.partition()); } for (IndexManifestEntry file : indexFileChanges) { - if (file.indexFile().indexType().equals(DELETION_VECTORS_INDEX)) { + if (file.indexFile().indexType().equals(DELETION_VECTORS_INDEX) + || file.indexFile().globalIndexMeta() != null) { changedPartitions.add(file.partition()); } } diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java index 71eb081de89f..5625fe81a3cc 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java @@ -31,8 +31,11 @@ import org.apache.paimon.deletionvectors.DeletionVector; import org.apache.paimon.fs.Path; import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.index.GlobalIndexMeta; import org.apache.paimon.index.IndexFileHandler; import org.apache.paimon.index.IndexFileMeta; +import org.apache.paimon.io.CompactIncrement; +import org.apache.paimon.io.DataIncrement; import org.apache.paimon.manifest.FileKind; import org.apache.paimon.manifest.IndexManifestEntry; import org.apache.paimon.manifest.ManifestCommittable; @@ -1008,6 +1011,48 @@ public void testCommitManifestWithProperties() throws Exception { } } + @Test + public void testGlobalIndexCommitChecksExistingRowIds() throws Exception { + TestFileStore store = createRowTrackingDataEvolutionStore(); + + List keyValues = generateDataList(1); + BinaryRow partition = gen.getPartition(keyValues.get(0)); + Snapshot dataSnapshot = store.commitData(keyValues, s -> partition, kv -> 0).get(0); + assertThat(dataSnapshot.nextRowId()).isEqualTo(1L); + + try (FileStoreCommitImpl commit = store.newCommit()) { + commit.commit(indexCommittable(partition, "existing-index", 0, 0), false); + } + + Snapshot latest = checkNotNull(store.snapshotManager().latestSnapshot()); + assertThat(latest.indexManifest()).isNotNull(); + } + + @Test + public void testGlobalIndexCommitFailsForMissingRowIds() throws Exception { + TestFileStore store = createRowTrackingDataEvolutionStore(); + + List keyValues = generateDataList(1); + BinaryRow partition = gen.getPartition(keyValues.get(0)); + Snapshot dataSnapshot = store.commitData(keyValues, s -> partition, kv -> 0).get(0); + long missingRowId = checkNotNull(dataSnapshot.nextRowId()); + + try (FileStoreCommitImpl commit = store.newCommit()) { + assertThatThrownBy( + () -> + commit.commit( + indexCommittable( + partition, + "missing-index", + missingRowId, + missingRowId), + false)) + .hasMessageContaining("Global index row ID existence conflict") + .hasMessageContaining("missing-index") + .hasMessageContaining("[" + missingRowId + ", " + missingRowId + "]"); + } + } + @Test public void testCommitTwiceWithDifferentKind() throws Exception { TestFileStore store = createStore(false); @@ -1082,6 +1127,20 @@ public void testCommitRetryAfterFalseSuccessDoesNotCleanManifest() throws Except private FileStoreCommitImpl newCommitWithSnapshotCommit( TestFileStore store, String commitUser, SnapshotCommit snapshotCommit) { + return newCommitWithSnapshotCommit( + store, + commitUser, + snapshotCommit, + store.options(), + store.options().dataEvolutionEnabled()); + } + + private FileStoreCommitImpl newCommitWithSnapshotCommit( + TestFileStore store, + String commitUser, + SnapshotCommit snapshotCommit, + CoreOptions options, + boolean dataEvolutionEnabled) { String tableName = store.options().path().getName(); return new FileStoreCommitImpl( snapshotCommit, @@ -1090,7 +1149,7 @@ private FileStoreCommitImpl newCommitWithSnapshotCommit( tableName, commitUser, store.partitionType(), - store.options(), + options, store.pathFactory(), store.snapshotManager(), store.manifestFileFactory(), @@ -1109,15 +1168,37 @@ private FileStoreCommitImpl newCommitWithSnapshotCommit( store.pathFactory(), store.newKeyComparator(), store.bucketMode(), - store.options().deletionVectorsEnabled(), - store.options().dataEvolutionEnabled(), - store.options().pkClusteringOverride(), + options.deletionVectorsEnabled(), + dataEvolutionEnabled, + options.pkClusteringOverride(), store.newIndexFileHandler(), store.snapshotManager(), scanner), null); } + private ManifestCommittable indexCommittable( + BinaryRow partition, String fileName, long rowRangeStart, long rowRangeEnd) { + ManifestCommittable committable = new ManifestCommittable(0); + committable.addFileCommittable( + new CommitMessageImpl( + partition, + 0, + null, + DataIncrement.indexIncrement( + Collections.singletonList( + new IndexFileMeta( + "btree", + fileName, + 1, + 1, + new GlobalIndexMeta( + rowRangeStart, rowRangeEnd, 0, null, null), + null))), + CompactIncrement.emptyIncrement())); + return committable; + } + private static class FalseSuccessSnapshotCommit implements SnapshotCommit { private final SnapshotCommit delegate; @@ -1153,6 +1234,13 @@ private TestFileStore createStore(boolean failing, Map options) return createStore(failing, 1, CoreOptions.ChangelogProducer.NONE, options); } + private TestFileStore createRowTrackingDataEvolutionStore() throws Exception { + Map options = new HashMap<>(); + options.put(CoreOptions.ROW_TRACKING_ENABLED.key(), "true"); + options.put(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true"); + return createStore(false, -1, CoreOptions.ChangelogProducer.NONE, options); + } + private TestFileStore createStore(boolean failing) throws Exception { return createStore(failing, 1); } @@ -1179,14 +1267,18 @@ private TestFileStore createStore( ? FailingFileIO.getFailingPath(failingName, tempDir.toString()) : TraceableFileIO.SCHEME + "://" + tempDir.toString(); Path path = new Path(tempDir.toUri()); + List primaryKeys = + Boolean.parseBoolean(options.get(CoreOptions.ROW_TRACKING_ENABLED.key())) + ? Collections.emptyList() + : TestKeyValueGenerator.getPrimaryKeys( + TestKeyValueGenerator.GeneratorMode.MULTI_PARTITIONED); TableSchema tableSchema = SchemaUtils.forceCommit( new SchemaManager(new LocalFileIO(), path), new Schema( TestKeyValueGenerator.DEFAULT_ROW_TYPE.getFields(), TestKeyValueGenerator.DEFAULT_PART_TYPE.getFieldNames(), - TestKeyValueGenerator.getPrimaryKeys( - TestKeyValueGenerator.GeneratorMode.MULTI_PARTITIONED), + primaryKeys, options, null)); return new TestFileStore.Builder( diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java index 1c36b9e09ee7..d7282fe66173 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java @@ -18,7 +18,10 @@ package org.apache.paimon.operation.commit; +import org.apache.paimon.Snapshot; +import org.apache.paimon.data.BinaryRow; import org.apache.paimon.index.DeletionVectorMeta; +import org.apache.paimon.index.GlobalIndexMeta; import org.apache.paimon.index.IndexFileMeta; import org.apache.paimon.manifest.FileEntry; import org.apache.paimon.manifest.FileKind; @@ -330,6 +333,21 @@ private IndexManifestEntry createDvIndexEntry( DELETION_VECTORS_INDEX, fileName, 11, dvRanges.size(), dvRanges, null)); } + private IndexManifestEntry createGlobalIndexEntry( + String fileName, FileKind kind, BinaryRow partition, long from, long to) { + return new IndexManifestEntry( + kind, + partition, + 0, + new IndexFileMeta( + "btree", + fileName, + 11, + 1, + new GlobalIndexMeta(from, to, 0, null, null), + null)); + } + private void assertConflict( List baseEntries, List deltaEntries) { ArrayList simpleFileEntryWithDVS = new ArrayList<>(baseEntries); @@ -375,6 +393,18 @@ void testShouldBeOverwriteCommit() { .isFalse(); } + @Test + void testChangedPartitionsIncludesGlobalIndexFiles() { + BinaryRow partition = BinaryRow.singleColumn(1); + + assertThat( + ManifestEntryChanges.changedPartitions( + Collections.emptyList(), + Collections.singletonList( + createGlobalIndexEntry("idx", ADD, partition, 0, 99)))) + .containsExactly(partition); + } + @Test void testCheckRowIdExistenceNoConflict() { ConflictDetection detection = createConflictDetection(); @@ -489,6 +519,63 @@ private SimpleFileEntry createFileEntryWithRowId( firstRowId); } + @Test + void testCheckGlobalIndexRowIdExistenceNoConflict() { + ConflictDetection detection = createConflictDetection(); + + Optional exception = + detection.checkConflicts( + snapshot(1), + Arrays.asList( + createFileEntryWithRowId("f1", ADD, 0L, 100L), + createFileEntryWithRowId("f2", ADD, 100L, 50L)), + Collections.emptyList(), + Collections.singletonList( + createGlobalIndexEntry("idx", ADD, BinaryRow.EMPTY_ROW, 0, 149)), + null, + Snapshot.CommitKind.APPEND); + + assertThat(exception).isNotPresent(); + } + + @Test + void testCheckGlobalIndexRowIdExistenceBaseFileRemoved() { + ConflictDetection detection = createConflictDetection(); + + Optional exception = + detection.checkConflicts( + snapshot(1), + Collections.singletonList(createFileEntryWithRowId("f1", ADD, 0L, 100L)), + Collections.emptyList(), + Collections.singletonList( + createGlobalIndexEntry("idx", ADD, BinaryRow.EMPTY_ROW, 0, 149)), + null, + Snapshot.CommitKind.APPEND); + + assertThat(exception).isPresent(); + assertThat(exception.get()) + .hasMessageContaining("Global index row ID existence conflict") + .hasMessageContaining("idx") + .hasMessageContaining("[0, 149]"); + } + + @Test + void testCheckGlobalIndexRowIdExistenceSkipsDeleteIndexEntry() { + ConflictDetection detection = createConflictDetection(); + + Optional exception = + detection.checkConflicts( + snapshot(1), + Collections.emptyList(), + Collections.emptyList(), + Collections.singletonList( + createGlobalIndexEntry("idx", DELETE, BinaryRow.EMPTY_ROW, 0, 149)), + null, + Snapshot.CommitKind.APPEND); + + assertThat(exception).isNotPresent(); + } + private ConflictDetection createConflictDetection() { return new ConflictDetection( "test-table", @@ -504,4 +591,28 @@ private ConflictDetection createConflictDetection() { null, null); } + + private Snapshot snapshot(long id) { + return new Snapshot( + id, + 0, + null, + null, + null, + null, + null, + null, + null, + "commit-user", + id, + Snapshot.CommitKind.APPEND, + id, + 0, + 0, + null, + null, + null, + null, + null); + } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericGlobalIndexBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericGlobalIndexBuilder.java index 48ea935f5b62..415c1590a59c 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericGlobalIndexBuilder.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericGlobalIndexBuilder.java @@ -18,6 +18,7 @@ package org.apache.paimon.flink.globalindex; +import org.apache.paimon.Snapshot; import org.apache.paimon.manifest.IndexManifestEntry; import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.partition.PartitionPredicate; @@ -72,7 +73,17 @@ public List scan() { + "deleted rows to be indexed.", table.name()); - return table.store().newScan().withPartitionFilter(partitionPredicate).plan().files(); + Snapshot snapshot = table.snapshotManager().latestSnapshot(); + if (snapshot == null) { + return Collections.emptyList(); + } + + return table.store() + .newScan() + .withSnapshot(snapshot) + .withPartitionFilter(partitionPredicate) + .plan() + .files(); } /** Returns old index file entries that should be deleted after new indexes are built. */