2 changes: 1 addition & 1 deletion conanfile.py
@@ -10,7 +10,7 @@

class HomeObjectConan(ConanFile):
name = "homeobject"
version = "4.0.9"
version = "4.0.10"

homepage = "https://github.com/eBay/HomeObject"
description = "Blob Store built on HomeStore"
18 changes: 13 additions & 5 deletions src/lib/homestore_backend/gc_manager.cpp
@@ -123,7 +123,10 @@ void GCManager::start() {
gc_actor->start();
LOGINFOMOD(gcmgr, "start gc actor for pdev={}", pdev_id);
}
start_gc_scan_timer();
}

void GCManager::start_gc_scan_timer() {
const auto gc_scan_interval_sec = HS_BACKEND_DYNAMIC_CONFIG(gc_scan_interval_sec);

// the initial idea here is that we want the gc timer to run in a reactor that is not shared with other fibers that
@@ -147,9 +150,7 @@
LOGINFOMOD(gcmgr, "gc scheduler timer has started, interval is set to {} seconds", gc_scan_interval_sec);
}

bool GCManager::is_started() { return m_gc_timer_hdl != iomgr::null_timer_handle; }

void GCManager::stop() {
void GCManager::stop_gc_scan_timer() {
if (m_gc_timer_hdl == iomgr::null_timer_handle) {
LOGWARNMOD(gcmgr, "gc scheduler timer is not running, no need to stop it");
return;
@@ -163,6 +164,10 @@ void GCManager::stop() {
m_gc_timer_hdl = iomgr::null_timer_handle;
});
m_gc_timer_fiber = nullptr;
}

void GCManager::stop() {
stop_gc_scan_timer();

for (const auto& [pdev_id, gc_actor] : m_pdev_gc_actors) {
gc_actor->stop();
@@ -171,8 +176,6 @@
}

folly::SemiFuture< bool > GCManager::submit_gc_task(task_priority priority, chunk_id_t chunk_id) {
if (!is_started()) return folly::makeFuture< bool >(false);

auto pdev_id = m_chunk_selector->get_extend_vchunk(chunk_id)->get_pdev_id();
auto it = m_pdev_gc_actors.find(pdev_id);
if (it == m_pdev_gc_actors.end()) {
@@ -337,6 +340,11 @@ void GCManager::pdev_gc_actor::add_reserved_chunk(
}

folly::SemiFuture< bool > GCManager::pdev_gc_actor::add_gc_task(uint8_t priority, chunk_id_t move_from_chunk) {
if (m_is_stopped.load()) {
LOGWARNMOD(gcmgr, "pdev gc actor for pdev_id={} is not started yet or already stopped, cannot add gc task!",
m_pdev_id);
return folly::makeSemiFuture< bool >(false);
}
auto EXvchunk = m_chunk_selector->get_extend_vchunk(move_from_chunk);
// it does not belong to any pg, so we don't need to gc it.
if (!EXvchunk->m_pg_id.has_value()) {
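
The guard added to add_gc_task above follows a plain stop-flag pattern. Below is a minimal, self-contained sketch of that pattern (the class and method names are hypothetical stand-ins, not the PR's actual pdev_gc_actor): an atomic flag is checked up front, and once the actor is stopped, callers receive an already-fulfilled SemiFuture carrying false instead of a queued task.

#include <atomic>
#include <folly/futures/Future.h>

// Hypothetical stand-in for pdev_gc_actor: rejects submissions after stop().
class StoppableActor {
public:
    folly::SemiFuture< bool > submit_task() {
        // same fast-path check the diff adds to add_gc_task(): once stopped,
        // hand back a ready future instead of enqueuing work that never runs
        if (m_is_stopped.load(std::memory_order_acquire)) {
            return folly::makeSemiFuture< bool >(false);
        }
        auto [promise, future] = folly::makePromiseContract< bool >();
        promise.setValue(true); // placeholder: real code fulfils this when the task completes
        return std::move(future);
    }

    void stop() { m_is_stopped.store(true, std::memory_order_release); }

private:
    std::atomic< bool > m_is_stopped{false};
};
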
6 changes: 5 additions & 1 deletion src/lib/homestore_backend/gc_manager.hpp
@@ -307,7 +307,11 @@ class GCManager {

void start();
void stop();
bool is_started();

// the following two functions should not be called concurrently. if we ever need to call them concurrently, a lock
// must be added to protect them
void start_gc_scan_timer();
void stop_gc_scan_timer();

void scan_chunks_for_gc();
void drain_pg_pending_gc_task(const pg_id_t pg_id);
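
The two timer controls are designed to bracket a manually driven sweep, which is how the trigger_gc HTTP handler in the next file uses them. Here is a sketch of that call pattern against the declarations above (the helper and its chunk list are hypothetical); both timer calls sit on one sequential continuation chain, which satisfies the no-concurrent-calls constraint.

// Hypothetical helper: pause periodic scans, drive GC for a list of chunks,
// and re-arm the timer only after every submitted task has completed.
void run_manual_sweep(GCManager* gc_mgr, const std::vector< chunk_id_t >& chunks) {
    gc_mgr->stop_gc_scan_timer(); // no background scans while we drive GC ourselves

    std::vector< folly::SemiFuture< bool > > futures;
    for (const auto& chunk_id : chunks) {
        futures.push_back(gc_mgr->submit_gc_task(task_priority::normal, chunk_id));
    }

    // start runs strictly after stop on this chain, so the two never overlap
    folly::collectAllUnsafe(std::move(futures)).thenValue([gc_mgr](auto&&) {
        gc_mgr->start_gc_scan_timer();
    });
}
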
235 changes: 234 additions & 1 deletion src/lib/homestore_backend/hs_http_manager.cpp
@@ -50,7 +50,11 @@ HttpManager::HttpManager(HSHomeObject& ho) : ho_(ho) {
{Pistache::Http::Method::Get, "/api/v1/chunk/dump",
Pistache::Rest::Routes::bind(&HttpManager::dump_chunk, this)},
{Pistache::Http::Method::Get, "/api/v1/shard/dump",
Pistache::Rest::Routes::bind(&HttpManager::dump_shard, this)}};
Pistache::Rest::Routes::bind(&HttpManager::dump_shard, this)},
{Pistache::Http::Method::Post, "/api/v1/trigger_gc",
Pistache::Rest::Routes::bind(&HttpManager::trigger_gc, this)},
{Pistache::Http::Method::Get, "/api/v1/gc_job_status",
Pistache::Rest::Routes::bind(&HttpManager::get_gc_job_status, this)}};

auto http_server = ioenvironment.get_http_server();
if (!http_server) {
@@ -271,6 +275,235 @@ void HttpManager::dump_shard(const Pistache::Rest::Request& request, Pistache::H
response.send(Pistache::Http::Code::Ok, j.dump());
}

void HttpManager::trigger_gc(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response) {
const auto chunk_id_param = request.query().get("chunk_id");

auto gc_mgr = ho_.gc_manager();
if (!gc_mgr) {
response.send(Pistache::Http::Code::Internal_Server_Error, "GC manager not available");
return;
}

auto chunk_selector = ho_.chunk_selector();
if (!chunk_selector) {
response.send(Pistache::Http::Code::Internal_Server_Error, "Chunk selector not available");
return;
}

std::string job_id = generate_job_id();
nlohmann::json result;

// trigger gc for all chunks
if (!chunk_id_param || chunk_id_param.value().empty()) {
LOGINFO("Received trigger_gc request for all chunks, job_id={}", job_id);

auto job_info = std::make_shared< GCJobInfo >(job_id);
{
std::lock_guard< std::mutex > lock(gc_job_mutex_);
gc_jobs_map_.set(job_id, job_info);
}

result["job_id"] = job_id;
result["message"] = "GC triggered for all eligible chunks, pls query job status using gc_job_status API";
response.send(Pistache::Http::Code::Accepted, result.dump());

LOGINFO("GC job {} stopping GC scan timer", job_id);
gc_mgr->stop_gc_scan_timer();

std::vector< pg_id_t > pg_ids;
ho_.get_pg_ids(pg_ids);
LOGINFO("GC job {} will process {} PGs", job_id, pg_ids.size());

std::vector< folly::SemiFuture< bool > > gc_task_futures;

for (const auto& pg_id : pg_ids) {
auto hs_pg = const_cast< HSHomeObject::HS_PG* >(ho_.get_hs_pg(pg_id));
RELEASE_ASSERT(hs_pg, "HS PG {} not found during GC job {}", pg_id, job_id);

LOGINFO("GC job {} draining pending GC tasks for PG {}", job_id, pg_id);
gc_mgr->drain_pg_pending_gc_task(pg_id);

auto pg_sb = hs_pg->pg_sb_.get();
std::vector< homestore::chunk_num_t > pg_chunks(pg_sb->get_chunk_ids(),
pg_sb->get_chunk_ids() + pg_sb->num_chunks);

LOGINFO("GC job {} processing PG {} with {} chunks", job_id, pg_id, pg_chunks.size());

// Quiesce new requests for this pg while its chunks are being GC'd
hs_pg->repl_dev_->quiesce_reqs();

for (const auto& chunk_id : pg_chunks) {
job_info->total_chunks++;
// Determine priority based on chunk state (INUSE means has open shard)
auto chunk = chunk_selector->get_extend_vchunk(chunk_id);
RELEASE_ASSERT(chunk, "Chunk {} not found during GC job {}", chunk_id, job_id);
auto priority = chunk->m_state == ChunkState::INUSE ? task_priority::emergent : task_priority::normal;

// Clear in-memory requests only for emergent priority chunks (chunks with open shards)
if (priority == task_priority::emergent) { hs_pg->repl_dev_->clear_chunk_req(chunk_id); }

// Submit GC task for this chunk
auto future = gc_mgr->submit_gc_task(priority, chunk_id);
gc_task_futures.push_back(std::move(future));
LOGDEBUG("GC job {} for chunk {} in PG {} with priority={}", job_id, chunk_id, pg_id,
(priority == task_priority::emergent) ? "emergent" : "normal");
}
}

folly::collectAllUnsafe(gc_task_futures)
.thenValue([job_info](auto&& results) {
for (auto const& ok : results) {
RELEASE_ASSERT(ok.hasValue(), "we never throw any exception when copying data");
if (ok.value()) {
job_info->success_count++;
} else {
job_info->failed_count++;
}
}
})
.thenValue([this, pg_ids, job_info, gc_mgr](auto&& rets) {
LOGINFO("All GC tasks have been processed");
const auto& job_id = job_info->job_id;
for (const auto& pg_id : pg_ids) {
auto hs_pg = const_cast< HSHomeObject::HS_PG* >(ho_.get_hs_pg(pg_id));
RELEASE_ASSERT(hs_pg, "HS PG {} not found during GC job {}", pg_id, job_id);
// Resume accepting new requests for this pg
hs_pg->repl_dev_->resume_accepting_reqs();
LOGINFO("GC job {} resumed accepting requests for PG {}", job_id, pg_id);
}

job_info->result = (job_info->failed_count == 0);
job_info->status = job_info->result.value() ? GCJobStatus::COMPLETED : GCJobStatus::FAILED;
LOGINFO("GC job {} completed: total={}, success={}, failed={}", job_id, job_info->total_chunks,
job_info->success_count, job_info->failed_count);

// Restart the GC scan timer
LOGINFO("GC job {} restarting GC scan timer", job_id);
gc_mgr->start_gc_scan_timer();
});
} else {
// trigger gc for specific chunk
uint32_t chunk_id = 0;
try {
chunk_id = std::stoul(chunk_id_param.value());
} catch (const std::exception&) {
response.send(Pistache::Http::Code::Bad_Request, "chunk_id must be a valid unsigned integer");
return;
}
LOGINFO("Received trigger_gc request for chunk_id={}, job_id={}", chunk_id, job_id);

auto chunk = chunk_selector->get_extend_vchunk(chunk_id);
if (!chunk) {
nlohmann::json error;
error["chunk_id"] = chunk_id;
error["error"] = "Chunk not found";
response.send(Pistache::Http::Code::Not_Found, error.dump());
return;
}

if (!chunk->m_pg_id.has_value()) {
nlohmann::json error;
error["chunk_id"] = chunk_id;
error["error"] = "Chunk belongs to no pg";
response.send(Pistache::Http::Code::Not_Found, error.dump());
return;
}

const auto pg_id = chunk->m_pg_id.value();
auto pdev_id = chunk->get_pdev_id();

result["chunk_id"] = chunk_id;
result["pdev_id"] = pdev_id;
result["pg_id"] = pg_id;
result["job_id"] = job_id;

if (chunk->m_state == ChunkState::GC) {
result["message"] = "chunk is already under GC now";
response.send(Pistache::Http::Code::Accepted, result.dump());
return;
}

// Register the new job in the tracking map under the lock
auto job_info = std::make_shared< GCJobInfo >(job_id, chunk_id, pdev_id);
{
std::lock_guard< std::mutex > lock(gc_job_mutex_);
gc_jobs_map_.set(job_id, job_info);
}

result["message"] = "GC triggered for chunk, pls query job status using gc_job_status API";
response.send(Pistache::Http::Code::Accepted, result.dump());

// Quiesce new requests for this pg, then drop any in-memory requests targeting this chunk
auto hs_pg = const_cast< HSHomeObject::HS_PG* >(ho_.get_hs_pg(pg_id));
RELEASE_ASSERT(hs_pg, "HS PG {} not found during GC job {}", pg_id, job_id);
auto repl_dev = hs_pg->repl_dev_;
repl_dev->quiesce_reqs();
repl_dev->clear_chunk_req(chunk_id);
const auto priority = chunk->m_state == ChunkState::INUSE ? task_priority::emergent : task_priority::normal;

gc_mgr->submit_gc_task(priority, chunk_id)
.via(&folly::InlineExecutor::instance())
.thenValue([this, job_info, repl_dev](bool res) {
job_info->result = res;
job_info->status = res ? GCJobStatus::COMPLETED : GCJobStatus::FAILED;
// Resume accepting new requests for this pg
repl_dev->resume_accepting_reqs();
});
}
}

std::string HttpManager::generate_job_id() {
auto counter = job_counter_.fetch_add(1, std::memory_order_relaxed);
return fmt::format("trigger-gc-task-{}", counter);
}

void HttpManager::get_gc_job_status(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response) {
auto job_id_param = request.query().get("job_id");
if (!job_id_param) {
response.send(Pistache::Http::Code::Bad_Request, "job_id is required");
return;
}

std::string job_id = job_id_param.value();
std::shared_ptr< GCJobInfo > job_info;
{
std::lock_guard< std::mutex > lock(gc_job_mutex_);
// EvictingCacheMap::get() throws std::out_of_range on a miss, so use find() for a nullable lookup
auto it = gc_jobs_map_.find(job_id);
if (it != gc_jobs_map_.end()) { job_info = it->second; }
}

if (!job_info) {
response.send(Pistache::Http::Code::Not_Found, "job_id not found, or job has been evicted");
return;
}

// Access job_info outside the lock
nlohmann::json result;
result["job_id"] = job_info->job_id;

switch (job_info->status) {
case GCJobStatus::RUNNING:
result["status"] = "running";
break;
case GCJobStatus::COMPLETED:
result["status"] = "completed";
break;
case GCJobStatus::FAILED:
result["status"] = "failed";
break;
}

if (job_info->chunk_id.has_value()) {
result["chunk_id"] = job_info->chunk_id.value();
if (job_info->pdev_id.has_value()) { result["pdev_id"] = job_info->pdev_id.value(); }
}

if (job_info->total_chunks > 0) {
nlohmann::json stats;
stats["total_chunks"] = job_info->total_chunks;
stats["success_count"] = job_info->success_count;
stats["failed_count"] = job_info->failed_count;
result["statistics"] = stats;
}

if (job_info->result.has_value()) { result["result"] = job_info->result.value(); }

response.send(Pistache::Http::Code::Ok, result.dump());
}

#ifdef _PRERELEASE
void HttpManager::crash_system(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response) {
std::string crash_type;
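
The all-chunks path above fans out one SemiFuture per chunk and tallies the outcomes once every future settles. A stripped-down, runnable sketch of that aggregation step, with ready-made futures standing in for submit_gc_task:

#include <cstdint>
#include <cstdio>
#include <vector>
#include <folly/futures/Future.h>

int main() {
    std::vector< folly::SemiFuture< bool > > tasks;
    tasks.push_back(folly::makeSemiFuture< bool >(true));  // stand-in for a successful GC task
    tasks.push_back(folly::makeSemiFuture< bool >(false)); // stand-in for a failed one

    // collectAllUnsafe waits for every future; each result is a folly::Try
    // that always holds a value here, since the tasks fulfil with a bool
    folly::collectAllUnsafe(std::move(tasks)).thenValue([](auto&& results) {
        uint32_t success = 0, failed = 0;
        for (const auto& r : results) {
            if (r.hasValue() && r.value()) {
                ++success;
            } else {
                ++failed;
            }
        }
        std::printf("success=%u failed=%u\n", success, failed);
    });
    return 0;
}
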
33 changes: 33 additions & 0 deletions src/lib/homestore_backend/hs_http_manager.hpp
@@ -16,6 +16,11 @@
#include <iomgr/io_environment.hpp>
#include <iomgr/http_server.hpp>

#include <folly/futures/Future.h>
#include <folly/container/EvictingCacheMap.h>
#include <chrono>
#include <atomic>

namespace homeobject {
class HSHomeObject;

@@ -34,12 +39,40 @@ class HttpManager {
void dump_chunk(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response);
void dump_shard(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response);
void get_shard(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response);
void trigger_gc(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response);
void get_gc_job_status(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response);

#ifdef _PRERELEASE
void crash_system(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response);
#endif

private:
enum class GCJobStatus { RUNNING, COMPLETED, FAILED };

struct GCJobInfo {
std::string job_id;
GCJobStatus status;
std::optional< uint32_t > chunk_id;
std::optional< uint32_t > pdev_id;
std::optional< bool > result;
std::optional< uint16_t > pg_id;

// Statistics for batch GC jobs (all chunks)
uint32_t total_chunks{0};
uint32_t success_count{0};
uint32_t failed_count{0};

GCJobInfo(const std::string& id, std::optional< uint32_t > cid = std::nullopt,
std::optional< uint32_t > pid = std::nullopt) :
job_id(id), status(GCJobStatus::RUNNING), chunk_id(cid), pdev_id(pid) {}
};

std::string generate_job_id();

private:
HSHomeObject& ho_;
std::atomic< uint64_t > job_counter_{0};
std::mutex gc_job_mutex_;
folly::EvictingCacheMap< std::string, std::shared_ptr< GCJobInfo > > gc_jobs_map_{100};
};
} // namespace homeobject
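
gc_jobs_map_ caps tracked jobs at 100 entries via folly::EvictingCacheMap, which silently evicts the least-recently-used entry and is not internally thread-safe, hence the dedicated gc_job_mutex_. Below is a self-contained sketch of that registry pattern (JobRegistry and its members are hypothetical); lookups go through find(), since EvictingCacheMap::get() throws on a miss.

#include <memory>
#include <mutex>
#include <string>
#include <folly/container/EvictingCacheMap.h>

struct JobInfo {
    std::string job_id;
};

// Hypothetical registry mirroring gc_jobs_map_: LRU-bounded and mutex-guarded.
class JobRegistry {
public:
    void put(const std::string& id, std::shared_ptr< JobInfo > info) {
        std::lock_guard< std::mutex > lock(mutex_);
        jobs_.set(id, std::move(info)); // evicts the LRU entry once past capacity
    }

    std::shared_ptr< JobInfo > lookup(const std::string& id) {
        std::lock_guard< std::mutex > lock(mutex_);
        auto it = jobs_.find(id); // promotes the entry; returns end() on a miss
        return it == jobs_.end() ? nullptr : it->second;
    }

private:
    std::mutex mutex_;
    folly::EvictingCacheMap< std::string, std::shared_ptr< JobInfo > > jobs_{100};
};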