@@ -22,15 +22,17 @@ package net.ccbluex.netty.http.websocket
2222import io.netty.channel.Channel
2323import io.netty.channel.ChannelHandlerContext
2424import io.netty.handler.codec.http.websocketx.TextWebSocketFrame
25+ import kotlinx.coroutines.CoroutineExceptionHandler
2526import kotlinx.coroutines.CoroutineScope
2627import kotlinx.coroutines.Job
2728import kotlinx.coroutines.SupervisorJob
2829import kotlinx.coroutines.asCoroutineDispatcher
2930import kotlinx.coroutines.cancel
3031import kotlinx.coroutines.joinAll
3132import kotlinx.coroutines.launch
32- import net.ccbluex.netty.http.coroutines.awaitSuspend
33+ import net.ccbluex.netty.http.HttpServer.Companion.logger
3334import net.ccbluex.netty.http.coroutines.syncSuspend
35+ import java.nio.charset.Charset
3436import java.util.concurrent.CopyOnWriteArraySet
3537import java.util.function.BiConsumer
3638
@@ -42,7 +44,9 @@ class WebSocketController(
4244) {
4345
4446 private val scope = CoroutineScope (
45- serverChannel.eventLoop().asCoroutineDispatcher() + SupervisorJob ()
47+ serverChannel.eventLoop().asCoroutineDispatcher() + SupervisorJob () + CoroutineExceptionHandler { _, err ->
48+ logger.error(" Uncaught exception in websocket controller" , err)
49+ }
4650 )
4751
4852 init {
@@ -59,26 +63,39 @@ class WebSocketController(
5963 * Broadcasts a message to all connected clients.
6064 *
6165 * @param text The message to broadcast.
62- * @param onFailure The action to take if a failure occurs.
66+ * @param charset The charset to use for encoding the message. Defaults to [Charsets.UTF_8].
67+ * @param onFailure The action to take if a failure occurs. Defaults to `null`.
6368 */
64- fun broadcast (text : String , onFailure : BiConsumer <ChannelHandlerContext , Throwable >? = null): Job =
65- scope.launch {
66- val frame = TextWebSocketFrame (text)
69+ fun broadcast (
70+ text : CharSequence ,
71+ charset : Charset = Charsets .UTF_8 ,
72+ onFailure : BiConsumer <ChannelHandlerContext , Throwable >? = null,
73+ ): Job = scope.launch {
74+ val frameByteBuf = serverChannel.alloc().buffer()
75+ frameByteBuf.writeCharSequence(text, charset)
76+
77+ val frame = TextWebSocketFrame (frameByteBuf)
6778
68- activeContexts.map { handlerContext ->
69- launch {
79+ activeContexts.map { handlerContext ->
80+ launch {
81+ if (handlerContext.channel().isActive) {
7082 try {
7183 handlerContext.channel()
7284 .writeAndFlush(frame.retainedDuplicate())
7385 .syncSuspend()
74- } catch (e: Exception ) {
86+ } catch (e: Throwable ) {
7587 onFailure?.accept(handlerContext, e)
7688 }
89+ } else {
90+ // Channel is not active, close and remove it
91+ handlerContext.close().syncSuspend()
92+ removeContext(handlerContext)
7793 }
78- }.joinAll()
94+ }
95+ }.joinAll()
7996
80- frame.release()
81- }
97+ frame.release()
98+ }
8299
83100 /* *
84101 * Closes all active contexts.
@@ -97,10 +114,10 @@ class WebSocketController(
97114 * @param context The context to add.
98115 */
99116 fun addContext (context : ChannelHandlerContext ) {
100- activeContexts.add(context)
101117 context.channel().closeFuture().addListener {
102118 removeContext(context)
103119 }
120+ activeContexts.add(context)
104121 }
105122
106123 /* *
0 commit comments