88import net .servicestack .client .Utils ;
99
1010import java .io .BufferedInputStream ;
11+ import java .io .FileNotFoundException ;
1112import java .io .IOException ;
1213import java .io .InputStream ;
1314import java .net .HttpURLConnection ;
2122import java .util .Timer ;
2223import java .util .TimerTask ;
2324import java .util .concurrent .TimeoutException ;
25+ import java .util .concurrent .atomic .AtomicBoolean ;
2426
2527/**
2628 * Created by mythz on 2/9/2017.
2729 */
2830
29- public class ServerEventsClient {
31+ public class ServerEventsClient implements AutoCloseable {
3032 private String baseUri ;
3133 private String [] channels ;
3234 private String eventStreamPath ;
@@ -47,11 +49,12 @@ public class ServerEventsClient {
4749
4850 private Date lastPulseAt ;
4951 private Thread bgThread ;
50- private boolean stopped ;
52+ private final AtomicBoolean stopped = new AtomicBoolean ( false ) ;
5153
5254 static int BufferSize = 1024 * 64 ;
5355 static int DefaultHeartbeatMs = 10 * 1000 ;
5456 static int DefaultIdleTimeoutMs = 30 * 1000 ;
57+ public static String UnknownChannel = "*" ;
5558
5659 public ServerEventsClient (String baseUri , String [] channels ) {
5760 setBaseUri (baseUri );
@@ -66,6 +69,10 @@ public ServerEventsClient(String baseUrl, String channel) {
6669 this (baseUrl , new String []{ channel });
6770 }
6871
72+ public ServerEventsClient (String baseUrl ) {
73+ this (baseUrl , new String []{});
74+ }
75+
6976 public String getBaseUri () {
7077 return baseUri ;
7178 }
@@ -84,8 +91,8 @@ public String[] getChannels() {
8491 }
8592
8693 public void setChannels (String [] channels ) {
87- if (channels == null || channels . length == 0 )
88- throw new IllegalArgumentException ( " channels is empty" ) ;
94+ if (channels == null )
95+ channels = new String [ 0 ] ;
8996
9097 this .channels = channels ;
9198 buildEventStreamUri ();
@@ -99,6 +106,10 @@ public String getEventStreamUri() {
99106 return eventStreamUri ;
100107 }
101108
109+ public JsonServiceClient getServiceClient () {
110+ return this .serviceClient ;
111+ }
112+
102113 public ServerEventsClient setOnConnect (ServerEventConnectCallback onConnect ) {
103114 this .onConnect = onConnect ;
104115 return this ;
@@ -150,6 +161,12 @@ public ServerEventConnect getConnectionInfo() {
150161 return connectionInfo ;
151162 }
152163
164+ public String getSubscriptionId (){
165+ return connectionInfo != null
166+ ? connectionInfo .getId ()
167+ : null ;
168+ }
169+
153170 public String getConnectionDisplayName () {
154171 return connectionInfo != null
155172 ? connectionInfo .getDisplayName ()
@@ -162,6 +179,7 @@ public ServerEventsClient start(){
162179 bgThread = null ;
163180 }
164181
182+ stopped .set (false );
165183 bgThread = new Thread (new EventStream (this ));
166184 bgThread .start ();
167185 lastPulseAt = new Date ();
@@ -173,7 +191,7 @@ public synchronized void restart() {
173191 try {
174192 internalStop ();
175193
176- if (stopped )
194+ if (stopped . get () )
177195 return ;
178196
179197 try {
@@ -185,6 +203,7 @@ public synchronized void restart() {
185203
186204 } catch (Exception ex ){
187205 Log .e ("[SSE-CLIENT] Error whilst restarting: " + ex .getMessage (), ex );
206+ ex .printStackTrace ();
188207 }
189208 }
190209
@@ -210,13 +229,13 @@ private void sleepBackOffMultiplier(int continuousErrorsCount) throws Interrupte
210229 }
211230
212231 public synchronized void stop (){
213- stopped = true ;
232+ stopped . set ( true ) ;
214233 internalStop ();
215234 }
216235
217236 private synchronized void internalStop () {
218237 if (Log .isDebugEnabled ())
219- Log .d ("Stop()" );
238+ Log .d ("Stop() " + getConnectionDisplayName () );
220239
221240 if (connectionInfo != null && connectionInfo .getUnRegisterUrl () != null ) {
222241 try {
@@ -225,7 +244,9 @@ private synchronized void internalStop() {
225244 }
226245
227246 connectionInfo = null ;
228- bgThread .interrupt ();
247+ if (bgThread != null )
248+ bgThread .interrupt ();
249+
229250 bgThread = null ;
230251 }
231252
@@ -269,6 +290,9 @@ protected void onExceptionReceived(Exception ex) {
269290 Log .e ("[SSE-CLIENT] OnExceptionReceived: "
270291 + ex .getMessage () + " on #" + getConnectionDisplayName (), ex );
271292
293+ if (Log .isDebugEnabled ())
294+ Log .d (Utils .getStackTrace (ex ));
295+
272296 if (onException != null )
273297 onException .execute (ex );
274298
@@ -283,10 +307,10 @@ private void onConnectReceived() {
283307 connectionInfo .getId (),
284308 Utils .join (channels , "," )));
285309
286- startNewHeartbeat ();
287-
288310 if (onConnect != null )
289311 onConnect .execute (connectionInfo );
312+
313+ startNewHeartbeat ();
290314 }
291315
292316 Timer heratbeatTimer ;
@@ -295,16 +319,19 @@ private void startNewHeartbeat() {
295319 if (connectionInfo == null || connectionInfo .getHeartbeatUrl () == null )
296320 return ;
297321
298- if (heratbeatTimer != null )
299- heratbeatTimer .cancel ();
322+ if (stopped .get ())
323+ return ;
324+
325+ if (heratbeatTimer == null )
326+ heratbeatTimer = new Timer ("ServerEventsClient Heartbeat" );
300327
301- heratbeatTimer = new Timer ();
328+ //reschedule timer on every heartbeat
302329 heratbeatTimer .schedule (new TimerTask () {
303330 @ Override
304331 public void run () {
305332 Heartbeat ();
306333 }
307- }, 0 , connectionInfo .getHeartbeatIntervalMs ());
334+ }, connectionInfo .getHeartbeatIntervalMs (), Integer . MAX_VALUE );
308335 }
309336
310337 public void Heartbeat (){
@@ -314,6 +341,9 @@ public void Heartbeat(){
314341 if (connectionInfo == null )
315342 return ;
316343
344+ if (stopped .get ())
345+ return ;
346+
317347 long elapsedMs = (new Date ().getTime () - lastPulseAt .getTime ());
318348 if (elapsedMs > connectionInfo .getIdleTimeoutMs ())
319349 {
@@ -330,7 +360,16 @@ public void Heartbeat(){
330360 if (Log .isDebugEnabled ())
331361 Log .d ("[SSE-CLIENT] Sending Heartbeat..." );
332362
333- String response = Utils .readToEnd (conn );
363+ try {
364+ String response = Utils .readToEnd (conn .getInputStream (), "UTF-8" );
365+ } catch (FileNotFoundException notFound ) {
366+
367+ if (stopped .get ())
368+ return ;
369+
370+ Log .e (conn .getResponseMessage (), notFound );
371+ throw notFound ;
372+ }
334373
335374 if (Log .isDebugEnabled ())
336375 Log .d ("[SSE-CLIENT] Heartbeat sent to: " + heartbeatUrl );
@@ -382,6 +421,11 @@ private void processOnHeartbeatMessage(ServerEventMessage e) {
382421 onHeartbeatReceived (new ServerEventHeartbeat ().populate (e , JsonUtils .toJsonObject (e .getJson ())));
383422 }
384423
424+ @ Override
425+ public void close () throws Exception {
426+ stop ();
427+ }
428+
385429 class EventStream implements Runnable {
386430
387431 ServerEventsClient client ;
@@ -400,56 +444,52 @@ public void run() {
400444 InputStream is = new BufferedInputStream (req .getInputStream ());
401445 readStream (is );
402446 } catch (IOException e ) {
403- e .printStackTrace ();
447+ Log .e ("Error reading from event-stream" , e );
448+ Log .e (Utils .getStackTrace (e ));
449+ } finally {
450+ client .restart ();
404451 }
405452 }
406453
407- private void readStream (InputStream inputStream ) {
454+ private void readStream (InputStream inputStream ) throws IOException {
408455 byte [] buffer = new byte [BufferSize ];
409456 String overflowText = "" ;
410457
411- try {
412- int len = 0 ;
413- while (true ) {
414- len = inputStream .read (buffer );
415- if (len <= 0 )
416- break ;
458+ int len = 0 ;
459+ while (true ) {
460+ len = inputStream .read (buffer );
461+ if (len <= 0 )
462+ break ;
417463
418- String text = overflowText + new String (buffer , 0 , len , "UTF-8" );
464+ String text = overflowText + new String (buffer , 0 , len , "UTF-8" );
419465
420- int pos ;
421- while ((pos = text .indexOf ('\n' )) >= 0 ) {
422- if (pos == 0 ) {
423- if (currentMsg != null )
424- processEventMessage (currentMsg );
425- currentMsg = null ;
466+ int pos ;
467+ while ((pos = text .indexOf ('\n' )) >= 0 ) {
468+ if (pos == 0 ) {
469+ if (currentMsg != null )
470+ processEventMessage (currentMsg );
471+ currentMsg = null ;
426472
427- text = text .substring (pos + 1 );
473+ text = text .substring (pos + 1 );
428474
429- if (!Utils .isEmpty (text ))
430- continue ;
475+ if (!Utils .isEmpty (text ))
476+ continue ;
431477
432- break ;
433- }
434-
435- String line = text .substring (0 , pos );
436- if (!Utils .isNullOrWhiteSpace (line ))
437- processLine (line );
438- if (text .length () > pos + 1 )
439- text = text .substring (pos + 1 );
478+ break ;
440479 }
441480
442- overflowText = text ;
481+ String line = text .substring (0 , pos );
482+ if (!Utils .isNullOrWhiteSpace (line ))
483+ processLine (line );
484+ if (text .length () > pos + 1 )
485+ text = text .substring (pos + 1 );
443486 }
444487
445- if (Log .isDebugEnabled ())
446- Log .d ("Connection ended on " + client .getConnectionDisplayName ());
447-
448- } catch (IOException e ) {
449- e .printStackTrace ();
450- } finally {
451- client .restart ();
488+ overflowText = text ;
452489 }
490+
491+ if (Log .isDebugEnabled ())
492+ Log .d ("Connection ended on " + client .getConnectionDisplayName ());
453493 }
454494
455495 private void processLine (String line ) {
0 commit comments