File: exchange.go

package info (click to toggle)
golang-github-neowaylabs-wabbit 0.0~git20210927.0.73ad61d-1
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, bookworm-backports, bookworm-proposed-updates, experimental, forky, sid, trixie
  • size: 272 kB
  • sloc: sh: 24; makefile: 4
file content (117 lines) | stat: -rw-r--r-- 3,276 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
package main

import (
	"flag"
	"github.com/NeowayLabs/wabbit"
	"github.com/NeowayLabs/wabbit/amqp"
	"log"
)

// Command-line flags. Each variable is a pointer filled in by flag.Parse;
// main dereferences them and hands the values to publish.
var (
	// uri is the AMQP broker address passed to connect.
	uri          = flag.String("uri", "amqp://guest:guest@localhost:5672/", "AMQP URI")
	// queueName is the queue declared before publishing; its name is also used
	// as the routing key of the published message.
	// NOTE(review): the help text says "Ephemeral", but declareQueue asks for a
	// durable, non-auto-deleted queue — confirm which one is intended.
	queueName    = flag.String("queue", "test-queue", "Ephemeral AMQP queue name")
	// exchange is the name of the exchange that is declared and published to.
	exchange     = flag.String("exchange", "test-exchange", "Durable, non-auto-deleted AMQP exchange name")
	// exchangeType is the exchange kind handed to ExchangeDeclare.
	exchangeType = flag.String("exchange-type", "direct", "Exchange type - direct|fanout|topic|x-custom")
	// body is the message payload, sent as raw bytes.
	body         = flag.String("body", "body test", "Body of message")
	// reliable switches the channel into confirm mode and makes publish wait
	// for one confirmation before returning.
	reliable     = flag.Bool("reliable", true, "Wait for the publisher confirmation before exiting")
)

// main parses the command-line flags and publishes a single message using
// the values they carry.
//
// Flags are parsed here rather than in an init function: init runs before
// the testing package registers its own -test.* flags, so parsing there
// makes `go test` abort with "flag provided but not defined".
func main() {
	flag.Parse()
	publish(*uri, *queueName, *exchange, *exchangeType, *body, *reliable)
}

// publish connects to the AMQP broker at uri, declares the exchange and the
// queue, optionally enables publisher confirms, and sends body to exchange
// using the queue's name as the routing key.
//
// Every failure is reported with log.Fatalf, which terminates the process
// without running deferred calls. On the success path the deferred calls run
// in this order: wait for the confirmation (when reliable is true), close the
// channel, close the connection.
func publish(uri string, queueName string, exchange string, exchangeType string, body string, reliable bool) {
	// NOTE(review): uri is logged verbatim, including any credentials embedded in it.
	log.Println("[-] Connecting to", uri)
	connection, err := connect(uri)
	if err != nil {
		log.Fatalf("[x] AMQP connection error: %s", err)
	}
	// Registered first so that it runs last, after the channel has been closed.
	defer func() {
		if err := connection.Close(); err != nil {
			log.Printf("[x] Failed to close connection: %s", err)
		}
	}()

	log.Println("[√] Connected successfully")

	channel, err := connection.Channel()
	if err != nil {
		log.Fatalf("[x] Failed to open a channel: %s", err)
	}
	defer func() {
		if err := channel.Close(); err != nil {
			log.Printf("[x] Failed to close channel: %s", err)
		}
	}()

	log.Println("[-] Declaring Exchange", exchangeType, exchange)
	if err := channel.ExchangeDeclare(exchange, exchangeType, nil); err != nil {
		log.Fatalf("[x] Failed to declare exchange: %s", err)
	}
	log.Println("[√] Exchange", exchange, "has been declared successfully")

	log.Println("[-] Declaring queue", queueName, "into channel")
	queue, err := declareQueue(queueName, channel)
	if err != nil {
		log.Fatalf("[x] Queue could not be declared. Error: %s", err.Error())
	}
	log.Println("[√] Queue", queueName, "has been declared successfully")

	if reliable {
		log.Printf("[-] Enabling publishing confirms.")
		if err := channel.Confirm(false); err != nil {
			log.Fatalf("[x] Channel could not be put into confirm mode: %s", err)
		}

		confirms := channel.NotifyPublish(make(chan wabbit.Confirmation, 1))

		// Deferred last, so the wait happens before the channel and the
		// connection are closed.
		defer confirmOne(confirms)
	}

	log.Println("[-] Sending message to queue:", queueName, "- exchange:", exchange)
	log.Println("\t", body)

	if err := publishMessage(body, exchange, queue, channel); err != nil {
		log.Fatalf("[x] Failed to publish a message. Error: %s", err.Error())
	}
}

// connect dials the AMQP endpoint identified by uri and hands back the
// resulting connection together with any error reported by amqp.Dial.
func connect(uri string) (*amqp.Conn, error) {
	conn, err := amqp.Dial(uri)
	return conn, err
}

// declareQueue declares a queue called queueName on channel and returns the
// queue handle, or the error produced by QueueDeclare. The queue is requested
// as durable, not auto-deleted, not exclusive, and with noWait disabled.
func declareQueue(queueName string, channel wabbit.Channel) (wabbit.Queue, error) {
	opts := wabbit.Option{
		"durable":    true,
		"autoDelete": false,
		"exclusive":  false,
		"noWait":     false,
	}

	return channel.QueueDeclare(queueName, opts)
}

// publishMessage sends body as raw bytes to exchange through channel, using
// the name of queue as the routing key, and returns whatever error Publish
// reports. The message is published with deliveryMode 2 and a text/plain
// content type.
func publishMessage(body string, exchange string, queue wabbit.Queue, channel wabbit.Channel) error {
	routingKey := queue.Name()
	payload := []byte(body)

	// NOTE(review): the untyped constant 2 is stored in the option map as an
	// int — confirm the backend accepts that type for "deliveryMode".
	opts := wabbit.Option{
		"deliveryMode": 2,
		"contentType":  "text/plain",
	}

	return channel.Publish(exchange, routingKey, payload, opts)
}

// confirmOne blocks until one value arrives on confirms and logs whether the
// broker acknowledged the publishing, together with its delivery tag.
//
// If confirms is closed before a confirmation arrives, that is logged and the
// function returns; the zero value of a closed channel must not be used, as
// calling Ack on it would panic.
func confirmOne(confirms <-chan wabbit.Confirmation) {
	log.Printf("[-] Waiting for confirmation of one publishing")

	confirmed, ok := <-confirms
	if !ok {
		log.Printf("[x] Confirmation channel closed before a confirmation arrived")
		return
	}

	// DeliveryTag is a method and has to be called; passing the method value
	// itself to %d would format a func instead of the tag.
	if confirmed.Ack() {
		log.Printf("[√] Confirmed delivery with delivery tag: %d", confirmed.DeliveryTag())
		return
	}

	log.Printf("[x] Failed delivery of delivery tag: %d", confirmed.DeliveryTag())
}