Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ object Routes {
object DeadLetter {
const val LETTERS = "dlq-query-dead-letters"
const val SEQUENCE_SIZE = "dlq-query-dead-letter-sequence-size"
// Paginated lookup of letters within a single sequence — added in AF5 framework-client
// 5.1.0 so the platform UI can browse very long sequences without loading them all.
const val SEQUENCE_LETTERS = "dlq-query-dead-letter-sequence-letters"

const val DELETE_SEQUENCE = "dlq-command-delete-sequence"
const val DELETE_ALL_SEQUENCES = "dlq-command-delete-all-sequences"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,10 @@ data class SupportedFeatures(
val pauseReports: Boolean? = false,
/* Whether the client supports thread dumps.*/
val threadDump: Boolean? = false,
/* Whether the client supports DLQ insights. Can be FULL, LIMITED, MASKED, or NONE (default).*/
val deadLetterQueuesInsights: AxoniqConsoleDlqMode = AxoniqConsoleDlqMode.NONE,
/* DLQ insight level for this client. `null` means the application has no DLQ library on
* its classpath, so DLQ inspection isn't a feature of this client at all (distinct from
* `NONE`, which means the feature exists but the operator hid all letter data). */
val deadLetterQueuesInsights: AxoniqConsoleDlqMode? = null,
/* Whether the client supports domain events insights. Can be FULL, LOAD_DOMAIN_STATE_ONLY, PREVIEW_PAYLOAD_ONLY, or NONE (default).*/
val domainEventsInsights: DomainEventAccessMode = DomainEventAccessMode.NONE,
/* Whether the client supports client status updates .*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,28 @@ data class DeadLetterProcessRequest(
val processingGroup: String,
val messageIdentifier: String
)

/**
* Request paginated letters belonging to a single sequence inside the DLQ. Used by the platform UI
* detail modal to browse long sequences without loading them all up-front.
*
* @param processingGroup The processing group / DLQ identifier.
* @param sequenceIdentifier Synthetic sequence id as previously returned by [DeadLetter.sequenceIdentifier].
* @param offset Zero-based offset into the sequence.
* @param size Number of letters to return (capped server-side).
*/
data class FetchSequenceLettersRequest(
val processingGroup: String,
val sequenceIdentifier: String,
val offset: Int,
val size: Int,
)

/**
* Response payload for [FetchSequenceLettersRequest]. Carries the requested slice of letters along
* with the total number of letters in the sequence so the UI can render full pagination.
*/
data class SequenceLettersResponse(
val letters: List<DeadLetter>,
val totalCount: Long = letters.size.toLong(),
)
6 changes: 6 additions & 0 deletions framework-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,12 @@
<scope>provided</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>io.axoniq.framework</groupId>
<artifactId>axoniq-dead-letter</artifactId>
<scope>provided</scope>
<optional>true</optional>
</dependency>

<dependency>
<groupId>tools.jackson.core</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,13 @@
package io.axoniq.platform.framework;

import io.axoniq.platform.framework.api.DomainEventAccessMode;
import io.axoniq.platform.framework.api.AxoniqConsoleDlqMode;
import org.axonframework.common.BuilderUtils;

import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
Expand All @@ -44,6 +48,9 @@ public class AxoniqPlatformConfiguration {

private DomainEventAccessMode domainEventAccessMode = DomainEventAccessMode.NONE;

private AxoniqConsoleDlqMode dlqMode = AxoniqConsoleDlqMode.NONE;
private List<String> dlqDiagnosticsWhitelist = new ArrayList<>();

/**
* Constructor to instantiate a {@link AxoniqPlatformConfiguration} based on the fields contained in the
* {@link AxoniqPlatformConfiguration}. Requires the {@code environmentId}, {@code accessToken} and
Expand Down Expand Up @@ -208,4 +215,44 @@ public Long getInitialDelay() {
public DomainEventAccessMode getDomainEventAccessMode() {
return domainEventAccessMode;
}

/**
* Controls how much DLQ data is exposed through the platform API. Defaults to
* {@link AxoniqConsoleDlqMode#NONE} so applications must deliberately opt into exposing letter
* contents (which may include personal data and would make the platform a data processor under
* GDPR). Use {@code MASKED} when sequence identifiers must still be addressable but contents
* must not leak, {@code LIMITED} to strip payload but keep sequence identifiers as-is for
* filtered diagnostics, or {@code FULL} for unrestricted access (typically only safe in
* development).
*
* @param dlqMode The dead-letter exposure mode.
* @return The builder for fluent interfacing.
*/
public AxoniqPlatformConfiguration dlqMode(AxoniqConsoleDlqMode dlqMode) {
BuilderUtils.assertNonNull(dlqMode, "Axoniq Platform dlqMode may not be null");
this.dlqMode = dlqMode;
return this;
}

/**
* Adds a diagnostics metadata key to the whitelist that survives {@link AxoniqConsoleDlqMode#LIMITED}
* and {@link AxoniqConsoleDlqMode#MASKED} modes. All other keys are dropped before the letter
* leaves this client.
*
* @param key The diagnostics metadata key to permit.
* @return The builder for fluent interfacing.
*/
public AxoniqPlatformConfiguration addDlqDiagnosticsWhitelistKey(String key) {
BuilderUtils.assertNonEmpty(key, "Axoniq Platform diagnostics whitelist key may not be null or empty");
this.dlqDiagnosticsWhitelist.add(key);
return this;
}

public AxoniqConsoleDlqMode getDlqMode() {
return dlqMode;
}

public List<String> getDlqDiagnosticsWhitelist() {
return Collections.unmodifiableList(dlqDiagnosticsWhitelist);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package io.axoniq.platform.framework.client

import io.axoniq.platform.framework.AxoniqPlatformConfiguration
import io.axoniq.platform.framework.api.AxonServerEventStoreMessageSourceInformation
import io.axoniq.platform.framework.api.AxoniqConsoleDlqMode
import io.axoniq.platform.framework.api.CommandBusInformation
import io.axoniq.platform.framework.api.DomainEventAccessMode
import io.axoniq.platform.framework.api.CommonProcessorInformation
Expand Down Expand Up @@ -87,6 +88,7 @@ class SetupPayloadCreator(
licenseEntitlement = hasEntitlementManager(),
modelInspection = hasStateManager(),
domainEventsInsights = resolveDomainEventAccessMode(),
deadLetterQueuesInsights = resolveDeadLetterQueuesInsights(),
)
)
}
Expand Down Expand Up @@ -363,6 +365,38 @@ class SetupPayloadCreator(
}
}

/**
* Resolves the [AxoniqPlatformConfiguration] from the application configuration, returning `null`
* when the platform module hasn't been wired (legacy or non-Spring setups). The caller falls back
* to `NONE` so applications that haven't deliberately opted in stay closed by default — exposing
* letter contents (which can include personal data) requires an explicit `dlqMode` override
* (`LIMITED`/`MASKED`/`FULL`) on the application's [AxoniqPlatformConfiguration].
*/
private fun axoniqPlatformConfiguration(): AxoniqPlatformConfiguration? =
configuration.getOptionalComponent(AxoniqPlatformConfiguration::class.java).orElse(null)

/**
* Returns the DLQ insight level reported on the setup payload, or `null` when this application
* has no DLQ library on its classpath (in which case DLQ inspection isn't a feature of this
* client at all — semantically distinct from [AxoniqConsoleDlqMode.NONE], which means the feature
* exists but the operator hid all letter data).
*/
private fun resolveDeadLetterQueuesInsights(): AxoniqConsoleDlqMode? {
if (!isDeadLetterLibraryAvailable()) return null
return axoniqPlatformConfiguration()?.dlqMode ?: AxoniqConsoleDlqMode.NONE
}

private fun isDeadLetterLibraryAvailable(): Boolean = try {
Class.forName(
"io.axoniq.framework.messaging.deadletter.SequencedDeadLetterQueue",
false,
SetupPayloadCreator::class.java.classLoader,
)
true
} catch (_: ClassNotFoundException) {
false
}

/**
* Checks whether the PlatformLicenseSource have been configured, in which case we want updates of licenses from Platform.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Copyright (c) 2022-2026. AxonIQ B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.axoniq.platform.framework.eventprocessor;

import io.axoniq.platform.framework.AxoniqPlatformConfiguration;
import io.axoniq.platform.framework.client.RSocketHandlerRegistrar;
import org.axonframework.common.configuration.ComponentDefinition;
import org.axonframework.common.configuration.ComponentRegistry;
import org.axonframework.common.configuration.ConfigurationEnhancer;
import org.axonframework.common.lifecycle.Phase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static io.axoniq.platform.framework.AxoniqPlatformConfigurerEnhancer.PLATFORM_ENHANCER_ORDER;

/**
* Service-loaded enhancer that registers the dead-letter queue inspection components only when the
* {@code axoniq-dead-letter} module is present on the classpath. Kept free of direct references to
* {@link DeadLetterManager} or {@link RSocketDlqResponder} (which import optional types) so the class can be
* loaded even when the addon is absent.
*/
public class AxoniqPlatformDeadLetterConfigurerEnhancer implements ConfigurationEnhancer {

private static final Logger LOGGER =
LoggerFactory.getLogger(AxoniqPlatformDeadLetterConfigurerEnhancer.class);
private static final String DEAD_LETTER_PROBE_CLASS =
"io.axoniq.framework.messaging.deadletter.SequencedDeadLetterQueue";

@Override
public void enhance(ComponentRegistry registry) {
if (!registry.hasComponent(AxoniqPlatformConfiguration.class)) {
return;
}
// Enhancers can be invoked more than once during context refresh — bail out if the DLQ
// components are already registered to avoid ComponentOverrideException.
if (registry.hasComponent(DeadLetterManager.class)
|| registry.hasComponent(ProcessingGroupInfoSource.class)) {
return;
}
if (!isClasspathAvailable()) {
LOGGER.debug("axoniq-dead-letter not on classpath; skipping dead-letter queue inspection wiring.");
return;
}
register(registry);
}

@Override
public int order() {
// Run after the main platform enhancer so the RSocketHandlerRegistrar component is already declared.
return PLATFORM_ENHANCER_ORDER + 1;
}

private static void register(ComponentRegistry registry) {
registry.registerComponent(ComponentDefinition
.ofType(DeadLetterManager.class)
.withBuilder(c -> {
AxoniqPlatformConfiguration platformConfig =
c.getComponent(AxoniqPlatformConfiguration.class);
return new DeadLetterManager(
c,
platformConfig.getDlqMode(),
platformConfig.getDlqDiagnosticsWhitelist());
})
// Discover DLQs after event processors have started, by which point the
// EventHandlingComponent decorator chain has materialised every DLQ.
.onStart(Phase.INSTRUCTION_COMPONENTS, DeadLetterManager::start));

// The Spring-backed ComponentRegistry exposes a registered component under all of its
// implemented interfaces automatically, so registering DeadLetterManager already makes
// ProcessingGroupInfoSource available. The plain AF5 ComponentRegistry is exact-typed
// though, so only register the seam there to keep ProcessorReportCreator's lookup
// (`getOptionalComponent(ProcessingGroupInfoSource.class)`) working in both worlds.
if (!registry.hasComponent(ProcessingGroupInfoSource.class)) {
registry.registerComponent(ComponentDefinition
.ofType(ProcessingGroupInfoSource.class)
.withBuilder(c -> c.getComponent(DeadLetterManager.class)));
}

registry.registerComponent(ComponentDefinition
.ofType(RSocketDlqResponder.class)
.withBuilder(c -> new RSocketDlqResponder(
c.getComponent(DeadLetterManager.class),
c.getComponent(RSocketHandlerRegistrar.class)))
.onStart(Phase.EXTERNAL_CONNECTIONS, RSocketDlqResponder::start));
}

private static boolean isClasspathAvailable() {
try {
Class.forName(DEAD_LETTER_PROBE_CLASS, false,
AxoniqPlatformDeadLetterConfigurerEnhancer.class.getClassLoader());
return true;
} catch (ClassNotFoundException e) {
return false;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,14 @@ import org.axonframework.messaging.core.Message
import org.axonframework.messaging.core.MessageStream
import org.axonframework.messaging.core.QualifiedName
import org.axonframework.messaging.core.unitofwork.ProcessingContext
import org.axonframework.messaging.eventhandling.EventHandler
import org.axonframework.messaging.eventhandling.EventHandlerRegistry
import org.axonframework.messaging.eventhandling.EventHandlingComponent
import org.axonframework.messaging.eventhandling.EventMessage
import org.axonframework.messaging.eventhandling.processing.streaming.segmenting.Segment
import org.axonframework.messaging.eventhandling.replay.ResetContext
import java.time.Instant
import java.time.temporal.ChronoUnit

class AxoniqPlatformEventHandlingComponent(
private val delegate: EventHandlingComponent,
val delegate: EventHandlingComponent,
private val processorName: String,
private val handlerMetricsRegistry: HandlerMetricsRegistry,
private val processorMetricRegistry: ProcessorMetricsRegistry
Expand Down
Loading
Loading