Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
421 changes: 327 additions & 94 deletions src/ailego/buffer/vector_page_table.cc

Large diffs are not rendered by default.

22 changes: 16 additions & 6 deletions src/core/algorithm/flat/flat_streamer_entity.cc
Original file line number Diff line number Diff line change
Expand Up @@ -165,13 +165,20 @@ int FlatStreamerEntity::add(uint64_t key, const void *vec, size_t size) {

IndexStorage::MemoryBlock head_block;
this->get_head_block(head_block);
const BlockLocation *bl =
reinterpret_cast<const BlockLocation *>(head_block.data());
if (ailego_unlikely(bl == nullptr)) {
LOG_ERROR("Failed to get block loc");
return IndexError_ReadData;
BlockLocation block;
{
const BlockLocation *bl =
reinterpret_cast<const BlockLocation *>(head_block.data());
if (ailego_unlikely(bl == nullptr)) {
LOG_ERROR("Failed to get block loc");
return IndexError_ReadData;
}
block = *bl;
}
BlockLocation block = *bl;
// Release the head block reference early so that the buffer pool ref_count
// and memory budget held by it do not block subsequent acquire/evict in this
// function (alloc_block / add_to_block may compete for the same memory).
head_block.reset(nullptr);

if (!this->is_valid_block(block)) {
int ret = this->alloc_block(block, &block);
Expand Down Expand Up @@ -922,6 +929,9 @@ int FlatStreamerEntity::add_vector_with_id(const uint32_t id, const void *query,
this->get_head_block(head_block);
BlockLocation block =
*reinterpret_cast<const BlockLocation *>(head_block.data());
// Release buffer-pool pin before any alloc_block() call that may trigger
// append_segment() and rebuild the pool (same reason as in add()).
head_block.reset(nullptr);
if (!this->is_valid_block(block)) {
int ret = this->alloc_block(block, &block);
if (ailego_unlikely(ret != 0)) {
Expand Down
29 changes: 26 additions & 3 deletions src/core/algorithm/hnsw/hnsw_entity.h
Original file line number Diff line number Diff line change
Expand Up @@ -201,11 +201,21 @@ struct BufferPoolMemoryBlock {
void *data)
: buffer_pool_handle_(handle), buffer_block_id_(block_id), data_(data) {}

static BufferPoolMemoryBlock MakeOwned(void *owned_data) {
BufferPoolMemoryBlock b;
b.owns_buffer_ = true;
b.data_ = owned_data;
return b;
}

BufferPoolMemoryBlock(const BufferPoolMemoryBlock &rhs)
: buffer_pool_handle_(rhs.buffer_pool_handle_),
buffer_block_id_(rhs.buffer_block_id_),
data_(rhs.data_) {
if (buffer_pool_handle_) {
if (rhs.owns_buffer_) {
owns_buffer_ = false;
buffer_pool_handle_ = nullptr;
} else if (buffer_pool_handle_) {
buffer_pool_handle_->acquire_one(buffer_block_id_);
}
}
Expand All @@ -216,7 +226,10 @@ struct BufferPoolMemoryBlock {
buffer_pool_handle_ = rhs.buffer_pool_handle_;
buffer_block_id_ = rhs.buffer_block_id_;
data_ = rhs.data_;
if (buffer_pool_handle_) {
if (rhs.owns_buffer_) {
owns_buffer_ = false;
buffer_pool_handle_ = nullptr;
} else if (buffer_pool_handle_) {
buffer_pool_handle_->acquire_one(buffer_block_id_);
}
}
Expand All @@ -226,8 +239,10 @@ struct BufferPoolMemoryBlock {
BufferPoolMemoryBlock(BufferPoolMemoryBlock &&rhs) noexcept
: buffer_pool_handle_(rhs.buffer_pool_handle_),
buffer_block_id_(rhs.buffer_block_id_),
owns_buffer_(rhs.owns_buffer_),
data_(rhs.data_) {
rhs.buffer_pool_handle_ = nullptr;
rhs.owns_buffer_ = false;
rhs.data_ = nullptr;
}

Expand All @@ -236,8 +251,10 @@ struct BufferPoolMemoryBlock {
release();
buffer_pool_handle_ = rhs.buffer_pool_handle_;
buffer_block_id_ = rhs.buffer_block_id_;
owns_buffer_ = rhs.owns_buffer_;
data_ = rhs.data_;
rhs.buffer_pool_handle_ = nullptr;
rhs.owns_buffer_ = false;
rhs.data_ = nullptr;
}
return *this;
Expand All @@ -260,7 +277,12 @@ struct BufferPoolMemoryBlock {

private:
void release() {
if (buffer_pool_handle_) {
if (owns_buffer_) {
if (data_) {
ailego_free(data_);
}
owns_buffer_ = false;
} else if (buffer_pool_handle_) {
buffer_pool_handle_->release_one(buffer_block_id_);
buffer_pool_handle_ = nullptr;
}
Expand All @@ -269,6 +291,7 @@ struct BufferPoolMemoryBlock {

ailego::VecBufferPoolHandle *buffer_pool_handle_{nullptr};
size_t buffer_block_id_{0};
bool owns_buffer_{false};
void *data_{nullptr};
};

Expand Down
39 changes: 17 additions & 22 deletions src/core/algorithm/hnsw/hnsw_index_hash.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ class HnswIndexHashMap {
items_(reinterpret_cast<const Item *>(data)) {}
//! Return a empty loc or the key item loc

Slot(Chunk::Pointer &&chunk, IndexStorage::MemoryBlock &&mem_block)
: chunk_(std::move(chunk)), items_block_(std::move(mem_block)) {
items_ = reinterpret_cast<const Item *>(items_block_.data());
Slot(Chunk::Pointer &&chunk, std::vector<char> &&local_data)
: chunk_(std::move(chunk)), local_data_(std::move(local_data)) {
items_ = reinterpret_cast<const Item *>(local_data_.data());
}
const_iterator find(key_type key, uint32_t max_items, uint32_t mask) const {
auto it = &items_[key & mask];
Expand Down Expand Up @@ -73,8 +73,8 @@ class HnswIndexHashMap {

private:
Chunk::Pointer chunk_{};
const Item *items_{nullptr}; // point to chunk data
IndexStorage::MemoryBlock items_block_{};
const Item *items_{nullptr}; // point to local_data_
std::vector<char> local_data_{};
};

public:
Expand Down Expand Up @@ -114,9 +114,9 @@ class HnswIndexHashMap {
}

int cleanup(void) {
broker_.reset();
slots_.clear();
slots_.shrink_to_fit();
broker_.reset();
mask_bits_ = 0U;
slot_items_ = 0U;
slot_loc_mask_ = 0U;
Expand All @@ -141,7 +141,6 @@ class HnswIndexHashMap {
auto idx = key >> mask_bits_;
if (idx >= slots_.size()) {
if (ailego_unlikely(idx >= slots_.capacity())) {
LOG_ERROR("no space to insert");
return false;
}
for (auto i = slots_.size(); i <= idx; ++i) {
Expand All @@ -152,7 +151,6 @@ class HnswIndexHashMap {
}
auto it = slots_[idx].find(key, slot_items_, slot_loc_mask_);
if (ailego_unlikely(it == nullptr)) {
LOG_ERROR("no space to insert");
return false;
}

Expand All @@ -179,14 +177,10 @@ class HnswIndexHashMap {
LOG_ERROR("Chunk resize failed, size=%zu", size);
return false;
}
//! Read the whole data to memory
IndexStorage::MemoryBlock data_block;
if (ailego_unlikely(chunk->read(0U, data_block, size) != size)) {
LOG_ERROR("Chunk read failed, size=%zu", size);
return false;
}

slots_.emplace_back(std::move(chunk), std::move(data_block));
//! Use a local zero-initialized buffer; new chunks contain all zeros,
//! so no buffer-pool read is needed and no ref_count is pinned.
std::vector<char> local_buf(size, 0);
slots_.emplace_back(std::move(chunk), std::move(local_buf));
return true;
}

Expand All @@ -208,13 +202,14 @@ class HnswIndexHashMap {
i, chunk->data_size(), size);
return IndexError_InvalidFormat;
}
//! Read the whole data to memory
IndexStorage::MemoryBlock data_block;
if (ailego_unlikely(chunk->read(0U, data_block, size) != size)) {
LOG_ERROR("Chunk read failed, size=%zu", size);
return false;
//! Copy chunk data into a local buffer via fetch() so that no
//! buffer-pool block is pinned for the lifetime of the Slot.
std::vector<char> local_buf(size);
if (ailego_unlikely(chunk->fetch(0U, local_buf.data(), size) != size)) {
LOG_ERROR("Chunk fetch failed, size=%zu", size);
return IndexError_InvalidFormat;
}
slots_.emplace_back(std::move(chunk), std::move(data_block));
slots_.emplace_back(std::move(chunk), std::move(local_buf));
}
return 0;
}
Expand Down
37 changes: 31 additions & 6 deletions src/core/algorithm/hnsw/hnsw_streamer_entity.h
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,12 @@ class HnswStreamerEntity : public HnswEntity {
if (level == 0) {
return 0;
}
// Serialize concurrent add_upper_neighbor calls: multiple build threads
// share the same entity via shared_mutex (shared-lock), so both
// upper_neighbor_chunks_ (vector mutation) and
// upper_neighbor_index_->insert (hashmap slot assignment) must be protected
// from concurrent writes.
std::lock_guard<std::mutex> lk(upper_neighbor_mutex_);
Chunk::Pointer chunk;
uint64_t chunk_offset = UINT64_MAX;
size_t neighbors_size = get_total_upper_neighbors_size(level);
Expand Down Expand Up @@ -529,6 +535,9 @@ class HnswStreamerEntity : public HnswEntity {
protected:
IndexStreamer::Stats &stats_;
std::mutex mutex_{};
//! Guards add_upper_neighbor (upper_neighbor_chunks_ + upper_neighbor_index_
//! insert) against concurrent build threads holding the shared lock.
mutable std::mutex upper_neighbor_mutex_{};
size_t max_index_size_{0UL};
uint32_t chunk_size_{kDefaultChunkSize};
uint32_t upper_neighbor_chunk_size_{kDefaultChunkSize};
Expand Down Expand Up @@ -638,9 +647,16 @@ HnswStreamerEntity::get_neighbors_typed<BufferPoolMemoryBlock>(
LOG_ERROR("Read neighbor header failed, ret=%zu", ret);
return NeighborsT<BufferPoolMemoryBlock>();
}
BufferPoolMemoryBlock block(mem_block.buffer_pool_handle_,
mem_block.buffer_block_id_, mem_block.data_);
mem_block.buffer_pool_handle_ = nullptr;
BufferPoolMemoryBlock block;
if (mem_block.type_ == IndexStorage::MemoryBlock::MBT_HEAP_SCRATCH) {
block = BufferPoolMemoryBlock::MakeOwned(mem_block.data_);
mem_block.data_ = nullptr;
mem_block.type_ = IndexStorage::MemoryBlock::MBT_UNKNOWN;
} else {
block = BufferPoolMemoryBlock(mem_block.buffer_pool_handle_,
mem_block.buffer_block_id_, mem_block.data_);
mem_block.buffer_pool_handle_ = nullptr;
}
return NeighborsT<BufferPoolMemoryBlock>(std::move(block));
}

Expand Down Expand Up @@ -688,10 +704,19 @@ inline int HnswStreamerEntity::get_vector_typed<BufferPoolMemoryBlock>(
loc.second, read_size, ret);
return IndexError_ReadData;
}
vec_blocks[i] =
BufferPoolMemoryBlock(mem_block.buffer_pool_handle_,
vec_blocks[i] = [&]() {
if (mem_block.type_ == IndexStorage::MemoryBlock::MBT_HEAP_SCRATCH) {
BufferPoolMemoryBlock b =
BufferPoolMemoryBlock::MakeOwned(mem_block.data_);
mem_block.data_ = nullptr;
mem_block.type_ = IndexStorage::MemoryBlock::MBT_UNKNOWN;
return b;
}
BufferPoolMemoryBlock b(mem_block.buffer_pool_handle_,
mem_block.buffer_block_id_, mem_block.data_);
mem_block.buffer_pool_handle_ = nullptr;
mem_block.buffer_pool_handle_ = nullptr;
return b;
}();
}
return 0;
}
Expand Down
28 changes: 22 additions & 6 deletions src/core/algorithm/vamana/vamana_streamer_entity.h
Original file line number Diff line number Diff line change
Expand Up @@ -352,9 +352,16 @@ VamanaStreamerEntity::get_neighbors_typed<BufferPoolMemoryBlock>(
LOG_ERROR("Read neighbor header failed, ret=%zu", ret);
return NeighborsT<BufferPoolMemoryBlock>();
}
BufferPoolMemoryBlock block(mem_block.buffer_pool_handle_,
mem_block.buffer_block_id_, mem_block.data_);
mem_block.buffer_pool_handle_ = nullptr;
BufferPoolMemoryBlock block;
if (mem_block.type_ == IndexStorage::MemoryBlock::MBT_HEAP_SCRATCH) {
block = BufferPoolMemoryBlock::MakeOwned(mem_block.data_);
mem_block.data_ = nullptr;
mem_block.type_ = IndexStorage::MemoryBlock::MBT_UNKNOWN;
} else {
block = BufferPoolMemoryBlock(mem_block.buffer_pool_handle_,
mem_block.buffer_block_id_, mem_block.data_);
mem_block.buffer_pool_handle_ = nullptr;
}
return NeighborsT<BufferPoolMemoryBlock>(std::move(block));
}

Expand Down Expand Up @@ -392,10 +399,19 @@ inline int VamanaStreamerEntity::get_vector_typed<BufferPoolMemoryBlock>(
LOG_ERROR("Read vector failed, ret=%zu", ret);
return IndexError_ReadData;
}
vec_blocks[i] =
BufferPoolMemoryBlock(mem_block.buffer_pool_handle_,
vec_blocks[i] = [&]() {
if (mem_block.type_ == IndexStorage::MemoryBlock::MBT_HEAP_SCRATCH) {
BufferPoolMemoryBlock b =
BufferPoolMemoryBlock::MakeOwned(mem_block.data_);
mem_block.data_ = nullptr;
mem_block.type_ = IndexStorage::MemoryBlock::MBT_UNKNOWN;
return b;
}
BufferPoolMemoryBlock b(mem_block.buffer_pool_handle_,
mem_block.buffer_block_id_, mem_block.data_);
mem_block.buffer_pool_handle_ = nullptr;
mem_block.buffer_pool_handle_ = nullptr;
return b;
}();
}
return 0;
}
Expand Down
Loading
Loading