Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion agent/backend/devicediscovery/device_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ var _ backend.Backend = (*deviceDiscoveryBackend)(nil)
const (
versionTimeout = 2
capabilitiesTimeout = 5
readinessBackoff = 10
readinessBackoff = 12
applyPolicyTimeout = 10
removePolicyTimeout = 20
statusTimeout = 5
Expand Down
2 changes: 1 addition & 1 deletion agent/backend/networkdiscovery/network_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ var _ backend.Backend = (*networkDiscoveryBackend)(nil)
const (
versionTimeout = 2
capabilitiesTimeout = 5
readinessBackoff = 10
readinessBackoff = 12
applyPolicyTimeout = 10
removePolicyTimeout = 20
statusTimeout = 5
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ const (
defaultAPIPort = "10222"
versionTimeout = 5
capabilitiesTimeout = 5
readinessBackoff = 10
readinessBackoff = 12
applyPolicyTimeout = 10
removePolicyTimeout = 20
statusTimeout = 5
Expand Down
24 changes: 23 additions & 1 deletion agent/backend/pktvisor/pktvisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ var _ backend.Backend = (*pktvisorBackend)(nil)

const (
defaultBinary = "pktvisord"
readinessBackoff = 10
readinessBackoff = 12
readinessTimeout = 10
applyPolicyTimeout = 10
removePolicyTimeout = 20
Expand Down Expand Up @@ -314,6 +314,18 @@ func parsePktvisorEntity(line string) (entity, name, rest string, ok bool) {
func (p *pktvisorBackend) Stop(ctx context.Context) error {
p.logger.Info("routine call to stop pktvisor", "routine", ctx.Value(config.ContextKey("routine")))
defer p.cancelFunc()

// Clean up temporary config file (but keep path for potential restart)
if p.configFile != "" {
if err := os.Remove(p.configFile); err != nil && !os.IsNotExist(err) {
p.logger.Warn("failed to remove pktvisor temp config file",
"file", p.configFile,
"error", err)
} else if err == nil {
p.logger.Debug("removed pktvisor temp config file", "file", p.configFile)
}
}

err := p.proc.Stop()
finalStatus := <-p.statusChan
if err != nil {
Expand All @@ -337,6 +349,15 @@ func (p *pktvisorBackend) Configure(logger *slog.Logger, repo policies.PolicyRep
p.adminAPIPort = defaultAPIPort
p.agentLabels = common.Otlp.AgentLabels

// Clean up old temp config file if it exists
if p.configFile != "" {
if err := os.Remove(p.configFile); err != nil && !os.IsNotExist(err) {
p.logger.Warn("failed to remove old pktvisor temp config file",
"file", p.configFile,
"error", err)
}
}

// Create temp config file
tmpDir := os.TempDir()
tmpFile, err := os.CreateTemp(tmpDir, "pktvisor-*.yaml")
Expand Down Expand Up @@ -432,6 +453,7 @@ func (p *pktvisorBackend) FullReset(ctx context.Context) error {
}
}

// Note: Stop() already cleaned up temp file, but kept path for restart
// for each policy, restart the scraper
backendCtx, cancelFunc := context.WithCancel(context.WithValue(ctx, config.ContextKey("routine"), "pktvisor"))

Expand Down
2 changes: 1 addition & 1 deletion agent/backend/snmpdiscovery/snmp_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ var _ backend.Backend = (*snmpDiscoveryBackend)(nil)
const (
versionTimeout = 2
capabilitiesTimeout = 5
readinessBackoff = 10
readinessBackoff = 12
applyPolicyTimeout = 10
removePolicyTimeout = 20
statusTimeout = 5
Expand Down
2 changes: 1 addition & 1 deletion agent/backend/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ var _ backend.Backend = (*workerBackend)(nil)
const (
versionTimeout = 2
capabilitiesTimeout = 5
readinessBackoff = 10
readinessBackoff = 12
applyPolicyTimeout = 10
removePolicyTimeout = 20
statusTimeout = 5
Expand Down
26 changes: 14 additions & 12 deletions agent/docker/orb-agent-entry.sh
Original file line number Diff line number Diff line change
Expand Up @@ -80,22 +80,24 @@ trap agentstop2 SIGTERM
# eternal loop
while true
do
# pid file dont exist
if [ ! -f "/var/run/orb-agent.pid" ]; then
# running orb-agent in background
# pid file exists
if [ -f "/var/run/orb-agent.pid" ]; then
PID=$(cat /var/run/orb-agent.pid)
if [ ! -d "/proc/$PID" ]; then
# Process not running, clean stale PID file and continue to start agent
echo "Cleaning stale PID file for $PID (process not running)"
rm /var/run/orb-agent.pid
# Fall through to next iteration which will start agent
else
# Process is running, wait
sleep 5
fi
else
# pid file doesn't exist, start agent
nohup /run-agent.sh "${agent_args[@]}" &
sleep 2
if [ -d "/nohup.out" ]; then
tail -f /nohup.out &
fi
else
PID=$(cat /var/run/orb-agent.pid)
if [ ! -d "/proc/$PID" ]; then
# stop container
echo "$PID is not running"
rm /var/run/orb-agent.pid
exit 1
fi
sleep 5
fi
done
17 changes: 16 additions & 1 deletion agent/otlpbridge/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ import (
"net"
"strings"
"sync"
"syscall"

collectorlogs "go.opentelemetry.io/proto/otlp/collector/logs/v1"
collectormetrics "go.opentelemetry.io/proto/otlp/collector/metrics/v1"
collectortrace "go.opentelemetry.io/proto/otlp/collector/trace/v1"
"golang.org/x/sys/unix"
"google.golang.org/grpc"

"github.com/netboxlabs/orb-agent/agent/policies"
Expand Down Expand Up @@ -105,7 +107,20 @@ func (s *BridgeServer) GetPolicyRepo() policies.PolicyRepo {
// Start starts the gRPC server without establishing MQTT.
// Publisher and topic should be set before OTLP data arrives.
func (s *BridgeServer) Start(_ context.Context) error {
lis, err := net.Listen("tcp", s.cfg.ListenAddr)
// Use ListenConfig to enable SO_REUSEADDR for faster port reuse after restart
lc := net.ListenConfig{
Control: func(_, _ string, c syscall.RawConn) error {
var sockOptErr error
if err := c.Control(func(fd uintptr) {
// Enable SO_REUSEADDR to allow binding to TIME_WAIT sockets
sockOptErr = unix.SetsockoptInt(int(fd), unix.SOL_SOCKET, unix.SO_REUSEADDR, 1)
}); err != nil {
return err
}
return sockOptErr
},
}
lis, err := lc.Listen(context.Background(), "tcp", s.cfg.ListenAddr)
if err != nil {
return fmt.Errorf("failed to listen on %s (port may be in use by another service): %w", s.cfg.ListenAddr, err)
}
Expand Down
Loading