
Enhance autoscaling during task rollover #18954

Open

Fly-Style wants to merge 8 commits into apache:master from Fly-Style:cost-autoscaler-correct-rollover-placement

Conversation

Contributor

@Fly-Style Fly-Style commented Jan 26, 2026

This patch improves autoscaling during task rollover. Previously, although the algorithm was correct, activelyReadingTaskGroups was empty only very rarely in practice (though that accidentally happened to me during manual testing of this feature), so scaling tasks down also happened very rarely, which is inappropriate.

UPD: to avoid blocking the cost-based autoscaler from being used in production, there is a parallel patch allowing scale-down during task runtime: #18958

Changeset

  • Introduced a two-phase rollover scaling flow: detect/prepare during task duration checks and apply only after all active tasks with the old partition assignment stop.
  • This stopping logic respects maxAllowedStops to avoid worker exhaustion and a disruptive ingestion stop.
  • Added missing autoscaler-required task metrics once rollover scaling is applied.
  • Added a testing hook for autoscaler metrics and new unit tests covering rollover scale-down behavior.

Key changed class in this PR

SeekableStreamSupervisor

Algorithm

  • Phase 1: Gradual stop (in checkTaskDuration()):
    • At least one task hits its end of life -> the autoscaler recommends a new taskCount, if needed
    • Set pendingRolloverTaskCount and stop tasks (respecting maxAllowedStops)
    • The process takes multiple cycles to stop all tasks
  • Phase 2: Apply scale (in maybeApplyPendingScaleRollover()):
    • Once these conditions are met, change taskCount, clear allocation info, and clean up (see the sketch after this list)
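A minimal sketch of the two-phase flow described above (assuming simplified types; only pendingRolloverTaskCount, maxAllowedStops, activelyReadingTaskGroups, and the phase method names come from this PR, everything else is hypothetical and not the actual SeekableStreamSupervisor code):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;

class RolloverScalingSketch
{
  private final Map<Integer, Object> activelyReadingTaskGroups = new HashMap<>();
  private final int maxAllowedStops = 3;
  private Integer pendingRolloverTaskCount = null;

  // Phase 1 (checkTaskDuration): record the recommended taskCount and start
  // stopping groups, never exceeding maxAllowedStops per cycle.
  void prepareRollover(int recommendedTaskCount)
  {
    pendingRolloverTaskCount = recommendedTaskCount;
    int stopped = 0;
    for (Integer groupId : new ArrayList<>(activelyReadingTaskGroups.keySet())) {
      if (stopped >= maxAllowedStops) {
        break; // remaining groups are stopped on later cycles
      }
      stopGroupGracefully(groupId);
      stopped++;
    }
  }

  // Phase 2 (maybeApplyPendingScaleRollover): apply the new taskCount only once
  // every actively reading group has stopped; publishing groups don't block this.
  void maybeApplyPendingScaleRollover()
  {
    if (pendingRolloverTaskCount == null || !activelyReadingTaskGroups.isEmpty()) {
      return;
    }
    changeTaskCountInIOConfig(pendingRolloverTaskCount);
    clearAllocationInfo();
    pendingRolloverTaskCount = null;
  }

  private void stopGroupGracefully(int groupId)
  {
    // in the real supervisor this triggers a graceful stop and the group moves
    // to pendingCompletionTaskGroups; here we simply drop it
    activelyReadingTaskGroups.remove(groupId);
  }

  private void changeTaskCountInIOConfig(int newTaskCount)
  {
    // persist the new taskCount so the next cycle creates that many task groups
  }

  private void clearAllocationInfo()
  {
    // drop the stale partition-to-taskGroup allocation so it is rebuilt
  }
}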

The full flow is shown in the diagram under the spoiler.

Details
sequenceDiagram
    participant S as Supervisor
    participant ATG as activelyReadingTaskGroups
    participant PCG as pendingCompletionTaskGroups
    participant AS as AutoScaler

    Note over S: Cycle 1 - Scale triggered
    S->>ATG: 8 task groups running
    S->>AS: computeTaskCountForRollover()
    AS-->>S: returns 4
    S->>S: pendingRolloverTaskCount = 4
    S->>ATG: Stop 3 groups (maxAllowedStops)
    S->>PCG: Move 3 groups to publishing
    Note over ATG: 5 groups remain

    Note over S: Cycle 2 - Continue stopping
    S->>PCG: 1 publishing complete
    S->>ATG: Stop 3 more groups
    S->>PCG: Move to publishing
    Note over ATG: 2 groups remain

    Note over S: Cycle 3 - Phase 2 triggers
    S->>PCG: 3 publishing complete
    S->>ATG: Stop remaining 2 groups
    S->>PCG: Move to publishing
    Note over ATG: EMPTY - Phase 2 triggers!
    S->>S: changeTaskCountInIOConfig(4)
    S->>S: clearAllocationInfo()
    S->>S: pendingRolloverTaskCount = null

    Note over S: Cycle 4 - New tasks created
    S->>S: updatePartitionDataFromStream()
    Note over S: Rebuild partitionGroups with taskCount=4
    S->>ATG: Create 4 new task groups
    Note over S: All 4 tasks start (capacity available)

docker-compose.yml for those interested in testing it themselves (build the Druid image with the tag local):

Details
version: "2.2"

volumes:
  metadata_data: {}
  middle_var: {}
  historical_var: {}
  broker_var: {}
  coordinator_var: {}
  router_var: {}
  druid_shared: {}


services:
  postgres:
    container_name: postgres
    image: postgres:17.6
    ports:
      - "5432:5432"
    volumes:
      - metadata_data:/var/lib/postgresql/data
    environment:
      - POSTGRES_PASSWORD=FoolishPassword
      - POSTGRES_USER=druid
      - POSTGRES_DB=druid

  # Need 3.5 or later for container nodes
  zookeeper:
    container_name: zookeeper
    image: zookeeper:3.5.10
    ports:
      - "2181:2181"
    environment:
      - ZOO_MY_ID=1

  kafka:
    container_name: kafka
    image: confluentinc/cp-kafka:7.5.0
    ports:
      - "9092:9092"
      - "29092:29092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"

  kafka-producer:
    container_name: kafka-producer
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      - kafka
    command: >
      bash -c "
        echo 'Waiting for Kafka to be ready...' &&
        cub kafka-ready -b kafka:29092 1 120 &&
        echo 'Creating topic...' &&
        kafka-topics --create --if-not-exists --bootstrap-server kafka:29092 --topic druid-events --partitions 8 --replication-factor 1 &&
        echo 'Starting to produce 1 event per second...' &&
        while true; do
          TIMESTAMP=$$(date +%s%3N)
          EVENT=\"{\\\"timestamp\\\":\\\"$$(date -u +%Y-%m-%dT%H:%M:%S.%3NZ)\\\",\\\"event_type\\\":\\\"test_event\\\",\\\"value\\\":$$RANDOM,\\\"message\\\":\\\"Event at $$TIMESTAMP\\\"}\"
          echo \"$$EVENT\" | kafka-console-producer --bootstrap-server kafka:29092 --topic druid-events
          echo \"Produced: $$EVENT\"
          sleep 1
        done
      "

  coordinator:
    image: apache/druid:local
    container_name: coordinator
    volumes:
      - druid_shared:/opt/shared
      - coordinator_var:/opt/druid/var
    depends_on:
      - zookeeper
      - postgres
    ports:
      - "8081:8081"
    command:
      - coordinator
    env_file:
      - environment

  broker:
    image: apache/druid:local
    container_name: broker
    volumes:
      - broker_var:/opt/druid/var
    depends_on:
      - zookeeper
      - postgres
      - coordinator
    ports:
      - "8082:8082"
    command:
      - broker
    env_file:
      - environment

  historical:
    image: apache/druid:local
    container_name: historical
    volumes:
      - druid_shared:/opt/shared
      - historical_var:/opt/druid/var
    depends_on:
      - zookeeper
      - postgres
      - coordinator
    ports:
      - "8083:8083"
    command:
      - historical
    env_file:
      - environment

  middlemanager:
    image: apache/druid:local
    container_name: middlemanager
    volumes:
      - druid_shared:/opt/shared
      - middle_var:/opt/druid/var
    depends_on:
      - zookeeper
      - postgres
      - coordinator
    ports:
      - "8091:8091"
      - "8100-8105:8100-8105"
    command:
      - middleManager
    env_file:
      - environment

  router:
    image: apache/druid:local
    container_name: router
    volumes:
      - router_var:/opt/druid/var
    depends_on:
      - zookeeper
      - postgres
      - coordinator
    ports:
      - "8888:8888"
    command:
      - router
    env_file:
      - environment

This PR has:

  • been self-reviewed.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added comments explaining the "why" and the intent of the code wherever it would not be obvious to an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • been tested in a test Druid cluster.

@Fly-Style Fly-Style marked this pull request as ready for review January 26, 2026 12:53
@Fly-Style Fly-Style force-pushed the cost-autoscaler-correct-rollover-placement branch from 8679300 to 59c076d Compare January 26, 2026 12:58
@Fly-Style Fly-Style force-pushed the cost-autoscaler-correct-rollover-placement branch from 59c076d to 4c0ba02 Compare January 26, 2026 13:01
@jtuglu1 jtuglu1 self-requested a review January 26, 2026 17:18
@Fly-Style Fly-Style force-pushed the cost-autoscaler-correct-rollover-placement branch from 74cc0a6 to b6ccc54 Compare January 26, 2026 18:05
@Fly-Style Fly-Style closed this Jan 26, 2026
@Fly-Style Fly-Style reopened this Jan 26, 2026
Contributor

@uds5501 uds5501 left a comment

Possible bug 😓


// Set the pending rollover flag - actual change applied in Phase 2
// when ALL actively reading task groups have stopped
pendingRolloverTaskCount = rolloverTaskCount;
Contributor

Is it possible that a scenario happens where rolloverTaskCount is assigned x but the number of task groups we end up stopping is y, with x > y (which seems possible due to the max allowed stops config)?
If this happens, during Phase 2 we will end up not applying the change at all and be stuck in a loop.

Contributor Author

Yes, fixed: 171344a

Contributor

@jtuglu1 jtuglu1 left a comment

Thanks! Left some initial comments


// Set the pending rollover flag - actual change applied in Phase 2
// when ALL actively reading task groups have stopped
pendingRolloverTaskCount = rolloverTaskCount;
Contributor

Another question here is whether we want to invalidate these results if things have drastically changed during a roll-over. For supervisors running 100s of tasks (think 300-400 tasks) a roll-over period might take 10-15mins depending on how aggressive you are (also depends on how many other concurrent supervisors you have running). Naturally, lag will spike much higher on roll-over so perhaps it's worth considering some sort of sanity check before committing this result?

Contributor Author

Actually, I thought about this and made scale-down during rollover a configurable option here: #18958.
There are definitely cases where we may not want to use this option and instead scale down during task runtime, as usual.

}
log.info(
    "Stopping taskGroup[%d] for autoscaler rollover to [%d] tasks.",
    groupId, rolloverTaskCount
);
Contributor

maybe log both stoppedForRollover and availableStops?

Contributor Author

@Fly-Style Fly-Style Jan 29, 2026

Done here: 171344a, IIRC.

maybeApplyPendingScaleRollover();

checkPendingCompletionTasks();

Contributor

Should this be called after checkPendingCompletionTasks()?

Contributor Author

maybeApplyPendingScaleRollover is gated only on activelyReadingTaskGroups being empty, and it is safe even if there are still publishing tasks. Applying the taskCount change earlier (before pruning/killing pending-completion groups) ensures the new allocation is staged as soon as it is safe and avoids extra cycles where we keep the old taskCount despite having no active readers.
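
For illustration, a minimal sketch of that gate (a hypothetical simplification, not the actual supervisor code):

// No-op unless a rollover is pending AND no task group is still actively reading,
// so calling this before checkPendingCompletionTasks() cannot apply the change too early.
if (pendingRolloverTaskCount != null && activelyReadingTaskGroups.isEmpty()) {
  changeTaskCountInIOConfig(pendingRolloverTaskCount);
  clearAllocationInfo();
  pendingRolloverTaskCount = null;
}
// Publishing / pending-completion groups are handled afterwards and are unaffected by the gate.
checkPendingCompletionTasks();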

Contributor

Let's update the maybeApplyPendingScaleRollover() function comments then?

* This method is called after {@link #checkPendingCompletionTasks()} to check if a pending
   * scale rollover can be applied. The scale is only applied when:
   * <ul>
   *   <li>A pending rollover was set up in {@link #checkTaskDuration()} (Phase 1)</li>
   *   <li>All actively reading task groups have stopped (moved to pendingCompletionTaskGroups)</li>
   * </ul>
   * <p>
   * By deferring the taskCount change until all old tasks have stopped, we avoid
   * partition allocation mismatches that would cause {@link #discoverTasks()} to kill
   * publishing tasks on the next cycle.

Contributor Author

That's a good catch! Thanks in advance!

Contributor Author

Done

@Fly-Style Fly-Style requested review from jtuglu1 and uds5501 January 29, 2026 16:25