-
Notifications
You must be signed in to change notification settings - Fork 22
Test and fix tieredIndexMock deadlocks #704
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -944,3 +944,91 @@ TEST(CommonAPITest, testSetTestLogContext) { | |
| ASSERT_TRUE(found) << "Log message not found in log file: " << log_file; | ||
| VecSimIndex_Free(index); | ||
| } | ||
|
|
||
| TEST(UtilsTests, testMockThreadPool) { | ||
| const size_t num_repeats = 2; | ||
| const size_t num_submissions = 200; | ||
| // 100 seconds timeout for the test should be enough for CI MemoryChecks | ||
| std::chrono::seconds test_timeout(100); | ||
|
|
||
| auto TestBody = [=]() { | ||
| // Protection against test deadlock is implemented by a thread which exits process if | ||
| // condition variable is not notified within a timeout. | ||
| std::mutex mtx; | ||
| std::condition_variable cv; | ||
| auto guard_thread = std::thread([&]() { | ||
| std::unique_lock<std::mutex> lock(mtx); | ||
| if (cv.wait_for(lock, test_timeout) == std::cv_status::timeout) { | ||
| std::cerr << "Test timeout! Exiting..." << std::endl; | ||
| std::exit(-1); | ||
| } | ||
| }); | ||
|
|
||
| // Create and test a mock thread pool several times | ||
| for (size_t i = 0; i < num_repeats; i++) { | ||
| // Create a mock thread pool and verify its properties | ||
| tieredIndexMock mock_thread_pool; | ||
| ASSERT_EQ(mock_thread_pool.ctx->index_strong_ref, nullptr); | ||
| ASSERT_TRUE(mock_thread_pool.jobQ.empty()); | ||
|
|
||
| // Create a new stub index to add to the mock thread pool | ||
| BFParams params = {.dim = 4, .metric = VecSimMetric_L2}; | ||
| auto index = test_utils::CreateNewIndex(params, VecSimType_FLOAT32); | ||
| mock_thread_pool.ctx->index_strong_ref.reset(index); | ||
| auto allocator = index->getAllocator(); | ||
|
|
||
| // Very fast and simple job routine that increments a counter | ||
| // This is just to simulate a job that does some work. | ||
| std::atomic_int32_t job_counter = 0; | ||
| auto job_mock = [&job_counter](AsyncJob * /*unused*/) { job_counter++; }; | ||
|
|
||
| // Define a mock job just to convert lambda with capture to a function pointer | ||
| class LambdaJob : public AsyncJob { | ||
| public: | ||
| LambdaJob(std::shared_ptr<VecSimAllocator> allocator, JobType type, | ||
| std::function<void(AsyncJob *)> execute, VecSimIndex *index) | ||
| : AsyncJob(allocator, type, executeJob, index), impl_(execute) {} | ||
|
|
||
| static void executeJob(AsyncJob *job) { | ||
| static_cast<LambdaJob *>(job)->impl_(job); | ||
| delete job; // Clean up the job after execution | ||
| } | ||
| std::function<void(AsyncJob *)> impl_; | ||
| }; | ||
|
|
||
| mock_thread_pool.init_threads(); | ||
| // Verify the job queue is empty | ||
| ASSERT_TRUE(mock_thread_pool.jobQ.empty()); | ||
|
|
||
| // Create a vector of jobs to submit to the mock thread pool | ||
| // The number of jobs is equal to the thread pool size, so they will all be executed in | ||
| // parallel | ||
| std::vector<AsyncJob *> jobs(mock_thread_pool.thread_pool_size); | ||
|
|
||
| // Submit jobs to the mock thread pool and wait several times | ||
| for (size_t j = 0; j < num_submissions; j++) { | ||
| job_counter.store(0); // Reset the counter for each iteration | ||
| // Generate jobs and submit them to the mock thread pool | ||
| std::generate(jobs.begin(), jobs.end(), [&]() { | ||
| return new (allocator) LambdaJob(allocator, HNSW_SEARCH_JOB, job_mock, index); | ||
| }); | ||
| mock_thread_pool.submit_callback_internal(jobs.data(), nullptr /*unused*/, | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we remove the second argument from
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This PR includes minimal changes to fix the deadlock issue.
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. not a must, we can continue as is |
||
| jobs.size()); | ||
| mock_thread_pool.thread_pool_wait(); | ||
| // Verify the job queue is empty | ||
| ASSERT_TRUE(mock_thread_pool.jobQ.empty()); | ||
| // Verify counter was incremented | ||
| ASSERT_EQ(job_counter.load(), mock_thread_pool.thread_pool_size); | ||
| } | ||
| mock_thread_pool.thread_pool_join(); | ||
| } | ||
|
|
||
| // Notify the guard thread that the test is done | ||
| cv.notify_one(); | ||
| guard_thread.join(); | ||
| std::cerr << "Success" << std::endl; | ||
| std::exit(testing::Test::HasFailure() ? -1 : 0); // Exit with failure if any test failed | ||
| }; | ||
|
|
||
| EXPECT_EXIT(TestBody(), ::testing::ExitedWithCode(0), "Success"); | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.