11package io .sentrius .sso .websocket ;
22
33import java .net .URI ;
4+ import java .security .GeneralSecurityException ;
5+
46import io .sentrius .sso .locator .KubernetesAgentLocator ;
57import lombok .RequiredArgsConstructor ;
8+
69import org .springframework .stereotype .Component ;
710import org .springframework .web .reactive .socket .WebSocketHandler ;
811import org .springframework .web .reactive .socket .WebSocketMessage ;
912import org .springframework .web .reactive .socket .WebSocketSession ;
1013import org .springframework .web .reactive .socket .client .ReactorNettyWebSocketClient ;
14+
15+ import io .sentrius .sso .core .services .security .CryptoService ;
16+ import lombok .extern .slf4j .Slf4j ;
1117import reactor .core .publisher .Mono ;
1218
1319@ Component
20+ @ Slf4j
1421@ RequiredArgsConstructor
1522public class AgentWebSocketProxyHandler implements WebSocketHandler {
1623
1724private final KubernetesAgentLocator agentLocator ;
25+ private final CryptoService cryptoService ;
1826
1927@ Override
2028public Mono <Void > handle (WebSocketSession clientSession ) {
21- String agentId = (String ) clientSession .getAttributes ().get ("agentId" );
22- URI agentUri = agentLocator .resolveWebSocketUri (agentId );
23-
24- ReactorNettyWebSocketClient proxyClient = new ReactorNettyWebSocketClient ();
25-
26- return proxyClient .execute (agentUri , agentSession -> {
27- // Forward messages from client to agent
28- Mono <Void > clientToAgent = clientSession .receive ()
29- .map (WebSocketMessage ::getPayload )
30- .map (dataBuffer -> agentSession .binaryMessage (factory -> dataBuffer ))
31- .as (agentSession ::send );
32-
33- // Forward messages from agent to client
34- Mono <Void > agentToClient = agentSession .receive ()
35- .map (WebSocketMessage ::getPayload )
36- .map (dataBuffer -> clientSession .binaryMessage (factory -> dataBuffer ))
37- .as (clientSession ::send );
38-
39- // Run both directions in parallel, complete when both are done
40- return Mono .zip (clientToAgent , agentToClient ).then ();
41- });
29+ try {
30+ String host = (String ) clientSession .getAttributes ().get ("host" );
31+ var decryptedHost = cryptoService .decrypt (host ); // Ensure host is decrypted if necessary
32+ String sessionId = (String ) clientSession .getAttributes ().get ("sessionId" );
33+ String chatGroupId = (String ) clientSession .getAttributes ().get ("chatGroupId" );
34+ String ztat = (String ) clientSession .getAttributes ().get ("ztat" );
35+ log .info ("Handling WebSocket connection for host: {}, sessionId: {}, chatGroupId: {}, ztat: {}" ,
36+ decryptedHost , sessionId , chatGroupId , ztat );
37+ URI agentUri = agentLocator .resolveWebSocketUri (decryptedHost , sessionId , chatGroupId , ztat );
38+
39+ ReactorNettyWebSocketClient proxyClient = new ReactorNettyWebSocketClient ();
40+
41+ return proxyClient .execute (agentUri , agentSession -> {
42+ // Forward messages from client to agent
43+ Mono <Void > clientToAgent = clientSession .receive ()
44+ .map (WebSocketMessage ::getPayload )
45+ .map (dataBuffer -> agentSession .binaryMessage (factory -> dataBuffer ))
46+ .as (agentSession ::send );
47+
48+ // Forward messages from agent to client
49+ Mono <Void > agentToClient = agentSession .receive ()
50+ .map (WebSocketMessage ::getPayload )
51+ .map (dataBuffer -> clientSession .binaryMessage (factory -> dataBuffer ))
52+ .as (clientSession ::send );
53+
54+ // Run both directions in parallel, complete when both are done
55+ return Mono .zip (clientToAgent , agentToClient ).then ();
56+ });
57+ } catch (GeneralSecurityException ex ) {
58+ throw new RuntimeException ("Failed to decrypt host" , ex );
59+ }
4260}
4361}
0 commit comments