Skip to content
5 changes: 3 additions & 2 deletions example/cpp20_json.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ using boost::redis::ignore_t;
using boost::redis::config;
using boost::redis::connection;
using boost::redis::resp3::node_view;
using boost::redis::command_context;

// Struct that will be stored in Redis using json serialization.
struct user {
Expand All @@ -44,9 +45,9 @@ struct user {
BOOST_DESCRIBE_STRUCT(user, (), (name, age, country))

// Boost.Redis customization points (example/json.hpp)
void boost_redis_to_bulk(std::string& to, user const& u)
void boost_redis_to_bulk(command_context ctx, user const& u)
{
resp3::boost_redis_to_bulk(to, boost::json::serialize(boost::json::value_from(u)));
ctx.add_argument(boost::json::serialize(boost::json::value_from(u)));
}

void boost_redis_from_bulk(user& u, node_view const& node, boost::system::error_code&)
Expand Down
42 changes: 22 additions & 20 deletions example/cpp20_subscriber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/consign.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/experimental/channel_error.hpp>
#include <boost/asio/redirect_error.hpp>
#include <boost/asio/signal_set.hpp>
#include <boost/asio/use_awaitable.hpp>
Expand Down Expand Up @@ -46,38 +47,39 @@ using asio::signal_set;
// Receives server pushes.
auto receiver(std::shared_ptr<connection> conn) -> asio::awaitable<void>
{
request req;
req.push("SUBSCRIBE", "channel");

generic_flat_response resp;
conn->set_receive_response(resp);

// Loop while reconnection is enabled
while (conn->will_reconnect()) {
// Reconnect to the channels.
co_await conn->async_exec(req);
// Connect to the channels
request req;
req.push("SUBSCRIBE", "channel");
co_await conn->async_exec(req);

// Loop to read Redis push messages.
for (error_code ec;;) {
// Wait for pushes
co_await conn->async_receive2(asio::redirect_error(ec));

// Loop to read Redis push messages.
for (error_code ec;;) {
// Wait for pushes
co_await conn->async_receive2(asio::redirect_error(ec));
if (ec)
break; // Connection lost, break so we can reconnect to channels.
// Check for errors and cancellations
if (ec && (ec != asio::experimental::error::channel_cancelled || !conn->will_reconnect())) {
std::cerr << "Error during receive2: " << ec << std::endl;
break;
}

// The response must be consumed without suspending the
// coroutine i.e. without the use of async operations.
for (auto const& elem: resp.value().get_view())
std::cout << elem.value << "\n";
// The response must be consumed without suspending the
// coroutine i.e. without the use of async operations.
for (auto const& elem : resp.value().get_view())
std::cout << elem.value << "\n";

std::cout << std::endl;
std::cout << std::endl;

resp.value().clear();
}
resp.value().clear();
}
}

auto co_main(config cfg) -> asio::awaitable<void>
{
cfg.restore_pubsub_state = true;
auto ex = co_await asio::this_coro::executor;
auto conn = std::make_shared<connection>(ex);
asio::co_spawn(ex, receiver(conn), asio::detached);
Expand Down
3 changes: 3 additions & 0 deletions include/boost/redis/config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,9 @@ struct config {
* @ref sentinel_config::addresses is not empty.
*/
sentinel_config sentinel{};

// (TODO: document properly) Do we want to re-subscribe to channels on reconnection?
bool restore_pubsub_state = false;
};

} // namespace boost::redis
Expand Down
7 changes: 5 additions & 2 deletions include/boost/redis/connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,10 @@ struct connection_impl {
{
while (true) {
// Invoke the state machine
auto act = fsm_.resume(obj_->is_open(), self.get_cancellation_state().cancelled());
auto act = fsm_.resume(
obj_->is_open(),
obj_->st_,
self.get_cancellation_state().cancelled());

// Do what the FSM said
switch (act.type()) {
Expand Down Expand Up @@ -203,7 +206,7 @@ struct connection_impl {
});

return asio::async_compose<CompletionToken, void(system::error_code, std::size_t)>(
exec_op{this, notifier, exec_fsm(st_.mpx, std::move(info))},
exec_op{this, notifier, exec_fsm(std::move(info))},
token,
writer_cv_);
}
Expand Down
3 changes: 3 additions & 0 deletions include/boost/redis/detail/connection_state.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

#include <boost/redis/config.hpp>
#include <boost/redis/detail/multiplexer.hpp>
#include <boost/redis/detail/pubsub_state.hpp>
#include <boost/redis/logger.hpp>
#include <boost/redis/request.hpp>
#include <boost/redis/resp3/node.hpp>
Expand Down Expand Up @@ -47,7 +48,9 @@ struct connection_state {
config cfg{};
multiplexer mpx{};
std::string diagnostic{}; // Used by the setup request and Sentinel
request setup_req{};
request ping_req{};
pubsub_state pubsub_st{};

// Sentinel stuff
lazy_random_engine eng{};
Expand Down
13 changes: 8 additions & 5 deletions include/boost/redis/detail/exec_fsm.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

namespace boost::redis::detail {

struct connection_state;

// What should we do next?
enum class exec_action_type
{
Expand Down Expand Up @@ -54,16 +56,17 @@ class exec_action {

class exec_fsm {
int resume_point_{0};
multiplexer* mpx_{nullptr};
std::shared_ptr<multiplexer::elem> elem_;

public:
exec_fsm(multiplexer& mpx, std::shared_ptr<multiplexer::elem> elem) noexcept
: mpx_(&mpx)
, elem_(std::move(elem))
exec_fsm(std::shared_ptr<multiplexer::elem> elem) noexcept
: elem_(std::move(elem))
{ }

exec_action resume(bool connection_is_open, asio::cancellation_type_t cancel_state);
exec_action resume(
bool connection_is_open,
connection_state& st,
asio::cancellation_type_t cancel_state);
};

} // namespace boost::redis::detail
Expand Down
39 changes: 39 additions & 0 deletions include/boost/redis/detail/pubsub_state.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
//
// Copyright (c) 2025 Marcelo Zimbres Silva (mzimbres@gmail.com),
// Ruben Perez Hidalgo (rubenperez038 at gmail dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//

#ifndef BOOST_REDIS_PUBSUB_STATE_HPP
#define BOOST_REDIS_PUBSUB_STATE_HPP

#include <set>
#include <string>
#include <string_view>

namespace boost::redis {

class request;

namespace detail {

enum class pubsub_change_type;

class pubsub_state {
std::set<std::string> channels_;
std::set<std::string> pchannels_;

public:
pubsub_state() = default;
void clear();
void commit_change(pubsub_change_type type, std::string_view channel);
void commit_changes(const request& req);
void compose_subscribe_request(request& to) const;
};

} // namespace detail
} // namespace boost::redis

#endif
18 changes: 14 additions & 4 deletions include/boost/redis/impl/exec_fsm.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#ifndef BOOST_REDIS_EXEC_FSM_IPP
#define BOOST_REDIS_EXEC_FSM_IPP

#include <boost/redis/detail/connection_state.hpp>
#include <boost/redis/detail/coroutine.hpp>
#include <boost/redis/detail/exec_fsm.hpp>
#include <boost/redis/request.hpp>
Expand All @@ -28,7 +29,10 @@ inline bool is_total_cancel(asio::cancellation_type_t type)
return !!(type & asio::cancellation_type_t::total);
}

exec_action exec_fsm::resume(bool connection_is_open, asio::cancellation_type_t cancel_state)
exec_action exec_fsm::resume(
bool connection_is_open,
connection_state& st,
asio::cancellation_type_t cancel_state)
{
switch (resume_point_) {
BOOST_REDIS_CORO_INITIAL
Expand All @@ -47,7 +51,7 @@ exec_action exec_fsm::resume(bool connection_is_open, asio::cancellation_type_t
BOOST_REDIS_YIELD(resume_point_, 2, exec_action_type::setup_cancellation)

// Add the request to the multiplexer
mpx_->add(elem_);
st.mpx.add(elem_);

// Notify the writer task that there is work to do. If the task is not
// listening (e.g. it's already writing or the connection is not healthy),
Expand All @@ -61,8 +65,14 @@ exec_action exec_fsm::resume(bool connection_is_open, asio::cancellation_type_t

// If the request has completed (with error or not), we're done
if (elem_->is_done()) {
// If the request completed successfully and we were configured to do so,
// record the changes applied to the pubsub state
if (st.cfg.restore_pubsub_state && !elem_->get_error())
st.pubsub_st.commit_changes(elem_->get_request());

// Deallocate memory before finalizing
exec_action act{elem_->get_error(), elem_->get_read_size()};
elem_.reset(); // Deallocate memory before finalizing
elem_.reset();
return act;
}

Expand All @@ -71,7 +81,7 @@ exec_action exec_fsm::resume(bool connection_is_open, asio::cancellation_type_t
if (
(is_total_cancel(cancel_state) && elem_->is_waiting()) ||
is_partial_or_terminal_cancel(cancel_state)) {
mpx_->cancel(elem_);
st.mpx.cancel(elem_);
elem_.reset(); // Deallocate memory before finalizing
return exec_action{asio::error::operation_aborted};
}
Expand Down
48 changes: 48 additions & 0 deletions include/boost/redis/impl/pubsub_state.ipp
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
//
// Copyright (c) 2025 Marcelo Zimbres Silva (mzimbres@gmail.com),
// Ruben Perez Hidalgo (rubenperez038 at gmail dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//

#include <boost/redis/detail/pubsub_state.hpp>
#include <boost/redis/request.hpp>

#include <boost/assert.hpp>

#include <string>

namespace boost::redis::detail {

void pubsub_state::clear()
{
channels_.clear();
pchannels_.clear();
}

void pubsub_state::commit_change(pubsub_change_type type, std::string_view channel)
{
std::string owning_channel{channel};
switch (type) {
case pubsub_change_type::subscribe: channels_.insert(std::move(owning_channel)); break;
case pubsub_change_type::unsubscribe: channels_.erase(std::move(owning_channel)); break;
case pubsub_change_type::psubscribe: pchannels_.insert(std::move(owning_channel)); break;
case pubsub_change_type::punsubscribe: pchannels_.erase(std::move(owning_channel)); break;
default: BOOST_ASSERT(false);
}
}

void pubsub_state::commit_changes(const request& req)
{
for (const auto& ch : detail::request_access::pubsub_changes(req))
commit_change(ch.type, req.payload().substr(ch.channel_offset, ch.channel_size));
}

void pubsub_state::compose_subscribe_request(request& to) const
{
to.push_range("SUBSCRIBE", channels_);
to.push_range("PBSUBSCRIBE", pchannels_);
}

} // namespace boost::redis::detail
36 changes: 36 additions & 0 deletions include/boost/redis/impl/request.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@
*/

#include <boost/redis/request.hpp>
#include <boost/redis/resp3/serialization.hpp>
#include <boost/redis/resp3/type.hpp>

#include <cstddef>
#include <string_view>

namespace boost::redis::detail {
Expand Down Expand Up @@ -35,6 +38,39 @@ request make_hello_request()
void boost::redis::request::append(const request& other)
{
payload_ += other.payload_;
pubsub_changes_.insert(
pubsub_changes_.end(),
other.pubsub_changes_.begin(),
other.pubsub_changes_.end());
commands_ += other.commands_;
expected_responses_ += other.expected_responses_;
}

boost::redis::command_context boost::redis::request::start_command(
std::string_view cmd,
std::size_t num_args)
{
// Determine the pubsub change type that this command is performing
// TODO: this has overlap with has_response
auto change_type = detail::pubsub_change_type::none;
if (cmd == "SUBSCRIBE")
change_type = detail::pubsub_change_type::subscribe;
if (cmd == "PSUBSCRIBE")
change_type = detail::pubsub_change_type::psubscribe;
if (cmd == "UNSUBSCRIBE")
change_type = detail::pubsub_change_type::unsubscribe;
if (cmd == "PUNSUBSCRIBE")
change_type = detail::pubsub_change_type::punsubscribe;

// Add the header
resp3::add_header(
payload_,
resp3::type::array,
num_args + 1u); // the command string is also an array member

// Serialize the command string
resp3::add_bulk(payload_, cmd);

// Compose the command context
return detail::command_context_access::construct(change_type, pubsub_changes_, payload_);
}
11 changes: 6 additions & 5 deletions include/boost/redis/impl/run_fsm.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,10 @@ run_action run_fsm::resume(
return stored_ec_;
}

// Compose the setup request. This only depends on the config, so it can be done just once
compose_setup_request(st.cfg);
// Clear any remainder from previous runs
st.pubsub_st.clear();

// Compose the PING request. Same as above
// Compose the PING request. This only depends on the config, so it can be done just once
compose_ping_request(st.cfg, st.ping_req);

if (use_sentinel(st.cfg)) {
Expand Down Expand Up @@ -159,10 +159,11 @@ run_action run_fsm::resume(
// Initialization
st.mpx.reset();
st.diagnostic.clear();
compose_setup_request(st.cfg, st.pubsub_st, st.setup_req);

// Add the setup request to the multiplexer
if (st.cfg.setup.get_commands() != 0u) {
auto elm = make_elem(st.cfg.setup, make_any_adapter_impl(setup_adapter{st}));
if (st.setup_req.get_commands() != 0u) {
auto elm = make_elem(st.setup_req, make_any_adapter_impl(setup_adapter{st}));
elm->set_done_callback([&elem_ref = *elm, &st] {
on_setup_done(elem_ref, st);
});
Expand Down
Loading
Loading