From 6dd2969bf0c816ab5524776009f47561527922fa Mon Sep 17 00:00:00 2001 From: Ilya Glebov Date: Fri, 6 Feb 2026 10:19:32 +0100 Subject: [PATCH] Smart reconnect on unpause and simplify audio architecture - Add smart reconnect: when cumulative pause time exceeds 5s, reconnect to get fresh audio instead of playing stale buffered data - Remove unused circular buffer code (~100 lines) - Remove buffer_seconds config option (was unused for actual buffering) - Track pause duration to determine when reconnect is needed - Clear lastError on successful playback to fix "Reconnect failed" false positive --- README.md | 1 - cmd/somafm/main.go | 8 +- internal/config/config.go | 40 +++----- internal/config/config_test.go | 44 --------- internal/player/player.go | 171 ++++++++++++++------------------- internal/player/player_test.go | 116 +++++++++------------- 6 files changed, 132 insertions(+), 248 deletions(-) diff --git a/README.md b/README.md index 9019399..d0678b8 100644 --- a/README.md +++ b/README.md @@ -82,7 +82,6 @@ Configuration is saved automatically to `~/.config/somafm/config.yml`. ```yaml volume: 70 # Volume level (0-100) -buffer_seconds: 5 # Audio buffer size (0-60 seconds) last_station: groovesalad # Last played station ID autostart: false # Auto-play last station on launch (true/false) favorites: # List of favorite station IDs diff --git a/cmd/somafm/main.go b/cmd/somafm/main.go index 7b7e052..129f974 100644 --- a/cmd/somafm/main.go +++ b/cmd/somafm/main.go @@ -80,12 +80,6 @@ func main() { } } - cfg, err := config.Load() - if err != nil { - log.Warn().Err(err).Msg("Failed to load config, using defaults") - cfg = config.DefaultConfig() - } - if *debugFlag { if configPath, err := config.GetConfigPath(); err == nil { log.Debug().Msgf("Config: %s", configPath) @@ -97,7 +91,7 @@ func main() { apiClient := api.NewSomaFMClient() stationService := service.NewStationService(apiClient) - somaPlayer := player.NewPlayer(cfg.BufferSeconds) + somaPlayer := player.NewPlayer() somaUi := ui.NewUI(somaPlayer, stationService, *randomFlag) sigChan := make(chan os.Signal, 1) diff --git a/internal/config/config.go b/internal/config/config.go index 5f5fe0b..424e317 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -21,14 +21,11 @@ const ( AppDonateURL = "https://somafm.com/donate/" AppDonateShort = "somafm.com/donate" - ConfigDir = ".config/somafm" - ConfigFileName = "config.yml" - DefaultVolume = 70 - MinVolume = 0 - MaxVolume = 100 - DefaultBufferSecs = 5 - MinBufferSecs = 0 - MaxBufferSecs = 60 + ConfigDir = ".config/somafm" + ConfigFileName = "config.yml" + DefaultVolume = 70 + MinVolume = 0 + MaxVolume = 100 ) // ClampVolume ensures volume is within the valid range [0, 100]. @@ -63,12 +60,11 @@ type Theme struct { } type Config struct { - Volume int `yaml:"volume"` - BufferSeconds int `yaml:"buffer_seconds"` - LastStation string `yaml:"last_station"` - Autostart bool `yaml:"autostart"` - Favorites []string `yaml:"favorites"` - Theme Theme `yaml:"theme"` + Volume int `yaml:"volume"` + LastStation string `yaml:"last_station"` + Autostart bool `yaml:"autostart"` + Favorites []string `yaml:"favorites"` + Theme Theme `yaml:"theme"` } func GetConfigPath() (string, error) { @@ -103,13 +99,6 @@ func Load() (*Config, error) { cfg.Volume = ClampVolume(cfg.Volume) - if cfg.BufferSeconds < MinBufferSecs { - cfg.BufferSeconds = MinBufferSecs - } - if cfg.BufferSeconds > MaxBufferSecs { - cfg.BufferSeconds = MaxBufferSecs - } - return cfg, nil } @@ -161,11 +150,10 @@ func (c *Config) Save() error { func DefaultConfig() *Config { return &Config{ - Volume: DefaultVolume, - BufferSeconds: DefaultBufferSecs, - LastStation: "", - Autostart: false, - Favorites: []string{}, + Volume: DefaultVolume, + LastStation: "", + Autostart: false, + Favorites: []string{}, Theme: Theme{ Background: "#1a1b25", Foreground: "#a3aacb", diff --git a/internal/config/config_test.go b/internal/config/config_test.go index 4d1076b..dd21dcb 100644 --- a/internal/config/config_test.go +++ b/internal/config/config_test.go @@ -412,50 +412,6 @@ func TestGetColor(t *testing.T) { } } -func TestBufferSecondsValidation(t *testing.T) { - tests := []struct { - name string - inputBuffer int - expectedBuffer int - }{ - {"valid buffer 5", 5, 5}, - {"valid buffer 0", 0, 0}, - {"valid buffer 60", 60, 60}, - {"negative buffer", -10, MinBufferSecs}, - {"buffer over 60", 100, MaxBufferSecs}, - {"buffer way over max", 1000, MaxBufferSecs}, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - tmpDir := t.TempDir() - originalHome := os.Getenv("HOME") - os.Setenv("HOME", tmpDir) - defer os.Setenv("HOME", originalHome) - - testCfg := &Config{ - Volume: 70, - BufferSeconds: tt.inputBuffer, - Theme: DefaultConfig().Theme, - } - - err := testCfg.Save() - if err != nil { - t.Fatalf("Save() error = %v", err) - } - - loadedCfg, err := Load() - if err != nil { - t.Fatalf("Load() error = %v", err) - } - - if loadedCfg.BufferSeconds != tt.expectedBuffer { - t.Errorf("Load().BufferSeconds = %d, want %d", loadedCfg.BufferSeconds, tt.expectedBuffer) - } - }) - } -} - func TestFavoritesPersistence(t *testing.T) { tmpDir := t.TempDir() originalHome := os.Getenv("HOME") diff --git a/internal/player/player.go b/internal/player/player.go index cd443a5..2a302bc 100644 --- a/internal/player/player.go +++ b/internal/player/player.go @@ -31,7 +31,8 @@ const ( VolumeCurveExponent = 0.5 MinVolumeDB = -10.0 ReadTimeout = 10 * time.Second - MaxErrorsToKeep = 10 // Limit error accumulation during retries + MaxErrorsToKeep = 10 + MaxPlaybackDelay = 5 * time.Second ) type PlayerState int @@ -116,7 +117,7 @@ func (cr *contextReader) Read(p []byte) (n int, err error) { } // Player handles audio streaming and playback for SomaFM radio stations. -// It manages the audio pipeline including network streaming, decoding, buffering, +// It manages the audio pipeline including network streaming, decoding, // and volume control. type Player struct { format beep.Format @@ -128,17 +129,12 @@ type Player struct { isPlaying bool speakerInit bool volumePercent int - httpClient *http.Client // Reused for all stream connections + httpClient *http.Client - buffer [][2]float64 - bufferSize int - writeIdx int64 - readBackOffset int - bufferMu sync.Mutex sampleCh chan [2]float64 wg sync.WaitGroup streamDone chan struct{} - streamDoneOnce sync.Once // Prevents double-close panic on streamDone + streamDoneOnce sync.Once streamErr chan error currentTrack string @@ -155,6 +151,9 @@ type Player struct { currentStation *station.Station streamAlive bool streamAliveMu sync.RWMutex + + pausedAt time.Time + totalPausedMs int64 } // closeStreamDone safely closes the streamDone channel exactly once. @@ -167,24 +166,14 @@ func (p *Player) closeStreamDone() { }) } -// NewPlayer creates a new Player with the specified buffer size in seconds. -func NewPlayer(bufferSeconds int) *Player { - var buffer [][2]float64 - if bufferSeconds > 0 { - bufferLen := int(DefaultSampleRate) * bufferSeconds - buffer = make([][2]float64, bufferLen) - log.Debug().Msgf("Initialized circular buffer: %d seconds (%d samples, ~%.2f MB)", - bufferSeconds, bufferLen, float64(bufferLen*2*8)/1000000) - } - - // Create a reusable HTTP client with appropriate settings for streaming +func NewPlayer() *Player { httpClient := &http.Client{ - Timeout: 0, // No timeout for streaming connections + Timeout: 0, Transport: &http.Transport{ DisableKeepAlives: false, MaxIdleConns: 10, IdleConnTimeout: 90 * time.Second, - DisableCompression: true, // Audio streams are already compressed + DisableCompression: true, }, } @@ -199,8 +188,6 @@ func NewPlayer(bufferSeconds int) *Player { isPlaying: false, volumePercent: -1, httpClient: httpClient, - buffer: buffer, - bufferSize: bufferSeconds, currentTrack: "", } } @@ -249,28 +236,78 @@ func (p *Player) Stop() { func (p *Player) TogglePause() { p.mu.Lock() - defer p.mu.Unlock() if p.ctrl == nil || !p.isPlaying { + p.mu.Unlock() return } + if p.isPaused && !p.pausedAt.IsZero() { + pauseDuration := time.Since(p.pausedAt) + p.totalPausedMs += pauseDuration.Milliseconds() + totalPaused := time.Duration(p.totalPausedMs) * time.Millisecond + + if totalPaused > MaxPlaybackDelay { + p.pausedAt = time.Time{} + p.totalPausedMs = 0 + p.mu.Unlock() + log.Debug().Msgf("Total paused %v (>%v), reconnecting", totalPaused, MaxPlaybackDelay) + go p.Reconnect() + return + } + } + speaker.Lock() p.ctrl.Paused = !p.ctrl.Paused p.isPaused = p.ctrl.Paused speaker.Unlock() if p.isPaused { + p.pausedAt = time.Now() p.stateMu.Lock() p.state = StatePaused p.stateMu.Unlock() log.Debug().Msg("Playback paused") } else { + p.pausedAt = time.Time{} p.stateMu.Lock() p.state = StatePlaying p.stateMu.Unlock() log.Debug().Msg("Playback resumed") } + + p.mu.Unlock() +} + +func (p *Player) GetPlaybackDelay() time.Duration { + p.mu.Lock() + defer p.mu.Unlock() + + total := time.Duration(p.totalPausedMs) * time.Millisecond + if !p.pausedAt.IsZero() { + total += time.Since(p.pausedAt) + } + return total +} + +func (p *Player) Reconnect() { + p.mu.Lock() + station := p.currentStation + p.mu.Unlock() + + if station == nil { + return + } + + p.setState(StateReconnecting) + p.Stop() + + err := p.Play(station) + if err != nil && !errors.Is(err, context.Canceled) { + log.Error().Err(err).Msg("Reconnect failed") + p.setState(StateError) + p.setLastError("Reconnect failed") + } } func (p *Player) SetVolume(volumePercent int) { @@ -384,23 +421,6 @@ func (p *Player) setStreamInfo(info StreamInfo) { log.Debug().Msgf("Stream info: %s %dk %dHz", info.Format, info.Bitrate, info.SampleRate) } -func (p *Player) GetBufferFillPercent() int { - if len(p.buffer) == 0 { - return 0 - } - - p.bufferMu.Lock() - defer p.bufferMu.Unlock() - - bufferLen := int64(len(p.buffer)) - fillLevel := p.writeIdx - if fillLevel > bufferLen { - fillLevel = bufferLen - } - - return int((fillLevel * 100) / bufferLen) -} - // GetBufferHealth returns the current buffer fill level as a percentage (0-100). func (p *Player) GetBufferHealth() int { p.mu.Lock() @@ -620,22 +640,12 @@ func (p *Player) playStreamURL(ctx context.Context, s *station.Station, streamUR p.mu.Lock() p.sampleCh = make(chan [2]float64, SampleChannelSize) p.streamDone = make(chan struct{}) - p.streamDoneOnce = sync.Once{} // Reset for new stream + p.streamDoneOnce = sync.Once{} p.streamErr = make(chan error, 1) + p.pausedAt = time.Time{} + p.totalPausedMs = 0 p.mu.Unlock() - if len(p.buffer) > 0 { - p.bufferMu.Lock() - for i := range p.buffer { - p.buffer[i] = [2]float64{0, 0} - } - p.writeIdx = 0 - p.readBackOffset = 0 - p.bufferMu.Unlock() - } - - // Use context-aware reader for timeout detection - // The goroutine in contextReader will exit when context is cancelled timeoutBody := &contextReader{ reader: resp.Body, ctx: ctx, @@ -711,7 +721,8 @@ func (p *Player) playStreamURL(ctx context.Context, s *station.Station, streamUR p.streamInfo.SampleRate = int(format.SampleRate) p.stateMu.Unlock() - log.Debug().Msgf("Now playing: %s (buffer: %ds)", s.Title, p.bufferSize) + p.setLastError("") + log.Debug().Msgf("Now playing: %s", s.Title) select { case <-ctx.Done(): @@ -904,13 +915,6 @@ func (p *Player) decodeAndBuffer(ctx context.Context, streamer beep.StreamSeekCl case <-p.streamDone: return case p.sampleCh <- sample: - if len(p.buffer) > 0 { - p.bufferMu.Lock() - idx := p.writeIdx % int64(len(p.buffer)) - p.buffer[idx] = sample - p.writeIdx++ - p.bufferMu.Unlock() - } } } } @@ -922,54 +926,19 @@ type bufferedStreamerWrapper struct { } func (b *bufferedStreamerWrapper) Stream(samples [][2]float64) (n int, ok bool) { - p := b.player - i := 0 - - if len(p.buffer) > 0 { - i = b.readFromBuffer(samples) - if i == len(samples) { - return i, true - } - } - - return b.readFromChannel(samples, i) -} - -func (b *bufferedStreamerWrapper) readFromBuffer(samples [][2]float64) int { - p := b.player - i := 0 - - p.bufferMu.Lock() - defer p.bufferMu.Unlock() - - for p.readBackOffset < 0 && i < len(samples) { - bufLen := int64(len(p.buffer)) - idx := (p.writeIdx + int64(p.readBackOffset) + bufLen) % bufLen - - if idx >= p.writeIdx { - break - } - - samples[i] = p.buffer[idx] - p.readBackOffset++ - i++ - } - - return i + return b.readFromChannel(samples) } -func (b *bufferedStreamerWrapper) readFromChannel(samples [][2]float64, startIdx int) (n int, ok bool) { +func (b *bufferedStreamerWrapper) readFromChannel(samples [][2]float64) (n int, ok bool) { p := b.player - i := startIdx - for i < len(samples) { + for i := 0; i < len(samples); i++ { select { case sample, more := <-p.sampleCh: if !more { return i, i > 0 } samples[i] = sample - i++ case <-p.streamDone: return i, i > 0 } diff --git a/internal/player/player_test.go b/internal/player/player_test.go index d3e7bab..b55dfb6 100644 --- a/internal/player/player_test.go +++ b/internal/player/player_test.go @@ -71,48 +71,18 @@ func TestPlayerStateString(t *testing.T) { } func TestNewPlayer(t *testing.T) { - tests := []struct { - bufferSeconds int - expectBuffer bool - }{ - {0, false}, - {5, true}, - {10, true}, - } - - for _, tt := range tests { - t.Run(fmt.Sprintf("buffer_%ds", tt.bufferSeconds), func(t *testing.T) { - p := NewPlayer(tt.bufferSeconds) - - if p == nil { - t.Fatal("NewPlayer returned nil") - } - - if tt.expectBuffer && len(p.buffer) == 0 { - t.Error("Expected buffer to be allocated") - } + p := NewPlayer() - if !tt.expectBuffer && len(p.buffer) != 0 { - t.Error("Expected no buffer") - } - - if p.isPlaying { - t.Error("New player should not be playing") - } - - if p.isPaused { - t.Error("New player should not be paused") - } - }) + if p == nil { + t.Fatal("NewPlayer returned nil") } -} -func TestNewPlayerBufferSize(t *testing.T) { - p := NewPlayer(5) - expectedSize := int(DefaultSampleRate) * 5 + if p.isPlaying { + t.Error("New player should not be playing") + } - if len(p.buffer) != expectedSize { - t.Errorf("Buffer size = %d, want %d", len(p.buffer), expectedSize) + if p.isPaused { + t.Error("New player should not be paused") } } @@ -200,7 +170,7 @@ func TestParseStreamInfoFromURL(t *testing.T) { } func TestPlayerTrackManagement(t *testing.T) { - p := NewPlayer(0) + p := NewPlayer() initial := p.GetCurrentTrack() if initial != "Waiting for track info..." { @@ -227,7 +197,7 @@ func TestPlayerTrackManagement(t *testing.T) { } func TestPlayerStateManagement(t *testing.T) { - p := NewPlayer(0) + p := NewPlayer() if p.GetState() != StateIdle { t.Errorf("Initial state = %v, want StateIdle", p.GetState()) @@ -245,7 +215,7 @@ func TestPlayerStateManagement(t *testing.T) { } func TestPlayerRetryInfo(t *testing.T) { - p := NewPlayer(0) + p := NewPlayer() current, max := p.GetRetryInfo() if current != 0 || max != 0 { @@ -260,7 +230,7 @@ func TestPlayerRetryInfo(t *testing.T) { } func TestPlayerLastError(t *testing.T) { - p := NewPlayer(0) + p := NewPlayer() if p.GetLastError() != "" { t.Errorf("Initial error = %q, want empty", p.GetLastError()) @@ -273,7 +243,7 @@ func TestPlayerLastError(t *testing.T) { } func TestPlayerSessionDuration(t *testing.T) { - p := NewPlayer(0) + p := NewPlayer() if p.GetSessionDuration() != 0 { t.Error("Initial session duration should be 0") @@ -288,34 +258,42 @@ func TestPlayerSessionDuration(t *testing.T) { } } -func TestPlayerBufferStats(t *testing.T) { - t.Run("no buffer", func(t *testing.T) { - p := NewPlayer(0) +func TestPlayerDelayTracking(t *testing.T) { + p := NewPlayer() - if p.GetBufferFillPercent() != 0 { - t.Error("Buffer fill should be 0 with no buffer") - } - if p.GetBufferHealth() != 0 { - t.Error("Buffer health should be 0 with no sample channel") - } - }) + if delay := p.GetPlaybackDelay(); delay != 0 { + t.Errorf("Initial delay = %v, want 0", delay) + } - t.Run("with buffer", func(t *testing.T) { - p := NewPlayer(1) + p.mu.Lock() + p.totalPausedMs = 3000 + p.mu.Unlock() - p.bufferMu.Lock() - p.writeIdx = int64(len(p.buffer) / 2) - p.bufferMu.Unlock() + delay := p.GetPlaybackDelay() + if delay < 2*time.Second || delay > 4*time.Second { + t.Errorf("Delay = %v, expected ~3s", delay) + } - fill := p.GetBufferFillPercent() - if fill != 50 { - t.Errorf("Buffer fill = %d%%, want 50%%", fill) - } - }) + p.mu.Lock() + p.pausedAt = time.Now().Add(-2 * time.Second) + p.mu.Unlock() + + delay = p.GetPlaybackDelay() + if delay < 4*time.Second || delay > 6*time.Second { + t.Errorf("Delay with active pause = %v, expected ~5s", delay) + } +} + +func TestPlayerBufferHealth(t *testing.T) { + p := NewPlayer() + + if p.GetBufferHealth() != 0 { + t.Error("Buffer health should be 0 with no sample channel") + } } func TestPlayerStreamInfo(t *testing.T) { - p := NewPlayer(0) + p := NewPlayer() info := p.GetStreamInfo() if info.Format != "" || info.Bitrate != 0 { @@ -336,7 +314,7 @@ func TestPlayerStreamInfo(t *testing.T) { } func TestPlayerIsPlayingIsPaused(t *testing.T) { - p := NewPlayer(0) + p := NewPlayer() if p.IsPlaying() { t.Error("New player should not be playing") @@ -381,7 +359,7 @@ Title3=Stream 3 })) defer server.Close() - p := NewPlayer(0) + p := NewPlayer() ctx := context.Background() urls, err := p.fetchAndParsePLS(ctx, server.URL) @@ -412,7 +390,7 @@ func TestFetchAndParsePLSEmpty(t *testing.T) { })) defer server.Close() - p := NewPlayer(0) + p := NewPlayer() ctx := context.Background() _, err := p.fetchAndParsePLS(ctx, server.URL) @@ -422,7 +400,7 @@ func TestFetchAndParsePLSEmpty(t *testing.T) { } func TestFetchAndParsePLSInvalidServer(t *testing.T) { - p := NewPlayer(0) + p := NewPlayer() ctx := context.Background() _, err := p.fetchAndParsePLS(ctx, "http://invalid.invalid.invalid/test.pls") @@ -495,7 +473,7 @@ func (b *blockingReader) Read(p []byte) (int, error) { } func TestGetCurrentStation(t *testing.T) { - p := NewPlayer(0) + p := NewPlayer() if p.GetCurrentStation() != nil { t.Error("Initial station should be nil")