Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,41 @@ public class ConfigOptions {
+ WRITER_ID_EXPIRATION_TIME.key()
+ " passing. The default value is 10 minutes.");

public static final ConfigOption<Duration> KV_SCANNER_TTL =
key("kv.scanner.ttl")
.durationType()
.defaultValue(Duration.ofMinutes(10))
.withDescription(
"The time-to-live for an idle KV scanner session on the server. A scanner that has not "
+ "received a request within this duration will be automatically expired and its "
+ "resources released. The default value is 10 minutes.");

public static final ConfigOption<Duration> KV_SCANNER_EXPIRATION_INTERVAL =
key("kv.scanner.expiration-interval")
.durationType()
.defaultValue(Duration.ofSeconds(30))
.withDescription(
"How often the TTL evictor runs to close idle scanner sessions. "
+ "The default value is 30 seconds.");

public static final ConfigOption<Integer> KV_SCANNER_MAX_PER_BUCKET =
key("kv.scanner.max-per-bucket")
.intType()
.defaultValue(8)
.withDescription(
"Maximum number of concurrent scanner sessions per bucket. "
+ "Exceeding this limit returns TOO_MANY_SCANNERS. "
+ "The default value is 8.");

public static final ConfigOption<Integer> KV_SCANNER_MAX_PER_SERVER =
key("kv.scanner.max-per-server")
.intType()
.defaultValue(200)
.withDescription(
"Maximum number of concurrent scanner sessions per tablet server. "
+ "Exceeding this limit returns TOO_MANY_SCANNERS. "
+ "The default value is 200.");

public static final ConfigOption<Integer> TABLET_SERVER_CONTROLLED_SHUTDOWN_MAX_RETRIES =
key("tablet-server.controlled-shutdown.max-retries")
.intType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.apache.fluss.server.kv.rocksdb.RocksDBStatistics;
import org.apache.fluss.server.kv.rowmerger.DefaultRowMerger;
import org.apache.fluss.server.kv.rowmerger.RowMerger;
import org.apache.fluss.server.kv.scan.ScannerContext;
import org.apache.fluss.server.kv.snapshot.KvFileHandleAndLocalPath;
import org.apache.fluss.server.kv.snapshot.KvSnapshotDataUploader;
import org.apache.fluss.server.kv.snapshot.RocksIncrementalSnapshot;
Expand All @@ -70,12 +71,17 @@
import org.apache.fluss.server.log.LogTablet;
import org.apache.fluss.server.metrics.group.TabletServerMetricGroup;
import org.apache.fluss.server.utils.FatalErrorHandler;
import org.apache.fluss.server.utils.ResourceGuard;
import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BufferAllocator;
import org.apache.fluss.types.RowType;
import org.apache.fluss.utils.BytesUtils;
import org.apache.fluss.utils.FileUtils;
import org.apache.fluss.utils.IOUtils;

import org.rocksdb.RateLimiter;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksIterator;
import org.rocksdb.Snapshot;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -756,6 +762,83 @@ public List<byte[]> limitScan(int limit) throws IOException {
});
}

/**
* Opens a new full-scan session, taking a point-in-time RocksDB snapshot under the {@code
* kvLock} read lock.
*
* <p>Returns {@code null} if the bucket contains no rows at the time of the call. In that case
* all acquired resources are released internally and no session is registered.
*
* <p>The returned {@link ScannerContext} is <em>unregistered</em> — the caller ({@link
* org.apache.fluss.server.kv.scan.ScannerManager}) is responsible for registering it and for
* closing it when the scan is complete.
*
* @param scannerId the server-assigned scanner ID
* @param limit maximum number of rows to return across all batches ({@code ≤ 0} = unlimited)
* @param initialAccessTimeMs wall-clock time (ms) to use as the initial last-access timestamp
* @return a newly created, cursor-positioned {@link ScannerContext}, or {@code null} if the
* bucket is empty
* @throws IOException if the ResourceGuard is already closed (RocksDB is shutting down)
*/
Comment thread
polyzos marked this conversation as resolved.
@Nullable
public ScannerContext openScan(String scannerId, long limit, long initialAccessTimeMs)
throws IOException {
return inReadLock(
kvLock,
() -> {
rocksDBKv.checkIfRocksDBClosed();
ResourceGuard.Lease lease = rocksDBKv.getResourceGuard().acquireResource();
Snapshot snapshot = null;
ReadOptions readOptions = null;
RocksIterator iterator = null;
boolean success = false;
try {
snapshot = rocksDBKv.getDb().getSnapshot();
readOptions = new ReadOptions().setSnapshot(snapshot);
iterator =
rocksDBKv
.getDb()
.newIterator(
rocksDBKv.getDefaultColumnFamilyHandle(),
readOptions);
iterator.seekToFirst();
if (!iterator.isValid()) {
// Empty bucket: no session will be registered; cleanup in finally.
return null;
}
Comment thread
polyzos marked this conversation as resolved.
ScannerContext context =
new ScannerContext(
scannerId,
tableBucket,
rocksDBKv,
iterator,
readOptions,
snapshot,
lease,
limit,
initialAccessTimeMs);
success = true;
return context;
} finally {
if (!success) {
// Release in reverse allocation order. Each close is independent,
// so a failure in one must not prevent the others from running.
IOUtils.closeQuietly(iterator);
IOUtils.closeQuietly(readOptions);
if (snapshot != null) {
try {
rocksDBKv.getDb().releaseSnapshot(snapshot);
} catch (Throwable t) {
LOG.warn("Error releasing RocksDB snapshot.", t);
}
IOUtils.closeQuietly(snapshot);
}
IOUtils.closeQuietly(lease);
}
}
});
}

public KvBatchWriter createKvBatchWriter() {
return rocksDBKv.newWriteBatch(
writeBatchSize,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.server.kv.scan;

import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.server.kv.rocksdb.RocksDBKv;
import org.apache.fluss.server.utils.ResourceGuard;

import org.rocksdb.ReadOptions;
import org.rocksdb.RocksIterator;
import org.rocksdb.Snapshot;

import javax.annotation.concurrent.NotThreadSafe;

import java.io.Closeable;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* Server-side state for a single KV full-scan session.
*
* <p>A {@code ScannerContext} holds a point-in-time RocksDB {@link Snapshot}, the {@link
* ReadOptions} pinning it, and a cursor ({@link RocksIterator}) that persists across multiple
* batched-fetch RPCs from the same client. It also holds a {@link ResourceGuard.Lease} that
* prevents the underlying RocksDB instance from being closed while the scan is in progress.
*
* <p>Instances are created by {@link org.apache.fluss.server.kv.KvTablet#openScan} and registered
* by {@link ScannerManager}. They must be closed when the scan completes, the client requests an
* explicit close, or the session expires due to inactivity.
*
* <p><b>Thread safety:</b> The iterator cursor ({@link #advance()}, {@link #isValid()}, {@link
* #currentValue()}) must be driven by only one thread at a time. {@link #close()} is thread-safe.
*/
@NotThreadSafe
public class ScannerContext implements Closeable {
private final String scannerId;
private final byte[] scannerIdBytes;
private final TableBucket tableBucket;
private final RocksDBKv rocksDBKv;
private final RocksIterator iterator;
private final ReadOptions readOptions;
private final Snapshot snapshot;
private final ResourceGuard.Lease resourceLease;
private long remainingLimit;
// Initial value -1 so that the first client call_seq_id of 0 satisfies the server's
// in-order check: expectedSeqId = callSeqId + 1 = -1 + 1 = 0.
// callSeqId validation is only performed for continuation requests (those carrying a
// scanner_id), never for the initial open request (those carrying a bucket_scan_req).
private int callSeqId = -1;

/**
* Wall-clock timestamp (ms) of the most recent request that touched this session. Used by
* {@link ScannerManager} for TTL-based eviction.
*/
private long lastAccessTime;

private final AtomicBoolean closed = new AtomicBoolean(false);

public ScannerContext(
String scannerId,
TableBucket tableBucket,
RocksDBKv rocksDBKv,
RocksIterator iterator,
ReadOptions readOptions,
Snapshot snapshot,
ResourceGuard.Lease resourceLease,
long limit,
long initialAccessTimeMs) {
this.scannerId = scannerId;
this.scannerIdBytes = scannerId.getBytes(StandardCharsets.UTF_8);
this.tableBucket = tableBucket;
this.rocksDBKv = rocksDBKv;
this.iterator = iterator;
this.readOptions = readOptions;
this.snapshot = snapshot;
this.resourceLease = resourceLease;
this.remainingLimit = limit <= 0 ? -1L : limit;
this.lastAccessTime = initialAccessTimeMs;
}

public byte[] getScannerId() {
return scannerIdBytes;
}

/**
* Returns the scanner ID as a UTF-8 {@link String}. Package-private: used by {@link
* ScannerManager} as the key in its internal {@code scanners} map. The wire-format
* representation is always {@link #getScannerId()} (raw bytes).
*/
String getIdString() {
return scannerId;
}

public TableBucket getTableBucket() {
return tableBucket;
}

public boolean isValid() {
return iterator.isValid() && remainingLimit != 0;
}

public byte[] currentValue() {
return iterator.value();
}

/**
* Advances the cursor by one entry and decrements the remaining-rows limit if applicable. Must
* only be called when {@link #isValid()} returns {@code true}.
*/
public void advance() {
iterator.next();
if (remainingLimit > 0) {
remainingLimit--;
}
}

/** Returns the call-sequence ID of the last successfully served request, or {@code -1}. */
public int getCallSeqId() {
return callSeqId;
}

/**
* Updates the call-sequence ID. Must be called <em>after</em> the response payload has been
* fully prepared, so that a client retry with the same {@code callSeqId} can be detected.
*/
public void setCallSeqId(int callSeqId) {
this.callSeqId = callSeqId;
}

/** Resets the idle-TTL timer to the given wall-clock time. */
public void updateLastAccessTime(long nowMs) {
this.lastAccessTime = nowMs;
}

/**
* Returns {@code true} if the session has been idle for longer than {@code ttlMs}, based on the
* provided current time.
*/
public boolean isExpired(long ttlMs, long nowMs) {
return nowMs - lastAccessTime > ttlMs;
}

@Override
public void close() {
if (closed.compareAndSet(false, true)) {
try {
iterator.close();
} finally {
try {
readOptions.close();
} finally {
try {
rocksDBKv.getDb().releaseSnapshot(snapshot);
snapshot.close();
} finally {
resourceLease.close();
}
}
}
}
}
}
Loading