Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 13 additions & 5 deletions be/src/core/block/block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -581,7 +581,7 @@ MutableColumns Block::mutate_columns() {
MutableColumns columns(num_columns);
for (size_t i = 0; i < num_columns; ++i) {
DCHECK(data[i].type);
columns[i] = data[i].column ? (*std::move(data[i].column)).mutate()
columns[i] = data[i].column ? IColumn::mutate(std::move(data[i].column))
: data[i].type->create_column();
}
return columns;
Expand Down Expand Up @@ -655,9 +655,11 @@ void Block::clear_column_data(int64_t column_size) noexcept {
}
for (auto& d : data) {
if (d.column) {
// Temporarily disable reference count check because a column might be referenced multiple times within a block.
// Queries like this: `select c, c from t1;`
(*std::move(d.column)).assume_mutable()->clear();
if (d.column->is_exclusive()) {
d.column->assume_mutable()->clear();
} else {
d.column = d.column->clone_empty();
}
}
}
}
Expand Down Expand Up @@ -1085,7 +1087,13 @@ void Block::shrink_char_type_column_suffix_zero(const std::vector<size_t>& char_
for (auto idx : char_type_idx) {
if (idx < data.size()) {
auto& col_and_name = this->get_by_position(idx);
col_and_name.column->assume_mutable()->shrink_padding_chars();
if (col_and_name.column->is_exclusive()) {
col_and_name.column->assume_mutable()->shrink_padding_chars();
} else {
auto mutable_col = std::move(*col_and_name.column).mutate();
mutable_col->shrink_padding_chars();
col_and_name.column = std::move(mutable_col);
}
}
}
}
Expand Down
5 changes: 3 additions & 2 deletions be/src/core/column/column.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -232,10 +232,11 @@ bool is_column_const(const IColumn& column) {

void IColumn::check_const_only_in_top_level() const {
ColumnCallback throw_if_const = [&](WrappedPtr& column) {
if (is_column_const(*column)) {
const ColumnPtr& col = const_cast<const WrappedPtr&>(column);
if (is_column_const(*col)) {
throw doris::Exception(ErrorCode::INTERNAL_ERROR,
"const column is not allowed to be nested, but got {}",
column->get_name());
col->get_name());
}
};
const_cast<IColumn*>(this)->for_each_subcolumn(throw_if_const);
Expand Down
12 changes: 8 additions & 4 deletions be/src/core/column/column.h
Original file line number Diff line number Diff line change
Expand Up @@ -581,16 +581,20 @@ class IColumn : public COW<IColumn> {

MutablePtr mutate() const&& {
MutablePtr res = shallow_mutate();
res->for_each_subcolumn(
[](WrappedPtr& subcolumn) { subcolumn = std::move(*subcolumn).mutate(); });
res->for_each_subcolumn([](WrappedPtr& subcolumn) {
static_cast<IColumn::Ptr&>(subcolumn) =
std::move(*static_cast<const IColumn::Ptr&>(subcolumn)).mutate();
});
return res;
}

static MutablePtr mutate(Ptr ptr) {
MutablePtr res = ptr->shallow_mutate(); /// Now use_count is 2.
ptr.reset(); /// Reset use_count to 1.
res->for_each_subcolumn(
[](WrappedPtr& subcolumn) { subcolumn = std::move(*subcolumn).mutate(); });
res->for_each_subcolumn([](WrappedPtr& subcolumn) {
static_cast<IColumn::Ptr&>(subcolumn) =
std::move(*static_cast<const IColumn::Ptr&>(subcolumn)).mutate();
});
return res;
}

Expand Down
19 changes: 18 additions & 1 deletion be/src/core/column/column_array.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ ColumnArray::ColumnArray(MutableColumnPtr&& nested_column, MutableColumnPtr&& of
// }
// #endif
check_const_only_in_top_level();
const auto* offsets_concrete = typeid_cast<const ColumnOffsets*>(offsets.get());
// Use const access to avoid triggering assume_mutable_ref() during construction.
const auto* offsets_concrete =
typeid_cast<const ColumnOffsets*>(static_cast<const IColumn::Ptr&>(offsets).get());

if (!offsets_concrete) {
throw doris::Exception(ErrorCode::INTERNAL_ERROR, "offsets_column must be a ColumnUInt64");
Expand Down Expand Up @@ -98,6 +100,21 @@ ColumnArray::ColumnArray(MutableColumnPtr&& nested_column) : data(std::move(nest
offsets = ColumnOffsets::create();
}

/// Shared-ownership constructor: stores `nested_column` and `offsets_column` as given,
/// without cloning them and without converting them to mutable pointers.
///
/// @param nested_column   column holding the flattened array elements; must not be const.
/// @param offsets_column  per-row offsets; must not be const and must be a ColumnOffsets.
/// @throws doris::Exception (INTERNAL_ERROR) if either argument is a const column or if
///         `offsets_column` is not a ColumnOffsets.
ColumnArray::ColumnArray(SharedTag, ColumnPtr nested_column, ColumnPtr offsets_column) {
    if (is_column_const(*nested_column)) {
        throw doris::Exception(ErrorCode::INTERNAL_ERROR,
                               "const column is not allowed to be nested, but got {}",
                               nested_column->get_name());
    }
    if (is_column_const(*offsets_column)) {
        throw doris::Exception(ErrorCode::INTERNAL_ERROR,
                               "const column is not allowed to be nested, but got {}",
                               offsets_column->get_name());
    }
    // Enforce the same offsets-type requirement as the
    // (MutableColumnPtr&&, MutableColumnPtr&&) constructor, so a wrongly typed offsets
    // column is rejected here instead of being stored.
    if (typeid_cast<const ColumnOffsets*>(offsets_column.get()) == nullptr) {
        throw doris::Exception(ErrorCode::INTERNAL_ERROR, "offsets_column must be a ColumnUInt64");
    }
    // Assign through the IColumn::Ptr view of the members rather than the wrapped pointer
    // itself. NOTE(review): presumably this avoids the wrapper's mutable-access path, which
    // expects exclusive ownership — confirm against the WrappedPtr definition.
    static_cast<IColumn::Ptr&>(data) = std::move(nested_column);
    static_cast<IColumn::Ptr&>(offsets) = std::move(offsets_column);
}

// Forwards shrink_padding_chars() to the nested data column; the offsets column is not
// touched by this call.
void ColumnArray::shrink_padding_chars() {
data->shrink_padding_chars();
}
Expand Down
14 changes: 11 additions & 3 deletions be/src/core/column/column_array.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ class ColumnArray final : public COWHelper<IColumn, ColumnArray> {
/** Create an empty column of arrays with the type of values as in the column `nested_column` */
explicit ColumnArray(MutableColumnPtr&& nested_column);

/** Create an array column with shared (possibly non-exclusive) nested column and offsets. */
// Empty tag type: it carries no data and only serves to select the constructor overload below.
struct SharedTag {};
// Takes both columns by value as ColumnPtr; the first parameter is the unnamed SharedTag.
ColumnArray(SharedTag, ColumnPtr nested_column, ColumnPtr offsets_column);

ColumnArray(const ColumnArray&) = default;

ColumnArray() = default;
Expand All @@ -98,12 +102,16 @@ class ColumnArray final : public COWHelper<IColumn, ColumnArray> {
using Base = COWHelper<IColumn, ColumnArray>;

static MutablePtr create(const ColumnPtr& nested_column, const ColumnPtr& offsets_column) {
return ColumnArray::create(nested_column->assume_mutable(),
offsets_column->assume_mutable());
// Construct with shared columns preserved (no cloning), as create(ColumnPtr) is designed
// to accept immutable/shared arguments per the COW contract.
return Base::create(SharedTag {}, nested_column, offsets_column);
}

static MutablePtr create(const ColumnPtr& nested_column) {
return ColumnArray::create(nested_column->assume_mutable());
// Construct with shared columns preserved (no cloning), as create(ColumnPtr) is designed
// to accept immutable/shared arguments per the COW contract.
ColumnPtr empty_offsets = ColumnOffsets::create();
return Base::create(SharedTag {}, nested_column, std::move(empty_offsets));
}

template <typename... Args,
Expand Down
14 changes: 9 additions & 5 deletions be/src/core/column/column_const.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,19 @@ ColumnPtr squash_const(const ColumnPtr& col) {
ColumnConst::ColumnConst(const ColumnPtr& data_, size_t s_, bool create_with_empty,
bool need_squash)
: data(need_squash ? squash_const(data_) : data_), s(s_) {
if (data->empty() != create_with_empty) {
const IColumn& col = get_data_column();
if (col.empty() != create_with_empty) {
throw doris::Exception(ErrorCode::INTERNAL_ERROR,
"Incorrect size of nested column in constructor of ColumnConst: {}, "
"create_with_empty: {}.",
data->size(), create_with_empty);
col.size(), create_with_empty);
}

if (data->size() != 1 && !create_with_empty) {
if (col.size() != 1 && !create_with_empty) {
throw doris::Exception(
ErrorCode::INTERNAL_ERROR,
"Incorrect size of nested column in constructor of ColumnConst: {}, must be 1.",
data->size());
col.size());
}
}

Expand Down Expand Up @@ -108,7 +109,10 @@ void ColumnConst::get_permutation(bool /*reverse*/, size_t /*limit*/, int /*nan_
}

void ColumnConst::replace_float_special_values() {
data->replace_float_special_values();
// COW: get exclusive ownership of data before mutating
auto mutable_data = IColumn::mutate(std::move(static_cast<ColumnPtr&>(data)));
mutable_data->replace_float_special_values();
data = std::move(mutable_data);
}

std::pair<ColumnPtr, size_t> check_column_const_set_readability(const IColumn& column,
Expand Down
9 changes: 7 additions & 2 deletions be/src/core/column/column_const.h
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,8 @@ class ColumnConst final : public COWHelper<IColumn, ColumnConst> {
bool has_enough_capacity(const IColumn& src) const override { return true; }

int compare_at(size_t, size_t, const IColumn& rhs, int nan_direction_hint) const override {
auto rhs_const_column = assert_cast<const ColumnConst&, TypeCheckOnRelease::DISABLE>(rhs);
const auto& rhs_const_column =
assert_cast<const ColumnConst&, TypeCheckOnRelease::DISABLE>(rhs);

const auto* this_nullable = check_and_get_column<ColumnNullable>(data.get());
const auto* rhs_nullable =
Expand Down Expand Up @@ -321,7 +322,11 @@ class ColumnConst final : public COWHelper<IColumn, ColumnConst> {

size_t deserialize_impl(const char* pos) override {
++s;
return data->deserialize_impl(pos);
ColumnPtr owned = std::move(static_cast<ColumnPtr&>(data));
auto mutable_data = IColumn::mutate(std::move(owned));
size_t ret = mutable_data->deserialize_impl(pos);
data = std::move(mutable_data);
return ret;
}

void replace_float_special_values() override;
Expand Down
89 changes: 58 additions & 31 deletions be/src/core/column/column_map.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -518,59 +518,82 @@ void ColumnMap::insert_range_from_ignore_overflow(const IColumn& src, size_t sta
}

ColumnPtr ColumnMap::filter(const Filter& filt, ssize_t result_size_hint) const {
auto k_arr =
ColumnArray::create(keys_column->assume_mutable(), offsets_column->assume_mutable())
->filter(filt, result_size_hint);
auto v_arr =
ColumnArray::create(values_column->assume_mutable(), offsets_column->assume_mutable())
->filter(filt, result_size_hint);
// For const filter we must clone subcolumns so the original ColumnMap remains intact.
// IColumn::mutate(copy) clones if use_count>1, returns self if exclusive.
auto offsets_mut = IColumn::mutate(static_cast<IColumn::Ptr>(offsets_column));
MutableColumnPtr offsets_copy = offsets_mut->clone_empty();
offsets_copy->insert_range_from(*offsets_mut, 0, offsets_mut->size());
auto k_arr = ColumnArray::create(IColumn::mutate(static_cast<IColumn::Ptr>(keys_column)),
std::move(offsets_mut))
->filter(filt, result_size_hint);
auto v_arr = ColumnArray::create(IColumn::mutate(static_cast<IColumn::Ptr>(values_column)),
std::move(offsets_copy))
->filter(filt, result_size_hint);
return ColumnMap::create(assert_cast<const ColumnArray&>(*k_arr).get_data_ptr(),
assert_cast<const ColumnArray&>(*v_arr).get_data_ptr(),
assert_cast<const ColumnArray&>(*k_arr).get_offsets_ptr());
}

size_t ColumnMap::filter(const Filter& filter) {
MutableColumnPtr copied_off = offsets_column->clone_empty();
copied_off->insert_range_from(*offsets_column, 0, offsets_column->size());
ColumnArray::create(keys_column->assume_mutable(), offsets_column->assume_mutable())
->filter(filter);
ColumnArray::create(values_column->assume_mutable(), copied_off->assume_mutable())
->filter(filter);
return get_offsets().size();
// Move subcolumns out of this ColumnMap to get exclusive ownership, then write back.
auto keys_mut = IColumn::mutate(std::move(static_cast<IColumn::Ptr&>(keys_column)));
auto offsets_mut = IColumn::mutate(std::move(static_cast<IColumn::Ptr&>(offsets_column)));
auto values_mut = IColumn::mutate(std::move(static_cast<IColumn::Ptr&>(values_column)));
// Clone offsets for values (both keys and values share the same offsets structure)
MutableColumnPtr copied_off = offsets_mut->clone_empty();
copied_off->insert_range_from(*offsets_mut, 0, offsets_mut->size());
auto k_arr = ColumnArray::create(std::move(keys_mut), std::move(offsets_mut));
k_arr->filter(filter);
auto v_arr = ColumnArray::create(std::move(values_mut), std::move(copied_off));
v_arr->filter(filter);
// Put filtered subcolumns back
static_cast<IColumn::Ptr&>(keys_column) = k_arr->get_data_ptr();
static_cast<IColumn::Ptr&>(offsets_column) = k_arr->get_offsets_ptr();
static_cast<IColumn::Ptr&>(values_column) = v_arr->get_data_ptr();
// Use const access to avoid assume_mutable_ref() on the just-written-back offsets_column
// (k_arr still holds a ref, so use_count > 1 until k_arr goes out of scope)
return static_cast<const IColumn::Ptr&>(offsets_column)->size();
}

MutableColumnPtr ColumnMap::permute(const Permutation& perm, size_t limit) const {
// Make a temp column array
auto k_arr =
ColumnArray::create(keys_column->assume_mutable(), offsets_column->assume_mutable())
->permute(perm, limit);
auto v_arr =
ColumnArray::create(values_column->assume_mutable(), offsets_column->assume_mutable())
->permute(perm, limit);
// Const permute: clone subcolumns so the original ColumnMap remains intact.
auto offsets_mut = IColumn::mutate(static_cast<IColumn::Ptr>(offsets_column));
MutableColumnPtr offsets_copy = offsets_mut->clone_empty();
offsets_copy->insert_range_from(*offsets_mut, 0, offsets_mut->size());
auto k_arr = ColumnArray::create(IColumn::mutate(static_cast<IColumn::Ptr>(keys_column)),
std::move(offsets_mut))
->permute(perm, limit);
auto v_arr = ColumnArray::create(IColumn::mutate(static_cast<IColumn::Ptr>(values_column)),
std::move(offsets_copy))
->permute(perm, limit);

return ColumnMap::create(assert_cast<const ColumnArray&>(*k_arr).get_data_ptr(),
assert_cast<const ColumnArray&>(*v_arr).get_data_ptr(),
assert_cast<const ColumnArray&>(*k_arr).get_offsets_ptr());
}

Status ColumnMap::deduplicate_keys(bool recursive) {
const auto inner_rows = keys_column->size();
const auto rows = offsets_column->size();
const IColumn& ck = *static_cast<const IColumn::Ptr&>(keys_column);
const IColumn& co = *static_cast<const IColumn::Ptr&>(offsets_column);
const auto inner_rows = ck.size();
const auto rows = co.size();

if (recursive) {
auto values_column_ = values_column;
const IColumn::Ptr& values_ptr = static_cast<const IColumn::Ptr&>(values_column);
IColumn::Ptr values_column_ = values_ptr;
if (values_column_->is_nullable()) {
values_column_ = (assert_cast<ColumnNullable&>(*values_column)).get_nested_column_ptr();
values_column_ =
assert_cast<const ColumnNullable&>(*values_column_).get_nested_column_ptr();
}

if (auto* values_map = check_and_get_column<ColumnMap>(values_column_.get())) {
RETURN_IF_ERROR(values_map->deduplicate_keys(recursive));
RETURN_IF_ERROR(const_cast<ColumnMap*>(values_map)->deduplicate_keys(recursive));
}
}

DorisVector<StringRef> serialized_keys(inner_rows);

const size_t max_one_row_byte_size = keys_column->get_max_row_byte_size();
const size_t max_one_row_byte_size = ck.get_max_row_byte_size();

size_t total_bytes = max_one_row_byte_size * inner_rows;
Arena pool;
Expand All @@ -579,7 +602,7 @@ Status ColumnMap::deduplicate_keys(bool recursive) {
// reach mem limit, don't serialize in batch
const char* begin = nullptr;
for (size_t i = 0; i != inner_rows; ++i) {
serialized_keys[i] = keys_column->serialize_value_into_arena(i, pool, begin);
serialized_keys[i] = ck.serialize_value_into_arena(i, pool, begin);
}
} else {
auto* serialized_key_buffer = reinterpret_cast<uint8_t*>(pool.alloc(total_bytes));
Expand All @@ -590,15 +613,15 @@ Status ColumnMap::deduplicate_keys(bool recursive) {
serialized_keys[i].size = 0;
}

keys_column->serialize(serialized_keys.data(), inner_rows);
ck.serialize(serialized_keys.data(), inner_rows);
}

auto new_offsets = COffsets::create();
new_offsets->reserve(rows);
auto& new_offsets_data = new_offsets->get_data();

IColumn::Filter filter(inner_rows, 1);
auto& offsets = get_offsets();
const auto& offsets = static_cast<const ColumnMap*>(this)->get_offsets();

Offset64 offset = 0;
bool has_duplicated_key = false;
Expand Down Expand Up @@ -636,8 +659,12 @@ Status ColumnMap::deduplicate_keys(bool recursive) {

if (has_duplicated_key) {
offsets_column = std::move(new_offsets);
keys_column->filter(filter);
values_column->filter(filter);
auto keys_mut = IColumn::mutate(std::move(static_cast<IColumn::Ptr&>(keys_column)));
keys_mut->filter(filter);
static_cast<IColumn::Ptr&>(keys_column) = std::move(keys_mut);
auto values_mut = IColumn::mutate(std::move(static_cast<IColumn::Ptr&>(values_column)));
values_mut->filter(filter);
static_cast<IColumn::Ptr&>(values_column) = std::move(values_mut);
}

return Status::OK();
Expand Down
5 changes: 3 additions & 2 deletions be/src/core/column/column_map.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,9 @@ class ColumnMap final : public COWHelper<IColumn, ColumnMap> {

static MutablePtr create(const ColumnPtr& keys, const ColumnPtr& values,
const ColumnPtr& offsets) {
return ColumnMap::create(keys->assume_mutable(), values->assume_mutable(),
offsets->assume_mutable());
// Mutate to ensure exclusive ownership required by the constructor's non-const WrappedPtr access.
return ColumnMap::create(IColumn::mutate(keys), IColumn::mutate(values),
IColumn::mutate(offsets));
}

template <typename... Args,
Expand Down
13 changes: 10 additions & 3 deletions be/src/core/column/column_nullable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,11 @@ void ColumnNullable::update_crc32c_batch(uint32_t* __restrict hashes,
assert_cast<const ColumnUInt8&>(get_null_map_column()).get_data().data();
if (_nested_column->support_replace_column_null_data()) {
// nullmap process is slow, replace null data to default value to avoid nullmap process
_nested_column->assume_mutable()->replace_column_null_data(real_null_data);
// This is an intentional in-place mutation inside a logically-const hash computation:
// null positions are overwritten with defaults so the inner hash loop needs no null checks.
auto nested_mut = std::move(*static_cast<const IColumn::Ptr&>(_nested_column)).mutate();
nested_mut->replace_column_null_data(real_null_data);
static_cast<IColumn::Ptr&>(const_cast<WrappedPtr&>(_nested_column)) = std::move(nested_mut);
_nested_column->update_crc32c_batch(hashes, nullptr);
} else {
auto s = size();
Expand Down Expand Up @@ -380,12 +384,15 @@ size_t ColumnNullable::filter(const Filter& filter) {

Status ColumnNullable::filter_by_selector(const uint16_t* sel, size_t sel_size, IColumn* col_ptr) {
auto* nullable_col_ptr = assert_cast<ColumnNullable*>(col_ptr);
WrappedPtr nest_col_ptr = nullable_col_ptr->_nested_column;
// Access the nested column via const path to avoid assume_mutable_ref (which requires
// exclusive ownership). The output col_ptr was just created, so its nested column is exclusive.
IColumn* nest_col_raw = const_cast<IColumn*>(
static_cast<const WrappedPtr&>(nullable_col_ptr->_nested_column).get());

/// `get_null_map_data` will set `_need_update_has_null` to true
auto& res_nullmap = nullable_col_ptr->get_null_map_data();

RETURN_IF_ERROR(get_nested_column().filter_by_selector(sel, sel_size, nest_col_ptr.get()));
RETURN_IF_ERROR(get_nested_column().filter_by_selector(sel, sel_size, nest_col_raw));
DCHECK(res_nullmap.empty());
res_nullmap.resize(sel_size);
auto& cur_nullmap = get_null_map_column().get_data();
Expand Down
Loading
Loading