8484import org .apache .nifi .shared .azure .eventhubs .AzureEventHubComponent ;
8585import org .apache .nifi .shared .azure .eventhubs .AzureEventHubTransportType ;
8686import org .apache .nifi .shared .azure .eventhubs .BlobStorageAuthenticationStrategy ;
87+ import org .apache .nifi .services .azure .AzureIdentityFederationTokenProvider ;
8788import org .apache .nifi .util .StopWatch ;
8889import org .apache .nifi .util .StringUtils ;
8990
@@ -150,6 +151,7 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor implem
150151 static final PropertyDescriptor SERVICE_BUS_ENDPOINT = AzureEventHubUtils .SERVICE_BUS_ENDPOINT ;
151152 static final PropertyDescriptor AUTHENTICATION_STRATEGY = AzureEventHubComponent .AUTHENTICATION_STRATEGY ;
152153 static final PropertyDescriptor EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER = AzureEventHubComponent .OAUTH2_ACCESS_TOKEN_PROVIDER ;
154+ static final PropertyDescriptor EVENT_HUB_IDENTITY_FEDERATION_TOKEN_PROVIDER = AzureEventHubComponent .IDENTITY_FEDERATION_TOKEN_PROVIDER ;
153155 static final PropertyDescriptor ACCESS_POLICY_NAME = new PropertyDescriptor .Builder ()
154156 .name ("Shared Access Policy Name" )
155157 .description ("The name of the shared access policy. This policy must have Listen claims." )
@@ -267,6 +269,13 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor implem
267269 .required (true )
268270 .dependsOn (BLOB_STORAGE_AUTHENTICATION_STRATEGY , BlobStorageAuthenticationStrategy .OAUTH2 )
269271 .build ();
272+ static final PropertyDescriptor BLOB_STORAGE_IDENTITY_FEDERATION_TOKEN_PROVIDER = new PropertyDescriptor .Builder ()
273+ .name ("Storage Identity Federation Token Provider" )
274+ .description ("Controller Service exchanging workload identity tokens for Azure AD access tokens when using Identity Federation with Azure Blob Storage." )
275+ .identifiesControllerService (AzureIdentityFederationTokenProvider .class )
276+ .required (true )
277+ .dependsOn (BLOB_STORAGE_AUTHENTICATION_STRATEGY , BlobStorageAuthenticationStrategy .IDENTITY_FEDERATION )
278+ .build ();
270279 static final PropertyDescriptor STORAGE_ACCOUNT_KEY = new PropertyDescriptor .Builder ()
271280 .name ("Storage Account Key" )
272281 .description ("The Azure Storage account key to store event hub consumer group state." )
@@ -326,6 +335,7 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor implem
326335 POLICY_PRIMARY_KEY ,
327336 AUTHENTICATION_STRATEGY ,
328337 EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER ,
338+ EVENT_HUB_IDENTITY_FEDERATION_TOKEN_PROVIDER ,
329339 CONSUMER_GROUP ,
330340 RECORD_READER ,
331341 RECORD_WRITER ,
@@ -340,6 +350,7 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor implem
340350 STORAGE_ACCOUNT_KEY ,
341351 STORAGE_SAS_TOKEN ,
342352 BLOB_STORAGE_OAUTH2_ACCESS_TOKEN_PROVIDER ,
353+ BLOB_STORAGE_IDENTITY_FEDERATION_TOKEN_PROVIDER ,
343354 PROXY_CONFIGURATION_SERVICE
344355 );
345356
@@ -435,6 +446,7 @@ protected Collection<ValidationResult> customValidate(ValidationContext validati
435446 final String storageSasToken = validationContext .getProperty (STORAGE_SAS_TOKEN ).evaluateAttributeExpressions ().getValue ();
436447 final CheckpointStrategy checkpointStrategy = CheckpointStrategy .valueOf (validationContext .getProperty (CHECKPOINT_STRATEGY ).getValue ());
437448 final boolean blobOauthProviderSet = validationContext .getProperty (BLOB_STORAGE_OAUTH2_ACCESS_TOKEN_PROVIDER ).isSet ();
449+ final boolean blobIdentityFederationProviderSet = validationContext .getProperty (BLOB_STORAGE_IDENTITY_FEDERATION_TOKEN_PROVIDER ).isSet ();
438450
439451 if ((recordReader != null && recordWriter == null ) || (recordReader == null && recordWriter != null )) {
440452 results .add (new ValidationResult .Builder ()
@@ -527,9 +539,42 @@ protected Collection<ValidationResult> customValidate(ValidationContext validati
527539 .valid (false )
528540 .build ());
529541 }
542+ if (blobIdentityFederationProviderSet ) {
543+ results .add (new ValidationResult .Builder ()
544+ .subject (BLOB_STORAGE_IDENTITY_FEDERATION_TOKEN_PROVIDER .getDisplayName ())
545+ .explanation ("%s must not be set when %s is %s."
546+ .formatted (BLOB_STORAGE_IDENTITY_FEDERATION_TOKEN_PROVIDER .getDisplayName (),
547+ BLOB_STORAGE_AUTHENTICATION_STRATEGY .getDisplayName (),
548+ BlobStorageAuthenticationStrategy .OAUTH2 .getDisplayName ()))
549+ .valid (false )
550+ .build ());
551+ }
552+ } else if (blobStorageAuthenticationStrategy == BlobStorageAuthenticationStrategy .IDENTITY_FEDERATION ) {
553+ if (StringUtils .isNotBlank (storageAccountKey )) {
554+ results .add (new ValidationResult .Builder ()
555+ .subject (STORAGE_ACCOUNT_KEY .getDisplayName ())
556+ .explanation ("%s must not be set when %s is %s."
557+ .formatted (STORAGE_ACCOUNT_KEY .getDisplayName (),
558+ BLOB_STORAGE_AUTHENTICATION_STRATEGY .getDisplayName (),
559+ BlobStorageAuthenticationStrategy .IDENTITY_FEDERATION .getDisplayName ()))
560+ .valid (false )
561+ .build ());
562+ }
563+
564+ if (StringUtils .isNotBlank (storageSasToken )) {
565+ results .add (new ValidationResult .Builder ()
566+ .subject (STORAGE_SAS_TOKEN .getDisplayName ())
567+ .explanation ("%s must not be set when %s is %s."
568+ .formatted (STORAGE_SAS_TOKEN .getDisplayName (),
569+ BLOB_STORAGE_AUTHENTICATION_STRATEGY .getDisplayName (),
570+ BlobStorageAuthenticationStrategy .IDENTITY_FEDERATION .getDisplayName ()))
571+ .valid (false )
572+ .build ());
573+ }
530574 }
531575 }
532- results .addAll (AzureEventHubUtils .customValidate (ACCESS_POLICY_NAME , POLICY_PRIMARY_KEY , EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER , validationContext ));
576+ results .addAll (AzureEventHubUtils .customValidate (ACCESS_POLICY_NAME , POLICY_PRIMARY_KEY ,
577+ EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER , EVENT_HUB_IDENTITY_FEDERATION_TOKEN_PROVIDER , validationContext ));
533578 return results ;
534579 }
535580
@@ -627,6 +672,14 @@ protected EventProcessorClient createClient(final ProcessContext context) {
627672 blobContainerClientBuilder .endpoint (endpoint );
628673 blobContainerClientBuilder .credential (tokenCredential );
629674 }
675+ case IDENTITY_FEDERATION -> {
676+ final AzureIdentityFederationTokenProvider tokenProvider =
677+ context .getProperty (BLOB_STORAGE_IDENTITY_FEDERATION_TOKEN_PROVIDER ).asControllerService (AzureIdentityFederationTokenProvider .class );
678+ final TokenCredential tokenCredential = AzureEventHubUtils .createTokenCredential (tokenProvider );
679+ final String endpoint = createBlobEndpoint (storageAccountName , domainName );
680+ blobContainerClientBuilder .endpoint (endpoint );
681+ blobContainerClientBuilder .credential (tokenCredential );
682+ }
630683 }
631684 blobContainerClientBuilder .containerName (containerName );
632685
@@ -682,6 +735,13 @@ protected EventProcessorClient createClient(final ProcessContext context) {
682735 final TokenCredential tokenCredential = AzureEventHubUtils .createTokenCredential (tokenProvider );
683736 eventProcessorClientBuilder .credential (fullyQualifiedNamespace , eventHubName , tokenCredential );
684737 }
738+ case IDENTITY_FEDERATION -> {
739+ final AzureIdentityFederationTokenProvider tokenProvider =
740+ context .getProperty (EVENT_HUB_IDENTITY_FEDERATION_TOKEN_PROVIDER )
741+ .asControllerService (AzureIdentityFederationTokenProvider .class );
742+ final TokenCredential tokenCredential = AzureEventHubUtils .createTokenCredential (tokenProvider );
743+ eventProcessorClientBuilder .credential (fullyQualifiedNamespace , eventHubName , tokenCredential );
744+ }
685745 }
686746
687747 final Integer prefetchCount = context .getProperty (PREFETCH_COUNT ).evaluateAttributeExpressions ().asInteger ();
@@ -921,7 +981,7 @@ private String createStorageConnectionString(final ProcessContext context,
921981 String .format (FORMAT_STORAGE_CONNECTION_STRING_FOR_ACCOUNT_KEY , storageAccountName , storageAccountKey , domainName );
922982 case SHARED_ACCESS_SIGNATURE ->
923983 String .format (FORMAT_STORAGE_CONNECTION_STRING_FOR_SAS_TOKEN , storageAccountName , domainName , storageSasToken );
924- case OAUTH2 -> throw new IllegalArgumentException (String .format (
984+ case OAUTH2 , IDENTITY_FEDERATION -> throw new IllegalArgumentException (String .format (
925985 "Blob Storage Authentication Strategy %s does not support connection string authentication" , blobStorageAuthenticationStrategy ));
926986 };
927987 }
0 commit comments