File: client.go

package info (click to toggle)
golang-github-denverdino-aliyungo 0.0~git20180921.13fa8aa-2
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, bullseye
  • size: 1,824 kB
  • sloc: xml: 1,359; makefile: 3
file content (153 lines) | stat: -rw-r--r-- 4,231 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
package mq

import (
	"encoding/json"
	"errors"
	"fmt"
	"net/url"
	"strconv"
)

// Client holds the credentials and request settings used by Send and
// ReceiveMessage when talking to the MQ HTTP endpoint.
type Client struct {
	// AccessKey is sent in the "AccessKey" request header.
	AccessKey  string
	// SecretKey is the key material handed to the signing helper.
	SecretKey  string
	// Endpoint is the base URL; the "/message/" path and query string are
	// appended to it.
	Endpoint   string
	// Topic is used in the "topic" query parameter and in signatures.
	Topic      string
	// ProducerId is sent in the "ProducerId" header by Send and is part of
	// the send signature.
	ProducerId string
	// ConsumerId is sent in the "ConsumerId" header by ReceiveMessage and is
	// part of the receive signature.
	ConsumerId string
	// Key is the "key" query parameter of send requests.
	Key        string
	// Tag is the "tag" query parameter of send and receive requests.
	Tag        string
}

// MessageResponse is one element of the JSON array that ReceiveMessage
// decodes from a receive response.
type MessageResponse struct {
	// Body is read from the "body" JSON field; ReceiveMessage forwards it to
	// the caller's message channel.
	Body     string `json:"body"`
	// BornTime is read from the "bornTime" JSON field.
	// NOTE(review): the unit (seconds, milliseconds, ...) is not visible
	// here; confirm against the service documentation.
	BornTime int64  `json:"bornTime"` // UTC time in Unix
}

// NewClient returns a Client populated with the given access key (ak),
// secret key (sk), endpoint, topic, producer and consumer ids, message key
// and tag. The arguments are stored as given; nothing is validated here.
func NewClient(ak string, sk string, endpoint string, topic string,
	producerId string, consumerId string, key string, tag string) (client *Client) {
	client = new(Client)
	client.AccessKey, client.SecretKey = ak, sk
	client.Endpoint, client.Topic = endpoint, topic
	client.ProducerId, client.ConsumerId = producerId, consumerId
	client.Key, client.Tag = key, tag
	return client
}

// getSendUrl builds the URL used to publish a message: endpoint followed by
// "/message/" and the topic, time, tag and key query parameters, in that
// order. time is rendered in base 10.
//
// topic, tag and key are query-escaped so that values containing reserved
// characters (for example '&', '#', '+' or spaces) cannot truncate the query
// string or inject extra parameters. endpoint is used verbatim. Values made
// only of unreserved characters produce exactly the same URL as before.
func getSendUrl(endpoint string, topic string, time int64, tag string, key string) string {
	return endpoint + "/message/?topic=" + url.QueryEscape(topic) +
		"&time=" + strconv.FormatInt(time, 10) +
		"&tag=" + url.QueryEscape(tag) +
		"&key=" + url.QueryEscape(key)
}

// getReceiveUrl builds the URL used to fetch messages: endpoint followed by
// "/message/" and the topic, time, tag and num query parameters, in that
// order. time and num are rendered in base 10.
//
// topic and tag are query-escaped so that values containing reserved
// characters (for example '&', '#', '+' or spaces) cannot truncate the query
// string or inject extra parameters. endpoint is used verbatim. Values made
// only of unreserved characters produce exactly the same URL as before.
func getReceiveUrl(endpoint, topic string, time int64, tag string, num int) string {
	return endpoint + "/message/?topic=" + url.QueryEscape(topic) +
		"&time=" + strconv.FormatInt(time, 10) +
		"&tag=" + url.QueryEscape(tag) +
		"&num=" + strconv.Itoa(num)
}

func getSendSign(topic string, producerId string, messageBody []byte, time int64, sk string) (sign string) {
	signStr := topic + newline + producerId + newline + Md5(messageBody) + newline + strconv.FormatInt(time, 10)
	sign = HamSha1(signStr, []byte(sk))
	return sign
}

// getReceiveSign computes the signature attached to a receive request.
//
// The string to sign is topic, consumerId and the base-10 time, joined in
// that order with the package-level newline separator. That string and the
// bytes of sk are handed to HamSha1, whose result is returned.
func getReceiveSign(topic string, consumerId string, time int64, sk string) string {
	timestamp := strconv.FormatInt(time, 10)
	payload := topic + newline + consumerId + newline + timestamp
	secret := []byte(sk)
	return HamSha1(payload, secret)
}

// getReceiveHeader returns the headers for a receive request: "AccessKey",
// "Signature" and "ConsumerId". It returns a nil map and an error when
// consumerId is empty.
func getReceiveHeader(ak, sign, consumerId string) (map[string]string, error) {
	if len(consumerId) == 0 {
		return nil, fmt.Errorf("consumer id is not provided")
	}
	return map[string]string{
		"AccessKey":  ak,
		"Signature":  sign,
		"ConsumerId": consumerId,
	}, nil
}

// getSendHeader returns the headers for a send request: "AccessKey",
// "Signature" and "ProducerId". It returns a nil map and an error when
// producerId is empty.
func getSendHeader(ak string, sign string, producerId string) (header map[string]string, err error) {
	if len(producerId) == 0 {
		err = fmt.Errorf("producer id is not provided")
		return nil, err
	}
	header = map[string]string{
		"AccessKey":  ak,
		"Signature":  sign,
		"ProducerId": producerId,
	}
	return header, nil
}

// Send posts message to the client's topic and returns the message id
// reported by the service.
//
// time is used both in the request URL and in the request signature. The
// request is signed with the client's SecretKey and carries the AccessKey,
// Signature and ProducerId headers.
//
// An error is returned when:
//   - the ProducerId is empty,
//   - the HTTP request fails,
//   - getStatusCodeMessage reports a non-empty message for the status,
//   - the response body is not a JSON object,
//   - "sendStatus" is missing, is not a string, or is not "SEND_OK" (in the
//     last case the error text is the reported status),
//   - "msgId" is missing or is not a string.
//
// Unexpected response shapes are reported as errors; they never panic.
func (client *Client) Send(time int64, message []byte) (msgId string, err error) {
	reqURL := getSendUrl(client.Endpoint, client.Topic, time, client.Tag, client.Key)
	sign := getSendSign(client.Topic, client.ProducerId, message, time, client.SecretKey)
	header, err := getSendHeader(client.AccessKey, sign, client.ProducerId)
	if err != nil {
		return "", err
	}
	response, status, err := httpPost(reqURL, header, message)
	if err != nil {
		return "", err
	}

	// NOTE(review): debugging output kept byte-for-byte for compatibility;
	// it says "receive" even though this is the send path.
	fmt.Printf("receive message: %s %d", response, status)
	if statusMessage := getStatusCodeMessage(status); statusMessage != "" {
		return "", errors.New(statusMessage)
	}

	// Decoding into a typed map makes a non-object body (array, string,
	// number) a decode error instead of a failed type assertion later on.
	var result map[string]interface{}
	if err = json.Unmarshal(response, &result); err != nil {
		return "", err
	}

	sendStatus, ok := result["sendStatus"].(string)
	if !ok {
		return "", fmt.Errorf("send response has no string sendStatus field: %s", response)
	}
	if sendStatus != "SEND_OK" {
		return "", errors.New(sendStatus)
	}

	msgId, ok = result["msgId"].(string)
	if !ok {
		return "", fmt.Errorf("send response has no string msgId field: %s", response)
	}
	return msgId, nil
}

// ReceiveMessage performs one receive request and reports its outcome on one
// of the two channels: the body of the first returned message on
// messageChan, or an error on errChan. Exactly one value is sent per call,
// using plain (blocking) channel sends.
//
// The request asks for a single message (num=1) using the current time from
// GetCurrentUnixMicro, is signed with the client's SecretKey, and carries
// the AccessKey, Signature and ConsumerId headers.
//
// An error is sent when the ConsumerId is empty, the HTTP request fails,
// getStatusCodeMessage reports a non-empty message for the status, a
// non-empty response body cannot be decoded as a JSON array of messages, or
// the response contains no messages ("no message available").
func (client *Client) ReceiveMessage(messageChan chan string, errChan chan error) {
	// only listen for the latest message
	now := GetCurrentUnixMicro()
	reqURL := getReceiveUrl(client.Endpoint, client.Topic, now, client.Tag, 1)
	sign := getReceiveSign(client.Topic, client.ConsumerId, now, client.SecretKey)
	header, err := getReceiveHeader(client.AccessKey, sign, client.ConsumerId)
	if err != nil {
		errChan <- err
		return
	}
	response, status, err := HttpGet(reqURL, header)
	if err != nil {
		errChan <- err
		return
	}

	fmt.Printf("receive message: %s %d", response, status)
	if statusMessage := getStatusCodeMessage(status); statusMessage != "" {
		errChan <- errors.New(statusMessage)
		return
	}

	var messages []MessageResponse
	// An empty body is still treated as "no message available"; only a
	// non-empty body that fails to decode is surfaced as a decode error
	// rather than being mistaken for an empty queue.
	if len(response) > 0 {
		if err := json.Unmarshal(response, &messages); err != nil {
			errChan <- err
			return
		}
	}

	if len(messages) == 0 {
		errChan <- fmt.Errorf("no message available")
		return
	}

	fmt.Printf("size of messages is %d", len(messages))
	messageChan <- messages[0].Body
}