diff --git a/internal/controller/consoleplugin/consoleplugin_objects.go b/internal/controller/consoleplugin/consoleplugin_objects.go index cb18862f19..40169bd11d 100644 --- a/internal/controller/consoleplugin/consoleplugin_objects.go +++ b/internal/controller/consoleplugin/consoleplugin_objects.go @@ -9,7 +9,6 @@ import ( "strconv" "time" - lokiv1 "github.com/grafana/loki/operator/apis/loki/v1" osv1 "github.com/openshift/api/console/v1" monitoringv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1" "gopkg.in/yaml.v2" @@ -28,6 +27,7 @@ import ( "github.com/netobserv/netobserv-operator/internal/controller/reconcilers" "github.com/netobserv/netobserv-operator/internal/pkg/helper" "github.com/netobserv/netobserv-operator/internal/pkg/helper/loki" + "github.com/netobserv/netobserv-operator/internal/pkg/manager/status" "github.com/netobserv/netobserv-operator/internal/pkg/metrics" "github.com/netobserv/netobserv-operator/internal/pkg/metrics/alerts" "github.com/netobserv/netobserv-operator/internal/pkg/volumes" @@ -521,25 +521,9 @@ func (b *builder) getHealthRecordingAnnotations() map[string]map[string]string { return annotsPerRecording } -func getLokiStatus(lokiStack *lokiv1.LokiStack) string { - if lokiStack == nil { - // This case should not happen - return "" - } - for _, conditions := range lokiStack.Status.Conditions { - if conditions.Reason == "ReadyComponents" { - if conditions.Status == "True" { - return "ready" - } - break - } - } - return "pending" -} - // returns a configmap with a digest of its configuration contents, which will be used to // detect any configuration change -func (b *builder) configMap(ctx context.Context, lokiStack *lokiv1.LokiStack) (*corev1.ConfigMap, string, error) { +func (b *builder) configMap(ctx context.Context, lokiStatus status.ComponentStatus) (*corev1.ConfigMap, string, error) { config := cfg.PluginConfig{ Server: cfg.ServerConfig{ Port: int(*b.advanced.Port), @@ -555,9 +539,13 @@ func (b *builder) configMap(ctx context.Context, lokiStack *lokiv1.LokiStack) (* // configure loki var err error config.Loki, err = b.getLokiConfig() - if lokiStack != nil { - config.Loki.Status = getLokiStatus(lokiStack) + if lokiStatus.Status != status.StatusUnknown { config.Loki.StatusURL = "" + if lokiStatus.Status == status.StatusReady { + config.Loki.Status = "ready" + } else { + config.Loki.Status = "pending" + } } if err != nil { return nil, "", err diff --git a/internal/controller/consoleplugin/consoleplugin_reconciler.go b/internal/controller/consoleplugin/consoleplugin_reconciler.go index 6d054b262d..3d8cefe3dd 100644 --- a/internal/controller/consoleplugin/consoleplugin_reconciler.go +++ b/internal/controller/consoleplugin/consoleplugin_reconciler.go @@ -13,11 +13,11 @@ import ( "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/log" - lokiv1 "github.com/grafana/loki/operator/apis/loki/v1" flowslatest "github.com/netobserv/netobserv-operator/api/flowcollector/v1beta2" "github.com/netobserv/netobserv-operator/internal/controller/constants" "github.com/netobserv/netobserv-operator/internal/controller/reconcilers" "github.com/netobserv/netobserv-operator/internal/pkg/helper" + "github.com/netobserv/netobserv-operator/internal/pkg/manager/status" "github.com/netobserv/netobserv-operator/internal/pkg/resources" apierrors "k8s.io/apimachinery/pkg/api/errors" ) @@ -54,10 +54,26 @@ func NewReconciler(cmn *reconcilers.Instance) CPReconciler { } // Reconcile is the reconciler entry point to reconcile the current plugin state with the desired configuration -func (r *CPReconciler) Reconcile(ctx context.Context, desired *flowslatest.FlowCollector) error { - l := log.FromContext(ctx).WithName("console-plugin") +func (r *CPReconciler) Reconcile(ctx context.Context, desired *flowslatest.FlowCollector, lokiStatus status.ComponentStatus) error { + l := log.FromContext(ctx).WithName("web-console") ctx = log.IntoContext(ctx, l) + defer r.Status.Commit(ctx, r.Client) + + err := r.reconcile(ctx, desired, lokiStatus) + if err != nil { + l.Error(err, "Web Console reconcile failure") + // Set status failure unless it was already set + if !r.Status.HasFailure() { + r.Status.SetFailure("WebConsoleError", err.Error()) + } + return err + } + + return nil +} + +func (r *CPReconciler) reconcile(ctx context.Context, desired *flowslatest.FlowCollector, lokiStatus status.ComponentStatus) error { // Retrieve current owned objects err := r.Managed.FetchAll(ctx) if err != nil { @@ -84,7 +100,7 @@ func (r *CPReconciler) Reconcile(ctx context.Context, desired *flowslatest.FlowC } } - cmDigest, err := r.reconcileConfigMap(ctx, &builder, &desired.Spec) + cmDigest, err := r.reconcileConfigMap(ctx, &builder, lokiStatus) if err != nil { return err } @@ -114,6 +130,11 @@ func (r *CPReconciler) Reconcile(ctx context.Context, desired *flowslatest.FlowC } else { // delete any existing owned object r.Managed.TryDeleteAll(ctx) + if desired.Spec.OnHold() { + r.Status.SetUnused("FlowCollector is on hold") + } else { + r.Status.SetUnused("Web console not enabled") + } } return nil @@ -180,29 +201,8 @@ func (r *CPReconciler) reconcilePlugin(ctx context.Context, builder *builder, de return nil } -func (r *CPReconciler) reconcileConfigMap(ctx context.Context, builder *builder, desired *flowslatest.FlowCollectorSpec) (string, error) { - var lokiStack *lokiv1.LokiStack - if desired.Loki.Mode == flowslatest.LokiModeLokiStack { - lokiStack = &lokiv1.LokiStack{} - ns := desired.Loki.LokiStack.Namespace - if ns == "" { - ns = desired.Namespace - } - if err := r.Client.Get(ctx, types.NamespacedName{Name: desired.Loki.LokiStack.Name, Namespace: ns}, lokiStack); err != nil { - lokiStack = nil - if apierrors.IsNotFound(err) { - log.FromContext(ctx).Info("LokiStack resource not found, status will not be available", - "name", desired.Loki.LokiStack.Name, - "namespace", ns) - } else { - log.FromContext(ctx).Error(err, "Failed to get LokiStack resource", - "name", desired.Loki.LokiStack.Name, - "namespace", ns) - } - } - } - - newCM, configDigest, err := builder.configMap(ctx, lokiStack) +func (r *CPReconciler) reconcileConfigMap(ctx context.Context, builder *builder, lokiStatus status.ComponentStatus) (string, error) { + newCM, configDigest, err := builder.configMap(ctx, lokiStatus) if err != nil { return "", err } diff --git a/internal/controller/consoleplugin/consoleplugin_test.go b/internal/controller/consoleplugin/consoleplugin_test.go index 0957c325e9..3f383959e8 100644 --- a/internal/controller/consoleplugin/consoleplugin_test.go +++ b/internal/controller/consoleplugin/consoleplugin_test.go @@ -12,7 +12,6 @@ import ( "k8s.io/utils/ptr" "sigs.k8s.io/yaml" - lokiv1 "github.com/grafana/loki/operator/apis/loki/v1" "github.com/netobserv/flowlogs-pipeline/pkg/api" flowslatest "github.com/netobserv/netobserv-operator/api/flowcollector/v1beta2" config "github.com/netobserv/netobserv-operator/internal/controller/consoleplugin/config" @@ -33,6 +32,10 @@ var testResources = corev1.ResourceRequirements{ corev1.ResourceMemory: resource.MustParse("512Mi"), }, } +var lokiStatusUnused = status.ComponentStatus{ + Name: status.LokiStack, + Status: status.StatusUnknown, +} func getPluginConfig() flowslatest.FlowCollectorConsolePlugin { return flowslatest.FlowCollectorConsolePlugin{ @@ -111,7 +114,7 @@ func getAutoScalerSpecs() (ascv2.HorizontalPodAutoscaler, flowslatest.FlowCollec func getBuilder(spec *flowslatest.FlowCollectorSpec, lk *helper.LokiConfig) builder { info := reconcilers.Common{Namespace: testNamespace, Loki: lk, ClusterInfo: &cluster.Info{}} b := newBuilder(info.NewInstance(map[reconcilers.ImageRef]string{reconcilers.MainImage: testImage}, status.Instance{}), spec, constants.PluginName) - _, _, _ = b.configMap(context.Background(), nil) // build configmap to update builder's volumes + _, _, _ = b.configMap(context.Background(), lokiStatusUnused) // build configmap to update builder's volumes return b } @@ -224,8 +227,8 @@ func TestConfigMapUpdateCheck(t *testing.T) { } spec := flowslatest.FlowCollectorSpec{ConsolePlugin: plugin} builder := getBuilder(&spec, &loki) - old, _, _ := builder.configMap(context.Background(), nil) - nEw, _, _ := builder.configMap(context.Background(), nil) + old, _, _ := builder.configMap(context.Background(), lokiStatusUnused) + nEw, _, _ := builder.configMap(context.Background(), lokiStatusUnused) assert.Equal(old.Data, nEw.Data) // update loki @@ -240,7 +243,7 @@ func TestConfigMapUpdateCheck(t *testing.T) { }}, } builder = getBuilder(&spec, &loki) - nEw, _, _ = builder.configMap(context.Background(), nil) + nEw, _, _ = builder.configMap(context.Background(), lokiStatusUnused) assert.NotEqual(old.Data, nEw.Data) old = nEw @@ -248,7 +251,7 @@ func TestConfigMapUpdateCheck(t *testing.T) { loki.LokiManualParams.StatusURL = "http://loki.status:3100/" loki.LokiManualParams.StatusTLS.Enable = true builder = getBuilder(&spec, &loki) - nEw, _, _ = builder.configMap(context.Background(), nil) + nEw, _, _ = builder.configMap(context.Background(), lokiStatusUnused) assert.NotEqual(old.Data, nEw.Data) old = nEw @@ -259,7 +262,7 @@ func TestConfigMapUpdateCheck(t *testing.T) { CertFile: "status-ca.crt", } builder = getBuilder(&spec, &loki) - nEw, _, _ = builder.configMap(context.Background(), nil) + nEw, _, _ = builder.configMap(context.Background(), lokiStatusUnused) assert.NotEqual(old.Data, nEw.Data) old = nEw @@ -271,7 +274,7 @@ func TestConfigMapUpdateCheck(t *testing.T) { CertKey: "tls.key", } builder = getBuilder(&spec, &loki) - nEw, _, _ = builder.configMap(context.Background(), nil) + nEw, _, _ = builder.configMap(context.Background(), lokiStatusUnused) assert.NotEqual(old.Data, nEw.Data) } @@ -287,8 +290,8 @@ func TestConfigMapUpdateWithLokistackMode(t *testing.T) { loki := helper.NewLokiConfig(&lokiSpec, "any") spec := flowslatest.FlowCollectorSpec{ConsolePlugin: plugin, Loki: lokiSpec} builder := getBuilder(&spec, &loki) - old, _, _ := builder.configMap(context.Background(), nil) - nEw, _, _ := builder.configMap(context.Background(), nil) + old, _, _ := builder.configMap(context.Background(), lokiStatusUnused) + nEw, _, _ := builder.configMap(context.Background(), lokiStatusUnused) assert.Equal(old.Data, nEw.Data) // update lokistack name @@ -297,7 +300,7 @@ func TestConfigMapUpdateWithLokistackMode(t *testing.T) { spec = flowslatest.FlowCollectorSpec{ConsolePlugin: plugin, Loki: lokiSpec} builder = getBuilder(&spec, &loki) - nEw, _, _ = builder.configMap(context.Background(), nil) + nEw, _, _ = builder.configMap(context.Background(), lokiStatusUnused) assert.NotEqual(old.Data, nEw.Data) old = nEw @@ -307,7 +310,7 @@ func TestConfigMapUpdateWithLokistackMode(t *testing.T) { spec = flowslatest.FlowCollectorSpec{ConsolePlugin: plugin, Loki: lokiSpec} builder = getBuilder(&spec, &loki) - nEw, _, _ = builder.configMap(context.Background(), nil) + nEw, _, _ = builder.configMap(context.Background(), lokiStatusUnused) assert.NotEqual(old.Data, nEw.Data) } @@ -332,7 +335,7 @@ func TestConfigMapContent(t *testing.T) { Processor: flowslatest.FlowCollectorFLP{SubnetLabels: flowslatest.SubnetLabels{OpenShiftAutoDetect: ptr.To(false)}}, } builder := getBuilder(&spec, &loki) - cm, _, err := builder.configMap(context.Background(), nil) + cm, _, err := builder.configMap(context.Background(), lokiStatusUnused) assert.NotNil(cm) assert.Nil(err) @@ -495,20 +498,9 @@ func TestLokiStackStatusEmbedding(t *testing.T) { builder := getBuilder(&spec, &loki) // Test 1: LokiStack with ready status - lokiStackReady := &lokiv1.LokiStack{ - ObjectMeta: metav1.ObjectMeta{ - Name: "lokistack", - Namespace: "ls-namespace", - }, - Status: lokiv1.LokiStackStatus{ - Conditions: []metav1.Condition{ - { - Type: "Ready", - Status: "True", - Reason: "ReadyComponents", - }, - }, - }, + lokiStackReady := status.ComponentStatus{ + Name: status.LokiStack, + Status: status.StatusReady, } cm, _, err := builder.configMap(context.Background(), lokiStackReady) assert.Nil(err) @@ -521,20 +513,10 @@ func TestLokiStackStatusEmbedding(t *testing.T) { assert.Empty(cfg.Loki.StatusURL, "StatusURL should be cleared when LokiStack status is embedded") // Test 2: LokiStack with pending status (no ready condition) - lokiStackPending := &lokiv1.LokiStack{ - ObjectMeta: metav1.ObjectMeta{ - Name: "lokistack", - Namespace: "ls-namespace", - }, - Status: lokiv1.LokiStackStatus{ - Conditions: []metav1.Condition{ - { - Type: "Pending", - Status: "False", - Reason: "PendingComponents", - }, - }, - }, + lokiStackPending := status.ComponentStatus{ + Name: status.LokiStack, + Status: status.StatusInProgress, + Reason: "PendingComponents", } cm, _, err = builder.configMap(context.Background(), lokiStackPending) assert.Nil(err) @@ -545,33 +527,8 @@ func TestLokiStackStatusEmbedding(t *testing.T) { assert.Equal("pending", cfg.Loki.Status) assert.Empty(cfg.Loki.StatusURL) - // Test 3: LokiStack with ReadyComponents but Status=False - lokiStackNotReady := &lokiv1.LokiStack{ - ObjectMeta: metav1.ObjectMeta{ - Name: "lokistack", - Namespace: "ls-namespace", - }, - Status: lokiv1.LokiStackStatus{ - Conditions: []metav1.Condition{ - { - Type: "Ready", - Status: "False", - Reason: "ReadyComponents", - }, - }, - }, - } - cm, _, err = builder.configMap(context.Background(), lokiStackNotReady) - assert.Nil(err) - assert.NotNil(cm) - - err = yaml.Unmarshal([]byte(cm.Data["config.yaml"]), &cfg) - assert.Nil(err) - assert.Equal("pending", cfg.Loki.Status) - assert.Empty(cfg.Loki.StatusURL) - - // Test 4: No LokiStack provided (nil) - cm, _, err = builder.configMap(context.Background(), nil) + // Test 3: No LokiStack provided (nil) + cm, _, err = builder.configMap(context.Background(), lokiStatusUnused) assert.Nil(err) assert.NotNil(cm) @@ -584,68 +541,6 @@ func TestLokiStackStatusEmbedding(t *testing.T) { assert.NotEmpty(cfgNil.Loki.StatusURL) } -func TestGetLokiStatus(t *testing.T) { - assert := assert.New(t) - - // Test 1: nil LokiStack - status := getLokiStatus(nil) - assert.Empty(status) - - // Test 2: Ready status - lokiStackReady := &lokiv1.LokiStack{ - Status: lokiv1.LokiStackStatus{ - Conditions: []metav1.Condition{ - { - Type: "Ready", - Status: "True", - Reason: "ReadyComponents", - }, - }, - }, - } - status = getLokiStatus(lokiStackReady) - assert.Equal("ready", status) - - // Test 3: Pending status (no ReadyComponents) - lokiStackPending := &lokiv1.LokiStack{ - Status: lokiv1.LokiStackStatus{ - Conditions: []metav1.Condition{ - { - Type: "Pending", - Status: "True", - Reason: "Pending", - }, - }, - }, - } - status = getLokiStatus(lokiStackPending) - assert.Equal("pending", status) - - // Test 4: Not ready (ReadyComponents with Status=False) - lokiStackNotReady := &lokiv1.LokiStack{ - Status: lokiv1.LokiStackStatus{ - Conditions: []metav1.Condition{ - { - Type: "Ready", - Status: "False", - Reason: "ReadyComponents", - }, - }, - }, - } - status = getLokiStatus(lokiStackNotReady) - assert.Equal("pending", status) - - // Test 5: Empty conditions - lokiStackEmpty := &lokiv1.LokiStack{ - Status: lokiv1.LokiStackStatus{ - Conditions: []metav1.Condition{}, - }, - } - status = getLokiStatus(lokiStackEmpty) - assert.Equal("pending", status) -} - func TestLokiStackNamespaceDefaulting(t *testing.T) { assert := assert.New(t) @@ -694,7 +589,7 @@ func TestLokiStackNotFoundBehavior(t *testing.T) { // Test behavior when LokiStack is not found (nil is passed) // This simulates the reconciler behavior when Get() returns NotFound - cm, digest, err := builder.configMap(context.Background(), nil) + cm, digest, err := builder.configMap(context.Background(), lokiStatusUnused) // ConfigMap should still be created successfully assert.Nil(err) diff --git a/internal/controller/loki/config/config.go b/internal/controller/demoloki/config/config.go similarity index 100% rename from internal/controller/loki/config/config.go rename to internal/controller/demoloki/config/config.go diff --git a/internal/controller/loki/config/local-config.yaml b/internal/controller/demoloki/config/local-config.yaml similarity index 100% rename from internal/controller/loki/config/local-config.yaml rename to internal/controller/demoloki/config/local-config.yaml diff --git a/internal/controller/loki/loki_objects.go b/internal/controller/demoloki/demoloki_objects.go similarity index 99% rename from internal/controller/loki/loki_objects.go rename to internal/controller/demoloki/demoloki_objects.go index 022e183d33..bbc1ce30cb 100644 --- a/internal/controller/loki/loki_objects.go +++ b/internal/controller/demoloki/demoloki_objects.go @@ -1,4 +1,4 @@ -package loki +package demoloki import ( "fmt" @@ -13,9 +13,8 @@ import ( "k8s.io/utils/ptr" flowslatest "github.com/netobserv/netobserv-operator/api/flowcollector/v1beta2" - cfg "github.com/netobserv/netobserv-operator/internal/controller/loki/config" - "github.com/netobserv/netobserv-operator/internal/controller/constants" + cfg "github.com/netobserv/netobserv-operator/internal/controller/demoloki/config" "github.com/netobserv/netobserv-operator/internal/controller/reconcilers" "github.com/netobserv/netobserv-operator/internal/pkg/helper" "github.com/netobserv/netobserv-operator/internal/pkg/volumes" diff --git a/internal/controller/loki/loki_reconciler.go b/internal/controller/demoloki/demoloki_reconciler.go similarity index 86% rename from internal/controller/loki/loki_reconciler.go rename to internal/controller/demoloki/demoloki_reconciler.go index 4d9795eadc..cda8089fdb 100644 --- a/internal/controller/loki/loki_reconciler.go +++ b/internal/controller/demoloki/demoloki_reconciler.go @@ -1,4 +1,4 @@ -package loki +package demoloki import ( "context" @@ -15,7 +15,7 @@ import ( "github.com/netobserv/netobserv-operator/internal/pkg/helper" ) -// LReconciler reconciles the current console plugin state with the desired configuration +// LReconciler reconciles the current loki state with the desired configuration type LReconciler struct { *reconcilers.Instance configMap *corev1.ConfigMap @@ -35,11 +35,27 @@ func NewReconciler(cmn *reconcilers.Instance) LReconciler { return rec } -// Reconcile is the reconciler entry point to reconcile the current plugin state with the desired configuration +// Reconcile is the reconciler entry point to reconcile the current loki state with the desired configuration func (r *LReconciler) Reconcile(ctx context.Context, desired *flowslatest.FlowCollector) error { - l := log.FromContext(ctx).WithName("loki") + l := log.FromContext(ctx).WithName("demo-loki") ctx = log.IntoContext(ctx, l) + defer r.Status.Commit(ctx, r.Client) + + err := r.reconcile(ctx, desired) + if err != nil { + l.Error(err, "Demo Loki reconcile failure") + // Set status failure unless it was already set + if !r.Status.HasFailure() { + r.Status.SetFailure("DemoLokiError", err.Error()) + } + return err + } + + return nil +} + +func (r *LReconciler) reconcile(ctx context.Context, desired *flowslatest.FlowCollector) error { // Retrieve current owned objects err := r.Managed.FetchAll(ctx) if err != nil { @@ -80,6 +96,7 @@ func (r *LReconciler) Reconcile(ctx context.Context, desired *flowslatest.FlowCo if err := r.DeleteIfOwned(ctx, r.service); err != nil { return err } + r.Status.SetUnused("Demo mode not enabled") } return nil diff --git a/internal/controller/ebpf/agent_controller.go b/internal/controller/ebpf/agent_controller.go index f73c6c2a71..a37482fed9 100644 --- a/internal/controller/ebpf/agent_controller.go +++ b/internal/controller/ebpf/agent_controller.go @@ -134,6 +134,24 @@ func NewAgentController(common *reconcilers.Instance) *AgentController { func (c *AgentController) Reconcile(ctx context.Context, target *flowslatest.FlowCollector) error { rlog := log.FromContext(ctx).WithName("ebpf") ctx = log.IntoContext(ctx, rlog) + + defer c.Status.Commit(ctx, c.Client) + + err := c.reconcile(ctx, target) + if err != nil { + rlog.Error(err, "AgentController reconcile failure") + // Set status failure unless it was already set + if !c.Status.HasFailure() { + c.Status.SetFailure("AgentControllerError", err.Error()) + } + return err + } + + return nil +} + +func (c *AgentController) reconcile(ctx context.Context, target *flowslatest.FlowCollector) error { + rlog := log.FromContext(ctx) current, err := c.current(ctx) if err != nil { return fmt.Errorf("fetching current eBPF agent: %w", err) diff --git a/internal/controller/flowcollector_controller.go b/internal/controller/flowcollector_controller.go index 873482eef8..4d5a87a24f 100644 --- a/internal/controller/flowcollector_controller.go +++ b/internal/controller/flowcollector_controller.go @@ -4,7 +4,6 @@ import ( "context" "fmt" - lokiv1 "github.com/grafana/loki/operator/apis/loki/v1" osv1 "github.com/openshift/api/console/v1" securityv1 "github.com/openshift/api/security/v1" appsv1 "k8s.io/api/apps/v1" @@ -14,16 +13,13 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" - "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/log" - "sigs.k8s.io/controller-runtime/pkg/reconcile" - "sigs.k8s.io/controller-runtime/pkg/source" flowslatest "github.com/netobserv/netobserv-operator/api/flowcollector/v1beta2" "github.com/netobserv/netobserv-operator/internal/controller/consoleplugin" - "github.com/netobserv/netobserv-operator/internal/controller/constants" + "github.com/netobserv/netobserv-operator/internal/controller/demoloki" "github.com/netobserv/netobserv-operator/internal/controller/ebpf" - "github.com/netobserv/netobserv-operator/internal/controller/loki" + "github.com/netobserv/netobserv-operator/internal/controller/lokistack" "github.com/netobserv/netobserv-operator/internal/controller/reconcilers" "github.com/netobserv/netobserv-operator/internal/pkg/cleanup" "github.com/netobserv/netobserv-operator/internal/pkg/helper" @@ -39,11 +35,11 @@ const ( // FlowCollectorReconciler reconciles a FlowCollector object type FlowCollectorReconciler struct { client.Client - mgr *manager.Manager - status status.Instance - watcher *watchers.Watcher - ctrl controller.Controller - lokiWatcherStarted bool + mgr *manager.Manager + status status.Instance + watcher *watchers.Watcher + ctrl controller.Controller + lokistackWatcher *lokistack.Watcher } func Start(ctx context.Context, mgr *manager.Manager) (manager.PostCreateHook, error) { @@ -52,7 +48,7 @@ func Start(ctx context.Context, mgr *manager.Manager) (manager.PostCreateHook, e r := FlowCollectorReconciler{ Client: mgr.Client, mgr: mgr, - status: mgr.Status.ForComponent(status.FlowCollectorLegacy), + status: mgr.Status.ForComponent(status.FlowCollectorController), } builder := ctrl.NewControllerManagedBy(mgr.Manager). @@ -72,17 +68,7 @@ func Start(ctx context.Context, mgr *manager.Manager) (manager.PostCreateHook, e builder.Owns(&osv1.ConsolePlugin{}, reconcilers.UpdateOrDeleteOnlyPred) } - if mgr.ClusterInfo.HasLokiStack(ctx) { - builder.Watches( - &lokiv1.LokiStack{}, - handler.EnqueueRequestsFromMapFunc(func(_ context.Context, _ client.Object) []ctrl.Request { - // When a LokiStack changes, trigger reconcile of the FlowCollector - return []ctrl.Request{{NamespacedName: constants.FlowCollectorName}} - }), - ) - r.lokiWatcherStarted = true - log.Info("LokiStack CRD detected") - } + r.lokistackWatcher = lokistack.Start(ctx, mgr, builder, func() controller.Controller { return r.ctrl }) ctrl, err := builder.Build(&r) if err != nil { @@ -116,17 +102,6 @@ func (r *FlowCollectorReconciler) Reconcile(ctx context.Context, _ ctrl.Request) return ctrl.Result{}, nil } - // Dynamically start LokiStack watcher if the API became available - if desired.Spec.Loki.Mode == flowslatest.LokiModeLokiStack { - if err := r.ensureLokiStackWatcher(ctx); err != nil { - l.Error(err, "Failed to start LokiStack watcher") - // Don't fail reconciliation, just log the error - } - } - - // At the moment, status workflow is to start as ready then degrade if necessary - // Later (when legacy controller is broken down into individual controllers), status should start as unknown and only on success finishes as ready - r.status.SetReady() defer r.status.Commit(ctx, r.Client) err = r.reconcile(ctx, clh, desired) @@ -134,44 +109,15 @@ func (r *FlowCollectorReconciler) Reconcile(ctx context.Context, _ ctrl.Request) l.Error(err, "FlowCollector reconcile failure") // Set status failure unless it was already set if !r.status.HasFailure() { - r.status.SetFailure("FlowCollectorGenericError", err.Error()) + r.status.SetFailure("ChildReconcilerError", err.Error()) } return ctrl.Result{}, err } + r.status.SetReady() return ctrl.Result{}, nil } -func (r *FlowCollectorReconciler) ensureLokiStackWatcher(ctx context.Context) error { - // Check if LokiStack API is available but watcher not started - if !r.mgr.ClusterInfo.HasLokiStack(ctx) { - return nil - } - - if r.lokiWatcherStarted { - return nil - } - - // LokiStack API is now available, start the watcher - log := log.FromContext(ctx) - log.Info("LokiStack CRD detected after startup, starting watcher") - - h := handler.TypedEnqueueRequestsFromMapFunc(func(_ context.Context, _ *lokiv1.LokiStack) []reconcile.Request { - // When a LokiStack changes, trigger reconcile of the FlowCollector - return []reconcile.Request{{NamespacedName: constants.FlowCollectorName}} - }) - - src := source.Kind(r.mgr.GetCache(), &lokiv1.LokiStack{}, h) - err := r.ctrl.Watch(src) - if err != nil { - return fmt.Errorf("failed to start LokiStack watcher: %w", err) - } - - r.lokiWatcherStarted = true - log.Info("LokiStack watcher started successfully") - return nil -} - func (r *FlowCollectorReconciler) reconcile(ctx context.Context, clh *helper.Client, desired *flowslatest.FlowCollector) error { ns := desired.Spec.GetNamespace() previousNamespace := r.status.GetDeployedNamespace(desired) @@ -187,13 +133,15 @@ func (r *FlowCollectorReconciler) reconcile(ctx context.Context, clh *helper.Cli } r.watcher.Reset(ns) + lokiStatus := r.lokistackWatcher.Reconcile(ctx, desired) + // Create reconcilers cpReconciler := consoleplugin.NewReconciler(reconcilersInfo.NewInstance( map[reconcilers.ImageRef]string{ reconcilers.MainImage: r.mgr.Config.ConsolePluginImage, reconcilers.ConsolePluginCompatImage: r.mgr.Config.ConsolePluginCompatImage, }, - r.status, + r.mgr.Status.ForComponent(status.WebConsole), )) // Check namespace changed @@ -210,25 +158,25 @@ func (r *FlowCollectorReconciler) reconcile(ctx context.Context, clh *helper.Cli reconcilers.MainImage: r.mgr.Config.EBPFAgentImage, reconcilers.BpfByteCodeImage: r.mgr.Config.EBPFByteCodeImage, }, - r.status, + r.mgr.Status.ForComponent(status.EBPFAgents), )) if err := ebpfAgentController.Reconcile(ctx, desired); err != nil { - return r.status.Error("ReconcileAgentFailed", err) + return err } // Console plugin - if err := cpReconciler.Reconcile(ctx, desired); err != nil { - return r.status.Error("ReconcileConsolePluginFailed", err) + if err := cpReconciler.Reconcile(ctx, desired, lokiStatus); err != nil { + return err } - lokiReconciler := loki.NewReconciler(reconcilersInfo.NewInstance( + lokiReconciler := demoloki.NewReconciler(reconcilersInfo.NewInstance( map[reconcilers.ImageRef]string{ reconcilers.MainImage: r.mgr.Config.DemoLokiImage, }, - r.status, + r.mgr.Status.ForComponent(status.DemoLoki), )) if err := lokiReconciler.Reconcile(ctx, desired); err != nil { - return r.status.Error("ReconcileLokiFailed", err) + return err } return nil diff --git a/internal/controller/flp/flp_monolith_reconciler.go b/internal/controller/flp/flp_monolith_reconciler.go index 108021aacc..a4dd98eb6c 100644 --- a/internal/controller/flp/flp_monolith_reconciler.go +++ b/internal/controller/flp/flp_monolith_reconciler.go @@ -90,8 +90,6 @@ func (r *monolithReconciler) reconcile(ctx context.Context, desired *flowslatest return nil } - r.Status.SetReady() // will be overidden if necessary, as error or pending - builder, err := newMonolithBuilder(r.Instance, &desired.Spec, flowMetrics, fcSlices, detectedSubnets) if err != nil { return err diff --git a/internal/controller/flp/flp_transfo_reconciler.go b/internal/controller/flp/flp_transfo_reconciler.go index e851ca092a..f15518af55 100644 --- a/internal/controller/flp/flp_transfo_reconciler.go +++ b/internal/controller/flp/flp_transfo_reconciler.go @@ -87,8 +87,6 @@ func (r *transformerReconciler) reconcile(ctx context.Context, desired *flowslat return nil } - r.Status.SetReady() // will be overidden if necessary, as error or pending - builder, err := newTransfoBuilder(r.Instance, &desired.Spec, flowMetrics, fcSlices, detectedSubnets) if err != nil { return err diff --git a/internal/controller/lokistack/lokistack_watcher.go b/internal/controller/lokistack/lokistack_watcher.go new file mode 100644 index 0000000000..0d6f136f74 --- /dev/null +++ b/internal/controller/lokistack/lokistack_watcher.go @@ -0,0 +1,243 @@ +package lokistack + +import ( + "context" + "fmt" + "strings" + + lokiv1 "github.com/grafana/loki/operator/apis/loki/v1" + flowslatest "github.com/netobserv/netobserv-operator/api/flowcollector/v1beta2" + "github.com/netobserv/netobserv-operator/internal/controller/constants" + "github.com/netobserv/netobserv-operator/internal/pkg/manager" + "github.com/netobserv/netobserv-operator/internal/pkg/manager/status" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/builder" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" +) + +type Watcher struct { + cl client.Client + mgr *manager.Manager + getController func() controller.Controller + status status.Instance + lokiWatcherStarted bool +} + +func Start(ctx context.Context, mgr *manager.Manager, builder *builder.TypedBuilder[reconcile.Request], getController func() controller.Controller) *Watcher { + log := log.FromContext(ctx) + log.Info("Starting LokiStack watcher") + lsw := Watcher{ + cl: mgr.Client, + mgr: mgr, + getController: getController, + status: mgr.Status.ForComponent(status.LokiStack), + } + + if mgr.ClusterInfo.HasLokiStack(ctx) { + builder.Watches( + &lokiv1.LokiStack{}, + handler.EnqueueRequestsFromMapFunc(func(_ context.Context, _ client.Object) []reconcile.Request { + // When a LokiStack changes, trigger reconcile of the FlowCollector + return []reconcile.Request{{NamespacedName: constants.FlowCollectorName}} + }), + ) + lsw.lokiWatcherStarted = true + log.Info("LokiStack CRD detected") + } + + return &lsw +} + +func (lsw *Watcher) Reconcile(ctx context.Context, fc *flowslatest.FlowCollector) (ret status.ComponentStatus) { + l := log.Log.WithName("lokistack-watcher") + ctx = log.IntoContext(ctx, l) + + defer func() { + ret = lsw.status.Get() + }() + lsw.status.SetUnknown() + + if !fc.Spec.UseLoki() { + lsw.status.SetUnused("Loki is disabled") + return + } + + if fc.Spec.Loki.Mode != flowslatest.LokiModeLokiStack { + lsw.status.SetUnused("Loki is not configured in LokiStack mode") + return + } + + if !lsw.mgr.ClusterInfo.HasLokiStack(ctx) { + lsw.status.SetFailure("LokiStackAPIMissing", "Loki is configured in LokiStack mode, but LokiStack API is missing; check that the Loki Operator is correctly installed.") + return + } + + if err := lsw.ensureLokiStackWatcher(ctx); err != nil { + l.Error(err, "Failed to start LokiStack watcher") + lsw.status.SetFailure("CantWatchLokiStack", err.Error()) + // Don't fail reconciliation, just log the error + } + + if err := lsw.checkStatus(ctx, fc); err != nil { + l.Error(err, "Failed to fetch LokiStack status") + } + + return +} + +func (lsw *Watcher) ensureLokiStackWatcher(ctx context.Context) error { + if lsw.lokiWatcherStarted { + return nil + } + + // LokiStack API is now available, start the watcher + log := log.FromContext(ctx) + log.Info("LokiStack CRD detected after startup, starting watcher") + + h := handler.TypedEnqueueRequestsFromMapFunc(func(_ context.Context, _ *lokiv1.LokiStack) []reconcile.Request { + // When a LokiStack changes, trigger reconcile of the FlowCollector + return []reconcile.Request{{NamespacedName: constants.FlowCollectorName}} + }) + + src := source.Kind(lsw.mgr.GetCache(), &lokiv1.LokiStack{}, h) + err := lsw.getController().Watch(src) + if err != nil { + return fmt.Errorf("failed to start LokiStack watcher: %w", err) + } + + lsw.lokiWatcherStarted = true + log.Info("LokiStack watcher started successfully") + return nil +} + +func (lsw *Watcher) checkStatus(ctx context.Context, fc *flowslatest.FlowCollector) error { + lokiStack := &lokiv1.LokiStack{} + nsname := types.NamespacedName{Name: fc.Spec.Loki.LokiStack.Name, Namespace: fc.Spec.Namespace} + if len(fc.Spec.Loki.LokiStack.Namespace) > 0 { + nsname.Namespace = fc.Spec.Loki.LokiStack.Namespace + } + err := lsw.cl.Get(ctx, nsname, lokiStack) + if err != nil { + lsw.status.SetFailure("CantFetchLokiStack", err.Error()) + return err + } + + // Check LokiStack status conditions + var issues, warnings []string + if len(lokiStack.Status.Conditions) > 0 { + // Check for specific problem conditions first (Degraded, Error, Failed) + // These provide more actionable information than just "NotReady" + for _, cond := range lokiStack.Status.Conditions { + // Skip the Ready and Pending conditions + if cond.Type == "Ready" || cond.Type == "Pending" { + continue + } + // If any condition has Status=True for a problem condition, report it + condTypeLower := strings.ToLower(cond.Type) + if cond.Status == metav1.ConditionTrue && (strings.Contains(condTypeLower, "error") || + strings.Contains(condTypeLower, "degraded") || + strings.Contains(condTypeLower, "failed")) { + issues = append(issues, fmt.Sprintf("%s: %s", cond.Type, cond.Message)) + } else if cond.Status == metav1.ConditionTrue && strings.Contains(condTypeLower, "warning") { + warnings = append(warnings, fmt.Sprintf("%s: %s", cond.Type, cond.Message)) + } + } + } + + // Check LokiStack component status for failed or pending pods + componentIssues := checkLokiStackComponents(&lokiStack.Status.Components) + allIssues := issues + // Aggregate warnings and component issues + allIssues = append(allIssues, componentIssues...) + allIssues = append(allIssues, warnings...) + + if len(issues) > 0 { + lsw.status.SetFailure( + "LokiStackIssues", + fmt.Sprintf("LokiStack has issues [name: %s, namespace: %s]: %s", nsname.Name, nsname.Namespace, strings.Join(allIssues, "; ")), + ) + return nil + } + + // If no specific issues found, check the Ready condition + readyCond := meta.FindStatusCondition(lokiStack.Status.Conditions, "Ready") + if readyCond != nil && readyCond.Status != metav1.ConditionTrue { + msg := readyCond.Message + if len(allIssues) > 0 { + msg += "; " + strings.Join(allIssues, "; ") + } + lsw.status.SetNotReady( + "LokiStackNotReady", + fmt.Sprintf("LokiStack is not ready [name: %s, namespace: %s]: %s - %s", nsname.Name, nsname.Namespace, readyCond.Reason, msg), + ) + return nil + } + + if len(componentIssues) > 0 { + lsw.status.SetNotReady( + "LokiStackComponentIssues", + fmt.Sprintf("LokiStack components have issues [name: %s, namespace: %s]: %s", nsname.Name, nsname.Namespace, strings.Join(allIssues, "; ")), + ) + return nil + } + + if len(warnings) > 0 { + lsw.status.SetDegraded( + "LokiStackWarnings", + fmt.Sprintf("LokiStack has warnings [name: %s, namespace: %s]: %s", nsname.Name, nsname.Namespace, strings.Join(allIssues, "; ")), + ) + return nil + } + + lsw.status.SetReady() + return nil +} + +func checkLokiStackComponents(components *lokiv1.LokiStackComponentStatus) []string { + if components == nil { + return nil + } + + var issues []string + + // Helper function to check a component's pod status map + checkComponent := func(name string, podStatusMap lokiv1.PodStatusMap) { + if len(podStatusMap) == 0 { + return + } + + // Check for failed pods + if failedPods, ok := podStatusMap[lokiv1.PodFailed]; ok && len(failedPods) > 0 { + issues = append(issues, fmt.Sprintf("%s has %d failed pod(s): %s", name, len(failedPods), strings.Join(failedPods, ", "))) + } + + // Check for pending pods + if pendingPods, ok := podStatusMap[lokiv1.PodPending]; ok && len(pendingPods) > 0 { + issues = append(issues, fmt.Sprintf("%s has %d pending pod(s): %s", name, len(pendingPods), strings.Join(pendingPods, ", "))) + } + + // Check for unknown status pods + if unknownPods, ok := podStatusMap[lokiv1.PodStatusUnknown]; ok && len(unknownPods) > 0 { + issues = append(issues, fmt.Sprintf("%s has %d pod(s) with unknown status: %s", name, len(unknownPods), strings.Join(unknownPods, ", "))) + } + } + + // Check all LokiStack components + checkComponent("Compactor", components.Compactor) + checkComponent("Distributor", components.Distributor) + checkComponent("IndexGateway", components.IndexGateway) + checkComponent("Ingester", components.Ingester) + checkComponent("Querier", components.Querier) + checkComponent("QueryFrontend", components.QueryFrontend) + checkComponent("Gateway", components.Gateway) + checkComponent("Ruler", components.Ruler) + + return issues +} diff --git a/internal/pkg/manager/status/loki_test.go b/internal/controller/lokistack/lokistack_watcher_test.go similarity index 67% rename from internal/pkg/manager/status/loki_test.go rename to internal/controller/lokistack/lokistack_watcher_test.go index 89d43167ed..259daa6f51 100644 --- a/internal/pkg/manager/status/loki_test.go +++ b/internal/controller/lokistack/lokistack_watcher_test.go @@ -1,4 +1,4 @@ -package status +package lokistack import ( "context" @@ -7,12 +7,16 @@ import ( lokiv1 "github.com/grafana/loki/operator/apis/loki/v1" flowslatest "github.com/netobserv/netobserv-operator/api/flowcollector/v1beta2" + "github.com/netobserv/netobserv-operator/internal/pkg/cluster" + "github.com/netobserv/netobserv-operator/internal/pkg/manager" + "github.com/netobserv/netobserv-operator/internal/pkg/manager/status" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" kerr "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" + "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -27,41 +31,58 @@ func (m *mockClient) Get(ctx context.Context, key types.NamespacedName, obj clie return args.Error(0) } +func initLSWatcher() (*mockClient, *Watcher) { + clust := &cluster.Info{} + clust.Mock("", "", cluster.LokiStack) + client := &mockClient{} + lsw := Watcher{ + mgr: &manager.Manager{ClusterInfo: clust}, + cl: client, + status: status.NewManager().ForComponent(status.LokiStack), + lokiWatcherStarted: true, + } + return client, &lsw +} + func TestCheckLoki_Disabled(t *testing.T) { fc := &flowslatest.FlowCollector{ Spec: flowslatest.FlowCollectorSpec{ Loki: flowslatest.FlowCollectorLoki{ - Enable: ptr(false), + Enable: ptr.To(false), }, }, } - client := &mockClient{} - condition := checkLoki(context.Background(), client, fc) + _, lsw := initLSWatcher() + st := lsw.Reconcile(context.Background(), fc) - assert.Equal(t, LokiIssue, condition.Type) - assert.Equal(t, "Unused", condition.Reason) - assert.Equal(t, metav1.ConditionUnknown, condition.Status) - assert.Contains(t, condition.Message, "Loki is disabled") + assert.Equal(t, status.ComponentStatus{ + Name: status.LokiStack, + Status: status.StatusUnknown, + Reason: "ComponentUnused", + Message: "Loki is disabled", + }, st) } func TestCheckLoki_NotLokiStackMode(t *testing.T) { fc := &flowslatest.FlowCollector{ Spec: flowslatest.FlowCollectorSpec{ Loki: flowslatest.FlowCollectorLoki{ - Enable: ptr(true), + Enable: ptr.To(true), Mode: flowslatest.LokiModeManual, }, }, } - client := &mockClient{} - condition := checkLoki(context.Background(), client, fc) + _, lsw := initLSWatcher() + st := lsw.Reconcile(context.Background(), fc) - assert.Equal(t, LokiIssue, condition.Type) - assert.Equal(t, "Unused", condition.Reason) - assert.Equal(t, metav1.ConditionUnknown, condition.Status) - assert.Contains(t, condition.Message, "not configured in LokiStack mode") + assert.Equal(t, status.ComponentStatus{ + Name: status.LokiStack, + Status: status.StatusUnknown, + Reason: "ComponentUnused", + Message: "Loki is not configured in LokiStack mode", + }, st) } func TestCheckLoki_LokiStackNotFound(t *testing.T) { @@ -72,7 +93,7 @@ func TestCheckLoki_LokiStackNotFound(t *testing.T) { Spec: flowslatest.FlowCollectorSpec{ Namespace: "netobserv", Loki: flowslatest.FlowCollectorLoki{ - Enable: ptr(true), + Enable: ptr.To(true), Mode: flowslatest.LokiModeLokiStack, LokiStack: flowslatest.LokiStackRef{ Name: "loki", @@ -81,19 +102,19 @@ func TestCheckLoki_LokiStackNotFound(t *testing.T) { }, } - client := &mockClient{} + client, lsw := initLSWatcher() nsname := types.NamespacedName{Name: "loki", Namespace: "netobserv"} client.On("Get", mock.Anything, nsname, mock.Anything, mock.Anything). Return(kerr.NewNotFound(schema.GroupResource{}, "loki")) - condition := checkLoki(context.Background(), client, fc) + st := lsw.Reconcile(context.Background(), fc) - assert.Equal(t, LokiIssue, condition.Type) - assert.Equal(t, "LokiStackNotFound", condition.Reason) - assert.Equal(t, metav1.ConditionTrue, condition.Status) - assert.Contains(t, condition.Message, "could not be found") - assert.Contains(t, condition.Message, "loki") - assert.Contains(t, condition.Message, "netobserv") + assert.Equal(t, status.ComponentStatus{ + Name: status.LokiStack, + Status: status.StatusFailure, + Reason: "CantFetchLokiStack", + Message: ` "loki" not found`, + }, st) } func TestCheckLoki_LokiStackNotReady(t *testing.T) { @@ -121,7 +142,7 @@ func TestCheckLoki_LokiStackNotReady(t *testing.T) { Spec: flowslatest.FlowCollectorSpec{ Namespace: "netobserv", Loki: flowslatest.FlowCollectorLoki{ - Enable: ptr(true), + Enable: ptr.To(true), Mode: flowslatest.LokiModeLokiStack, LokiStack: flowslatest.LokiStackRef{ Name: "loki", @@ -130,21 +151,21 @@ func TestCheckLoki_LokiStackNotReady(t *testing.T) { }, } - client := &mockClient{} + client, lsw := initLSWatcher() nsname := types.NamespacedName{Name: "loki", Namespace: "netobserv"} client.On("Get", mock.Anything, nsname, mock.Anything, mock.Anything).Run(func(args mock.Arguments) { arg := args.Get(2).(*lokiv1.LokiStack) *arg = *lokiStack }).Return(nil) - condition := checkLoki(context.Background(), client, fc) + st := lsw.Reconcile(context.Background(), fc) - assert.Equal(t, LokiIssue, condition.Type) - assert.Equal(t, "LokiStackNotReady", condition.Reason) - assert.Equal(t, metav1.ConditionTrue, condition.Status) - assert.Contains(t, condition.Message, "not ready") - assert.Contains(t, condition.Message, "PendingComponents") - assert.Contains(t, condition.Message, "Some components are still starting") + assert.Equal(t, status.ComponentStatus{ + Name: status.LokiStack, + Status: status.StatusInProgress, + Reason: "LokiStackNotReady", + Message: `LokiStack is not ready [name: loki, namespace: netobserv]: PendingComponents - Some components are still starting`, + }, st) } func TestCheckLoki_LokiStackWithErrorCondition(t *testing.T) { @@ -178,7 +199,7 @@ func TestCheckLoki_LokiStackWithErrorCondition(t *testing.T) { Spec: flowslatest.FlowCollectorSpec{ Namespace: "netobserv", Loki: flowslatest.FlowCollectorLoki{ - Enable: ptr(true), + Enable: ptr.To(true), Mode: flowslatest.LokiModeLokiStack, LokiStack: flowslatest.LokiStackRef{ Name: "loki", @@ -187,20 +208,21 @@ func TestCheckLoki_LokiStackWithErrorCondition(t *testing.T) { }, } - client := &mockClient{} + client, lsw := initLSWatcher() nsname := types.NamespacedName{Name: "loki", Namespace: "netobserv"} client.On("Get", mock.Anything, nsname, mock.Anything, mock.Anything).Run(func(args mock.Arguments) { arg := args.Get(2).(*lokiv1.LokiStack) *arg = *lokiStack }).Return(nil) - condition := checkLoki(context.Background(), client, fc) + st := lsw.Reconcile(context.Background(), fc) - assert.Equal(t, LokiIssue, condition.Type) - assert.Equal(t, "LokiStackIssues", condition.Reason) - assert.Equal(t, metav1.ConditionTrue, condition.Status) - assert.Contains(t, condition.Message, "StorageError") - assert.Contains(t, condition.Message, "Cannot connect to S3 backend") + assert.Equal(t, status.ComponentStatus{ + Name: status.LokiStack, + Status: status.StatusFailure, + Reason: "LokiStackIssues", + Message: `LokiStack has issues [name: loki, namespace: netobserv]: StorageError: Cannot connect to S3 backend`, + }, st) } func TestCheckLoki_LokiStackWithWarningAndDegradedConditions(t *testing.T) { @@ -252,85 +274,7 @@ func TestCheckLoki_LokiStackWithWarningAndDegradedConditions(t *testing.T) { Spec: flowslatest.FlowCollectorSpec{ Namespace: "netobserv", Loki: flowslatest.FlowCollectorLoki{ - Enable: ptr(true), - Mode: flowslatest.LokiModeLokiStack, - LokiStack: flowslatest.LokiStackRef{ - Name: "loki", - }, - }, - }, - } - - client := &mockClient{} - nsname := types.NamespacedName{Name: "loki", Namespace: "netobserv"} - client.On("Get", mock.Anything, nsname, mock.Anything, mock.Anything).Run(func(args mock.Arguments) { - arg := args.Get(2).(*lokiv1.LokiStack) - *arg = *lokiStack - }).Return(nil) - - // Check that Degraded issue is reported in LokiIssue - condition := checkLoki(context.Background(), client, fc) - assert.Equal(t, LokiIssue, condition.Type) - assert.Equal(t, "LokiStackIssues", condition.Reason) - assert.Equal(t, metav1.ConditionTrue, condition.Status) - assert.Contains(t, condition.Message, "Degraded") - assert.Contains(t, condition.Message, "Missing object storage secret") - // Warning should NOT be in LokiIssue - assert.NotContains(t, condition.Message, "Warning") - assert.NotContains(t, condition.Message, "schema configuration") - - // Check that Warning is reported separately in LokiWarning - warningCondition := checkLokiWarnings(context.Background(), client, fc) - assert.Equal(t, LokiWarning, warningCondition.Type) - assert.Equal(t, "LokiStackWarnings", warningCondition.Reason) - assert.Equal(t, metav1.ConditionTrue, warningCondition.Status) - assert.Contains(t, warningCondition.Message, "Warning") - assert.Contains(t, warningCondition.Message, "The schema configuration does not contain the most recent schema version") -} - -func TestCheckLokiWarnings_Disabled(t *testing.T) { - fc := &flowslatest.FlowCollector{ - Spec: flowslatest.FlowCollectorSpec{ - Loki: flowslatest.FlowCollectorLoki{ - Enable: ptr(false), - }, - }, - } - - client := &mockClient{} - condition := checkLokiWarnings(context.Background(), client, fc) - - assert.Equal(t, LokiWarning, condition.Type) - assert.Equal(t, "Unused", condition.Reason) - assert.Equal(t, metav1.ConditionUnknown, condition.Status) -} - -func TestCheckLokiWarnings_NoWarnings(t *testing.T) { - lokiStack := &lokiv1.LokiStack{ - ObjectMeta: metav1.ObjectMeta{ - Name: "loki", - Namespace: "netobserv", - }, - Status: lokiv1.LokiStackStatus{ - Conditions: []metav1.Condition{ - { - Type: "Ready", - Status: metav1.ConditionTrue, - Reason: "Ready", - Message: "All components ready", - }, - }, - }, - } - - fc := &flowslatest.FlowCollector{ - ObjectMeta: metav1.ObjectMeta{ - Name: "cluster", - }, - Spec: flowslatest.FlowCollectorSpec{ - Namespace: "netobserv", - Loki: flowslatest.FlowCollectorLoki{ - Enable: ptr(true), + Enable: ptr.To(true), Mode: flowslatest.LokiModeLokiStack, LokiStack: flowslatest.LokiStackRef{ Name: "loki", @@ -339,21 +283,22 @@ func TestCheckLokiWarnings_NoWarnings(t *testing.T) { }, } - client := &mockClient{} + client, lsw := initLSWatcher() nsname := types.NamespacedName{Name: "loki", Namespace: "netobserv"} client.On("Get", mock.Anything, nsname, mock.Anything, mock.Anything).Run(func(args mock.Arguments) { arg := args.Get(2).(*lokiv1.LokiStack) *arg = *lokiStack }).Return(nil) - condition := checkLokiWarnings(context.Background(), client, fc) + st := lsw.Reconcile(context.Background(), fc) - assert.Equal(t, LokiWarning, condition.Type) - assert.Equal(t, "NoWarning", condition.Reason) - assert.Equal(t, metav1.ConditionFalse, condition.Status) + assert.Equal(t, status.StatusFailure, st.Status) + assert.Equal(t, "LokiStackIssues", st.Reason) + assert.Contains(t, st.Message, "Missing object storage secret") + assert.Contains(t, st.Message, "The schema configuration does not contain the most recent schema version and needs an update") } -func TestCheckLokiWarnings_WithWarning(t *testing.T) { +func TestCheckLoki_LokiStackWithJustWarnings(t *testing.T) { lokiStack := &lokiv1.LokiStack{ ObjectMeta: metav1.ObjectMeta{ Name: "loki", @@ -384,7 +329,7 @@ func TestCheckLokiWarnings_WithWarning(t *testing.T) { Spec: flowslatest.FlowCollectorSpec{ Namespace: "netobserv", Loki: flowslatest.FlowCollectorLoki{ - Enable: ptr(true), + Enable: ptr.To(true), Mode: flowslatest.LokiModeLokiStack, LokiStack: flowslatest.LokiStackRef{ Name: "loki", @@ -393,20 +338,21 @@ func TestCheckLokiWarnings_WithWarning(t *testing.T) { }, } - client := &mockClient{} + client, lsw := initLSWatcher() nsname := types.NamespacedName{Name: "loki", Namespace: "netobserv"} client.On("Get", mock.Anything, nsname, mock.Anything, mock.Anything).Run(func(args mock.Arguments) { arg := args.Get(2).(*lokiv1.LokiStack) *arg = *lokiStack }).Return(nil) - condition := checkLokiWarnings(context.Background(), client, fc) + st := lsw.Reconcile(context.Background(), fc) - assert.Equal(t, LokiWarning, condition.Type) - assert.Equal(t, "LokiStackWarnings", condition.Reason) - assert.Equal(t, metav1.ConditionTrue, condition.Status) - assert.Contains(t, condition.Message, "Warning") - assert.Contains(t, condition.Message, "schema configuration") + assert.Equal(t, status.ComponentStatus{ + Name: status.LokiStack, + Status: status.StatusDegraded, + Reason: "LokiStackWarnings", + Message: `LokiStack has warnings [name: loki, namespace: netobserv]: Warning: The schema configuration does not contain the most recent schema version`, + }, st) } func TestCheckLoki_LokiStackComponentsWithFailedPods(t *testing.T) { @@ -442,7 +388,7 @@ func TestCheckLoki_LokiStackComponentsWithFailedPods(t *testing.T) { Spec: flowslatest.FlowCollectorSpec{ Namespace: "netobserv", Loki: flowslatest.FlowCollectorLoki{ - Enable: ptr(true), + Enable: ptr.To(true), Mode: flowslatest.LokiModeLokiStack, LokiStack: flowslatest.LokiStackRef{ Name: "loki", @@ -451,24 +397,21 @@ func TestCheckLoki_LokiStackComponentsWithFailedPods(t *testing.T) { }, } - client := &mockClient{} + client, lsw := initLSWatcher() nsname := types.NamespacedName{Name: "loki", Namespace: "netobserv"} client.On("Get", mock.Anything, nsname, mock.Anything, mock.Anything).Run(func(args mock.Arguments) { arg := args.Get(2).(*lokiv1.LokiStack) *arg = *lokiStack }).Return(nil) - condition := checkLoki(context.Background(), client, fc) - - assert.Equal(t, LokiIssue, condition.Type) - assert.Equal(t, "LokiStackComponentIssues", condition.Reason) - assert.Equal(t, metav1.ConditionTrue, condition.Status) - assert.Contains(t, condition.Message, "Ingester") - assert.Contains(t, condition.Message, "2 failed pod(s)") - assert.Contains(t, condition.Message, "ingester-0") - assert.Contains(t, condition.Message, "Querier") - assert.Contains(t, condition.Message, "1 pending pod(s)") - assert.Contains(t, condition.Message, "querier-0") + st := lsw.Reconcile(context.Background(), fc) + + assert.Equal(t, status.ComponentStatus{ + Name: status.LokiStack, + Status: status.StatusInProgress, + Reason: "LokiStackComponentIssues", + Message: `LokiStack components have issues [name: loki, namespace: netobserv]: Ingester has 2 failed pod(s): ingester-0, ingester-1; Querier has 1 pending pod(s): querier-0`, + }, st) } func TestCheckLoki_LokiStackHealthy(t *testing.T) { @@ -507,7 +450,7 @@ func TestCheckLoki_LokiStackHealthy(t *testing.T) { Spec: flowslatest.FlowCollectorSpec{ Namespace: "netobserv", Loki: flowslatest.FlowCollectorLoki{ - Enable: ptr(true), + Enable: ptr.To(true), Mode: flowslatest.LokiModeLokiStack, LokiStack: flowslatest.LokiStackRef{ Name: "loki", @@ -516,18 +459,21 @@ func TestCheckLoki_LokiStackHealthy(t *testing.T) { }, } - client := &mockClient{} + client, lsw := initLSWatcher() nsname := types.NamespacedName{Name: "loki", Namespace: "netobserv"} client.On("Get", mock.Anything, nsname, mock.Anything, mock.Anything).Run(func(args mock.Arguments) { arg := args.Get(2).(*lokiv1.LokiStack) *arg = *lokiStack }).Return(nil) - condition := checkLoki(context.Background(), client, fc) + st := lsw.Reconcile(context.Background(), fc) - assert.Equal(t, LokiIssue, condition.Type) - assert.Equal(t, "NoIssue", condition.Reason) - assert.Equal(t, metav1.ConditionFalse, condition.Status) + assert.Equal(t, status.ComponentStatus{ + Name: status.LokiStack, + Status: status.StatusReady, + Reason: "", + Message: "", + }, st) } func TestCheckLoki_CustomNamespace(t *testing.T) { @@ -555,7 +501,7 @@ func TestCheckLoki_CustomNamespace(t *testing.T) { Spec: flowslatest.FlowCollectorSpec{ Namespace: "netobserv", Loki: flowslatest.FlowCollectorLoki{ - Enable: ptr(true), + Enable: ptr.To(true), Mode: flowslatest.LokiModeLokiStack, LokiStack: flowslatest.LokiStackRef{ Name: "custom-loki", @@ -565,18 +511,21 @@ func TestCheckLoki_CustomNamespace(t *testing.T) { }, } - client := &mockClient{} + client, lsw := initLSWatcher() nsname := types.NamespacedName{Name: "custom-loki", Namespace: "observability"} client.On("Get", mock.Anything, nsname, mock.Anything, mock.Anything).Run(func(args mock.Arguments) { arg := args.Get(2).(*lokiv1.LokiStack) *arg = *lokiStack }).Return(nil) - condition := checkLoki(context.Background(), client, fc) + st := lsw.Reconcile(context.Background(), fc) - assert.Equal(t, LokiIssue, condition.Type) - assert.Equal(t, "NoIssue", condition.Reason) - assert.Equal(t, metav1.ConditionFalse, condition.Status) + assert.Equal(t, status.ComponentStatus{ + Name: status.LokiStack, + Status: status.StatusReady, + Reason: "", + Message: "", + }, st) } func TestCheckLokiStackComponents_AllComponentTypes(t *testing.T) { @@ -612,7 +561,7 @@ func TestCheckLokiStackComponents_AllComponentTypes(t *testing.T) { assert.Len(t, issues, 6) // Should report 6 issues (failed, pending, and unknown pods) // Check that all problematic components are reported - issuesStr := joinIssues(issues) + issuesStr := strings.Join(issues, "; ") assert.Contains(t, issuesStr, "Compactor has 1 failed pod(s): compactor-0") assert.Contains(t, issuesStr, "Distributor has 1 pending pod(s): distributor-0") assert.Contains(t, issuesStr, "IndexGateway has 1 pod(s) with unknown status: index-gateway-0") @@ -660,17 +609,3 @@ func TestCheckLokiStackComponents_OnlyRunningPods(t *testing.T) { issues := checkLokiStackComponents(components) assert.Empty(t, issues) } - -// Helper functions - -func ptr[T any](v T) *T { - return &v -} - -func joinIssues(issues []string) string { - result := "" - for _, issue := range issues { - result += issue + " " - } - return result -} diff --git a/internal/controller/reconcilers/reconcilers.go b/internal/controller/reconcilers/reconcilers.go index 72d596fb42..8de47ef4ab 100644 --- a/internal/controller/reconcilers/reconcilers.go +++ b/internal/controller/reconcilers/reconcilers.go @@ -122,6 +122,7 @@ func ReconcileConfigMap(ctx context.Context, cl *helper.Client, current, desired return cl.UpdateIfOwned(ctx, current, desired) } +// returns true if ready, false if still in progress func ReconcileDaemonSet(ctx context.Context, ci *Instance, old, n *appsv1.DaemonSet, containerName string, report *helper.ChangeReport) error { if !ci.Managed.Exists(old) { ci.Status.SetCreatingDaemonSet(n) diff --git a/internal/pkg/cluster/cluster.go b/internal/pkg/cluster/cluster.go index 721cfce693..e90c554183 100644 --- a/internal/pkg/cluster/cluster.go +++ b/internal/pkg/cluster/cluster.go @@ -30,7 +30,7 @@ type discoveryClient interface { } type Info struct { - apisMap map[string]bool + apisMap map[APIName]bool apisMapLock sync.RWMutex id string openShiftVersion *semver.Version @@ -44,14 +44,16 @@ type Info struct { onRefresh func() } +type APIName string + var ( - consolePlugin = "consoleplugins." + osv1.GroupVersion.String() - cno = "networks." + operatorv1.GroupVersion.String() - svcMonitor = "servicemonitors." + monv1.SchemeGroupVersion.String() - promRule = "prometheusrules." + monv1.SchemeGroupVersion.String() - ocpSecurity = "securitycontextconstraints." + securityv1.SchemeGroupVersion.String() - endpointSlices = "endpointslices." + discoveryv1.SchemeGroupVersion.String() - lokistacks = "lokistacks." + lokiv1.GroupVersion.String() + ConsolePlugin APIName = APIName("consoleplugins." + osv1.GroupVersion.String()) + CNO APIName = APIName("networks." + operatorv1.GroupVersion.String()) + SvcMonitor APIName = APIName("servicemonitors." + monv1.SchemeGroupVersion.String()) + PromRule APIName = APIName("prometheusrules." + monv1.SchemeGroupVersion.String()) + OCPSecurity APIName = APIName("securitycontextconstraints." + securityv1.SchemeGroupVersion.String()) + EndpointSlices APIName = APIName("endpointslices." + discoveryv1.SchemeGroupVersion.String()) + LokiStack APIName = APIName("lokistacks." + lokiv1.GroupVersion.String()) ) func NewInfo(ctx context.Context, cfg *rest.Config, dcl *discovery.DiscoveryClient, onRefresh func()) (*Info, func(ctx context.Context) error, error) { @@ -100,14 +102,14 @@ func (c *Info) fetchAvailableAPIsInternal(ctx context.Context, allowCriticalFail c.apisMapLock.Lock() defer c.apisMapLock.Unlock() if c.apisMap == nil { - c.apisMap = map[string]bool{ - consolePlugin: false, - cno: false, - svcMonitor: false, - promRule: false, - ocpSecurity: false, - endpointSlices: false, - lokistacks: false, + c.apisMap = map[APIName]bool{ + ConsolePlugin: false, + CNO: false, + SvcMonitor: false, + PromRule: false, + OCPSecurity: false, + EndpointSlices: false, + LokiStack: false, } firstRun = true } @@ -123,11 +125,11 @@ func (c *Info) fetchAvailableAPIsInternal(ctx context.Context, allowCriticalFail } else if hasDiscoveryError { // Check if the wanted API is in error for gv, gvErr := range discErr.Groups { - if strings.Contains(apiName, gv.String()) { + if strings.Contains(string(apiName), gv.String()) { log.Error(gvErr, "some API-related features are unavailable; you can check for stale APIs with 'kubectl get apiservice'", "GroupVersion", gv.String(), "api", apiName) // OCP Security API is critical - we MUST know if we're on OpenShift // to avoid wrong security context configurations - if apiName == ocpSecurity { + if apiName == OCPSecurity { criticalAPIFailed = true } } @@ -156,11 +158,11 @@ func (c *Info) fetchAvailableAPIsInternal(ctx context.Context, allowCriticalFail return nil } -func hasAPI(apiName string, resources []*metav1.APIResourceList) bool { +func hasAPI(apiName APIName, resources []*metav1.APIResourceList) bool { for i := range resources { for j := range resources[i].APIResources { gvk := resources[i].APIResources[j].Name + "." + resources[i].GroupVersion - if apiName == gvk { + if string(apiName) == gvk { return true } } @@ -271,18 +273,21 @@ func (c *Info) setInfo(id string, openShiftVersion *semver.Version, cni flowslat } // Mock shouldn't be used except for testing -func (c *Info) Mock(v string, cni flowslatest.NetworkType) { +func (c *Info) Mock(v string, cni flowslatest.NetworkType, apis ...APIName) { if c.apisMap == nil { - c.apisMap = make(map[string]bool) + c.apisMap = make(map[APIName]bool) } if v == "" { // No OpenShift - c.apisMap[ocpSecurity] = false + c.apisMap[OCPSecurity] = false c.openShiftVersion = nil } else { - c.apisMap[ocpSecurity] = true + c.apisMap[OCPSecurity] = true c.openShiftVersion = semver.New(v) } + for _, api := range apis { + c.apisMap[api] = true + } c.cni = cni c.ready = true } @@ -360,41 +365,41 @@ func (c *Info) IsOpenShift() bool { func (c *Info) HasConsolePlugin() bool { c.apisMapLock.RLock() defer c.apisMapLock.RUnlock() - return c.apisMap[consolePlugin] + return c.apisMap[ConsolePlugin] } // HasOCPSecurity returns true if "consoles.config.openshift.io" API was found func (c *Info) HasOCPSecurity() bool { c.apisMapLock.RLock() defer c.apisMapLock.RUnlock() - return c.apisMap[ocpSecurity] + return c.apisMap[OCPSecurity] } // HasCNO returns true if "networks.operator.openshift.io" API was found func (c *Info) HasCNO() bool { c.apisMapLock.RLock() defer c.apisMapLock.RUnlock() - return c.apisMap[cno] + return c.apisMap[CNO] } // HasSvcMonitor returns true if "servicemonitors.monitoring.coreos.com" API was found func (c *Info) HasSvcMonitor() bool { c.apisMapLock.RLock() defer c.apisMapLock.RUnlock() - return c.apisMap[svcMonitor] + return c.apisMap[SvcMonitor] } // HasPromRule returns true if "prometheusrules.monitoring.coreos.com" API was found func (c *Info) HasPromRule() bool { c.apisMapLock.RLock() defer c.apisMapLock.RUnlock() - return c.apisMap[promRule] + return c.apisMap[PromRule] } func (c *Info) HasEndpointSlices() bool { c.apisMapLock.RLock() defer c.apisMapLock.RUnlock() - return c.apisMap[endpointSlices] + return c.apisMap[EndpointSlices] } // hasCRDProperty returns property presence for any CRD, given a dot-separated path such as "spec.foo.bar" @@ -434,7 +439,7 @@ func getCRDPropertyInVersion(v *apix.CustomResourceDefinitionVersion, parts []st // HasLokiStack returns true if "lokistack" API was found func (c *Info) HasLokiStack(ctx context.Context) bool { - if !c.apisMap[lokistacks] { + if !c.apisMap[LokiStack] { err := c.fetchAvailableAPIsInternal(ctx, true) if err != nil { return false @@ -442,5 +447,5 @@ func (c *Info) HasLokiStack(ctx context.Context) bool { } c.apisMapLock.RLock() defer c.apisMapLock.RUnlock() - return c.apisMap[lokistacks] + return c.apisMap[LokiStack] } diff --git a/internal/pkg/cluster/cluster_test.go b/internal/pkg/cluster/cluster_test.go index 507039afce..1b1eaac7c8 100644 --- a/internal/pkg/cluster/cluster_test.go +++ b/internal/pkg/cluster/cluster_test.go @@ -280,14 +280,14 @@ func TestHasAPI(t *testing.T) { func TestIsOpenShift(t *testing.T) { // Test OpenShift detection info := &Info{ - apisMap: map[string]bool{ - ocpSecurity: true, + apisMap: map[APIName]bool{ + OCPSecurity: true, }, } assert.True(t, info.IsOpenShift()) // Test non-OpenShift - info.apisMap[ocpSecurity] = false + info.apisMap[OCPSecurity] = false assert.False(t, info.IsOpenShift()) } @@ -574,8 +574,8 @@ func (m *mockLiveClient) getCRD(_ context.Context, name string) (*apix.CustomRes func stubOpenShiftInfo(version string) (*Info, *configv1.ClusterVersion) { return &Info{ - apisMap: map[string]bool{ - ocpSecurity: true, + apisMap: map[APIName]bool{ + OCPSecurity: true, }, }, &configv1.ClusterVersion{ diff --git a/internal/pkg/manager/status/loki.go b/internal/pkg/manager/status/loki.go deleted file mode 100644 index f9e7364122..0000000000 --- a/internal/pkg/manager/status/loki.go +++ /dev/null @@ -1,211 +0,0 @@ -package status - -import ( - "context" - "fmt" - "strings" - - lokiv1 "github.com/grafana/loki/operator/apis/loki/v1" - flowslatest "github.com/netobserv/netobserv-operator/api/flowcollector/v1beta2" - kerr "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/api/meta" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" - "sigs.k8s.io/controller-runtime/pkg/client" -) - -func checkLoki(ctx context.Context, c client.Client, fc *flowslatest.FlowCollector) metav1.Condition { - if !fc.Spec.UseLoki() { - return metav1.Condition{ - Type: LokiIssue, - Reason: "Unused", - Status: metav1.ConditionUnknown, - Message: "Loki is disabled", - } - } - if fc.Spec.Loki.Mode != flowslatest.LokiModeLokiStack { - return metav1.Condition{ - Type: LokiIssue, - Reason: "Unused", - Status: metav1.ConditionUnknown, - Message: "Loki is not configured in LokiStack mode", - } - } - lokiStack := &lokiv1.LokiStack{} - nsname := types.NamespacedName{Name: fc.Spec.Loki.LokiStack.Name, Namespace: fc.Spec.Namespace} - if len(fc.Spec.Loki.LokiStack.Namespace) > 0 { - nsname.Namespace = fc.Spec.Loki.LokiStack.Namespace - } - err := c.Get(ctx, nsname, lokiStack) - if err != nil { - if kerr.IsNotFound(err) { - return metav1.Condition{ - Type: LokiIssue, - Reason: "LokiStackNotFound", - Status: metav1.ConditionTrue, - Message: fmt.Sprintf("The configured LokiStack reference could not be found [name: %s, namespace: %s]", nsname.Name, nsname.Namespace), - } - } - return metav1.Condition{ - Type: LokiIssue, - Reason: "Error", - Status: metav1.ConditionTrue, - Message: fmt.Sprintf("Error while fetching configured LokiStack: %s", err.Error()), - } - } - - // Check LokiStack status conditions - if len(lokiStack.Status.Conditions) > 0 { - // Check for specific problem conditions first (Degraded, Error, Failed) - // These provide more actionable information than just "NotReady" - // Note: Warnings are handled separately in checkLokiWarnings() - var issues []string - for _, cond := range lokiStack.Status.Conditions { - // Skip the Ready, Pending, and Warning conditions - if cond.Type == "Ready" || cond.Type == "Pending" || cond.Type == "Warning" { - continue - } - // If any condition has Status=True for a problem condition, report it - condTypeLower := strings.ToLower(cond.Type) - if cond.Status == metav1.ConditionTrue && (strings.Contains(condTypeLower, "error") || - strings.Contains(condTypeLower, "degraded") || - strings.Contains(condTypeLower, "failed")) { - issues = append(issues, fmt.Sprintf("%s: %s", cond.Type, cond.Message)) - } - } - if len(issues) > 0 { - return metav1.Condition{ - Type: LokiIssue, - Reason: "LokiStackIssues", - Status: metav1.ConditionTrue, - Message: fmt.Sprintf("LokiStack has issues [name: %s, namespace: %s]: %s", nsname.Name, nsname.Namespace, strings.Join(issues, "; ")), - } - } - - // If no specific issues found, check the Ready condition - readyCond := meta.FindStatusCondition(lokiStack.Status.Conditions, "Ready") - if readyCond != nil && readyCond.Status != metav1.ConditionTrue { - return metav1.Condition{ - Type: LokiIssue, - Reason: "LokiStackNotReady", - Status: metav1.ConditionTrue, - Message: fmt.Sprintf("LokiStack is not ready [name: %s, namespace: %s]: %s - %s", nsname.Name, nsname.Namespace, readyCond.Reason, readyCond.Message), - } - } - } - - // Check LokiStack component status for failed or pending pods - componentIssues := checkLokiStackComponents(&lokiStack.Status.Components) - if len(componentIssues) > 0 { - return metav1.Condition{ - Type: LokiIssue, - Reason: "LokiStackComponentIssues", - Status: metav1.ConditionTrue, - Message: fmt.Sprintf("LokiStack components have issues [name: %s, namespace: %s]: %s", nsname.Name, nsname.Namespace, strings.Join(componentIssues, "; ")), - } - } - - return metav1.Condition{ - Type: LokiIssue, - Reason: "NoIssue", - Status: metav1.ConditionFalse, - } -} - -func checkLokiStackComponents(components *lokiv1.LokiStackComponentStatus) []string { - if components == nil { - return nil - } - - var issues []string - - // Helper function to check a component's pod status map - checkComponent := func(name string, podStatusMap lokiv1.PodStatusMap) { - if len(podStatusMap) == 0 { - return - } - - // Check for failed pods - if failedPods, ok := podStatusMap[lokiv1.PodFailed]; ok && len(failedPods) > 0 { - issues = append(issues, fmt.Sprintf("%s has %d failed pod(s): %s", name, len(failedPods), strings.Join(failedPods, ", "))) - } - - // Check for pending pods - if pendingPods, ok := podStatusMap[lokiv1.PodPending]; ok && len(pendingPods) > 0 { - issues = append(issues, fmt.Sprintf("%s has %d pending pod(s): %s", name, len(pendingPods), strings.Join(pendingPods, ", "))) - } - - // Check for unknown status pods - if unknownPods, ok := podStatusMap[lokiv1.PodStatusUnknown]; ok && len(unknownPods) > 0 { - issues = append(issues, fmt.Sprintf("%s has %d pod(s) with unknown status: %s", name, len(unknownPods), strings.Join(unknownPods, ", "))) - } - } - - // Check all LokiStack components - checkComponent("Compactor", components.Compactor) - checkComponent("Distributor", components.Distributor) - checkComponent("IndexGateway", components.IndexGateway) - checkComponent("Ingester", components.Ingester) - checkComponent("Querier", components.Querier) - checkComponent("QueryFrontend", components.QueryFrontend) - checkComponent("Gateway", components.Gateway) - checkComponent("Ruler", components.Ruler) - - return issues -} - -func checkLokiWarnings(ctx context.Context, c client.Client, fc *flowslatest.FlowCollector) metav1.Condition { - if !fc.Spec.UseLoki() { - return metav1.Condition{ - Type: LokiWarning, - Reason: "Unused", - Status: metav1.ConditionUnknown, - } - } - if fc.Spec.Loki.Mode != flowslatest.LokiModeLokiStack { - return metav1.Condition{ - Type: LokiWarning, - Reason: "Unused", - Status: metav1.ConditionUnknown, - } - } - lokiStack := &lokiv1.LokiStack{} - nsname := types.NamespacedName{Name: fc.Spec.Loki.LokiStack.Name, Namespace: fc.Spec.Namespace} - if len(fc.Spec.Loki.LokiStack.Namespace) > 0 { - nsname.Namespace = fc.Spec.Loki.LokiStack.Namespace - } - err := c.Get(ctx, nsname, lokiStack) - if err != nil { - // If we can't get the LokiStack, don't report warnings - // (the main checkLoki will report the error) - return metav1.Condition{ - Type: LokiWarning, - Reason: "NoWarning", - Status: metav1.ConditionFalse, - } - } - - // Check for Warning conditions - var warnings []string - for _, cond := range lokiStack.Status.Conditions { - condTypeLower := strings.ToLower(cond.Type) - if cond.Status == metav1.ConditionTrue && strings.Contains(condTypeLower, "warning") { - warnings = append(warnings, fmt.Sprintf("%s: %s", cond.Type, cond.Message)) - } - } - - if len(warnings) > 0 { - return metav1.Condition{ - Type: LokiWarning, - Reason: "LokiStackWarnings", - Status: metav1.ConditionTrue, - Message: fmt.Sprintf("LokiStack has warnings [name: %s, namespace: %s]: %s", nsname.Name, nsname.Namespace, strings.Join(warnings, "; ")), - } - } - - return metav1.Condition{ - Type: LokiWarning, - Reason: "NoWarning", - Status: metav1.ConditionFalse, - } -} diff --git a/internal/pkg/manager/status/status_manager.go b/internal/pkg/manager/status/status_manager.go index 7a0f1bc880..86dd5d6f05 100644 --- a/internal/pkg/manager/status/status_manager.go +++ b/internal/pkg/manager/status/status_manager.go @@ -21,78 +21,90 @@ import ( type ComponentName string const ( - FlowCollectorLegacy ComponentName = "FlowCollectorLegacy" + FlowCollectorController ComponentName = "FlowCollectorController" + EBPFAgents ComponentName = "EBPFAgents" + WebConsole ComponentName = "WebConsole" FLPParent ComponentName = "FLPParent" FLPMonolith ComponentName = "FLPMonolith" FLPTransformer ComponentName = "FLPTransformer" Monitoring ComponentName = "Monitoring" StaticController ComponentName = "StaticController" NetworkPolicy ComponentName = "NetworkPolicy" + DemoLoki ComponentName = "DemoLoki" + LokiStack ComponentName = "LokiStack" ConditionConfigurationIssue = "ConfigurationIssue" - LokiIssue = "LokiIssue" - LokiWarning = "LokiWarning" ) -var allNames = []ComponentName{FlowCollectorLegacy, Monitoring, StaticController} - type Manager struct { statuses sync.Map } func NewManager() *Manager { - s := Manager{} - for _, cpnt := range allNames { - s.statuses.Store(cpnt, ComponentStatus{ - name: cpnt, - status: StatusUnknown, - }) + return &Manager{} +} + +func (s *Manager) getStatus(cpnt ComponentName) *ComponentStatus { + v, _ := s.statuses.Load(cpnt) + if v != nil { + if s, ok := v.(ComponentStatus); ok { + return &s + } } - return &s + return nil } func (s *Manager) setInProgress(cpnt ComponentName, reason, message string) { s.statuses.Store(cpnt, ComponentStatus{ - name: cpnt, - status: StatusInProgress, - reason: reason, - message: message, + Name: cpnt, + Status: StatusInProgress, + Reason: reason, + Message: message, }) } func (s *Manager) setFailure(cpnt ComponentName, reason, message string) { s.statuses.Store(cpnt, ComponentStatus{ - name: cpnt, - status: StatusFailure, - reason: reason, - message: message, + Name: cpnt, + Status: StatusFailure, + Reason: reason, + Message: message, + }) +} + +func (s *Manager) setDegraded(cpnt ComponentName, reason, message string) { + s.statuses.Store(cpnt, ComponentStatus{ + Name: cpnt, + Status: StatusDegraded, + Reason: reason, + Message: message, }) } func (s *Manager) hasFailure(cpnt ComponentName) bool { v, _ := s.statuses.Load(cpnt) - return v != nil && v.(ComponentStatus).status == StatusFailure + return v != nil && v.(ComponentStatus).Status == StatusFailure } func (s *Manager) setReady(cpnt ComponentName) { s.statuses.Store(cpnt, ComponentStatus{ - name: cpnt, - status: StatusReady, + Name: cpnt, + Status: StatusReady, }) } func (s *Manager) setUnknown(cpnt ComponentName) { s.statuses.Store(cpnt, ComponentStatus{ - name: cpnt, - status: StatusUnknown, + Name: cpnt, + Status: StatusUnknown, }) } func (s *Manager) setUnused(cpnt ComponentName, message string) { s.statuses.Store(cpnt, ComponentStatus{ - name: cpnt, - status: StatusUnknown, - reason: "ComponentUnused", - message: message, + Name: cpnt, + Status: StatusUnknown, + Reason: "ComponentUnused", + Message: message, }) } @@ -103,20 +115,22 @@ func (s *Manager) getConditions() []metav1.Condition { Reason: "Ready", } conds := []metav1.Condition{} - counters := make(map[Status]int, len(allNames)) + counters := make(map[Status]int) s.statuses.Range(func(_, v any) bool { status := v.(ComponentStatus) conds = append(conds, status.toCondition()) - counters[status.status]++ + counters[status.Status]++ return true }) - global.Message = fmt.Sprintf("%d ready components, %d with failure, %d pending", counters[StatusReady], counters[StatusFailure], counters[StatusInProgress]) + global.Message = fmt.Sprintf("%d ready components, %d with failure, %d pending, %d degraded", counters[StatusReady], counters[StatusFailure], counters[StatusInProgress], counters[StatusDegraded]) if counters[StatusFailure] > 0 { global.Status = metav1.ConditionFalse global.Reason = "Failure" } else if counters[StatusInProgress] > 0 { global.Status = metav1.ConditionFalse global.Reason = "Pending" + } else if counters[StatusDegraded] > 0 { + global.Reason = "Ready,Degraded" } return append([]metav1.Condition{global}, conds...) } @@ -139,8 +153,6 @@ func updateStatus(ctx context.Context, c client.Client, conditions ...metav1.Con return err } conditions = append(conditions, checkValidation(ctx, &fc)) - conditions = append(conditions, checkLoki(ctx, c, &fc)) - conditions = append(conditions, checkLokiWarnings(ctx, c, &fc)) for _, c := range conditions { meta.SetStatusCondition(&fc.Status.Conditions, c) } @@ -179,6 +191,7 @@ func checkValidation(ctx context.Context, fc *flowslatest.FlowCollector) metav1. } func (s *Manager) ForComponent(cpnt ComponentName) Instance { + s.setUnknown(cpnt) return Instance{cpnt: cpnt, s: s} } @@ -187,6 +200,14 @@ type Instance struct { s *Manager } +func (i *Instance) Get() ComponentStatus { + s := i.s.getStatus(i.cpnt) + if s != nil { + return *s + } + return ComponentStatus{Name: i.cpnt, Status: StatusUnknown} +} + func (i *Instance) SetReady() { i.s.setReady(i.cpnt) } @@ -199,30 +220,37 @@ func (i *Instance) SetUnused(message string) { i.s.setUnused(i.cpnt, message) } +// CheckDeploymentProgress sets the status either as In Progress, or Ready. func (i *Instance) CheckDeploymentProgress(d *appsv1.Deployment) { - // TODO (when legacy controller is broken down into individual controllers) - // this should set the status as Ready when replicas match if d == nil { i.s.setInProgress(i.cpnt, "DeploymentNotCreated", "Deployment not created") - } else { - for _, c := range d.Status.Conditions { - if c.Type == appsv1.DeploymentAvailable { - if c.Status != v1.ConditionTrue { - i.s.setInProgress(i.cpnt, "DeploymentNotReady", fmt.Sprintf("Deployment %s not ready: %d/%d (%s)", d.Name, d.Status.UpdatedReplicas, d.Status.Replicas, c.Message)) - } - return + return + } + for _, c := range d.Status.Conditions { + if c.Type == appsv1.DeploymentAvailable { + if c.Status != v1.ConditionTrue { + i.s.setInProgress(i.cpnt, "DeploymentNotReady", fmt.Sprintf("Deployment %s not ready: %d/%d (%s)", d.Name, d.Status.UpdatedReplicas, d.Status.Replicas, c.Message)) + } else { + i.s.setReady(i.cpnt) } + return } } + if d.Status.UpdatedReplicas == d.Status.Replicas { + i.s.setReady(i.cpnt) + } else { + i.s.setInProgress(i.cpnt, "DeploymentNotReady", fmt.Sprintf("Deployment %s not ready: %d/%d (missing condition)", d.Name, d.Status.UpdatedReplicas, d.Status.Replicas)) + } } +// CheckDaemonSetProgress sets the status either as In Progress, or Ready. func (i *Instance) CheckDaemonSetProgress(ds *appsv1.DaemonSet) { - // TODO (when legacy controller is broken down into individual controllers) - // this should set the status as Ready when replicas match if ds == nil { i.s.setInProgress(i.cpnt, "DaemonSetNotCreated", "DaemonSet not created") } else if ds.Status.UpdatedNumberScheduled < ds.Status.DesiredNumberScheduled { i.s.setInProgress(i.cpnt, "DaemonSetNotReady", fmt.Sprintf("DaemonSet %s not ready: %d/%d", ds.Name, ds.Status.UpdatedNumberScheduled, ds.Status.DesiredNumberScheduled)) + } else { + i.s.setReady(i.cpnt) } } @@ -234,10 +262,18 @@ func (i *Instance) SetCreatingDaemonSet(ds *appsv1.DaemonSet) { i.s.setInProgress(i.cpnt, "CreatingDaemonSet", fmt.Sprintf("Creating daemon set %s", ds.Name)) } +func (i *Instance) SetNotReady(reason, message string) { + i.s.setInProgress(i.cpnt, reason, message) +} + func (i *Instance) SetFailure(reason, message string) { i.s.setFailure(i.cpnt, reason, message) } +func (i *Instance) SetDegraded(reason, message string) { + i.s.setDegraded(i.cpnt, reason, message) +} + func (i *Instance) Error(reason string, err error) error { i.SetFailure(reason, err.Error()) return err diff --git a/internal/pkg/manager/status/status_manager_test.go b/internal/pkg/manager/status/status_manager_test.go index 621ccc0c05..e5dd27b886 100644 --- a/internal/pkg/manager/status/status_manager_test.go +++ b/internal/pkg/manager/status/status_manager_test.go @@ -1,6 +1,7 @@ package status import ( + "slices" "testing" "github.com/stretchr/testify/assert" @@ -10,20 +11,18 @@ import ( func TestStatusWorkflow(t *testing.T) { s := NewManager() - sl := s.ForComponent(FlowCollectorLegacy) + sl := s.ForComponent(FlowCollectorController) sm := s.ForComponent(Monitoring) - sl.SetReady() // temporary until controllers are broken down sl.SetCreatingDaemonSet(&appsv1.DaemonSet{ObjectMeta: metav1.ObjectMeta{Name: "test"}}) sm.SetFailure("AnError", "bad one") conds := s.getConditions() - assert.Len(t, conds, 4) + assertHasConditionTypes(t, conds, []string{"Ready", "WaitingFlowCollectorController", "WaitingMonitoring"}) assertHasCondition(t, conds, "Ready", "Failure", metav1.ConditionFalse) - assertHasCondition(t, conds, "WaitingFlowCollectorLegacy", "CreatingDaemonSet", metav1.ConditionTrue) + assertHasCondition(t, conds, "WaitingFlowCollectorController", "CreatingDaemonSet", metav1.ConditionTrue) assertHasCondition(t, conds, "WaitingMonitoring", "AnError", metav1.ConditionTrue) - sl.SetReady() // temporary until controllers are broken down sl.CheckDaemonSetProgress(&appsv1.DaemonSet{ObjectMeta: metav1.ObjectMeta{Name: "test"}, Status: appsv1.DaemonSetStatus{ DesiredNumberScheduled: 3, UpdatedNumberScheduled: 1, @@ -31,12 +30,11 @@ func TestStatusWorkflow(t *testing.T) { sm.SetUnknown() conds = s.getConditions() - assert.Len(t, conds, 4) + assertHasConditionTypes(t, conds, []string{"Ready", "WaitingFlowCollectorController", "WaitingMonitoring"}) assertHasCondition(t, conds, "Ready", "Pending", metav1.ConditionFalse) - assertHasCondition(t, conds, "WaitingFlowCollectorLegacy", "DaemonSetNotReady", metav1.ConditionTrue) + assertHasCondition(t, conds, "WaitingFlowCollectorController", "DaemonSetNotReady", metav1.ConditionTrue) assertHasCondition(t, conds, "WaitingMonitoring", "Unused", metav1.ConditionUnknown) - sl.SetReady() // temporary until controllers are broken down sl.CheckDaemonSetProgress(&appsv1.DaemonSet{ObjectMeta: metav1.ObjectMeta{Name: "test"}, Status: appsv1.DaemonSetStatus{ DesiredNumberScheduled: 3, UpdatedNumberScheduled: 3, @@ -44,12 +42,11 @@ func TestStatusWorkflow(t *testing.T) { sm.SetUnused("message") conds = s.getConditions() - assert.Len(t, conds, 4) + assertHasConditionTypes(t, conds, []string{"Ready", "WaitingFlowCollectorController", "WaitingMonitoring"}) assertHasCondition(t, conds, "Ready", "Ready", metav1.ConditionTrue) - assertHasCondition(t, conds, "WaitingFlowCollectorLegacy", "Ready", metav1.ConditionFalse) + assertHasCondition(t, conds, "WaitingFlowCollectorController", "Ready", metav1.ConditionFalse) assertHasCondition(t, conds, "WaitingMonitoring", "ComponentUnused", metav1.ConditionUnknown) - sl.SetReady() // temporary until controllers are broken down sl.CheckDeploymentProgress(&appsv1.Deployment{ObjectMeta: metav1.ObjectMeta{Name: "test"}, Status: appsv1.DeploymentStatus{ UpdatedReplicas: 2, Replicas: 2, @@ -57,9 +54,9 @@ func TestStatusWorkflow(t *testing.T) { sm.SetReady() conds = s.getConditions() - assert.Len(t, conds, 4) + assertHasConditionTypes(t, conds, []string{"Ready", "WaitingFlowCollectorController", "WaitingMonitoring"}) assertHasCondition(t, conds, "Ready", "Ready", metav1.ConditionTrue) - assertHasCondition(t, conds, "WaitingFlowCollectorLegacy", "Ready", metav1.ConditionFalse) + assertHasCondition(t, conds, "WaitingFlowCollectorController", "Ready", metav1.ConditionFalse) assertHasCondition(t, conds, "WaitingMonitoring", "Ready", metav1.ConditionFalse) } @@ -73,3 +70,12 @@ func assertHasCondition(t *testing.T, conditions []metav1.Condition, searchType, } assert.Fail(t, "Condition type not found", searchType, conditions) } + +func assertHasConditionTypes(t *testing.T, conditions []metav1.Condition, expectedTypes []string) { + var types []string + for _, c := range conditions { + types = append(types, c.Type) + } + slices.Sort(types) + assert.Equal(t, expectedTypes, types) +} diff --git a/internal/pkg/manager/status/statuses.go b/internal/pkg/manager/status/statuses.go index de2caba568..9be8acef3f 100644 --- a/internal/pkg/manager/status/statuses.go +++ b/internal/pkg/manager/status/statuses.go @@ -11,33 +11,34 @@ const ( StatusInProgress Status = "InProgress" StatusReady Status = "Ready" StatusFailure Status = "Failure" + StatusDegraded Status = "Degraded" ) type ComponentStatus struct { - name ComponentName - status Status - reason string - message string + Name ComponentName + Status Status + Reason string + Message string } func (s *ComponentStatus) toCondition() metav1.Condition { c := metav1.Condition{ - Type: "Waiting" + string(s.name), - Message: s.message, + Type: "Waiting" + string(s.Name), + Message: s.Message, } - switch s.status { + switch s.Status { case StatusUnknown: c.Status = metav1.ConditionUnknown c.Reason = "Unused" - case StatusFailure, StatusInProgress: + case StatusFailure, StatusInProgress, StatusDegraded: c.Status = metav1.ConditionTrue c.Reason = "NotReady" case StatusReady: c.Status = metav1.ConditionFalse c.Reason = "Ready" } - if s.reason != "" { - c.Reason = s.reason + if s.Reason != "" { + c.Reason = s.Reason } return c }