1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139
|
// SPDX-FileCopyrightText: 2023 Greenbone AG
//
// SPDX-License-Identifier: AGPL-3.0-or-later
package mqtt
import (
"context"
"encoding/json"
"fmt"
"net"
"github.com/eclipse/paho.golang/paho"
)
// TopicData is a tuple for Topic and Message.
type TopicData struct {
Topic string
Message []byte
}
// MQTT is connection type for
type MQTT struct {
client *paho.Client
connectProperties *paho.Connect
qos byte
incoming chan *TopicData // Is used to send respons messages of a handler downwards
}
func (m MQTT) Incoming() <-chan *TopicData {
return m.incoming
}
func (m MQTT) Close() error {
close(m.incoming)
return m.client.Disconnect(&paho.Disconnect{ReasonCode: 0})
}
func (m MQTT) register(topic string) error {
m.client.Router.RegisterHandler(topic, func(p *paho.Publish) {
m.incoming <- &TopicData{Topic: topic, Message: p.Payload}
})
_, err := m.client.Subscribe(context.Background(), &paho.Subscribe{
// we need NoLocal otherwise we would consum our own messages
// again and ack them.
Subscriptions: map[string]paho.SubscribeOptions{
topic: {QoS: m.qos, NoLocal: true},
},
},
)
return err
}
func (m MQTT) Subscribe(topics ...string) error {
for _, t := range topics {
if err := m.register(t); err != nil {
return err
}
}
return nil
}
func (m MQTT) Publish(topic string, message interface{}) error {
b, err := json.Marshal(message)
if err != nil {
return err
}
props := &paho.PublishProperties{}
pb := &paho.Publish{
Topic: topic,
QoS: m.qos,
Payload: b,
Properties: props,
}
_, err = m.client.Publish(context.Background(), pb)
return err
}
func (m MQTT) Connect() error {
ca, err := m.client.Connect(context.Background(), m.connectProperties)
if err != nil {
return err
}
if ca.ReasonCode != 0 {
return fmt.Errorf(
"failed to connect to %s : %d - %s",
m.client.Conn.RemoteAddr().String(),
ca.ReasonCode,
ca.Properties.ReasonString,
)
}
return nil
}
// Configuration holds information for MQTT
type Configuration struct {
ClientID string // The ID to be used when connecting to a broker
Username string // Username to be used as authentication; empty for anonymous
Password string // Password to be used as authentication with Username
CleanStart bool // CleanStart when false and SessionExpiry set to > 1 it will reuse a session
SessionExpiry uint64 // Amount of seconds a session is valid; WARNING when set to 0 it is effectively a cleanstart.
QOS byte
KeepAlive uint16
Inflight uint
}
func New(conn net.Conn,
cfg Configuration,
) (*MQTT, error) {
c := paho.NewClient(paho.ClientConfig{
Router: paho.NewStandardRouter(),
Conn: conn,
})
cp := &paho.Connect{
KeepAlive: cfg.KeepAlive,
ClientID: cfg.ClientID,
CleanStart: cfg.CleanStart,
Username: cfg.Username,
Password: []byte(cfg.Password),
}
if cfg.Username != "" {
cp.UsernameFlag = true
}
if cfg.Password != "" {
cp.PasswordFlag = true
}
return &MQTT{
client: c,
connectProperties: cp,
qos: cfg.QOS,
incoming: make(chan *TopicData, cfg.Inflight),
}, nil
}
|