File: message.go

package info (click to toggle)
golang-github-azure-azure-sdk-for-go 10.3.0~beta-1
  • links: PTS, VCS
  • area: main
  • in suites: buster, experimental
  • size: 15,936 kB
  • ctags: 22,331
  • sloc: sh: 33; makefile: 8
file content (153 lines) | stat: -rw-r--r-- 4,628 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
package storage

import (
	"encoding/xml"
	"fmt"
	"net/http"
	"net/url"
	"strconv"
	"time"
)

// Message represents an Azure message.
type Message struct {
	Queue        *Queue
	Text         string      `xml:"MessageText"`
	ID           string      `xml:"MessageId"`
	Insertion    TimeRFC1123 `xml:"InsertionTime"`
	Expiration   TimeRFC1123 `xml:"ExpirationTime"`
	PopReceipt   string      `xml:"PopReceipt"`
	NextVisible  TimeRFC1123 `xml:"TimeNextVisible"`
	DequeueCount int         `xml:"DequeueCount"`
}

func (m *Message) buildPath() string {
	return fmt.Sprintf("%s/%s", m.Queue.buildPathMessages(), m.ID)
}

// PutMessageOptions is the set of options can be specified for Put Messsage
// operation. A zero struct does not use any preferences for the request.
type PutMessageOptions struct {
	Timeout           uint
	VisibilityTimeout int
	MessageTTL        int
	RequestID         string `header:"x-ms-client-request-id"`
}

// Put operation adds a new message to the back of the message queue.
//
// See https://docs.microsoft.com/en-us/rest/api/storageservices/fileservices/Put-Message
func (m *Message) Put(options *PutMessageOptions) error {
	query := url.Values{}
	headers := m.Queue.qsc.client.getStandardHeaders()

	req := putMessageRequest{MessageText: m.Text}
	body, nn, err := xmlMarshal(req)
	if err != nil {
		return err
	}
	headers["Content-Length"] = strconv.Itoa(nn)

	if options != nil {
		if options.VisibilityTimeout != 0 {
			query.Set("visibilitytimeout", strconv.Itoa(options.VisibilityTimeout))
		}
		if options.MessageTTL != 0 {
			query.Set("messagettl", strconv.Itoa(options.MessageTTL))
		}
		query = addTimeout(query, options.Timeout)
		headers = mergeHeaders(headers, headersFromStruct(*options))
	}

	uri := m.Queue.qsc.client.getEndpoint(queueServiceName, m.Queue.buildPathMessages(), query)
	resp, err := m.Queue.qsc.client.exec(http.MethodPost, uri, headers, body, m.Queue.qsc.auth)
	if err != nil {
		return err
	}
	defer readAndCloseBody(resp.body)

	err = xmlUnmarshal(resp.body, m)
	if err != nil {
		return err
	}
	return checkRespCode(resp.statusCode, []int{http.StatusCreated})
}

// UpdateMessageOptions is the set of options can be specified for Update Messsage
// operation. A zero struct does not use any preferences for the request.
type UpdateMessageOptions struct {
	Timeout           uint
	VisibilityTimeout int
	RequestID         string `header:"x-ms-client-request-id"`
}

// Update operation updates the specified message.
//
// See https://docs.microsoft.com/en-us/rest/api/storageservices/fileservices/Update-Message
func (m *Message) Update(options *UpdateMessageOptions) error {
	query := url.Values{}
	if m.PopReceipt != "" {
		query.Set("popreceipt", m.PopReceipt)
	}

	headers := m.Queue.qsc.client.getStandardHeaders()
	req := putMessageRequest{MessageText: m.Text}
	body, nn, err := xmlMarshal(req)
	if err != nil {
		return err
	}
	headers["Content-Length"] = strconv.Itoa(nn)

	if options != nil {
		if options.VisibilityTimeout != 0 {
			query.Set("visibilitytimeout", strconv.Itoa(options.VisibilityTimeout))
		}
		query = addTimeout(query, options.Timeout)
		headers = mergeHeaders(headers, headersFromStruct(*options))
	}
	uri := m.Queue.qsc.client.getEndpoint(queueServiceName, m.buildPath(), query)

	resp, err := m.Queue.qsc.client.exec(http.MethodPut, uri, headers, body, m.Queue.qsc.auth)
	if err != nil {
		return err
	}
	defer readAndCloseBody(resp.body)

	m.PopReceipt = resp.headers.Get("x-ms-popreceipt")
	nextTimeStr := resp.headers.Get("x-ms-time-next-visible")
	if nextTimeStr != "" {
		nextTime, err := time.Parse(time.RFC1123, nextTimeStr)
		if err != nil {
			return err
		}
		m.NextVisible = TimeRFC1123(nextTime)
	}

	return checkRespCode(resp.statusCode, []int{http.StatusNoContent})
}

// Delete operation deletes the specified message.
//
// See https://msdn.microsoft.com/en-us/library/azure/dd179347.aspx
func (m *Message) Delete(options *QueueServiceOptions) error {
	params := url.Values{"popreceipt": {m.PopReceipt}}
	headers := m.Queue.qsc.client.getStandardHeaders()

	if options != nil {
		params = addTimeout(params, options.Timeout)
		headers = mergeHeaders(headers, headersFromStruct(*options))
	}
	uri := m.Queue.qsc.client.getEndpoint(queueServiceName, m.buildPath(), params)

	resp, err := m.Queue.qsc.client.exec(http.MethodDelete, uri, headers, nil, m.Queue.qsc.auth)
	if err != nil {
		return err
	}
	readAndCloseBody(resp.body)
	return checkRespCode(resp.statusCode, []int{http.StatusNoContent})
}

type putMessageRequest struct {
	XMLName     xml.Name `xml:"QueueMessage"`
	MessageText string   `xml:"MessageText"`
}