1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91
|
package ami
import (
"context"
"time"
"github.com/pkg/errors"
)
const attrActionID = "ActionID"
type requestOptions struct {
Timeout time.Duration
OnComplete func(ctx context.Context, msg *Message, err error)
}
type RequestOption func(o *requestOptions) error
func RequestTimeout(d time.Duration) RequestOption {
return func(o *requestOptions) error {
o.Timeout = d
return nil
}
}
// RequestResponseCallback will case Conn.Request run async, will not timeout
func RequestResponseCallback(cb func(ctx context.Context, msg *Message, err error)) RequestOption {
return func(o *requestOptions) error {
o.OnComplete = cb
return nil
}
}
func (c *Conn) Request(r interface{}, opts ...RequestOption) (resp *Message, err error) {
var msg *Message
msg, err = ConvertToMessage(r)
if err != nil {
return nil, err
}
if msg.Type != MessageTypeAction {
return nil, errors.Errorf("can only request action: %v", msg.Type)
}
async := &asyncMsg{
id: c.nextID(),
msg: msg,
result: make(chan *asyncMsg, 1),
ctx: context.Background(),
}
o := requestOptions{
Timeout: time.Second * 30,
}
for _, opt := range opts {
if err = opt(&o); err != nil {
return
}
}
onComplete := o.OnComplete
if onComplete != nil {
async.cb = func(v *asyncMsg) {
onComplete(v.ctx, v.resp, v.err)
}
}
msg.SetAttr(attrActionID, async.id)
if async.cb == nil {
var cancel context.CancelFunc
// todo allowed custom timeout
async.ctx, cancel = context.WithTimeout(async.ctx, o.Timeout)
defer cancel()
}
// enqueue
c.pending <- async
if async.cb != nil {
return nil, errors.New("No response yet")
}
select {
case <-async.ctx.Done():
return nil, async.ctx.Err()
case <-async.result:
err = async.err
if err == nil && async.resp != nil {
err = async.resp.Error()
}
return async.resp, err
}
}
|