9789b9f to d5c4c5f
It's happening!
Yes! Finally -- and no regressions this time! I have a pile of stacked PRs:
Now that I have some evidence that this PR makes them faster, I'll go back and get them ready for review.
…er` (#21327)

~(Draft until I am sure I can use this API to make FileStream behave better)~

## Which issue does this PR close?

- Part of #20529
- Needed for #21351
- Broken out of #20820
- Closes #21427

## Rationale for this change

I can get 10% faster on many ClickBench queries by reordering files at runtime. You can see it all working together here: #21351

To do so, I need to rework the FileStream so that it can reorder operations at runtime. Eventually that will include both CPU and IO. This PR is a step in that direction by introducing the main Morsel API and implementing it for Parquet. The next PR (#21342) rewrites FileStream in terms of the Morsel API.

## What changes are included in this PR?

1. Add the proposed `Morsel` API
2. Rewrite the Parquet opener in terms of that API
3. Add an adapter layer (back to FileOpener, so I don't have to rewrite FileStream in the same PR)

My next PR will rewrite the FileStream to use the Morsel API.

## Are these changes tested?

Yes, by existing CI. I will add additional tests for just the Parquet opener in a follow-on PR.

## Are there any user-facing changes?

No
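The Morsel API itself is not reproduced in this thread. As a rough illustration of the idea described above, here is a minimal sketch of what a morsel-style unit of scan work could look like — all names (`Morsel`, `RowGroupMorsel`) are hypothetical, not the actual DataFusion API:

```rust
// Hypothetical sketch of a morsel-style scan API (illustrative names only).
// A "morsel" is a schedulable unit of scan work (e.g. one row group of one
// Parquet file) that a scheduler can hand out dynamically at runtime.

/// One schedulable unit of work produced while opening a file.
pub trait Morsel {
    /// Human-readable description, e.g. "file.parquet / row group 2".
    fn describe(&self) -> String;
    /// Rough size estimate a scheduler could use to order or split work.
    fn estimated_rows(&self) -> usize;
}

/// Illustrative morsel covering one row group of a Parquet file.
pub struct RowGroupMorsel {
    pub file: String,
    pub row_group: usize,
    pub rows: usize,
}

impl Morsel for RowGroupMorsel {
    fn describe(&self) -> String {
        format!("{} / row group {}", self.file, self.row_group)
    }
    fn estimated_rows(&self) -> usize {
        self.rows
    }
}
```

With a trait like this, an adapter back to the existing `FileOpener` interface would just drain morsels in file order, while a smarter scheduler could reorder them.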
acde88a to 6b79a6f
🤖 Benchmark running (GKE): comparing alamb/reschedule_io (1b1b586) to 961c5fc (merge-base) using tpcds
🤖 Benchmark completed (GKE): clickbench_partitioned — base (merge-base) vs branch
🤖 Benchmark completed (GKE): tpcds — base (merge-base) vs branch
Mind taking it out of draft if it is ready for review?
Yes, I will do so for sure. I wanted to make sure it was ready (passing tests, comments all polished) before doing so. I expect to mark it ready very soon.
Ok, this is ready for review. I am so sorry for the size of the PR, but it is mostly comments and tests.
Awesome, so the PR changes who reads which file at runtime using the morselizer. It would be extremely interesting to try this in many-small-files environments. Do we expect improvements even for balanced partitions (each partition has a similar number of files with similar sizes)? Is it planned to morselize deeper to process row groups in parallel? This actually reminds me of the #19815 benchmark.
In my experience, there is always some partition skew, even for very balanced scans on a local FS.
Yes, the plan is to split morsels into sub-row-group morsels, so smaller datasets (e.g. TPC-DS at SF=1, which has single-row-group files) or high-CPU machines (which otherwise have too little parallelism) will benefit more as well. Currently parallelism is limited in datasets with few row groups, as we can't go beyond row groups.
```rust
/// Streams that may share work across siblings use [`WorkSource::Shared`],
/// while streams that can not share work (e.g. because they must preserve file
/// order) use [`WorkSource::Local`].
```
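As a rough illustration of the two modes described in that doc comment, here is a minimal sketch — this `WorkSource` enum is a simplified assumption, not DataFusion's actual definition:

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};

/// Illustrative version of the two scheduling modes: `Shared` pulls from a
/// queue visible to sibling streams (work can move between them), while
/// `Local` owns its queue so file order within the stream is preserved.
pub enum WorkSource<T> {
    Shared(Arc<Mutex<VecDeque<T>>>),
    Local(VecDeque<T>),
}

impl<T> WorkSource<T> {
    /// Take the next unit of work, if any remains.
    pub fn next_work(&mut self) -> Option<T> {
        match self {
            // Siblings holding clones of the same Arc drain one queue.
            WorkSource::Shared(q) => q.lock().unwrap().pop_front(),
            // A local queue is private to this stream.
            WorkSource::Local(q) => q.pop_front(),
        }
    }
}
```

The point of the split is that order-preserving scans can opt out of sharing without a separate code path.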
run benchmark tpch_csv tpch_csv10 h2o_medium
🤖 Criterion benchmark running (GKE): comparing alamb/reschedule_io (5287210) to 961c5fc (merge-base)
Benchmark for this request failed.
```rust
fn reset_state(self: Arc<Self>) -> Result<Arc<dyn ExecutionPlan>> {
```
This doesn't hold back this PR, but I don't like the additions in some recent PRs of `reset_state` on plans / sources, etc.
In my opinion we should avoid setting mutable state on `ExecutionPlan`s (and related data structures) and move that to the inner streams.
In an ideal scenario:
- At any time we can `clone` a plan and share / run it without expecting any side effects (e.g. partially executed plans)
- Concurrent runs of a query (ideally `execute`s) should produce the same result, even while a query is running

In my PR #20481 I combined the shared state using TaskContext (which, in normal usage, will be created on each new query run), which I think is still not 100% the desired end state but avoids a lot of scenarios.
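The preference stated here — keep the plan immutable and put all mutable execution state in the stream returned by `execute` — can be sketched roughly as follows. `ScanPlan` and `ScanStream` are hypothetical names for illustration, not DataFusion's traits:

```rust
/// Immutable plan: cloning or sharing it has no side effects, because it
/// carries no execution state.
#[derive(Clone)]
pub struct ScanPlan {
    pub files: Vec<String>,
}

/// All per-execution state lives here, created fresh on every `execute`.
pub struct ScanStream {
    remaining: Vec<String>,
}

impl ScanPlan {
    pub fn execute(&self) -> ScanStream {
        // Each execution gets its own copy of the work list, so concurrent
        // runs of the same plan never observe each other's progress.
        ScanStream { remaining: self.files.clone() }
    }
}

impl Iterator for ScanStream {
    type Item = String;
    fn next(&mut self) -> Option<String> {
        if self.remaining.is_empty() {
            None
        } else {
            Some(self.remaining.remove(0))
        }
    }
}
```

Under this shape there is nothing to `reset_state`: re-running is just calling `execute` again.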
This is a good point. I think it is a bigger discussion, so I filed a new ticket for that:
Perhaps it would also be great to test how well it works for the different data sources available at the moment.
Trying to see what morsel splitting & merging will look like in this commit ea3834e |
I also think we may need to adjust the Parquet PushDecoder somehow to better communicate its internal boundaries (e.g. so we can easily / efficiently pick off RowGroups without recreating it). We were talking about this here too:
Co-authored-by: Oleks V <comphead@users.noreply.github.com>
I asked Claude to write some tests on `Shared` mode and it found some potential concerns.

## Which issue does this PR close?

## Rationale for this change
This PR finally enables dynamic work scheduling in the FileStream (so that a task that finishes its own work can pick up any remaining work).
This improves performance on queries that scan multiple files where the work is not balanced evenly across the partitions in the plan (e.g. when dynamic filtering reduces the work significantly).
It is the last of a sequence of several PRs:
- `ParquetOpener` to `ParquetMorselizer` (#21327)

## What changes are included in this PR?
Note there are a bunch of other things that are NOT included in this PR, including
As @Dandandan proposes below, I expect we can work on those changes as follow on PRs.
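A toy model of the dynamic scheduling described above: workers that finish their own files early pull remaining work from a shared queue instead of idling. This is illustrative only — it models the idea with plain threads, not the actual FileStream implementation:

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::thread;

/// Process `files` (each value is a stand-in for that file's row count)
/// with `workers` threads pulling from one shared queue. A worker that
/// drains a cheap file immediately grabs the next one, so skewed file
/// sizes no longer leave some workers idle. Returns total rows processed.
pub fn run_dynamic(files: Vec<usize>, workers: usize) -> usize {
    let queue = Arc::new(Mutex::new(files.into_iter().collect::<VecDeque<_>>()));
    let mut handles = Vec::new();
    for _ in 0..workers {
        let q = Arc::clone(&queue);
        handles.push(thread::spawn(move || {
            let mut done = 0;
            // Keep pulling until the shared queue is empty.
            while let Some(rows) = q.lock().unwrap().pop_front() {
                done += rows; // "process" the file
            }
            done
        }));
    }
    // All files get processed exactly once, regardless of which worker
    // picked them up.
    handles.into_iter().map(|h| h.join().unwrap()).sum()
}
```

Static scheduling would instead pre-assign files to workers up front, which is exactly what causes the partition-skew slowdowns this PR addresses.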
## Are these changes tested?

Yes, by existing functional and benchmark tests, as well as new functional tests.

## Are there any user-facing changes?
Yes, faster performance (see benchmarks): #21351 (comment)