1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397
|
//
// This file is part of pluggable-discovery-protocol-handler.
//
// Copyright 2024 ARDUINO SA (http://www.arduino.cc/)
//
// This software is released under the GNU General Public License version 3,
// which covers the main part of arduino-cli.
// The terms of this license can be found at:
// https://www.gnu.org/licenses/gpl-3.0.en.html
//
// You can be released from the requirements of the above licenses by purchasing
// a commercial license. Buying such a license is mandatory if you want to modify or
// otherwise use the software for commercial activities involving the Arduino
// software without disclosing the source code of your own applications. To purchase
// a commercial license, send an email to license@arduino.cc.
//
package discovery
import (
"encoding/json"
"errors"
"fmt"
"io"
"strings"
"sync"
"time"
"github.com/arduino/go-paths-helper"
)
// Client is a tool that detects communication ports to interact
// with the boards.
type Client struct {
id string
processArgs []string
process *paths.Process
outgoingCommandsPipe io.Writer
incomingMessagesChan <-chan *discoveryMessage
userAgent string
logger ClientLogger
// All the following fields are guarded by statusMutex
statusMutex sync.Mutex
incomingMessagesError error
eventChan chan<- *Event
}
// ClientLogger is the interface that must be implemented by a logger
// to be used in the discovery client.
type ClientLogger interface {
Debugf(format string, args ...interface{})
Errorf(format string, args ...interface{})
}
type nullClientLogger struct{}
func (l *nullClientLogger) Debugf(format string, args ...interface{}) {}
func (l *nullClientLogger) Errorf(format string, args ...interface{}) {}
type discoveryMessage struct {
EventType string `json:"eventType"`
Message string `json:"message"`
Error bool `json:"error"`
ProtocolVersion int `json:"protocolVersion"` // Used in HELLO command
Ports []*Port `json:"ports"` // Used in LIST command
Port *Port `json:"port"` // Used in add and remove events
}
func (msg discoveryMessage) String() string {
s := fmt.Sprintf("type: %s", msg.EventType)
if msg.Message != "" {
s += fmt.Sprintf(", message: %s", msg.Message)
}
if msg.ProtocolVersion != 0 {
s += fmt.Sprintf(", protocol version: %d", msg.ProtocolVersion)
}
if len(msg.Ports) > 0 {
s += fmt.Sprintf(", ports: %s", msg.Ports)
}
if msg.Port != nil {
s += fmt.Sprintf(", port: %s", msg.Port)
}
return s
}
// Event is a pluggable discovery event
type Event struct {
Type string
Port *Port
DiscoveryID string
}
// NewClient create a new pluggable discovery client
func NewClient(id string, args ...string) *Client {
return &Client{
id: id,
processArgs: args,
userAgent: "pluggable-discovery-protocol-handler",
logger: &nullClientLogger{},
}
}
// SetUserAgent sets the user agent to be used in the discovery
func (disc *Client) SetUserAgent(userAgent string) {
disc.userAgent = userAgent
}
// SetLogger sets the logger to be used in the discovery
func (disc *Client) SetLogger(logger ClientLogger) {
disc.logger = logger
}
// GetID returns the identifier for this discovery
func (disc *Client) GetID() string {
return disc.id
}
func (disc *Client) String() string {
return disc.id
}
func (disc *Client) jsonDecodeLoop(in io.Reader, outChan chan<- *discoveryMessage) {
decoder := json.NewDecoder(in)
closeAndReportError := func(err error) {
disc.statusMutex.Lock()
disc.incomingMessagesError = err
disc.stopSync()
disc.killProcess()
disc.statusMutex.Unlock()
close(outChan)
if err != nil {
disc.logger.Errorf("Stopped decode loop: %v", err)
} else {
disc.logger.Debugf("Stopped decode loop")
}
}
for {
var msg discoveryMessage
if err := decoder.Decode(&msg); err != nil {
closeAndReportError(err)
return
}
disc.logger.Debugf("Received message %s", msg)
if msg.EventType == "add" {
if msg.Port == nil {
closeAndReportError(errors.New("invalid 'add' message: missing port"))
return
}
disc.statusMutex.Lock()
if disc.eventChan != nil {
disc.eventChan <- &Event{"add", msg.Port, disc.GetID()}
}
disc.statusMutex.Unlock()
} else if msg.EventType == "remove" {
if msg.Port == nil {
closeAndReportError(errors.New("invalid 'remove' message: missing port"))
return
}
disc.statusMutex.Lock()
if disc.eventChan != nil {
disc.eventChan <- &Event{"remove", msg.Port, disc.GetID()}
}
disc.statusMutex.Unlock()
} else {
outChan <- &msg
}
}
}
// Alive returns true if the discovery is running and false otherwise.
func (disc *Client) Alive() bool {
disc.statusMutex.Lock()
defer disc.statusMutex.Unlock()
return disc.process != nil
}
func (disc *Client) waitMessage(timeout time.Duration) (*discoveryMessage, error) {
select {
case msg := <-disc.incomingMessagesChan:
if msg == nil {
disc.statusMutex.Lock()
err := disc.incomingMessagesError
disc.statusMutex.Unlock()
return nil, err
}
return msg, nil
case <-time.After(timeout):
return nil, fmt.Errorf("timeout waiting for message from %s", disc)
}
}
func (disc *Client) sendCommand(command string) error {
disc.logger.Debugf("Sending command %s", strings.TrimSpace(command))
data := []byte(command)
for {
n, err := disc.outgoingCommandsPipe.Write(data)
if err != nil {
return err
}
if n == len(data) {
return nil
}
data = data[n:]
}
}
func (disc *Client) runProcess() error {
disc.logger.Debugf("Starting discovery process")
proc, err := paths.NewProcess(nil, disc.processArgs...)
if err != nil {
return err
}
stdout, err := proc.StdoutPipe()
if err != nil {
return err
}
stdin, err := proc.StdinPipe()
if err != nil {
return err
}
disc.outgoingCommandsPipe = stdin
messageChan := make(chan *discoveryMessage)
disc.incomingMessagesChan = messageChan
go disc.jsonDecodeLoop(stdout, messageChan)
if err := proc.Start(); err != nil {
return err
}
disc.statusMutex.Lock()
defer disc.statusMutex.Unlock()
disc.process = proc
disc.logger.Debugf("Discovery process started")
return nil
}
func (disc *Client) killProcess() {
disc.logger.Debugf("Killing discovery process")
if process := disc.process; process != nil {
disc.process = nil
if err := process.Kill(); err != nil {
disc.logger.Errorf("Killing discovery process: %v", err)
}
if err := process.Wait(); err != nil {
disc.logger.Errorf("Waiting discovery process termination: %v", err)
}
}
disc.logger.Debugf("Discovery process killed")
}
// Run starts the discovery executable process and sends the HELLO command to the discovery to agree on the
// pluggable discovery protocol. This must be the first command to run in the communication with the discovery.
// If the process is started but the HELLO command fails the process is killed.
func (disc *Client) Run() (err error) {
if err = disc.runProcess(); err != nil {
return err
}
defer func() {
// If the discovery process is started successfully but the HELLO handshake
// fails the discovery is an unusable state, we kill the process to avoid
// further issues down the line.
if err == nil {
return
}
disc.statusMutex.Lock()
disc.killProcess()
disc.statusMutex.Unlock()
}()
if err = disc.sendCommand("HELLO 1 \"arduino-cli " + disc.userAgent + "\"\n"); err != nil {
return err
}
if msg, err := disc.waitMessage(time.Second * 10); err != nil {
return fmt.Errorf("calling HELLO: %w", err)
} else if msg.EventType != "hello" {
return fmt.Errorf("event out of sync, expected 'hello', received '%s'", msg.EventType)
} else if msg.Error {
return fmt.Errorf("command failed: %s", msg.Message)
} else if strings.ToUpper(msg.Message) != "OK" {
return fmt.Errorf("communication out of sync, expected 'OK', received '%s'", msg.Message)
} else if msg.ProtocolVersion > 1 {
return fmt.Errorf("protocol version not supported: requested 1, got %d", msg.ProtocolVersion)
}
return nil
}
// Start initializes and start the discovery internal subroutines. This command must be
// called before List.
func (disc *Client) Start() error {
if err := disc.sendCommand("START\n"); err != nil {
return err
}
if msg, err := disc.waitMessage(time.Second * 10); err != nil {
return fmt.Errorf("calling START: %w", err)
} else if msg.EventType != "start" {
return fmt.Errorf("event out of sync, expected 'start', received '%s'", msg.EventType)
} else if msg.Error {
return fmt.Errorf("command failed: %s", msg.Message)
} else if strings.ToUpper(msg.Message) != "OK" {
return fmt.Errorf("communication out of sync, expected 'OK', received '%s'", msg.Message)
}
return nil
}
// Stop stops the discovery internal subroutines and possibly free the internally
// used resources. This command should be called if the client wants to pause the
// discovery for a while.
func (disc *Client) Stop() error {
if err := disc.sendCommand("STOP\n"); err != nil {
return err
}
if msg, err := disc.waitMessage(time.Second * 10); err != nil {
return fmt.Errorf("calling STOP: %w", err)
} else if msg.EventType != "stop" {
return fmt.Errorf("event out of sync, expected 'stop', received '%s'", msg.EventType)
} else if msg.Error {
return fmt.Errorf("command failed: %s", msg.Message)
} else if strings.ToUpper(msg.Message) != "OK" {
return fmt.Errorf("communication out of sync, expected 'OK', received '%s'", msg.Message)
}
disc.statusMutex.Lock()
defer disc.statusMutex.Unlock()
disc.stopSync()
return nil
}
func (disc *Client) stopSync() {
if disc.eventChan != nil {
disc.eventChan <- &Event{"stop", nil, disc.GetID()}
close(disc.eventChan)
disc.eventChan = nil
}
}
// Quit terminates the discovery. No more commands can be accepted by the discovery.
func (disc *Client) Quit() {
_ = disc.sendCommand("QUIT\n")
if _, err := disc.waitMessage(time.Second * 5); err != nil {
disc.logger.Errorf("Quitting discovery: %s", err)
}
disc.statusMutex.Lock()
disc.stopSync()
disc.killProcess()
disc.statusMutex.Unlock()
}
// List executes an enumeration of the ports and returns a list of the available
// ports at the moment of the call.
func (disc *Client) List() ([]*Port, error) {
if err := disc.sendCommand("LIST\n"); err != nil {
return nil, err
}
if msg, err := disc.waitMessage(time.Second * 10); err != nil {
return nil, fmt.Errorf("calling LIST: %w", err)
} else if msg.EventType != "list" {
return nil, fmt.Errorf("event out of sync, expected 'list', received '%s'", msg.EventType)
} else if msg.Error {
return nil, fmt.Errorf("command failed: %s", msg.Message)
} else {
return msg.Ports, nil
}
}
// StartSync puts the discovery in "events" mode: the discovery will send "add"
// and "remove" events each time a new port is detected or removed respectively.
// After calling StartSync an initial burst of "add" events may be generated to
// report all the ports available at the moment of the start.
// It also creates a channel used to receive events from the pluggable discovery.
// The event channel must be consumed as quickly as possible since it may block the
// discovery if it becomes full. The channel size is configurable.
func (disc *Client) StartSync(size int) (<-chan *Event, error) {
if err := disc.sendCommand("START_SYNC\n"); err != nil {
return nil, err
}
if msg, err := disc.waitMessage(time.Second * 10); err != nil {
return nil, fmt.Errorf("calling START_SYNC: %w", err)
} else if msg.EventType != "start_sync" {
return nil, fmt.Errorf("evemt out of sync, expected 'start_sync', received '%s'", msg.EventType)
} else if msg.Error {
return nil, fmt.Errorf("command failed: %s", msg.Message)
} else if strings.ToUpper(msg.Message) != "OK" {
return nil, fmt.Errorf("communication out of sync, expected 'OK', received '%s'", msg.Message)
}
// In case there is already an existing event channel in use we close it before creating a new one.
disc.statusMutex.Lock()
defer disc.statusMutex.Unlock()
disc.stopSync()
c := make(chan *Event, size)
disc.eventChan = c
return c, nil
}
|