|
30 | 30 | import java.util.Objects; |
31 | 31 | import java.util.concurrent.ConcurrentHashMap; |
32 | 32 | import java.util.concurrent.ConcurrentLinkedQueue; |
33 | | -import java.util.concurrent.atomic.AtomicInteger; |
| 33 | +import java.util.concurrent.locks.ReentrantLock; |
34 | 34 | import java.util.logging.Level; |
35 | 35 | import java.util.logging.Logger; |
36 | 36 | import javax.annotation.Nullable; |
@@ -242,7 +242,6 @@ AggregatorHandle<T> maybeGetPooledAggregatorHandle() { |
242 | 242 | private AggregatorHolder<T> getHolderForRecord() { |
243 | 243 | AggregatorHolder<T> aggregatorHolder = this.aggregatorHolder; |
244 | 244 | while (!aggregatorHolder.tryAcquireForRecord()) { |
245 | | - aggregatorHolder.releaseForRecord(); |
246 | 245 | aggregatorHolder = this.aggregatorHolder; |
247 | 246 | Thread.yield(); |
248 | 247 | } |
@@ -369,44 +368,38 @@ private static class AggregatorHolder<T extends PointData> { |
369 | 368 | // (AggregatorHolder), and so if a recording thread encounters an odd value, |
370 | 369 | // all it needs to do is release the "read lock" it just obtained (decrementing by 2), |
371 | 370 | // and then grab and record against the new current interval (AggregatorHolder). |
372 | | - private final AtomicInteger[] activeRecordingThreads; |
| 371 | + private final ReentrantLock[] locks; |
373 | 372 |
|
374 | 373 | private AggregatorHolder() { |
375 | 374 | this(new ConcurrentHashMap<>()); |
376 | 375 | } |
377 | 376 |
|
378 | 377 | private AggregatorHolder(ConcurrentHashMap<Attributes, AggregatorHandle<T>> aggregatorHandles) { |
379 | 378 | this.aggregatorHandles = aggregatorHandles; |
380 | | - activeRecordingThreads = new AtomicInteger[Runtime.getRuntime().availableProcessors()]; |
381 | | - for (int i = 0; i < activeRecordingThreads.length; i++) { |
382 | | - activeRecordingThreads[i] = new AtomicInteger(0); |
| 379 | + locks = new ReentrantLock[Runtime.getRuntime().availableProcessors()]; |
| 380 | + for (int i = 0; i < locks.length; i++) { |
| 381 | + locks[i] = new ReentrantLock(); |
383 | 382 | } |
384 | 383 | } |
385 | 384 |
|
386 | 385 | private boolean tryAcquireForRecord() { |
387 | | - return forThread().addAndGet(2) % 2 == 0; |
| 386 | + return forThread().tryLock(); |
388 | 387 | } |
389 | 388 |
|
390 | 389 | private void releaseForRecord() { |
391 | | - forThread().addAndGet(-2); |
| 390 | + forThread().unlock(); |
392 | 391 | } |
393 | 392 |
|
394 | 393 | @SuppressWarnings("ThreadPriorityCheck") |
395 | 394 | private void acquireForCollect() { |
396 | | - for (int i = 0; i < activeRecordingThreads.length; i++) { |
397 | | - activeRecordingThreads[i].addAndGet(1); |
398 | | - } |
399 | | - for (int i = 0; i < activeRecordingThreads.length; i++) { |
400 | | - AtomicInteger val = activeRecordingThreads[i]; |
401 | | - while (val.get() > 1) { |
402 | | - Thread.yield(); |
403 | | - } |
| 395 | + for (int i = 0; i < locks.length; i++) { |
| 396 | + locks[i].lock(); |
404 | 397 | } |
405 | 398 | } |
406 | 399 |
|
407 | | - private AtomicInteger forThread() { |
408 | | - int index = Math.abs((int) (Thread.currentThread().getId() % activeRecordingThreads.length)); |
409 | | - return activeRecordingThreads[index]; |
| 400 | + private ReentrantLock forThread() { |
| 401 | + int index = Math.abs((int) (Thread.currentThread().getId() % locks.length)); |
| 402 | + return locks[index]; |
410 | 403 | } |
411 | 404 | } |
412 | 405 |
|
|
0 commit comments