chore(shuffle): add interleave_time metric and specify buffer size for output_data buffer writer#4599

Open
wForget wants to merge 2 commits into apache:main from wForget:shuffle-writer-chore
wForget commented Jun 5, 2026

Which issue does this PR close?

Closes #.

Rationale for this change

Two improvements to the native shuffle writer:

  1. Specify the buffer size for the output_data buffer writer: shuffle_write currently uses the default 8 KB buffer for the output data writer, which can lead to excessive system calls. This change explicitly sizes the buffer with write_buffer_size to improve write efficiency.
  2. Add an interleave_time metric: interleave_record_batch accounts for a significant portion of shuffle execution time, so this PR adds a metric for it.
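
A minimal sketch of the first point, using only the Rust standard library. The helper `write_chunks` and its parameters are hypothetical and are not taken from this PR; only `BufWriter::with_capacity` is the call the description names. With a larger buffer, small writes are coalesced in memory before they reach the file, so fewer write system calls should be issued.

```rust
use std::fs::File;
use std::io::{self, BufWriter, Write};
use std::path::Path;

/// Hypothetical helper (not from the PR): writes every chunk to `path`
/// through a `BufWriter` whose capacity is chosen by the caller, and
/// returns the number of bytes written.
pub fn write_chunks(
    path: &Path,
    chunks: &[Vec<u8>],
    write_buffer_size: usize,
) -> io::Result<u64> {
    let file = File::create(path)?;
    // `BufWriter::new(file)` would fall back to the standard library's
    // default capacity; `with_capacity` makes the size explicit.
    let mut writer = BufWriter::with_capacity(write_buffer_size, file);
    let mut written = 0u64;
    for chunk in chunks {
        writer.write_all(chunk)?;
        written += chunk.len() as u64;
    }
    // Flush explicitly so I/O errors surface here rather than being
    // silently ignored when the writer is dropped.
    writer.flush()?;
    Ok(written)
}
```

A call such as `write_chunks(path, &chunks, 1024 * 1024)` would buffer up to 1 MiB before each write to the file; the right size for shuffle output is workload dependent.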

What changes are included in this PR?

This PR includes the following changes:

  • adds a new interleave_time metric to ShufflePartitionerMetrics
  • records interleave_time during each PartitionedBatchIterator.next call
  • changes the final shuffle data file writer to use BufWriter::with_capacity(self.write_buffer_size, ...) instead of the default buffer size
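
The timing change can be pictured with a scoped timer that adds its elapsed time to a metric when it is dropped. The sketch below is std-only: `TimeMetric`, `TimerGuard`, `ChunkedIndices`, and `next_chunk` are hypothetical stand-ins written for illustration, not the types used in this PR, and the slice copy is only a placeholder for the interleave step.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant};

/// Hypothetical stand-in for a time metric: accumulates nanoseconds.
#[derive(Default)]
pub struct TimeMetric {
    nanos: AtomicU64,
}

/// Adds the time since its creation to the metric when dropped.
pub struct TimerGuard<'a> {
    metric: &'a TimeMetric,
    start: Instant,
}

impl TimeMetric {
    pub fn timer(&self) -> TimerGuard<'_> {
        TimerGuard { metric: self, start: Instant::now() }
    }

    pub fn value(&self) -> Duration {
        Duration::from_nanos(self.nanos.load(Ordering::Relaxed))
    }
}

impl Drop for TimerGuard<'_> {
    fn drop(&mut self) {
        let elapsed = self.start.elapsed().as_nanos() as u64;
        self.metric.nanos.fetch_add(elapsed, Ordering::Relaxed);
    }
}

/// Hypothetical iterator-like type that hands out at most `batch_size`
/// indices per call, timing only the per-batch work.
pub struct ChunkedIndices {
    indices: Vec<usize>,
    batch_size: usize,
    pos: usize,
}

impl ChunkedIndices {
    pub fn new(indices: Vec<usize>, batch_size: usize) -> Self {
        assert!(batch_size > 0, "batch_size must be positive");
        Self { indices, batch_size, pos: 0 }
    }

    /// The caller passes the metric in, so time is recorded per produced batch.
    pub fn next_chunk(&mut self, interleave_time: &TimeMetric) -> Option<Vec<usize>> {
        if self.pos >= self.indices.len() {
            return None;
        }
        let end = std::cmp::min(self.pos + self.batch_size, self.indices.len());
        // The guard lives until the end of this function, so the copy
        // below (a placeholder for the real interleave work) is timed.
        let _timer = interleave_time.timer();
        let chunk = self.indices[self.pos..end].to_vec();
        self.pos = end;
        Some(chunk)
    }
}
```

Passing the metric as an argument keeps the iterator itself free of metric state, at the cost of a slightly wider call signature.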

How are these changes tested?

Run shuffle benchmark locally:

cargo flamegraph --root --release --features shuffle-bench --bin shuffle_bench -- \
  --input benchmark_data/lineitem.parquet \
  --partitions 200 \
  --codec lz4 \
  --hash-columns 0,3 \
  --memory-limit 2147483648 \
  --output-dir comet_shuffle_bench

Test for specifying the buffer size of the output_data buffer writer
Before:

=== Results ===
Write:
  avg time:         42.037s
  throughput:       1,426,973 rows/s (total across 1 tasks)
  output size:      3.80 GiB

Input Metrics (last iteration):
  time_elapsed_scanning_total: 239.870s
  time_elapsed_opening: 0.003s
  time_elapsed_processing: 7.355s
  metadata_load_time: 0.000s
  output_bytes: 33.88 GiB
  bloom_filter_eval_time: 0.000s
  output_rows: 119,972,104
  bytes_scanned: 2.37 GiB
  page_index_eval_time: 0.000s
  time_elapsed_scanning_until_data: 0.349s
  output_batches: 14,956
  statistics_eval_time: 0.000s
  elapsed_compute: 0.000s
  row_pushdown_eval_time: 0.000s

Shuffle Metrics (last iteration):
  input batches:    7,478
  repart time:      0.668s (1.6%)
  encode time:      13.311s (31.7%)
  write time:       16.923s (40.3%)
  spill count:      8
  spilled bytes:    3.32 GiB
  data size:        16.95 GiB
dtrace: pid 27204 has exited
writing flamegraph to "flamegraph.svg"

After:

=== Results ===
Write:
  avg time:         35.921s
  throughput:       1,669,937 rows/s (total across 1 tasks)
  output size:      3.80 GiB

Input Metrics (last iteration):
  output_bytes: 33.88 GiB
  time_elapsed_scanning_until_data: 0.265s
  metadata_load_time: 0.001s
  bytes_scanned: 2.37 GiB
  time_elapsed_scanning_total: 234.757s
  bloom_filter_eval_time: 0.000s
  time_elapsed_opening: 0.003s
  statistics_eval_time: 0.000s
  time_elapsed_processing: 5.214s
  row_pushdown_eval_time: 0.000s
  elapsed_compute: 0.000s
  output_rows: 119,972,104
  page_index_eval_time: 0.000s
  output_batches: 14,956

Shuffle Metrics (last iteration):
  input batches:    7,478
  repart time:      0.637s (1.8%)
  encode time:      13.048s (36.3%)
  write time:       11.051s (30.8%)
  spill count:      8
  spilled bytes:    3.32 GiB
  data size:        16.95 GiB
dtrace: pid 37062 has exited
writing flamegraph to "flamegraph.svg"
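
To put the two runs above side by side, the relative change of each figure can be computed directly. This is a small sketch; the function names are hypothetical and the inputs are copied from the before and after output.

```rust
/// Relative change from `before` to `after`, in percent.
/// A negative result means the value decreased.
pub fn pct_change(before: f64, after: f64) -> f64 {
    (after - before) / before * 100.0
}

/// Prints the comparison for the figures quoted in the two runs.
pub fn print_buffer_size_comparison() {
    let rows = [
        ("avg time (s)", 42.037, 35.921),
        ("write time (s)", 16.923, 11.051),
        ("throughput (rows/s)", 1_426_973.0, 1_669_937.0),
    ];
    for (name, before, after) in rows.iter() {
        println!(
            "{}: {} -> {} ({:+.1}%)",
            name,
            before,
            after,
            pct_change(*before, *after)
        );
    }
}
```

With these inputs, avg time falls by roughly 14.5% and write time by roughly 34.7%, while throughput rises by roughly 17.0%.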

Test for adding the interleave_time metric
After this change:

=== Results ===
Write:
  avg time:         32.851s
  throughput:       1,826,014 rows/s (total across 1 tasks)
  output size:      3.80 GiB

Input Metrics (last iteration):
  page_index_eval_time: 0.000s
  elapsed_compute: 0.000s
  time_elapsed_opening: 0.002s
  output_bytes: 33.88 GiB
  row_pushdown_eval_time: 0.000s
  output_batches: 14,956
  statistics_eval_time: 0.000s
  bytes_scanned: 2.37 GiB
  bloom_filter_eval_time: 0.000s
  time_elapsed_processing: 5.287s
  metadata_load_time: 0.000s
  output_rows: 119,972,104
  time_elapsed_scanning_until_data: 0.261s
  time_elapsed_scanning_total: 226.113s

Shuffle Metrics (last iteration):
  input batches:    7,478
  repart time:      0.665s (2.0%)
  interleave time:  6.796s (20.7%)
  encode time:      12.600s (38.4%)
  write time:       8.844s (26.9%)
  spill count:      8
  spilled bytes:    3.32 GiB
  data size:        16.95 GiB
dtrace: pid 39163 has exited
writing flamegraph to "flamegraph.svg"
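
The percentage printed next to each shuffle metric appears consistent with dividing that metric by the reported avg time. That reading is inferred from the numbers in the output, not confirmed against the shuffle_bench source, and `share_pct` below is a hypothetical helper.

```rust
/// Share of `total_secs` taken by `metric_secs`, in percent.
pub fn share_pct(metric_secs: f64, total_secs: f64) -> f64 {
    metric_secs / total_secs * 100.0
}

/// Prints the shares for the run that includes the interleave metric.
pub fn print_shares() {
    let avg_time = 32.851;
    let metrics = [
        ("repart time", 0.665),
        ("interleave time", 6.796),
        ("encode time", 12.600),
        ("write time", 8.844),
    ];
    for (name, secs) in metrics.iter() {
        println!("{}: {:.3}s ({:.1}%)", name, secs, share_pct(*secs, avg_time));
    }
}
```

Under that assumption, interleave time works out to about 20.7% of the 32.851 s average, matching the figure shown in the output.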

Copilot AI left a comment

Pull request overview

This PR improves the native shuffle writer by (1) adding a new interleave_time metric to measure time spent in interleave_record_batch, and (2) increasing write efficiency by explicitly sizing the output_data BufWriter buffer.

Changes:

  • Add interleave_time to ShufflePartitionerMetrics and plumb it through shuffle write paths.
  • Refactor PartitionedBatchIterator to record interleave_record_batch time per produced batch.
  • Construct the final shuffle data writer with BufWriter::with_capacity(self.write_buffer_size, ...) instead of the default capacity.

Reviewed changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 1 comment.

Summary per file:

| File | Description |
| --- | --- |
| native/shuffle/src/writers/spill.rs | Plumbs interleave_time into spill writing by using the iterator’s new next(&Time) API. |
| native/shuffle/src/partitioners/partitioned_batch_iterator.rs | Adds timing around interleave_record_batch and changes the iteration API to accept a Time metric. |
| native/shuffle/src/partitioners/multi_partition.rs | Uses BufWriter::with_capacity(...) for the output data file and records interleave_time during shuffle_write_partition. |
| native/shuffle/src/metrics.rs | Adds the interleave_time metric to ShufflePartitionerMetrics and registers it in new(). |
| native/shuffle/src/bin/shuffle_bench.rs | Prints the new interleave_time metric in benchmark output. |

Comment on lines 99 to 103
let indices_end = std::cmp::min(self.pos + self.batch_size, self.indices.len());
let indices = &self.indices[self.pos..indices_end];
let _timer = interleave_time.timer();
match interleave_record_batch(&self.record_batches, indices) {
Ok(batch) => {