Skip to content

Commit c5fc4eb

Browse files
duckdblabs-bot and github-actions[bot]
authored and committed
Update vendored DuckDB sources to 129b1fe55e
1 parent d41a790 commit c5fc4eb

30 files changed

Lines changed: 238 additions & 207 deletions

src/duckdb/extension/core_functions/include/core_functions/aggregate/quantile_sort_tree.hpp

Lines changed: 4 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -306,6 +306,7 @@ struct QuantileSortTree {
306306
QuantileSortTree(AggregateInputData &aggr_input_data, const WindowPartitionInput &partition) {
307307
// TODO: Two pass parallel sorting using Build
308308
auto &inputs = *partition.inputs;
309+
auto &interrupt = partition.interrupt_state;
309310
ColumnDataScanState scan;
310311
DataChunk sort;
311312
inputs.InitializeScan(scan, partition.column_ids);
@@ -338,12 +339,12 @@ struct QuantileSortTree {
338339
filter_sel[filtered++] = i;
339340
}
340341
}
341-
local_state.Sink(partition.context, sort, row_idx, filter_sel, filtered);
342+
local_state.Sink(partition.context, sort, row_idx, filter_sel, filtered, interrupt);
342343
} else {
343-
local_state.Sink(partition.context, sort, row_idx, nullptr, 0);
344+
local_state.Sink(partition.context, sort, row_idx, nullptr, 0, interrupt);
344345
}
345346
}
346-
local_state.Finalize(partition.context);
347+
local_state.Finalize(partition.context, interrupt);
347348
}
348349

349350
inline idx_t SelectNth(const SubFrames &frames, size_t n) const {

src/duckdb/src/execution/operator/aggregate/physical_window.cpp

Lines changed: 19 additions & 19 deletions
Original file line number | Diff line number | Diff line change
@@ -649,7 +649,7 @@ class WindowLocalSourceState : public LocalSourceState {
649649
//! Assign the next task
650650
bool TryAssignTask();
651651
//! Execute a step in the current task
652-
void ExecuteTask(ExecutionContext &context, DataChunk &chunk);
652+
void ExecuteTask(ExecutionContext &context, DataChunk &chunk, InterruptState &interrupt);
653653

654654
//! The shared source state
655655
WindowGlobalSourceState &gsource;
@@ -667,9 +667,9 @@ class WindowLocalSourceState : public LocalSourceState {
667667
DataChunk output_chunk;
668668

669669
protected:
670-
void Sink(ExecutionContext &context);
671-
void Finalize(ExecutionContext &context);
672-
void GetData(ExecutionContext &context, DataChunk &chunk);
670+
void Sink(ExecutionContext &context, InterruptState &interrupt);
671+
void Finalize(ExecutionContext &context, InterruptState &interrupt);
672+
void GetData(ExecutionContext &context, DataChunk &chunk, InterruptState &interrupt);
673673

674674
//! Storage and evaluation for the fully materialised data
675675
unique_ptr<WindowBuilder> builder;
@@ -711,7 +711,7 @@ WindowHashGroup::ExecutorGlobalStates &WindowHashGroup::Initialize(ClientContext
711711
return gestates;
712712
}
713713

714-
void WindowLocalSourceState::Sink(ExecutionContext &context) {
714+
void WindowLocalSourceState::Sink(ExecutionContext &context, InterruptState &interrupt) {
715715
D_ASSERT(task);
716716
D_ASSERT(task->stage == WindowGroupStage::SINK);
717717

@@ -765,15 +765,15 @@ void WindowLocalSourceState::Sink(ExecutionContext &context) {
765765
}
766766

767767
for (idx_t w = 0; w < executors.size(); ++w) {
768-
executors[w]->Sink(context, sink_chunk, coll_chunk, input_idx, *gestates[w], *local_states[w]);
768+
executors[w]->Sink(context, sink_chunk, coll_chunk, input_idx, *gestates[w], *local_states[w], interrupt);
769769
}
770770

771771
window_hash_group->sunk += input_chunk.size();
772772
}
773773
scanner.reset();
774774
}
775775

776-
void WindowLocalSourceState::Finalize(ExecutionContext &context) {
776+
void WindowLocalSourceState::Finalize(ExecutionContext &context, InterruptState &interrupt) {
777777
D_ASSERT(task);
778778
D_ASSERT(task->stage == WindowGroupStage::FINALIZE);
779779

@@ -790,7 +790,7 @@ void WindowLocalSourceState::Finalize(ExecutionContext &context) {
790790
auto &gestates = window_hash_group->gestates;
791791
auto &local_states = window_hash_group->thread_states.at(task->thread_idx);
792792
for (idx_t w = 0; w < executors.size(); ++w) {
793-
executors[w]->Finalize(context, *gestates[w], *local_states[w], window_hash_group->collection);
793+
executors[w]->Finalize(context, *gestates[w], *local_states[w], window_hash_group->collection, interrupt);
794794
}
795795

796796
// Mark this range as done
@@ -898,7 +898,7 @@ bool WindowLocalSourceState::TryAssignTask() {
898898
return gsource.TryNextTask(task, task_local);
899899
}
900900

901-
void WindowLocalSourceState::ExecuteTask(ExecutionContext &context, DataChunk &result) {
901+
void WindowLocalSourceState::ExecuteTask(ExecutionContext &context, DataChunk &result, InterruptState &interrupt) {
902902
auto &gsink = gsource.gsink;
903903

904904
// Update the hash group
@@ -907,16 +907,16 @@ void WindowLocalSourceState::ExecuteTask(ExecutionContext &context, DataChunk &r
907907
// Process the new state
908908
switch (task->stage) {
909909
case WindowGroupStage::SINK:
910-
Sink(context);
910+
Sink(context, interrupt);
911911
D_ASSERT(TaskFinished());
912912
break;
913913
case WindowGroupStage::FINALIZE:
914-
Finalize(context);
914+
Finalize(context, interrupt);
915915
D_ASSERT(TaskFinished());
916916
break;
917917
case WindowGroupStage::GETDATA:
918918
D_ASSERT(!TaskFinished());
919-
GetData(context, result);
919+
GetData(context, result, interrupt);
920920
break;
921921
default:
922922
throw InternalException("Invalid window source state.");
@@ -928,7 +928,7 @@ void WindowLocalSourceState::ExecuteTask(ExecutionContext &context, DataChunk &r
928928
}
929929
}
930930

931-
void WindowLocalSourceState::GetData(ExecutionContext &context, DataChunk &result) {
931+
void WindowLocalSourceState::GetData(ExecutionContext &context, DataChunk &result, InterruptState &interrupt) {
932932
D_ASSERT(window_hash_group->GetStage() == WindowGroupStage::GETDATA);
933933

934934
window_hash_group->UpdateScanner(scanner, task->begin_idx);
@@ -953,7 +953,7 @@ void WindowLocalSourceState::GetData(ExecutionContext &context, DataChunk &resul
953953
eval_chunk.Reset();
954954
eval_exec.Execute(input_chunk, eval_chunk);
955955
}
956-
executor.Evaluate(context, position, eval_chunk, result, lstate, gstate);
956+
executor.Evaluate(context, position, eval_chunk, result, lstate, gstate, interrupt);
957957
}
958958
output_chunk.SetCardinality(input_chunk);
959959
output_chunk.Verify();
@@ -1036,14 +1036,14 @@ OperatorPartitionData PhysicalWindow::GetPartitionData(ExecutionContext &context
10361036
}
10371037

10381038
SourceResultType PhysicalWindow::GetData(ExecutionContext &context, DataChunk &chunk,
1039-
OperatorSourceInput &input) const {
1040-
auto &gsource = input.global_state.Cast<WindowGlobalSourceState>();
1041-
auto &lsource = input.local_state.Cast<WindowLocalSourceState>();
1039+
OperatorSourceInput &source) const {
1040+
auto &gsource = source.global_state.Cast<WindowGlobalSourceState>();
1041+
auto &lsource = source.local_state.Cast<WindowLocalSourceState>();
10421042

10431043
while (gsource.HasUnfinishedTasks() && chunk.size() == 0) {
10441044
if (!lsource.TaskFinished() || lsource.TryAssignTask()) {
10451045
try {
1046-
lsource.ExecuteTask(context, chunk);
1046+
lsource.ExecuteTask(context, chunk, source.interrupt_state);
10471047
} catch (...) {
10481048
gsource.stopped = true;
10491049
throw;
@@ -1057,7 +1057,7 @@ SourceResultType PhysicalWindow::GetData(ExecutionContext &context, DataChunk &c
10571057
} else {
10581058
// there are more tasks available, but we can't execute them yet
10591059
// block the source
1060-
return gsource.BlockSource(guard, input.interrupt_state);
1060+
return gsource.BlockSource(guard, source.interrupt_state);
10611061
}
10621062
}
10631063
}

src/duckdb/src/execution/sample/base_reservoir_sample.cpp

Lines changed: 22 additions & 22 deletions
Original file line number | Diff line number | Diff line change
@@ -3,24 +3,6 @@
33

44
namespace duckdb {
55

6-
double BaseReservoirSampling::GetMinWeightFromTuplesSeen(idx_t rows_seen_total) {
7-
// this function was obtained using https://mycurvefit.com. Inputting multiple x, y values into
8-
// The
9-
switch (rows_seen_total) {
10-
case 0:
11-
return 0;
12-
case 1:
13-
return 0.000161;
14-
case 2:
15-
return 0.530136;
16-
case 3:
17-
return 0.693454;
18-
default: {
19-
return (0.99 - 0.355 * std::exp(-0.07 * static_cast<double>(rows_seen_total)));
20-
}
21-
}
22-
}
23-
246
BaseReservoirSampling::BaseReservoirSampling(int64_t seed) : random(seed) {
257
next_index_to_sample = 0;
268
min_weight_threshold = 0;
@@ -73,7 +55,7 @@ void BaseReservoirSampling::SetNextEntry() {
7355
//! since all our weights are 1 (uniform sampling), we can just determine the amount of elements to skip
7456
min_weight_threshold = t_w;
7557
min_weighted_entry_index = min_key.second;
76-
next_index_to_sample = MaxValue<idx_t>(1, idx_t(round(x_w)));
58+
next_index_to_sample = idx_t(ceil(x_w));
7759
num_entries_to_skip_b4_next_sample = 0;
7860
}
7961

@@ -118,15 +100,33 @@ void BaseReservoirSampling::UpdateMinWeightThreshold() {
118100
min_weight_threshold = 1;
119101
}
120102

103+
// Generate top k order statistics from n Uniform(0, 1) samples in O(k) time
104+
// This method leverages two key properties of order statistics from a Uniform(0, 1) distribution:
105+
// 1. The maximum of n independent Uniform(0, 1) samples, denoted as U(n), follows a Beta(n, 1) distribution.
106+
// 2. U(n-i) / U(n-i+1) ~ Beta(n-i, 1)
107+
// So we can use a recursive approach to generate the top k order statistics
108+
// (See: https://www.math.ntu.edu.tw/~hchen/teaching/LargeSample/notes/noteorder.pdf)
109+
static vector<double> GenerateTopKFromUniform(ReservoirRNG &random, idx_t n, idx_t k) {
110+
vector<double> top_k_values(k);
111+
double current_bound = 1.0;
112+
for (idx_t i = 0; i < k; i++) {
113+
// generate a sample from Beta(n - i, 1)
114+
double beta = std::pow(random.NextRandom(), 1.0 / double(n - i));
115+
current_bound *= beta;
116+
top_k_values[i] = current_bound;
117+
}
118+
return top_k_values;
119+
}
120+
121121
void BaseReservoirSampling::FillWeights(SelectionVector &sel, idx_t &sel_size) {
122122
if (!reservoir_weights.empty()) {
123123
return;
124124
}
125125
D_ASSERT(reservoir_weights.empty());
126-
auto num_entries_seen_normalized = num_entries_seen_total / FIXED_SAMPLE_SIZE;
127-
auto min_weight = GetMinWeightFromTuplesSeen(num_entries_seen_normalized);
126+
auto weights = GenerateTopKFromUniform(random, num_entries_seen_total, sel_size);
127+
std::shuffle(weights.begin(), weights.end(), random);
128128
for (idx_t i = 0; i < sel_size; i++) {
129-
auto weight = random.NextRandom(min_weight, 1);
129+
auto weight = weights[i];
130130
reservoir_weights.emplace(-weight, i);
131131
}
132132
D_ASSERT(reservoir_weights.size() <= sel_size);

src/duckdb/src/function/table/version/pragma_version.cpp

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -1,5 +1,5 @@
11
#ifndef DUCKDB_PATCH_VERSION
2-
#define DUCKDB_PATCH_VERSION "0-dev3028"
2+
#define DUCKDB_PATCH_VERSION "0-dev3047"
33
#endif
44
#ifndef DUCKDB_MINOR_VERSION
55
#define DUCKDB_MINOR_VERSION 4
@@ -8,10 +8,10 @@
88
#define DUCKDB_MAJOR_VERSION 1
99
#endif
1010
#ifndef DUCKDB_VERSION
11-
#define DUCKDB_VERSION "v1.4.0-dev3028"
11+
#define DUCKDB_VERSION "v1.4.0-dev3047"
1212
#endif
1313
#ifndef DUCKDB_SOURCE_ID
14-
#define DUCKDB_SOURCE_ID "a8206a211f"
14+
#define DUCKDB_SOURCE_ID "129b1fe55e"
1515
#endif
1616
#include "duckdb/function/table/system_functions.hpp"
1717
#include "duckdb/main/database.hpp"

src/duckdb/src/function/window/window_aggregate_function.cpp

Lines changed: 9 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -131,7 +131,7 @@ WindowAggregateExecutor::GetLocalState(ExecutionContext &context, const WindowEx
131131

132132
void WindowAggregateExecutor::Sink(ExecutionContext &context, DataChunk &sink_chunk, DataChunk &coll_chunk,
133133
const idx_t input_idx, WindowExecutorGlobalState &gstate,
134-
WindowExecutorLocalState &lstate) const {
134+
WindowExecutorLocalState &lstate, InterruptState &interrupt) const {
135135
auto &gastate = gstate.Cast<WindowAggregateExecutorGlobalState>();
136136
auto &lastate = lstate.Cast<WindowAggregateExecutorLocalState>();
137137
auto &filter_sel = lastate.filter_sel;
@@ -147,9 +147,9 @@ void WindowAggregateExecutor::Sink(ExecutionContext &context, DataChunk &sink_ch
147147
D_ASSERT(aggregator);
148148
auto &gestate = *gastate.gsink;
149149
auto &lestate = *lastate.aggregator_state;
150-
aggregator->Sink(context, gestate, lestate, sink_chunk, coll_chunk, input_idx, filtering, filtered);
150+
aggregator->Sink(context, gestate, lestate, sink_chunk, coll_chunk, input_idx, filtering, filtered, interrupt);
151151

152-
WindowExecutor::Sink(context, sink_chunk, coll_chunk, input_idx, gstate, lstate);
152+
WindowExecutor::Sink(context, sink_chunk, coll_chunk, input_idx, gstate, lstate, interrupt);
153153
}
154154

155155
static void ApplyWindowStats(const WindowBoundary &boundary, FrameDelta &delta, BaseStatistics *base, bool is_start) {
@@ -216,8 +216,9 @@ static void ApplyWindowStats(const WindowBoundary &boundary, FrameDelta &delta,
216216
}
217217

218218
void WindowAggregateExecutor::Finalize(ExecutionContext &context, WindowExecutorGlobalState &gstate,
219-
WindowExecutorLocalState &lstate, CollectionPtr collection) const {
220-
WindowExecutor::Finalize(context, gstate, lstate, collection);
219+
WindowExecutorLocalState &lstate, CollectionPtr collection,
220+
InterruptState &interrupt) const {
221+
WindowExecutor::Finalize(context, gstate, lstate, collection, interrupt);
221222

222223
auto &gastate = gstate.Cast<WindowAggregateExecutorGlobalState>();
223224
auto &gsink = gastate.gsink;
@@ -239,20 +240,20 @@ void WindowAggregateExecutor::Finalize(ExecutionContext &context, WindowExecutor
239240
ApplyWindowStats(wexpr.end, stats[1], base, false);
240241

241242
auto &lastate = lstate.Cast<WindowAggregateExecutorLocalState>();
242-
aggregator->Finalize(context, *gsink, *lastate.aggregator_state, collection, stats);
243+
aggregator->Finalize(context, *gsink, *lastate.aggregator_state, collection, stats, interrupt);
243244
}
244245

245246
void WindowAggregateExecutor::EvaluateInternal(ExecutionContext &context, WindowExecutorGlobalState &gstate,
246247
WindowExecutorLocalState &lstate, DataChunk &eval_chunk, Vector &result,
247-
idx_t count, idx_t row_idx) const {
248+
idx_t count, idx_t row_idx, InterruptState &interrupt) const {
248249
auto &gastate = gstate.Cast<WindowAggregateExecutorGlobalState>();
249250
auto &lastate = lstate.Cast<WindowAggregateExecutorLocalState>();
250251
auto &gsink = gastate.gsink;
251252
D_ASSERT(aggregator);
252253

253254
auto &agg_state = *lastate.aggregator_state;
254255

255-
aggregator->Evaluate(context, *gsink, agg_state, lastate.bounds, result, count, row_idx);
256+
aggregator->Evaluate(context, *gsink, agg_state, lastate.bounds, result, count, row_idx, interrupt);
256257
}
257258

258259
} // namespace duckdb

src/duckdb/src/function/window/window_aggregator.cpp

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -42,7 +42,7 @@ void WindowAggregatorLocalState::Sink(ExecutionContext &context, WindowAggregato
4242

4343
void WindowAggregator::Sink(ExecutionContext &context, WindowAggregatorState &gstate, WindowAggregatorState &lstate,
4444
DataChunk &sink_chunk, DataChunk &coll_chunk, idx_t input_idx,
45-
optional_ptr<SelectionVector> filter_sel, idx_t filtered) {
45+
optional_ptr<SelectionVector> filter_sel, idx_t filtered, InterruptState &interrupt) {
4646
auto &gastate = gstate.Cast<WindowAggregatorGlobalState>();
4747
auto &lastate = lstate.Cast<WindowAggregatorLocalState>();
4848
lastate.Sink(context, gastate, sink_chunk, coll_chunk, input_idx);
@@ -80,7 +80,7 @@ void WindowAggregatorLocalState::Finalize(ExecutionContext &context, WindowAggre
8080
}
8181

8282
void WindowAggregator::Finalize(ExecutionContext &context, WindowAggregatorState &gstate, WindowAggregatorState &lstate,
83-
CollectionPtr collection, const FrameStats &stats) {
83+
CollectionPtr collection, const FrameStats &stats, InterruptState &interrupt) {
8484
auto &gasink = gstate.Cast<WindowAggregatorGlobalState>();
8585
auto &lastate = lstate.Cast<WindowAggregatorLocalState>();
8686
lastate.Finalize(context, gasink, collection);

src/duckdb/src/function/window/window_constant_aggregator.cpp

Lines changed: 4 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -208,7 +208,8 @@ unique_ptr<WindowAggregatorState> WindowConstantAggregator::GetGlobalState(Clien
208208

209209
void WindowConstantAggregator::Sink(ExecutionContext &context, WindowAggregatorState &gsink,
210210
WindowAggregatorState &lstate, DataChunk &sink_chunk, DataChunk &coll_chunk,
211-
idx_t input_idx, optional_ptr<SelectionVector> filter_sel, idx_t filtered) {
211+
idx_t input_idx, optional_ptr<SelectionVector> filter_sel, idx_t filtered,
212+
InterruptState &interrupt) {
212213
auto &lastate = lstate.Cast<WindowConstantAggregatorLocalState>();
213214

214215
lastate.Sink(context, sink_chunk, coll_chunk, input_idx, filter_sel, filtered);
@@ -300,7 +301,7 @@ void WindowConstantAggregatorLocalState::Sink(ExecutionContext &context, DataChu
300301

301302
void WindowConstantAggregator::Finalize(ExecutionContext &context, WindowAggregatorState &gstate,
302303
WindowAggregatorState &lstate, CollectionPtr collection,
303-
const FrameStats &stats) {
304+
const FrameStats &stats, InterruptState &interrupt) {
304305
auto &gastate = gstate.Cast<WindowConstantAggregatorGlobalState>();
305306
auto &lastate = lstate.Cast<WindowConstantAggregatorLocalState>();
306307

@@ -320,7 +321,7 @@ unique_ptr<WindowAggregatorState> WindowConstantAggregator::GetLocalState(const
320321

321322
void WindowConstantAggregator::Evaluate(ExecutionContext &context, const WindowAggregatorState &gsink,
322323
WindowAggregatorState &lstate, const DataChunk &bounds, Vector &result,
323-
idx_t count, idx_t row_idx) const {
324+
idx_t count, idx_t row_idx, InterruptState &interrupt) const {
324325
auto &gasink = gsink.Cast<WindowConstantAggregatorGlobalState>();
325326
const auto &partition_offsets = gasink.partition_offsets;
326327
const auto &results = *gasink.results;

0 commit comments

Comments
 (0)