11package io .ipfs .api ;
22
33import io .ipfs .cid .*;
4+ import io .ipfs .multibase .*;
45import io .ipfs .multihash .Multihash ;
56import io .ipfs .multiaddr .MultiAddress ;
67
@@ -276,16 +277,24 @@ public Object peers(String topic) throws IOException {
276277 * @return
277278 * @throws IOException
278279 */
279- public Object pub (String topic , String data ) throws Exception {
280- return retrieveAndParse ("pubsub/pub?arg=" +topic + "&arg=" + data );
280+ public void pub (String topic , String data ) {
281+ String encodedTopic = Multibase .encode (Multibase .Base .Base64Url , topic .getBytes ());
282+ Multipart m = new Multipart (protocol +"://" + host + ":" + port + version +"pubsub/pub?arg=" + encodedTopic , "UTF-8" );
283+ try {
284+ m .addFilePart ("file" , Paths .get ("" ), new NamedStreamable .ByteArrayWrapper (data .getBytes ()));
285+ String res = m .finish ();
286+ } catch (IOException e ) {
287+ throw new RuntimeException (e .getMessage (), e );
288+ }
281289 }
282290
283291 public Stream <Map <String , Object >> sub (String topic ) throws Exception {
284292 return sub (topic , ForkJoinPool .commonPool ());
285293 }
286294
287295 public Stream <Map <String , Object >> sub (String topic , ForkJoinPool threadSupplier ) throws Exception {
288- return retrieveAndParseStream ("pubsub/sub?arg=" + topic , threadSupplier ).map (obj -> (Map )obj );
296+ String encodedTopic = Multibase .encode (Multibase .Base .Base64Url , topic .getBytes ());
297+ return retrieveAndParseStream ("pubsub/sub?arg=" + encodedTopic , threadSupplier ).map (obj -> (Map )obj );
289298 }
290299
291300 /**
@@ -295,10 +304,9 @@ public Stream<Map<String, Object>> sub(String topic, ForkJoinPool threadSupplier
295304 * @throws IOException
296305 */
297306 public void sub (String topic , Consumer <Map <String , Object >> results , Consumer <IOException > error ) throws IOException {
298- retrieveAndParseStream ("pubsub/sub?arg=" +topic , res -> results .accept ((Map )res ), error );
307+ String encodedTopic = Multibase .encode (Multibase .Base .Base64Url , topic .getBytes ());
308+ retrieveAndParseStream ("pubsub/sub?arg=" +encodedTopic , res -> results .accept ((Map )res ), error );
299309 }
300-
301-
302310 }
303311
304312 /* 'ipfs block' is a plumbing command used to manipulate raw ipfs blocks.
@@ -778,7 +786,15 @@ private InputStream retrieveStream(String path) throws IOException {
778786
779787 private static InputStream getStream (URL target , int connectTimeoutMillis , int readTimeoutMillis ) throws IOException {
780788 HttpURLConnection conn = configureConnection (target , "POST" , connectTimeoutMillis , readTimeoutMillis );
781- return conn .getInputStream ();
789+ try {
790+ return conn .getInputStream ();
791+ } catch (IOException e ) {
792+ e .printStackTrace ();
793+ InputStream errorStream = conn .getErrorStream ();
794+ String err = errorStream == null ? e .getMessage () : new String (readFully (errorStream ));
795+ List <String > trailer = conn .getHeaderFields ().get ("Trailer" );
796+ throw new RuntimeException ("IOException contacting IPFS daemon.\n " +err +"\n Trailer: " + trailer , e );
797+ }
782798 }
783799
784800 private Map postMap (String path , byte [] body , Map <String , String > headers ) throws IOException {
0 commit comments