Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,13 @@ if (${COV_ENABLED})
message("add_definitions -DCOV_ENABLED=1")
endif ()

option(ENABLE_MEM_STAT "Enable memory status" ON)

if (ENABLE_MEM_STAT)
add_definitions(-DENABLE_MEM_STAT)
message("add_definitions -DENABLE_MEM_STAT")
endif ()


if (NOT CMAKE_BUILD_TYPE)
set(CMAKE_BUILD_TYPE "Release" CACHE STRING "Choose the type of build." FORCE)
Expand Down
87 changes: 40 additions & 47 deletions cpp/src/common/allocator/alloc_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,46 +30,33 @@ namespace common {

enum AllocModID {
__FIRST_MOD_ID = 0,
// if you are sure you will not consume too much memory, you can use
// MOD_DEFAULT.
MOD_DEFAULT = 0,
MOD_MEMTABLE = 1,
MOD_SCHEMA = 2,
MOD_SQL = 3,
MOD_NET = 4,
MOD_NET_DATA = 5,
MOD_TVLIST_DATA = 6,
MOD_TVLIST_OBJ = 7,
MOD_TSBLOCK = 8,
MOD_CONTAINER = 9,
MOD_TSSTORE_OBJ = 10,
MOD_FLUSH_TASK_OBJ = 11,
MOD_PAGE_WRITER_OUTPUT_STREAM = 12,
MOD_CW_PAGES_DATA = 13,
MOD_CHUNK_WRITER_OBJ = 14,
MOD_STATISTIC_OBJ = 15,
MOD_ENCODER_OBJ = 16,
MOD_DECODER_OBJ = 17,
MOD_TSFILE_WRITER_META = 18,
MOD_TSFILE_WRITE_STREAM = 19,
MOD_TIMESERIES_INDEX_OBJ = 20,
MOD_BLOOM_FILTER = 21,
MOD_OPEN_FILE_OBJ = 22,
MOD_TSFILE_READER = 23,
MOD_CHUNK_READER = 24,
MOD_COMPRESSOR_OBJ = 25,
MOD_ARRAY = 26,
MOD_HASH_TABLE = 27,
MOD_WRITER_INDEX_NODE = 28,
MOD_TS2DIFF_OBJ = 29,
MOD_BITENCODE_OBJ = 30,
MOD_DICENCODE_OBJ = 31,
MOD_ZIGZAG_OBJ = 32,
MOD_DEVICE_META_ITER = 33,
MOD_DEVICE_TASK_ITER = 34,
MOD_DEVICE_ORDER_TSBLOCK_READER = 35,
__LAST_MOD_ID = 36, // prev + 1,
__MAX_MOD_ID = 127, // leave 1 bit to detect header size
MOD_TVLIST_DATA = 1,
MOD_TSBLOCK = 2,
MOD_PAGE_WRITER_OUTPUT_STREAM = 3,
MOD_CW_PAGES_DATA = 4,
MOD_STATISTIC_OBJ = 5,
MOD_ENCODER_OBJ = 6,
MOD_DECODER_OBJ = 7,
MOD_TSFILE_WRITER_META = 8,
MOD_TSFILE_WRITE_STREAM = 9,
MOD_TIMESERIES_INDEX_OBJ = 10,
MOD_BLOOM_FILTER = 11,
MOD_TSFILE_READER = 12,
MOD_CHUNK_READER = 13,
MOD_COMPRESSOR_OBJ = 14,
MOD_ARRAY = 15,
MOD_HASH_TABLE = 16,
MOD_WRITER_INDEX_NODE = 17,
MOD_TS2DIFF_OBJ = 18,
MOD_BITENCODE_OBJ = 19,
MOD_DICENCODE_OBJ = 20,
MOD_ZIGZAG_OBJ = 21,
MOD_DEVICE_META_ITER = 22,
MOD_DEVICE_TASK_ITER = 23,
MOD_TABLET = 24,
__LAST_MOD_ID = 25,
__MAX_MOD_ID = 127,
};

extern const char* g_mod_names[__LAST_MOD_ID];
Expand All @@ -82,24 +69,30 @@ void* mem_realloc(void* ptr, uint32_t size);
class ModStat {
public:
ModStat() : stat_arr_(NULL) {}
~ModStat() { destroy(); }

static ModStat& get_instance() {
/*
* This is the singleton of Mod Memory Statistic.
* gms is short for Global Mod Statistic
*/
static ModStat gms;
#ifdef ENABLE_MEM_STAT
if (UNLIKELY(gms.stat_arr_ == NULL)) {
gms.init();
}
#endif
return gms;
}
void init();
void destroy();
INLINE void update_alloc(AllocModID mid, int32_t size) {
// ASSERT(mid < __LAST_MOD_ID);
// ATOMIC_FAA(get_item(mid), size);
#ifdef ENABLE_MEM_STAT
ASSERT(mid < __LAST_MOD_ID);
ATOMIC_FAA(get_item(mid), size);
#endif
}
void update_free(AllocModID mid, uint32_t size) {
// ASSERT(mid < __LAST_MOD_ID);
// ATOMIC_FAA(get_item(mid), 0 - size);
#ifdef ENABLE_MEM_STAT
ASSERT(mid < __LAST_MOD_ID);
ATOMIC_FAA(get_item(mid), 0 - size);
#endif
}
void print_stat();

Expand Down
40 changes: 25 additions & 15 deletions cpp/src/common/allocator/byte_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -268,9 +268,8 @@ class ByteStream {
// assert(page_size >= 16); // commented out by gxh on 2023.03.09
}

// TODO use a specific construct function to mark it as wrapped use.
// for wrap plain buffer to ByteStream
ByteStream()
ByteStream(AllocModID mid = MOD_DEFAULT)
: allocator_(g_base_allocator),
head_(nullptr, false),
tail_(nullptr, false),
Expand All @@ -279,7 +278,7 @@ class ByteStream {
read_pos_(0),
marked_read_pos_(0),
page_size_(0),
mid_(MOD_DEFAULT),
mid_(mid),
wrapped_page_(false, nullptr) {}

~ByteStream() { destroy(); }
Expand Down Expand Up @@ -669,6 +668,11 @@ class ByteStream {
uint32_t marked_read_pos_; // current reader position
uint32_t page_size_;
AllocModID mid_;

public:
AllocModID get_mid() const { return mid_; }

private:
Page wrapped_page_;
};

Expand Down Expand Up @@ -896,10 +900,10 @@ class SerializationUtil {

FORCE_INLINE static int chunk_read_all_data(ByteStream& in, ByteStream& out,
size_t chunk_size = 4096) {
char* buffer = new char[chunk_size];
char* buffer = static_cast<char*>(mem_alloc(chunk_size, out.get_mid()));
if (buffer == nullptr) return E_OOM;
int ret = common::E_OK;
while (in.remaining_size() > 0) {
// Adjust read size based on remaining input size
uint32_t bytes_to_read = static_cast<uint32_t>(
std::min(chunk_size, static_cast<size_t>(in.remaining_size())));

Expand All @@ -913,7 +917,7 @@ class SerializationUtil {
break;
}
}
delete[] buffer;
mem_free(buffer);
return ret;
}

Expand Down Expand Up @@ -1172,16 +1176,18 @@ class SerializationUtil {
str = nullptr;
return ret;
} else {
char* tmp_buf = static_cast<char*>(malloc(len));
char* tmp_buf =
static_cast<char*>(mem_alloc(len, in.get_mid()));
if (tmp_buf == nullptr) return E_OOM;
if (RET_FAIL(in.read_buf(tmp_buf, len, read_len))) {
free(tmp_buf);
mem_free(tmp_buf);
return ret;
} else if (len != read_len) {
free(tmp_buf);
mem_free(tmp_buf);
ret = E_BUF_NOT_ENOUGH;
} else {
str = new std::string(tmp_buf, len);
free(tmp_buf);
mem_free(tmp_buf);
}
}
}
Expand All @@ -1194,15 +1200,17 @@ class SerializationUtil {
int32_t read_len = 0;
if (RET_FAIL(read_var_int(len, in))) {
} else {
char* tmp_buf = (char*)malloc(len + 1);
char* tmp_buf =
static_cast<char*>(mem_alloc(len + 1, in.get_mid()));
if (tmp_buf == nullptr) return E_OOM;
tmp_buf[len] = '\0';
if (RET_FAIL(in.read_buf(tmp_buf, len, read_len))) {
} else if (len != read_len) {
ret = E_BUF_NOT_ENOUGH;
} else {
str = std::string(tmp_buf);
}
free(tmp_buf);
mem_free(tmp_buf);
}
return ret;
}
Expand All @@ -1220,15 +1228,17 @@ class SerializationUtil {
if (RET_FAIL(read_i32(len, in))) {
} else {
int32_t read_len = 0;
char* tmp_buf = static_cast<char*>(malloc(len + 1));
char* tmp_buf =
static_cast<char*>(mem_alloc(len + 1, in.get_mid()));
if (tmp_buf == nullptr) return E_OOM;
tmp_buf[len] = '\0';
if (RET_FAIL(in.read_buf(tmp_buf, len, read_len))) {
} else if (len != read_len) {
ret = E_BUF_NOT_ENOUGH;
} else {
str = std::string(tmp_buf);
}
free(tmp_buf);
mem_free(tmp_buf);
}
return ret;
}
Expand Down Expand Up @@ -1308,7 +1318,7 @@ FORCE_INLINE char* get_bytes_from_bytestream(ByteStream& bs) {
return nullptr;
}
uint32_t size = bs.total_size();
char* ret_buf = (char*)malloc(size);
char* ret_buf = static_cast<char*>(mem_alloc(size, bs.get_mid()));
if (ret_buf == nullptr) {
return nullptr;
}
Expand Down
103 changes: 75 additions & 28 deletions cpp/src/common/allocator/mem_alloc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#endif
#include <string.h>

#include <iomanip>
#include <iostream>

#include "alloc_base.h"
Expand All @@ -34,33 +35,30 @@ namespace common {

const char* g_mod_names[__LAST_MOD_ID] = {
/* 0 */ "DEFAULT",
/* 1 */ "MEMTABLE",
/* 2 */ "SCHEMA",
/* 3 */ "SQL",
/* 4 */ "NET",
/* 5 */ "NET_DATA",
/* 6 */ "TVLIST_DATA",
/* 7 */ "TVLIST_OBJ",
/* 8 */ "TSBLOCK",
/* 9 */ "CONTAINER",
/* 10 */ "TSSTORE_OBJ",
/* 11 */ "FLUSH_TASK_OBJ",
/* 12 */ "PAGE_WRITER_OUTPUT_STREAM",
/* 13 */ "CW_PAGES_DATA",
/* 14 */ "CHUNK_WRITER_OBJ",
/* 15 */ "STATISTIC_OBJ",
/* 16 */ "ENCODER_OBJ",
/* 17 */ "DECODER_OBJ",
/* 18 */ "TSFILE_WRITER_META",
/* 19 */ "TSFILE_WRITE_STREAM",
/* 20 */ "TIMESERIES_INDEX_OBJ",
/* 21 */ "BLOOM_FILTER",
/* 22 */ "OPEN_FILE_OBJ",
/* 23 */ "TSFILE_READER",
/* 24 */ "CHUNK_READER",
/* 25 */ "COMPRESSOR_OBJ",
/* 26 */ "ARRAY",
/* 27 */ "HASH_TABLE",
/* 1 */ "TVLIST_DATA",
/* 2 */ "TSBLOCK",
/* 3 */ "PAGE_WRITER_OUTPUT_STREAM",
/* 4 */ "CW_PAGES_DATA",
/* 5 */ "STATISTIC_OBJ",
/* 6 */ "ENCODER_OBJ",
/* 7 */ "DECODER_OBJ",
/* 8 */ "TSFILE_WRITER_META",
/* 9 */ "TSFILE_WRITE_STREAM",
/* 10 */ "TIMESERIES_INDEX_OBJ",
/* 11 */ "BLOOM_FILTER",
/* 12 */ "TSFILE_READER",
/* 13 */ "CHUNK_READER",
/* 14 */ "COMPRESSOR_OBJ",
/* 15 */ "ARRAY",
/* 16 */ "HASH_TABLE",
/* 17 */ "WRITER_INDEX_NODE",
/* 18 */ "TS2DIFF_OBJ",
/* 19 */ "BITENCODE_OBJ",
/* 20 */ "DICENCODE_OBJ",
/* 21 */ "ZIGZAG_OBJ",
/* 22 */ "DEVICE_META_ITER",
/* 23 */ "DEVICE_TASK_ITER",
/* 24 */ "TABLET",
};

// Most modern CPUs (e.g., x86_64, Arm) support at least 8-byte alignment,
Expand Down Expand Up @@ -97,6 +95,7 @@ void* mem_alloc(uint32_t size, AllocModID mid) {
auto high4b = static_cast<uint32_t>(header >> 32);
*reinterpret_cast<uint32_t*>(raw) = high4b;
*reinterpret_cast<uint32_t*>(raw + 4) = low4b;
ModStat::get_instance().update_alloc(mid, static_cast<int32_t>(size));
return raw + header_size;
}

Expand Down Expand Up @@ -164,14 +163,62 @@ void* mem_realloc(void* ptr, uint32_t size) {
}

void ModStat::init() {
if (stat_arr_ != NULL) {
return;
}
stat_arr_ = (int32_t*)(::malloc(ITEM_SIZE * ITEM_COUNT));
for (int8_t i = 0; i < __LAST_MOD_ID; i++) {
int32_t* item = get_item(i);
*item = 0;
}
}

void ModStat::destroy() { ::free(stat_arr_); }
void ModStat::destroy() {
::free(stat_arr_);
stat_arr_ = NULL;
}

void ModStat::print_stat() {
if (stat_arr_ == NULL) return;

struct Entry {
const char* name;
int32_t val;
};
Entry entries[__LAST_MOD_ID];
int count = 0;
int64_t total = 0;

for (int i = 0; i < __LAST_MOD_ID; i++) {
int32_t val = ATOMIC_FAA(get_item(i), 0);
total += val;
if (val != 0) {
entries[count++] = {g_mod_names[i], val};
}
}

for (int i = 0; i < count - 1; i++) {
for (int j = i + 1; j < count; j++) {
if (entries[j].val > entries[i].val) {
Entry tmp = entries[i];
entries[i] = entries[j];
entries[j] = tmp;
}
}
}

std::cout << "=== Memory Statistics ===" << std::endl;
for (int i = 0; i < count; i++) {
std::cout << " " << entries[i].name << ": " << entries[i].val
<< " bytes" << std::endl;
}
double kb = total / 1024.0;
double mb = kb / 1024.0;
std::cout << " TOTAL: " << total << " bytes / " << std::fixed
<< std::setprecision(2) << kb << " KB / " << mb << " MB"
<< std::endl;
std::cout << "=========================" << std::endl;
}

BaseAllocator g_base_allocator;

Expand Down
Loading
Loading