@@ -234,14 +234,14 @@ public Object peers(String topic) throws IOException {
234234 }
235235
236236 public Object pub (String topic , String data ) throws IOException {
237- return retrieveAndParse ("pubsub/peers?arg=" +topic + "&data =" + data );
237+ return retrieveAndParse ("pubsub/peers?arg=" +topic + "&arg =" + data );
238238 }
239239
240- public Stream <Object > sub (String topic ) throws IOException {
240+ public Supplier <Object > sub (String topic ) throws IOException {
241241 return sub (topic , ForkJoinPool .commonPool ());
242242 }
243243
244- public Stream <Object > sub (String topic , ForkJoinPool threadSupplier ) throws IOException {
244+ public Supplier <Object > sub (String topic , ForkJoinPool threadSupplier ) throws IOException {
245245 return retrieveAndParseStream ("pubsub/sub?arg=" +topic , threadSupplier );
246246 }
247247 }
@@ -556,10 +556,16 @@ private Object retrieveAndParse(String path) throws IOException {
556556 return JSONParser .parse (new String (res ));
557557 }
558558
559- private Stream <Object > retrieveAndParseStream (String path , ForkJoinPool executor ) throws IOException {
560- Queue <byte []> objects = new LinkedBlockingQueue <>();
559+ private Supplier <Object > retrieveAndParseStream (String path , ForkJoinPool executor ) throws IOException {
560+ BlockingQueue <byte []> objects = new LinkedBlockingQueue <>();
561561 executor .submit (() -> getObjectStream (path , objects ::add ));
562- return Stream .generate (() -> JSONParser .parse (new String (objects .poll ())));
562+ return () -> {
563+ try {
564+ return JSONParser .parse (new String (objects .take ()));
565+ } catch (InterruptedException e ) {
566+ throw new RuntimeException (e );
567+ }
568+ };
563569 }
564570
565571 private byte [] retrieve (String path ) throws IOException {
0 commit comments