Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 74 additions & 50 deletions src/ailego/buffer/vector_page_table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <algorithm>
#include <cstring>
#include <ailego/utility/memory_helper.h>
#include <zvec/ailego/buffer/vector_page_table.h>
#include <zvec/core/framework/index_logger.h>

Expand Down Expand Up @@ -41,6 +44,8 @@ static ssize_t zvec_pread(int fd, void *buf, size_t count, size_t offset) {
namespace zvec {
namespace ailego {

const size_t kVectorPageSize = MemoryHelper::PageSize();

void VectorPageTable::init(size_t entry_num) {
if (entries_) {
delete[] entries_;
Expand Down Expand Up @@ -97,12 +102,11 @@ void VectorPageTable::evict_block(block_id_t block_id) {
assert(block_id < entry_num_);
Entry &entry = entries_[block_id];
char *buffer = entry.buffer;
size_t size = entry.size;
int expected = 0;
if (entry.ref_count.compare_exchange_strong(
expected, std::numeric_limits<int>::min())) {
if (buffer) {
MemoryLimitPool::get_instance().release_buffer(buffer, size);
MemoryLimitPool::get_instance().release_buffer(buffer, kVectorPageSize);
}
}
// Always reset in_evict_queue regardless of whether the CAS succeeded:
Expand All @@ -113,32 +117,20 @@ void VectorPageTable::evict_block(block_id_t block_id) {
entry.in_evict_queue.store(false, std::memory_order_relaxed);
}

char *VectorPageTable::set_block_acquired(block_id_t block_id, char *buffer,
size_t size) {
char *VectorPageTable::set_block_acquired(block_id_t block_id, char *buffer) {
assert(block_id < entry_num_);
Entry &entry = entries_[block_id];
while (true) {
int current_count = entry.ref_count.load(std::memory_order_relaxed);
if (current_count >= 0) {
// Defensive branch: in practice this path should never be reached.
// set_block_acquired() is always called under block_mutexes_[block_id],
// and the caller (acquire_buffer) re-checks acquire_block() inside the
// same lock before invoking this function. Therefore, if we get here,
// ref_count must still be negative (unloaded). This branch is retained
// as a safety net in case the locking contract is violated in the future,
// e.g. if set_block_acquired is called from an unlocked context.
if (entry.ref_count.compare_exchange_weak(
current_count, current_count + 1, std::memory_order_acq_rel,
std::memory_order_acquire)) {
MemoryLimitPool::get_instance().release_buffer(buffer, size);
MemoryLimitPool::get_instance().release_buffer(buffer, kVectorPageSize);
return entry.buffer;
}
} else {
entry.buffer = buffer;
entry.size = size;
// Ensure in_evict_queue is cleared when the block is freshly loaded so
// that the first release_block() after loading can register it in the
// eviction queue.
entry.in_evict_queue.store(false, std::memory_order_relaxed);
entry.ref_count.store(1, std::memory_order_release);
return entry.buffer;
Expand Down Expand Up @@ -170,70 +162,71 @@ VecBufferPool::VecBufferPool(const std::string &filename) {
file_size_ = st.st_size;
}

int VecBufferPool::init(size_t segment_count) {
size_t block_num = segment_count + 10;
int VecBufferPool::init() {
size_t block_num = (file_size_ + kVectorPageSize - 1) / kVectorPageSize;
page_table_.init(block_num);
// Allocate all mutexes in a single contiguous array so that the cold-path
// lock in acquire_buffer() accesses cache-friendly memory instead of
// chasing 31K+ independent heap pointers.
block_mutexes_ = std::make_unique<std::mutex[]>(block_num);
block_mutexes_count_ = block_num;
LOG_DEBUG("entry num: %zu", page_table_.entry_num());
block_mutexes_ =
std::make_unique<std::mutex[]>(VecBufferPool::kMutexBucketCount);
LOG_DEBUG("entry num: %zu, file_size: %zu", page_table_.entry_num(),
file_size_);
return 0;
}

VecBufferPoolHandle VecBufferPool::get_handle() {
return VecBufferPoolHandle(*this);
}

char *VecBufferPool::acquire_buffer(block_id_t block_id, size_t offset,
size_t size, int retry) {
assert(block_id < block_mutexes_count_);
char *buffer = page_table_.acquire_block(block_id);
char *VecBufferPool::acquire_buffer(block_id_t page_id, int retry) {
assert(page_id < page_table_.entry_num());
char *buffer = page_table_.acquire_block(page_id);
if (buffer) {
return buffer;
}
std::lock_guard<std::mutex> lock(block_mutexes_[block_id]);
buffer = page_table_.acquire_block(block_id);
std::lock_guard<std::mutex> lock(
block_mutexes_[page_id % VecBufferPool::kMutexBucketCount]);
buffer = page_table_.acquire_block(page_id);
if (buffer) {
return buffer;
}
{
bool found =
MemoryLimitPool::get_instance().try_acquire_buffer(size, buffer);
bool found = MemoryLimitPool::get_instance().try_acquire_buffer(
kVectorPageSize, buffer);
if (!found) {
for (int i = 0; i < retry; i++) {
BlockEvictionQueue::get_instance().recycle();
found =
MemoryLimitPool::get_instance().try_acquire_buffer(size, buffer);
found = MemoryLimitPool::get_instance().try_acquire_buffer(
kVectorPageSize, buffer);
if (found) {
break;
}
}
}
if (!found) {
LOG_ERROR(
"Buffer pool failed to get free buffer: file[%s], block_id[%zu], "
"offset[%zu], size[%zu]",
file_name_.c_str(), block_id, offset, size);
LOG_ERROR("Buffer pool failed to get free buffer: file[%s], page_id[%zu]",
file_name_.c_str(), page_id);
return nullptr;
}
}

size_t page_offset = page_id * kVectorPageSize;
size_t expected_bytes = std::min(kVectorPageSize, file_size_ - page_offset);
if (expected_bytes < kVectorPageSize) {
std::memset(buffer + expected_bytes, 0, kVectorPageSize - expected_bytes);
}
#if defined(_MSC_VER)
ssize_t read_bytes = zvec_pread(fd_, buffer, size, offset);
ssize_t read_bytes = zvec_pread(fd_, buffer, expected_bytes, page_offset);
#else
ssize_t read_bytes = pread(fd_, buffer, size, offset);
ssize_t read_bytes = pread(fd_, buffer, expected_bytes, page_offset);
#endif
if (read_bytes != static_cast<ssize_t>(size)) {
if (read_bytes != static_cast<ssize_t>(expected_bytes)) {
LOG_ERROR(
"Buffer pool failed to read file at offset: file[%s], block_id[%zu], "
"offset[%zu], size[%zu]",
file_name_.c_str(), block_id, offset, size);
MemoryLimitPool::get_instance().release_buffer(buffer, size);
"Buffer pool failed to read file at offset: file[%s], page_id[%zu], "
"offset[%zu], expected[%zu], got[%zd]",
file_name_.c_str(), page_id, page_offset, expected_bytes, read_bytes);
MemoryLimitPool::get_instance().release_buffer(buffer, kVectorPageSize);
return nullptr;
}
return page_table_.set_block_acquired(block_id, buffer, size);
return page_table_.set_block_acquired(page_id, buffer);
}

int VecBufferPool::get_meta(size_t offset, size_t length, char *buffer) {
Expand All @@ -252,10 +245,41 @@ int VecBufferPool::get_meta(size_t offset, size_t length, char *buffer) {
return 0;
}

char *VecBufferPoolHandle::get_block(size_t offset, size_t size,
size_t block_id) {
char *buffer = pool_.acquire_buffer(block_id, offset, size, 50);
return buffer;
char *VecBufferPoolHandle::get_single_page(size_t file_offset, size_t len,
size_t &out_page_id) {
size_t first_page = file_offset / kVectorPageSize;
assert(len == 0 || (file_offset + len - 1) / kVectorPageSize == first_page);
out_page_id = first_page;
char *page = pool_.acquire_buffer(first_page, 50);
if (!page) {
return nullptr;
}
return page + (file_offset - first_page * kVectorPageSize);
}

bool VecBufferPoolHandle::read_range(size_t file_offset, size_t len,
char *out) {
if (len == 0) {
return true;
}
size_t first_page = file_offset / kVectorPageSize;
size_t last_page = (file_offset + len - 1) / kVectorPageSize;
size_t remaining = len;
size_t dst_cursor = 0;
for (size_t pg = first_page; pg <= last_page; ++pg) {
char *page = pool_.acquire_buffer(pg, 50);
if (!page) {
return false;
}
size_t page_start = pg * kVectorPageSize;
size_t intra_offset = (pg == first_page) ? (file_offset - page_start) : 0;
size_t chunk = std::min(kVectorPageSize - intra_offset, remaining);
std::memcpy(out + dst_cursor, page + intra_offset, chunk);
pool_.page_table_.release_block(pg);
dst_cursor += chunk;
remaining -= chunk;
}
return true;
}

int VecBufferPoolHandle::get_meta(size_t offset, size_t length, char *buffer) {
Expand Down
29 changes: 26 additions & 3 deletions src/core/algorithm/hnsw/hnsw_entity.h
Original file line number Diff line number Diff line change
Expand Up @@ -201,11 +201,21 @@ struct BufferPoolMemoryBlock {
void *data)
: buffer_pool_handle_(handle), buffer_block_id_(block_id), data_(data) {}

static BufferPoolMemoryBlock MakeOwned(void *owned_data) {
BufferPoolMemoryBlock b;
b.owns_buffer_ = true;
b.data_ = owned_data;
return b;
}

BufferPoolMemoryBlock(const BufferPoolMemoryBlock &rhs)
: buffer_pool_handle_(rhs.buffer_pool_handle_),
buffer_block_id_(rhs.buffer_block_id_),
data_(rhs.data_) {
if (buffer_pool_handle_) {
if (rhs.owns_buffer_) {
owns_buffer_ = false;
buffer_pool_handle_ = nullptr;
} else if (buffer_pool_handle_) {
buffer_pool_handle_->acquire_one(buffer_block_id_);
}
}
Expand All @@ -216,7 +226,10 @@ struct BufferPoolMemoryBlock {
buffer_pool_handle_ = rhs.buffer_pool_handle_;
buffer_block_id_ = rhs.buffer_block_id_;
data_ = rhs.data_;
if (buffer_pool_handle_) {
if (rhs.owns_buffer_) {
owns_buffer_ = false;
buffer_pool_handle_ = nullptr;
} else if (buffer_pool_handle_) {
buffer_pool_handle_->acquire_one(buffer_block_id_);
}
}
Expand All @@ -226,8 +239,10 @@ struct BufferPoolMemoryBlock {
BufferPoolMemoryBlock(BufferPoolMemoryBlock &&rhs) noexcept
: buffer_pool_handle_(rhs.buffer_pool_handle_),
buffer_block_id_(rhs.buffer_block_id_),
owns_buffer_(rhs.owns_buffer_),
data_(rhs.data_) {
rhs.buffer_pool_handle_ = nullptr;
rhs.owns_buffer_ = false;
rhs.data_ = nullptr;
}

Expand All @@ -236,8 +251,10 @@ struct BufferPoolMemoryBlock {
release();
buffer_pool_handle_ = rhs.buffer_pool_handle_;
buffer_block_id_ = rhs.buffer_block_id_;
owns_buffer_ = rhs.owns_buffer_;
data_ = rhs.data_;
rhs.buffer_pool_handle_ = nullptr;
rhs.owns_buffer_ = false;
rhs.data_ = nullptr;
}
return *this;
Expand All @@ -260,7 +277,12 @@ struct BufferPoolMemoryBlock {

private:
void release() {
if (buffer_pool_handle_) {
if (owns_buffer_) {
if (data_) {
ailego_free(data_);
}
owns_buffer_ = false;
} else if (buffer_pool_handle_) {
buffer_pool_handle_->release_one(buffer_block_id_);
buffer_pool_handle_ = nullptr;
}
Expand All @@ -269,6 +291,7 @@ struct BufferPoolMemoryBlock {

ailego::VecBufferPoolHandle *buffer_pool_handle_{nullptr};
size_t buffer_block_id_{0};
bool owns_buffer_{false};
void *data_{nullptr};
};

Expand Down
28 changes: 22 additions & 6 deletions src/core/algorithm/hnsw/hnsw_streamer_entity.h
Original file line number Diff line number Diff line change
Expand Up @@ -638,9 +638,16 @@ HnswStreamerEntity::get_neighbors_typed<BufferPoolMemoryBlock>(
LOG_ERROR("Read neighbor header failed, ret=%zu", ret);
return NeighborsT<BufferPoolMemoryBlock>();
}
BufferPoolMemoryBlock block(mem_block.buffer_pool_handle_,
mem_block.buffer_block_id_, mem_block.data_);
mem_block.buffer_pool_handle_ = nullptr;
BufferPoolMemoryBlock block;
if (mem_block.type_ == IndexStorage::MemoryBlock::MBT_HEAP_SCRATCH) {
block = BufferPoolMemoryBlock::MakeOwned(mem_block.data_);
mem_block.data_ = nullptr;
mem_block.type_ = IndexStorage::MemoryBlock::MBT_UNKNOWN;
} else {
block = BufferPoolMemoryBlock(mem_block.buffer_pool_handle_,
mem_block.buffer_block_id_, mem_block.data_);
mem_block.buffer_pool_handle_ = nullptr;
}
return NeighborsT<BufferPoolMemoryBlock>(std::move(block));
}

Expand Down Expand Up @@ -688,10 +695,19 @@ inline int HnswStreamerEntity::get_vector_typed<BufferPoolMemoryBlock>(
loc.second, read_size, ret);
return IndexError_ReadData;
}
vec_blocks[i] =
BufferPoolMemoryBlock(mem_block.buffer_pool_handle_,
vec_blocks[i] = [&]() {
if (mem_block.type_ == IndexStorage::MemoryBlock::MBT_HEAP_SCRATCH) {
BufferPoolMemoryBlock b =
BufferPoolMemoryBlock::MakeOwned(mem_block.data_);
mem_block.data_ = nullptr;
mem_block.type_ = IndexStorage::MemoryBlock::MBT_UNKNOWN;
return b;
}
BufferPoolMemoryBlock b(mem_block.buffer_pool_handle_,
mem_block.buffer_block_id_, mem_block.data_);
mem_block.buffer_pool_handle_ = nullptr;
mem_block.buffer_pool_handle_ = nullptr;
return b;
}();
}
return 0;
}
Expand Down
28 changes: 22 additions & 6 deletions src/core/algorithm/vamana/vamana_streamer_entity.h
Original file line number Diff line number Diff line change
Expand Up @@ -352,9 +352,16 @@ VamanaStreamerEntity::get_neighbors_typed<BufferPoolMemoryBlock>(
LOG_ERROR("Read neighbor header failed, ret=%zu", ret);
return NeighborsT<BufferPoolMemoryBlock>();
}
BufferPoolMemoryBlock block(mem_block.buffer_pool_handle_,
mem_block.buffer_block_id_, mem_block.data_);
mem_block.buffer_pool_handle_ = nullptr;
BufferPoolMemoryBlock block;
if (mem_block.type_ == IndexStorage::MemoryBlock::MBT_HEAP_SCRATCH) {
block = BufferPoolMemoryBlock::MakeOwned(mem_block.data_);
mem_block.data_ = nullptr;
mem_block.type_ = IndexStorage::MemoryBlock::MBT_UNKNOWN;
} else {
block = BufferPoolMemoryBlock(mem_block.buffer_pool_handle_,
mem_block.buffer_block_id_, mem_block.data_);
mem_block.buffer_pool_handle_ = nullptr;
}
return NeighborsT<BufferPoolMemoryBlock>(std::move(block));
}

Expand Down Expand Up @@ -392,10 +399,19 @@ inline int VamanaStreamerEntity::get_vector_typed<BufferPoolMemoryBlock>(
LOG_ERROR("Read vector failed, ret=%zu", ret);
return IndexError_ReadData;
}
vec_blocks[i] =
BufferPoolMemoryBlock(mem_block.buffer_pool_handle_,
vec_blocks[i] = [&]() {
if (mem_block.type_ == IndexStorage::MemoryBlock::MBT_HEAP_SCRATCH) {
BufferPoolMemoryBlock b =
BufferPoolMemoryBlock::MakeOwned(mem_block.data_);
mem_block.data_ = nullptr;
mem_block.type_ = IndexStorage::MemoryBlock::MBT_UNKNOWN;
return b;
}
BufferPoolMemoryBlock b(mem_block.buffer_pool_handle_,
mem_block.buffer_block_id_, mem_block.data_);
mem_block.buffer_pool_handle_ = nullptr;
mem_block.buffer_pool_handle_ = nullptr;
return b;
}();
}
return 0;
}
Expand Down
Loading
Loading