Skip to content
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public class EVCacheBulkGetFuture<T> extends BulkGetFuture<T> {
private static final Logger log = LoggerFactory.getLogger(EVCacheBulkGetFuture.class);
private final Map<String, Future<T>> rvMap;
private final Collection<Operation> ops;
private final Operation[] opsArray;
private final CountDownLatch latch;
private final long start;
private final EVCacheClient client;
Expand All @@ -60,6 +61,7 @@ public EVCacheBulkGetFuture(Map<String, Future<T>> m, Collection<Operation> getO
super(m, getOps, l, service);
rvMap = m;
ops = getOps;
opsArray = ops.toArray(new Operation[0]);
Copy link
Collaborator

@Sunjeet Sunjeet Jan 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ops is an empty list that gets populated later.. See EVCacheMemcachedClient's

final Collection<Operation> ops = new ArrayList<Operation>(chunks.size());
final EVCacheBulkGetFuture<T> rv = new EVCacheBulkGetFuture<T>(m, ops, latch, executorService, client); 

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great catch.. agree, then we should convert to Array whenever we need them (was implemented that way in the original fix, but I tweak it the wrong way)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

latch = l;
this.start = System.currentTimeMillis();
this.client = client;
Expand All @@ -70,6 +72,8 @@ public Map<String, T> getSome(long to, TimeUnit unit, boolean throwException, bo
throws InterruptedException, ExecutionException {
assert operationStates != null;

// Note: The latch here is counterintuitive. Based on the implementation in EVCacheMemcachedClient.asyncGetBulk(),
// the latch count is set to 1 no matter the chunk size and only decrement when pendingChunks counts down to 0.
Copy link
Collaborator

@Sunjeet Sunjeet Jan 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the latch count is set to 1 no matter the chunk size

isn't it also 0 when chunks.isEmpty() ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right. I actually mean I was expecting the latch count equal to the chunk size (no matter it's empty or with some chunks).

boolean allCompleted = latch.await(to, unit);
if(log.isDebugEnabled()) log.debug("Took " + (System.currentTimeMillis() - start)+ " to fetch " + rvMap.size() + " keys from " + client);
long pauseDuration = -1;
Expand Down Expand Up @@ -122,23 +126,41 @@ public Map<String, T> getSome(long to, TimeUnit unit, boolean throwException, bo
boolean hadTimedoutOp = false;
for (int i = 0; i < operationStates.length(); i++) {
SingleOperationState state = operationStates.get(i);
if (!state.completed && !allCompleted) {
MemcachedConnection.opTimedOut(state.op);

if (state == null) {
// Operation not yet signaled completion (cancel should still trigger completion) ==> latch timed out
// This also indicates allCompleted == false because the latch count wouldn't have drop to 0.
Operation op = opsArray[i];
op.timeOut();
MemcachedConnection.opTimedOut(op);
hadTimedoutOp = true;
} else {
MemcachedConnection.opSucceeded(state.op);
if (state.timedOut) {
MemcachedConnection.opTimedOut(state.op);
hadTimedoutOp = true;
} else {
MemcachedConnection.opSucceeded(state.op);
}
}
}

if (!allCompleted && !hasZF && hadTimedoutOp) statusString = EVCacheMetricsFactory.TIMEOUT;
if (hadTimedoutOp && !hasZF) statusString = EVCacheMetricsFactory.TIMEOUT;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it intentional to omit && hadTimedoutOp in the conditional?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume you mean omit !allCompleted? This is based on the fact that "!allCompleted" will always lead to hadTimedoutOp.

// Should we throw when timeout?

for (int i = 0; i < operationStates.length(); i++) {
SingleOperationState state = operationStates.get(i);
if (state.cancelled) {
if (hasZF) statusString = EVCacheMetricsFactory.CANCELLED;
if (throwException) throw new ExecutionException(new CancellationException("Cancelled"));

// state == null always means timed out and was handled.
if (state != null) {
if (state.cancelled) {
if (hasZF) statusString = EVCacheMetricsFactory.CANCELLED;
if (throwException) throw new ExecutionException(new CancellationException("Cancelled"));
}
if (state.errored) {
if (hasZF) statusString = EVCacheMetricsFactory.ERROR;
if (throwException) throw new ExecutionException(state.op.getException());
}
}
if (state.errored && throwException) throw new ExecutionException(state.op.getException());
}

Map<String, T> m = new HashMap<String, T>();
Expand Down Expand Up @@ -221,18 +243,24 @@ public void handleBulkException() {
ExecutionException t = null;
for (int i = 0; i < operationStates.length(); i++) {
SingleOperationState state = operationStates.get(i);
if (!state.completed) {

if (state == null) {
Operation op = opsArray[i];
op.timeOut();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we want to signal timeout here? Iiuc we're handling that one of the ops in the bulk had an exception, shouldn't we signal a cancellation instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree. signal a cancellation to the op seems better if we enter here while state is still null

Copy link
Contributor Author

@shy-1234 shy-1234 Jan 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm after second look at this method, I wonder what's the intention of this method. The caller caught an exception, but we are not doing anything with that "ex". Instead we throw based on whichever ops we found "first" that is either Cancelled or TimedOut or Errored...
I feel like the correct implementation should be something like

    public void handleBulkException(Throwable ex) {
        Operation[] opsArray = ops.toArray(new Operation[0]);
        for (int i = 0; i < operationStates.length(); i++) {
            SingleOperationState state = operationStates.get(i);

            if (state == null) {
                Operation op = opsArray[i];
                op.cancel();
                MemcachedConnection.opSucceeded(op);
            } else {
                // Use pre-collected state
                if (state.timedOut) {
                    MemcachedConnection.opTimedOut(state.op);
                } else {
                    MemcachedConnection.opSucceeded(state.op);
                }
            }
        }

        throw new RuntimeException(ex);
    }

MemcachedConnection.opTimedOut(op);
t = new ExecutionException(new CheckedOperationTimeoutException("Checked Operation timed out.", op));
} else {
// Use pre-collected state
if (state.cancelled) {
throw new RuntimeException(new ExecutionException(new CancellationException("Cancelled")));
} else if (state.errored) {
throw new RuntimeException(new ExecutionException(state.op.getException()));
} else {
state.op.timeOut();
} else if (state.timedOut) {
MemcachedConnection.opTimedOut(state.op);
t = new ExecutionException(new CheckedOperationTimeoutException("Checked Operation timed out.", state.op));
} else {
MemcachedConnection.opSucceeded(state.op);
}
} else {
MemcachedConnection.opSucceeded(state.op);
}
}

Expand All @@ -259,17 +287,31 @@ public Single<Map<String, T>> getSome(long to, TimeUnit units, boolean throwExce
try {
for (int i = 0; i < operationStates.length(); i++) {
SingleOperationState state = operationStates.get(i);
if (!state.completed) {
MemcachedConnection.opTimedOut(state.op);
Operation op = opsArray[i];

if (state == null) {
op.timeOut();
MemcachedConnection.opTimedOut(op);
// Should we throw when timeout?
} else {
MemcachedConnection.opSucceeded(state.op);
if (state.timedOut) {
MemcachedConnection.opTimedOut(state.op);
// Should we throw when timeout?
} else {
MemcachedConnection.opSucceeded(state.op);
}
}
}

for (int i = 0; i < operationStates.length(); i++) {
SingleOperationState state = operationStates.get(i);
if (state.cancelled && throwException) throw new ExecutionException(new CancellationException("Cancelled"));
if (state.errored && throwException) throw new ExecutionException(state.op.getException());
Operation op = opsArray[i];

// state == null always means timed out and was handled.
if (state != null) {
if (state.cancelled && throwException) throw new ExecutionException(new CancellationException("Cancelled"));
if (state.errored && throwException) throw new ExecutionException(state.op.getException());
}
}

Map<String, T> m = new HashMap<String, T>();
Expand Down