Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions be/src/exec/operator/multi_cast_data_streamer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
#include "common/status.h"
#include "core/block/block.h"
#include "exec/operator/multi_cast_data_stream_source.h"
#include "exec/operator/spill_utils.h"
#include "exec/pipeline/dependency.h"
#include "exec/spill/spill_file_manager.h"
#include "exec/spill/spill_file_reader.h"
Expand Down Expand Up @@ -121,9 +120,7 @@ Status MultiCastDataStreamer::pull(RuntimeState* state, int sender_idx, Block* b
};

l.unlock();
// spill is synchronous; the profile passed to the runnable was only
// used for counters that are now tracked externally, so call helper
return run_spill_task(state, catch_exception_func);
return catch_exception_func();
}

auto& pos_to_pull = _sender_pos_to_read[sender_idx];
Expand Down Expand Up @@ -279,7 +276,7 @@ Status MultiCastDataStreamer::_start_spill_task(RuntimeState* state, SpillFileSP
return status;
};

return run_spill_task(state, exception_catch_func);
return exception_catch_func();
}

Status MultiCastDataStreamer::push(RuntimeState* state, doris::Block* block, bool eos) {
Expand Down
7 changes: 0 additions & 7 deletions be/src/exec/operator/operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -417,8 +417,6 @@ class PipelineXSpillLocalState : public PipelineXLocalState<SharedStateArg> {

_spill_file_current_size = ADD_COUNTER_WITH_LEVEL(
Base::custom_profile(), profile::SPILL_WRITE_FILE_CURRENT_BYTES, TUnit::BYTES, 1);
_spill_file_current_count = ADD_COUNTER_WITH_LEVEL(
Base::custom_profile(), profile::SPILL_WRITE_FILE_CURRENT_COUNT, TUnit::UNIT, 1);
}

// Total time of spill, including spill task scheduling time,
Expand All @@ -441,9 +439,6 @@ class PipelineXSpillLocalState : public PipelineXLocalState<SharedStateArg> {
// Total bytes of spill data written to disk file(after serialized)
RuntimeProfile::Counter* _spill_write_file_total_size = nullptr;
RuntimeProfile::Counter* _spill_file_total_count = nullptr;
RuntimeProfile::Counter* _spill_file_current_count = nullptr;
// Spilled file total size
RuntimeProfile::Counter* _spill_file_total_size = nullptr;
// Current spilled file size
RuntimeProfile::Counter* _spill_file_current_size = nullptr;

Expand Down Expand Up @@ -801,8 +796,6 @@ class PipelineXSpillSinkLocalState : public PipelineXSinkLocalState<SharedStateA
RuntimeProfile::Counter*& _spill_write_rows_count = _write_counters.spill_write_rows_count;

// Sink-only counters
// Spilled file total size
RuntimeProfile::Counter* _spill_file_total_size = nullptr;
// Total bytes written to spill files (required by SpillFileWriter)
RuntimeProfile::Counter* _spill_write_file_total_size = nullptr;
// Total number of spill files created (required by SpillFileWriter)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -501,9 +501,7 @@ Status PartitionedAggSinkLocalState::_revoke_memory(RuntimeState* state) {
return status;
};

// old code used SpillSinkRunnable, but spills are synchronous and counters
// are tracked externally. Call the spill function directly.
return run_spill_task(state, std::move(spill_func));
return spill_func();
}

void PartitionedAggSinkLocalState::_reset_tmp_data() {
Expand Down
6 changes: 2 additions & 4 deletions be/src/exec/operator/spill_iceberg_table_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

#include "common/status.h"
#include "exec/operator/iceberg_table_sink_operator.h"
#include "exec/operator/spill_utils.h"
#include "exec/sink/writer/iceberg/viceberg_sort_writer.h"
#include "exec/sink/writer/iceberg/viceberg_table_writer.h"

Expand Down Expand Up @@ -78,7 +77,7 @@ size_t SpillIcebergTableSinkLocalState::get_revocable_mem_size(RuntimeState* sta
return sort_writer->data_size();
}

Status SpillIcebergTableSinkLocalState::revoke_memory(RuntimeState* state) {
Status SpillIcebergTableSinkLocalState::revoke_memory(RuntimeState* /*state*/) {
if (!_writer) {
return Status::OK();
}
Expand All @@ -95,7 +94,7 @@ Status SpillIcebergTableSinkLocalState::revoke_memory(RuntimeState* state) {
return status;
};

return run_spill_task(state, exception_catch_func);
return exception_catch_func();
}

SpillIcebergTableSinkOperatorX::SpillIcebergTableSinkOperatorX(
Expand Down Expand Up @@ -169,7 +168,6 @@ void SpillIcebergTableSinkLocalState::_init_spill_counters() {
ADD_COUNTER_WITH_LEVEL(profile, "SpillReadRows", TUnit::UNIT, 1);
ADD_COUNTER_WITH_LEVEL(profile, "SpillReadFileCount", TUnit::UNIT, 1);
ADD_COUNTER_WITH_LEVEL(profile, "SpillWriteFileCurrentBytes", TUnit::BYTES, 1);
ADD_COUNTER_WITH_LEVEL(profile, "SpillWriteFileCurrentCount", TUnit::UNIT, 1);
}

} // namespace doris
6 changes: 1 addition & 5 deletions be/src/exec/operator/spill_sort_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,6 @@ Status SpillSortLocalState::init(RuntimeState* state, LocalStateInfo& info) {
Status SpillSortLocalState::open(RuntimeState* state) {
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_open_timer);
if (_opened) {
return Status::OK();
}

RETURN_IF_ERROR(setup_in_memory_sort_op(state));
return Base::open(state);
}
Expand Down Expand Up @@ -262,4 +258,4 @@ Status SpillSortSourceOperatorX::get_block(RuntimeState* state, Block* block, bo
return Status::OK();
}

} // namespace doris
} // namespace doris
4 changes: 1 addition & 3 deletions be/src/exec/operator/spill_sort_source_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,6 @@ class SpillSortLocalState final : public PipelineXSpillLocalState<SpillSortShare
friend class SpillSortSourceOperatorX;
std::unique_ptr<RuntimeState> _runtime_state;

bool _opened = false;

std::vector<SpillFileSPtr> _current_merging_files;
/// Readers held alive during merge; one per SpillFile, reads parts sequentially.
std::vector<SpillFileReaderSPtr> _current_merging_readers;
Expand Down Expand Up @@ -90,4 +88,4 @@ class SpillSortSourceOperatorX : public OperatorX<SpillSortLocalState> {

std::unique_ptr<SortSourceOperatorX> _sort_source_operator;
};
} // namespace doris
} // namespace doris
15 changes: 0 additions & 15 deletions be/src/exec/operator/spill_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,21 +70,6 @@ struct SpillContext {
}
};

// helper to execute a spill function synchronously. The old code used
// SpillRunnable/SpillSinkRunnable/SpillRecoverRunnable wrappers to track
// counters and optionally notify a SpillContext. Since spill operations are
// now performed synchronously and external code already maintains any
// necessary counters, those wrappers are no longer necessary. We keep a
// small utility to run the provided callbacks and forward cancellation.
inline Status run_spill_task(RuntimeState* state, std::function<Status()> exec_func,
std::function<Status()> fin_cb = {}) {
RETURN_IF_ERROR(exec_func());
if (fin_cb) {
RETURN_IF_ERROR(fin_cb());
}
return Status::OK();
}

template <bool accumulating>
inline void update_profile_from_inner_profile(const std::string& name,
RuntimeProfile* runtime_profile,
Expand Down
5 changes: 1 addition & 4 deletions be/src/exec/spill/spill_file_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ Status SpillFileWriter::_close_current_part(const std::shared_ptr<SpillFile>& sp

int64_t meta_size = _part_meta.size();
_part_written_bytes += meta_size;
_total_written_bytes += meta_size;
COUNTER_UPDATE(_write_file_total_size, meta_size);
if (_resource_ctx) {
_resource_ctx->io_context()->update_spill_write_bytes_to_local_storage(meta_size);
Expand All @@ -118,7 +117,6 @@ Status SpillFileWriter::_close_current_part(const std::shared_ptr<SpillFile>& sp

// Advance to next part
++_current_part_index;
++_total_parts;
if (spill_file) {
spill_file->increment_part_count();
}
Expand Down Expand Up @@ -251,7 +249,6 @@ Status SpillFileWriter::_write_internal(const Block& block,
}
COUNTER_UPDATE(_write_block_counter, 1);
_part_written_bytes += buff_size;
_total_written_bytes += buff_size;
++_part_written_blocks;
// Incrementally update SpillFile so gc() can always
// decrement the correct amount from _data_dir.
Expand All @@ -269,4 +266,4 @@ Status SpillFileWriter::_write_internal(const Block& block,
return status;
}

} // namespace doris
} // namespace doris
3 changes: 0 additions & 3 deletions be/src/exec/spill/spill_file_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,6 @@ class SpillFileWriter {
size_t _part_max_sub_block_size = 0;
std::string _part_meta;

// ── Cumulative state ──
int64_t _total_written_bytes = 0;
size_t _total_parts = 0;
bool _closed = false;

// ── Counters ──
Expand Down
1 change: 0 additions & 1 deletion be/src/runtime/runtime_profile_counter_names.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@ inline constexpr char SPILL_WRITE_ROWS[] = "SpillWriteRows";
inline constexpr char SPILL_WRITE_FILE_BYTES[] = "SpillWriteFileBytes";
inline constexpr char SPILL_WRITE_FILE_TOTAL_COUNT[] = "SpillWriteFileTotalCount";
inline constexpr char SPILL_WRITE_FILE_CURRENT_BYTES[] = "SpillWriteFileCurrentBytes";
inline constexpr char SPILL_WRITE_FILE_CURRENT_COUNT[] = "SpillWriteFileCurrentCount";

// ============================================================
// Spill read counters (Source-only)
Expand Down
3 changes: 1 addition & 2 deletions be/test/exec/operator/spillable_operator_test_helper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ void SpillableOperatorTestHelper::SetUp() {
ADD_COUNTER_WITH_LEVEL(custom_profile.get(), "SpillReadRows", TUnit::UNIT, 1);
ADD_COUNTER_WITH_LEVEL(custom_profile.get(), "SpillReadFileCount", TUnit::UNIT, 1);
ADD_COUNTER_WITH_LEVEL(custom_profile.get(), "SpillWriteFileTotalCount", TUnit::UNIT, 1);
ADD_COUNTER_WITH_LEVEL(custom_profile.get(), "SpillWriteFileCurrentCount", TUnit::UNIT, 1);
ADD_COUNTER_WITH_LEVEL(custom_profile.get(), "SpillWriteFileCurrentBytes", TUnit::UNIT, 1);

operator_profile->add_child(custom_profile.get(), true);
Expand Down Expand Up @@ -107,4 +106,4 @@ void SpillableOperatorTestHelper::TearDown() {
SAFE_DELETE(ExecEnv::GetInstance()->_spill_file_mgr);
}

} // namespace doris
} // namespace doris
4 changes: 0 additions & 4 deletions be/test/exec/pipeline/multi_cast_data_streamer_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,6 @@ class MultiCastDataStreamerTest : public testing::Test {
ADD_COUNTER_WITH_LEVEL(custom_profile.get(), "SpillReadFileCount", TUnit::UNIT, 1);
ADD_COUNTER_WITH_LEVEL(custom_profile.get(), "SpillWriteFileTotalCount", TUnit::UNIT,
1);
ADD_COUNTER_WITH_LEVEL(custom_profile.get(), "SpillWriteFileCurrentCount", TUnit::UNIT,
1);
ADD_COUNTER_WITH_LEVEL(custom_profile.get(), "SpillWriteFileCurrentBytes", TUnit::UNIT,
1);
}
Expand Down Expand Up @@ -111,8 +109,6 @@ class MultiCastDataStreamerTest : public testing::Test {
TUnit::UNIT, 1);
ADD_COUNTER_WITH_LEVEL(source_custom_profiles[i].get(), "SpillWriteFileCurrentBytes",
TUnit::BYTES, 1);
ADD_COUNTER_WITH_LEVEL(source_custom_profiles[i].get(), "SpillWriteFileCurrentCount",
TUnit::UNIT, 1);
multi_cast_data_streamer->set_source_profile(i, source_profiles[i].get());
}

Expand Down
1 change: 0 additions & 1 deletion be/test/vec/spill/spill_file_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ class SpillFileTest : public testing::Test {
ADD_COUNTER_WITH_LEVEL(_custom_profile.get(), "SpillReadRows", TUnit::UNIT, 1);
ADD_COUNTER_WITH_LEVEL(_custom_profile.get(), "SpillReadFileCount", TUnit::UNIT, 1);
ADD_COUNTER_WITH_LEVEL(_custom_profile.get(), "SpillWriteFileTotalCount", TUnit::UNIT, 1);
ADD_COUNTER_WITH_LEVEL(_custom_profile.get(), "SpillWriteFileCurrentCount", TUnit::UNIT, 1);
ADD_COUNTER_WITH_LEVEL(_custom_profile.get(), "SpillWriteFileCurrentBytes", TUnit::UNIT, 1);

_profile->add_child(_custom_profile.get(), true);
Expand Down
1 change: 0 additions & 1 deletion be/test/vec/spill/spill_repartitioner_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ class SpillRepartitionerTest : public testing::Test {
ADD_COUNTER_WITH_LEVEL(_custom_profile.get(), "SpillReadRows", TUnit::UNIT, 1);
ADD_COUNTER_WITH_LEVEL(_custom_profile.get(), "SpillReadFileCount", TUnit::UNIT, 1);
ADD_COUNTER_WITH_LEVEL(_custom_profile.get(), "SpillWriteFileTotalCount", TUnit::UNIT, 1);
ADD_COUNTER_WITH_LEVEL(_custom_profile.get(), "SpillWriteFileCurrentCount", TUnit::UNIT, 1);
ADD_COUNTER_WITH_LEVEL(_custom_profile.get(), "SpillWriteFileCurrentBytes", TUnit::UNIT, 1);

_profile->add_child(_custom_profile.get(), true);
Expand Down
Loading