File: server.go

package info (click to toggle)
gnss-share 0.8.2-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 208 kB
  • sloc: makefile: 69
file content (197 lines) | stat: -rw-r--r-- 4,975 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
// Copyright 2021 Clayton Craft <clayton@craftyguy.net>
// SPDX-License-Identifier: GPL-3.0-or-later

package server

import (
	"bytes"
	"errors"
	"fmt"
	"io"
	"net"
	"os"
	"os/user"
	"strconv"
	"time"

	"gitlab.com/postmarketOS/gnss-share/internal/pubsub"
)

type Server struct {
	sock      net.Listener
	socket    string
	sockGroup string
}

// New initializes a new Server. socket is the path to the unix socket that the
// server will create and listen on, and sockGroup is the group owner for the
// socket.
func New(socket string, sockGroup string) (s *Server) {
	s = &Server{
		socket:    socket,
		sockGroup: sockGroup,
	}

	return
}

func (s *Server) Stop() {
	// #nosec G104 - ignore socket close failures on purpose
	s.sock.Close()
}

func (s *Server) Start(subscriber pubsub.Subscriber, publisher io.Writer) (err error) {
	if err := os.RemoveAll(s.socket); err != nil {
		return fmt.Errorf("startServer(): %w", err)
	}

	s.sock, err = net.Listen("unix", s.socket)
	if err != nil {
		return fmt.Errorf("startServer(): %w", err)
	}

	defer func() {
		cerr := s.sock.Close()
		if err == nil {
			err = cerr
		}
	}()

	// #nosec G302 - group needs R/W access
	if err := os.Chmod(s.socket, 0660); err != nil {
		return fmt.Errorf("startServer(): %w", err)
	}

	group, err := user.LookupGroup(s.sockGroup)
	if err != nil {
		return fmt.Errorf("startServer(): %w", err)
	}

	gid, err := strconv.ParseInt(group.Gid, 10, 16)
	if err != nil {
		return fmt.Errorf("startServer(): %w", err)
	}

	if err := os.Chown(s.socket, -1, int(gid)); err != nil {
		return fmt.Errorf("startServer(): %w", err)
	}

	fmt.Println("Starting GNSS server, accepting connections at: ", s.socket)

	for {
		if conn, err := (s.sock).Accept(); err != nil {
			fmt.Println("server: connection accept failed: ", err)
			continue
		} else {
			go func() {
				if sub, err := subscriber.Subscribe(); err != nil {
					fmt.Println("server: unable to subscribe to topic: ", err)
					// #nosec G104 - ignore conn close failures on purpose
					conn.Close()
				} else {
					s.clientConnection(conn, sub, publisher)
				}
			}()
		}
	}
}

// Routine run for each client connection
func (s *Server) clientConnection(conn net.Conn, sub *pubsub.Subscription, pub io.Writer) {

	defer func() {
		// Note: close() could potentially be called twice, e.g. once on socket
		// read failure and again here, however upon inspection of the Go
		// implementation for net.Conn.UnixConn this doesn't seem to matter...
		// #nosec G104 - ignore conn close failures on purpose
		conn.Close()
		fmt.Println("Client disconnected")
		if err := sub.Unsubscribe(); err != nil {
			fmt.Println("server: connection unsubscribe failed: ", err)
		}
	}()

	fmt.Println("Client connected")

	// This "read" goroutine conveniently serves as a trigger for when the client
	// closes the connection. Without it, the "write" for loop later will hang
	// indefinitely if no data is sent to the subscription channel. The "done"
	// chan is closed when this "read" goroutine returns, which will happen if
	// the client disconnects.
	done := make(chan struct{})
	go func() {
		// #nosec G307 - ignore conn close failures on purpose
		defer conn.Close()
		defer close(done)

		for {
			buf := make([]byte, 1024)
			size, err := conn.Read(buf)
			if err != nil {
				if !errors.Is(err, io.EOF) {
					fmt.Println("server: unable to read from socket: ", err)
				}
				return
			}
			if size == 0 {
				continue
			}
			if _, err := pub.Write(buf); err != nil {
				fmt.Println("server: unable to send received data to sub: ", err)
			}
		}
	}()

	for {
		// read all data (inluding buffered) from the channel
		// if there's anything to send, send it
		// don't spin in a loop, and don't use time.Sleep, it should block on channel read

		var msg []byte
		select {
		case <-done:
			return
		case data, ok := <-sub.Messages:
			if !ok {
				fmt.Println("server: unable to read from client subscription channel")
				return
			}
			msg = data
		}
		// This receive loop will slurp up any data that is in the channel
		// buffers
	receive_loop:
		for {
			select {
			case data := <-sub.Messages:
				msg = append(msg, data...)
			case <-done:
				return
			default:
				break receive_loop
			}
		}
		if bytes.Equal(msg, []byte{}) {
			continue
		}

		if err := conn.SetWriteDeadline(time.Now().Add(5 * time.Second)); err != nil {
			// Note: this is only supported in Go 1.16... uncomment when the
			// min version is increased to at least that version::
			// if !errors.Is(err, net.ErrClosed) {
			// 	fmt.Println("server: client unable to set socket write deadline: ", err)
			// }
			return
		}

		if _, err := conn.Write(msg); err != nil {
			if errors.Is(err, os.ErrDeadlineExceeded) {
				// Client hasn't read in a long time, so just bail for now. Maybe
				// later this could retry a few times or wait longer?
				fmt.Println("server: client hasn't read from socket in 5s, disconnecting it")
				return
			}
			return
		}
	}
}