-
Notifications
You must be signed in to change notification settings - Fork 527
[KV] Kvscan Server Side #3151
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
polyzos
wants to merge
5
commits into
apache:main
Choose a base branch
from
polyzos:kvscan-server-side
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
[KV] Kvscan Server Side #3151
Changes from all commits
Commits
Show all changes
5 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
177 changes: 177 additions & 0 deletions
177
fluss-server/src/main/java/org/apache/fluss/server/kv/scan/ScannerContext.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,177 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.fluss.server.kv.scan; | ||
|
|
||
| import org.apache.fluss.metadata.TableBucket; | ||
| import org.apache.fluss.server.kv.rocksdb.RocksDBKv; | ||
| import org.apache.fluss.server.utils.ResourceGuard; | ||
|
|
||
| import org.rocksdb.ReadOptions; | ||
| import org.rocksdb.RocksIterator; | ||
| import org.rocksdb.Snapshot; | ||
|
|
||
| import javax.annotation.concurrent.NotThreadSafe; | ||
|
|
||
| import java.io.Closeable; | ||
| import java.nio.charset.StandardCharsets; | ||
| import java.util.concurrent.atomic.AtomicBoolean; | ||
|
|
||
| /** | ||
| * Server-side state for a single KV full-scan session. | ||
| * | ||
| * <p>A {@code ScannerContext} holds a point-in-time RocksDB {@link Snapshot}, the {@link | ||
| * ReadOptions} pinning it, and a cursor ({@link RocksIterator}) that persists across multiple | ||
| * batched-fetch RPCs from the same client. It also holds a {@link ResourceGuard.Lease} that | ||
| * prevents the underlying RocksDB instance from being closed while the scan is in progress. | ||
| * | ||
| * <p>Instances are created by {@link org.apache.fluss.server.kv.KvTablet#openScan} and registered | ||
| * by {@link ScannerManager}. They must be closed when the scan completes, the client requests an | ||
| * explicit close, or the session expires due to inactivity. | ||
| * | ||
| * <p><b>Thread safety:</b> The iterator cursor ({@link #advance()}, {@link #isValid()}, {@link | ||
| * #currentValue()}) must be driven by only one thread at a time. {@link #close()} is thread-safe. | ||
| */ | ||
| @NotThreadSafe | ||
| public class ScannerContext implements Closeable { | ||
| private final String scannerId; | ||
| private final byte[] scannerIdBytes; | ||
| private final TableBucket tableBucket; | ||
| private final RocksDBKv rocksDBKv; | ||
| private final RocksIterator iterator; | ||
| private final ReadOptions readOptions; | ||
| private final Snapshot snapshot; | ||
| private final ResourceGuard.Lease resourceLease; | ||
| private long remainingLimit; | ||
| // Initial value -1 so that the first client call_seq_id of 0 satisfies the server's | ||
| // in-order check: expectedSeqId = callSeqId + 1 = -1 + 1 = 0. | ||
| // callSeqId validation is only performed for continuation requests (those carrying a | ||
| // scanner_id), never for the initial open request (those carrying a bucket_scan_req). | ||
| private int callSeqId = -1; | ||
|
|
||
| /** | ||
| * Wall-clock timestamp (ms) of the most recent request that touched this session. Used by | ||
| * {@link ScannerManager} for TTL-based eviction. | ||
| */ | ||
| private long lastAccessTime; | ||
|
|
||
| private final AtomicBoolean closed = new AtomicBoolean(false); | ||
|
|
||
| public ScannerContext( | ||
| String scannerId, | ||
| TableBucket tableBucket, | ||
| RocksDBKv rocksDBKv, | ||
| RocksIterator iterator, | ||
| ReadOptions readOptions, | ||
| Snapshot snapshot, | ||
| ResourceGuard.Lease resourceLease, | ||
| long limit, | ||
| long initialAccessTimeMs) { | ||
| this.scannerId = scannerId; | ||
| this.scannerIdBytes = scannerId.getBytes(StandardCharsets.UTF_8); | ||
| this.tableBucket = tableBucket; | ||
| this.rocksDBKv = rocksDBKv; | ||
| this.iterator = iterator; | ||
| this.readOptions = readOptions; | ||
| this.snapshot = snapshot; | ||
| this.resourceLease = resourceLease; | ||
| this.remainingLimit = limit <= 0 ? -1L : limit; | ||
| this.lastAccessTime = initialAccessTimeMs; | ||
| } | ||
|
|
||
| public byte[] getScannerId() { | ||
| return scannerIdBytes; | ||
| } | ||
|
|
||
| /** | ||
| * Returns the scanner ID as a UTF-8 {@link String}. Package-private: used by {@link | ||
| * ScannerManager} as the key in its internal {@code scanners} map. The wire-format | ||
| * representation is always {@link #getScannerId()} (raw bytes). | ||
| */ | ||
| String getIdString() { | ||
| return scannerId; | ||
| } | ||
|
|
||
| public TableBucket getTableBucket() { | ||
| return tableBucket; | ||
| } | ||
|
|
||
| public boolean isValid() { | ||
| return iterator.isValid() && remainingLimit != 0; | ||
| } | ||
|
|
||
| public byte[] currentValue() { | ||
| return iterator.value(); | ||
| } | ||
|
|
||
| /** | ||
| * Advances the cursor by one entry and decrements the remaining-rows limit if applicable. Must | ||
| * only be called when {@link #isValid()} returns {@code true}. | ||
| */ | ||
| public void advance() { | ||
| iterator.next(); | ||
| if (remainingLimit > 0) { | ||
| remainingLimit--; | ||
| } | ||
| } | ||
|
|
||
| /** Returns the call-sequence ID of the last successfully served request, or {@code -1}. */ | ||
| public int getCallSeqId() { | ||
| return callSeqId; | ||
| } | ||
|
|
||
| /** | ||
| * Updates the call-sequence ID. Must be called <em>after</em> the response payload has been | ||
| * fully prepared, so that a client retry with the same {@code callSeqId} can be detected. | ||
| */ | ||
| public void setCallSeqId(int callSeqId) { | ||
| this.callSeqId = callSeqId; | ||
| } | ||
|
|
||
| /** Resets the idle-TTL timer to the given wall-clock time. */ | ||
| public void updateLastAccessTime(long nowMs) { | ||
| this.lastAccessTime = nowMs; | ||
| } | ||
|
|
||
| /** | ||
| * Returns {@code true} if the session has been idle for longer than {@code ttlMs}, based on the | ||
| * provided current time. | ||
| */ | ||
| public boolean isExpired(long ttlMs, long nowMs) { | ||
| return nowMs - lastAccessTime > ttlMs; | ||
| } | ||
|
|
||
| @Override | ||
| public void close() { | ||
| if (closed.compareAndSet(false, true)) { | ||
| try { | ||
| iterator.close(); | ||
| } finally { | ||
| try { | ||
| readOptions.close(); | ||
| } finally { | ||
| try { | ||
| rocksDBKv.getDb().releaseSnapshot(snapshot); | ||
| snapshot.close(); | ||
| } finally { | ||
| resourceLease.close(); | ||
| } | ||
| } | ||
| } | ||
| } | ||
| } | ||
| } |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.