Skip to content

Commit e689f4e

Browse files
committed
Add configurable option to scale down during task run time for the cost-based autoscaler
1 parent 009365f commit e689f4e

File tree

4 files changed

+127
-8
lines changed

4 files changed

+127
-8
lines changed

embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,8 @@ public void test_autoScaler_computesOptimalTaskCountAndProduceScaleDown()
132132
// Weight configuration: strongly favor lag reduction over idle time
133133
.lagWeight(0.9)
134134
.idleWeight(0.1)
135+
.scaleDownDuringTaskRolloverOnly(false)
136+
.scaleDownBarrier(1)
135137
.build();
136138

137139
final KafkaSupervisorSpec spec = createKafkaSupervisorWithAutoScaler(superId, autoScalerConfig, initialTaskCount);
@@ -225,6 +227,9 @@ void test_scaleDownDuringTaskRollover()
225227
// High idle weight ensures scale-down when tasks are mostly idle (little data to process)
226228
.lagWeight(0.1)
227229
.idleWeight(0.9)
230+
.scaleDownDuringTaskRolloverOnly(false)
231+
// A barrier of 0 disables the scale-down delay: scale down as soon as a lower task count is computed
232+
.scaleDownBarrier(0)
228233
.build();
229234

230235
final KafkaSupervisorSpec spec = createKafkaSupervisorWithAutoScaler(superId, autoScalerConfig, initialTaskCount);

indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,8 @@ public class CostBasedAutoScaler implements SupervisorTaskAutoScaler
9090
private final WeightedCostFunction costFunction;
9191
private volatile CostMetrics lastKnownMetrics;
9292

93+
private int scaleDownCounter = 0;
94+
9395
public CostBasedAutoScaler(
9496
SeekableStreamSupervisor supervisor,
9597
CostBasedAutoScalerConfig config,
@@ -148,7 +150,11 @@ public void reset()
148150
@Override
149151
public int computeTaskCountForRollover()
150152
{
151-
return computeOptimalTaskCount(lastKnownMetrics);
153+
if (config.isScaleDownOnTaskRolloverOnly()) {
154+
return computeOptimalTaskCount(lastKnownMetrics);
155+
} else {
156+
return -1;
157+
}
152158
}
153159

154160
public int computeTaskCountForScaleAction()
@@ -157,11 +163,16 @@ public int computeTaskCountForScaleAction()
157163
final int optimalTaskCount = computeOptimalTaskCount(lastKnownMetrics);
158164
final int currentTaskCount = lastKnownMetrics.getCurrentTaskCount();
159165

160-
// Perform only scale-up actions
166+
// Always act on scale-up; act on scale-down only when it is not restricted to task rollover and the scale-down barrier is 0 or has been reached.
161167
int taskCount = -1;
162168
if (optimalTaskCount > currentTaskCount) {
163169
taskCount = optimalTaskCount;
164-
log.info("New task count [%d] on supervisor [%s]", taskCount, supervisorId);
170+
log.info("New task count [%d] on supervisor [%s], scaling up", taskCount, supervisorId);
171+
} else if (!config.isScaleDownOnTaskRolloverOnly() && optimalTaskCount < currentTaskCount &&
172+
(config.getScaleDownBarrier() == 0 || ++scaleDownCounter == config.getScaleDownBarrier())) {
173+
taskCount = optimalTaskCount;
174+
scaleDownCounter = 0;
175+
log.info("New task count [%d] on supervisor [%s], scaling down", taskCount, supervisorId);
165176
} else {
166177
log.info("No scaling required for supervisor [%s]", supervisorId);
167178
}

indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfig.java

Lines changed: 45 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ public class CostBasedAutoScalerConfig implements AutoScalerConfig
4848
private static final double DEFAULT_LAG_WEIGHT = 0.25;
4949
private static final double DEFAULT_IDLE_WEIGHT = 0.75;
5050
private static final double DEFAULT_PROCESSING_RATE = 1000.0; // 1000 records/sec per task as default
51+
private static final int DEFAULT_SCALE_DOWN_BARRIER = 10;
5152

5253
private final boolean enableTaskAutoScaler;
5354
private final int taskCountMax;
@@ -60,6 +61,8 @@ public class CostBasedAutoScalerConfig implements AutoScalerConfig
6061
private final double lagWeight;
6162
private final double idleWeight;
6263
private final double defaultProcessingRate;
64+
private final int scaleDownBarrier;
65+
private final boolean scaleDownDuringTaskRolloverOnly;
6366

6467
@JsonCreator
6568
public CostBasedAutoScalerConfig(
@@ -72,7 +75,9 @@ public CostBasedAutoScalerConfig(
7275
@Nullable @JsonProperty("scaleActionPeriodMillis") Long scaleActionPeriodMillis,
7376
@Nullable @JsonProperty("lagWeight") Double lagWeight,
7477
@Nullable @JsonProperty("idleWeight") Double idleWeight,
75-
@Nullable @JsonProperty("defaultProcessingRate") Double defaultProcessingRate
78+
@Nullable @JsonProperty("defaultProcessingRate") Double defaultProcessingRate,
79+
@Nullable @JsonProperty("scaleDownBarrier") Integer scaleDownBarrier,
80+
@Nullable @JsonProperty("scaleDownDuringTaskRolloverOnly") Boolean scaleDownDuringTaskRolloverOnly
7681
)
7782
{
7883
this.enableTaskAutoScaler = enableTaskAutoScaler != null ? enableTaskAutoScaler : false;
@@ -90,6 +95,8 @@ public CostBasedAutoScalerConfig(
9095
this.lagWeight = Configs.valueOrDefault(lagWeight, DEFAULT_LAG_WEIGHT);
9196
this.idleWeight = Configs.valueOrDefault(idleWeight, DEFAULT_IDLE_WEIGHT);
9297
this.defaultProcessingRate = Configs.valueOrDefault(defaultProcessingRate, DEFAULT_PROCESSING_RATE);
98+
this.scaleDownBarrier = Configs.valueOrDefault(scaleDownBarrier, DEFAULT_SCALE_DOWN_BARRIER);
99+
this.scaleDownDuringTaskRolloverOnly = Configs.valueOrDefault(scaleDownDuringTaskRolloverOnly, false);
93100

94101
if (this.enableTaskAutoScaler) {
95102
Preconditions.checkNotNull(taskCountMax, "taskCountMax is required when enableTaskAutoScaler is true");
@@ -107,17 +114,16 @@ public CostBasedAutoScalerConfig(
107114
}
108115
this.taskCountStart = taskCountStart;
109116

110-
// Validate stopTaskCountRatio
111117
Preconditions.checkArgument(
112118
stopTaskCountRatio == null || (stopTaskCountRatio > 0.0 && stopTaskCountRatio <= 1.0),
113119
"0.0 < stopTaskCountRatio <= 1.0"
114120
);
115121
this.stopTaskCountRatio = stopTaskCountRatio;
116122

117-
// Validate weights are non-negative
118123
Preconditions.checkArgument(this.lagWeight >= 0, "lagWeight must be >= 0");
119124
Preconditions.checkArgument(this.idleWeight >= 0, "idleWeight must be >= 0");
120125
Preconditions.checkArgument(this.defaultProcessingRate > 0, "defaultProcessingRate must be > 0");
126+
Preconditions.checkArgument(this.scaleDownBarrier >= 0, "scaleDownBarrier must be >= 0");
121127
}
122128

123129
/**
@@ -196,6 +202,18 @@ public double getDefaultProcessingRate()
196202
return defaultProcessingRate;
197203
}
198204

205+
@JsonProperty
206+
public int getScaleDownBarrier()
207+
{
208+
return scaleDownBarrier;
209+
}
210+
211+
@JsonProperty
212+
public boolean isScaleDownOnTaskRolloverOnly()
213+
{
214+
return scaleDownDuringTaskRolloverOnly;
215+
}
216+
199217
@Override
200218
public SupervisorTaskAutoScaler createAutoScaler(Supervisor supervisor, SupervisorSpec spec, ServiceEmitter emitter)
201219
{
@@ -222,6 +240,8 @@ public boolean equals(Object o)
222240
&& Double.compare(that.lagWeight, lagWeight) == 0
223241
&& Double.compare(that.idleWeight, idleWeight) == 0
224242
&& Double.compare(that.defaultProcessingRate, defaultProcessingRate) == 0
243+
&& scaleDownBarrier == that.scaleDownBarrier
244+
&& scaleDownDuringTaskRolloverOnly == that.scaleDownDuringTaskRolloverOnly
225245
&& Objects.equals(taskCountStart, that.taskCountStart)
226246
&& Objects.equals(stopTaskCountRatio, that.stopTaskCountRatio);
227247
}
@@ -239,7 +259,9 @@ public int hashCode()
239259
scaleActionPeriodMillis,
240260
lagWeight,
241261
idleWeight,
242-
defaultProcessingRate
262+
defaultProcessingRate,
263+
scaleDownBarrier,
264+
scaleDownDuringTaskRolloverOnly
243265
);
244266
}
245267

@@ -257,6 +279,8 @@ public String toString()
257279
", lagWeight=" + lagWeight +
258280
", idleWeight=" + idleWeight +
259281
", defaultProcessingRate=" + defaultProcessingRate +
282+
", scaleDownBarrier=" + scaleDownBarrier +
283+
", scaleDownDuringTaskRolloverOnly=" + scaleDownDuringTaskRolloverOnly +
260284
'}';
261285
}
262286

@@ -276,6 +300,8 @@ public static class Builder
276300
private Double lagWeight;
277301
private Double idleWeight;
278302
private Double defaultProcessingRate;
303+
private Integer scaleDownBarrier;
304+
private Boolean scaleDownDuringTaskRolloverOnly;
279305

280306
private Builder()
281307
{
@@ -341,6 +367,18 @@ public Builder defaultProcessingRate(double defaultProcessingRate)
341367
return this;
342368
}
343369

370+
public Builder scaleDownBarrier(int scaleDownBarrier)
371+
{
372+
this.scaleDownBarrier = scaleDownBarrier;
373+
return this;
374+
}
375+
376+
public Builder scaleDownDuringTaskRolloverOnly(boolean scaleDownDuringTaskRolloverOnly)
377+
{
378+
this.scaleDownDuringTaskRolloverOnly = scaleDownDuringTaskRolloverOnly;
379+
return this;
380+
}
381+
344382
public CostBasedAutoScalerConfig build()
345383
{
346384
return new CostBasedAutoScalerConfig(
@@ -353,7 +391,9 @@ public CostBasedAutoScalerConfig build()
353391
scaleActionPeriodMillis,
354392
lagWeight,
355393
idleWeight,
356-
defaultProcessingRate
394+
defaultProcessingRate,
395+
scaleDownBarrier,
396+
scaleDownDuringTaskRolloverOnly
357397
);
358398
}
359399
}

indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerMockTest.java

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,69 @@ public void testBoundaryConditionOptimalEqualsCurrentMinusOne()
257257
);
258258
}
259259

260+
@Test
261+
public void testScaleDownBlockedWhenScaleDownOnRolloverOnlyEnabled()
262+
{
263+
CostBasedAutoScalerConfig rolloverOnlyConfig = CostBasedAutoScalerConfig.builder()
264+
.taskCountMax(100)
265+
.taskCountMin(1)
266+
.enableTaskAutoScaler(true)
267+
.scaleDownDuringTaskRolloverOnly(true)
268+
.scaleDownBarrier(1) // Set the barrier to 1 so it would trigger immediately if not blocked
269+
.build();
270+
271+
CostBasedAutoScaler autoScaler = spy(new CostBasedAutoScaler(
272+
mockSupervisor,
273+
rolloverOnlyConfig,
274+
mockSpec,
275+
mockEmitter
276+
));
277+
278+
int currentTaskCount = 50;
279+
int optimalCount = 30; // Lower than current (scale-down scenario)
280+
281+
doReturn(optimalCount).when(autoScaler).computeOptimalTaskCount(any());
282+
setupMocksForMetricsCollection(currentTaskCount, 10.0, 0.9);
283+
284+
Assert.assertEquals(
285+
"Should return -1 when scaleDownDuringTaskRolloverOnly is true",
286+
-1,
287+
autoScaler.computeTaskCountForScaleAction()
288+
);
289+
}
290+
291+
@Test
292+
public void testScaleDownAllowedDuringRolloverWhenScaleDownOnRolloverOnlyEnabled()
293+
{
294+
CostBasedAutoScalerConfig rolloverOnlyConfig = CostBasedAutoScalerConfig.builder()
295+
.taskCountMax(100)
296+
.taskCountMin(1)
297+
.enableTaskAutoScaler(true)
298+
.scaleDownDuringTaskRolloverOnly(true)
299+
.build();
300+
301+
CostBasedAutoScaler autoScaler = spy(new CostBasedAutoScaler(
302+
mockSupervisor,
303+
rolloverOnlyConfig,
304+
mockSpec,
305+
mockEmitter
306+
));
307+
308+
int currentTaskCount = 50;
309+
int optimalCount = 30;
310+
311+
// Set up lastKnownMetrics by calling computeTaskCountForScaleAction first
312+
doReturn(optimalCount).when(autoScaler).computeOptimalTaskCount(any());
313+
setupMocksForMetricsCollection(currentTaskCount, 10.0, 0.9);
314+
autoScaler.computeTaskCountForScaleAction(); // This populates lastKnownMetrics
315+
316+
Assert.assertEquals(
317+
"Should scale-down during rollover when scaleDownDuringTaskRolloverOnly is true",
318+
optimalCount,
319+
autoScaler.computeTaskCountForRollover()
320+
);
321+
}
322+
260323
private void setupMocksForMetricsCollection(int taskCount, double avgLag, double pollIdleRatio)
261324
{
262325
when(mockSupervisor.computeLagStats()).thenReturn(new LagStats(0, (long) avgLag * 2, (long) avgLag));

0 commit comments

Comments
 (0)