Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2167,6 +2167,12 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
)
private boolean bookkeeperClientExposeStatsToPrometheus = false;

@FieldContext(
category = CATEGORY_SERVER,
doc = "whether expose metadataStore stats to prometheus"
)
private boolean metadataStoreExposeStatsToPrometheus = false;

@FieldContext(
category = CATEGORY_STORAGE_BK,
doc = "whether limit per_channel_bookie_client metrics of bookkeeper client stats"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@
import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;
import org.apache.bookkeeper.mledger.offload.Offloaders;
import org.apache.bookkeeper.mledger.offload.OffloadersCache;
import org.apache.bookkeeper.stats.NullStatsProvider;
import org.apache.bookkeeper.stats.StatsProvider;
import org.apache.commons.configuration2.Configuration;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.broker.authentication.AuthenticationService;
Expand Down Expand Up @@ -126,6 +129,7 @@
import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet;
import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
import org.apache.pulsar.broker.stats.prometheus.PulsarPrometheusMetricsServlet;
import org.apache.pulsar.broker.stats.prometheus.metrics.PrometheusMetricsProvider;
import org.apache.pulsar.broker.storage.BookkeeperManagedLedgerStorageClass;
import org.apache.pulsar.broker.storage.ManagedLedgerStorage;
import org.apache.pulsar.broker.storage.ManagedLedgerStorageClass;
Expand Down Expand Up @@ -312,6 +316,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
private String brokerId;
private final CompletableFuture<Void> readyForIncomingRequestsFuture = new CompletableFuture<>();
private final List<Runnable> pendingTasksBeforeReadyForIncomingRequests = new ArrayList<>();
private StatsProvider metadataStoreStatsProvider = new NullStatsProvider();

public enum State {
Init, Started, Closing, Closed
Expand Down Expand Up @@ -430,6 +435,7 @@ public MetadataStore createConfigurationMetadataStore(PulsarMetadataEventSynchro
}
return MetadataStoreFactory.create(config.getConfigurationMetadataStoreUrl(),
MetadataStoreConfig.builder()
.statsProvider(metadataStoreStatsProvider)
.sessionTimeoutMillis((int) config.getMetadataStoreSessionTimeoutMillis())
.allowReadOnlyOperations(config.isMetadataStoreAllowReadOnlyOperations())
.configFilePath(configFilePath)
Expand Down Expand Up @@ -738,6 +744,9 @@ public CompletableFuture<Void> closeAsync(boolean waitForWebServiceToStop) {
openTelemetryTopicStats = null;
}

if (metadataStoreStatsProvider != null) {
metadataStoreStatsProvider.stop();
}
asyncCloseFutures.add(EventLoopUtil.shutdownGracefully(ioEventLoopGroup));

// add timeout handling for closing executors
Expand Down Expand Up @@ -886,6 +895,14 @@ public void start() throws PulsarServerException {
localMetadataSynchronizer = StringUtils.isNotBlank(config.getMetadataSyncEventTopic())
? new PulsarMetadataEventSynchronizer(this, config.getMetadataSyncEventTopic())
: null;
Configuration configuration = new ClientConfiguration();
if (config.isMetadataStoreExposeStatsToPrometheus()) {
configuration.addProperty(PrometheusMetricsProvider.PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS,
config.getManagedLedgerPrometheusStatsLatencyRolloverSeconds());
configuration.addProperty(PrometheusMetricsProvider.CLUSTER_NAME, config.getClusterName());
metadataStoreStatsProvider = new PrometheusMetricsProvider();
}
metadataStoreStatsProvider.start(configuration);
localMetadataStore = createLocalMetadataStore(localMetadataSynchronizer,
openTelemetry.getOpenTelemetryService().getOpenTelemetry());
localMetadataStore.registerSessionListener(this::handleMetadataSessionEvent);
Expand Down Expand Up @@ -1344,6 +1361,7 @@ public MetadataStoreExtended createLocalMetadataStore(PulsarMetadataEventSynchro
throws MetadataStoreException, PulsarServerException {
return MetadataStoreExtended.create(config.getMetadataStoreUrl(),
MetadataStoreConfig.builder()
.statsProvider(metadataStoreStatsProvider)
.sessionTimeoutMillis((int) config.getMetadataStoreSessionTimeoutMillis())
.allowReadOnlyOperations(config.isMetadataStoreAllowReadOnlyOperations())
.configFilePath(config.getMetadataStoreConfigPath())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,8 @@ protected ByteBuf generateMetrics(List<PrometheusRawMetricsProvider> metricsProv

generateManagedLedgerBookieClientMetrics(pulsar, stream);

generateMetadataStoreMetrics(pulsar, stream);

if (metricsProviders != null) {
for (PrometheusRawMetricsProvider metricsProvider : metricsProviders) {
metricsProvider.generate(stream);
Expand Down Expand Up @@ -499,8 +501,20 @@ private static void generateManagedLedgerBookieClientMetrics(PulsarService pulsa
if (statsProvider instanceof NullStatsProvider) {
return;
}
writeStatsProviderMetrics(statsProvider, stream);
}
}

try (Writer writer = new OutputStreamWriter(new BufferedOutputStream(new OutputStream() {
private static void generateMetadataStoreMetrics(PulsarService pulsar, SimpleTextOutputStream stream) {
StatsProvider statsProvider = pulsar.getMetadataStoreStatsProvider();
if (statsProvider instanceof NullStatsProvider) {
return;
}
writeStatsProviderMetrics(statsProvider, stream);
}

private static void writeStatsProviderMetrics(StatsProvider statsProvider, SimpleTextOutputStream stream) {
try (Writer writer = new OutputStreamWriter(new BufferedOutputStream(new OutputStream() {
@Override
public void write(int b) throws IOException {
stream.writeByte(b);
Expand All @@ -513,9 +527,8 @@ public void write(byte b[], int off, int len) throws IOException {
}), StandardCharsets.UTF_8)) {
statsProvider.writeAllMetrics(writer);
} catch (IOException e) {
log.error().exception(e).log("Failed to write managed ledger bookie client metrics");
log.error().exception(e).log("Failed to write metrics");
}
}
}

public MetricsBuffer renderToBuffer(Executor executor, List<PrometheusRawMetricsProvider> metricsProviders) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,30 @@
import static org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.parseMetrics;
import com.google.common.collect.Multimap;
import java.io.ByteArrayOutputStream;
import java.io.StringWriter;
import java.util.Collection;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.Cleanup;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.PrometheusMetricsTestUtil;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.metrics.AuthenticationMetricsToken;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.stats.prometheus.metrics.PrometheusMetricsProvider;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.metadata.TestZKServer;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.MetadataStoreFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
Expand Down Expand Up @@ -74,6 +81,70 @@ protected void cleanup() throws Exception {
super.internalCleanup();
}

@Test
public void testZKMetadataStoreMetricsCollectedByPrometheus() throws Exception {
@Cleanup
TestZKServer zkServer = new TestZKServer();

PrometheusMetricsProvider prometheusMetricsProvider = new PrometheusMetricsProvider();
ClientConfiguration bkClientConf = new ClientConfiguration();
bkClientConf.addProperty(PrometheusMetricsProvider.PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS, 60);
bkClientConf.addProperty(PrometheusMetricsProvider.CLUSTER_NAME, "test");
prometheusMetricsProvider.start(bkClientConf);

try {
MetadataStoreConfig config = MetadataStoreConfig.builder()
.metadataStoreName("test-zk-prometheus")
.fsyncEnable(false)
.statsProvider(prometheusMetricsProvider)
.build();

@Cleanup
MetadataStore store = MetadataStoreFactory.create(
"zk:" + zkServer.getConnectionString(), config);

String path = "/test-prometheus-metrics-" + UUID.randomUUID();
store.put(path, "test-value".getBytes(), java.util.Optional.empty()).join();
store.get(path).join();
store.delete(path, Optional.empty()).join();

// Write all metrics via PrometheusMetricsProvider.writeAllMetrics
StringWriter writer = new StringWriter();
prometheusMetricsProvider.writeAllMetrics(writer);
String metricsOutput = writer.toString();

// Parse the metrics output
Multimap<String, Metric> metricsMap = parseMetrics(metricsOutput);

String metricsDebugMessage = "Assertion failed with metrics:\n" + metricsOutput + "\n";

// Verify the "multi" opStats metric exists.
Assert.assertTrue(metricsMap.containsKey("ZkMetadataStore_storeName_test_zk_prometheus_zk_multi_count"),
"Expected ZKMetadataStore_zk_multi_count metric to be present. " + metricsDebugMessage);

Assert.assertTrue(metricsMap.containsKey("ZkMetadataStore_storeName_test_zk_prometheus_zk_multi_sum"),
"Expected ZKMetadataStore_zk_multi_sum metric to be present. " + metricsDebugMessage);

// Verify that multi metrics have the correct cluster tag
for (Metric m : metricsMap.get("ZkMetadataStore_storeName_test_zk_prometheus_zk_multi_count")) {
Assert.assertEquals(m.tags.get("cluster"), "test", metricsDebugMessage);
}

// Verify other ZK operation metrics are also present
// (these are registered by PulsarZooKeeperClient in its constructor)
Assert.assertTrue(metricsMap.containsKey("ZkMetadataStore_storeName_test_zk_prometheus_zk_create_count"),
"Expected ZKMetadataStore_zk_create_count metric to be present. " + metricsDebugMessage);

Assert.assertTrue(metricsMap.containsKey("ZkMetadataStore_storeName_test_zk_prometheus_zk_get_data_count"),
"Expected ZKMetadataStore_zk_get_data_count metric to be present. " + metricsDebugMessage);

Assert.assertTrue(metricsMap.containsKey("ZkMetadataStore_storeName_test_zk_prometheus_zk_exists_count"),
"Expected ZKMetadataStore_zk_exists_count metric to be present. " + metricsDebugMessage);
} finally {
prometheusMetricsProvider.stop();
}
}

@Test
public void testMetadataStoreStats() throws Exception {
String ns = "prop/ns-abc1";
Expand Down Expand Up @@ -256,4 +327,4 @@ private boolean isExpectedLabel(String metadataStoreName, Set<String> expectedLa
}
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import lombok.Builder;
import lombok.Getter;
import lombok.ToString;
import org.apache.bookkeeper.stats.NullStatsProvider;
import org.apache.bookkeeper.stats.StatsProvider;

/**
* The configuration builder for a {@link MetadataStore} config.
Expand Down Expand Up @@ -107,4 +109,7 @@ public class MetadataStoreConfig {

@Builder.Default
private final int numSerDesThreads = 1;

@Builder.Default
private StatsProvider statsProvider = new NullStatsProvider();
}
Original file line number Diff line number Diff line change
Expand Up @@ -511,7 +511,7 @@ public List<OpResult> call() throws KeeperException, InterruptedException {
public void multi(final Iterable<Op> ops,
final MultiCallback cb,
final Object context) {
final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, rateLimiter, createStats) {
final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, rateLimiter, multiStats) {

final MultiCallback multiCb = new MultiCallback() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.stream.Collectors;
import lombok.CustomLog;
import lombok.SneakyThrows;
import org.apache.bookkeeper.stats.StatsLogger;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's BookKeeper stats provider. We shouldn't be using it here

Copy link
Copy Markdown
Contributor Author

@zhaizhibo zhaizhibo May 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@merlimat But we had used org.apache.bookkeeper.stats.OpStatsLogger and org.apache.bookkeeper.stats.StatsLogger in PulsarZooKeeperClient, and the builder of PulsarZooKeeperClient is also org.apache.bookkeeper.stats.StatsLogger. Should we also replace all references to this type?

import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
import org.apache.commons.lang3.tuple.Triple;
import org.apache.pulsar.common.util.FutureUtil;
Expand Down Expand Up @@ -99,12 +100,19 @@ public ZKMetadataStore(String metadataURL, MetadataStoreConfig metadataStoreConf
this.rootPath = new ConnectStringParser(zkConnectString).getChrootPath();

isZkManaged = true;
StatsLogger statsLogger = null;
if (metadataStoreConfig.getStatsProvider() != null) {
statsLogger = metadataStoreConfig.getStatsProvider()
.getStatsLogger("ZkMetadataStore")
.scopeLabel("storeName", metadataStoreConfig.getMetadataStoreName());
}
zkc = PulsarZooKeeperClient.newBuilder().connectString(zkConnectString)
.connectRetryPolicy(new BoundExponentialBackoffRetryPolicy(100, 60_000, Integer.MAX_VALUE))
.allowReadOnlyMode(metadataStoreConfig.isAllowReadOnlyOperations())
.sessionTimeoutMs(metadataStoreConfig.getSessionTimeoutMillis())
.watchers(Collections.singleton(this::processSessionWatcher))
.configPath(metadataStoreConfig.getConfigFilePath())
.statsLogger(statsLogger)
.build();
if (enableSessionWatcher) {
sessionWatcher = new ZKSessionWatcher(zkc, this::receivedSessionEvent);
Expand Down
Loading