Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions src/duckdb/src/common/arrow/physical_arrow_collector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,23 @@

namespace duckdb {

PhysicalOperator &PhysicalArrowCollector::Create(ClientContext &context, PreparedStatementData &data,
idx_t batch_size) {
unique_ptr<PhysicalOperator> PhysicalArrowCollector::Create(ClientContext &context, PreparedStatementData &data,
idx_t batch_size) {
auto &physical_plan = *data.physical_plan;
auto &root = physical_plan.Root();

if (!PhysicalPlanGenerator::PreserveInsertionOrder(context, root)) {
// Not an order-preserving plan: use the parallel materialized collector.
return physical_plan.Make<PhysicalArrowCollector>(data, true, batch_size);
return make_uniq<PhysicalArrowCollector>(physical_plan, data, true, batch_size);
}

if (!PhysicalPlanGenerator::UseBatchIndex(context, root)) {
// Order-preserving plan, and we cannot use the batch index: use single-threaded result collector.
return physical_plan.Make<PhysicalArrowCollector>(data, false, batch_size);
return make_uniq<PhysicalArrowCollector>(physical_plan, data, false, batch_size);
}

// Order-preserving plan, and we can use the batch index: use a batch collector.
return physical_plan.Make<PhysicalArrowBatchCollector>(data, batch_size);
return make_uniq<PhysicalArrowBatchCollector>(physical_plan, data, batch_size);
}

SinkResultType PhysicalArrowCollector::Sink(ExecutionContext &context, DataChunk &chunk,
Expand Down
18 changes: 18 additions & 0 deletions src/duckdb/src/common/enum_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5138,6 +5138,24 @@ TimestampCastResult EnumUtil::FromString<TimestampCastResult>(const char *value)
return static_cast<TimestampCastResult>(StringUtil::StringToEnum(GetTimestampCastResultValues(), 5, "TimestampCastResult", value));
}

const StringUtil::EnumStringLiteral *GetTransactionInvalidationPolicyValues() {
static constexpr StringUtil::EnumStringLiteral values[] {
{ static_cast<uint32_t>(TransactionInvalidationPolicy::STANDARD_POLICY), "STANDARD_POLICY" },
{ static_cast<uint32_t>(TransactionInvalidationPolicy::ALL_ERRORS_INVALIDATE_TRANSACTION), "ALL_ERRORS_INVALIDATE_TRANSACTION" }
};
return values;
}

template<>
const char* EnumUtil::ToChars<TransactionInvalidationPolicy>(TransactionInvalidationPolicy value) {
return StringUtil::EnumToString(GetTransactionInvalidationPolicyValues(), 2, "TransactionInvalidationPolicy", static_cast<uint32_t>(value));
}

template<>
TransactionInvalidationPolicy EnumUtil::FromString<TransactionInvalidationPolicy>(const char *value) {
return static_cast<TransactionInvalidationPolicy>(StringUtil::StringToEnum(GetTransactionInvalidationPolicyValues(), 2, "TransactionInvalidationPolicy", value));
}

const StringUtil::EnumStringLiteral *GetTransactionModifierTypeValues() {
static constexpr StringUtil::EnumStringLiteral values[] {
{ static_cast<uint32_t>(TransactionModifierType::TRANSACTION_DEFAULT_MODIFIER), "TRANSACTION_DEFAULT_MODIFIER" },
Expand Down
2 changes: 1 addition & 1 deletion src/duckdb/src/common/file_system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -770,7 +770,7 @@ int64_t FileHandle::Write(void *buffer, idx_t nr_bytes) {

int64_t FileHandle::Write(QueryContext context, void *buffer, idx_t nr_bytes) {
if (context.GetClientContext() != nullptr) {
context.GetClientContext()->client_data->profiler->AddToCounter(MetricType::TOTAL_BYTES_READ, nr_bytes);
context.GetClientContext()->client_data->profiler->AddToCounter(MetricType::TOTAL_BYTES_WRITTEN, nr_bytes);
}

return file_system.Write(*this, buffer, UnsafeNumericCast<int64_t>(nr_bytes));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,31 +22,32 @@ PhysicalResultCollector::PhysicalResultCollector(PhysicalPlan &physical_plan, Pr
types = data.types;
}

PhysicalOperator &PhysicalResultCollector::GetResultCollector(ClientContext &context, PreparedStatementData &data) {
unique_ptr<PhysicalOperator> PhysicalResultCollector::GetResultCollector(ClientContext &context,
PreparedStatementData &data) {
auto &physical_plan = *data.physical_plan;
auto &root = physical_plan.Root();

if (!PhysicalPlanGenerator::PreserveInsertionOrder(context, root)) {
// Not an order-preserving plan: use the parallel materialized collector.
if (data.output_type == QueryResultOutputType::ALLOW_STREAMING) {
return physical_plan.Make<PhysicalBufferedCollector>(data, true);
return make_uniq<PhysicalBufferedCollector>(physical_plan, data, true);
}
return physical_plan.Make<PhysicalMaterializedCollector>(data, true);
return make_uniq<PhysicalMaterializedCollector>(physical_plan, data, true);
}

if (!PhysicalPlanGenerator::UseBatchIndex(context, root)) {
// Order-preserving plan, and we cannot use the batch index: use single-threaded result collector.
if (data.output_type == QueryResultOutputType::ALLOW_STREAMING) {
return physical_plan.Make<PhysicalBufferedCollector>(data, false);
return make_uniq<PhysicalBufferedCollector>(physical_plan, data, false);
}
return physical_plan.Make<PhysicalMaterializedCollector>(data, false);
return make_uniq<PhysicalMaterializedCollector>(physical_plan, data, false);
}

// Order-preserving plan, and we can use the batch index: use a batch collector.
if (data.output_type == QueryResultOutputType::ALLOW_STREAMING) {
return physical_plan.Make<PhysicalBufferedBatchCollector>(data);
return make_uniq<PhysicalBufferedBatchCollector>(physical_plan, data);
}
return physical_plan.Make<PhysicalBatchCollector>(data);
return make_uniq<PhysicalBatchCollector>(physical_plan, data);
}

vector<const_reference<PhysicalOperator>> PhysicalResultCollector::GetChildren() const {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ SourceResultType PhysicalTransaction::GetDataInternal(ExecutionContext &context,
if (info->modifier == TransactionModifierType::TRANSACTION_READ_ONLY) {
client.transaction.SetReadOnly();
}
client.transaction.SetInvalidationPolicy(info->invalidation_policy);
client.transaction.SetAutoRollback(info->auto_rollback);
if (Settings::Get<ImmediateTransactionModeSetting>(context.client)) {
// if immediate transaction mode is enabled then start all transactions immediately
auto databases = DatabaseManager::Get(client).GetDatabases(client);
Expand Down
4 changes: 3 additions & 1 deletion src/duckdb/src/function/scalar/list/list_resize.cpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#include "duckdb/common/operator/add.hpp"
#include "duckdb/common/types/data_chunk.hpp"
#include "duckdb/function/scalar/nested_functions.hpp"
#include "duckdb/function/scalar/list_functions.hpp"
Expand Down Expand Up @@ -41,7 +42,8 @@ static void ListResizeFunction(DataChunk &args, ExpressionState &, Vector &resul
auto new_size_idx = new_sizes_data.sel->get_index(row_idx);

if (lists_data.validity.RowIsValid(list_idx) && new_sizes_data.validity.RowIsValid(new_size_idx)) {
child_vector_size += new_size_entries[new_size_idx];
child_vector_size = AddOperatorOverflowCheck::Operation<idx_t, idx_t, idx_t>(
child_vector_size, new_size_entries[new_size_idx]);
}
}
ListVector::Reserve(result, child_vector_size);
Expand Down
6 changes: 3 additions & 3 deletions src/duckdb/src/function/table/version/pragma_version.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#ifndef DUCKDB_PATCH_VERSION
#define DUCKDB_PATCH_VERSION "1-dev332"
#define DUCKDB_PATCH_VERSION "1-dev399"
#endif
#ifndef DUCKDB_MINOR_VERSION
#define DUCKDB_MINOR_VERSION 5
Expand All @@ -8,10 +8,10 @@
#define DUCKDB_MAJOR_VERSION 1
#endif
#ifndef DUCKDB_VERSION
#define DUCKDB_VERSION "v1.5.1-dev332"
#define DUCKDB_VERSION "v1.5.1-dev399"
#endif
#ifndef DUCKDB_SOURCE_ID
#define DUCKDB_SOURCE_ID "31ea662805"
#define DUCKDB_SOURCE_ID "222ea44eb4"
#endif
#include "duckdb/function/table/system_functions.hpp"
#include "duckdb/main/database.hpp"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class PhysicalArrowCollector : public PhysicalResultCollector {
}

public:
static PhysicalOperator &Create(ClientContext &context, PreparedStatementData &data, idx_t batch_size);
static unique_ptr<PhysicalOperator> Create(ClientContext &context, PreparedStatementData &data, idx_t batch_size);
SinkResultType Sink(ExecutionContext &context, DataChunk &chunk, OperatorSinkInput &input) const override;
SinkCombineResultType Combine(ExecutionContext &context, OperatorSinkCombineInput &input) const override;
unique_ptr<QueryResult> GetResult(GlobalSinkState &state) const override;
Expand Down
8 changes: 8 additions & 0 deletions src/duckdb/src/include/duckdb/common/enum_util.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,8 @@ enum class ThreadPinMode : uint8_t;

enum class TimestampCastResult : uint8_t;

enum class TransactionInvalidationPolicy : uint8_t;

enum class TransactionModifierType : uint8_t;

enum class TransactionType : uint8_t;
Expand Down Expand Up @@ -1142,6 +1144,9 @@ const char* EnumUtil::ToChars<ThreadPinMode>(ThreadPinMode value);
template<>
const char* EnumUtil::ToChars<TimestampCastResult>(TimestampCastResult value);

template<>
const char* EnumUtil::ToChars<TransactionInvalidationPolicy>(TransactionInvalidationPolicy value);

template<>
const char* EnumUtil::ToChars<TransactionModifierType>(TransactionModifierType value);

Expand Down Expand Up @@ -1848,6 +1853,9 @@ ThreadPinMode EnumUtil::FromString<ThreadPinMode>(const char *value);
template<>
TimestampCastResult EnumUtil::FromString<TimestampCastResult>(const char *value);

template<>
TransactionInvalidationPolicy EnumUtil::FromString<TransactionInvalidationPolicy>(const char *value);

template<>
TransactionModifierType EnumUtil::FromString<TransactionModifierType>(const char *value);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
//===----------------------------------------------------------------------===//
// DuckDB
//
// duckdb/common/enums/current_transaction_state.hpp
//
//
//===----------------------------------------------------------------------===//

#pragma once

namespace duckdb {

enum CurrentTransactionState { IN_ACTIVE_TRANSACTION, NOT_IN_ACTIVE_TRANSACTION };

} // namespace duckdb
2 changes: 2 additions & 0 deletions src/duckdb/src/include/duckdb/execution/executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ class Executor {
static Executor &Get(ClientContext &context);

void Initialize(PhysicalOperator &physical_plan);
void Initialize(unique_ptr<PhysicalOperator> physical_plan);

void CancelTasks();
PendingExecutionResult ExecuteTask(bool dry_run = false);
Expand Down Expand Up @@ -147,6 +148,7 @@ class Executor {

private:
optional_ptr<PhysicalOperator> physical_plan;
unique_ptr<PhysicalOperator> owned_plan;

mutex executor_lock;
//! All pipelines of the query plan
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class PhysicalResultCollector : public PhysicalOperator {
vector<string> names;

public:
static PhysicalOperator &GetResultCollector(ClientContext &context, PreparedStatementData &data);
static unique_ptr<PhysicalOperator> GetResultCollector(ClientContext &context, PreparedStatementData &data);

public:
//! The final method used to fetch the query result from this operator
Expand Down
3 changes: 2 additions & 1 deletion src/duckdb/src/include/duckdb/main/client_config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ class ClientContext;
class PhysicalResultCollector;
class PreparedStatementData;

typedef std::function<PhysicalOperator &(ClientContext &context, PreparedStatementData &data)> get_result_collector_t;
typedef std::function<unique_ptr<PhysicalOperator>(ClientContext &context, PreparedStatementData &data)>
get_result_collector_t;

struct ClientConfig {
//! If the query profiler is enabled or not.
Expand Down
4 changes: 3 additions & 1 deletion src/duckdb/src/include/duckdb/main/client_context.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ class ClientContext : public enable_shared_from_this<ClientContext> {

//! Extract the logical plan of a query
DUCKDB_API unique_ptr<LogicalOperator> ExtractPlan(const string &query);
DUCKDB_API void HandlePragmaStatements(vector<unique_ptr<SQLStatement>> &statements);
DUCKDB_API void PreprocessStatements(vector<unique_ptr<SQLStatement>> &statements);

//! Runs a function with a valid transaction context, potentially starting a transaction if the context is in auto
//! commit mode.
Expand Down Expand Up @@ -311,6 +311,8 @@ class ClientContext : public enable_shared_from_this<ClientContext> {
unique_ptr<SQLStatement> statement,
PendingQueryParameters parameters);

bool ErrorInvalidatesTransaction(ExceptionType type);

private:
//! Lock on using the ClientContext in parallel
mutex context_lock;
Expand Down
Loading
Loading