Skip to content

Commit 8d2a2fb

Browse files
committed
Fix potential race condition in sub
1 parent b4fffc3 commit 8d2a2fb

File tree

2 files changed

+5
-7
lines changed

2 files changed

+5
-7
lines changed

src/main/java/io/ipfs/api/IPFS.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -596,9 +596,10 @@ private Object retrieveAndParse(String path) throws IOException {
596596
return JSONParser.parse(new String(res));
597597
}
598598

599-
private Stream<Object> retrieveAndParseStream(String path, ForkJoinPool executor) {
599+
private Stream<Object> retrieveAndParseStream(String path, ForkJoinPool executor) throws IOException {
600600
BlockingQueue<CompletableFuture<byte[]>> results = new LinkedBlockingQueue<>();
601-
executor.submit(() -> getObjectStream(path,
601+
InputStream in = retrieveStream(path);
602+
executor.submit(() -> getObjectStream(in,
602603
res -> {
603604
results.add(CompletableFuture.completedFuture(res));
604605
},
@@ -624,7 +625,7 @@ private Stream<Object> retrieveAndParseStream(String path, ForkJoinPool executor
624625
* @throws IOException
625626
*/
626627
private void retrieveAndParseStream(String path, Consumer<Object> results, Consumer<IOException> err) throws IOException {
627-
getObjectStream(path, d -> results.accept(JSONParser.parse(new String(d))), err);
628+
getObjectStream(retrieveStream(path), d -> results.accept(JSONParser.parse(new String(d))), err);
628629
}
629630

630631
private byte[] retrieve(String path) throws IOException {
@@ -654,11 +655,10 @@ private static byte[] get(URL target) throws IOException {
654655
}
655656
}
656657

657-
private void getObjectStream(String path, Consumer<byte[]> processor, Consumer<IOException> error) {
658+
private void getObjectStream(InputStream in, Consumer<byte[]> processor, Consumer<IOException> error) {
658659
byte LINE_FEED = (byte)10;
659660

660661
try {
661-
InputStream in = retrieveStream(path);
662662
ByteArrayOutputStream resp = new ByteArrayOutputStream();
663663

664664
byte[] buf = new byte[4096];
@@ -684,7 +684,6 @@ private static InputStream getStream(URL target) throws IOException {
684684
HttpURLConnection conn = (HttpURLConnection) target.openConnection();
685685
conn.setRequestMethod("GET");
686686
conn.setRequestProperty("Content-Type", "application/json");
687-
688687
return conn.getInputStream();
689688
}
690689

src/test/java/io/ipfs/api/APITest.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -410,7 +410,6 @@ public void pubsubSynchronous() throws Exception {
410410
public void pubsub() throws Exception {
411411
String topic = "topic" + System.nanoTime();
412412
Stream<Map<String, Object>> sub = ipfs.pubsub.sub(topic);
413-
Thread.sleep(100); // There's a race condition in ipfs
414413
String data = "Hello!";
415414
Object pub = ipfs.pubsub.pub(topic, data);
416415
Object pub2 = ipfs.pubsub.pub(topic, "G'day");

0 commit comments

Comments
 (0)