@@ -114,24 +114,24 @@ public List<MerkleNode> add(List<NamedStreamable> files, boolean wrap, boolean h
114114 }
115115
116116 public List <MerkleNode > ls (Multihash hash ) throws IOException {
117- Map res = retrieveMap ("ls/ " + hash );
117+ Map res = retrieveMap ("ls?arg= " + hash );
118118 return ((List <Object >) res .get ("Objects" )).stream ().map (x -> MerkleNode .fromJSON ((Map ) x )).collect (Collectors .toList ());
119119 }
120120
121121 public byte [] cat (Multihash hash ) throws IOException {
122- return retrieve ("cat/ " + hash );
122+ return retrieve ("cat?arg= " + hash );
123123 }
124124
125125 public byte [] cat (Multihash hash , String subPath ) throws IOException {
126126 return retrieve ("cat?arg=" + hash + URLEncoder .encode (subPath , "UTF-8" ));
127127 }
128128
129129 public byte [] get (Multihash hash ) throws IOException {
130- return retrieve ("get/ " + hash );
130+ return retrieve ("get?arg= " + hash );
131131 }
132132
133133 public InputStream catStream (Multihash hash ) throws IOException {
134- return retrieveStream ("cat/ " + hash );
134+ return retrieveStream ("cat?arg= " + hash );
135135 }
136136
137137 public List <Multihash > refs (Multihash hash , boolean recursive ) throws IOException {
@@ -263,17 +263,16 @@ public Object peers(String topic) throws IOException {
263263 * @return
264264 * @throws IOException
265265 */
266- public Object pub (String topic , String data ) throws IOException {
266+ public Object pub (String topic , String data ) throws Exception {
267267 return retrieveAndParse ("pubsub/pub?arg=" +topic + "&arg=" + data );
268268 }
269269
270- public Supplier <Map <String , Object >> sub (String topic ) throws IOException {
270+ public Stream <Map <String , Object >> sub (String topic ) throws Exception {
271271 return sub (topic , ForkJoinPool .commonPool ());
272272 }
273273
274- public Supplier <Map <String , Object >> sub (String topic , ForkJoinPool threadSupplier ) throws IOException {
275- Supplier <Object > sup = retrieveAndParseStream ("pubsub/sub?arg=" + topic , threadSupplier );
276- return () -> (Map ) sup .get ();
274+ public Stream <Map <String , Object >> sub (String topic , ForkJoinPool threadSupplier ) throws Exception {
275+ return retrieveAndParseStream ("pubsub/sub?arg=" + topic , threadSupplier ).map (obj -> (Map )obj );
277276 }
278277
279278 /**
@@ -282,8 +281,8 @@ public Supplier<Map<String, Object>> sub(String topic, ForkJoinPool threadSuppli
282281 * @param results
283282 * @throws IOException
284283 */
285- public void sub (String topic , Consumer <Map <String , Object >> results ) throws IOException {
286- retrieveAndParseStream ("pubsub/sub?arg=" +topic , res -> results .accept ((Map )res ));
284+ public void sub (String topic , Consumer <Map <String , Object >> results , Consumer < IOException > error ) throws IOException {
285+ retrieveAndParseStream ("pubsub/sub?arg=" +topic , res -> results .accept ((Map )res ), error );
287286 }
288287
289288
@@ -417,8 +416,8 @@ public Map findprovs(Multihash hash) throws IOException {
417416 return retrieveMap ("dht/findprovs?arg=" + hash );
418417 }
419418
420- public Map query (MultiAddress addr ) throws IOException {
421- return retrieveMap ("dht/query?arg=" + addr .toString ());
419+ public Map query (Multihash peerId ) throws IOException {
420+ return retrieveMap ("dht/query?arg=" + peerId .toString ());
422421 }
423422
424423 public Map findpeer (Multihash id ) throws IOException {
@@ -443,7 +442,15 @@ public Map ls(Multihash path) throws IOException {
443442 // Network commands
444443
445444 public List <MultiAddress > bootstrap () throws IOException {
446- return ((List <String >)retrieveMap ("bootstrap/" ).get ("Peers" )).stream ().map (x -> new MultiAddress (x )).collect (Collectors .toList ());
445+ return ((List <String >)retrieveMap ("bootstrap/" ).get ("Peers" ))
446+ .stream ()
447+ .flatMap (x -> {
448+ try {
449+ return Stream .of (new MultiAddress (x ));
450+ } catch (Exception e ) {
451+ return Stream .empty ();
452+ }
453+ }).collect (Collectors .toList ());
447454 }
448455
449456 public class Bootstrap {
@@ -471,7 +478,14 @@ public List<MultiAddress> rm(MultiAddress addr, boolean all) throws IOException
471478 public class Swarm {
472479 public List <Peer > peers () throws IOException {
473480 Map m = retrieveMap ("swarm/peers?stream-channels=true" );
474- return ((List <Object >)m .get ("Peers" )).stream ().map (Peer ::fromJSON ).collect (Collectors .toList ());
481+ return ((List <Object >)m .get ("Peers" )).stream ()
482+ .flatMap (json -> {
483+ try {
484+ return Stream .of (Peer .fromJSON (json ));
485+ } catch (Exception e ) {
486+ return Stream .empty ();
487+ }
488+ }).collect (Collectors .toList ());
475489 }
476490
477491 public Map addrs () throws IOException {
@@ -603,16 +617,26 @@ private Object retrieveAndParse(String path) throws IOException {
603617 return JSONParser .parse (new String (res ));
604618 }
605619
606- private Supplier <Object > retrieveAndParseStream (String path , ForkJoinPool executor ) throws IOException {
607- BlockingQueue <byte []> objects = new LinkedBlockingQueue <>();
608- executor .submit (() -> getObjectStream (path , objects ::add ));
609- return () -> {
620+ private Stream <Object > retrieveAndParseStream (String path , ForkJoinPool executor ) throws IOException {
621+ BlockingQueue <CompletableFuture <byte []>> results = new LinkedBlockingQueue <>();
622+ InputStream in = retrieveStream (path );
623+ executor .submit (() -> getObjectStream (in ,
624+ res -> {
625+ results .add (CompletableFuture .completedFuture (res ));
626+ },
627+ err -> {
628+ CompletableFuture <byte []> fut = new CompletableFuture <>();
629+ fut .completeExceptionally (err );
630+ results .add (fut );
631+ })
632+ );
633+ return Stream .generate (() -> {
610634 try {
611- return JSONParser .parse (new String (objects .take ()));
612- } catch (InterruptedException e ) {
635+ return JSONParser .parse (new String (results .take (). get ()));
636+ } catch (Exception e ) {
613637 throw new RuntimeException (e );
614638 }
615- };
639+ }) ;
616640 }
617641
618642 /**
@@ -621,8 +645,8 @@ private Supplier<Object> retrieveAndParseStream(String path, ForkJoinPool execut
621645 * @param results
622646 * @throws IOException
623647 */
624- private void retrieveAndParseStream (String path , Consumer <Object > results ) throws IOException {
625- getObjectStream (path , d -> results .accept (JSONParser .parse (new String (d ))));
648+ private void retrieveAndParseStream (String path , Consumer <Object > results , Consumer < IOException > err ) throws IOException {
649+ getObjectStream (retrieveStream ( path ) , d -> results .accept (JSONParser .parse (new String (d ))), err );
626650 }
627651
628652 private byte [] retrieve (String path ) throws IOException {
@@ -652,11 +676,10 @@ private static byte[] get(URL target) throws IOException {
652676 }
653677 }
654678
655- private void getObjectStream (String path , Consumer <byte []> processor ) {
679+ private void getObjectStream (InputStream in , Consumer <byte []> processor , Consumer < IOException > error ) {
656680 byte LINE_FEED = (byte )10 ;
657681
658682 try {
659- InputStream in = retrieveStream (path );
660683 ByteArrayOutputStream resp = new ByteArrayOutputStream ();
661684
662685 byte [] buf = new byte [4096 ];
@@ -669,7 +692,7 @@ private void getObjectStream(String path, Consumer<byte[]> processor) {
669692 }
670693 }
671694 } catch (IOException e ) {
672- e . printStackTrace ( );
695+ error . accept ( e );
673696 }
674697 }
675698
@@ -682,7 +705,6 @@ private static InputStream getStream(URL target) throws IOException {
682705 HttpURLConnection conn = (HttpURLConnection ) target .openConnection ();
683706 conn .setRequestMethod ("GET" );
684707 conn .setRequestProperty ("Content-Type" , "application/json" );
685-
686708 return conn .getInputStream ();
687709 }
688710
0 commit comments