Skip to content

Commit 85de46c

Browse files
committed
fix(kafka-connect): support initial_state when creating connectors (KIP-980)
This fixes issue #637 where connectors could not be created with an initial state of STOPPED or PAUSED. The connector would always start in RUNNING state regardless of the desired state specified in the YAML configuration. Changes: - Add ConnectorCreateRequest record for POST /connectors API - Add createConnector method to KafkaConnectApi using POST endpoint - Update KafkaConnectorChangeHandler to use POST with initial_state for CREATE - Update tests to verify initial_state is passed correctly Closes #637
1 parent 022479e commit 85de46c

File tree

4 files changed

+172
-5
lines changed

4 files changed

+172
-5
lines changed

providers/jikkou-provider-kafka-connect/src/main/java/io/streamthoughts/jikkou/kafka/connect/api/KafkaConnectApi.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
package io.streamthoughts.jikkou.kafka.connect.api;
88

99
import io.streamthoughts.jikkou.kafka.connect.api.data.ConnectCluster;
10+
import io.streamthoughts.jikkou.kafka.connect.api.data.ConnectorCreateRequest;
1011
import io.streamthoughts.jikkou.kafka.connect.api.data.ConnectorInfoResponse;
1112
import io.streamthoughts.jikkou.kafka.connect.api.data.ConnectorStatusResponse;
1213
import jakarta.ws.rs.Consumes;
@@ -58,6 +59,19 @@ public interface KafkaConnectApi extends AutoCloseable {
5859
@Path("connectors")
5960
List<String> listConnectors();
6061

62+
/**
63+
* Create a new connector with the given configuration and optional initial state.
64+
* This method supports KIP-980, allowing the connector to be created in a specific
65+
* initial state (RUNNING, STOPPED, or PAUSED).
66+
*
67+
* @param request the connector creation request containing name, config, and optional initial state.
68+
* @return information about the created connector
69+
*/
70+
@POST
71+
@Path("connectors")
72+
@Consumes(MediaType.APPLICATION_JSON)
73+
ConnectorInfoResponse createConnector(ConnectorCreateRequest request);
74+
6175
/**
6276
* Create a new connector using the given configuration, or update the configuration
6377
* for an existing connector. Returns information about the connector after the change has been made
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
* Copyright (c) The original authors
4+
*
5+
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
6+
*/
7+
package io.streamthoughts.jikkou.kafka.connect.api.data;
8+
9+
import com.fasterxml.jackson.annotation.JsonInclude;
10+
import com.fasterxml.jackson.annotation.JsonProperty;
11+
import io.streamthoughts.jikkou.core.annotation.Reflectable;
12+
import jakarta.validation.constraints.NotNull;
13+
import java.io.Serializable;
14+
import java.util.Map;
15+
16+
/**
17+
* Request body for creating a new connector via POST /connectors.
18+
*
19+
* @param name the name of the connector.
20+
* @param config the configuration of the connector.
21+
* @param initialState the initial state of the connector (RUNNING, STOPPED, or PAUSED).
22+
* This field is optional; if not specified, the connector starts in RUNNING state.
23+
* @see io.streamthoughts.jikkou.kafka.connect.api.KafkaConnectApi#createConnector(ConnectorCreateRequest)
24+
*/
25+
@Reflectable
26+
@JsonInclude(JsonInclude.Include.NON_NULL)
27+
public record ConnectorCreateRequest(@JsonProperty("name") @NotNull String name,
28+
@JsonProperty("config") @NotNull Map<String, Object> config,
29+
@JsonProperty("initial_state") String initialState
30+
) implements Serializable {
31+
32+
/**
33+
* Creates a ConnectorCreateRequest with default RUNNING state.
34+
*
35+
* @param name the connector name.
36+
* @param config the connector configuration.
37+
*/
38+
public ConnectorCreateRequest(String name, Map<String, Object> config) {
39+
this(name, config, null);
40+
}
41+
}

providers/jikkou-provider-kafka-connect/src/main/java/io/streamthoughts/jikkou/kafka/connect/change/KafkaConnectorChangeHandler.java

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import io.streamthoughts.jikkou.core.reconciler.change.BaseChangeHandler;
2424
import io.streamthoughts.jikkou.http.client.RestClientException;
2525
import io.streamthoughts.jikkou.kafka.connect.api.KafkaConnectApi;
26+
import io.streamthoughts.jikkou.kafka.connect.api.data.ConnectorCreateRequest;
2627
import io.streamthoughts.jikkou.kafka.connect.api.data.ConnectorInfoResponse;
2728
import io.streamthoughts.jikkou.kafka.connect.api.data.ErrorResponse;
2829
import io.streamthoughts.jikkou.kafka.connect.models.KafkaConnectorState;
@@ -67,15 +68,15 @@ private Stream<ChangeResponse> handleChange(ResourceChange change) {
6768
case NONE -> Stream.empty(); // no change of these types should be handled by this class.
6869
case REPLACE -> null;
6970
case UPDATE -> updateConnector(change);
70-
case CREATE -> createOrUpdateConnectorConfig(change);
71+
case CREATE -> createConnector(change);
7172
case DELETE -> deleteConnector(change);
7273
};
7374
}
7475

7576
@NotNull
7677
private Stream<ChangeResponse> updateConnector(ResourceChange change) {
7778
if (!isStateOnlyChange(change)) {
78-
return createOrUpdateConnectorConfig(change);
79+
return updateConnectorConfig(change);
7980
}
8081
SpecificStateChange<KafkaConnectorState> stateChange = getState(change);
8182

@@ -104,8 +105,39 @@ private Stream<ChangeResponse> deleteConnector(ResourceChange change) {
104105
return Stream.of(toChangeResponse(change, future));
105106
}
106107

108+
/**
109+
* Creates a new connector using POST /connectors with initial_state support (KIP-980).
110+
*/
111+
@NotNull
112+
private Stream<ChangeResponse> createConnector(ResourceChange change) {
113+
final String connectorName = change.getMetadata().getName();
114+
final Map<String, Object> configAsMap = buildConnectorConfig(change);
115+
final SpecificStateChange<KafkaConnectorState> stateChange = getState(change);
116+
final KafkaConnectorState desiredState = stateChange.getAfter();
117+
118+
// Determine the initial_state parameter for KIP-980
119+
// Only RUNNING, STOPPED, and PAUSED are valid initial states
120+
final String initialState = switch (desiredState) {
121+
case STOPPED, PAUSED -> desiredState.value();
122+
// For RUNNING or other states (including null), don't set initial_state (defaults to RUNNING)
123+
case RUNNING, UNASSIGNED, RESTARTING, FAILED -> null;
124+
case null -> null;
125+
};
126+
127+
ConnectorCreateRequest request = new ConnectorCreateRequest(connectorName, configAsMap, initialState);
128+
CompletableFuture<ConnectorInfoResponse> future = CompletableFuture.supplyAsync(() ->
129+
api.createConnector(request)
130+
);
131+
132+
ChangeResponse response = toChangeResponse(change, future);
133+
return Stream.of(response);
134+
}
135+
136+
/**
137+
* Updates an existing connector's configuration using PUT /connectors/{name}/config.
138+
*/
107139
@NotNull
108-
private Stream<ChangeResponse> createOrUpdateConnectorConfig(ResourceChange change) {
140+
private Stream<ChangeResponse> updateConnectorConfig(ResourceChange change) {
109141
final Map<String, Object> configAsMap = buildConnectorConfig(change);
110142
CompletableFuture<ConnectorInfoResponse> future = CompletableFuture.supplyAsync(() ->
111143
api.createOrUpdateConnector(change.getMetadata().getName(), configAsMap)

providers/jikkou-provider-kafka-connect/src/test/java/io/streamthoughts/jikkou/kafka/connect/change/KafkaConnectorChangeHandlerTest.java

Lines changed: 82 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,18 +15,20 @@
1515
import io.streamthoughts.jikkou.core.reconciler.ChangeResponse;
1616
import io.streamthoughts.jikkou.core.reconciler.Operation;
1717
import io.streamthoughts.jikkou.kafka.connect.api.KafkaConnectApi;
18+
import io.streamthoughts.jikkou.kafka.connect.api.data.ConnectorCreateRequest;
1819
import io.streamthoughts.jikkou.kafka.connect.models.KafkaConnectorState;
1920
import java.util.List;
2021
import org.junit.jupiter.api.Assertions;
2122
import org.junit.jupiter.api.Test;
23+
import org.mockito.ArgumentCaptor;
2224
import org.mockito.Mockito;
2325

2426
class KafkaConnectorChangeHandlerTest {
2527

2628
public static final String TEST_CONNECTOR_NAME = "test";
2729

2830
@Test
29-
void shouldUpdateConfigForAddChange() {
31+
void shouldCreateConnectorWithNoInitialStateForRunningState() {
3032
KafkaConnectApi mkKafkaConnectApi = Mockito.mock(KafkaConnectApi.class);
3133
KafkaConnectorChangeHandler handler = new KafkaConnectorChangeHandler(mkKafkaConnectApi, TEST_CONNECTOR_NAME);
3234

@@ -51,8 +53,86 @@ void shouldUpdateConfigForAddChange() {
5153
Assertions.assertEquals(1, results.size());
5254

5355
AsyncUtils.getValue(results.getFirst().getResults());
56+
57+
ArgumentCaptor<ConnectorCreateRequest> requestCaptor = ArgumentCaptor.forClass(ConnectorCreateRequest.class);
58+
Mockito.verify(mkKafkaConnectApi, Mockito.times(1))
59+
.createConnector(requestCaptor.capture());
60+
61+
ConnectorCreateRequest request = requestCaptor.getValue();
62+
Assertions.assertEquals(TEST_CONNECTOR_NAME, request.name());
63+
Assertions.assertNull(request.initialState());
64+
}
65+
66+
@Test
67+
void shouldCreateConnectorWithStoppedInitialState() {
68+
KafkaConnectApi mkKafkaConnectApi = Mockito.mock(KafkaConnectApi.class);
69+
KafkaConnectorChangeHandler handler = new KafkaConnectorChangeHandler(mkKafkaConnectApi, TEST_CONNECTOR_NAME);
70+
71+
ResourceChange change = GenericResourceChange
72+
.builder()
73+
.withMetadata(ObjectMeta
74+
.builder()
75+
.withName(TEST_CONNECTOR_NAME)
76+
.build()
77+
)
78+
.withSpec(ResourceChangeSpec
79+
.builder()
80+
.withOperation(Operation.CREATE)
81+
.withChange(StateChange.create("connectorClass", "???"))
82+
.withChange(StateChange.create("tasksMax", 1))
83+
.withChange(StateChange.create("state", KafkaConnectorState.STOPPED))
84+
.build()
85+
)
86+
.build();
87+
88+
List<ChangeResponse> results = handler.handleChanges(List.of(change));
89+
Assertions.assertEquals(1, results.size());
90+
91+
AsyncUtils.getValue(results.getFirst().getResults());
92+
93+
ArgumentCaptor<ConnectorCreateRequest> requestCaptor = ArgumentCaptor.forClass(ConnectorCreateRequest.class);
5494
Mockito.verify(mkKafkaConnectApi, Mockito.times(1))
55-
.createOrUpdateConnector(Mockito.eq(TEST_CONNECTOR_NAME), Mockito.anyMap());
95+
.createConnector(requestCaptor.capture());
96+
97+
ConnectorCreateRequest request = requestCaptor.getValue();
98+
Assertions.assertEquals(TEST_CONNECTOR_NAME, request.name());
99+
Assertions.assertEquals("STOPPED", request.initialState());
100+
}
101+
102+
@Test
103+
void shouldCreateConnectorWithPausedInitialState() {
104+
KafkaConnectApi mkKafkaConnectApi = Mockito.mock(KafkaConnectApi.class);
105+
KafkaConnectorChangeHandler handler = new KafkaConnectorChangeHandler(mkKafkaConnectApi, TEST_CONNECTOR_NAME);
106+
107+
ResourceChange change = GenericResourceChange
108+
.builder()
109+
.withMetadata(ObjectMeta
110+
.builder()
111+
.withName(TEST_CONNECTOR_NAME)
112+
.build()
113+
)
114+
.withSpec(ResourceChangeSpec
115+
.builder()
116+
.withOperation(Operation.CREATE)
117+
.withChange(StateChange.create("connectorClass", "???"))
118+
.withChange(StateChange.create("tasksMax", 1))
119+
.withChange(StateChange.create("state", KafkaConnectorState.PAUSED))
120+
.build()
121+
)
122+
.build();
123+
124+
List<ChangeResponse> results = handler.handleChanges(List.of(change));
125+
Assertions.assertEquals(1, results.size());
126+
127+
AsyncUtils.getValue(results.getFirst().getResults());
128+
129+
ArgumentCaptor<ConnectorCreateRequest> requestCaptor = ArgumentCaptor.forClass(ConnectorCreateRequest.class);
130+
Mockito.verify(mkKafkaConnectApi, Mockito.times(1))
131+
.createConnector(requestCaptor.capture());
132+
133+
ConnectorCreateRequest request = requestCaptor.getValue();
134+
Assertions.assertEquals(TEST_CONNECTOR_NAME, request.name());
135+
Assertions.assertEquals("PAUSED", request.initialState());
56136
}
57137

58138
@Test

0 commit comments

Comments
 (0)