1818class Buffer {
1919
2020 private static final long bufferActiveBit = 1L << 63 ;
21- private final AtomicLong observationCount = new AtomicLong ( 0 ) ;
21+ private final AtomicLong [] stripedObservationCounts ;
2222 private double [] observationBuffer = new double [0 ];
2323 private int bufferPos = 0 ;
2424 private boolean reset = false ;
@@ -27,8 +27,18 @@ class Buffer {
2727 ReentrantLock runLock = new ReentrantLock ();
2828 Condition bufferFilled = appendLock .newCondition ();
2929
30+ Buffer () {
31+ stripedObservationCounts = new AtomicLong [Runtime .getRuntime ().availableProcessors ()];
32+ for (int i = 0 ; i < stripedObservationCounts .length ; i ++) {
33+ stripedObservationCounts [i ] = new AtomicLong (0 );
34+ }
35+ }
36+
3037 boolean append (double value ) {
31- long count = observationCount .incrementAndGet ();
38+ AtomicLong observationCountForThread =
39+ stripedObservationCounts [
40+ ((int ) Thread .currentThread ().getId ()) % stripedObservationCounts .length ];
41+ long count = observationCountForThread .incrementAndGet ();
3242 if ((count & bufferActiveBit ) == 0 ) {
3343 return false ; // sign bit not set -> buffer not active.
3444 } else {
@@ -69,7 +79,10 @@ <T extends DataPointSnapshot> T run(
6979 runLock .lock ();
7080 try {
7181 // Signal that the buffer is active.
72- Long expectedCount = observationCount .getAndAdd (bufferActiveBit );
82+ Long expectedCount = 0L ;
83+ for (AtomicLong observationCount : stripedObservationCounts ) {
84+ expectedCount += observationCount .getAndAdd (bufferActiveBit );
85+ }
7386
7487 while (!complete .apply (expectedCount )) {
7588 // Wait until all in-flight threads have added their observations to the histogram /
@@ -81,14 +94,18 @@ <T extends DataPointSnapshot> T run(
8194 result = createResult .get ();
8295
8396 // Signal that the buffer is inactive.
84- int expectedBufferSize ;
97+ long expectedBufferSize = 0 ;
8598 if (reset ) {
86- expectedBufferSize =
87- (int ) ((observationCount .getAndSet (0 ) & ~bufferActiveBit ) - expectedCount );
99+ for (AtomicLong observationCount : stripedObservationCounts ) {
100+ expectedBufferSize += (int ) (observationCount .getAndSet (0 ) & ~bufferActiveBit );
101+ }
88102 reset = false ;
89103 } else {
90- expectedBufferSize = (int ) (observationCount .addAndGet (bufferActiveBit ) - expectedCount );
104+ for (AtomicLong observationCount : stripedObservationCounts ) {
105+ expectedBufferSize += (int ) observationCount .addAndGet (bufferActiveBit );
106+ }
91107 }
108+ expectedBufferSize -= expectedCount ;
92109
93110 appendLock .lock ();
94111 try {
0 commit comments