Conversation

Collaborator

@kasiakoziol kasiakoziol commented Jul 28, 2025

Depends on

Description

This PR implements a separation between ingestion and indexing services within the Splunk Operator for Kubernetes. The goal is to enable the operator to independently manage the ingestion service while maintaining seamless integration with the indexing service.

Key Changes

  • Introduction of new Custom Resource Definitions for IngestorCluster, Queue, and ObjectStorage
  • Support for Remote Queue and Pipeline inputs in IngestorCluster and IndexerCluster via Queue and ObjectStorage references (see the sketch below)
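
A minimal sketch in Go of how these references might be wired together, assuming the type and field names implied by this PR (an IngestorClusterSpec with QueueRef and ObjectStorageRef of type corev1.ObjectReference); the import path is a placeholder, not the actual module path:

// Illustrative only: field names are assumed from this PR, the import path is hypothetical.
package main

import (
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	enterpriseApi "github.com/splunk/splunk-operator/api/v4" // hypothetical path
)

// exampleIngestorCluster wires an IngestorCluster to the new Queue and ObjectStorage CRs.
func exampleIngestorCluster() *enterpriseApi.IngestorCluster {
	return &enterpriseApi.IngestorCluster{
		ObjectMeta: metav1.ObjectMeta{Name: "ingest", Namespace: "splunk"},
		Spec: enterpriseApi.IngestorClusterSpec{
			// References resolved by the controller into remote queue and pipeline inputs.
			QueueRef:         corev1.ObjectReference{Name: "sqs-queue"},
			ObjectStorageRef: corev1.ObjectReference{Name: "s3-storage"},
		},
	}
}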

Testing and Verification

  • Manual tests
  • Automated tests

Related Issues

Jira Epic: https://splunk.atlassian.net/browse/CSPL-3549

PR Checklist

  • Code changes adhere to the project's coding standards.
  • Relevant unit and integration tests are included.
  • Documentation has been updated accordingly.
  • All tests pass locally.
  • The PR description follows the project's guidelines.

@kasiakoziol kasiakoziol force-pushed the CSPL-3551-ingestion-cr branch 6 times, most recently from ca230a9 to eb21049 on July 28, 2025 15:15
@kasiakoziol kasiakoziol force-pushed the CSPL-3551-ingestion-cr branch from eb21049 to 2cca0d7 on July 29, 2025 08:24
Collaborator

coveralls commented Jul 29, 2025

Pull Request Test Coverage Report for Build 21988973023

Details

  • 540 of 978 (55.21%) changed or added relevant lines in 15 files are covered.
  • 5 unchanged lines in 2 files lost coverage.
  • Overall coverage decreased (-2.1%) to 84.173%

Changes Missing Coverage | Covered Lines | Changed/Added Lines | %
pkg/splunk/enterprise/types.go | 6 | 10 | 60.0%
pkg/splunk/enterprise/upgrade.go | 0 | 4 | 0.0%
pkg/splunk/client/enterprise.go | 33 | 39 | 84.62%
pkg/splunk/enterprise/objectstorage.go | 31 | 40 | 77.5%
pkg/splunk/enterprise/queue.go | 31 | 40 | 77.5%
pkg/splunk/enterprise/indexercluster.go | 106 | 152 | 69.74%
pkg/splunk/enterprise/util.go | 51 | 128 | 39.84%
internal/controller/indexercluster_controller.go | 2 | 84 | 2.38%
internal/controller/ingestorcluster_controller.go | 58 | 146 | 39.73%
pkg/splunk/enterprise/ingestorcluster.go | 207 | 320 | 64.69%

Files with Coverage Reduction | New Missed Lines | %
pkg/splunk/enterprise/cp.go | 1 | 33.33%
pkg/splunk/enterprise/indexercluster.go | 4 | 72.9%
Totals Coverage Status
Change from base Build 21988082802: -2.1%
Covered Lines: 11466
Relevant Lines: 13622

💛 - Coveralls

@kasiakoziol kasiakoziol force-pushed the CSPL-3551-ingestion-cr branch from d3779b0 to 44349fa on July 29, 2025 14:54
@kasiakoziol kasiakoziol force-pushed the CSPL-3551-ingestion-cr branch 13 times, most recently from 3b2cf1c to 5cad7f9 on August 5, 2025 08:49
@Igor-splunk
Collaborator

I have read the CLA Document and I hereby sign the CLA

@kasiakoziol
Collaborator Author

recheck

@Igor-splunk
Collaborator

I have read the Code of Conduct and I hereby accept the Terms

@Igor-splunk
Collaborator

recheck

@kasiakoziol kasiakoziol changed the title from "[DO NOT MERGE BEFORE #1606 and #1610] CSPL-3549 Splunk Operator Enhancement – Ingestion and Indexing Separation" to "CSPL-3549 Splunk Operator Enhancement – Ingestion and Indexing Separation" on Feb 10, 2026

// If queue is updated
if cr.Spec.QueueRef.Name != "" {
	if secretChanged || serviceAccountChanged {
Collaborator


I do not think we need to worry about changes to the serviceAccount name or role binding itself. IRSA is external to Splunk and operates at the pod level to provide temporary AWS credentials. As long as the pod is correctly annotated and associated with the IAM role, IRSA will continue to function independently of Splunk’s internal logic.

Separately, and unrelated to Ingestion and Indexing separation, we should consider what happens if the AWS secret changes in the SmartStore configuration. That scenario also requires attention. If object storage credentials are updated, we need a well-defined mechanism to ensure the new credentials are picked up safely and consistently.

Given these cases, the restart logic needs to be revisited. Instead of implementing custom coordination logic inside the controller, we should rely on Kubernetes-native mechanisms. For example:

  • Let Kubernetes make eviction decisions based on Pod Disruption Budgets.
  • Use standard StatefulSet rolling restart behavior where appropriate.
  • Ensure that any SmartStore configuration changes are applied via Ansible and/or init containers during pod restart.

The goal should be to keep the operator lightweight and defer restart orchestration to Kubernetes wherever possible, rather than embedding complex restart coordination inside the controller.
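
A minimal sketch of that Kubernetes-native approach, assuming controller-runtime and a StatefulSet-backed workload: the operator only stamps the credential secret's version onto the pod template (the annotation key below is made up), and the standard rolling-update machinery, together with any Pod Disruption Budgets, handles eviction and restart ordering.

package main

import (
	"context"

	appsv1 "k8s.io/api/apps/v1"
	"k8s.io/apimachinery/pkg/types"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// rollStatefulSetOnSecretChange changes the pod template (via an annotation), which is all
// that is needed for Kubernetes to perform a rolling restart of the StatefulSet.
func rollStatefulSetOnSecretChange(ctx context.Context, c client.Client, name types.NamespacedName, secretVersion string) error {
	var sts appsv1.StatefulSet
	if err := c.Get(ctx, name, &sts); err != nil {
		return err
	}
	if sts.Spec.Template.Annotations == nil {
		sts.Spec.Template.Annotations = map[string]string{}
	}
	// Hypothetical annotation key; the value only needs to change when the secret does.
	sts.Spec.Template.Annotations["enterprise.splunk.com/credential-secret-version"] = secretVersion
	return c.Update(ctx, &sts)
}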

Collaborator Author


SmartStore is not part of the Ingestion and Indexing separation, so, as you mentioned, that is a separate case. We may want to revisit this topic in the near future.

Yes, we need to revisit the restart implementation. I created a task for it, as you asked, and we should look into it soon.

@kasiakoziol kasiakoziol force-pushed the CSPL-3551-ingestion-cr branch from 6f08c09 to 7d7a55e on February 11, 2026 15:59
DEPLOYMENT_TYPE: ""
ARM64: "true"
GRAVITON_TESTING: "true"
AWS_INDEX_INGEST_SEP_ACCESS_KEY_ID: ${{ secrets.AWS_INDEX_INGEST_SEP_ACCESS_KEY_ID }}
Collaborator


The access key ID is not a secret.

Collaborator Author


I followed the pattern we use for other values. I am not sure we want to disclose it. Should I change it now, or leave it until we revisit our pipelines?

Message string `json:"message"`

// Credential secret version to track changes to the secret and trigger rolling restart of indexer cluster peers when the secret is updated
CredentialSecretVersion string `json:"credentialSecretVersion,omitempty"`
Collaborator


How do we provide the secret?

Collaborator Author


As a volume:

volumes:
  - name: s3-sqs-volume
    secretRef: s3-secret-change

This is only there to cover the case where Splunk does not accept tokens on EKS 1.34.
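
A minimal sketch of how the controller could read that secret and derive CredentialSecretVersion, assuming controller-runtime and s3_access_key / s3_secret_key key names (both assumptions; this is illustrative, not necessarily what GetQueueRemoteVolumeSecrets does):

package main

import (
	"context"
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/types"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// readQueueSecret returns the credentials plus a version string that changes whenever the
// secret is updated, so the controller can detect credential rotation by comparing it
// against Status.CredentialSecretVersion.
func readQueueSecret(ctx context.Context, c client.Client, namespace, secretRef string) (accessKey, secretKey, version string, err error) {
	var secret corev1.Secret
	if err = c.Get(ctx, types.NamespacedName{Namespace: namespace, Name: secretRef}, &secret); err != nil {
		return "", "", "", err
	}
	ak, ok1 := secret.Data["s3_access_key"]
	sk, ok2 := secret.Data["s3_secret_key"]
	if !ok1 || !ok2 {
		return "", "", "", fmt.Errorf("secret %s/%s is missing s3_access_key or s3_secret_key", namespace, secretRef)
	}
	// resourceVersion is bumped by the API server on every update to the secret.
	return string(ak), string(sk), secret.ResourceVersion, nil
}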

// See https://help.splunk.com/en/splunk-enterprise/leverage-rest-apis/rest-api-reference/10.0/configuration-endpoints/configuration-endpoint-descriptions
func (c *SplunkClient) UpdateConfFile(scopedLog logr.Logger, fileName, property string, propertyKVList [][]string) error {
// Creates an object in a conf file if it doesn't exist
endpoint := fmt.Sprintf("%s/servicesNS/nobody/system/configs/conf-%s", c.ManagementURI, fileName)
Collaborator


Where does this endpoint come from, and why does it contain nobody?

Collaborator Author

I am reverting this, since the docs for 10.2 use servicesNS/nobody.
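
For context, a minimal sketch of how this configuration endpoint is typically driven (plain net/http here, not the operator's SplunkClient, so names and flow are illustrative): POSTing key=value pairs to servicesNS/nobody/system/configs/conf-<file>/<stanza> updates that stanza, where nobody is the user context and system the app context, so the settings land in system/local.

package main

import (
	"fmt"
	"net/http"
	"net/url"
	"strings"
)

// setConfProperty sets a single key/value on an existing stanza via the configs endpoint.
func setConfProperty(httpClient *http.Client, managementURI, user, pass, file, stanza, key, value string) error {
	endpoint := managementURI + "/servicesNS/nobody/system/configs/conf-" + file + "/" + url.PathEscape(stanza)
	form := url.Values{key: {value}}
	req, err := http.NewRequest(http.MethodPost, endpoint, strings.NewReader(form.Encode()))
	if err != nil {
		return err
	}
	req.SetBasicAuth(user, pass)
	req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
	resp, err := httpClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode >= 400 {
		return fmt.Errorf("conf update failed: %s", resp.Status)
	}
	return nil
}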

Collaborator

Igor-splunk commented Feb 13, 2026

We can extract the Queue and ObjectStorage resolution into a separate function shared by ApplyIngestorCluster, ApplyIndexerClusterManager, and ApplyIndexerCluster.

pkg/splunk/enterprise/util.go —

// QueueOSConfig holds resolved Queue and ObjectStorage specs with credentials
type QueueOSConfig struct {
    Queue     enterpriseApi.QueueSpec
    OS        enterpriseApi.ObjectStorageSpec
    AccessKey string
    SecretKey string
    Version   string
}
 
// ResolveQueueAndObjectStorage fetches Queue and ObjectStorage CRs, resolves
// their endpoints, and extracts credentials from the referenced secret.
func ResolveQueueAndObjectStorage(ctx context.Context, c splcommon.ControllerClient, cr splcommon.MetaObject, queueRef, osRef corev1.ObjectReference, serviceAccount string) (*QueueOSConfig, error) {
    cfg := &QueueOSConfig{}
 
    if queueRef.Name != "" {
        ns := cr.GetNamespace()
        if queueRef.Namespace != "" {
            ns = queueRef.Namespace
        }
        var queue enterpriseApi.Queue
        if err := c.Get(ctx, types.NamespacedName{Name: queueRef.Name, Namespace: ns}, &queue); err != nil {
            return nil, err
        }
        cfg.Queue = queue.Spec
    }
    if cfg.Queue.Provider == "sqs" {
        if cfg.Queue.SQS.Endpoint == "" && cfg.Queue.SQS.AuthRegion != "" {
            ep, err := resolveSQSEndpoint(ctx, cfg.Queue.SQS.AuthRegion)
            if err != nil {
                return nil, err
            }
            cfg.Queue.SQS.Endpoint = ep
        }
    }
 
    if osRef.Name != "" {
        ns := cr.GetNamespace()
        if osRef.Namespace != "" {
            ns = osRef.Namespace
        }
        var os enterpriseApi.ObjectStorage
        if err := c.Get(ctx, types.NamespacedName{Name: osRef.Name, Namespace: ns}, &os); err != nil {
            return nil, err
        }
        cfg.OS = os.Spec
    }
    if cfg.OS.Provider == "s3" {
        if cfg.OS.S3.Endpoint == "" && cfg.Queue.SQS.AuthRegion != "" {
            ep, err := resolveS3Endpoint(ctx, cfg.Queue.SQS.AuthRegion)
            if err != nil {
                return nil, err
            }
            cfg.OS.S3.Endpoint = ep
        }
    }
 
    if cfg.Queue.Provider == "sqs" && serviceAccount == "" {
        for _, vol := range cfg.Queue.SQS.VolList {
            if vol.SecretRef != "" {
                accessKey, secretKey, version, err := GetQueueRemoteVolumeSecrets(ctx, vol, c, cr)
                if err != nil {
                    return nil, err
                }
                cfg.AccessKey = accessKey
                cfg.SecretKey = secretKey
                cfg.Version = version
            }
        }
    }
 
    return cfg, nil
}

pkg/splunk/enterprise/ingestorcluster.go — ApplyIngestorCluster

        qosCfg, err := ResolveQueueAndObjectStorage(ctx, client, cr, cr.Spec.QueueRef, cr.Spec.ObjectStorageRef, cr.Spec.ServiceAccount)
        if err != nil {
            scopedLog.Error(err, "Failed to resolve Queue/ObjectStorage config")
            return result, err
        }
 
        secretChanged := cr.Status.CredentialSecretVersion != qosCfg.Version
        serviceAccountChanged := cr.Status.ServiceAccount != cr.Spec.ServiceAccount
 
        if secretChanged || serviceAccountChanged {
            mgr := newIngestorClusterPodManager(scopedLog, cr, namespaceScopedSecret, splclient.NewSplunkClient, client)
            err = mgr.updateIngestorConfFiles(ctx, cr, &qosCfg.Queue, &qosCfg.OS, qosCfg.AccessKey, qosCfg.SecretKey, client)
            // ... rest unchanged
            cr.Status.CredentialSecretVersion = qosCfg.Version
            cr.Status.ServiceAccount = cr.Spec.ServiceAccount
        }

pkg/splunk/enterprise/indexercluster.go — ApplyIndexerClusterManager and ApplyIndexerCluster

        qosCfg, err := ResolveQueueAndObjectStorage(ctx, client, cr, cr.Spec.QueueRef, cr.Spec.ObjectStorageRef, cr.Spec.ServiceAccount)
        if err != nil {
            scopedLog.Error(err, "Failed to resolve Queue/ObjectStorage config")
            return result, err
        }
 
        secretChanged := cr.Status.CredentialSecretVersion != qosCfg.Version
        serviceAccountChanged := cr.Status.ServiceAccount != cr.Spec.ServiceAccount
 
        if cr.Spec.QueueRef.Name != "" {
            if secretChanged || serviceAccountChanged {
                mgr := newIndexerClusterPodManager(scopedLog, cr, namespaceScopedSecret, splclient.NewSplunkClient, client)
                err = mgr.updateIndexerConfFiles(ctx, cr, &qosCfg.Queue, &qosCfg.OS, qosCfg.AccessKey, qosCfg.SecretKey, client)
                // ... rest unchanged
                cr.Status.CredentialSecretVersion = qosCfg.Version
                cr.Status.ServiceAccount = cr.Spec.ServiceAccount
            }
        }
