Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 112 additions & 8 deletions internal/mjpeg/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,31 @@ import (
)

func Init() {
var cfg struct {
Mod struct {
SnapshotCache bool `yaml:"snapshot_cache"`
SnapshotCacheTimeout int `yaml:"snapshot_cache_timeout"`
SnapshotServeCachedByDefault bool `yaml:"snapshot_serve_cached_by_default"`
} `yaml:"mjpeg"`
}

// Defaults
cfg.Mod.SnapshotCache = true
cfg.Mod.SnapshotCacheTimeout = 600
cfg.Mod.SnapshotServeCachedByDefault = false

app.LoadConfig(&cfg)

// Store global config
snapshotCacheEnabled = cfg.Mod.SnapshotCache
snapshotCacheTimeout = time.Duration(cfg.Mod.SnapshotCacheTimeout) * time.Second
snapshotServeCachedByDefault = cfg.Mod.SnapshotServeCachedByDefault

// Handle special values
if cfg.Mod.SnapshotCacheTimeout < 0 {
snapshotCacheEnabled = false
}

api.HandleFunc("api/frame.jpeg", handlerKeyframe)
api.HandleFunc("api/stream.mjpeg", handlerStream)
api.HandleFunc("api/stream.ascii", handlerStream)
Expand All @@ -35,13 +60,69 @@ func Init() {

var log zerolog.Logger

var (
snapshotCacheEnabled bool
snapshotCacheTimeout time.Duration
snapshotServeCachedByDefault bool
)

func getSnapshotCacheTimeout() time.Duration {
if !snapshotCacheEnabled {
return -1 // disabled
}
return snapshotCacheTimeout
}

func handlerKeyframe(w http.ResponseWriter, r *http.Request) {
stream, _ := streams.GetOrPatch(r.URL.Query())
if stream == nil {
http.Error(w, api.StreamNotFound, http.StatusNotFound)
return
}

query := r.URL.Query()

// Determine if client wants cached snapshot
// Priority: query param > global config
allowCached := snapshotServeCachedByDefault
if query.Has("cached") {
allowCached = query.Get("cached") != "false" && query.Get("cached") != "0"
}

// Start/reset snapshot cache (if enabled)
streamName := query.Get("src")
stream.TouchSnapshotCache(streamName, getSnapshotCacheTimeout(), transcodeToJPEG)

// Try to serve from cache if allowed
if allowCached {
if b, timestamp, exists := stream.GetCachedSnapshot(); exists {
age := time.Since(timestamp)

log.Trace().
Dur("age_ms", age).
Int("size", len(b)).
Msg("[mjpeg] serving cached snapshot")

w.Header().Set("Content-Type", "image/jpeg")
w.Header().Set("Content-Length", strconv.Itoa(len(b)))
w.Header().Set("X-Snapshot-Age-Ms", strconv.Itoa(int(age.Milliseconds())))
w.Header().Set("X-Snapshot-Timestamp", timestamp.Format(time.RFC3339Nano))
w.Header().Set("X-Snapshot-Cached", "true")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "close")
w.Header().Set("Pragma", "no-cache")

if _, err := w.Write(b); err != nil {
log.Error().Err(err).Caller().Send()
}
return
}
}

// Client wants fresh snapshot OR no cache available yet
// Use traditional blocking approach
log.Debug().Bool("allow_cached", allowCached).Msg("[mjpeg] fetching fresh snapshot")

cons := magic.NewKeyframe()
cons.WithRequest(r)

Expand All @@ -50,7 +131,7 @@ func handlerKeyframe(w http.ResponseWriter, r *http.Request) {
return
}

once := &core.OnceBuffer{} // init and first frame
once := &core.OnceBuffer{}
_, _ = cons.WriteTo(once)
b := once.Buffer()

Expand All @@ -60,7 +141,7 @@ func handlerKeyframe(w http.ResponseWriter, r *http.Request) {
case core.CodecH264, core.CodecH265:
ts := time.Now()
var err error
if b, err = ffmpeg.JPEGWithQuery(b, r.URL.Query()); err != nil {
if b, err = ffmpeg.JPEGWithQuery(b, query); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
Expand All @@ -69,12 +150,14 @@ func handlerKeyframe(w http.ResponseWriter, r *http.Request) {
b = mjpeg.FixJPEG(b)
}

h := w.Header()
h.Set("Content-Type", "image/jpeg")
h.Set("Content-Length", strconv.Itoa(len(b)))
h.Set("Cache-Control", "no-cache")
h.Set("Connection", "close")
h.Set("Pragma", "no-cache")
w.Header().Set("Content-Type", "image/jpeg")
w.Header().Set("Content-Length", strconv.Itoa(len(b)))
w.Header().Set("X-Snapshot-Age-Ms", "0")
w.Header().Set("X-Snapshot-Timestamp", time.Now().Format(time.RFC3339Nano))
w.Header().Set("X-Snapshot-Cached", "false")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "close")
w.Header().Set("Pragma", "no-cache")

if _, err := w.Write(b); err != nil {
log.Error().Err(err).Caller().Send()
Expand Down Expand Up @@ -189,3 +272,24 @@ func apiStreamY4M(w http.ResponseWriter, r *http.Request) {

stream.RemoveConsumer(cons)
}

// transcodeToJPEG is injected into snapshot cache to avoid import cycles
func transcodeToJPEG(b []byte, codecName string) ([]byte, error) {
switch codecName {
case core.CodecH264, core.CodecH265:
// Transcode via FFmpeg (no query params for cached version)
return ffmpeg.JPEGWithScale(b, -1, -1)

case core.CodecJPEG:
// Fix JPEG headers if needed
return mjpeg.FixJPEG(b), nil

case core.CodecRAW:
// Should already be encoded by Encoder(), skip
return nil, errors.New("raw codec not supported in cache")

default:
// Unsupported codec
return nil, errors.New("unsupported codec: " + codecName)
}
}
223 changes: 223 additions & 0 deletions internal/streams/snapshot_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
package streams

import (
"io"
"sync/atomic"
"time"

"github.com/AlexxIT/go2rtc/pkg/magic"
)

type SnapshotCacher struct {
name string // stream name for logging
consumer *magic.Keyframe
stream *Stream
idleTimer *time.Timer
idleTimeout time.Duration
stopped atomic.Bool
transcodeFunc func([]byte, string) ([]byte, error) // injected from mjpeg module
}

// GetCachedSnapshot returns cached JPEG and timestamp (if available)
func (s *Stream) GetCachedSnapshot() (data []byte, timestamp time.Time, exists bool) {
s.cachedJPEGMu.RLock()
defer s.cachedJPEGMu.RUnlock()

if s.cachedJPEG == nil {
return nil, time.Time{}, false
}

return s.cachedJPEG, s.cachedJPEGTime, true
}

// TouchSnapshotCache starts cacher if needed, resets idle timer
func (s *Stream) TouchSnapshotCache(name string, timeout time.Duration, transcodeFunc func([]byte, string) ([]byte, error)) {
if timeout < 0 {
return // caching disabled
}

// Check cache age BEFORE acquiring snapshotCacherMu to avoid deadlock
s.cachedJPEGMu.RLock()
cacheAge := time.Since(s.cachedJPEGTime)
s.cachedJPEGMu.RUnlock()

s.snapshotCacherMu.Lock()
defer s.snapshotCacherMu.Unlock()

if s.snapshotCacher != nil {
maxStaleTime := timeout * 2
if maxStaleTime > 0 && cacheAge > maxStaleTime {
log.Warn().Msgf("[snapshot-cache] touch: stream=%s cache is stale (age=%v > max=%v), forcing restart",
name, cacheAge, maxStaleTime)
s.snapshotCacher.stop()
s.snapshotCacher = nil
// Fall through to start new cacher
} else {
// Already running and healthy, just reset idle timer
log.Trace().Msgf("[snapshot-cache] touch: stream=%s already running (cache age=%v), resetting timer", name, cacheAge)
s.snapshotCacher.resetIdleTimer()
return
}
}

// Start new background cacher (will retry on every request if nil)
log.Trace().Msgf("[snapshot-cache] touch: stream=%s cacher is nil, attempting to start", name)
s.snapshotCacher = s.startSnapshotCacher(name, timeout, transcodeFunc)
if s.snapshotCacher == nil {
log.Warn().Msgf("[snapshot-cache] touch: stream=%s failed to start cacher, will retry on next request", name)
// Leave s.snapshotCacher as nil so next request will retry
// Old cached snapshot remains available
} else {
log.Debug().Msgf("[snapshot-cache] touch: stream=%s successfully started cacher", name)
}
}

func (s *Stream) startSnapshotCacher(name string, timeout time.Duration, transcodeFunc func([]byte, string) ([]byte, error)) *SnapshotCacher {
cons := magic.NewKeyframe()

cacher := &SnapshotCacher{
name: name,
consumer: cons,
stream: s,
idleTimeout: timeout,
transcodeFunc: transcodeFunc,
}

log.Trace().Msgf("[snapshot-cache] stream=%s creating cacher with timeout=%v", name, timeout)

// Set up idle timer (0 = never timeout)
if timeout > 0 {
cacher.idleTimer = time.AfterFunc(timeout, func() {
log.Debug().Msgf("[snapshot-cache] stream=%s idle timeout reached, stopping", name)
cacher.stop()
})
}

// Add as persistent consumer
// This may trigger producer start, or piggyback on existing connection
log.Trace().Msgf("[snapshot-cache] stream=%s attempting to add consumer", name)
if err := s.AddConsumer(cons); err != nil {
log.Warn().Err(err).Msgf("[snapshot-cache] stream=%s failed to add consumer", name)
return nil
}

// Start background processing loop
go cacher.run()

log.Trace().Msgf("[snapshot-cache] stream=%s started successfully", name)

return cacher
}

func (c *SnapshotCacher) run() {
log.Trace().Msgf("[snapshot-cache] stream=%s run loop starting", c.name)
defer log.Trace().Msgf("[snapshot-cache] stream=%s run loop exiting", c.name)

// Use custom writer that updates cache on each keyframe write
cacheWriter := &snapshotCacheWriter{cacher: c}

// WriteTo will block here and stream keyframes continuously
// until the connection closes or an error occurs
log.Trace().Msgf("[snapshot-cache] stream=%s calling consumer.WriteTo", c.name)
bytesWritten, err := c.consumer.WriteTo(cacheWriter)

if err != nil && !c.stopped.Load() {
log.Warn().Err(err).Msgf("[snapshot-cache] stream=%s consumer error after %d bytes", c.name, bytesWritten)
} else {
log.Trace().Msgf("[snapshot-cache] stream=%s consumer.WriteTo completed normally, bytes=%d", c.name, bytesWritten)
}

// Clean up on exit
c.stop()

// Clear the cacher reference so it can be restarted
c.stream.snapshotCacherMu.Lock()
c.stream.snapshotCacher = nil
c.stream.snapshotCacherMu.Unlock()
}

// snapshotCacheWriter implements io.Writer and updates cache on each Write
type snapshotCacheWriter struct {
cacher *SnapshotCacher
}

func (w *snapshotCacheWriter) Write(b []byte) (n int, err error) {
if w.cacher.stopped.Load() {
log.Trace().Msgf("[snapshot-cache] stream=%s write called but cacher stopped", w.cacher.name)
return 0, io.ErrClosedPipe
}

log.Trace().Msgf("[snapshot-cache] stream=%s received frame size=%d", w.cacher.name, len(b))

// Make a copy since b may be reused by the producer
frame := make([]byte, len(b))
copy(frame, b)

// Transcode to JPEG if needed using injected function
codecName := w.cacher.consumer.CodecName()
log.Trace().Msgf("[snapshot-cache] stream=%s codec=%s, transcoding", w.cacher.name, codecName)

if w.cacher.transcodeFunc != nil {
frame, err = w.cacher.transcodeFunc(frame, codecName)
if err != nil {
log.Warn().Err(err).Msgf("[snapshot-cache] stream=%s codec=%s transcode failed", w.cacher.name, codecName)
return len(b), nil // Return success to continue receiving frames
}
log.Trace().Msgf("[snapshot-cache] stream=%s transcode successful, jpeg size=%d", w.cacher.name, len(frame))
} else {
log.Warn().Msgf("[snapshot-cache] stream=%s codec=%s no transcode function", w.cacher.name, codecName)
return len(b), nil
}

// Update cache atomically
timestamp := time.Now()
w.cacher.stream.cachedJPEGMu.Lock()
w.cacher.stream.cachedJPEG = frame
w.cacher.stream.cachedJPEGTime = timestamp
w.cacher.stream.cachedJPEGMu.Unlock()

log.Trace().Msgf("[snapshot-cache] stream=%s codec=%s updated cache, size=%d, timestamp=%s",
w.cacher.name, codecName, len(frame), timestamp.Format(time.RFC3339Nano))

return len(b), nil
}

func (c *SnapshotCacher) resetIdleTimer() {
if c.idleTimer != nil && !c.stopped.Load() {
c.idleTimer.Reset(c.idleTimeout)
log.Trace().Msgf("[snapshot-cache] stream=%s idle timer reset to %v", c.name, c.idleTimeout)
}
}

func (c *SnapshotCacher) stop() {
if !c.stopped.CompareAndSwap(false, true) {
log.Trace().Msgf("[snapshot-cache] stream=%s stop called but already stopped", c.name)
return
}

log.Trace().Msgf("[snapshot-cache] stream=%s stopping", c.name)

// Stop the timer
if c.idleTimer != nil {
log.Trace().Msgf("[snapshot-cache] stream=%s stopping idle timer", c.name)
c.idleTimer.Stop()
}

// Close the consumer transport to unblock WriteTo
log.Trace().Msgf("[snapshot-cache] stream=%s stopping consumer", c.name)
if err := c.consumer.Stop(); err != nil {
log.Trace().Err(err).Msgf("[snapshot-cache] stream=%s consumer close error", c.name)
}

// Remove consumer (may stop producer if no other consumers)
log.Trace().Msgf("[snapshot-cache] stream=%s removing consumer from stream", c.name)
c.stream.RemoveConsumer(c.consumer)

log.Trace().Msgf("[snapshot-cache] stream=%s stop complete, cached snapshot retained in memory", c.name)
}

// stopAndClear stops the cacher and clears the reference - MUST be called with snapshotCacherMu held
func (c *SnapshotCacher) stopAndClear() {
c.stop()
// Caller must hold snapshotCacherMu and clear c.stream.snapshotCacher = nil
}
Loading