Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 107 additions & 0 deletions cpp/src/common/tablet.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "tablet.h"

#include <cstdlib>
#include <cstring>

#include "datatype/date_converter.h"
#include "utils/errno_define.h"
Expand All @@ -28,6 +29,112 @@ using namespace common;

namespace storage {

/**
 * Builds a tablet from pre-filled column buffers.
 *
 * Delegates to the (column_names, data_types, row_num) constructor and then
 * loads every entry of `columns` through set_column() with `row_num` rows.
 * The first failure is stored in err_code_ and loading stops there.
 *
 * @param column_num   Number of entries in `columns`; must equal both
 *                     column_names.size() and data_types.size().
 * @param row_num      Number of rows provided by every column buffer.
 * @param columns      Array of `column_num` column buffers. A null array is
 *                     rejected with E_INVALID_ARG; a null entry is handled by
 *                     set_column() (treated as an all-null column).
 * @param column_names Column names forwarded to the delegated constructor.
 * @param data_types   Column data types forwarded to the delegated
 *                     constructor.
 */
Tablet::Tablet(uint32_t column_num, uint32_t row_num, void** columns,
               const std::vector<std::string>& column_names,
               const std::vector<common::TSDataType>& data_types)
    : Tablet(column_names, data_types, row_num) {
    // Preserve the root cause: if the delegated constructor already failed,
    // do not overwrite its error code with a less specific one.
    if (err_code_ != common::E_OK) {
        return;
    }
    if (columns == nullptr) {
        err_code_ = common::E_INVALID_ARG;
        return;
    }
    ASSERT(column_num == column_names.size());
    ASSERT(column_num == data_types.size());
    // Also report the mismatch at runtime so that it is not silently
    // accepted in builds where ASSERT is compiled out.
    if (column_num != column_names.size() ||
        column_num != data_types.size()) {
        err_code_ = common::E_INVALID_ARG;
        return;
    }

    for (uint32_t c = 0; c < column_num; ++c) {
        const int ret = set_column(static_cast<int>(c), columns[c],
                                   static_cast<int>(row_num));
        if (ret != common::E_OK) {
            err_code_ = ret;
            break;
        }
    }
}

/**
 * Copies a whole column buffer into the tablet in one call.
 *
 * Expected element type of `column` per schema data type:
 *   BOOLEAN -> bool, INT32/DATE -> int32_t, INT64/TIMESTAMP -> int64_t,
 *   FLOAT -> float, DOUBLE -> double, TEXT/BLOB/STRING -> common::String.
 *
 * @param column_index  Schema index of the column to fill.
 * @param column        Buffer holding `column_length` elements, or nullptr to
 *                      mark the first `column_length` rows as null.
 * @param column_length Number of rows to load; 0 is a no-op.
 * @return common::E_OK on success; the stored err_code_ if the tablet is
 *         already in an error state; E_OUT_OF_RANGE for a bad column index or
 *         a length above max_row_num_; E_INVALID_ARG for a negative length;
 *         E_NOT_SUPPORT for an unhandled data type; or the error reported by
 *         a failed string copy.
 */
int Tablet::set_column(int column_index, void* column, int column_length) {
    if (err_code_ != common::E_OK) {
        return err_code_;
    }
    if (UNLIKELY(column_index < 0 ||
                 static_cast<uint32_t>(column_index) >= schema_vec_->size())) {
        return common::E_OUT_OF_RANGE;
    }
    if (UNLIKELY(column_length < 0)) {
        return common::E_INVALID_ARG;
    }
    const uint32_t len = static_cast<uint32_t>(column_length);
    if (UNLIKELY(len > max_row_num_)) {
        return common::E_OUT_OF_RANGE;
    }
    if (len == 0) {
        return common::E_OK;
    }

    const MeasurementSchema& schema = schema_vec_->at(column_index);
    auto& col_values = value_matrix_[column_index];
    BitMap& col_notnull_bitmap = bitmaps_[column_index];

    if (column == nullptr) {
        // Treat as all-null: a set bit marks the row as null.
        for (uint32_t r = 0; r < len; ++r) {
            col_notnull_bitmap.set(r);
        }
        cur_row_size_ = std::max(cur_row_size_, len);
        return common::E_OK;
    }

    switch (schema.data_type_) {
        case common::BOOLEAN:
            memcpy(col_values.bool_data, column, sizeof(bool) * len);
            break;
        case common::DATE:
        case common::INT32:
            memcpy(col_values.int32_data, column, sizeof(int32_t) * len);
            break;
        case common::TIMESTAMP:
        case common::INT64:
            memcpy(col_values.int64_data, column, sizeof(int64_t) * len);
            break;
        case common::FLOAT:
            memcpy(col_values.float_data, column, sizeof(float) * len);
            break;
        case common::DOUBLE:
            memcpy(col_values.double_data, column, sizeof(double) * len);
            break;
        case common::TEXT:
        case common::BLOB:
        case common::STRING: {
            const common::String* strings =
                static_cast<const common::String*>(column);
            for (uint32_t r = 0; r < len; ++r) {
                // Propagate a failed copy instead of reporting success with a
                // row whose payload was never stored.
                const int ret = col_values.string_data[r].dup_from(
                    strings[r], page_arena_);
                if (UNLIKELY(ret != common::E_OK)) {
                    // The failing slot's contents are unspecified after the
                    // failed copy, so make sure it is not read as a value.
                    col_notnull_bitmap.set(r);
                    return ret;
                }
            }
            break;
        }
        default:
            ASSERT(false);
            return common::E_NOT_SUPPORT;
    }

    // Every supported type ends the same way: the copied rows are non-null.
    for (uint32_t r = 0; r < len; ++r) {
        col_notnull_bitmap.clear(r);
    }
    cur_row_size_ = std::max(cur_row_size_, len);
    return common::E_OK;
}

int Tablet::init() {
ASSERT(timestamps_ == nullptr);
timestamps_ = (int64_t*)malloc(sizeof(int64_t) * max_row_num_);
Expand Down
22 changes: 22 additions & 0 deletions cpp/src/common/tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,28 @@ class Tablet {
err_code_ = init();
}

/**
 * @brief Constructs a Tablet object by taking pre-filled columns.
 *
 * The tablet is first built from `column_names`, `data_types` and `row_num`,
 * then every entry of `columns` is loaded through set_column() with
 * `row_num` rows. Loading stops at the first column that fails and the
 * failure is recorded in the tablet's error code; a null `columns` array is
 * recorded as E_INVALID_ARG.
 *
 * Expected element type of `columns[col_index]` per column data type:
 *  - BOOLEAN: `bool`
 *  - INT32 / DATE: `int32_t`
 *  - INT64 / TIMESTAMP: `int64_t`
 *  - FLOAT: `float`
 *  - DOUBLE: `double`
 *  - TEXT / BLOB / STRING: `common::String`
 * Each buffer must hold `row_num` contiguous elements. A null entry
 * `columns[col_index]` marks that column as null for all `row_num` rows.
 *
 * @param column_num Number of entries in `columns`; must equal
 * `column_names.size()` and `data_types.size()`.
 * @param row_num Number of rows provided by each column buffer.
 * @param columns Array of `column_num` column buffers (see above).
 * @param column_names Name of each column.
 * @param data_types Data type of each column.
 */
Tablet(uint32_t column_num, uint32_t row_num, void** columns,
const std::vector<std::string>& column_names,
const std::vector<common::TSDataType>& data_types);

/**
 * @brief Set a whole column buffer at once.
 *
 * Fixed-width types are copied with a single memcpy; string-like columns
 * (TEXT/BLOB/STRING) are duplicated element by element from an array of
 * `common::String`. The loaded rows are marked non-null and the tablet's
 * current row count is raised to at least `column_length`.
 *
 * @param column_index Schema index of the column to set.
 * @param column A pointer to an array with `column_length` elements.
 * See the constructor doc for the expected element types.
 * Passing nullptr marks the first `column_length` rows of the column as
 * null instead of copying data.
 * @param column_length Number of rows to copy from `column`. A value of 0
 * leaves the tablet unchanged and returns E_OK.
 * @return common::E_OK on success. Otherwise an error code, including:
 * the tablet's stored error if it is already in an error state,
 * E_OUT_OF_RANGE if `column_index` is not a valid schema index or
 * `column_length` exceeds the tablet's maximum row count,
 * E_INVALID_ARG if `column_length` is negative, and
 * E_NOT_SUPPORT if the column's data type is not handled.
 */
int set_column(int column_index, void* column, int column_length);

~Tablet() { destroy(); }

const std::string& get_table_name() const { return insert_target_name_; }
Expand Down
118 changes: 118 additions & 0 deletions cpp/test/writer/table_view/tsfile_writer_table_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -877,6 +877,124 @@ TEST_F(TsFileWriterTableTest, MultiDatatypes) {
delete[] literal;
}

// Round-trip test for Tablet::set_column(): fills one column of every
// supported data type in a single call each, writes the tablet to a TsFile,
// then reads it back and checks every value of every row.
TEST_F(TsFileWriterTableTest, TabletSetColumnWriteReadAllTypes) {
    std::vector<MeasurementSchema*> measurement_schemas;
    std::vector<ColumnCategory> column_categories;

    // Cover most TSDataType that storage::Tablet currently supports.
    std::vector<std::string> measurement_names = {
        "level", "int32", "num",  "bools", "double",
        "id",    "ts",    "text", "blob",  "date"};
    std::vector<common::TSDataType> data_types = {
        FLOAT,  INT32,     INT64, BOOLEAN, DOUBLE,
        STRING, TIMESTAMP, TEXT,  BLOB,    DATE};

    for (size_t i = 0; i < measurement_names.size(); i++) {
        measurement_schemas.emplace_back(
            new MeasurementSchema(measurement_names[i], data_types[i]));
        column_categories.emplace_back(ColumnCategory::FIELD);
    }

    auto table_schema =
        new TableSchema("testTable", measurement_schemas, column_categories);
    auto tsfile_table_writer =
        std::make_shared<TsFileTableWriter>(&write_file_, table_schema);

    constexpr int row_num = 20;
    Tablet tablet(table_schema->get_measurement_names(),
                  table_schema->get_data_types(), row_num);

    // Prepare input column buffers.
    std::vector<float> level_vals(row_num, 1.5f);
    std::vector<int32_t> int32_vals(row_num, 123);
    std::vector<int64_t> int64_vals(row_num, 415412);

    // std::vector<bool> is bit-packed and cannot be handed to set_column(),
    // so use a plain array. It lives on the stack so that a fatal ASSERT_*
    // returning early from this test body cannot leak it.
    bool bool_vals[row_num];
    for (int i = 0; i < row_num; ++i) {
        bool_vals[i] = (i % 2 == 0);
    }

    std::vector<double> double_vals(row_num, 2.5);

    // Mutable stack copy of the literal, for the same no-leak reason.
    char literal[] = "device_id";
    String literal_str(literal, std::strlen("device_id"));
    std::vector<String> string_vals(row_num, literal_str);

    std::vector<int64_t> ts_vals(row_num, 415412);

    // DATE is stored as int32 (yyyymmdd) internally.
    std::tm today = {};
    today.tm_year = 120;  // 2020
    today.tm_mon = 0;     // Jan
    today.tm_mday = 2;    // 2nd
    int32_t today_int = 0;
    ASSERT_EQ(DateConverter::date_to_int(today, today_int), common::E_OK);
    std::vector<int32_t> date_vals(row_num, today_int);

    // Set columns in one shot.
    ASSERT_EQ(tablet.set_column(0, level_vals.data(), row_num), E_OK);
    ASSERT_EQ(tablet.set_column(1, int32_vals.data(), row_num), E_OK);
    ASSERT_EQ(tablet.set_column(2, int64_vals.data(), row_num), E_OK);
    ASSERT_EQ(tablet.set_column(3, bool_vals, row_num), E_OK);
    ASSERT_EQ(tablet.set_column(4, double_vals.data(), row_num), E_OK);
    ASSERT_EQ(tablet.set_column(5, string_vals.data(), row_num), E_OK);
    ASSERT_EQ(tablet.set_column(6, ts_vals.data(), row_num), E_OK);
    ASSERT_EQ(tablet.set_column(7, string_vals.data(), row_num), E_OK);
    ASSERT_EQ(tablet.set_column(8, string_vals.data(), row_num), E_OK);
    ASSERT_EQ(tablet.set_column(9, date_vals.data(), row_num), E_OK);

    // Set row timestamps (time column).
    for (int i = 0; i < row_num; i++) {
        tablet.add_timestamp(i, static_cast<int64_t>(i));
    }

    ASSERT_EQ(tsfile_table_writer->write_table(tablet), E_OK);
    ASSERT_EQ(tsfile_table_writer->flush(), E_OK);
    ASSERT_EQ(tsfile_table_writer->close(), E_OK);

    delete table_schema;

    auto reader = TsFileReader();
    reader.open(write_file_.get_file_path());
    ResultSet* ret = nullptr;
    ASSERT_EQ(reader.query("testTable", measurement_names, 0, row_num, ret),
              common::E_OK);

    auto* table_result_set = (TableResultSet*)ret;
    bool has_next = false;
    int row_idx = 0;
    while (IS_SUCC(table_result_set->next(has_next)) && has_next) {
        // Fail cleanly instead of indexing past the expected-value buffers
        // if the reader ever returns more rows than were written.
        ASSERT_LT(row_idx, row_num);

        // Result columns are addressed from 2 here; column 1 is presumably
        // the time column.
        ASSERT_EQ(table_result_set->get_value<float>(2), level_vals[row_idx]);
        ASSERT_EQ(table_result_set->get_value<int32_t>(3),
                  int32_vals[row_idx]);
        ASSERT_EQ(table_result_set->get_value<int64_t>(4),
                  int64_vals[row_idx]);
        ASSERT_EQ(table_result_set->get_value<bool>(5), bool_vals[row_idx]);
        ASSERT_EQ(table_result_set->get_value<double>(6),
                  double_vals[row_idx]);

        ASSERT_EQ(table_result_set->get_value<common::String*>(7)->compare(
                      literal_str),
                  0);
        ASSERT_EQ(table_result_set->get_value<int64_t>(8), ts_vals[row_idx]);
        ASSERT_EQ(table_result_set->get_value<common::String*>(9)->compare(
                      literal_str),
                  0);
        ASSERT_EQ(table_result_set->get_value<common::String*>(10)->compare(
                      literal_str),
                  0);

        ASSERT_TRUE(DateConverter::is_tm_ymd_equal(
            table_result_set->get_value<std::tm>(11), today));
        row_idx++;
    }
    ASSERT_EQ(row_idx, row_num);
    table_result_set->close();
    reader.destroy_query_data_set(table_result_set);
    ASSERT_EQ(reader.close(), common::E_OK);
}

TEST_F(TsFileWriterTableTest, DiffCodecTypes) {
std::vector<MeasurementSchema*> measurement_schemas;
std::vector<ColumnCategory> column_categories;
Expand Down
Loading