Skip to content

Commit 3c18663

Browse files
committed
Update pubsub api to enable handling errors e.g. ipfs daemon terminating
1 parent 2d07776 commit 3c18663

File tree

2 files changed

+35
-23
lines changed

2 files changed

+35
-23
lines changed

src/main/java/io/ipfs/api/IPFS.java

Lines changed: 27 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -242,17 +242,18 @@ public Object peers(String topic) throws IOException {
242242
* @return
243243
* @throws IOException
244244
*/
245-
public Object pub(String topic, String data) throws IOException {
245+
public Object pub(String topic, String data) throws Exception {
246246
return retrieveAndParse("pubsub/pub?arg="+topic + "&arg=" + data);
247247
}
248248

249-
public Supplier<Map<String, Object>> sub(String topic) throws IOException {
249+
public Supplier<CompletableFuture<Map<String, Object>>> sub(String topic) throws Exception {
250250
return sub(topic, ForkJoinPool.commonPool());
251251
}
252252

253-
public Supplier<Map<String, Object>> sub(String topic, ForkJoinPool threadSupplier) throws IOException {
254-
Supplier<Object> sup = retrieveAndParseStream("pubsub/sub?arg=" + topic, threadSupplier);
255-
return () -> (Map) sup.get();
253+
public Supplier<CompletableFuture<Map<String, Object>>> sub(String topic, ForkJoinPool threadSupplier) throws Exception {
254+
Supplier<CompletableFuture<Object>> sup = retrieveAndParseStream("pubsub/sub?arg=" + topic, threadSupplier);
255+
return () -> sup.get()
256+
.thenApply(obj -> (Map)obj);
256257
}
257258

258259
/**
@@ -261,8 +262,8 @@ public Supplier<Map<String, Object>> sub(String topic, ForkJoinPool threadSuppli
261262
* @param results
262263
* @throws IOException
263264
*/
264-
public void sub(String topic, Consumer<Map<String, Object>> results) throws IOException {
265-
retrieveAndParseStream("pubsub/sub?arg="+topic, res -> results.accept((Map)res));
265+
public void sub(String topic, Consumer<Map<String, Object>> results, Consumer<Throwable> error) throws IOException {
266+
retrieveAndParseStream("pubsub/sub?arg="+topic, res -> results.accept((Map)res), error);
266267
}
267268

268269

@@ -582,12 +583,23 @@ private Object retrieveAndParse(String path) throws IOException {
582583
return JSONParser.parse(new String(res));
583584
}
584585

585-
private Supplier<Object> retrieveAndParseStream(String path, ForkJoinPool executor) throws IOException {
586-
BlockingQueue<byte[]> objects = new LinkedBlockingQueue<>();
587-
executor.submit(() -> getObjectStream(path, objects::add));
586+
private Supplier<CompletableFuture<Object>> retrieveAndParseStream(String path, ForkJoinPool executor) throws IOException {
587+
BlockingQueue<CompletableFuture<byte[]>> futures = new LinkedBlockingQueue<>();
588+
CompletableFuture<byte[]> first = new CompletableFuture<>();
589+
futures.add(first);
590+
executor.submit(() -> getObjectStream(path,
591+
res -> {
592+
futures.peek().complete(res);
593+
futures.add(new CompletableFuture<>());
594+
},
595+
err -> {
596+
futures.peek().completeExceptionally(err);
597+
futures.add(new CompletableFuture<>());
598+
})
599+
);
588600
return () -> {
589601
try {
590-
return JSONParser.parse(new String(objects.take()));
602+
return futures.take().thenApply(arr -> JSONParser.parse(new String(arr)));
591603
} catch (InterruptedException e) {
592604
throw new RuntimeException(e);
593605
}
@@ -600,8 +612,8 @@ private Supplier<Object> retrieveAndParseStream(String path, ForkJoinPool execut
600612
* @param results
601613
* @throws IOException
602614
*/
603-
private void retrieveAndParseStream(String path, Consumer<Object> results) throws IOException {
604-
getObjectStream(path, d -> results.accept(JSONParser.parse(new String(d))));
615+
private void retrieveAndParseStream(String path, Consumer<Object> results, Consumer<Throwable> err) throws IOException {
616+
getObjectStream(path, d -> results.accept(JSONParser.parse(new String(d))), err);
605617
}
606618

607619
private byte[] retrieve(String path) throws IOException {
@@ -631,7 +643,7 @@ private static byte[] get(URL target) throws IOException {
631643
}
632644
}
633645

634-
private void getObjectStream(String path, Consumer<byte[]> processor) {
646+
private void getObjectStream(String path, Consumer<byte[]> processor, Consumer<Throwable> error) {
635647
byte LINE_FEED = (byte)10;
636648

637649
try {
@@ -648,7 +660,7 @@ private void getObjectStream(String path, Consumer<byte[]> processor) {
648660
}
649661
}
650662
} catch (IOException e) {
651-
e.printStackTrace();
663+
error.accept(e);
652664
}
653665
}
654666

src/test/java/io/ipfs/api/APITest.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import java.io.*;
1010
import java.nio.file.*;
1111
import java.util.*;
12+
import java.util.concurrent.*;
1213
import java.util.function.*;
1314
import java.util.stream.*;
1415

@@ -381,12 +382,12 @@ public void bulkBlockTest() throws IOException {
381382
}
382383

383384
@Test
384-
public void pubsubSynchronous() throws IOException {
385+
public void pubsubSynchronous() throws Exception {
385386
String topic = "topic" + System.nanoTime();
386387
List<Map<String, Object>> res = Collections.synchronizedList(new ArrayList<>());
387388
new Thread(() -> {
388389
try {
389-
ipfs.pubsub.sub(topic, res::add);
390+
ipfs.pubsub.sub(topic, res::add, t -> t.printStackTrace());
390391
} catch (IOException e) {
391392
throw new RuntimeException(e);}
392393
}).start();
@@ -406,17 +407,16 @@ public void pubsubSynchronous() throws IOException {
406407
}
407408

408409
@Test
409-
public void pubsub() throws IOException {
410+
public void pubsub() throws Exception {
410411
Object ls = ipfs.pubsub.ls();
411412
Object peers = ipfs.pubsub.peers();
412413
String topic = "topic" + System.nanoTime();
413-
Supplier<Map<String, Object>> sub = ipfs.pubsub.sub(topic);
414-
Map<String, Object> first = sub.get();
415-
Assert.assertTrue(first.equals(Collections.emptyMap()));
414+
Supplier<CompletableFuture<Map<String, Object>>> sub = ipfs.pubsub.sub(topic);
415+
Thread.sleep(100); // There's a race condition in ipfs
416416
String data = "Hello!";
417417
Object pub = ipfs.pubsub.pub(topic, data);
418-
Map second = sub.get();
419-
Assert.assertTrue( ! second.equals(Collections.emptyMap()));
418+
Map result = sub.get().get();
419+
Assert.assertTrue( ! result.equals(Collections.emptyMap()));
420420
}
421421

422422
private static String toEscapedHex(byte[] in) throws IOException {

0 commit comments

Comments
 (0)