From 917e94896518de8242a6195e1fba5a68a3d03021 Mon Sep 17 00:00:00 2001 From: seydx Date: Sat, 3 Jan 2026 17:24:16 +0100 Subject: [PATCH 1/4] add IsClosed() to consumer for early exit during prod.Dial() --- internal/streams/add_consumer.go | 10 ++++++++++ internal/webrtc/webrtc.go | 5 +++++ pkg/core/connection.go | 4 ++++ pkg/core/core.go | 2 ++ pkg/webrtc/conn.go | 21 +++++++++++++++++++-- 5 files changed, 40 insertions(+), 2 deletions(-) diff --git a/internal/streams/add_consumer.go b/internal/streams/add_consumer.go index 7400ce6e2..6366e6dd2 100644 --- a/internal/streams/add_consumer.go +++ b/internal/streams/add_consumer.go @@ -39,6 +39,11 @@ func (s *Stream) AddConsumer(cons core.Consumer) (err error) { continue } + if cons.IsClosed() { + log.Trace().Msgf("[streams] consumer closed during dial cons=%d prod=%d", consN, prodN) + break producers + } + // Step 2. Get producer medias (not tracks yet) for _, prodMedia := range prod.GetMedias() { log.Trace().Msgf("[streams] check cons=%d prod=%d media=%s", consN, prodN, prodMedia) @@ -98,6 +103,11 @@ func (s *Stream) AddConsumer(cons core.Consumer) (err error) { s.stopProducers() } + // Check if consumer was closed during dial + if cons.IsClosed() { + return errors.New("streams: consumer closed during dial") + } + if len(prodStarts) == 0 { return formatError(consMedias, prodMedias, prodErrors) } diff --git a/internal/webrtc/webrtc.go b/internal/webrtc/webrtc.go index 2a5b4ad66..0a461b730 100644 --- a/internal/webrtc/webrtc.go +++ b/internal/webrtc/webrtc.go @@ -171,6 +171,11 @@ func asyncHandler(tr *ws.Transport, msg *ws.Message) (err error) { conn.Mode = mode conn.Protocol = "ws" conn.UserAgent = tr.Request.UserAgent() + + tr.OnClose(func() { + conn.CloseTransport() + }) + conn.Listen(func(msg any) { switch msg := msg.(type) { case pion.PeerConnectionState: diff --git a/pkg/core/connection.go b/pkg/core/connection.go index cc0f43e4a..e5a7b5701 100644 --- a/pkg/core/connection.go +++ b/pkg/core/connection.go @@ -128,6 +128,10 @@ func (c *Connection) GetSource() string { return c.Source } +func (c *Connection) IsClosed() bool { + return false +} + // Create like os.Create, init Consumer with existing Transport func Create(w io.Writer) (*Connection, error) { return &Connection{Transport: w}, nil diff --git a/pkg/core/core.go b/pkg/core/core.go index 9555ecfa0..7d2b6ebcb 100644 --- a/pkg/core/core.go +++ b/pkg/core/core.go @@ -65,6 +65,8 @@ type Consumer interface { AddTrack(media *Media, codec *Codec, track *Receiver) error + IsClosed() bool + // Deprecated: rename to Close() Stop() error } diff --git a/pkg/webrtc/conn.go b/pkg/webrtc/conn.go index 924fd5504..3ca06de4b 100644 --- a/pkg/webrtc/conn.go +++ b/pkg/webrtc/conn.go @@ -20,8 +20,9 @@ type Conn struct { pc *webrtc.PeerConnection - offer string - closed core.Waiter + offer string + closed core.Waiter + transportClosed bool } func NewConn(pc *webrtc.PeerConnection) *Conn { @@ -156,6 +157,22 @@ func (c *Conn) Close() error { return c.pc.Close() } +// IsClosed returns true if the WebRTC connection is no longer active +func (c *Conn) IsClosed() bool { + if c.transportClosed { + return true + } + state := c.pc.ConnectionState() + return state == webrtc.PeerConnectionStateDisconnected || + state == webrtc.PeerConnectionStateFailed || + state == webrtc.PeerConnectionStateClosed +} + +// CloseTransport marks the transport (e.g. WebSocket) as closed +func (c *Conn) CloseTransport() { + c.transportClosed = true +} + func (c *Conn) AddCandidate(candidate string) error { // pion uses only candidate value from json/object candidate struct return c.pc.AddICECandidate(webrtc.ICECandidateInit{Candidate: candidate}) From 2361a463d15ba59a783397f525a5f584c0d2c6ea Mon Sep 17 00:00:00 2001 From: seydx Date: Sat, 3 Jan 2026 17:32:42 +0100 Subject: [PATCH 2/4] cleanup --- pkg/webrtc/conn.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/pkg/webrtc/conn.go b/pkg/webrtc/conn.go index 3ca06de4b..e8c837c77 100644 --- a/pkg/webrtc/conn.go +++ b/pkg/webrtc/conn.go @@ -157,7 +157,6 @@ func (c *Conn) Close() error { return c.pc.Close() } -// IsClosed returns true if the WebRTC connection is no longer active func (c *Conn) IsClosed() bool { if c.transportClosed { return true @@ -168,7 +167,6 @@ func (c *Conn) IsClosed() bool { state == webrtc.PeerConnectionStateClosed } -// CloseTransport marks the transport (e.g. WebSocket) as closed func (c *Conn) CloseTransport() { c.transportClosed = true } From d70b35445b90d24b9c108b905b3584fc6245c834 Mon Sep 17 00:00:00 2001 From: seydx Date: Sun, 4 Jan 2026 18:14:27 +0100 Subject: [PATCH 3/4] update remaining clients with IsClosed --- pkg/bubble/client.go | 4 ++++ pkg/isapi/client.go | 4 ++++ pkg/roborock/client.go | 7 +++++++ pkg/tapo/client.go | 4 ++++ 4 files changed, 19 insertions(+) diff --git a/pkg/bubble/client.go b/pkg/bubble/client.go index 7a71d5557..3b7d1e0b3 100644 --- a/pkg/bubble/client.go +++ b/pkg/bubble/client.go @@ -264,3 +264,7 @@ func (c *Client) Handle() error { func (c *Client) Close() error { return c.conn.Close() } + +func (c *Client) IsClosed() bool { + return c.conn == nil +} diff --git a/pkg/isapi/client.go b/pkg/isapi/client.go index ba3e68874..ab713cceb 100644 --- a/pkg/isapi/client.go +++ b/pkg/isapi/client.go @@ -162,3 +162,7 @@ func (c *Client) Close() (err error) { // Enabled string `xml:"enabled"` // Codec string `xml:"audioCompressionType"` //} + +func (c *Client) IsClosed() bool { + return c.conn == nil +} diff --git a/pkg/roborock/client.go b/pkg/roborock/client.go index 4940b74cd..8c94a9e1c 100644 --- a/pkg/roborock/client.go +++ b/pkg/roborock/client.go @@ -379,3 +379,10 @@ func (c *Client) Request(method string, args any) (err error) { return } + +func (c *Client) IsClosed() bool { + if c.conn == nil { + return true + } + return c.conn.IsClosed() +} diff --git a/pkg/tapo/client.go b/pkg/tapo/client.go index e52250c33..44da5d378 100644 --- a/pkg/tapo/client.go +++ b/pkg/tapo/client.go @@ -405,3 +405,7 @@ func securityEncode(s string) string { return string(b) } + +func (c *Client) IsClosed() bool { + return c.conn1 == nil +} From b1de7bd46ee3993a9d96a831974ebcf4b4a02faa Mon Sep 17 00:00:00 2001 From: seydx Date: Thu, 8 Jan 2026 14:30:31 +0100 Subject: [PATCH 4/4] update clients with IsClosed --- pkg/hass/client.go | 4 ++++ pkg/nest/client.go | 4 ++++ pkg/ring/client.go | 4 ++++ pkg/tuya/client.go | 4 ++++ 4 files changed, 16 insertions(+) diff --git a/pkg/hass/client.go b/pkg/hass/client.go index a9ea02641..13a215200 100644 --- a/pkg/hass/client.go +++ b/pkg/hass/client.go @@ -113,6 +113,10 @@ func (c *Client) Stop() error { return c.conn.Stop() } +func (c *Client) IsClosed() bool { + return c.conn.IsClosed() +} + func (c *Client) MarshalJSON() ([]byte, error) { return c.conn.MarshalJSON() } diff --git a/pkg/nest/client.go b/pkg/nest/client.go index 6a570913a..882cfe7c8 100644 --- a/pkg/nest/client.go +++ b/pkg/nest/client.go @@ -92,6 +92,10 @@ func (c *WebRTCClient) Stop() error { return c.conn.Stop() } +func (c *WebRTCClient) IsClosed() bool { + return c.conn.IsClosed() +} + func (c *WebRTCClient) MarshalJSON() ([]byte, error) { return c.conn.MarshalJSON() } diff --git a/pkg/ring/client.go b/pkg/ring/client.go index fb77e1984..fcac7e341 100644 --- a/pkg/ring/client.go +++ b/pkg/ring/client.go @@ -350,6 +350,10 @@ func (c *Client) Stop() error { return nil } +func (c *Client) IsClosed() bool { + return c.closed +} + func (c *Client) MarshalJSON() ([]byte, error) { return json.Marshal(c.prod) } diff --git a/pkg/tuya/client.go b/pkg/tuya/client.go index 3043a8d27..cb6ed7552 100644 --- a/pkg/tuya/client.go +++ b/pkg/tuya/client.go @@ -469,6 +469,10 @@ func (c *Client) Close(err error) error { return c.Stop() } +func (c *Client) IsClosed() bool { + return c.closed +} + func (c *Client) MarshalJSON() ([]byte, error) { return c.conn.MarshalJSON() }