Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 88 additions & 0 deletions tests/unit/test_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -944,3 +944,91 @@ TEST(CommonAPITest, testSetTestLogContext) {
ASSERT_TRUE(found) << "Log message not found in log file: " << log_file;
VecSimIndex_Free(index);
}

TEST(UtilsTests, testMockThreadPool) {
const size_t num_repeats = 2;
const size_t num_submissions = 200;
// 100 seconds timeout for the test should be enough for CI MemoryChecks
std::chrono::seconds test_timeout(100);

auto TestBody = [=]() {
// Protection against test deadlock is implemented by a thread which exits process if
// condition variable is not notified within a timeout.
std::mutex mtx;
std::condition_variable cv;
auto guard_thread = std::thread([&]() {
std::unique_lock<std::mutex> lock(mtx);
if (cv.wait_for(lock, test_timeout) == std::cv_status::timeout) {
std::cerr << "Test timeout! Exiting..." << std::endl;
std::exit(-1);
}
});

// Create and test a mock thread pool several times
for (size_t i = 0; i < num_repeats; i++) {
// Create a mock thread pool and verify its properties
tieredIndexMock mock_thread_pool;
ASSERT_EQ(mock_thread_pool.ctx->index_strong_ref, nullptr);
ASSERT_TRUE(mock_thread_pool.jobQ.empty());

// Create a new stub index to add to the mock thread pool
BFParams params = {.dim = 4, .metric = VecSimMetric_L2};
auto index = test_utils::CreateNewIndex(params, VecSimType_FLOAT32);
mock_thread_pool.ctx->index_strong_ref.reset(index);
auto allocator = index->getAllocator();

// Very fast and simple job routine that increments a counter
// This is just to simulate a job that does some work.
std::atomic_int32_t job_counter = 0;
auto job_mock = [&job_counter](AsyncJob * /*unused*/) { job_counter++; };

// Define a mock job just to convert lambda with capture to a function pointer
class LambdaJob : public AsyncJob {
public:
LambdaJob(std::shared_ptr<VecSimAllocator> allocator, JobType type,
std::function<void(AsyncJob *)> execute, VecSimIndex *index)
: AsyncJob(allocator, type, executeJob, index), impl_(execute) {}

static void executeJob(AsyncJob *job) {
static_cast<LambdaJob *>(job)->impl_(job);
delete job; // Clean up the job after execution
}
std::function<void(AsyncJob *)> impl_;
};

mock_thread_pool.init_threads();
// Verify the job queue is empty
ASSERT_TRUE(mock_thread_pool.jobQ.empty());

// Create a vector of jobs to submit to the mock thread pool
// The number of jobs is equal to the thread pool size, so they will all be executed in
// parallel
std::vector<AsyncJob *> jobs(mock_thread_pool.thread_pool_size);

// Submit jobs to the mock thread pool and wait several times
for (size_t j = 0; j < num_submissions; j++) {
job_counter.store(0); // Reset the counter for each iteration
// Generate jobs and submit them to the mock thread pool
std::generate(jobs.begin(), jobs.end(), [&]() {
return new (allocator) LambdaJob(allocator, HNSW_SEARCH_JOB, job_mock, index);
});
mock_thread_pool.submit_callback_internal(jobs.data(), nullptr /*unused*/,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we remove the second argument from submit_callback_internal on this chance? Looks like it is redundant and a leftover

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR includes minimal changes to fix the deadlock issue.
But if it is needed, I can change tieredIndexMock::submit_callback_internal signature in this PR.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not a must, we can continue as is

jobs.size());
mock_thread_pool.thread_pool_wait();
// Verify the job queue is empty
ASSERT_TRUE(mock_thread_pool.jobQ.empty());
// Verify counter was incremented
ASSERT_EQ(job_counter.load(), mock_thread_pool.thread_pool_size);
}
mock_thread_pool.thread_pool_join();
}

// Notify the guard thread that the test is done
cv.notify_one();
guard_thread.join();
std::cerr << "Success" << std::endl;
std::exit(testing::Test::HasFailure() ? -1 : 0); // Exit with failure if any test failed
};

EXPECT_EXIT(TestBody(), ::testing::ExitedWithCode(0), "Success");
}
6 changes: 3 additions & 3 deletions tests/utils/mock_thread_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,14 +78,14 @@ void tieredIndexMock::thread_iteration(int thread_id, const bool *run_thread_ptr
if (run_thread_ptr && !*run_thread_ptr)
return;
auto managed_job = jobQ.front();
MarkExecuteInProcess(thread_id);
executions_status.MarkInProcess(thread_id);
jobQ.pop();
lock.unlock();
// Upgrade the index weak reference to a strong ref while we run the job over the index.
if (auto temp_ref = managed_job.index_weak_ref.lock()) {
managed_job.job->Execute(managed_job.job);
}
MarkExecuteDone(thread_id);
executions_status.MarkDone(thread_id);
}

// Main loop for background worker threads that execute the jobs form the job queue.
Expand Down Expand Up @@ -117,7 +117,7 @@ void tieredIndexMock::thread_pool_join() {
void tieredIndexMock::thread_pool_wait(size_t waiting_duration) {
while (true) {
std::unique_lock<std::mutex> lock(queue_guard);
if (jobQ.empty() && executions_status.count() == 0) {
if (jobQ.empty() && executions_status.AllDone()) {
break;
}
lock.unlock();
Expand Down
18 changes: 14 additions & 4 deletions tests/utils/mock_thread_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
#include "VecSim/vec_sim.h"
#include "VecSim/algorithms/hnsw/hnsw_tiered.h"

// MAX_POOL_SIZE and pool_execution_mask_t to be adjusted to each other.
static const size_t MAX_POOL_SIZE = 16;
using pool_execution_mask_t = std::atomic_uint_fast16_t;

class tieredIndexMock {

Expand All @@ -30,11 +32,19 @@ class tieredIndexMock {
// The thread's corresponding bit should be set to before the job is popped
// from the queue and the execution starts.
// We turn the bit off after the execute callback returns to mark the job is done.
std::bitset<MAX_POOL_SIZE> executions_status;

void inline MarkExecuteInProcess(size_t thread_index) { executions_status.set(thread_index); }
class ExecutionsStatus {
pool_execution_mask_t executions_status = 0; // Using atomic for thread safety
public:
void MarkInProcess(size_t thread_index) {
executions_status.fetch_or(1 << thread_index, std::memory_order_relaxed);
}
void MarkDone(size_t thread_index) {
executions_status.fetch_and(~(1 << thread_index), std::memory_order_relaxed);
}
bool AllDone() const { return executions_status.load(std::memory_order_relaxed) == 0; }
};

void inline MarkExecuteDone(size_t thread_index) { executions_status.reset(thread_index); }
ExecutionsStatus executions_status;

typedef struct RefManagedJob {
AsyncJob *job;
Expand Down
Loading