Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 2 additions & 7 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,9 @@ jobs:
DAPR_REF:
TOXIPROXY_URL: https://github.com/Shopify/toxiproxy/releases/download/v2.5.0/toxiproxy-server-linux-amd64
steps:
- name: Install Stable Docker
id: setup_docker
uses: docker/setup-docker-action@v4
- name: Check Docker version
run: docker version
- uses: actions/checkout@v5
- name: Check Docker version
run: docker version
- name: Set up OpenJDK ${{ env.JDK_VER }}
uses: actions/setup-java@v5
with:
Expand Down Expand Up @@ -146,8 +143,6 @@ jobs:
- name: Integration tests using spring boot version ${{ matrix.spring-boot-version }}
id: integration_tests
run: PRODUCT_SPRING_BOOT_VERSION=${{ matrix.spring-boot-version }} ./mvnw -B -Pintegration-tests dependency:copy-dependencies verify
env:
DOCKER_HOST: ${{steps.setup_docker.outputs.sock}}
- name: Upload failsafe test report for sdk-tests on failure
if: ${{ failure() && steps.integration_tests.conclusion == 'failure' }}
uses: actions/upload-artifact@v4
Expand Down
42 changes: 0 additions & 42 deletions .github/workflows/validate.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,6 @@ jobs:
with:
distribution: 'temurin'
java-version: ${{ env.JDK_VER }}
- name: Install Stable Docker
id: setup_docker
uses: docker/setup-docker-action@v4
- name: Check Docker version
run: docker version
- name: Set up Dapr CLI
Expand Down Expand Up @@ -109,114 +106,75 @@ jobs:
run: sleep 30 && docker logs dapr_scheduler && nc -vz localhost 50006
- name: Install jars
run: ./mvnw clean install -DskipTests -q
env:
DOCKER_HOST: ${{steps.setup_docker.outputs.sock}}
- name: Validate workflows example
working-directory: ./examples
run: |
mm.py ./src/main/java/io/dapr/examples/workflows/README.md
env:
DOCKER_HOST: ${{steps.setup_docker.outputs.sock}}
- name: Validate Spring Boot examples
working-directory: ./spring-boot-examples
run: |
mm.py README.md
env:
DOCKER_HOST: ${{steps.setup_docker.outputs.sock}}
- name: Validate Spring Boot Workflow examples
working-directory: ./spring-boot-examples/workflows
run: |
mm.py README.md
env:
DOCKER_HOST: ${{steps.setup_docker.outputs.sock}}
- name: Validate Jobs example
working-directory: ./examples
run: |
mm.py ./src/main/java/io/dapr/examples/jobs/README.md
env:
DOCKER_HOST: ${{steps.setup_docker.outputs.sock}}
- name: Validate conversation ai example
working-directory: ./examples
run: |
mm.py ./src/main/java/io/dapr/examples/conversation/README.md
env:
DOCKER_HOST: ${{steps.setup_docker.outputs.sock}}
- name: Validate invoke http example
working-directory: ./examples
run: |
mm.py ./src/main/java/io/dapr/examples/invoke/http/README.md
env:
DOCKER_HOST: ${{steps.setup_docker.outputs.sock}}
- name: Validate invoke grpc example
working-directory: ./examples
run: |
mm.py ./src/main/java/io/dapr/examples/invoke/grpc/README.md
env:
DOCKER_HOST: ${{steps.setup_docker.outputs.sock}}
- name: Validate tracing example
working-directory: ./examples
run: |
mm.py ./src/main/java/io/dapr/examples/tracing/README.md
env:
DOCKER_HOST: ${{steps.setup_docker.outputs.sock}}
- name: Validate expection handling example
working-directory: ./examples
run: |
mm.py ./src/main/java/io/dapr/examples/exception/README.md
env:
DOCKER_HOST: ${{steps.setup_docker.outputs.sock}}
- name: Validate state example
working-directory: ./examples
run: |
mm.py ./src/main/java/io/dapr/examples/state/README.md
env:
DOCKER_HOST: ${{steps.setup_docker.outputs.sock}}
- name: Validate pubsub example
working-directory: ./examples
run: |
mm.py ./src/main/java/io/dapr/examples/pubsub/README.md
env:
DOCKER_HOST: ${{steps.setup_docker.outputs.sock}}
- name: Validate bindings HTTP example
working-directory: ./examples
run: |
mm.py ./src/main/java/io/dapr/examples/bindings/http/README.md
env:
DOCKER_HOST: ${{steps.setup_docker.outputs.sock}}
- name: Validate secrets example
working-directory: ./examples
run: |
mm.py ./src/main/java/io/dapr/examples/secrets/README.md
env:
DOCKER_HOST: ${{steps.setup_docker.outputs.sock}}
- name: Validate unit testing example
working-directory: ./examples
run: |
mm.py ./src/main/java/io/dapr/examples/unittesting/README.md
env:
DOCKER_HOST: ${{steps.setup_docker.outputs.sock}}
- name: Validate Configuration API example
working-directory: ./examples
run: |
mm.py ./src/main/java/io/dapr/examples/configuration/README.md
env:
DOCKER_HOST: ${{steps.setup_docker.outputs.sock}}
- name: Validate actors example
working-directory: ./examples
run: |
mm.py ./src/main/java/io/dapr/examples/actors/README.md
env:
DOCKER_HOST: ${{steps.setup_docker.outputs.sock}}
- name: Validate query state HTTP example
working-directory: ./examples
run: |
mm.py ./src/main/java/io/dapr/examples/querystate/README.md
env:
DOCKER_HOST: ${{steps.setup_docker.outputs.sock}}
- name: Validate streaming subscription example
working-directory: ./examples
run: |
mm.py ./src/main/java/io/dapr/examples/pubsub/stream/README.md
env:
DOCKER_HOST: ${{steps.setup_docker.outputs.sock}}

30 changes: 12 additions & 18 deletions examples/src/main/java/io/dapr/examples/pubsub/stream/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ The subscriber uses the `DaprPreviewClient` interface to use a new feature where

The publisher is a simple Java application with a main method that uses the Dapr gRPC Client to publish 10 messages to a specific topic.

In the `Subscriber.java` file, you will find the `Subscriber` class, containing the main method. The main method declares a `DaprPreviewClient` using the `DaprClientBuilder` class. When invoking `subscribeToEvents`, the subscriber provides an implementation of the `SubscriptionListener` interface, receiving a `Subscription` object. The `Subscription` object implements the `Closeable` interface and the `close()` method must be used to stop the subscription.
In the `Subscriber.java` file, you will find the `Subscriber` class, containing the main method. The main method declares a `DaprPreviewClient` using the `DaprClientBuilder` class. When invoking `subscribeToEvents`, the method returns a `Flux<CloudEvent<T>>` that can be processed using reactive operators like `doOnNext()` for event handling and `doOnError()` for error handling. The example uses `blockLast()` to keep the subscriber running indefinitely. For production use cases requiring explicit subscription lifecycle control, you can use `.subscribe()` which returns a `Disposable` that can be disposed via `disposable.dispose()`.

```java
public class Subscriber {
Expand All @@ -59,25 +59,19 @@ public class Subscriber {
public static void main(String[] args) throws Exception {
String topicName = getTopicName(args);
try (var client = new DaprClientBuilder().buildPreviewClient()) {
var subscription = client.subscribeToEvents(
// Subscribe to events using the Flux-based reactive API
// The stream will emit CloudEvent<String> objects as they arrive
client.subscribeToEvents(
PUBSUB_NAME,
topicName,
new SubscriptionListener<>() {

@Override
public Mono<Status> onEvent(CloudEvent<String> event) {
System.out.println("Subscriber got: " + event.getData());
return Mono.just(Status.SUCCESS);
}

@Override
public void onError(RuntimeException exception) {
System.out.println("Subscriber got exception: " + exception.getMessage());
}
},
TypeRef.STRING);

subscription.awaitTermination();
TypeRef.STRING)
.doOnNext(event -> {
System.out.println("Subscriber got: " + event.getData());
})
.doOnError(throwable -> {
System.out.println("Subscriber got exception: " + throwable.getMessage());
})
.blockLast(); // Blocks indefinitely until the stream completes (keeps the subscriber running)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,7 @@
package io.dapr.examples.pubsub.stream;

import io.dapr.client.DaprClientBuilder;
import io.dapr.client.SubscriptionListener;
import io.dapr.client.domain.CloudEvent;
import io.dapr.utils.TypeRef;
import reactor.core.publisher.Mono;

/**
* Subscriber using bi-directional gRPC streaming, which does not require an app port.
Expand All @@ -44,25 +41,19 @@ public class Subscriber {
public static void main(String[] args) throws Exception {
String topicName = getTopicName(args);
try (var client = new DaprClientBuilder().buildPreviewClient()) {
var subscription = client.subscribeToEvents(
// Subscribe to events using the Flux-based reactive API
// The stream will emit CloudEvent<String> objects as they arrive
client.subscribeToEvents(
PUBSUB_NAME,
topicName,
new SubscriptionListener<>() {

@Override
public Mono<Status> onEvent(CloudEvent<String> event) {
System.out.println("Subscriber got: " + event.getData());
return Mono.just(Status.SUCCESS);
}

@Override
public void onError(RuntimeException exception) {
System.out.println("Subscriber got exception: " + exception.getMessage());
}
},
TypeRef.STRING);

subscription.awaitTermination();
TypeRef.STRING)
.doOnNext(event -> {
System.out.println("Subscriber got: " + event.getData());
})
.doOnError(throwable -> {
System.out.println("Subscriber got exception: " + throwable.getMessage());
})
.blockLast(); // Blocks indefinitely until the stream completes (keeps the subscriber running)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.testcontainers.containers.Network;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;
import org.testcontainers.utility.MountableFile;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.Wait;
Expand Down Expand Up @@ -55,6 +56,7 @@
public class WorkflowsCrossAppCallActivityIT {

private static final Network DAPR_NETWORK = Network.newNetwork();
private static final DockerImageName JAVA_WORKER_IMAGE = DockerImageName.parse("eclipse-temurin:17-jdk");

@Container
private final static DaprPlacementContainer sharedPlacementContainer = new DaprPlacementContainer(DAPR_PLACEMENT_IMAGE_TAG)
Expand Down Expand Up @@ -113,7 +115,7 @@ public class WorkflowsCrossAppCallActivityIT {

// TestContainers for each app
@Container
private static GenericContainer<?> crossappWorker = new GenericContainer<>("openjdk:17-jdk-slim")
private static GenericContainer<?> crossappWorker = new GenericContainer<>(JAVA_WORKER_IMAGE)
.withCopyFileToContainer(MountableFile.forHostPath("target"), "/app")
.withWorkingDirectory("/app")
.withCommand("java", "-cp", "test-classes:classes:dependency/*:*",
Expand All @@ -127,7 +129,7 @@ public class WorkflowsCrossAppCallActivityIT {
.withLogConsumer(outputFrame -> System.out.println("CrossAppWorker: " + outputFrame.getUtf8String()));

@Container
private final static GenericContainer<?> app2Worker = new GenericContainer<>("openjdk:17-jdk-slim")
private final static GenericContainer<?> app2Worker = new GenericContainer<>(JAVA_WORKER_IMAGE)
.withCopyFileToContainer(MountableFile.forHostPath("target"), "/app")
.withWorkingDirectory("/app")
.withCommand("java", "-cp", "test-classes:classes:dependency/*:*",
Expand All @@ -141,7 +143,7 @@ public class WorkflowsCrossAppCallActivityIT {
.withLogConsumer(outputFrame -> System.out.println("App2Worker: " + outputFrame.getUtf8String()));

@Container
private final static GenericContainer<?> app3Worker = new GenericContainer<>("openjdk:17-jdk-slim")
private final static GenericContainer<?> app3Worker = new GenericContainer<>(JAVA_WORKER_IMAGE)
.withCopyFileToContainer(MountableFile.forHostPath("target"), "/app")
.withWorkingDirectory("/app")
.withCommand("java", "-cp", "test-classes:classes:dependency/*:*",
Expand Down
37 changes: 37 additions & 0 deletions sdk/src/main/java/io/dapr/client/DaprClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
import io.dapr.internal.grpc.DaprClientGrpcInterceptors;
import io.dapr.internal.resiliency.RetryPolicy;
import io.dapr.internal.resiliency.TimeoutPolicy;
import io.dapr.internal.subscription.EventSubscriberStreamObserver;
import io.dapr.serializer.DaprObjectSerializer;
import io.dapr.serializer.DefaultObjectSerializer;
import io.dapr.utils.DefaultContentTypeConverter;
Expand Down Expand Up @@ -475,6 +476,42 @@ public <T> Subscription subscribeToEvents(
return buildSubscription(listener, type, request);
}

/**
* {@inheritDoc}
*/
@Override
public <T> Flux<CloudEvent<T>> subscribeToEvents(String pubsubName, String topic, TypeRef<T> type) {
DaprProtos.SubscribeTopicEventsRequestInitialAlpha1 initialRequest =
DaprProtos.SubscribeTopicEventsRequestInitialAlpha1.newBuilder()
.setTopic(topic)
.setPubsubName(pubsubName)
.build();
DaprProtos.SubscribeTopicEventsRequestAlpha1 request =
DaprProtos.SubscribeTopicEventsRequestAlpha1.newBuilder()
.setInitialRequest(initialRequest)
.build();

return Flux.create(sink -> {
DaprGrpc.DaprStub interceptedStub = this.grpcInterceptors.intercept(this.asyncStub);
EventSubscriberStreamObserver<T> eventSubscriber = new EventSubscriberStreamObserver<>(
interceptedStub,
sink,
type,
this.objectSerializer
);
StreamObserver<DaprProtos.SubscribeTopicEventsRequestAlpha1> requestStream = eventSubscriber.start(request);

// Cleanup when Flux is cancelled or completed
sink.onDispose(() -> {
try {
requestStream.onCompleted();
} catch (Exception e) {
logger.debug("Completing the subscription stream resulted in an error: {}", e.getMessage());
}
});
}, FluxSink.OverflowStrategy.BUFFER);
}

@Nonnull
private <T> Subscription<T> buildSubscription(
SubscriptionListener<T> listener,
Expand Down
16 changes: 15 additions & 1 deletion sdk/src/main/java/io/dapr/client/DaprPreviewClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.dapr.client.domain.BulkPublishRequest;
import io.dapr.client.domain.BulkPublishResponse;
import io.dapr.client.domain.BulkPublishResponseFailedEntry;
import io.dapr.client.domain.CloudEvent;
import io.dapr.client.domain.ConversationRequest;
import io.dapr.client.domain.ConversationRequestAlpha2;
import io.dapr.client.domain.ConversationResponse;
Expand All @@ -32,6 +33,7 @@
import io.dapr.client.domain.UnlockResponseStatus;
import io.dapr.client.domain.query.Query;
import io.dapr.utils.TypeRef;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.List;
Expand Down Expand Up @@ -271,12 +273,24 @@ <T> Mono<BulkPublishResponse<T>> publishEvents(String pubsubName, String topicNa
* @param topic Name of the topic to subscribe to.
* @param listener Callback methods to process events.
* @param type Type for object deserialization.
* @return An active subscription.
* @param <T> Type of object deserialization.
* @return An active subscription.
* @deprecated Use {@link #subscribeToEvents(String, String, TypeRef)} instead for a more reactive approach.
*/
@Deprecated
<T> Subscription subscribeToEvents(
String pubsubName, String topic, SubscriptionListener<T> listener, TypeRef<T> type);

/**
* Subscribe to pubsub events via streaming using Project Reactor Flux.
* @param pubsubName Name of the pubsub component.
* @param topic Name of the topic to subscribe to.
* @param type Type for object deserialization.
* @return A Flux of CloudEvents containing deserialized event payloads and metadata.
* @param <T> Type of the event payload.
*/
<T> Flux<CloudEvent<T>> subscribeToEvents(String pubsubName, String topic, TypeRef<T> type);

/**
* Schedules a job using the provided job request details.
*
Expand Down
1 change: 1 addition & 0 deletions sdk/src/main/java/io/dapr/client/Subscription.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
* Streaming subscription of events for Dapr's pubsub.
* @param <T> Application's object type.
*/
@Deprecated
public class Subscription<T> implements Closeable {

private final BlockingQueue<DaprProtos.SubscribeTopicEventsRequestAlpha1> ackQueue = new LinkedBlockingQueue<>(50);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
* Callback interface to receive events from a streaming subscription of events.
* @param <T> Object type for deserialization.
*/
@Deprecated
public interface SubscriptionListener<T> {

/**
Expand Down
Loading
Loading