Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions example/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@ macro(make_example EXAMPLE_NAME STANDARD)
if (${STANDARD} STREQUAL "20")
target_link_libraries(${EXAMPLE_NAME} PRIVATE examples_main)
endif()
if (${EXAMPLE_NAME} STREQUAL "cpp20_json")
# TODO: this
# if (${EXAMPLE_NAME} STREQUAL "cpp20_json")
target_link_libraries(${EXAMPLE_NAME} PRIVATE Boost::json Boost::container_hash)
endif()
# endif()
endmacro()

macro(make_testable_example EXAMPLE_NAME STANDARD)
Expand Down
34 changes: 30 additions & 4 deletions example/cpp20_subscriber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
*/

#include <boost/redis/connection.hpp>
#include <boost/redis/pubsub_response.hpp>
#include <boost/redis/resp3/node.hpp>

#include <boost/asio/awaitable.hpp>
#include <boost/asio/co_spawn.hpp>
Expand All @@ -13,8 +15,12 @@
#include <boost/asio/redirect_error.hpp>
#include <boost/asio/signal_set.hpp>
#include <boost/asio/use_awaitable.hpp>
#include <boost/describe/class.hpp>
#include <boost/json/parse.hpp>
#include <boost/json/value_to.hpp>

#include <iostream>
#include <vector>

#if defined(BOOST_ASIO_HAS_CO_AWAIT)

Expand All @@ -25,6 +31,7 @@ using boost::redis::generic_flat_response;
using boost::redis::config;
using boost::system::error_code;
using boost::redis::connection;
using boost::redis::pubsub_message;
using asio::signal_set;

/* This example will subscribe and read pushes indefinitely.
Expand All @@ -43,13 +50,31 @@ using asio::signal_set;
* > CLIENT kill TYPE pubsub
*/

// Struct that will be stored in Redis using json serialization.
struct user {
std::string name;
std::string age;
std::string country;
};

// The type must be described for serialization to work.
BOOST_DESCRIBE_STRUCT(user, (), (name, age, country))

void boost_redis_from_bulk(
user& u,
boost::redis::resp3::node_view const& node,
boost::system::error_code&)
{
u = boost::json::value_to<user>(boost::json::parse(node.value));
}

// Receives server pushes.
auto receiver(std::shared_ptr<connection> conn) -> asio::awaitable<void>
{
request req;
req.push("SUBSCRIBE", "channel");

generic_flat_response resp;
std::vector<pubsub_message<user>> resp;
conn->set_receive_response(resp);

// Loop while reconnection is enabled
Expand All @@ -66,12 +91,13 @@ auto receiver(std::shared_ptr<connection> conn) -> asio::awaitable<void>

// The response must be consumed without suspending the
// coroutine i.e. without the use of async operations.
for (auto const& elem: resp.value().get_view())
std::cout << elem.value << "\n";
for (auto const& elem : resp)
std::cout << elem.channel << ": age=" << elem.payload.age
<< ", name=" << elem.payload.name << "\n";

std::cout << std::endl;

resp.value().clear();
resp.clear();
}
}
}
Expand Down
123 changes: 123 additions & 0 deletions include/boost/redis/pubsub_response.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/* Copyright (c) 2018-2024 Marcelo Zimbres Silva (mzimbres@gmail.com)
*
* Distributed under the Boost Software License, Version 1.0. (See
* accompanying file LICENSE.txt)
*/

#ifndef BOOST_REDIS_PUBSUB_RESPONSE_HPP
#define BOOST_REDIS_PUBSUB_RESPONSE_HPP

#include <boost/redis/adapter/detail/adapters.hpp>
#include <boost/redis/adapter/detail/response_traits.hpp>
#include <boost/redis/detail/coroutine.hpp>
#include <boost/redis/resp3/node.hpp>
#include <boost/redis/resp3/type.hpp>

#include <boost/core/span.hpp>
#include <boost/system/detail/error_code.hpp>
#include <boost/system/error_code.hpp>

#include <string>
#include <string_view>
#include <vector>

namespace boost::redis {

template <class T = std::string>
struct pubsub_message {
std::string channel;
T payload;
};

namespace adapter::detail {

template <class T>
struct pubsub_adapter {
std::vector<pubsub_message<T>>* vec;
int resume_point{0};
bool ignore{false};
std::string channel_name{};

void on_init()
{
resume_point = 0;
ignore = false;
}
void on_done() { }

system::error_code on_node_impl(resp3::node_view const& nd)
{
if (ignore)
return system::error_code();

switch (resume_point) {
BOOST_REDIS_CORO_INITIAL

// The root node should be a push
if (nd.data_type != resp3::type::push) {
ignore = true;
return system::error_code();
}

BOOST_REDIS_YIELD(resume_point, 1, system::error_code())

// The next element should be the string "message"
if (nd.data_type != resp3::type::blob_string || nd.value != "message") {
ignore = true;
return system::error_code();
}

BOOST_REDIS_YIELD(resume_point, 2, system::error_code())

// The next element is the channel name
if (nd.data_type != resp3::type::blob_string) {
ignore = true;
return system::error_code();
}

channel_name = nd.value;

BOOST_REDIS_YIELD(resume_point, 3, system::error_code())

// The last element is the payload
if (nd.data_type != resp3::type::blob_string) {
ignore = true;
return system::error_code();
}

// Try to deserialize the payload, if required
T elm;
system::error_code ec;
boost_redis_from_bulk(elm, nd, ec);

if (ec)
return ec;

vec->push_back(pubsub_message<T>{std::move(channel_name), std::move(elm)});

return system::error_code();
}

return system::error_code();
}

template <class String>
void on_node(resp3::basic_node<String> const& nd, system::error_code& ec)
{
ec = on_node_impl(nd);
}
};

template <class T>
struct response_traits<std::vector<pubsub_message<T>>> {
using response_type = std::vector<pubsub_message<T>>;
using adapter_type = pubsub_adapter<T>;

static auto adapt(response_type& v) noexcept { return adapter_type{&v}; }
};

} // namespace adapter::detail

} // namespace boost::redis

#endif