-
Notifications
You must be signed in to change notification settings - Fork 252
fix: handle null states in EVCacheBulkGetFuture and refactor #179
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
a1de62b
00ff306
e93ef48
7eb75e8
f14eaa1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -51,6 +51,7 @@ public class EVCacheBulkGetFuture<T> extends BulkGetFuture<T> { | |
| private static final Logger log = LoggerFactory.getLogger(EVCacheBulkGetFuture.class); | ||
| private final Map<String, Future<T>> rvMap; | ||
| private final Collection<Operation> ops; | ||
| private final Operation[] opsArray; | ||
| private final CountDownLatch latch; | ||
| private final long start; | ||
| private final EVCacheClient client; | ||
|
|
@@ -60,6 +61,7 @@ public EVCacheBulkGetFuture(Map<String, Future<T>> m, Collection<Operation> getO | |
| super(m, getOps, l, service); | ||
| rvMap = m; | ||
| ops = getOps; | ||
| opsArray = ops.toArray(new Operation[0]); | ||
| latch = l; | ||
| this.start = System.currentTimeMillis(); | ||
| this.client = client; | ||
|
|
@@ -70,6 +72,8 @@ public Map<String, T> getSome(long to, TimeUnit unit, boolean throwException, bo | |
| throws InterruptedException, ExecutionException { | ||
| assert operationStates != null; | ||
|
|
||
| // Note: The latch here is counterintuitive. Based on the implementation in EVCacheMemcachedClient.asyncGetBulk(), | ||
| // the latch count is set to 1 no matter the chunk size and only decrement when pendingChunks counts down to 0. | ||
|
||
| boolean allCompleted = latch.await(to, unit); | ||
| if(log.isDebugEnabled()) log.debug("Took " + (System.currentTimeMillis() - start)+ " to fetch " + rvMap.size() + " keys from " + client); | ||
| long pauseDuration = -1; | ||
|
|
@@ -122,23 +126,41 @@ public Map<String, T> getSome(long to, TimeUnit unit, boolean throwException, bo | |
| boolean hadTimedoutOp = false; | ||
| for (int i = 0; i < operationStates.length(); i++) { | ||
| SingleOperationState state = operationStates.get(i); | ||
| if (!state.completed && !allCompleted) { | ||
| MemcachedConnection.opTimedOut(state.op); | ||
|
|
||
| if (state == null) { | ||
| // Operation not yet signaled completion (cancel should still trigger completion) ==> latch timed out | ||
| // This also indicates allCompleted == false because the latch count wouldn't have drop to 0. | ||
| Operation op = opsArray[i]; | ||
| op.timeOut(); | ||
| MemcachedConnection.opTimedOut(op); | ||
| hadTimedoutOp = true; | ||
| } else { | ||
| MemcachedConnection.opSucceeded(state.op); | ||
| if (state.timedOut) { | ||
| MemcachedConnection.opTimedOut(state.op); | ||
| hadTimedoutOp = true; | ||
| } else { | ||
| MemcachedConnection.opSucceeded(state.op); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| if (!allCompleted && !hasZF && hadTimedoutOp) statusString = EVCacheMetricsFactory.TIMEOUT; | ||
| if (hadTimedoutOp && !hasZF) statusString = EVCacheMetricsFactory.TIMEOUT; | ||
|
||
| // Should we throw when timeout? | ||
|
|
||
| for (int i = 0; i < operationStates.length(); i++) { | ||
| SingleOperationState state = operationStates.get(i); | ||
| if (state.cancelled) { | ||
| if (hasZF) statusString = EVCacheMetricsFactory.CANCELLED; | ||
| if (throwException) throw new ExecutionException(new CancellationException("Cancelled")); | ||
|
|
||
| // state == null always means timed out and was handled. | ||
| if (state != null) { | ||
| if (state.cancelled) { | ||
| if (hasZF) statusString = EVCacheMetricsFactory.CANCELLED; | ||
| if (throwException) throw new ExecutionException(new CancellationException("Cancelled")); | ||
| } | ||
| if (state.errored) { | ||
| if (hasZF) statusString = EVCacheMetricsFactory.ERROR; | ||
| if (throwException) throw new ExecutionException(state.op.getException()); | ||
| } | ||
| } | ||
| if (state.errored && throwException) throw new ExecutionException(state.op.getException()); | ||
| } | ||
|
|
||
| Map<String, T> m = new HashMap<String, T>(); | ||
|
|
@@ -221,18 +243,24 @@ public void handleBulkException() { | |
| ExecutionException t = null; | ||
| for (int i = 0; i < operationStates.length(); i++) { | ||
| SingleOperationState state = operationStates.get(i); | ||
| if (!state.completed) { | ||
|
|
||
| if (state == null) { | ||
| Operation op = opsArray[i]; | ||
| op.timeOut(); | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. do we want to signal timeout here? Iiuc we're handling that one of the ops in the bulk had an exception, shouldn't we signal a cancellation instead?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Agree. signal a cancellation to the op seems better if we enter here while state is still null
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. hmm after second look at this method, I wonder what's the intention of this method. The caller caught an exception, but we are not doing anything with that "ex". Instead we throw based on whichever ops we found "first" that is either Cancelled or TimedOut or Errored... |
||
| MemcachedConnection.opTimedOut(op); | ||
| t = new ExecutionException(new CheckedOperationTimeoutException("Checked Operation timed out.", op)); | ||
| } else { | ||
| // Use pre-collected state | ||
| if (state.cancelled) { | ||
| throw new RuntimeException(new ExecutionException(new CancellationException("Cancelled"))); | ||
| } else if (state.errored) { | ||
| throw new RuntimeException(new ExecutionException(state.op.getException())); | ||
| } else { | ||
| state.op.timeOut(); | ||
| } else if (state.timedOut) { | ||
| MemcachedConnection.opTimedOut(state.op); | ||
| t = new ExecutionException(new CheckedOperationTimeoutException("Checked Operation timed out.", state.op)); | ||
| } else { | ||
| MemcachedConnection.opSucceeded(state.op); | ||
| } | ||
| } else { | ||
| MemcachedConnection.opSucceeded(state.op); | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -259,17 +287,31 @@ public Single<Map<String, T>> getSome(long to, TimeUnit units, boolean throwExce | |
| try { | ||
| for (int i = 0; i < operationStates.length(); i++) { | ||
| SingleOperationState state = operationStates.get(i); | ||
| if (!state.completed) { | ||
| MemcachedConnection.opTimedOut(state.op); | ||
| Operation op = opsArray[i]; | ||
|
|
||
| if (state == null) { | ||
| op.timeOut(); | ||
| MemcachedConnection.opTimedOut(op); | ||
| // Should we throw when timeout? | ||
| } else { | ||
| MemcachedConnection.opSucceeded(state.op); | ||
| if (state.timedOut) { | ||
| MemcachedConnection.opTimedOut(state.op); | ||
| // Should we throw when timeout? | ||
| } else { | ||
| MemcachedConnection.opSucceeded(state.op); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| for (int i = 0; i < operationStates.length(); i++) { | ||
| SingleOperationState state = operationStates.get(i); | ||
| if (state.cancelled && throwException) throw new ExecutionException(new CancellationException("Cancelled")); | ||
| if (state.errored && throwException) throw new ExecutionException(state.op.getException()); | ||
| Operation op = opsArray[i]; | ||
|
|
||
| // state == null always means timed out and was handled. | ||
| if (state != null) { | ||
| if (state.cancelled && throwException) throw new ExecutionException(new CancellationException("Cancelled")); | ||
| if (state.errored && throwException) throw new ExecutionException(state.op.getException()); | ||
| } | ||
| } | ||
|
|
||
| Map<String, T> m = new HashMap<String, T>(); | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
opsis an empty list that gets populated later.. See EVCacheMemcachedClient'sThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great catch.. agree, then we should convert to Array whenever we need them (was implemented that way in the original fix, but I tweak it the wrong way)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed