From d4a7142fd18cfc0ab4de81c1a81fa7e9af9ddad0 Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Mon, 20 Apr 2026 12:30:27 +0100 Subject: [PATCH 1/4] Support limit and filter for ScanBuilder and RepeatedScan Signed-off-by: Adam Gutglick --- vortex-bench/src/datasets/tpch_l_comment.rs | 8 +- vortex-cxx/src/read.rs | 15 +- vortex-datafusion/public-api.lock | 2 +- .../src/persistent/access_plan.rs | 5 +- vortex-datafusion/src/persistent/opener.rs | 15 +- vortex-file/public-api.lock | 2 +- vortex-file/src/file.rs | 3 +- vortex-file/src/tests.rs | 2 +- vortex-layout/public-api.lock | 82 +++-- vortex-layout/src/scan/arrow.rs | 11 +- vortex-layout/src/scan/limit.rs | 74 +++++ vortex-layout/src/scan/mod.rs | 1 + vortex-layout/src/scan/repeated_scan.rs | 118 ++++--- vortex-layout/src/scan/scan_builder.rs | 290 +++++++++++------- vortex-layout/src/scan/tasks.rs | 19 +- vortex-python/src/file.rs | 2 +- vortex-python/src/scan.rs | 3 +- 17 files changed, 414 insertions(+), 238 deletions(-) create mode 100644 vortex-layout/src/scan/limit.rs diff --git a/vortex-bench/src/datasets/tpch_l_comment.rs b/vortex-bench/src/datasets/tpch_l_comment.rs index 3488ef92979..7af091382a6 100644 --- a/vortex-bench/src/datasets/tpch_l_comment.rs +++ b/vortex-bench/src/datasets/tpch_l_comment.rs @@ -5,6 +5,7 @@ use std::path::PathBuf; use anyhow::Result; use async_trait::async_trait; +use futures::StreamExt; use futures::TryStreamExt; use glob::glob; use vortex::array::ArrayRef; @@ -77,12 +78,11 @@ impl Dataset for TPCHLCommentChunked { let file_chunks: Vec<_> = file .scan()? .with_projection(pack(vec![("l_comment", col("l_comment"))], NonNullable)) - .map(|a| { + .into_stream()? + .map(|result| { #[expect(deprecated)] - let canonical = a.to_canonical()?; - Ok(canonical.into_array()) + result.and_then(|a| a.to_canonical().map(|canonical| canonical.into_array())) }) - .into_array_stream()? .try_collect() .await?; chunks.extend(file_chunks); diff --git a/vortex-cxx/src/read.rs b/vortex-cxx/src/read.rs index 1ef4aec2734..b060c607cca 100644 --- a/vortex-cxx/src/read.rs +++ b/vortex-cxx/src/read.rs @@ -13,8 +13,8 @@ use arrow_schema::ArrowError; use arrow_schema::DataType; use arrow_schema::Schema; use arrow_schema::SchemaRef; +use futures::StreamExt; use futures::stream::TryStreamExt; -use vortex::array::ArrayRef; use vortex::array::arrow::IntoArrowArray; use vortex::buffer::Buffer; use vortex::file::OpenOptionsSessionExt; @@ -58,7 +58,7 @@ pub(crate) fn open_file_from_buffer(data: &[u8]) -> Result> { } pub(crate) struct VortexScanBuilder { - inner: ScanBuilder, + inner: ScanBuilder, output_schema: Option, } @@ -162,11 +162,14 @@ pub(crate) fn scan_builder_into_threadsafe_cloneable_reader( let stream = builder .inner - .map(move |b| { - b.into_arrow(&data_type) - .map(|struct_array| RecordBatch::from(struct_array.as_struct())) - }) .into_stream()? + .map(move |result| { + result.and_then(|chunk| { + chunk + .into_arrow(&data_type) + .map(|struct_array| RecordBatch::from(struct_array.as_struct())) + }) + }) .map_err(|e| ArrowError::ExternalError(Box::new(e))); let iter = RUNTIME.block_on_stream_thread_safe(|_h| stream); diff --git a/vortex-datafusion/public-api.lock b/vortex-datafusion/public-api.lock index 9d558c1de48..a806df7dfe2 100644 --- a/vortex-datafusion/public-api.lock +++ b/vortex-datafusion/public-api.lock @@ -108,7 +108,7 @@ pub struct vortex_datafusion::VortexAccessPlan impl vortex_datafusion::VortexAccessPlan -pub fn vortex_datafusion::VortexAccessPlan::apply_to_builder(&self, scan_builder: vortex_layout::scan::scan_builder::ScanBuilder) -> vortex_layout::scan::scan_builder::ScanBuilder where A: 'static + core::marker::Send +pub fn vortex_datafusion::VortexAccessPlan::apply_to_builder(&self, scan_builder: vortex_layout::scan::scan_builder::ScanBuilder) -> vortex_layout::scan::scan_builder::ScanBuilder pub fn vortex_datafusion::VortexAccessPlan::selection(&self) -> core::option::Option<&vortex_scan::selection::Selection> diff --git a/vortex-datafusion/src/persistent/access_plan.rs b/vortex-datafusion/src/persistent/access_plan.rs index 99583b7c0e4..bc6dc39e762 100644 --- a/vortex-datafusion/src/persistent/access_plan.rs +++ b/vortex-datafusion/src/persistent/access_plan.rs @@ -60,10 +60,7 @@ impl VortexAccessPlan { /// /// This is used internally by the file opener after it has translated a /// `PartitionedFile` into a Vortex scan. - pub fn apply_to_builder(&self, mut scan_builder: ScanBuilder) -> ScanBuilder - where - A: 'static + Send, - { + pub fn apply_to_builder(&self, mut scan_builder: ScanBuilder) -> ScanBuilder { let Self { selection } = self; if let Some(selection) = selection { diff --git a/vortex-datafusion/src/persistent/opener.rs b/vortex-datafusion/src/persistent/opener.rs index d93a849e75a..6675c4b8267 100644 --- a/vortex-datafusion/src/persistent/opener.rs +++ b/vortex-datafusion/src/persistent/opener.rs @@ -32,7 +32,6 @@ use futures::TryStreamExt; use futures::stream; use object_store::path::Path; use tracing::Instrument; -use vortex::array::ArrayRef; use vortex::array::VortexSessionExecute; use vortex::array::arrow::ArrowArrayExecutor; use vortex::error::VortexError; @@ -362,17 +361,21 @@ impl FileOpener for VortexOpener { scan_builder = scan_builder.with_concurrency(concurrency); } + let stream_schema = Arc::new(stream_schema); + let stream = scan_builder .with_metrics_registry(metrics_registry) .with_projection(scan_projection) .with_some_filter(filter) .with_ordered(has_output_ordering) + .into_stream() + .map_err(|e| exec_datafusion_err!("Failed to create Vortex stream: {e}"))? .map(move |chunk| { let mut ctx = session.create_execution_ctx(); - chunk.execute_record_batch(&stream_schema, &mut ctx) + chunk.and_then(|chunk| { + chunk.execute_record_batch(stream_schema.as_ref(), &mut ctx) + }) }) - .into_stream() - .map_err(|e| exec_datafusion_err!("Failed to create Vortex stream: {e}"))? .map_ok(move |rb| { // We try and slice the stream into respecting datafusion's configured batch size. stream::iter( @@ -426,8 +429,8 @@ fn apply_byte_range( file_range: FileRange, total_size: u64, row_count: u64, - scan_builder: ScanBuilder, -) -> ScanBuilder { + scan_builder: ScanBuilder, +) -> ScanBuilder { let row_range = byte_range_to_row_range( file_range.start as u64..file_range.end as u64, row_count, diff --git a/vortex-file/public-api.lock b/vortex-file/public-api.lock index 5e26eeb1a1f..484e2c8e308 100644 --- a/vortex-file/public-api.lock +++ b/vortex-file/public-api.lock @@ -278,7 +278,7 @@ pub fn vortex_file::VortexFile::layout_reader(&self) -> vortex_error::VortexResu pub fn vortex_file::VortexFile::row_count(&self) -> u64 -pub fn vortex_file::VortexFile::scan(&self) -> vortex_error::VortexResult> +pub fn vortex_file::VortexFile::scan(&self) -> vortex_error::VortexResult pub fn vortex_file::VortexFile::segment_source(&self) -> alloc::sync::Arc diff --git a/vortex-file/src/file.rs b/vortex-file/src/file.rs index 7257676b05c..1eb7d4411c0 100644 --- a/vortex-file/src/file.rs +++ b/vortex-file/src/file.rs @@ -10,7 +10,6 @@ use std::ops::Range; use std::sync::Arc; use itertools::Itertools; -use vortex_array::ArrayRef; use vortex_array::Columnar; use vortex_array::VortexSessionExecute; use vortex_array::dtype::DType; @@ -110,7 +109,7 @@ impl VortexFile { } /// Initiate a scan of the file, returning a builder for configuring the scan. - pub fn scan(&self) -> VortexResult> { + pub fn scan(&self) -> VortexResult { Ok(ScanBuilder::new( self.session.clone(), self.layout_reader()?, diff --git a/vortex-file/src/tests.rs b/vortex-file/src/tests.rs index e42d5671a00..305321ddc62 100644 --- a/vortex-file/src/tests.rs +++ b/vortex-file/src/tests.rs @@ -1223,7 +1223,7 @@ async fn write_nullable_top_level_struct() { async fn round_trip( array: &ArrayRef, - f: impl Fn(ScanBuilder) -> VortexResult>, + f: impl Fn(ScanBuilder) -> VortexResult, ) -> VortexResult { let mut writer = vec![]; SESSION diff --git a/vortex-layout/public-api.lock b/vortex-layout/public-api.lock index 22494a81f23..5dec7ee6164 100644 --- a/vortex-layout/public-api.lock +++ b/vortex-layout/public-api.lock @@ -1042,89 +1042,77 @@ pub fn vortex_layout::scan::multi::LayoutReaderFactory::open<'life0, 'async_trai pub mod vortex_layout::scan::repeated_scan -pub struct vortex_layout::scan::repeated_scan::RepeatedScan +pub struct vortex_layout::scan::repeated_scan::RepeatedScan -impl vortex_layout::scan::repeated_scan::RepeatedScan +impl vortex_layout::scan::repeated_scan::RepeatedScan -pub fn vortex_layout::scan::repeated_scan::RepeatedScan::dtype(&self) -> &vortex_array::dtype::DType +pub fn vortex_layout::scan::repeated_scan::RepeatedScan::dtype(&self) -> &vortex_array::dtype::DType -pub fn vortex_layout::scan::repeated_scan::RepeatedScan::execute_array_iter(&self, row_range: core::option::Option>, runtime: &B) -> vortex_error::VortexResult +pub fn vortex_layout::scan::repeated_scan::RepeatedScan::execute_array_iter(&self, row_range: core::option::Option>, runtime: &B) -> vortex_error::VortexResult -pub fn vortex_layout::scan::repeated_scan::RepeatedScan::execute_array_stream(&self, row_range: core::option::Option>) -> vortex_error::VortexResult +pub fn vortex_layout::scan::repeated_scan::RepeatedScan::execute_array_stream(&self, row_range: core::option::Option>) -> vortex_error::VortexResult -impl vortex_layout::scan::repeated_scan::RepeatedScan - -pub fn vortex_layout::scan::repeated_scan::RepeatedScan::execute(&self, row_range: core::option::Option>) -> vortex_error::VortexResult>>>> - -pub fn vortex_layout::scan::repeated_scan::RepeatedScan::execute_stream(&self, row_range: core::option::Option>) -> vortex_error::VortexResult> + core::marker::Send + 'static + use> - -pub fn vortex_layout::scan::repeated_scan::RepeatedScan::new(session: vortex_session::VortexSession, layout_reader: vortex_layout::LayoutReaderRef, projection: vortex_array::expr::expression::Expression, filter: core::option::Option, ordered: bool, row_range: core::option::Option>, selection: vortex_scan::selection::Selection, splits: vortex_layout::scan::splits::Splits, concurrency: usize, map_fn: alloc::sync::Arc<(dyn core::ops::function::Fn(vortex_array::array::erased::ArrayRef) -> vortex_error::VortexResult + core::marker::Send + core::marker::Sync)>, limit: core::option::Option, dtype: vortex_array::dtype::DType) -> Self +pub fn vortex_layout::scan::repeated_scan::RepeatedScan::new(session: vortex_session::VortexSession, layout_reader: vortex_layout::LayoutReaderRef, projection: vortex_array::expr::expression::Expression, filter: core::option::Option, ordered: bool, row_range: core::option::Option>, selection: vortex_scan::selection::Selection, splits: vortex_layout::scan::splits::Splits, concurrency: usize, limit: core::option::Option, dtype: vortex_array::dtype::DType) -> Self pub mod vortex_layout::scan::scan_builder -pub struct vortex_layout::scan::scan_builder::ScanBuilder - -impl vortex_layout::scan::scan_builder::ScanBuilder - -pub fn vortex_layout::scan::scan_builder::ScanBuilder::into_array_iter(self, runtime: &B) -> vortex_error::VortexResult - -pub fn vortex_layout::scan::scan_builder::ScanBuilder::into_array_stream(self) -> vortex_error::VortexResult +pub struct vortex_layout::scan::scan_builder::ScanBuilder -pub fn vortex_layout::scan::scan_builder::ScanBuilder::new(session: vortex_session::VortexSession, layout_reader: alloc::sync::Arc) -> Self +impl vortex_layout::scan::scan_builder::ScanBuilder -impl vortex_layout::scan::scan_builder::ScanBuilder +pub fn vortex_layout::scan::scan_builder::ScanBuilder::concurrency(&self) -> usize -pub fn vortex_layout::scan::scan_builder::ScanBuilder::into_record_batch_reader(self, schema: arrow_schema::schema::SchemaRef, runtime: &B) -> vortex_error::VortexResult +pub fn vortex_layout::scan::scan_builder::ScanBuilder::dtype(&self) -> vortex_error::VortexResult -pub fn vortex_layout::scan::scan_builder::ScanBuilder::into_record_batch_stream(self, schema: arrow_schema::schema::SchemaRef) -> vortex_error::VortexResult> + core::marker::Send + 'static> +pub fn vortex_layout::scan::scan_builder::ScanBuilder::into_array_iter(self, runtime: &B) -> vortex_error::VortexResult -impl vortex_layout::scan::scan_builder::ScanBuilder +pub fn vortex_layout::scan::scan_builder::ScanBuilder::into_array_stream(self) -> vortex_error::VortexResult -pub fn vortex_layout::scan::scan_builder::ScanBuilder::build(self) -> vortex_error::VortexResult>>>> +pub fn vortex_layout::scan::scan_builder::ScanBuilder::into_iter(self, runtime: &B) -> vortex_error::VortexResult> + 'static> -pub fn vortex_layout::scan::scan_builder::ScanBuilder::concurrency(&self) -> usize +pub fn vortex_layout::scan::scan_builder::ScanBuilder::into_stream(self) -> vortex_error::VortexResult> + core::marker::Send + 'static> -pub fn vortex_layout::scan::scan_builder::ScanBuilder::dtype(&self) -> vortex_error::VortexResult +pub fn vortex_layout::scan::scan_builder::ScanBuilder::new(session: vortex_session::VortexSession, layout_reader: alloc::sync::Arc) -> Self -pub fn vortex_layout::scan::scan_builder::ScanBuilder::into_iter(self, runtime: &B) -> vortex_error::VortexResult> + 'static> +pub fn vortex_layout::scan::scan_builder::ScanBuilder::ordered(&self) -> bool -pub fn vortex_layout::scan::scan_builder::ScanBuilder::into_stream(self) -> vortex_error::VortexResult> + core::marker::Send + 'static + use> +pub fn vortex_layout::scan::scan_builder::ScanBuilder::prepare(self) -> vortex_error::VortexResult -pub fn vortex_layout::scan::scan_builder::ScanBuilder::map(self, map_fn: impl core::ops::function::Fn(A) -> vortex_error::VortexResult + 'static + core::marker::Send + core::marker::Sync) -> vortex_layout::scan::scan_builder::ScanBuilder +pub fn vortex_layout::scan::scan_builder::ScanBuilder::session(&self) -> &vortex_session::VortexSession -pub fn vortex_layout::scan::scan_builder::ScanBuilder::ordered(&self) -> bool +pub fn vortex_layout::scan::scan_builder::ScanBuilder::with_concurrency(self, concurrency: usize) -> Self -pub fn vortex_layout::scan::scan_builder::ScanBuilder::prepare(self) -> vortex_error::VortexResult> +pub fn vortex_layout::scan::scan_builder::ScanBuilder::with_filter(self, filter: vortex_array::expr::expression::Expression) -> Self -pub fn vortex_layout::scan::scan_builder::ScanBuilder::session(&self) -> &vortex_session::VortexSession +pub fn vortex_layout::scan::scan_builder::ScanBuilder::with_limit(self, limit: u64) -> Self -pub fn vortex_layout::scan::scan_builder::ScanBuilder::with_concurrency(self, concurrency: usize) -> Self +pub fn vortex_layout::scan::scan_builder::ScanBuilder::with_metrics_registry(self, metrics: alloc::sync::Arc) -> Self -pub fn vortex_layout::scan::scan_builder::ScanBuilder::with_filter(self, filter: vortex_array::expr::expression::Expression) -> Self +pub fn vortex_layout::scan::scan_builder::ScanBuilder::with_ordered(self, ordered: bool) -> Self -pub fn vortex_layout::scan::scan_builder::ScanBuilder::with_limit(self, limit: u64) -> Self +pub fn vortex_layout::scan::scan_builder::ScanBuilder::with_projection(self, projection: vortex_array::expr::expression::Expression) -> Self -pub fn vortex_layout::scan::scan_builder::ScanBuilder::with_metrics_registry(self, metrics: alloc::sync::Arc) -> Self +pub fn vortex_layout::scan::scan_builder::ScanBuilder::with_row_indices(self, row_indices: vortex_buffer::buffer::Buffer) -> Self -pub fn vortex_layout::scan::scan_builder::ScanBuilder::with_ordered(self, ordered: bool) -> Self +pub fn vortex_layout::scan::scan_builder::ScanBuilder::with_row_offset(self, row_offset: u64) -> Self -pub fn vortex_layout::scan::scan_builder::ScanBuilder::with_projection(self, projection: vortex_array::expr::expression::Expression) -> Self +pub fn vortex_layout::scan::scan_builder::ScanBuilder::with_row_range(self, row_range: core::ops::range::Range) -> Self -pub fn vortex_layout::scan::scan_builder::ScanBuilder::with_row_indices(self, row_indices: vortex_buffer::buffer::Buffer) -> Self +pub fn vortex_layout::scan::scan_builder::ScanBuilder::with_selection(self, selection: vortex_scan::selection::Selection) -> Self -pub fn vortex_layout::scan::scan_builder::ScanBuilder::with_row_offset(self, row_offset: u64) -> Self +pub fn vortex_layout::scan::scan_builder::ScanBuilder::with_some_filter(self, filter: core::option::Option) -> Self -pub fn vortex_layout::scan::scan_builder::ScanBuilder::with_row_range(self, row_range: core::ops::range::Range) -> Self +pub fn vortex_layout::scan::scan_builder::ScanBuilder::with_some_limit(self, limit: core::option::Option) -> Self -pub fn vortex_layout::scan::scan_builder::ScanBuilder::with_selection(self, selection: vortex_scan::selection::Selection) -> Self +pub fn vortex_layout::scan::scan_builder::ScanBuilder::with_some_metrics_registry(self, metrics: core::option::Option>) -> Self -pub fn vortex_layout::scan::scan_builder::ScanBuilder::with_some_filter(self, filter: core::option::Option) -> Self +pub fn vortex_layout::scan::scan_builder::ScanBuilder::with_split_by(self, split_by: vortex_layout::scan::split_by::SplitBy) -> Self -pub fn vortex_layout::scan::scan_builder::ScanBuilder::with_some_limit(self, limit: core::option::Option) -> Self +impl vortex_layout::scan::scan_builder::ScanBuilder -pub fn vortex_layout::scan::scan_builder::ScanBuilder::with_some_metrics_registry(self, metrics: core::option::Option>) -> Self +pub fn vortex_layout::scan::scan_builder::ScanBuilder::into_record_batch_reader(self, schema: arrow_schema::schema::SchemaRef, runtime: &B) -> vortex_error::VortexResult -pub fn vortex_layout::scan::scan_builder::ScanBuilder::with_split_by(self, split_by: vortex_layout::scan::split_by::SplitBy) -> Self +pub fn vortex_layout::scan::scan_builder::ScanBuilder::into_record_batch_stream(self, schema: arrow_schema::schema::SchemaRef) -> vortex_error::VortexResult> + core::marker::Send + 'static> pub fn vortex_layout::scan::scan_builder::filter_and_projection_masks(projection: &vortex_array::expr::expression::Expression, filter: core::option::Option<&vortex_array::expr::expression::Expression>, dtype: &vortex_array::dtype::DType) -> vortex_error::VortexResult<(alloc::vec::Vec, alloc::vec::Vec)> diff --git a/vortex-layout/src/scan/arrow.rs b/vortex-layout/src/scan/arrow.rs index 875db04e4df..2448c78412f 100644 --- a/vortex-layout/src/scan/arrow.rs +++ b/vortex-layout/src/scan/arrow.rs @@ -10,6 +10,7 @@ use arrow_schema::ArrowError; use arrow_schema::DataType; use arrow_schema::SchemaRef; use futures::Stream; +use futures::StreamExt; use futures::TryStreamExt; use vortex_array::ArrayRef; use vortex_array::ExecutionCtx; @@ -20,7 +21,7 @@ use vortex_io::runtime::BlockingRuntime; use crate::scan::scan_builder::ScanBuilder; -impl ScanBuilder { +impl ScanBuilder { /// Creates a new `RecordBatchReader` from the scan builder. /// /// The `schema` parameter is used to define the schema of the resulting record batches. In @@ -35,11 +36,11 @@ impl ScanBuilder { let session = self.session().clone(); let iter = self + .into_iter(runtime)? .map(move |chunk| { let mut ctx = session.create_execution_ctx(); - to_record_batch(chunk, &data_type, &mut ctx) + chunk.and_then(|chunk| to_record_batch(chunk, &data_type, &mut ctx)) }) - .into_iter(runtime)? .map(|result| result.map_err(|e| ArrowError::ExternalError(Box::new(e)))); Ok(RecordBatchIteratorAdapter { iter, schema }) @@ -53,11 +54,11 @@ impl ScanBuilder { let session = self.session().clone(); let stream = self + .into_stream()? .map(move |chunk| { let mut ctx = session.create_execution_ctx(); - to_record_batch(chunk, &data_type, &mut ctx) + chunk.and_then(|chunk| to_record_batch(chunk, &data_type, &mut ctx)) }) - .into_stream()? .map_err(|e| ArrowError::ExternalError(Box::new(e))); Ok(stream) diff --git a/vortex-layout/src/scan/limit.rs b/vortex-layout/src/scan/limit.rs new file mode 100644 index 00000000000..cc99d0f6c6b --- /dev/null +++ b/vortex-layout/src/scan/limit.rs @@ -0,0 +1,74 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; + +use futures::Stream; +use futures::StreamExt; +use futures::stream; +use futures::stream::BoxStream; +use vortex_array::ArrayRef; +use vortex_error::VortexResult; + +pub(crate) fn limit_array_stream( + stream: S, + limit: Option, +) -> BoxStream<'static, VortexResult> +where + S: Stream> + Send + 'static, +{ + match limit { + Some(limit) => RowLimitedStream::new(stream.boxed(), limit).boxed(), + None => stream.boxed(), + } +} + +struct RowLimitedStream { + inner: BoxStream<'static, VortexResult>, + remaining: u64, +} + +impl RowLimitedStream { + fn new(inner: BoxStream<'static, VortexResult>, remaining: u64) -> Self { + Self { inner, remaining } + } + + fn abort_pending(&mut self) { + let inner = std::mem::replace(&mut self.inner, stream::empty().boxed()); + drop(inner); + } +} + +impl Stream for RowLimitedStream { + type Item = VortexResult; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if self.remaining == 0 { + return Poll::Ready(None); + } + + match self.inner.as_mut().poll_next(cx) { + Poll::Ready(Some(Ok(chunk))) => { + let chunk_len = chunk.len() as u64; + if chunk_len <= self.remaining { + self.remaining -= chunk_len; + if self.remaining == 0 { + self.abort_pending(); + } + Poll::Ready(Some(Ok(chunk))) + } else { + let limit = match usize::try_from(self.remaining) { + Ok(limit) => limit, + Err(_) => unreachable!("remaining rows cannot exceed the current chunk"), + }; + self.remaining = 0; + self.abort_pending(); + Poll::Ready(Some(chunk.slice(0..limit))) + } + } + other => other, + } + } +} diff --git a/vortex-layout/src/scan/mod.rs b/vortex-layout/src/scan/mod.rs index 98fd1918a42..e08e4cb3f31 100644 --- a/vortex-layout/src/scan/mod.rs +++ b/vortex-layout/src/scan/mod.rs @@ -4,6 +4,7 @@ pub mod arrow; mod filter; pub mod layout; +mod limit; pub mod multi; pub mod repeated_scan; pub mod scan_builder; diff --git a/vortex-layout/src/scan/repeated_scan.rs b/vortex-layout/src/scan/repeated_scan.rs index 1d5caaa66cf..461ec4d0637 100644 --- a/vortex-layout/src/scan/repeated_scan.rs +++ b/vortex-layout/src/scan/repeated_scan.rs @@ -6,8 +6,9 @@ use std::iter; use std::ops::Range; use std::sync::Arc; +use futures::FutureExt; use futures::Stream; -use futures::future::BoxFuture; +use futures::StreamExt; use itertools::Either; use itertools::Itertools; use vortex_array::ArrayRef; @@ -25,15 +26,17 @@ use vortex_session::VortexSession; use crate::LayoutReaderRef; use crate::scan::filter::FilterExpr; +use crate::scan::limit::limit_array_stream; use crate::scan::splits::Splits; use crate::scan::tasks::TaskContext; +use crate::scan::tasks::TaskFuture; use crate::scan::tasks::split_exec; /// A projected subset (by indices, range, and filter) of rows from a Vortex data source. /// /// The method of this struct enable, possibly concurrent, scanning of multiple row ranges of this /// data source. -pub struct RepeatedScan { +pub struct RepeatedScan { session: VortexSession, layout_reader: LayoutReaderRef, projection: Expression, @@ -47,15 +50,13 @@ pub struct RepeatedScan { splits: Splits, /// The number of splits to make progress on concurrently **per-thread**. concurrency: usize, - /// Function to apply to each [`ArrayRef`] within the spawned split tasks. - map_fn: Arc VortexResult + Send + Sync>, /// Maximal number of rows to read (after filtering) limit: Option, /// The dtype of the projected arrays. dtype: DType, } -impl RepeatedScan { +impl RepeatedScan { pub fn dtype(&self) -> &DType { &self.dtype } @@ -79,9 +80,6 @@ impl RepeatedScan { let stream = self.execute_stream(row_range)?; Ok(ArrayStreamAdapter::new(dtype, stream)) } -} - -impl RepeatedScan { /// Constructor just to allow `scan_builder` to create a `RepeatedScan`. #[expect( clippy::too_many_arguments, @@ -97,7 +95,6 @@ impl RepeatedScan { selection: Selection, splits: Splits, concurrency: usize, - map_fn: Arc VortexResult + Send + Sync>, limit: Option, dtype: DType, ) -> Self { @@ -111,33 +108,21 @@ impl RepeatedScan { selection, splits, concurrency, - map_fn, limit, dtype, } } - pub fn execute( - &self, - row_range: Option>, - ) -> VortexResult>>>> { - let ctx = Arc::new(TaskContext { - selection: self.selection.clone(), - filter: self.filter.clone().map(|f| Arc::new(FilterExpr::new(f))), - reader: Arc::clone(&self.layout_reader), - projection: self.projection.clone(), - mapper: Arc::clone(&self.map_fn), - }); - + fn split_ranges(&self, row_range: Option>) -> Vec> { let row_range = intersect_ranges(self.row_range.as_ref(), row_range); - let ranges = match &self.splits { + match &self.splits { Splits::Natural(btree_set) => { let splits_iter = match row_range { None => Either::Left(btree_set.iter().copied()), Some(range) => { if range.is_empty() { - return Ok(Vec::new()); + return Vec::new(); } Either::Right( iter::once(range.start) @@ -147,27 +132,45 @@ impl RepeatedScan { } }; - Either::Left(splits_iter.tuple_windows().map(|(start, end)| start..end)) + splits_iter + .tuple_windows() + .map(|(start, end)| start..end) + .collect() } - Splits::Ranges(ranges) => Either::Right(match row_range { - None => Either::Left(ranges.iter().cloned()), + Splits::Ranges(ranges) => match row_range { + None => ranges.to_vec(), Some(range) => { if range.is_empty() { - return Ok(Vec::new()); + return Vec::new(); } - Either::Right(ranges.iter().filter_map(move |r| { - let start = cmp::max(r.start, range.start); - let end = cmp::min(r.end, range.end); - (start < end).then_some(start..end) - })) + ranges + .iter() + .filter_map(move |r| { + let start = cmp::max(r.start, range.start); + let end = cmp::min(r.end, range.end); + (start < end).then_some(start..end) + }) + .collect() } - }), - }; + }, + } + } + + pub(crate) fn execute( + &self, + row_range: Option>, + ) -> VortexResult>>> { + let ctx = Arc::new(TaskContext { + selection: self.selection.clone(), + filter: self.filter.clone().map(|f| Arc::new(FilterExpr::new(f))), + reader: Arc::clone(&self.layout_reader), + projection: self.projection.clone(), + }); let mut limit = self.limit; let mut tasks = Vec::new(); - for range in ranges { + for range in self.split_ranges(row_range) { if range.start >= range.end { continue; } @@ -182,27 +185,58 @@ impl RepeatedScan { Ok(tasks) } - pub fn execute_stream( + pub(crate) fn execute_stream( &self, row_range: Option>, - ) -> VortexResult> + Send + 'static + use> { - use futures::StreamExt; + ) -> VortexResult> + Send + 'static> { let num_workers = std::thread::available_parallelism() .map(|n| n.get()) .unwrap_or(1); - let concurrency = self.concurrency * num_workers; let handle = self.session.handle(); + if self.filter.is_some() && self.limit.is_some() { + let ctx = Arc::new(TaskContext { + selection: self.selection.clone(), + filter: self.filter.clone().map(|f| Arc::new(FilterExpr::new(f))), + reader: Arc::clone(&self.layout_reader), + projection: self.projection.clone(), + }); + let ordered = self.ordered; + let limit = self.limit; + let stream = futures::stream::iter(self.split_ranges(row_range)).map(move |range| { + let ctx = Arc::clone(&ctx); + let handle = handle.clone(); + async move { + let task = split_exec(ctx, range, None)?; + handle.spawn(task).await + } + .boxed() + }); + let stream = if ordered { + stream.buffered(1).boxed() + } else { + stream.buffer_unordered(1).boxed() + }; + + return Ok(limit_array_stream( + stream.filter_map(|chunk| async move { chunk.transpose() }), + limit, + )); + } + let stream = futures::stream::iter(self.execute(row_range)?).map(move |task| handle.spawn(task)); - + let concurrency = self.concurrency * num_workers; let stream = if self.ordered { stream.buffered(concurrency).boxed() } else { stream.buffer_unordered(concurrency).boxed() }; - Ok(stream.filter_map(|chunk| async move { chunk.transpose() })) + Ok(limit_array_stream( + stream.filter_map(|chunk| async move { chunk.transpose() }), + self.limit, + )) } } diff --git a/vortex-layout/src/scan/scan_builder.rs b/vortex-layout/src/scan/scan_builder.rs index 6053cd6ebf5..25203f6aa74 100644 --- a/vortex-layout/src/scan/scan_builder.rs +++ b/vortex-layout/src/scan/scan_builder.rs @@ -10,7 +10,6 @@ use std::task::ready; use futures::Stream; use futures::StreamExt; -use futures::future::BoxFuture; use futures::stream::BoxStream; use itertools::Itertools; use vortex_array::ArrayRef; @@ -24,15 +23,12 @@ use vortex_array::expr::analysis::immediate_access::immediate_scope_access; use vortex_array::expr::root; use vortex_array::iter::ArrayIterator; use vortex_array::iter::ArrayIteratorAdapter; -use vortex_array::stats::StatsSet; use vortex_array::stream::ArrayStream; use vortex_array::stream::ArrayStreamAdapter; use vortex_buffer::Buffer; use vortex_error::VortexExpect; use vortex_error::VortexResult; -use vortex_error::vortex_bail; use vortex_io::runtime::BlockingRuntime; -use vortex_io::runtime::Handle; use vortex_io::runtime::Task; use vortex_io::session::RuntimeSessionExt; use vortex_metrics::MetricsRegistry; @@ -48,7 +44,7 @@ use crate::scan::splits::Splits; use crate::scan::splits::attempt_split_ranges; /// A struct for building a scan operation. -pub struct ScanBuilder { +pub struct ScanBuilder { session: VortexSession, layout_reader: LayoutReaderRef, projection: Expression, @@ -64,11 +60,7 @@ pub struct ScanBuilder { split_by: SplitBy, /// The number of splits to make progress on concurrently **per-thread**. concurrency: usize, - /// Function to apply to each [`ArrayRef`] within the spawned split tasks. - map_fn: Arc VortexResult + Send + Sync>, metrics_registry: Option>, - /// Should we try to prune the file (using stats) on open. - file_stats: Option>, /// Maximal number of rows to read (after filtering) limit: Option, /// The row-offset assigned to the first row of the file. Used by the `row_idx` expression, @@ -76,7 +68,7 @@ pub struct ScanBuilder { row_offset: u64, } -impl ScanBuilder { +impl ScanBuilder { pub fn new(session: VortexSession, layout_reader: Arc) -> Self { Self { session, @@ -90,9 +82,7 @@ impl ScanBuilder { // We default to four tasks per worker thread, which allows for some I/O lookahead // without too much impact on work-stealing. concurrency: 4, - map_fn: Arc::new(Ok), metrics_registry: None, - file_stats: None, limit: None, row_offset: 0, } @@ -119,9 +109,7 @@ impl ScanBuilder { runtime.block_on_stream(stream), )) } -} -impl ScanBuilder { pub fn with_filter(mut self, filter: Expression) -> Self { self.filter = Some(filter); self @@ -213,37 +201,9 @@ impl ScanBuilder { &self.session } - /// Map each split of the scan. The function will be run on the spawned task. - pub fn map( - self, - map_fn: impl Fn(A) -> VortexResult + 'static + Send + Sync, - ) -> ScanBuilder { - let old_map_fn = self.map_fn; - ScanBuilder { - session: self.session, - layout_reader: self.layout_reader, - projection: self.projection, - filter: self.filter, - ordered: self.ordered, - row_range: self.row_range, - selection: self.selection, - split_by: self.split_by, - concurrency: self.concurrency, - metrics_registry: self.metrics_registry, - file_stats: self.file_stats, - limit: self.limit, - row_offset: self.row_offset, - map_fn: Arc::new(move |a| old_map_fn(a).and_then(&map_fn)), - } - } - - pub fn prepare(self) -> VortexResult> { + pub fn prepare(self) -> VortexResult { let dtype = self.dtype()?; - if self.filter.is_some() && self.limit.is_some() { - vortex_bail!("Vortex doesn't support scans with both a filter and a limit") - } - // Spin up the root layout reader, and wrap it in a FilterLayoutReader to perform // conjunction splitting if a filter is provided. let mut layout_reader = self.layout_reader; @@ -295,26 +255,15 @@ impl ScanBuilder { self.selection, splits, self.concurrency, - self.map_fn, self.limit, dtype, )) } - /// Constructs a task per row split of the scan, returned as a vector of futures. - pub fn build(self) -> VortexResult>>>> { - // The ultimate short circuit - if self.limit.is_some_and(|l| l == 0) { - return Ok(vec![]); - } - - self.prepare()?.execute(None) - } - /// Returns a [`Stream`] with tasks spawned onto the session's runtime handle. pub fn into_stream( self, - ) -> VortexResult> + Send + 'static + use> { + ) -> VortexResult> + Send + 'static> { Ok(LazyScanStream::new(self)) } @@ -322,84 +271,55 @@ impl ScanBuilder { pub fn into_iter( self, runtime: &B, - ) -> VortexResult> + 'static> { + ) -> VortexResult> + 'static> { let stream = self.into_stream()?; Ok(runtime.block_on_stream(stream)) } } -enum LazyScanState { - Builder(Option>>), - Preparing(PreparingScan), - Stream(BoxStream<'static, VortexResult>), +enum LazyScanState { + Builder(Option>), + Preparing(PreparingScan), + Stream(BoxStream<'static, VortexResult>), Error(Option), } -type PreparedScanTasks = Vec>>>; - -struct PreparingScan { - ordered: bool, - concurrency: usize, - handle: Handle, - task: Task>>, +struct PreparingScan { + task: Task>, } -struct LazyScanStream { - state: LazyScanState, +struct LazyScanStream { + state: LazyScanState, } -impl LazyScanStream { - fn new(builder: ScanBuilder) -> Self { +impl LazyScanStream { + fn new(builder: ScanBuilder) -> Self { Self { state: LazyScanState::Builder(Some(Box::new(builder))), } } } -impl Unpin for LazyScanStream {} +impl Unpin for LazyScanStream {} -impl Stream for LazyScanStream { - type Item = VortexResult; +impl Stream for LazyScanStream { + type Item = VortexResult; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { loop { match &mut self.state { LazyScanState::Builder(builder) => { let builder = builder.take().vortex_expect("polled after completion"); - let ordered = builder.ordered; - let num_workers = std::thread::available_parallelism() - .map(|n| n.get()) - .unwrap_or(1); - let concurrency = builder.concurrency * num_workers; let handle = builder.session.handle(); - let task = handle.spawn_blocking(move || { - builder.prepare().and_then(|scan| scan.execute(None)) - }); - self.state = LazyScanState::Preparing(PreparingScan { - ordered, - concurrency, - handle, - task, - }); + let task = handle.spawn_blocking(move || builder.prepare()); + self.state = LazyScanState::Preparing(PreparingScan { task }); } LazyScanState::Preparing(preparing) => { match ready!(Pin::new(&mut preparing.task).poll(cx)) { - Ok(tasks) => { - let ordered = preparing.ordered; - let concurrency = preparing.concurrency; - let handle = preparing.handle.clone(); - let stream = - futures::stream::iter(tasks).map(move |task| handle.spawn(task)); - let stream = if ordered { - stream.buffered(concurrency).boxed() - } else { - stream.buffer_unordered(concurrency).boxed() - }; - let stream = stream - .filter_map(|chunk| async move { chunk.transpose() }) - .boxed(); - self.state = LazyScanState::Stream(stream); - } + Ok(scan) => match scan.execute_stream(None) { + Ok(stream) => self.state = LazyScanState::Stream(stream.boxed()), + Err(err) => self.state = LazyScanState::Error(Some(err)), + }, Err(err) => self.state = LazyScanState::Error(Some(err)), } } @@ -464,6 +384,7 @@ mod test { use futures::Stream; use futures::task::noop_waker_ref; use parking_lot::Mutex; + use vortex_array::ArrayRef; use vortex_array::IntoArray; use vortex_array::MaskFuture; #[expect(deprecated)] @@ -474,6 +395,7 @@ mod test { use vortex_array::dtype::Nullability; use vortex_array::dtype::PType; use vortex_array::expr::Expression; + use vortex_array::expr::root; use vortex_error::VortexResult; use vortex_error::vortex_err; use vortex_io::runtime::BlockingRuntime; @@ -684,6 +606,166 @@ mod test { Ok(()) } + #[derive(Debug)] + struct FilteringLayoutReader { + name: Arc, + dtype: DType, + row_count: u64, + keep_row: fn(u64) -> bool, + } + + impl FilteringLayoutReader { + fn new(row_count: u64, keep_row: fn(u64) -> bool) -> Self { + Self { + name: Arc::from("filtering"), + dtype: DType::Primitive(PType::I32, Nullability::NonNullable), + row_count, + keep_row, + } + } + } + + impl LayoutReader for FilteringLayoutReader { + fn name(&self) -> &Arc { + &self.name + } + + fn dtype(&self) -> &DType { + &self.dtype + } + + fn row_count(&self) -> u64 { + self.row_count + } + + fn register_splits( + &self, + _field_mask: &[FieldMask], + row_range: &Range, + splits: &mut BTreeSet, + ) -> VortexResult<()> { + for split in ((row_range.start + 2)..row_range.end).step_by(2) { + splits.insert(split); + } + splits.insert(row_range.end); + Ok(()) + } + + fn pruning_evaluation( + &self, + _row_range: &Range, + _expr: &Expression, + mask: Mask, + ) -> VortexResult { + Ok(MaskFuture::ready(mask)) + } + + fn filter_evaluation( + &self, + row_range: &Range, + _expr: &Expression, + mask: MaskFuture, + ) -> VortexResult { + let row_range = row_range.clone(); + let keep_row = self.keep_row; + let row_count = usize::try_from(row_range.end - row_range.start) + .map_err(|_| vortex_err!("row range must fit in usize"))?; + + Ok(MaskFuture::new(row_count, async move { + let input_mask = mask.await?; + let filtered = (row_range.start..row_range.end) + .enumerate() + .map(|(idx, row)| input_mask.value(idx) && keep_row(row)); + Ok(Mask::from_iter(filtered)) + })) + } + + fn projection_evaluation( + &self, + row_range: &Range, + _expr: &Expression, + mask: MaskFuture, + ) -> VortexResult { + let row_range = row_range.clone(); + + Ok(Box::pin(async move { + let start = i32::try_from(row_range.start) + .map_err(|_| vortex_err!("row_range.start must fit in i32"))?; + let end = i32::try_from(row_range.end) + .map_err(|_| vortex_err!("row_range.end must fit in i32"))?; + + let array = PrimitiveArray::from_iter(start..end).into_array(); + array.filter(mask.await?) + })) + } + + fn as_any(&self) -> &dyn std::any::Any { + self + } + } + + fn collect_scan_values(iter: I) -> VortexResult> + where + I: IntoIterator>, + { + let mut values = Vec::new(); + for chunk in iter { + #[expect(deprecated)] + let primitive = chunk?.to_primitive(); + values.extend(primitive.into_buffer::()); + } + Ok(values) + } + + fn drain_runtime(runtime: &SingleThreadRuntime) { + for _ in 0..4 { + let mut yielded = false; + runtime.block_on(futures::future::poll_fn(move |cx| { + if yielded { + Poll::Ready(()) + } else { + yielded = true; + cx.waker().wake_by_ref(); + Poll::Pending + } + })); + } + } + + #[test] + fn into_stream_limits_filtered_results() -> VortexResult<()> { + let runtime = SingleThreadRuntime::default(); + let session = crate::scan::test::session_with_handle(runtime.handle()); + let reader = Arc::new(FilteringLayoutReader::new(8, |_| true)); + + let stream = ScanBuilder::new(session, reader) + .with_filter(root()) + .with_limit(3) + .into_stream()?; + let values = collect_scan_values(runtime.block_on_stream(stream))?; + drain_runtime(&runtime); + + assert_eq!(values, [0, 1, 2]); + Ok(()) + } + + #[test] + fn prepared_scan_limits_filtered_results() -> VortexResult<()> { + let runtime = SingleThreadRuntime::default(); + let session = crate::scan::test::session_with_handle(runtime.handle()); + let reader = Arc::new(FilteringLayoutReader::new(8, |row| row % 2 == 1)); + + let scan = ScanBuilder::new(session, reader) + .with_filter(root()) + .with_limit(3) + .prepare()?; + let values = collect_scan_values(scan.execute_array_iter(None, &runtime)?)?; + drain_runtime(&runtime); + + assert_eq!(values, [1, 3, 5]); + Ok(()) + } + #[derive(Debug)] struct BlockingSplitsLayoutReader { name: Arc, diff --git a/vortex-layout/src/scan/tasks.rs b/vortex-layout/src/scan/tasks.rs index dc0b489b1f2..9606fa7b5db 100644 --- a/vortex-layout/src/scan/tasks.rs +++ b/vortex-layout/src/scan/tasks.rs @@ -21,7 +21,7 @@ use vortex_scan::selection::Selection; use crate::LayoutReader; use crate::scan::filter::FilterExpr; -pub type TaskFuture = BoxFuture<'static, VortexResult>; +pub type TaskFuture = BoxFuture<'static, VortexResult>; /// Logic for executing a single split reading task. /// @@ -33,13 +33,12 @@ pub type TaskFuture = BoxFuture<'static, VortexResult>; /// The intersected row range is then further reduced via expression-based pruning. After pruning /// has eliminated more blocks, the full filter is executed over the remainder of the split. /// -/// This mask is then provided to the reader to perform a filtered projection over the split data, -/// finally mapping the Vortex columnar record batches into some result type `A`. -pub fn split_exec( - ctx: Arc>, +/// This mask is then provided to the reader to perform a filtered projection over the split data. +pub fn split_exec( + ctx: Arc, split: Range, limit: Option<&mut u64>, -) -> VortexResult>> { +) -> VortexResult>> { // Apply the selection to calculate a read mask let read_mask = ctx.selection.row_mask(&split); let row_range = read_mask.row_range(); @@ -141,22 +140,20 @@ pub fn split_exec( ctx.reader .projection_evaluation(&row_range, &ctx.projection, filter_mask.clone())?; - let mapper = Arc::clone(&ctx.mapper); let array_fut = async move { let mask = filter_mask.await?; if mask.all_false() { return Ok(None); } - let array = projection_future.await?; - mapper(array).map(Some) + projection_future.await.map(Some) }; Ok(array_fut.boxed()) } /// Information needed to execute a single split task. -pub struct TaskContext { +pub struct TaskContext { /// A row selection to apply. pub selection: Selection, /// The shared filter expression. @@ -165,6 +162,4 @@ pub struct TaskContext { pub reader: Arc, /// The projection expression to apply to gather the scanned rows. pub projection: Expression, - /// Function that maps into an A. - pub mapper: Arc VortexResult + Send + Sync>, } diff --git a/vortex-python/src/file.rs b/vortex-python/src/file.rs index 458736db0e5..488f26f70c5 100644 --- a/vortex-python/src/file.rs +++ b/vortex-python/src/file.rs @@ -202,7 +202,7 @@ impl PyVortexFile { limit: Option, indices: Option, batch_size: Option, - ) -> VortexResult> { + ) -> VortexResult { let mut builder = self .vxf .scan()? diff --git a/vortex-python/src/scan.rs b/vortex-python/src/scan.rs index 1fd53d96131..9fae217147b 100644 --- a/vortex-python/src/scan.rs +++ b/vortex-python/src/scan.rs @@ -3,7 +3,6 @@ use pyo3::exceptions::PyIndexError; use pyo3::prelude::*; -use vortex::array::ArrayRef; use vortex::array::LEGACY_SESSION; use vortex::array::VortexSessionExecute; use vortex::layout::scan::repeated_scan::RepeatedScan; @@ -26,7 +25,7 @@ pub(crate) fn init(py: Python, parent: &Bound) -> PyResult<()> { #[pyclass(name = "RepeatedScan", module = "vortex", frozen)] pub struct PyRepeatedScan { - pub scan: RepeatedScan, + pub scan: RepeatedScan, pub row_count: u64, } From a748b68a8bb37ca274cc04343b6a50c2767b561d Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Mon, 20 Apr 2026 12:55:39 +0100 Subject: [PATCH 2/4] schedule work Signed-off-by: Adam Gutglick --- vortex-cxx/src/read.rs | 19 +------------------ vortex-datafusion/src/persistent/opener.rs | 15 +++++++++++---- vortex-datafusion/src/v2/source.rs | 6 +++--- vortex-layout/src/scan/arrow.rs | 14 ++++++++++++-- 4 files changed, 27 insertions(+), 27 deletions(-) diff --git a/vortex-cxx/src/read.rs b/vortex-cxx/src/read.rs index b060c607cca..55e86f57640 100644 --- a/vortex-cxx/src/read.rs +++ b/vortex-cxx/src/read.rs @@ -4,18 +4,11 @@ use std::sync::Arc; use anyhow::Result; -use arrow_array::RecordBatch; use arrow_array::RecordBatchReader; -use arrow_array::cast::AsArray; use arrow_array::ffi::FFI_ArrowSchema; use arrow_array::ffi_stream::FFI_ArrowArrayStream; -use arrow_schema::ArrowError; -use arrow_schema::DataType; use arrow_schema::Schema; use arrow_schema::SchemaRef; -use futures::StreamExt; -use futures::stream::TryStreamExt; -use vortex::array::arrow::IntoArrowArray; use vortex::buffer::Buffer; use vortex::file::OpenOptionsSessionExt; use vortex::io::runtime::BlockingRuntime; @@ -158,19 +151,9 @@ pub(crate) fn scan_builder_into_threadsafe_cloneable_reader( Arc::new(arrow_schema) } }; - let data_type = DataType::Struct(schema.fields().clone()); - let stream = builder .inner - .into_stream()? - .map(move |result| { - result.and_then(|chunk| { - chunk - .into_arrow(&data_type) - .map(|struct_array| RecordBatch::from(struct_array.as_struct())) - }) - }) - .map_err(|e| ArrowError::ExternalError(Box::new(e))); + .into_record_batch_stream(Arc::clone(&schema))?; let iter = RUNTIME.block_on_stream_thread_safe(|_h| stream); let rbr = RecordBatchIteratorAdapter::new(iter, schema); diff --git a/vortex-datafusion/src/persistent/opener.rs b/vortex-datafusion/src/persistent/opener.rs index 6675c4b8267..3ed93fe49e2 100644 --- a/vortex-datafusion/src/persistent/opener.rs +++ b/vortex-datafusion/src/persistent/opener.rs @@ -37,6 +37,7 @@ use vortex::array::arrow::ArrowArrayExecutor; use vortex::error::VortexError; use vortex::file::OpenOptionsSessionExt; use vortex::io::InstrumentedReadAt; +use vortex::io::session::RuntimeSessionExt; use vortex::layout::LayoutReader; use vortex::layout::scan::scan_builder::ScanBuilder; use vortex::metrics::Label; @@ -362,6 +363,7 @@ impl FileOpener for VortexOpener { } let stream_schema = Arc::new(stream_schema); + let handle = session.handle(); let stream = scan_builder .with_metrics_registry(metrics_registry) @@ -370,10 +372,15 @@ impl FileOpener for VortexOpener { .with_ordered(has_output_ordering) .into_stream() .map_err(|e| exec_datafusion_err!("Failed to create Vortex stream: {e}"))? - .map(move |chunk| { - let mut ctx = session.create_execution_ctx(); - chunk.and_then(|chunk| { - chunk.execute_record_batch(stream_schema.as_ref(), &mut ctx) + .then(move |chunk| { + let session = session.clone(); + let stream_schema = Arc::clone(&stream_schema); + let handle = handle.clone(); + handle.spawn_blocking(move || { + let mut ctx = session.create_execution_ctx(); + chunk.and_then(|chunk| { + chunk.execute_record_batch(stream_schema.as_ref(), &mut ctx) + }) }) }) .map_ok(move |rb| { diff --git a/vortex-datafusion/src/v2/source.rs b/vortex-datafusion/src/v2/source.rs index 76a78845bab..07bef39745f 100644 --- a/vortex-datafusion/src/v2/source.rs +++ b/vortex-datafusion/src/v2/source.rs @@ -429,15 +429,15 @@ impl DataSource for VortexDataSource { let handle = session.handle(); let stream = scan_streams .try_flatten_unordered(Some(num_partitions.get() * 2)) - .map(move |result| { + .then(move |result| { let session = session.clone(); let schema = Arc::clone(&projected_schema); - handle.spawn_cpu(move || { + let handle = handle.clone(); + handle.spawn_blocking(move || { let mut ctx = session.create_execution_ctx(); result.and_then(|chunk| chunk.execute_record_batch(&schema, &mut ctx)) }) }) - .buffered(num_partitions.get()) .map(|result| result.map_err(|e| DataFusionError::External(Box::new(e)))); // Apply leftover projection (expressions that couldn't be pushed into Vortex). diff --git a/vortex-layout/src/scan/arrow.rs b/vortex-layout/src/scan/arrow.rs index 2448c78412f..054f8615902 100644 --- a/vortex-layout/src/scan/arrow.rs +++ b/vortex-layout/src/scan/arrow.rs @@ -18,6 +18,7 @@ use vortex_array::VortexSessionExecute; use vortex_array::arrow::ArrowArrayExecutor; use vortex_error::VortexResult; use vortex_io::runtime::BlockingRuntime; +use vortex_io::session::RuntimeSessionExt; use crate::scan::scan_builder::ScanBuilder; @@ -52,13 +53,22 @@ impl ScanBuilder { ) -> VortexResult> + Send + 'static> { let data_type = DataType::Struct(schema.fields().clone()); let session = self.session().clone(); + let handle = session.handle(); + let concurrency = std::thread::available_parallelism() + .map(|n| n.get()) + .unwrap_or(1); let stream = self .into_stream()? .map(move |chunk| { - let mut ctx = session.create_execution_ctx(); - chunk.and_then(|chunk| to_record_batch(chunk, &data_type, &mut ctx)) + let session = session.clone(); + let data_type = data_type.clone(); + handle.spawn_blocking(move || { + let mut ctx = session.create_execution_ctx(); + chunk.and_then(|chunk| to_record_batch(chunk, &data_type, &mut ctx)) + }) }) + .buffered(concurrency) .map_err(|e| ArrowError::ExternalError(Box::new(e))); Ok(stream) From 0374b7fd42b92f3b5e58b7bdee3f63a2b5ddb93b Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Mon, 20 Apr 2026 12:57:53 +0100 Subject: [PATCH 3/4] layout Signed-off-by: Adam Gutglick --- vortex-layout/src/scan/arrow.rs | 14 ++------------ 1 file changed, 2 insertions(+), 12 deletions(-) diff --git a/vortex-layout/src/scan/arrow.rs b/vortex-layout/src/scan/arrow.rs index 054f8615902..2448c78412f 100644 --- a/vortex-layout/src/scan/arrow.rs +++ b/vortex-layout/src/scan/arrow.rs @@ -18,7 +18,6 @@ use vortex_array::VortexSessionExecute; use vortex_array::arrow::ArrowArrayExecutor; use vortex_error::VortexResult; use vortex_io::runtime::BlockingRuntime; -use vortex_io::session::RuntimeSessionExt; use crate::scan::scan_builder::ScanBuilder; @@ -53,22 +52,13 @@ impl ScanBuilder { ) -> VortexResult> + Send + 'static> { let data_type = DataType::Struct(schema.fields().clone()); let session = self.session().clone(); - let handle = session.handle(); - let concurrency = std::thread::available_parallelism() - .map(|n| n.get()) - .unwrap_or(1); let stream = self .into_stream()? .map(move |chunk| { - let session = session.clone(); - let data_type = data_type.clone(); - handle.spawn_blocking(move || { - let mut ctx = session.create_execution_ctx(); - chunk.and_then(|chunk| to_record_batch(chunk, &data_type, &mut ctx)) - }) + let mut ctx = session.create_execution_ctx(); + chunk.and_then(|chunk| to_record_batch(chunk, &data_type, &mut ctx)) }) - .buffered(concurrency) .map_err(|e| ArrowError::ExternalError(Box::new(e))); Ok(stream) From 17fa9ff92263afe4a36878eff04d729e05b36b23 Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Mon, 20 Apr 2026 16:32:55 +0100 Subject: [PATCH 4/4] bufferd batch Signed-off-by: Adam Gutglick --- vortex-datafusion/src/persistent/opener.rs | 60 ++++++++++++---------- vortex-datafusion/src/persistent/stream.rs | 10 ++-- vortex-datafusion/src/v2/source.rs | 3 +- 3 files changed, 43 insertions(+), 30 deletions(-) diff --git a/vortex-datafusion/src/persistent/opener.rs b/vortex-datafusion/src/persistent/opener.rs index 3ed93fe49e2..fbeaaa094b6 100644 --- a/vortex-datafusion/src/persistent/opener.rs +++ b/vortex-datafusion/src/persistent/opener.rs @@ -9,6 +9,7 @@ use arrow_schema::Schema; use datafusion_common::DataFusionError; use datafusion_common::Result as DFResult; use datafusion_common::ScalarValue; +use datafusion_common::arrow::array::RecordBatch; use datafusion_common::exec_datafusion_err; use datafusion_datasource::FileRange; use datafusion_datasource::PartitionedFile; @@ -364,6 +365,7 @@ impl FileOpener for VortexOpener { let stream_schema = Arc::new(stream_schema); let handle = session.handle(); + let file_location = file.object_meta.location.clone(); let stream = scan_builder .with_metrics_registry(metrics_registry) @@ -372,7 +374,7 @@ impl FileOpener for VortexOpener { .with_ordered(has_output_ordering) .into_stream() .map_err(|e| exec_datafusion_err!("Failed to create Vortex stream: {e}"))? - .then(move |chunk| { + .map(move |chunk| { let session = session.clone(); let stream_schema = Arc::clone(&stream_schema); let handle = handle.clone(); @@ -383,33 +385,11 @@ impl FileOpener for VortexOpener { }) }) }) + .buffered(2) .map_ok(move |rb| { - // We try and slice the stream into respecting datafusion's configured batch size. - stream::iter( - (0..rb.num_rows().div_ceil(batch_size * 2)) - .flat_map(move |block_idx| { - let offset = block_idx * batch_size * 2; - - // If we have less than two batches worth of rows left, we keep them together as a single batch. - if rb.num_rows() - offset < 2 * batch_size { - let length = rb.num_rows() - offset; - [Some(rb.slice(offset, length)), None].into_iter() - } else { - let first = rb.slice(offset, batch_size); - let second = rb.slice(offset + batch_size, batch_size); - [Some(first), Some(second)].into_iter() - } - }) - .flatten() - .map(Ok), - ) - }) - .map_err(move |e: VortexError| { - DataFusionError::External(Box::new(e.with_context(format!( - "Failed to read Vortex file: {}", - file.object_meta.location - )))) + stream::iter(split_record_batch(rb, batch_size).into_iter().map(Ok)) }) + .map_err(move |e: VortexError| vortex_file_read_error(&file_location, e)) .try_flatten() .map(move |batch| { if projector.projection().as_ref().is_empty() { @@ -460,6 +440,33 @@ fn byte_range_to_row_range(byte_range: Range, row_count: u64, total_size: u start_row..u64::min(row_count, end_row) } +fn split_record_batch(rb: RecordBatch, batch_size: usize) -> Vec { + assert!(batch_size > 0, "batch size must be positive"); + + let mut batches = Vec::new(); + let mut offset = 0; + + while offset < rb.num_rows() { + let remaining = rb.num_rows() - offset; + if remaining < 2 * batch_size { + batches.push(rb.slice(offset, remaining)); + break; + } + + batches.push(rb.slice(offset, batch_size)); + batches.push(rb.slice(offset + batch_size, batch_size)); + offset += batch_size * 2; + } + + batches +} + +fn vortex_file_read_error(path: &Path, error: VortexError) -> DataFusionError { + DataFusionError::External(Box::new( + error.with_context(format!("Failed to read Vortex file: {path}")), + )) +} + #[cfg(test)] mod tests { use std::sync::Arc; @@ -486,6 +493,7 @@ mod tests { use datafusion_expr::Operator; use datafusion_physical_expr::expressions as df_expr; use datafusion_physical_expr::projection::ProjectionExpr; + use futures::TryStreamExt; use insta::assert_snapshot; use itertools::Itertools; use object_store::ObjectStore; diff --git a/vortex-datafusion/src/persistent/stream.rs b/vortex-datafusion/src/persistent/stream.rs index 52d9b7daecc..846291eb8fa 100644 --- a/vortex-datafusion/src/persistent/stream.rs +++ b/vortex-datafusion/src/persistent/stream.rs @@ -15,14 +15,14 @@ use futures::stream::BoxStream; /// Utility to end a stream early if its backing [`PartitionFile`] can be pruned away by an updated dynamic expression. pub(crate) struct PrunableStream { file_pruner: FilePruner, - stream: BoxStream<'static, DFResult>, + stream: Option>>, } impl PrunableStream { pub fn new(file_pruner: FilePruner, stream: BoxStream<'static, DFResult>) -> Self { Self { file_pruner, - stream, + stream: Some(stream), } } } @@ -32,9 +32,13 @@ impl Stream for PrunableStream { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { if self.as_mut().file_pruner.should_prune()? { + self.stream.take(); Poll::Ready(None) } else { - self.stream.poll_next_unpin(cx) + match self.stream.as_mut() { + Some(stream) => stream.poll_next_unpin(cx), + None => Poll::Ready(None), + } } } } diff --git a/vortex-datafusion/src/v2/source.rs b/vortex-datafusion/src/v2/source.rs index 07bef39745f..b1dd5a89cc8 100644 --- a/vortex-datafusion/src/v2/source.rs +++ b/vortex-datafusion/src/v2/source.rs @@ -429,7 +429,7 @@ impl DataSource for VortexDataSource { let handle = session.handle(); let stream = scan_streams .try_flatten_unordered(Some(num_partitions.get() * 2)) - .then(move |result| { + .map(move |result| { let session = session.clone(); let schema = Arc::clone(&projected_schema); let handle = handle.clone(); @@ -438,6 +438,7 @@ impl DataSource for VortexDataSource { result.and_then(|chunk| chunk.execute_record_batch(&schema, &mut ctx)) }) }) + .buffered(2) .map(|result| result.map_err(|e| DataFusionError::External(Box::new(e)))); // Apply leftover projection (expressions that couldn't be pushed into Vortex).