Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ public interface LedgerOffloaderStats extends AutoCloseable {

void recordReadOffloadBytes(String topic, long size);

default void recordReadOffloadCacheHit(String topic, long size) {
}

default void recordReadOffloadCacheMiss(String topic, long size) {
}

void recordReadOffloadIndexLatency(String topic, long latency, TimeUnit unit);

void recordReadOffloadDataLatency(String topic, long latency, TimeUnit unit);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,16 @@ public void recordReadOffloadBytes(String topic, long size) {

}

@Override
public void recordReadOffloadCacheHit(String topic, long size) {

}

@Override
public void recordReadOffloadCacheMiss(String topic, long size) {

}

@Override
public void recordReadOffloadIndexLatency(String topic, long latency, TimeUnit unit) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ public final class LedgerOffloaderStatsImpl implements LedgerOffloaderStats, Run
private static final String STATUS = "status";
private static final String SUCCEED = "succeed";
private static final String FAILED = "failed";
private static final String HIT = "hit";
private static final String MISS = "miss";

private final boolean exposeTopicLevelMetrics;
private final int interval;
Expand All @@ -58,13 +60,15 @@ public final class LedgerOffloaderStatsImpl implements LedgerOffloaderStats, Run
private final Gauge readOffloadRate;
private final Summary readOffloadIndexLatency;
private final Summary readOffloadDataLatency;
private final Counter readOffloadCacheOps;
private final Counter readOffloadCacheBytes;

private final Map<String, Long> topicAccess;
private final Map<String, Pair<LongAdder, LongAdder>> offloadAndReadOffloadBytesMap;

final AtomicBoolean closed = new AtomicBoolean(false);

private LedgerOffloaderStatsImpl(boolean exposeTopicLevelMetrics,
private LedgerOffloaderStatsImpl(boolean exposeTopicLevelMetrics,
ScheduledExecutorService scheduler, int interval) {
this.interval = interval;
this.exposeTopicLevelMetrics = exposeTopicLevelMetrics;
Expand Down Expand Up @@ -116,6 +120,13 @@ private LedgerOffloaderStatsImpl(boolean exposeTopicLevelMetrics,
? new String[]{NAMESPACE_LABEL, TOPIC_LABEL, STATUS} : new String[]{NAMESPACE_LABEL, STATUS};
this.deleteOffloadOps = Counter.build("brk_ledgeroffloader_delete_offload_ops", "-")
.labelNames(deleteOpsLabels).create().register();

String[] cacheLabels = exposeTopicLevelMetrics
? new String[]{NAMESPACE_LABEL, TOPIC_LABEL, STATUS} : new String[]{NAMESPACE_LABEL, STATUS};
this.readOffloadCacheOps = Counter.build("brk_ledgeroffloader_read_offload_cache_ops", "-")
.labelNames(cacheLabels).create().register();
this.readOffloadCacheBytes = Counter.build("brk_ledgeroffloader_read_offload_cache_bytes", "-")
.labelNames(cacheLabels).create().register();
}


Expand Down Expand Up @@ -177,6 +188,23 @@ public void recordReadOffloadBytes(String topic, long size) {
this.addOrUpdateTopicAccess(topic);
}

@Override
public void recordReadOffloadCacheHit(String topic, long size) {
recordReadOffloadCacheOperation(topic, size, HIT);
}

@Override
public void recordReadOffloadCacheMiss(String topic, long size) {
recordReadOffloadCacheOperation(topic, size, MISS);
}

private void recordReadOffloadCacheOperation(String topic, long size, String status) {
String[] labelValues = this.labelValues(topic, status);
this.readOffloadCacheOps.labels(labelValues).inc();
this.readOffloadCacheBytes.labels(labelValues).inc(size);
this.addOrUpdateTopicAccess(topic);
}

@Override
public void recordReadOffloadIndexLatency(String topic, long latency, TimeUnit unit) {
String[] labelValues = this.labelValues(topic);
Expand Down Expand Up @@ -252,6 +280,12 @@ private void cleanExpiredTopicMetrics() {
this.deleteOffloadOps.remove(labelValues);
labelValues = this.labelValues(topic, FAILED);
this.deleteOffloadOps.remove(labelValues);
labelValues = this.labelValues(topic, HIT);
this.readOffloadCacheOps.remove(labelValues);
this.readOffloadCacheBytes.remove(labelValues);
labelValues = this.labelValues(topic, MISS);
this.readOffloadCacheOps.remove(labelValues);
this.readOffloadCacheBytes.remove(labelValues);

return true;
}
Expand Down Expand Up @@ -288,6 +322,8 @@ public synchronized void close() throws Exception {
CollectorRegistry.defaultRegistry.unregister(this.readOffloadIndexLatency);
CollectorRegistry.defaultRegistry.unregister(this.readOffloadDataLatency);
CollectorRegistry.defaultRegistry.unregister(this.deleteOffloadOps);
CollectorRegistry.defaultRegistry.unregister(this.readOffloadCacheOps);
CollectorRegistry.defaultRegistry.unregister(this.readOffloadCacheBytes);
instance = null;
}
}
Expand Down Expand Up @@ -346,6 +382,18 @@ public long getReadOffloadBytes(String topic) {
return totalBytes;
}

@VisibleForTesting
public long getReadOffloadCacheOps(String topic, boolean hit) {
String[] labels = this.labelValues(topic, hit ? HIT : MISS);
return (long) this.readOffloadCacheOps.labels(labels).get();
}

@VisibleForTesting
public long getReadOffloadCacheBytes(String topic, boolean hit) {
String[] labels = this.labelValues(topic, hit ? HIT : MISS);
return (long) this.readOffloadCacheBytes.labels(labels).get();
}

@VisibleForTesting
public Summary.Child.Value getReadLedgerLatency(String topic) {
String[] labels = this.labelValues(topic);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,4 +135,22 @@ public void testNamespaceLevelMetrics() throws Exception {
}
}

@Test
public void testReadOffloadCacheMetrics() throws Exception {
conf.setExposeTopicLevelMetricsInPrometheus(true);
super.baseSetup();

String topicName = "persistent://prop/ns-abc1/testMetrics" + UUID.randomUUID();

LedgerOffloaderStatsImpl offloaderStats = (LedgerOffloaderStatsImpl) pulsar.getOffloaderStats();
offloaderStats.recordReadOffloadCacheHit(topicName, 10);
offloaderStats.recordReadOffloadCacheMiss(topicName, 20);
offloaderStats.recordReadOffloadCacheMiss(topicName, 30);

assertEquals(offloaderStats.getReadOffloadCacheOps(topicName, true), 1);
assertEquals(offloaderStats.getReadOffloadCacheBytes(topicName, true), 10);
assertEquals(offloaderStats.getReadOffloadCacheOps(topicName, false), 2);
assertEquals(offloaderStats.getReadOffloadCacheBytes(topicName, false), 50);
}

}
Loading