@@ -12,12 +12,16 @@ import (
1212 corev1 "k8s.io/api/core/v1"
1313 ctrl "sigs.k8s.io/controller-runtime"
1414 "sigs.k8s.io/controller-runtime/pkg/client"
15+ "sigs.k8s.io/controller-runtime/pkg/controller"
1516 "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
1617 "sigs.k8s.io/controller-runtime/pkg/handler"
1718 "sigs.k8s.io/controller-runtime/pkg/log"
19+ "sigs.k8s.io/controller-runtime/pkg/reconcile"
20+ "sigs.k8s.io/controller-runtime/pkg/source"
1821
1922 flowslatest "github.com/netobserv/network-observability-operator/api/flowcollector/v1beta2"
2023 "github.com/netobserv/network-observability-operator/internal/controller/consoleplugin"
24+ "github.com/netobserv/network-observability-operator/internal/controller/constants"
2125 "github.com/netobserv/network-observability-operator/internal/controller/ebpf"
2226 "github.com/netobserv/network-observability-operator/internal/controller/loki"
2327 "github.com/netobserv/network-observability-operator/internal/controller/reconcilers"
@@ -35,9 +39,11 @@ const (
3539// FlowCollectorReconciler reconciles a FlowCollector object
3640type FlowCollectorReconciler struct {
3741 client.Client
38- mgr * manager.Manager
39- status status.Instance
40- watcher * watchers.Watcher
42+ mgr * manager.Manager
43+ status status.Instance
44+ watcher * watchers.Watcher
45+ ctrl controller.Controller
46+ lokiWatcherStarted bool
4147}
4248
4349func Start (ctx context.Context , mgr * manager.Manager ) (manager.PostCreateHook , error ) {
@@ -66,15 +72,23 @@ func Start(ctx context.Context, mgr *manager.Manager) (manager.PostCreateHook, e
6672 builder .Owns (& osv1.ConsolePlugin {}, reconcilers .UpdateOrDeleteOnlyPred )
6773 }
6874
69- if mgr .ClusterInfo .HasLokiStack () {
70- builder .Watches (& lokiv1.LokiStack {}, & handler.EnqueueRequestForObject {})
75+ if mgr .ClusterInfo .HasLokiStack (ctx ) {
76+ builder .Watches (
77+ & lokiv1.LokiStack {},
78+ handler .EnqueueRequestsFromMapFunc (func (_ context.Context , _ client.Object ) []ctrl.Request {
79+ // When a LokiStack changes, trigger reconcile of the FlowCollector
80+ return []ctrl.Request {{NamespacedName : constants .FlowCollectorName }}
81+ }),
82+ )
83+ r .lokiWatcherStarted = true
7184 log .Info ("LokiStack CRD detected" )
7285 }
7386
7487 ctrl , err := builder .Build (& r )
7588 if err != nil {
7689 return nil , err
7790 }
91+ r .ctrl = ctrl
7892 r .watcher = watchers .NewWatcher (ctrl )
7993
8094 return nil , nil
@@ -102,6 +116,14 @@ func (r *FlowCollectorReconciler) Reconcile(ctx context.Context, _ ctrl.Request)
102116 return ctrl.Result {}, nil
103117 }
104118
119+ // Dynamically start LokiStack watcher if the API became available
120+ if desired .Spec .Loki .Mode == flowslatest .LokiModeLokiStack {
121+ if err := r .ensureLokiStackWatcher (ctx ); err != nil {
122+ l .Error (err , "Failed to start LokiStack watcher" )
123+ // Don't fail reconciliation, just log the error
124+ }
125+ }
126+
105127 // At the moment, status workflow is to start as ready then degrade if necessary
106128 // Later (when legacy controller is broken down into individual controllers), status should start as unknown and only on success finishes as ready
107129 r .status .SetReady ()
@@ -120,6 +142,36 @@ func (r *FlowCollectorReconciler) Reconcile(ctx context.Context, _ ctrl.Request)
120142 return ctrl.Result {}, nil
121143}
122144
145+ func (r * FlowCollectorReconciler ) ensureLokiStackWatcher (ctx context.Context ) error {
146+ // Check if LokiStack API is available but watcher not started
147+ if ! r .mgr .ClusterInfo .HasLokiStack (ctx ) {
148+ return nil
149+ }
150+
151+ if r .lokiWatcherStarted {
152+ return nil
153+ }
154+
155+ // LokiStack API is now available, start the watcher
156+ log := log .FromContext (ctx )
157+ log .Info ("LokiStack CRD detected after startup, starting watcher" )
158+
159+ h := handler .TypedEnqueueRequestsFromMapFunc (func (_ context.Context , _ * lokiv1.LokiStack ) []reconcile.Request {
160+ // When a LokiStack changes, trigger reconcile of the FlowCollector
161+ return []reconcile.Request {{NamespacedName : constants .FlowCollectorName }}
162+ })
163+
164+ src := source .Kind (r .mgr .GetCache (), & lokiv1.LokiStack {}, h )
165+ err := r .ctrl .Watch (src )
166+ if err != nil {
167+ return fmt .Errorf ("failed to start LokiStack watcher: %w" , err )
168+ }
169+
170+ r .lokiWatcherStarted = true
171+ log .Info ("LokiStack watcher started successfully" )
172+ return nil
173+ }
174+
123175func (r * FlowCollectorReconciler ) reconcile (ctx context.Context , clh * helper.Client , desired * flowslatest.FlowCollector ) error {
124176 ns := desired .Spec .GetNamespace ()
125177 previousNamespace := r .status .GetDeployedNamespace (desired )
0 commit comments