Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

<groupId>io.axoniq</groupId>
<artifactId>axonserver-connector-java</artifactId>
<version>2025.2.2-SNAPSHOT</version>
<version>2026.0.0-SNAPSHOT</version>

<name>AxonServer Connector</name>
<description>
Expand Down Expand Up @@ -422,6 +422,8 @@
</executions>
<configuration>
<encoding>UTF-8</encoding>
<failOnError>false</failOnError>
<failOnWarnings>false</failOnWarnings>
<tags>
<tag>
<name>apiNote</name>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,14 @@ public class AxonServerConnectionFactory {
private volatile boolean shutdown;

/**
* Instantiates an {@link AxonServerConnectionFactory} with the given {@code builder}.
* Instantiates an {@link AxonServerConnectionFactory} with the given {@code builder}. The clientInstanceId is
* postfixed by a random hex string to ensure uniqueness between different instances of the application.
*
* @param builder the {@link Builder} used to set all the specifics of an {@link AxonServerConnectionFactory}
*/
protected AxonServerConnectionFactory(Builder builder) {
this.componentName = builder.componentName;
this.clientInstanceId = builder.clientInstanceId;
this.clientInstanceId = builder.clientInstanceId + "-" + randomHex(8);
this.token = builder.token;
this.tags.putAll(builder.tags);
this.executorService = builder.executorService;
Expand All @@ -121,7 +122,6 @@ protected AxonServerConnectionFactory(Builder builder) {
* information and should be the same only for instances of the same application or component.
*
* @param componentName The name of the component connecting to AxonServer
*
* @return a builder instance for further configuration of the connector
* @see #forClient(String, String)
*/
Expand All @@ -131,8 +131,8 @@ public static Builder forClient(String componentName) {


/**
* Returns a builder to configure a ConnectionFactory instance for the given {@code componentName} and {@code
* clientInstanceId}.
* Returns a builder to configure a ConnectionFactory instance for the given {@code componentName} and
* {@code clientInstanceId}.
* <p>
* The clientInstanceId MUST be a unique value across all instances that connect to AxonServer. The componentName is
* used in monitoring information and should be the same only for instances of the same application or component.
Expand All @@ -142,7 +142,6 @@ public static Builder forClient(String componentName) {
*
* @param componentName The name of the component connecting to AxonServer
* @param clientInstanceId The unique instance identifier for this instance of the component
*
* @return a builder instance for further configuration of the connector
* @see #forClient(String)
*/
Expand All @@ -154,7 +153,6 @@ public static Builder forClient(String componentName, String clientInstanceId) {
* Connects to the given {@code context} using the settings defined in this ConnectionFactory.
*
* @param context The name of the context to connect to
*
* @return a Connection allowing interaction with the mentioned context
*/
public AxonServerConnection connect(String context) {
Expand Down Expand Up @@ -206,7 +204,8 @@ private ManagedChannel createChannel(ServerAddress address, String context) {
return builder.intercept(
new GrpcBufferingInterceptor(50),
new HeaderAttachingInterceptor<>(Headers.CONTEXT, context),
new HeaderAttachingInterceptor<>(Headers.ACCESS_TOKEN, token)
new HeaderAttachingInterceptor<>(Headers.ACCESS_TOKEN, token),
new HeaderAttachingInterceptor<>(Headers.CLIENT_ID, clientInstanceId)
).build();
}

Expand Down Expand Up @@ -234,12 +233,24 @@ public void shutdown() {
}
}

/**
* Returns the unique client instance identifier used by this ConnectionFactory. This is the value configured as
* {@code clientInstanceId} on the Builder, postfixed with a random hex string to ensure uniqueness between
* different instances of the application.
*
* @return the unique client instance identifier used by this ConnectionFactory
*/
public String getClientInstanceId() {
return clientInstanceId;
}

/**
* Builder for AxonServerConnectionFactory instances. The methods on this class allow for configuration of the
* {@link AxonServerConnectionFactory} instance used to connect to an AxonServer (cluster).
* <p>
* This class is not intended to be instantiated directly, but rather through {@link
* AxonServerConnectionFactory#forClient(String)} or {@link AxonServerConnectionFactory#forClient(String, String)}.
* This class is not intended to be instantiated directly, but rather through
* {@link AxonServerConnectionFactory#forClient(String)} or
* {@link AxonServerConnectionFactory#forClient(String, String)}.
*/
public static class Builder {

Expand Down Expand Up @@ -281,7 +292,6 @@ protected Builder(String componentName, String clientInstanceId) {
* Defaults to "localhost:8024".
*
* @param serverAddresses The addresses to try to set up the initial connection with.
*
* @return this builder for further configuration
*/
public Builder routingServers(ServerAddress... serverAddresses) {
Expand All @@ -297,7 +307,6 @@ public Builder routingServers(ServerAddress... serverAddresses) {
*
* @param interval The amount of time to wait in between connection attempts
* @param timeUnit The unit in which the interval is expressed
*
* @return this builder for further configuration
*/
public Builder reconnectInterval(long interval, TimeUnit timeUnit) {
Expand All @@ -312,7 +321,6 @@ public Builder reconnectInterval(long interval, TimeUnit timeUnit) {
*
* @param timeout The amount of time to wait for a connection to be established
* @param timeUnit The unit in which the timout is expressed
*
* @return this builder for further configuration
*/
public Builder connectTimeout(long timeout, TimeUnit timeUnit) {
Expand All @@ -330,7 +338,6 @@ public Builder connectTimeout(long timeout, TimeUnit timeUnit) {
* By default, no tags are defined.
*
* @param additionalClientTags additional tags that define this client component
*
* @return this builder for further configuration
*/
public Builder clientTags(Map<String, String> additionalClientTags) {
Expand All @@ -349,7 +356,6 @@ public Builder clientTags(Map<String, String> additionalClientTags) {
*
* @param key the key of the Tag to configure
* @param value the value of the Tag to configure
*
* @return this builder for further configuration
*/
public Builder clientTag(String key, String value) {
Expand All @@ -364,7 +370,6 @@ public Builder clientTag(String key, String value) {
* AxonServer.
*
* @param token The token to which the required authorizations have been assigned.
*
* @return this builder for further configuration
*/
public Builder token(String token) {
Expand All @@ -391,7 +396,6 @@ public Builder useTransportSecurity() {
* Defaults to not using TLS.
*
* @param sslContext The context defining TLS parameters
*
* @return this builder for further configuration
* @see SslContextBuilder#forClient()
*/
Expand All @@ -403,14 +407,13 @@ public Builder useTransportSecurity(SslContext sslContext) {
/**
* Indicates whether the connector should always reconnect via the Routing Servers. When {@code true} (default),
* the connector will contact the Routing Servers for a new destination each time a connection is dropped. When
* {@code false}, the connector will first attempt to re-establish a connection to the node is was
* previously connected to. When that fails, only then will it contact the Routing Servers.
* {@code false}, the connector will first attempt to re-establish a connection to the node is was previously
* connected to. When that fails, only then will it contact the Routing Servers.
* <p>
* Default to {@code true}, forcing the failed connection to be abandoned and a new one to be requested via the
* routing servers.
*
* @param forceReconnectViaRoutingServers whether to force a reconnect to the Cluster via the RoutingServers.
*
* @return this builder for further configuration
*/
public Builder forceReconnectViaRoutingServers(boolean forceReconnectViaRoutingServers) {
Expand All @@ -426,7 +429,6 @@ public Builder forceReconnectViaRoutingServers(boolean forceReconnectViaRoutingS
* Defaults to 2.
*
* @param poolSize The number of threads to assign to Connection related activities.
*
* @return this builder for further configuration
*/
public Builder threadPoolSize(int poolSize) {
Expand All @@ -449,7 +451,6 @@ public Builder threadPoolSize(int poolSize) {
* @param interval time without read activity before sending a keepalive ping
* @param timeout the time waiting for read activity after sending a keepalive ping
* @param timeUnit the unit in which the interval and timeout are expressed
*
* @return this builder for further configuration
*/
public Builder usingKeepAlive(long interval, long timeout, TimeUnit timeUnit, boolean keepAliveWithoutCalls) {
Expand All @@ -466,7 +467,6 @@ public Builder usingKeepAlive(long interval, long timeout, TimeUnit timeUnit, bo
* Default to 4 MiB.
*
* @param bytes The number of bytes to limit inbound message to
*
* @return this builder for further configuration
*/
public Builder maxInboundMessageSize(int bytes) {
Expand All @@ -482,7 +482,6 @@ public Builder maxInboundMessageSize(int bytes) {
* feature.
*
* @param customization A function defining the customization to make on the ManagedChannelBuilder
*
* @return this builder for further configuration
*/
public Builder customize(UnaryOperator<ManagedChannelBuilder<?>> customization) {
Expand All @@ -496,7 +495,6 @@ public Builder customize(UnaryOperator<ManagedChannelBuilder<?>> customization)
*
* @param interval The interval in which to send status updates
* @param unit The unit of time in which the interval is expressed
*
* @return this builder for further configuration
*/
public Builder processorInfoUpdateFrequency(long interval, TimeUnit unit) {
Expand All @@ -511,7 +509,6 @@ public Builder processorInfoUpdateFrequency(long interval, TimeUnit unit) {
* Values lower than 16 will be replaced with 16.
*
* @param permits The number of initial permits
*
* @return this builder for further configuration
*/
public Builder queryPermits(int permits) {
Expand All @@ -526,7 +523,6 @@ public Builder queryPermits(int permits) {
* Values lower than 16 will be replaced with 16.
*
* @param permits The number of initial permits
*
* @return this builder for further configuration
*/
public Builder commandPermits(int permits) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ public synchronized void connect() {
responseObserver.getInstructionsForPlatform();

try {
logger.info("Connected instruction stream for context '{}'. Sending client identification", context);
logger.info("Connected instruction stream for context '{}'. Sending client identification with clientId {}", context, clientIdentification.getClientId());
instructionsForPlatform.onNext(PlatformInboundInstruction.newBuilder()
.setRegister(clientIdentification)
.build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ public abstract class Headers {
public static final Metadata.Key<String> ACCESS_TOKEN =
Metadata.Key.of("AxonIQ-Access-Token", Metadata.ASCII_STRING_MARSHALLER);

/**
* A {@link Metadata.Key} defining the client ID of the application sending this message.
*/
public static final Metadata.Key<String> CLIENT_ID =
Metadata.Key.of("AxonIQ-ClientId", Metadata.ASCII_STRING_MARSHALLER);

private Headers() {
// Utility class
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ void moveSegmentInstructionIsPickedUpByHandler() throws Exception {

String segmentToMove = "0";
String segmentsPath = "/v1/components/" + getClass().getSimpleName() + "/processors/testProcessor/segments/" +
segmentToMove + "/move?tokenStoreIdentifier=TokenStoreId&context=default&target=foo";
segmentToMove + "/move?tokenStoreIdentifier=TokenStoreId&context=default&target=" + clientToMoveTo.getClientInstanceId();
assertWithin(2, TimeUnit.SECONDS, () -> sendToAxonServer(HttpPatch::new, segmentsPath));
assertWithin(2, TimeUnit.SECONDS,
() -> assertTrue(instructionHandler.instructions.contains("release" + segmentToMove)));
Expand Down