@@ -70,6 +70,8 @@ public Map<String, T> getSome(long to, TimeUnit unit, boolean throwException, bo
             throws InterruptedException, ExecutionException {
         assert operationStates != null;
 
+        // Note: The latch here is counterintuitive. Based on the implementation in EVCacheMemcachedClient.asyncGetBulk(),
+        // the latch count is set to 1 no matter the chunk size (when > 0) and is only decremented when pendingChunks counts down to 0.
         boolean allCompleted = latch.await(to, unit);
         if(log.isDebugEnabled()) log.debug("Took " + (System.currentTimeMillis() - start)+ " to fetch " + rvMap.size() + " keys from " + client);
         long pauseDuration = -1;
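A minimal sketch of the latch behavior the new note describes, assuming EVCacheMemcachedClient.asyncGetBulk() pairs a single-count latch with a pending-chunk counter; the class name ChunkedBulkLatch and the field pendingChunks below are illustrative, not taken from the EVCache codebase.

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;

    // Illustrative sketch only: a latch whose count is 1 regardless of how many
    // chunks are outstanding; it opens only when the last pending chunk completes.
    class ChunkedBulkLatch {
        private final CountDownLatch latch = new CountDownLatch(1); // always 1
        private final AtomicInteger pendingChunks;

        ChunkedBulkLatch(int chunks) {
            this.pendingChunks = new AtomicInteger(chunks);
        }

        void chunkCompleted() {
            // Only the final chunk releases the latch.
            if (pendingChunks.decrementAndGet() == 0) latch.countDown();
        }

        boolean awaitAll(long timeout, TimeUnit unit) throws InterruptedException {
            // false => at least one chunk was still pending when the timeout expired.
            return latch.await(timeout, unit);
        }
    }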
@@ -120,25 +122,40 @@ public Map<String, T> getSome(long to, TimeUnit unit, boolean throwException, bo
         }
 
         boolean hadTimedoutOp = false;
+        Operation[] opsArray = ops.toArray(new Operation[0]);
         for (int i = 0; i < operationStates.length(); i++) {
             SingleOperationState state = operationStates.get(i);
-            if (!state.completed && !allCompleted) {
-                MemcachedConnection.opTimedOut(state.op);
-                hadTimedoutOp = true;
-            } else {
-                MemcachedConnection.opSucceeded(state.op);
-            }
+
+            if (state == null) {
+                // Operation not yet signaled completion (cancel should still trigger completion) ==> latch timed out
+                // This also indicates allCompleted == false because the latch count wouldn't have dropped to 0.
+                Operation op = opsArray[i];
+                op.timeOut();
+                MemcachedConnection.opTimedOut(op);
+                hadTimedoutOp = true;
+            } else {
+                if (!state.completed && !allCompleted) {
+                    MemcachedConnection.opTimedOut(state.op);
+                    hadTimedoutOp = true;
+                } else {
+                    MemcachedConnection.opSucceeded(state.op);
+                }
+            }
         }
 
         if (!allCompleted && !hasZF && hadTimedoutOp) statusString = EVCacheMetricsFactory.TIMEOUT;
 
         for (int i = 0; i < operationStates.length(); i++) {
             SingleOperationState state = operationStates.get(i);
-            if (state.cancelled) {
-                if (hasZF) statusString = EVCacheMetricsFactory.CANCELLED;
-                if (throwException) throw new ExecutionException(new CancellationException("Cancelled"));
-            }
-            if (state.errored && throwException) throw new ExecutionException(state.op.getException());
+
+            // state == null always means timed out and was handled.
+            if (state != null) {
+                if (state.cancelled) {
+                    if (hasZF) statusString = EVCacheMetricsFactory.CANCELLED;
+                    if (throwException) throw new ExecutionException(new CancellationException("Cancelled"));
+                }
+                if (state.errored && throwException) throw new ExecutionException(state.op.getException());
+            }
         }
 
         Map<String, T> m = new HashMap<String, T>();
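The state == null branch above only makes sense if the slots of operationStates start out null and are filled in by a completion callback. The actual SingleOperationState class is not part of this diff, so the sketch below is an assumption about its shape; only the field names op, completed, cancelled, and errored are taken from the code above, and the recordCompletion wiring is hypothetical.

    import java.util.concurrent.atomic.AtomicReferenceArray;
    import net.spy.memcached.ops.Operation;
    import net.spy.memcached.ops.OperationState;

    // Assumed shape of the per-operation state recorded by completion callbacks.
    final class SingleOperationState {
        final Operation op;
        final boolean completed;
        final boolean cancelled;
        final boolean errored;

        SingleOperationState(Operation op) {
            this.op = op;
            this.completed = op.getState() == OperationState.COMPLETE;
            this.cancelled = op.isCancelled();
            this.errored = op.hasErrored();
        }

        // A slot left null after the latch wait means this was never called for
        // that operation, which the loops above treat as a timeout.
        static void recordCompletion(AtomicReferenceArray<SingleOperationState> states,
                                     Operation op, int index) {
            states.set(index, new SingleOperationState(op));
        }
    }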
@@ -219,20 +236,30 @@ public CompletableFuture<Map<String, T>> getAsyncSome(long timeout, TimeUnit uni
 
     public void handleBulkException() {
         ExecutionException t = null;
+        Operation[] opsArray = ops.toArray(new Operation[0]);
         for (int i = 0; i < operationStates.length(); i++) {
             SingleOperationState state = operationStates.get(i);
-            if (!state.completed) {
-                if (state.cancelled) {
-                    throw new RuntimeException(new ExecutionException(new CancellationException("Cancelled")));
-                } else if (state.errored) {
-                    throw new RuntimeException(new ExecutionException(state.op.getException()));
-                } else {
-                    state.op.timeOut();
-                    MemcachedConnection.opTimedOut(state.op);
-                    t = new ExecutionException(new CheckedOperationTimeoutException("Checked Operation timed out.", state.op));
-                }
-            } else {
-                MemcachedConnection.opSucceeded(state.op);
-            }
+
+            if (state == null) {
+                Operation op = opsArray[i];
+                op.timeOut();
+                MemcachedConnection.opTimedOut(op);
+                t = new ExecutionException(new CheckedOperationTimeoutException("Checked Operation timed out.", op));
+            } else {
+                if (!state.completed) {
+                    // Use pre-collected state
+                    if (state.cancelled) {
+                        throw new RuntimeException(new ExecutionException(new CancellationException("Cancelled")));
+                    } else if (state.errored) {
+                        throw new RuntimeException(new ExecutionException(state.op.getException()));
+                    } else {
+                        state.op.timeOut();
+                        MemcachedConnection.opTimedOut(state.op);
+                        t = new ExecutionException(new CheckedOperationTimeoutException("Checked Operation timed out.", state.op));
+                    }
+                } else {
+                    MemcachedConnection.opSucceeded(state.op);
+                }
+            }
         }
 
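Both rewritten loops also assume that ops.toArray(new Operation[0])[i] refers to the same operation as operationStates.get(i). A hypothetical helper, reusing the SingleOperationState sketch above and not taken from the codebase, showing the sizing and indexing contract that assumption implies:

    import java.util.List;
    import java.util.concurrent.atomic.AtomicReferenceArray;
    import net.spy.memcached.ops.Operation;

    // Illustrative only: the states array must have one slot per operation, and
    // completion callbacks must write slot i for ops.get(i); otherwise the
    // opsArray[i] lookups above would time out the wrong operation.
    final class OperationStateIndex {
        static AtomicReferenceArray<SingleOperationState> allocateFor(List<Operation> ops) {
            return new AtomicReferenceArray<>(ops.size());
        }
    }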
@@ -257,19 +284,31 @@ public void doAsyncGetSome(CompletableFuture<Map<String, T>> promise) {
     public Single<Map<String, T>> getSome(long to, TimeUnit units, boolean throwException, boolean hasZF, Scheduler scheduler) {
         return observe().timeout(to, units, Single.create(subscriber -> {
             try {
+                Operation[] opsArray = ops.toArray(new Operation[0]);
                 for (int i = 0; i < operationStates.length(); i++) {
                     SingleOperationState state = operationStates.get(i);
-                    if (!state.completed) {
-                        MemcachedConnection.opTimedOut(state.op);
-                    } else {
-                        MemcachedConnection.opSucceeded(state.op);
-                    }
+
+                    if (state == null) {
+                        Operation op = opsArray[i];
+                        op.timeOut();
+                        MemcachedConnection.opTimedOut(op);
+                    } else {
+                        if (!state.completed) {
+                            MemcachedConnection.opTimedOut(state.op);
+                        } else {
+                            MemcachedConnection.opSucceeded(state.op);
+                        }
+                    }
                 }
 
                 for (int i = 0; i < operationStates.length(); i++) {
                     SingleOperationState state = operationStates.get(i);
-                    if (state.cancelled && throwException) throw new ExecutionException(new CancellationException("Cancelled"));
-                    if (state.errored && throwException) throw new ExecutionException(state.op.getException());
+
+                    // state == null always means timed out and was handled.
+                    if (state != null) {
+                        if (state.cancelled && throwException) throw new ExecutionException(new CancellationException("Cancelled"));
+                        if (state.errored && throwException) throw new ExecutionException(state.op.getException());
+                    }
                 }
 
                 Map<String, T> m = new HashMap<String, T>();
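A hypothetical caller-side sketch of the Single-returning overload above (RxJava 1.x); the enclosing class name EVCacheBulkGetFuture, the 500 ms budget, and the Schedulers.io() choice are assumptions for illustration, not taken from this diff.

    import java.util.Map;
    import java.util.concurrent.TimeUnit;
    import rx.schedulers.Schedulers;

    // Illustrative usage only: subscribe to the Single returned by getSome and
    // handle either the partial result map or the bulk-get failure.
    class BulkGetCaller {
        void fetchSome(EVCacheBulkGetFuture<String> bulkFuture) {
            bulkFuture.getSome(500, TimeUnit.MILLISECONDS, true, false, Schedulers.io())
                      .subscribe(
                          (Map<String, String> values) ->
                              values.forEach((k, v) -> System.out.println(k + " -> " + v)),
                          err -> System.err.println("bulk get failed: " + err));
        }
    }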