Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions include/stream/LocalClientStream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ namespace daq::stream {
class LocalClientStream : public LocalStream
{
public:
/// Resolver and socket require an io_context
explicit LocalClientStream(boost::asio::io_context& ioc, const std::string& endPointFile);
/// \param useAbstractNamespace true if unix domain socket is in abstract namespace
explicit LocalClientStream(boost::asio::io_context& ioc, const std::string& endPointFile, bool useAbstractNamespace);
LocalClientStream(const LocalClientStream&) = delete;
LocalClientStream& operator= (LocalClientStream&) = delete;

Expand All @@ -40,5 +40,6 @@ class LocalClientStream : public LocalStream

boost::asio::io_context& m_ioc;
std::string m_endpointFile;
bool m_useAbstractNamespace;
};
}
6 changes: 4 additions & 2 deletions include/stream/LocalServer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@

namespace daq::stream {
class LocalServer : public Server {
public:
LocalServer(boost::asio::io_context& readerIoContext, NewStreamCb newStreamCb, const std::string &localEndpointFile);
public:
/// \param useAbstractNamespace true if unix domain socket is in abstract namespace
LocalServer(boost::asio::io_context& readerIoContext, NewStreamCb newStreamCb, const std::string &localEndpointFile, bool useAbstractNamespace);
LocalServer(const LocalServer&) = delete;
LocalServer& operator= (const LocalServer&) = delete;
virtual ~LocalServer();
Expand All @@ -35,6 +36,7 @@ namespace daq::stream {
void onAccept(const boost::system::error_code& ec, boost::asio::local::stream_protocol::socket&& streamSocket);

std::string m_localEndpointFile;
bool m_useAbstractNamespace;
boost::asio::local::stream_protocol::acceptor m_localAcceptor;
};
}
25 changes: 18 additions & 7 deletions src/LocalClientStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,26 @@
#include "stream/LocalClientStream.hpp"

namespace daq::stream {
LocalClientStream::LocalClientStream(boost::asio::io_context& ioc, const std::string &endPointFile)
LocalClientStream::LocalClientStream(boost::asio::io_context& ioc, const std::string &endPointFile, bool useAbstractNamespace)
: LocalStream(ioc)
, m_ioc(ioc)
, m_endpointFile(endPointFile)
, m_useAbstractNamespace(useAbstractNamespace)
{
}

// placing a '\0' at the beginning of the endpoint name creates an abstract unix domain socket.
// See man page (man 7 unix) for details
static std::string getEndPointFileName(const std::string& localEndpointFile, bool useAbstractNamespace)
{
std::string endpointFileName;
if (useAbstractNamespace) {
endpointFileName = std::string("\0", 1);
}
endpointFileName += std::string(localEndpointFile);
return endpointFileName;
}

void LocalClientStream::asyncInit(CompletionCb completionCb)
{
m_initCompletionCb = completionCb;
Expand All @@ -27,10 +40,9 @@ namespace daq::stream {
return;
}

// placing a '\0' at the beginning of the endpoint name creates an abstract unix domain socket.
// See man page (man 7 unix) for details
std::string endpointFileName = getEndPointFileName(m_endpointFile, m_useAbstractNamespace);
m_socket.async_connect(
boost::asio::local::stream_protocol::endpoint(std::string("\0", 1) + std::string(m_endpointFile)),
boost::asio::local::stream_protocol::endpoint(endpointFileName),
std::bind(m_initCompletionCb, std::placeholders::_1));
}

Expand All @@ -40,10 +52,9 @@ namespace daq::stream {
return boost::system::error_code();
}
boost::system::error_code ec;
// placing a '\0' at the beginning of the endpoint name creates an abstract unix domain socket.
// See man page (man 7 unix) for details
std::string endpointFileName = getEndPointFileName(m_endpointFile, m_useAbstractNamespace);
m_socket.connect(
boost::asio::local::stream_protocol::endpoint(std::string("\0", 1) + std::string(m_endpointFile)),
boost::asio::local::stream_protocol::endpoint(endpointFileName),
ec);
return ec;
}
Expand Down
49 changes: 37 additions & 12 deletions src/LocalServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,36 +8,61 @@
#include "stream/LocalServerStream.hpp"

namespace daq::stream {
LocalServer::LocalServer(boost::asio::io_context& readerIoContext, NewStreamCb newStreamCb, const std::string& localEndpointFile)
// placing a '\0' at the beginning of the endpoint name creates an abstract unix domain socket.
// See man page (man 7 unix) for details
static std::string getEndPointFileName(const std::string& localEndpointFile, bool useAbstractNamespace)
{
std::string endpointFileName;
if (useAbstractNamespace) {
endpointFileName = std::string("\0", 1);
}
endpointFileName += std::string(localEndpointFile);
return endpointFileName;
}

LocalServer::LocalServer(boost::asio::io_context& readerIoContext, NewStreamCb newStreamCb, const std::string& localEndpointFile, bool useAbstractNamespace)
: Server(newStreamCb)
, m_localEndpointFile(localEndpointFile)
, m_localAcceptor(readerIoContext, std::string("\0", 1) + std::string(localEndpointFile))
, m_useAbstractNamespace(useAbstractNamespace)
, m_localAcceptor(readerIoContext)
{
}



LocalServer::~LocalServer()
{
stop();
}



int LocalServer::start()
{
syslog(LOG_INFO, "Starting local server");
startAccept();
return 0;
if (!m_useAbstractNamespace) {
// When not using abstract name space, try to unlink the existing unix domain socket endpoint file.
::unlink(m_localEndpointFile.c_str());
}

try {
m_localAcceptor.open();
m_localAcceptor.bind(getEndPointFileName(m_localEndpointFile, m_useAbstractNamespace));
m_localAcceptor.listen();
syslog(LOG_INFO, "Starting local server");
startAccept();
return 0;
} catch (const std::runtime_error& exc) {
syslog(LOG_ERR, "Exception on start of local server: %s", exc.what());
return -1;
}
}

void LocalServer::stop()
{
syslog(LOG_INFO, "Stopping local server");
m_localAcceptor.close();
if (!m_useAbstractNamespace) {
// unlink non-abstract unix domain socket
::unlink(m_localEndpointFile.c_str());
}
}



void LocalServer::startAccept()
{
m_localAcceptor.async_accept(std::bind(&LocalServer::onAccept, this, std::placeholders::_1, std::placeholders::_2));
Expand Down
32 changes: 21 additions & 11 deletions test/LocalStreamTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,15 @@


namespace daq::stream {
static constexpr bool useAbstractNamespace = false;
class LocalStreamTest : public ::testing::Test {

protected:
static const std::string GoodByeMsg;
static const std::string localEndpointFile;

LocalStreamTest()
: m_server(m_ioContext, std::bind(&LocalStreamTest::NewStreamCb, this, std::placeholders::_1), localEndpointFile)
: m_server(m_ioContext, std::bind(&LocalStreamTest::NewStreamCb, this, std::placeholders::_1), localEndpointFile, useAbstractNamespace)
{
}

Expand Down Expand Up @@ -127,7 +128,7 @@ namespace daq::stream {

{
// succesfull connect
LocalClientStream clientStream(m_ioContext, localEndpointFile);
LocalClientStream clientStream(m_ioContext, localEndpointFile, useAbstractNamespace);
serverHost = clientStream.remoteHost();
endpointUrl = clientStream.endPointUrl();
ASSERT_EQ("", serverHost);
Expand All @@ -140,10 +141,14 @@ namespace daq::stream {

{
// wrong endpoint file
LocalClientStream clientStream(m_ioContext, localEndpointFile + "bla");
LocalClientStream clientStream(m_ioContext, localEndpointFile + "bla", useAbstractNamespace);

boost::system::error_code ec = clientStream.init();
ASSERT_EQ(ec, boost::system::errc::connection_refused);
if (useAbstractNamespace) {
ASSERT_EQ(ec, boost::system::errc::connection_refused);
} else {
ASSERT_EQ(ec, boost::system::errc::no_such_file_or_directory);
}
}
}

Expand All @@ -154,7 +159,7 @@ namespace daq::stream {

{
// succesfull connect
LocalClientStream clientStream(m_ioContext, localEndpointFile);
LocalClientStream clientStream(m_ioContext, localEndpointFile, useAbstractNamespace);
serverHost = clientStream.remoteHost();
endpointUrl = clientStream.endPointUrl();
ASSERT_EQ("", serverHost);
Expand All @@ -174,7 +179,7 @@ namespace daq::stream {

{
// wrong endpoint file
LocalClientStream clientStream(m_ioContext, localEndpointFile + "bla");
LocalClientStream clientStream(m_ioContext, localEndpointFile + "bla", useAbstractNamespace);

std::promise < boost::system::error_code > initPromise;
std::future < boost::system::error_code > initFuture = initPromise.get_future();
Expand All @@ -186,14 +191,19 @@ namespace daq::stream {

clientStream.asyncInit(completionCb);
initFuture.wait();
ASSERT_EQ(initFuture.get(), boost::system::errc::connection_refused);
if (useAbstractNamespace) {
ASSERT_EQ(initFuture.get(), boost::system::errc::connection_refused);
} else {
ASSERT_EQ(initFuture.get(), boost::system::errc::no_such_file_or_directory);
}

}
}


TEST_F(LocalStreamTest, test_asyncwrite_asyncread)
{
LocalClientStream clientStream(m_ioContext, localEndpointFile);
LocalClientStream clientStream(m_ioContext, localEndpointFile, useAbstractNamespace);

{
std::promise < boost::system::error_code > initPromise;
Expand Down Expand Up @@ -297,7 +307,7 @@ namespace daq::stream {

TEST_F(LocalStreamTest, test_write_read)
{
LocalClientStream clientStream(m_ioContext, localEndpointFile);
LocalClientStream clientStream(m_ioContext, localEndpointFile, useAbstractNamespace);
boost::system::error_code ec;
ec = clientStream.init();

Expand Down Expand Up @@ -331,7 +341,7 @@ namespace daq::stream {
{
boost::system::error_code ec;
std::size_t bytesWritten;
LocalClientStream clientStream(m_ioContext, localEndpointFile);
LocalClientStream clientStream(m_ioContext, localEndpointFile, useAbstractNamespace);


{
Expand Down Expand Up @@ -420,7 +430,7 @@ namespace daq::stream {

TEST_F(LocalStreamTest, test_disconnect_by_server)
{
LocalClientStream clientStream(m_ioContext, localEndpointFile);
LocalClientStream clientStream(m_ioContext, localEndpointFile, useAbstractNamespace);

{
std::promise < boost::system::error_code > initPromise;
Expand Down