Skip to content

Commit 3f339e2

Browse files
committed
Fix: asyncBulkGet decoding of hashed and unhashed keys together in one request
1 parent f2d9bd4 commit 3f339e2

File tree

7 files changed

+467
-159
lines changed

7 files changed

+467
-159
lines changed

evcache-client/test/com/netflix/evcache/test/EVCacheTestDI.java

Lines changed: 155 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,35 @@
11
package com.netflix.evcache.test;
22

3+
import static org.testng.Assert.assertEquals;
4+
import static org.testng.Assert.assertFalse;
35
import static org.testng.Assert.assertNotNull;
6+
import static org.testng.Assert.assertNull;
7+
import static org.testng.Assert.assertThrows;
48
import static org.testng.Assert.assertTrue;
59

10+
import com.netflix.evcache.EVCache;
11+
import com.netflix.evcache.EVCacheException;
12+
import com.netflix.evcache.EVCacheGetOperationListener;
13+
import com.netflix.evcache.EVCacheLatch;
14+
import com.netflix.evcache.operation.EVCacheOperationFuture;
15+
import com.netflix.evcache.pool.EVCacheClient;
16+
import com.netflix.evcache.pool.ServerGroup;
17+
import com.netflix.evcache.test.transcoder.Movie;
18+
import com.netflix.evcache.test.transcoder.MovieTranscoder;
19+
import com.netflix.evcache.util.KeyHasher;
20+
import java.util.HashMap;
21+
import java.util.List;
622
import java.util.Map;
723
import java.util.Properties;
8-
9-
import java.util.*;
24+
import java.util.UUID;
25+
import java.util.concurrent.CompletableFuture;
1026
import java.util.concurrent.Future;
1127
import java.util.concurrent.TimeUnit;
12-
13-
import com.netflix.evcache.*;
14-
import com.netflix.evcache.pool.EVCacheClient;
15-
import com.netflix.evcache.pool.ServerGroup;
16-
import com.netflix.evcache.util.KeyHasher;
1728
import org.slf4j.Logger;
1829
import org.slf4j.LoggerFactory;
1930
import org.testng.annotations.Test;
20-
21-
import com.netflix.evcache.operation.EVCacheOperationFuture;
2231
import rx.schedulers.Schedulers;
2332

24-
25-
import static org.testng.Assert.*;
26-
2733
public class EVCacheTestDI extends DIBase implements EVCacheGetOperationListener<String> {
2834
private static final Logger log = LoggerFactory.getLogger(EVCacheTestDI.class);
2935
private int loops = 1;
@@ -280,6 +286,8 @@ public void functionalTestsWithAppLevelAndASGLevelHashingScenarios() throws Exce
280286
assertTrue(manager.getEVCacheConfig().getPropertyRepository().get(appName + ".auto.hash.keys", Boolean.class).orElse(false).get());
281287
assertFalse(manager.getEVCacheConfig().getPropertyRepository().get(appName + ".hash.key", Boolean.class).orElse(false).get());
282288
testWithLargeKey();
289+
testWithMixedKeys();
290+
testWithMixedKeysAndCustomTranscoder();
283291
// negative scenario
284292
propertiesToSet.remove(appName + ".auto.hash.keys");
285293
refreshEVCache();
@@ -314,8 +322,13 @@ public void functionalTestsWithAppLevelAndASGLevelHashingScenarios() throws Exce
314322
}
315323
doFunctionalTests(true);
316324
for (ServerGroup serverGroup : clientsByServerGroup.keySet()) {
317-
propertiesToSet.remove(serverGroup.getName());
325+
propertiesToSet.remove(serverGroup.getName() + ".hash.key");
326+
propertiesToSet.remove(serverGroup.getName() + ".hash.algo");
318327
}
328+
329+
propertiesToSet.remove(appName + ".hash.key", "true");
330+
331+
refreshEVCache();
319332
}
320333

321334
private void testWithLargeKey() throws Exception {
@@ -330,8 +343,135 @@ private void testWithLargeKey() throws Exception {
330343
EVCacheLatch latch = evCache.set(key, value, EVCacheLatch.Policy.ALL);
331344
latch.await(1000, TimeUnit.MILLISECONDS);
332345

346+
String val = evCache.get(key);
347+
// get
348+
assertEquals(val, value);
349+
350+
// delete
351+
Future<Boolean>[] futures = evCache.delete(key);
352+
for (Future<Boolean> future : futures) {
353+
future.get();
354+
}
355+
}
356+
357+
private void testWithMixedKeys() throws Exception {
358+
359+
EVCache[] evcacheInstance = new EVCache[2];
360+
evcacheInstance[0] = getNewBuilder().setAppName(appName).setCachePrefix("cid").enableRetry().build();
361+
evcacheInstance[1] = this.evCache;
362+
363+
Map<String, String> kv = new HashMap<>(6);
364+
String oneLargeKey = null;
365+
String oneSmallKey = null;
366+
for (int k = 0; k < 3; k ++) {
367+
StringBuilder sb = new StringBuilder();
368+
sb.append("testWithSmallAndLargeKeysMixed");
369+
for (int i= 0; i < 100; i++) {
370+
sb.append(System.nanoTime());
371+
}
372+
oneLargeKey = sb.toString();
373+
kv.put(oneLargeKey, UUID.randomUUID().toString());
374+
}
375+
for (int k = 3; k < 6; k ++) {
376+
oneSmallKey = "testWithSmallAndLargeKeysMixed" + System.nanoTime();
377+
kv.put(oneSmallKey, UUID.randomUUID().toString());
378+
}
379+
380+
for (Map.Entry<String, String> entry : kv.entrySet()) {
381+
EVCacheLatch latch = evCache.set(entry.getKey(), entry.getValue(), EVCacheLatch.Policy.ALL);
382+
latch.await(10000, TimeUnit.MILLISECONDS);
383+
}
384+
333385
// get
334-
assertEquals(evCache.get(key), value);
386+
String val = evCache.get(oneLargeKey);
387+
assertEquals(val, kv.get(oneLargeKey));
388+
val = evCache.get(oneSmallKey);
389+
assertEquals(val, kv.get(oneSmallKey));
390+
391+
// async bulk get
392+
for (int op : new int[]{0, 1}) {
393+
Map<String, String> results;
394+
if (op == 0) {
395+
CompletableFuture<Map<String, String>> future = evCache.getAsyncBulk(kv.keySet().toArray(new String[0]));
396+
results = future.get(10000, TimeUnit.MILLISECONDS);
397+
} else {
398+
results = evCache.getBulk(kv.keySet().toArray(new String[0]));
399+
}
400+
assertEquals(results.size(), kv.size());
401+
for (Map.Entry<String, String> result : results.entrySet()) {
402+
assertEquals(result.getValue(), kv.get(result.getKey()));
403+
}
404+
}
405+
406+
// delete
407+
for (Map.Entry<String, String> entry : kv.entrySet()) {
408+
Future<Boolean>[] deleteFutures = evCache.delete(entry.getKey());
409+
for (Future<Boolean> deleteFuture : deleteFutures) {
410+
deleteFuture.get();
411+
}
412+
}
413+
414+
}
415+
416+
private void testWithMixedKeysAndCustomTranscoder() throws Exception {
417+
418+
com.netflix.evcache.EVCache evCache = getNewBuilder().setAppName(appName).setCachePrefix("cid").enableRetry()
419+
.setTranscoder(new MovieTranscoder())
420+
.build();
421+
422+
Map<String, Movie> kv = new HashMap<>(6);
423+
String oneLargeKey = null;
424+
String oneSmallKey = null;
425+
for (int k = 0; k < 3; k ++) {
426+
StringBuilder sb = new StringBuilder();
427+
sb.append("testWithSmallAndLargeKeysMixed");
428+
for (int i= 0; i < 100; i++) {
429+
sb.append(System.nanoTime());
430+
}
431+
oneLargeKey = sb.toString();
432+
kv.put(oneLargeKey, new Movie(k, String.valueOf(k)));
433+
}
434+
for (int k = 3; k < 6; k ++) {
435+
oneSmallKey = "testWithSmallAndLargeKeysMixed" + System.nanoTime();
436+
kv.put(oneSmallKey, new Movie(k, String.valueOf(k)));
437+
}
438+
439+
for (Map.Entry<String, Movie> entry : kv.entrySet()) {
440+
EVCacheLatch latch = evCache.set(entry.getKey(), entry.getValue(), EVCacheLatch.Policy.ALL);
441+
latch.await(10000, TimeUnit.MILLISECONDS);
442+
}
443+
444+
// get
445+
Movie val = evCache.get(oneLargeKey);
446+
assertEquals(val, kv.get(oneLargeKey));
447+
val = evCache.get(oneSmallKey);
448+
assertEquals(val, kv.get(oneSmallKey));
449+
450+
// async bulk get
451+
for (int op : new int[]{0, 1}) {
452+
Map<String, Movie> results = new HashMap<>();
453+
if (op == 0) {
454+
CompletableFuture<Map<String, Movie>> future = evCache.getAsyncBulk(kv.keySet().toArray(new String[0]));
455+
results = future.get(10000, TimeUnit.MILLISECONDS);
456+
// } else {
457+
// TODO: getBulk api is known to be broken for un-hashed keys not decoding correctly when request contains both hashed and unhashed keys
458+
// results = evCache.getBulk(kv.keySet().toArray(new String[0]));
459+
}
460+
461+
for (Map.Entry<String, Movie> result : results.entrySet()) {
462+
assertEquals(results.size(), kv.size());
463+
assertEquals(result.getValue(), kv.get(result.getKey()), "Did not get the written value back with op " + (op == 0 ? "getAsyncBulk" : "getBulk"));
464+
}
465+
}
466+
467+
// delete
468+
for (Map.Entry<String, Movie> entry : kv.entrySet()) {
469+
Future<Boolean>[] deleteFutures = evCache.delete(entry.getKey());
470+
for (Future<Boolean> deleteFuture : deleteFutures) {
471+
deleteFuture.get();
472+
}
473+
}
474+
335475
}
336476

337477
private void doFunctionalTests(boolean isHashingEnabled) throws Exception {
@@ -458,6 +598,7 @@ public void testAll() {
458598
testGetAndTouchObservable();
459599
waitForCallbacks();
460600
testAppendOrAdd();
601+
// functionalTestsWithAppLevelAndASGLevelHashingScenarios(); // slow, requires EVCache refresh
461602
testTouch();
462603
testDelete();
463604
testInsert();
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package com.netflix.evcache.test.transcoder;
2+
3+
import java.util.Objects;
4+
5+
public class Movie {
6+
long id;
7+
String name;
8+
9+
public Movie() { // required for decode
10+
}
11+
12+
public Movie(long id, String name) {
13+
this.id = id;
14+
this.name = name;
15+
}
16+
17+
public long getId() {
18+
return id;
19+
}
20+
21+
public void setId(long id) {
22+
this.id = id;
23+
}
24+
25+
public String getName() {
26+
return name;
27+
}
28+
29+
public void setName(String name) {
30+
this.name = name;
31+
}
32+
33+
@Override
34+
public boolean equals(Object o) {
35+
if (!(o instanceof Movie)) return false;
36+
Movie movie = (Movie) o;
37+
return id == movie.id && Objects.equals(name, movie.name);
38+
}
39+
40+
@Override
41+
public int hashCode() {
42+
return Objects.hash(id, name);
43+
}
44+
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package com.netflix.evcache.test.transcoder;
2+
3+
import com.fasterxml.jackson.core.JsonProcessingException;
4+
import com.fasterxml.jackson.databind.ObjectMapper;
5+
import java.io.IOException;
6+
import net.spy.memcached.CachedData;
7+
import net.spy.memcached.transcoders.Transcoder;
8+
import org.slf4j.Logger;
9+
import org.slf4j.LoggerFactory;
10+
11+
public class MovieTranscoder implements Transcoder<Movie> {
12+
private static final Logger LOGGER = LoggerFactory.getLogger(MovieTranscoder.class);
13+
ObjectMapper mapper = new ObjectMapper();
14+
15+
@Override
16+
public boolean asyncDecode(CachedData d) {
17+
return false;
18+
}
19+
20+
@Override
21+
public CachedData encode(Movie movie) {
22+
byte[] bytes;
23+
try {
24+
bytes = mapper.writeValueAsBytes(movie);
25+
} catch (JsonProcessingException e) {
26+
throw new RuntimeException(e);
27+
}
28+
return new CachedData(0, bytes, bytes.length);
29+
}
30+
31+
@Override
32+
public Movie decode(CachedData d) {
33+
try {
34+
return mapper.readValue(d.getData(), Movie.class);
35+
} catch (IOException e) {
36+
throw new RuntimeException(e);
37+
}
38+
}
39+
40+
@Override
41+
public int getMaxSize() {
42+
return CachedData.MAX_SIZE;
43+
}
44+
}

0 commit comments

Comments
 (0)