Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 60 additions & 7 deletions cpp/src/common/tablet.cc
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,9 @@ int Tablet::init() {
case BLOB:
case TEXT:
case STRING: {
value_matrix_[c].string_data =
(common::String*)malloc(sizeof(String) * max_row_num_);
auto* sc = new StringColumn();
sc->init(max_row_num_, max_row_num_ * 32);
value_matrix_[c].string_col = sc;
break;
}
default:
Expand All @@ -99,6 +100,7 @@ int Tablet::init() {
for (size_t c = 0; c < schema_count; c++) {
bitmaps_[c].init(max_row_num_, false);
}

return E_OK;
}

Expand Down Expand Up @@ -132,7 +134,8 @@ void Tablet::destroy() {
case BLOB:
case TEXT:
case STRING:
free(value_matrix_[c].string_data);
value_matrix_[c].string_col->destroy();
delete value_matrix_[c].string_col;
break;
default:
break;
Expand Down Expand Up @@ -197,8 +200,7 @@ void* Tablet::get_value(int row_index, uint32_t schema_index,
return &double_values[row_index];
}
case STRING: {
auto string_values = column_values.string_data;
return &string_values[row_index];
return &column_values.string_col->get_string_view(row_index);
}
default:
return nullptr;
Expand All @@ -208,8 +210,8 @@ void* Tablet::get_value(int row_index, uint32_t schema_index,
template <>
void Tablet::process_val(uint32_t row_index, uint32_t schema_index,
common::String str) {
value_matrix_[schema_index].string_data[row_index].dup_from(str,
page_arena_);
value_matrix_[schema_index].string_col->append(row_index, str.buf_,
str.len_);
bitmaps_[schema_index].clear(row_index); /* mark as non-null */
}

Expand Down Expand Up @@ -348,6 +350,57 @@ void Tablet::set_column_categories(
}
}

/**
 * @brief Rewind every string-backed column of this tablet.
 *
 * Walks the schema list and, for each column whose data type is STRING,
 * TEXT or BLOB, calls StringColumn::reset() on the column's string_col.
 * Columns of any other type are left untouched.
 */
void Tablet::reset_string_columns() {
    const size_t column_total = schema_vec_->size();
    for (size_t col = 0; col < column_total; ++col) {
        switch (schema_vec_->at(col).data_type_) {
            case STRING:
            case TEXT:
            case BLOB:
                // Only these types store their values in a StringColumn.
                value_matrix_[col].string_col->reset();
                break;
            default:
                break;
        }
    }
}

std::vector<uint32_t> Tablet::find_all_device_boundaries() const {
const uint32_t row_count = get_cur_row_size();
if (row_count <= 1) return {};

// Use uint64_t bitmap instead of vector<bool> for faster set/test/scan.
const uint32_t nwords = (row_count + 63) / 64;
std::vector<uint64_t> boundary(nwords, 0);

for (auto col_idx : id_column_indexes_) {
const StringColumn& sc = *value_matrix_[col_idx].string_col;
const uint32_t* off = sc.offsets;
const char* buf = sc.buffer;
for (uint32_t i = 1; i < row_count; i++) {
if (boundary[i >> 6] & (1ULL << (i & 63))) continue;
uint32_t len_a = off[i] - off[i - 1];
uint32_t len_b = off[i + 1] - off[i];
if (len_a != len_b ||
(len_a > 0 &&
memcmp(buf + off[i - 1], buf + off[i], len_a) != 0)) {
boundary[i >> 6] |= (1ULL << (i & 63));
}
}
}

// Collect boundary positions using bitscan
std::vector<uint32_t> result;
for (uint32_t w = 0; w < nwords; w++) {
uint64_t bits = boundary[w];
while (bits) {
uint32_t bit = __builtin_ctzll(bits);
uint32_t idx = w * 64 + bit;
if (idx > 0 && idx < row_count) {
result.push_back(idx);
}
bits &= bits - 1; // clear lowest set bit
}
}
return result;
}

std::shared_ptr<IDeviceID> Tablet::get_device_id(int i) const {
std::vector<std::string*> id_array;
id_array.push_back(new std::string(insert_target_name_));
Expand Down
70 changes: 68 additions & 2 deletions cpp/src/common/tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,78 @@ class TabletColIterator;
* with their associated metadata such as column names and types.
*/
class Tablet {
// Arrow-style string column: one offsets array plus one contiguous byte
// buffer holding the values of all rows back to back.
//   string[i] = buffer + offsets[i], len = offsets[i + 1] - offsets[i]
//
// Ownership: the struct has no destructor; the owner must call destroy().
// Copying a StringColumn copies the raw pointers, so two copies would refer
// to the same allocations.
struct StringColumn {
    uint32_t* offsets;      // length: max_rows + 1
    char* buffer;           // contiguous string data
    uint32_t buf_capacity;  // allocated buffer size
    uint32_t buf_used;      // bytes written so far

    StringColumn()
        : offsets(nullptr), buffer(nullptr), buf_capacity(0), buf_used(0) {}

    // Allocate the offsets array (max_rows + 1 entries) and a byte buffer of
    // init_buf_capacity bytes. Buffers from a previous init() are released
    // first. If the offsets allocation fails the column is left empty
    // (offsets == nullptr); if only the byte buffer fails, buf_capacity is 0
    // and append() will try to allocate again.
    void init(uint32_t max_rows, uint32_t init_buf_capacity) {
        destroy();
        offsets = static_cast<uint32_t*>(common::mem_alloc(
            sizeof(uint32_t) * (max_rows + 1), common::MOD_DEFAULT));
        if (offsets == nullptr) {
            return;
        }
        offsets[0] = 0;
        buffer = static_cast<char*>(
            common::mem_alloc(init_buf_capacity, common::MOD_DEFAULT));
        buf_capacity = (buffer != nullptr) ? init_buf_capacity : 0;
        buf_used = 0;
    }

    // Release both allocations and return to the default-constructed state.
    // Safe to call more than once.
    void destroy() {
        if (offsets) common::mem_free(offsets);
        offsets = nullptr;
        if (buffer) common::mem_free(buffer);
        buffer = nullptr;
        buf_capacity = buf_used = 0;
    }

    // Rewind the write position; allocations are kept for reuse. Only
    // offsets[0] is rewritten, other offsets keep their previous values
    // until append() overwrites them.
    void reset() {
        buf_used = 0;
        if (offsets) offsets[0] = 0;
    }

    // Copy len bytes from data to the end of the buffer and record them as
    // the value of `row` (offsets[row] and offsets[row + 1] are written).
    //
    // Returns true on success. Returns false when the column has no offsets
    // array, when data is null with len > 0, when the total size would no
    // longer fit in a 32-bit offset, or when the buffer cannot be grown. In
    // every failure case except the missing offsets array, the row is
    // recorded as an empty string so the offsets stay consistent.
    bool append(uint32_t row, const char* data, uint32_t len) {
        if (offsets == nullptr) {
            return false;
        }
        const bool input_ok = (len == 0 || data != nullptr);
        if (!input_ok || !ensure_capacity(len)) {
            offsets[row] = buf_used;
            offsets[row + 1] = buf_used;
            return false;
        }
        if (len > 0) {
            // Guarded: memcpy must not be handed a null pointer even for a
            // zero-length copy.
            memcpy(buffer + buf_used, data, len);
        }
        offsets[row] = buf_used;
        offsets[row + 1] = buf_used + len;
        buf_used += len;
        return true;
    }

    // Pointer to the first byte of row's value (not NUL-terminated).
    const char* get_str(uint32_t row) const {
        return buffer + offsets[row];
    }
    // Length in bytes of row's value.
    uint32_t get_len(uint32_t row) const {
        return offsets[row + 1] - offsets[row];
    }
    // Return a String view for a given row. Every call rewrites the same
    // cached String object, so the returned reference (and any pointer taken
    // from it) only describes `row` until the next call to get_string_view
    // on this column.
    common::String& get_string_view(uint32_t row) {
        view_cache_.buf_ = buffer + offsets[row];
        view_cache_.len_ = offsets[row + 1] - offsets[row];
        return view_cache_;
    }

   private:
    // Make sure `extra` more bytes fit behind buf_used, growing the buffer
    // to (2 * capacity + extra) when they do not. All size arithmetic is
    // done in 64 bits so it cannot wrap around.
    bool ensure_capacity(uint32_t extra) {
        const uint64_t kMaxBytes = 0xFFFFFFFFull;  // largest uint32_t offset
        const uint64_t needed = static_cast<uint64_t>(buf_used) + extra;
        if (needed > kMaxBytes) {
            return false;
        }
        if (needed <= buf_capacity) {
            return true;
        }
        uint64_t grown = static_cast<uint64_t>(buf_capacity) * 2 + extra;
        if (grown > kMaxBytes) {
            grown = kMaxBytes;
        }
        const uint32_t new_capacity = static_cast<uint32_t>(grown);
        // Keep the old pointer until the resize is known to have worked.
        // NOTE(review): assumes mem_realloc, like realloc, leaves the old
        // block valid when it returns nullptr - confirm.
        void* resized =
            (buffer != nullptr)
                ? common::mem_realloc(buffer, new_capacity)
                : common::mem_alloc(new_capacity, common::MOD_DEFAULT);
        if (resized == nullptr) {
            return false;
        }
        buffer = static_cast<char*>(resized);
        buf_capacity = new_capacity;
        return true;
    }

    common::String view_cache_;
};

struct ValueMatrixEntry {
union {
int32_t* int32_data;
int64_t* int64_data;
float* float_data;
double* double_data;
bool* bool_data;
common::String* string_data;
StringColumn* string_col;
};
};

Expand Down Expand Up @@ -201,6 +265,7 @@ class Tablet {
void set_column_categories(
const std::vector<common::ColumnCategory>& column_categories);
std::shared_ptr<IDeviceID> get_device_id(int i) const;
std::vector<uint32_t> find_all_device_boundaries() const;
/**
* @brief Template function to add a value of type T to the specified row
* and column by name.
Expand Down Expand Up @@ -234,6 +299,8 @@ class Tablet {
schema_map_ = schema_map;
}

void reset_string_columns();

friend class TabletColIterator;
friend class TsFileWriter;
friend struct MeasurementNamesFromTablet;
Expand All @@ -246,7 +313,6 @@ class Tablet {
private:
template <typename T>
void process_val(uint32_t row_index, uint32_t schema_index, T val);
common::PageArena page_arena_;
uint32_t max_row_num_;
uint32_t cur_row_size_;
std::string insert_target_name_;
Expand Down
71 changes: 54 additions & 17 deletions cpp/src/writer/tsfile_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -934,17 +934,20 @@ int TsFileWriter::write_table(Tablet& tablet) {
}
}
record_count_since_last_flush_ += tablet.cur_row_size_;
// Reset string column buffers so the tablet can be reused for the next
// batch without accumulating memory across writes.
tablet.reset_string_columns();
ret = check_memory_size_and_may_flush_chunks();
return ret;
}

std::vector<std::pair<std::shared_ptr<IDeviceID>, int>>
TsFileWriter::split_tablet_by_device(const Tablet& tablet) {
std::vector<std::pair<std::shared_ptr<IDeviceID>, int>> result;
std::shared_ptr<IDeviceID> last_device_id =
std::make_shared<StringArrayDeviceID>("last_device_id");

if (tablet.id_column_indexes_.empty()) {
result.emplace_back(std::move(last_device_id), 0);
auto sentinel = std::make_shared<StringArrayDeviceID>("last_device_id");
result.emplace_back(std::move(sentinel), 0);
std::vector<std::string*> id_array;
id_array.push_back(new std::string(tablet.insert_target_name_));
auto res = std::make_shared<StringArrayDeviceID>(id_array);
Expand All @@ -953,14 +956,22 @@ TsFileWriter::split_tablet_by_device(const Tablet& tablet) {
return result;
}

for (uint32_t i = 0; i < tablet.get_cur_row_size(); i++) {
std::shared_ptr<IDeviceID> cur_device_id(tablet.get_device_id(i));
if (*cur_device_id != *last_device_id) {
result.emplace_back(std::move(last_device_id), i);
last_device_id = std::move(cur_device_id);
}
const uint32_t row_count = tablet.get_cur_row_size();
if (row_count == 0) return result;

auto sentinel = std::make_shared<StringArrayDeviceID>("last_device_id");
result.emplace_back(std::move(sentinel), 0);

auto boundaries = tablet.find_all_device_boundaries();

uint32_t seg_start = 0;
for (uint32_t b : boundaries) {
std::shared_ptr<IDeviceID> dev_id(tablet.get_device_id(seg_start));
result.emplace_back(std::move(dev_id), b);
seg_start = b;
}
result.emplace_back(std::move(last_device_id), tablet.get_cur_row_size());
std::shared_ptr<IDeviceID> last_id(tablet.get_device_id(seg_start));
result.emplace_back(std::move(last_id), row_count);
return result;
}

Expand Down Expand Up @@ -996,7 +1007,7 @@ int TsFileWriter::write_column(ChunkWriter* chunk_writer, const Tablet& tablet,
col_notnull_bitmap, start_idx, end_idx);
} else if (data_type == common::STRING) {
ret =
write_typed_column(chunk_writer, timestamps, col_values.string_data,
write_typed_column(chunk_writer, timestamps, col_values.string_col,
col_notnull_bitmap, start_idx, end_idx);
} else {
ASSERT(false);
Expand Down Expand Up @@ -1061,8 +1072,8 @@ int TsFileWriter::value_write_column(ValueChunkWriter* value_chunk_writer,
case common::TEXT:
case common::BLOB:
ret = write_typed_column(value_chunk_writer, timestamps,
(common::String*)col_values.string_data,
col_notnull_bitmap, start_idx, end_idx);
col_values.string_col, col_notnull_bitmap,
start_idx, end_idx);
break;
default:
ret = E_NOT_SUPPORT;
Expand Down Expand Up @@ -1140,10 +1151,21 @@ int TsFileWriter::write_typed_column(ChunkWriter* chunk_writer,

int TsFileWriter::write_typed_column(ChunkWriter* chunk_writer,
int64_t* timestamps,
common::String* col_values,
Tablet::StringColumn* string_col,
BitMap& col_notnull_bitmap,
uint32_t start_idx, uint32_t end_idx) {
DO_WRITE_TYPED_COLUMN();
int ret = E_OK;
for (uint32_t r = start_idx; r < end_idx; r++) {
if (LIKELY(!col_notnull_bitmap.test(r))) {
common::String val(
string_col->buffer + string_col->offsets[r],
string_col->offsets[r + 1] - string_col->offsets[r]);
if (RET_FAIL(chunk_writer->write(timestamps[r], val))) {
return ret;
}
}
}
return ret;
}

int TsFileWriter::write_typed_column(ValueChunkWriter* value_chunk_writer,
Expand Down Expand Up @@ -1183,10 +1205,25 @@ int TsFileWriter::write_typed_column(ValueChunkWriter* value_chunk_writer,

int TsFileWriter::write_typed_column(ValueChunkWriter* value_chunk_writer,
int64_t* timestamps,
common::String* col_values,
Tablet::StringColumn* string_col,
common::BitMap& col_notnull_bitmap,
uint32_t start_idx, uint32_t end_idx) {
DO_VALUE_WRITE_TYPED_COLUMN();
int ret = E_OK;
for (uint32_t r = start_idx; r < end_idx; r++) {
common::String val(string_col->buffer + string_col->offsets[r],
string_col->offsets[r + 1] - string_col->offsets[r]);
if (LIKELY(col_notnull_bitmap.test(r))) {
if (RET_FAIL(value_chunk_writer->write(timestamps[r], val, true))) {
return ret;
}
} else {
if (RET_FAIL(
value_chunk_writer->write(timestamps[r], val, false))) {
return ret;
}
}
}
return ret;
}

// TODO make sure ret is meaningful to SDK user
Expand Down
5 changes: 3 additions & 2 deletions cpp/src/writer/tsfile_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ class TsFileWriter {
common::BitMap& col_notnull_bitmap,
uint32_t start_idx, uint32_t end_idx);
int write_typed_column(ChunkWriter* chunk_writer, int64_t* timestamps,
common::String* col_values,
Tablet::StringColumn* string_col,
common::BitMap& col_notnull_bitmap,
uint32_t start_idx, uint32_t end_idx);

Expand Down Expand Up @@ -198,7 +198,8 @@ class TsFileWriter {
common::BitMap& col_notnull_bitmap,
uint32_t start_idx, uint32_t end_idx);
int write_typed_column(ValueChunkWriter* value_chunk_writer,
int64_t* timestamps, common::String* col_values,
int64_t* timestamps,
Tablet::StringColumn* string_col,
common::BitMap& col_notnull_bitmap,
uint32_t start_idx, uint32_t end_idx);

Expand Down
Loading