File: rawwebsocket.go

package info (click to toggle)
golang-github-igm-sockjs-go 3.0.2-2
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, sid, trixie
  • size: 512 kB
  • sloc: javascript: 39; makefile: 5
file content (134 lines) | stat: -rw-r--r-- 3,230 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package sockjs

import (
	"encoding/json"
	"net/http"
	"time"

	"github.com/gorilla/websocket"
)

// rawWebsocket upgrades the HTTP request to a websocket connection and runs a
// raw session on top of it.
//
// When the upgrade fails with a websocket.HandshakeError the client gets a
// 400 response; any other upgrade error yields a 500 status. On success a new
// session (marked raw, with an empty session ID) is created, a rawWsReceiver
// is attached to it and, if configured, the handler function is started in
// its own goroutine. The call then blocks until either reading from the
// connection fails or the receiver reports that it is done, after which the
// session and the connection are closed.
func (h *handler) rawWebsocket(rw http.ResponseWriter, req *http.Request) {
	var (
		conn *websocket.Conn
		err  error
	)
	if upgrader := h.options.WebsocketUpgrader; upgrader != nil {
		conn, err = upgrader.Upgrade(rw, req, nil)
	} else {
		// No custom upgrader configured: keep using the package-level
		// read/write buffer size variables.
		conn, err = websocket.Upgrade(rw, req, nil, WebSocketReadBufSize, WebSocketWriteBufSize)
	}

	if err != nil {
		if _, isHandshakeErr := err.(websocket.HandshakeError); isHandshakeErr {
			http.Error(rw, `Can "Upgrade" only to "WebSocket".`, http.StatusBadRequest)
		} else {
			rw.WriteHeader(http.StatusInternalServerError)
		}
		return
	}

	sess := newSession(req, "", h.options.DisconnectDelay, h.options.HeartbeatDelay)
	sess.raw = true

	recv := newRawWsReceiver(conn, h.options.WebsocketWriteTimeout)
	sess.attachReceiver(recv)
	if h.handlerFunc != nil {
		go h.handlerFunc(sess)
	}

	// readDone is closed by the read loop as soon as ReadMessage fails.
	readDone := make(chan struct{})
	go func() {
		for {
			msgType, payload, readErr := conn.ReadMessage()
			if readErr != nil {
				close(readDone)
				return
			}
			// Only text and binary frames are handed to the session.
			switch msgType {
			case websocket.TextMessage, websocket.BinaryMessage:
				sess.accept(string(payload))
			}
		}
	}()

	// Wait for whichever side finishes first, then tear everything down.
	select {
	case <-readDone:
	case <-recv.doneNotify():
	}
	sess.close()
	conn.Close()
}

// rawWsReceiver writes outgoing session data directly to a websocket
// connection and exposes a channel that is closed once it has been shut down.
type rawWsReceiver struct {
	// conn is the websocket connection that sendBulk and sendFrame write to.
	conn         *websocket.Conn
	// closeCh is closed by close(); doneNotify returns it and canSend polls it.
	closeCh      chan struct{}
	// writeTimeout, when non-zero, is added to the current time to set the
	// write deadline before each write.
	writeTimeout time.Duration
}

// newRawWsReceiver returns a receiver that writes to conn, using writeTimeout
// as the per-write deadline offset (zero leaves the deadline untouched). The
// receiver starts out open, with a fresh, unclosed done channel.
func newRawWsReceiver(conn *websocket.Conn, writeTimeout time.Duration) *rawWsReceiver {
	recv := new(rawWsReceiver)
	recv.conn = conn
	recv.writeTimeout = writeTimeout
	recv.closeCh = make(chan struct{})
	return recv
}

// sendBulk writes every message, in order, as its own websocket text message.
// If a write timeout is configured the write deadline is refreshed before each
// message. The first failed write closes the receiver and the remaining
// messages are dropped.
func (w *rawWsReceiver) sendBulk(messages ...string) {
	for i := range messages {
		if w.writeTimeout != 0 {
			w.conn.SetWriteDeadline(time.Now().Add(w.writeTimeout))
		}
		if err := w.conn.WriteMessage(websocket.TextMessage, []byte(messages[i])); err != nil {
			w.close()
			return
		}
	}
}

// sendFrame translates a single frame into a websocket message and writes it:
//
//   - "h" is sent as a ping message with an empty payload;
//   - a frame starting with 'c' is parsed with parseCloseFrame and sent as a
//     close message carrying the parsed status and reason;
//   - anything else is sent verbatim as a text message.
//
// If a write timeout is configured the write deadline is set first. A failed
// write closes the receiver.
func (w *rawWsReceiver) sendFrame(frame string) {
	if w.writeTimeout != 0 {
		w.conn.SetWriteDeadline(time.Now().Add(w.writeTimeout))
	}

	var (
		msgType int
		payload []byte
	)
	switch {
	case frame == "h":
		msgType, payload = websocket.PingMessage, []byte{}
	case len(frame) > 0 && frame[0] == 'c':
		code, reason := parseCloseFrame(frame)
		msgType, payload = websocket.CloseMessage, websocket.FormatCloseMessage(int(code), reason)
	default:
		msgType, payload = websocket.TextMessage, []byte(frame)
	}

	if err := w.conn.WriteMessage(msgType, payload); err != nil {
		w.close()
	}
}

// parseCloseFrame extracts the status code and reason from a close frame.
//
// The first byte of frame (the frame-type marker) is skipped and the rest is
// decoded as a JSON array whose first element is the numeric status and whose
// second element is the reason string, e.g. `c[3000,"Go away!"]`.
//
// Anything that cannot be decoded yields zero values rather than a panic or
// an error: an empty or marker-only frame, invalid JSON, or a non-array
// payload returns (0, ""). A missing or non-numeric status yields 0, as does a
// numeric status that does not fit in a uint32; a missing or non-string reason
// yields "".
func parseCloseFrame(frame string) (status uint32, reason string) {
	// Nothing follows the marker byte (or there is no marker at all); slicing
	// an empty frame at [1:] would panic.
	if len(frame) < 2 {
		return 0, ""
	}

	var items [2]interface{}
	if err := json.Unmarshal([]byte(frame[1:]), &items); err != nil {
		// A malformed close frame is deliberately treated as "no status, no
		// reason" so that the caller can still send a close message.
		return 0, ""
	}

	// JSON numbers decode to float64. Converting a negative or too-large
	// float to uint32 is implementation-dependent, so guard the range first.
	if code, ok := items[0].(float64); ok && code >= 0 && code < 1<<32 {
		status = uint32(code)
	}
	reason, _ = items[1].(string)
	return status, reason
}

// close marks the receiver as done by closing closeCh. Calling it again after
// the channel has been closed is a no-op.
func (w *rawWsReceiver) close() {
	select {
	case <-w.closeCh:
		// Already closed; closing a second time would panic.
		return
	default:
	}
	close(w.closeCh)
}
// canSend reports whether the receiver is still open, i.e. closeCh has not
// been closed yet. It never blocks.
func (w *rawWsReceiver) canSend() bool {
	select {
	case <-w.closeCh:
		// A receive only succeeds here once the channel has been closed.
		return false
	default:
	}
	return true
}
// doneNotify returns the channel that is closed when the receiver is closed.
func (w *rawWsReceiver) doneNotify() <-chan struct{} {
	return w.closeCh
}
// interruptedNotify always returns a nil channel, so a receive on the result
// never becomes ready.
func (w *rawWsReceiver) interruptedNotify() <-chan struct{} {
	var never <-chan struct{}
	return never
}