Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 67 additions & 1 deletion src/java/org/apache/nutch/fetcher/Fetcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,22 +33,27 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.tdunning.math.stats.MergingDigest;
import org.apache.nutch.crawl.CrawlDatum;
import org.apache.nutch.crawl.NutchWritable;
import org.apache.nutch.metadata.Nutch;
import org.apache.nutch.metrics.LatencyTracker;
import org.apache.nutch.metrics.NutchMetrics;
import org.apache.nutch.util.MimeUtil;
import org.apache.nutch.util.NutchConfiguration;
Expand Down Expand Up @@ -223,9 +228,9 @@ public void run(Context innerContext)

setup(innerContext);
initCounters(innerContext);
LinkedList<FetcherThread> fetcherThreads = new LinkedList<>();
try {
Configuration conf = innerContext.getConfiguration();
LinkedList<FetcherThread> fetcherThreads = new LinkedList<>();
FetchItemQueues fetchQueues = new FetchItemQueues(conf);
QueueFeeder feeder;

Expand Down Expand Up @@ -494,11 +499,70 @@ else if (bandwidthTargetCheckCounter == bandwidthTargetCheckEveryNSecs) {
} while (activeThreads.get() > 0);
LOG.info("-activeThreads={}", activeThreads);
} finally {
// Merge all thread latency trackers and emit once; emit TDigest for reducer
LatencyTracker mergedLatencyTracker = new LatencyTracker(
NutchMetrics.GROUP_FETCHER, NutchMetrics.FETCHER_LATENCY);
for (FetcherThread fetcherThread : fetcherThreads) {
mergedLatencyTracker.merge(fetcherThread.getFetchLatencyTracker());
}
mergedLatencyTracker.emitCountAndSumOnly(innerContext);
byte[] digestBytes = mergedLatencyTracker.toBytes();
if (digestBytes.length > 0) {
innerContext.write(new Text(NutchMetrics.LATENCY_KEY),
new NutchWritable(new BytesWritable(digestBytes)));
}
cleanup(innerContext);
}
}
}

/**
 * Pass-through reducer for the fetcher's (url, datum) records. The single
 * reserved key {@link NutchMetrics#LATENCY_KEY} carries serialized TDigest
 * sketches emitted by the map tasks; those records are merged here and
 * turned into job-level latency percentile counters instead of being
 * written to the segment output.
 */
public static class FetcherReducer extends
    Reducer<Text, NutchWritable, Text, NutchWritable> {

  private static final Text LATENCY_KEY = new Text(NutchMetrics.LATENCY_KEY);

  @Override
  public void reduce(Text key, Iterable<NutchWritable> values,
      Context context) throws IOException, InterruptedException {
    if (!key.equals(LATENCY_KEY)) {
      // Ordinary fetch record: forward every value unchanged.
      for (NutchWritable value : values) {
        context.write(key, value);
      }
      return;
    }
    // Latency records: merge every non-empty digest into a single sketch.
    MergingDigest combined = null;
    for (NutchWritable wrapped : values) {
      if (!(wrapped.get() instanceof BytesWritable)) {
        continue;
      }
      byte[] payload = ((BytesWritable) wrapped.get()).copyBytes();
      if (payload == null || payload.length == 0) {
        continue;
      }
      MergingDigest sketch = LatencyTracker.fromBytes(payload);
      if (sketch == null) {
        continue;
      }
      if (combined == null) {
        combined = sketch;
      } else {
        combined.add(sketch);
      }
    }
    // count_total and sum_ms were already aggregated by the map tasks;
    // only the percentile counters need the merged digest.
    if (combined != null) {
      setPercentileCounter(context, LatencyTracker.SUFFIX_P50_MS,
          combined.quantile(0.50));
      setPercentileCounter(context, LatencyTracker.SUFFIX_P95_MS,
          combined.quantile(0.95));
      setPercentileCounter(context, LatencyTracker.SUFFIX_P99_MS,
          combined.quantile(0.99));
    }
  }

  /** Sets one fetcher latency percentile counter (value in milliseconds). */
  private static void setPercentileCounter(Context context, String suffix,
      double millis) {
    context.getCounter(NutchMetrics.GROUP_FETCHER,
        NutchMetrics.FETCHER_LATENCY + suffix).setValue((long) millis);
  }
}

public void fetch(Path segment, int threads) throws IOException,
InterruptedException, ClassNotFoundException {

Expand Down Expand Up @@ -561,6 +625,8 @@ public void fetch(Path segment, int threads) throws IOException,
job.setInputFormatClass(InputFormat.class);
job.setJarByClass(Fetcher.class);
job.setMapperClass(Fetcher.FetcherRun.class);
job.setReducerClass(Fetcher.FetcherReducer.class);
job.setNumReduceTasks(1);

FileOutputFormat.setOutputPath(job, segment);
job.setOutputFormatClass(FetcherOutputFormat.class);
Expand Down
10 changes: 8 additions & 2 deletions src/java/org/apache/nutch/fetcher/FetcherThread.java
Original file line number Diff line number Diff line change
Expand Up @@ -567,8 +567,6 @@ public void run() {
if (fit != null) {
fetchQueues.finishFetchItem(fit);
}
// Emit fetch latency metrics
fetchLatencyTracker.emitCounters(context);
// Emit error metrics
errorTracker.emitCounters(context);
activeThreads.decrementAndGet(); // count threads
Expand All @@ -577,6 +575,14 @@ public void run() {
}
}

/**
 * Returns this thread's fetch latency tracker so that the enclosing mapper
 * can merge the per-thread trackers into one and emit job-level latency
 * percentiles once, instead of each thread emitting counters separately.
 *
 * @return the per-thread {@link LatencyTracker} for fetch latencies
 */
public LatencyTracker getFetchLatencyTracker() {
  return fetchLatencyTracker;
}

private Text handleRedirect(FetchItem fit, String newUrl,
boolean temp, String redirType)
throws MalformedURLException, URLFilterException, InterruptedException {
Expand Down
91 changes: 88 additions & 3 deletions src/java/org/apache/nutch/indexer/IndexerMapReduce.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,28 @@
import java.util.Collection;
import java.util.Locale;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.codec.binary.StringUtils;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.nutch.crawl.CrawlDatum;
import org.apache.nutch.crawl.CrawlDb;
import org.apache.nutch.crawl.Inlinks;
Expand All @@ -56,6 +63,8 @@
import org.apache.nutch.scoring.ScoringFilterException;
import org.apache.nutch.scoring.ScoringFilters;

import com.tdunning.math.stats.MergingDigest;

/**
* <p>
* This class is typically invoked from within
Expand Down Expand Up @@ -294,8 +303,21 @@ private void initCounters(Reducer<Text, NutchWritable, Text, NutchIndexAction>.C
@Override
public void cleanup(Reducer<Text, NutchWritable, Text, NutchIndexAction>.Context context)
throws IOException, InterruptedException {
// Emit indexing latency metrics
indexLatencyTracker.emitCounters(context);
indexLatencyTracker.emitCountAndSumOnly(context);
byte[] digestBytes = indexLatencyTracker.toBytes();
if (digestBytes.length > 0) {
Path outPath = FileOutputFormat.getOutputPath(context);
Path latencyDir = new Path(outPath, "_latency");
Path latencyFile = new Path(latencyDir, context.getTaskAttemptID().toString() + ".seq");
FileSystem fs = latencyFile.getFileSystem(context.getConfiguration());
fs.mkdirs(latencyDir);
try (SequenceFile.Writer writer = SequenceFile.createWriter(context.getConfiguration(),
SequenceFile.Writer.file(latencyFile),
SequenceFile.Writer.keyClass(NullWritable.class),
SequenceFile.Writer.valueClass(BytesWritable.class))) {
writer.append(NullWritable.get(), new BytesWritable(digestBytes));
}
}
}

@Override
Expand Down Expand Up @@ -557,4 +579,67 @@ public static void initMRJob(Path crawlDb, Path linkDb,
job.setMapOutputValueClass(NutchWritable.class);
job.setOutputValueClass(NutchWritable.class);
}

/**
 * Mapper of the indexer latency-merge job: forwards each serialized TDigest
 * value under one constant key so that the single reducer receives all of
 * them together.
 */
public static class IndexerLatencyMergeMapper
    extends Mapper<NullWritable, BytesWritable, IntWritable, BytesWritable> {

  /** Shared constant key; funnels every digest to the lone reducer. */
  private static final IntWritable SINGLE_KEY = new IntWritable(1);

  @Override
  public void map(NullWritable key, BytesWritable value, Context context)
      throws IOException, InterruptedException {
    context.write(SINGLE_KEY, value);
  }
}

/**
 * Reducer of the indexer latency-merge job: merges every TDigest sketch
 * written by the indexer tasks into one and publishes the job-level latency
 * percentile counters. Emits no records (the job uses a null output format).
 */
public static class IndexerLatencyMergeReducer
    extends Reducer<IntWritable, BytesWritable, IntWritable, BytesWritable> {

  @Override
  public void reduce(IntWritable key, Iterable<BytesWritable> values, Context context)
      throws IOException, InterruptedException {
    MergingDigest combined = null;
    for (BytesWritable value : values) {
      byte[] payload = value.copyBytes();
      if (payload == null || payload.length == 0) {
        continue;
      }
      MergingDigest sketch = LatencyTracker.fromBytes(payload);
      if (sketch == null) {
        continue;
      }
      if (combined == null) {
        combined = sketch;
      } else {
        combined.add(sketch);
      }
    }
    if (combined != null) {
      publishPercentile(context, LatencyTracker.SUFFIX_P50_MS,
          combined.quantile(0.50));
      publishPercentile(context, LatencyTracker.SUFFIX_P95_MS,
          combined.quantile(0.95));
      publishPercentile(context, LatencyTracker.SUFFIX_P99_MS,
          combined.quantile(0.99));
    }
  }

  /** Sets one indexer latency percentile counter (value in milliseconds). */
  private static void publishPercentile(Context context, String suffix,
      double millis) {
    context.getCounter(NutchMetrics.GROUP_INDEXER,
        NutchMetrics.INDEXER_LATENCY + suffix).setValue((long) millis);
  }
}

/**
 * Creates (but does not submit) a small job that merges the TDigest side
 * files written by the indexer reducers under {@code latencyDir} in a
 * single reducer and sets the job-level latency percentile counters.
 * Call after the main index job succeeds.
 *
 * <p>NOTE(review): {@code latencyDir} must exist and contain at least one
 * sequence file at submit time, otherwise {@code FileInputFormat} throws
 * {@code InvalidInputException} (see reviewer reports of this failure in
 * local and single-node runs). Callers should guard with an existence
 * check before submitting — TODO confirm whether an empty-but-existing
 * directory is also handled.
 *
 * @param conf the job configuration
 * @param latencyDir directory holding the per-task TDigest sequence files
 * @return the configured, not-yet-submitted merge job
 * @throws IOException if the job instance cannot be created
 */
public static Job createLatencyMergeJob(Configuration conf, Path latencyDir)
throws IOException {
Job job = Job.getInstance(conf, "Nutch Indexer Latency Merge");
job.setJarByClass(IndexerMapReduce.class);
FileInputFormat.addInputPath(job, latencyDir);
job.setInputFormatClass(SequenceFileInputFormat.class);
job.setMapperClass(IndexerLatencyMergeMapper.class);
job.setReducerClass(IndexerLatencyMergeReducer.class);
job.setNumReduceTasks(1);
job.setOutputFormatClass(NullOutputFormat.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(BytesWritable.class);
return job;
}
}
23 changes: 23 additions & 0 deletions src/java/org/apache/nutch/indexer/IndexingJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,15 @@

import org.apache.commons.lang3.time.StopWatch;
import org.apache.nutch.metadata.Nutch;
import org.apache.nutch.metrics.ErrorTracker;
import org.apache.nutch.metrics.NutchMetrics;
import org.apache.nutch.segment.SegmentChecker;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.util.StringUtils;
Expand Down Expand Up @@ -143,6 +145,8 @@ public void index(Path crawlDb, Path linkDb, List<Path> segments,
+ RANDOM.nextInt());

FileOutputFormat.setOutputPath(job, tmp);
// Driver-level error tracking: categorization + LOG.error only (no job counters; see ErrorTracker Javadoc).
Comment thread
lewismc marked this conversation as resolved.
ErrorTracker errorTracker = new ErrorTracker(NutchMetrics.GROUP_INDEXER);
try {
try{
boolean success = job.waitForCompletion(true);
Expand All @@ -155,6 +159,25 @@ public void index(Path crawlDb, Path linkDb, List<Path> segments,
LOG.error(StringUtils.stringifyException(e));
throw e;
}
Path latencyDir = new Path(tmp, "_latency");
FileSystem fs = tmp.getFileSystem(conf);
if (fs.exists(latencyDir)) {
try (Job mergeJob = IndexerMapReduce.createLatencyMergeJob(conf, latencyDir)) {
FileOutputFormat.setOutputPath(mergeJob, new Path(tmp, "_latency_merge_out"));
boolean mergeSuccess = mergeJob.waitForCompletion(true);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2026-04-08 21:38:08,406 ERROR o.a.n.i.IndexingJob [main] Indexer: org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: file:/mnt/data/wastl/proj/crawler/nutch/test/tmp_1775677086987-2006747810/_latency
        at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:342)
        at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:281)
        at org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.listStatus(SequenceFileInputFormat.java:59)
        at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits(FileInputFormat.java:445)
        at org.apache.hadoop.mapreduce.JobSubmitter.writeNewSplits(JobSubmitter.java:311)
        at org.apache.hadoop.mapreduce.JobSubmitter.writeSplits(JobSubmitter.java:328)
        at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:201)
        at org.apache.hadoop.mapreduce.Job$11.run(Job.java:1677)
        at org.apache.hadoop.mapreduce.Job$11.run(Job.java:1674)
        at java.base/java.security.AccessController.doPrivileged(AccessController.java:712)
        at java.base/javax.security.auth.Subject.doAs(Subject.java:439)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1953)
        at org.apache.hadoop.mapreduce.Job.submit(Job.java:1674)
        at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:1695)
        at org.apache.nutch.indexer.IndexingJob.index(IndexingJob.java:167)
        at org.apache.nutch.indexer.IndexingJob.run(IndexingJob.java:320)
        at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:82)
        at org.apache.nutch.indexer.IndexingJob.main(IndexingJob.java:329)

when running

```
bin/nutch index -Dplugin.includes='indexer-dummy|index-(basic|more)' -nocrawldb /path/to/segments/20260408205641/
```

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or when running on a single-node cluster (indexing to Solr):

2026-04-08 21:50:01,953 ERROR indexer.IndexingJob: Indexer: org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: hdfs://localhost:9000/user/wastl/tmp_1775677717095--1619189953/_latency

This only affects the latency-merge job.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looked into SequenceFileInputFormat.listStatus: it either takes a single file or a directory tree with sequence files _latency/part-xxx/data.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I wondered about this. I am not a huge fan of the intermediate output being written for IndexerJob either. I think we could even remove the changes for this job and address them separately. This will NOT have an impact on the Job execution... however the counters are not accurate.

if (!mergeSuccess) {
LOG.error("Indexer Latency Merge job failed");
errorTracker.recordError(ErrorTracker.ErrorType.OTHER);
}
} catch (IOException | InterruptedException | ClassNotFoundException e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
LOG.error("Indexer Latency Merge job failed: {}", e.getMessage());
errorTracker.recordError(e);
throw e;
}
}
LOG.info("Indexer: number of documents indexed, deleted, or skipped:");
for (Counter counter : job.getCounters()
.getGroup(NutchMetrics.GROUP_INDEXER)) {
Expand Down
29 changes: 23 additions & 6 deletions src/java/org/apache/nutch/metrics/ErrorTracker.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,12 @@
* based on exception type. It uses a bounded set of error categories to stay within
* Hadoop's counter limits (~120 counters).
*
* <p>Usage:
* <p><b>Usage in mapper/reducer or task threads:</b>
* <pre>
* // In mapper/reducer setup or thread initialization
* errorTracker = new ErrorTracker(NutchMetrics.GROUP_FETCHER);
* // or with context for cached counters:
* errorTracker = new ErrorTracker(NutchMetrics.GROUP_FETCHER, context);
*
* // When catching exceptions
* try {
Expand All @@ -49,11 +51,20 @@
* // Or with manual categorization
* errorTracker.recordError(ErrorTracker.ErrorType.NETWORK);
*
* // In cleanup - emit all error counters
* // In cleanup - emit all error counters to the job
* errorTracker.emitCounters(context);
* </pre>
*
* <p>Emits the following counters:
* <p><b>Usage in driver/client code (no task context):</b>
* When used in a job driver or other code that does not run inside a mapper/reducer,
* create an ErrorTracker with the single-argument constructor (counter group only).
* Call {@link #recordError(Throwable)} or {@link #recordError(ErrorTracker.ErrorType)}
* for consistent error categorization. Do <em>not</em> call {@link #emitCounters(TaskInputOutputContext)};
* Hadoop counters can only be written from within a task, so counts remain in-memory only.
* This allows the same categorization and logging pattern (e.g. with LOG.error) as in
* tasks, without emitting to job counters.
*
* <p>Emits the following counters (when used inside a task and emitCounters is called):
* <ul>
* <li>errors_total - total number of errors across all categories</li>
* <li>errors_network_total - network-related errors</li>
Expand Down Expand Up @@ -104,9 +115,15 @@ public enum ErrorType {
/**
* Creates a new ErrorTracker for the specified counter group.
*
* <p>This constructor creates an ErrorTracker without cached counters.
* Call {@link #initCounters(TaskInputOutputContext)} in setup() to cache
* counter references for better performance.
* <p>Use in mapper/reducer setup or thread initialization: call
* {@link #initCounters(TaskInputOutputContext)} in setup() to cache counter
* references, then {@link #emitCounters(TaskInputOutputContext)} in cleanup to
* emit counts to the job.
*
* <p>Use in driver/client code (no task context): do not call initCounters or
* emitCounters. Only {@link #recordError(Throwable)} and
* {@link #recordError(ErrorTracker.ErrorType)} are used; counts stay in-memory
* for consistent categorization and logging (e.g. with LOG.error).
*
* @param group the Hadoop counter group name (e.g., NutchMetrics.GROUP_FETCHER)
*/
Expand Down
Loading
Loading