Skip to content

Commit b4e97ec

Browse files
authored
#585 Add Storm V2 metrics support with backward-compatible bridge (#1846)
* #585 Add Storm V2 metrics support with backward-compatible bridge. Introduce a CrawlerMetrics factory that routes metric registration to Storm V1, V2 (Codahale/Dropwizard), or both APIs based on the config property `stormcrawler.metrics.version` ("v1" default, "v2", "both"). This enables gradual migration from deprecated V1 metrics without breaking existing deployments or dashboards. - New metrics bridge infrastructure in core (ScopedCounter, ScopedReducedMetric interfaces with V1/V2/Dual implementations) - Migrated all bolt/spout metric registration across core and all external modules (opensearch, sql, solr, aws, tika, warc, urlfrontier) - Added V2 ScheduledStormReporter implementations for OpenSearch, SQL, and Solr that write the same document schema as V1 MetricsConsumer. * Address reviewer comments
1 parent 47d6bab commit b4e97ec

48 files changed

Lines changed: 2004 additions & 188 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

core/src/main/java/org/apache/stormcrawler/bolt/FetcherBolt.java

Lines changed: 22 additions & 31 deletions
Original file line number | Diff line number | Diff line change
@@ -48,9 +48,6 @@
4848
import org.apache.commons.lang3.StringUtils;
4949
import org.apache.http.HttpHeaders;
5050
import org.apache.storm.Config;
51-
import org.apache.storm.metric.api.MeanReducer;
52-
import org.apache.storm.metric.api.MultiCountMetric;
53-
import org.apache.storm.metric.api.MultiReducedMetric;
5451
import org.apache.storm.task.OutputCollector;
5552
import org.apache.storm.task.TopologyContext;
5653
import org.apache.storm.topology.OutputFieldsDeclarer;
@@ -61,13 +58,15 @@
6158
import org.apache.storm.utils.Utils;
6259
import org.apache.stormcrawler.Constants;
6360
import org.apache.stormcrawler.Metadata;
61+
import org.apache.stormcrawler.metrics.CrawlerMetrics;
62+
import org.apache.stormcrawler.metrics.ScopedCounter;
63+
import org.apache.stormcrawler.metrics.ScopedReducedMetric;
6464
import org.apache.stormcrawler.persistence.Status;
6565
import org.apache.stormcrawler.protocol.Protocol;
6666
import org.apache.stormcrawler.protocol.ProtocolFactory;
6767
import org.apache.stormcrawler.protocol.ProtocolResponse;
6868
import org.apache.stormcrawler.protocol.RobotRules;
6969
import org.apache.stormcrawler.util.ConfUtils;
70-
import org.apache.stormcrawler.util.PerSecondReducer;
7170
import org.apache.stormcrawler.util.URLUtil;
7271
import org.slf4j.LoggerFactory;
7372

@@ -112,16 +111,16 @@ public class FetcherBolt extends StatusEmitterBolt {
112111

113112
private FetchItemQueues fetchQueues;
114113

115-
private MultiCountMetric eventCounter;
116-
private MultiReducedMetric averagedMetrics;
114+
private ScopedCounter eventCounter;
115+
private ScopedReducedMetric averagedMetrics;
117116

118117
private ProtocolFactory protocolFactory;
119118

120119
private int taskId = -1;
121120

122121
boolean sitemapsAutoDiscovery = false;
123122

124-
private MultiReducedMetric perSecMetrics;
123+
private ScopedReducedMetric perSecMetrics;
125124

126125
private File debugfiletrigger;
127126

@@ -909,42 +908,34 @@ public void prepare(
909908
// The data can be accessed by registering a "MetricConsumer" in the
910909
// topology
911910
this.eventCounter =
912-
context.registerMetric(
913-
"fetcher_counter", new MultiCountMetric(), metricsTimeBucketSecs);
911+
CrawlerMetrics.registerCounter(
912+
context, stormConf, "fetcher_counter", metricsTimeBucketSecs);
914913

915914
// create gauges
916-
context.registerMetric(
917-
"activethreads",
918-
() -> {
919-
return activeThreads.get();
920-
},
921-
metricsTimeBucketSecs);
915+
CrawlerMetrics.registerGauge(
916+
context, stormConf, "activethreads", activeThreads::get, metricsTimeBucketSecs);
922917

923-
context.registerMetric(
918+
CrawlerMetrics.registerGauge(
919+
context,
920+
stormConf,
924921
"in_queues",
925-
() -> {
926-
return fetchQueues.inQueues.get();
927-
},
922+
() -> fetchQueues.inQueues.get(),
928923
metricsTimeBucketSecs);
929924

930-
context.registerMetric(
925+
CrawlerMetrics.registerGauge(
926+
context,
927+
stormConf,
931928
"num_queues",
932-
() -> {
933-
return fetchQueues.queues.size();
934-
},
929+
() -> fetchQueues.queues.size(),
935930
metricsTimeBucketSecs);
936931

937932
this.averagedMetrics =
938-
context.registerMetric(
939-
"fetcher_average_perdoc",
940-
new MultiReducedMetric(new MeanReducer()),
941-
metricsTimeBucketSecs);
933+
CrawlerMetrics.registerMeanMetric(
934+
context, stormConf, "fetcher_average_perdoc", metricsTimeBucketSecs);
942935

943936
this.perSecMetrics =
944-
context.registerMetric(
945-
"fetcher_average_persec",
946-
new MultiReducedMetric(new PerSecondReducer()),
947-
metricsTimeBucketSecs);
937+
CrawlerMetrics.registerPerSecMetric(
938+
context, stormConf, "fetcher_average_persec", metricsTimeBucketSecs);
948939

949940
protocolFactory = ProtocolFactory.getInstance(conf);
950941

core/src/main/java/org/apache/stormcrawler/bolt/JSoupParserBolt.java

Lines changed: 4 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -35,7 +35,6 @@
3535
import java.util.stream.Stream;
3636
import org.apache.commons.lang3.StringUtils;
3737
import org.apache.http.HttpHeaders;
38-
import org.apache.storm.metric.api.MultiCountMetric;
3938
import org.apache.storm.task.OutputCollector;
4039
import org.apache.storm.task.TopologyContext;
4140
import org.apache.storm.topology.OutputFieldsDeclarer;
@@ -44,6 +43,8 @@
4443
import org.apache.storm.tuple.Values;
4544
import org.apache.stormcrawler.Constants;
4645
import org.apache.stormcrawler.Metadata;
46+
import org.apache.stormcrawler.metrics.CrawlerMetrics;
47+
import org.apache.stormcrawler.metrics.ScopedCounter;
4748
import org.apache.stormcrawler.parse.DocumentFragmentBuilder;
4849
import org.apache.stormcrawler.parse.JSoupFilter;
4950
import org.apache.stormcrawler.parse.JSoupFilters;
@@ -82,7 +83,7 @@ public class JSoupParserBolt extends StatusEmitterBolt {
8283

8384
private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(JSoupParserBolt.class);
8485

85-
private MultiCountMetric eventCounter;
86+
private ScopedCounter eventCounter;
8687

8788
private ParseFilter parseFilters = null;
8889

@@ -132,7 +133,7 @@ public void prepare(
132133
super.prepare(conf, context, collector);
133134

134135
eventCounter =
135-
context.registerMetric(this.getClass().getSimpleName(), new MultiCountMetric(), 10);
136+
CrawlerMetrics.registerCounter(context, conf, this.getClass().getSimpleName(), 10);
136137

137138
parseFilters = ParseFilters.fromConf(conf);
138139

core/src/main/java/org/apache/stormcrawler/bolt/SimpleFetcherBolt.java

Lines changed: 18 additions & 34 deletions
Original file line number | Diff line number | Diff line change
@@ -39,10 +39,6 @@
3939
import org.apache.commons.lang3.StringUtils;
4040
import org.apache.http.HttpHeaders;
4141
import org.apache.storm.Config;
42-
import org.apache.storm.metric.api.IMetric;
43-
import org.apache.storm.metric.api.MeanReducer;
44-
import org.apache.storm.metric.api.MultiCountMetric;
45-
import org.apache.storm.metric.api.MultiReducedMetric;
4642
import org.apache.storm.task.OutputCollector;
4743
import org.apache.storm.task.TopologyContext;
4844
import org.apache.storm.topology.OutputFieldsDeclarer;
@@ -52,13 +48,15 @@
5248
import org.apache.storm.utils.Utils;
5349
import org.apache.stormcrawler.Constants;
5450
import org.apache.stormcrawler.Metadata;
51+
import org.apache.stormcrawler.metrics.CrawlerMetrics;
52+
import org.apache.stormcrawler.metrics.ScopedCounter;
53+
import org.apache.stormcrawler.metrics.ScopedReducedMetric;
5554
import org.apache.stormcrawler.persistence.Status;
5655
import org.apache.stormcrawler.protocol.Protocol;
5756
import org.apache.stormcrawler.protocol.ProtocolFactory;
5857
import org.apache.stormcrawler.protocol.ProtocolResponse;
5958
import org.apache.stormcrawler.protocol.RobotRules;
6059
import org.apache.stormcrawler.util.ConfUtils;
61-
import org.apache.stormcrawler.util.PerSecondReducer;
6260
import org.apache.stormcrawler.util.URLUtil;
6361
import org.slf4j.LoggerFactory;
6462

@@ -85,9 +83,9 @@ public class SimpleFetcherBolt extends StatusEmitterBolt {
8583

8684
private Config conf;
8785

88-
private MultiCountMetric eventCounter;
89-
private MultiReducedMetric averagedMetrics;
90-
private MultiReducedMetric perSecMetrics;
86+
private ScopedCounter eventCounter;
87+
private ScopedReducedMetric averagedMetrics;
88+
private ScopedReducedMetric perSecMetrics;
9189

9290
private ProtocolFactory protocolFactory;
9391

@@ -174,40 +172,26 @@ public void prepare(
174172
int metricsTimeBucketSecs = ConfUtils.getInt(conf, "fetcher.metrics.time.bucket.secs", 10);
175173

176174
this.eventCounter =
177-
context.registerMetric(
178-
"fetcher_counter", new MultiCountMetric(), metricsTimeBucketSecs);
175+
CrawlerMetrics.registerCounter(
176+
context, stormConf, "fetcher_counter", metricsTimeBucketSecs);
179177

180178
this.averagedMetrics =
181-
context.registerMetric(
182-
"fetcher_average",
183-
new MultiReducedMetric(new MeanReducer()),
184-
metricsTimeBucketSecs);
179+
CrawlerMetrics.registerMeanMetric(
180+
context, stormConf, "fetcher_average", metricsTimeBucketSecs);
185181

186182
this.perSecMetrics =
187-
context.registerMetric(
188-
"fetcher_average_persec",
189-
new MultiReducedMetric(new PerSecondReducer()),
190-
metricsTimeBucketSecs);
183+
CrawlerMetrics.registerPerSecMetric(
184+
context, stormConf, "fetcher_average_persec", metricsTimeBucketSecs);
191185

192186
// create gauges
193-
context.registerMetric(
194-
"activethreads",
195-
new IMetric() {
196-
@Override
197-
public Object getValueAndReset() {
198-
return activeThreads.get();
199-
}
200-
},
201-
metricsTimeBucketSecs);
187+
CrawlerMetrics.registerGauge(
188+
context, stormConf, "activethreads", activeThreads::get, metricsTimeBucketSecs);
202189

203-
context.registerMetric(
190+
CrawlerMetrics.registerGauge(
191+
context,
192+
stormConf,
204193
"throttler_size",
205-
new IMetric() {
206-
@Override
207-
public Object getValueAndReset() {
208-
return throttler.estimatedSize();
209-
}
210-
},
194+
throttler::estimatedSize,
211195
metricsTimeBucketSecs);
212196

213197
protocolFactory = ProtocolFactory.getInstance(conf);

core/src/main/java/org/apache/stormcrawler/bolt/SiteMapParserBolt.java

Lines changed: 6 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -42,10 +42,9 @@
4242
import java.util.Locale;
4343
import java.util.Map;
4444
import java.util.TimeZone;
45+
import java.util.function.Consumer;
4546
import org.apache.commons.lang3.StringUtils;
4647
import org.apache.http.HttpHeaders;
47-
import org.apache.storm.metric.api.MeanReducer;
48-
import org.apache.storm.metric.api.ReducedMetric;
4948
import org.apache.storm.task.OutputCollector;
5049
import org.apache.storm.task.TopologyContext;
5150
import org.apache.storm.topology.OutputFieldsDeclarer;
@@ -54,6 +53,7 @@
5453
import org.apache.storm.tuple.Values;
5554
import org.apache.stormcrawler.Constants;
5655
import org.apache.stormcrawler.Metadata;
56+
import org.apache.stormcrawler.metrics.CrawlerMetrics;
5757
import org.apache.stormcrawler.parse.Outlink;
5858
import org.apache.stormcrawler.parse.ParseFilter;
5959
import org.apache.stormcrawler.parse.ParseFilters;
@@ -86,7 +86,7 @@ public class SiteMapParserBolt extends StatusEmitterBolt {
8686

8787
private int maxOffsetGuess = 300;
8888

89-
private ReducedMetric averagedMetrics;
89+
private Consumer<Number> averagedMetrics;
9090

9191
/** Delay in minutes used for scheduling sub-sitemaps. */
9292
private int scheduleSitemapsWithDelay = -1;
@@ -194,7 +194,7 @@ private List<Outlink> parseSiteMap(
194194
siteMap = parser.parseSiteMap(contentType, content, url1);
195195
}
196196
long end = System.currentTimeMillis();
197-
averagedMetrics.update(end - start);
197+
averagedMetrics.accept(end - start);
198198

199199
List<Outlink> links = new ArrayList<>();
200200

@@ -341,10 +341,8 @@ public void prepare(
341341
parseFilters = ParseFilters.fromConf(stormConf);
342342
maxOffsetGuess = ConfUtils.getInt(stormConf, "sitemap.offset.guess", 300);
343343
averagedMetrics =
344-
context.registerMetric(
345-
"sitemap_average_processing_time",
346-
new ReducedMetric(new MeanReducer()),
347-
30);
344+
CrawlerMetrics.registerSingleMeanMetric(
345+
context, stormConf, "sitemap_average_processing_time", 30);
348346
scheduleSitemapsWithDelay =
349347
ConfUtils.getInt(stormConf, "sitemap.schedule.delay", scheduleSitemapsWithDelay);
350348
List<String> extensionsStrings =

core/src/main/java/org/apache/stormcrawler/bolt/URLPartitionerBolt.java

Lines changed: 5 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -25,7 +25,6 @@
2525
import java.util.LinkedHashMap;
2626
import java.util.Map;
2727
import org.apache.commons.lang3.StringUtils;
28-
import org.apache.storm.metric.api.MultiCountMetric;
2928
import org.apache.storm.task.OutputCollector;
3029
import org.apache.storm.task.TopologyContext;
3130
import org.apache.storm.topology.OutputFieldsDeclarer;
@@ -35,6 +34,8 @@
3534
import org.apache.storm.tuple.Values;
3635
import org.apache.stormcrawler.Constants;
3736
import org.apache.stormcrawler.Metadata;
37+
import org.apache.stormcrawler.metrics.CrawlerMetrics;
38+
import org.apache.stormcrawler.metrics.ScopedCounter;
3839
import org.apache.stormcrawler.util.ConfUtils;
3940
import org.apache.stormcrawler.util.URLUtil;
4041
import org.slf4j.Logger;
@@ -47,7 +48,7 @@ public class URLPartitionerBolt extends BaseRichBolt {
4748

4849
private OutputCollector collector;
4950

50-
private MultiCountMetric eventCounter;
51+
private ScopedCounter eventCounter;
5152

5253
private Map<String, String> cache;
5354

@@ -165,7 +166,8 @@ public void prepare(
165166
// system stream
166167
// The data can be accessed by registering a "MetricConsumer" in the
167168
// topology
168-
this.eventCounter = context.registerMetric("URLPartitioner", new MultiCountMetric(), 10);
169+
this.eventCounter =
170+
CrawlerMetrics.registerCounter(context, stormConf, "URLPartitioner", 10);
169171

170172
final int maxEntries = 500;
171173
cache =

0 commit comments

Comments
 (0)