Skip to content

Commit cc567d7

Browse files
MikeDafiMichael Askndafi
andauthored
[controller] Dead Store Stats: From Region to Cluster-level feature (linkedin#1719)
* [controller] Dead Store Stats: Ease of config handling * Fixing test * Mapping DeadStoreStats Per Cluster * Fixing unit test * Update TestVeniceHelixResources.java * No need to throw --------- Co-authored-by: Michael Askndafi <maskndaf@linkedin.com>
1 parent 583b262 commit cc567d7

File tree

6 files changed

+43
-58
lines changed

6 files changed

+43
-58
lines changed

internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestVeniceHelixAdminWithIsolatedEnvironment.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -321,7 +321,7 @@ public void testDeadStoreStatsInitialization() {
321321
int newAdminPort = controllerConfig.getAdminPort() + 100;
322322
PropertyBuilder builder = new PropertyBuilder().put(controllerProps.toProperties())
323323
.put("admin.port", newAdminPort)
324-
.put("controller.cluster.name", clusterName)
324+
.put("cluster.name", clusterName)
325325
.put("controller.dead.store.endpoint.enabled", true)
326326
.put("controller.dead.store.stats.class.name", MockDeadStoreStats.class.getName());
327327
VeniceProperties newControllerProps = builder.build();
@@ -334,7 +334,7 @@ public void testDeadStoreStatsInitialization() {
334334
pubSubTopicRepository,
335335
pubSubBrokerWrapper.getPubSubClientsFactory());
336336

337-
Assert.assertTrue(admin.deadStoreStats instanceof MockDeadStoreStats);
337+
Assert.assertTrue(admin.deadStoreStatsMap.get(clusterName) instanceof MockDeadStoreStats);
338338
}
339339

340340
public static class MockDeadStoreStats implements DeadStoreStats {

internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestVeniceHelixResources.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
import static org.mockito.Mockito.mock;
55
import static org.mockito.Mockito.when;
66

7-
import com.linkedin.venice.controller.stats.DeadStoreStats;
87
import com.linkedin.venice.helix.HelixAdapterSerializer;
98
import com.linkedin.venice.helix.HelixReadOnlyZKSharedSchemaRepository;
109
import com.linkedin.venice.helix.HelixReadOnlyZKSharedSystemStoreRepository;
@@ -55,7 +54,6 @@ private HelixVeniceClusterResources getVeniceHelixResources(String cluster) {
5554
.getReadOnlyZKSharedSystemStoreRepository();
5655
doReturn(mock(HelixReadOnlyZKSharedSchemaRepository.class)).when(veniceHelixAdmin)
5756
.getReadOnlyZKSharedSchemaRepository();
58-
veniceHelixAdmin.deadStoreStats = mock(DeadStoreStats.class);
5957

6058
doReturn(Collections.emptyList()).when(veniceHelixAdmin).getAllStores(cluster);
6159

@@ -65,6 +63,7 @@ private HelixVeniceClusterResources getVeniceHelixResources(String cluster) {
6563
when(controllerConfig.isDaVinciPushStatusEnabled()).thenReturn(true);
6664
when(controllerConfig.getOffLineJobWaitTimeInMilliseconds()).thenReturn(120000L);
6765
when(controllerConfig.isDeadStoreEndpointEnabled()).thenReturn(true);
66+
when(controllerConfig.isPreFetchDeadStoreStatsEnabled()).thenReturn(true);
6867
when(controllerConfig.getDeadStoreStatsPreFetchRefreshIntervalInMs()).thenReturn(100L); // Must be Long
6968

7069
return new HelixVeniceClusterResources(

services/venice-controller/src/main/java/com/linkedin/venice/controller/HelixVeniceClusterResources.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,8 +74,8 @@ public class HelixVeniceClusterResources implements VeniceResource {
7474

7575
private ErrorPartitionResetTask errorPartitionResetTask = null;
7676

77-
private final ExecutorService deadStoreStatsPreFetchService = Executors.newSingleThreadExecutor();
78-
private DeadStoreStatsPreFetchTask deadStoreStatsPreFetchTask = null;
77+
final ExecutorService deadStoreStatsPreFetchService = Executors.newSingleThreadExecutor();
78+
DeadStoreStatsPreFetchTask deadStoreStatsPreFetchTask = null;
7979
private final Optional<MetaStoreWriter> metaStoreWriter;
8080
private final VeniceAdminStats veniceAdminStats;
8181
private final VeniceHelixAdmin admin;

services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerMultiClusterConfig.java

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -340,21 +340,6 @@ public long getLogCompactionIntervalMS() {
340340
return getCommonConfig().getLogCompactionIntervalMS();
341341
}
342342

343-
public String getDeadStoreStatsClassName() {
344-
return getCommonConfig().getDeadStoreStatsClassName();
345-
}
346-
347-
public VeniceProperties getDeadStoreStatsConfigs() {
348-
return getCommonConfig().getDeadStoreStatsConfigs();
349-
}
350-
351-
public boolean isDeadStoreEndpointEnabled(String clusterName) {
352-
if (!clusterToControllerConfigMap.containsKey(clusterName)) {
353-
return false; // can be the case where venice-controllers isn't a configmap in current tests
354-
}
355-
return getControllerConfig(clusterName).isDeadStoreEndpointEnabled();
356-
}
357-
358343
public long getTimeSinceLastLogCompactionThresholdMS() {
359344
return getCommonConfig().getTimeSinceLastLogCompactionThresholdMS();
360345
}

services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java

Lines changed: 20 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -460,7 +460,7 @@ public class VeniceHelixAdmin implements Admin, StoreCleaner {
460460
private Set<PushJobCheckpoints> pushJobUserErrorCheckpoints;
461461
private final LogContext logContext;
462462

463-
DeadStoreStats deadStoreStats;
463+
final Map<String, DeadStoreStats> deadStoreStatsMap = new VeniceConcurrentHashMap<>();
464464

465465
public VeniceHelixAdmin(
466466
VeniceControllerMultiClusterConfig multiClusterConfigs,
@@ -699,17 +699,21 @@ public VeniceHelixAdmin(
699699
ParticipantMessageKey.getClassSchema()));
700700
}
701701

702-
if (multiClusterConfigs.isDeadStoreEndpointEnabled(controllerClusterName)) {
703-
Class<? extends DeadStoreStats> deadStoreStatsClass =
704-
ReflectUtils.loadClass(multiClusterConfigs.getDeadStoreStatsClassName());
705-
try {
706-
deadStoreStats = ReflectUtils.callConstructor(
707-
deadStoreStatsClass,
708-
new Class[] { VeniceProperties.class },
709-
new Object[] { multiClusterConfigs.getDeadStoreStatsConfigs() });
710-
} catch (Exception e) {
711-
LOGGER.error("Failed to enable " + DeadStoreStats.class.getSimpleName(), e);
712-
throw new VeniceException(e);
702+
for (String clusterName: multiClusterConfigs.getClusters()) {
703+
if (multiClusterConfigs.getControllerConfig(clusterName).isDeadStoreEndpointEnabled()) {
704+
Class<? extends DeadStoreStats> deadStoreStatsClass =
705+
ReflectUtils.loadClass(multiClusterConfigs.getControllerConfig(clusterName).getDeadStoreStatsClassName());
706+
try {
707+
DeadStoreStats deadStoreStats = ReflectUtils.callConstructor(
708+
deadStoreStatsClass,
709+
new Class[] { VeniceProperties.class },
710+
new Object[] { multiClusterConfigs.getControllerConfig(clusterName).getDeadStoreStatsConfigs() });
711+
712+
deadStoreStatsMap.put(clusterName, deadStoreStats);
713+
} catch (Exception e) {
714+
LOGGER.error("Failed to enable " + DeadStoreStats.class.getSimpleName(), e);
715+
throw new VeniceException(e);
716+
}
713717
}
714718
}
715719

@@ -8266,7 +8270,7 @@ public Map<String, RegionPushDetails> listStorePushInfo(
82668270

82678271
public void preFetchDeadStoreStats(String clusterName, List<StoreInfo> storeInfos) {
82688272
checkControllerLeadershipFor(clusterName);
8269-
deadStoreStats.preFetchStats(storeInfos);
8273+
deadStoreStatsMap.get(clusterName).preFetchStats(storeInfos);
82708274
}
82718275

82728276
/**
@@ -8275,7 +8279,7 @@ public void preFetchDeadStoreStats(String clusterName, List<StoreInfo> storeInfo
82758279
@Override
82768280
public List<StoreInfo> getDeadStores(String clusterName, String storeName, boolean includeSystemStores) {
82778281
checkControllerLeadershipFor(clusterName);
8278-
if (!multiClusterConfigs.isDeadStoreEndpointEnabled(clusterName)) {
8282+
if (!multiClusterConfigs.getControllerConfig(clusterName).isDeadStoreEndpointEnabled()) {
82798283
throw new VeniceUnsupportedOperationException("Dead store stats is not enabled.");
82808284
}
82818285

@@ -8285,13 +8289,13 @@ public List<StoreInfo> getDeadStores(String clusterName, String storeName, boole
82858289
.filter(store -> includeSystemStores || !store.isSystemStore())
82868290
.map(StoreInfo::fromStore)
82878291
.collect(Collectors.toList());
8288-
return deadStoreStats.getDeadStores(clusterStoreInfos);
8292+
return deadStoreStatsMap.get(clusterName).getDeadStores(clusterStoreInfos);
82898293
} else {
82908294
StoreInfo store = StoreInfo.fromStore(getStore(clusterName, storeName));
82918295
if (store == null) {
82928296
throw new VeniceNoStoreException(storeName, clusterName);
82938297
}
8294-
return deadStoreStats.getDeadStores(Collections.singletonList(store));
8298+
return deadStoreStatsMap.get(clusterName).getDeadStores(Collections.singletonList(store));
82958299
}
82968300
}
82978301

services/venice-controller/src/test/java/com/linkedin/venice/controller/DeadStoreStatsPreFetchTaskTest.java

Lines changed: 18 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import static org.mockito.Mockito.*;
44

5-
import com.linkedin.venice.controller.stats.DeadStoreStats;
65
import com.linkedin.venice.meta.Store;
76
import java.util.Collections;
87
import java.util.concurrent.ExecutorService;
@@ -13,26 +12,24 @@
1312

1413

1514
public class DeadStoreStatsPreFetchTaskTest {
15+
private static final String CLUSTER_NAME = "test-cluster";
1616
private VeniceHelixAdmin mockAdmin;
17-
private DeadStoreStats mockStats;
1817
private Store mockStore;
1918

2019
@BeforeClass
2120
public void setUp() {
2221
mockAdmin = mock(VeniceHelixAdmin.class);
23-
mockStats = mock(DeadStoreStats.class);
2422
mockStore = mock(Store.class);
25-
mockAdmin.deadStoreStats = mockStats;
26-
when(mockAdmin.getAllStores("test-cluster")).thenReturn(Collections.singletonList(mockStore));
27-
when(mockAdmin.isLeaderControllerFor("test-cluster")).thenReturn(true);
23+
when(mockAdmin.getAllStores(CLUSTER_NAME)).thenReturn(Collections.singletonList(mockStore));
24+
when(mockAdmin.isLeaderControllerFor(CLUSTER_NAME)).thenReturn(true);
2825
}
2926

3027
@Test
3128
public void testBecomesLeaderAfterRetries() throws InterruptedException {
3229
// Simulate: false 1st check, true on 2nd check (~10 sec delay)
33-
when(mockAdmin.isLeaderControllerFor("test-cluster")).thenReturn(false).thenReturn(true);
30+
when(mockAdmin.isLeaderControllerFor(CLUSTER_NAME)).thenReturn(false).thenReturn(true);
3431

35-
DeadStoreStatsPreFetchTask task = new DeadStoreStatsPreFetchTask("test-cluster", mockAdmin, 1000);
32+
DeadStoreStatsPreFetchTask task = new DeadStoreStatsPreFetchTask(CLUSTER_NAME, mockAdmin, 1000);
3633
ExecutorService executor = Executors.newSingleThreadExecutor();
3734

3835
executor.submit(task);
@@ -42,27 +39,27 @@ public void testBecomesLeaderAfterRetries() throws InterruptedException {
4239

4340
shutdownTask(task, executor);
4441

45-
verify(mockAdmin, atLeastOnce()).preFetchDeadStoreStats(eq("test-cluster"), anyList());
46-
verify(mockAdmin, atLeast(2)).isLeaderControllerFor("test-cluster");
42+
verify(mockAdmin, atLeastOnce()).preFetchDeadStoreStats(eq(CLUSTER_NAME), anyList());
43+
verify(mockAdmin, atLeast(2)).isLeaderControllerFor(CLUSTER_NAME);
4744
}
4845

4946
@Test
5047
public void testInitialFetchIsCalled() throws InterruptedException {
51-
DeadStoreStatsPreFetchTask task = new DeadStoreStatsPreFetchTask("test-cluster", mockAdmin, 1000);
48+
DeadStoreStatsPreFetchTask task = new DeadStoreStatsPreFetchTask(CLUSTER_NAME, mockAdmin, 1000);
5249
ExecutorService executor = Executors.newSingleThreadExecutor();
5350

5451
executor.submit(task);
5552
waitForAsyncExecution();
5653

57-
verify(mockAdmin, atLeastOnce()).getAllStores("test-cluster");
58-
verify(mockAdmin, atLeastOnce()).preFetchDeadStoreStats(eq("test-cluster"), anyList());
54+
verify(mockAdmin, atLeastOnce()).getAllStores(CLUSTER_NAME);
55+
verify(mockAdmin, atLeastOnce()).preFetchDeadStoreStats(eq(CLUSTER_NAME), anyList());
5956

6057
shutdownTask(task, executor);
6158
}
6259

6360
@Test
6461
public void testFullRunLifecycle() throws InterruptedException {
65-
DeadStoreStatsPreFetchTask task = new DeadStoreStatsPreFetchTask("test-cluster", mockAdmin, 200);
62+
DeadStoreStatsPreFetchTask task = new DeadStoreStatsPreFetchTask(CLUSTER_NAME, mockAdmin, 200);
6663
ExecutorService executor = Executors.newSingleThreadExecutor();
6764

6865
executor.submit(task);
@@ -74,35 +71,35 @@ public void testFullRunLifecycle() throws InterruptedException {
7471

7572
// 1 call should happen immediately (before loop)
7673
// at least 1 additional call should happen from loop
77-
verify(mockAdmin, atLeastOnce()).preFetchDeadStoreStats(eq("test-cluster"), anyList());
78-
verify(mockAdmin, atLeast(2)).getAllStores("test-cluster");
74+
verify(mockAdmin, atLeastOnce()).preFetchDeadStoreStats(eq(CLUSTER_NAME), anyList());
75+
verify(mockAdmin, atLeast(2)).getAllStores(CLUSTER_NAME);
7976
}
8077

8178
@Test
8279
public void testFetchRepeatsUntilClosed() throws InterruptedException {
83-
DeadStoreStatsPreFetchTask task = new DeadStoreStatsPreFetchTask("test-cluster", mockAdmin, 100);
80+
DeadStoreStatsPreFetchTask task = new DeadStoreStatsPreFetchTask(CLUSTER_NAME, mockAdmin, 100);
8481
ExecutorService executor = Executors.newSingleThreadExecutor();
8582

8683
executor.submit(task);
8784
Thread.sleep(400);
8885

8986
shutdownTask(task, executor);
90-
verify(mockAdmin, atLeastOnce()).preFetchDeadStoreStats(eq("test-cluster"), anyList());
87+
verify(mockAdmin, atLeastOnce()).preFetchDeadStoreStats(eq(CLUSTER_NAME), anyList());
9188
}
9289

9390
@Test
9491
public void testExceptionIsHandledGracefully() throws InterruptedException {
9592
doThrow(new RuntimeException("Simulated error")).when(mockAdmin)
96-
.preFetchDeadStoreStats(eq("test-cluster"), anyList());
93+
.preFetchDeadStoreStats(eq(CLUSTER_NAME), anyList());
9794

98-
DeadStoreStatsPreFetchTask task = new DeadStoreStatsPreFetchTask("test-cluster", mockAdmin, 100);
95+
DeadStoreStatsPreFetchTask task = new DeadStoreStatsPreFetchTask(CLUSTER_NAME, mockAdmin, 100);
9996
ExecutorService executor = Executors.newSingleThreadExecutor();
10097

10198
executor.submit(task);
10299
Thread.sleep(300);
103100

104101
shutdownTask(task, executor);
105-
verify(mockAdmin, atLeastOnce()).preFetchDeadStoreStats(eq("test-cluster"), anyList());
102+
verify(mockAdmin, atLeastOnce()).preFetchDeadStoreStats(eq(CLUSTER_NAME), anyList());
106103
}
107104

108105
private void waitForAsyncExecution() throws InterruptedException {

0 commit comments

Comments
 (0)