8 changes: 4 additions & 4 deletions src/duckdb/extension/json/json_functions/json_create.cpp
@@ -235,7 +235,7 @@ struct CreateJSONValue<uhugeint_t, string_t> {
 };
 
 template <class T>
-inline yyjson_mut_val *CreateJSONValueFromJSON(yyjson_mut_doc *doc, const T &value) {
+static inline yyjson_mut_val *CreateJSONValueFromJSON(yyjson_mut_doc *doc, const T &value) {
 	return nullptr; // This function should only be called with string_t as template
 }
 
@@ -578,7 +578,8 @@ static void CreateValues(const StructNames &names, yyjson_mut_doc *doc, yyjson_m
 	case LogicalTypeId::TIMESTAMP_NS:
 	case LogicalTypeId::TIMESTAMP_MS:
 	case LogicalTypeId::TIMESTAMP_SEC:
-	case LogicalTypeId::UUID: {
+	case LogicalTypeId::UUID:
+	case LogicalTypeId::GEOMETRY: {
 		Vector string_vector(LogicalTypeId::VARCHAR, count);
 		VectorOperations::DefaultCast(value_v, string_vector, count);
 		TemplatedCreateValues<string_t, string_t>(doc, vals, string_vector, count);
@@ -616,7 +617,6 @@ static void CreateValues(const StructNames &names, yyjson_mut_doc *doc, yyjson_m
 	case LogicalTypeId::VALIDITY:
 	case LogicalTypeId::TABLE:
 	case LogicalTypeId::LAMBDA:
-	case LogicalTypeId::GEOMETRY: // TODO! Add support for GEOMETRY
 		throw InternalException("Unsupported type arrived at JSON create function");
 	}
 }
@@ -789,7 +789,7 @@ static bool AnyToJSONCast(Vector &source, Vector &result, idx_t count, CastParam
 	return true;
 }
 
-BoundCastInfo AnyToJSONCastBind(BindCastInput &input, const LogicalType &source, const LogicalType &target) {
+static BoundCastInfo AnyToJSONCastBind(BindCastInput &input, const LogicalType &source, const LogicalType &target) {
 	auto cast_data = make_uniq<NestedToJSONCastData>();
 	GetJSONType(cast_data->const_struct_names, source);
 	return BoundCastInfo(AnyToJSONCast, std::move(cast_data), JSONFunctionLocalState::InitCastLocalState);
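Reviewer note: taken together, the json_create.cpp hunks move GEOMETRY out of the unsupported-type branch and into the group of types that are serialized by casting to VARCHAR first, so the JSON creation paths in this file now emit the value's text form as a JSON string. A standalone sketch of that dispatch pattern, with illustrative names rather than DuckDB code:

#include <stdexcept>
#include <string>

enum class TypeId { UUID, GEOMETRY, LAMBDA };

// Types without a native JSON representation are rendered as text first
// (the diff does this with a cast to VARCHAR) and emitted as JSON strings.
std::string ToJSONValue(TypeId id, const std::string &as_text) {
	switch (id) {
	case TypeId::UUID:
	case TypeId::GEOMETRY: // previously fell through to the error below
		return "\"" + as_text + "\"";
	default:
		throw std::runtime_error("Unsupported type arrived at JSON create function");
	}
}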
9 changes: 5 additions & 4 deletions src/duckdb/extension/json/json_functions/json_transform.cpp
@@ -765,7 +765,7 @@ static bool TransformObjectToMap(yyjson_val *objects[], yyjson_alc *alc, Vector
 	return success;
 }
 
-bool TransformToJSON(yyjson_val *vals[], yyjson_alc *alc, Vector &result, const idx_t count) {
+static bool TransformToJSON(yyjson_val *vals[], yyjson_alc *alc, Vector &result, const idx_t count) {
 	auto data = FlatVector::GetData<string_t>(result);
 	auto &validity = FlatVector::Validity(result);
 	for (idx_t i = 0; i < count; i++) {
@@ -780,8 +780,8 @@ bool TransformToJSON(yyjson_val *vals[], yyjson_alc *alc, Vector &result, const
 	return true;
 }
 
-bool TransformValueIntoUnion(yyjson_val **vals, yyjson_alc *alc, Vector &result, const idx_t count,
-                             JSONTransformOptions &options) {
+static bool TransformValueIntoUnion(yyjson_val **vals, yyjson_alc *alc, Vector &result, const idx_t count,
+                                    JSONTransformOptions &options) {
 	auto type = result.GetType();
 
 	auto fields = UnionType::CopyMemberTypes(type);
@@ -923,6 +923,7 @@ bool JSONTransform::Transform(yyjson_val *vals[], yyjson_alc *alc, Vector &resul
 	case LogicalTypeId::TIMESTAMP_MS:
 	case LogicalTypeId::TIMESTAMP_SEC:
 	case LogicalTypeId::UUID:
+	case LogicalTypeId::GEOMETRY:
 		return TransformFromString(vals, result, count, options);
 	case LogicalTypeId::VARCHAR:
 	case LogicalTypeId::BLOB:
@@ -1024,7 +1025,7 @@ static bool JSONToAnyCast(Vector &source, Vector &result, idx_t count, CastParam
 	return success;
 }
 
-BoundCastInfo JSONToAnyCastBind(BindCastInput &input, const LogicalType &source, const LogicalType &target) {
+static BoundCastInfo JSONToAnyCastBind(BindCastInput &input, const LogicalType &source, const LogicalType &target) {
 	return BoundCastInfo(JSONToAnyCast, nullptr, JSONFunctionLocalState::InitCastLocalState);
 }
 
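Reviewer note: json_transform.cpp mirrors the change on the read side, routing GEOMETRY through TransformFromString alongside UUID and the timestamp variants, so JSON-to-GEOMETRY also goes through the string cast. A minimal sketch of that dispatch (illustrative, not DuckDB code):

enum class TypeId { UUID, GEOMETRY, VARCHAR };

// Sketch of the switch in JSONTransform::Transform: returns true for types
// whose values are parsed from their JSON string representation.
bool UsesStringTransform(TypeId id) {
	switch (id) {
	case TypeId::UUID:
	case TypeId::GEOMETRY: // added in this diff
		return true; // handled by TransformFromString
	default:
		return false;
	}
}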
8 changes: 6 additions & 2 deletions src/duckdb/extension/parquet/column_reader.cpp
@@ -774,6 +774,10 @@ void ColumnReader::ApplyPendingSkips(data_ptr_t define_out, data_ptr_t repeat_ou
 	pending_skips = 0;
 
 	auto to_skip = num_values;
+	data_t skip_defines[STANDARD_VECTOR_SIZE] = {};
+	data_t skip_repeats[STANDARD_VECTOR_SIZE];
+	data_ptr_t skip_define_out = HasDefines() ? skip_defines : define_out;
+	data_ptr_t skip_repeat_out = HasRepeats() ? skip_repeats : repeat_out;
 	// start reading but do not apply skips (we are skipping now)
 	BeginRead(nullptr, nullptr);
 
@@ -785,9 +789,9 @@ void ColumnReader::ApplyPendingSkips(data_ptr_t define_out, data_ptr_t repeat_ou
 			to_skip -= skip_now;
 			continue;
 		}
-		const auto all_valid = PrepareRead(skip_now, define_out, repeat_out, 0);
+		const auto all_valid = PrepareRead(skip_now, skip_define_out, skip_repeat_out, 0);
 
-		const auto define_ptr = all_valid ? nullptr : static_cast<uint8_t *>(define_out);
+		const auto define_ptr = all_valid ? nullptr : static_cast<uint8_t *>(skip_define_out);
 		switch (encoding) {
 		case ColumnEncoding::DICTIONARY:
 			dictionary_decoder.Skip(define_ptr, skip_now);
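Reviewer note: the column_reader.cpp change gives the skip path its own scratch buffers. Previously, skipped define/repeat levels were decoded into the caller's define_out/repeat_out, which could clobber levels the caller still needs; now they land in local arrays whenever the column actually has defines or repeats. A standalone sketch of the pattern (sizes and names illustrative):

#include <cstddef>
#include <cstdint>

constexpr std::size_t kVectorSize = 2048; // stands in for STANDARD_VECTOR_SIZE

void DecodeLevels(std::uint8_t *target, std::size_t count) {
	for (std::size_t i = 0; i < count; i++) {
		target[i] = 1; // placeholder for the real level decoder
	}
}

void SkipValues(std::uint8_t *define_out, bool has_defines, std::size_t to_skip) {
	std::uint8_t skip_defines[kVectorSize] = {}; // zero-initialized scratch
	// Decode into scratch when the column has defines; otherwise nothing is
	// written and passing the caller's pointer through is harmless.
	std::uint8_t *target = has_defines ? skip_defines : define_out;
	while (to_skip > 0) {
		const std::size_t now = to_skip < kVectorSize ? to_skip : kVectorSize;
		DecodeLevels(target, now); // results are discarded: we are skipping
		to_skip -= now;
	}
}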
@@ -52,7 +52,8 @@ struct ParquetMultiFileInfo : MultiFileReaderInterface {
 	shared_ptr<BaseFileReader> CreateReader(ClientContext &context, const OpenFileInfo &file,
 	                                        BaseFileReaderOptions &options,
 	                                        const MultiFileOptions &file_options) override;
-	unique_ptr<NodeStatistics> GetCardinality(const MultiFileBindData &bind_data, idx_t file_count) override;
+	unique_ptr<NodeStatistics> GetCardinality(ClientContext &context, const MultiFileBindData &bind_data,
+	                                          idx_t file_count) override;
 	void GetVirtualColumns(ClientContext &context, MultiFileBindData &bind_data, virtual_column_map_t &result) override;
 	unique_ptr<MultiFileReaderInterface> Copy() override;
 	FileGlobInput GetGlobInput() override;
3 changes: 3 additions & 0 deletions src/duckdb/extension/parquet/include/parquet_reader.hpp
@@ -134,6 +134,7 @@ struct ParquetUnionData : public BaseUnionData {
 	}
 	~ParquetUnionData() override;
 
+	optional_idx TryGetCardinalityEstimate() const override;
 	unique_ptr<BaseStatistics> GetStatistics(ClientContext &context, const string &name) override;
 
 	ParquetOptions options;
@@ -177,6 +178,8 @@ class ParquetReader : public BaseFileReader {
 
 	idx_t NumRows() const;
 	idx_t NumRowGroups() const;
+	idx_t GetFileSize() const;
+	idx_t GetDataSize() const;
 
 	const duckdb_parquet::FileMetaData *GetFileMetadata() const;
 	string static GetUniqueFileIdentifier(const duckdb_parquet::EncryptionAlgorithm &encryption_algorithm);
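Reviewer note: these header changes form one thread: MultiFileReaderInterface::GetCardinality gains a ClientContext parameter, ParquetUnionData can now report TryGetCardinalityEstimate(), and ParquetReader exposes GetFileSize()/GetDataSize(). The call sites are not part of this diff, but a plausible use (an assumption on my part, not the PR's implementation) is extrapolating a row-count estimate from file sizes when only some files have been opened:

#include <cstdint>

// Hypothetical helper, not DuckDB code: scale the rows seen in already-opened
// files by the total bytes across all files for a rough cardinality estimate.
std::uint64_t EstimateRows(std::uint64_t rows_seen, std::uint64_t bytes_seen, std::uint64_t bytes_total) {
	if (bytes_seen == 0) {
		return 0; // nothing opened yet, so no basis for an estimate
	}
	const double rows_per_byte = static_cast<double>(rows_seen) / static_cast<double>(bytes_seen);
	return static_cast<std::uint64_t>(rows_per_byte * static_cast<double>(bytes_total));
}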
54 changes: 37 additions & 17 deletions src/duckdb/extension/parquet/parquet_metadata.cpp
@@ -76,13 +76,14 @@ class ParquetMetaDataOperator {
 	template <ParquetMetadataOperatorType OP_TYPE>
 	static unique_ptr<LocalTableFunctionState> InitLocal(ExecutionContext &context, TableFunctionInitInput &input,
 	                                                     GlobalTableFunctionState *global_state);
-	template <ParquetMetadataOperatorType OP_TYPE>
 	static void Function(ClientContext &context, TableFunctionInput &data_p, DataChunk &output);
 	static double Progress(ClientContext &context, const FunctionData *bind_data_p,
 	                       const GlobalTableFunctionState *global_state);
 
 	template <ParquetMetadataOperatorType OP_TYPE>
 	static void BindSchema(vector<LogicalType> &return_types, vector<string> &names);
+
+	static OperatorPartitionData GetPartitionData(ClientContext &context, TableFunctionGetPartitionInput &input);
 };
 
 struct ParquetMetadataGlobalState : public GlobalTableFunctionState {
@@ -112,6 +113,8 @@ struct ParquetMetadataGlobalState : public GlobalTableFunctionState {
 
 	unique_ptr<ParquetMetadataFilePaths> file_paths;
 	idx_t max_threads;
+	mutex lock;
+	idx_t current_file = 0;
 };
 
 struct ParquetMetadataLocalState : public LocalTableFunctionState {
@@ -120,13 +123,15 @@ struct ParquetMetadataLocalState : public LocalTableFunctionState {
 	bool file_exhausted = true;
 	idx_t row_idx = 0;
 	idx_t total_rows = 0;
+	optional_idx file_idx;
 
-	void Initialize(ClientContext &context, OpenFileInfo &file_info) {
+	void Initialize(ClientContext &context, OpenFileInfo &file_info, idx_t next_file_idx) {
 		ParquetOptions parquet_options(context);
 		reader = make_uniq<ParquetReader>(context, file_info, parquet_options);
 		processor->Initialize(context, *reader);
 		total_rows = processor->TotalRowCount(*reader);
 		row_idx = 0;
+		file_idx = next_file_idx;
 		file_exhausted = false;
 	}
 };
@@ -1025,7 +1030,6 @@ unique_ptr<LocalTableFunctionState> ParquetMetaDataOperator::InitLocal(Execution
 	return unique_ptr_cast<LocalTableFunctionState, ParquetMetadataLocalState>(std::move(res));
 }
 
-template <ParquetMetadataOperatorType OP_TYPE>
 void ParquetMetaDataOperator::Function(ClientContext &context, TableFunctionInput &data_p, DataChunk &output) {
 	auto &global_state = data_p.global_state->Cast<ParquetMetadataGlobalState>();
 	auto &local_state = data_p.local_state->Cast<ParquetMetadataLocalState>();
@@ -1040,12 +1044,21 @@ void ParquetMetaDataOperator::Function(ClientContext &context, TableFunctionInpu
 	while (output_count < STANDARD_VECTOR_SIZE) {
 		// Check if we need a new file
 		if (local_state.file_exhausted) {
+			if (output_count > 0) {
+				// we already have rows - emit them first
+				break;
+			}
+			idx_t next_file_idx;
 			OpenFileInfo next_file;
-			if (!global_state.file_paths->NextFile(next_file)) {
-				break; // No more files to process
+			{
+				lock_guard<mutex> guard(global_state.lock);
+				if (!global_state.file_paths->NextFile(next_file)) {
+					break; // No more files to process
+				}
+				next_file_idx = global_state.current_file++;
 			}
 
-			local_state.Initialize(context, next_file);
+			local_state.Initialize(context, next_file, next_file_idx);
 		}
 
 		idx_t left_in_vector = STANDARD_VECTOR_SIZE - output_count;
@@ -1073,63 +1086,70 @@ void ParquetMetaDataOperator::Function(ClientContext &context, TableFunctionInpu
 	}
 }
 
+OperatorPartitionData ParquetMetaDataOperator::GetPartitionData(ClientContext &context,
+                                                                TableFunctionGetPartitionInput &input) {
+	auto &local_state = input.local_state->Cast<ParquetMetadataLocalState>();
+	return OperatorPartitionData(local_state.file_idx.GetIndex());
+}
+
 double ParquetMetaDataOperator::Progress(ClientContext &context, const FunctionData *bind_data_p,
                                          const GlobalTableFunctionState *global_state) {
 	auto &global_data = global_state->Cast<ParquetMetadataGlobalState>();
 	return global_data.GetProgress() * 100.0;
 }
 
 ParquetMetaDataFunction::ParquetMetaDataFunction()
-    : TableFunction("parquet_metadata", {LogicalType::VARCHAR},
-                    ParquetMetaDataOperator::Function<ParquetMetadataOperatorType::META_DATA>,
+    : TableFunction("parquet_metadata", {LogicalType::VARCHAR}, ParquetMetaDataOperator::Function,
                     ParquetMetaDataOperator::Bind<ParquetMetadataOperatorType::META_DATA>,
                     ParquetMetaDataOperator::InitGlobal,
                     ParquetMetaDataOperator::InitLocal<ParquetMetadataOperatorType::META_DATA>) {
 	table_scan_progress = ParquetMetaDataOperator::Progress;
+	get_partition_data = ParquetMetaDataOperator::GetPartitionData;
 }
 
 ParquetSchemaFunction::ParquetSchemaFunction()
-    : TableFunction("parquet_schema", {LogicalType::VARCHAR},
-                    ParquetMetaDataOperator::Function<ParquetMetadataOperatorType::SCHEMA>,
+    : TableFunction("parquet_schema", {LogicalType::VARCHAR}, ParquetMetaDataOperator::Function,
                     ParquetMetaDataOperator::Bind<ParquetMetadataOperatorType::SCHEMA>,
                     ParquetMetaDataOperator::InitGlobal,
                     ParquetMetaDataOperator::InitLocal<ParquetMetadataOperatorType::SCHEMA>) {
 	table_scan_progress = ParquetMetaDataOperator::Progress;
+	get_partition_data = ParquetMetaDataOperator::GetPartitionData;
 }
 
 ParquetKeyValueMetadataFunction::ParquetKeyValueMetadataFunction()
-    : TableFunction("parquet_kv_metadata", {LogicalType::VARCHAR},
-                    ParquetMetaDataOperator::Function<ParquetMetadataOperatorType::KEY_VALUE_META_DATA>,
+    : TableFunction("parquet_kv_metadata", {LogicalType::VARCHAR}, ParquetMetaDataOperator::Function,
                     ParquetMetaDataOperator::Bind<ParquetMetadataOperatorType::KEY_VALUE_META_DATA>,
                     ParquetMetaDataOperator::InitGlobal,
                     ParquetMetaDataOperator::InitLocal<ParquetMetadataOperatorType::KEY_VALUE_META_DATA>) {
 	table_scan_progress = ParquetMetaDataOperator::Progress;
+	get_partition_data = ParquetMetaDataOperator::GetPartitionData;
 }
 
 ParquetFileMetadataFunction::ParquetFileMetadataFunction()
-    : TableFunction("parquet_file_metadata", {LogicalType::VARCHAR},
-                    ParquetMetaDataOperator::Function<ParquetMetadataOperatorType::FILE_META_DATA>,
+    : TableFunction("parquet_file_metadata", {LogicalType::VARCHAR}, ParquetMetaDataOperator::Function,
                     ParquetMetaDataOperator::Bind<ParquetMetadataOperatorType::FILE_META_DATA>,
                     ParquetMetaDataOperator::InitGlobal,
                     ParquetMetaDataOperator::InitLocal<ParquetMetadataOperatorType::FILE_META_DATA>) {
 	table_scan_progress = ParquetMetaDataOperator::Progress;
+	get_partition_data = ParquetMetaDataOperator::GetPartitionData;
 }
 
 ParquetBloomProbeFunction::ParquetBloomProbeFunction()
     : TableFunction("parquet_bloom_probe", {LogicalType::VARCHAR, LogicalType::VARCHAR, LogicalType::ANY},
-                    ParquetMetaDataOperator::Function<ParquetMetadataOperatorType::BLOOM_PROBE>,
+                    ParquetMetaDataOperator::Function,
                     ParquetMetaDataOperator::Bind<ParquetMetadataOperatorType::BLOOM_PROBE>,
                     ParquetMetaDataOperator::InitGlobal,
                     ParquetMetaDataOperator::InitLocal<ParquetMetadataOperatorType::BLOOM_PROBE>) {
 	table_scan_progress = ParquetMetaDataOperator::Progress;
+	get_partition_data = ParquetMetaDataOperator::GetPartitionData;
 }
 
 ParquetFullMetadataFunction::ParquetFullMetadataFunction()
-    : TableFunction("parquet_full_metadata", {LogicalType::VARCHAR},
-                    ParquetMetaDataOperator::Function<ParquetMetadataOperatorType::FULL_METADATA>,
+    : TableFunction("parquet_full_metadata", {LogicalType::VARCHAR}, ParquetMetaDataOperator::Function,
                     ParquetMetaDataOperator::Bind<ParquetMetadataOperatorType::FULL_METADATA>,
                     ParquetMetaDataOperator::InitGlobal,
                     ParquetMetaDataOperator::InitLocal<ParquetMetadataOperatorType::FULL_METADATA>) {
 	table_scan_progress = ParquetMetaDataOperator::Progress;
+	get_partition_data = ParquetMetaDataOperator::GetPartitionData;
 }
 } // namespace duckdb
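Reviewer note: the parquet_metadata.cpp changes parallelize the metadata table functions across files. Function is no longer templated on the operator type, the global state gains a mutex and a current_file counter, each worker claims the next file under that lock and records its index in the local state, and the new GetPartitionData reports that index so every file forms its own partition. A standalone sketch of the claim-next-file pattern (names illustrative, not DuckDB code):

#include <cstddef>
#include <mutex>
#include <optional>

struct GlobalState {
	std::mutex lock;
	std::size_t current_file = 0;
	std::size_t file_count = 0;
};

struct LocalState {
	std::optional<std::size_t> file_idx; // doubles as the partition id
};

// Mirrors the lock_guard block added to ParquetMetaDataOperator::Function:
// workers race on the lock, and each file index is handed out exactly once.
bool ClaimNextFile(GlobalState &gstate, LocalState &lstate) {
	std::lock_guard<std::mutex> guard(gstate.lock);
	if (gstate.current_file >= gstate.file_count) {
		return false; // no more files to process
	}
	lstate.file_idx = gstate.current_file++;
	return true;
}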