Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 1 addition & 13 deletions src/duckdb/src/common/csv_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ CSVWriterState::CSVWriterState()
}

CSVWriterState::CSVWriterState(ClientContext &context, idx_t flush_size_p)
: flush_size(flush_size_p), stream(make_uniq<MemoryStream>(Allocator::Get(context))) {
: flush_size(flush_size_p), stream(make_uniq<MemoryStream>(Allocator::Get(context), flush_size)) {
}

CSVWriterState::CSVWriterState(DatabaseInstance &db, idx_t flush_size_p)
Expand Down Expand Up @@ -198,18 +198,6 @@ void CSVWriter::ResetInternal(optional_ptr<CSVWriterState> local_state) {
bytes_written = 0;
}

unique_ptr<CSVWriterState> CSVWriter::InitializeLocalWriteState(ClientContext &context, idx_t flush_size) {
auto res = make_uniq<CSVWriterState>(context, flush_size);
res->stream = make_uniq<MemoryStream>();
return res;
}

unique_ptr<CSVWriterState> CSVWriter::InitializeLocalWriteState(DatabaseInstance &db, idx_t flush_size) {
auto res = make_uniq<CSVWriterState>(db, flush_size);
res->stream = make_uniq<MemoryStream>();
return res;
}

idx_t CSVWriter::BytesWritten() {
if (shared) {
lock_guard<mutex> flock(lock);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,11 +254,11 @@ void UnscentedKalmanFilter::UpdateInternal(double measured_progress) {
}

// Ensure progress stays in bounds
x[0] = std::max(0.0, std::min(1.0, x[0]));
x[0] = std::max(0.0, std::min(scale_factor, x[0]));
}

double UnscentedKalmanFilter::GetProgress() const {
return x[0];
return x[0] / scale_factor;
}

double UnscentedKalmanFilter::GetVelocity() const {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ scalar_function_t GetStringDecompressFunctionSwitch(const LogicalType &input_typ
case LogicalTypeId::UHUGEINT:
return GetStringDecompressFunction<uhugeint_t>(input_type);
case LogicalTypeId::HUGEINT:
return GetStringCompressFunction<hugeint_t>(input_type);
return GetStringDecompressFunction<hugeint_t>(input_type);
default:
throw InternalException("Unexpected type in GetStringDecompressFunctionSwitch");
}
Expand Down
32 changes: 28 additions & 4 deletions src/duckdb/src/function/table/copy_csv.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,31 @@ struct GlobalWriteCSVData : public GlobalFunctionData {
return writer.FileSize();
}

//! Hands out a CSVWriterState for a batch write: pops a previously-stored state
//! from the cache if one is available, otherwise allocates a fresh one.
//! NOTE(review): a recycled state is returned as-is, so it keeps the flush_size it
//! was originally constructed with rather than the flush_size passed here — confirm
//! this is acceptable for batches of differing sizes.
unique_ptr<CSVWriterState> GetLocalState(ClientContext &context, const idx_t flush_size) {
{
// Scoped block so local_state_lock is released before the allocation below;
// only the cache access needs to be serialized.
lock_guard<mutex> guard(local_state_lock);
if (!local_states.empty()) {
auto result = std::move(local_states.back());
local_states.pop_back();
return result;
}
}
// Cache miss - create a new state; manual flushing is required because the
// batch is written out later in WriteCSVFlushBatch, not as data is appended.
auto result = make_uniq<CSVWriterState>(context, flush_size);
result->require_manual_flush = true;
return result;
}

//! Returns a finished writer state to the cache so a later batch can reuse it
//! (avoids re-allocating the underlying memory stream per batch).
//! The state is Reset() before storage so it holds no stale buffered data.
void StoreLocalState(unique_ptr<CSVWriterState> lstate) {
lock_guard<mutex> guard(local_state_lock);
lstate->Reset();
local_states.push_back(std::move(lstate));
}

CSVWriter writer;

private:
mutex local_state_lock;
vector<unique_ptr<CSVWriterState>> local_states;
};

static unique_ptr<LocalFunctionData> WriteCSVInitializeLocal(ExecutionContext &context, FunctionData &bind_data) {
Expand Down Expand Up @@ -371,9 +395,7 @@ CopyFunctionExecutionMode WriteCSVExecutionMode(bool preserve_insertion_order, b
// Prepare Batch
//===--------------------------------------------------------------------===//
struct WriteCSVBatchData : public PreparedBatchData {
explicit WriteCSVBatchData(ClientContext &context, const idx_t flush_size)
: writer_local_state(make_uniq<CSVWriterState>(context, flush_size)) {
writer_local_state->require_manual_flush = true;
explicit WriteCSVBatchData(unique_ptr<CSVWriterState> writer_state) : writer_local_state(std::move(writer_state)) {
}

//! The thread-local buffer to write data into
Expand All @@ -397,7 +419,8 @@ unique_ptr<PreparedBatchData> WriteCSVPrepareBatch(ClientContext &context, Funct
auto &global_state = gstate.Cast<GlobalWriteCSVData>();

// write CSV chunks to the batch data
auto batch = make_uniq<WriteCSVBatchData>(context, NextPowerOfTwo(collection->SizeInBytes()));
auto local_writer_state = global_state.GetLocalState(context, NextPowerOfTwo(collection->SizeInBytes()));
auto batch = make_uniq<WriteCSVBatchData>(std::move(local_writer_state));
for (auto &chunk : collection->Chunks()) {
WriteCSVChunkInternal(global_state.writer, *batch->writer_local_state, cast_chunk, chunk, executor);
}
Expand All @@ -412,6 +435,7 @@ void WriteCSVFlushBatch(ClientContext &context, FunctionData &bind_data, GlobalF
auto &csv_batch = batch.Cast<WriteCSVBatchData>();
auto &global_state = gstate.Cast<GlobalWriteCSVData>();
global_state.writer.Flush(*csv_batch.writer_local_state);
global_state.StoreLocalState(std::move(csv_batch.writer_local_state));
}

//===--------------------------------------------------------------------===//
Expand Down
6 changes: 3 additions & 3 deletions src/duckdb/src/function/table/version/pragma_version.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#ifndef DUCKDB_PATCH_VERSION
#define DUCKDB_PATCH_VERSION "2-dev86"
#define DUCKDB_PATCH_VERSION "2-dev99"
#endif
#ifndef DUCKDB_MINOR_VERSION
#define DUCKDB_MINOR_VERSION 4
Expand All @@ -8,10 +8,10 @@
#define DUCKDB_MAJOR_VERSION 1
#endif
#ifndef DUCKDB_VERSION
#define DUCKDB_VERSION "v1.4.2-dev86"
#define DUCKDB_VERSION "v1.4.2-dev99"
#endif
#ifndef DUCKDB_SOURCE_ID
#define DUCKDB_SOURCE_ID "d13e53bbb6"
#define DUCKDB_SOURCE_ID "62a68d6f7b"
#endif
#include "duckdb/function/table/system_functions.hpp"
#include "duckdb/main/database.hpp"
Expand Down
3 changes: 0 additions & 3 deletions src/duckdb/src/include/duckdb/common/csv_writer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,6 @@ class CSVWriter {
//! Closes the writer, optionally writes a postfix
void Close();

unique_ptr<CSVWriterState> InitializeLocalWriteState(ClientContext &context, idx_t flush_size);
unique_ptr<CSVWriterState> InitializeLocalWriteState(DatabaseInstance &db, idx_t flush_size);

vector<unique_ptr<Expression>> string_casts;

idx_t BytesWritten();
Expand Down
3 changes: 3 additions & 0 deletions src/duckdb/src/include/duckdb/parser/query_node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ class QueryNode {
virtual void Serialize(Serializer &serializer) const;
static unique_ptr<QueryNode> Deserialize(Deserializer &deserializer);

//! TEMPORARY BUG FIX WORKAROUND: extract elements from the CommonTableExpressionMap and construct CTENodes
static void ExtractCTENodes(unique_ptr<QueryNode> &query_node);

protected:
//! Copy base QueryNode properties from another expression to this one,
//! used in Copy method
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@ class DeserializedStatementVerifier : public StatementVerifier {
optional_ptr<case_insensitive_map_t<BoundParameterData>> parameters);
static unique_ptr<StatementVerifier> Create(const SQLStatement &statement,
optional_ptr<case_insensitive_map_t<BoundParameterData>> parameters);

public:
// TEMPORARY FIX: work-around for CTE serialization for v1.4.X
bool RequireEquality() const override {
return false;
}
};

} // namespace duckdb
80 changes: 80 additions & 0 deletions src/duckdb/src/parser/query_node/cte_node.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "duckdb/parser/query_node/cte_node.hpp"
#include "duckdb/common/serializer/serializer.hpp"
#include "duckdb/common/serializer/deserializer.hpp"
#include "duckdb/parser/statement/select_statement.hpp"

namespace duckdb {

Expand Down Expand Up @@ -39,4 +40,83 @@ unique_ptr<QueryNode> CTENode::Copy() const {
return std::move(result);
}

// TEMPORARY BUGFIX WARNING - none of this code should make it into main - this is a temporary work-around for v1.4
// TEMPORARY BUGFIX START
// the below code fixes backwards and forwards compatibility of CTEs with the somewhat broken version of CTEs in v1.4
// all of this code has been made obsolete with the CTE binding rework
//! Rewrites a query node whose cte_map contains entries into an explicit chain of
//! CTENode wrappers: each map entry becomes a CTENode, and the original node is
//! placed at the bottom of the chain as the innermost child.
void QueryNode::ExtractCTENodes(unique_ptr<QueryNode> &query_node) {
if (query_node->cte_map.map.empty()) {
// no CTEs - nothing to extract
return;
}
// build one CTENode per entry of the CommonTableExpressionMap, copying the
// CTE's query, aliases and materialization setting
vector<unique_ptr<CTENode>> materialized_ctes;
for (auto &cte : query_node->cte_map.map) {
auto &cte_entry = cte.second;
auto mat_cte = make_uniq<CTENode>();
mat_cte->ctename = cte.first;
mat_cte->query = cte_entry->query->node->Copy();
mat_cte->aliases = cte_entry->aliases;
mat_cte->materialized = cte_entry->materialized;
materialized_ctes.push_back(std::move(mat_cte));
}

// wrap the original node: pop from the back of the vector so the first map
// entry ends up as the outermost CTENode in the resulting chain
auto root = std::move(query_node);
while (!materialized_ctes.empty()) {
unique_ptr<CTENode> node_result;
node_result = std::move(materialized_ctes.back());
// each wrapper receives a copy of the original node's cte_map
node_result->cte_map = root->cte_map.Copy();
node_result->child = std::move(root);
root = std::move(node_result);
materialized_ctes.pop_back();
}
query_node = std::move(root);
}

//! Serializes a CTENode. For never-materialized CTEs the node itself is skipped
//! and only the child is written, for compatibility with older DuckDB versions.
//! NOTE(review): field 204 ("materialized"), written by v1.4.0/v1.4.1, is
//! intentionally NOT written here; Deserialize still reads it with a default.
void CTENode::Serialize(Serializer &serializer) const {
if (materialized == CTEMaterialize::CTE_MATERIALIZE_NEVER) {
// for non-materialized CTEs - don't serialize CTENode
// older DuckDB versions only expect a CTENode to be there for materialized CTEs
child->Serialize(serializer);
return;
}
QueryNode::Serialize(serializer);
serializer.WritePropertyWithDefault<string>(200, "cte_name", ctename);
serializer.WritePropertyWithDefault<unique_ptr<QueryNode>>(201, "query", query);
serializer.WritePropertyWithDefault<unique_ptr<QueryNode>>(202, "child", child);
serializer.WritePropertyWithDefault<vector<string>>(203, "aliases", aliases);
}

// In QueryNode::ExtractCTENodes we create a bunch of CTENodes from the CommonTableExpressionMap in the QueryNode
// however, we might ALSO have a CTENode present depending on how the serialization is set up
// if we ended up creating duplicate CTE nodes in QueryNode::ExtractCTENodes - this ends up de-duplicating them again
//! Removes any CTENode named `ctename` from the chain rooted at `node` by splicing
//! in its child; recurses down the chain of CTENodes and stops at the first
//! non-CTE node.
void EraseDuplicateCTE(unique_ptr<QueryNode> &node, const string &ctename) {
if (node->type != QueryNodeType::CTE_NODE) {
// not a CTE - the chain of CTENodes ends here
return;
}
auto &cte_node = node->Cast<CTENode>();
if (cte_node.ctename == ctename) {
// duplicate CTE - erase this CTE node by replacing it with its child,
// then re-check the same position in case of consecutive duplicates
node = std::move(cte_node.child);
EraseDuplicateCTE(node, ctename);
} else {
// not a duplicate - recurse into child
EraseDuplicateCTE(cte_node.child, ctename);
}
}

//! Deserializes a CTENode (fields 200-203), tolerating the extra "materialized"
//! field (204) written by v1.4.0/v1.4.1, then strips any CTENode in the child
//! chain that duplicates this node's name (such duplicates can be introduced by
//! QueryNode::ExtractCTENodes when the CTENode was also serialized explicitly).
unique_ptr<QueryNode> CTENode::Deserialize(Deserializer &deserializer) {
auto result = duckdb::unique_ptr<CTENode>(new CTENode());
deserializer.ReadPropertyWithDefault<string>(200, "cte_name", result->ctename);
deserializer.ReadPropertyWithDefault<unique_ptr<QueryNode>>(201, "query", result->query);
deserializer.ReadPropertyWithDefault<unique_ptr<QueryNode>>(202, "child", result->child);
deserializer.ReadPropertyWithDefault<vector<string>>(203, "aliases", result->aliases);
// v1.4.0 and v1.4.1 wrote this property - deserialize it for BC with these versions
deserializer.ReadPropertyWithExplicitDefault<CTEMaterialize>(204, "materialized", result->materialized,
CTEMaterialize::CTE_MATERIALIZE_DEFAULT);
EraseDuplicateCTE(result->child, result->ctename);
return std::move(result);
}
// TEMPORARY BUGFIX WARNING - none of this code should make it into main - this is a temporary work-around for v1.4
// TEMPORARY BUGFIX END

} // namespace duckdb
28 changes: 16 additions & 12 deletions src/duckdb/src/planner/binder/statement/bind_insert.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ unique_ptr<MergeIntoStatement> Binder::GenerateMergeInto(InsertStatement &stmt,
auto storage_info = table.GetStorageInfo(context);
auto &columns = table.GetColumns();
// set up the columns on which to join
vector<string> distinct_on_columns;
vector<vector<string>> all_distinct_on_columns;
if (on_conflict_info.indexed_columns.empty()) {
// When omitting the conflict target, we derive the join columns from the primary key/unique constraints
// traverse the primary key/unique constraints
Expand All @@ -292,6 +292,7 @@ unique_ptr<MergeIntoStatement> Binder::GenerateMergeInto(InsertStatement &stmt,

vector<unique_ptr<ParsedExpression>> and_children;
auto &indexed_columns = index.column_set;
vector<string> distinct_on_columns;
for (auto &column : columns.Physical()) {
if (!indexed_columns.count(column.Physical().index)) {
continue;
Expand All @@ -303,6 +304,7 @@ unique_ptr<MergeIntoStatement> Binder::GenerateMergeInto(InsertStatement &stmt,
and_children.push_back(std::move(new_condition));
distinct_on_columns.push_back(column.Name());
}
all_distinct_on_columns.push_back(std::move(distinct_on_columns));
if (and_children.empty()) {
continue;
}
Expand Down Expand Up @@ -377,7 +379,7 @@ unique_ptr<MergeIntoStatement> Binder::GenerateMergeInto(InsertStatement &stmt,
throw BinderException("The specified columns as conflict target are not referenced by a UNIQUE/PRIMARY KEY "
"CONSTRAINT or INDEX");
}
distinct_on_columns = on_conflict_info.indexed_columns;
all_distinct_on_columns.push_back(on_conflict_info.indexed_columns);
merge_into->using_columns = std::move(on_conflict_info.indexed_columns);
}

Expand Down Expand Up @@ -445,17 +447,19 @@ unique_ptr<MergeIntoStatement> Binder::GenerateMergeInto(InsertStatement &stmt,
}
}
// push DISTINCT ON(unique_columns)
auto distinct_stmt = make_uniq<SelectStatement>();
auto select_node = make_uniq<SelectNode>();
auto distinct = make_uniq<DistinctModifier>();
for (auto &col : distinct_on_columns) {
distinct->distinct_on_targets.push_back(make_uniq<ColumnRefExpression>(col));
for (auto &distinct_on_columns : all_distinct_on_columns) {
auto distinct_stmt = make_uniq<SelectStatement>();
auto select_node = make_uniq<SelectNode>();
auto distinct = make_uniq<DistinctModifier>();
for (auto &col : distinct_on_columns) {
distinct->distinct_on_targets.push_back(make_uniq<ColumnRefExpression>(col));
}
select_node->modifiers.push_back(std::move(distinct));
select_node->select_list.push_back(make_uniq<StarExpression>());
select_node->from_table = std::move(source);
distinct_stmt->node = std::move(select_node);
source = make_uniq<SubqueryRef>(std::move(distinct_stmt), "excluded");
}
select_node->modifiers.push_back(std::move(distinct));
select_node->select_list.push_back(make_uniq<StarExpression>());
select_node->from_table = std::move(source);
distinct_stmt->node = std::move(select_node);
source = make_uniq<SubqueryRef>(std::move(distinct_stmt), "excluded");

merge_into->source = std::move(source);

Expand Down
24 changes: 0 additions & 24 deletions src/duckdb/src/planner/binder/tableref/bind_basetableref.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -326,30 +326,6 @@ unique_ptr<BoundTableRef> Binder::Bind(BaseTableRef &ref) {

// The view may contain CTEs, but maybe only in the cte_map, so we need create CTE nodes for them
auto query = view_catalog_entry.GetQuery().Copy();
auto &select_stmt = query->Cast<SelectStatement>();

vector<unique_ptr<CTENode>> materialized_ctes;
for (auto &cte : select_stmt.node->cte_map.map) {
auto &cte_entry = cte.second;
auto mat_cte = make_uniq<CTENode>();
mat_cte->ctename = cte.first;
mat_cte->query = cte_entry->query->node->Copy();
mat_cte->aliases = cte_entry->aliases;
mat_cte->materialized = cte_entry->materialized;
materialized_ctes.push_back(std::move(mat_cte));
}

auto root = std::move(select_stmt.node);
while (!materialized_ctes.empty()) {
unique_ptr<CTENode> node_result;
node_result = std::move(materialized_ctes.back());
node_result->cte_map = root->cte_map.Copy();
node_result->child = std::move(root);
root = std::move(node_result);
materialized_ctes.pop_back();
}
select_stmt.node = std::move(root);

SubqueryRef subquery(unique_ptr_cast<SQLStatement, SelectStatement>(std::move(query)));

subquery.alias = ref.alias;
Expand Down
20 changes: 1 addition & 19 deletions src/duckdb/src/storage/serialization/serialize_query_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,28 +38,10 @@ unique_ptr<QueryNode> QueryNode::Deserialize(Deserializer &deserializer) {
}
result->modifiers = std::move(modifiers);
result->cte_map = std::move(cte_map);
ExtractCTENodes(result);
return result;
}

void CTENode::Serialize(Serializer &serializer) const {
QueryNode::Serialize(serializer);
serializer.WritePropertyWithDefault<string>(200, "cte_name", ctename);
serializer.WritePropertyWithDefault<unique_ptr<QueryNode>>(201, "query", query);
serializer.WritePropertyWithDefault<unique_ptr<QueryNode>>(202, "child", child);
serializer.WritePropertyWithDefault<vector<string>>(203, "aliases", aliases);
serializer.WritePropertyWithDefault<CTEMaterialize>(204, "materialized", materialized, CTEMaterialize::CTE_MATERIALIZE_DEFAULT);
}

unique_ptr<QueryNode> CTENode::Deserialize(Deserializer &deserializer) {
auto result = duckdb::unique_ptr<CTENode>(new CTENode());
deserializer.ReadPropertyWithDefault<string>(200, "cte_name", result->ctename);
deserializer.ReadPropertyWithDefault<unique_ptr<QueryNode>>(201, "query", result->query);
deserializer.ReadPropertyWithDefault<unique_ptr<QueryNode>>(202, "child", result->child);
deserializer.ReadPropertyWithDefault<vector<string>>(203, "aliases", result->aliases);
deserializer.ReadPropertyWithExplicitDefault<CTEMaterialize>(204, "materialized", result->materialized, CTEMaterialize::CTE_MATERIALIZE_DEFAULT);
return std::move(result);
}

void RecursiveCTENode::Serialize(Serializer &serializer) const {
QueryNode::Serialize(serializer);
serializer.WritePropertyWithDefault<string>(200, "cte_name", ctename);
Expand Down