Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 37 additions & 36 deletions runtime-light/stdlib/component/inter-component-session/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

namespace kphp::component::inter_component_session {

// Inter component client for interactions over a stream in a client-server manner
// The client for inter-component communication over a stream in a client-server manner
class client final {
using query_id_type = uint64_t;
using query2notifier_type = kphp::stl::map<query_id_type, kphp::coro::event, kphp::memory::script_allocator>;
Expand All @@ -51,10 +51,10 @@ class client final {
kphp::stl::map<query_id_type, int32_t, kphp::memory::script_allocator> req_status{};

auto process_write(shared_transport_type t, query_id_type qid, std::span<const std::byte> payload) noexcept -> kphp::coro::task<void> {
// Ensure that readiness waiter is presented
// Ensure that the notifier is present
kphp::log::assertion(req_finish_notifier.contains(qid));

// Wait until transport will be available
// Wait until transport is available
if (is_occupied) [[unlikely]] {
transport_readiness_notifier.emplace(qid, kphp::coro::event{});
queue.push(qid);
Expand All @@ -76,7 +76,7 @@ class client final {
co_return;
}

// Ensure the transport we still held by us
// Ensure the transport is still held by this write
kphp::log::assertion(is_occupied && occupied_by == qid);

// Write payload
Expand All @@ -86,13 +86,13 @@ class client final {
co_return;
}

// Double check
// Double-check transport ownership
kphp::log::assertion(is_occupied && occupied_by == qid);

// Release transport
is_occupied = false;

// Notify next one that transport is available
// Notify the next waiting query that the transport is available
if (!queue.empty()) {
auto q{queue.front()};
queue.pop();
Expand All @@ -107,7 +107,7 @@ class client final {
auto write(shared_transport_type t, query_id_type qid, std::span<const std::byte> payload) noexcept -> kphp::coro::task<std::expected<void, int32_t>> {
req_finish_notifier.emplace(qid, kphp::coro::event{});

// The protocol design assumes that interrupting the transfer in the middle of frame headers leads to critical error.
// The protocol design assumes that interrupting the transfer in the middle of a frame leads to critical error.
// Therefore, we need to write the request in a separate coroutine.
// This technique prevents integrity violations when this coroutine is cancelled.
kphp::coro::io_scheduler::get().start(process_write(t, qid, payload));
Expand All @@ -133,7 +133,7 @@ class client final {
using shared_ctx_type = class_instance<refcountable_ctx_type>;

// The following reader state is intended to be initialized once for a new client.
// It is assumed that "copying" (ref count increasing) will be the common case, rather than moving.
// It is assumed that "copying" (ref count increase) will be the common case, rather than moving.
shared_ctx_type ctx;
kphp::coro::shared_task<void> interrupter;
kphp::coro::shared_task<void> runner;
Expand All @@ -151,7 +151,7 @@ class client final {
while (!ctx.get()->interrupted.is_set()) {
// Read response header or interrupt
auto read_header_res{co_await kphp::coro::when_any(t.get()->stream.read(resp_header_buf), interrupter)};
// Interrupt is happened
// An interrupt has occurred
if (std::holds_alternative<kphp::coro::void_value>(read_header_res)) [[unlikely]] {
kphp::log::debug("reader has been interrupted");
break;
Expand All @@ -171,15 +171,15 @@ class client final {
const auto size{resp_header.size.value};
kphp::log::debug("read {} bytes for query #{} ", size, qid);

// Ensure that buffer for response can be provided
// Ensure that buffer for response is provided
auto& buffer_providers{ctx.get()->query2resp_buffer_provider};
auto it_buffer_provider{buffer_providers.find(qid)};

// Response provider is not presented => read response into dummy buffer, just for keeping of consistency
// Response provider is not present => read response into dummy buffer, just to maintain consistency
if (it_buffer_provider == buffer_providers.end()) {
kphp::stl::vector<std::byte, kphp::memory::script_allocator> sink_buffer{size};
std::span<std::byte> sink_resp{sink_buffer.data(), sink_buffer.size()};
kphp::log::debug("response buffer provider hasn't been presented for query #{}, read response into dummy buffer", qid);
kphp::log::debug("response buffer provider is not present for query #{}, read response into dummy buffer", qid);
// Read dummy payload
if (auto res{co_await t.get()->stream.read(sink_resp)}; !res) [[unlikely]] {
kphp::log::warning("an error occurred while reading the payload from a stream: {}", res.error());
Expand All @@ -189,7 +189,7 @@ class client final {
continue;
}

// Response provider is presented => make an appropriate buffer's slice for a response
// Response provider is present => make an appropriate buffer slice for the response
auto resp{it_buffer_provider->second(size)};
ctx.get()->query2resp[qid] = resp;

Expand All @@ -202,31 +202,31 @@ class client final {
kphp::log::debug("resp buffer first byte: {} Resp buffer last byte: {} ", static_cast<uint8_t>(*std::next(resp.begin(), 0)),
static_cast<uint8_t>(*std::next(resp.begin(), resp.size() - 1)));

// Ensure that notifier is presented and notify
// Ensure that notifier is present and notify
kphp::log::assertion(ctx.get()->resp_finish_notifier.contains(qid));
ctx.get()->resp_finish_notifier[qid].set();
}

// Error occurred, notify all waiting queries
// An error occurred, notify all waiting queries
for (auto& [_, notifier] : ctx.get()->resp_finish_notifier) {
if (!notifier.is_set()) [[unlikely]] {
notifier.set();
}
}
}

// Dummy routine for waiting until an interrupting (stopping) event will happen
// Dummy routine for waiting until an interrupt (stop) event occurs
static auto wait_until_interrupt(shared_ctx_type ctx) noexcept -> kphp::coro::shared_task<void> {
co_await ctx.get()->interrupted;
co_return;
}

// Semantics of this method is considering tha state will be changed. That's why it is not marked as `const`
// Semantics of this method considers that the state will be changed. That is why it is not marked as `const`
auto register_query(query_id_type qid, details::function_wrapper<std::span<std::byte>, size_t>&& buffer_provider) noexcept -> void { // NOLINT
// We wouldn't read a response twice
// We do not read a response twice
kphp::log::assertion(ctx.get()->resp_finish_notifier.contains(qid) == false); // NOLINT

// Register provider of storage for a response
// Register storage provider for a response
ctx.get()->query2resp_buffer_provider.emplace(qid, std::move(buffer_provider));

// Register notifier
Expand All @@ -238,24 +238,25 @@ class client final {
: transport(make_instance<refcountable_transport_type>(refcountable_transport_type{.stream = std::move(s)})),
reader({make_instance<reader::refcountable_ctx_type>(), transport}) {
auto& scheduler{kphp::coro::io_scheduler::get()};
// Interrupter needs for immediately stopping the reader in the of client's life
// Interrupter is needed to immediately stop the reader at the end of the client's lifetime
scheduler.start(reader.interrupter);
// Run reader as "service"
// Run reader as a "service"
scheduler.start(reader.runner);
}

public:
~client() {
// If client has been moved, skip disabling the reader.
// If client has been moved, skip shutting down the reader.
// Otherwise, shut down the reader.
if (query_count != std::numeric_limits<query_id_type>::max()) {
reader.ctx.get()->interrupted.set();
}
}

// Designed that `transport` and `reader` will be allocated once as refcountable class instance.
// Moving looks like copying but is simply reference count increasing for 'transport' and 'reader' fields.
// Such approach motivated by the fact that the "reader-service" cannot be easily moved due to depends on transport and cannot be trivial stopped.
// 'transport' and 'reader' are designed to be allocated once as refcountable class instances.
// Moving is similar to copying, but simply increases the reference count for the 'transport' and 'reader' fields.
// This approach is motivated by the fact that the 'reader-service' cannot be easily moved because the 'reader-service' depends on the transport and cannot be
// easily stopped.
client(client&& other) noexcept
: transport(other.transport), // NOLINT // Intentionally call of copy constructor for shared transport
query_count(std::exchange(other.query_count, std::numeric_limits<query_id_type>::max())),
Expand Down Expand Up @@ -292,7 +293,7 @@ inline auto client::create(std::string_view component_name) noexcept -> std::exp
if (!expected) [[unlikely]] {
return std::unexpected{expected.error()};
}
// Create and move stream into a session
// Create and move the stream into the session
return std::expected<client, int32_t>{client{std::move(*expected)}};
}

Expand All @@ -301,20 +302,20 @@ requires std::is_convertible_v<std::invoke_result_t<B, size_t>, std::span<std::b
std::same_as<std::invoke_result_t<R, std::span<std::byte>>, client::response_readiness>
auto client::query(std::span<const std::byte> request, B response_buffer_provider,
R response_handler) noexcept -> kphp::coro::task<std::expected<void, int32_t>> {
// If previously any readers' error has been occurred
// If any reader error has previously occurred
if (reader.ctx.get() == nullptr) [[unlikely]] {
co_return std::unexpected(k2::errno_eshutdown);
}

kphp::log::assertion(query_count < std::numeric_limits<query_id_type>::max());
const auto query_id{query_count++};

// Ensure that query will be invalidated after occasionally cancellation
// Ensure that query will be invalidated after accidental cancellation
const vk::final_action finalizer{[reader_ctx = reader.ctx, &query_id] noexcept { reader_ctx.get()->query2resp_buffer_provider.erase(query_id); }};

// Register a new query and send request
// Register a new query and send the request
reader.register_query(query_id, details::function_wrapper<std::span<std::byte>, size_t>{std::move(response_buffer_provider)});
kphp::log::debug("client create query #{}", query_id);
kphp::log::debug("client creates query #{}", query_id);
if (auto res{co_await writer.write(transport, query_id, request)}; !res) [[unlikely]] {
co_return std::move(res);
}
Expand All @@ -323,25 +324,25 @@ auto client::query(std::span<const std::byte> request, B response_buffer_provide
auto response_readiness_status{response_readiness::pending};

kphp::log::debug("client now is reading responses for query #{}", query_id);
// Wait a new response until handler returns false
// Wait for a new response until the handler returns ready
while (response_readiness_status != response_readiness::ready) {
// Suspend on response notifier
// Suspend on the response notifier
co_await reader.ctx.get()->resp_finish_notifier[query_id];

// First of all, turn off notifier
// First, unset the notifier
reader.ctx.get()->resp_finish_notifier[query_id].unset();

// If reader has been interrupted do not invoke handler and finish normally
// If reader has been interrupted do not invoke the handler and finish normally
if (reader.ctx.get()->interrupted.is_set()) {
co_return std::expected<void, int32_t>{};
}

// If any readers' error has been occurred
// If any reader error has occurred
if (reader.ctx.get()->error) [[unlikely]] {
co_return std::unexpected(*(reader.ctx.get()->error));
}

// Invoke handler and pass response slice
// Invoke handler and pass the response slice
response_readiness_status = std::invoke(std::move(response_handler), reader.ctx.get()->query2resp[query_id]);
}
co_return std::expected<void, int32_t>{};
Expand Down
2 changes: 1 addition & 1 deletion runtime-light/stdlib/web-transfer-lib/defs.h
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ inline auto property_value::serialize() const noexcept -> tl::webPropertyValue {
} else {
tl::Dictionary<tl::webPropertyValue> res{tl::Dictionary<tl::webPropertyValue>{tl::vector<tl::dictionaryField<tl::webPropertyValue>>{.value = {}}}};
for (const auto& i : v) {
// We cannot convert key into string right here since string which is produced will be destroyed once we occur out of this scope
// We cannot convert key into a string right here since a string which is produced will be destroyed once we occur out of this scope
kphp::log::assertion(i.get_key().is_string());
const auto& key{i.get_key().as_string()};
const auto& val{i.get_value()};
Expand Down
3 changes: 1 addition & 2 deletions runtime-light/stdlib/web-transfer-lib/web-simple-transfer.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

#pragma once

#include <__expected/unexpected.h>
#include <cstddef>
#include <expected>
#include <optional>
Expand Down Expand Up @@ -175,7 +174,7 @@ inline auto simple_transfer_close(simple_transfer st) noexcept -> kphp::coro::ta
kphp::log::error("session with Web components has been closed");
}

// Checking that Simple transfer is held by some Composite transfer
// Checking that Simple transfer is still held by some Composite transfer
auto& composite_holder{web_state.simple_transfer2holder[st.descriptor]};
if (composite_holder.has_value()) {
if (auto remove_res{
Expand Down
Loading