Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -68,6 +69,7 @@ public static BatchLogRecordProcessorBuilder builder(LogRecordExporter logRecord
long scheduleDelayNanos,
int maxQueueSize,
int maxExportBatchSize,
int maxConcurrentExports,
long exporterTimeoutNanos) {
this.worker =
new Worker(
Expand All @@ -76,6 +78,7 @@ public static BatchLogRecordProcessorBuilder builder(LogRecordExporter logRecord
telemetryVersion,
scheduleDelayNanos,
maxExportBatchSize,
maxConcurrentExports,
exporterTimeoutNanos,
new ArrayBlockingQueue<>(maxQueueSize),
maxQueueSize); // TODO: use JcTools.newFixedSizeQueue(..)
Expand Down Expand Up @@ -161,13 +164,15 @@ private static final class Worker implements Runnable {
private volatile boolean continueWork = true;
private final ArrayList<LogRecordData> batch;
private final long maxQueueSize;
private final Semaphore concurrencyLimiter;

private Worker(
LogRecordExporter logRecordExporter,
Supplier<MeterProvider> meterProvider,
InternalTelemetryVersion telemetryVersion,
long scheduleDelayNanos,
int maxExportBatchSize,
int maxConcurrentExports,
long exporterTimeoutNanos,
Queue<ReadWriteLogRecord> queue,
long maxQueueSize) {
Expand All @@ -180,6 +185,7 @@ private Worker(
logProcessorInstrumentation =
LogRecordProcessorInstrumentation.get(telemetryVersion, COMPONENT_ID, meterProvider);
this.maxQueueSize = maxQueueSize;
this.concurrencyLimiter = new Semaphore(maxConcurrentExports - 1, true);

this.batch = new ArrayList<>(this.maxExportBatchSize);
}
Expand Down Expand Up @@ -234,10 +240,10 @@ private void flush() {
batch.add(logRecord.toLogRecordData());
logsToFlush--;
if (batch.size() >= maxExportBatchSize) {
exportCurrentBatch();
exportCurrentBatchSync();
}
}
exportCurrentBatch();
exportCurrentBatchSync();
CompletableResultCode flushResult = flushRequested.get();
if (flushResult != null) {
flushResult.succeed();
Expand Down Expand Up @@ -284,6 +290,15 @@ private CompletableResultCode forceFlush() {
}

private void exportCurrentBatch() {
if (!concurrencyLimiter.tryAcquire()) {
exportCurrentBatchSync();
return;
}

exportCurrentBatchAsync();
}

private void exportCurrentBatchSync() {
if (batch.isEmpty()) {
return;
}
Expand All @@ -309,5 +324,31 @@ private void exportCurrentBatch() {
batch.clear();
}
}

private void exportCurrentBatchAsync() {
if (batch.isEmpty()) {
return;
}

List<LogRecordData> batchCopy = new ArrayList<>(batch);
batch.clear();

CompletableResultCode result =
logRecordExporter.export(Collections.unmodifiableList(batchCopy));
result.whenComplete(
() -> {
String error = null;
if (!result.isSuccess()) {
logger.log(Level.FINE, "Exporter failed");
if (result.getFailureThrowable() != null) {
error = result.getFailureThrowable().getClass().getName();
} else {
error = "export_failed";
}
}
logProcessorInstrumentation.finishLogs(batchCopy.size(), error);
concurrencyLimiter.release();
});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,15 @@ public final class BatchLogRecordProcessorBuilder {
// Visible for testing
static final int DEFAULT_MAX_EXPORT_BATCH_SIZE = 512;
// Visible for testing
static final int DEFAULT_MAX_CONCURRENT_EXPORTS = 1;
// Visible for testing
static final int DEFAULT_EXPORT_TIMEOUT_MILLIS = 30_000;

private final LogRecordExporter logRecordExporter;
private long scheduleDelayNanos = TimeUnit.MILLISECONDS.toNanos(DEFAULT_SCHEDULE_DELAY_MILLIS);
private int maxQueueSize = DEFAULT_MAX_QUEUE_SIZE;
private int maxExportBatchSize = DEFAULT_MAX_EXPORT_BATCH_SIZE;
private int maxConcurrentExports = DEFAULT_MAX_CONCURRENT_EXPORTS;
private long exporterTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(DEFAULT_EXPORT_TIMEOUT_MILLIS);
private Supplier<MeterProvider> meterProvider = MeterProvider::noop;
private InternalTelemetryVersion telemetryVersion = InternalTelemetryVersion.LEGACY;
Expand Down Expand Up @@ -135,6 +138,21 @@ public BatchLogRecordProcessorBuilder setMaxExportBatchSize(int maxExportBatchSi
return this;
}

/**
* Sets the maximum number of concurrent exports.
*
* <p>Default value is {@code 1}.
*
* @param maxConcurrentExports the maximum number of concurrent exports.
* @return this.
* @see BatchLogRecordProcessorBuilder#DEFAULT_MAX_CONCURRENT_EXPORTS
*/
public BatchLogRecordProcessorBuilder setMaxConcurrentExports(int maxConcurrentExports) {
checkArgument(maxConcurrentExports > 0, "maxConcurrentExports must be positive.");
this.maxConcurrentExports = maxConcurrentExports;
return this;
}

/**
* Sets the {@link MeterProvider} to use to collect metrics related to batch export. If not set,
* metrics will not be collected.
Expand Down Expand Up @@ -174,6 +192,11 @@ int getMaxExportBatchSize() {
return maxExportBatchSize;
}

// Visible for testing
int getMaxConcurrentExports() {
return maxConcurrentExports;
}

/**
* Returns a new {@link BatchLogRecordProcessor} that batches, then forwards them to the given
* {@code logRecordExporter}.
Expand All @@ -195,6 +218,7 @@ public BatchLogRecordProcessor build() {
scheduleDelayNanos,
maxQueueSize,
maxExportBatchSize,
maxConcurrentExports,
exporterTimeoutNanos);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ void builderDefaults() {
.isEqualTo(BatchLogRecordProcessorBuilder.DEFAULT_MAX_QUEUE_SIZE);
assertThat(builder.getMaxExportBatchSize())
.isEqualTo(BatchLogRecordProcessorBuilder.DEFAULT_MAX_EXPORT_BATCH_SIZE);
assertThat(builder.getMaxConcurrentExports())
.isEqualTo(BatchLogRecordProcessorBuilder.DEFAULT_MAX_CONCURRENT_EXPORTS);
assertThat(builder.getExporterTimeoutNanos())
.isEqualTo(
TimeUnit.MILLISECONDS.toNanos(
Expand Down Expand Up @@ -119,6 +121,10 @@ void builderInvalidConfig() {
() -> BatchLogRecordProcessor.builder(mockLogRecordExporter).setMaxQueueSize(0))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("maxQueueSize must be positive.");
assertThatThrownBy(
() -> BatchLogRecordProcessor.builder(mockLogRecordExporter).setMaxConcurrentExports(0))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("maxConcurrentExports must be positive.");
}

@Test
Expand Down
Loading