diff --git a/native/shuffle/src/bin/shuffle_bench.rs b/native/shuffle/src/bin/shuffle_bench.rs index bb8c2a0380..cc1c960848 100644 --- a/native/shuffle/src/bin/shuffle_bench.rs +++ b/native/shuffle/src/bin/shuffle_bench.rs @@ -277,6 +277,9 @@ fn print_shuffle_metrics(metrics: &MetricsSet, total_wall_time_secs: f64) { if let Some(nanos) = get_metric("repart_time") { println!(" repart time: {}", fmt_time(nanos)); } + if let Some(nanos) = get_metric("interleave_time") { + println!(" interleave time: {}", fmt_time(nanos)); + } if let Some(nanos) = get_metric("encode_time") { println!(" encode time: {}", fmt_time(nanos)); } diff --git a/native/shuffle/src/metrics.rs b/native/shuffle/src/metrics.rs index 1de751cf41..bda245fd93 100644 --- a/native/shuffle/src/metrics.rs +++ b/native/shuffle/src/metrics.rs @@ -27,6 +27,9 @@ pub(crate) struct ShufflePartitionerMetrics { /// Time to perform repartitioning pub(crate) repart_time: Time, + /// Time spent in `interleave_record_batch` gathering shuffled batches + pub(crate) interleave_time: Time, + /// Time encoding batches to IPC format pub(crate) encode_time: Time, @@ -51,6 +54,7 @@ impl ShufflePartitionerMetrics { Self { baseline: BaselineMetrics::new(metrics, partition), repart_time: MetricBuilder::new(metrics).subset_time("repart_time", partition), + interleave_time: MetricBuilder::new(metrics).subset_time("interleave_time", partition), encode_time: MetricBuilder::new(metrics).subset_time("encode_time", partition), write_time: MetricBuilder::new(metrics).subset_time("write_time", partition), input_batches: MetricBuilder::new(metrics).counter("input_batches", partition), diff --git a/native/shuffle/src/partitioners/multi_partition.rs b/native/shuffle/src/partitioners/multi_partition.rs index 7de9314f54..aa44cad4d5 100644 --- a/native/shuffle/src/partitioners/multi_partition.rs +++ b/native/shuffle/src/partitioners/multi_partition.rs @@ -434,10 +434,12 @@ impl MultiPartitionShuffleRepartitioner { Ok(()) } + #[allow(clippy::too_many_arguments)] fn shuffle_write_partition( partition_iter: &mut PartitionedBatchIterator, shuffle_block_writer: &mut ShuffleBlockWriter, output_data: &mut BufWriter, + interleave_time: &Time, encode_time: &Time, write_time: &Time, write_buffer_size: usize, @@ -449,7 +451,7 @@ impl MultiPartitionShuffleRepartitioner { write_buffer_size, batch_size, ); - for batch in partition_iter { + while let Some(batch) = partition_iter.next(interleave_time) { let batch = batch?; buf_batch_writer.write(&batch, encode_time, write_time)?; } @@ -573,7 +575,7 @@ impl ShufflePartitioner for MultiPartitionShuffleRepartitioner { .open(data_file) .map_err(|e| DataFusionError::Execution(format!("shuffle write error: {e:?}")))?; - let mut output_data = BufWriter::new(output_data); + let mut output_data = BufWriter::with_capacity(self.write_buffer_size, output_data); #[allow(clippy::needless_range_loop)] for i in 0..num_output_partitions { @@ -596,6 +598,7 @@ impl ShufflePartitioner for MultiPartitionShuffleRepartitioner { &mut partition_iter, &mut self.shuffle_block_writer, &mut output_data, + &self.metrics.interleave_time, &self.metrics.encode_time, &self.metrics.write_time, self.write_buffer_size, diff --git a/native/shuffle/src/partitioners/partitioned_batch_iterator.rs b/native/shuffle/src/partitioners/partitioned_batch_iterator.rs index 8309a8ed4a..c7f1781866 100644 --- a/native/shuffle/src/partitioners/partitioned_batch_iterator.rs +++ b/native/shuffle/src/partitioners/partitioned_batch_iterator.rs @@ -18,6 +18,7 @@ use arrow::array::RecordBatch; use arrow::compute::interleave_record_batch; use datafusion::common::DataFusionError; +use datafusion::physical_plan::metrics::Time; /// A helper struct to produce shuffled batches. /// This struct takes ownership of the buffered batches and partition indices from the @@ -85,19 +86,22 @@ impl<'a> PartitionedBatchIterator<'a> { pos: 0, } } -} - -impl Iterator for PartitionedBatchIterator<'_> { - type Item = datafusion::common::Result; - fn next(&mut self) -> Option { + /// Returns the next shuffled batch, recording the gather cost into `interleave_time`. + pub(crate) fn next( + &mut self, + interleave_time: &Time, + ) -> Option> { if self.pos >= self.indices.len() { return None; } let indices_end = std::cmp::min(self.pos + self.batch_size, self.indices.len()); let indices = &self.indices[self.pos..indices_end]; - match interleave_record_batch(&self.record_batches, indices) { + let mut timer = interleave_time.timer(); + let result = interleave_record_batch(&self.record_batches, indices); + timer.stop(); + match result { Ok(batch) => { self.pos = indices_end; Some(Ok(batch)) diff --git a/native/shuffle/src/writers/spill.rs b/native/shuffle/src/writers/spill.rs index c16caddbf9..624a45befe 100644 --- a/native/shuffle/src/writers/spill.rs +++ b/native/shuffle/src/writers/spill.rs @@ -83,7 +83,7 @@ impl PartitionWriter { write_buffer_size: usize, batch_size: usize, ) -> datafusion::common::Result { - if let Some(batch) = iter.next() { + if let Some(batch) = iter.next(&metrics.interleave_time) { self.ensure_spill_file_created(runtime)?; let total_bytes_written = { @@ -95,7 +95,7 @@ impl PartitionWriter { ); let mut bytes_written = buf_batch_writer.write(&batch?, &metrics.encode_time, &metrics.write_time)?; - for batch in iter { + while let Some(batch) = iter.next(&metrics.interleave_time) { let batch = batch?; bytes_written += buf_batch_writer.write( &batch,