Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion trpc/metrics/prometheus/prometheus_metrics.cc
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ void PrometheusMetrics::Start() noexcept {

void PrometheusMetrics::Stop() noexcept {
if (push_gateway_task_id_ != 0) {
PeripheryTaskScheduler::GetInstance()->StopInnerTask(push_gateway_task_id_);
PeripheryTaskScheduler::GetInstance()->RemoveInnerTask(push_gateway_task_id_);
PeripheryTaskScheduler::GetInstance()->JoinInnerTask(push_gateway_task_id_);
push_gateway_task_id_ = 0;
}
Expand Down
2 changes: 1 addition & 1 deletion trpc/naming/domain/selector_domain.cc
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ void SelectorDomain::Start() noexcept {

void SelectorDomain::Stop() noexcept {
if (task_id_) {
PeripheryTaskScheduler::GetInstance()->StopInnerTask(task_id_);
PeripheryTaskScheduler::GetInstance()->RemoveInnerTask(task_id_);
task_id_ = 0;
}
}
Expand Down
2 changes: 1 addition & 1 deletion trpc/rpcz/collector.cc
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ void RpczCollectorTask::Destroy() {

void RpczCollectorTask::Stop() {
if (task_id_) {
trpc::PeripheryTaskScheduler::GetInstance()->StopInnerTask(task_id_);
trpc::PeripheryTaskScheduler::GetInstance()->RemoveInnerTask(task_id_);
task_id_ = 0;
}
}
Expand Down
2 changes: 1 addition & 1 deletion trpc/runtime/common/heartbeat/heartbeat_report.cc
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ void HeartBeatReport::Stop() {

enable_ = false;

PeripheryTaskScheduler::GetInstance()->StopInnerTask(task_id_);
PeripheryTaskScheduler::GetInstance()->RemoveInnerTask(task_id_);
PeripheryTaskScheduler::GetInstance()->JoinInnerTask(task_id_);

task_id_ = 0;
Expand Down
38 changes: 35 additions & 3 deletions trpc/runtime/common/periphery_task_scheduler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ void PeripheryTaskScheduler::PeripheryTaskSchedulerImpl::Stop() {
}

void PeripheryTaskScheduler::PeripheryTaskSchedulerImpl::Join() {
for (unsigned i = 0; i < thread_num_; ++i) {
if (workers_[i].joinable()) {
workers_[i].join();
for (auto & worker : workers_) {
if (worker.joinable()) {
worker.join();
}
}
workers_.clear();
Expand All @@ -61,6 +61,11 @@ void PeripheryTaskScheduler::PeripheryTaskSchedulerImpl::Join() {
}

while (!tasks_.empty()) {
// Compatibility handling for scenarios where the user does not call DetachTask/RemoveTask:
// actively call Deref when the program exits.
if (tasks_.top()->UnsafeRefCount() > 1) {
tasks_.top()->Deref();
}
tasks_.pop();
}
}
Expand Down Expand Up @@ -135,6 +140,19 @@ bool PeripheryTaskScheduler::PeripheryTaskSchedulerImpl::StopTaskImpl(uint64_t t
return StopAndDestroyTask(task_id, false);
}

bool PeripheryTaskScheduler::PeripheryTaskSchedulerImpl::DetachTaskImpl(uint64_t task_id) {
if (TRPC_UNLIKELY(exited_.load(std::memory_order_relaxed))) {
return false;
}

TaskPtr task_ptr = GetTaskPtr(task_id);
if (task_ptr.Get() == nullptr) {
return false;
}

return true;
}

bool PeripheryTaskScheduler::PeripheryTaskSchedulerImpl::JoinTaskImpl(uint64_t task_id) {
if (TRPC_UNLIKELY(exited_.load(std::memory_order_relaxed))) {
TRPC_FMT_ERROR("PeripheryTaskScheduler is already exited.");
Expand Down Expand Up @@ -323,6 +341,13 @@ bool PeripheryTaskScheduler::RemoveTask(uint64_t task_id) {
return scheduler_->RemoveTaskImpl(task_id);
}

bool PeripheryTaskScheduler::DetachTask(std::uint64_t task_id) {
  // With no scheduler instance there is nothing to detach from, so report failure.
  return scheduler_ ? scheduler_->DetachTaskImpl(task_id) : false;
}

bool PeripheryTaskScheduler::StopTask(uint64_t task_id) {
if (!scheduler_) {
return false;
Expand Down Expand Up @@ -368,6 +393,13 @@ bool PeripheryTaskScheduler::RemoveInnerTask(uint64_t task_id) {
return inner_scheduler_->RemoveTaskImpl(task_id);
}

bool PeripheryTaskScheduler::DetachInnerTask(std::uint64_t task_id) {
  // With no inner scheduler instance there is nothing to detach from, so report failure.
  return inner_scheduler_ ? inner_scheduler_->DetachTaskImpl(task_id) : false;
}

bool PeripheryTaskScheduler::StopInnerTask(uint64_t task_id) {
if (!inner_scheduler_) {
return false;
Expand Down
17 changes: 14 additions & 3 deletions trpc/runtime/common/periphery_task_scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,16 @@ class PeripheryTaskScheduler {
/// @brief Remove task, used in scenarios where it is not necessary to wait for tasks to complete before exiting.
/// @param task_id task id
/// @return on success, return true. on error, return false
/// @note This interface can only be called once with the same ID.
/// @note This interface can only be called once with the same ID. After calling it, the task_id can't be used again.
bool RemoveTask(std::uint64_t task_id);

/// @brief Detach task, after calling this interface, the lifecycle of this task will be managed by the scheduler,
///        users no longer need to concern themselves with the release of the task.
/// @param task_id task id
/// @return on success return true, otherwise return false
/// @note the task_id can't be used again after calling this interface
bool DetachTask(std::uint64_t task_id);

/// @brief Same as 'SubmitTask', but is used only internally by the framework.
std::uint64_t SubmitInnerTask(Function<void()>&& task, const std::string& name = "");

Expand All @@ -119,10 +126,13 @@ class PeripheryTaskScheduler {
/// @brief Same as 'RemoveTask', but is used only internally by the framework.
bool RemoveInnerTask(std::uint64_t task_id);

/// @brief Same as 'Stoptask', but is used only internally by the framework.
/// @brief Same as 'DetachTask', but is used only internally by the framework.
bool DetachInnerTask(std::uint64_t task_id);

/// @brief Same as 'StopTask', but is used only internally by the framework.
bool StopInnerTask(std::uint64_t task_id);

/// @brief Same as 'Jointask', but is used only internally by the framework.
/// @brief Same as 'JoinTask', but is used only internally by the framework.
bool JoinInnerTask(std::uint64_t task_id);

/// @brief Used to destroy resources accessed by scheduled tasks after all scheduled task execution threads have
Expand Down Expand Up @@ -158,6 +168,7 @@ class PeripheryTaskScheduler {
bool StopTaskImpl(std::uint64_t task_id);
bool JoinTaskImpl(std::uint64_t task_id);
bool RemoveTaskImpl(std::uint64_t task_id);
bool DetachTaskImpl(uint64_t task_id);

void Init(size_t thread_num);
void Start();
Expand Down
9 changes: 9 additions & 0 deletions trpc/runtime/common/periphery_task_scheduler_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,13 @@ void TestRemoveTask() {
ASSERT_TRUE(PeripheryTaskScheduler::GetInstance()->RemoveInnerTask(task_id));
}

// Exercises DetachInnerTask: the id right after the submitted one is expected to be rejected, while the id returned
// by SubmitInnerTask is expected to be accepted.
void TestDetachTask() {
  auto* scheduler = PeripheryTaskScheduler::GetInstance();
  const std::uint64_t submitted_id = scheduler->SubmitInnerTask([]() {});
  ASSERT_TRUE(submitted_id > 0);

  ASSERT_FALSE(scheduler->DetachInnerTask(submitted_id + 1));
  ASSERT_TRUE(scheduler->DetachInnerTask(submitted_id));
}

void TestSubmitPeriodicTask() {
int count = 0;
Latch latch(1);
Expand Down Expand Up @@ -268,6 +275,8 @@ TEST_F(PeripheryTaskSchedulerTest, SubmitTaskTest) { TestSubmitTask(); }

TEST_F(PeripheryTaskSchedulerTest, RemoveTaskTest) { TestRemoveTask(); }

TEST_F(PeripheryTaskSchedulerTest, DetachTaskTest) { TestDetachTask(); }

TEST_F(PeripheryTaskSchedulerTest, SubmitPeriodicTaskTest) { TestSubmitPeriodicTask(); }

TEST_F(PeripheryTaskSchedulerTest, RemoveTaskAdvanceTest) { TestRemoveTaskAdvance(); }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ void StopReportRuntimeInfo() {
return;
}

PeripheryTaskScheduler::GetInstance()->StopInnerTask(report_task_id);
PeripheryTaskScheduler::GetInstance()->RemoveInnerTask(report_task_id);
PeripheryTaskScheduler::GetInstance()->JoinInnerTask(report_task_id);

report_task_id = 0;
Expand Down
2 changes: 1 addition & 1 deletion trpc/runtime/common/stats/frame_stats.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ void FrameStats::Start() {

void FrameStats::Stop() {
if (task_id_) {
PeripheryTaskScheduler::GetInstance()->StopInnerTask(task_id_);
PeripheryTaskScheduler::GetInstance()->RemoveInnerTask(task_id_);
PeripheryTaskScheduler::GetInstance()->JoinInnerTask(task_id_);
task_id_ = 0;
}
Expand Down
6 changes: 3 additions & 3 deletions trpc/runtime/iomodel/reactor/default/tcp_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,9 @@ int TcpConnection::HandleReadEvent() {

int ret = ReadIoData(read_buffer_.buffer);
if (ret > 0) {
SetConnActiveTime(trpc::time::GetMilliSeconds());
GetConnectionHandler()->UpdateConnection();

std::deque<std::any> data;
RefPtr ref(ref_ptr, this);
int checker_ret = GetConnectionHandler()->CheckMessage(ref, read_buffer_.buffer, data);
Expand All @@ -197,9 +200,6 @@ int TcpConnection::HandleReadEvent() {
if (TRPC_UNLIKELY(GetConnectionState() == ConnectionState::kUnconnected)) {
return -1;
}

SetConnActiveTime(trpc::time::GetMilliSeconds());
GetConnectionHandler()->UpdateConnection();
}
} else if (checker_ret == kPacketError) {
TRPC_LOG_ERROR("TcpConnection::HandleReadEvent fd:" << socket_.GetFd() << ", ip:" << GetPeerIp()
Expand Down
5 changes: 2 additions & 3 deletions trpc/runtime/iomodel/reactor/fiber/fiber_tcp_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,8 @@ FiberConnection::EventAction FiberTcpConnection::OnReadable() {
do {
status = ReadData();
if (TRPC_LIKELY(status != ReadStatus::kError)) {
SetConnActiveTime(trpc::time::GetMilliSeconds());
GetConnectionHandler()->UpdateConnection();
auto rc = ConsumeReadData();
if (TRPC_UNLIKELY(rc != EventAction::kReady)) {
TRPC_LOG_WARN("FiberTcpConnection::OnReadable ConsumeReadData failed, ip:"
Expand Down Expand Up @@ -256,9 +258,6 @@ FiberConnection::EventAction FiberTcpConnection::ConsumeReadData() {
return EventAction::kLeaving;
}

SetConnActiveTime(trpc::time::GetMilliSeconds());
GetConnectionHandler()->UpdateConnection();

return EventAction::kReady;
} else if (checker_ret == kPacketError) {
return EventAction::kLeaving;
Expand Down
8 changes: 8 additions & 0 deletions trpc/stream/http/http_client_stream_connection_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,14 @@ int FiberHttpClientStreamConnectionHandler::CheckMessage(const ConnectionPtr& co
HttpClientStreamHandlerPtr handler_ptr = static_pointer_cast<HttpClientStreamHandler>(stream_handler_);
if (p) {
  // When CheckMessage is entered repeatedly on a stream, (*p)->GetStream() and handler_ptr->GetHttpStream() actually
  // point to the same underlying stream object. In this scenario, assigning the HttpClientStreamPtr would cause its
  // internal reference count to first decrease by 1 and then increase by 1. If the connection happens to close at
  // this moment, it would lead to incorrect reference counting, causing the stream object to be destructed
  // prematurely. Subsequent operations using this stream would then result in a core dump.
  // Therefore the assignment must only happen when the two pointers differ; an unconditional assignment ahead of
  // this check would reintroduce the problem and make the check always false.
  if (((*p)->GetStream()).get() != (handler_ptr->GetHttpStream()).get()) {
    (*p)->GetStream() = handler_ptr->GetHttpStream();
  }
  return FiberClientConnectionHandler::CheckMessage(conn, in, out);
}

Expand Down
17 changes: 16 additions & 1 deletion trpc/util/thread/cpu.cc
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,22 @@ void InitializeProcessorInfoOnce() {
// I don't think it's possible to print log reliably here, unfortunately.
static std::once_flag once;
std::call_once(once, [&] {
node_of_cpus.resize(GetNumberOfProcessorsConfigured(), -1);
// In a container environment with strict CPU affinity binding, the number of CPU cores obtained by
// GetNumberOfProcessorsConfigured may reflect the actual number of cores allocated to the container (rather than
// the total cores of the deployment host). For example, if the host machine has 48 cores [0, 47] and the container
// is bound to 6 cores [2-7], the corresponding CPU affinity would be {2, 3, 4, 5, 6, 7}. However, in this case,
// GetNumberOfProcessorsConfigured returns 6 (the container's available cores) instead of 48 (the host's total
// cores). This causes CPUs 6 and 7 to be incorrectly judged as inaccessible, preventing fiber worker threads from
// running on these CPUs. Therefore, we adjust the actual number of machine cores based on the maximum CPU index in
// the CPU affinity, to prevent certain CPUs from being misidentified as inaccessible in container environments.
std::size_t number_of_processors = GetNumberOfProcessorsConfigured();
auto affinity = GetCurrentThreadAffinity();
if (!affinity.empty()) {
if (affinity.back() >= number_of_processors) {
number_of_processors = affinity.back() + 1;
}
}
node_of_cpus.resize(number_of_processors, -1);
for (std::size_t i = 0; i != node_of_cpus.size(); ++i) {
auto n = GetNodeOfProcessorImpl(static_cast<unsigned>(i));
if (n == -1) {
Expand Down
Loading