Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,17 @@
import datadog.communication.ddagent.DDAgentFeaturesDiscovery;
import datadog.communication.ddagent.SharedCommunicationObjects;
import datadog.trace.api.Config;
import datadog.trace.api.config.TracerConfig;
import datadog.trace.core.monitor.HealthMetrics;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LongRunningTracesTracker {
private static final Logger LOGGER = LoggerFactory.getLogger(LongRunningTracesTracker.class);

private final DDAgentFeaturesDiscovery features;
private final HealthMetrics healthMetrics;
private long lastFlushMilli = 0;
Expand All @@ -21,6 +26,7 @@ public class LongRunningTracesTracker {
private int dropped = 0;
private int write = 0;
private int expired = 0;
private int droppedSampling = 0;

public static final int NOT_TRACKED = -1;
public static final int UNDEFINED = 0;
Expand All @@ -41,6 +47,16 @@ public LongRunningTracesTracker(
(int) TimeUnit.SECONDS.toMillis(config.getLongRunningTraceFlushInterval());
this.features = sharedCommunicationObjects.featuresDiscovery(config);
this.healthMetrics = healthMetrics;

if (!features.supportsLongRunning()) {
LOGGER.warn(
"Long running trace tracking is enabled via {}, however the Datadog Agent version {} does not support receiving long running traces. "
+ "Long running traces will be tracked locally in memory (up to {} traces) but will NOT be sent to the agent. "
+ "Long running traces are included in tracer flares.",
"dd." + TracerConfig.TRACE_LONG_RUNNING_ENABLED,
features.getVersion() != null ? features.getVersion() : "unknown",
maxTrackedTraces);
}
}

public boolean add(PendingTraceBuffer.Element element) {
Expand Down Expand Up @@ -78,7 +94,7 @@ public void flushAndCompact(long nowMilli) {
cleanSlot(i);
continue;
}
if (trace.empty() || !features.supportsLongRunning()) {
if (trace.empty()) {
trace.compareAndSetLongRunningState(WRITE_RUNNING_SPANS, NOT_TRACKED);
cleanSlot(i);
continue;
Expand All @@ -92,12 +108,15 @@ public void flushAndCompact(long nowMilli) {
if (shouldFlush(nowMilli, trace)) {
if (negativeOrNullPriority(trace)) {
trace.compareAndSetLongRunningState(TRACKED, NOT_TRACKED);
droppedSampling++;
cleanSlot(i);
continue;
}
trace.compareAndSetLongRunningState(TRACKED, WRITE_RUNNING_SPANS);
write++;
trace.write();
if (features.supportsLongRunning()) {
trace.compareAndSetLongRunningState(TRACKED, WRITE_RUNNING_SPANS);
write++;
trace.write();
}
}
i++;
}
Expand Down Expand Up @@ -134,9 +153,10 @@ private boolean negativeOrNullPriority(PendingTrace trace) {
}

/**
 * Reports the long-running trace counters accumulated since the previous report to {@code
 * healthMetrics}, then resets every counter to zero so the next report only carries what
 * happened in between.
 */
private void flushStats() {
  // Report exactly once, through the overload that also carries the sampling-drop counter.
  // Keeping the older three-argument call next to it would publish dropped/write/expired twice.
  healthMetrics.onLongRunningUpdate(dropped, write, expired, droppedSampling);
  dropped = 0;
  write = 0;
  expired = 0;
  droppedSampling = 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ public void onSend(
public void onFailedSend(
final int traceCount, final int sizeInBytes, final RemoteApi.Response response) {}

/**
 * Hook for long-running trace counters, three-counter form. The body is empty, so this base
 * implementation does nothing.
 *
 * <p>NOTE(review): this looks like the earlier signature kept next to its four-argument
 * replacement; confirm whether any caller or subclass still relies on it.
 *
 * @param dropped number of long-running traces dropped (exact drop condition not visible here)
 * @param write number of long-running traces written
 * @param expired number of long-running traces that expired
 */
public void onLongRunningUpdate(final int dropped, final int write, final int expired) {}
/**
 * Hook for long-running trace counters, including traces dropped because of their sampling
 * priority. The body is empty, so this base implementation does nothing; subclasses override it
 * to record the values.
 *
 * @param dropped number of long-running traces dropped (exact drop condition not visible here)
 * @param write number of long-running traces written
 * @param expired number of long-running traces that expired
 * @param droppedSampling number of long-running traces dropped at flush time because their
 *     sampling priority was negative or unset
 */
public void onLongRunningUpdate(
final int dropped, final int write, final int expired, final int droppedSampling) {}

/**
* Report that a trace has been used to compute client stats.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ public class TracerHealthMetrics extends HealthMetrics implements AutoCloseable
private final LongAdder longRunningTracesWrite = new LongAdder();
private final LongAdder longRunningTracesDropped = new LongAdder();
private final LongAdder longRunningTracesExpired = new LongAdder();
private final LongAdder longRunningTracesDroppedSampling = new LongAdder();

private final LongAdder clientStatsProcessedSpans = new LongAdder();
private final LongAdder clientStatsProcessedTraces = new LongAdder();
Expand Down Expand Up @@ -296,10 +297,12 @@ public void onFailedSend(
}

@Override
public void onLongRunningUpdate(final int dropped, final int write, final int expired) {
public void onLongRunningUpdate(
final int dropped, final int write, final int expired, final int droppedSampling) {
longRunningTracesWrite.add(write);
longRunningTracesDropped.add(dropped);
longRunningTracesExpired.add(expired);
longRunningTracesDroppedSampling.add(droppedSampling);
}

private void onSendAttempt(
Expand Down Expand Up @@ -479,6 +482,11 @@ public void run(TracerHealthMetrics target) {
target.statsd, "long-running.dropped", target.longRunningTracesDropped, NO_TAGS);
reportIfChanged(
target.statsd, "long-running.expired", target.longRunningTracesExpired, NO_TAGS);
reportIfChanged(
target.statsd,
"long-running.dropped_sampling",
target.longRunningTracesDroppedSampling,
NO_TAGS);

reportIfChanged(
target.statsd, "stats.traces_in", target.clientStatsProcessedTraces, NO_TAGS);
Expand Down Expand Up @@ -608,6 +616,8 @@ public String summary() {
+ longRunningTracesDropped.sum()
+ "\nlongRunningTracesExpired="
+ longRunningTracesExpired.sum()
+ "\nlongRunningTracesDroppedSampling="
+ longRunningTracesDroppedSampling.sum()
+ "\n"
+ "\nclientStatsRequests="
+ clientStatsRequests.sum()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ class LongRunningTracesTrackerTest extends DDSpecification {
trace.longRunningTrackedState == LongRunningTracesTracker.EXPIRED
}

def "agent disabled feature"() {
def "trace remains tracked but not written when agent long running feature not available"() {
given:
def trace = newTraceToTrack()
tracker.add(trace)
Expand All @@ -133,7 +133,9 @@ class LongRunningTracesTrackerTest extends DDSpecification {

then:
1 * features.supportsLongRunning() >> false
tracker.traceArray.size() == 0
tracker.traceArray.size() == 1
tracker.traceArray[0].longRunningTrackedState == LongRunningTracesTracker.TRACKED
tracker.traceArray[0].getLastWriteTime() == 0
}

def flushAt(long timeMilli) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ This can manifest when creating mocks.
@Timeout(5)
class PendingTraceBufferTest extends DDSpecification {
@Subject
def buffer = PendingTraceBuffer.delaying(SystemTimeSource.INSTANCE, Mock(Config), null, null)
def buffer = PendingTraceBuffer.delaying(SystemTimeSource.INSTANCE, Mock(Config), null, HealthMetrics.NO_OP)
def bufferSpy = Spy(buffer)

def tracer = Mock(CoreTracer)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -400,12 +400,13 @@ class HealthMetricsTest extends Specification {
def healthMetrics = new TracerHealthMetrics(new Latched(statsD, latch), 100, TimeUnit.MILLISECONDS)
healthMetrics.start()
when:
healthMetrics.onLongRunningUpdate(3,10,1)
healthMetrics.onLongRunningUpdate(3,10,1,5)
latch.await(10, TimeUnit.SECONDS)
then:
1 * statsD.count("long-running.write", 10, _)
1 * statsD.count("long-running.dropped", 3, _)
1 * statsD.count("long-running.expired", 1, _)
1 * statsD.count("long-running.dropped_sampling", 5, _)
cleanup:
healthMetrics.close()
}
Expand Down