Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public interface BackoffStrategy {
* @param attempt consecutive reconnect attempt number (1-based)
* @param previousDelayMs last delay used (0 for first attempt)
* @param serverRetryHintMs value from server 'retry:' (ms) or HTTP Retry-After, or null if none
* @return delay in milliseconds (>= 0)
* @return delay in milliseconds, greater than or equal to {@code 0}
*/
long nextDelayMs(int attempt, long previousDelayMs, Long serverRetryHintMs);

Expand All @@ -49,7 +49,7 @@ public interface BackoffStrategy {
* @param attempt consecutive reconnect attempt number (1-based)
* @param previousDelayMs last delay used (0 for first attempt)
* @param serverRetryHintMs value from server 'retry:' (ms) or HTTP Retry-After, or null if none
* @return true to reconnect, false to stop
* @return {@code true} to reconnect, {@code false} to stop
*/
default boolean shouldReconnect(final int attempt, final long previousDelayMs, final Long serverRetryHintMs) {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
* <p>Implementations should keep handlers lightweight and non-blocking.
* If you need to do heavy work, offload to your own executor.</p>
*
* <h3>Invocation & threading</h3>
* <h3>Invocation &amp; threading</h3>
* <ul>
* <li>Methods may be invoked on an internal callback executor supplied to the
* {@code EventSource} (or on the caller thread if none was supplied).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ static CloseableHttpAsyncClient getSharedClient() {
*
* <p>Use this when you want to set defaults such as headers, backoff,
* parser strategy (char vs. byte), or custom executors for scheduling and callbacks.</p>
* @return a new {@link SseExecutorBuilder}
*/
public static SseExecutorBuilder custom() {
return new SseExecutorBuilder();
Expand All @@ -137,6 +138,7 @@ public static SseExecutorBuilder custom() {
* <p>Streams opened by this executor will share one underlying {@link CloseableHttpAsyncClient}
* instance. {@link #close()} will be a no-op; call {@link #closeSharedClient()} to
* explicitly shut the shared client down (for tests / application shutdown).</p>
* @return a new {@link SseExecutor}
*/
public static SseExecutor newInstance() {
final CloseableHttpAsyncClient c = getSharedClient();
Expand All @@ -152,6 +154,8 @@ public static SseExecutor newInstance() {
* @param client an already constructed async client
* @throws NullPointerException if {@code client} is {@code null}
* @throws IllegalStateException if the client is shutting down or shut down
* @return a new {@link SseExecutor}
*
*/
public static SseExecutor newInstance(final CloseableHttpAsyncClient client) {
Args.notNull(client, "HTTP Async Client");
Expand All @@ -164,6 +168,7 @@ public static SseExecutor newInstance(final CloseableHttpAsyncClient client) {
* Closes and clears the shared async client, if present.
*
* <p>Useful for tests or orderly application shutdown.</p>
* @throws IOException if closing the client fails
*/
public static void closeSharedClient() throws IOException {
LOCK.lock();
Expand Down Expand Up @@ -213,6 +218,7 @@ public static void closeSharedClient() throws IOException {
/**
* Closes the underlying async client if this executor does <em>not</em> use
* the process-wide shared client. No-op otherwise.
* @throws IOException if closing the client fails
*/
public void close() throws IOException {
if (!isSharedClient) {
Expand All @@ -225,6 +231,7 @@ public void close() throws IOException {
*
* @param uri target SSE endpoint (must produce {@code text/event-stream})
* @param listener event callbacks
* @return the created {@link EventSource}
*/
public EventSource open(final URI uri, final EventSourceListener listener) {
return open(uri, this.defaultHeaders, listener, this.defaultConfig,
Expand All @@ -237,6 +244,7 @@ public EventSource open(final URI uri, final EventSourceListener listener) {
* @param uri target SSE endpoint
* @param headers extra request headers (merged with executor defaults)
* @param listener event callbacks
* @return the created {@link EventSource}
*/
public EventSource open(final URI uri,
final Map<String, String> headers,
Expand All @@ -252,6 +260,7 @@ public EventSource open(final URI uri,
* @param headers extra request headers (merged with executor defaults)
* @param listener event callbacks
* @param config reconnect/backoff config
* @return the created {@link EventSource}
*/
public EventSource open(final URI uri,
final Map<String, String> headers,
Expand All @@ -271,6 +280,7 @@ public EventSource open(final URI uri,
* @param parser parsing strategy ({@link SseParser#CHAR} or {@link SseParser#BYTE})
* @param scheduler scheduler for reconnects (nullable → internal shared scheduler)
* @param callbackExecutor executor for listener callbacks (nullable → run inline)
* @return the created {@link EventSource}
*/
public EventSource open(final URI uri,
final Map<String, String> headers,
Expand All @@ -292,6 +302,7 @@ public EventSource open(final URI uri,

/**
* Returns the underlying {@link CloseableHttpAsyncClient}.
* @return the client
*/
public CloseableHttpAsyncClient getClient() {
return client;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ public final class SseExecutorBuilder {
/**
* Supplies a custom async HTTP client. The caller owns its lifecycle and
* {@link SseExecutor#close()} will close it.
* @param client the client to use
* @return this builder
*/
public SseExecutorBuilder setHttpClient(final CloseableHttpAsyncClient client) {
this.client = Args.notNull(client, "HTTP Async Client");
Expand All @@ -86,6 +88,8 @@ public SseExecutorBuilder setHttpClient(final CloseableHttpAsyncClient client) {
/**
* Sets the scheduler to use for reconnect delays. If not provided, the internal shared
* scheduler is used.
* @param scheduler the scheduler to use
* @return this builder
*/
public SseExecutorBuilder setScheduler(final ScheduledExecutorService scheduler) {
this.scheduler = scheduler;
Expand All @@ -95,6 +99,8 @@ public SseExecutorBuilder setScheduler(final ScheduledExecutorService scheduler)
/**
* Sets the executor used to dispatch {@link EventSourceListener} callbacks.
* If not provided, callbacks run inline on the I/O thread.
* @param callbackExecutor the executor to use
* @return this builder
*/
public SseExecutorBuilder setCallbackExecutor(final Executor callbackExecutor) {
this.callbackExecutor = callbackExecutor;
Expand All @@ -103,6 +109,8 @@ public SseExecutorBuilder setCallbackExecutor(final Executor callbackExecutor) {

/**
* Sets the default reconnect/backoff configuration applied to opened streams.
* @param cfg the reconnect configuration
* @return this builder
*/
public SseExecutorBuilder setEventSourceConfig(final EventSourceConfig cfg) {
this.config = Args.notNull(cfg, "EventSourceConfig");
Expand All @@ -111,6 +119,8 @@ public SseExecutorBuilder setEventSourceConfig(final EventSourceConfig cfg) {

/**
* Replaces the default headers (sent on every opened stream).
* @param headers the headers to use
* @return this builder
*/
public SseExecutorBuilder setDefaultHeaders(final Map<String, String> headers) {
this.defaultHeaders.clear();
Expand All @@ -122,6 +132,9 @@ public SseExecutorBuilder setDefaultHeaders(final Map<String, String> headers) {

/**
* Adds or replaces a single default header.
* @param name the header name
* @param value the header value
* @return this builder
*/
public SseExecutorBuilder addDefaultHeader(final String name, final String value) {
this.defaultHeaders.put(Args.notNull(name, "name"), value);
Expand All @@ -131,6 +144,8 @@ public SseExecutorBuilder addDefaultHeader(final String name, final String value
/**
* Chooses the parser strategy: {@link SseParser#CHAR} (spec-level, default)
* or {@link SseParser#BYTE} (byte-level framing with minimal decoding).
* @param parser the parser strategy to use
* @return this builder
*/
public SseExecutorBuilder setParserStrategy(final SseParser parser) {
this.parserStrategy = parser != null ? parser : SseParser.CHAR;
Expand All @@ -139,6 +154,7 @@ public SseExecutorBuilder setParserStrategy(final SseParser parser) {

/**
* Builds the {@link SseExecutor}.
* @return a new {@link SseExecutor}
*/
public SseExecutor build() {
final CloseableHttpAsyncClient c = (client != null) ? client : SseExecutor.getSharedClient();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
* <li>{@code retry} is parsed without creating a substring; only {@code data/event/id} values
* create substrings when needed.</li>
* </ul>
* </p>
*/
@Internal
public final class ServerSentEventReader {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public enum SseParser {
*/
CHAR,
/**
* ByteBuffer → byte-level framing & minimal decode.
* ByteBuffer → byte-level framing &amp; minimal decode.
*/
BYTE
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,31 +54,31 @@
* </ul>
*
* <h2>Quick start</h2>
* <pre>{@code
* <pre>
* import java.net.URI;
* import java.util.Collections;
* import org.apache.hc.client5.http.sse.*;
*
* SseExecutor exec = SseExecutor.newInstance(); // shared async client
*
* EventSourceListener listener = new EventSourceListener() {
* @Override public void onOpen() { System.out.println("open"); }
* @Override public void onEvent(String id, String type, String data) {
* &#64;Override public void onOpen() { System.out.println("open"); }
* &#64;Override public void onEvent(String id, String type, String data) {
* System.out.println(type + " id=" + id + " data=" + data);
* }
* @Override public void onClosed() { System.out.println("closed"); }
* @Override public void onFailure(Throwable t, boolean willReconnect) {
* &#64;Override public void onClosed() { System.out.println("closed"); }
* &#64;Override public void onFailure(Throwable t, boolean willReconnect) {
* t.printStackTrace();
* }
* };
*
* EventSource es = exec.open(URI.create("https://example.com/stream"),
* Collections.<String,String>emptyMap(),
* Collections.&lt;String,String&gt;emptyMap(),
* listener);
* es.start();
*
* Runtime.getRuntime().addShutdownHook(new Thread(es::cancel));
* }</pre>
* </pre>
*
* <h2>Configuration</h2>
* <ul>
Expand Down
11 changes: 10 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
<tag>5.7-alpha1-SNAPSHOT</tag>
</scm>

<distributionManagement>
<distributionManagement>
<site>
<id>apache.website</id>
<name>Apache HttpComponents Website</name>
Expand Down Expand Up @@ -493,6 +493,15 @@
<title>Apache HttpClient Windows features</title>
<packages>org.apache.hc.client5.http.impl.win*</packages>
</group>
<group>
<title>Apache HttpClient Observations</title>
<packages>org.apache.hc.client5.http.observation*</packages>
</group>
<group>
<title>Apache HttpClient SSE</title>
<packages>org.apache.hc.client5.http.sse*</packages>
</group>

</groups>
</configuration>
<reportSets>
Expand Down