Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,17 @@ func (a *orbAgent) Start(ctx context.Context, cancelFunc context.CancelFunc) err
return err
}

// Bind fleet secrets manager to fleet config manager if both are fleet-based
// This needs to happen before SolveConfigSecrets so secrets can be resolved
if a.config.OrbAgent.ConfigManager.Active == "fleet" && a.config.OrbAgent.SecretsManager.Active == "fleet" {
if fleetCM, ok := a.configManager.(*configmgr.FleetConfigManager); ok {
if err := fleetCM.BindSecretsManager(a.secretsManager); err != nil {
a.logger.Error("error binding fleet secrets manager", "error", err)
return err
}
}
}

var err error
if a.config.OrbAgent.Backends,
a.config.OrbAgent.ConfigManager,
Expand Down
10 changes: 8 additions & 2 deletions agent/config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,15 @@ type VaultManager struct {
Schedule *string `yaml:"schedule,omitempty"`
}

// SecretsSources represents the configuration for manager sources, including vault.
// FleetSecretsManager holds the YAML-configurable settings for the Fleet
// secrets manager.
type FleetSecretsManager struct {
// Timeout is the request timeout in seconds. It is optional in the YAML
// (omitempty) and is nil when the key is absent.
// NOTE(review): the default applied for a nil Timeout is not visible here — confirm in the secrets manager.
Timeout *int `yaml:"timeout,omitempty"` // Request timeout in seconds
}

// SecretsSources represents the configuration for manager sources, including vault and fleet.
type SecretsSources struct {
Vault VaultManager `yaml:"vault"`
Vault VaultManager `yaml:"vault"`
Fleet FleetSecretsManager `yaml:"fleet"`
}

// ManagerSecrets represents the configuration for the Secrets Manager
Expand Down
72 changes: 58 additions & 14 deletions agent/configmgr/fleet.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@ import (
"github.com/netboxlabs/orb-agent/agent/otlpbridge"
"github.com/netboxlabs/orb-agent/agent/policymgr"
"github.com/netboxlabs/orb-agent/agent/redact"
"github.com/netboxlabs/orb-agent/agent/secretsmgr"
)

// Compile-time check to ensure fleetConfigManager implements Manager interface
var _ Manager = (*fleetConfigManager)(nil)
// Compile-time check to ensure FleetConfigManager implements Manager interface
var _ Manager = (*FleetConfigManager)(nil)

type fleetConfigManager struct {
// FleetConfigManager implements the Manager interface for Fleet-based configuration
type FleetConfigManager struct {
logger *slog.Logger
connection fleet.MQTTConnector
authTokenManager *fleet.AuthTokenManager
Expand All @@ -38,10 +40,10 @@ type fleetConfigManager struct {
monitorCancel context.CancelFunc
}

func newFleetConfigManager(logger *slog.Logger, pMgr policymgr.PolicyManager, backendState backend.StateRetriever) *fleetConfigManager {
func newFleetConfigManager(logger *slog.Logger, pMgr policymgr.PolicyManager, backendState backend.StateRetriever) *FleetConfigManager {
resetChan := make(chan struct{}, 1)
reconnectChan := make(chan struct{}, 1)
return &fleetConfigManager{
return &FleetConfigManager{
logger: logger,
connection: fleet.NewMQTTConnection(logger, pMgr, resetChan, reconnectChan, backendState),
authTokenManager: fleet.NewAuthTokenManager(logger),
Expand All @@ -52,11 +54,11 @@ func newFleetConfigManager(logger *slog.Logger, pMgr policymgr.PolicyManager, ba
}
}

// newFleetConfigManagerWithConnection creates a fleetConfigManager with a custom connection (for testing)
func newFleetConfigManagerWithConnection(logger *slog.Logger, pMgr policymgr.PolicyManager, backendState backend.StateRetriever, conn fleet.MQTTConnector) *fleetConfigManager {
// newFleetConfigManagerWithConnection creates a FleetConfigManager with a custom connection (for testing)
func newFleetConfigManagerWithConnection(logger *slog.Logger, pMgr policymgr.PolicyManager, backendState backend.StateRetriever, conn fleet.MQTTConnector) *FleetConfigManager {
resetChan := make(chan struct{}, 1)
reconnectChan := make(chan struct{}, 1)
return &fleetConfigManager{
return &FleetConfigManager{
logger: logger,
connection: conn, // Use provided connection instead of creating new one
authTokenManager: fleet.NewAuthTokenManager(logger),
Expand All @@ -67,7 +69,8 @@ func newFleetConfigManagerWithConnection(logger *slog.Logger, pMgr policymgr.Pol
}
}

func (fleetManager *fleetConfigManager) Start(cfg config.Config, backends map[string]backend.Backend) error {
// Start initializes and starts the Fleet configuration manager
func (fleetManager *FleetConfigManager) Start(cfg config.Config, backends map[string]backend.Backend) error {
ctx := context.Background()

var err error
Expand Down Expand Up @@ -222,8 +225,48 @@ func (fleetManager *fleetConfigManager) Start(cfg config.Config, backends map[st
return nil
}

// BindSecretsManager wires a Fleet-backed secrets manager to this manager's
// MQTT connection.
//
// If sm is not a *secretsmgr.FleetSecretsManager, or is a nil pointer of that
// type, there is nothing to bind: a warning is logged so the skipped binding
// is visible, and nil is returned. Wrapped managers are not unwrapped.
//
// Otherwise an on-ready hook is registered on the connection. When the hook
// runs it:
//   - wraps the connection manager in publisher and subscriber adapters,
//   - calls BindMQTT with the secrets request, response and updated topics,
//   - registers HandleMessage as the topic handler for the response and
//     updated topics.
//
// A BindMQTT failure inside the hook is logged and the topic handlers are not
// registered. The method itself currently always returns nil.
func (fleetManager *FleetConfigManager) BindSecretsManager(sm secretsmgr.Manager) error {
	fleetSM, ok := sm.(*secretsmgr.FleetSecretsManager)
	if !ok || fleetSM == nil {
		// A typed-nil pointer satisfies the assertion but would be
		// dereferenced later inside the hook, so it is treated like a
		// non-fleet manager.
		fleetManager.logger.Warn("secrets manager is not a fleet secrets manager, skipping MQTT binding")
		return nil
	}

	// Register OnReadyHook to bind the secrets manager when the MQTT
	// connection is ready; the connection manager and topics are only
	// available as hook arguments.
	fleetManager.connection.AddOnReadyHook(func(cm *autopaho.ConnectionManager, topics fleet.TokenResponseTopics) {
		pub := secretsmgr.NewCMAdapterPublisher(cm)
		sub := secretsmgr.NewCMAdapterSubscriber(cm)

		if err := fleetSM.BindMQTT(pub, sub, topics.SecretsRequest, topics.SecretsResponse, topics.SecretsUpdated); err != nil {
			fleetManager.logger.Error("failed to bind fleet secrets manager to MQTT", "error", err)
			return
		}

		// Responses and update notifications are both routed to
		// HandleMessage, so one closure serves both registrations.
		handle := func(topic string, payload []byte) error {
			return fleetSM.HandleMessage(topic, payload)
		}
		fleetManager.connection.RegisterTopicHandler(topics.SecretsResponse, handle)
		fleetManager.connection.RegisterTopicHandler(topics.SecretsUpdated, handle)

		fleetManager.logger.Info("Fleet secrets manager bound to MQTT",
			slog.String("request_topic", topics.SecretsRequest),
			slog.String("response_topic", topics.SecretsResponse),
			slog.String("updated_topic", topics.SecretsUpdated))
	})

	return nil
}

// refreshAndReconnect refreshes the JWT token and reconnects to MQTT
func (fleetManager *fleetConfigManager) refreshAndReconnect(ctx context.Context, timeout time.Duration) error {
func (fleetManager *FleetConfigManager) refreshAndReconnect(ctx context.Context, timeout time.Duration) error {
// Refresh JWT token
token, err := fleetManager.authTokenManager.RefreshToken(ctx)
if err != nil {
Expand Down Expand Up @@ -271,7 +314,7 @@ func (fleetManager *fleetConfigManager) refreshAndReconnect(ctx context.Context,
return nil
}

func (fleetManager *fleetConfigManager) configToSafeString(cfg config.Config) (string, error) {
func (fleetManager *FleetConfigManager) configToSafeString(cfg config.Config) (string, error) {
redacted := redact.SensitiveData(cfg)
configYaml, err := yaml.Marshal(redacted)
if err != nil {
Expand All @@ -280,13 +323,14 @@ func (fleetManager *fleetConfigManager) configToSafeString(cfg config.Config) (s
return string(configYaml), nil
}

func (fleetManager *fleetConfigManager) GetContext(ctx context.Context) context.Context {
// GetContext returns the context to use with the Fleet configuration manager.
// The current implementation attaches nothing to ctx and returns it unchanged.
func (fleetManager *FleetConfigManager) GetContext(ctx context.Context) context.Context {
// Empty implementation for now - just return the context as-is
return ctx
}

// monitorTokenExpiry periodically checks token expiry and triggers reconnection before token expires
func (fleetManager *fleetConfigManager) monitorTokenExpiry() {
func (fleetManager *FleetConfigManager) monitorTokenExpiry() {
// Check interval: default 30 seconds, configurable via config
checkInterval := 30 * time.Second
if fleetManager.config.OrbAgent.ConfigManager.Sources.Fleet.TokenExpiryCheckInterval != nil && *fleetManager.config.OrbAgent.ConfigManager.Sources.Fleet.TokenExpiryCheckInterval > 0 {
Expand Down Expand Up @@ -346,7 +390,7 @@ func (fleetManager *fleetConfigManager) monitorTokenExpiry() {
}

// Stop gracefully shuts down the OTLP bridge and token expiry monitor.
func (fleetManager *fleetConfigManager) Stop(ctx context.Context) error {
func (fleetManager *FleetConfigManager) Stop(ctx context.Context) error {
// Stop token expiry monitor
if fleetManager.monitorCancel != nil {
fleetManager.monitorCancel()
Expand Down
67 changes: 50 additions & 17 deletions agent/configmgr/fleet/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"log/slog"
"net/url"
"strings"
"sync"
"time"

"github.com/eclipse/paho.golang/autopaho"
Expand All @@ -15,6 +16,9 @@ import (
"github.com/netboxlabs/orb-agent/agent/policymgr"
)

// TopicMessageHandler is a callback for messages received on a specific
// topic. It receives the topic the message arrived on and the raw message
// payload, and returns an error if the message could not be handled.
type TopicMessageHandler func(topic string, payload []byte) error

// MQTTConnection manages the MQTT connection
type MQTTConnection struct {
logger *slog.Logger
Expand All @@ -23,11 +27,13 @@ type MQTTConnection struct {
messaging *Messaging
resetChan chan struct{}
onReadyHooks []func(cm *autopaho.ConnectionManager, topics TokenResponseTopics)
topicHandlers map[string]TopicMessageHandler
connectionTopics TokenResponseTopics
reconnectChan chan struct{}
capabilitiesFailCount int
groupMembershipFailCount int
heartbeatFailCount int
mu sync.Mutex
}

// NewMQTTConnection creates a new MQTTConnection
Expand All @@ -40,6 +46,7 @@ func NewMQTTConnection(logger *slog.Logger, pMgr policymgr.PolicyManager, resetC
messaging: NewMessaging(logger, pMgr, resetChan, &groupManager),
resetChan: resetChan,
onReadyHooks: make([]func(cm *autopaho.ConnectionManager, topics TokenResponseTopics), 0),
topicHandlers: make(map[string]TopicMessageHandler),
reconnectChan: reconnectChan,
}
}
Expand All @@ -49,6 +56,13 @@ func (connection *MQTTConnection) AddOnReadyHook(fn func(cm *autopaho.Connection
connection.onReadyHooks = append(connection.onReadyHooks, fn)
}

// RegisterTopicHandler installs handler as the callback for messages whose
// topic equals topic, replacing any handler previously registered for it.
//
// Passing a nil handler removes the registration for topic rather than
// storing a callback that would panic when invoked. The handler map is
// created on first use, so a zero-value MQTTConnection can register handlers.
//
// The handler map is guarded by connection.mu.
func (connection *MQTTConnection) RegisterTopicHandler(topic string, handler TopicMessageHandler) {
	connection.mu.Lock()
	defer connection.mu.Unlock()

	if handler == nil {
		// delete is a no-op on a nil map, so no initialization is needed here.
		delete(connection.topicHandlers, topic)
		return
	}
	if connection.topicHandlers == nil {
		connection.topicHandlers = make(map[string]TopicMessageHandler)
	}
	connection.topicHandlers[topic] = handler
}

// TopicActions are the actions to take on a topic
type TopicActions struct {
Subscribe func(topic string) error
Expand All @@ -72,6 +86,7 @@ type MQTTConnector interface {
Disconnect(ctx context.Context, heartbeatTopic string) error
Reconnect(ctx context.Context, details ConnectionDetails, backends map[string]backend.Backend, labels map[string]string, configFile string, timeout time.Duration) error
AddOnReadyHook(fn func(cm *autopaho.ConnectionManager, topics TokenResponseTopics))
RegisterTopicHandler(topic string, handler TopicMessageHandler)
}

// Connect connects to the MQTT broker
Expand Down Expand Up @@ -212,25 +227,43 @@ func (connection *MQTTConnection) Connect(ctx context.Context, details Connectio
// Log any published messages to subscribed topics
connection.logger.Info("received MQTT message", "topic", pr.Packet.Topic)

orgID := strings.Split(pr.Packet.Topic, "/")[1]

// Use a fresh context for async message handling, not the Connect() context
// which may be canceled or have a short timeout
err = connection.messaging.DispatchToHandlers(
context.Background(),
pr.Packet.Payload,
orgID,
details.AgentID,
TopicActions{
Subscribe: connection.subscribeToTopic,
Publish: connection.publishToTopic,
Unsubscribe: connection.unsubscribeFromTopic,
},
)
if err != nil {
connection.logger.Error("failed to dispatch to handlers", "error", err)
// Check if there's a topic-specific handler
connection.mu.Lock()
handler, hasHandler := connection.topicHandlers[pr.Packet.Topic]
connection.mu.Unlock()

if hasHandler {
// Process in goroutine to avoid blocking message acknowledgment
go func() {
if err := handler(pr.Packet.Topic, pr.Packet.Payload); err != nil {
connection.logger.Error("topic handler failed", "topic", pr.Packet.Topic, "error", err)
}
}()
return true, nil
}

// Process in goroutine to avoid blocking message acknowledgment
go func() {
orgID := strings.Split(pr.Packet.Topic, "/")[1]

// Use a fresh context for async message handling, not the Connect() context
// which may be canceled or have a short timeout
err := connection.messaging.DispatchToHandlers(
context.Background(),
pr.Packet.Payload,
orgID,
details.AgentID,
TopicActions{
Subscribe: connection.subscribeToTopic,
Publish: connection.publishToTopic,
Unsubscribe: connection.unsubscribeFromTopic,
},
)
if err != nil {
connection.logger.Error("failed to dispatch to handlers", "error", err)
}
}()

return true, nil
},
},
Expand Down
71 changes: 71 additions & 0 deletions agent/configmgr/fleet/messages/secrets_messages.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package messages

import (
"time"
)

// CurrentSecretsSchemaVersion is the current version string of the secrets
// message schema.
// NOTE(review): presumably the value senders place in the schema_version field of the secrets messages — confirm at the call sites.
const CurrentSecretsSchemaVersion = "1.0"

// Error codes for secret operations, as upper-snake-case strings.
// NOTE(review): presumably these are the values carried in SecretError.Code — confirm against the control plane contract.
const (
ErrorCodeNotFound = "NOT_FOUND"
ErrorCodeForbidden = "FORBIDDEN"
ErrorCodeInvalidPath = "INVALID_PATH"
ErrorCodeTimeout = "TIMEOUT"
ErrorCodeInternalError = "INTERNAL_ERROR"
ErrorCodeRateLimited = "RATE_LIMITED"
)

// SecretRequest identifies one secret being requested inside a
// SecretRequestMsg.
type SecretRequest struct {
// Path is the path to the secret in the control plane's secret store.
Path string `json:"path"` // The path to the secret in the control plane's secret store
// Context says where the secret is used: a policy ID, "config", or "backend".
Context string `json:"context"` // The context where the secret is used (policy ID, "config", or "backend")
}

// SecretRequestMsg is the JSON message that requests one or more secrets.
type SecretRequestMsg struct {
// SchemaVersion is the schema version of this message.
SchemaVersion string `json:"schema_version"`
// RequestID is a UUID v4 identifying this request.
RequestID string `json:"request_id"` // UUID v4
// Timestamp is serialized under the "timestamp" key.
// NOTE(review): presumably the time the request was created — confirm at the sender.
Timestamp time.Time `json:"timestamp"`
// Secrets lists the individual secrets being requested.
Secrets []SecretRequest `json:"secrets"`
}

// SecretValue is one successfully retrieved secret.
type SecretValue struct {
// Path is the path of the secret.
Path string `json:"path"`
// Value is the secret value itself, carried as a plain string.
Value string `json:"value"`
// Version is the integer version of the secret.
Version int `json:"version"`
// Metadata is an optional string map, omitted from the JSON when empty.
// NOTE(review): the expected keys are not visible here.
Metadata map[string]string `json:"metadata,omitempty"`
}

// SecretError describes the failure to retrieve one secret.
type SecretError struct {
// Path is the path of the secret the error refers to.
Path string `json:"path"`
// Error is a human-readable error message.
Error string `json:"error"` // Human-readable error message
// Code is a machine-readable error code.
Code string `json:"code"` // Machine-readable error code
}

// SecretResponseMsg is the JSON response to a secret request.
type SecretResponseMsg struct {
// SchemaVersion is the schema version of this message.
SchemaVersion string `json:"schema_version"`
// RequestID matches the request_id of the original request.
RequestID string `json:"request_id"` // Matches the request_id from the original request
// Timestamp is serialized under the "timestamp" key.
// NOTE(review): presumably the time the response was produced — confirm at the sender.
Timestamp time.Time `json:"timestamp"`
// Status is one of "success", "partial", or "error".
Status string `json:"status"` // "success", "partial", "error"
// Secrets holds the retrieved secrets; omitted if status is "error".
Secrets []SecretValue `json:"secrets,omitempty"` // Omitted if status is "error"
// Errors holds the per-secret failures; omitted if status is "success".
Errors []SecretError `json:"errors,omitempty"` // Omitted if status is "success"
}

// SecretUpdate is one entry of a secret update notification.
type SecretUpdate struct {
// Path is the path of the updated secret.
Path string `json:"path"`
// Version is the integer version reported for the secret.
// NOTE(review): presumably the new version after the update — confirm against the sender.
Version int `json:"version"`
// Contexts lists the contexts (policy IDs) that use this secret.
Contexts []string `json:"contexts"` // List of contexts (policy IDs) that use this secret
}

// SecretUpdateNotificationMsg is the JSON push notification announcing
// updated secrets.
type SecretUpdateNotificationMsg struct {
// SchemaVersion is the schema version of this message.
SchemaVersion string `json:"schema_version"`
// Timestamp is serialized under the "timestamp" key.
// NOTE(review): presumably the time the notification was produced — confirm at the sender.
Timestamp time.Time `json:"timestamp"`
// Updates lists the secrets that were updated.
Updates []SecretUpdate `json:"updates"`
}
5 changes: 5 additions & 0 deletions agent/configmgr/fleet/mocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,11 @@ func (m *MockMQTTConnection) AddOnReadyHook(fn func(cm *autopaho.ConnectionManag
m.hooks = append(m.hooks, fn)
}

// RegisterTopicHandler is the mock's implementation of topic handler
// registration. Both arguments are discarded and no handler is stored, so a
// handler registered through the mock is never invoked by it.
func (m *MockMQTTConnection) RegisterTopicHandler(_ string, _ TopicMessageHandler) {
// No-op for mock
}

// TriggerOnReadyHook triggers all registered onReady hooks (for testing)
func (m *MockMQTTConnection) TriggerOnReadyHook(cm *autopaho.ConnectionManager, topics TokenResponseTopics) {
for _, hook := range m.hooks {
Expand Down
Loading
Loading