Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 26 additions & 20 deletions cpp/src/common/allocator/byte_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,8 @@ class ByteStream {
};

public:
static const uint32_t DEFAULT_PAGE_SIZE = 1024;

ByteStream(uint32_t page_size, AllocModID mid, bool enable_atomic = false,
BaseAllocator& allocator = g_base_allocator)
: allocator_(allocator),
Expand All @@ -263,10 +265,9 @@ class ByteStream {
read_pos_(0),
marked_read_pos_(0),
page_size_(page_size),
page_mask_(page_size - 1),
mid_(mid),
wrapped_page_(false, nullptr) {
// assert(page_size >= 16); // commented out by gxh on 2023.03.09
}
wrapped_page_(false, nullptr) {}

// TODO use a specific construct function to mark it as wrapped use.
// for wrap plain buffer to ByteStream
Expand All @@ -279,6 +280,7 @@ class ByteStream {
read_pos_(0),
marked_read_pos_(0),
page_size_(0),
page_mask_(0),
mid_(MOD_DEFAULT),
wrapped_page_(false, nullptr) {}

Expand All @@ -292,6 +294,7 @@ class ByteStream {
wrapped_page_.buf_ = (uint8_t*)buf;

page_size_ = buf_len;
page_mask_ = buf_len - 1;
head_.store(&wrapped_page_);
tail_.store(&wrapped_page_);
total_size_.store(buf_len);
Expand Down Expand Up @@ -340,6 +343,7 @@ class ByteStream {
// never used TODO
void shallow_clone_from(ByteStream& other) {
this->page_size_ = other.page_size_;
this->page_mask_ = other.page_mask_;
this->mid_ = other.mid_;
this->head_.store(other.head_.load());
this->tail_.store(other.tail_.load());
Expand All @@ -366,10 +370,10 @@ class ByteStream {
std::cout << "write_buf error " << ret << std::endl;
return ret;
}
uint32_t remainder = page_size_ - (total_size_.load() % page_size_);
uint32_t remainder = page_size_ - (total_size_.load() & page_mask_);
uint32_t copy_len =
remainder < (len - write_len) ? remainder : (len - write_len);
memcpy(tail_.load()->buf_ + total_size_.load() % page_size_,
memcpy(tail_.load()->buf_ + (total_size_.load() & page_mask_),
buf + write_len, copy_len);
total_size_.atomic_aaf(copy_len);
write_len += copy_len;
Expand All @@ -390,11 +394,11 @@ class ByteStream {
if (RET_FAIL(check_space())) {
return ret;
}
uint32_t remainder = page_size_ - (read_pos_ % page_size_);
uint32_t remainder = page_size_ - (read_pos_ & page_mask_);
uint32_t copy_len = remainder < want_len_limited - read_len
? remainder
: want_len_limited - read_len;
memcpy(buf + read_len, read_page_->buf_ + (read_pos_ % page_size_),
memcpy(buf + read_len, read_page_->buf_ + (read_pos_ & page_mask_),
copy_len);
read_len += copy_len;
read_pos_ += copy_len;
Expand Down Expand Up @@ -446,16 +450,17 @@ class ByteStream {
return b;
}
b.buf_ =
(char*)(tail_.load()->buf_ + (total_size_.load() % page_size_));
b.len_ = page_size_ - (total_size_.load() % page_size_);
(char*)(tail_.load()->buf_ + (total_size_.load() & page_mask_));
b.len_ = page_size_ - (total_size_.load() & page_mask_);
return b;
}

void buffer_used(uint32_t used_bytes) {
ASSERT(used_bytes >= 1);
// would not span page
ASSERT((total_size_.load() / page_size_) ==
((total_size_.load() + used_bytes - 1) / page_size_));
ASSERT(page_size_ == 0 ||
(total_size_.load() / page_size_) ==
((total_size_.load() + used_bytes - 1) / page_size_));
total_size_.atomic_aaf(used_bytes);
}

Expand All @@ -471,7 +476,7 @@ class ByteStream {
if (RET_FAIL(prepare_space())) {
return ret;
}
uint32_t remainder = page_size_ - (total_size_.load() % page_size_);
uint32_t remainder = page_size_ - (total_size_.load() & page_mask_);
uint32_t step =
remainder < (len - advanced) ? remainder : (len - advanced);
total_size_.atomic_aaf(step);
Expand Down Expand Up @@ -501,8 +506,8 @@ class ByteStream {
if (cur_ != nullptr) {
b.buf_ = (char*)cur_->buf_;
if (cur_ == end_ &&
host_.total_size_.load() % host_.page_size_ != 0) {
b.len_ = host_.total_size_.load() % host_.page_size_;
(host_.total_size_.load() & host_.page_mask_) != 0) {
b.len_ = host_.total_size_.load() & host_.page_mask_;
} else {
b.len_ = host_.page_size_;
}
Expand Down Expand Up @@ -559,7 +564,7 @@ class ByteStream {

while (true) {
if (cur_ == host_end) {
if (host_total_size % host_.page_size_ == 0) {
if ((host_total_size & host_.page_mask_) == 0) {
if (read_offset_within_cur_page_ == host_.page_size_) {
return b;
} else {
Expand All @@ -573,15 +578,15 @@ class ByteStream {
}
} else {
if (read_offset_within_cur_page_ ==
(host_total_size % host_.page_size_)) {
(host_total_size & host_.page_mask_)) {
return b;
} else {
b.buf_ = ((char*)(cur_->buf_)) +
read_offset_within_cur_page_;
b.len_ = (host_total_size % host_.page_size_) -
b.len_ = (host_total_size & host_.page_mask_) -
read_offset_within_cur_page_;
read_offset_within_cur_page_ =
(host_total_size % host_.page_size_);
(host_total_size & host_.page_mask_);
total_end_offset_ += b.len_;
return b;
}
Expand Down Expand Up @@ -611,7 +616,7 @@ class ByteStream {
FORCE_INLINE int prepare_space() {
int ret = common::E_OK;
if (UNLIKELY(tail_.load() == nullptr ||
total_size_.load() % page_size_ == 0)) {
(total_size_.load() & page_mask_) == 0)) {
Page* p = nullptr;
if (RET_FAIL(alloc_page(p))) {
return ret;
Expand All @@ -628,7 +633,7 @@ class ByteStream {
}
if (UNLIKELY(read_page_ == nullptr)) {
read_page_ = head_.load();
} else if (UNLIKELY(read_pos_ % page_size_ == 0)) {
} else if (UNLIKELY((read_pos_ & page_mask_) == 0)) {
read_page_ = read_page_->next_.load();
}
if (UNLIKELY(read_page_ == nullptr)) {
Expand Down Expand Up @@ -668,6 +673,7 @@ class ByteStream {
uint32_t read_pos_; // current reader position
uint32_t marked_read_pos_; // current reader position
uint32_t page_size_;
uint32_t page_mask_; // page_size_ - 1, for bitwise AND instead of modulo
AllocModID mid_;
Page wrapped_page_;
};
Expand Down
3 changes: 3 additions & 0 deletions cpp/src/common/container/bit_map.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ class BitMap {
return (*start_addr & bit_mask);
}

// Set all bits to 0 (all non-null in TsFile convention where bit=1 is null)
FORCE_INLINE void clear_all() { memset(bitmap_, 0x00, size_); }

FORCE_INLINE uint32_t get_size() { return size_; }

FORCE_INLINE char* get_bitmap() { return bitmap_; } // for debug
Expand Down
Loading
Loading