Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions internal/streams/add_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ func (s *Stream) AddConsumer(cons core.Consumer) (err error) {
continue
}

if cons.IsClosed() {
log.Trace().Msgf("[streams] consumer closed during dial cons=%d prod=%d", consN, prodN)
break producers
}

// Step 2. Get producer medias (not tracks yet)
for _, prodMedia := range prod.GetMedias() {
log.Trace().Msgf("[streams] check cons=%d prod=%d media=%s", consN, prodN, prodMedia)
Expand Down Expand Up @@ -98,6 +103,11 @@ func (s *Stream) AddConsumer(cons core.Consumer) (err error) {
s.stopProducers()
}

// Check if consumer was closed during dial
if cons.IsClosed() {
return errors.New("streams: consumer closed during dial")
}

if len(prodStarts) == 0 {
return formatError(consMedias, prodMedias, prodErrors)
}
Expand Down
5 changes: 5 additions & 0 deletions internal/webrtc/webrtc.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,11 @@ func asyncHandler(tr *ws.Transport, msg *ws.Message) (err error) {
conn.Mode = mode
conn.Protocol = "ws"
conn.UserAgent = tr.Request.UserAgent()

tr.OnClose(func() {
conn.CloseTransport()
})

conn.Listen(func(msg any) {
switch msg := msg.(type) {
case pion.PeerConnectionState:
Expand Down
4 changes: 4 additions & 0 deletions pkg/bubble/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,3 +264,7 @@ func (c *Client) Handle() error {
func (c *Client) Close() error {
return c.conn.Close()
}

func (c *Client) IsClosed() bool {
return c.conn == nil
}
4 changes: 4 additions & 0 deletions pkg/core/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,10 @@ func (c *Connection) GetSource() string {
return c.Source
}

func (c *Connection) IsClosed() bool {
return false
}

// Create like os.Create, init Consumer with existing Transport
func Create(w io.Writer) (*Connection, error) {
return &Connection{Transport: w}, nil
Expand Down
2 changes: 2 additions & 0 deletions pkg/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ type Consumer interface {

AddTrack(media *Media, codec *Codec, track *Receiver) error

IsClosed() bool

// Deprecated: rename to Close()
Stop() error
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/hass/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,10 @@ func (c *Client) Stop() error {
return c.conn.Stop()
}

func (c *Client) IsClosed() bool {
return c.conn.IsClosed()
}

func (c *Client) MarshalJSON() ([]byte, error) {
return c.conn.MarshalJSON()
}
4 changes: 4 additions & 0 deletions pkg/isapi/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,3 +162,7 @@ func (c *Client) Close() (err error) {
// Enabled string `xml:"enabled"`
// Codec string `xml:"audioCompressionType"`
//}

func (c *Client) IsClosed() bool {
return c.conn == nil
}
4 changes: 4 additions & 0 deletions pkg/nest/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ func (c *WebRTCClient) Stop() error {
return c.conn.Stop()
}

func (c *WebRTCClient) IsClosed() bool {
return c.conn.IsClosed()
}

func (c *WebRTCClient) MarshalJSON() ([]byte, error) {
return c.conn.MarshalJSON()
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/ring/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,10 @@ func (c *Client) Stop() error {
return nil
}

func (c *Client) IsClosed() bool {
return c.closed
}

func (c *Client) MarshalJSON() ([]byte, error) {
return json.Marshal(c.prod)
}
7 changes: 7 additions & 0 deletions pkg/roborock/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,3 +379,10 @@ func (c *Client) Request(method string, args any) (err error) {

return
}

func (c *Client) IsClosed() bool {
if c.conn == nil {
return true
}
return c.conn.IsClosed()
}
4 changes: 4 additions & 0 deletions pkg/tapo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,3 +405,7 @@ func securityEncode(s string) string {

return string(b)
}

func (c *Client) IsClosed() bool {
return c.conn1 == nil
}
4 changes: 4 additions & 0 deletions pkg/tuya/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,10 @@ func (c *Client) Close(err error) error {
return c.Stop()
}

func (c *Client) IsClosed() bool {
return c.closed
}

func (c *Client) MarshalJSON() ([]byte, error) {
return c.conn.MarshalJSON()
}
Expand Down
19 changes: 17 additions & 2 deletions pkg/webrtc/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ type Conn struct {

pc *webrtc.PeerConnection

offer string
closed core.Waiter
offer string
closed core.Waiter
transportClosed bool
}

func NewConn(pc *webrtc.PeerConnection) *Conn {
Expand Down Expand Up @@ -156,6 +157,20 @@ func (c *Conn) Close() error {
return c.pc.Close()
}

func (c *Conn) IsClosed() bool {
if c.transportClosed {
return true
}
state := c.pc.ConnectionState()
return state == webrtc.PeerConnectionStateDisconnected ||
state == webrtc.PeerConnectionStateFailed ||
state == webrtc.PeerConnectionStateClosed
}

func (c *Conn) CloseTransport() {
c.transportClosed = true
}

func (c *Conn) AddCandidate(candidate string) error {
// pion uses only candidate value from json/object candidate struct
return c.pc.AddICECandidate(webrtc.ICECandidateInit{Candidate: candidate})
Expand Down