Skip to content

Commit e9bbabc

Browse files
committed
fix(websocket): replace coroutine lambdas with named session tasks
1 parent 98c7e63 commit e9bbabc

2 files changed

Lines changed: 108 additions & 96 deletions

File tree

include/vix/websocket/session.hpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,9 @@ namespace vix::websocket
231231
*/
232232
void stop_heartbeat();
233233

234+
static task<void> flush_write_loop(std::shared_ptr<Session> self);
235+
static task<void> close_sequence(std::shared_ptr<Session> self);
236+
234237
private:
235238
/** @brief Accepted native TCP stream owned by this session. */
236239
std::unique_ptr<tcp_stream> stream_;

src/session.cpp

Lines changed: 105 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -267,6 +267,96 @@ namespace vix::websocket
267267
self->do_enqueue_message(false, payload); });
268268
}
269269

270+
task<void> Session::flush_write_loop(std::shared_ptr<Session> self)
271+
{
272+
bool must_close = false;
273+
274+
try
275+
{
276+
while (true)
277+
{
278+
PendingMessage msg{};
279+
280+
{
281+
std::lock_guard<std::mutex> lock(self->writeMutex_);
282+
283+
if (self->closing_)
284+
{
285+
self->writeQueue_.clear();
286+
self->writeInProgress_ = false;
287+
co_return;
288+
}
289+
290+
if (self->writeQueue_.empty())
291+
{
292+
self->writeInProgress_ = false;
293+
co_return;
294+
}
295+
296+
msg = std::move(self->writeQueue_.front());
297+
self->writeQueue_.pop_front();
298+
}
299+
300+
std::vector<std::byte> frame;
301+
if (msg.isBinary)
302+
{
303+
std::vector<std::byte> payload;
304+
payload.reserve(msg.data.size());
305+
306+
for (char ch : msg.data)
307+
{
308+
payload.push_back(
309+
static_cast<std::byte>(static_cast<unsigned char>(ch)));
310+
}
311+
312+
frame = detail::build_frame(
313+
detail::Opcode::Binary,
314+
payload,
315+
true,
316+
false);
317+
}
318+
else
319+
{
320+
frame = detail::build_text_frame(msg.data, false);
321+
}
322+
323+
co_await self->write_raw_frame(frame);
324+
}
325+
}
326+
catch (const std::exception &e)
327+
{
328+
{
329+
std::lock_guard<std::mutex> lock(self->writeMutex_);
330+
self->writeInProgress_ = false;
331+
}
332+
333+
self->emit_error(e.what());
334+
must_close = true;
335+
}
336+
337+
if (must_close)
338+
{
339+
co_await self->close_stream_only();
340+
}
341+
342+
co_return;
343+
}
344+
345+
task<void> Session::close_sequence(std::shared_ptr<Session> self)
346+
{
347+
try
348+
{
349+
co_await self->write_raw_frame(detail::build_close_frame(false));
350+
}
351+
catch (...)
352+
{
353+
}
354+
355+
notify_close_once(*self, self->router_, self->closeNotified_);
356+
co_await self->close_stream_only();
357+
co_return;
358+
}
359+
270360
void Session::send_binary(const void *data, std::size_t size)
271361
{
272362
if (closing_)
@@ -332,24 +422,9 @@ namespace vix::websocket
332422

333423
if (ioc_)
334424
{
335-
auto self = shared_from_this();
336425
spawn_detached(
337426
*ioc_,
338-
[self]() -> task<void>
339-
{
340-
try
341-
{
342-
co_await self->write_raw_frame(
343-
detail::build_close_frame(false));
344-
}
345-
catch (...)
346-
{
347-
}
348-
349-
notify_close_once(*self, self->router_, self->closeNotified_);
350-
co_await self->close_stream_only();
351-
co_return;
352-
}());
427+
Session::close_sequence(shared_from_this()));
353428
}
354429
}
355430

@@ -552,14 +627,22 @@ namespace vix::websocket
552627
return;
553628
}
554629

555-
writeQueue_.push_back(PendingMessage{
556-
isBinary,
557-
std::move(payload),
558-
});
630+
{
631+
std::lock_guard<std::mutex> lock(writeMutex_);
632+
633+
if (closing_)
634+
{
635+
return;
636+
}
637+
638+
writeQueue_.push_back(PendingMessage{
639+
isBinary,
640+
std::move(payload),
641+
});
642+
}
559643

560644
trigger_write_flush();
561645
}
562-
563646
void Session::emit_error(const std::string &message)
564647
{
565648
std::string lower = message;
@@ -609,83 +692,9 @@ namespace vix::websocket
609692
writeInProgress_ = true;
610693
}
611694

612-
auto self = shared_from_this();
613695
spawn_detached(
614696
*ioc_,
615-
[self]() -> task<void>
616-
{
617-
bool must_close = false;
618-
619-
try
620-
{
621-
while (true)
622-
{
623-
PendingMessage msg{};
624-
625-
{
626-
std::lock_guard<std::mutex> lock(self->writeMutex_);
627-
628-
if (self->closing_)
629-
{
630-
self->writeQueue_.clear();
631-
self->writeInProgress_ = false;
632-
co_return;
633-
}
634-
635-
if (self->writeQueue_.empty())
636-
{
637-
self->writeInProgress_ = false;
638-
co_return;
639-
}
640-
641-
msg = std::move(self->writeQueue_.front());
642-
self->writeQueue_.pop_front();
643-
}
644-
645-
std::vector<std::byte> frame;
646-
if (msg.isBinary)
647-
{
648-
std::vector<std::byte> payload;
649-
payload.reserve(msg.data.size());
650-
651-
for (char ch : msg.data)
652-
{
653-
payload.push_back(
654-
static_cast<std::byte>(static_cast<unsigned char>(ch)));
655-
}
656-
657-
frame = detail::build_frame(
658-
detail::Opcode::Binary,
659-
payload,
660-
true,
661-
false);
662-
}
663-
else
664-
{
665-
frame = detail::build_text_frame(msg.data, false);
666-
}
667-
668-
co_await self->write_raw_frame(frame);
669-
}
670-
}
671-
catch (const std::exception &e)
672-
{
673-
{
674-
std::lock_guard<std::mutex> lock(self->writeMutex_);
675-
self->writeInProgress_ = false;
676-
}
677-
678-
self->emit_error(e.what());
679-
must_close = true;
680-
}
681-
682-
if (must_close)
683-
{
684-
co_await self->close_stream_only();
685-
}
686-
687-
co_return;
688-
}());
697+
Session::flush_write_loop(shared_from_this()));
689698
}
690699

691700
task<std::string> Session::read_http_head()

0 commit comments

Comments
 (0)