Skip to content

Commit b4fffc3

Browse files
committed
Update pubsub api
1 parent acce27a commit b4fffc3

File tree

2 files changed

+20
-25
lines changed

2 files changed

+20
-25
lines changed

src/main/java/io/ipfs/api/IPFS.java

Lines changed: 16 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -246,14 +246,12 @@ public Object pub(String topic, String data) throws Exception {
246246
return retrieveAndParse("pubsub/pub?arg="+topic + "&arg=" + data);
247247
}
248248

249-
public Supplier<CompletableFuture<Map<String, Object>>> sub(String topic) throws Exception {
249+
public Stream<Map<String, Object>> sub(String topic) throws Exception {
250250
return sub(topic, ForkJoinPool.commonPool());
251251
}
252252

253-
public Supplier<CompletableFuture<Map<String, Object>>> sub(String topic, ForkJoinPool threadSupplier) throws Exception {
254-
Supplier<CompletableFuture<Object>> sup = retrieveAndParseStream("pubsub/sub?arg=" + topic, threadSupplier);
255-
return () -> sup.get()
256-
.thenApply(obj -> (Map)obj);
253+
public Stream<Map<String, Object>> sub(String topic, ForkJoinPool threadSupplier) throws Exception {
254+
return retrieveAndParseStream("pubsub/sub?arg=" + topic, threadSupplier).map(obj -> (Map)obj);
257255
}
258256

259257
/**
@@ -262,7 +260,7 @@ public Supplier<CompletableFuture<Map<String, Object>>> sub(String topic, ForkJo
262260
* @param results
263261
* @throws IOException
264262
*/
265-
public void sub(String topic, Consumer<Map<String, Object>> results, Consumer<Throwable> error) throws IOException {
263+
public void sub(String topic, Consumer<Map<String, Object>> results, Consumer<IOException> error) throws IOException {
266264
retrieveAndParseStream("pubsub/sub?arg="+topic, res -> results.accept((Map)res), error);
267265
}
268266

@@ -598,27 +596,25 @@ private Object retrieveAndParse(String path) throws IOException {
598596
return JSONParser.parse(new String(res));
599597
}
600598

601-
private Supplier<CompletableFuture<Object>> retrieveAndParseStream(String path, ForkJoinPool executor) throws IOException {
602-
BlockingQueue<CompletableFuture<byte[]>> futures = new LinkedBlockingQueue<>();
603-
CompletableFuture<byte[]> first = new CompletableFuture<>();
604-
futures.add(first);
599+
private Stream<Object> retrieveAndParseStream(String path, ForkJoinPool executor) {
600+
BlockingQueue<CompletableFuture<byte[]>> results = new LinkedBlockingQueue<>();
605601
executor.submit(() -> getObjectStream(path,
606602
res -> {
607-
futures.peek().complete(res);
608-
futures.add(new CompletableFuture<>());
603+
results.add(CompletableFuture.completedFuture(res));
609604
},
610605
err -> {
611-
futures.peek().completeExceptionally(err);
612-
futures.add(new CompletableFuture<>());
606+
CompletableFuture<byte[]> fut = new CompletableFuture<>();
607+
fut.completeExceptionally(err);
608+
results.add(fut);
613609
})
614610
);
615-
return () -> {
611+
return Stream.generate(() -> {
616612
try {
617-
return futures.take().thenApply(arr -> JSONParser.parse(new String(arr)));
618-
} catch (InterruptedException e) {
613+
return JSONParser.parse(new String(results.take().get()));
614+
} catch (Exception e) {
619615
throw new RuntimeException(e);
620616
}
621-
};
617+
});
622618
}
623619

624620
/**
@@ -627,7 +623,7 @@ private Supplier<CompletableFuture<Object>> retrieveAndParseStream(String path,
627623
* @param results
628624
* @throws IOException
629625
*/
630-
private void retrieveAndParseStream(String path, Consumer<Object> results, Consumer<Throwable> err) throws IOException {
626+
private void retrieveAndParseStream(String path, Consumer<Object> results, Consumer<IOException> err) throws IOException {
631627
getObjectStream(path, d -> results.accept(JSONParser.parse(new String(d))), err);
632628
}
633629

@@ -658,7 +654,7 @@ private static byte[] get(URL target) throws IOException {
658654
}
659655
}
660656

661-
private void getObjectStream(String path, Consumer<byte[]> processor, Consumer<Throwable> error) {
657+
private void getObjectStream(String path, Consumer<byte[]> processor, Consumer<IOException> error) {
662658
byte LINE_FEED = (byte)10;
663659

664660
try {

src/test/java/io/ipfs/api/APITest.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -408,15 +408,14 @@ public void pubsubSynchronous() throws Exception {
408408

409409
@Test
410410
public void pubsub() throws Exception {
411-
Object ls = ipfs.pubsub.ls();
412-
Object peers = ipfs.pubsub.peers();
413411
String topic = "topic" + System.nanoTime();
414-
Supplier<CompletableFuture<Map<String, Object>>> sub = ipfs.pubsub.sub(topic);
412+
Stream<Map<String, Object>> sub = ipfs.pubsub.sub(topic);
415413
Thread.sleep(100); // There's a race condition in ipfs
416414
String data = "Hello!";
417415
Object pub = ipfs.pubsub.pub(topic, data);
418-
Map result = sub.get().get();
419-
Assert.assertTrue( ! result.equals(Collections.emptyMap()));
416+
Object pub2 = ipfs.pubsub.pub(topic, "G'day");
417+
List<Map> results = sub.limit(2).collect(Collectors.toList());
418+
Assert.assertTrue( ! results.get(0).equals(Collections.emptyMap()));
420419
}
421420

422421
private static String toEscapedHex(byte[] in) throws IOException {

0 commit comments

Comments
 (0)