Skip to content

Commit 9822e3e

Browse files
authored
NIFI-15488: Added significant number of debug log messages as well as some info/error for connector-related events (#10803)
1 parent 3e395f8 commit 9822e3e

File tree

2 files changed

+54
-5
lines changed

2 files changed

+54
-5
lines changed

nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -164,9 +164,11 @@ public void prepareForUpdate() throws FlowUpdateException {
164164
+ "; it must be UPDATING.");
165165
}
166166

167+
logger.debug("Preparing {} for update", this);
167168
try (final NarCloseable ignored = NarCloseable.withComponentNarLoader(extensionManager, getConnector().getClass(), getIdentifier())) {
168169
getConnector().prepareForUpdate(workingFlowContext, activeFlowContext);
169170
stateTransition.setCurrentState(ConnectorState.UPDATING);
171+
logger.debug("Successfully prepared {} for update", this);
170172
} catch (final Throwable t) {
171173
logger.error("Failed to prepare update for {}", this, t);
172174

@@ -184,6 +186,7 @@ public void prepareForUpdate() throws FlowUpdateException {
184186
public void inheritConfiguration(final List<VersionedConfigurationStep> activeConfig, final List<VersionedConfigurationStep> workingConfig,
185187
final Bundle flowContextBundle) throws FlowUpdateException {
186188

189+
logger.debug("Inheriting configuration for {}", this);
187190
final MutableConnectorConfigurationContext configurationContext = createConfigurationContext(activeConfig);
188191
final FrameworkFlowContext inheritContext = flowContextFactory.createWorkingFlowContext(identifier,
189192
connectorDetails.getComponentLog(), configurationContext, flowContextBundle);
@@ -196,6 +199,8 @@ public void inheritConfiguration(final List<VersionedConfigurationStep> activeCo
196199
final StepConfiguration stepConfig = createStepConfiguration(step);
197200
setConfiguration(step.getName(), stepConfig, true);
198201
}
202+
203+
logger.debug("Successfully inherited configuration for {}", this);
199204
}
200205

201206
private StepConfiguration createStepConfiguration(final VersionedConfigurationStep step) {
@@ -280,6 +285,7 @@ private void applyUpdate(final FrameworkFlowContext contextToInherit) throws Flo
280285

281286
stateTransition.setCurrentState(ConnectorState.UPDATED);
282287
stateTransition.setDesiredState(ConnectorState.UPDATED);
288+
logger.info("Successfully applied update for {}", this);
283289
}
284290

285291
private void destroyWorkingContext() {
@@ -306,6 +312,7 @@ public void abortUpdate(final Throwable cause) {
306312
try (final NarCloseable ignored = NarCloseable.withComponentNarLoader(extensionManager, getConnector().getClass(), getIdentifier())) {
307313
getConnector().abortUpdate(workingFlowContext, cause);
308314
}
315+
logger.debug("Aborted update for {}", this);
309316
}
310317

311318
@Override
@@ -325,6 +332,7 @@ private void setConfiguration(final String stepName, final StepConfiguration con
325332
try (final NarCloseable ignored = NarCloseable.withComponentNarLoader(extensionManager, connector.getClass(), getIdentifier())) {
326333
logger.debug("Notifying {} of configuration change for configuration step {}", this, stepName);
327334
connector.onConfigurationStepConfigured(stepName, workingFlowContext);
335+
logger.debug("Successfully set configuration for step {} on {}", stepName, this);
328336
} catch (final FlowUpdateException e) {
329337
throw e;
330338
} catch (final Exception e) {
@@ -442,6 +450,7 @@ public Future<Void> stop(final FlowEngine scheduler) {
442450

443451
@Override
444452
public Future<Void> drainFlowFiles() {
453+
logger.debug("Draining FlowFiles for {}", this);
445454
requireStopped("drain FlowFiles", ConnectorState.DRAINING);
446455

447456
try (final NarCloseable ignored = NarCloseable.withComponentNarLoader(extensionManager, connectorDetails.getConnector().getClass(), getIdentifier())) {
@@ -451,7 +460,12 @@ public Future<Void> drainFlowFiles() {
451460

452461
final CompletableFuture<Void> stateUpdateFuture = drainFuture.whenComplete((result, failureCause) -> {
453462
drainFutureRef.set(null);
454-
logger.info("Successfully drained FlowFiles from {}; ensuring all components are stopped.", this);
463+
464+
if (failureCause == null) {
465+
logger.info("Successfully drained FlowFiles for {}", this);
466+
} else {
467+
logger.error("Failed to drain FlowFiles for {}", this, failureCause);
468+
}
455469

456470
try {
457471
connectorDetails.getConnector().stop(activeFlowContext);
@@ -465,6 +479,7 @@ public Future<Void> drainFlowFiles() {
465479

466480
return stateUpdateFuture;
467481
} catch (final Throwable t) {
482+
logger.error("Failed to drain FlowFiles for {}", this, t);
468483
stateTransition.setCurrentState(ConnectorState.STOPPED);
469484
throw t;
470485
}
@@ -509,15 +524,25 @@ public void verifyCanPurgeFlowFiles() throws IllegalStateException {
509524

510525
@Override
511526
public Future<Void> purgeFlowFiles(final String requestor) {
527+
logger.debug("Purging FlowFiles for {}", this);
512528
requireStopped("purge FlowFiles", ConnectorState.PURGING);
513529

514530
try {
515531
final String dropRequestId = UUID.randomUUID().toString();
516532
final DropFlowFileStatus status = activeFlowContext.getManagedProcessGroup().dropAllFlowFiles(dropRequestId, requestor);
517533
final CompletableFuture<Void> future = status.getCompletionFuture();
518-
final CompletableFuture<Void> stateUpdateFuture = future.whenComplete((result, failureCause) -> stateTransition.setCurrentState(ConnectorState.STOPPED));
534+
final CompletableFuture<Void> stateUpdateFuture = future.whenComplete((result, failureCause) -> {
535+
stateTransition.setCurrentState(ConnectorState.STOPPED);
536+
537+
if (failureCause == null) {
538+
logger.info("Successfully purged FlowFiles for {}", this);
539+
} else {
540+
logger.error("Failed to purge FlowFiles for {}", this, failureCause);
541+
}
542+
});
519543
return stateUpdateFuture;
520544
} catch (final Throwable t) {
545+
logger.error("Failed to purge FlowFiles for {}", this, t);
521546
stateTransition.setCurrentState(ConnectorState.STOPPED);
522547
throw t;
523548
}
@@ -541,6 +566,7 @@ private void requireStopped(final String action, final ConnectorState newState)
541566
}
542567

543568
private void stopComponent(final FlowEngine scheduler, final CompletableFuture<Void> stopCompleteFuture) {
569+
logger.debug("Stopping component for {}", this);
544570
try (final NarCloseable ignored = NarCloseable.withComponentNarLoader(extensionManager, connectorDetails.getConnector().getClass(), getIdentifier())) {
545571
connectorDetails.getConnector().stop(activeFlowContext);
546572
} catch (final Exception e) {
@@ -551,6 +577,7 @@ private void stopComponent(final FlowEngine scheduler, final CompletableFuture<V
551577

552578
stateTransition.setCurrentState(ConnectorState.STOPPED);
553579
stopCompleteFuture.complete(null);
580+
logger.info("Successfully stopped {}", this);
554581

555582
final ConnectorState desiredState = getDesiredState();
556583
if (desiredState == ConnectorState.RUNNING) {
@@ -560,6 +587,7 @@ private void stopComponent(final FlowEngine scheduler, final CompletableFuture<V
560587
}
561588

562589
private void startComponent(final ScheduledExecutorService scheduler, final CompletableFuture<Void> startCompleteFuture) {
590+
logger.debug("Starting component for {}", this);
563591
final ConnectorState desiredState = getDesiredState();
564592
if (desiredState != ConnectorState.RUNNING) {
565593
logger.info("Will not start {} because the desired state is no longer RUNNING but is now {}", this, desiredState);
@@ -576,6 +604,7 @@ private void startComponent(final ScheduledExecutorService scheduler, final Comp
576604

577605
stateTransition.setCurrentState(ConnectorState.RUNNING);
578606
startCompleteFuture.complete(null);
607+
logger.info("Successfully started {}", this);
579608
}
580609

581610

@@ -654,17 +683,20 @@ public List<DescribedValue> fetchAllowableValues(final String stepName, final St
654683

655684
@Override
656685
public void initializeConnector(final FrameworkConnectorInitializationContext initializationContext) {
686+
logger.debug("Initializing {}", this);
657687
this.initializationContext = initializationContext;
658688

659689
try (NarCloseable ignored = NarCloseable.withComponentNarLoader(extensionManager, getConnector().getClass(), getIdentifier())) {
660690
getConnector().initialize(initializationContext);
661691
}
662692

663693
recreateWorkingFlowContext();
694+
logger.info("Successfully initialized {}", this);
664695
}
665696

666697
@Override
667698
public void loadInitialFlow() throws FlowUpdateException {
699+
logger.debug("Loading initial flow for {}", this);
668700
if (initializationContext == null) {
669701
throw new IllegalStateException("Cannot load initial flow because " + this + " has not been initialized yet.");
670702
}
@@ -733,6 +765,7 @@ public boolean isValidationPaused() {
733765

734766
@Override
735767
public List<ConfigVerificationResult> verifyConfigurationStep(final String stepName, final StepConfiguration configurationOverrides) {
768+
logger.debug("Verifying configuration step {} for {}", stepName, this);
736769
final List<SecretReference> invalidSecretRefs = new ArrayList<>();
737770
final List<AssetReference> invalidAssetRefs = new ArrayList<>();
738771
final Map<String, String> resolvedPropertyOverrides = resolvePropertyReferences(configurationOverrides, invalidSecretRefs, invalidAssetRefs);
@@ -790,6 +823,7 @@ public List<ConfigVerificationResult> verifyConfigurationStep(final String stepN
790823
results.addAll(invalidConfigResults);
791824
}
792825

826+
logger.debug("Completed verification of configuration step {} for {}", stepName, this);
793827
return results;
794828
}
795829
}
@@ -895,6 +929,7 @@ private Optional<ConfigurationStep> getConfigurationStep(final String stepName)
895929

896930
@Override
897931
public List<ConfigVerificationResult> verify() {
932+
logger.debug("Verifying {}", this);
898933
final List<ConfigVerificationResult> results = new ArrayList<>();
899934

900935
final ValidationState state = performValidation();
@@ -909,13 +944,15 @@ public List<ConfigVerificationResult> verify() {
909944
.explanation("There are " + validationFailureExplanations.size() + " validation failures: " + validationFailureExplanations)
910945
.build());
911946

947+
logger.debug("Completed verification for {} with validation failures", this);
912948
return results;
913949
}
914950

915951
try (NarCloseable ignored = NarCloseable.withComponentNarLoader(extensionManager, getConnector().getClass(), getIdentifier())) {
916952
results.addAll(getConnector().verify(workingFlowContext));
917953
}
918954

955+
logger.debug("Completed verification for {}", this);
919956
return results;
920957
}
921958

@@ -955,6 +992,7 @@ public FrameworkFlowContext getWorkingFlowContext() {
955992
@Override
956993
public void discardWorkingConfiguration() {
957994
recreateWorkingFlowContext();
995+
logger.debug("Discarded working configuration for {}", this);
958996
}
959997

960998
@Override
@@ -1192,6 +1230,7 @@ public Collection<ValidationResult> getValidationErrors() {
11921230

11931231
@Override
11941232
public ValidationState performValidation() {
1233+
logger.debug("Performing validation for {}", this);
11951234
try (final NarCloseable ignored = NarCloseable.withComponentNarLoader(extensionManager, getConnector().getClass(), getIdentifier())) {
11961235

11971236
final ConnectorValidationContext validationContext = createValidationContext(activeFlowContext);

nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorRepository.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,10 +53,12 @@ public class StandardConnectorRepository implements ConnectorRepository {
5353

5454
@Override
5555
public void initialize(final ConnectorRepositoryInitializationContext context) {
56+
logger.debug("Initializing ConnectorRepository");
5657
this.extensionManager = context.getExtensionManager();
5758
this.requestReplicator = context.getRequestReplicator();
5859
this.secretsManager = context.getSecretsManager();
5960
this.assetRepository = new StandardConnectorAssetRepository(context.getAssetManager());
61+
logger.debug("Successfully initialized ConnectorRepository");
6062
}
6163

6264
@Override
@@ -67,10 +69,12 @@ public void addConnector(final ConnectorNode connector) {
6769
@Override
6870
public void restoreConnector(final ConnectorNode connector) {
6971
addConnector(connector);
72+
logger.debug("Successfully restored {}", connector);
7073
}
7174

7275
@Override
7376
public void removeConnector(final String connectorId) {
77+
logger.debug("Removing {}", connectorId);
7478
final ConnectorNode connectorNode = connectors.get(connectorId);
7579
if (connectorNode == null) {
7680
throw new IllegalStateException("No connector found with ID " + connectorId);
@@ -136,6 +140,7 @@ private void restartConnector(final ConnectorNode connector, final CompletableFu
136140

137141
@Override
138142
public void applyUpdate(final ConnectorNode connector, final ConnectorUpdateContext context) throws FlowUpdateException {
143+
logger.debug("Applying update to {}", connector);
139144
final ConnectorState initialDesiredState = connector.getDesiredState();
140145
logger.info("Applying update to Connector {}", connector);
141146

@@ -155,6 +160,7 @@ public void applyUpdate(final ConnectorNode connector, final ConnectorUpdateCont
155160
}
156161

157162
private void updateConnector(final ConnectorNode connector, final ConnectorState initialDesiredState, final ConnectorUpdateContext context) {
163+
logger.debug("Updating {}", connector);
158164
try {
159165
// Perform whatever preparation is necessary for the update. Default implementation is to stop the connector.
160166
logger.debug("Preparing {} for update", connector);
@@ -179,17 +185,17 @@ private void updateConnector(final ConnectorNode connector, final ConnectorState
179185
// If the initial desired state was RUNNING, start the connector again. Otherwise, stop it.
180186
// We don't simply leave it be as the prepareForUpdate / update may have changed the state of some components.
181187
if (initialDesiredState == ConnectorState.RUNNING) {
182-
logger.info("Connector {} has been successfully updated; starting Connector to resume initial state", connector);
188+
logger.info("{} has been successfully updated; starting to resume initial state", connector);
183189
connector.start(lifecycleExecutor);
184190
} else {
185-
logger.info("Connector {} has been successfully updated; stopping Connector to resume initial state", connector);
191+
logger.info("{} has been successfully updated; stopping to resume initial state", connector);
186192
connector.stop(lifecycleExecutor);
187193
}
188194

189195
// We've updated the state of the connector so save flow again
190196
context.saveFlow();
191197
} catch (final Exception e) {
192-
logger.error("Failed to apply connector update for {}", connector, e);
198+
logger.error("Failed to apply update for {}", connector, e);
193199
connector.abortUpdate(e);
194200
}
195201
}
@@ -268,18 +274,22 @@ private void collectReferencedAssetIds(final FrameworkFlowContext flowContext, f
268274
@Override
269275
public void configureConnector(final ConnectorNode connector, final String stepName, final StepConfiguration configuration) throws FlowUpdateException {
270276
connector.setConfiguration(stepName, configuration);
277+
logger.info("Successfully configured {} for step {}", connector, stepName);
271278
}
272279

273280
@Override
274281
public void inheritConfiguration(final ConnectorNode connector, final List<VersionedConfigurationStep> activeFlowConfiguration,
275282
final List<VersionedConfigurationStep> workingFlowConfiguration, final Bundle flowContextBundle) throws FlowUpdateException {
276283

284+
logger.debug("Inheriting configuration for {}", connector);
277285
connector.transitionStateForUpdating();
278286
connector.prepareForUpdate();
279287

280288
try {
281289
connector.inheritConfiguration(activeFlowConfiguration, workingFlowConfiguration, flowContextBundle);
290+
logger.debug("Successfully inherited configuration for {}", connector);
282291
} catch (final Exception e) {
292+
logger.error("Failed to inherit configuration for {}", connector, e);
283293
connector.abortUpdate(e);
284294
throw e;
285295
}

0 commit comments

Comments
 (0)