File: request.go

package info (click to toggle)
golang-github-wenerme-astgo 0.0~git20230926.1b5bc38-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 612 kB
  • sloc: makefile: 16
file content (91 lines) | stat: -rw-r--r-- 1,855 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
package ami

import (
	"context"
	"time"

	"github.com/pkg/errors"
)

// attrActionID is the name of the message attribute in which Conn.Request
// stores the ID it generates for each request.
const attrActionID = "ActionID"

type (
	// requestOptions collects the per-call settings consumed by Conn.Request.
	requestOptions struct {
		// Timeout bounds how long a synchronous Conn.Request waits.
		// Conn.Request initialises it to 30 seconds before applying options.
		Timeout time.Duration
		// OnComplete, when non-nil, makes Conn.Request asynchronous: the
		// outcome of the request is handed to this function rather than
		// being returned to the caller.
		OnComplete func(ctx context.Context, msg *Message, err error)
	}

	// RequestOption adjusts a requestOptions value. Returning a non-nil
	// error makes Conn.Request abort and return that error.
	RequestOption func(o *requestOptions) error
)

// RequestTimeout returns a RequestOption that overwrites the Timeout field of
// the request options with d. Conn.Request uses that field as the deadline
// for synchronous requests. The returned option never reports an error.
func RequestTimeout(d time.Duration) RequestOption {
	var setTimeout RequestOption = func(opts *requestOptions) error {
		opts.Timeout = d
		return nil
	}
	return setTimeout
}

// RequestResponseCallback returns a RequestOption that installs cb as the
// completion callback of the request. With a callback installed,
// Conn.Request runs asynchronously: it does not wait for the response and
// applies no timeout. The returned option never reports an error.
func RequestResponseCallback(cb func(ctx context.Context, msg *Message, err error)) RequestOption {
	var setCallback RequestOption = func(opts *requestOptions) error {
		opts.OnComplete = cb
		return nil
	}
	return setCallback
}

// Request converts r to a Message with ConvertToMessage, tags it with a fresh
// ActionID obtained from c.nextID, and queues it on c.pending.
//
// r must convert to a message of type MessageTypeAction; otherwise an error
// is returned and nothing is queued. Options are applied in order on top of a
// default Timeout of 30 seconds; nil options are skipped, and the first option
// that fails aborts the call before an ID is allocated.
//
// Synchronous mode (no callback option): Request blocks until a result is
// signalled on the request's result channel or the timeout elapses. The
// timeout also covers the time spent waiting for room in the pending queue,
// so a stalled queue cannot block the caller past the deadline. On timeout
// the context error is returned with a nil message. Otherwise the response
// is returned together with the recorded error or, if there is none and a
// response is present, the value of resp.Error().
//
// Asynchronous mode (RequestResponseCallback given): the request is queued
// without any timeout and Request returns immediately with a nil message and
// a "No response yet" error; the outcome is handed to the callback through
// the request's cb hook.
func (c *Conn) Request(r interface{}, opts ...RequestOption) (resp *Message, err error) {
	msg, err := ConvertToMessage(r)
	if err != nil {
		return nil, err
	}
	if msg.Type != MessageTypeAction {
		return nil, errors.Errorf("can only request action: %v", msg.Type)
	}

	// Resolve options first so a failing option does not consume an ID.
	o := requestOptions{
		Timeout: 30 * time.Second,
	}
	for _, opt := range opts {
		if opt == nil {
			continue
		}
		if optErr := opt(&o); optErr != nil {
			return nil, optErr
		}
	}

	async := &asyncMsg{
		id:     c.nextID(),
		msg:    msg,
		result: make(chan *asyncMsg, 1),
		ctx:    context.Background(),
	}
	msg.SetAttr(attrActionID, async.id)

	if onComplete := o.OnComplete; onComplete != nil {
		// Async mode: no timeout; hand the outcome to the callback.
		async.cb = func(v *asyncMsg) {
			onComplete(v.ctx, v.resp, v.err)
		}
		c.pending <- async
		return nil, errors.New("No response yet")
	}

	// Sync mode: the context must be in place before the request is queued,
	// because it is handed over together with the request.
	ctx, cancel := context.WithTimeout(async.ctx, o.Timeout)
	defer cancel()
	async.ctx = ctx

	// Enqueue, but give up if the deadline passes while the queue is full.
	select {
	case c.pending <- async:
	case <-ctx.Done():
		return nil, ctx.Err()
	}

	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	case <-async.result:
		err = async.err
		if err == nil && async.resp != nil {
			err = async.resp.Error()
		}
		return async.resp, err
	}
}