3131import java .net .*;
3232import java .net .http .*;
3333import java .net .http .HttpResponse .ResponseInfo ;
34+ import java .nio .ByteBuffer ;
3435import java .security .*;
3536import java .security .cert .X509Certificate ;
3637import java .util .*;
38+ import java .util .concurrent .Flow ;
3739import java .util .stream .Collectors ;
3840
3941public class HttpProxyClient extends HttpClient {
@@ -45,14 +47,14 @@ public HttpProxyClient(HttpClientSettings httpClientSettings, String url, HttpRe
4547 this .proxySettings = httpClientSettings .getProxyClientSettings ();
4648 }
4749
48- public ActionResult <HttpResponse <byte [] >> toResponse ( ) {
50+ public < T > ActionResult <HttpResponse <T >> toHttpResponse ( HttpResponse . BodyHandler < T > handler ) {
4951 return switch (proxySettings .getType ()) {
50- case HTTP , DIRECT -> handleHttpProxyRequest ();
51- default -> handleSocksProxyRequest ();
52+ case HTTP , DIRECT -> handleHttpProxyRequest (handler );
53+ default -> handleSocksProxyRequest (handler );
5254 };
5355 }
5456
55- public ActionResult <HttpResponse <byte [] >> handleHttpProxyRequest () {
57+ public < T > ActionResult <HttpResponse <T >> handleHttpProxyRequest (HttpResponse . BodyHandler < T > handler ) {
5658 var builder = java .net .http .HttpClient .newBuilder ()
5759 .followRedirects (java .net .http .HttpClient .Redirect .NORMAL )
5860 .cookieHandler (new CookieManager ())
@@ -67,7 +69,7 @@ public ActionResult<HttpResponse<byte[]>> handleHttpProxyRequest() {
6769 var client = builder .build ();
6870 var request = prepareRequest ();
6971
70- var response = client .send (request , HttpResponse . BodyHandlers . ofByteArray () );
72+ var response = client .send (request , handler );
7173 if (response .statusCode () != 200 )
7274 continue ;
7375 return ActionResult .success (response );
@@ -77,7 +79,7 @@ public ActionResult<HttpResponse<byte[]>> handleHttpProxyRequest() {
7779 throw new TikTokProxyRequestException (e );
7880 } catch (IOException e ) {
7981 if (e .getMessage ().contains ("503" ) && proxySettings .isFallback ()) // Indicates proxy protocol is not supported
80- return super .toHttpResponse (HttpResponse . BodyHandlers . ofByteArray () );
82+ return super .toHttpResponse (handler );
8183 throw new TikTokProxyRequestException (e );
8284 } catch (Exception e ) {
8385 throw new TikTokLiveRequestException (e );
@@ -86,7 +88,7 @@ public ActionResult<HttpResponse<byte[]>> handleHttpProxyRequest() {
8688 throw new TikTokLiveRequestException ("No more proxies available!" );
8789 }
8890
89- private ActionResult <HttpResponse <byte [] >> handleSocksProxyRequest () {
91+ private < T > ActionResult <HttpResponse <T >> handleSocksProxyRequest (HttpResponse . BodyHandler < T > handler ) {
9092 try {
9193 SSLContext sc = SSLContext .getInstance ("SSL" );
9294 sc .init (null , new TrustManager []{ new X509TrustManager () {
@@ -95,7 +97,8 @@ public void checkServerTrusted(X509Certificate[] x509Certificates, String s) {}
9597 public X509Certificate [] getAcceptedIssuers () { return null ; }
9698 }}, null );
9799
98- URL url = toUri ().toURL ();
100+ URI uri = toUri ();
101+ URL url = uri .toURL ();
99102
100103 if (proxySettings .hasNext ()) {
101104 try {
@@ -117,12 +120,22 @@ public void checkServerTrusted(X509Certificate[] x509Certificates, String s) {}
117120
118121 var responseInfo = createResponseInfo (socksConnection .getResponseCode (), headers );
119122
120- var response = createHttpResponse (body , toUri (), responseInfo );
123+ HttpResponse .BodySubscriber <T > subscriber = handler .apply (responseInfo );
124+
125+ subscriber .onSubscribe (new Flow .Subscription () {
126+ @ Override public void request (long n ) {}
127+ @ Override public void cancel () {}
128+ });
129+
130+ subscriber .onNext (List .of (ByteBuffer .wrap (body )));
131+ subscriber .onComplete ();
132+
133+ var response = createHttpResponse (subscriber .getBody ().toCompletableFuture ().join (), uri , responseInfo );
121134
122135 return ActionResult .success (response );
123136 } catch (IOException e ) {
124137 if (e .getMessage ().contains ("503" ) && proxySettings .isFallback ()) // Indicates proxy protocol is not supported
125- return super .toHttpResponse (HttpResponse . BodyHandlers . ofByteArray () );
138+ return super .toHttpResponse (handler );
126139 if (proxySettings .isAutoDiscard ())
127140 proxySettings .remove ();
128141 throw new TikTokProxyRequestException (e );
@@ -160,7 +173,7 @@ public java.net.http.HttpClient.Version version() {
160173 };
161174 }
162175
163- private HttpResponse <byte [] > createHttpResponse (byte [] body ,
176+ private < T > HttpResponse <T > createHttpResponse (T body ,
164177 URI uri ,
165178 ResponseInfo info ) {
166179 return new HttpResponse <>()
@@ -176,7 +189,7 @@ public HttpRequest request() {
176189 }
177190
178191 @ Override
179- public Optional <HttpResponse <byte [] >> previousResponse () {
192+ public Optional <HttpResponse <T >> previousResponse () {
180193 return Optional .empty ();
181194 }
182195
@@ -186,7 +199,7 @@ public HttpHeaders headers() {
186199 }
187200
188201 @ Override
189- public byte [] body () {
202+ public T body () {
190203 return body ;
191204 }
192205
0 commit comments