File: dial.go

package info (click to toggle)
golang-github-neowaylabs-wabbit 0.0~git20210927.0.73ad61d-1
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, bookworm-backports, bookworm-proposed-updates, experimental, sid, trixie
  • size: 272 kB
  • sloc: sh: 24; makefile: 4
file content (180 lines) | stat: -rw-r--r-- 3,635 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
package amqptest

import (
	"sync"
	"time"

	"github.com/NeowayLabs/wabbit"
	"github.com/NeowayLabs/wabbit/amqptest/server"
	"github.com/NeowayLabs/wabbit/utils"
	"github.com/pborman/uuid"
)

const (
	// 1 second
	defaultReconnectDelay = 1
)

// Conn is the fake AMQP connection
type Conn struct {
	amqpuri       string
	isConnected   bool
	ConnID        string
	errSpread     *utils.ErrBroadcast
	errChan       chan wabbit.Error
	defErrDone    chan bool
	mu            *sync.Mutex
	hasAutoRedial bool
	amqpServer    *server.AMQPServer

	dialFn func() error
}

// Dial mock the connection dialing to rabbitmq and
// returns the established connection or error if something goes wrong
func Dial(amqpuri string) (*Conn, error) {
	conn := &Conn{
		amqpuri:    amqpuri,
		errSpread:  utils.NewErrBroadcast(),
		errChan:    make(chan wabbit.Error),
		defErrDone: make(chan bool),
		mu:         &sync.Mutex{},
	}

	conn.errSpread.Add(conn.errChan)

	conn.dialFn = func() error {
		var err error
		conn.ConnID = uuid.New()
		conn.amqpServer, err = server.Connect(amqpuri, conn.ConnID, conn.errSpread)

		if err != nil {
			return err
		}

		// concurrent access with Close method
		conn.mu.Lock()
		conn.isConnected = true
		conn.mu.Unlock()

		// by default, we discard any errors
		// send something to defErrDone to destroy
		// this goroutine and start consume the errors
		go func() {
			for {
				select {
				case <-conn.errChan:
				case <-conn.defErrDone:
					conn.mu.Lock()
					if conn.hasAutoRedial {
						conn.mu.Unlock()
						return
					}
					conn.mu.Unlock()
					// Drain the errChan channel before
					// the exit.
					for {
						if _, ok := <-conn.errChan; !ok {
							return
						}
					}
				}
			}
		}()

		return nil
	}

	err := conn.dialFn()

	if err != nil {
		return nil, err
	}

	return conn, nil
}

// NotifyClose publishs notifications about server or client errors in the given channel
func (conn *Conn) NotifyClose(c chan wabbit.Error) chan wabbit.Error {
	conn.errSpread.Add(c)
	return c
}

// AutoRedial mock the reconnection faking a delay of 1 second
func (conn *Conn) AutoRedial(outChan chan wabbit.Error, done chan bool) {
	if !conn.hasAutoRedial {
		conn.mu.Lock()
		conn.hasAutoRedial = true
		conn.mu.Unlock()
		conn.defErrDone <- true
	}

	go func() {
		var err wabbit.Error
		var attempts uint

		select {
		case amqpErr := <-conn.errChan:
			err = amqpErr

			if amqpErr == nil {
				// Gracefull connection close
				return
			}
		lattempts:
			// send the error to client
			outChan <- err

			if attempts > 60 {
				attempts = 0
			}

			// Wait n Seconds where n == attempts...
			time.Sleep(time.Duration(attempts) * time.Second)

			connErr := conn.dialFn()

			if connErr != nil {
				attempts++
				goto lattempts
			}

			// enabled AutoRedial on the new connection
			conn.AutoRedial(outChan, done)
			done <- true
			return
		}
	}()
}

// Close the fake connection
func (conn *Conn) Close() error {
	conn.mu.Lock()
	defer conn.mu.Unlock()

	if conn.isConnected {
		// Disconnect from the server.
		if err := server.Close(conn.amqpuri, conn.ConnID); err != nil {
			return err
		}
		conn.isConnected = false
		conn.amqpServer = nil
	}

	// enables AutoRedial to gracefully shutdown
	// This isn't wabbit stuff. It's the rabbitmq/amqp way of notify the shutdown
	if conn.hasAutoRedial {
		conn.errSpread.Write(nil)
	} else {
		conn.errSpread.Delete(conn.errChan)
		close(conn.errChan)
		conn.defErrDone <- true
	}

	return nil
}

// Channel creates a new fake channel
func (conn *Conn) Channel() (wabbit.Channel, error) {
	return conn.amqpServer.CreateChannel(conn.ConnID, conn)
}