Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions src/duckdb/src/common/enum_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@
#include "duckdb/execution/index/art/art_scanner.hpp"
#include "duckdb/execution/index/art/node.hpp"
#include "duckdb/execution/index/bound_index.hpp"
#include "duckdb/execution/index/unbound_index.hpp"
#include "duckdb/execution/operator/csv_scanner/csv_option.hpp"
#include "duckdb/execution/operator/csv_scanner/csv_state.hpp"
#include "duckdb/execution/reservoir_sample.hpp"
Expand Down Expand Up @@ -707,6 +708,24 @@ BlockState EnumUtil::FromString<BlockState>(const char *value) {
return static_cast<BlockState>(StringUtil::StringToEnum(GetBlockStateValues(), 2, "BlockState", value));
}

const StringUtil::EnumStringLiteral *GetBufferedIndexReplayValues() {
static constexpr StringUtil::EnumStringLiteral values[] {
{ static_cast<uint32_t>(BufferedIndexReplay::INSERT_ENTRY), "INSERT_ENTRY" },
{ static_cast<uint32_t>(BufferedIndexReplay::DEL_ENTRY), "DEL_ENTRY" }
};
return values;
}

template<>
const char* EnumUtil::ToChars<BufferedIndexReplay>(BufferedIndexReplay value) {
return StringUtil::EnumToString(GetBufferedIndexReplayValues(), 2, "BufferedIndexReplay", static_cast<uint32_t>(value));
}

template<>
BufferedIndexReplay EnumUtil::FromString<BufferedIndexReplay>(const char *value) {
return static_cast<BufferedIndexReplay>(StringUtil::StringToEnum(GetBufferedIndexReplayValues(), 2, "BufferedIndexReplay", value));
}

const StringUtil::EnumStringLiteral *GetCAPIResultSetTypeValues() {
static constexpr StringUtil::EnumStringLiteral values[] {
{ static_cast<uint32_t>(CAPIResultSetType::CAPI_RESULT_TYPE_NONE), "CAPI_RESULT_TYPE_NONE" },
Expand Down
67 changes: 51 additions & 16 deletions src/duckdb/src/common/enums/compression_type.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,60 @@ vector<string> ListCompressionTypes(void) {
return compression_types;
}

bool CompressionTypeIsDeprecated(CompressionType compression_type, optional_ptr<StorageManager> storage_manager) {
vector<CompressionType> types({CompressionType::COMPRESSION_PATAS, CompressionType::COMPRESSION_CHIMP});
if (storage_manager) {
if (storage_manager->GetStorageVersion() >= 5) {
//! NOTE: storage_manager is an optional_ptr because it's called from ForceCompressionSetting, which doesn't
//! have guaranteed access to a StorageManager The introduction of DICT_FSST deprecates Dictionary and FSST
//! compression methods
types.emplace_back(CompressionType::COMPRESSION_DICTIONARY);
types.emplace_back(CompressionType::COMPRESSION_FSST);
} else {
types.emplace_back(CompressionType::COMPRESSION_DICT_FSST);
}
namespace {
struct CompressionMethodRequirements {
CompressionType type;
optional_idx minimum_storage_version;
optional_idx maximum_storage_version;
};
} // namespace

CompressionAvailabilityResult CompressionTypeIsAvailable(CompressionType compression_type,
optional_ptr<StorageManager> storage_manager) {
//! Max storage compatibility
vector<CompressionMethodRequirements> candidates({{CompressionType::COMPRESSION_PATAS, optional_idx(), 0},
{CompressionType::COMPRESSION_CHIMP, optional_idx(), 0},
{CompressionType::COMPRESSION_DICTIONARY, 0, 4},
{CompressionType::COMPRESSION_FSST, 0, 4},
{CompressionType::COMPRESSION_DICT_FSST, 5, optional_idx()}});

optional_idx current_storage_version;
if (storage_manager && storage_manager->HasStorageVersion()) {
current_storage_version = storage_manager->GetStorageVersion();
}
for (auto &type : types) {
if (type == compression_type) {
return true;
for (auto &candidate : candidates) {
auto &type = candidate.type;
if (type != compression_type) {
continue;
}
auto &min = candidate.minimum_storage_version;
auto &max = candidate.maximum_storage_version;

if (!min.IsValid()) {
//! Used to signal: always deprecated
return CompressionAvailabilityResult::Deprecated();
}

if (!current_storage_version.IsValid()) {
//! Can't determine in this call whether it's available or not, default to available
return CompressionAvailabilityResult();
}

auto current_version = current_storage_version.GetIndex();
D_ASSERT(min.IsValid());
if (min.GetIndex() > current_version) {
//! Minimum required storage version is higher than the current storage version, this method isn't available
//! yet
return CompressionAvailabilityResult::NotAvailableYet();
}
if (max.IsValid() && max.GetIndex() < current_version) {
//! Maximum supported storage version is lower than the current storage version, this method is no longer
//! available
return CompressionAvailabilityResult::Deprecated();
}
return CompressionAvailabilityResult();
}
return false;
return CompressionAvailabilityResult();
}

CompressionType CompressionTypeFromString(const string &str) {
Expand Down
9 changes: 7 additions & 2 deletions src/duckdb/src/common/exception/binder_exception.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,14 @@ BinderException BinderException::ColumnNotFound(const string &name, const vector
extra_info["name"] = name;
if (!similar_bindings.empty()) {
extra_info["candidates"] = StringUtil::Join(similar_bindings, ",");
return BinderException(
StringUtil::Format("Referenced column \"%s\" not found in FROM clause!%s", name, candidate_str),
extra_info);
} else {
return BinderException(
StringUtil::Format("Referenced column \"%s\" was not found because the FROM clause is missing", name),
extra_info);
}
return BinderException(
StringUtil::Format("Referenced column \"%s\" not found in FROM clause!%s", name, candidate_str), extra_info);
}

BinderException BinderException::NoMatchingFunction(const string &catalog_name, const string &schema_name,
Expand Down
9 changes: 6 additions & 3 deletions src/duckdb/src/execution/index/art/art.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -522,7 +522,9 @@ ErrorData ART::Insert(IndexLock &l, DataChunk &chunk, Vector &row_ids, IndexAppe
if (keys[i].Empty()) {
continue;
}
D_ASSERT(ARTOperator::Lookup(*this, tree, keys[i], 0));
auto leaf = ARTOperator::Lookup(*this, tree, keys[i], 0);
D_ASSERT(leaf);
D_ASSERT(ARTOperator::LookupInLeaf(*this, *leaf, row_id_keys[i]));
}
#endif
return ErrorData();
Expand Down Expand Up @@ -602,8 +604,9 @@ void ART::Delete(IndexLock &state, DataChunk &input, Vector &row_ids) {
continue;
}
auto leaf = ARTOperator::Lookup(*this, tree, keys[i], 0);
if (leaf && leaf->GetType() == NType::LEAF_INLINED) {
D_ASSERT(leaf->GetRowId() != row_id_keys[i].GetRowId());
if (leaf) {
auto contains_row_id = ARTOperator::LookupInLeaf(*this, *leaf, row_id_keys[i]);
D_ASSERT(!contains_row_id);
}
}
#endif
Expand Down
53 changes: 32 additions & 21 deletions src/duckdb/src/execution/index/bound_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,28 +154,39 @@ string BoundIndex::AppendRowError(DataChunk &input, idx_t index) {
return error;
}

void BoundIndex::ApplyBufferedAppends(const vector<LogicalType> &table_types, ColumnDataCollection &buffered_appends,
void BoundIndex::ApplyBufferedReplays(const vector<LogicalType> &table_types,
vector<BufferedIndexData> &buffered_replays,
const vector<StorageIndex> &mapped_column_ids) {
IndexAppendInfo index_append_info(IndexAppendMode::INSERT_DUPLICATES, nullptr);

ColumnDataScanState state;
buffered_appends.InitializeScan(state);

DataChunk scan_chunk;
buffered_appends.InitializeScanChunk(scan_chunk);
DataChunk table_chunk;
table_chunk.InitializeEmpty(table_types);

while (buffered_appends.Scan(state, scan_chunk)) {
for (idx_t i = 0; i < scan_chunk.ColumnCount() - 1; i++) {
auto col_id = mapped_column_ids[i].GetPrimaryIndex();
table_chunk.data[col_id].Reference(scan_chunk.data[i]);
}
table_chunk.SetCardinality(scan_chunk.size());

auto error = Append(table_chunk, scan_chunk.data.back(), index_append_info);
if (error.HasError()) {
throw InternalException("error while applying buffered appends: " + error.Message());
for (auto &replay : buffered_replays) {
ColumnDataScanState state;
auto &buffered_data = *replay.data;
buffered_data.InitializeScan(state);

DataChunk scan_chunk;
buffered_data.InitializeScanChunk(scan_chunk);
DataChunk table_chunk;
table_chunk.InitializeEmpty(table_types);

while (buffered_data.Scan(state, scan_chunk)) {
for (idx_t i = 0; i < scan_chunk.ColumnCount() - 1; i++) {
auto col_id = mapped_column_ids[i].GetPrimaryIndex();
table_chunk.data[col_id].Reference(scan_chunk.data[i]);
}
table_chunk.SetCardinality(scan_chunk.size());

switch (replay.type) {
case BufferedIndexReplay::INSERT_ENTRY: {
IndexAppendInfo index_append_info(IndexAppendMode::INSERT_DUPLICATES, nullptr);
auto error = Append(table_chunk, scan_chunk.data.back(), index_append_info);
if (error.HasError()) {
throw InternalException("error while applying buffered appends: " + error.Message());
}
continue;
}
case BufferedIndexReplay::DEL_ENTRY: {
Delete(table_chunk, scan_chunk.data.back());
}
}
}
}
}
Expand Down
29 changes: 20 additions & 9 deletions src/duckdb/src/execution/index/unbound_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@

namespace duckdb {

BufferedIndexData::BufferedIndexData(BufferedIndexReplay replay_type, unique_ptr<ColumnDataCollection> data_p)
: type(replay_type), data(std::move(data_p)) {
}

UnboundIndex::UnboundIndex(unique_ptr<CreateInfo> create_info, IndexStorageInfo storage_info_p,
TableIOManager &table_io_manager, AttachedDatabase &db)
: Index(create_info->Cast<CreateIndexInfo>().column_ids, table_io_manager, db), create_info(std::move(create_info)),
Expand Down Expand Up @@ -35,26 +39,33 @@ void UnboundIndex::CommitDrop() {
}
}

void UnboundIndex::BufferChunk(DataChunk &chunk, Vector &row_ids, const vector<StorageIndex> &mapped_column_ids_p) {
void UnboundIndex::BufferChunk(DataChunk &index_column_chunk, Vector &row_ids,
const vector<StorageIndex> &mapped_column_ids_p, BufferedIndexReplay replay_type) {
D_ASSERT(!column_ids.empty());
auto types = chunk.GetTypes();
auto types = index_column_chunk.GetTypes(); // column types
types.push_back(LogicalType::ROW_TYPE);

if (!buffered_appends) {
auto &allocator = Allocator::Get(db);
buffered_appends = make_uniq<ColumnDataCollection>(allocator, types);
auto &allocator = Allocator::Get(db);

BufferedIndexData buffered_data(replay_type, make_uniq<ColumnDataCollection>(allocator, types));

//! First time we are buffering data, canonical column_id mapping is stored.
//! This should be a sorted list of all the physical offsets of Indexed columns on this table.
if (mapped_column_ids.empty()) {
mapped_column_ids = mapped_column_ids_p;
}
D_ASSERT(mapped_column_ids == mapped_column_ids_p);

// Combined chunk has all the indexed columns and rowids.
DataChunk combined_chunk;
combined_chunk.InitializeEmpty(types);
for (idx_t i = 0; i < chunk.ColumnCount(); i++) {
combined_chunk.data[i].Reference(chunk.data[i]);
for (idx_t i = 0; i < index_column_chunk.ColumnCount(); i++) {
combined_chunk.data[i].Reference(index_column_chunk.data[i]);
}
combined_chunk.data.back().Reference(row_ids);
combined_chunk.SetCardinality(chunk.size());
buffered_appends->Append(combined_chunk);
combined_chunk.SetCardinality(index_column_chunk.size());
buffered_data.data->Append(combined_chunk);
buffered_replays.emplace_back(std::move(buffered_data));
}

} // namespace duckdb
6 changes: 3 additions & 3 deletions src/duckdb/src/function/table/version/pragma_version.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#ifndef DUCKDB_PATCH_VERSION
#define DUCKDB_PATCH_VERSION "2-dev199"
#define DUCKDB_PATCH_VERSION "2-dev241"
#endif
#ifndef DUCKDB_MINOR_VERSION
#define DUCKDB_MINOR_VERSION 4
Expand All @@ -8,10 +8,10 @@
#define DUCKDB_MAJOR_VERSION 1
#endif
#ifndef DUCKDB_VERSION
#define DUCKDB_VERSION "v1.4.2-dev199"
#define DUCKDB_VERSION "v1.4.2-dev241"
#endif
#ifndef DUCKDB_SOURCE_ID
#define DUCKDB_SOURCE_ID "9ea6e07a29"
#define DUCKDB_SOURCE_ID "73c0d0db15"
#endif
#include "duckdb/function/table/system_functions.hpp"
#include "duckdb/main/database.hpp"
Expand Down
8 changes: 8 additions & 0 deletions src/duckdb/src/include/duckdb/common/enum_util.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ enum class BlockIteratorStateType : int8_t;

enum class BlockState : uint8_t;

enum class BufferedIndexReplay : uint8_t;

enum class CAPIResultSetType : uint8_t;

enum class CSVState : uint8_t;
Expand Down Expand Up @@ -528,6 +530,9 @@ const char* EnumUtil::ToChars<BlockIteratorStateType>(BlockIteratorStateType val
template<>
const char* EnumUtil::ToChars<BlockState>(BlockState value);

template<>
const char* EnumUtil::ToChars<BufferedIndexReplay>(BufferedIndexReplay value);

template<>
const char* EnumUtil::ToChars<CAPIResultSetType>(CAPIResultSetType value);

Expand Down Expand Up @@ -1150,6 +1155,9 @@ BlockIteratorStateType EnumUtil::FromString<BlockIteratorStateType>(const char *
template<>
BlockState EnumUtil::FromString<BlockState>(const char *value);

template<>
BufferedIndexReplay EnumUtil::FromString<BufferedIndexReplay>(const char *value);

template<>
CAPIResultSetType EnumUtil::FromString<CAPIResultSetType>(const char *value);

Expand Down
44 changes: 42 additions & 2 deletions src/duckdb/src/include/duckdb/common/enums/compression_type.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,48 @@ enum class CompressionType : uint8_t {
COMPRESSION_COUNT // This has to stay the last entry of the type!
};

bool CompressionTypeIsDeprecated(CompressionType compression_type,
optional_ptr<StorageManager> storage_manager = nullptr);
struct CompressionAvailabilityResult {
private:
enum class UnavailableReason : uint8_t {
AVAILABLE,
//! Introduced later, not available to this version
NOT_AVAILABLE_YET,
//! Used to be available, but isnt anymore
DEPRECATED
};

public:
CompressionAvailabilityResult() = default;
static CompressionAvailabilityResult Deprecated() {
return CompressionAvailabilityResult(UnavailableReason::DEPRECATED);
}
static CompressionAvailabilityResult NotAvailableYet() {
return CompressionAvailabilityResult(UnavailableReason::NOT_AVAILABLE_YET);
}

public:
bool IsAvailable() const {
return reason == UnavailableReason::AVAILABLE;
}
bool IsDeprecated() {
D_ASSERT(!IsAvailable());
return reason == UnavailableReason::DEPRECATED;
}
bool IsNotAvailableYet() {
D_ASSERT(!IsAvailable());
return reason == UnavailableReason::NOT_AVAILABLE_YET;
}

private:
explicit CompressionAvailabilityResult(UnavailableReason reason) : reason(reason) {
}

public:
UnavailableReason reason = UnavailableReason::AVAILABLE;
};

CompressionAvailabilityResult CompressionTypeIsAvailable(CompressionType compression_type,
optional_ptr<StorageManager> storage_manager = nullptr);
vector<string> ListCompressionTypes(void);
CompressionType CompressionTypeFromString(const string &str);
string CompressionTypeToString(CompressionType type);
Expand Down
Loading