88import net .servicestack .client .Utils ;
99
1010import java .io .BufferedInputStream ;
11+ import java .io .FileNotFoundException ;
1112import java .io .IOException ;
1213import java .io .InputStream ;
1314import java .net .HttpURLConnection ;
@@ -53,6 +54,7 @@ public class ServerEventsClient implements AutoCloseable {
5354 static int BufferSize = 1024 * 64 ;
5455 static int DefaultHeartbeatMs = 10 * 1000 ;
5556 static int DefaultIdleTimeoutMs = 30 * 1000 ;
57+ public static String UnknownChannel = "*" ;
5658
5759 public ServerEventsClient (String baseUri , String [] channels ) {
5860 setBaseUri (baseUri );
@@ -67,6 +69,10 @@ public ServerEventsClient(String baseUrl, String channel) {
6769 this (baseUrl , new String []{ channel });
6870 }
6971
72+ public ServerEventsClient (String baseUrl ) {
73+ this (baseUrl , new String []{});
74+ }
75+
7076 public String getBaseUri () {
7177 return baseUri ;
7278 }
@@ -85,8 +91,8 @@ public String[] getChannels() {
8591 }
8692
8793 public void setChannels (String [] channels ) {
88- if (channels == null || channels . length == 0 )
89- throw new IllegalArgumentException ( " channels is empty" ) ;
94+ if (channels == null )
95+ channels = new String [ 0 ] ;
9096
9197 this .channels = channels ;
9298 buildEventStreamUri ();
@@ -100,6 +106,10 @@ public String getEventStreamUri() {
100106 return eventStreamUri ;
101107 }
102108
109+ public JsonServiceClient getServiceClient () {
110+ return this .serviceClient ;
111+ }
112+
103113 public ServerEventsClient setOnConnect (ServerEventConnectCallback onConnect ) {
104114 this .onConnect = onConnect ;
105115 return this ;
@@ -151,6 +161,12 @@ public ServerEventConnect getConnectionInfo() {
151161 return connectionInfo ;
152162 }
153163
164+ public String getSubscriptionId (){
165+ return connectionInfo != null
166+ ? connectionInfo .getId ()
167+ : null ;
168+ }
169+
154170 public String getConnectionDisplayName () {
155171 return connectionInfo != null
156172 ? connectionInfo .getDisplayName ()
@@ -187,6 +203,7 @@ public synchronized void restart() {
187203
188204 } catch (Exception ex ){
189205 Log .e ("[SSE-CLIENT] Error whilst restarting: " + ex .getMessage (), ex );
206+ ex .printStackTrace ();
190207 }
191208 }
192209
@@ -218,7 +235,7 @@ public synchronized void stop(){
218235
219236 private synchronized void internalStop () {
220237 if (Log .isDebugEnabled ())
221- Log .d ("Stop()" );
238+ Log .d ("Stop() " + getConnectionDisplayName () );
222239
223240 if (connectionInfo != null && connectionInfo .getUnRegisterUrl () != null ) {
224241 try {
@@ -227,7 +244,9 @@ private synchronized void internalStop() {
227244 }
228245
229246 connectionInfo = null ;
230- bgThread .interrupt ();
247+ if (bgThread != null )
248+ bgThread .interrupt ();
249+
231250 bgThread = null ;
232251 }
233252
@@ -271,6 +290,9 @@ protected void onExceptionReceived(Exception ex) {
271290 Log .e ("[SSE-CLIENT] OnExceptionReceived: "
272291 + ex .getMessage () + " on #" + getConnectionDisplayName (), ex );
273292
293+ if (Log .isDebugEnabled ())
294+ Log .d (Utils .getStackTrace (ex ));
295+
274296 if (onException != null )
275297 onException .execute (ex );
276298
@@ -303,13 +325,13 @@ private void startNewHeartbeat() {
303325 if (heratbeatTimer != null )
304326 heratbeatTimer .cancel ();
305327
306- heratbeatTimer = new Timer ();
328+ heratbeatTimer = new Timer ("ServerEventsClient Heartbeat" );
307329 heratbeatTimer .schedule (new TimerTask () {
308330 @ Override
309331 public void run () {
310332 Heartbeat ();
311333 }
312- }, 0 , connectionInfo .getHeartbeatIntervalMs ());
334+ }, connectionInfo .getHeartbeatIntervalMs (), Integer . MAX_VALUE );
313335 }
314336
315337 public void Heartbeat (){
@@ -422,56 +444,52 @@ public void run() {
422444 InputStream is = new BufferedInputStream (req .getInputStream ());
423445 readStream (is );
424446 } catch (IOException e ) {
425- e .printStackTrace ();
447+ Log .e ("Error reading from event-stream" , e );
448+ Log .e (Utils .getStackTrace (e ));
449+ } finally {
450+ client .restart ();
426451 }
427452 }
428453
429- private void readStream (InputStream inputStream ) {
454+ private void readStream (InputStream inputStream ) throws IOException {
430455 byte [] buffer = new byte [BufferSize ];
431456 String overflowText = "" ;
432457
433- try {
434- int len = 0 ;
435- while (true ) {
436- len = inputStream .read (buffer );
437- if (len <= 0 )
438- break ;
439-
440- String text = overflowText + new String (buffer , 0 , len , "UTF-8" );
458+ int len = 0 ;
459+ while (true ) {
460+ len = inputStream .read (buffer );
461+ if (len <= 0 )
462+ break ;
441463
442- int pos ;
443- while ((pos = text .indexOf ('\n' )) >= 0 ) {
444- if (pos == 0 ) {
445- if (currentMsg != null )
446- processEventMessage (currentMsg );
447- currentMsg = null ;
464+ String text = overflowText + new String (buffer , 0 , len , "UTF-8" );
448465
449- text = text .substring (pos + 1 );
466+ int pos ;
467+ while ((pos = text .indexOf ('\n' )) >= 0 ) {
468+ if (pos == 0 ) {
469+ if (currentMsg != null )
470+ processEventMessage (currentMsg );
471+ currentMsg = null ;
450472
451- if (!Utils .isEmpty (text ))
452- continue ;
473+ text = text .substring (pos + 1 );
453474
454- break ;
455- }
475+ if (! Utils . isEmpty ( text ))
476+ continue ;
456477
457- String line = text .substring (0 , pos );
458- if (!Utils .isNullOrWhiteSpace (line ))
459- processLine (line );
460- if (text .length () > pos + 1 )
461- text = text .substring (pos + 1 );
478+ break ;
462479 }
463480
464- overflowText = text ;
481+ String line = text .substring (0 , pos );
482+ if (!Utils .isNullOrWhiteSpace (line ))
483+ processLine (line );
484+ if (text .length () > pos + 1 )
485+ text = text .substring (pos + 1 );
465486 }
466487
467- if (Log .isDebugEnabled ())
468- Log .d ("Connection ended on " + client .getConnectionDisplayName ());
469-
470- } catch (IOException e ) {
471- e .printStackTrace ();
472- } finally {
473- client .restart ();
488+ overflowText = text ;
474489 }
490+
491+ if (Log .isDebugEnabled ())
492+ Log .d ("Connection ended on " + client .getConnectionDisplayName ());
475493 }
476494
477495 private void processLine (String line ) {
0 commit comments