diff --git a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java index df21129db6..fe93dd328d 100644 --- a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java +++ b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java @@ -515,6 +515,41 @@ public class ConfigOptions { + WRITER_ID_EXPIRATION_TIME.key() + " passing. The default value is 10 minutes."); + public static final ConfigOption KV_SCANNER_TTL = + key("kv.scanner.ttl") + .durationType() + .defaultValue(Duration.ofMinutes(10)) + .withDescription( + "The time-to-live for an idle KV scanner session on the server. A scanner that has not " + + "received a request within this duration will be automatically expired and its " + + "resources released. The default value is 10 minutes."); + + public static final ConfigOption KV_SCANNER_EXPIRATION_INTERVAL = + key("kv.scanner.expiration-interval") + .durationType() + .defaultValue(Duration.ofSeconds(30)) + .withDescription( + "How often the TTL evictor runs to close idle scanner sessions. " + + "The default value is 30 seconds."); + + public static final ConfigOption KV_SCANNER_MAX_PER_BUCKET = + key("kv.scanner.max-per-bucket") + .intType() + .defaultValue(8) + .withDescription( + "Maximum number of concurrent scanner sessions per bucket. " + + "Exceeding this limit returns TOO_MANY_SCANNERS. " + + "The default value is 8."); + + public static final ConfigOption KV_SCANNER_MAX_PER_SERVER = + key("kv.scanner.max-per-server") + .intType() + .defaultValue(200) + .withDescription( + "Maximum number of concurrent scanner sessions per tablet server. " + + "Exceeding this limit returns TOO_MANY_SCANNERS. 
" + + "The default value is 200."); + public static final ConfigOption TABLET_SERVER_CONTROLLED_SHUTDOWN_MAX_RETRIES = key("tablet-server.controlled-shutdown.max-retries") .intType() diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java index e72428a02f..23976ccd84 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java @@ -58,6 +58,7 @@ import org.apache.fluss.server.kv.rocksdb.RocksDBStatistics; import org.apache.fluss.server.kv.rowmerger.DefaultRowMerger; import org.apache.fluss.server.kv.rowmerger.RowMerger; +import org.apache.fluss.server.kv.scan.ScannerContext; import org.apache.fluss.server.kv.snapshot.KvFileHandleAndLocalPath; import org.apache.fluss.server.kv.snapshot.KvSnapshotDataUploader; import org.apache.fluss.server.kv.snapshot.RocksIncrementalSnapshot; @@ -70,12 +71,17 @@ import org.apache.fluss.server.log.LogTablet; import org.apache.fluss.server.metrics.group.TabletServerMetricGroup; import org.apache.fluss.server.utils.FatalErrorHandler; +import org.apache.fluss.server.utils.ResourceGuard; import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BufferAllocator; import org.apache.fluss.types.RowType; import org.apache.fluss.utils.BytesUtils; import org.apache.fluss.utils.FileUtils; +import org.apache.fluss.utils.IOUtils; import org.rocksdb.RateLimiter; +import org.rocksdb.ReadOptions; +import org.rocksdb.RocksIterator; +import org.rocksdb.Snapshot; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -756,6 +762,83 @@ public List limitScan(int limit) throws IOException { }); } + /** + * Opens a new full-scan session, taking a point-in-time RocksDB snapshot under the {@code + * kvLock} read lock. + * + *

<p>Returns {@code null} if the bucket contains no rows at the time of the call. In that case + * all acquired resources are released internally and no session is registered. + * + *

The returned {@link ScannerContext} is unregistered — the caller ({@link + * org.apache.fluss.server.kv.scan.ScannerManager}) is responsible for registering it and for + * closing it when the scan is complete. + * + * @param scannerId the server-assigned scanner ID + * @param limit maximum number of rows to return across all batches ({@code ≤ 0} = unlimited) + * @param initialAccessTimeMs wall-clock time (ms) to use as the initial last-access timestamp + * @return a newly created, cursor-positioned {@link ScannerContext}, or {@code null} if the + * bucket is empty + * @throws IOException if the ResourceGuard is already closed (RocksDB is shutting down) + */ + @Nullable + public ScannerContext openScan(String scannerId, long limit, long initialAccessTimeMs) + throws IOException { + return inReadLock( + kvLock, + () -> { + rocksDBKv.checkIfRocksDBClosed(); + ResourceGuard.Lease lease = rocksDBKv.getResourceGuard().acquireResource(); + Snapshot snapshot = null; + ReadOptions readOptions = null; + RocksIterator iterator = null; + boolean success = false; + try { + snapshot = rocksDBKv.getDb().getSnapshot(); + readOptions = new ReadOptions().setSnapshot(snapshot); + iterator = + rocksDBKv + .getDb() + .newIterator( + rocksDBKv.getDefaultColumnFamilyHandle(), + readOptions); + iterator.seekToFirst(); + if (!iterator.isValid()) { + // Empty bucket: no session will be registered; cleanup in finally. + return null; + } + ScannerContext context = + new ScannerContext( + scannerId, + tableBucket, + rocksDBKv, + iterator, + readOptions, + snapshot, + lease, + limit, + initialAccessTimeMs); + success = true; + return context; + } finally { + if (!success) { + // Release in reverse allocation order. Each close is independent, + // so a failure in one must not prevent the others from running. 
+ IOUtils.closeQuietly(iterator); + IOUtils.closeQuietly(readOptions); + if (snapshot != null) { + try { + rocksDBKv.getDb().releaseSnapshot(snapshot); + } catch (Throwable t) { + LOG.warn("Error releasing RocksDB snapshot.", t); + } + IOUtils.closeQuietly(snapshot); + } + IOUtils.closeQuietly(lease); + } + } + }); + } + public KvBatchWriter createKvBatchWriter() { return rocksDBKv.newWriteBatch( writeBatchSize, diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/scan/ScannerContext.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/scan/ScannerContext.java new file mode 100644 index 0000000000..ba0c91fe66 --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/scan/ScannerContext.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.server.kv.scan; + +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.server.kv.rocksdb.RocksDBKv; +import org.apache.fluss.server.utils.ResourceGuard; + +import org.rocksdb.ReadOptions; +import org.rocksdb.RocksIterator; +import org.rocksdb.Snapshot; + +import javax.annotation.concurrent.NotThreadSafe; + +import java.io.Closeable; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Server-side state for a single KV full-scan session. + * + *

<p>A {@code ScannerContext} holds a point-in-time RocksDB {@link Snapshot}, the {@link + * ReadOptions} pinning it, and a cursor ({@link RocksIterator}) that persists across multiple + * batched-fetch RPCs from the same client. It also holds a {@link ResourceGuard.Lease} that + * prevents the underlying RocksDB instance from being closed while the scan is in progress. + * + *

<p>Instances are created by {@link org.apache.fluss.server.kv.KvTablet#openScan} and registered + * by {@link ScannerManager}. They must be closed when the scan completes, the client requests an + * explicit close, or the session expires due to inactivity. + * + *

Thread safety: The iterator cursor ({@link #advance()}, {@link #isValid()}, {@link + * #currentValue()}) must be driven by only one thread at a time. {@link #close()} is thread-safe. + */ +@NotThreadSafe +public class ScannerContext implements Closeable { + private final String scannerId; + private final byte[] scannerIdBytes; + private final TableBucket tableBucket; + private final RocksDBKv rocksDBKv; + private final RocksIterator iterator; + private final ReadOptions readOptions; + private final Snapshot snapshot; + private final ResourceGuard.Lease resourceLease; + private long remainingLimit; + // Initial value -1 so that the first client call_seq_id of 0 satisfies the server's + // in-order check: expectedSeqId = callSeqId + 1 = -1 + 1 = 0. + // callSeqId validation is only performed for continuation requests (those carrying a + // scanner_id), never for the initial open request (those carrying a bucket_scan_req). + private int callSeqId = -1; + + /** + * Wall-clock timestamp (ms) of the most recent request that touched this session. Used by + * {@link ScannerManager} for TTL-based eviction. + */ + private long lastAccessTime; + + private final AtomicBoolean closed = new AtomicBoolean(false); + + public ScannerContext( + String scannerId, + TableBucket tableBucket, + RocksDBKv rocksDBKv, + RocksIterator iterator, + ReadOptions readOptions, + Snapshot snapshot, + ResourceGuard.Lease resourceLease, + long limit, + long initialAccessTimeMs) { + this.scannerId = scannerId; + this.scannerIdBytes = scannerId.getBytes(StandardCharsets.UTF_8); + this.tableBucket = tableBucket; + this.rocksDBKv = rocksDBKv; + this.iterator = iterator; + this.readOptions = readOptions; + this.snapshot = snapshot; + this.resourceLease = resourceLease; + this.remainingLimit = limit <= 0 ? -1L : limit; + this.lastAccessTime = initialAccessTimeMs; + } + + public byte[] getScannerId() { + return scannerIdBytes; + } + + /** + * Returns the scanner ID as a UTF-8 {@link String}. 
Package-private: used by {@link + * ScannerManager} as the key in its internal {@code scanners} map. The wire-format + * representation is always {@link #getScannerId()} (raw bytes). + */ + String getIdString() { + return scannerId; + } + + public TableBucket getTableBucket() { + return tableBucket; + } + + public boolean isValid() { + return iterator.isValid() && remainingLimit != 0; + } + + public byte[] currentValue() { + return iterator.value(); + } + + /** + * Advances the cursor by one entry and decrements the remaining-rows limit if applicable. Must + * only be called when {@link #isValid()} returns {@code true}. + */ + public void advance() { + iterator.next(); + if (remainingLimit > 0) { + remainingLimit--; + } + } + + /** Returns the call-sequence ID of the last successfully served request, or {@code -1}. */ + public int getCallSeqId() { + return callSeqId; + } + + /** + * Updates the call-sequence ID. Must be called after the response payload has been + * fully prepared, so that a client retry with the same {@code callSeqId} can be detected. + */ + public void setCallSeqId(int callSeqId) { + this.callSeqId = callSeqId; + } + + /** Resets the idle-TTL timer to the given wall-clock time. */ + public void updateLastAccessTime(long nowMs) { + this.lastAccessTime = nowMs; + } + + /** + * Returns {@code true} if the session has been idle for longer than {@code ttlMs}, based on the + * provided current time. 
+ */ + public boolean isExpired(long ttlMs, long nowMs) { + return nowMs - lastAccessTime > ttlMs; + } + + @Override + public void close() { + if (closed.compareAndSet(false, true)) { + try { + iterator.close(); + } finally { + try { + readOptions.close(); + } finally { + try { + rocksDBKv.getDb().releaseSnapshot(snapshot); + snapshot.close(); + } finally { + resourceLease.close(); + } + } + } + } + } +} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/scan/ScannerManager.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/scan/ScannerManager.java new file mode 100644 index 0000000000..e0401f8c9d --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/scan/ScannerManager.java @@ -0,0 +1,420 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.server.kv.scan; + +import org.apache.fluss.annotation.VisibleForTesting; +import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.config.Configuration; +import org.apache.fluss.exception.TooManyScannersException; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.server.kv.KvTablet; +import org.apache.fluss.utils.AutoCloseableAsync; +import org.apache.fluss.utils.clock.Clock; +import org.apache.fluss.utils.clock.SystemClock; +import org.apache.fluss.utils.concurrent.FutureUtils; +import org.apache.fluss.utils.concurrent.Scheduler; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Manages server-side KV full-scan sessions ({@link ScannerContext}). + * + *

<p>Each KV full scan opens a persistent server-side session that holds a point-in-time RocksDB + * snapshot and a cursor. Sessions are keyed by a server-assigned UUID-based scanner ID and persist + * across multiple batched-fetch RPCs from the same client. + * + *

<h2>Concurrency limits</h2>

+ * + *
<ul> + *
  <li>Per-bucket: at most {@code maxPerBucket} concurrent sessions on any single bucket. + *
  <li>Per-server: at most {@code maxPerServer} concurrent sessions across all buckets. + *
</ul> + * + *

<p>Limit enforcement is two-phase: a fast pre-check guards the common case; the subsequent atomic + * increment with re-check and rollback prevents the TOCTOU race from permanently breaching the + * configured limits. Exceeding either limit causes {@link TooManyScannersException}. + * + *

<h2>Empty bucket handling</h2>

+ * + *

<p>If the target bucket contains no rows at the time the scan is opened, {@link + * #createScanner(KvTablet, TableBucket, Long)} returns {@code null} without consuming a limit slot. + * The caller should return an empty response immediately. + * + *

<h2>TTL eviction</h2>

+ * + *

<p>A background evictor task runs every {@code kv.scanner.expiration-interval} and removes + * sessions idle longer than {@code kv.scanner.ttl}. Recently evicted IDs are retained for {@code 2 + * × ttl} so callers can distinguish "expired" from "never existed." + * + *

<h2>Leadership change</h2>

+ * + * {@link #closeScannersForBucket(TableBucket)} must be called when a bucket loses leadership to + * release all RocksDB snapshot/iterator resources for that bucket promptly. + */ +public class ScannerManager implements AutoCloseableAsync { + + private static final Logger LOG = LoggerFactory.getLogger(ScannerManager.class); + + private final Map scanners = new ConcurrentHashMap<>(); + private final Map recentlyExpiredIds = new ConcurrentHashMap<>(); + + /** Per-bucket active scanner count, used for O(1) per-bucket limit enforcement. */ + private final Map perBucketCount = new ConcurrentHashMap<>(); + + /** Total active scanner count across all buckets on this tablet server. */ + private final AtomicInteger totalScanners = new AtomicInteger(0); + + private final Clock clock; + private final long scannerTtlMs; + private final long recentlyExpiredRetentionMs; + private final int maxPerBucket; + private final int maxPerServer; + + @Nullable private ScheduledFuture evictorTask; + + public ScannerManager(Configuration conf, Scheduler scheduler) { + this(conf, scheduler, SystemClock.getInstance()); + } + + @VisibleForTesting + ScannerManager(Configuration conf, Scheduler scheduler, Clock clock) { + this.clock = clock; + this.scannerTtlMs = conf.get(ConfigOptions.KV_SCANNER_TTL).toMillis(); + this.recentlyExpiredRetentionMs = 2 * scannerTtlMs; + this.maxPerBucket = conf.get(ConfigOptions.KV_SCANNER_MAX_PER_BUCKET); + this.maxPerServer = conf.get(ConfigOptions.KV_SCANNER_MAX_PER_SERVER); + + long expirationIntervalMs = + conf.get(ConfigOptions.KV_SCANNER_EXPIRATION_INTERVAL).toMillis(); + this.evictorTask = + scheduler.schedule( + "scanner-expiration", + this::evictExpiredScanners, + expirationIntervalMs, + expirationIntervalMs); + + LOG.info( + "Started ScannerManager: ttl={}ms, expirationInterval={}ms, " + + "maxPerBucket={}, maxPerServer={}", + scannerTtlMs, + expirationIntervalMs, + maxPerBucket, + maxPerServer); + } + + /** + * Creates a new scan session for the 
given bucket, taking a point-in-time RocksDB snapshot. + * + *

<p>Returns {@code null} if the bucket is empty (no rows to scan). In that case no session + * slot is consumed and the caller should return an empty response immediately. + * + *

Limit enforcement is two-phase: a fast pre-check guards the common case; the + * subsequent atomic increment + re-check prevents the TOCTOU race from permanently breaching + * configured limits. If registration fails after the snapshot is already opened, the context is + * closed and the exception is re-thrown to avoid leaking resources. + * + * @param kvTablet the {@link KvTablet} for the bucket; used to open the snapshot + * @param tableBucket the bucket being scanned + * @param limit optional row-count limit ({@code null} or ≤ 0 means unlimited) + * @return the newly registered {@link ScannerContext}, or {@code null} if the bucket is empty + * @throws TooManyScannersException if the per-bucket or per-server limit is exceeded + * @throws IOException if the underlying {@link org.apache.fluss.server.utils.ResourceGuard} is + * already closed (the bucket is shutting down) + */ + @Nullable + public ScannerContext createScanner( + KvTablet kvTablet, TableBucket tableBucket, @Nullable Long limit) throws IOException { + checkLimits(tableBucket); + + String scannerId = generateScannerId(); + ScannerContext context = + kvTablet.openScan(scannerId, limit != null ? limit : -1L, clock.milliseconds()); + if (context == null) { + // Bucket is empty — no session slot consumed. + return null; + } + + try { + registerContext(context, tableBucket); + } catch (TooManyScannersException e) { + // Limit was exceeded between the initial check and registration (race window). + // Close the already-opened context to avoid leaking the snapshot and lease. + closeScannerContext(context); + throw e; + } + return context; + } + + /** + * Looks up an existing scanner session by its raw ID bytes and refreshes its last-access + * timestamp. 
+ * + * @return the {@link ScannerContext}, or {@code null} if not found (may have expired or never + * existed) + */ + @Nullable + public ScannerContext getScanner(byte[] scannerId) { + ScannerContext context = scanners.get(new String(scannerId, StandardCharsets.UTF_8)); + if (context != null) { + context.updateLastAccessTime(clock.milliseconds()); + } + return context; + } + + /** + * Returns {@code true} if the given scanner ID belongs to a session that was recently evicted + * by the TTL evictor (within the last {@code 2 × ttlMs}). + * + *

Callers can use this to distinguish "scanner expired" from "unknown scanner ID." + */ + public boolean isRecentlyExpired(byte[] scannerId) { + return recentlyExpiredIds.containsKey(new String(scannerId, StandardCharsets.UTF_8)); + } + + /** + * Removes and closes a known scanner context directly, avoiding a map lookup. + * + *

Uses a conditional remove ({@link java.util.concurrent.ConcurrentHashMap#remove(Object, + * Object)}) so that concurrent calls — e.g. from the TTL evictor and a close-scanner RPC + * arriving simultaneously — result in exactly one winner closing the context, preventing + * double-release of the non-idempotent {@link + * org.apache.fluss.server.utils.ResourceGuard.Lease}. + */ + public void removeScanner(ScannerContext context) { + if (scanners.remove(context.getIdString(), context)) { + decrementCounts(context.getTableBucket()); + closeScannerContext(context); + } + } + + /** + * Looks up and removes a scanner session by its raw ID bytes. + * + *

<p>Delegates to {@link #removeScanner(ScannerContext)} to ensure a conditional {@link + * java.util.concurrent.ConcurrentHashMap#remove(Object, Object)} is used, which prevents a + * double-decrement of {@code totalScanners} and {@code perBucketCount} when the TTL evictor + * races with an explicit close request for the same scanner. + * + *

No-op if the ID is not found (already removed or expired). + */ + public void removeScanner(byte[] scannerId) { + String key = new String(scannerId, StandardCharsets.UTF_8); + ScannerContext context = scanners.get(key); + if (context != null) { + removeScanner(context); + } + } + + /** Returns the total number of active scanner sessions on this tablet server. */ + @VisibleForTesting + public int activeScannerCount() { + return totalScanners.get(); + } + + /** Returns the number of active scanner sessions for the given bucket. */ + @VisibleForTesting + public int activeScannerCountForBucket(TableBucket tableBucket) { + AtomicInteger count = perBucketCount.get(tableBucket); + return count == null ? 0 : count.get(); + } + + /** + * Closes and removes all active scanner sessions for the given bucket. Must be called when a + * bucket loses leadership to prevent stale RocksDB snapshot/iterator leaks. + */ + public void closeScannersForBucket(TableBucket tableBucket) { + List toRemove = new ArrayList<>(); + for (Map.Entry entry : scanners.entrySet()) { + if (tableBucket.equals(entry.getValue().getTableBucket())) { + toRemove.add(entry.getKey()); + } + } + for (String key : toRemove) { + ScannerContext context = scanners.get(key); + if (context != null) { + LOG.info( + "Closing scanner {} for bucket {} due to leadership change.", + key, + tableBucket); + removeScanner(context); + } + } + } + + /** + * Fast pre-check of per-server and per-bucket limits before opening the snapshot. This is a + * best-effort check; a small race window exists between the check and {@link #registerContext}. + * The race is handled by the atomic re-check inside {@link #registerContext}. 
+ */ + private void checkLimits(TableBucket tableBucket) { + if (totalScanners.get() >= maxPerServer) { + throw new TooManyScannersException( + String.format( + "Cannot create scanner for bucket %s: server-wide limit of %d reached.", + tableBucket, maxPerServer)); + } + AtomicInteger bucketCount = + perBucketCount.computeIfAbsent(tableBucket, k -> new AtomicInteger(0)); + if (bucketCount.get() >= maxPerBucket) { + throw new TooManyScannersException( + String.format( + "Cannot create scanner for bucket %s: per-bucket limit of %d reached.", + tableBucket, maxPerBucket)); + } + } + + /** + * Atomically increments the counters and puts the context in the map. Throws {@link + * TooManyScannersException} and rolls back the increments if a concurrent create caused either + * limit to be exceeded between the initial check and this call. + */ + private void registerContext(ScannerContext context, TableBucket tableBucket) { + AtomicInteger bucketCount = + perBucketCount.computeIfAbsent(tableBucket, k -> new AtomicInteger(0)); + + int newTotal = totalScanners.incrementAndGet(); + if (newTotal > maxPerServer) { + totalScanners.decrementAndGet(); + throw new TooManyScannersException( + String.format( + "Cannot create scanner for bucket %s: server-wide limit of %d reached.", + tableBucket, maxPerServer)); + } + + int newBucketCount = bucketCount.incrementAndGet(); + if (newBucketCount > maxPerBucket) { + bucketCount.decrementAndGet(); + totalScanners.decrementAndGet(); + throw new TooManyScannersException( + String.format( + "Cannot create scanner for bucket %s: per-bucket limit of %d reached.", + tableBucket, maxPerBucket)); + } + + scanners.put(context.getIdString(), context); + + LOG.debug( + "Registered scanner {} for bucket {} (total={}, perBucket={})", + context.getIdString(), + tableBucket, + newTotal, + newBucketCount); + } + + /** TTL evictor — invoked periodically by the background scheduler. 
*/ + private void evictExpiredScanners() { + long now = clock.milliseconds(); + + // Prune stale entries from the recently-expired cache to bound memory usage. + recentlyExpiredIds + .entrySet() + .removeIf(e -> now - e.getValue() > recentlyExpiredRetentionMs); + + for (Map.Entry entry : scanners.entrySet()) { + ScannerContext context = entry.getValue(); + if (context.isExpired(scannerTtlMs, now)) { + // Conditional remove prevents double-close if removeScanner() fires concurrently. + if (scanners.remove(entry.getKey(), context)) { + LOG.info( + "Evicted idle scanner {} for bucket {} (idle > {}ms).", + entry.getKey(), + context.getTableBucket(), + scannerTtlMs); + recentlyExpiredIds.put(entry.getKey(), now); + decrementCounts(context.getTableBucket()); + closeScannerContext(context); + } + } + } + } + + private void decrementCounts(TableBucket bucket) { + totalScanners.decrementAndGet(); + perBucketCount.computeIfPresent( + bucket, + (k, count) -> { + int remaining = count.decrementAndGet(); + return remaining <= 0 ? null : count; + }); + } + + private void closeScannerContext(ScannerContext context) { + try { + context.close(); + } catch (Exception e) { + LOG.warn( + "Error closing scanner {} for bucket {}.", + context.getIdString(), + context.getTableBucket(), + e); + } + } + + private static String generateScannerId() { + return UUID.randomUUID().toString().replace("-", ""); + } + + @Override + public CompletableFuture closeAsync() { + try { + close(); + return CompletableFuture.completedFuture(null); + } catch (Exception e) { + return FutureUtils.completedExceptionally(e); + } + } + + @Override + public void close() { + // Note: we cancel but do not join the evictor. The evictor may still be mid-iteration + // when close() begins. This is safe because (a) scanners is a ConcurrentHashMap, and + // (b) both shutdown and the evictor use conditional remove(key, value) to mutate it, + // so at most one side ever closes a given ScannerContext. 
+ if (evictorTask != null) { + evictorTask.cancel(false); + evictorTask = null; + } + + for (Map.Entry entry : scanners.entrySet()) { + if (scanners.remove(entry.getKey(), entry.getValue())) { + decrementCounts(entry.getValue().getTableBucket()); + closeScannerContext(entry.getValue()); + } + } + + // Note: totalScanners and perBucketCount are not forcibly reset here. Because both + // shutdown and the evictor use conditional remove(key, value), each scanner is + // decremented exactly once, so the counters naturally reach zero. A forced reset + // would risk driving counters negative if the evictor wins a remove during close(). + recentlyExpiredIds.clear(); + } +} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java index 7a48813237..afd6b17d56 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java @@ -61,6 +61,7 @@ import org.apache.fluss.server.kv.RemoteLogFetcher; import org.apache.fluss.server.kv.autoinc.AutoIncIDRange; import org.apache.fluss.server.kv.rocksdb.RocksDBKvBuilder; +import org.apache.fluss.server.kv.scan.ScannerManager; import org.apache.fluss.server.kv.snapshot.CompletedKvSnapshotCommitter; import org.apache.fluss.server.kv.snapshot.CompletedSnapshot; import org.apache.fluss.server.kv.snapshot.KvFileHandleAndLocalPath; @@ -209,6 +210,15 @@ public final class Replica { private volatile @Nullable CloseableRegistry closeableRegistryForKv; private @Nullable PeriodicSnapshotManager kvSnapshotManager; + /** + * Optional reference to the server-wide {@link ScannerManager}. 
When set, active scanner + * sessions for this bucket are closed eagerly in {@link #dropKv()} as a safety net, even on + * code paths that do not go through {@link + * org.apache.fluss.server.replica.ReplicaManager#makeFollowers} or {@link + * org.apache.fluss.server.replica.ReplicaManager#stopReplicas}. + */ + @Nullable private volatile ScannerManager scannerManager; + // ------- metrics private Counter isrShrinks; private Counter isrExpands; @@ -376,6 +386,11 @@ public Path getTabletParentDir() { return kvTablet; } + /** Injects the {@link ScannerManager} so that {@link #dropKv()} can close active scanners. */ + public void setScannerManager(@Nullable ScannerManager scannerManager) { + this.scannerManager = scannerManager; + } + public TablePath getTablePath() { return physicalPath.getTablePath(); } @@ -669,6 +684,15 @@ private void createKv() { } private void dropKv() { + // Safety net: close any lingering scanner sessions for this bucket before tearing down + // the KV tablet. The main cleanup paths (makeFollowers, stopReplica) call + // ScannerManager.closeScannersForBucket directly on ReplicaManager, but this guard + // ensures ResourceGuard leases are released even on unexpected code paths, preventing + // KvTablet.close() from blocking indefinitely on resourceGuard.close(). 
+        ScannerManager sm = this.scannerManager;
+        if (sm != null) {
+            sm.closeScannersForBucket(tableBucket);
+        }
         // close any closeable registry for kv
         if (closeableRegistry.unregisterCloseable(closeableRegistryForKv)) {
             IOUtils.closeQuietly(closeableRegistryForKv);
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
index 3b847266eb..b34cea21f9 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
@@ -27,6 +27,7 @@
 import org.apache.fluss.exception.InvalidColumnProjectionException;
 import org.apache.fluss.exception.InvalidCoordinatorException;
 import org.apache.fluss.exception.InvalidRequiredAcksException;
+import org.apache.fluss.exception.InvalidTableException;
 import org.apache.fluss.exception.LogOffsetOutOfRangeException;
 import org.apache.fluss.exception.LogStorageException;
 import org.apache.fluss.exception.NotLeaderOrFollowerException;
@@ -79,6 +80,8 @@
 import org.apache.fluss.server.entity.UserContext;
 import org.apache.fluss.server.kv.KvManager;
 import org.apache.fluss.server.kv.KvSnapshotResource;
+import org.apache.fluss.server.kv.KvTablet;
+import org.apache.fluss.server.kv.scan.ScannerManager;
 import org.apache.fluss.server.kv.snapshot.CompletedKvSnapshotCommitter;
 import org.apache.fluss.server.kv.snapshot.DefaultSnapshotContext;
 import org.apache.fluss.server.log.FetchDataInfo;
@@ -209,6 +212,8 @@ public class ReplicaManager implements ServerReconfigurable {
 
     private final Clock clock;
 
+    @Nullable private ScannerManager scannerManager;
+
     public ReplicaManager(
             Configuration conf,
             Scheduler scheduler,
@@ -324,6 +329,16 @@ public void startup() {
                 conf.get(ConfigOptions.LOG_REPLICA_MAX_LAG_TIME).toMillis() / 2);
     }
 
+    /**
+     * Installs (or, when {@code null}, detaches) the {@link ScannerManager} whose sessions are
+     * closed for a bucket when it becomes a follower or when its replica is stopped.
+     *
+     * @param scannerManager the manager to install, or {@code null} to detach it
+     */
+    public void setScannerManager(@Nullable ScannerManager scannerManager) {
+        this.scannerManager = scannerManager;
+    }
+
     public RemoteLogManager getRemoteLogManager() {
         return remoteLogManager;
     }
@@ -1155,6 +1164,9 @@ private void makeFollowers(
                 Replica replica = getReplicaOrException(data.getTableBucket());
                 if (replica.makeFollower(data)) {
                     replicasBecomeFollower.add(replica);
+                    if (scannerManager != null) {
+                        scannerManager.closeScannersForBucket(tb);
+                    }
                 }
                 // stop the remote log tiering tasks for followers
                 remoteLogManager.stopLogTiering(replica);
@@ -1833,6 +1845,16 @@ private StopReplicaResultForBucket stopReplica(
         // First stop fetchers for this table bucket.
         replicaFetcherManager.removeFetcherForBuckets(Collections.singleton(tb));
 
+        // Close active scanner sessions for this bucket before tearing down the KV tablet.
+        // A concurrent scanKv RPC in flight can still reach getLeaderKvTablet(tb) and race
+        // with createScanner. Both KvTablet.openScan and ResourceGuard.acquireResource()
+        // synchronise with the subsequent replica/RocksDB close, so the worst case is an
+        // IOException surfaced cleanly as an RPC error — never cursor corruption or a
+        // post-shutdown snapshot.
+        if (scannerManager != null) {
+            scannerManager.closeScannersForBucket(tb);
+        }
+
         HostedReplica replica = getReplica(tb);
         if (replica instanceof OnlineReplica) {
             Replica replicaToDelete = ((OnlineReplica) replica).getReplica();
@@ -1946,6 +1968,9 @@ protected Optional maybeCreateReplica(NotifyLeaderAndIsrData data) {
                                 tableInfo,
                                 clock,
                                 remoteLogManager);
+                // Inject the ScannerManager so that Replica.dropKv() can eagerly close scanner
+                // sessions as a safety net on unexpected shutdown paths.
+                replica.setScannerManager(scannerManager);
                 allReplicas.put(tb, new OnlineReplica(replica));
                 replicaOpt = Optional.of(replica);
             } else if (hostedReplica instanceof OnlineReplica) {
@@ -1978,6 +2003,38 @@ public HostedReplica getReplica(TableBucket tableBucket) {
         return allReplicas.getOrDefault(tableBucket, new NoneReplica());
     }
 
+    /**
+     * Resolves the {@link KvTablet} of the locally led replica for {@code tableBucket}.
+     *
+     * @param tableBucket the bucket whose KV storage is requested
+     * @return the KV tablet of the local leader replica, never {@code null}
+     * @throws UnknownTableOrBucketException if the bucket is not known to this server
+     * @throws NotLeaderOrFollowerException if this server is not the leader for the bucket
+     * @throws InvalidTableException if the bucket does not have KV storage (not a primary-key
+     *     table)
+     */
+    public KvTablet getLeaderKvTablet(TableBucket tableBucket) {
+        final Replica localReplica = getReplicaOrException(tableBucket);
+        if (localReplica.isLeader()) {
+            final KvTablet tablet = localReplica.getKvTablet();
+            if (tablet != null) {
+                return tablet;
+            }
+            // No KV tablet on the leader replica: the bucket is not backed by KV storage.
+            final String noKvMessage =
+                    String.format(
+                            "Bucket %s does not have KV storage. "
+                                    + "Full KV scan is only supported on primary-key tables.",
+                            tableBucket);
+            throw new InvalidTableException(noKvMessage);
+        }
+        final String notLeaderMessage =
+                String.format(
+                        "Leader not local for bucket %s on tablet server %d",
+                        tableBucket, serverId);
+        throw new NotLeaderOrFollowerException(notLeaderMessage);
+    }
+
     private boolean isRequiredAcksInvalid(int requiredAcks) {
         return requiredAcks != 0 && requiredAcks != 1 && requiredAcks != -1;
     }
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java
index 988f7565ac..247d270ba7 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java
@@ -40,6 +40,7 @@
 import org.apache.fluss.server.coordinator.LakeCatalogDynamicLoader;
 import org.apache.fluss.server.coordinator.MetadataManager;
 import org.apache.fluss.server.kv.KvManager;
+import org.apache.fluss.server.kv.scan.ScannerManager;
 import org.apache.fluss.server.kv.snapshot.DefaultCompletedKvSnapshotCommitter;
 import org.apache.fluss.server.log.LogManager;
 import org.apache.fluss.server.log.remote.RemoteLogManager;
@@ -145,6 +146,9 @@ public class TabletServer extends ServerBase {
     @GuardedBy("lock")
     private ReplicaManager replicaManager;
 
+    @GuardedBy("lock")
+    private ScannerManager scannerManager;
+
     @GuardedBy("lock")
     private @Nullable RemoteLogManager remoteLogManager = null;
 
@@ -281,6 +285,9 @@ protected void startServices() throws Exception {
             // Start dynamicConfigManager after all reconfigurable components are registered
             dynamicConfigManager.startup();
 
+            this.scannerManager = new ScannerManager(conf, scheduler);
+            replicaManager.setScannerManager(scannerManager);
+
             this.tabletService =
                     new TabletService(
                             serverId,
@@ -291,7 +298,8 @@ protected void startServices() throws Exception {
                             metadataManager,
                             authorizer,
                             dynamicConfigManager,
-                            ioExecutor);
+                            ioExecutor,
+                            scannerManager);
 
             RequestsMetrics requestsMetrics =
                     RequestsMetrics.createTabletServerRequestMetrics(tabletServerMetricGroup);
@@ -433,6 +441,10 @@ CompletableFuture stopServices() {
scheduler.shutdown(); } + if (scannerManager != null) { + scannerManager.close(); + } + if (kvManager != null) { kvManager.shutdown(); } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java index 08536adfd9..1a56f3e193 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java @@ -19,10 +19,14 @@ import org.apache.fluss.cluster.ServerType; import org.apache.fluss.exception.AuthorizationException; +import org.apache.fluss.exception.InvalidScanRequestException; +import org.apache.fluss.exception.ScannerExpiredException; +import org.apache.fluss.exception.UnknownScannerIdException; import org.apache.fluss.exception.UnknownTableOrBucketException; import org.apache.fluss.fs.FileSystem; import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.record.DefaultValueRecordBatch; import org.apache.fluss.record.KvRecordBatch; import org.apache.fluss.record.MemoryLogRecords; import org.apache.fluss.rpc.entity.FetchLogResultForBucket; @@ -52,6 +56,7 @@ import org.apache.fluss.rpc.messages.NotifyLeaderAndIsrResponse; import org.apache.fluss.rpc.messages.NotifyRemoteLogOffsetsRequest; import org.apache.fluss.rpc.messages.NotifyRemoteLogOffsetsResponse; +import org.apache.fluss.rpc.messages.PbScanReqForBucket; import org.apache.fluss.rpc.messages.PrefixLookupRequest; import org.apache.fluss.rpc.messages.PrefixLookupResponse; import org.apache.fluss.rpc.messages.ProduceLogRequest; @@ -76,6 +81,8 @@ import org.apache.fluss.server.entity.FetchReqInfo; import org.apache.fluss.server.entity.NotifyLeaderAndIsrData; import org.apache.fluss.server.entity.UserContext; +import org.apache.fluss.server.kv.scan.ScannerContext; +import org.apache.fluss.server.kv.scan.ScannerManager; import 
org.apache.fluss.server.log.FetchParams;
 import org.apache.fluss.server.log.FetchParamsBuilder;
 import org.apache.fluss.server.log.FilterInfo;
@@ -137,6 +144,7 @@ public final class TabletService extends RpcServiceBase implements TabletServerG
     private final ReplicaManager replicaManager;
     private final TabletServerMetadataCache metadataCache;
     private final TabletServerMetadataProvider metadataFunctionProvider;
+    private final ScannerManager scannerManager;
 
     public TabletService(
             int serverId,
@@ -147,7 +155,8 @@ public TabletService(
             MetadataManager metadataManager,
             @Nullable Authorizer authorizer,
             DynamicConfigManager dynamicConfigManager,
-            ExecutorService ioExecutor) {
+            ExecutorService ioExecutor,
+            ScannerManager scannerManager) {
         super(
                 remoteFileSystem,
                 ServerType.TABLET_SERVER,
@@ -161,6 +170,7 @@ public TabletService(
         this.metadataCache = metadataCache;
         this.metadataFunctionProvider =
                 new TabletServerMetadataProvider(zkClient, metadataManager, metadataCache);
+        this.scannerManager = scannerManager;
     }
 
     @Override
@@ -435,7 +445,166 @@ public CompletableFuture notifyLakeTableOffset(
 
+    /**
+     * Serves one round of a KV full scan: opens or resumes a scanner session and returns the
+     * next batch of values.
+     *
+     * <p>The request must carry either {@code bucket_scan_req} (new scan, authorized with READ
+     * on the table) or {@code scanner_id} (continuation); both or neither is rejected with
+     * {@link InvalidScanRequestException}. When the scanner manager returns no context for a
+     * new scan, an empty response is returned and no session is registered. A continuation
+     * that carries a call sequence id must present exactly the stored id plus one.
+     *
+     * <p>Unless {@code close_scanner} is set, values are appended until the scanner is
+     * exhausted or the accumulated value bytes reach {@code batch_size_bytes}. The size is
+     * checked before each append, so a batch can exceed the target by one value. A drained
+     * session is removed before returning.
+     *
+     * <p>Exceptions are not propagated: they are reported through the error code and message
+     * of the response. A session opened or resumed by this call is removed if the call fails
+     * afterwards; a sequence-id mismatch is raised before that point and leaves the session
+     * registered.
+     */
     @Override
     public CompletableFuture scanKv(ScanKvRequest request) {
-        return null;
+        ScanKvResponse response = new ScanKvResponse();
+        // Tracks a live session that must be force-closed if anything after it throws.
+        // A scan that produced a partial but un-returned batch is not resumable: the RocksDB
+        // cursor has already advanced past rows whose data never reached the client, so we
+        // tear the session down and force the client to restart. This matches the non-resumable
+        // contract documented on ScannerExpiredException.
+        ScannerContext openedContext = null;
+        try {
+            ScannerContext context;
+
+            if (request.hasBucketScanReq() && request.hasScannerId()) {
+                throw new InvalidScanRequestException(
+                        "ScanKvRequest must not set both bucket_scan_req and scanner_id.");
+            }
+
+            if (request.hasBucketScanReq()) {
+                // New scan: open a fresh scanner session
+                PbScanReqForBucket bucketReq = request.getBucketScanReq();
+                long tableId = bucketReq.getTableId();
+                authorizeTable(READ, tableId);
+
+                TableBucket tableBucket =
+                        new TableBucket(
+                                tableId,
+                                bucketReq.hasPartitionId() ? bucketReq.getPartitionId() : null,
+                                bucketReq.getBucketId());
+                Long limit = bucketReq.hasLimit() ? bucketReq.getLimit() : null;
+
+                context =
+                        scannerManager.createScanner(
+                                replicaManager.getLeaderKvTablet(tableBucket), tableBucket, limit);
+
+                if (context == null) {
+                    // Bucket is empty — return an empty response immediately without registering a
+                    // session.
+                    response.setHasMoreResults(false);
+                    return CompletableFuture.completedFuture(response);
+                }
+                openedContext = context;
+            } else {
+                if (!request.hasScannerId()) {
+                    throw new InvalidScanRequestException(
+                            "ScanKvRequest must have either bucket_scan_req (new scan) "
+                                    + "or scanner_id (continuation).");
+                }
+                byte[] scannerId = request.getScannerId();
+                context = scannerManager.getScanner(scannerId);
+                if (context == null) {
+                    if (scannerManager.isRecentlyExpired(scannerId)) {
+                        throw new ScannerExpiredException(
+                                "Scanner session has expired due to inactivity. "
+                                        + "Please start a new scan.");
+                    } else {
+                        throw new UnknownScannerIdException(
+                                "Unknown scanner ID. The session may have expired or "
+                                        + "never existed.");
+                    }
+                }
+                // Validate call-sequence ordering to detect duplicate or out-of-order requests.
+                // getScanner() already refreshed the last-access timestamp. Use long arithmetic to
+                // avoid a silent 32-bit overflow at Integer.MAX_VALUE continuations.
+                if (request.hasCallSeqId()) {
+                    long expectedSeqId = (long) context.getCallSeqId() + 1L;
+                    int requestSeqId = request.getCallSeqId();
+                    if ((long) requestSeqId != expectedSeqId) {
+                        throw new InvalidScanRequestException(
+                                String.format(
+                                        "Out-of-order scan request: expected callSeqId=%d but got %d.",
+                                        expectedSeqId, requestSeqId));
+                    }
+                }
+                openedContext = context;
+            }
+
+            // Handle explicit close request
+            if (request.hasCloseScanner() && request.isCloseScanner()) {
+                scannerManager.removeScanner(context);
+                openedContext = null;
+                response.setScannerId(context.getScannerId());
+                response.setHasMoreResults(false);
+                return CompletableFuture.completedFuture(response);
+            }
+
+            // Build the next batch
+            int batchSizeBytes = request.getBatchSizeBytes();
+            if (batchSizeBytes <= 0) {
+                throw new InvalidScanRequestException("batch_size_bytes must be greater than 0.");
+            }
+            DefaultValueRecordBatch.Builder builder = DefaultValueRecordBatch.builder();
+            long totalBytes = 0L;
+
+            while (context.isValid() && totalBytes < batchSizeBytes) {
+                byte[] value = context.currentValue();
+                builder.append(value);
+                totalBytes += value.length;
+                context.advance();
+            }
+
+            boolean hasMore = context.isValid();
+            DefaultValueRecordBatch batch = builder.build();
+
+            response.setScannerId(context.getScannerId());
+            response.setHasMoreResults(hasMore);
+            if (batch.sizeInBytes() > 0) {
+                response.setRecords(batch.getSegment(), batch.getPosition(), batch.sizeInBytes());
+            }
+
+            // Update callSeqId AFTER the response is prepared so that a client retry with the
+            // same callSeqId (due to a transient failure) can be detected and rejected.
+            if (request.hasCallSeqId()) {
+                context.setCallSeqId(request.getCallSeqId());
+            }
+
+            // Auto-close the session when all data has been drained.
+            if (!hasMore) {
+                scannerManager.removeScanner(context);
+            }
+            // Response successfully prepared — session state is consistent; do not force-close.
+            openedContext = null;
+
+        } catch (Exception e) {
+            // Restore the interrupt flag if a lower-level call wrapped an InterruptedException.
+            // No method in the try block declares `throws InterruptedException` directly, but a
+            // future refactor or a rethrown wrapper should not silently lose the signal.
+            if (e instanceof InterruptedException || e.getCause() instanceof InterruptedException) {
+                Thread.currentThread().interrupt();
+            }
+            response.setErrorCode(Errors.forException(e).code());
+            response.setErrorMessage(e.getMessage() != null ? e.getMessage() : "");
+        } finally {
+            // If we made it past createScanner/getScanner but failed to deliver a complete
+            // response, close the session rather than leaking it to TTL. The cursor has
+            // already advanced past rows whose values were never sent; resuming would drop
+            // data. Forcing a restart is the safe option.
+            if (openedContext != null) {
+                scannerManager.removeScanner(openedContext);
+            }
+        }
+
+        return CompletableFuture.completedFuture(response);
     }
 
     @Override
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java
index 792a0149ba..0894a1d317 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java
@@ -57,6 +57,7 @@
 import org.apache.fluss.server.kv.prewrite.KvPreWriteBuffer.Value;
 import org.apache.fluss.server.kv.rocksdb.RocksDBStatistics;
 import org.apache.fluss.server.kv.rowmerger.RowMerger;
+import org.apache.fluss.server.kv.scan.ScannerContext;
 import org.apache.fluss.server.log.FetchIsolation;
 import org.apache.fluss.server.log.LogAppendInfo;
 import org.apache.fluss.server.log.LogTablet;
@@ -1813,4 +1814,142 @@ void testRowCountWithMixedOperations() throws Exception {
 
         kvTablet.close();
     }
+
+    @Test
+    void testOpenScan_emptyBucket_returnsNull() throws Exception {
+
initLogTabletAndKvTablet(DATA1_SCHEMA_PK, new HashMap<>()); + // No data has been written — openScan must return null. + ScannerContext context = kvTablet.openScan("scanner-empty", -1L, 0L); + assertThat(context).isNull(); + } + + @Test + void testOpenScan_returnsAllRows() throws Exception { + initLogTabletAndKvTablet(DATA1_SCHEMA_PK, new HashMap<>()); + + int numRows = 5; + List rows = new ArrayList<>(); + for (int i = 0; i < numRows; i++) { + rows.add( + kvRecordFactory.ofRecord( + String.valueOf(i).getBytes(), new Object[] {i, "v" + i})); + } + kvTablet.putAsLeader(kvRecordBatchFactory.ofRecords(rows), null); + kvTablet.flush(Long.MAX_VALUE, NOPErrorHandler.INSTANCE); + + ScannerContext context = kvTablet.openScan("scanner-all", -1L, 0L); + assertThat(context).isNotNull(); + + int count = 0; + while (context.isValid()) { + assertThat(context.currentValue()).isNotNull(); + context.advance(); + count++; + } + context.close(); + + assertThat(count).isEqualTo(numRows); + } + + @Test + void testOpenScan_snapshotIsolation() throws Exception { + initLogTabletAndKvTablet(DATA1_SCHEMA_PK, new HashMap<>()); + + // Write and flush 3 rows before opening the scan. + List initialRows = + Arrays.asList( + kvRecordFactory.ofRecord("0".getBytes(), new Object[] {0, "v0"}), + kvRecordFactory.ofRecord("1".getBytes(), new Object[] {1, "v1"}), + kvRecordFactory.ofRecord("2".getBytes(), new Object[] {2, "v2"})); + kvTablet.putAsLeader(kvRecordBatchFactory.ofRecords(initialRows), null); + kvTablet.flush(Long.MAX_VALUE, NOPErrorHandler.INSTANCE); + + ScannerContext context = kvTablet.openScan("scanner-snap", -1L, 0L); + assertThat(context).isNotNull(); + + // Write 2 more rows AFTER opening the scan, then flush. 
+ List lateRows = + Arrays.asList( + kvRecordFactory.ofRecord("3".getBytes(), new Object[] {3, "v3"}), + kvRecordFactory.ofRecord("4".getBytes(), new Object[] {4, "v4"})); + kvTablet.putAsLeader(kvRecordBatchFactory.ofRecords(lateRows), null); + kvTablet.flush(Long.MAX_VALUE, NOPErrorHandler.INSTANCE); + + // The scan must still see only the 3 rows that existed at snapshot time. + int count = 0; + while (context.isValid()) { + context.advance(); + count++; + } + context.close(); + + assertThat(count).isEqualTo(3); + } + + @Test + void testOpenScan_withLimit() throws Exception { + initLogTabletAndKvTablet(DATA1_SCHEMA_PK, new HashMap<>()); + + int numRows = 5; + List rows = new ArrayList<>(); + for (int i = 0; i < numRows; i++) { + rows.add( + kvRecordFactory.ofRecord( + String.valueOf(i).getBytes(), new Object[] {i, "v" + i})); + } + kvTablet.putAsLeader(kvRecordBatchFactory.ofRecords(rows), null); + kvTablet.flush(Long.MAX_VALUE, NOPErrorHandler.INSTANCE); + + long limit = 3L; + ScannerContext context = kvTablet.openScan("scanner-limit", limit, 0L); + assertThat(context).isNotNull(); + + int count = 0; + while (context.isValid()) { + context.advance(); + count++; + } + context.close(); + + // The scan must stop after exactly `limit` rows. + assertThat(count).isEqualTo((int) limit); + } + + @Test + void testOpenScan_multipleSessionsIndependent() throws Exception { + initLogTabletAndKvTablet(DATA1_SCHEMA_PK, new HashMap<>()); + + List rows = + Arrays.asList( + kvRecordFactory.ofRecord("0".getBytes(), new Object[] {0, "v0"}), + kvRecordFactory.ofRecord("1".getBytes(), new Object[] {1, "v1"}), + kvRecordFactory.ofRecord("2".getBytes(), new Object[] {2, "v2"})); + kvTablet.putAsLeader(kvRecordBatchFactory.ofRecords(rows), null); + kvTablet.flush(Long.MAX_VALUE, NOPErrorHandler.INSTANCE); + + // Open two independent scans. 
+ ScannerContext ctx1 = kvTablet.openScan("scanner-a", -1L, 0L); + ScannerContext ctx2 = kvTablet.openScan("scanner-b", -1L, 0L); + assertThat(ctx1).isNotNull(); + assertThat(ctx2).isNotNull(); + + // Drain ctx1 fully. + int count1 = 0; + while (ctx1.isValid()) { + ctx1.advance(); + count1++; + } + ctx1.close(); + + // ctx2 cursor must be unaffected; it should still see all 3 rows. + int count2 = 0; + while (ctx2.isValid()) { + ctx2.advance(); + count2++; + } + ctx2.close(); + + assertThat(count1).isEqualTo(3); + assertThat(count2).isEqualTo(3); + } } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/kv/scan/ScannerManagerTest.java b/fluss-server/src/test/java/org/apache/fluss/server/kv/scan/ScannerManagerTest.java new file mode 100644 index 0000000000..b054d30960 --- /dev/null +++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/scan/ScannerManagerTest.java @@ -0,0 +1,361 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.server.kv.scan; + +import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.config.Configuration; +import org.apache.fluss.config.TableConfig; +import org.apache.fluss.exception.TooManyScannersException; +import org.apache.fluss.memory.TestingMemorySegmentPool; +import org.apache.fluss.metadata.KvFormat; +import org.apache.fluss.metadata.LogFormat; +import org.apache.fluss.metadata.PhysicalTablePath; +import org.apache.fluss.metadata.SchemaInfo; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.record.KvRecord; +import org.apache.fluss.record.KvRecordTestUtils; +import org.apache.fluss.record.TestData; +import org.apache.fluss.record.TestingSchemaGetter; +import org.apache.fluss.server.kv.KvManager; +import org.apache.fluss.server.kv.KvTablet; +import org.apache.fluss.server.kv.autoinc.AutoIncrementManager; +import org.apache.fluss.server.kv.autoinc.TestingSequenceGeneratorFactory; +import org.apache.fluss.server.kv.rowmerger.RowMerger; +import org.apache.fluss.server.log.LogTablet; +import org.apache.fluss.server.log.LogTestUtils; +import org.apache.fluss.server.metrics.group.TestingMetricGroups; +import org.apache.fluss.server.zk.NOPErrorHandler; +import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.RootAllocator; +import org.apache.fluss.utils.clock.ManualClock; +import org.apache.fluss.utils.concurrent.FlussScheduler; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.File; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import static org.apache.fluss.compression.ArrowCompressionInfo.DEFAULT_COMPRESSION; +import static org.apache.fluss.record.TestData.DATA1_SCHEMA_PK; +import static org.assertj.core.api.Assertions.assertThat; +import static 
org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Tests for {@link ScannerManager}. */ +class ScannerManagerTest { + + private static final short SCHEMA_ID = 1; + + @TempDir File tempLogDir; + @TempDir File tmpKvDir; + + private final Configuration conf = new Configuration(); + private final KvRecordTestUtils.KvRecordBatchFactory kvRecordBatchFactory = + KvRecordTestUtils.KvRecordBatchFactory.of(SCHEMA_ID); + private final KvRecordTestUtils.KvRecordFactory kvRecordFactory = + KvRecordTestUtils.KvRecordFactory.of(TestData.DATA1_ROW_TYPE); + + private ManualClock clock; + private FlussScheduler scheduler; + private LogTablet logTablet; + private KvTablet kvTablet; + + @BeforeEach + void setUp() throws Exception { + clock = new ManualClock(0); + scheduler = new FlussScheduler(1); + scheduler.startup(); + + PhysicalTablePath physicalTablePath = PhysicalTablePath.of(TablePath.of("testDb", "t1")); + TestingSchemaGetter schemaGetter = + new TestingSchemaGetter(new SchemaInfo(DATA1_SCHEMA_PK, SCHEMA_ID)); + + File logTabletDir = + LogTestUtils.makeRandomLogTabletDir( + tempLogDir, + physicalTablePath.getDatabaseName(), + 0L, + physicalTablePath.getTableName()); + logTablet = + LogTablet.create( + physicalTablePath, + logTabletDir, + conf, + TestingMetricGroups.TABLET_SERVER_METRICS, + 0, + new FlussScheduler(1), + LogFormat.ARROW, + 1, + true, + org.apache.fluss.utils.clock.SystemClock.getInstance(), + true); + + TableBucket tableBucket = logTablet.getTableBucket(); + TableConfig tableConf = new TableConfig(new Configuration()); + RowMerger rowMerger = RowMerger.create(tableConf, KvFormat.COMPACTED, schemaGetter); + AutoIncrementManager autoIncrementManager = + new AutoIncrementManager( + schemaGetter, + physicalTablePath.getTablePath(), + tableConf, + new TestingSequenceGeneratorFactory()); + + kvTablet = + KvTablet.create( + physicalTablePath, + tableBucket, + logTablet, + tmpKvDir, + conf, + TestingMetricGroups.TABLET_SERVER_METRICS, + new 
RootAllocator(Long.MAX_VALUE), + new TestingMemorySegmentPool(10 * 1024), + KvFormat.COMPACTED, + rowMerger, + DEFAULT_COMPRESSION, + schemaGetter, + tableConf.getChangelogImage(), + KvManager.getDefaultRateLimiter(), + autoIncrementManager); + } + + @AfterEach + void tearDown() throws Exception { + if (kvTablet != null) { + kvTablet.close(); + } + if (logTablet != null) { + logTablet.close(); + } + if (scheduler != null) { + scheduler.shutdown(); + } + } + + // ------------------------------------------------------------------------- + // Helpers + // ------------------------------------------------------------------------- + + /** Creates a {@link ScannerManager} with a long TTL so the evictor never fires during tests. */ + private ScannerManager createManager() { + Configuration c = new Configuration(); + c.set(ConfigOptions.KV_SCANNER_TTL, Duration.ofHours(1)); + c.set(ConfigOptions.KV_SCANNER_EXPIRATION_INTERVAL, Duration.ofHours(1)); + c.set(ConfigOptions.KV_SCANNER_MAX_PER_BUCKET, 8); + c.set(ConfigOptions.KV_SCANNER_MAX_PER_SERVER, 200); + return new ScannerManager(c, scheduler, clock); + } + + /** Creates a {@link ScannerManager} with configurable limits and a long evictor interval. */ + private ScannerManager createManager(int maxPerBucket, int maxPerServer) { + Configuration c = new Configuration(); + c.set(ConfigOptions.KV_SCANNER_TTL, Duration.ofHours(1)); + c.set(ConfigOptions.KV_SCANNER_EXPIRATION_INTERVAL, Duration.ofHours(1)); + c.set(ConfigOptions.KV_SCANNER_MAX_PER_BUCKET, maxPerBucket); + c.set(ConfigOptions.KV_SCANNER_MAX_PER_SERVER, maxPerServer); + return new ScannerManager(c, scheduler, clock); + } + + /** + * Creates a {@link ScannerManager} with a short TTL and evictor interval for eviction tests. 
+ */ + private ScannerManager createManagerWithShortTtl(long ttlMs, long expirationIntervalMs) { + Configuration c = new Configuration(); + c.set(ConfigOptions.KV_SCANNER_TTL, Duration.ofMillis(ttlMs)); + c.set( + ConfigOptions.KV_SCANNER_EXPIRATION_INTERVAL, + Duration.ofMillis(expirationIntervalMs)); + c.set(ConfigOptions.KV_SCANNER_MAX_PER_BUCKET, 8); + c.set(ConfigOptions.KV_SCANNER_MAX_PER_SERVER, 200); + return new ScannerManager(c, scheduler, clock); + } + + /** Writes {@code count} rows into the KvTablet and flushes to RocksDB. */ + private void putAndFlush(int count) throws Exception { + List rows = new ArrayList<>(); + for (int i = 0; i < count; i++) { + rows.add( + kvRecordFactory.ofRecord( + String.valueOf(i).getBytes(), new Object[] {i, "v" + i})); + } + kvTablet.putAsLeader(kvRecordBatchFactory.ofRecords(rows), null); + kvTablet.flush(Long.MAX_VALUE, NOPErrorHandler.INSTANCE); + } + + // ------------------------------------------------------------------------- + // Tests + // ------------------------------------------------------------------------- + + @Test + void testCreateScanner_emptyBucket_returnsNull() throws Exception { + try (ScannerManager manager = createManager()) { + TableBucket tableBucket = kvTablet.getTableBucket(); + // Bucket has no data — openScan must return null; no slot consumed. 
+            ScannerContext context = manager.createScanner(kvTablet, tableBucket, null);
+            assertThat(context).isNull();
+            assertThat(manager.activeScannerCount()).isEqualTo(0);
+        }
+    }
+
+    @Test
+    void testCreateAndRemoveScanner() throws Exception {
+        putAndFlush(3);
+        try (ScannerManager manager = createManager()) {
+            TableBucket tableBucket = kvTablet.getTableBucket();
+
+            ScannerContext context = manager.createScanner(kvTablet, tableBucket, null);
+            assertThat(context).isNotNull();
+            assertThat(manager.activeScannerCount()).isEqualTo(1);
+            assertThat(manager.activeScannerCountForBucket(tableBucket)).isEqualTo(1);
+
+            manager.removeScanner(context);
+            assertThat(manager.activeScannerCount()).isEqualTo(0);
+            assertThat(manager.activeScannerCountForBucket(tableBucket)).isEqualTo(0);
+        }
+    }
+
+    @Test
+    void testGetScanner_refreshesLastAccessTime() throws Exception {
+        putAndFlush(3);
+        try (ScannerManager manager = createManager()) {
+            TableBucket tableBucket = kvTablet.getTableBucket();
+
+            // Create scanner at t=0.
+            ScannerContext context = manager.createScanner(kvTablet, tableBucket, null);
+            assertThat(context).isNotNull();
+            byte[] scannerId = context.getScannerId();
+
+            // Let 5 s of idle time accumulate, then call getScanner to refresh the timestamp.
+            clock.advanceTime(5000, TimeUnit.MILLISECONDS);
+            ScannerContext fetched = manager.getScanner(scannerId);
+            assertThat(fetched).isSameAs(context);
+
+            // Probe with a 1 s TTL (< the 5 s idle gap): only a refreshed session is unexpired.
+            assertThat(context.isExpired(1_000L, clock.milliseconds())).isFalse();
+
+            manager.removeScanner(context);
+        }
+    }
+
+    @Test
+    void testTtlEviction() throws Exception {
+        putAndFlush(3);
+        // TTL = 200 ms, evictor every 200 ms — wide enough for slow CI schedulers.
+ ScannerManager manager = createManagerWithShortTtl(200, 200); + try { + TableBucket tableBucket = kvTablet.getTableBucket(); + + ScannerContext context = manager.createScanner(kvTablet, tableBucket, null); + assertThat(context).isNotNull(); + byte[] scannerId = context.getScannerId(); + + // Advance ManualClock past TTL so the evictor considers the session idle. + clock.advanceTime(500, TimeUnit.MILLISECONDS); + + // Wait for the real scheduler to invoke the cleanup task. + long deadline = System.currentTimeMillis() + 10_000; + while (manager.activeScannerCount() > 0 && System.currentTimeMillis() < deadline) { + Thread.sleep(50); + } + + assertThat(manager.activeScannerCount()).isEqualTo(0); + assertThat(manager.getScanner(scannerId)).isNull(); + assertThat(manager.isRecentlyExpired(scannerId)).isTrue(); + } finally { + manager.close(); + } + } + + @Test + void testPerBucketLimit() throws Exception { + putAndFlush(3); + try (ScannerManager manager = createManager(2, 200)) { + TableBucket tableBucket = kvTablet.getTableBucket(); + + ScannerContext ctx1 = manager.createScanner(kvTablet, tableBucket, null); + ScannerContext ctx2 = manager.createScanner(kvTablet, tableBucket, null); + assertThat(manager.activeScannerCountForBucket(tableBucket)).isEqualTo(2); + + assertThatThrownBy(() -> manager.createScanner(kvTablet, tableBucket, null)) + .isInstanceOf(TooManyScannersException.class); + + // Count must not have changed after the failed attempt. 
+ assertThat(manager.activeScannerCountForBucket(tableBucket)).isEqualTo(2); + + manager.removeScanner(ctx1); + manager.removeScanner(ctx2); + } + } + + @Test + void testPerServerLimit() throws Exception { + putAndFlush(3); + try (ScannerManager manager = createManager(8, 2)) { + TableBucket tableBucket = kvTablet.getTableBucket(); + + ScannerContext ctx1 = manager.createScanner(kvTablet, tableBucket, null); + ScannerContext ctx2 = manager.createScanner(kvTablet, tableBucket, null); + assertThat(manager.activeScannerCount()).isEqualTo(2); + + assertThatThrownBy(() -> manager.createScanner(kvTablet, tableBucket, null)) + .isInstanceOf(TooManyScannersException.class); + + assertThat(manager.activeScannerCount()).isEqualTo(2); + + manager.removeScanner(ctx1); + manager.removeScanner(ctx2); + } + } + + @Test + void testCloseScannersForBucket() throws Exception { + putAndFlush(3); + try (ScannerManager manager = createManager()) { + TableBucket tableBucket = kvTablet.getTableBucket(); + + manager.createScanner(kvTablet, tableBucket, null); + manager.createScanner(kvTablet, tableBucket, null); + assertThat(manager.activeScannerCount()).isEqualTo(2); + + manager.closeScannersForBucket(tableBucket); + + assertThat(manager.activeScannerCount()).isEqualTo(0); + assertThat(manager.activeScannerCountForBucket(tableBucket)).isEqualTo(0); + } + } + + @Test + void testShutdown_closesAllScanners() throws Exception { + putAndFlush(3); + ScannerManager manager = createManager(); + TableBucket tableBucket = kvTablet.getTableBucket(); + + manager.createScanner(kvTablet, tableBucket, null); + manager.createScanner(kvTablet, tableBucket, null); + assertThat(manager.activeScannerCount()).isEqualTo(2); + + manager.close(); + + assertThat(manager.activeScannerCount()).isEqualTo(0); + } +} diff --git a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java index 
0d9f5bc565..378c4623e0 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java @@ -37,6 +37,7 @@ import org.apache.fluss.record.DefaultValueRecordBatch; import org.apache.fluss.record.KvRecord; import org.apache.fluss.record.KvRecordBatch; +import org.apache.fluss.record.KvRecordTestUtils; import org.apache.fluss.record.LogRecord; import org.apache.fluss.record.LogRecordBatch; import org.apache.fluss.record.LogRecordReadContext; @@ -62,7 +63,9 @@ import org.apache.fluss.server.entity.NotifyLeaderAndIsrResultForBucket; import org.apache.fluss.server.entity.StopReplicaData; import org.apache.fluss.server.entity.StopReplicaResultForBucket; +import org.apache.fluss.server.kv.KvTablet; import org.apache.fluss.server.kv.rocksdb.RocksDBKv; +import org.apache.fluss.server.kv.scan.ScannerManager; import org.apache.fluss.server.kv.snapshot.CompletedSnapshot; import org.apache.fluss.server.log.FetchParams; import org.apache.fluss.server.log.ListOffsetsParam; @@ -72,6 +75,7 @@ import org.apache.fluss.server.metadata.ServerInfo; import org.apache.fluss.server.metadata.TableMetadata; import org.apache.fluss.server.testutils.KvTestUtils; +import org.apache.fluss.server.zk.NOPErrorHandler; import org.apache.fluss.server.zk.data.LeaderAndIsr; import org.apache.fluss.server.zk.data.TableRegistration; import org.apache.fluss.testutils.DataTestUtils; @@ -79,6 +83,7 @@ import org.apache.fluss.types.DataTypes; import org.apache.fluss.types.RowType; import org.apache.fluss.utils.CloseableIterator; +import org.apache.fluss.utils.concurrent.FlussScheduler; import org.apache.fluss.utils.types.Tuple2; import org.junit.jupiter.api.Test; @@ -2370,4 +2375,56 @@ private void assertUpdateMetadataEquals( } }); } + + /** + * When a replica is stopped, {@link ScannerManager#closeScannersForBucket} must be called so + * that open scanner sessions are released before the KV 
store is destroyed.
+     */
+    @Test
+    void testStopReplicas_closesScanners() throws Exception {
+        TableBucket tb = new TableBucket(DATA1_TABLE_ID_PK, 0);
+        makeKvTableAsLeader(DATA1_TABLE_ID_PK, DATA1_TABLE_PATH_PK, tb.getBucket());
+
+        KvTablet kvTablet =
+                kvManager
+                        .getKv(tb)
+                        .orElseThrow(() -> new IllegalStateException("KvTablet not found"));
+        KvRecordTestUtils.KvRecordBatchFactory batchFactory =
+                KvRecordTestUtils.KvRecordBatchFactory.of(DEFAULT_SCHEMA_ID);
+        KvRecordTestUtils.KvRecordFactory recordFactory =
+                KvRecordTestUtils.KvRecordFactory.of(DATA1_ROW_TYPE);
+        kvTablet.putAsLeader(
+                batchFactory.ofRecords(
+                        Collections.singletonList(
+                                recordFactory.ofRecord("k1".getBytes(), new Object[] {1, "v1"}))),
+                null);
+        kvTablet.flush(Long.MAX_VALUE, NOPErrorHandler.INSTANCE);
+
+        FlussScheduler testScheduler = new FlussScheduler(1);
+        testScheduler.startup();
+        try (ScannerManager scannerManager = new ScannerManager(conf, testScheduler)) {
+
+            replicaManager.setScannerManager(scannerManager);
+            scannerManager.createScanner(kvTablet, tb, null);
+            assertThat(scannerManager.activeScannerCount()).isEqualTo(1);
+
+            CompletableFuture<List<StopReplicaResultForBucket>> future = new CompletableFuture<>();
+            replicaManager.stopReplicas(
+                    INITIAL_COORDINATOR_EPOCH,
+                    Collections.singletonList(
+                            new StopReplicaData(
+                                    tb,
+                                    false,
+                                    false,
+                                    INITIAL_COORDINATOR_EPOCH,
+                                    INITIAL_LEADER_EPOCH)),
+                    future::complete);
+            future.get(); // block until the stopReplicas callback completes the future
+
+            assertThat(scannerManager.activeScannerCount()).isEqualTo(0);
+        } finally {
+            testScheduler.shutdown();
+            replicaManager.setScannerManager(null); // detach the test-local ScannerManager
+        }
+    }
 }
diff --git a/website/docs/maintenance/configuration.md b/website/docs/maintenance/configuration.md
index 48fc393b80..3c5d45e098 100644
--- a/website/docs/maintenance/configuration.md
+++ b/website/docs/maintenance/configuration.md
@@ -171,6 +171,10 @@ during the Fluss cluster working.
| kv.rocksdb.bloom-filter.block-based-mode | Boolean | false | If true, RocksDB will use block-based filter instead of full filter, this only take effect when bloom filter is used. The default value is `false`. | | kv.rocksdb.shared-rate-limiter-bytes-per-sec | MemorySize | Long.MAX_VALUE | The bytes per second rate limit for RocksDB flush and compaction operations shared across all RocksDB instances on the TabletServer. The rate limiter is always enabled. The default value is Long.MAX_VALUE (effectively unlimited). Set to a lower value (e.g., 100MB) to limit the rate. This configuration can be updated dynamically without server restart. See [Updating Configs](operations/updating-configs.md) for more details. | | kv.recover.log-record-batch.max-size | MemorySize | 16mb | The max fetch size for fetching log to apply to kv during recovering kv. | +| kv.scanner.ttl | Duration | 10min | The time-to-live for an idle KV scanner session on the server. A scanner that has not received a request within this duration will be automatically expired and its resources released. The default value is 10 minutes. | +| kv.scanner.expiration-interval | Duration | 30s | The interval at which the server checks for and removes expired KV scanner sessions. The default value is 30 seconds. | +| kv.scanner.max-per-bucket | Integer | 8 | The maximum number of concurrent KV scanner sessions allowed per bucket. New scan requests that exceed this limit will be rejected with an error. The default value is 8. | +| kv.scanner.max-per-server | Integer | 200 | The maximum total number of concurrent KV scanner sessions allowed across all buckets on a single tablet server. New scan requests that exceed this limit will be rejected with an error. The default value is 200. | ## Metrics