File: webrtcconn.go

package info (click to toggle)
snowflake 2.10.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,104 kB
  • sloc: makefile: 5
file content (177 lines) | stat: -rw-r--r-- 4,031 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
package snowflake_proxy

import (
	"context"
	"errors"
	"fmt"
	"io"
	"log"
	"net"
	"regexp"
	"sync"
	"time"

	"github.com/pion/ice/v2"
	"github.com/pion/sdp/v3"
	"github.com/pion/webrtc/v3"
)

// maxBufferedAmount is the data channel buffered-amount threshold (in bytes)
// at or above which Write stops returning immediately and instead waits for a
// signal on sendMoreCh before accepting more data.
const maxBufferedAmount uint64 = 512 * 1024 // 512 KB

// remoteIPPatterns match the address carried in an SDP "c=" (connection data)
// line, one pattern per address family. In each pattern the only capture
// group (index 1) is the address itself; an optional "/ttl" or
// "/ttl/count" suffix is skipped, and the address must be followed by a space
// or the end of the line.
var remoteIPPatterns = []*regexp.Regexp{
	/* IPv4 */
	regexp.MustCompile(`(?m)^c=IN IP4 ([\d.]+)(?:(?:\/\d+)?\/\d+)?(?: |\r?\n)`),
	/* IPv6 */
	regexp.MustCompile(`(?m)^c=IN IP6 ([0-9A-Fa-f:.]+)(?:\/\d+)?(?: |\r?\n)`),
}

// webRTCConn wraps a WebRTC data channel and its peer connection behind
// Read/Write/Close and the address/deadline methods defined in this file.
// Reads come from a pipe; writes go to the data channel.
type webRTCConn struct {
	// dc is the data channel Write sends on. pc is the peer connection that
	// Close shuts down and whose remote description RemoteAddr inspects. pr
	// is the pipe end that Read reads from and Close closes.
	dc *webrtc.DataChannel
	pc *webrtc.PeerConnection
	pr *io.PipeReader

	lock sync.Mutex // Synchronization for DataChannel destruction
	once sync.Once  // Synchronization for PeerConnection destruction

	// isClosing is set to true by Close. Write checks it so that it does not
	// wait on sendMoreCh once the connection is being closed.
	isClosing bool

	// inactivityTimeout is how long timeoutLoop waits without an activity
	// signal before closing the connection. activity receives a non-blocking
	// signal from every Write. sendMoreCh is what Write waits on when the data
	// channel's buffered amount reaches maxBufferedAmount; Close sends on it to
	// release a waiting Write (presumably a buffered-amount-low callback set up
	// elsewhere sends on it too -- confirm at the data channel setup site).
	// cancelTimeoutLoop cancels the context that timeoutLoop runs under.
	inactivityTimeout time.Duration
	activity          chan struct{}
	sendMoreCh        chan struct{}
	cancelTimeoutLoop context.CancelFunc

	// bytesLogger is told the size of every buffer passed to Write.
	bytesLogger bytesLogger
}

// newWebRTCConn builds a webRTCConn around the given peer connection, data
// channel and pipe reader, and starts its inactivity watchdog goroutine.
//
// The connection starts with a 30 second inactivity timeout, an activity
// channel buffered to 100 signals and a sendMoreCh buffered to a single
// signal. The watchdog runs until the connection is closed (Close cancels its
// context) or until it closes the connection itself after a timeout.
func newWebRTCConn(pc *webrtc.PeerConnection, dc *webrtc.DataChannel, pr *io.PipeReader, bytesLogger bytesLogger) *webRTCConn {
	watchdogCtx, stopWatchdog := context.WithCancel(context.Background())
	conn := &webRTCConn{
		pc:                pc,
		dc:                dc,
		pr:                pr,
		inactivityTimeout: 30 * time.Second,
		activity:          make(chan struct{}, 100),
		sendMoreCh:        make(chan struct{}, 1),
		cancelTimeoutLoop: stopWatchdog,
		bytesLogger:       bytesLogger,
	}
	go conn.timeoutLoop(watchdogCtx)
	return conn
}

// timeoutLoop closes the connection if no activity is signalled for
// c.inactivityTimeout. It returns when the timeout fires (after closing the
// connection) or when ctx is cancelled.
//
// Every value received on c.activity restarts the countdown from the full
// timeout.
func (c *webRTCConn) timeoutLoop(ctx context.Context) {
	timer := time.NewTimer(c.inactivityTimeout)
	// Release the timer on every exit path, including cancellation, rather
	// than leaving it pending until it would have fired.
	defer timer.Stop()
	for {
		select {
		case <-timer.C:
			// Best-effort close: there is no caller to report the error to.
			_ = c.Close()
			log.Println("Closed connection due to inactivity")
			return
		case <-c.activity:
			// Stop and drain before Reset so a tick that fired concurrently
			// is not mistaken for a timeout on the next iteration.
			if !timer.Stop() {
				<-timer.C
			}
			timer.Reset(c.inactivityTimeout)
		case <-ctx.Done():
			return
		}
	}
}

// Read reads into b from the connection's pipe reader and returns whatever
// that read returns (byte count and error). Reading does not count as
// activity for the inactivity timeout; only Write signals activity.
func (c *webRTCConn) Read(b []byte) (int, error) {
	return c.pr.Read(b)
}

// Write sends b over the data channel and applies back-pressure.
//
// It records len(b) with the bytes logger, signals activity to the inactivity
// watchdog without blocking, and then, holding c.lock, sends b on the data
// channel. If the connection is not closing and the channel's buffered amount
// has reached maxBufferedAmount, Write waits for a signal on sendMoreCh
// before returning.
//
// The result is always (len(b), nil): a send error is deliberately discarded,
// and a nil data channel makes the call a no-op.
func (c *webRTCConn) Write(b []byte) (int, error) {
	size := len(b)
	c.bytesLogger.AddInbound(int64(size))

	// Non-blocking: if the activity buffer is full the watchdog already has
	// plenty of pending signals, so dropping this one is harmless.
	select {
	case c.activity <- struct{}{}:
	default:
	}

	c.lock.Lock()
	defer c.lock.Unlock()

	if c.dc == nil {
		return size, nil
	}

	_ = c.dc.Send(b)

	// isClosing is checked first so the buffered amount is not queried on a
	// connection that is being torn down.
	mustWait := !c.isClosing && c.dc.BufferedAmount() >= maxBufferedAmount
	if mustWait {
		<-c.sendMoreCh
	}
	return size, nil
}

// Close marks the connection as closing, releases a Write that may be waiting
// for buffer space, and tears the connection down.
//
// The teardown (stopping the inactivity watchdog, closing the pipe reader and
// closing the peer connection) runs only on the first call; its errors are
// joined and returned. Later calls return nil.
//
// NOTE(review): isClosing is written here without holding c.lock, while Write
// reads it with the lock held. Taking the lock here is not an option as the
// code stands, because Write may hold it while waiting on sendMoreCh.
func (c *webRTCConn) Close() (err error) {
	c.isClosing = true

	// Non-blocking wake-up: if a signal is already pending, that one will
	// release the waiting Write.
	select {
	case c.sendMoreCh <- struct{}{}:
	default:
	}

	teardown := func() {
		c.cancelTimeoutLoop()
		pipeErr := c.pr.Close()
		peerErr := c.pc.Close()
		err = errors.Join(pipeErr, peerErr)
	}
	c.once.Do(teardown)
	return err
}

// LocalAddr always returns nil; this connection type does not report a local
// address.
func (c *webRTCConn) LocalAddr() net.Addr {
	return nil
}

// RemoteAddr returns the client's IP address as a *net.IPAddr, extracted from
// the peer connection's remote SDP description.
//
// It returns nil when the peer connection has no remote description yet, or
// when remoteIPFromSDP finds no usable address in it.
func (c *webRTCConn) RemoteAddr() net.Addr {
	// RemoteDescription returns nil until a remote description has been set;
	// guard against dereferencing it.
	desc := c.pc.RemoteDescription()
	if desc == nil {
		return nil
	}
	// Parse Remote SDP offer and extract client IP
	clientIP := remoteIPFromSDP(desc.SDP)
	if clientIP == nil {
		return nil
	}
	return &net.IPAddr{IP: clientIP, Zone: ""}
}

// SetDeadline is not supported on this connection type; it ignores t and
// always returns an error.
func (c *webRTCConn) SetDeadline(t time.Time) error {
	// nolint: golint
	return errors.New("SetDeadline not implemented")
}

// SetReadDeadline is not supported on this connection type; it ignores t and
// always returns an error.
func (c *webRTCConn) SetReadDeadline(t time.Time) error {
	// nolint: golint
	return errors.New("SetReadDeadline not implemented")
}

// SetWriteDeadline is not supported on this connection type; it ignores t and
// always returns an error.
func (c *webRTCConn) SetWriteDeadline(t time.Time) error {
	// nolint: golint
	return errors.New("SetWriteDeadline not implemented")
}

// remoteIPFromSDP extracts the first address from the SDP text str that
// isRemoteAddress accepts, or returns nil if there is none.
//
// Sources are consulted in this order:
//  1. "a=candidate" attributes of each media description
//     (https://tools.ietf.org/html/rfc5245#section-15.1);
//  2. the first "c=" connection data line matched by each entry of
//     remoteIPPatterns (https://tools.ietf.org/html/rfc4566#section-5.7).
//
// If str cannot be parsed as SDP, the error is logged and nil is returned
// without trying the "c=" fallback.
func remoteIPFromSDP(str string) net.IP {
	var desc sdp.SessionDescription
	if err := desc.Unmarshal([]byte(str)); err != nil {
		log.Println("Error parsing SDP: ", err.Error())
		return nil
	}

	// Preferred source: ICE candidates.
	for _, media := range desc.MediaDescriptions {
		for _, attr := range media.Attributes {
			if !attr.IsICECandidate() {
				continue
			}
			candidate, err := ice.UnmarshalCandidate(attr.Value)
			if err != nil {
				// Skip candidates that cannot be parsed.
				continue
			}
			if ip := net.ParseIP(candidate.Address()); ip != nil && isRemoteAddress(ip) {
				return ip
			}
		}
	}

	// Fallback: connection data lines in the raw SDP text.
	for _, pattern := range remoteIPPatterns {
		match := pattern.FindStringSubmatch(str)
		if match == nil {
			continue
		}
		// ParseIP returns nil on malformed input, which is simply skipped.
		if ip := net.ParseIP(match[1]); ip != nil && isRemoteAddress(ip) {
			return ip
		}
	}

	return nil
}