Skip to content

Commit 31be2cf

Browse files
committed
Add a synchronous version of subscribe which consumes the calling thread. This allows much lower latency (1ms)
1 parent 9856047 commit 31be2cf

File tree

2 files changed

+47
-0
lines changed

2 files changed

+47
-0
lines changed

src/main/java/io/ipfs/api/IPFS.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,18 @@ public Supplier<Object> sub(String topic) throws IOException {
253253
public Supplier<Object> sub(String topic, ForkJoinPool threadSupplier) throws IOException {
254254
return retrieveAndParseStream("pubsub/sub?arg="+topic, threadSupplier);
255255
}
256+
257+
/**
258+
* A synchronous method to subscribe which consumes the calling thread
259+
* @param topic
260+
* @param results
261+
* @throws IOException
262+
*/
263+
public void sub(String topic, Consumer<Object> results) throws IOException {
264+
retrieveAndParseStream("pubsub/sub?arg="+topic, results);
265+
}
266+
267+
256268
}
257269

258270
/* 'ipfs block' is a plumbing command used to manipulate raw ipfs blocks.
@@ -581,6 +593,16 @@ private Supplier<Object> retrieveAndParseStream(String path, ForkJoinPool execut
581593
};
582594
}
583595

596+
/**
597+
* A synchronous stream retriever that consumes the calling thread
598+
* @param path
599+
* @param results
600+
* @throws IOException
601+
*/
602+
private void retrieveAndParseStream(String path, Consumer<Object> results) throws IOException {
603+
getObjectStream(path, d -> results.accept(JSONParser.parse(new String(d))));
604+
}
605+
584606
private byte[] retrieve(String path) throws IOException {
585607
URL target = new URL("http", host, port, version + path);
586608
return IPFS.get(target);

src/test/java/io/ipfs/api/APITest.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -380,6 +380,31 @@ public void bulkBlockTest() throws IOException {
380380
System.out.println();
381381
}
382382

383+
@Test
384+
public void pubsubSynchronous() throws IOException {
385+
String topic = "topic" + System.nanoTime();
386+
List<Object> res = Collections.synchronizedList(new ArrayList<>());
387+
new Thread(() -> {
388+
try {
389+
ipfs.pubsub.sub(topic, res::add);
390+
} catch (IOException e) {
391+
throw new RuntimeException(e);}
392+
}).start();
393+
394+
long start = System.currentTimeMillis();
395+
for (int i=1; i < 100; ) {
396+
long t1 = System.currentTimeMillis();
397+
ipfs.pubsub.pub(topic, "Hello!");
398+
if (res.size() >= i) {
399+
long t2 = System.currentTimeMillis();
400+
System.out.println("pub => sub took " + (t2 - t1));
401+
i++;
402+
}
403+
}
404+
long duration = System.currentTimeMillis() - start;
405+
Assert.assertTrue("Fast synchronous pub-sub", duration < 1000);
406+
}
407+
383408
@Test
384409
public void pubsub() throws IOException {
385410
Object ls = ipfs.pubsub.ls();

0 commit comments

Comments
 (0)