1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62
|
package ami
import "context"
type subscribe struct {
f SubscribeFunc
ctx context.Context
unsub bool
onSent bool
onRecv bool
}
type SubscribeFunc func(ctx context.Context, message *Message) bool
type SubscribeOption func(o *subscribe) error
func SubscribeSend() SubscribeOption {
return func(o *subscribe) error {
o.onSent = true
return nil
}
}
func SubscribeSetContext(ctx context.Context) SubscribeOption {
return func(o *subscribe) error {
o.ctx = ctx
return nil
}
}
func (c *Conn) Subscribe(cb SubscribeFunc, opts ...SubscribeOption) (func(), error) {
c.subLoc.Lock()
defer c.subLoc.Unlock()
sub := &subscribe{
f: cb,
onRecv: true,
}
for _, v := range opts {
err := v(sub)
if err != nil {
return nil, err
}
}
c.subs = append(c.subs, sub)
return func() {
sub.unsub = true
}, nil
}
func (c *Conn) cleanUnsub() {
c.subLoc.Lock()
defer c.subLoc.Unlock()
var neo []*subscribe
subs := c.subs
for _, sub := range subs {
if !sub.unsub {
neo = append(neo, sub)
}
}
c.subs = neo
}
|