Skip to content

Commit ed0264a

Browse files
authored
Add configurable option to scale-down during task run time for cost-based autoscaler (#18958)
* Add configurable option to scale-down during task run time for cost-based autoscaler * Docs * Address review comments, compress tests a bit
1 parent 009365f commit ed0264a

File tree

7 files changed

+332
-164
lines changed

7 files changed

+332
-164
lines changed

docs/ingestion/supervisor.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,8 @@ The following table outlines the configuration properties related to the `costBa
208208
|`lagWeight`|The weight of extracted lag value in cost function.| No| 0.25|
209209
|`idleWeight`|The weight of extracted poll idle value in cost function. | No | 0.75 |
210210
|`defaultProcessingRate`|A planned processing rate per task, required for first cost estimations. | No | 1000 |
211+
|`scaleDownBarrier`| A number of successful scale down attempts which should be skipped to prevent the auto-scaler from scaling down tasks immediately. | No | 5 |
212+
|`scaleDownDuringTaskRolloverOnly`| Indicates whether task scaling down is limited to periods during task rollovers only. | No | False |
211213

212214
The following example shows a supervisor spec with `lagBased` autoscaler:
213215

embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,8 @@ public void test_autoScaler_computesOptimalTaskCountAndProduceScaleDown()
132132
// Weight configuration: strongly favor lag reduction over idle time
133133
.lagWeight(0.9)
134134
.idleWeight(0.1)
135+
.scaleDownDuringTaskRolloverOnly(false)
136+
.scaleDownBarrier(1)
135137
.build();
136138

137139
final KafkaSupervisorSpec spec = createKafkaSupervisorWithAutoScaler(superId, autoScalerConfig, initialTaskCount);
@@ -225,6 +227,9 @@ void test_scaleDownDuringTaskRollover()
225227
// High idle weight ensures scale-down when tasks are mostly idle (little data to process)
226228
.lagWeight(0.1)
227229
.idleWeight(0.9)
230+
.scaleDownDuringTaskRolloverOnly(false)
231+
// Do not slow scale-downs
232+
.scaleDownBarrier(0)
228233
.build();
229234

230235
final KafkaSupervisorSpec spec = createKafkaSupervisorWithAutoScaler(superId, autoScalerConfig, initialTaskCount);

indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,8 @@ public class CostBasedAutoScaler implements SupervisorTaskAutoScaler
9090
private final WeightedCostFunction costFunction;
9191
private volatile CostMetrics lastKnownMetrics;
9292

93+
private int scaleDownCounter = 0;
94+
9395
public CostBasedAutoScaler(
9496
SeekableStreamSupervisor supervisor,
9597
CostBasedAutoScalerConfig config,
@@ -148,7 +150,12 @@ public void reset()
148150
@Override
149151
public int computeTaskCountForRollover()
150152
{
151-
return computeOptimalTaskCount(lastKnownMetrics);
153+
if (config.isScaleDownOnTaskRolloverOnly()) {
154+
return computeOptimalTaskCount(lastKnownMetrics);
155+
} else {
156+
scaleDownCounter = 0;
157+
return -1;
158+
}
152159
}
153160

154161
public int computeTaskCountForScaleAction()
@@ -157,11 +164,18 @@ public int computeTaskCountForScaleAction()
157164
final int optimalTaskCount = computeOptimalTaskCount(lastKnownMetrics);
158165
final int currentTaskCount = lastKnownMetrics.getCurrentTaskCount();
159166

160-
// Perform only scale-up actions
167+
// Perform scale-up actions; scale-down actions only if configured.
161168
int taskCount = -1;
162169
if (optimalTaskCount > currentTaskCount) {
163170
taskCount = optimalTaskCount;
164-
log.info("New task count [%d] on supervisor [%s]", taskCount, supervisorId);
171+
scaleDownCounter = 0; // Nullify the scaleDown counter after a successful scaleup too.
172+
log.info("New task count [%d] on supervisor [%s], scaling up", taskCount, supervisorId);
173+
} else if (!config.isScaleDownOnTaskRolloverOnly()
174+
&& optimalTaskCount < currentTaskCount
175+
&& ++scaleDownCounter >= config.getScaleDownBarrier()) {
176+
taskCount = optimalTaskCount;
177+
scaleDownCounter = 0;
178+
log.info("New task count [%d] on supervisor [%s], scaling down", taskCount, supervisorId);
165179
} else {
166180
log.info("No scaling required for supervisor [%s]", supervisorId);
167181
}

indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfig.java

Lines changed: 60 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -43,11 +43,12 @@
4343
@JsonInclude(JsonInclude.Include.NON_NULL)
4444
public class CostBasedAutoScalerConfig implements AutoScalerConfig
4545
{
46-
private static final long DEFAULT_SCALE_ACTION_PERIOD_MILLIS = 15 * 60 * 1000; // 15 minutes
47-
private static final long DEFAULT_MIN_TRIGGER_SCALE_ACTION_FREQUENCY_MILLIS = 1200000; // 20 minutes
48-
private static final double DEFAULT_LAG_WEIGHT = 0.25;
49-
private static final double DEFAULT_IDLE_WEIGHT = 0.75;
50-
private static final double DEFAULT_PROCESSING_RATE = 1000.0; // 1000 records/sec per task as default
46+
static final long DEFAULT_SCALE_ACTION_PERIOD_MILLIS = 10 * 60 * 1000; // 10 minutes
47+
static final long DEFAULT_MIN_TRIGGER_SCALE_ACTION_FREQUENCY_MILLIS = 5 * 60 * 1000; // 5 minutes
48+
static final double DEFAULT_LAG_WEIGHT = 0.25;
49+
static final double DEFAULT_IDLE_WEIGHT = 0.75;
50+
static final double DEFAULT_PROCESSING_RATE = 1000.0; // 1000 records/sec per task as default
51+
static final int DEFAULT_SCALE_DOWN_BARRIER = 5; // We delay scale down by 5 * DEFAULT_SCALE_ACTION_PERIOD_MILLIS
5152

5253
private final boolean enableTaskAutoScaler;
5354
private final int taskCountMax;
@@ -60,6 +61,18 @@ public class CostBasedAutoScalerConfig implements AutoScalerConfig
6061
private final double lagWeight;
6162
private final double idleWeight;
6263
private final double defaultProcessingRate;
64+
/**
65+
* Represents the threshold value used to prevent the auto-scaler from scaling down tasks immediately,
66+
* when the computed cost-based metrics fall below this barrier.
67+
* A higher value implies a more conservative scaling down behavior, ensuring that tasks
68+
* are not prematurely terminated in scenarios of potential workload spikes or insufficient cost savings.
69+
*/
70+
private final int scaleDownBarrier;
71+
/**
72+
* Indicates whether task scaling down is limited to periods during task rollovers only.
73+
* If set to {@code false}, allows scaling down during normal task run time.
74+
*/
75+
private final boolean scaleDownDuringTaskRolloverOnly;
6376

6477
@JsonCreator
6578
public CostBasedAutoScalerConfig(
@@ -72,7 +85,9 @@ public CostBasedAutoScalerConfig(
7285
@Nullable @JsonProperty("scaleActionPeriodMillis") Long scaleActionPeriodMillis,
7386
@Nullable @JsonProperty("lagWeight") Double lagWeight,
7487
@Nullable @JsonProperty("idleWeight") Double idleWeight,
75-
@Nullable @JsonProperty("defaultProcessingRate") Double defaultProcessingRate
88+
@Nullable @JsonProperty("defaultProcessingRate") Double defaultProcessingRate,
89+
@Nullable @JsonProperty("scaleDownBarrier") Integer scaleDownBarrier,
90+
@Nullable @JsonProperty("scaleDownDuringTaskRolloverOnly") Boolean scaleDownDuringTaskRolloverOnly
7691
)
7792
{
7893
this.enableTaskAutoScaler = enableTaskAutoScaler != null ? enableTaskAutoScaler : false;
@@ -90,6 +105,8 @@ public CostBasedAutoScalerConfig(
90105
this.lagWeight = Configs.valueOrDefault(lagWeight, DEFAULT_LAG_WEIGHT);
91106
this.idleWeight = Configs.valueOrDefault(idleWeight, DEFAULT_IDLE_WEIGHT);
92107
this.defaultProcessingRate = Configs.valueOrDefault(defaultProcessingRate, DEFAULT_PROCESSING_RATE);
108+
this.scaleDownBarrier = Configs.valueOrDefault(scaleDownBarrier, DEFAULT_SCALE_DOWN_BARRIER);
109+
this.scaleDownDuringTaskRolloverOnly = Configs.valueOrDefault(scaleDownDuringTaskRolloverOnly, false);
93110

94111
if (this.enableTaskAutoScaler) {
95112
Preconditions.checkNotNull(taskCountMax, "taskCountMax is required when enableTaskAutoScaler is true");
@@ -107,17 +124,16 @@ public CostBasedAutoScalerConfig(
107124
}
108125
this.taskCountStart = taskCountStart;
109126

110-
// Validate stopTaskCountRatio
111127
Preconditions.checkArgument(
112128
stopTaskCountRatio == null || (stopTaskCountRatio > 0.0 && stopTaskCountRatio <= 1.0),
113129
"0.0 < stopTaskCountRatio <= 1.0"
114130
);
115131
this.stopTaskCountRatio = stopTaskCountRatio;
116132

117-
// Validate weights are non-negative
118133
Preconditions.checkArgument(this.lagWeight >= 0, "lagWeight must be >= 0");
119134
Preconditions.checkArgument(this.idleWeight >= 0, "idleWeight must be >= 0");
120135
Preconditions.checkArgument(this.defaultProcessingRate > 0, "defaultProcessingRate must be > 0");
136+
Preconditions.checkArgument(this.scaleDownBarrier >= 0, "scaleDownBarrier must be >= 0");
121137
}
122138

123139
/**
@@ -196,6 +212,18 @@ public double getDefaultProcessingRate()
196212
return defaultProcessingRate;
197213
}
198214

215+
@JsonProperty
216+
public int getScaleDownBarrier()
217+
{
218+
return scaleDownBarrier;
219+
}
220+
221+
@JsonProperty("scaleDownDuringTaskRolloverOnly")
222+
public boolean isScaleDownOnTaskRolloverOnly()
223+
{
224+
return scaleDownDuringTaskRolloverOnly;
225+
}
226+
199227
@Override
200228
public SupervisorTaskAutoScaler createAutoScaler(Supervisor supervisor, SupervisorSpec spec, ServiceEmitter emitter)
201229
{
@@ -222,6 +250,8 @@ public boolean equals(Object o)
222250
&& Double.compare(that.lagWeight, lagWeight) == 0
223251
&& Double.compare(that.idleWeight, idleWeight) == 0
224252
&& Double.compare(that.defaultProcessingRate, defaultProcessingRate) == 0
253+
&& scaleDownBarrier == that.scaleDownBarrier
254+
&& scaleDownDuringTaskRolloverOnly == that.scaleDownDuringTaskRolloverOnly
225255
&& Objects.equals(taskCountStart, that.taskCountStart)
226256
&& Objects.equals(stopTaskCountRatio, that.stopTaskCountRatio);
227257
}
@@ -239,7 +269,9 @@ public int hashCode()
239269
scaleActionPeriodMillis,
240270
lagWeight,
241271
idleWeight,
242-
defaultProcessingRate
272+
defaultProcessingRate,
273+
scaleDownBarrier,
274+
scaleDownDuringTaskRolloverOnly
243275
);
244276
}
245277

@@ -257,6 +289,8 @@ public String toString()
257289
", lagWeight=" + lagWeight +
258290
", idleWeight=" + idleWeight +
259291
", defaultProcessingRate=" + defaultProcessingRate +
292+
", scaleDownBarrier=" + scaleDownBarrier +
293+
", scaleDownDuringTaskRolloverOnly=" + scaleDownDuringTaskRolloverOnly +
260294
'}';
261295
}
262296

@@ -276,6 +310,8 @@ public static class Builder
276310
private Double lagWeight;
277311
private Double idleWeight;
278312
private Double defaultProcessingRate;
313+
private Integer scaleDownBarrier;
314+
private Boolean scaleDownDuringTaskRolloverOnly;
279315

280316
private Builder()
281317
{
@@ -341,6 +377,18 @@ public Builder defaultProcessingRate(double defaultProcessingRate)
341377
return this;
342378
}
343379

380+
public Builder scaleDownBarrier(int scaleDownBarrier)
381+
{
382+
this.scaleDownBarrier = scaleDownBarrier;
383+
return this;
384+
}
385+
386+
public Builder scaleDownDuringTaskRolloverOnly(boolean scaleDownDuringTaskRolloverOnly)
387+
{
388+
this.scaleDownDuringTaskRolloverOnly = scaleDownDuringTaskRolloverOnly;
389+
return this;
390+
}
391+
344392
public CostBasedAutoScalerConfig build()
345393
{
346394
return new CostBasedAutoScalerConfig(
@@ -353,7 +401,9 @@ public CostBasedAutoScalerConfig build()
353401
scaleActionPeriodMillis,
354402
lagWeight,
355403
idleWeight,
356-
defaultProcessingRate
404+
defaultProcessingRate,
405+
scaleDownBarrier,
406+
scaleDownDuringTaskRolloverOnly
357407
);
358408
}
359409
}

indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfigTest.java

Lines changed: 57 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,13 @@
2424
import org.junit.Assert;
2525
import org.junit.Test;
2626

27+
import static org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScalerConfig.DEFAULT_IDLE_WEIGHT;
28+
import static org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScalerConfig.DEFAULT_LAG_WEIGHT;
29+
import static org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScalerConfig.DEFAULT_MIN_TRIGGER_SCALE_ACTION_FREQUENCY_MILLIS;
30+
import static org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScalerConfig.DEFAULT_PROCESSING_RATE;
31+
import static org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScalerConfig.DEFAULT_SCALE_ACTION_PERIOD_MILLIS;
32+
import static org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScalerConfig.DEFAULT_SCALE_DOWN_BARRIER;
33+
2734
public class CostBasedAutoScalerConfigTest
2835
{
2936
private final ObjectMapper mapper = new DefaultObjectMapper();
@@ -42,7 +49,9 @@ public void testSerdeWithAllProperties() throws Exception
4249
+ " \"scaleActionPeriodMillis\": 60000,\n"
4350
+ " \"lagWeight\": 0.6,\n"
4451
+ " \"idleWeight\": 0.4,\n"
45-
+ " \"distancePenaltyExponent\": 3.0\n"
52+
+ " \"defaultProcessingRate\": 2000.0,\n"
53+
+ " \"scaleDownBarrier\": 10,\n"
54+
+ " \"scaleDownDuringTaskRolloverOnly\": true\n"
4655
+ "}";
4756

4857
CostBasedAutoScalerConfig config = mapper.readValue(json, CostBasedAutoScalerConfig.class);
@@ -56,6 +65,9 @@ public void testSerdeWithAllProperties() throws Exception
5665
Assert.assertEquals(60000L, config.getScaleActionPeriodMillis());
5766
Assert.assertEquals(0.6, config.getLagWeight(), 0.001);
5867
Assert.assertEquals(0.4, config.getIdleWeight(), 0.001);
68+
Assert.assertEquals(2000.0, config.getDefaultProcessingRate(), 0.001);
69+
Assert.assertEquals(10, config.getScaleDownBarrier());
70+
Assert.assertTrue(config.isScaleDownOnTaskRolloverOnly());
5971

6072
// Test serialization back to JSON
6173
String serialized = mapper.writeValueAsString(config);
@@ -81,10 +93,15 @@ public void testSerdeWithDefaults() throws Exception
8193
Assert.assertEquals(2, config.getTaskCountMin());
8294

8395
// Check defaults
84-
Assert.assertEquals(900000L, config.getScaleActionPeriodMillis());
85-
Assert.assertEquals(1200000L, config.getMinTriggerScaleActionFrequencyMillis());
86-
Assert.assertEquals(0.25, config.getLagWeight(), 0.001);
87-
Assert.assertEquals(0.75, config.getIdleWeight(), 0.001);
96+
Assert.assertEquals(DEFAULT_SCALE_ACTION_PERIOD_MILLIS, config.getScaleActionPeriodMillis());
97+
Assert.assertEquals(DEFAULT_MIN_TRIGGER_SCALE_ACTION_FREQUENCY_MILLIS, config.getMinTriggerScaleActionFrequencyMillis());
98+
Assert.assertEquals(DEFAULT_LAG_WEIGHT, config.getLagWeight(), 0.001);
99+
Assert.assertEquals(DEFAULT_IDLE_WEIGHT, config.getIdleWeight(), 0.001);
100+
Assert.assertEquals(DEFAULT_PROCESSING_RATE, config.getDefaultProcessingRate(), 0.001);
101+
Assert.assertEquals(DEFAULT_SCALE_DOWN_BARRIER, config.getScaleDownBarrier());
102+
Assert.assertFalse(config.isScaleDownOnTaskRolloverOnly());
103+
Assert.assertNull(config.getTaskCountStart());
104+
Assert.assertNull(config.getStopTaskCountRatio());
88105
}
89106

90107
@Test
@@ -98,6 +115,9 @@ public void testSerdeWithDisabledAutoScaler() throws Exception
98115
CostBasedAutoScalerConfig config = mapper.readValue(json, CostBasedAutoScalerConfig.class);
99116

100117
Assert.assertFalse(config.getEnableTaskAutoScaler());
118+
// When disabled, taskCountMax and taskCountMin default to 0
119+
Assert.assertEquals(0, config.getTaskCountMax());
120+
Assert.assertEquals(0, config.getTaskCountMin());
101121
}
102122

103123
@Test(expected = RuntimeException.class)
@@ -149,4 +169,36 @@ public void testValidation_InvalidStopTaskCountRatio()
149169
.enableTaskAutoScaler(true)
150170
.build();
151171
}
172+
173+
@Test
174+
public void testBuilder()
175+
{
176+
CostBasedAutoScalerConfig config = CostBasedAutoScalerConfig.builder()
177+
.taskCountMax(100)
178+
.taskCountMin(5)
179+
.taskCountStart(10)
180+
.enableTaskAutoScaler(true)
181+
.minTriggerScaleActionFrequencyMillis(600000L)
182+
.stopTaskCountRatio(0.8)
183+
.scaleActionPeriodMillis(60000L)
184+
.lagWeight(0.6)
185+
.idleWeight(0.4)
186+
.defaultProcessingRate(2000.0)
187+
.scaleDownBarrier(10)
188+
.scaleDownDuringTaskRolloverOnly(true)
189+
.build();
190+
191+
Assert.assertTrue(config.getEnableTaskAutoScaler());
192+
Assert.assertEquals(100, config.getTaskCountMax());
193+
Assert.assertEquals(5, config.getTaskCountMin());
194+
Assert.assertEquals(Integer.valueOf(10), config.getTaskCountStart());
195+
Assert.assertEquals(600000L, config.getMinTriggerScaleActionFrequencyMillis());
196+
Assert.assertEquals(Double.valueOf(0.8), config.getStopTaskCountRatio());
197+
Assert.assertEquals(60000L, config.getScaleActionPeriodMillis());
198+
Assert.assertEquals(0.6, config.getLagWeight(), 0.001);
199+
Assert.assertEquals(0.4, config.getIdleWeight(), 0.001);
200+
Assert.assertEquals(2000.0, config.getDefaultProcessingRate(), 0.001);
201+
Assert.assertEquals(10, config.getScaleDownBarrier());
202+
Assert.assertTrue(config.isScaleDownOnTaskRolloverOnly());
203+
}
152204
}

0 commit comments

Comments
 (0)