Skip to content

Commit 01c086b

Browse files
committed
Implement addListener/removeListener/raiseEvent to handle custom SSE triggers
1 parent 47d4ee4 commit 01c086b

File tree

6 files changed

+193
-1
lines changed

6 files changed

+193
-1
lines changed

src/AndroidClient/android/src/androidTest/java/net/servicestack/android/ServerEventClientTests.java

Lines changed: 57 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@
2525
import net.servicestack.client.sse.ServerEventUser;
2626
import net.servicestack.client.sse.ServerEventsClient;
2727
import net.servicestack.client.sse.SingletonInstanceResolver;
28+
import net.servicestack.func.Action;
2829
import net.servicestack.func.Func;
30+
import net.servicestack.func.Predicate;
2931

3032
import java.util.ArrayList;
3133
import java.util.HashMap;
@@ -1115,4 +1117,58 @@ public void execute(ServerEventMessage e) {
11151117
assertTrue(client2.getEventStreamUri().endsWith("?channels=B"));
11161118
}
11171119
}
1118-
}
1120+
1121+
@Test
1122+
public void test_Does_fire_multiple_listeners_for_custom_trigger() throws Exception {
1123+
final List<ServerEventMessage> msgs1 = new ArrayList<>();
1124+
final List<ServerEventMessage> msgs2 = new ArrayList<>();
1125+
1126+
Action<ServerEventMessage> handler = new Action<ServerEventMessage>() {
1127+
@Override
1128+
public void apply(ServerEventMessage e) {
1129+
msgs1.add(e);
1130+
}
1131+
};
1132+
1133+
try (ServerEventsClient client1 = createServerEventsClient("http://chat.servicestack.net")
1134+
.addListener("customEvent", handler)
1135+
.addListener("customEvent", new Action<ServerEventMessage>() {
1136+
@Override
1137+
public void apply(ServerEventMessage e) {
1138+
msgs2.add(e);
1139+
}
1140+
})
1141+
.start()
1142+
.waitTillConnected();
1143+
ServerEventsClient client2 = createServerEventsClient("http://chat.servicestack.net")
1144+
.start()
1145+
.waitTillConnected()) {
1146+
1147+
postRaw(client2, "trigger.customEvent", "arg");
1148+
1149+
while (msgs1.size() < 1 || msgs2.size() < 1) {
1150+
Thread.sleep(100);
1151+
}
1152+
1153+
assertEquals(1, msgs1.size());
1154+
assertEquals(1, msgs2.size());
1155+
1156+
client1.removeListener("customEvent", handler);
1157+
1158+
postRaw(client2, "trigger.customEvent", "arg");
1159+
1160+
while (msgs1.size() < 1 || msgs2.size() < 2) {
1161+
Thread.sleep(100);
1162+
}
1163+
1164+
assertEquals(1, msgs1.size());
1165+
assertEquals(2, msgs2.size());
1166+
1167+
assertTrue(Func.all(Func.concat(msgs1, msgs2), new Predicate<ServerEventMessage>() {
1168+
@Override
1169+
public boolean apply(ServerEventMessage msg) {
1170+
return "arg".equals(JsonUtils.fromJson(msg.getJson(), String.class));
1171+
}
1172+
}));
1173+
}
1174+
}}

src/AndroidClient/android/src/main/java/net/servicestack/client/sse/EventStream.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,8 @@ protected void processEventMessage(ServerEventMessage e) {
182182
cb.execute(this.client, e);
183183
}
184184
}
185+
} else if ("trigger".equals(e.getOp())){
186+
client.onTriggerReceived(e);
185187
}
186188

187189
ServerEventCallback receiver = client.getNamedReceivers().get(e.getOp());

src/AndroidClient/android/src/main/java/net/servicestack/client/sse/ServerEventsClient.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import net.servicestack.client.JsonUtils;
99
import net.servicestack.client.Log;
1010
import net.servicestack.client.Utils;
11+
import net.servicestack.func.Action;
1112
import net.servicestack.func.Func;
1213
import net.servicestack.func.Function;
1314

@@ -47,6 +48,7 @@ public class ServerEventsClient implements Closeable {
4748

4849
protected Map<String,ServerEventCallback> handlers;
4950
protected Map<String,ServerEventCallback> namedReceivers;
51+
protected Map<String, List<Action<ServerEventMessage>>> listeners;
5052

5153
protected ServerEventConnectCallback onConnect;
5254
protected ServerEventMessageCallback onMessage;
@@ -76,6 +78,7 @@ public ServerEventsClient(String baseUri, String... channels) {
7678

7779
this.handlers = new HashMap<>();
7880
this.namedReceivers = new HashMap<>();
81+
this.listeners = new HashMap<>();
7982
}
8083

8184
public ServerEventsClient(String baseUrl, String channel) {
@@ -386,6 +389,17 @@ private void onCommandReceived(ServerEventMessage e) {
386389
onCommand.execute(e);
387390
}
388391

392+
protected void onTriggerReceived(ServerEventMessage e) {
393+
if (Log.isDebugEnabled())
394+
Log.d("[SSE-CLIENT] OnTriggerReceived: ("
395+
+ e.getClass().getSimpleName() + ") #"
396+
+ e.getEventId() + " on #"
397+
+ getConnectionDisplayName() + " ("
398+
+ Utils.join(channels, ",") + ")");
399+
400+
raiseEvent(e.getTarget(), e);
401+
}
402+
389403
private void onHeartbeatReceived(ServerEventMessage e) {
390404
if (Log.isDebugEnabled())
391405
Log.d("[SSE-CLIENT] OnHeartbeatReceived: ("
@@ -436,6 +450,37 @@ private void onConnectReceived() {
436450
startNewHeartbeat();
437451
}
438452

453+
public synchronized ServerEventsClient addListener(String eventName, Action<ServerEventMessage> handler){
454+
List<Action<ServerEventMessage>> handlers = listeners.get(eventName);
455+
if (handlers == null){
456+
handlers = new ArrayList<>();
457+
listeners.put(eventName, handlers);
458+
}
459+
handlers.add(handler);
460+
return this;
461+
}
462+
463+
public synchronized ServerEventsClient removeListener(String eventName, Action<ServerEventMessage> handler){
464+
List<Action<ServerEventMessage>> handlers = listeners.get(eventName);
465+
if (handlers != null){
466+
handlers.remove(handler);
467+
}
468+
return this;
469+
}
470+
471+
public synchronized void raiseEvent(String eventName, ServerEventMessage message) {
472+
List<Action<ServerEventMessage>> handlers = listeners.get(eventName);
473+
if (handlers != null){
474+
for (Action<ServerEventMessage> handler : handlers) {
475+
try {
476+
handler.apply(message);
477+
} catch (Exception e) {
478+
Log.e("Error whilst executing '" + eventName + "' handler", e);
479+
}
480+
}
481+
}
482+
}
483+
439484
Timer heratbeatTimer;
440485

441486
private void startNewHeartbeat() {

src/AndroidClient/client/src/main/java/net/servicestack/client/sse/EventStream.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,8 @@ protected void processEventMessage(ServerEventMessage e) {
182182
cb.execute(this.client, e);
183183
}
184184
}
185+
} else if ("trigger".equals(e.getOp())){
186+
client.onTriggerReceived(e);
185187
}
186188

187189
ServerEventCallback receiver = client.getNamedReceivers().get(e.getOp());

src/AndroidClient/client/src/main/java/net/servicestack/client/sse/ServerEventsClient.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import net.servicestack.client.JsonUtils;
99
import net.servicestack.client.Log;
1010
import net.servicestack.client.Utils;
11+
import net.servicestack.func.Action;
1112
import net.servicestack.func.Func;
1213
import net.servicestack.func.Function;
1314

@@ -47,6 +48,7 @@ public class ServerEventsClient implements Closeable {
4748

4849
protected Map<String,ServerEventCallback> handlers;
4950
protected Map<String,ServerEventCallback> namedReceivers;
51+
protected Map<String, List<Action<ServerEventMessage>>> listeners;
5052

5153
protected ServerEventConnectCallback onConnect;
5254
protected ServerEventMessageCallback onMessage;
@@ -76,6 +78,7 @@ public ServerEventsClient(String baseUri, String... channels) {
7678

7779
this.handlers = new HashMap<>();
7880
this.namedReceivers = new HashMap<>();
81+
this.listeners = new HashMap<>();
7982
}
8083

8184
public ServerEventsClient(String baseUrl, String channel) {
@@ -386,6 +389,17 @@ private void onCommandReceived(ServerEventMessage e) {
386389
onCommand.execute(e);
387390
}
388391

392+
protected void onTriggerReceived(ServerEventMessage e) {
393+
if (Log.isDebugEnabled())
394+
Log.d("[SSE-CLIENT] OnTriggerReceived: ("
395+
+ e.getClass().getSimpleName() + ") #"
396+
+ e.getEventId() + " on #"
397+
+ getConnectionDisplayName() + " ("
398+
+ Utils.join(channels, ",") + ")");
399+
400+
raiseEvent(e.getTarget(), e);
401+
}
402+
389403
private void onHeartbeatReceived(ServerEventMessage e) {
390404
if (Log.isDebugEnabled())
391405
Log.d("[SSE-CLIENT] OnHeartbeatReceived: ("
@@ -436,6 +450,37 @@ private void onConnectReceived() {
436450
startNewHeartbeat();
437451
}
438452

453+
public synchronized ServerEventsClient addListener(String eventName, Action<ServerEventMessage> handler){
454+
List<Action<ServerEventMessage>> handlers = listeners.get(eventName);
455+
if (handlers == null){
456+
handlers = new ArrayList<>();
457+
listeners.put(eventName, handlers);
458+
}
459+
handlers.add(handler);
460+
return this;
461+
}
462+
463+
public synchronized ServerEventsClient removeListener(String eventName, Action<ServerEventMessage> handler){
464+
List<Action<ServerEventMessage>> handlers = listeners.get(eventName);
465+
if (handlers != null){
466+
handlers.remove(handler);
467+
}
468+
return this;
469+
}
470+
471+
public synchronized void raiseEvent(String eventName, ServerEventMessage message) {
472+
List<Action<ServerEventMessage>> handlers = listeners.get(eventName);
473+
if (handlers != null){
474+
for (Action<ServerEventMessage> handler : handlers) {
475+
try {
476+
handler.apply(message);
477+
} catch (Exception e) {
478+
Log.e("Error whilst executing '" + eventName + "' handler", e);
479+
}
480+
}
481+
}
482+
}
483+
439484
Timer heratbeatTimer;
440485

441486
private void startNewHeartbeat() {

src/AndroidClient/client/src/test/java/net/servicestack/client/ServerEventClientTests.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import net.servicestack.client.sse.ServerEventUser;
1111
import net.servicestack.client.sse.ServerEventsClient;
1212
import net.servicestack.client.sse.SingletonInstanceResolver;
13+
import net.servicestack.func.Action;
1314
import net.servicestack.func.Func;
1415

1516
import java.util.ArrayList;
@@ -902,4 +903,45 @@ public void test_Can_unsubscribe_from_channels_whilst_connected() throws Excepti
902903
assertTrue(client2.getEventStreamUri().endsWith("?channels=B"));
903904
}
904905
}
906+
907+
public void test_Does_fire_multiple_listeners_for_custom_trigger() throws Exception {
908+
List<ServerEventMessage> msgs1 = new ArrayList<>();
909+
List<ServerEventMessage> msgs2 = new ArrayList<>();
910+
911+
Action<ServerEventMessage> handler = msgs1::add;
912+
913+
try (ServerEventsClient client1 = createServerEventsClient("http://chat.servicestack.net")
914+
.addListener("customEvent", handler)
915+
.addListener("customEvent", msgs2::add)
916+
.start()
917+
.waitTillConnected();
918+
ServerEventsClient client2 = createServerEventsClient("http://chat.servicestack.net")
919+
.start()
920+
.waitTillConnected()) {
921+
922+
postRaw(client2, "trigger.customEvent", "arg");
923+
924+
while (msgs1.size() < 1 || msgs2.size() < 1) {
925+
Thread.sleep(100);
926+
}
927+
928+
assertEquals(1, msgs1.size());
929+
assertEquals(1, msgs2.size());
930+
931+
client1.removeListener("customEvent", handler);
932+
933+
postRaw(client2, "trigger.customEvent", "arg");
934+
935+
while (msgs1.size() < 1 || msgs2.size() < 2) {
936+
Thread.sleep(100);
937+
}
938+
939+
assertEquals(1, msgs1.size());
940+
assertEquals(2, msgs2.size());
941+
942+
assertTrue(Func.all(Func.concat(msgs1, msgs2), msg ->
943+
"arg".equals(JsonUtils.fromJson(msg.getJson(), String.class))));
944+
}
945+
}
946+
905947
}

0 commit comments

Comments
 (0)