Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
cmake_minimum_required(VERSION 3.24)

project(native_streaming VERSION 1.0.14 LANGUAGES CXX)
project(native_streaming VERSION 1.0.15 LANGUAGES CXX)

if (NOT CMAKE_MESSAGE_CONTEXT)
set(CMAKE_MESSAGE_CONTEXT ${PROJECT_NAME})
Expand Down
3 changes: 3 additions & 0 deletions changelog
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,6 @@

## v1.0.14
- Fix missing include

## v1.0.15
- Fix issue with retrieving connection endpoint name on server app resuming in debug mode
3 changes: 2 additions & 1 deletion include/native_streaming/client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,9 @@ class Client : public std::enable_shared_from_this<Client>

/// @brief creates a connection Session using provided web-socket stream object
/// @param wsStream web-socket stream object which provides as a R/W interface for connection
/// @param endpointAddress string with the address in the format IP:port of the connection endpoint associated with the session
/// @return pointer to created Session object
std::shared_ptr<Session> createSession(std::shared_ptr<WebsocketStream> wsStream);
std::shared_ptr<Session> createSession(std::shared_ptr<WebsocketStream> wsStream, const std::string& endpointAddress);

/// async operations handler
std::shared_ptr<boost::asio::io_context> ioContextPtr;
Expand Down
5 changes: 4 additions & 1 deletion include/native_streaming/server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,11 @@ class Server : public std::enable_shared_from_this<Server>
/// @brief creates a connection Session using provided web-socket stream object
/// @param wsStream web-socket stream object which provides as a R/W interface for connection
/// @param user context, usualy a pointer to the authenticated user object
/// @param endpointAddress string with the address in the format IP:port of the connection endpoint associated with the session
/// @return pointer to created Session object
std::shared_ptr<Session> createSession(std::shared_ptr<WebsocketStream> wsStream, const std::shared_ptr<void>& userContext);
std::shared_ptr<Session> createSession(std::shared_ptr<WebsocketStream> wsStream,
const std::shared_ptr<void>& userContext,
const std::string& endpointAddress);

/// async operations handler
std::shared_ptr<boost::asio::io_context> ioContextPtr;
Expand Down
6 changes: 5 additions & 1 deletion include/native_streaming/session.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ class Session : public std::enable_shared_from_this<Session>
std::shared_ptr<WebsocketStream> wsStream,
std::shared_ptr<void> userContext,
boost::beast::role_type role,
LogCallback logCallback);
LogCallback logCallback,
const std::string& endpointAddress);
~Session();

Session(const Session&) = delete;
Expand Down Expand Up @@ -122,6 +123,9 @@ class Session : public std::enable_shared_from_this<Session>

/// @brief interval of sending the websocket pongs
std::chrono::milliseconds heartbeatPeriod;

/// @brief string with the address in the format IP:port of the connection endpoint associated with the session
std::string endpointAddress;
};

END_NAMESPACE_NATIVE_STREAMING
18 changes: 15 additions & 3 deletions src/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -156,13 +156,25 @@ void Client::onUpgradeConnection(const boost::system::error_code& ec, std::share
return;
}

onNewSessionCallback(createSession(wsStream));
std::string endpointAddress;
try
{
auto remoteEp = wsStream->next_layer().socket().remote_endpoint();
endpointAddress = remoteEp.address().to_string() + ":" + std::to_string(remoteEp.port());
}
catch (const std::exception& e)
{
NS_LOG_E("Websocket connection aborted - cannot get connection endpoint: {}", e.what());
return;
}

onNewSessionCallback(createSession(wsStream, endpointAddress));
}

std::shared_ptr<Session> Client::createSession(std::shared_ptr<WebsocketStream> wsStream)
std::shared_ptr<Session> Client::createSession(std::shared_ptr<WebsocketStream> wsStream, const std::string& endpointAddress)
{
websocketStream.reset();
return std::make_shared<Session>(ioContextPtr, wsStream, nullptr, boost::beast::role_type::client, logCallback);
return std::make_shared<Session>(ioContextPtr, wsStream, nullptr, boost::beast::role_type::client, logCallback, endpointAddress);
}

END_NAMESPACE_NATIVE_STREAMING
39 changes: 32 additions & 7 deletions src/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -201,21 +201,46 @@ void Server::onUpgradeConnection(const boost::system::error_code& ec,
std::shared_ptr<WebsocketStream> wsStream,
const std::shared_ptr<void>& userContext)
{
std::string id = wsStream->next_layer().socket().remote_endpoint().address().to_string() + ":" +
std::to_string(wsStream->next_layer().socket().remote_endpoint().port());
if (ec)
{
NS_LOG_E("Client {} - websocket connection failed: {}", id, ec.message());
NS_LOG_E("Connection failed to upgrade to websocket: {}", ec.message());
return;
}

NS_LOG_I("Client {} - websocket connection accepted", id);
onNewSessionCallback(createSession(wsStream, userContext));
// Pausing the server app with a debugger causes incoming re-/connection attempts to be rejected on the client side,
// but they remain queued on the server side. When the server resumes, it processes these connections,
// inevitably failing due to the sockets being in an invalid state. Although the socket appears open,
// it throws an exception when attempting to retrieve the endpoint address.
// To handle this, first verify the socket state and then safely attempt to retrieve the endpoint name.
std::string endpointAddress;
if (!(wsStream->is_open() && wsStream->next_layer().socket().is_open()))
{
NS_LOG_W("Websocket connection aborted: the socket is already closed");
return;
}
else
{
try
{
auto remoteEp = wsStream->next_layer().socket().remote_endpoint();
endpointAddress = remoteEp.address().to_string() + ":" + std::to_string(remoteEp.port());
}
catch (const std::exception& e)
{
NS_LOG_W("Websocket connection aborted - cannot get connection endpoint: {}", e.what());
return;
}
}

NS_LOG_I("Client {} - websocket connection accepted", endpointAddress);
onNewSessionCallback(createSession(wsStream, userContext, endpointAddress));
}

std::shared_ptr<Session> Server::createSession(std::shared_ptr<WebsocketStream> wsStream, const std::shared_ptr<void>& userContext)
std::shared_ptr<Session> Server::createSession(std::shared_ptr<WebsocketStream> wsStream,
const std::shared_ptr<void>& userContext,
const std::string& endpointAddress)
{
return std::make_shared<Session>(ioContextPtr, wsStream, userContext, boost::beast::role_type::server, logCallback);
return std::make_shared<Session>(ioContextPtr, wsStream, userContext, boost::beast::role_type::server, logCallback, endpointAddress);
}

END_NAMESPACE_NATIVE_STREAMING
8 changes: 4 additions & 4 deletions src/session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ Session::Session(std::shared_ptr<boost::asio::io_context> ioContextPtr,
std::shared_ptr<WebsocketStream> wsStream,
std::shared_ptr<void> userContext,
boost::beast::role_type role,
LogCallback logCallback)
LogCallback logCallback,
const std::string& endpointAddress)
: role(role)
, logCallback(logCallback)
, ioContextPtr(ioContextPtr)
Expand All @@ -19,6 +20,7 @@ Session::Session(std::shared_ptr<boost::asio::io_context> ioContextPtr,
, userContext(userContext)
, heartbeatTimer(std::make_shared<boost::asio::steady_timer>(*ioContextPtr.get()))
, heartbeatPeriod(defaultHeartbeatPeriod)
, endpointAddress(endpointAddress)
{
setOptions();
}
Expand Down Expand Up @@ -173,9 +175,7 @@ std::shared_ptr<void> Session::getUserContext()

std::string Session::getEndpointAddress()
{
std::string address = wsStream->next_layer().socket().remote_endpoint().address().to_string();
address += std::string(":") + std::to_string(wsStream->next_layer().socket().remote_endpoint().port());
return address;
return endpointAddress;
}

void Session::setWriteTimedOutHandler(OnSessionErrorCallback writeTaskTimeoutHandler)
Expand Down
Loading