Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 67 additions & 6 deletions src/duckdb/extension/icu/icu_extension.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,8 @@
#include "duckdb/function/scalar_function.hpp"
#include "duckdb/main/config.hpp"
#include "duckdb/main/connection.hpp"
#include "duckdb/main/database.hpp"
#include "duckdb/main/extension/extension_loader.hpp"
#include "duckdb/parser/parsed_data/create_collation_info.hpp"
#include "duckdb/parser/parsed_data/create_scalar_function_info.hpp"
#include "duckdb/parser/parsed_data/create_table_function_info.hpp"
#include "duckdb/planner/expression/bound_function_expression.hpp"
#include "include/icu-current.hpp"
#include "include/icu-dateadd.hpp"
Expand All @@ -25,8 +22,6 @@
#include "include/icu_extension.hpp"
#include "unicode/calendar.h"
#include "unicode/coll.h"
#include "unicode/errorcode.h"
#include "unicode/sortkey.h"
#include "unicode/stringpiece.h"
#include "unicode/timezone.h"
#include "unicode/ucol.h"
Expand Down Expand Up @@ -209,14 +204,74 @@ static ScalarFunction GetICUCollateFunction(const string &collation, const strin
return result;
}

unique_ptr<icu::TimeZone> GetTimeZoneInternal(string &tz_str, vector<string> &candidates) {
// Look up tz_str with ICU. Returns the resolved time zone when ICU recognizes
// the name, or nullptr when it does not.
unique_ptr<icu::TimeZone> GetKnownTimeZone(const string &tz_str) {
	// Convert the UTF-8 name into ICU's UnicodeString representation for lookup.
	const auto zone_id = icu::UnicodeString::fromUTF8(icu::StringPiece(tz_str));
	duckdb::unique_ptr<icu::TimeZone> tz(icu::TimeZone::createTimeZone(zone_id));
	// createTimeZone() does not return null for bad names; it hands back the
	// special "unknown" zone instead, which we translate into nullptr here.
	if (*tz == icu::TimeZone::getUnknown()) {
		return nullptr;
	}
	return tz;
}

// Return a time zone name ICU recognizes, or the input unchanged.
// Names ICU already knows pass straight through; otherwise we try to rewrite
// "UTC[+|-]hh[:]mm"-style offset strings into the tz database's "Etc/GMT±N"
// aliases. On any failed sanity check the original string is returned as-is.
static string NormalizeTimeZone(const string &tz_str) {
	if (GetKnownTimeZone(tz_str)) {
		// ICU already understands this spelling; nothing to do.
		return tz_str;
	}

	// Map UTC±NN00 to Etc/GMT±N
	// (do { ... } while (false) is a structured "goto end": every failed
	// check just breaks out and falls through to the passthrough return.)
	do {
		if (tz_str.size() <= 4) {
			// Too short to hold "UTC", a sign, and at least one digit.
			break;
		}
		if (tz_str.compare(0, 3, "UTC")) {
			// compare() is non-zero when the prefix is not "UTC".
			break;
		}

		idx_t pos = 3;
		const auto sign = tz_str[pos++];
		if (sign != '+' && sign != '-') {
			break;
		}

		// NOTE(review): the sign is copied through unchanged, so "UTC+8"
		// becomes "Etc/GMT+8". In the tz database, Etc/GMT+8 is 8 hours WEST
		// of Greenwich (POSIX sign convention) — correct for a POSIX TZ
		// environment string but inverted relative to ISO "UTC+08:00".
		// Confirm which convention the callers expect.
		string mapped = "Etc/GMT";
		mapped += sign;
		const auto base_len = mapped.size();
		for (; pos < tz_str.size(); ++pos) {
			const auto digit = tz_str[pos];
			// We could get fancy here and count colons and their locations, but I doubt anyone cares.
			if (digit == '0' || digit == ':') {
				// NOTE(review): this skips EVERY '0', not only leading/trailing
				// ones, so "UTC+1000" collapses to "Etc/GMT+1" — verify intended.
				continue;
			}
			if (!StringUtil::CharacterIsDigit(digit)) {
				break;
			}
			mapped += digit;
		}
		if (pos < tz_str.size()) {
			// Loop stopped on a non-digit: not a pure numeric offset, give up.
			break;
		}
		// If we didn't add anything, then make it +0
		if (mapped.size() == base_len) {
			mapped.back() = '+';
			mapped += '0';
		}
		// Final sanity check
		if (GetKnownTimeZone(mapped)) {
			return mapped;
		}
	} while (false);

	// Could not normalize: hand back the original spelling unchanged.
	return tz_str;
}

unique_ptr<icu::TimeZone> GetTimeZoneInternal(string &tz_str, vector<string> &candidates) {
auto tz = GetKnownTimeZone(tz_str);
if (tz) {
return tz;
}

// Try to be friendlier
// Go through all the zone names and look for a case insensitive match
// If we don't find one, make a suggestion
Expand Down Expand Up @@ -269,6 +324,7 @@ unique_ptr<icu::TimeZone> ICUHelpers::GetTimeZone(string &tz_str, string *error_

// Setting callback for the "TimeZone" extension option: canonicalize the
// user-provided zone name, validate it, and write the result back into the
// parameter value.
static void SetICUTimeZone(ClientContext &context, SetScope scope, Value &parameter) {
	// Normalize first so friendly spellings (e.g. UTC offsets) resolve.
	auto tz_name = NormalizeTimeZone(StringValue::Get(parameter));
	// GetTimeZone takes the name by non-const reference and may adjust it.
	ICUHelpers::GetTimeZone(tz_name);
	parameter = Value(tz_name);
}
Expand Down Expand Up @@ -405,6 +461,11 @@ static void LoadInternal(ExtensionLoader &loader) {
icu::UnicodeString tz_id;
std::string tz_string;
tz->getID(tz_id).toUTF8String(tz_string);
// If the environment TZ is invalid, look for some alternatives
tz_string = NormalizeTimeZone(tz_string);
if (!GetKnownTimeZone(tz_string)) {
tz_string = "UTC";
}
config.AddExtensionOption("TimeZone", "The current time zone", LogicalType::VARCHAR, Value(tz_string),
SetICUTimeZone);

Expand Down
6 changes: 3 additions & 3 deletions src/duckdb/src/function/table/version/pragma_version.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#ifndef DUCKDB_PATCH_VERSION
#define DUCKDB_PATCH_VERSION "2-dev121"
#define DUCKDB_PATCH_VERSION "2-dev142"
#endif
#ifndef DUCKDB_MINOR_VERSION
#define DUCKDB_MINOR_VERSION 4
Expand All @@ -8,10 +8,10 @@
#define DUCKDB_MAJOR_VERSION 1
#endif
#ifndef DUCKDB_VERSION
#define DUCKDB_VERSION "v1.4.2-dev121"
#define DUCKDB_VERSION "v1.4.2-dev142"
#endif
#ifndef DUCKDB_SOURCE_ID
#define DUCKDB_SOURCE_ID "9069f5363c"
#define DUCKDB_SOURCE_ID "8d3a195080"
#endif
#include "duckdb/function/table/system_functions.hpp"
#include "duckdb/main/database.hpp"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,11 @@ class RelationManager {
//! Extract the set of relations referred to inside an expression
bool ExtractBindings(Expression &expression, unordered_set<idx_t> &bindings);
void AddRelation(LogicalOperator &op, optional_ptr<LogicalOperator> parent, const RelationStats &stats);

//! Add an unnest relation which can come from a logical unnest or a logical get which has an unnest function
void AddUnnestRelation(JoinOrderOptimizer &optimizer, LogicalOperator &op, LogicalOperator &input_op,
optional_ptr<LogicalOperator> parent, RelationStats &child_stats,
optional_ptr<LogicalOperator> limit_op,
vector<reference<LogicalOperator>> &datasource_filters);
void AddAggregateOrWindowRelation(LogicalOperator &op, optional_ptr<LogicalOperator> parent,
const RelationStats &stats, LogicalOperatorType op_type);
vector<unique_ptr<SingleJoinRelation>> GetRelations();
Expand Down
39 changes: 29 additions & 10 deletions src/duckdb/src/optimizer/join_order/relation_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,13 @@ void RelationManager::AddRelation(LogicalOperator &op, optional_ptr<LogicalOpera
auto relation_id = relations.size();

auto table_indexes = op.GetTableIndex();
bool is_unnest_or_get_with_unnest = op.type == LogicalOperatorType::LOGICAL_UNNEST;
if (op.type == LogicalOperatorType::LOGICAL_GET) {
auto &get = op.Cast<LogicalGet>();
if (get.function.name == "unnest") {
is_unnest_or_get_with_unnest = true;
}
}
if (table_indexes.empty()) {
// relation represents a non-reorderable relation, most likely a join relation
// Get the tables referenced in the non-reorderable relation and add them to the relation mapping
Expand All @@ -65,7 +72,7 @@ void RelationManager::AddRelation(LogicalOperator &op, optional_ptr<LogicalOpera
D_ASSERT(relation_mapping.find(reference) == relation_mapping.end());
relation_mapping[reference] = relation_id;
}
} else if (op.type == LogicalOperatorType::LOGICAL_UNNEST) {
} else if (is_unnest_or_get_with_unnest) {
// logical unnest has a logical_unnest index, but other bindings can refer to
// columns that are not unnested.
auto bindings = op.GetColumnBindings();
Expand Down Expand Up @@ -182,6 +189,21 @@ static void ModifyStatsIfLimit(optional_ptr<LogicalOperator> limit_op, RelationS
}
}

// Register an unnest relation (a LOGICAL_UNNEST, or a LOGICAL_GET backed by the
// "unnest" table function) with the join-order optimizer. Optimizes the
// operator's input with a child optimizer, adjusts the collected statistics for
// un-pushed filters and limits, then adds the relation.
void RelationManager::AddUnnestRelation(JoinOrderOptimizer &optimizer, LogicalOperator &op, LogicalOperator &input_op,
                                        optional_ptr<LogicalOperator> parent, RelationStats &child_stats,
                                        optional_ptr<LogicalOperator> limit_op,
                                        vector<reference<LogicalOperator>> &datasource_filters) {
	D_ASSERT(!op.children.empty());
	// Recursively optimize the unnest's input, collecting its statistics.
	auto sub_optimizer = optimizer.CreateChildOptimizer();
	op.children[0] = sub_optimizer.Optimize(std::move(op.children[0]), &child_stats);
	// Filters that could not be pushed into the data source reduce the
	// expected cardinality by the default selectivity factor.
	if (!datasource_filters.empty()) {
		const auto scaled_cardinality =
		    static_cast<double>(child_stats.cardinality) * RelationStatisticsHelper::DEFAULT_SELECTIVITY;
		child_stats.cardinality = LossyNumericCast<idx_t>(scaled_cardinality);
	}
	ModifyStatsIfLimit(limit_op.get(), child_stats);
	AddRelation(input_op, parent, child_stats);
}

bool RelationManager::ExtractJoinRelations(JoinOrderOptimizer &optimizer, LogicalOperator &input_op,
vector<reference<LogicalOperator>> &filter_operators,
optional_ptr<LogicalOperator> parent) {
Expand Down Expand Up @@ -279,15 +301,7 @@ bool RelationManager::ExtractJoinRelations(JoinOrderOptimizer &optimizer, Logica
case LogicalOperatorType::LOGICAL_UNNEST: {
// optimize children of unnest
RelationStats child_stats;
auto child_optimizer = optimizer.CreateChildOptimizer();
op->children[0] = child_optimizer.Optimize(std::move(op->children[0]), &child_stats);
// the extracted cardinality should be set for window
if (!datasource_filters.empty()) {
child_stats.cardinality = LossyNumericCast<idx_t>(static_cast<double>(child_stats.cardinality) *
RelationStatisticsHelper::DEFAULT_SELECTIVITY);
}
ModifyStatsIfLimit(limit_op.get(), child_stats);
AddRelation(input_op, parent, child_stats);
AddUnnestRelation(optimizer, *op, input_op, parent, child_stats, limit_op, datasource_filters);
return true;
}
case LogicalOperatorType::LOGICAL_COMPARISON_JOIN: {
Expand Down Expand Up @@ -345,6 +359,11 @@ bool RelationManager::ExtractJoinRelations(JoinOrderOptimizer &optimizer, Logica
case LogicalOperatorType::LOGICAL_GET: {
// TODO: Get stats from a logical GET
auto &get = op->Cast<LogicalGet>();
if (get.function.name == "unnest" && !op->children.empty()) {
RelationStats child_stats;
AddUnnestRelation(optimizer, *op, input_op, parent, child_stats, limit_op, datasource_filters);
return true;
}
auto stats = RelationStatisticsHelper::ExtractGetStats(get, context);
// if there is another logical filter that could not be pushed down into the
// table scan, apply another selectivity.
Expand Down
41 changes: 29 additions & 12 deletions src/duckdb/src/storage/compression/zstd.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ class ZSTDCompressionState : public CompressionState {

public:
void ResetOutBuffer() {
D_ASSERT(GetCurrentOffset() <= GetWritableSpace(info));
out_buffer.dst = current_buffer_ptr;
out_buffer.pos = 0;

Expand Down Expand Up @@ -347,6 +348,7 @@ class ZSTDCompressionState : public CompressionState {
void InitializeVector() {
D_ASSERT(!in_vector);
if (vector_count + 1 >= total_vector_count) {
//! Last vector
vector_size = analyze_state->count - (ZSTD_VECTOR_SIZE * vector_count);
} else {
vector_size = ZSTD_VECTOR_SIZE;
Expand All @@ -355,6 +357,7 @@ class ZSTDCompressionState : public CompressionState {
current_offset = UnsafeNumericCast<page_offset_t>(
AlignValue<idx_t, sizeof(string_length_t)>(UnsafeNumericCast<idx_t>(current_offset)));
current_buffer_ptr = current_buffer->Ptr() + current_offset;
D_ASSERT(GetCurrentOffset() <= GetWritableSpace(info));
compressed_size = 0;
uncompressed_size = 0;

Expand Down Expand Up @@ -413,15 +416,11 @@ class ZSTDCompressionState : public CompressionState {
throw InvalidInputException("ZSTD Compression failed: %s",
duckdb_zstd::ZSTD_getErrorName(compress_result));
}
D_ASSERT(GetCurrentOffset() <= GetWritableSpace(info));
if (compress_result == 0) {
// Finished
break;
}
if (out_buffer.pos != out_buffer.size) {
throw InternalException("Expected ZSTD_compressStream2 to fully utilize the current buffer, but pos is "
"%d, while size is %d",
out_buffer.pos, out_buffer.size);
}
NewPage();
}
}
Expand Down Expand Up @@ -691,7 +690,7 @@ struct ZSTDScanState : public SegmentScanState {
explicit ZSTDScanState(ColumnSegment &segment)
: state(segment.GetSegmentState()->Cast<UncompressedStringSegmentState>()),
block_manager(segment.GetBlockManager()), buffer_manager(BufferManager::GetBufferManager(segment.db)),
segment_block_offset(segment.GetBlockOffset()) {
segment_block_offset(segment.GetBlockOffset()), segment(segment) {
decompression_context = duckdb_zstd::ZSTD_createDCtx();
segment_handle = buffer_manager.Pin(segment.block);

Expand Down Expand Up @@ -791,14 +790,23 @@ struct ZSTDScanState : public SegmentScanState {

auto vector_size = metadata.count;

auto string_lengths_size = (sizeof(string_length_t) * vector_size);
scan_state.string_lengths = reinterpret_cast<string_length_t *>(scan_state.current_buffer_ptr);
scan_state.current_buffer_ptr += (sizeof(string_length_t) * vector_size);
scan_state.current_buffer_ptr += string_lengths_size;

// Update the in_buffer to point to the start of the compressed data frame
idx_t current_offset = UnsafeNumericCast<idx_t>(scan_state.current_buffer_ptr - handle_start);
scan_state.in_buffer.src = scan_state.current_buffer_ptr;
scan_state.in_buffer.pos = 0;
scan_state.in_buffer.size = block_manager.GetBlockSize() - sizeof(block_id_t) - current_offset;
if (scan_state.metadata.block_offset + string_lengths_size + scan_state.metadata.compressed_size >
(segment.SegmentSize() - sizeof(block_id_t))) {
//! We know that the compressed size is too big to fit on the current page
scan_state.in_buffer.size =
MinValue(metadata.compressed_size, block_manager.GetBlockSize() - sizeof(block_id_t) - current_offset);
} else {
scan_state.in_buffer.size =
MinValue(metadata.compressed_size, block_manager.GetBlockSize() - current_offset);
}

// Initialize the context for streaming decompression
duckdb_zstd::ZSTD_DCtx_reset(decompression_context, duckdb_zstd::ZSTD_reset_session_only);
Expand Down Expand Up @@ -832,7 +840,7 @@ struct ZSTDScanState : public SegmentScanState {
scan_state.in_buffer.src = ptr;
scan_state.in_buffer.pos = 0;

idx_t page_size = block_manager.GetBlockSize() - sizeof(block_id_t);
idx_t page_size = segment.SegmentSize() - sizeof(block_id_t);
idx_t remaining_compressed_data = scan_state.metadata.compressed_size - scan_state.compressed_scan_count;
scan_state.in_buffer.size = MinValue<idx_t>(page_size, remaining_compressed_data);
}
Expand All @@ -842,25 +850,33 @@ struct ZSTDScanState : public SegmentScanState {
return;
}

auto &in_buffer = scan_state.in_buffer;
duckdb_zstd::ZSTD_outBuffer out_buffer;

out_buffer.dst = destination;
out_buffer.pos = 0;
out_buffer.size = uncompressed_length;

while (true) {
idx_t old_pos = scan_state.in_buffer.pos;
idx_t old_pos = in_buffer.pos;
size_t res = duckdb_zstd::ZSTD_decompressStream(
/* zds = */ decompression_context,
/* output =*/&out_buffer,
/* input =*/&scan_state.in_buffer);
scan_state.compressed_scan_count += scan_state.in_buffer.pos - old_pos;
/* input =*/&in_buffer);
scan_state.compressed_scan_count += in_buffer.pos - old_pos;
if (duckdb_zstd::ZSTD_isError(res)) {
throw InvalidInputException("ZSTD Decompression failed: %s", duckdb_zstd::ZSTD_getErrorName(res));
}
if (out_buffer.pos == out_buffer.size) {
//! Done decompressing the relevant portion
break;
}
if (!res) {
D_ASSERT(out_buffer.pos == out_buffer.size);
D_ASSERT(in_buffer.pos == in_buffer.size);
break;
}
D_ASSERT(in_buffer.pos == in_buffer.size);
// Did not fully decompress, it needs a new page to read from
LoadNextPageForVector(scan_state);
}
Expand Down Expand Up @@ -956,6 +972,7 @@ struct ZSTDScanState : public SegmentScanState {
idx_t segment_count;
//! The amount of tuples consumed
idx_t scanned_count = 0;
ColumnSegment &segment;

//! Buffer for skipping data
AllocatedData skip_buffer;
Expand Down