1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153
|
package mq
import (
"encoding/json"
"errors"
"fmt"
"strconv"
)
type Client struct {
AccessKey string
SecretKey string
Endpoint string
Topic string
ProducerId string
ConsumerId string
Key string
Tag string
}
type MessageResponse struct {
Body string `json:"body"`
BornTime int64 `json:"bornTime"` // UTC time in Unix
}
func NewClient(ak string, sk string, endpoint string, topic string,
producerId string, consumerId string, key string, tag string) (client *Client) {
client = &Client{
AccessKey: ak,
SecretKey: sk,
Endpoint: endpoint,
Topic: topic,
ProducerId: producerId,
ConsumerId: consumerId,
Key: key,
Tag: tag,
}
return client
}
func getSendUrl(endpoint string, topic string, time int64, tag string, key string) string {
return endpoint + "/message/?topic=" + topic + "&time=" +
strconv.FormatInt(time, 10) + "&tag=" + tag + "&key=" + key
}
func getReceiveUrl(endpoint, topic string, time int64, tag string, num int) string {
return endpoint + "/message/?topic=" + topic + "&time=" +
strconv.FormatInt(time, 10) + "&tag=" + tag + "&num=" + strconv.Itoa(num)
}
func getSendSign(topic string, producerId string, messageBody []byte, time int64, sk string) (sign string) {
signStr := topic + newline + producerId + newline + Md5(messageBody) + newline + strconv.FormatInt(time, 10)
sign = HamSha1(signStr, []byte(sk))
return sign
}
func getReceiveSign(topic string, consumerId string, time int64, sk string) string {
// [topic+ā\nā+ cid+ā\nā+time]
signStr := topic + newline + consumerId + newline + strconv.FormatInt(time, 10)
return HamSha1(signStr, []byte(sk))
}
func getReceiveHeader(ak, sign, consumerId string) (map[string]string, error) {
if consumerId == "" {
return nil, fmt.Errorf("consumer id is not provided")
}
header := make(map[string]string)
header["AccessKey"] = ak
header["Signature"] = sign
header["ConsumerId"] = consumerId
return header, nil
}
func getSendHeader(ak string, sign string, producerId string) (header map[string]string, err error) {
if producerId == "" {
return nil, fmt.Errorf("producer id is not provided")
}
header = make(map[string]string, 0)
header["AccessKey"] = ak
header["Signature"] = sign
header["ProducerId"] = producerId
return header, nil
}
func (client *Client) Send(time int64, message []byte) (msgId string, err error) {
url := getSendUrl(client.Endpoint, client.Topic, time, client.Tag, client.Key)
sign := getSendSign(client.Topic, client.ProducerId, message, time, client.SecretKey)
header, err := getSendHeader(client.AccessKey, sign, client.ProducerId)
if err != nil {
return "", err
}
response, status, err := httpPost(url, header, message)
if err != nil {
return "", err
}
fmt.Printf("receive message: %s %d", response, status)
statusMessage := getStatusCodeMessage(status)
if statusMessage != "" {
return "", errors.New(statusMessage)
}
var rs interface{}
err = json.Unmarshal(response, &rs)
if err != nil {
return "", err
}
result := rs.(map[string]interface{})
sendStatus := result["sendStatus"].(string)
if sendStatus != "SEND_OK" {
return "", errors.New(sendStatus)
}
return result["msgId"].(string), nil
}
func (client *Client) ReceiveMessage(messageChan chan string, errChan chan error) {
// only listen for the latest message
time := GetCurrentUnixMicro()
url := getReceiveUrl(client.Endpoint, client.Topic, time, client.Tag, 1)
sign := getReceiveSign(client.Topic, client.ConsumerId, time, client.SecretKey)
header, err := getReceiveHeader(client.AccessKey, sign, client.ConsumerId)
if err != nil {
errChan <- err
return
}
response, status, err := HttpGet(url, header)
if err != nil {
errChan <- err
return
}
fmt.Printf("receive message: %s %d", response, status)
statusMessage := getStatusCodeMessage(status)
if statusMessage != "" {
errChan <- errors.New(statusMessage)
return
}
messages := make([]MessageResponse, 0)
json.Unmarshal(response, &messages)
if len(messages) > 0 {
fmt.Printf("size of messages is %d", len(messages))
message := messages[0]
messageChan <- message.Body
} else {
errChan <- fmt.Errorf("no message available")
return
}
}
|