Skip to content

Commit 89f2337

Browse files
authored
NIFI-15544: If a Connector requires processors or controller services that are unavailable, make Connector invalid but not ghosted (#10851)
1 parent 9822e3e commit 89f2337

File tree

10 files changed

+406
-25
lines changed

10 files changed

+406
-25
lines changed

nifi-connector-mock-bundle/nifi-connector-mock-server/src/main/java/org/apache/nifi/mock/connector/server/StandardConnectorMockServer.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,13 @@
2828
import org.apache.nifi.components.ConfigVerificationResult;
2929
import org.apache.nifi.components.ValidationResult;
3030
import org.apache.nifi.components.connector.AssetReference;
31+
import org.apache.nifi.components.connector.Connector;
3132
import org.apache.nifi.components.connector.ConnectorNode;
3233
import org.apache.nifi.components.connector.ConnectorRepository;
3334
import org.apache.nifi.components.connector.ConnectorState;
3435
import org.apache.nifi.components.connector.ConnectorValueReference;
3536
import org.apache.nifi.components.connector.FlowUpdateException;
37+
import org.apache.nifi.components.connector.GhostConnector;
3638
import org.apache.nifi.components.connector.SecretReference;
3739
import org.apache.nifi.components.connector.StandaloneConnectorRequestReplicator;
3840
import org.apache.nifi.components.connector.StepConfiguration;
@@ -188,6 +190,15 @@ public void instantiateConnector(final String connectorClassName) {
188190

189191
final BundleCoordinate bundleCoordinate = bundles.getFirst().getBundleDetails().getCoordinate();
190192
connectorNode = flowController.getFlowManager().createConnector(connectorClassName, CONNECTOR_ID, bundleCoordinate, true, true);
193+
194+
if (connectorNode.isExtensionMissing()) {
195+
final Connector connector = connectorNode.getConnector();
196+
if (connector instanceof final GhostConnector ghostConnector) {
197+
throw new IllegalStateException("Failed to create Connector of type " + connectorClassName, ghostConnector.getCauseOfGhost());
198+
} else {
199+
throw new IllegalStateException("Failed to create Connector of type " + connectorClassName);
200+
}
201+
}
191202
}
192203

193204
@Override

nifi-connector-mock-bundle/nifi-connector-mock-test-bundle/nifi-connector-mock-integration-tests/src/test/java/org/apache/nifi/mock/connectors/tests/CreateConnectorIT.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@
2424
import java.io.File;
2525
import java.io.IOException;
2626

27+
import static org.junit.jupiter.api.Assertions.assertThrows;
28+
import static org.junit.jupiter.api.Assertions.assertTrue;
29+
2730
public class CreateConnectorIT {
2831

2932
@Test
@@ -37,4 +40,18 @@ public void testCreateStartAndStopGenerateAndUpdateConnector() throws IOExceptio
3740
testRunner.stopConnector();
3841
}
3942
}
43+
44+
@Test
45+
public void testConnectorWithMissingBundleThrowsException() {
46+
final IllegalStateException exception = assertThrows(IllegalStateException.class, () -> {
47+
new StandardConnectorTestRunner.Builder()
48+
.connectorClassName("org.apache.nifi.mock.connectors.MissingBundleConnector")
49+
.narLibraryDirectory(new File("target/libDir"))
50+
.build();
51+
});
52+
53+
final String message = exception.getMessage();
54+
assertTrue(message.contains("com.example.nonexistent:missing-nar:1.0.0"), "Expected exception message to contain missing bundle coordinates but was: " + message);
55+
assertTrue(message.contains("com.example.nonexistent.MissingProcessor"), "Expected exception message to contain missing processor type but was: " + message);
56+
}
4057
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.nifi.mock.connectors;
19+
20+
import org.apache.nifi.components.ConfigVerificationResult;
21+
import org.apache.nifi.components.connector.AbstractConnector;
22+
import org.apache.nifi.components.connector.ConfigurationStep;
23+
import org.apache.nifi.components.connector.FlowUpdateException;
24+
import org.apache.nifi.components.connector.components.FlowContext;
25+
import org.apache.nifi.flow.Bundle;
26+
import org.apache.nifi.flow.Position;
27+
import org.apache.nifi.flow.ScheduledState;
28+
import org.apache.nifi.flow.VersionedExternalFlow;
29+
import org.apache.nifi.flow.VersionedProcessGroup;
30+
import org.apache.nifi.flow.VersionedProcessor;
31+
32+
import java.util.HashSet;
33+
import java.util.List;
34+
import java.util.Map;
35+
import java.util.UUID;
36+
37+
/**
38+
* A test connector that returns an initial flow containing a processor with a bundle that does not exist.
39+
* This is used to test the behavior when a connector's initial flow references unavailable components.
40+
*/
41+
public class MissingBundleConnector extends AbstractConnector {
42+
43+
@Override
44+
public VersionedExternalFlow getInitialFlow() {
45+
final VersionedProcessGroup rootGroup = new VersionedProcessGroup();
46+
rootGroup.setIdentifier(UUID.randomUUID().toString());
47+
rootGroup.setInstanceIdentifier(UUID.randomUUID().toString());
48+
rootGroup.setName("Missing Bundle Connector Flow");
49+
rootGroup.setPosition(new Position(0.0, 0.0));
50+
rootGroup.setProcessGroups(new HashSet<>());
51+
rootGroup.setConnections(new HashSet<>());
52+
rootGroup.setInputPorts(new HashSet<>());
53+
rootGroup.setOutputPorts(new HashSet<>());
54+
rootGroup.setControllerServices(new HashSet<>());
55+
rootGroup.setFunnels(new HashSet<>());
56+
rootGroup.setLabels(new HashSet<>());
57+
58+
final VersionedProcessor missingProcessor = new VersionedProcessor();
59+
missingProcessor.setIdentifier(UUID.randomUUID().toString());
60+
missingProcessor.setInstanceIdentifier(UUID.randomUUID().toString());
61+
missingProcessor.setName("Missing Processor");
62+
missingProcessor.setType("com.example.nonexistent.MissingProcessor");
63+
missingProcessor.setPosition(new Position(100.0, 100.0));
64+
missingProcessor.setScheduledState(ScheduledState.ENABLED);
65+
missingProcessor.setSchedulingPeriod("0 sec");
66+
missingProcessor.setSchedulingStrategy("TIMER_DRIVEN");
67+
missingProcessor.setExecutionNode("ALL");
68+
missingProcessor.setPenaltyDuration("30 sec");
69+
missingProcessor.setYieldDuration("1 sec");
70+
missingProcessor.setBulletinLevel("WARN");
71+
missingProcessor.setRunDurationMillis(0L);
72+
missingProcessor.setConcurrentlySchedulableTaskCount(1);
73+
missingProcessor.setAutoTerminatedRelationships(new HashSet<>());
74+
missingProcessor.setProperties(Map.of());
75+
missingProcessor.setPropertyDescriptors(Map.of());
76+
missingProcessor.setGroupIdentifier(rootGroup.getIdentifier());
77+
78+
final Bundle missingBundle = new Bundle();
79+
missingBundle.setGroup("com.example.nonexistent");
80+
missingBundle.setArtifact("missing-nar");
81+
missingBundle.setVersion("1.0.0");
82+
missingProcessor.setBundle(missingBundle);
83+
84+
rootGroup.setProcessors(new HashSet<>(List.of(missingProcessor)));
85+
86+
final VersionedExternalFlow externalFlow = new VersionedExternalFlow();
87+
externalFlow.setFlowContents(rootGroup);
88+
return externalFlow;
89+
}
90+
91+
@Override
92+
protected void onStepConfigured(final String stepName, final FlowContext flowContext) {
93+
}
94+
95+
@Override
96+
public List<ConfigurationStep> getConfigurationSteps() {
97+
return List.of();
98+
}
99+
100+
@Override
101+
public void applyUpdate(final FlowContext workingContext, final FlowContext activeContext) throws FlowUpdateException {
102+
}
103+
104+
@Override
105+
public List<ConfigVerificationResult> verifyConfigurationStep(final String stepName, final Map<String, String> overrides, final FlowContext workingContext) {
106+
return List.of();
107+
}
108+
}

nifi-connector-mock-bundle/nifi-connector-mock-test-bundle/nifi-connector-mock-test-connectors/src/main/resources/META-INF/services/org.apache.nifi.components.connector.Connector

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,3 +14,4 @@
1414
# limitations under the License.
1515

1616
org.apache.nifi.mock.connectors.GenerateAndLog
17+
org.apache.nifi.mock.connectors.MissingBundleConnector

nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/GhostConnector.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,12 @@ public class GhostConnector implements Connector {
3333
private final String canonicalClassName;
3434
private final List<ValidationResult> validationResults;
3535
private final List<ConfigVerificationResult> configVerificationResults;
36+
private final Exception causeOfGhost;
3637

37-
public GhostConnector(final String identifier, final String canonicalClassName) {
38+
public GhostConnector(final String identifier, final String canonicalClassName, final Exception causeOfGhost) {
3839
this.identifier = identifier;
3940
this.canonicalClassName = canonicalClassName;
41+
this.causeOfGhost = causeOfGhost;
4042

4143
validationResults = List.of(new ValidationResult.Builder()
4244
.subject("Missing Connector")
@@ -51,6 +53,10 @@ public GhostConnector(final String identifier, final String canonicalClassName)
5153
.build());
5254
}
5355

56+
public Exception getCauseOfGhost() {
57+
return causeOfGhost;
58+
}
59+
5460
@Override
5561
public void initialize(final ConnectorInitializationContext initContext) {
5662
}

nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java

Lines changed: 127 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ public class StandardConnectorNode implements ConnectorNode {
100100
private final boolean extensionMissing;
101101
private volatile boolean triggerValidation = true;
102102
private final AtomicReference<CompletableFuture<Void>> drainFutureRef = new AtomicReference<>();
103+
private volatile ValidationResult unresolvedBundleValidationResult = null;
103104

104105
private volatile FrameworkFlowContext workingFlowContext;
105106

@@ -709,11 +710,18 @@ public void loadInitialFlow() throws FlowUpdateException {
709710
if (initialFlow == null) {
710711
logger.info("{} has no initial flow to load", this);
711712
} else {
712-
logger.info("Loading initial flow for {}", this);
713-
// Update all RUNNING components to ENABLED before applying the initial flow so that components
714-
// are not started before being configured.
715-
stopComponents(initialFlow.getFlowContents());
716-
initializationContext.updateFlow(activeFlowContext, initialFlow, BundleCompatibility.RESOLVE_BUNDLE);
713+
final ValidationResult unresolvedBundleResult = validateBundlesCanBeResolved(initialFlow.getFlowContents(), initializationContext.getComponentBundleLookup());
714+
715+
if (unresolvedBundleResult != null) {
716+
logger.error("Cannot load initial flow for {} because some component bundles cannot be resolved: {}", this, unresolvedBundleResult.getExplanation());
717+
unresolvedBundleValidationResult = unresolvedBundleResult;
718+
} else {
719+
logger.info("Loading initial flow for {}", this);
720+
// Update all RUNNING components to ENABLED before applying the initial flow so that components
721+
// are not started before being configured.
722+
stopComponents(initialFlow.getFlowContents());
723+
initializationContext.updateFlow(activeFlowContext, initialFlow, BundleCompatibility.RESOLVE_BUNDLE);
724+
}
717725
}
718726

719727
resetValidationState();
@@ -738,6 +746,101 @@ private void stopComponents(final VersionedProcessGroup group) {
738746
}
739747
}
740748

749+
/**
750+
* Ensures that all bundles required by the given Process Group can be resolved. We do this in order to make the Connector
751+
* invalid if any Processor or Controller Service cannot be properly instantiated due to missing bundles. We intentionally
752+
* differentiate between making the Connector invalid versus Ghosting the Connector for a few reasons:
753+
* <ul>
754+
* <li>
755+
* Ghosting the Connector would prevent us from even getting the Configuration Steps, and it results in all Properties becoming sensitive. This can lead to confusion.
756+
* </li>
757+
* <li>
758+
* The flow may change dynamically and so it's possible for a Connector to be valid given its initial flow and then become invalid
759+
* based on configuration because the new configuration requires a new component that is unavailable. We would not suddenly change from
760+
* a valid Connector to a Ghosted Connector, we could only become invalid. We do not want a missing component in the Initial Flow to be
761+
* treated differently than a missing component from a subsequent flow update.
762+
* </li>
763+
* <li>
764+
* Ghosting should be reserved for situations where the extension itself is missing.
765+
* </li>
766+
* </ul>
767+
*
768+
* @param group the process group to validate
769+
* @param bundleLookup the bundle lookup
770+
* @return a ValidationResult describing the missing bundles if any are missing; null if all bundles can be resolved
771+
*/
772+
private ValidationResult validateBundlesCanBeResolved(final VersionedProcessGroup group, final ComponentBundleLookup bundleLookup) {
773+
final Set<String> missingBundles = new HashSet<>();
774+
final Set<String> missingProcessorTypes = new HashSet<>();
775+
final Set<String> missingControllerServiceTypes = new HashSet<>();
776+
777+
collectUnresolvedBundles(group, bundleLookup, missingBundles, missingProcessorTypes, missingControllerServiceTypes);
778+
779+
if (missingBundles.isEmpty()) {
780+
return null;
781+
}
782+
783+
final StringBuilder explanation = new StringBuilder();
784+
explanation.append("%d Processors and %d Controller Services unavailable from %d missing bundles".formatted(
785+
missingProcessorTypes.size(), missingControllerServiceTypes.size(), missingBundles.size()));
786+
explanation.append("\nMissing Bundles: %s".formatted(missingBundles));
787+
if (!missingProcessorTypes.isEmpty()) {
788+
explanation.append("\nMissing Processors: %s".formatted(missingProcessorTypes));
789+
}
790+
if (!missingControllerServiceTypes.isEmpty()) {
791+
explanation.append("\nMissing Controller Services: %s".formatted(missingControllerServiceTypes));
792+
}
793+
794+
return new ValidationResult.Builder()
795+
.subject("Missing Bundles")
796+
.valid(false)
797+
.explanation(explanation.toString())
798+
.build();
799+
}
800+
801+
private void collectUnresolvedBundles(final VersionedProcessGroup group, final ComponentBundleLookup bundleLookup,
802+
final Set<String> missingBundles, final Set<String> missingProcessorTypes,
803+
final Set<String> missingControllerServiceTypes) {
804+
if (group.getProcessors() != null) {
805+
for (final VersionedProcessor processor : group.getProcessors()) {
806+
if (!isBundleResolvable(processor.getType(), processor.getBundle(), bundleLookup)) {
807+
missingBundles.add(formatBundle(processor.getBundle()));
808+
missingProcessorTypes.add(processor.getType());
809+
}
810+
}
811+
}
812+
813+
if (group.getControllerServices() != null) {
814+
for (final VersionedControllerService service : group.getControllerServices()) {
815+
if (!isBundleResolvable(service.getType(), service.getBundle(), bundleLookup)) {
816+
missingBundles.add(formatBundle(service.getBundle()));
817+
missingControllerServiceTypes.add(service.getType());
818+
}
819+
}
820+
}
821+
822+
if (group.getProcessGroups() != null) {
823+
for (final VersionedProcessGroup childGroup : group.getProcessGroups()) {
824+
collectUnresolvedBundles(childGroup, bundleLookup, missingBundles, missingProcessorTypes, missingControllerServiceTypes);
825+
}
826+
}
827+
}
828+
829+
private String formatBundle(final Bundle bundle) {
830+
return "%s:%s:%s".formatted(bundle.getGroup(), bundle.getArtifact(), bundle.getVersion());
831+
}
832+
833+
private boolean isBundleResolvable(final String componentType, final Bundle currentBundle, final ComponentBundleLookup bundleLookup) {
834+
final List<Bundle> availableBundles = bundleLookup.getAvailableBundles(componentType);
835+
836+
if (availableBundles.contains(currentBundle)) {
837+
return true;
838+
}
839+
840+
// With RESOLVE_BUNDLE, a bundle can be resolved only if exactly one alternative bundle is available
841+
return availableBundles.size() == 1;
842+
}
843+
741844
private void recreateWorkingFlowContext() {
742845
destroyWorkingContext();
743846
workingFlowContext = flowContextFactory.createWorkingFlowContext(identifier,
@@ -1233,22 +1336,27 @@ public ValidationState performValidation() {
12331336
logger.debug("Performing validation for {}", this);
12341337
try (final NarCloseable ignored = NarCloseable.withComponentNarLoader(extensionManager, getConnector().getClass(), getIdentifier())) {
12351338

1236-
final ConnectorValidationContext validationContext = createValidationContext(activeFlowContext);
1237-
12381339
final List<ValidationResult> allResults = new ArrayList<>();
1239-
validateManagedFlowComponents(allResults);
1240-
validatePropertyReferences(allResults);
12411340

1242-
if (allResults.isEmpty()) {
1243-
try {
1244-
final List<ValidationResult> implValidationResults = getConnector().validate(activeFlowContext, validationContext);
1245-
allResults.addAll(implValidationResults);
1246-
} catch (final Exception e) {
1247-
allResults.add(new ValidationResult.Builder()
1248-
.subject("Validation Failure")
1249-
.valid(false)
1250-
.explanation("Encountered a failure while attempting to perform validation: " + e.getMessage())
1251-
.build());
1341+
if (unresolvedBundleValidationResult != null) {
1342+
allResults.add(unresolvedBundleValidationResult);
1343+
} else {
1344+
final ConnectorValidationContext validationContext = createValidationContext(activeFlowContext);
1345+
1346+
validateManagedFlowComponents(allResults);
1347+
validatePropertyReferences(allResults);
1348+
1349+
if (allResults.isEmpty()) {
1350+
try {
1351+
final List<ValidationResult> implValidationResults = getConnector().validate(activeFlowContext, validationContext);
1352+
allResults.addAll(implValidationResults);
1353+
} catch (final Exception e) {
1354+
allResults.add(new ValidationResult.Builder()
1355+
.subject("Validation Failure")
1356+
.valid(false)
1357+
.explanation("Encountered a failure while attempting to perform validation: " + e.getMessage())
1358+
.build());
1359+
}
12521360
}
12531361
}
12541362

0 commit comments

Comments
 (0)