diff --git a/cmd/filter/main.go b/cmd/filter/main.go new file mode 100644 index 000000000..c689d640d --- /dev/null +++ b/cmd/filter/main.go @@ -0,0 +1,39 @@ +/* +Copyright 2024 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "os" + + "knative.dev/pkg/injection" + "knative.dev/pkg/injection/sharedmain" + "knative.dev/pkg/signals" + + "knative.dev/eventing-natss/pkg/broker/filter" +) + +func main() { + component := "natsjs-broker-filter" + + ctx := signals.NewContext() + ns := os.Getenv("NAMESPACE") + if ns != "" { + ctx = injection.WithNamespaceScope(ctx, ns) + } + + sharedmain.MainWithContext(ctx, component, filter.NewController) +} diff --git a/pkg/broker/filter/consumer.go b/pkg/broker/filter/consumer.go new file mode 100644 index 000000000..c2983eaa3 --- /dev/null +++ b/pkg/broker/filter/consumer.go @@ -0,0 +1,346 @@ +/* +Copyright 2026 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package filter + +import ( + "context" + "errors" + "fmt" + "sync" + "time" + + "github.com/nats-io/nats.go" + "go.uber.org/zap" + + duckv1 "knative.dev/pkg/apis/duck/v1" + "knative.dev/pkg/logging" + + eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" + "knative.dev/eventing/pkg/auth" + "knative.dev/eventing/pkg/eventingtls" + "knative.dev/eventing/pkg/kncloudevents" + + brokerutils "knative.dev/eventing-natss/pkg/broker/utils" +) + +const ( + // DefaultFetchBatchSize is the default number of messages to fetch in each batch + DefaultFetchBatchSize = 10 + // DefaultFetchTimeout is the default timeout for fetching messages + DefaultFetchTimeout = 200 * time.Millisecond +) + +// ConsumerManagerConfig holds configuration for the ConsumerManager +type ConsumerManagerConfig struct { + // FetchBatchSize is the number of messages to fetch in each batch. + // Defaults to DefaultFetchBatchSize if not set. + FetchBatchSize int + + // FetchTimeout is the timeout for fetching messages. + // Defaults to DefaultFetchTimeout if not set. + FetchTimeout time.Duration +} + +// ConsumerManager manages JetStream consumer subscriptions for triggers +type ConsumerManager struct { + logger *zap.SugaredLogger + ctx context.Context + + js nats.JetStreamContext + conn *nats.Conn + + // Configuration + fetchBatchSize int + fetchTimeout time.Duration + + // Event dispatcher + dispatcher *kncloudevents.Dispatcher + + // Map of trigger UID to subscription + subscriptions map[string]*TriggerSubscription + mu sync.RWMutex +} + +// TriggerSubscription holds the subscription and handler for a trigger +type TriggerSubscription struct { + trigger *eventingv1.Trigger + subscription *nats.Subscription + handler *TriggerHandler + streamName string + consumerName string + cancel context.CancelFunc +} + +// NewConsumerManager creates a new consumer manager +func NewConsumerManager(ctx context.Context, conn *nats.Conn, js nats.JetStreamContext, config *ConsumerManagerConfig) *ConsumerManager { + // Create OIDC token provider and dispatcher + oidcTokenProvider := auth.NewOIDCTokenProvider(ctx) + dispatcher := kncloudevents.NewDispatcher(eventingtls.ClientConfig{}, oidcTokenProvider) + + // Apply defaults + fetchBatchSize := DefaultFetchBatchSize + fetchTimeout := DefaultFetchTimeout + + if config != nil { + if config.FetchBatchSize > 0 { + fetchBatchSize = config.FetchBatchSize + } + if config.FetchTimeout > 0 { + fetchTimeout = config.FetchTimeout + } + } + + return &ConsumerManager{ + logger: logging.FromContext(ctx), + ctx: ctx, + js: js, + conn: conn, + fetchBatchSize: fetchBatchSize, + fetchTimeout: fetchTimeout, + dispatcher: dispatcher, + subscriptions: make(map[string]*TriggerSubscription), + } +} + +// SubscribeTrigger creates a pull-based subscription for a trigger's consumer +func (m *ConsumerManager) SubscribeTrigger( + trigger *eventingv1.Trigger, + broker *eventingv1.Broker, + subscriber duckv1.Addressable, + brokerIngressURL *duckv1.Addressable, + deadLetterSink *duckv1.Addressable, + retryConfig *kncloudevents.RetryConfig, + noRetryConfig *kncloudevents.RetryConfig, +) error { + m.mu.Lock() + defer m.mu.Unlock() + + triggerUID := string(trigger.UID) + logger := m.logger.With( + zap.String("trigger", trigger.Name), + zap.String("namespace", trigger.Namespace), + zap.String("trigger_uid", triggerUID), + ) + + // Check if we already have a subscription for this trigger + if existing, ok := m.subscriptions[triggerUID]; ok { + existing.handler.noRetryConfig = noRetryConfig + existing.handler.retryConfig = retryConfig + existing.handler.filter = buildTriggerFilter(logger, trigger) + existing.handler.deadLetterSink = deadLetterSink + + // Check if configuration has changed + if existing.handler.subscriber.URL.String() == subscriber.URL.String() { + logger.Debugw("trigger subscription already exists and is up to date") + return nil + } + // Configuration changed, unsubscribe and re-subscribe + logger.Infow("trigger configuration changed, re-subscribing") + if err := m.unsubscribeLocked(triggerUID); err != nil { + logger.Warnw("failed to unsubscribe old trigger subscription", zap.Error(err)) + } + } + + // Create the trigger handler + handler, err := NewTriggerHandler( + m.ctx, + trigger, + subscriber, + brokerIngressURL, + deadLetterSink, + retryConfig, + noRetryConfig, + m.dispatcher, + ) + if err != nil { + return fmt.Errorf("failed to create trigger handler: %w", err) + } + + // Derive stream and consumer names + streamName := brokerutils.BrokerStreamName(broker) + consumerName := brokerutils.TriggerConsumerName(triggerUID) + + // Get consumer info (also verifies consumer exists) + consumerInfo, err := m.js.ConsumerInfo(streamName, consumerName) + if err != nil { + handler.Cleanup() + if errors.Is(err, nats.ErrConsumerNotFound) { + return fmt.Errorf("consumer %s not found in stream %s: trigger controller may not have reconciled yet", consumerName, streamName) + } + return fmt.Errorf("failed to get consumer info: %w", err) + } + + // Get the filter subject from the consumer's configuration + filterSubject := brokerutils.BrokerPublishSubjectName(broker.Namespace, broker.Name) + ".>" + + logger.Infow("creating pull subscription for trigger consumer", + zap.String("stream", streamName), + zap.String("consumer", consumerName), + zap.String("filter_subject", filterSubject), + ) + + // Create pull subscription bound to the existing consumer + sub, err := m.js.PullSubscribe( + filterSubject, + consumerName, + nats.Bind(streamName, consumerName), + ) + if err != nil { + handler.Cleanup() + return fmt.Errorf("failed to create pull subscription: %w", err) + } + + // Set subscription and consumer info on handler + handler.subscription = sub + handler.consumer = consumerInfo + + // Create cancellable context for the fetch loop + ctx, cancel := context.WithCancel(m.ctx) + + // Store the subscription + m.subscriptions[triggerUID] = &TriggerSubscription{ + trigger: trigger, + subscription: sub, + handler: handler, + streamName: streamName, + consumerName: consumerName, + cancel: cancel, + } + + // Start the message fetch loop + go m.fetchLoop(ctx, sub, handler, logger) + + logger.Infow("successfully started pull subscription for trigger consumer") + return nil +} + +// fetchLoop continuously fetches messages from the pull consumer +func (m *ConsumerManager) fetchLoop( + ctx context.Context, + sub *nats.Subscription, + handler *TriggerHandler, + logger *zap.SugaredLogger, +) { + for { + select { + case <-ctx.Done(): + logger.Debugw("fetch loop stopped") + return + default: + // Fetch a batch of messages + msgs, err := sub.Fetch(m.fetchBatchSize, nats.MaxWait(m.fetchTimeout)) + if err != nil { + if errors.Is(err, nats.ErrTimeout) { + // No messages available, continue polling + continue + } + if errors.Is(err, nats.ErrConnectionClosed) || errors.Is(err, nats.ErrConsumerDeleted) || errors.Is(err, nats.ErrBadSubscription) { + logger.Warnw("subscription closed, stopping fetch loop", zap.Error(err)) + return + } + if errors.Is(err, context.Canceled) { + return + } + logger.Errorw("error fetching messages", zap.Error(err)) + // Back off on errors + time.Sleep(200 * time.Millisecond) + continue + } + + // Process fetched messages + for _, msg := range msgs { + handler.HandleMessage(msg) + } + } + } +} + +// UnsubscribeTrigger removes a subscription for a trigger +func (m *ConsumerManager) UnsubscribeTrigger(triggerUID string) error { + m.mu.Lock() + defer m.mu.Unlock() + + return m.unsubscribeLocked(triggerUID) +} + +// unsubscribeLocked removes a subscription (must be called with lock held) +func (m *ConsumerManager) unsubscribeLocked(triggerUID string) error { + sub, ok := m.subscriptions[triggerUID] + if !ok { + return nil + } + + logger := m.logger.With( + zap.String("trigger", sub.trigger.Name), + zap.String("namespace", sub.trigger.Namespace), + ) + + logger.Infow("unsubscribing from trigger consumer") + + // Cancel the fetch loop first + if sub.cancel != nil { + sub.cancel() + } + + // Unsubscribe from the pull consumer + if err := sub.subscription.Unsubscribe(); err != nil { + logger.Warnw("failed to unsubscribe", zap.Error(err)) + } + + // Cleanup the handler + sub.handler.Cleanup() + + // Remove from map + delete(m.subscriptions, triggerUID) + + return nil +} + +// Close closes all subscriptions +func (m *ConsumerManager) Close() error { + m.mu.Lock() + defer m.mu.Unlock() + + m.logger.Infow("closing consumer manager", zap.Int("subscription_count", len(m.subscriptions))) + + var errs []error + for uid := range m.subscriptions { + if err := m.unsubscribeLocked(uid); err != nil { + errs = append(errs, err) + } + } + + if len(errs) > 0 { + return fmt.Errorf("errors closing subscriptions: %v", errs) + } + return nil +} + +// GetSubscriptionCount returns the number of active subscriptions +func (m *ConsumerManager) GetSubscriptionCount() int { + m.mu.RLock() + defer m.mu.RUnlock() + return len(m.subscriptions) +} + +// HasSubscription checks if a subscription exists for a trigger +func (m *ConsumerManager) HasSubscription(triggerUID string) bool { + m.mu.RLock() + defer m.mu.RUnlock() + _, ok := m.subscriptions[triggerUID] + return ok +} diff --git a/pkg/broker/filter/consumer_test.go b/pkg/broker/filter/consumer_test.go new file mode 100644 index 000000000..c9a235103 --- /dev/null +++ b/pkg/broker/filter/consumer_test.go @@ -0,0 +1,195 @@ +/* +Copyright 2026 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package filter + +import ( + "context" + "fmt" + "testing" + "time" + + "knative.dev/pkg/logging" +) + +func TestConsumerManagerConfigDefaults(t *testing.T) { + // Verify default values + if DefaultFetchBatchSize != 10 { + t.Errorf("DefaultFetchBatchSize = %v, want 10", DefaultFetchBatchSize) + } + + if DefaultFetchTimeout != 200*time.Millisecond { + t.Errorf("DefaultFetchTimeout = %v, want 500ms", DefaultFetchTimeout) + } +} + +func TestConsumerManagerConfig(t *testing.T) { + tests := []struct { + name string + config *ConsumerManagerConfig + wantFetchBatchSize int + wantFetchTimeout time.Duration + }{ + { + name: "nil config uses defaults", + config: nil, + wantFetchBatchSize: DefaultFetchBatchSize, + wantFetchTimeout: DefaultFetchTimeout, + }, + { + name: "empty config uses defaults", + config: &ConsumerManagerConfig{}, + wantFetchBatchSize: DefaultFetchBatchSize, + wantFetchTimeout: DefaultFetchTimeout, + }, + { + name: "zero values use defaults", + config: &ConsumerManagerConfig{ + FetchBatchSize: 0, + FetchTimeout: 0, + }, + wantFetchBatchSize: DefaultFetchBatchSize, + wantFetchTimeout: DefaultFetchTimeout, + }, + { + name: "custom batch size only", + config: &ConsumerManagerConfig{ + FetchBatchSize: 20, + FetchTimeout: 0, + }, + wantFetchBatchSize: 20, + wantFetchTimeout: DefaultFetchTimeout, + }, + { + name: "custom timeout only", + config: &ConsumerManagerConfig{ + FetchBatchSize: 0, + FetchTimeout: 1 * time.Second, + }, + wantFetchBatchSize: DefaultFetchBatchSize, + wantFetchTimeout: 1 * time.Second, + }, + { + name: "both custom values", + config: &ConsumerManagerConfig{ + FetchBatchSize: 50, + FetchTimeout: 2 * time.Second, + }, + wantFetchBatchSize: 50, + wantFetchTimeout: 2 * time.Second, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // We can't easily test NewConsumerManager without a real NATS connection, + // so we test the config application logic directly + fetchBatchSize := DefaultFetchBatchSize + fetchTimeout := DefaultFetchTimeout + + if tt.config != nil { + if tt.config.FetchBatchSize > 0 { + fetchBatchSize = tt.config.FetchBatchSize + } + if tt.config.FetchTimeout > 0 { + fetchTimeout = tt.config.FetchTimeout + } + } + + if fetchBatchSize != tt.wantFetchBatchSize { + t.Errorf("fetchBatchSize = %v, want %v", fetchBatchSize, tt.wantFetchBatchSize) + } + + if fetchTimeout != tt.wantFetchTimeout { + t.Errorf("fetchTimeout = %v, want %v", fetchTimeout, tt.wantFetchTimeout) + } + }) + } +} + +func TestGetSubscriptionCount(t *testing.T) { + ctx := logging.WithLogger(context.Background(), logging.FromContext(context.TODO())) + + tests := []struct { + name string + count int + }{ + {"empty map", 0}, + {"one entry", 1}, + {"three entries", 3}, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + cm := &ConsumerManager{ + logger: logging.FromContext(ctx), + subscriptions: make(map[string]*TriggerSubscription), + } + for i := 0; i < tc.count; i++ { + uid := fmt.Sprintf("uid-%d", i) + cm.subscriptions[uid] = &TriggerSubscription{} + } + if got := cm.GetSubscriptionCount(); got != tc.count { + t.Errorf("GetSubscriptionCount() = %d, want %d", got, tc.count) + } + }) + } +} + +func TestHasSubscription(t *testing.T) { + ctx := logging.WithLogger(context.Background(), logging.FromContext(context.TODO())) + + cm := &ConsumerManager{ + logger: logging.FromContext(ctx), + subscriptions: make(map[string]*TriggerSubscription), + } + cm.subscriptions["existing-uid"] = &TriggerSubscription{} + + if !cm.HasSubscription("existing-uid") { + t.Error("HasSubscription() = false for existing UID, want true") + } + if cm.HasSubscription("missing-uid") { + t.Error("HasSubscription() = true for missing UID, want false") + } +} + +func TestConsumerManagerClose(t *testing.T) { + ctx := logging.WithLogger(context.Background(), logging.FromContext(context.TODO())) + + cm := &ConsumerManager{ + logger: logging.FromContext(ctx), + subscriptions: make(map[string]*TriggerSubscription), + } + + err := cm.Close() + if err != nil { + t.Errorf("Close() unexpected error on empty subscriptions: %v", err) + } +} + +func TestUnsubscribeTrigger_NotFound(t *testing.T) { + ctx := logging.WithLogger(context.Background(), logging.FromContext(context.TODO())) + + cm := &ConsumerManager{ + logger: logging.FromContext(ctx), + subscriptions: make(map[string]*TriggerSubscription), + } + + err := cm.UnsubscribeTrigger("non-existent-uid") + if err != nil { + t.Errorf("UnsubscribeTrigger() unexpected error for non-existent UID: %v", err) + } +} diff --git a/pkg/broker/filter/controller.go b/pkg/broker/filter/controller.go new file mode 100644 index 000000000..c26644193 --- /dev/null +++ b/pkg/broker/filter/controller.go @@ -0,0 +1,203 @@ +/* +Copyright 2026 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package filter + +import ( + "context" + "net/http" + "time" + + "github.com/kelseyhightower/envconfig" + "github.com/nats-io/nats.go" + "go.uber.org/zap" + "k8s.io/client-go/tools/cache" + + "knative.dev/pkg/configmap" + "knative.dev/pkg/controller" + "knative.dev/pkg/logging" + "knative.dev/pkg/reconciler" + + eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" + brokerinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/broker" + triggerinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/trigger" + eventinglisters "knative.dev/eventing/pkg/client/listers/eventing/v1" + + "knative.dev/eventing-natss/pkg/broker/constants" + commonnats "knative.dev/eventing-natss/pkg/common/nats" +) + +type envConfig struct { + PodName string `envconfig:"POD_NAME" required:"true"` + ContainerName string `envconfig:"CONTAINER_NAME" required:"true"` + NatsURL string `envconfig:"NATS_URL" required:"true"` + FetchBatchSize int `envconfig:"CONSUMER_FETCH_BATCH_SIZE" default:"0"` + FetchTimeout time.Duration `envconfig:"CONSUMER_FETCH_TIMEOUT" default:"0"` +} + +// NewController creates a new filter controller +func NewController(ctx context.Context, _ configmap.Watcher) *controller.Impl { + logger := logging.FromContext(ctx) + + env := &envConfig{} + if err := envconfig.Process("", env); err != nil { + logger.Fatalw("Failed to process environment variables", zap.Error(err)) + } + + // Create NATS connection using URL from environment variable + natsConn, err := commonnats.NewNatsConnFromURL(ctx, env.NatsURL) + if err != nil { + logger.Fatalw("Failed to create NATS connection", zap.Error(err)) + } + + // Create JetStream context + js, err := natsConn.JetStream() + if err != nil { + logger.Fatalw("Failed to create JetStream context", zap.Error(err)) + } + + // Get informers + triggerInformer := triggerinformer.Get(ctx) + brokerInformer := brokerinformer.Get(ctx) + + // Create consumer manager with optional configuration from environment + consumerConfig := &ConsumerManagerConfig{ + FetchBatchSize: env.FetchBatchSize, + FetchTimeout: env.FetchTimeout, + } + consumerManager := NewConsumerManager(ctx, natsConn, js, consumerConfig) + + // Create filter reconciler + reconciler := NewFilterReconciler( + ctx, + triggerInformer.Lister(), + brokerInformer.Lister(), + consumerManager, + ) + + // Create a simple controller impl - we don't use the standard reconciler pattern + // because we handle events directly via informer handlers + impl := controller.NewContext(ctx, &noopReconciler{}, controller.ControllerOptions{ + WorkQueueName: "NatsJetStreamBrokerFilter", + Logger: logger, + }) + + // Set up event handlers for Trigger resources + triggerInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ + FilterFunc: filterTriggersByBrokerClass(brokerInformer.Lister()), + Handler: cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + trigger := obj.(*eventingv1.Trigger) + if err := reconciler.ReconcileTrigger(ctx, trigger); err != nil { + logger.Errorw("Failed to reconcile trigger on add", zap.Error(err), + zap.String("trigger", trigger.Name), + zap.String("namespace", trigger.Namespace)) + } + }, + UpdateFunc: func(_, newObj interface{}) { + trigger := newObj.(*eventingv1.Trigger) + if err := reconciler.ReconcileTrigger(ctx, trigger); err != nil { + logger.Errorw("Failed to reconcile trigger on update", zap.Error(err), + zap.String("trigger", trigger.Name), + zap.String("namespace", trigger.Namespace)) + } + }, + DeleteFunc: func(obj interface{}) { + trigger := obj.(*eventingv1.Trigger) + if err := reconciler.DeleteTrigger(string(trigger.UID)); err != nil { + logger.Errorw("Failed to delete trigger subscription", zap.Error(err), + zap.String("trigger", trigger.Name), + zap.String("namespace", trigger.Namespace)) + } + }, + }, + }) + + // Start health server in background + go startHealthServer(ctx, logger, natsConn) + + logger.Info("Filter controller initialized") + return impl +} + +// noopReconciler is a no-op reconciler since we handle events directly +type noopReconciler struct { + reconciler.LeaderAwareFuncs +} + +func (r *noopReconciler) Reconcile(ctx context.Context, key string) error { + return nil +} + +// filterTriggersByBrokerClass returns a filter function that only passes triggers +// referencing brokers of class NatsJetStreamBroker +func filterTriggersByBrokerClass(brokerLister eventinglisters.BrokerLister) func(obj interface{}) bool { + return func(obj interface{}) bool { + trigger, ok := obj.(*eventingv1.Trigger) + if !ok { + return false + } + + // Get the broker referenced by this trigger + broker, err := brokerLister.Brokers(trigger.Namespace).Get(trigger.Spec.Broker) + if err != nil { + // If we can't get the broker, include the trigger anyway + // and let the reconciler handle the error + return true + } + + // Check if the broker is of class NatsJetStreamBroker + return broker.GetAnnotations()[eventingv1.BrokerClassAnnotationKey] == constants.BrokerClassName + } +} + +func startHealthServer(ctx context.Context, logger *zap.SugaredLogger, natsConn *nats.Conn) { + mux := http.NewServeMux() + mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) { + if natsConn.IsConnected() { + w.WriteHeader(http.StatusOK) + w.Write([]byte("ok")) + } else { + w.WriteHeader(http.StatusServiceUnavailable) + w.Write([]byte("nats disconnected")) + } + }) + mux.HandleFunc("/readyz", func(w http.ResponseWriter, r *http.Request) { + if natsConn.IsConnected() { + w.WriteHeader(http.StatusOK) + w.Write([]byte("ok")) + } else { + w.WriteHeader(http.StatusServiceUnavailable) + w.Write([]byte("nats disconnected")) + } + }) + + server := &http.Server{ + Addr: ":8080", + Handler: mux, + } + + logger.Info("Starting health server on :8080") + go func() { + if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed { + logger.Errorw("Health server error", zap.Error(err)) + } + }() + + <-ctx.Done() + logger.Info("Shutting down health server") + server.Shutdown(context.Background()) +} diff --git a/pkg/broker/filter/controller_test.go b/pkg/broker/filter/controller_test.go new file mode 100644 index 000000000..2865f6c46 --- /dev/null +++ b/pkg/broker/filter/controller_test.go @@ -0,0 +1,97 @@ +/* +Copyright 2026 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package filter + +import ( + "context" + "testing" + + eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" + + "knative.dev/eventing-natss/pkg/broker/constants" +) + +func TestFilterTriggersByBrokerClass(t *testing.T) { + tests := []struct { + name string + broker *eventingv1.Broker + trigger *eventingv1.Trigger + wantFiltered bool + }{ + { + name: "trigger referencing NatsJetStreamBroker class broker", + broker: newTestBroker(testNamespace, testBrokerName, constants.BrokerClassName), + trigger: newTestTrigger(testNamespace, testTriggerName, testBrokerName), + wantFiltered: true, + }, + { + name: "trigger referencing different broker class", + broker: newTestBroker(testNamespace, testBrokerName, "OtherBrokerClass"), + trigger: newTestTrigger(testNamespace, testTriggerName, testBrokerName), + wantFiltered: false, + }, + { + name: "trigger referencing broker without class annotation", + broker: newTestBroker(testNamespace, testBrokerName, ""), + trigger: newTestTrigger(testNamespace, testTriggerName, testBrokerName), + wantFiltered: false, + }, + { + name: "trigger referencing non-existent broker passes to reconciler", + broker: nil, + trigger: newTestTrigger(testNamespace, testTriggerName, "non-existent-broker"), + wantFiltered: true, + }, + { + name: "non-trigger object", + broker: newTestBroker(testNamespace, testBrokerName, constants.BrokerClassName), + trigger: nil, + wantFiltered: false, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + brokerLister := newFakeBrokerLister() + if tc.broker != nil { + brokerLister.addBroker(tc.broker) + } + + filterFunc := filterTriggersByBrokerClass(brokerLister) + + var obj interface{} + if tc.trigger != nil { + obj = tc.trigger + } else { + obj = "not a trigger" + } + + got := filterFunc(obj) + if got != tc.wantFiltered { + t.Errorf("filterTriggersByBrokerClass() = %v, want %v", got, tc.wantFiltered) + } + }) + } +} + +func TestNoopReconciler(t *testing.T) { + r := &noopReconciler{} + err := r.Reconcile(context.Background(), "test-namespace/test-name") + if err != nil { + t.Errorf("noopReconciler.Reconcile() unexpected error: %v", err) + } +} diff --git a/pkg/broker/filter/handler.go b/pkg/broker/filter/handler.go new file mode 100644 index 000000000..72cd2320d --- /dev/null +++ b/pkg/broker/filter/handler.go @@ -0,0 +1,351 @@ +/* +Copyright 2026 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package filter + +import ( + "context" + "net/http" + + cejs "github.com/cloudevents/sdk-go/protocol/nats_jetstream/v2" + cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/cloudevents/sdk-go/v2/binding" + "github.com/cloudevents/sdk-go/v2/binding/spec" + "github.com/cloudevents/sdk-go/v2/protocol" + "github.com/cloudevents/sdk-go/v2/types" + "github.com/nats-io/nats.go" + "go.uber.org/zap" + + duckv1 "knative.dev/pkg/apis/duck/v1" + "knative.dev/pkg/logging" + + jsutils "knative.dev/eventing-natss/pkg/channel/jetstream/utils" + "knative.dev/eventing-natss/pkg/tracing" + eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1" + eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" + "knative.dev/eventing/pkg/eventfilter" + "knative.dev/eventing/pkg/eventfilter/attributes" + "knative.dev/eventing/pkg/eventfilter/subscriptionsapi" + "knative.dev/eventing/pkg/kncloudevents" +) + +var retryMax int32 = 3 +var retryTimeout = "PT1S" +var retryBackoffDelay = "PT0.5S" + +// TypeExtractorTransformer extracts the CloudEvent type from a message. +// Copied from channel/jetstream/dispatcher to avoid importing that package +// which registers channel informers. +type TypeExtractorTransformer string + +func (a *TypeExtractorTransformer) Transform(reader binding.MessageMetadataReader, _ binding.MessageMetadataWriter) error { + _, ty := reader.GetAttribute(spec.Type) + if ty != nil { + tyParsed, err := types.ToString(ty) + if err != nil { + return err + } + *a = TypeExtractorTransformer(tyParsed) + } + return nil +} + +var retryDelivery = eventingduckv1.BackoffPolicyLinear +var defaultRetry, _ = kncloudevents.RetryConfigFromDeliverySpec(eventingduckv1.DeliverySpec{ + Retry: &retryMax, + Timeout: &retryTimeout, + BackoffPolicy: &retryDelivery, + BackoffDelay: &retryBackoffDelay, +}) + +// TriggerHandler handles message dispatch for a single trigger +type TriggerHandler struct { + logger *zap.SugaredLogger + ctx context.Context + + // Trigger configuration + trigger *eventingv1.Trigger + subscriber duckv1.Addressable + filter eventfilter.Filter + + // Broker ingress URL for reply events + brokerIngressURL *duckv1.Addressable + + // Dispatcher for sending events + dispatcher *kncloudevents.Dispatcher + + // Retry configuration + retryConfig *kncloudevents.RetryConfig + noRetryConfig *kncloudevents.RetryConfig + + // Dead letter sink + deadLetterSink *duckv1.Addressable + + subscription *nats.Subscription + consumer *nats.ConsumerInfo +} + +// NewTriggerHandler creates a new handler for a trigger +func NewTriggerHandler( + ctx context.Context, + trigger *eventingv1.Trigger, + subscriber duckv1.Addressable, + brokerIngressURL *duckv1.Addressable, + deadLetterSink *duckv1.Addressable, + retryConfig *kncloudevents.RetryConfig, + noRetryConfig *kncloudevents.RetryConfig, + dispatcher *kncloudevents.Dispatcher, +) (*TriggerHandler, error) { + logger := logging.FromContext(ctx).With( + zap.String("trigger", trigger.Name), + zap.String("namespace", trigger.Namespace), + ) + + return &TriggerHandler{ + logger: logger, + ctx: ctx, + trigger: trigger, + subscriber: subscriber, + filter: buildTriggerFilter(logger, trigger), + brokerIngressURL: brokerIngressURL, + dispatcher: dispatcher, + retryConfig: retryConfig, + noRetryConfig: noRetryConfig, + deadLetterSink: deadLetterSink, + }, nil +} + +// HandleMessage processes a NATS message, applies filter, and dispatches to subscriber. +// With pull-based subscriptions, this is called synchronously from the fetch loop. +func (h *TriggerHandler) HandleMessage(msg *nats.Msg) { + logger := h.logger.With(zap.String("msg_id", msg.Header.Get(nats.MsgIdHdr))) + ctx := logging.WithLogger(h.ctx, logger) + + h.doHandle(ctx, msg) +} + +// doHandle processes the message synchronously +func (h *TriggerHandler) doHandle(ctx context.Context, msg *nats.Msg) { + logger := logging.FromContext(ctx) + + // Convert NATS message to CloudEvents message + message := cejs.NewMessage(msg) + if message.ReadEncoding() == binding.EncodingUnknown { + logger.Errorw("received a message with unknown encoding") + if err := msg.Term(); err != nil { + logger.Errorw("failed to terminate message", zap.Error(err)) + } + return + } + + // Convert to CloudEvent for filtering + event, err := binding.ToEvent(ctx, message) + if err != nil { + logger.Errorw("failed to convert message to CloudEvent", zap.Error(err)) + if err := msg.Term(); err != nil { + logger.Errorw("failed to terminate message", zap.Error(err)) + } + return + } + + // Apply filter + if h.filter != nil { + filterResult := h.filter.Filter(ctx, *event) + if filterResult == eventfilter.FailFilter { + logger.Debugw("event filtered out", + zap.String("type", event.Type()), + zap.String("source", event.Source()), + ) + // Ack the message since it was intentionally filtered + if err := msg.Ack(); err != nil { + logger.Errorw("failed to ack filtered message", zap.Error(err)) + } + return + } + } + + // Dispatch to subscriber + logger.Debugw("dispatching event to subscriber", + zap.String("subscriber", h.subscriber.URL.String()), + zap.String("type", event.Type()), + zap.String("source", event.Source()), + zap.String("id", event.ID()), + ) + + dispatchInfo, err := h.dispatchEvent(ctx, event, msg) + if err != nil { + logger.Errorw("failed to dispatch event", + zap.Error(err), + zap.Int("response_code", dispatchInfo.ResponseCode), + ) + return + } + + logger.Debugw("event dispatched successfully", + zap.Int("response_code", dispatchInfo.ResponseCode), + ) +} + +// dispatchEvent sends the event to the subscriber and handles ack/nack +func (h *TriggerHandler) dispatchEvent(ctx context.Context, event *cloudevents.Event, msg *nats.Msg) (*kncloudevents.DispatchInfo, error) { + logger := logging.FromContext(ctx) + + additionalHeaders := tracing.ConvertEventToHttpHeader(event) + te := TypeExtractorTransformer("") + + // Get retry number from message metadata + retryNumber := 1 + if meta, err := msg.Metadata(); err == nil { + retryNumber = int(meta.NumDelivered) + } + + // Determine if this is the last try + maxRetries := 1 + if h.retryConfig != nil { + maxRetries = h.retryConfig.RetryMax + } + lastTry := retryNumber > maxRetries + + // Dispatch the message to trigger's destination + dispatchInfo, err := h.dispatcher.SendEvent(ctx, *event, h.subscriber, + kncloudevents.WithHeader(additionalHeaders), + kncloudevents.WithTransformers(&te), + kncloudevents.WithRetryConfig(h.noRetryConfig), + ) + + result := determineNatsResult(dispatchInfo.ResponseCode, err) + + // Handle ack/nack/term based on result + switch { + case protocol.IsACK(result): + // process reply url case first + if h.brokerIngressURL != nil { + // TODO: should we retry in-memory reply url or go with re-delivery of the message? + replyDispatchInfo, replyErr := h.dispatcher.SendEvent(ctx, *event, *h.brokerIngressURL, + kncloudevents.WithRetryConfig(&defaultRetry), + kncloudevents.WithHeader(additionalHeaders), + kncloudevents.WithTransformers(&te), + ) + if replyErr != nil { + logger.Errorw("failed to send reply to broker ingress", + zap.Error(replyErr), + zap.Int("response_code", replyDispatchInfo.ResponseCode), + ) + } + } + if err := msg.Ack(nats.Context(ctx)); err != nil { + logger.Errorw("failed to ack message", zap.Error(err)) + } + case protocol.IsNACK(result): + if lastTry { + if h.deadLetterSink != nil { + // Send to dead letter sink + dlsDispatchInfo, dlsErr := h.dispatcher.SendEvent(ctx, *event, *h.deadLetterSink, + kncloudevents.WithRetryConfig(&defaultRetry), + kncloudevents.WithHeader(additionalHeaders), + kncloudevents.WithTransformers(&te), + ) + if dlsErr != nil { + logger.Errorw("failed to send to dead letter sink", + zap.Error(dlsErr), + zap.Int("response_code", dlsDispatchInfo.ResponseCode), + ) + } + } + + // Ack after DLS attempt + if err := msg.Ack(nats.Context(ctx)); err != nil { + logger.Errorw("failed to ack message after last retry", zap.Error(err)) + } + } else { + // Nack for retry + nakDelay := jsutils.CalculateNakDelayForRetryNumber(retryNumber, h.retryConfig) + if err := msg.NakWithDelay(nakDelay, nats.Context(ctx)); err != nil { + logger.Errorw("failed to nack message", zap.Error(err)) + } + } + default: + // Terminate - non-retriable error + if lastTry && h.deadLetterSink != nil { + // Send to dead letter sink + dlsDispatchInfo, dlsErr := h.dispatcher.SendEvent(ctx, *event, *h.deadLetterSink, + kncloudevents.WithRetryConfig(&defaultRetry), + kncloudevents.WithHeader(additionalHeaders), + kncloudevents.WithTransformers(&te), + ) + if dlsErr != nil { + logger.Errorw("failed to send to dead letter sink", + zap.Error(dlsErr), + zap.Int("response_code", dlsDispatchInfo.ResponseCode), + ) + } + } + + if err := msg.Term(nats.Context(ctx)); err != nil { + logger.Errorw("failed to term message", zap.Error(err)) + } + } + + return dispatchInfo, err +} + +// Cleanup releases resources +func (h *TriggerHandler) Cleanup() { + if h.filter != nil { + h.filter.Cleanup() + } +} + +func determineNatsResult(responseCode int, err error) protocol.Result { + result := protocol.ResultACK + if err != nil { + code := responseCode + if code/100 == 5 || code == http.StatusTooManyRequests || code == http.StatusRequestTimeout { + // Retriable error, effectively this is nats protocol NACK + result = protocol.NewReceipt(false, "%w", err) + } else { + // Non-retriable error + result = err + } + } + return result +} + +// buildTriggerFilter builds a filter from the trigger spec. +// Priority: +// 1. trigger.Spec.Filters (new subscriptions API filters) - if defined +// 2. trigger.Spec.Filter (legacy attributes filter) - if defined +// 3. nil (pass all events) - if neither is defined +func buildTriggerFilter(logger *zap.SugaredLogger, trigger *eventingv1.Trigger) eventfilter.Filter { + switch { + case len(trigger.Spec.Filters) > 0: + // Use new subscriptions API filters + logger.Debugw("using subscriptions API filters", + zap.Any("filters", trigger.Spec.Filters), + ) + return subscriptionsapi.CreateSubscriptionsAPIFilters(logger.Desugar(), trigger.Spec.Filters) + case trigger.Spec.Filter != nil && trigger.Spec.Filter.Attributes != nil: + // Use legacy attributes filter + logger.Debugw("using legacy attributes filter", + zap.Any("filter", trigger.Spec.Filter), + ) + return attributes.NewAttributesFilter(trigger.Spec.Filter.Attributes) + default: + // No filter defined, pass all events + logger.Debugw("no filter defined, passing all events") + return nil + } +} diff --git a/pkg/broker/filter/handler_test.go b/pkg/broker/filter/handler_test.go new file mode 100644 index 000000000..641f59673 --- /dev/null +++ b/pkg/broker/filter/handler_test.go @@ -0,0 +1,656 @@ +/* +Copyright 2026 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package filter + +import ( + "context" + "errors" + "net/http" + "testing" + + cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/cloudevents/sdk-go/v2/binding" + "github.com/cloudevents/sdk-go/v2/protocol" + "go.uber.org/zap" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "knative.dev/pkg/apis" + duckv1 "knative.dev/pkg/apis/duck/v1" + "knative.dev/pkg/logging" + + eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" + "knative.dev/eventing/pkg/eventfilter" + "knative.dev/eventing/pkg/kncloudevents" +) + +func TestDetermineNatsResult(t *testing.T) { + tests := []struct { + name string + responseCode int + err error + wantACK bool + wantNACK bool + }{ + { + name: "success - no error", + responseCode: http.StatusOK, + err: nil, + wantACK: true, + wantNACK: false, + }, + { + name: "success - 2xx response", + responseCode: http.StatusAccepted, + err: nil, + wantACK: true, + wantNACK: false, + }, + { + name: "retriable - 500 error", + responseCode: http.StatusInternalServerError, + err: errors.New("server error"), + wantACK: false, + wantNACK: true, + }, + { + name: "retriable - 502 bad gateway", + responseCode: http.StatusBadGateway, + err: errors.New("bad gateway"), + wantACK: false, + wantNACK: true, + }, + { + name: "retriable - 503 service unavailable", + responseCode: http.StatusServiceUnavailable, + err: errors.New("service unavailable"), + wantACK: false, + wantNACK: true, + }, + { + name: "retriable - 504 gateway timeout", + responseCode: http.StatusGatewayTimeout, + err: errors.New("gateway timeout"), + wantACK: false, + wantNACK: true, + }, + { + name: "retriable - 429 too many requests", + responseCode: http.StatusTooManyRequests, + err: errors.New("too many requests"), + wantACK: false, + wantNACK: true, + }, + { + name: "retriable - 408 request timeout", + responseCode: http.StatusRequestTimeout, + err: errors.New("request timeout"), + wantACK: false, + wantNACK: true, + }, + { + name: "non-retriable - 400 bad request", + responseCode: http.StatusBadRequest, + err: errors.New("bad request"), + wantACK: false, + wantNACK: false, + }, + { + name: "non-retriable - 401 unauthorized", + responseCode: http.StatusUnauthorized, + err: errors.New("unauthorized"), + wantACK: false, + wantNACK: false, + }, + { + name: "non-retriable - 403 forbidden", + responseCode: http.StatusForbidden, + err: errors.New("forbidden"), + wantACK: false, + wantNACK: false, + }, + { + name: "non-retriable - 404 not found", + responseCode: http.StatusNotFound, + err: errors.New("not found"), + wantACK: false, + wantNACK: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := determineNatsResult(tt.responseCode, tt.err) + + isACK := protocol.IsACK(result) + isNACK := protocol.IsNACK(result) + + if isACK != tt.wantACK { + t.Errorf("IsACK() = %v, want %v", isACK, tt.wantACK) + } + + if isNACK != tt.wantNACK { + t.Errorf("IsNACK() = %v, want %v", isNACK, tt.wantNACK) + } + }) + } +} + +func TestTypeExtractorTransformer(t *testing.T) { + // TypeExtractorTransformer is a string type that implements binding.Transformer + // It extracts the CloudEvent type from a message + + te := TypeExtractorTransformer("") + + // Initial value should be empty + if string(te) != "" { + t.Errorf("Initial value = %v, want empty string", string(te)) + } +} + +func TestRetryConfigDefaults(t *testing.T) { + // Verify the default retry configuration values + if retryMax != 3 { + t.Errorf("retryMax = %v, want 3", retryMax) + } + + if retryTimeout != "PT1S" { + t.Errorf("retryTimeout = %v, want PT1S", retryTimeout) + } + + if retryBackoffDelay != "PT0.5S" { + t.Errorf("retryBackoffDelay = %v, want PT0.5S", retryBackoffDelay) + } +} + +func TestDetermineNatsResult_EdgeCases(t *testing.T) { + tests := []struct { + name string + responseCode int + err error + wantACK bool + }{ + { + name: "zero response code with no error", + responseCode: 0, + err: nil, + wantACK: true, + }, + { + name: "zero response code with error", + responseCode: 0, + err: errors.New("some error"), + wantACK: false, + }, + { + name: "1xx informational (no error)", + responseCode: 100, + err: nil, + wantACK: true, + }, + { + name: "3xx redirect with error", + responseCode: 301, + err: errors.New("redirect"), + wantACK: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := determineNatsResult(tt.responseCode, tt.err) + + isACK := protocol.IsACK(result) + if isACK != tt.wantACK { + t.Errorf("IsACK() = %v, want %v", isACK, tt.wantACK) + } + }) + } +} + +func TestBuildTriggerFilter(t *testing.T) { + logger := zap.NewNop().Sugar() + + tests := []struct { + name string + trigger *eventingv1.Trigger + wantNilFilter bool + testEvent cloudevents.Event + wantFilterPass bool + }{ + { + name: "no filter - passes all events", + trigger: &eventingv1.Trigger{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-trigger", + Namespace: "default", + }, + Spec: eventingv1.TriggerSpec{ + Broker: "test-broker", + }, + }, + wantNilFilter: true, + wantFilterPass: true, + }, + { + name: "legacy filter only - uses attributes filter", + trigger: &eventingv1.Trigger{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-trigger", + Namespace: "default", + }, + Spec: eventingv1.TriggerSpec{ + Broker: "test-broker", + Filter: &eventingv1.TriggerFilter{ + Attributes: map[string]string{ + "type": "test.event.type", + }, + }, + }, + }, + wantNilFilter: false, + testEvent: func() cloudevents.Event { + e := cloudevents.NewEvent() + e.SetType("test.event.type") + e.SetSource("test-source") + return e + }(), + wantFilterPass: true, + }, + { + name: "legacy filter - event does not match", + trigger: &eventingv1.Trigger{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-trigger", + Namespace: "default", + }, + Spec: eventingv1.TriggerSpec{ + Broker: "test-broker", + Filter: &eventingv1.TriggerFilter{ + Attributes: map[string]string{ + "type": "test.event.type", + }, + }, + }, + }, + wantNilFilter: false, + testEvent: func() cloudevents.Event { + e := cloudevents.NewEvent() + e.SetType("different.type") + e.SetSource("test-source") + return e + }(), + wantFilterPass: false, + }, + { + name: "new filters take priority over legacy filter", + trigger: &eventingv1.Trigger{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-trigger", + Namespace: "default", + }, + Spec: eventingv1.TriggerSpec{ + Broker: "test-broker", + // New filters - should take priority + Filters: []eventingv1.SubscriptionsAPIFilter{ + { + Exact: map[string]string{ + "type": "new.filter.type", + }, + }, + }, + // Legacy filter - should be ignored + Filter: &eventingv1.TriggerFilter{ + Attributes: map[string]string{ + "type": "legacy.filter.type", + }, + }, + }, + }, + wantNilFilter: false, + testEvent: func() cloudevents.Event { + e := cloudevents.NewEvent() + e.SetType("new.filter.type") + e.SetSource("test-source") + return e + }(), + wantFilterPass: true, + }, + { + name: "new filters - event matches legacy but not new filter", + trigger: &eventingv1.Trigger{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-trigger", + Namespace: "default", + }, + Spec: eventingv1.TriggerSpec{ + Broker: "test-broker", + Filters: []eventingv1.SubscriptionsAPIFilter{ + { + Exact: map[string]string{ + "type": "new.filter.type", + }, + }, + }, + Filter: &eventingv1.TriggerFilter{ + Attributes: map[string]string{ + "type": "legacy.filter.type", + }, + }, + }, + }, + wantNilFilter: false, + testEvent: func() cloudevents.Event { + e := cloudevents.NewEvent() + e.SetType("legacy.filter.type") // matches legacy but not new + e.SetSource("test-source") + return e + }(), + wantFilterPass: false, // new filters take priority, so should fail + }, + { + name: "new filters with prefix", + trigger: &eventingv1.Trigger{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-trigger", + Namespace: "default", + }, + Spec: eventingv1.TriggerSpec{ + Broker: "test-broker", + Filters: []eventingv1.SubscriptionsAPIFilter{ + { + Prefix: map[string]string{ + "type": "test.event.", + }, + }, + }, + }, + }, + wantNilFilter: false, + testEvent: func() cloudevents.Event { + e := cloudevents.NewEvent() + e.SetType("test.event.created") + e.SetSource("test-source") + return e + }(), + wantFilterPass: true, + }, + { + name: "new filters with suffix", + trigger: &eventingv1.Trigger{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-trigger", + Namespace: "default", + }, + Spec: eventingv1.TriggerSpec{ + Broker: "test-broker", + Filters: []eventingv1.SubscriptionsAPIFilter{ + { + Suffix: map[string]string{ + "type": ".created", + }, + }, + }, + }, + }, + wantNilFilter: false, + testEvent: func() cloudevents.Event { + e := cloudevents.NewEvent() + e.SetType("order.created") + e.SetSource("test-source") + return e + }(), + wantFilterPass: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + filter := buildTriggerFilter(logger, tt.trigger) + + if tt.wantNilFilter { + if filter != nil { + t.Errorf("expected nil filter, got %v", filter) + } + return + } + + if filter == nil { + t.Fatal("expected non-nil filter") + } + + // Test the filter with the test event + result := filter.Filter(context.Background(), tt.testEvent) + passed := result != eventfilter.FailFilter + + if passed != tt.wantFilterPass { + t.Errorf("filter result = %v (passed=%v), want passed=%v", result, passed, tt.wantFilterPass) + } + }) + } +} + +func TestBuildTriggerFilter_NilAttributes(t *testing.T) { + logger := zap.NewNop().Sugar() + + // Filter set but Attributes nil should fall through to default (nil filter). + trigger := &eventingv1.Trigger{ + ObjectMeta: metav1.ObjectMeta{Name: "t", Namespace: "ns"}, + Spec: eventingv1.TriggerSpec{ + Broker: "b", + Filter: &eventingv1.TriggerFilter{ + Attributes: nil, + }, + }, + } + f := buildTriggerFilter(logger, trigger) + if f != nil { + t.Error("expected nil filter when Filter is set but Attributes is nil") + } +} + +func TestBuildTriggerFilter_MultipleAttributes(t *testing.T) { + logger := zap.NewNop().Sugar() + + trigger := &eventingv1.Trigger{ + ObjectMeta: metav1.ObjectMeta{Name: "t", Namespace: "ns"}, + Spec: eventingv1.TriggerSpec{ + Broker: "b", + Filter: &eventingv1.TriggerFilter{ + Attributes: map[string]string{ + "type": "order.created", + "source": "shop", + }, + }, + }, + } + f := buildTriggerFilter(logger, trigger) + if f == nil { + t.Fatal("expected non-nil filter") + } + + t.Run("all attributes match", func(t *testing.T) { + e := cloudevents.NewEvent() + e.SetType("order.created") + e.SetSource("shop") + if f.Filter(context.Background(), e) == eventfilter.FailFilter { + t.Error("expected event to pass filter") + } + }) + + t.Run("partial match fails", func(t *testing.T) { + e := cloudevents.NewEvent() + e.SetType("order.created") + e.SetSource("other-shop") + if f.Filter(context.Background(), e) != eventfilter.FailFilter { + t.Error("expected event to fail filter") + } + }) +} + +func TestTypeExtractorTransformerTransform(t *testing.T) { + tests := []struct { + name string + eventType string + wantType string + }{ + { + name: "extracts type from event", + eventType: "com.example.event.v1", + wantType: "com.example.event.v1", + }, + { + name: "extracts empty type", + eventType: "", + wantType: "", + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + e := cloudevents.NewEvent() + e.SetType(tc.eventType) + e.SetSource("test-source") + + em := binding.EventMessage(e) + + te := TypeExtractorTransformer("") + err := te.Transform(&em, nil) + if err != nil { + t.Fatalf("Transform() unexpected error: %v", err) + } + if string(te) != tc.wantType { + t.Errorf("TypeExtractorTransformer = %q, want %q", string(te), tc.wantType) + } + }) + } +} + +func TestNewTriggerHandler(t *testing.T) { + ctx := logging.WithLogger(context.Background(), zap.NewNop().Sugar()) + + trigger := &eventingv1.Trigger{ + ObjectMeta: metav1.ObjectMeta{Name: "test-trigger", Namespace: "test-ns"}, + Spec: eventingv1.TriggerSpec{Broker: "test-broker"}, + } + + subscriberURL, _ := apis.ParseURL("http://subscriber.example.com") + subscriber := duckv1.Addressable{URL: subscriberURL} + + handler, err := NewTriggerHandler(ctx, trigger, subscriber, nil, nil, nil, nil, nil) + if err != nil { + t.Fatalf("NewTriggerHandler() unexpected error: %v", err) + } + if handler == nil { + t.Fatal("NewTriggerHandler() returned nil handler") + } + if handler.trigger != trigger { + t.Error("handler.trigger not set correctly") + } + if handler.subscriber.URL.String() != subscriber.URL.String() { + t.Errorf("handler.subscriber URL = %v, want %v", handler.subscriber.URL, subscriber.URL) + } + if handler.filter != nil { + t.Error("handler.filter should be nil for trigger without filter") + } + if handler.brokerIngressURL != nil { + t.Error("handler.brokerIngressURL should be nil when not provided") + } + if handler.deadLetterSink != nil { + t.Error("handler.deadLetterSink should be nil when not provided") + } + if handler.retryConfig != nil { + t.Error("handler.retryConfig should be nil when not provided") + } +} + +func TestNewTriggerHandler_WithOptionalParams(t *testing.T) { + ctx := logging.WithLogger(context.Background(), zap.NewNop().Sugar()) + + trigger := &eventingv1.Trigger{ + ObjectMeta: metav1.ObjectMeta{Name: "test-trigger", Namespace: "test-ns"}, + Spec: eventingv1.TriggerSpec{ + Broker: "test-broker", + Filter: &eventingv1.TriggerFilter{ + Attributes: map[string]string{"type": "test.type"}, + }, + }, + } + + subscriberURL, _ := apis.ParseURL("http://subscriber.example.com") + subscriber := duckv1.Addressable{URL: subscriberURL} + + ingressURL, _ := apis.ParseURL("http://broker-ingress.example.com") + brokerIngress := &duckv1.Addressable{URL: ingressURL} + + dlsURL, _ := apis.ParseURL("http://dead-letter.example.com") + dls := &duckv1.Addressable{URL: dlsURL} + + retryConfig := &kncloudevents.RetryConfig{RetryMax: 3} + noRetryConfig := &kncloudevents.RetryConfig{RetryMax: 0} + + handler, err := NewTriggerHandler(ctx, trigger, subscriber, brokerIngress, dls, retryConfig, noRetryConfig, nil) + if err != nil { + t.Fatalf("NewTriggerHandler() unexpected error: %v", err) + } + if handler.filter == nil { + t.Error("handler.filter should not be nil for trigger with filter") + } + if handler.brokerIngressURL != brokerIngress { + t.Error("handler.brokerIngressURL not set correctly") + } + if handler.deadLetterSink != dls { + t.Error("handler.deadLetterSink not set correctly") + } + if handler.retryConfig != retryConfig { + t.Error("handler.retryConfig not set correctly") + } + if handler.noRetryConfig != noRetryConfig { + t.Error("handler.noRetryConfig not set correctly") + } +} + +func TestTriggerHandlerCleanup(t *testing.T) { + t.Run("nil filter", func(t *testing.T) { + h := &TriggerHandler{} + h.Cleanup() // should not panic + }) + + t.Run("with filter", func(t *testing.T) { + logger := zap.NewNop().Sugar() + trigger := &eventingv1.Trigger{ + ObjectMeta: metav1.ObjectMeta{Name: "t", Namespace: "ns"}, + Spec: eventingv1.TriggerSpec{ + Broker: "b", + Filter: &eventingv1.TriggerFilter{ + Attributes: map[string]string{"type": "test"}, + }, + }, + } + f := buildTriggerFilter(logger, trigger) + h := &TriggerHandler{filter: f} + h.Cleanup() // should call f.Cleanup() without error + }) +} + +func TestDefaultRetryConfigInitialized(t *testing.T) { + if defaultRetry.RetryMax != int(retryMax) { + t.Errorf("defaultRetry.RetryMax = %v, want %v", defaultRetry.RetryMax, retryMax) + } + if defaultRetry.Backoff == nil { + t.Error("defaultRetry.Backoff should not be nil") + } +} diff --git a/pkg/broker/filter/reconciler.go b/pkg/broker/filter/reconciler.go new file mode 100644 index 000000000..90354ef84 --- /dev/null +++ b/pkg/broker/filter/reconciler.go @@ -0,0 +1,160 @@ +/* +Copyright 2026 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package filter + +import ( + "context" + "fmt" + "net/http" + "time" + + "go.uber.org/zap" + apierrs "k8s.io/apimachinery/pkg/api/errors" + + duckv1 "knative.dev/pkg/apis/duck/v1" + "knative.dev/pkg/logging" + + eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" + eventinglisters "knative.dev/eventing/pkg/client/listers/eventing/v1" + "knative.dev/eventing/pkg/kncloudevents" + + "knative.dev/eventing-natss/pkg/broker/constants" +) + +// FilterReconciler reconciles triggers and manages consumer subscriptions +type FilterReconciler struct { + logger *zap.SugaredLogger + + triggerLister eventinglisters.TriggerLister + brokerLister eventinglisters.BrokerLister + + consumerManager *ConsumerManager +} + +// NewFilterReconciler creates a new filter reconciler +func NewFilterReconciler( + ctx context.Context, + triggerLister eventinglisters.TriggerLister, + brokerLister eventinglisters.BrokerLister, + consumerManager *ConsumerManager, +) *FilterReconciler { + return &FilterReconciler{ + logger: logging.FromContext(ctx), + triggerLister: triggerLister, + brokerLister: brokerLister, + consumerManager: consumerManager, + } +} + +// ReconcileTrigger reconciles a trigger to ensure the filter has a subscription +func (r *FilterReconciler) ReconcileTrigger(ctx context.Context, trigger *eventingv1.Trigger) error { + logger := r.logger.With( + zap.String("trigger", trigger.Name), + zap.String("namespace", trigger.Namespace), + ) + + // Get the broker + broker, err := r.brokerLister.Brokers(trigger.Namespace).Get(trigger.Spec.Broker) + if err != nil { + if apierrs.IsNotFound(err) { + logger.Debugw("broker not found, skipping trigger") + return nil + } + return fmt.Errorf("failed to get broker: %w", err) + } + + // Check broker class + if broker.GetAnnotations()[eventingv1.BrokerClassAnnotationKey] != constants.BrokerClassName { + logger.Debugw("broker is not NatsJetStreamBroker, skipping") + return nil + } + + // Check if broker is ready + if !broker.IsReady() { + logger.Debugw("broker is not ready, skipping trigger") + return nil + } + + // Check if trigger is ready + if trigger.Status.SubscriberURI == nil { + logger.Debugw("trigger subscriber URI not resolved yet, skipping") + return nil + } + + // Build subscriber addressable from trigger status + subscriber := duckv1.Addressable{URL: trigger.Status.SubscriberURI} + + // Get broker ingress URL for reply events + var brokerIngressURL *duckv1.Addressable + if broker.Status.Address != nil && broker.Status.Address.URL != nil { + brokerIngressURL = &duckv1.Addressable{URL: broker.Status.Address.URL.DeepCopy()} + } + + // Get dead letter sink if configured + var deadLetterSink *duckv1.Addressable + if trigger.Status.DeadLetterSinkURI != nil { + deadLetterSink = &duckv1.Addressable{URL: trigger.Status.DeadLetterSinkURI.DeepCopy()} + } + + // Build retry config from trigger delivery spec + var retryConfig *kncloudevents.RetryConfig + if trigger.Spec.Delivery != nil { + config, err := kncloudevents.RetryConfigFromDeliverySpec(*trigger.Spec.Delivery) + if err != nil { + logger.Warnw("failed to build retry config from delivery spec", zap.Error(err)) + } else { + retryConfig = &config + } + } + + // Build no-retry config (JetStream handles retries via redelivery) + var requestTimeout time.Duration + if retryConfig != nil { + requestTimeout = retryConfig.RequestTimeout + } + var noRetryConfig = kncloudevents.RetryConfig{ + RetryMax: 0, + CheckRetry: func(ctx context.Context, resp *http.Response, err error) (bool, error) { + return false, nil + }, + Backoff: func(attemptNum int, resp *http.Response) time.Duration { + return 0 + }, + RequestTimeout: requestTimeout, + } + + // Subscribe to the trigger's consumer + err = r.consumerManager.SubscribeTrigger( + trigger, + broker, + subscriber, + brokerIngressURL, + deadLetterSink, + retryConfig, + &noRetryConfig, + ) + if err != nil { + return fmt.Errorf("failed to subscribe to trigger: %w", err) + } + + return nil +} + +// DeleteTrigger removes the subscription for a deleted trigger +func (r *FilterReconciler) DeleteTrigger(triggerUID string) error { + return r.consumerManager.UnsubscribeTrigger(triggerUID) +} diff --git a/pkg/broker/filter/reconciler_test.go b/pkg/broker/filter/reconciler_test.go new file mode 100644 index 000000000..ce36ecd1a --- /dev/null +++ b/pkg/broker/filter/reconciler_test.go @@ -0,0 +1,509 @@ +/* +Copyright 2026 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package filter + +import ( + "context" + "fmt" + "strings" + "testing" + + "github.com/nats-io/nats.go" + corev1 "k8s.io/api/core/v1" + apierrs "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + + "knative.dev/pkg/apis" + duckv1 "knative.dev/pkg/apis/duck/v1" + "knative.dev/pkg/logging" + + eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1" + eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" + eventinglisters "knative.dev/eventing/pkg/client/listers/eventing/v1" + + "knative.dev/eventing-natss/pkg/broker/constants" +) + +const ( + testNamespace = "test-namespace" + testBrokerName = "test-broker" + testTriggerName = "test-trigger" + testTriggerUID = "test-trigger-uid-12345" +) + +// newTestBroker creates a test broker with the given class annotation. +func newTestBroker(namespace, name, brokerClass string) *eventingv1.Broker { + annotations := make(map[string]string) + if brokerClass != "" { + annotations[eventingv1.BrokerClassAnnotationKey] = brokerClass + } + return &eventingv1.Broker{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: namespace, + Name: name, + Annotations: annotations, + }, + } +} + +// newReadyTestBroker creates a test broker that reports as ready. +func newReadyTestBroker(namespace, name, brokerClass string) *eventingv1.Broker { + b := newTestBroker(namespace, name, brokerClass) + // IsReady checks ObservedGeneration == Generation and all conditions happy. + b.Generation = 1 + b.Status.InitializeConditions() + b.Status.ObservedGeneration = 1 + // Mark all dependent conditions true so the top-level Ready becomes true. + manager := b.GetConditionSet().Manage(&b.Status) + manager.MarkTrue(eventingv1.BrokerConditionIngress) + manager.MarkTrue(eventingv1.BrokerConditionTriggerChannel) + manager.MarkTrue(eventingv1.BrokerConditionFilter) + manager.MarkTrue(eventingv1.BrokerConditionAddressable) + manager.MarkTrue(eventingv1.BrokerConditionDeadLetterSinkResolved) + manager.MarkTrue(eventingv1.BrokerConditionEventPoliciesReady) + return b +} + +// newTestTrigger creates a test trigger referencing a broker. +func newTestTrigger(namespace, name, brokerName string) *eventingv1.Trigger { + return &eventingv1.Trigger{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: namespace, + Name: name, + UID: types.UID(testTriggerUID), + }, + Spec: eventingv1.TriggerSpec{ + Broker: brokerName, + }, + } +} + +// newTestTriggerWithSubscriber creates a trigger with a resolved subscriber URI. +func newTestTriggerWithSubscriber(namespace, name, brokerName, subscriberURL string) *eventingv1.Trigger { + t := newTestTrigger(namespace, name, brokerName) + u, _ := apis.ParseURL(subscriberURL) + t.Status.SubscriberURI = u + return t +} + +// --- Fake listers --- + +type fakeBrokerLister struct { + brokers map[string]map[string]*eventingv1.Broker +} + +func newFakeBrokerLister() *fakeBrokerLister { + return &fakeBrokerLister{ + brokers: make(map[string]map[string]*eventingv1.Broker), + } +} + +func (f *fakeBrokerLister) addBroker(broker *eventingv1.Broker) { + if f.brokers[broker.Namespace] == nil { + f.brokers[broker.Namespace] = make(map[string]*eventingv1.Broker) + } + f.brokers[broker.Namespace][broker.Name] = broker +} + +func (f *fakeBrokerLister) List(selector labels.Selector) ([]*eventingv1.Broker, error) { + var result []*eventingv1.Broker + for _, ns := range f.brokers { + for _, broker := range ns { + result = append(result, broker) + } + } + return result, nil +} + +func (f *fakeBrokerLister) Brokers(namespace string) eventinglisters.BrokerNamespaceLister { + return &fakeBrokerNamespaceLister{ + namespace: namespace, + brokers: f.brokers[namespace], + } +} + +type fakeBrokerNamespaceLister struct { + namespace string + brokers map[string]*eventingv1.Broker +} + +func (f *fakeBrokerNamespaceLister) List(selector labels.Selector) ([]*eventingv1.Broker, error) { + result := make([]*eventingv1.Broker, 0, len(f.brokers)) + for _, broker := range f.brokers { + result = append(result, broker) + } + return result, nil +} + +func (f *fakeBrokerNamespaceLister) Get(name string) (*eventingv1.Broker, error) { + if broker, ok := f.brokers[name]; ok { + return broker, nil + } + return nil, apierrs.NewNotFound(schema.GroupResource{Group: "eventing.knative.dev", Resource: "brokers"}, name) +} + +func TestReconcileTrigger(t *testing.T) { + tests := []struct { + name string + broker *eventingv1.Broker + trigger *eventingv1.Trigger + wantErr bool + }{ + { + name: "broker not found returns nil", + broker: nil, + trigger: newTestTrigger(testNamespace, testTriggerName, testBrokerName), + wantErr: false, + }, + { + name: "broker wrong class returns nil", + broker: newTestBroker(testNamespace, testBrokerName, "OtherBrokerClass"), + trigger: newTestTrigger(testNamespace, testTriggerName, testBrokerName), + wantErr: false, + }, + { + name: "broker not ready returns nil", + broker: newTestBroker(testNamespace, testBrokerName, constants.BrokerClassName), + trigger: newTestTrigger(testNamespace, testTriggerName, testBrokerName), + wantErr: false, + }, + { + name: "subscriber URI not resolved returns nil", + broker: newReadyTestBroker(testNamespace, testBrokerName, constants.BrokerClassName), + trigger: newTestTrigger(testNamespace, testTriggerName, testBrokerName), + wantErr: false, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + brokerLister := newFakeBrokerLister() + if tc.broker != nil { + brokerLister.addBroker(tc.broker) + } + + ctx := logging.WithLogger(context.Background(), logging.FromContext(context.TODO())) + + r := &FilterReconciler{ + logger: logging.FromContext(ctx), + brokerLister: brokerLister, + } + + err := r.ReconcileTrigger(ctx, tc.trigger) + if tc.wantErr && err == nil { + t.Error("ReconcileTrigger() expected error, got nil") + } + if !tc.wantErr && err != nil { + t.Errorf("ReconcileTrigger() unexpected error: %v", err) + } + }) + } +} + +func TestReconcileTrigger_BrokerLookupError(t *testing.T) { + // Use a broker lister that returns an error other than NotFound. + brokerLister := &errorBrokerLister{ + err: fmt.Errorf("internal error"), + } + + ctx := logging.WithLogger(context.Background(), logging.FromContext(context.TODO())) + + r := &FilterReconciler{ + logger: logging.FromContext(ctx), + brokerLister: brokerLister, + } + + trigger := newTestTrigger(testNamespace, testTriggerName, testBrokerName) + err := r.ReconcileTrigger(ctx, trigger) + if err == nil { + t.Error("ReconcileTrigger() expected error for non-NotFound broker lookup error, got nil") + } +} + +// errorBrokerLister returns an arbitrary error for any lookup. +type errorBrokerLister struct { + err error +} + +func (e *errorBrokerLister) List(selector labels.Selector) ([]*eventingv1.Broker, error) { + return nil, e.err +} + +func (e *errorBrokerLister) Brokers(namespace string) eventinglisters.BrokerNamespaceLister { + return &errorBrokerNamespaceLister{err: e.err} +} + +type errorBrokerNamespaceLister struct { + err error +} + +func (e *errorBrokerNamespaceLister) List(selector labels.Selector) ([]*eventingv1.Broker, error) { + return nil, e.err +} + +func (e *errorBrokerNamespaceLister) Get(name string) (*eventingv1.Broker, error) { + return nil, e.err +} + +func TestDeleteTrigger(t *testing.T) { + ctx := logging.WithLogger(context.Background(), logging.FromContext(context.TODO())) + + cm := &ConsumerManager{ + logger: logging.FromContext(ctx), + subscriptions: make(map[string]*TriggerSubscription), + } + + r := &FilterReconciler{ + logger: logging.FromContext(ctx), + consumerManager: cm, + } + + // Deleting a non-existent trigger should return nil. + if err := r.DeleteTrigger("non-existent-uid"); err != nil { + t.Errorf("DeleteTrigger() unexpected error for non-existent trigger: %v", err) + } +} + +// Verify that IsReady helper produces truly ready brokers for use in other +// test cases. This guards against upstream condition-set changes breaking +// the helper silently. +func TestNewReadyTestBroker(t *testing.T) { + b := newReadyTestBroker(testNamespace, testBrokerName, constants.BrokerClassName) + if !b.IsReady() { + cond := b.Status.GetTopLevelCondition() + t.Errorf("newReadyTestBroker() produced broker that is not ready; top-level condition: %+v", cond) + for _, c := range b.Status.Conditions { + if c.Status != corev1.ConditionTrue { + t.Errorf(" condition %s = %s (%s)", c.Type, c.Status, c.Reason) + } + } + } +} + +// Verify subscriber URI detection works on both nil and non-nil cases. +func TestNewTestTriggerWithSubscriber(t *testing.T) { + t.Run("without subscriber", func(t *testing.T) { + trigger := newTestTrigger(testNamespace, testTriggerName, testBrokerName) + if trigger.Status.SubscriberURI != nil { + t.Errorf("expected nil SubscriberURI, got %v", trigger.Status.SubscriberURI) + } + }) + + t.Run("with subscriber", func(t *testing.T) { + trigger := newTestTriggerWithSubscriber(testNamespace, testTriggerName, testBrokerName, "http://example.com") + if trigger.Status.SubscriberURI == nil { + t.Error("expected non-nil SubscriberURI") + } + }) +} + +// TestReconcileTrigger_SkipReasons verifies that each early-return guard in +// ReconcileTrigger is independently effective. We run through a progression +// where exactly one guard is removed at each step to confirm each check. +func TestReconcileTrigger_SkipReasons(t *testing.T) { + ctx := logging.WithLogger(context.Background(), logging.FromContext(context.TODO())) + + // Ready broker + trigger with subscriber should NOT be skipped. + // We cannot proceed without a real JetStream, but we can confirm the + // early returns work by verifying the inverse: a ready broker with + // wrong class is still skipped. + broker := newReadyTestBroker(testNamespace, testBrokerName, "WrongClass") + trigger := newTestTriggerWithSubscriber(testNamespace, testTriggerName, testBrokerName, "http://subscriber.example.com") + + brokerLister := newFakeBrokerLister() + brokerLister.addBroker(broker) + + r := &FilterReconciler{ + logger: logging.FromContext(ctx), + brokerLister: brokerLister, + } + + // Even though trigger has a subscriber and broker is ready, the wrong + // class causes a skip (nil return). + if err := r.ReconcileTrigger(ctx, trigger); err != nil { + t.Errorf("ReconcileTrigger() with wrong broker class should skip, got error: %v", err) + } +} + +// --- Fake JetStream --- + +// fakeJetStream embeds the JetStreamContext interface and overrides only +// ConsumerInfo. Any other method will panic if called (embedded nil value), +// which is acceptable since our tests never reach those code paths. +type fakeJetStream struct { + nats.JetStreamContext + consumerInfoErr error +} + +func (f *fakeJetStream) ConsumerInfo(stream, name string, opts ...nats.JSOpt) (*nats.ConsumerInfo, error) { + return nil, f.consumerInfoErr +} + +func int32Ptr(v int32) *int32 { return &v } + +func TestNewFilterReconciler(t *testing.T) { + ctx := logging.WithLogger(context.Background(), logging.FromContext(context.TODO())) + + r := NewFilterReconciler(ctx, nil, nil, nil) + if r == nil { + t.Fatal("NewFilterReconciler() returned nil") + } + if r.logger == nil { + t.Error("reconciler.logger should not be nil") + } +} + +func TestReconcileTrigger_FullPath(t *testing.T) { + tests := []struct { + name string + brokerAddr bool + deadLetterURI string + deliveryRetry *int32 + jsErr error + wantErrContains string + }{ + { + name: "basic path through to SubscribeTrigger", + jsErr: fmt.Errorf("connection refused"), + wantErrContains: "failed to subscribe to trigger", + }, + { + name: "with broker ingress address", + brokerAddr: true, + jsErr: fmt.Errorf("connection refused"), + wantErrContains: "failed to subscribe to trigger", + }, + { + name: "with dead letter sink", + deadLetterURI: "http://dead-letter.example.com", + jsErr: fmt.Errorf("connection refused"), + wantErrContains: "failed to subscribe to trigger", + }, + { + name: "with delivery spec", + deliveryRetry: int32Ptr(3), + jsErr: fmt.Errorf("connection refused"), + wantErrContains: "failed to subscribe to trigger", + }, + { + name: "all optional fields set", + brokerAddr: true, + deadLetterURI: "http://dead-letter.example.com", + deliveryRetry: int32Ptr(5), + jsErr: fmt.Errorf("connection refused"), + wantErrContains: "failed to subscribe to trigger", + }, + { + name: "consumer not found error", + jsErr: nats.ErrConsumerNotFound, + wantErrContains: "not found", + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ctx := logging.WithLogger(context.Background(), logging.FromContext(context.TODO())) + + broker := newReadyTestBroker(testNamespace, testBrokerName, constants.BrokerClassName) + if tc.brokerAddr { + broker.Status.SetAddress(&duckv1.Addressable{ + URL: apis.HTTP("broker-ingress.example.com"), + }) + } + + trigger := newTestTriggerWithSubscriber(testNamespace, testTriggerName, testBrokerName, "http://subscriber.example.com") + if tc.deadLetterURI != "" { + u, _ := apis.ParseURL(tc.deadLetterURI) + trigger.Status.DeadLetterSinkURI = u + } + if tc.deliveryRetry != nil { + trigger.Spec.Delivery = &eventingduckv1.DeliverySpec{ + Retry: tc.deliveryRetry, + } + } + + brokerLister := newFakeBrokerLister() + brokerLister.addBroker(broker) + + cm := &ConsumerManager{ + logger: logging.FromContext(ctx), + ctx: ctx, + js: &fakeJetStream{consumerInfoErr: tc.jsErr}, + subscriptions: make(map[string]*TriggerSubscription), + } + + r := &FilterReconciler{ + logger: logging.FromContext(ctx), + brokerLister: brokerLister, + consumerManager: cm, + } + + err := r.ReconcileTrigger(ctx, trigger) + if err == nil { + t.Fatal("ReconcileTrigger() expected error, got nil") + } + if !strings.Contains(err.Error(), tc.wantErrContains) { + t.Errorf("error %q should contain %q", err.Error(), tc.wantErrContains) + } + }) + } +} + +func TestReconcileTrigger_ExistingSubscription(t *testing.T) { + ctx := logging.WithLogger(context.Background(), logging.FromContext(context.TODO())) + + broker := newReadyTestBroker(testNamespace, testBrokerName, constants.BrokerClassName) + subscriberURL := "http://subscriber.example.com" + trigger := newTestTriggerWithSubscriber(testNamespace, testTriggerName, testBrokerName, subscriberURL) + triggerUID := string(trigger.UID) + + brokerLister := newFakeBrokerLister() + brokerLister.addBroker(broker) + + // Pre-populate subscription with matching subscriber URL. + parsedURL, _ := apis.ParseURL(subscriberURL) + existingHandler := &TriggerHandler{ + subscriber: duckv1.Addressable{URL: parsedURL}, + } + + cm := &ConsumerManager{ + logger: logging.FromContext(ctx), + ctx: ctx, + js: &fakeJetStream{consumerInfoErr: fmt.Errorf("should not be called")}, + subscriptions: map[string]*TriggerSubscription{ + triggerUID: { + trigger: trigger, + handler: existingHandler, + }, + }, + } + + r := &FilterReconciler{ + logger: logging.FromContext(ctx), + brokerLister: brokerLister, + consumerManager: cm, + } + + // Should return nil because existing subscription has same URL. + err := r.ReconcileTrigger(ctx, trigger) + if err != nil { + t.Errorf("ReconcileTrigger() unexpected error for existing subscription with same URL: %v", err) + } +}