Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/duckdb/src/catalog/default/default_table_functions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ FROM histogram_values(source, col_name, bin_count := bin_count, technique := tec
{DEFAULT_SCHEMA, "duckdb_logs_parsed", {"log_type"}, {}, R"(
SELECT * EXCLUDE (message), UNNEST(parse_duckdb_log_message(log_type, message))
FROM duckdb_logs(denormalized_table=1)
WHERE type = log_type
WHERE type ILIKE log_type
)"},
{nullptr, nullptr, {nullptr}, {{nullptr, nullptr}}, nullptr}
};
Expand Down
6 changes: 4 additions & 2 deletions src/duckdb/src/function/scalar/system/parse_log_message.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,10 @@ void ParseLogMessageFunction(DataChunk &args, ExpressionState &state, Vector &re
} // namespace

ScalarFunction ParseLogMessage::GetFunction() {
return ScalarFunction({LogicalType::VARCHAR, LogicalType::VARCHAR}, LogicalType::ANY, ParseLogMessageFunction,
ParseLogMessageBind, nullptr, nullptr, nullptr, LogicalType(LogicalTypeId::INVALID));
auto fun = ScalarFunction({LogicalType::VARCHAR, LogicalType::VARCHAR}, LogicalType::ANY, ParseLogMessageFunction,
ParseLogMessageBind, nullptr, nullptr, nullptr, LogicalType(LogicalTypeId::INVALID));
fun.errors = FunctionErrors::CAN_THROW_RUNTIME_ERROR;
return fun;
}

} // namespace duckdb
6 changes: 3 additions & 3 deletions src/duckdb/src/function/table/version/pragma_version.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#ifndef DUCKDB_PATCH_VERSION
#define DUCKDB_PATCH_VERSION "2-dev266"
#define DUCKDB_PATCH_VERSION "2-dev279"
#endif
#ifndef DUCKDB_MINOR_VERSION
#define DUCKDB_MINOR_VERSION 4
Expand All @@ -8,10 +8,10 @@
#define DUCKDB_MAJOR_VERSION 1
#endif
#ifndef DUCKDB_VERSION
#define DUCKDB_VERSION "v1.4.2-dev266"
#define DUCKDB_VERSION "v1.4.2-dev279"
#endif
#ifndef DUCKDB_SOURCE_ID
#define DUCKDB_SOURCE_ID "2d69f075ee"
#define DUCKDB_SOURCE_ID "783f08ffd8"
#endif
#include "duckdb/function/table/system_functions.hpp"
#include "duckdb/main/database.hpp"
Expand Down
9 changes: 9 additions & 0 deletions src/duckdb/src/include/duckdb/main/settings.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,15 @@ struct DebugSkipCheckpointOnCommitSetting {
static constexpr SetScope DefaultScope = SetScope::GLOBAL;
};

//! Descriptor for the "debug_verify_blocks" configuration option.
//! Per its own description string, this is a debug-only switch that enables verification of block metadata
//! during checkpointing. It is a boolean that defaults to "false" and is scoped globally.
struct DebugVerifyBlocksSetting {
//! C++ type the setting value is returned as
using RETURN_TYPE = bool;
//! Name under which the setting is exposed
static constexpr const char *Name = "debug_verify_blocks";
//! Human-readable description of the setting
static constexpr const char *Description = "DEBUG SETTING: verify block metadata during checkpointing";
//! Textual name of the accepted input type
static constexpr const char *InputType = "BOOLEAN";
//! Default value, spelled as a string
static constexpr const char *DefaultValue = "false";
//! Scope the setting applies to by default
static constexpr SetScope DefaultScope = SetScope::GLOBAL;
};

struct DebugVerifyVectorSetting {
using RETURN_TYPE = DebugVectorVerification;
static constexpr const char *Name = "debug_verify_vector";
Expand Down
5 changes: 5 additions & 0 deletions src/duckdb/src/include/duckdb/storage/block.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ struct MetaBlockPointer {
return block_pointer == rhs.block_pointer && offset == rhs.offset;
}

//! Writes a human-readable form of the pointer to the stream, as
//! "{block_id: <id> index: <index> offset: <offset>}", and returns the stream for chaining
friend std::ostream &operator<<(std::ostream &os, const MetaBlockPointer &obj) {
	os << "{block_id: " << obj.GetBlockId();
	os << " index: " << obj.GetBlockIndex();
	os << " offset: " << obj.offset;
	os << "}";
	return os;
}

void Serialize(Serializer &serializer) const;
static MetaBlockPointer Deserialize(Deserializer &source);
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ class MetadataManager {
//! Flush all blocks to disk
void Flush();

bool BlockHasBeenCleared(const MetaBlockPointer &ptr);

void MarkBlocksAsModified();
void ClearModifiedBlocks(const vector<MetaBlockPointer> &pointers);

Expand Down
8 changes: 6 additions & 2 deletions src/duckdb/src/include/duckdb/storage/table/row_group.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ struct RowGroupWriteInfo {
struct RowGroupWriteData {
vector<unique_ptr<ColumnCheckpointState>> states;
vector<BaseStatistics> statistics;
vector<MetaBlockPointer> existing_pointers;
bool reuse_existing_metadata_blocks = false;
vector<idx_t> existing_extra_metadata_blocks;
};

class RowGroup : public SegmentBase<RowGroup> {
Expand Down Expand Up @@ -94,7 +95,10 @@ class RowGroup : public SegmentBase<RowGroup> {
return collection.get();
}
//! Returns the list of meta block pointers used by the columns
vector<MetaBlockPointer> GetColumnPointers();
vector<idx_t> GetOrComputeExtraMetadataBlocks(bool force_compute = false);

const vector<MetaBlockPointer> &GetColumnStartPointers() const;

//! Returns the list of meta block pointers used by the deletes
const vector<MetaBlockPointer> &GetDeletesPointers() const {
return deletes_pointers;
Expand Down
11 changes: 6 additions & 5 deletions src/duckdb/src/main/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ static const ConfigurationOption internal_options[] = {
DUCKDB_LOCAL(DebugForceExternalSetting),
DUCKDB_SETTING(DebugForceNoCrossProductSetting),
DUCKDB_SETTING(DebugSkipCheckpointOnCommitSetting),
DUCKDB_SETTING(DebugVerifyBlocksSetting),
DUCKDB_SETTING_CALLBACK(DebugVerifyVectorSetting),
DUCKDB_SETTING_CALLBACK(DebugWindowModeSetting),
DUCKDB_GLOBAL(DefaultBlockSizeSetting),
Expand Down Expand Up @@ -177,12 +178,12 @@ static const ConfigurationOption internal_options[] = {
DUCKDB_GLOBAL(ZstdMinStringLengthSetting),
FINAL_SETTING};

static const ConfigurationAlias setting_aliases[] = {DUCKDB_SETTING_ALIAS("memory_limit", 83),
DUCKDB_SETTING_ALIAS("null_order", 33),
DUCKDB_SETTING_ALIAS("profiling_output", 102),
DUCKDB_SETTING_ALIAS("user", 116),
static const ConfigurationAlias setting_aliases[] = {DUCKDB_SETTING_ALIAS("memory_limit", 84),
DUCKDB_SETTING_ALIAS("null_order", 34),
DUCKDB_SETTING_ALIAS("profiling_output", 103),
DUCKDB_SETTING_ALIAS("user", 117),
DUCKDB_SETTING_ALIAS("wal_autocheckpoint", 20),
DUCKDB_SETTING_ALIAS("worker_threads", 115),
DUCKDB_SETTING_ALIAS("worker_threads", 116),
FINAL_ALIAS};

vector<ConfigurationOption> DBConfig::GetOptions() {
Expand Down
14 changes: 8 additions & 6 deletions src/duckdb/src/storage/checkpoint/table_data_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include "duckdb/catalog/catalog_entry/table_catalog_entry.hpp"
#include "duckdb/common/serializer/binary_serializer.hpp"
#include "duckdb/main/database.hpp"
#include "duckdb/main/settings.hpp"
#include "duckdb/parallel/task_scheduler.hpp"
#include "duckdb/storage/table/column_checkpoint_state.hpp"
#include "duckdb/storage/table/table_statistics.hpp"
Expand Down Expand Up @@ -119,15 +120,16 @@ void SingleFileTableDataWriter::FinalizeTable(const TableStatistics &global_stat
}
auto index_storage_infos = info.GetIndexes().SerializeToDisk(context, options);

#ifdef DUCKDB_BLOCK_VERIFICATION
for (auto &entry : index_storage_infos) {
for (auto &allocator : entry.allocator_infos) {
for (auto &block : allocator.block_pointers) {
checkpoint_manager.verify_block_usage_count[block.block_id]++;
auto debug_verify_blocks = DBConfig::GetSetting<DebugVerifyBlocksSetting>(GetDatabase());
if (debug_verify_blocks) {
for (auto &entry : index_storage_infos) {
for (auto &allocator : entry.allocator_infos) {
for (auto &block : allocator.block_pointers) {
checkpoint_manager.verify_block_usage_count[block.block_id]++;
}
}
}
}
#endif

// write empty block pointers for forwards compatibility
vector<BlockPointer> compat_block_pointers;
Expand Down
46 changes: 24 additions & 22 deletions src/duckdb/src/storage/checkpoint_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -214,33 +214,35 @@ void SingleFileCheckpointWriter::CreateCheckpoint() {
header.vector_size = STANDARD_VECTOR_SIZE;
block_manager.WriteHeader(context, header);

#ifdef DUCKDB_BLOCK_VERIFICATION
// extend verify_block_usage_count
auto metadata_info = storage_manager.GetMetadataInfo();
for (auto &info : metadata_info) {
verify_block_usage_count[info.block_id]++;
}
for (auto &entry_ref : catalog_entries) {
auto &entry = entry_ref.get();
if (entry.type == CatalogType::TABLE_ENTRY) {
auto &table = entry.Cast<DuckTableEntry>();
auto &storage = table.GetStorage();
auto segment_info = storage.GetColumnSegmentInfo();
for (auto &segment : segment_info) {
verify_block_usage_count[segment.block_id]++;
if (StringUtil::Contains(segment.segment_info, "Overflow String Block Ids: ")) {
auto overflow_blocks = StringUtil::Replace(segment.segment_info, "Overflow String Block Ids: ", "");
auto splits = StringUtil::Split(overflow_blocks, ", ");
for (auto &split : splits) {
auto overflow_block_id = std::stoll(split);
verify_block_usage_count[overflow_block_id]++;
auto debug_verify_blocks = DBConfig::GetSetting<DebugVerifyBlocksSetting>(db.GetDatabase());
if (debug_verify_blocks) {
// extend verify_block_usage_count
auto metadata_info = storage_manager.GetMetadataInfo();
for (auto &info : metadata_info) {
verify_block_usage_count[info.block_id]++;
}
for (auto &entry_ref : catalog_entries) {
auto &entry = entry_ref.get();
if (entry.type == CatalogType::TABLE_ENTRY) {
auto &table = entry.Cast<DuckTableEntry>();
auto &storage = table.GetStorage();
auto segment_info = storage.GetColumnSegmentInfo();
for (auto &segment : segment_info) {
verify_block_usage_count[segment.block_id]++;
if (StringUtil::Contains(segment.segment_info, "Overflow String Block Ids: ")) {
auto overflow_blocks =
StringUtil::Replace(segment.segment_info, "Overflow String Block Ids: ", "");
auto splits = StringUtil::Split(overflow_blocks, ", ");
for (auto &split : splits) {
auto overflow_block_id = std::stoll(split);
verify_block_usage_count[overflow_block_id]++;
}
}
}
}
}
block_manager.VerifyBlocks(verify_block_usage_count);
}
block_manager.VerifyBlocks(verify_block_usage_count);
#endif

if (debug_checkpoint_abort == CheckpointAbort::DEBUG_ABORT_BEFORE_TRUNCATE) {
throw FatalException("Checkpoint aborted before truncate because of PRAGMA checkpoint_abort flag");
Expand Down
12 changes: 12 additions & 0 deletions src/duckdb/src/storage/metadata/metadata_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,18 @@ void MetadataManager::ClearModifiedBlocks(const vector<MetaBlockPointer> &pointe
}
}

//! Returns true if the bit for the pointer's block index is NOT set in the modified_blocks entry of its block id.
//! Holds block_lock for the duration of the lookup.
//! Throws an InternalException when the block id has no entry in modified_blocks.
bool MetadataManager::BlockHasBeenCleared(const MetaBlockPointer &pointer) {
	unique_lock<mutex> guard(block_lock);
	auto block_id = pointer.GetBlockId();
	auto block_index = pointer.GetBlockIndex();

	auto modified_it = modified_blocks.find(block_id);
	if (modified_it == modified_blocks.end()) {
		throw InternalException("BlockHasBeenCleared - Block id %llu not found in modified_blocks", block_id);
	}

	// each bit of the stored mask corresponds to one block index within this block id
	auto &modified_mask = modified_it->second;
	auto index_bit = 1ULL << block_index;
	if ((modified_mask & index_bit) != 0ULL) {
		// bit still set - not cleared
		return false;
	}
	return true;
}

vector<MetadataBlockInfo> MetadataManager::GetMetadataInfo() const {
vector<MetadataBlockInfo> result;
unique_lock<mutex> guard(block_lock);
Expand Down
2 changes: 1 addition & 1 deletion src/duckdb/src/storage/metadata/metadata_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ MetaBlockPointer MetadataWriter::GetMetaBlockPointer() {

void MetadataWriter::SetWrittenPointers(optional_ptr<vector<MetaBlockPointer>> written_pointers_p) {
written_pointers = written_pointers_p;
if (written_pointers && capacity > 0) {
if (written_pointers && capacity > 0 && offset < capacity) {
written_pointers->push_back(manager.GetDiskPointer(current_pointer));
}
}
Expand Down
70 changes: 40 additions & 30 deletions src/duckdb/src/storage/table/row_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@
namespace duckdb {

RowGroup::RowGroup(RowGroupCollection &collection_p, idx_t start, idx_t count)
: SegmentBase<RowGroup>(start, count), collection(collection_p), version_info(nullptr), allocation_size(0),
row_id_is_loaded(false), has_changes(false) {
: SegmentBase<RowGroup>(start, count), collection(collection_p), version_info(nullptr), deletes_is_loaded(false),
allocation_size(0), row_id_is_loaded(false), has_changes(false) {
Verify();
}

RowGroup::RowGroup(RowGroupCollection &collection_p, RowGroupPointer pointer)
: SegmentBase<RowGroup>(pointer.row_start, pointer.tuple_count), collection(collection_p), version_info(nullptr),
allocation_size(0), row_id_is_loaded(false), has_changes(false) {
deletes_is_loaded(false), allocation_size(0), row_id_is_loaded(false), has_changes(false) {
// deserialize the columns
if (pointer.data_pointers.size() != collection_p.GetTypes().size()) {
throw IOException("Row group column count is unaligned with table column count. Corrupt file?");
Expand All @@ -45,7 +45,6 @@ RowGroup::RowGroup(RowGroupCollection &collection_p, RowGroupPointer pointer)
this->is_loaded[c] = false;
}
this->deletes_pointers = std::move(pointer.deletes_pointers);
this->deletes_is_loaded = false;
this->has_metadata_blocks = pointer.has_metadata_blocks;
this->extra_metadata_blocks = std::move(pointer.extra_metadata_blocks);

Expand All @@ -54,7 +53,7 @@ RowGroup::RowGroup(RowGroupCollection &collection_p, RowGroupPointer pointer)

RowGroup::RowGroup(RowGroupCollection &collection_p, PersistentRowGroupData &data)
: SegmentBase<RowGroup>(data.start, data.count), collection(collection_p), version_info(nullptr),
allocation_size(0), row_id_is_loaded(false), has_changes(false) {
deletes_is_loaded(false), allocation_size(0), row_id_is_loaded(false), has_changes(false) {
auto &block_manager = GetBlockManager();
auto &info = GetTableInfo();
auto &types = collection.get().GetTypes();
Expand Down Expand Up @@ -974,27 +973,15 @@ bool RowGroup::HasUnloadedDeletes() const {
return !deletes_is_loaded;
}

vector<MetaBlockPointer> RowGroup::GetColumnPointers() {
vector<MetaBlockPointer> result;
if (has_metadata_blocks) {
// we have the column metadata from the file itself - no need to deserialize metadata to fetch it
// read if from "column_pointers" and "extra_metadata_blocks"
for (auto block : column_pointers) {
block.offset = 0;
if (std::find(result.begin(), result.end(), block) != result.end()) {
continue;
}
result.push_back(block);
}
for (auto &block_pointer : extra_metadata_blocks) {
result.emplace_back(block_pointer, 0);
}
return result;
vector<idx_t> RowGroup::GetOrComputeExtraMetadataBlocks(bool force_compute) {
if (has_metadata_blocks && !force_compute) {
return extra_metadata_blocks;
}
if (column_pointers.empty()) {
// no pointers
return result;
return {};
}
vector<MetaBlockPointer> read_pointers;
// column_pointers stores the beginning of each column
// if columns are big - they may span multiple metadata blocks
// we need to figure out all blocks that this row group points to
Expand All @@ -1005,13 +992,25 @@ vector<MetaBlockPointer> RowGroup::GetColumnPointers() {
// for all but the last column pointer - we can just follow the linked list until we reach the last column
MetadataReader reader(metadata_manager, column_pointers[0]);
auto last_pointer = column_pointers[last_idx];
result = reader.GetRemainingBlocks(last_pointer);
read_pointers = reader.GetRemainingBlocks(last_pointer);
}
// for the last column we need to deserialize the column - because we don't know where it stops
auto &types = GetCollection().GetTypes();
MetadataReader reader(metadata_manager, column_pointers[last_idx], &result);
MetadataReader reader(metadata_manager, column_pointers[last_idx], &read_pointers);
ColumnData::Deserialize(GetBlockManager(), GetTableInfo(), last_idx, start, reader, types[last_idx]);
return result;

unordered_set<idx_t> result_as_set;
for (auto &ptr : read_pointers) {
result_as_set.emplace(ptr.block_pointer);
}
for (auto &ptr : column_pointers) {
result_as_set.erase(ptr.block_pointer);
}
return {result_as_set.begin(), result_as_set.end()};
}

//! Returns the row group's stored column_pointers by const reference - no copy is made and nothing is computed.
//! NOTE(review): presumably one entry per column, pointing at the start of that column's metadata - confirm
const vector<MetaBlockPointer> &RowGroup::GetColumnStartPointers() const {
return column_pointers;
}

RowGroupWriteData RowGroup::WriteToDisk(RowGroupWriter &writer) {
Expand All @@ -1020,7 +1019,8 @@ RowGroupWriteData RowGroup::WriteToDisk(RowGroupWriter &writer) {
// we have existing metadata and the row group has not been changed
// re-use previous metadata
RowGroupWriteData result;
result.existing_pointers = GetColumnPointers();
result.reuse_existing_metadata_blocks = true;
result.existing_extra_metadata_blocks = GetOrComputeExtraMetadataBlocks();
return result;
}
auto &compression_types = writer.GetCompressionTypes();
Expand Down Expand Up @@ -1048,14 +1048,23 @@ RowGroupPointer RowGroup::Checkpoint(RowGroupWriteData write_data, RowGroupWrite
// construct the row group pointer and write the column meta data to disk
row_group_pointer.row_start = start;
row_group_pointer.tuple_count = count;
if (!write_data.existing_pointers.empty()) {
if (write_data.reuse_existing_metadata_blocks) {
// we are re-using the previous metadata
row_group_pointer.data_pointers = column_pointers;
row_group_pointer.has_metadata_blocks = has_metadata_blocks;
row_group_pointer.extra_metadata_blocks = extra_metadata_blocks;
row_group_pointer.has_metadata_blocks = true;
row_group_pointer.extra_metadata_blocks = write_data.existing_extra_metadata_blocks;
row_group_pointer.deletes_pointers = deletes_pointers;
metadata_manager->ClearModifiedBlocks(write_data.existing_pointers);
vector<MetaBlockPointer> extra_metadata_block_pointers;
extra_metadata_block_pointers.reserve(write_data.existing_extra_metadata_blocks.size());
for (auto &block_pointer : write_data.existing_extra_metadata_blocks) {
extra_metadata_block_pointers.emplace_back(block_pointer, 0);
}
metadata_manager->ClearModifiedBlocks(column_pointers);
metadata_manager->ClearModifiedBlocks(extra_metadata_block_pointers);
metadata_manager->ClearModifiedBlocks(deletes_pointers);
// remember metadata_blocks to avoid loading them on future checkpoints
has_metadata_blocks = true;
extra_metadata_blocks = row_group_pointer.extra_metadata_blocks;
return row_group_pointer;
}
D_ASSERT(write_data.states.size() == columns.size());
Expand Down Expand Up @@ -1120,6 +1129,7 @@ bool RowGroup::HasChanges() const {
// we have deletes
return true;
}
D_ASSERT(!deletes_is_loaded.load());
// check if any of the columns have changes
// avoid loading unloaded columns - unloaded columns can never have changes
for (idx_t c = 0; c < columns.size(); c++) {
Expand Down
Loading