Merged
30 commits
af97bb0
[feat][metrics] Add support for custom metric labels in topic policie…
coderzc Nov 10, 2025
5e2d51f
[feat][metrics] Enhance custom metric label handling for topic statis…
coderzc Nov 10, 2025
19c898f
[feat][metrics] Update custom metric label handling to improve consis…
coderzc Nov 10, 2025
cb81da8
[feat][metrics] Implement custom metric labels management for topics …
coderzc Nov 16, 2025
9ccf29a
[feat][metrics] Add option to remove all custom metric labels for topics
coderzc Nov 17, 2025
522e42f
Validate custom metric label keys and update configuration handling
coderzc Jan 15, 2026
785428e
Update topic policy operations to use CUSTOM_METRIC_LABELS
coderzc Jan 15, 2026
a9853c2
Merge branch 'master' into feat_pip_447
coderzc Jan 15, 2026
d72f97a
Filter custom metric labels by allowed keys in NamespaceStatsAggregator
coderzc Jan 15, 2026
da1b08c
Enhance custom metric label removal logic in TopicPolicies and CmdTopics
coderzc Jan 15, 2026
1a15c0a
Add PrometheusMetricsLabelsTest.java
coderzc Jan 15, 2026
3139520
Using topic properties instead of topic policy
coderzc Jan 16, 2026
db3ced5
[PIP-447] Implement namespace-level allowedTopicPropertiesForMetrics …
coderzc Feb 9, 2026
c2b3385
Remove deprecated CustomMetricLabels topic policy implementation
coderzc Feb 9, 2026
30b2bce
Refactor custom metric label key validation to accept allowed keys di…
coderzc Feb 9, 2026
b17260f
Refactor allowedTopicPropertiesForMetrics namespace APIs to use async…
coderzc Feb 9, 2026
ac4a063
Add tests for namespace-level allowedTopicPropertiesForMetrics and fi…
coderzc Feb 9, 2026
7039799
Update broker.conf and ServiceConfiguration to remove maxCustomMetric…
coderzc Feb 12, 2026
c431957
Remove zookeeper dependency and update license plugin exclusion to ig…
coderzc Feb 12, 2026
d884679
Merge branch 'master' into feat_pip_447
coderzc Feb 12, 2026
ab1acc0
improve code
coderzc Feb 12, 2026
74b4dd0
Treat null as unset for allowedTopicPropertiesForMetrics in namespace…
coderzc Feb 12, 2026
bbccea2
[improve][pip] Refactor custom Prometheus label extraction for topic …
coderzc Feb 12, 2026
e07ad86
[improve][pip] Move custom Metrics label extraction logic from TopicS…
coderzc Feb 12, 2026
804b972
revert code
coderzc Feb 12, 2026
30a7c01
[improve][pip] Rename PolicyName.CUSTOM_METRIC_LABELS to ALLOW_CUSTOM…
coderzc Feb 12, 2026
2e1cc8a
fix doc
coderzc Feb 12, 2026
4563d3a
[improve][pip] Refactor custom metric label validation to throw RestE…
coderzc Feb 12, 2026
c8a50d0
[improve][pip] Rename allowedTopicPropertiesForMetrics to allowedTopi…
coderzc Mar 6, 2026
3570243
[improve][pip] Rename allowedTopicPropertiesForMetrics to allowedTopi…
coderzc Mar 6, 2026
10 changes: 10 additions & 0 deletions conf/broker.conf
@@ -1871,6 +1871,16 @@ metricsServletTimeoutMs=30000
# Enable or disable broker bundles metrics. The default value is false.
exposeBundlesMetricsInPrometheus=false

# Enable or disable custom topic metric labels feature.
# If enabled, custom metric labels can be set on topics and will be exposed in Prometheus metrics.
# Default is false.
exposeCustomTopicMetricLabelsEnabled=false

# A comma-separated list of Topic Property keys that are allowed to be exposed as metrics.
# Only these keys can be set as custom metric labels on topics.
# Example: sla_tier,data_sensitivity,cost_center,app_owner
allowedTopicPropertyKeysForMetrics=

### --- Functions --- ###

# Enable Functions Worker Service in Broker
55 changes: 35 additions & 20 deletions pip/pip-447.md
@@ -36,7 +36,7 @@ The primary goals of this proposal are to:

# High Level Design

Configuration Implementation: Add parameters to broker.conf to define the broker-level allow-list (e.g., allowedTopicPropertiesForMetrics). Add namespace policy settings to enable namespace-level allow-list configuration via admin API.
Configuration Implementation: Add parameters to broker.conf to define the broker-level allow-list (e.g., allowedTopicPropertyKeysForMetrics). Add namespace policy settings to enable namespace-level allow-list configuration via admin API.

Metrics Generation Modification: Update the PrometheusMetricsServlet (or equivalent exporter) to:

@@ -57,10 +57,25 @@ We utilize the existing Map<String, String> properties in topic metadata. No new

The Prometheus metrics generation component will be modified to:

1. Merge broker-level and namespace-level `allowedTopicPropertiesForMetrics` (union of both)
1. Get `allowedTopicPropertyKeysForMetrics` from broker config and namespace policy (namespace-level settings override broker-level)
2. For each topic, check if its properties match any allowed key
3. If matched, add the property as a Prometheus label
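
The three steps above can be sketched roughly as follows. This is a minimal illustration, not the code in this PR: the class `CustomTopicLabels` and its method names are hypothetical, and it assumes that a `null` namespace-level set means "unset" (so the broker-level list applies), while any non-null namespace-level set, including an empty one, replaces the broker-level list.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical helper, not part of the PR: illustrates resolve -> filter -> label.
final class CustomTopicLabels {

    // Step 1: a non-null namespace-level set overrides the broker-level set.
    // Assumption: null means "unset" at the namespace level.
    static Set<String> resolveAllowedKeys(Set<String> brokerKeys, Set<String> namespaceKeys) {
        if (namespaceKeys != null) {
            return namespaceKeys;
        }
        return brokerKeys != null ? brokerKeys : Set.<String>of();
    }

    // Steps 2 and 3: keep only the topic properties whose key is allowed.
    // A TreeMap keeps the resulting label order deterministic.
    static Map<String, String> extractLabels(Map<String, String> topicProperties,
                                             Set<String> allowedKeys) {
        Map<String, String> labels = new TreeMap<>();
        if (topicProperties == null) {
            return labels;
        }
        for (Map.Entry<String, String> e : topicProperties.entrySet()) {
            if (e.getKey() != null && allowedKeys.contains(e.getKey())) {
                labels.put(e.getKey(), e.getValue());
            }
        }
        return labels;
    }

    public static void main(String[] args) {
        Set<String> broker = Set.of("sla_tier", "cost_center");
        Set<String> namespace = Set.of("sla_tier", "owner");
        Map<String, String> props =
                Map.of("sla_tier", "gold", "cost_center", "cc-42", "owner", "team-a");

        // Namespace-level list wins: cost_center is dropped, owner is kept.
        System.out.println(extractLabels(props, resolveAllowedKeys(broker, namespace)));
        // No namespace-level list: fall back to the broker-level list.
        System.out.println(extractLabels(props, resolveAllowedKeys(broker, null)));
    }
}
```

Under the override semantics, a key that is only in the broker-level list (`cost_center` here) is no longer exposed once the namespace defines its own list; under the earlier union semantics it would have been.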

### Custom Metric Label Key Validation:

To ensure compatibility with Prometheus and prevent conflicts with internal metrics, custom metric label keys must pass the following validation rules:

1. **Non-empty**: The label key must not be null or empty.

2. **Valid characters**: Must match the regex `[a-zA-Z_][a-zA-Z0-9_]*` (starts with a letter or underscore, followed by letters, digits, or underscores).

3. **Reserved prefixes (internal use)**:
- Keys starting with `__` are reserved for Prometheus internal use.
- Keys starting with `pulsar_` or `pulsar.` are reserved for Pulsar internal use.
- Keys starting with `otel_` or `otel.` are reserved for OpenTelemetry internal use.

If a topic property key matches the allowed list but fails validation, it will be rejected with an error message indicating the invalid key and the expected format.
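
As a rough, stand-alone illustration of these rules (the class `LabelKeyRules` and its `check` method are hypothetical and return a message instead of throwing, unlike the broker code in this PR):

```java
import java.util.List;
import java.util.Optional;
import java.util.regex.Pattern;

// Hypothetical checker mirroring the validation rules listed above.
final class LabelKeyRules {
    private static final Pattern LABEL_NAME = Pattern.compile("[a-zA-Z_][a-zA-Z0-9_]*");

    // Returns an error message for an invalid key, or Optional.empty() if the key is acceptable.
    static Optional<String> check(String key) {
        if (key == null || key.isEmpty()) {
            return Optional.of("label key must not be null or empty");
        }
        // Reserved prefixes are checked explicitly: "__x" and "pulsar_x" match the regex,
        // so the character rule alone would let them through.
        if (key.startsWith("__")) {
            return Optional.of("'" + key + "': '__' prefix is reserved by Prometheus");
        }
        if (key.startsWith("pulsar_") || key.startsWith("pulsar.")) {
            return Optional.of("'" + key + "': 'pulsar_' / 'pulsar.' prefix is reserved by Pulsar");
        }
        if (key.startsWith("otel_") || key.startsWith("otel.")) {
            return Optional.of("'" + key + "': 'otel_' / 'otel.' prefix is reserved by OpenTelemetry");
        }
        if (!LABEL_NAME.matcher(key).matches()) {
            return Optional.of("'" + key + "': must match [a-zA-Z_][a-zA-Z0-9_]*");
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        for (String key : List.of("sla_tier", "__name__", "pulsar_topic", "otel.scope", "cost-center")) {
            System.out.println(key + " -> " + check(key).orElse("ok"));
        }
    }
}
```

Note that a key containing `.` can never match the character rule, so the `pulsar.` and `otel.` prefix checks mainly serve to produce a more specific error message.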

## Public-facing Changes

### Configuration
@@ -73,7 +88,7 @@ Description: Enables or disables the custom topic metric labels feature.

Default: false

allowedTopicPropertiesForMetrics=<key1>,<key2>,...
allowedTopicPropertyKeysForMetrics=<key1>,<key2>,...

Description: A comma-separated list of Topic Property keys that are allowed to be exposed as metrics. Only keys explicitly listed here will be exposed.

@@ -83,36 +98,36 @@ Default: Empty string (if the feature is enabled but no keys are defined, no cus

#### Namespace-Level Admin API

Namespace-level configuration allows per-namespace control of which topic properties can be exposed as metrics. The final allowed list is the union of broker-level and namespace-level settings.
Namespace-level configuration allows per-namespace control of which topic properties can be exposed as metrics. The namespace-level settings will override the broker-level settings for that specific namespace.

**REST API:**

- `POST /admin/v2/namespaces/{tenant}/{namespace}/allowedTopicPropertiesForMetrics` - Set allowed properties keys
- `GET /admin/v2/namespaces/{tenant}/{namespace}/allowedTopicPropertiesForMetrics` - Get allowed properties keys
- `DELETE /admin/v2/namespaces/{tenant}/{namespace}/allowedTopicPropertiesForMetrics` - Remove allowed properties keys
- `POST /admin/v2/namespaces/{tenant}/{namespace}/allowedTopicPropertyKeysForMetrics` - Set the allowed property keys
- `GET /admin/v2/namespaces/{tenant}/{namespace}/allowedTopicPropertyKeysForMetrics` - Get the allowed property keys
- `DELETE /admin/v2/namespaces/{tenant}/{namespace}/allowedTopicPropertyKeysForMetrics` - Remove the allowed property keys

**CLI Examples:**

```bash
# Set allowed properties keys
pulsar-admin namespaces set-allowed-topic-properties-for-metrics \
# Set the allowed property keys at the namespace level
pulsar-admin namespaces set-allowed-topic-property-keys-for-metrics \
--keys sla_tier,owner,environment \
my-tenant/my-namespace

# Get allowed properties keys
pulsar-admin namespaces get-allowed-topic-properties-for-metrics my-tenant/my-namespace
# Get the allowed property keys at the namespace level
pulsar-admin namespaces get-allowed-topic-property-keys-for-metrics my-tenant/my-namespace

# Remove allowed properties keys
pulsar-admin namespaces remove-allowed-topic-properties-for-metrics my-tenant/my-namespace
# Remove the allowed property keys at the namespace level; the namespace then falls back to the broker-level configuration
pulsar-admin namespaces remove-allowed-topic-property-keys-for-metrics my-tenant/my-namespace
```

**Java Admin API:**

```java
public interface Namespaces {
void setAllowedTopicPropertiesForMetrics(String namespace, Set<String> allowedKeys) throws PulsarAdminException;
Set<String> getAllowedTopicPropertiesForMetrics(String namespace) throws PulsarAdminException;
void removeAllowedTopicPropertiesForMetrics(String namespace) throws PulsarAdminException;
void setAllowedTopicPropertyKeysForMetrics(String namespace, Set<String> allowedKeys) throws PulsarAdminException;
Set<String> getAllowedTopicPropertyKeysForMetrics(String namespace) throws PulsarAdminException;
void removeAllowedTopicPropertyKeysForMetrics(String namespace) throws PulsarAdminException;
}
```

@@ -124,23 +139,23 @@ Disabled by Default: The feature will be disabled by default (exposeCustomTopicM

Existing Pulsar deployments will see no change in behavior or metric format.

No Impact if Unused: If the feature is enabled but allowedTopicPropertiesForMetrics is not configured or no labels are set on topics, metrics will remain unchanged.
No Impact if Unused: If the feature is enabled but allowedTopicPropertyKeysForMetrics is not configured or no labels are set on topics, metrics will remain unchanged.

Existing APIs: Existing pulsar-admin commands and REST APIs are unaffected.

## Forward Compatibility

Prometheus Systems: If a Pulsar broker with this feature enabled sends metrics with custom metric labels to an older Prometheus server or a monitoring system not expecting these additional labels, those systems will typically ignore the extra labels without issue.

Future Enhancements: Future Pulsar versions could extend this feature, for example, by allowing more dynamic management of allowedTopicPropertiesForMetrics if deemed safe and necessary.
Future Enhancements: Future Pulsar versions could extend this feature, for example, by allowing more dynamic management of allowedTopicPropertyKeysForMetrics if deemed safe and necessary.

OpenTelemetry Alignment: The key-value structure of custom metric labels aligns well with OpenTelemetry attributes, ensuring that this feature remains relevant and compatible with Pulsar's evolving metrics infrastructure.

# Testing Plan

A comprehensive testing strategy will be required:

Test the filtering logic against allowedTopicPropertiesForMetrics.
Test the filtering logic against allowedTopicPropertyKeysForMetrics.

Integration Tests:

@@ -158,7 +173,7 @@ Removal: Verify that removing a property removes the label.

Documentation updates will include:

- **Configuration Guide**: How to configure broker-level and namespace-level `allowedTopicPropertiesForMetrics`
- **Configuration Guide**: How to configure broker-level and namespace-level `allowedTopicPropertyKeysForMetrics`
- **Admin API Guide**: How to use pulsar-admin commands to manage namespace-level settings
- **Monitoring Guide**: How custom labels appear in Prometheus and best practices for cardinality management

@@ -3534,6 +3534,21 @@ public double getLoadBalancerBandwidthOutResourceWeight() {
)
private boolean exposeBundlesMetricsInPrometheus = false;

@FieldContext(
category = CATEGORY_METRICS,
doc = "Enable or disable custom topic metric labels feature. "
+ "If enabled, custom metric labels can be set on topics and will be exposed in metrics. "
+ "Default is false."
)
private boolean exposeCustomTopicMetricLabelsEnabled = false;

@FieldContext(
category = CATEGORY_METRICS,
doc = "A comma-separated list of Topic Property keys that are allowed to be exposed as metrics. "
+ "Only keys explicitly listed here will be exposed."
)
private Set<String> allowedTopicPropertyKeysForMetrics = new HashSet<>();

/**** --- Functions. --- ****/
@FieldContext(
category = CATEGORY_FUNCTIONS,
@@ -133,6 +133,14 @@ public Optional<Policies> getPoliciesIfCached(NamespaceName ns) {
return getCache().getIfCached(joinPath(BASE_POLICIES_PATH, ns.toString()));
}

public Optional<Policies> getPoliciesIfCachedAndAsyncLoad(NamespaceName ns) {
Optional<Policies> policiesOptional = getCache().getIfCached(joinPath(BASE_POLICIES_PATH, ns.toString()));

// Kick off an async load regardless of hit or miss; the current caller never blocks on it
getPoliciesAsync(ns);
return policiesOptional;
}

public CompletableFuture<Optional<Policies>> getPoliciesAsync(NamespaceName ns) {
return getCache().get(joinPath(BASE_POLICIES_PATH, ns.toString()));
}
@@ -279,6 +287,15 @@ public CompletableFuture<List<String>> listPartitionedTopicsAsync(NamespaceName
);
}

public Optional<PartitionedTopicMetadata> getPartitionedTopicMetadataIfCacheAndAsyncLoad(TopicName tn) {
String path = joinPath(PARTITIONED_TOPIC_PATH, tn.getNamespace(), tn.getDomain().value(),
tn.getEncodedLocalName());
Optional<PartitionedTopicMetadata> result = getCache().getIfCached(path);
// Kick off an async load regardless of hit or miss; the current caller never blocks on it
getAsync(path);
return result;
}

public CompletableFuture<Optional<PartitionedTopicMetadata>> getPartitionedTopicMetadataAsync(TopicName tn) {
return getPartitionedTopicMetadataAsync(tn, false);
}
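
The two `...AndAsyncLoad` helpers above follow a "return what is cached now, warm the cache for next time" pattern, which suits a metrics scrape that must not block on the metadata store. A minimal sketch of that pattern is shown here; `NonBlockingCache` is a hypothetical stand-in, not Pulsar's `MetadataCache`, and it omits details such as de-duplicating in-flight loads and expiry.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical miniature cache illustrating a non-blocking read with background load.
final class NonBlockingCache<K, V> {
    private final Map<K, V> cached = new ConcurrentHashMap<>();
    private final Function<K, CompletableFuture<V>> loader;

    NonBlockingCache(Function<K, CompletableFuture<V>> loader) {
        this.loader = loader;
    }

    // Async read: served from the cache when possible, otherwise loaded and then cached.
    CompletableFuture<V> getAsync(K key) {
        V hit = cached.get(key);
        if (hit != null) {
            return CompletableFuture.completedFuture(hit);
        }
        return loader.apply(key).thenApply(value -> {
            if (value != null) {
                cached.put(key, value);
            }
            return value;
        });
    }

    // Returns only what is cached right now and triggers a load in the background,
    // so the first caller may see an empty result while later callers see the value.
    Optional<V> getIfCachedAndAsyncLoad(K key) {
        Optional<V> result = Optional.ofNullable(cached.get(key));
        getAsync(key); // fire and forget
        return result;
    }

    public static void main(String[] args) {
        CompletableFuture<String> pending = new CompletableFuture<>();
        NonBlockingCache<String, String> cache = new NonBlockingCache<>(key -> pending);

        System.out.println(cache.getIfCachedAndAsyncLoad("tenant/ns")); // miss, load started
        pending.complete("policies-for-tenant/ns");                     // load finishes later
        System.out.println(cache.getIfCachedAndAsyncLoad("tenant/ns")); // now served from cache
    }
}
```

The trade-off is that the first scrape after a cache miss sees no namespace policy (or no topic metadata), so custom labels may only appear from the next scrape onward.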
@@ -47,6 +47,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
@@ -62,8 +63,10 @@
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import javax.servlet.Servlet;
import javax.servlet.ServletException;
import javax.ws.rs.core.Response;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.Setter;
@@ -132,6 +135,7 @@
import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStoreProvider;
import org.apache.pulsar.broker.validator.MultipleListenerValidator;
import org.apache.pulsar.broker.validator.TransactionBatchedWriteValidator;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.broker.web.WebService;
import org.apache.pulsar.broker.web.plugin.servlet.AdditionalServlet;
import org.apache.pulsar.broker.web.plugin.servlet.AdditionalServletWithClassLoader;
@@ -212,6 +216,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
private static final Logger LOG = LoggerFactory.getLogger(PulsarService.class);
private static final double GRACEFUL_SHUTDOWN_TIMEOUT_RATIO_OF_TOTAL_TIMEOUT = 0.5d;
private static final int DEFAULT_MONOTONIC_CLOCK_GRANULARITY_MILLIS = 8;
private static final Pattern METRICS_LABEL_NAME_PATTERN = Pattern.compile("[a-zA-Z_][a-zA-Z0-9_]*");
private final ServiceConfiguration config;
private NamespaceService nsService = null;
private ManagedLedgerStorage managedLedgerStorage = null;
@@ -351,6 +356,7 @@ public PulsarService(ServiceConfiguration config,
PulsarConfigurationLoader.isComplete(config);
TransactionBatchedWriteValidator.validate(config);
this.config = config;
this.validateCustomMetricLabelKeys(config.getAllowedTopicPropertyKeysForMetrics());
this.clock = Clock.systemUTC();

this.openTelemetry = new PulsarBrokerOpenTelemetry(config, openTelemetrySdkBuilderCustomizer);
@@ -2277,4 +2283,51 @@ public HealthChecker getHealthChecker() {
}
return healthChecker;
}

// https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels
public void validateCustomMetricLabelKeys(Set<String> allowedCustomMetricLabelKeys) {
if (allowedCustomMetricLabelKeys == null) {
return;
}
boolean exposeCustomTopicMetricLabelsEnabled = config.isExposeCustomTopicMetricLabelsEnabled();
if (exposeCustomTopicMetricLabelsEnabled) {
for (String labelKey : allowedCustomMetricLabelKeys) {
isValidMetricsName(labelKey);
}
}
}

private static void isValidMetricsName(String labelName) {
if (labelName == null || labelName.isEmpty()) {
throw new RestException(Response.Status.BAD_REQUEST, "Label name cannot be null or empty");
}

// Prometheus reserves all labels starting with "__" for internal use.
if (labelName.startsWith("__")) {
throw new RestException(Response.Status.BAD_REQUEST,
String.format("Label name '%s' is invalid: Prometheus reserves all labels starting with '__' "
+ "for internal use", labelName));
}

// Pulsar reserves all labels starting with "pulsar_" or "pulsar." for internal use.
if (labelName.startsWith("pulsar.") || labelName.startsWith("pulsar_")) {
throw new RestException(Response.Status.BAD_REQUEST,
String.format("Label name '%s' is invalid: Pulsar reserves all labels starting with 'pulsar_' "
+ "or 'pulsar.' for internal use", labelName));
}

// OpenTelemetry reserves all labels starting with "otel_" or "otel." for internal use.
if (labelName.startsWith("otel.") || labelName.startsWith("otel_")) {
throw new RestException(Response.Status.BAD_REQUEST,
String.format("Label name '%s' is invalid: OpenTelemetry reserves all labels starting with "
+ "'otel_' or 'otel.' for internal use", labelName));
}

boolean matches = METRICS_LABEL_NAME_PATTERN.matcher(labelName).matches();
if (!matches) {
throw new RestException(Response.Status.BAD_REQUEST,
String.format("Label name '%s' is invalid: must match the regex [a-zA-Z_][a-zA-Z0-9_]*", labelName));
}

}
}
@@ -2331,6 +2331,18 @@ protected void internalSetSubscriptionTypesEnabled(Set<SubscriptionType> subscri
"subscriptionTypesEnabled");
}

protected CompletableFuture<Void> internalSetAllowedTopicPropertyKeysForMetricsAsync(Set<String> allowedKeys) {
return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.ALLOW_CUSTOM_METRIC_LABELS,
PolicyOperation.WRITE)
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
.thenAccept(__ -> pulsar().validateCustomMetricLabelKeys(allowedKeys))
.thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> {
policies.allowed_topic_property_keys_for_metrics = allowedKeys != null
? new HashSet<>(allowedKeys) : null;
return policies;
}));
}
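
In the chain above, an exception thrown by the validation step in `thenAccept` completes the returned future exceptionally, so the following `thenCompose` (the policy update) never runs. A small self-contained sketch of that short-circuit behaviour follows; `NamespacePolicyStore` and its simplified validation are hypothetical stand-ins for the admin resource and `validateCustomMetricLabelKeys`.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical in-memory stand-in for the namespace policy update path.
final class NamespacePolicyStore {
    private final AtomicReference<Set<String>> allowedKeys = new AtomicReference<>();

    Set<String> getAllowedKeys() {
        return allowedKeys.get();
    }

    // Simplified validation: only the '__' prefix rule, for illustration.
    private static void validate(Set<String> keys) {
        if (keys == null) {
            return; // null is treated as "unset" and needs no validation
        }
        for (String key : keys) {
            if (key.startsWith("__")) {
                throw new IllegalArgumentException("reserved prefix: " + key);
            }
        }
    }

    CompletableFuture<Void> setAllowedKeysAsync(Set<String> keys) {
        return CompletableFuture.completedFuture(null)  // stands in for the permission checks
                .thenAccept(ignored -> validate(keys))  // a throw here fails the whole chain
                .thenCompose(ignored -> {               // skipped when validation failed
                    allowedKeys.set(keys == null ? null : Set.copyOf(keys));
                    return CompletableFuture.<Void>completedFuture(null);
                });
    }

    public static void main(String[] args) {
        NamespacePolicyStore store = new NamespacePolicyStore();
        store.setAllowedKeysAsync(Set.of("sla_tier")).join();

        CompletableFuture<Void> rejected = store.setAllowedKeysAsync(Set.of("__bad"));
        System.out.println("rejected: " + rejected.isCompletedExceptionally());
        System.out.println("stored keys unchanged: " + store.getAllowedKeys());
    }
}
```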


private <T> void mutatePolicy(Function<Policies, Policies> policyTransformation,
Function<Policies, T> getter,