Skip to content
39 changes: 39 additions & 0 deletions cmd/filter/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
Copyright 2024 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main

import (
"os"

"knative.dev/pkg/injection"
"knative.dev/pkg/injection/sharedmain"
"knative.dev/pkg/signals"

"knative.dev/eventing-natss/pkg/broker/filter"
)

func main() {
component := "natsjs-broker-filter"

ctx := signals.NewContext()
ns := os.Getenv("NAMESPACE")
if ns != "" {
ctx = injection.WithNamespaceScope(ctx, ns)
}

sharedmain.MainWithContext(ctx, component, filter.NewController)
}
346 changes: 346 additions & 0 deletions pkg/broker/filter/consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,346 @@
/*
Copyright 2026 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package filter

import (
"context"
"errors"
"fmt"
"sync"
"time"

"github.com/nats-io/nats.go"
"go.uber.org/zap"

duckv1 "knative.dev/pkg/apis/duck/v1"
"knative.dev/pkg/logging"

eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
"knative.dev/eventing/pkg/auth"
"knative.dev/eventing/pkg/eventingtls"
"knative.dev/eventing/pkg/kncloudevents"

brokerutils "knative.dev/eventing-natss/pkg/broker/utils"
)

const (
// DefaultFetchBatchSize is the default number of messages to fetch in each batch
DefaultFetchBatchSize = 10
// DefaultFetchTimeout is the default timeout for fetching messages
DefaultFetchTimeout = 200 * time.Millisecond
)

// ConsumerManagerConfig holds configuration for the ConsumerManager
type ConsumerManagerConfig struct {
// FetchBatchSize is the number of messages to fetch in each batch.
// Defaults to DefaultFetchBatchSize if not set.
FetchBatchSize int

// FetchTimeout is the timeout for fetching messages.
// Defaults to DefaultFetchTimeout if not set.
FetchTimeout time.Duration
}

// ConsumerManager manages JetStream consumer subscriptions for triggers
type ConsumerManager struct {
logger *zap.SugaredLogger
ctx context.Context

js nats.JetStreamContext
conn *nats.Conn

// Configuration
fetchBatchSize int
fetchTimeout time.Duration

// Event dispatcher
dispatcher *kncloudevents.Dispatcher

// Map of trigger UID to subscription
subscriptions map[string]*TriggerSubscription
mu sync.RWMutex
}

// TriggerSubscription holds the subscription and handler for a trigger
type TriggerSubscription struct {
trigger *eventingv1.Trigger
subscription *nats.Subscription
handler *TriggerHandler
streamName string
consumerName string
cancel context.CancelFunc
}

// NewConsumerManager creates a new consumer manager
func NewConsumerManager(ctx context.Context, conn *nats.Conn, js nats.JetStreamContext, config *ConsumerManagerConfig) *ConsumerManager {
// Create OIDC token provider and dispatcher
oidcTokenProvider := auth.NewOIDCTokenProvider(ctx)
dispatcher := kncloudevents.NewDispatcher(eventingtls.ClientConfig{}, oidcTokenProvider)

// Apply defaults
fetchBatchSize := DefaultFetchBatchSize
fetchTimeout := DefaultFetchTimeout

if config != nil {
if config.FetchBatchSize > 0 {
fetchBatchSize = config.FetchBatchSize
}
if config.FetchTimeout > 0 {
fetchTimeout = config.FetchTimeout
}
}

return &ConsumerManager{
logger: logging.FromContext(ctx),
ctx: ctx,
js: js,
conn: conn,
fetchBatchSize: fetchBatchSize,
fetchTimeout: fetchTimeout,
dispatcher: dispatcher,
subscriptions: make(map[string]*TriggerSubscription),
}
}

// SubscribeTrigger creates a pull-based subscription for a trigger's consumer
func (m *ConsumerManager) SubscribeTrigger(
trigger *eventingv1.Trigger,
broker *eventingv1.Broker,
subscriber duckv1.Addressable,
brokerIngressURL *duckv1.Addressable,
deadLetterSink *duckv1.Addressable,
retryConfig *kncloudevents.RetryConfig,
noRetryConfig *kncloudevents.RetryConfig,
) error {
m.mu.Lock()
defer m.mu.Unlock()

triggerUID := string(trigger.UID)
logger := m.logger.With(
zap.String("trigger", trigger.Name),
zap.String("namespace", trigger.Namespace),
zap.String("trigger_uid", triggerUID),
)

// Check if we already have a subscription for this trigger
if existing, ok := m.subscriptions[triggerUID]; ok {
existing.handler.noRetryConfig = noRetryConfig
existing.handler.retryConfig = retryConfig
existing.handler.filter = buildTriggerFilter(logger, trigger)
existing.handler.deadLetterSink = deadLetterSink

// Check if configuration has changed
if existing.handler.subscriber.URL.String() == subscriber.URL.String() {
logger.Debugw("trigger subscription already exists and is up to date")
return nil
}
// Configuration changed, unsubscribe and re-subscribe
logger.Infow("trigger configuration changed, re-subscribing")
if err := m.unsubscribeLocked(triggerUID); err != nil {
logger.Warnw("failed to unsubscribe old trigger subscription", zap.Error(err))
}
}

// Create the trigger handler
handler, err := NewTriggerHandler(
m.ctx,
trigger,
subscriber,
brokerIngressURL,
deadLetterSink,
retryConfig,
noRetryConfig,
m.dispatcher,
)
if err != nil {
return fmt.Errorf("failed to create trigger handler: %w", err)
}

// Derive stream and consumer names
streamName := brokerutils.BrokerStreamName(broker)
consumerName := brokerutils.TriggerConsumerName(triggerUID)

// Get consumer info (also verifies consumer exists)
consumerInfo, err := m.js.ConsumerInfo(streamName, consumerName)
if err != nil {
handler.Cleanup()
if errors.Is(err, nats.ErrConsumerNotFound) {
return fmt.Errorf("consumer %s not found in stream %s: trigger controller may not have reconciled yet", consumerName, streamName)
}
return fmt.Errorf("failed to get consumer info: %w", err)
}

// Get the filter subject from the consumer's configuration
filterSubject := brokerutils.BrokerPublishSubjectName(broker.Namespace, broker.Name) + ".>"

logger.Infow("creating pull subscription for trigger consumer",
zap.String("stream", streamName),
zap.String("consumer", consumerName),
zap.String("filter_subject", filterSubject),
)

// Create pull subscription bound to the existing consumer
sub, err := m.js.PullSubscribe(
filterSubject,
consumerName,
nats.Bind(streamName, consumerName),
)
if err != nil {
handler.Cleanup()
return fmt.Errorf("failed to create pull subscription: %w", err)
}

// Set subscription and consumer info on handler
handler.subscription = sub
handler.consumer = consumerInfo

// Create cancellable context for the fetch loop
ctx, cancel := context.WithCancel(m.ctx)

// Store the subscription
m.subscriptions[triggerUID] = &TriggerSubscription{
trigger: trigger,
subscription: sub,
handler: handler,
streamName: streamName,
consumerName: consumerName,
cancel: cancel,
}

// Start the message fetch loop
go m.fetchLoop(ctx, sub, handler, logger)

logger.Infow("successfully started pull subscription for trigger consumer")
return nil
}

// fetchLoop continuously fetches messages from the pull consumer
func (m *ConsumerManager) fetchLoop(
ctx context.Context,
sub *nats.Subscription,
handler *TriggerHandler,
logger *zap.SugaredLogger,
) {
for {
select {
case <-ctx.Done():
logger.Debugw("fetch loop stopped")
return
default:
// Fetch a batch of messages
msgs, err := sub.Fetch(m.fetchBatchSize, nats.MaxWait(m.fetchTimeout))
if err != nil {
if errors.Is(err, nats.ErrTimeout) {
// No messages available, continue polling
continue
}
if errors.Is(err, nats.ErrConnectionClosed) || errors.Is(err, nats.ErrConsumerDeleted) || errors.Is(err, nats.ErrBadSubscription) {
logger.Warnw("subscription closed, stopping fetch loop", zap.Error(err))
return
}
if errors.Is(err, context.Canceled) {
return
}
logger.Errorw("error fetching messages", zap.Error(err))
// Back off on errors
time.Sleep(200 * time.Millisecond)
continue
}

// Process fetched messages
for _, msg := range msgs {
handler.HandleMessage(msg)
}
}
}
}

// UnsubscribeTrigger removes a subscription for a trigger
func (m *ConsumerManager) UnsubscribeTrigger(triggerUID string) error {
m.mu.Lock()
defer m.mu.Unlock()

return m.unsubscribeLocked(triggerUID)
}

// unsubscribeLocked removes a subscription (must be called with lock held)
func (m *ConsumerManager) unsubscribeLocked(triggerUID string) error {
sub, ok := m.subscriptions[triggerUID]
if !ok {
return nil
}

logger := m.logger.With(
zap.String("trigger", sub.trigger.Name),
zap.String("namespace", sub.trigger.Namespace),
)

logger.Infow("unsubscribing from trigger consumer")

// Cancel the fetch loop first
if sub.cancel != nil {
sub.cancel()
}

// Unsubscribe from the pull consumer
if err := sub.subscription.Unsubscribe(); err != nil {
logger.Warnw("failed to unsubscribe", zap.Error(err))
}

// Cleanup the handler
sub.handler.Cleanup()

// Remove from map
delete(m.subscriptions, triggerUID)

return nil
}

// Close closes all subscriptions
func (m *ConsumerManager) Close() error {
m.mu.Lock()
defer m.mu.Unlock()

m.logger.Infow("closing consumer manager", zap.Int("subscription_count", len(m.subscriptions)))

var errs []error
for uid := range m.subscriptions {
if err := m.unsubscribeLocked(uid); err != nil {
errs = append(errs, err)
}
}

if len(errs) > 0 {
return fmt.Errorf("errors closing subscriptions: %v", errs)
}
return nil
}

// GetSubscriptionCount returns the number of active subscriptions
func (m *ConsumerManager) GetSubscriptionCount() int {
m.mu.RLock()
defer m.mu.RUnlock()
return len(m.subscriptions)
}

// HasSubscription checks if a subscription exists for a trigger
func (m *ConsumerManager) HasSubscription(triggerUID string) bool {
m.mu.RLock()
defer m.mu.RUnlock()
_, ok := m.subscriptions[triggerUID]
return ok
}
Loading
Loading