Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 76 additions & 5 deletions be/src/exec/operator/aggregation_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ Status AggSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
SCOPED_TIMER(Base::exec_time_counter());
SCOPED_TIMER(Base::_init_timer);
_agg_data = Base::_shared_state->agg_data.get();
_hash_table_size_counter = ADD_COUNTER(custom_profile(), "HashTableSize", TUnit::UNIT);
_hash_table_size_counter =
ADD_COUNTER_WITH_LEVEL(custom_profile(), "HashTableSize", TUnit::UNIT, 1);
_hash_table_memory_usage =
ADD_COUNTER_WITH_LEVEL(Base::custom_profile(), "MemoryUsageHashTable", TUnit::BYTES, 1);
_serialize_key_arena_memory_usage = ADD_COUNTER_WITH_LEVEL(
Expand All @@ -69,15 +70,25 @@ Status AggSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
_merge_timer = ADD_TIMER(Base::custom_profile(), "MergeTime");
_expr_timer = ADD_TIMER(Base::custom_profile(), "ExprTime");
_deserialize_data_timer = ADD_TIMER(Base::custom_profile(), "DeserializeAndMergeTime");
_hash_table_compute_timer = ADD_TIMER(Base::custom_profile(), "HashTableComputeTime");
_hash_table_limit_compute_timer = ADD_TIMER(Base::custom_profile(), "DoLimitComputeTime");
_hash_table_emplace_timer = ADD_TIMER(Base::custom_profile(), "HashTableEmplaceTime");
_hash_table_compute_timer =
ADD_TIMER_WITH_LEVEL(Base::custom_profile(), "HashTableComputeTime", 1);
_hash_table_limit_compute_timer =
ADD_TIMER_WITH_LEVEL(Base::custom_profile(), "DoLimitComputeTime", 1);
_hash_table_emplace_timer =
ADD_TIMER_WITH_LEVEL(Base::custom_profile(), "HashTableEmplaceTime", 1);
_hash_table_input_counter =
ADD_COUNTER(Base::custom_profile(), "HashTableInputCount", TUnit::UNIT);
ADD_COUNTER_WITH_LEVEL(Base::custom_profile(), "HashTableInputCount", TUnit::UNIT, 1);

_memory_usage_container = ADD_COUNTER(custom_profile(), "MemoryUsageContainer", TUnit::BYTES);
_memory_usage_arena = ADD_COUNTER(custom_profile(), "MemoryUsageArena", TUnit::BYTES);

_mock_hash_table_compute_timer =
ADD_TIMER_WITH_LEVEL(Base::custom_profile(), "MockHashTableComputeTime", 1);
_mock_hash_table_emplace_timer =
ADD_TIMER_WITH_LEVEL(Base::custom_profile(), "MockHashTableEmplaceTime", 1);
_mock_hash_table_input_counter = ADD_COUNTER_WITH_LEVEL(
Base::custom_profile(), "MockHashTableInputCount", TUnit::UNIT, 1);

return Status::OK();
}

Expand Down Expand Up @@ -132,6 +143,13 @@ Status AggSinkLocalState::open(RuntimeState* state) {
p._align_aggregate_states);
}},
_agg_data->method_variant);

// Init mock hash table with the same key type for benchmark comparison
Base::_shared_state->mock_agg_data = std::make_unique<AggregatedDataVariants>();
RETURN_IF_ERROR(init_hash_method<AggregatedDataVariants>(
Base::_shared_state->mock_agg_data.get(),
get_data_types(Base::_shared_state->probe_expr_ctxs), p._is_first_phase));

if (p._is_merge) {
_executor = std::make_unique<Executor<false, true>>();
} else {
Expand Down Expand Up @@ -337,6 +355,7 @@ Status AggSinkLocalState::_merge_with_serialized_key_helper(Block* block) {
} else {
_emplace_into_hash_table(_places.data(), key_columns, (uint32_t)rows);
}
_mock_emplace_into_hash_table(key_columns, (uint32_t)rows);

if (need_do_agg) {
for (int i = 0; i < Base::_shared_state->aggregate_evaluators.size(); ++i) {
Expand Down Expand Up @@ -494,8 +513,10 @@ Status AggSinkLocalState::_execute_with_serialized_key_helper(Block* block) {
rows)) {
RETURN_IF_ERROR(do_aggregate_evaluators());
}
_mock_emplace_into_hash_table(key_columns, rows);
} else {
_emplace_into_hash_table(_places.data(), key_columns, rows);
_mock_emplace_into_hash_table(key_columns, rows);
RETURN_IF_ERROR(do_aggregate_evaluators());

if (_should_limit_output && !Base::_shared_state->enable_spill) {
Expand Down Expand Up @@ -570,6 +591,56 @@ void AggSinkLocalState::_emplace_into_hash_table(AggregateDataPtr* places,
_agg_data->method_variant);
}

void AggSinkLocalState::_mock_emplace_into_hash_table(ColumnRawPtrs& key_columns,
uint32_t num_rows) {
auto* mock_agg_data = Base::_shared_state->mock_agg_data.get();
if (!mock_agg_data) {
return;
}
auto& mock_arena = Base::_shared_state->mock_arena;
auto& p = Base::_parent->template cast<AggSinkOperatorX>();
size_t agg_states_size = ((p._total_size_of_aggregate_states + p._align_aggregate_states - 1) /
p._align_aggregate_states) *
p._align_aggregate_states;

std::visit(Overload {[&](std::monostate& arg) -> void {
// do nothing for mock
},
[&](auto& agg_method) -> void {
SCOPED_TIMER(_mock_hash_table_compute_timer);
using HashMethodType = std::decay_t<decltype(agg_method)>;
using AggState = typename HashMethodType::State;
AggState state(key_columns);
agg_method.init_serialized_keys(key_columns, num_rows);

auto creator = [&](const auto& ctor, auto& key, auto& origin) {
HashMethodType::try_presis_key_and_origin(key, origin, mock_arena);
auto* mapped = mock_arena.aligned_alloc(
agg_states_size, p._align_aggregate_states);
auto st = _create_agg_status(mapped);
if (!st) {
throw Exception(st.code(), st.to_string());
}
ctor(key, mapped);
};

auto creator_for_null_key = [&](auto& mapped) {
mapped = mock_arena.aligned_alloc(agg_states_size,
p._align_aggregate_states);
auto st = _create_agg_status(mapped);
if (!st) {
throw Exception(st.code(), st.to_string());
}
};

SCOPED_TIMER(_mock_hash_table_emplace_timer);
lazy_emplace_batch(agg_method, state, num_rows, creator,
creator_for_null_key, [&](uint32_t, auto&) {});
COUNTER_UPDATE(_mock_hash_table_input_counter, num_rows);
}},
mock_agg_data->method_variant);
}

bool AggSinkLocalState::_emplace_into_hash_table_limit(AggregateDataPtr* places, Block* block,
const std::vector<int>& key_locs,
ColumnRawPtrs& key_columns,
Expand Down
6 changes: 6 additions & 0 deletions be/src/exec/operator/aggregation_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ class AggSinkLocalState : public PipelineXSinkLocalState<AggSharedState> {
uint32_t num_rows);
void _emplace_into_hash_table(AggregateDataPtr* places, ColumnRawPtrs& key_columns,
uint32_t num_rows);
void _mock_emplace_into_hash_table(ColumnRawPtrs& key_columns, uint32_t num_rows);
bool _emplace_into_hash_table_limit(AggregateDataPtr* places, Block* block,
const std::vector<int>& key_locs,
ColumnRawPtrs& key_columns, uint32_t num_rows);
Expand Down Expand Up @@ -114,6 +115,11 @@ class AggSinkLocalState : public PipelineXSinkLocalState<AggSharedState> {
RuntimeProfile::Counter* _memory_usage_container = nullptr;
RuntimeProfile::Counter* _memory_usage_arena = nullptr;

// Mock benchmark timers: emplace into hash table without AggregateDataContainer
RuntimeProfile::Counter* _mock_hash_table_compute_timer = nullptr;
RuntimeProfile::Counter* _mock_hash_table_emplace_timer = nullptr;
RuntimeProfile::Counter* _mock_hash_table_input_counter = nullptr;

bool _should_limit_output = false;

PODArray<AggregateDataPtr> _places;
Expand Down
102 changes: 97 additions & 5 deletions be/src/exec/operator/aggregation_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,19 @@ Status AggLocalState::init(RuntimeState* state, LocalStateInfo& info) {
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_init_timer);
_get_results_timer = ADD_TIMER(custom_profile(), "GetResultsTime");
_hash_table_iterate_timer = ADD_TIMER(custom_profile(), "HashTableIterateTime");
_insert_keys_to_column_timer = ADD_TIMER(custom_profile(), "InsertKeysToColumnTime");
_insert_values_to_column_timer = ADD_TIMER(custom_profile(), "InsertValuesToColumnTime");
_hash_table_iterate_timer =
ADD_TIMER_WITH_LEVEL(custom_profile(), "HashTableIterateTime", 1);
_insert_keys_to_column_timer =
ADD_TIMER_WITH_LEVEL(custom_profile(), "InsertKeysToColumnTime", 1);
_insert_values_to_column_timer =
ADD_TIMER_WITH_LEVEL(custom_profile(), "InsertValuesToColumnTime", 1);

_merge_timer = ADD_TIMER(Base::custom_profile(), "MergeTime");
_deserialize_data_timer = ADD_TIMER(Base::custom_profile(), "DeserializeAndMergeTime");
_hash_table_compute_timer = ADD_TIMER(Base::custom_profile(), "HashTableComputeTime");
_hash_table_emplace_timer = ADD_TIMER(Base::custom_profile(), "HashTableEmplaceTime");
_hash_table_compute_timer =
ADD_TIMER_WITH_LEVEL(Base::custom_profile(), "HashTableComputeTime", 1);
_hash_table_emplace_timer =
ADD_TIMER_WITH_LEVEL(Base::custom_profile(), "HashTableEmplaceTime", 1);
_hash_table_input_counter =
ADD_COUNTER_WITH_LEVEL(Base::custom_profile(), "HashTableInputCount", TUnit::UNIT, 1);
_hash_table_memory_usage =
Expand All @@ -55,6 +60,11 @@ Status AggLocalState::init(RuntimeState* state, LocalStateInfo& info) {
_memory_usage_container = ADD_COUNTER(custom_profile(), "MemoryUsageContainer", TUnit::BYTES);
_memory_usage_arena = ADD_COUNTER(custom_profile(), "MemoryUsageArena", TUnit::BYTES);

_mock_hash_table_iterate_timer =
ADD_TIMER_WITH_LEVEL(custom_profile(), "MockHashTableIterateTime", 1);
_mock_container_iterate_timer =
ADD_TIMER_WITH_LEVEL(custom_profile(), "MockContainerIterateTime", 1);

auto& p = _parent->template cast<AggSourceOperatorX>();
if (p._without_key) {
if (p._needs_finalize) {
Expand Down Expand Up @@ -137,6 +147,47 @@ Status AggLocalState::_get_results_with_serialized_key(RuntimeState* state, Bloc

uint32_t num_rows = 0;
shared_state.aggregate_data_container->init_once();

// One-time mock benchmark: compare hash table iterate vs container iterate
if (!_mock_iterated) {
_mock_iterated = true;
// (a) Iterate mock hash table (without AggregateDataContainer)
if (shared_state.mock_agg_data) {
std::visit(
Overload {
[&](std::monostate&) -> void {},
[&](auto& mock_method) -> void {
SCOPED_TIMER(
_mock_hash_table_iterate_timer);
auto& mock_ht = *mock_method.hash_table;
for (auto it = mock_ht.begin();
it != mock_ht.end(); ++it) {
[[maybe_unused]] auto k =
it->get_first();
[[maybe_unused]] auto* v =
it->get_second();
asm volatile("" ::: "memory");
}
}},
shared_state.mock_agg_data->method_variant);
}
// (b) Iterate AggregateDataContainer (full traverse)
{
SCOPED_TIMER(_mock_container_iterate_timer);
auto mock_iter =
shared_state.aggregate_data_container->begin();
while (mock_iter !=
shared_state.aggregate_data_container->end()) {
[[maybe_unused]] auto k =
mock_iter.template get_key<KeyType>();
[[maybe_unused]] auto* v =
mock_iter.get_aggregate_data();
asm volatile("" ::: "memory");
++mock_iter;
}
}
}

auto& iter = shared_state.aggregate_data_container->iterator;

{
Expand Down Expand Up @@ -261,6 +312,47 @@ Status AggLocalState::_get_with_serialized_key_result(RuntimeState* state, Block

uint32_t num_rows = 0;
shared_state.aggregate_data_container->init_once();

// One-time mock benchmark: compare hash table iterate vs container iterate
if (!_mock_iterated) {
_mock_iterated = true;
// (a) Iterate mock hash table (without AggregateDataContainer)
if (shared_state.mock_agg_data) {
std::visit(
Overload {
[&](std::monostate&) -> void {},
[&](auto& mock_method) -> void {
SCOPED_TIMER(
_mock_hash_table_iterate_timer);
auto& mock_ht = *mock_method.hash_table;
for (auto it = mock_ht.begin();
it != mock_ht.end(); ++it) {
[[maybe_unused]] auto k =
it->get_first();
[[maybe_unused]] auto* v =
it->get_second();
asm volatile("" ::: "memory");
}
}},
shared_state.mock_agg_data->method_variant);
}
// (b) Iterate AggregateDataContainer (full traverse)
{
SCOPED_TIMER(_mock_container_iterate_timer);
auto mock_iter =
shared_state.aggregate_data_container->begin();
while (mock_iter !=
shared_state.aggregate_data_container->end()) {
[[maybe_unused]] auto k =
mock_iter.template get_key<KeyType>();
[[maybe_unused]] auto* v =
mock_iter.get_aggregate_data();
asm volatile("" ::: "memory");
++mock_iter;
}
}
}

auto& iter = shared_state.aggregate_data_container->iterator;

{
Expand Down
5 changes: 5 additions & 0 deletions be/src/exec/operator/aggregation_source_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,11 @@ class AggLocalState MOCK_REMOVE(final) : public PipelineXLocalState<AggSharedSta
RuntimeProfile::Counter* _memory_usage_container = nullptr;
RuntimeProfile::Counter* _memory_usage_arena = nullptr;

// Mock benchmark timers: one-time full traverse comparison
RuntimeProfile::Counter* _mock_hash_table_iterate_timer = nullptr;
RuntimeProfile::Counter* _mock_container_iterate_timer = nullptr;
bool _mock_iterated = false;

using vectorized_get_result =
std::function<Status(RuntimeState* state, Block* block, bool* eos)>;

Expand Down
Loading
Loading