Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions agent/backend/devicediscovery/device_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ var _ backend.Backend = (*deviceDiscoveryBackend)(nil)
const (
versionTimeout = 2
capabilitiesTimeout = 5
readinessBackoff = 10
readinessBackoff = 12
applyPolicyTimeout = 10
removePolicyTimeout = 20
statusTimeout = 5
Expand Down Expand Up @@ -247,8 +247,7 @@ func (d *deviceDiscoveryBackend) Start(ctx context.Context, cancelFunc context.C
}
version, readinessErr = d.Version()
if readinessErr == nil {
d.logger.Info("device-discovery readiness ok, got version ",
"device_discovery_version", version)
d.logger.Info("device-discovery readiness ok, got version", "version", version)
break
}
backoffDuration := time.Duration(backoff) * time.Second
Expand Down
5 changes: 2 additions & 3 deletions agent/backend/networkdiscovery/network_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ var _ backend.Backend = (*networkDiscoveryBackend)(nil)
const (
versionTimeout = 2
capabilitiesTimeout = 5
readinessBackoff = 10
readinessBackoff = 12
applyPolicyTimeout = 10
removePolicyTimeout = 20
statusTimeout = 5
Expand Down Expand Up @@ -264,8 +264,7 @@ func (d *networkDiscoveryBackend) Start(ctx context.Context, cancelFunc context.
}
version, readinessErr = d.Version()
if readinessErr == nil {
d.logger.Info("network-discovery readiness ok, got version ",
"network_discovery_version", version)
d.logger.Info("network-discovery readiness ok, got version", "version", version)
break
}
backoffDuration := time.Duration(backoff) * time.Second
Expand Down
5 changes: 2 additions & 3 deletions agent/backend/opentelemetryinfinity/opentelemetry_infinity.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ const (
defaultAPIPort = "10222"
versionTimeout = 5
capabilitiesTimeout = 5
readinessBackoff = 10
readinessBackoff = 12
applyPolicyTimeout = 10
removePolicyTimeout = 20
statusTimeout = 5
Expand Down Expand Up @@ -181,8 +181,7 @@ func (o *openTelemetryBackend) Start(ctx context.Context, cancelFunc context.Can
}
version, readinessErr = o.Version()
if readinessErr == nil {
o.logger.Info("opentelemetry infinity readiness ok, got version ",
"opentelemetry_infinity_version", version)
o.logger.Info("opentelemetry infinity readiness ok, got version", "version", version)
break
}
backoffDuration := time.Duration(backoff) * time.Second
Expand Down
16 changes: 13 additions & 3 deletions agent/backend/pktvisor/pktvisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ var _ backend.Backend = (*pktvisorBackend)(nil)

const (
defaultBinary = "pktvisord"
readinessBackoff = 10
readinessBackoff = 12
readinessTimeout = 10
applyPolicyTimeout = 10
removePolicyTimeout = 20
Expand Down Expand Up @@ -205,8 +205,7 @@ func (p *pktvisorBackend) Start(ctx context.Context, cancelFunc context.CancelFu
readinessError = backend.CommonRequest("pktvisor", p.proc, p.logger, url, &appMetrics, http.MethodGet,
http.NoBody, "application/json", readinessTimeout, "error")
if readinessError == nil {
p.logger.Info("pktvisor readiness ok, got version ",
"pktvisor_version", appMetrics.App.Version)
p.logger.Info("pktvisor readiness ok, got version", "version", appMetrics.App.Version)
break
}
backoffDuration := time.Duration(backoff) * time.Second
Expand Down Expand Up @@ -314,6 +313,7 @@ func parsePktvisorEntity(line string) (entity, name, rest string, ok bool) {
func (p *pktvisorBackend) Stop(ctx context.Context) error {
p.logger.Info("routine call to stop pktvisor", "routine", ctx.Value(config.ContextKey("routine")))
defer p.cancelFunc()

err := p.proc.Stop()
finalStatus := <-p.statusChan
if err != nil {
Expand All @@ -337,6 +337,15 @@ func (p *pktvisorBackend) Configure(logger *slog.Logger, repo policies.PolicyRep
p.adminAPIPort = defaultAPIPort
p.agentLabels = common.Otlp.AgentLabels

// Clean up old temp config file if it exists
if p.configFile != "" {
if err := os.Remove(p.configFile); err != nil && !os.IsNotExist(err) {
p.logger.Warn("failed to remove old pktvisor temp config file",
"file", p.configFile,
"error", err)
}
}

// Create temp config file
tmpDir := os.TempDir()
tmpFile, err := os.CreateTemp(tmpDir, "pktvisor-*.yaml")
Expand Down Expand Up @@ -432,6 +441,7 @@ func (p *pktvisorBackend) FullReset(ctx context.Context) error {
}
}

// Note: Stop() already cleaned up temp file, but kept path for restart
// for each policy, restart the scraper
backendCtx, cancelFunc := context.WithCancel(context.WithValue(ctx, config.ContextKey("routine"), "pktvisor"))

Expand Down
5 changes: 2 additions & 3 deletions agent/backend/snmpdiscovery/snmp_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ var _ backend.Backend = (*snmpDiscoveryBackend)(nil)
const (
versionTimeout = 2
capabilitiesTimeout = 5
readinessBackoff = 10
readinessBackoff = 12
applyPolicyTimeout = 10
removePolicyTimeout = 20
statusTimeout = 5
Expand Down Expand Up @@ -265,8 +265,7 @@ func (d *snmpDiscoveryBackend) Start(ctx context.Context, cancelFunc context.Can
}
version, readinessErr = d.Version()
if readinessErr == nil {
d.logger.Info("snmp-discovery readiness ok, got version ",
"snmp_discovery_version", version)
d.logger.Info("snmp-discovery readiness ok, got version", "version", version)
break
}
backoffDuration := time.Duration(backoff) * time.Second
Expand Down
5 changes: 2 additions & 3 deletions agent/backend/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ var _ backend.Backend = (*workerBackend)(nil)
const (
versionTimeout = 2
capabilitiesTimeout = 5
readinessBackoff = 10
readinessBackoff = 12
applyPolicyTimeout = 10
removePolicyTimeout = 20
statusTimeout = 5
Expand Down Expand Up @@ -248,8 +248,7 @@ func (d *workerBackend) Start(ctx context.Context, cancelFunc context.CancelFunc
}
version, readinessErr = d.Version()
if readinessErr == nil {
d.logger.Info("worker readiness ok, got version ",
"worker_version", version)
d.logger.Info("worker readiness ok, got version ", "version", version)
break
}
backoffDuration := time.Duration(backoff) * time.Second
Expand Down
28 changes: 15 additions & 13 deletions agent/docker/orb-agent-entry.sh
Original file line number Diff line number Diff line change
Expand Up @@ -80,22 +80,24 @@ trap agentstop2 SIGTERM
# eternal loop
while true
do
# pid file dont exist
if [ ! -f "/var/run/orb-agent.pid" ]; then
# running orb-agent in background
nohup /run-agent.sh "${agent_args[@]}" &
sleep 2
if [ -d "/nohup.out" ]; then
tail -f /nohup.out &
fi
else
# pid file exists
if [ -f "/var/run/orb-agent.pid" ]; then
PID=$(cat /var/run/orb-agent.pid)
if [ ! -d "/proc/$PID" ]; then
# stop container
echo "$PID is not running"
# Process not running, clean stale PID file and continue to start agent
echo "Cleaning stale PID file for $PID (process not running)"
rm /var/run/orb-agent.pid
exit 1
# Fall through to next iteration which will start agent
else
# Process is running, wait
sleep 5
fi
else
# pid file doesn't exist, start agent
nohup /run-agent.sh "${agent_args[@]}" &
sleep 2
if [ -f "/nohup.out" ]; then
tail -f /nohup.out &
fi
sleep 5
fi
done
19 changes: 17 additions & 2 deletions agent/otlpbridge/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ import (
"net"
"strings"
"sync"
"syscall"

collectorlogs "go.opentelemetry.io/proto/otlp/collector/logs/v1"
collectormetrics "go.opentelemetry.io/proto/otlp/collector/metrics/v1"
collectortrace "go.opentelemetry.io/proto/otlp/collector/trace/v1"
"golang.org/x/sys/unix"
"google.golang.org/grpc"

"github.com/netboxlabs/orb-agent/agent/policies"
Expand Down Expand Up @@ -104,8 +106,21 @@ func (s *BridgeServer) GetPolicyRepo() policies.PolicyRepo {

// Start starts the gRPC server without establishing MQTT.
// Publisher and topic should be set before OTLP data arrives.
func (s *BridgeServer) Start(_ context.Context) error {
lis, err := net.Listen("tcp", s.cfg.ListenAddr)
func (s *BridgeServer) Start(ctx context.Context) error {
// Use ListenConfig to enable SO_REUSEADDR for faster port reuse after restart
lc := net.ListenConfig{
Control: func(_, _ string, c syscall.RawConn) error {
var sockOptErr error
if err := c.Control(func(fd uintptr) {
// Enable SO_REUSEADDR to allow binding to TIME_WAIT sockets
sockOptErr = unix.SetsockoptInt(int(fd), unix.SOL_SOCKET, unix.SO_REUSEADDR, 1)
}); err != nil {
return err
}
return sockOptErr
},
}
lis, err := lc.Listen(ctx, "tcp", s.cfg.ListenAddr)
if err != nil {
return fmt.Errorf("failed to listen on %s (port may be in use by another service): %w", s.cfg.ListenAddr, err)
}
Expand Down
Loading