Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 26 additions & 17 deletions src/main/java/de/rub/nds/crawler/core/BulkScanWorker.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import de.rub.nds.crawler.data.ScanConfig;
import de.rub.nds.crawler.data.ScanJobDescription;
import de.rub.nds.crawler.data.ScanTarget;
import de.rub.nds.crawler.persistence.IPersistenceProvider;
import de.rub.nds.crawler.util.CanceallableThreadPoolExecutor;
import de.rub.nds.scanner.core.execution.NamedThreadFactory;
import java.util.concurrent.LinkedBlockingDeque;
Expand Down Expand Up @@ -43,9 +42,6 @@ public abstract class BulkScanWorker<T extends ScanConfig> {
/** The scan configuration for this worker */
protected final T scanConfig;

/** The persistence provider for writing partial results */
protected final IPersistenceProvider persistenceProvider;

/**
* Calls the inner scan function and may handle cleanup. This is needed to wrap the scanner into
* a future object such that we can handle timeouts properly.
Expand All @@ -60,16 +56,10 @@ public abstract class BulkScanWorker<T extends ScanConfig> {
* @param scanConfig The scan configuration for this worker
* @param parallelScanThreads The number of parallel scan threads to use, i.e., how many {@link
* ScanTarget}s to handle in parallel.
* @param persistenceProvider The persistence provider for writing partial results
*/
protected BulkScanWorker(
String bulkScanId,
T scanConfig,
int parallelScanThreads,
IPersistenceProvider persistenceProvider) {
protected BulkScanWorker(String bulkScanId, T scanConfig, int parallelScanThreads) {
this.bulkScanId = bulkScanId;
this.scanConfig = scanConfig;
this.persistenceProvider = persistenceProvider;

timeoutExecutor =
new CanceallableThreadPoolExecutor(
Expand All @@ -94,24 +84,33 @@ protected BulkScanWorker(
* <li>Wait for the final result via {@link ProgressableFuture#get()}
* </ul>
*
* <p>The optional {@code partialResultCallback} is invoked once for every partial result the
* scan emits. The framework does not throttle, persist, or otherwise post-process partial
* results; the caller owns that policy. Throw-safe: exceptions from the callback are logged and
* swallowed so the scan continues.
*
* @param jobDescription The job description for this scan.
* @param partialResultCallback Optional consumer for partial results, or {@code null} to ignore
* partials.
* @return A ProgressableFuture representing the scan lifecycle
*/
public ProgressableFuture<Document> handle(ScanJobDescription jobDescription) {
public ProgressableFuture<Document> handle(
ScanJobDescription jobDescription, Consumer<Document> partialResultCallback) {
// if we initialized ourselves, we also clean up ourselves
shouldCleanupSelf.weakCompareAndSetAcquire(false, init());
activeJobs.incrementAndGet();

ProgressableFuture<Document> progressableFuture = new ProgressableFuture<>();

// Compose a consumer that updates the future and forwards each partial result to the
// optional caller-supplied callback (persistence, if any, is the caller's responsibility)
Consumer<Document> progressConsumer =
partialResult -> {
progressableFuture.updateResult(partialResult);
try {
persistenceProvider.upsertPartialResult(jobDescription, partialResult);
} catch (Exception e) {
LOGGER.warn("Failed to persist partial result, continuing scan", e);
if (partialResultCallback != null) {
try {
partialResultCallback.accept(partialResult);
} catch (Exception e) {
LOGGER.warn("Partial result callback threw, continuing scan", e);
}
}
};

Expand All @@ -132,6 +131,16 @@ public ProgressableFuture<Document> handle(ScanJobDescription jobDescription) {
return progressableFuture;
}

/**
 * Handles a scan job without observing partial results.
 *
 * <p>Equivalent to calling {@link #handle(ScanJobDescription, Consumer)} with a {@code null}
 * callback.
 *
 * @param jobDescription The job description for this scan.
 * @return A ProgressableFuture representing the scan lifecycle
 */
public ProgressableFuture<Document> handle(ScanJobDescription jobDescription) {
    // No callback: the two-argument overload treats null as "ignore partial results".
    final Consumer<Document> noPartialResultCallback = null;
    return handle(jobDescription, noPartialResultCallback);
}

/**
* Scans a target and returns the result as a Document. This is the core scanning functionality
* that must be implemented by subclasses.
Expand Down
55 changes: 24 additions & 31 deletions src/main/java/de/rub/nds/crawler/core/BulkScanWorkerManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@
import de.rub.nds.crawler.data.BulkScanInfo;
import de.rub.nds.crawler.data.ScanConfig;
import de.rub.nds.crawler.data.ScanJobDescription;
import de.rub.nds.crawler.persistence.IPersistenceProvider;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.apache.commons.lang3.exception.UncheckedException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -58,55 +58,57 @@ public static BulkScanWorkerManager getInstance() {

/**
* Static convenience method to handle a scan job. See also {@link #handle(ScanJobDescription,
* int, int, IPersistenceProvider)}.
* int, int, Consumer)}.
*
* @param scanJobDescription The scan job to handle
* @param parallelConnectionThreads The number of parallel connection threads to use (used to
* create worker if it does not exist)
* @param parallelScanThreads The number of parallel scan threads to use (used to create worker
* if it does not exist)
* @param persistenceProvider The persistence provider for writing partial results
* @param partialResultCallback Optional consumer for partial results, or {@code null} to ignore
* partials.
* @return A ProgressableFuture representing the scan lifecycle
*/
public static ProgressableFuture<Document> handleStatic(
ScanJobDescription scanJobDescription,
int parallelConnectionThreads,
int parallelScanThreads,
IPersistenceProvider persistenceProvider) {
Consumer<Document> partialResultCallback) {
return handleStatic(
scanJobDescription,
parallelConnectionThreads,
parallelScanThreads,
1,
persistenceProvider);
partialResultCallback);
}

/**
* Static convenience method to handle a scan job. See also {@link #handle(ScanJobDescription,
* int, int, int, IPersistenceProvider)}.
* int, int, int, Consumer)}.
*
* @param scanJobDescription The scan job to handle
* @param parallelConnectionThreads The number of parallel connection threads to use (used to
* create worker if it does not exist)
* @param parallelScanThreads The number of parallel scan threads to use (used to create worker
* if it does not exist)
* @param parallelProbes The number of probes to run in parallel per scan target
* @param persistenceProvider The persistence provider for writing partial results
* @param partialResultCallback Optional consumer for partial results, or {@code null} to ignore
* partials.
* @return A ProgressableFuture representing the scan lifecycle
*/
public static ProgressableFuture<Document> handleStatic(
ScanJobDescription scanJobDescription,
int parallelConnectionThreads,
int parallelScanThreads,
int parallelProbes,
IPersistenceProvider persistenceProvider) {
Consumer<Document> partialResultCallback) {
BulkScanWorkerManager manager = getInstance();
return manager.handle(
scanJobDescription,
parallelConnectionThreads,
parallelScanThreads,
parallelProbes,
persistenceProvider);
partialResultCallback);
}

private final Cache<String, BulkScanWorker<?>> bulkScanWorkers;
Expand Down Expand Up @@ -136,23 +138,16 @@ private BulkScanWorkerManager() {
* create worker if it does not exist)
* @param parallelScanThreads The number of parallel scan threads to use (used to create worker
* if it does not exist)
* @param persistenceProvider The persistence provider for writing partial results
* @return A bulk scan worker for the specified bulk scan
* @throws UncheckedException If a worker cannot be created
*/
public BulkScanWorker<?> getBulkScanWorker(
String bulkScanId,
ScanConfig scanConfig,
int parallelConnectionThreads,
int parallelScanThreads,
IPersistenceProvider persistenceProvider) {
int parallelScanThreads) {
return getBulkScanWorker(
bulkScanId,
scanConfig,
parallelConnectionThreads,
parallelScanThreads,
1,
persistenceProvider);
bulkScanId, scanConfig, parallelConnectionThreads, parallelScanThreads, 1);
}

/**
Expand All @@ -166,7 +161,6 @@ public BulkScanWorker<?> getBulkScanWorker(
* @param parallelScanThreads The number of parallel scan threads to use (used to create worker
* if it does not exist)
* @param parallelProbes The number of probes to run in parallel per scan target
* @param persistenceProvider The persistence provider for writing partial results
* @return A bulk scan worker for the specified bulk scan
* @throws UncheckedException If a worker cannot be created
*/
Expand All @@ -175,8 +169,7 @@ public BulkScanWorker<?> getBulkScanWorker(
ScanConfig scanConfig,
int parallelConnectionThreads,
int parallelScanThreads,
int parallelProbes,
IPersistenceProvider persistenceProvider) {
int parallelProbes) {
try {
return bulkScanWorkers.get(
bulkScanId,
Expand All @@ -186,8 +179,7 @@ public BulkScanWorker<?> getBulkScanWorker(
bulkScanId,
parallelConnectionThreads,
parallelScanThreads,
parallelProbes,
persistenceProvider);
parallelProbes);
ret.init();
return ret;
});
Expand All @@ -206,20 +198,21 @@ public BulkScanWorker<?> getBulkScanWorker(
* create worker if it does not exist)
* @param parallelScanThreads The number of parallel scan threads to use (used to create worker
* if it does not exist)
* @param persistenceProvider The persistence provider for writing partial results
* @param partialResultCallback Optional consumer for partial results, or {@code null} to ignore
* partials.
* @return A ProgressableFuture representing the scan lifecycle
*/
public ProgressableFuture<Document> handle(
ScanJobDescription scanJobDescription,
int parallelConnectionThreads,
int parallelScanThreads,
IPersistenceProvider persistenceProvider) {
Consumer<Document> partialResultCallback) {
return handle(
scanJobDescription,
parallelConnectionThreads,
parallelScanThreads,
1,
persistenceProvider);
partialResultCallback);
}

/**
Expand All @@ -232,24 +225,24 @@ public ProgressableFuture<Document> handle(
* @param parallelScanThreads The number of parallel scan threads to use (used to create worker
* if it does not exist)
* @param parallelProbes The number of probes to run in parallel per scan target
* @param persistenceProvider The persistence provider for writing partial results
* @param partialResultCallback Optional consumer for partial results, or {@code null} to ignore
* partials.
* @return A ProgressableFuture representing the scan lifecycle
*/
public ProgressableFuture<Document> handle(
ScanJobDescription scanJobDescription,
int parallelConnectionThreads,
int parallelScanThreads,
int parallelProbes,
IPersistenceProvider persistenceProvider) {
Consumer<Document> partialResultCallback) {
BulkScanInfo bulkScanInfo = scanJobDescription.getBulkScanInfo();
BulkScanWorker<?> worker =
getBulkScanWorker(
bulkScanInfo.getBulkScanId(),
bulkScanInfo.getScanConfig(),
parallelConnectionThreads,
parallelScanThreads,
parallelProbes,
persistenceProvider);
return worker.handle(scanJobDescription);
parallelProbes);
return worker.handle(scanJobDescription, partialResultCallback);
}
}
5 changes: 4 additions & 1 deletion src/main/java/de/rub/nds/crawler/core/Worker.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import de.rub.nds.crawler.persistence.IPersistenceProvider;
import de.rub.nds.scanner.core.execution.NamedThreadFactory;
import java.util.concurrent.*;
import java.util.function.Consumer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.bson.Document;
Expand Down Expand Up @@ -94,13 +95,15 @@ private ScanResult waitForScanResult(

private void handleScanJob(ScanJobDescription scanJobDescription) {
LOGGER.info("Received scan job for {}", scanJobDescription.getScanTarget());
Consumer<Document> partialResultCallback =
partial -> persistenceProvider.upsertPartialResult(scanJobDescription, partial);
ProgressableFuture<Document> progressableFuture =
BulkScanWorkerManager.handleStatic(
scanJobDescription,
parallelConnectionThreads,
parallelScanThreads,
parallelProbes,
persistenceProvider);
partialResultCallback);

workerExecutor.submit(
() -> {
Expand Down
17 changes: 4 additions & 13 deletions src/main/java/de/rub/nds/crawler/data/ScanConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
package de.rub.nds.crawler.data;

import de.rub.nds.crawler.core.BulkScanWorker;
import de.rub.nds.crawler.persistence.IPersistenceProvider;
import de.rub.nds.scanner.core.config.ScannerDetail;
import de.rub.nds.scanner.core.probe.ProbeType;
import java.io.Serializable;
Expand Down Expand Up @@ -123,35 +122,27 @@ public void setExcludedProbes(List<ProbeType> excludedProbes) {
* @param bulkScanID The ID of the bulk scan this worker is for
* @param parallelConnectionThreads The number of parallel connection threads to use
* @param parallelScanThreads The number of parallel scan threads to use
* @param persistenceProvider The persistence provider for writing partial results
* @return A worker for this scan configuration
*/
public abstract BulkScanWorker<? extends ScanConfig> createWorker(
String bulkScanID,
int parallelConnectionThreads,
int parallelScanThreads,
IPersistenceProvider persistenceProvider);
String bulkScanID, int parallelConnectionThreads, int parallelScanThreads);

/**
* Creates a worker for this scan configuration and configures probe-level parallelism.
* Implementations can override this method to support probe-level parallelism directly. Default
* behavior delegates to {@link #createWorker(String, int, int, IPersistenceProvider)} for
* backward compatibility.
* behavior delegates to {@link #createWorker(String, int, int)}.
*
* @param bulkScanID The ID of the bulk scan this worker is for
* @param parallelConnectionThreads The number of parallel connection threads to use
* @param parallelScanThreads The number of parallel scan threads to use
* @param parallelProbes The number of probes to run in parallel per scan target
* @param persistenceProvider The persistence provider for writing partial results
* @return A worker for this scan configuration
*/
public BulkScanWorker<? extends ScanConfig> createWorker(
String bulkScanID,
int parallelConnectionThreads,
int parallelScanThreads,
int parallelProbes,
IPersistenceProvider persistenceProvider) {
return createWorker(
bulkScanID, parallelConnectionThreads, parallelScanThreads, persistenceProvider);
int parallelProbes) {
return createWorker(bulkScanID, parallelConnectionThreads, parallelScanThreads);
}
}
Loading