Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions be/src/olap/accept_null_predicate.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,26 @@ class AcceptNullPredicate : public ColumnPredicate {
return _nested->evaluate_and(statistic);
}

bool evaluate_and(vectorized::ParquetPredicate::ColumnStat* statistic) const override {
return _nested->evaluate_and(statistic) || statistic->has_null;
}

bool evaluate_and(vectorized::ParquetPredicate::CachedPageIndexStat* statistic,
RowRanges* row_ranges) const override {
_nested->evaluate_and(statistic, row_ranges);
vectorized::ParquetPredicate::PageIndexStat* stat = nullptr;
if (!(statistic->get_stat_func)(&stat, column_id())) {
return true;
}

for (int page_id = 0; page_id < stat->num_of_pages; page_id++) {
if (stat->has_null[page_id]) {
row_ranges->add(stat->ranges[page_id]);
}
}
return row_ranges->count() > 0;
}

bool evaluate_del(const std::pair<WrapperField*, WrapperField*>& statistic) const override {
return _nested->evaluate_del(statistic);
}
Expand Down
12 changes: 8 additions & 4 deletions be/src/olap/comparison_predicate.h
Original file line number Diff line number Diff line change
Expand Up @@ -198,10 +198,13 @@ class ComparisonPredicateBase final : public ColumnPredicate {
if ((*statistic->get_stat_func)(statistic, column_id())) {
vectorized::Field min_field;
vectorized::Field max_field;
if (!vectorized::ParquetPredicate::parse_min_max_value(
statistic->col_schema, statistic->encoded_min_value,
statistic->encoded_max_value, *statistic->ctz, &min_field, &max_field)
.ok()) [[unlikely]] {
if (statistic->is_all_null) {
result = false;
} else if (!vectorized::ParquetPredicate::parse_min_max_value(
statistic->col_schema, statistic->encoded_min_value,
statistic->encoded_max_value, *statistic->ctz, &min_field,
&max_field)
.ok()) [[unlikely]] {
result = true;
} else {
result = camp_field(min_field, max_field);
Expand All @@ -224,6 +227,7 @@ class ComparisonPredicateBase final : public ColumnPredicate {
RowRanges* row_ranges) const override {
vectorized::ParquetPredicate::PageIndexStat* stat = nullptr;
if (!(statistic->get_stat_func)(&stat, column_id())) {
row_ranges->add(statistic->row_group_range);
return true;
}

Expand Down
12 changes: 8 additions & 4 deletions be/src/olap/in_list_predicate.h
Original file line number Diff line number Diff line change
Expand Up @@ -282,10 +282,13 @@ class InListPredicateBase final : public ColumnPredicate {
if ((*statistic->get_stat_func)(statistic, column_id())) {
vectorized::Field min_field;
vectorized::Field max_field;
if (!vectorized::ParquetPredicate::parse_min_max_value(
statistic->col_schema, statistic->encoded_min_value,
statistic->encoded_max_value, *statistic->ctz, &min_field, &max_field)
.ok()) [[unlikely]] {
if (statistic->is_all_null) {
result = false;
} else if (!vectorized::ParquetPredicate::parse_min_max_value(
statistic->col_schema, statistic->encoded_min_value,
statistic->encoded_max_value, *statistic->ctz, &min_field,
&max_field)
.ok()) [[unlikely]] {
result = true;
} else {
result = camp_field(min_field, max_field);
Expand All @@ -308,6 +311,7 @@ class InListPredicateBase final : public ColumnPredicate {
RowRanges* row_ranges) const override {
vectorized::ParquetPredicate::PageIndexStat* stat = nullptr;
if (!(statistic->get_stat_func)(&stat, column_id())) {
row_ranges->add(statistic->row_group_range);
return true;
}

Expand Down
1 change: 1 addition & 0 deletions be/src/olap/null_predicate.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ class NullPredicate final : public ColumnPredicate {
RowRanges* row_ranges) const override {
vectorized::ParquetPredicate::PageIndexStat* stat = nullptr;
if (!(statistic->get_stat_func)(&stat, column_id())) {
row_ranges->add(statistic->row_group_range);
return true;
}
for (int page_id = 0; page_id < stat->num_of_pages; page_id++) {
Expand Down
21 changes: 21 additions & 0 deletions be/src/olap/shared_predicate.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,27 @@ class SharedPredicate final : public ColumnPredicate {
return _nested->get_search_str();
}

bool evaluate_and(vectorized::ParquetPredicate::ColumnStat* statistic) const override {
std::shared_lock<std::shared_mutex> lock(*_mtx);
if (!_nested) {
// at the begining _nested will be null, so return true.
return true;
}
return _nested->evaluate_and(statistic);
}

bool evaluate_and(vectorized::ParquetPredicate::CachedPageIndexStat* statistic,
RowRanges* row_ranges) const override {
std::shared_lock<std::shared_mutex> lock(*_mtx);

if (!_nested) {
// at the begining _nested will be null, so return true.
row_ranges->add(statistic->row_group_range);
return true;
}
return _nested->evaluate_and(statistic, row_ranges);
}

private:
uint16_t _evaluate_inner(const vectorized::IColumn& column, uint16_t* sel,
uint16_t size) const override {
Expand Down
6 changes: 0 additions & 6 deletions be/src/olap/tablet_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -524,12 +524,6 @@ Status TabletReader::_init_conditions_param(const ReaderParams& read_params) {
}
}

for (int id : read_params.topn_filter_source_node_ids) {
auto& runtime_predicate =
read_params.runtime_state->get_query_ctx()->get_runtime_predicate(id);
RETURN_IF_ERROR(runtime_predicate.set_tablet_schema(read_params.topn_filter_target_node_id,
_tablet_schema));
}
return Status::OK();
}

Expand Down
17 changes: 17 additions & 0 deletions be/src/pipeline/exec/file_scan_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,12 @@ class FileScanLocalState final : public ScanLocalState<FileScanLocalState> {
PushDownType _should_push_down_topn_filter() const override {
return PushDownType::PARTIAL_ACCEPTABLE;
}
bool _push_down_topn(const vectorized::RuntimePredicate& predicate) override {
// For external table/ file scan, first try push down the predicate,
// and then determine whether it can be pushed down within the (parquet/orc) reader.
return true;
}

PushDownType _should_push_down_bitmap_filter() const override {
return PushDownType::PARTIAL_ACCEPTABLE;
}
Expand Down Expand Up @@ -110,6 +116,17 @@ class FileScanOperatorX final : public ScanOperatorX<FileScanLocalState> {
return _batch_split_mode ? 1 : ScanOperatorX<FileScanLocalState>::parallelism(state);
}

int get_column_id(const std::string& col_name) const override {
int column_id_counter = 0;
for (const auto& slot : _output_tuple_desc->slots()) {
if (slot->col_name() == col_name) {
return column_id_counter;
}
column_id_counter++;
}
return column_id_counter;
}

private:
friend class FileScanLocalState;

Expand Down
6 changes: 6 additions & 0 deletions be/src/pipeline/exec/scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,9 @@ bool ScanLocalState<Derived>::_is_predicate_acting_on_slot(const vectorized::VEx
if (_slot_id_to_value_range.end() == sid_to_range) {
return false;
}
if (remove_nullable((*slot_desc)->type())->get_primitive_type() == TYPE_VARBINARY) {
return false;
}
*range = &(sid_to_range->second);
return true;
}
Expand Down Expand Up @@ -1235,6 +1238,9 @@ Status ScanOperatorX<LocalStateType>::prepare(RuntimeState* state) {
.nodes[0]
.slot_ref.slot_id];
DCHECK(s != nullptr);
if (remove_nullable(s->type())->get_primitive_type() == TYPE_VARBINARY) {
continue;
}
auto col_name = s->col_name();
cid = get_column_id(col_name);
}
Expand Down
12 changes: 5 additions & 7 deletions be/src/runtime/runtime_predicate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,10 @@ Status RuntimePredicate::init_target(
_contexts[target_node_id].col_name =
slot_id_to_slot_desc[get_texpr(target_node_id).nodes[0].slot_ref.slot_id]
->col_name();
_contexts[target_node_id].predicate =
SharedPredicate::create_shared(cast_set<uint32_t>(column_id), "");
_contexts[target_node_id].col_data_type =
slot_id_to_slot_desc[get_texpr(target_node_id).nodes[0].slot_ref.slot_id]->type();
_contexts[target_node_id].predicate = SharedPredicate::create_shared(
cast_set<uint32_t>(column_id), _contexts[target_node_id].col_name);
}
_detected_target = true;
return Status::OK();
Expand Down Expand Up @@ -211,13 +213,9 @@ Status RuntimePredicate::update(const Field& value) {
}
for (auto p : _contexts) {
auto ctx = p.second;
if (!ctx.tablet_schema) {
continue;
}
const auto& column = *DORIS_TRY(ctx.tablet_schema->column(ctx.col_name));
auto str_ref = _get_string_ref(_orderby_extrem, _type);
std::shared_ptr<ColumnPredicate> pred =
_pred_constructor(ctx.predicate->column_id(), column.name(), column.get_vec_type(),
_pred_constructor(ctx.predicate->column_id(), ctx.col_name, ctx.col_data_type,
str_ref, false, _predicate_arena);

// For NULLS FIRST, wrap a AcceptNullPredicate to return true for NULL
Expand Down
20 changes: 1 addition & 19 deletions be/src/runtime/runtime_predicate.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,18 +60,6 @@ class RuntimePredicate {
_detected_source = true;
}

Status set_tablet_schema(int32_t target_node_id, TabletSchemaSPtr tablet_schema) {
std::unique_lock<std::shared_mutex> wlock(_rwlock);
check_target_node_id(target_node_id);
if (_contexts[target_node_id].tablet_schema) {
return Status::OK();
}
RETURN_IF_ERROR(tablet_schema->have_column(_contexts[target_node_id].col_name));
_contexts[target_node_id].tablet_schema = tablet_schema;
DCHECK(_contexts[target_node_id].predicate != nullptr);
return Status::OK();
}

std::shared_ptr<ColumnPredicate> get_predicate(int32_t target_node_id) {
std::shared_lock<std::shared_mutex> rlock(_rwlock);
check_target_node_id(target_node_id);
Expand Down Expand Up @@ -130,15 +118,9 @@ class RuntimePredicate {
struct TargetContext {
TExpr expr;
std::string col_name;
// TODO(gabriel): remove this
TabletSchemaSPtr tablet_schema;
vectorized::DataTypePtr col_data_type;
std::shared_ptr<ColumnPredicate> predicate;

Result<int32_t> get_field_index() {
const auto& column = *DORIS_TRY(tablet_schema->column(col_name));
return tablet_schema->field_index(column.unique_id());
}

bool target_is_slot() const {
return expr.nodes[0].node_type == TExprNodeType::SLOT_REF &&
expr.nodes[0].slot_ref.is_virtual_slot == false;
Expand Down
1 change: 1 addition & 0 deletions be/src/vec/exec/format/parquet/parquet_predicate.h
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ class ParquetPredicate {
const cctz::time_zone* ctz;
std::map<int, PageIndexStat> stats;
std::function<bool(PageIndexStat**, int)> get_stat_func;
RowRange row_group_range;
};

// The encoded Parquet min-max value is parsed into `fields`;
Expand Down
62 changes: 8 additions & 54 deletions be/src/vec/exec/format/parquet/vparquet_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -413,14 +413,12 @@ bool ParquetReader::_type_matches(const int cid) const {
!is_complex_type(table_col_type->get_primitive_type());
}

Status ParquetReader::_update_lazy_read_ctx(const VExprContextSPtrs& new_conjuncts) {
RowGroupReader::LazyReadContext new_lazy_read_ctx;
new_lazy_read_ctx.conjuncts = new_conjuncts;
new_lazy_read_ctx.fill_partition_columns = std::move(_lazy_read_ctx.fill_partition_columns);
new_lazy_read_ctx.fill_missing_columns = std::move(_lazy_read_ctx.fill_missing_columns);
_lazy_read_ctx = std::move(new_lazy_read_ctx);

_push_down_predicates.clear();
Status ParquetReader::set_fill_columns(
const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>&
partition_columns,
const std::unordered_map<std::string, VExprContextSPtr>& missing_columns) {
_lazy_read_ctx.fill_partition_columns = partition_columns;
_lazy_read_ctx.fill_missing_columns = missing_columns;

// std::unordered_map<column_name, std::pair<col_id, slot_id>>
std::unordered_map<std::string, std::pair<uint32_t, int>> predicate_columns;
Expand All @@ -442,7 +440,6 @@ Status ParquetReader::_update_lazy_read_ctx(const VExprContextSPtrs& new_conjunc
visit_slot(child.get());
}
};

for (const auto& conjunct : _lazy_read_ctx.conjuncts) {
auto expr = conjunct->root();

Expand All @@ -452,33 +449,7 @@ Status ParquetReader::_update_lazy_read_ctx(const VExprContextSPtrs& new_conjunc

auto filter_impl = runtime_filter->get_impl();
visit_slot(filter_impl.get());

// only support push down for filter row group : MAX_FILTER, MAX_FILTER, MINMAX_FILTER, IN_FILTER
if ((runtime_filter->node_type() == TExprNodeType::BINARY_PRED) &&
(runtime_filter->op() == TExprOpcode::GE ||
runtime_filter->op() == TExprOpcode::LE)) {
expr = filter_impl;
} else if (runtime_filter->node_type() == TExprNodeType::IN_PRED &&
runtime_filter->op() == TExprOpcode::FILTER_IN) {
VDirectInPredicate* direct_in_predicate =
assert_cast<VDirectInPredicate*>(filter_impl.get());

int max_in_size =
_state->query_options().__isset.max_pushdown_conditions_per_column
? _state->query_options().max_pushdown_conditions_per_column
: 1024;
if (direct_in_predicate->get_set_func()->size() == 0 ||
direct_in_predicate->get_set_func()->size() > max_in_size) {
continue;
}

VExprSPtr new_in_slot = nullptr;
if (direct_in_predicate->get_slot_in_expr(new_in_slot)) {
expr = new_in_slot;
}
}
} else if (VTopNPred* topn_pred = typeid_cast<VTopNPred*>(expr.get());
topn_pred == nullptr) {
} else {
visit_slot(expr.get());
}
}
Expand Down Expand Up @@ -566,17 +537,6 @@ Status ParquetReader::_update_lazy_read_ctx(const VExprContextSPtrs& new_conjunc
}
}

return Status::OK();
}

Status ParquetReader::set_fill_columns(
const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>&
partition_columns,
const std::unordered_map<std::string, VExprContextSPtr>& missing_columns) {
_lazy_read_ctx.fill_partition_columns = partition_columns;
_lazy_read_ctx.fill_missing_columns = missing_columns;
RETURN_IF_ERROR(_update_lazy_read_ctx(_lazy_read_ctx.conjuncts));

if (_filter_groups && (_total_groups == 0 || _t_metadata->num_rows == 0 || _range_size < 0)) {
return Status::EndOfFile("No row group to read");
}
Expand Down Expand Up @@ -716,13 +676,6 @@ Status ParquetReader::_next_row_group_reader() {
continue;
}

bool has_late_rf_cond = false;
VExprContextSPtrs new_push_down_conjuncts;
RETURN_IF_ERROR(_call_late_rf_func(&has_late_rf_cond, new_push_down_conjuncts));
if (has_late_rf_cond) {
RETURN_IF_ERROR(_update_lazy_read_ctx(new_push_down_conjuncts));
}

candidate_row_ranges.clear();
// The range of lines to be read is determined by the push down predicate.
RETURN_IF_ERROR(_process_min_max_bloom_filter(
Expand Down Expand Up @@ -1064,6 +1017,7 @@ Status ParquetReader::_process_page_index_filter(
*ans = &sig_stat;
return true;
};
cached_page_index.row_group_range = {0, row_group.num_rows};
cached_page_index.get_stat_func = get_stat_func;

candidate_row_ranges->add({0, row_group.num_rows});
Expand Down
17 changes: 0 additions & 17 deletions be/src/vec/exec/format/parquet/vparquet_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -166,10 +166,6 @@ class ParquetReader : public GenericReader {

bool count_read_rows() override { return true; }

void set_update_late_rf_func(std::function<Status(bool*, VExprContextSPtrs&)>&& func) {
_call_late_rf_func = std::move(func);
}

protected:
void _collect_profile_before_close() override;

Expand Down Expand Up @@ -262,9 +258,6 @@ class ParquetReader : public GenericReader {
bool _exists_in_file(const std::string& expr_name) const;
bool _type_matches(const int cid) const;

// update lazy read context when runtime filter changed
Status _update_lazy_read_ctx(const VExprContextSPtrs& new_conjuncts);

RuntimeProfile* _profile = nullptr;
const TFileScanRangeParams& _scan_params;
const TFileRangeDesc& _scan_range;
Expand Down Expand Up @@ -349,18 +342,8 @@ class ParquetReader : public GenericReader {

std::unordered_map<std::string, uint32_t>* _col_name_to_block_idx = nullptr;

// Since the filtering conditions for topn are dynamic, the filtering is delayed until create next row group reader.
std::vector<std::unique_ptr<MutilColumnBlockPredicate>> _push_down_predicates;
Arena _arena;

// when creating a new row group reader, call this function to get the latest runtime filter conjuncts.
// The default implementation does nothing, sets 'changed' to false, and returns OK.
// This is used when iceberg read position delete file ...
static Status default_late_rf_func(bool* changed, VExprContextSPtrs&) {
*changed = false;
return Status::OK();
}
std::function<Status(bool*, VExprContextSPtrs&)> _call_late_rf_func = default_late_rf_func;
};
#include "common/compile_check_end.h"

Expand Down
Loading