diff --git a/be/src/exec/operator/multi_cast_data_streamer.cpp b/be/src/exec/operator/multi_cast_data_streamer.cpp index 403d81110180bc..34b09bc712d860 100644 --- a/be/src/exec/operator/multi_cast_data_streamer.cpp +++ b/be/src/exec/operator/multi_cast_data_streamer.cpp @@ -30,7 +30,6 @@ #include "common/status.h" #include "core/block/block.h" #include "exec/operator/multi_cast_data_stream_source.h" -#include "exec/operator/spill_utils.h" #include "exec/pipeline/dependency.h" #include "exec/spill/spill_file_manager.h" #include "exec/spill/spill_file_reader.h" @@ -121,9 +120,8 @@ Status MultiCastDataStreamer::pull(RuntimeState* state, int sender_idx, Block* b }; l.unlock(); - // spill is synchronous; the profile passed to the runnable was only - // used for counters that are now tracked externally, so call helper - return run_spill_task(state, catch_exception_func); + RETURN_IF_CANCELLED(state); + return catch_exception_func(); } auto& pos_to_pull = _sender_pos_to_read[sender_idx]; @@ -279,7 +277,8 @@ Status MultiCastDataStreamer::_start_spill_task(RuntimeState* state, SpillFileSP return status; }; - return run_spill_task(state, exception_catch_func); + RETURN_IF_CANCELLED(state); + return exception_catch_func(); } Status MultiCastDataStreamer::push(RuntimeState* state, doris::Block* block, bool eos) { diff --git a/be/src/exec/operator/operator.h b/be/src/exec/operator/operator.h index a09638f6d718e3..8585f0be741bb6 100644 --- a/be/src/exec/operator/operator.h +++ b/be/src/exec/operator/operator.h @@ -417,8 +417,6 @@ class PipelineXSpillLocalState : public PipelineXLocalState { _spill_file_current_size = ADD_COUNTER_WITH_LEVEL( Base::custom_profile(), profile::SPILL_WRITE_FILE_CURRENT_BYTES, TUnit::BYTES, 1); - _spill_file_current_count = ADD_COUNTER_WITH_LEVEL( - Base::custom_profile(), profile::SPILL_WRITE_FILE_CURRENT_COUNT, TUnit::UNIT, 1); } // Total time of spill, including spill task scheduling time, @@ -441,9 +439,6 @@ class PipelineXSpillLocalState : public PipelineXLocalState { // Total bytes of spill data written to disk file(after serialized) RuntimeProfile::Counter* _spill_write_file_total_size = nullptr; RuntimeProfile::Counter* _spill_file_total_count = nullptr; - RuntimeProfile::Counter* _spill_file_current_count = nullptr; - // Spilled file total size - RuntimeProfile::Counter* _spill_file_total_size = nullptr; // Current spilled file size RuntimeProfile::Counter* _spill_file_current_size = nullptr; @@ -806,8 +801,6 @@ class PipelineXSpillSinkLocalState : public PipelineXSinkLocalState _runtime_state; - bool _opened = false; - std::vector _current_merging_files; /// Readers held alive during merge; one per SpillFile, reads parts sequentially. std::vector _current_merging_readers; @@ -90,4 +88,4 @@ class SpillSortSourceOperatorX : public OperatorX { std::unique_ptr _sort_source_operator; }; -} // namespace doris \ No newline at end of file +} // namespace doris diff --git a/be/src/exec/operator/spill_utils.h b/be/src/exec/operator/spill_utils.h index 7d9cb200bd16ce..dff39d5212feda 100644 --- a/be/src/exec/operator/spill_utils.h +++ b/be/src/exec/operator/spill_utils.h @@ -70,23 +70,6 @@ struct SpillContext { } }; -// helper to execute a spill function synchronously. The old code used -// SpillRunnable/SpillSinkRunnable/SpillRecoverRunnable wrappers to track -// counters and optionally notify a SpillContext. Since spill operations are -// now performed synchronously and external code already maintains any -// necessary counters, those wrappers are no longer necessary. We keep a -// small utility to run the provided callbacks and forward cancellation. -inline Status run_spill_task(RuntimeState* state, std::function exec_func, - std::function fin_cb = {}) { - RETURN_IF_CANCELLED(state); - RETURN_IF_ERROR(exec_func()); - if (fin_cb) { - RETURN_IF_CANCELLED(state); - RETURN_IF_ERROR(fin_cb()); - } - return Status::OK(); -} - template inline void update_profile_from_inner_profile(const std::string& name, RuntimeProfile* runtime_profile, diff --git a/be/src/exec/spill/spill_file_writer.cpp b/be/src/exec/spill/spill_file_writer.cpp index 60ddb68c26b6e0..d68d52c96401d6 100644 --- a/be/src/exec/spill/spill_file_writer.cpp +++ b/be/src/exec/spill/spill_file_writer.cpp @@ -97,7 +97,6 @@ Status SpillFileWriter::_close_current_part(const std::shared_ptr& sp int64_t meta_size = _part_meta.size(); _part_written_bytes += meta_size; - _total_written_bytes += meta_size; COUNTER_UPDATE(_write_file_total_size, meta_size); if (_resource_ctx) { _resource_ctx->io_context()->update_spill_write_bytes_to_local_storage(meta_size); @@ -118,7 +117,6 @@ Status SpillFileWriter::_close_current_part(const std::shared_ptr& sp // Advance to next part ++_current_part_index; - ++_total_parts; if (spill_file) { spill_file->increment_part_count(); } @@ -251,7 +249,6 @@ Status SpillFileWriter::_write_internal(const Block& block, } COUNTER_UPDATE(_write_block_counter, 1); _part_written_bytes += buff_size; - _total_written_bytes += buff_size; ++_part_written_blocks; // Incrementally update SpillFile so gc() can always // decrement the correct amount from _data_dir. @@ -269,4 +266,4 @@ Status SpillFileWriter::_write_internal(const Block& block, return status; } -} // namespace doris \ No newline at end of file +} // namespace doris diff --git a/be/src/exec/spill/spill_file_writer.h b/be/src/exec/spill/spill_file_writer.h index 215685933f4b49..eb8c7a9d9eb0a8 100644 --- a/be/src/exec/spill/spill_file_writer.h +++ b/be/src/exec/spill/spill_file_writer.h @@ -89,9 +89,6 @@ class SpillFileWriter { size_t _part_max_sub_block_size = 0; std::string _part_meta; - // ── Cumulative state ── - int64_t _total_written_bytes = 0; - size_t _total_parts = 0; bool _closed = false; // ── Counters ── diff --git a/be/src/runtime/runtime_profile_counter_names.h b/be/src/runtime/runtime_profile_counter_names.h index bd4d2e878a86f7..5813d5e0ea95f9 100644 --- a/be/src/runtime/runtime_profile_counter_names.h +++ b/be/src/runtime/runtime_profile_counter_names.h @@ -104,7 +104,6 @@ inline constexpr char SPILL_WRITE_ROWS[] = "SpillWriteRows"; inline constexpr char SPILL_WRITE_FILE_BYTES[] = "SpillWriteFileBytes"; inline constexpr char SPILL_WRITE_FILE_TOTAL_COUNT[] = "SpillWriteFileTotalCount"; inline constexpr char SPILL_WRITE_FILE_CURRENT_BYTES[] = "SpillWriteFileCurrentBytes"; -inline constexpr char SPILL_WRITE_FILE_CURRENT_COUNT[] = "SpillWriteFileCurrentCount"; // ============================================================ // Spill read counters (Source-only) diff --git a/be/test/exec/operator/spill_sort_sink_operator_test.cpp b/be/test/exec/operator/spill_sort_sink_operator_test.cpp index ead792a1e32e1d..77ac2785cc712b 100644 --- a/be/test/exec/operator/spill_sort_sink_operator_test.cpp +++ b/be/test/exec/operator/spill_sort_sink_operator_test.cpp @@ -25,7 +25,6 @@ #include "core/block/block.h" #include "core/data_type/data_type_number.h" #include "exec/operator/spill_sort_test_helper.h" -#include "exec/operator/spill_utils.h" #include "exec/pipeline/dependency.h" #include "exec/pipeline/pipeline_task.h" #include "testutil/column_helper.h" @@ -89,34 +88,6 @@ TEST_F(SpillSortSinkOperatorTest, RevokeMemoryReturnsCancelAtEntry) { expect_cancelled(sink_operator->revoke_memory(_helper.runtime_state.get())); } -TEST_F(SpillSortSinkOperatorTest, RunSpillTaskReturnsCancelAtEntry) { - cancel_state(_helper.runtime_state.get()); - - bool executed = false; - expect_cancelled(run_spill_task(_helper.runtime_state.get(), [&]() { - executed = true; - return Status::OK(); - })); - EXPECT_FALSE(executed); -} - -TEST_F(SpillSortSinkOperatorTest, RunSpillTaskReturnsCancelAfterCallback) { - bool finalized = false; - - auto status = run_spill_task( - _helper.runtime_state.get(), - [&]() { - cancel_state(_helper.runtime_state.get()); - return Status::OK(); - }, - [&]() { - finalized = true; - return Status::OK(); - }); - expect_cancelled(status); - EXPECT_FALSE(finalized); -} - TEST_F(SpillSortSinkOperatorTest, Basic) { auto [source_operator, sink_operator] = _helper.create_operators(); ASSERT_TRUE(source_operator != nullptr); diff --git a/be/test/exec/operator/spillable_operator_test_helper.cpp b/be/test/exec/operator/spillable_operator_test_helper.cpp index fb87b156545db0..d51d6d17be16c5 100644 --- a/be/test/exec/operator/spillable_operator_test_helper.cpp +++ b/be/test/exec/operator/spillable_operator_test_helper.cpp @@ -60,7 +60,6 @@ void SpillableOperatorTestHelper::SetUp() { ADD_COUNTER_WITH_LEVEL(custom_profile.get(), "SpillReadRows", TUnit::UNIT, 1); ADD_COUNTER_WITH_LEVEL(custom_profile.get(), "SpillReadFileCount", TUnit::UNIT, 1); ADD_COUNTER_WITH_LEVEL(custom_profile.get(), "SpillWriteFileTotalCount", TUnit::UNIT, 1); - ADD_COUNTER_WITH_LEVEL(custom_profile.get(), "SpillWriteFileCurrentCount", TUnit::UNIT, 1); ADD_COUNTER_WITH_LEVEL(custom_profile.get(), "SpillWriteFileCurrentBytes", TUnit::UNIT, 1); operator_profile->add_child(custom_profile.get(), true); @@ -107,4 +106,4 @@ void SpillableOperatorTestHelper::TearDown() { SAFE_DELETE(ExecEnv::GetInstance()->_spill_file_mgr); } -} // namespace doris \ No newline at end of file +} // namespace doris diff --git a/be/test/exec/pipeline/multi_cast_data_streamer_test.cpp b/be/test/exec/pipeline/multi_cast_data_streamer_test.cpp index 61589327909d63..e9f1a58aeccf78 100644 --- a/be/test/exec/pipeline/multi_cast_data_streamer_test.cpp +++ b/be/test/exec/pipeline/multi_cast_data_streamer_test.cpp @@ -65,8 +65,6 @@ class MultiCastDataStreamerTest : public testing::Test { ADD_COUNTER_WITH_LEVEL(custom_profile.get(), "SpillReadFileCount", TUnit::UNIT, 1); ADD_COUNTER_WITH_LEVEL(custom_profile.get(), "SpillWriteFileTotalCount", TUnit::UNIT, 1); - ADD_COUNTER_WITH_LEVEL(custom_profile.get(), "SpillWriteFileCurrentCount", TUnit::UNIT, - 1); ADD_COUNTER_WITH_LEVEL(custom_profile.get(), "SpillWriteFileCurrentBytes", TUnit::UNIT, 1); } @@ -111,8 +109,6 @@ class MultiCastDataStreamerTest : public testing::Test { TUnit::UNIT, 1); ADD_COUNTER_WITH_LEVEL(source_custom_profiles[i].get(), "SpillWriteFileCurrentBytes", TUnit::BYTES, 1); - ADD_COUNTER_WITH_LEVEL(source_custom_profiles[i].get(), "SpillWriteFileCurrentCount", - TUnit::UNIT, 1); multi_cast_data_streamer->set_source_profile(i, source_profiles[i].get()); } diff --git a/be/test/vec/spill/spill_file_test.cpp b/be/test/vec/spill/spill_file_test.cpp index 67740acdcd2e97..09173a7a343c18 100644 --- a/be/test/vec/spill/spill_file_test.cpp +++ b/be/test/vec/spill/spill_file_test.cpp @@ -72,7 +72,6 @@ class SpillFileTest : public testing::Test { ADD_COUNTER_WITH_LEVEL(_custom_profile.get(), "SpillReadRows", TUnit::UNIT, 1); ADD_COUNTER_WITH_LEVEL(_custom_profile.get(), "SpillReadFileCount", TUnit::UNIT, 1); ADD_COUNTER_WITH_LEVEL(_custom_profile.get(), "SpillWriteFileTotalCount", TUnit::UNIT, 1); - ADD_COUNTER_WITH_LEVEL(_custom_profile.get(), "SpillWriteFileCurrentCount", TUnit::UNIT, 1); ADD_COUNTER_WITH_LEVEL(_custom_profile.get(), "SpillWriteFileCurrentBytes", TUnit::UNIT, 1); _profile->add_child(_custom_profile.get(), true); diff --git a/be/test/vec/spill/spill_repartitioner_test.cpp b/be/test/vec/spill/spill_repartitioner_test.cpp index 53ffcf9f0ac00c..01da4719cad0d0 100644 --- a/be/test/vec/spill/spill_repartitioner_test.cpp +++ b/be/test/vec/spill/spill_repartitioner_test.cpp @@ -74,7 +74,6 @@ class SpillRepartitionerTest : public testing::Test { ADD_COUNTER_WITH_LEVEL(_custom_profile.get(), "SpillReadRows", TUnit::UNIT, 1); ADD_COUNTER_WITH_LEVEL(_custom_profile.get(), "SpillReadFileCount", TUnit::UNIT, 1); ADD_COUNTER_WITH_LEVEL(_custom_profile.get(), "SpillWriteFileTotalCount", TUnit::UNIT, 1); - ADD_COUNTER_WITH_LEVEL(_custom_profile.get(), "SpillWriteFileCurrentCount", TUnit::UNIT, 1); ADD_COUNTER_WITH_LEVEL(_custom_profile.get(), "SpillWriteFileCurrentBytes", TUnit::UNIT, 1); _profile->add_child(_custom_profile.get(), true);