Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
class Buffer {

private static final long bufferActiveBit = 1L << 63;
private final AtomicLong observationCount = new AtomicLong(0);
private final AtomicLong[] stripedObservationCounts;
private double[] observationBuffer = new double[0];
private int bufferPos = 0;
private boolean reset = false;
Expand All @@ -27,8 +27,18 @@ class Buffer {
ReentrantLock runLock = new ReentrantLock();
Condition bufferFilled = appendLock.newCondition();

Buffer() {
stripedObservationCounts = new AtomicLong[Runtime.getRuntime().availableProcessors()];
for (int i = 0; i < stripedObservationCounts.length; i++) {
stripedObservationCounts[i] = new AtomicLong(0);
}
}

boolean append(double value) {
long count = observationCount.incrementAndGet();
AtomicLong observationCountForThread =
stripedObservationCounts[
((int) Thread.currentThread().getId()) % stripedObservationCounts.length];
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One thing to consider is whether there is a better algorithm for accessing one of these striped instances. With this algorithm based on thread id, its conceivable that the thread ids could be arranged in such a way that there end up being "hot spots" (i.e. AtomicLong instances that end up receiving a disproportionate amount of the updates). LongAdder solves this by using a random number generate to select which "cell" to access.

long count = observationCountForThread.incrementAndGet();
if ((count & bufferActiveBit) == 0) {
return false; // sign bit not set -> buffer not active.
} else {
Expand Down Expand Up @@ -69,7 +79,10 @@ <T extends DataPointSnapshot> T run(
runLock.lock();
try {
// Signal that the buffer is active.
Long expectedCount = observationCount.getAndAdd(bufferActiveBit);
Long expectedCount = 0L;
for (AtomicLong observationCount : stripedObservationCounts) {
expectedCount += observationCount.getAndAdd(bufferActiveBit);
}

while (!complete.apply(expectedCount)) {
// Wait until all in-flight threads have added their observations to the histogram /
Expand All @@ -81,14 +94,18 @@ <T extends DataPointSnapshot> T run(
result = createResult.get();

// Signal that the buffer is inactive.
int expectedBufferSize;
long expectedBufferSize = 0;
if (reset) {
expectedBufferSize =
(int) ((observationCount.getAndSet(0) & ~bufferActiveBit) - expectedCount);
for (AtomicLong observationCount : stripedObservationCounts) {
expectedBufferSize += (int) (observationCount.getAndSet(0) & ~bufferActiveBit);
}
reset = false;
} else {
expectedBufferSize = (int) (observationCount.addAndGet(bufferActiveBit) - expectedCount);
for (AtomicLong observationCount : stripedObservationCounts) {
expectedBufferSize += (int) observationCount.addAndGet(bufferActiveBit);
}
}
expectedBufferSize -= expectedCount;

appendLock.lock();
try {
Expand Down