Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/duckdb/extension/json/json_functions/read_json.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,13 +108,13 @@ class JSONSchemaTask : public BaseExecutorTask {
JSONStructure::ExtractStructure(val, node, true);
}
}
remaining -= next;
if (!node.ContainsVarchar()) { // Can't refine non-VARCHAR types
continue;
}
node.InitializeCandidateTypes(options.max_depth, options.convert_strings_to_integers);
node.RefineCandidateTypes(scan_state.values, next, string_vector, allocator,
auto_detect_state.date_format_map);
remaining -= next;
}
auto_detect_state.total_file_size += file_size;
auto_detect_state.bytes_scanned += total_read_size;
Expand Down
8 changes: 4 additions & 4 deletions src/duckdb/extension/parquet/column_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ const uint64_t ParquetDecodeUtils::BITPACK_MASKS_SIZE = sizeof(ParquetDecodeUtil

const uint8_t ParquetDecodeUtils::BITPACK_DLEN = 8;

ColumnReader::ColumnReader(ParquetReader &reader, const ParquetColumnSchema &schema_p)
ColumnReader::ColumnReader(const ParquetReader &reader, const ParquetColumnSchema &schema_p)
: column_schema(schema_p), reader(reader), page_rows_available(0), dictionary_decoder(*this),
delta_binary_packed_decoder(*this), rle_decoder(*this), delta_length_byte_array_decoder(*this),
delta_byte_array_decoder(*this), byte_stream_split_decoder(*this), aad_crypto_metadata(reader.allocator) {
Expand All @@ -122,7 +122,7 @@ Allocator &ColumnReader::GetAllocator() {
return reader.allocator;
}

ParquetReader &ColumnReader::Reader() {
const ParquetReader &ColumnReader::Reader() {
return reader;
}

Expand Down Expand Up @@ -825,7 +825,7 @@ void ColumnReader::ApplyPendingSkips(data_ptr_t define_out, data_ptr_t repeat_ou
// Create Column Reader
//===--------------------------------------------------------------------===//
template <class T>
static unique_ptr<ColumnReader> CreateDecimalReader(ParquetReader &reader, const ParquetColumnSchema &schema) {
static unique_ptr<ColumnReader> CreateDecimalReader(const ParquetReader &reader, const ParquetColumnSchema &schema) {
switch (schema.type.InternalType()) {
case PhysicalType::INT16:
return make_uniq<TemplatedColumnReader<int16_t, TemplatedParquetValueConversion<T>>>(reader, schema);
Expand All @@ -840,7 +840,7 @@ static unique_ptr<ColumnReader> CreateDecimalReader(ParquetReader &reader, const
}
}

unique_ptr<ColumnReader> ColumnReader::CreateReader(ParquetReader &reader, const ParquetColumnSchema &schema) {
unique_ptr<ColumnReader> ColumnReader::CreateReader(const ParquetReader &reader, const ParquetColumnSchema &schema) {
switch (schema.type.id()) {
case LogicalTypeId::BOOLEAN:
return make_uniq<BooleanColumnReader>(reader, schema);
Expand Down
8 changes: 4 additions & 4 deletions src/duckdb/extension/parquet/include/column_reader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,11 @@ class ColumnReader {
friend class RLEDecoder;

public:
ColumnReader(ParquetReader &reader, const ParquetColumnSchema &schema_p);
ColumnReader(const ParquetReader &reader, const ParquetColumnSchema &schema_p);
virtual ~ColumnReader();

public:
static unique_ptr<ColumnReader> CreateReader(ParquetReader &reader, const ParquetColumnSchema &schema);
static unique_ptr<ColumnReader> CreateReader(const ParquetReader &reader, const ParquetColumnSchema &schema);
virtual void InitializeRead(idx_t row_group_index, const vector<ColumnChunk> &columns, TProtocol &protocol_p);
virtual idx_t Read(uint64_t num_values, data_ptr_t define_out, data_ptr_t repeat_out, Vector &result_out);
virtual void Select(uint64_t num_values, data_ptr_t define_out, data_ptr_t repeat_out, Vector &result_out,
Expand All @@ -78,7 +78,7 @@ class ColumnReader {
SelectionVector &sel, idx_t &approved_tuple_count);
virtual void Skip(idx_t num_values);

ParquetReader &Reader();
const ParquetReader &Reader();
const LogicalType &Type() const {
return column_schema.type;
}
Expand Down Expand Up @@ -303,7 +303,7 @@ class ColumnReader {
protected:
const ParquetColumnSchema &column_schema;

ParquetReader &reader;
const ParquetReader &reader;
idx_t pending_skips = 0;
bool page_is_filtered_out = false;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class ParquetDecimalUtils {
return res;
}

static unique_ptr<ColumnReader> CreateReader(ParquetReader &reader, const ParquetColumnSchema &schema);
static unique_ptr<ColumnReader> CreateReader(const ParquetReader &reader, const ParquetColumnSchema &schema);
};

template <>
Expand Down
2 changes: 1 addition & 1 deletion src/duckdb/extension/parquet/include/parquet_geometry.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class ColumnReader;
class ClientContext;

struct GeometryColumnReader {
static unique_ptr<ColumnReader> Create(ParquetReader &reader, const ParquetColumnSchema &schema,
static unique_ptr<ColumnReader> Create(const ParquetReader &reader, const ParquetColumnSchema &schema,
ClientContext &context);
};

Expand Down
15 changes: 8 additions & 7 deletions src/duckdb/extension/parquet/include/parquet_reader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ class ParquetReader : public BaseFileReader {
shared_ptr<ParquetFileMetadataCache> metadata = nullptr);
~ParquetReader() override;

CachingFileSystem fs;
mutable CachingFileSystem fs;
Allocator &allocator;
shared_ptr<ParquetFileMetadataCache> metadata;
ParquetOptions parquet_options;
Expand All @@ -167,13 +167,15 @@ class ParquetReader : public BaseFileReader {

bool TryInitializeScan(ClientContext &context, GlobalTableFunctionState &gstate,
LocalTableFunctionState &lstate) override;
void PrepareScan(ClientContext &context, GlobalTableFunctionState &gstate_p,
LocalTableFunctionState &lstate_p) override;
AsyncResult Scan(ClientContext &context, GlobalTableFunctionState &global_state,
LocalTableFunctionState &local_state, DataChunk &chunk) override;
void FinishFile(ClientContext &context, GlobalTableFunctionState &gstate_p) override;
double GetProgressInFile(ClientContext &context) override;

public:
void InitializeScan(ClientContext &context, ParquetReaderScanState &state, vector<idx_t> groups_to_read);
void InitializeScan(ClientContext &context, ParquetReaderScanState &state, vector<idx_t> groups_to_read) const;
AsyncResult Scan(ClientContext &context, ParquetReaderScanState &state, DataChunk &output);

idx_t NumRows() const;
Expand All @@ -184,11 +186,11 @@ class ParquetReader : public BaseFileReader {
const duckdb_parquet::FileMetaData *GetFileMetadata() const;
string static GetUniqueFileIdentifier(const duckdb_parquet::EncryptionAlgorithm &encryption_algorithm);

uint32_t Read(duckdb_apache::thrift::TBase &object, TProtocol &iprot);
uint32_t Read(duckdb_apache::thrift::TBase &object, TProtocol &iprot) const;
uint32_t ReadEncrypted(duckdb_apache::thrift::TBase &object, TProtocol &iprot,
CryptoMetaData &aad_crypto_metadata) const;
uint32_t ReadData(duckdb_apache::thrift::protocol::TProtocol &iprot, const data_ptr_t buffer,
const uint32_t buffer_size);
const uint32_t buffer_size) const;
uint32_t ReadDataEncrypted(duckdb_apache::thrift::protocol::TProtocol &iprot, const data_ptr_t buffer,
const uint32_t buffer_size, CryptoMetaData &aad_crypto_metadata) const;

Expand Down Expand Up @@ -226,10 +228,9 @@ class ParquetReader : public BaseFileReader {
ParquetColumnSchema ParseSchemaRecursive(idx_t depth, idx_t max_define, idx_t max_repeat, idx_t &next_schema_idx,
idx_t &next_file_idx, ClientContext &context);

unique_ptr<ColumnReader> CreateReader(ClientContext &context);

unique_ptr<ColumnReader> CreateReader(ClientContext &context) const;
unique_ptr<ColumnReader> CreateReaderRecursive(ClientContext &context, const vector<ColumnIndex> &indexes,
const ParquetColumnSchema &schema);
const ParquetColumnSchema &schema) const;
const duckdb_parquet::RowGroup &GetGroup(ParquetReaderScanState &state);
uint64_t GetGroupCompressedSize(ParquetReaderScanState &state);
idx_t GetGroupOffset(ParquetReaderScanState &state);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class BooleanColumnReader : public TemplatedColumnReader<bool, BooleanParquetVal
static constexpr const PhysicalType TYPE = PhysicalType::BOOL;

public:
BooleanColumnReader(ParquetReader &reader, const ParquetColumnSchema &schema)
BooleanColumnReader(const ParquetReader &reader, const ParquetColumnSchema &schema)
: TemplatedColumnReader<bool, BooleanParquetValueConversion>(reader, schema), byte_pos(0) {
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class CallbackColumnReader
static constexpr const PhysicalType TYPE = PhysicalType::INVALID;

public:
CallbackColumnReader(ParquetReader &reader, const ParquetColumnSchema &schema)
CallbackColumnReader(const ParquetReader &reader, const ParquetColumnSchema &schema)
: TemplatedColumnReader<DUCKDB_PHYSICAL_TYPE,
CallbackParquetValueConversion<PARQUET_PHYSICAL_TYPE, DUCKDB_PHYSICAL_TYPE, FUNC>>(
reader, schema) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class DecimalColumnReader
TemplatedColumnReader<DUCKDB_PHYSICAL_TYPE, DecimalParquetValueConversion<DUCKDB_PHYSICAL_TYPE, FIXED_LENGTH>>;

public:
DecimalColumnReader(ParquetReader &reader, const ParquetColumnSchema &schema)
DecimalColumnReader(const ParquetReader &reader, const ParquetColumnSchema &schema)
: TemplatedColumnReader<DUCKDB_PHYSICAL_TYPE,
DecimalParquetValueConversion<DUCKDB_PHYSICAL_TYPE, FIXED_LENGTH>>(reader, schema) {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ struct IntervalValueConversion {

class IntervalColumnReader : public TemplatedColumnReader<interval_t, IntervalValueConversion> {
public:
IntervalColumnReader(ParquetReader &reader, const ParquetColumnSchema &schema)
IntervalColumnReader(const ParquetReader &reader, const ParquetColumnSchema &schema)
: TemplatedColumnReader<interval_t, IntervalValueConversion>(reader, schema) {
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class ListColumnReader : public ColumnReader {
static constexpr const PhysicalType TYPE = PhysicalType::LIST;

public:
ListColumnReader(ParquetReader &reader, const ParquetColumnSchema &schema,
ListColumnReader(const ParquetReader &reader, const ParquetColumnSchema &schema,
unique_ptr<ColumnReader> child_column_reader_p);

idx_t Read(uint64_t num_values, data_ptr_t define_out, data_ptr_t repeat_out, Vector &result_out) override;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class NullColumnReader : public ColumnReader {
static constexpr const PhysicalType TYPE = PhysicalType::INVALID;

public:
NullColumnReader(ParquetReader &reader, const ParquetColumnSchema &schema) : ColumnReader(reader, schema) {};
NullColumnReader(const ParquetReader &reader, const ParquetColumnSchema &schema) : ColumnReader(reader, schema) {};

shared_ptr<ResizeableBuffer> dict;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class RowNumberColumnReader : public ColumnReader {
static constexpr const PhysicalType TYPE = PhysicalType::INT64;

public:
RowNumberColumnReader(ParquetReader &reader, const ParquetColumnSchema &schema);
RowNumberColumnReader(const ParquetReader &reader, const ParquetColumnSchema &schema);

public:
idx_t Read(uint64_t num_values, data_ptr_t define_out, data_ptr_t repeat_out, Vector &result) override;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class StringColumnReader : public ColumnReader {
static constexpr const PhysicalType TYPE = PhysicalType::VARCHAR;

public:
StringColumnReader(ParquetReader &reader, const ParquetColumnSchema &schema);
StringColumnReader(const ParquetReader &reader, const ParquetColumnSchema &schema);
idx_t fixed_width_string_length;
const StringColumnType string_column_type;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class StructColumnReader : public ColumnReader {
static constexpr const PhysicalType TYPE = PhysicalType::STRUCT;

public:
StructColumnReader(ParquetReader &reader, const ParquetColumnSchema &schema,
StructColumnReader(const ParquetReader &reader, const ParquetColumnSchema &schema,
vector<unique_ptr<ColumnReader>> child_readers_p);

vector<unique_ptr<ColumnReader>> child_readers;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ class TemplatedColumnReader : public ColumnReader {
static constexpr const PhysicalType TYPE = PhysicalType::INVALID;

public:
TemplatedColumnReader(ParquetReader &reader, const ParquetColumnSchema &schema) : ColumnReader(reader, schema) {
TemplatedColumnReader(const ParquetReader &reader, const ParquetColumnSchema &schema)
: ColumnReader(reader, schema) {
}

shared_ptr<ResizeableBuffer> dict;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ struct UUIDValueConversion {

class UUIDColumnReader : public TemplatedColumnReader<hugeint_t, UUIDValueConversion> {
public:
UUIDColumnReader(ParquetReader &reader, const ParquetColumnSchema &schema)
UUIDColumnReader(const ParquetReader &reader, const ParquetColumnSchema &schema)
: TemplatedColumnReader<hugeint_t, UUIDValueConversion>(reader, schema) {
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class VariantColumnReader : public ColumnReader {
static constexpr const PhysicalType TYPE = PhysicalType::STRUCT;

public:
VariantColumnReader(ClientContext &context, ParquetReader &reader, const ParquetColumnSchema &schema,
VariantColumnReader(ClientContext &context, const ParquetReader &reader, const ParquetColumnSchema &schema,
vector<unique_ptr<ColumnReader>> child_readers_p);

ClientContext &context;
Expand Down
8 changes: 5 additions & 3 deletions src/duckdb/extension/parquet/parquet_geometry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ unique_ptr<GeoParquetFileMetadata> GeoParquetFileMetadata::TryRead(const duckdb_

// Parse the CRS
const auto crs_val = yyjson_obj_get(column_val, "crs");
if (crs_val) {
if (crs_val && !yyjson_is_null(crs_val)) {
// Parse the CRS
if (!yyjson_is_obj(crs_val)) {
throw InvalidInputException("Geoparquet column '%s' has invalid CRS", column_name);
Expand All @@ -126,8 +126,10 @@ unique_ptr<GeoParquetFileMetadata> GeoParquetFileMetadata::TryRead(const duckdb_

// Free the temporary CRS JSON string
free(crs_json);
} else if (crs_val && yyjson_is_null(crs_val)) {
// If CRS is null, do nothing
} else {
// Otherwise, default to OGC:CRS84
// Otherwise, if no CRS, default to OGC:CRS84
auto crs = CoordinateReferenceSystem::TryConvert(context, "OGC:CRS84",
CoordinateReferenceSystemType::PROJJSON);
if (crs) {
Expand Down Expand Up @@ -363,7 +365,7 @@ optional_ptr<const GeoParquetColumnMetadata> GeoParquetFileMetadata::GetColumnMe
return &it->second;
}

unique_ptr<ColumnReader> GeometryColumnReader::Create(ParquetReader &reader, const ParquetColumnSchema &schema,
unique_ptr<ColumnReader> GeometryColumnReader::Create(const ParquetReader &reader, const ParquetColumnSchema &schema,
ClientContext &context) {
D_ASSERT(schema.type.id() == LogicalTypeId::GEOMETRY);
D_ASSERT(schema.children.size() == 1 && schema.children[0].type.id() == LogicalTypeId::BLOB);
Expand Down
13 changes: 9 additions & 4 deletions src/duckdb/extension/parquet/parquet_multi_file_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ struct ParquetReadGlobalState : public GlobalTableFunctionState {

struct ParquetReadLocalState : public LocalTableFunctionState {
ParquetReaderScanState scan_state;
vector<idx_t> group_indexes;
};

static void ParseFileRowNumberOption(MultiFileReaderBindData &bind_data, ParquetOptions &options,
Expand Down Expand Up @@ -341,7 +342,7 @@ static vector<PartitionStatistics> ParquetGetPartitionStats(ClientContext &conte
}
auto &parquet_data = bind_data.bind_data->Cast<ParquetReadBindData>();
auto &cached_metadata = parquet_data.TryLoadCaches(bind_data, context);
if (!cached_metadata.empty()) {
if (cached_metadata.empty()) {
// no cached metadata - bail
return result;
}
Expand Down Expand Up @@ -684,12 +685,17 @@ bool ParquetReader::TryInitializeScan(ClientContext &context, GlobalTableFunctio
return false;
}
// The current reader has rowgroups left to be scanned
vector<idx_t> group_indexes {gstate.row_group_index};
InitializeScan(context, lstate.scan_state, group_indexes);
lstate.group_indexes = {gstate.row_group_index};
gstate.row_group_index++;
return true;
}

void ParquetReader::PrepareScan(ClientContext &context, GlobalTableFunctionState &gstate_p,
LocalTableFunctionState &lstate_p) {
auto &lstate = lstate_p.Cast<ParquetReadLocalState>();
InitializeScan(context, lstate.scan_state, lstate.group_indexes);
}

void ParquetReader::FinishFile(ClientContext &context, GlobalTableFunctionState &gstate_p) {
auto &gstate = gstate_p.Cast<ParquetReadGlobalState>();
gstate.row_group_index = 0;
Expand All @@ -705,7 +711,6 @@ AsyncResult ParquetReader::Scan(ClientContext &context, GlobalTableFunctionState
}
}
#endif

auto &gstate = gstate_p.Cast<ParquetReadGlobalState>();
auto &local_state = local_state_p.Cast<ParquetReadLocalState>();
local_state.scan_state.op = gstate.op;
Expand Down
12 changes: 6 additions & 6 deletions src/duckdb/extension/parquet/parquet_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ ParquetColumnSchema ParquetReader::ParseColumnSchema(const SchemaElement &s_ele,

unique_ptr<ColumnReader> ParquetReader::CreateReaderRecursive(ClientContext &context,
const vector<ColumnIndex> &indexes,
const ParquetColumnSchema &schema) {
const ParquetColumnSchema &schema) const {
switch (schema.schema_type) {
case ParquetColumnSchemaType::FILE_ROW_NUMBER:
return make_uniq<RowNumberColumnReader>(*this, schema);
Expand Down Expand Up @@ -460,7 +460,7 @@ unique_ptr<ColumnReader> ParquetReader::CreateReaderRecursive(ClientContext &con
}
}

unique_ptr<ColumnReader> ParquetReader::CreateReader(ClientContext &context) {
unique_ptr<ColumnReader> ParquetReader::CreateReader(ClientContext &context) const {
auto ret = CreateReaderRecursive(context, column_indexes, *root_schema);
if (ret->Type().id() != LogicalTypeId::STRUCT) {
throw InternalException("Root element of Parquet file must be a struct");
Expand Down Expand Up @@ -974,7 +974,7 @@ string ParquetReader::GetUniqueFileIdentifier(const duckdb_parquet::EncryptionAl
}
}

uint32_t ParquetReader::Read(duckdb_apache::thrift::TBase &object, TProtocol &iprot) {
uint32_t ParquetReader::Read(duckdb_apache::thrift::TBase &object, TProtocol &iprot) const {
return object.read(&iprot);
}

Expand All @@ -986,7 +986,7 @@ uint32_t ParquetReader::ReadEncrypted(duckdb_apache::thrift::TBase &object, TPro
}

uint32_t ParquetReader::ReadData(duckdb_apache::thrift::protocol::TProtocol &iprot, const data_ptr_t buffer,
const uint32_t buffer_size) {
const uint32_t buffer_size) const {
return iprot.getTransport()->read(buffer, buffer_size);
}

Expand All @@ -997,7 +997,7 @@ uint32_t ParquetReader::ReadDataEncrypted(duckdb_apache::thrift::protocol::TProt
*encryption_util, aad_crypto_metadata);
}

static idx_t GetRowGroupOffset(ParquetReader &reader, idx_t group_idx) {
static idx_t GetRowGroupOffset(const ParquetReader &reader, idx_t group_idx) {
idx_t row_group_offset = 0;
auto &row_groups = reader.GetFileMetadata()->row_groups;
for (idx_t i = 0; i < group_idx; i++) {
Expand Down Expand Up @@ -1232,7 +1232,7 @@ ParquetScanFilter::~ParquetScanFilter() {
}

void ParquetReader::InitializeScan(ClientContext &context, ParquetReaderScanState &state,
vector<idx_t> groups_to_read) {
vector<idx_t> groups_to_read) const {
state.current_group = -1;
state.finished = false;
state.offset_in_group = 0;
Expand Down
Loading
Loading