Skip to content

Commit 389a3ea

Browse files
committed
Refactoring according to comments
1 parent 2bb94bc commit 389a3ea

File tree

3 files changed

+130
-231
lines changed

3 files changed

+130
-231
lines changed

pkg/splunk/enterprise/indexercluster.go

Lines changed: 14 additions & 130 deletions
Original file line numberDiff line numberDiff line change
@@ -245,78 +245,20 @@ func ApplyIndexerClusterManager(ctx context.Context, client splcommon.Controller
245245

246246
// no need to requeue if everything is ready
247247
if cr.Status.Phase == enterpriseApi.PhaseReady {
248-
// Queue
249-
queue := enterpriseApi.Queue{}
250-
if cr.Spec.QueueRef.Name != "" {
251-
ns := cr.GetNamespace()
252-
if cr.Spec.QueueRef.Namespace != "" {
253-
ns = cr.Spec.QueueRef.Namespace
254-
}
255-
err = client.Get(ctx, types.NamespacedName{
256-
Name: cr.Spec.QueueRef.Name,
257-
Namespace: ns,
258-
}, &queue)
259-
if err != nil {
260-
return result, err
261-
}
262-
}
263-
if queue.Spec.Provider == "sqs" {
264-
if queue.Spec.SQS.Endpoint == "" && queue.Spec.SQS.AuthRegion != "" {
265-
ep, err := resolveSQSEndpoint(ctx, queue.Spec.SQS.AuthRegion)
266-
if err != nil {
267-
return result, err
268-
}
269-
queue.Spec.SQS.Endpoint = ep
270-
}
271-
}
272-
273-
// Object Storage
274-
os := enterpriseApi.ObjectStorage{}
275-
if cr.Spec.ObjectStorageRef.Name != "" {
276-
ns := cr.GetNamespace()
277-
if cr.Spec.ObjectStorageRef.Namespace != "" {
278-
ns = cr.Spec.ObjectStorageRef.Namespace
279-
}
280-
err = client.Get(ctx, types.NamespacedName{
281-
Name: cr.Spec.ObjectStorageRef.Name,
282-
Namespace: ns,
283-
}, &os)
284-
if err != nil {
285-
return result, err
286-
}
287-
}
288-
if os.Spec.Provider == "s3" {
289-
if os.Spec.S3.Endpoint == "" && queue.Spec.SQS.AuthRegion != "" {
290-
ep, err := resolveS3Endpoint(ctx, queue.Spec.SQS.AuthRegion)
291-
if err != nil {
292-
return result, err
293-
}
294-
os.Spec.S3.Endpoint = ep
295-
}
296-
}
297-
298-
// Secret reference
299-
accessKey, secretKey, version := "", "", ""
300-
if queue.Spec.Provider == "sqs" && cr.Spec.ServiceAccount == "" {
301-
for _, vol := range queue.Spec.SQS.VolList {
302-
if vol.SecretRef != "" {
303-
accessKey, secretKey, version, err = GetQueueRemoteVolumeSecrets(ctx, vol, client, cr)
304-
if err != nil {
305-
scopedLog.Error(err, "Failed to get queue remote volume secrets")
306-
return result, err
307-
}
308-
}
309-
}
248+
qosCfg, err := ResolveQueueAndObjectStorage(ctx, client, cr, cr.Spec.QueueRef, cr.Spec.ObjectStorageRef, cr.Spec.ServiceAccount)
249+
if err != nil {
250+
scopedLog.Error(err, "Failed to resolve Queue/ObjectStorage config")
251+
return result, err
310252
}
311253

312-
secretChanged := cr.Status.CredentialSecretVersion != version
254+
secretChanged := cr.Status.CredentialSecretVersion != qosCfg.Version
313255
serviceAccountChanged := cr.Status.ServiceAccount != cr.Spec.ServiceAccount
314256

315257
// If queue is updated
316258
if cr.Spec.QueueRef.Name != "" {
317259
if secretChanged || serviceAccountChanged {
318260
mgr := newIndexerClusterPodManager(scopedLog, cr, namespaceScopedSecret, splclient.NewSplunkClient, client)
319-
err = mgr.updateIndexerConfFiles(ctx, cr, &queue.Spec, &os.Spec, accessKey, secretKey, client)
261+
err = mgr.updateIndexerConfFiles(ctx, cr, &qosCfg.Queue, &qosCfg.OS, qosCfg.AccessKey, qosCfg.SecretKey, client)
320262
if err != nil {
321263
eventPublisher.Warning(ctx, "ApplyIndexerClusterManager", fmt.Sprintf("Failed to update conf file for Queue/Pipeline config change after pod creation: %s", err.Error()))
322264
scopedLog.Error(err, "Failed to update conf file for Queue/Pipeline config change after pod creation")
@@ -332,7 +274,7 @@ func ApplyIndexerClusterManager(ctx context.Context, client splcommon.Controller
332274
scopedLog.Info("Restarted splunk", "indexer", i)
333275
}
334276

335-
cr.Status.CredentialSecretVersion = version
277+
cr.Status.CredentialSecretVersion = qosCfg.Version
336278
cr.Status.ServiceAccount = cr.Spec.ServiceAccount
337279
}
338280
}
@@ -598,77 +540,19 @@ func ApplyIndexerCluster(ctx context.Context, client splcommon.ControllerClient,
598540

599541
// no need to requeue if everything is ready
600542
if cr.Status.Phase == enterpriseApi.PhaseReady {
601-
// Queue
602-
queue := enterpriseApi.Queue{}
603-
if cr.Spec.QueueRef.Name != "" {
604-
ns := cr.GetNamespace()
605-
if cr.Spec.QueueRef.Namespace != "" {
606-
ns = cr.Spec.QueueRef.Namespace
607-
}
608-
err = client.Get(context.Background(), types.NamespacedName{
609-
Name: cr.Spec.QueueRef.Name,
610-
Namespace: ns,
611-
}, &queue)
612-
if err != nil {
613-
return result, err
614-
}
615-
}
616-
if queue.Spec.Provider == "sqs" {
617-
if queue.Spec.SQS.Endpoint == "" && queue.Spec.SQS.AuthRegion != "" {
618-
ep, err := resolveSQSEndpoint(ctx, queue.Spec.SQS.AuthRegion)
619-
if err != nil {
620-
return result, err
621-
}
622-
queue.Spec.SQS.Endpoint = ep
623-
}
624-
}
625-
626-
// Object Storage
627-
os := enterpriseApi.ObjectStorage{}
628-
if cr.Spec.ObjectStorageRef.Name != "" {
629-
ns := cr.GetNamespace()
630-
if cr.Spec.ObjectStorageRef.Namespace != "" {
631-
ns = cr.Spec.ObjectStorageRef.Namespace
632-
}
633-
err = client.Get(context.Background(), types.NamespacedName{
634-
Name: cr.Spec.ObjectStorageRef.Name,
635-
Namespace: ns,
636-
}, &os)
637-
if err != nil {
638-
return result, err
639-
}
640-
}
641-
if os.Spec.Provider == "s3" {
642-
if os.Spec.S3.Endpoint == "" && queue.Spec.SQS.AuthRegion != "" {
643-
ep, err := resolveS3Endpoint(ctx, queue.Spec.SQS.AuthRegion)
644-
if err != nil {
645-
return result, err
646-
}
647-
os.Spec.S3.Endpoint = ep
648-
}
649-
}
650-
651-
// Secret reference
652-
accessKey, secretKey, version := "", "", ""
653-
if queue.Spec.Provider == "sqs" && cr.Spec.ServiceAccount == "" {
654-
for _, vol := range queue.Spec.SQS.VolList {
655-
if vol.SecretRef != "" {
656-
accessKey, secretKey, version, err = GetQueueRemoteVolumeSecrets(ctx, vol, client, cr)
657-
if err != nil {
658-
scopedLog.Error(err, "Failed to get queue remote volume secrets")
659-
return result, err
660-
}
661-
}
662-
}
543+
qosCfg, err := ResolveQueueAndObjectStorage(ctx, client, cr, cr.Spec.QueueRef, cr.Spec.ObjectStorageRef, cr.Spec.ServiceAccount)
544+
if err != nil {
545+
scopedLog.Error(err, "Failed to resolve Queue/ObjectStorage config")
546+
return result, err
663547
}
664548

665-
secretChanged := cr.Status.CredentialSecretVersion != version
549+
secretChanged := cr.Status.CredentialSecretVersion != qosCfg.Version
666550
serviceAccountChanged := cr.Status.ServiceAccount != cr.Spec.ServiceAccount
667551

668552
if cr.Spec.QueueRef.Name != "" {
669553
if secretChanged || serviceAccountChanged {
670554
mgr := newIndexerClusterPodManager(scopedLog, cr, namespaceScopedSecret, splclient.NewSplunkClient, client)
671-
err = mgr.updateIndexerConfFiles(ctx, cr, &queue.Spec, &os.Spec, accessKey, secretKey, client)
555+
err = mgr.updateIndexerConfFiles(ctx, cr, &qosCfg.Queue, &qosCfg.OS, qosCfg.AccessKey, qosCfg.SecretKey, client)
672556
if err != nil {
673557
eventPublisher.Warning(ctx, "ApplyIndexerClusterManager", fmt.Sprintf("Failed to update conf file for Queue/Pipeline config change after pod creation: %s", err.Error()))
674558
scopedLog.Error(err, "Failed to update conf file for Queue/Pipeline config change after pod creation")
@@ -684,7 +568,7 @@ func ApplyIndexerCluster(ctx context.Context, client splcommon.ControllerClient,
684568
scopedLog.Info("Restarted splunk", "indexer", i)
685569
}
686570

687-
cr.Status.CredentialSecretVersion = version
571+
cr.Status.CredentialSecretVersion = qosCfg.Version
688572
cr.Status.ServiceAccount = cr.Spec.ServiceAccount
689573
}
690574
}

pkg/splunk/enterprise/ingestorcluster.go

Lines changed: 7 additions & 101 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,6 @@ import (
2121
"strings"
2222
"time"
2323

24-
"github.com/aws/aws-sdk-go-v2/config"
25-
"github.com/aws/aws-sdk-go-v2/service/s3"
26-
"github.com/aws/aws-sdk-go-v2/service/sqs"
2724
"github.com/go-logr/logr"
2825
enterpriseApi "github.com/splunk/splunk-operator/api/v4"
2926
splclient "github.com/splunk/splunk-operator/pkg/splunk/client"
@@ -212,77 +209,19 @@ func ApplyIngestorCluster(ctx context.Context, client client.Client, cr *enterpr
212209

213210
// No need to requeue if everything is ready
214211
if cr.Status.Phase == enterpriseApi.PhaseReady {
215-
// Queue
216-
queue := enterpriseApi.Queue{}
217-
if cr.Spec.QueueRef.Name != "" {
218-
ns := cr.GetNamespace()
219-
if cr.Spec.QueueRef.Namespace != "" {
220-
ns = cr.Spec.QueueRef.Namespace
221-
}
222-
err = client.Get(ctx, types.NamespacedName{
223-
Name: cr.Spec.QueueRef.Name,
224-
Namespace: ns,
225-
}, &queue)
226-
if err != nil {
227-
return result, err
228-
}
229-
}
230-
if queue.Spec.Provider == "sqs" {
231-
if queue.Spec.SQS.Endpoint == "" && queue.Spec.SQS.AuthRegion != "" {
232-
ep, err := resolveSQSEndpoint(ctx, queue.Spec.SQS.AuthRegion)
233-
if err != nil {
234-
return result, err
235-
}
236-
queue.Spec.SQS.Endpoint = ep
237-
}
238-
}
239-
240-
// Object Storage
241-
os := enterpriseApi.ObjectStorage{}
242-
if cr.Spec.ObjectStorageRef.Name != "" {
243-
ns := cr.GetNamespace()
244-
if cr.Spec.ObjectStorageRef.Namespace != "" {
245-
ns = cr.Spec.ObjectStorageRef.Namespace
246-
}
247-
err = client.Get(ctx, types.NamespacedName{
248-
Name: cr.Spec.ObjectStorageRef.Name,
249-
Namespace: ns,
250-
}, &os)
251-
if err != nil {
252-
return result, err
253-
}
254-
}
255-
if os.Spec.Provider == "s3" {
256-
if os.Spec.S3.Endpoint == "" && queue.Spec.SQS.AuthRegion != "" {
257-
ep, err := resolveS3Endpoint(ctx, queue.Spec.SQS.AuthRegion)
258-
if err != nil {
259-
return result, err
260-
}
261-
os.Spec.S3.Endpoint = ep
262-
}
263-
}
264-
265-
// Secret reference
266-
accessKey, secretKey, version := "", "", ""
267-
if queue.Spec.Provider == "sqs" && cr.Spec.ServiceAccount == "" {
268-
for _, vol := range queue.Spec.SQS.VolList {
269-
if vol.SecretRef != "" {
270-
accessKey, secretKey, version, err = GetQueueRemoteVolumeSecrets(ctx, vol, client, cr)
271-
if err != nil {
272-
scopedLog.Error(err, "Failed to get queue remote volume secrets")
273-
return result, err
274-
}
275-
}
276-
}
212+
qosCfg, err := ResolveQueueAndObjectStorage(ctx, client, cr, cr.Spec.QueueRef, cr.Spec.ObjectStorageRef, cr.Spec.ServiceAccount)
213+
if err != nil {
214+
scopedLog.Error(err, "Failed to resolve Queue/ObjectStorage config")
215+
return result, err
277216
}
278217

279-
secretChanged := cr.Status.CredentialSecretVersion != version
218+
secretChanged := cr.Status.CredentialSecretVersion != qosCfg.Version
280219
serviceAccountChanged := cr.Status.ServiceAccount != cr.Spec.ServiceAccount
281220

282221
// If queue is updated
283222
if secretChanged || serviceAccountChanged {
284223
mgr := newIngestorClusterPodManager(scopedLog, cr, namespaceScopedSecret, splclient.NewSplunkClient, client)
285-
err = mgr.updateIngestorConfFiles(ctx, cr, &queue.Spec, &os.Spec, accessKey, secretKey, client)
224+
err = mgr.updateIngestorConfFiles(ctx, cr, &qosCfg.Queue, &qosCfg.OS, qosCfg.AccessKey, qosCfg.SecretKey, client)
286225
if err != nil {
287226
eventPublisher.Warning(ctx, "ApplyIngestorCluster", fmt.Sprintf("Failed to update conf file for Queue/Pipeline config change after pod creation: %s", err.Error()))
288227
scopedLog.Error(err, "Failed to update conf file for Queue/Pipeline config change after pod creation")
@@ -298,7 +237,7 @@ func ApplyIngestorCluster(ctx context.Context, client client.Client, cr *enterpr
298237
scopedLog.Info("Restarted splunk", "ingestor", i)
299238
}
300239

301-
cr.Status.CredentialSecretVersion = version
240+
cr.Status.CredentialSecretVersion = qosCfg.Version
302241
cr.Status.ServiceAccount = cr.Spec.ServiceAccount
303242
}
304243

@@ -521,36 +460,3 @@ func getQueueAndObjectStorageInputsForIngestorConfFiles(queue *enterpriseApi.Que
521460

522461
return
523462
}
524-
525-
func resolveS3Endpoint(ctx context.Context, region string) (string, error) {
526-
cfg, err := config.LoadDefaultConfig(ctx, config.WithRegion(region))
527-
if err != nil {
528-
return "", err
529-
}
530-
531-
client := s3.NewFromConfig(cfg)
532-
params := s3.EndpointParameters{Region: &region}
533-
534-
ep, err := client.Options().EndpointResolverV2.ResolveEndpoint(ctx, params)
535-
if err != nil {
536-
return "", err
537-
}
538-
// Full endpoint URL as string:
539-
return ep.URI.String(), nil
540-
}
541-
542-
func resolveSQSEndpoint(ctx context.Context, region string) (string, error) {
543-
cfg, err := config.LoadDefaultConfig(ctx, config.WithRegion(region))
544-
if err != nil {
545-
return "", err
546-
}
547-
548-
client := sqs.NewFromConfig(cfg)
549-
params := sqs.EndpointParameters{Region: &region}
550-
551-
ep, err := client.Options().EndpointResolverV2.ResolveEndpoint(ctx, params)
552-
if err != nil {
553-
return "", err
554-
}
555-
return ep.URI.String(), nil
556-
}

0 commit comments

Comments
 (0)