1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180
|
package amqptest
import (
"sync"
"time"
"github.com/NeowayLabs/wabbit"
"github.com/NeowayLabs/wabbit/amqptest/server"
"github.com/NeowayLabs/wabbit/utils"
"github.com/pborman/uuid"
)
const (
// defaultReconnectDelay is the default delay between reconnection
// attempts, expressed in seconds (1 second).
defaultReconnectDelay = 1
)
// Conn is the fake AMQP connection. Instances are created by Dial, which
// initializes every field below.
type Conn struct {
// amqpuri is the URI given to Dial; it is passed to server.Connect and
// server.Close.
amqpuri string
// isConnected is set to true after a successful dial and back to false by
// Close.
isConnected bool
// ConnID identifies the connection on the fake server. A fresh UUID is
// generated on every dial.
ConnID string
// errSpread is the error broadcaster handed to server.Connect; channels
// are registered with it through Add (see NotifyClose).
errSpread *utils.ErrBroadcast
// errChan is the connection's own unbuffered channel, registered with
// errSpread in Dial.
errChan chan wabbit.Error
// defErrDone signals the default error-discarding goroutine started on
// dial to stop.
defErrDone chan bool
// mu is held while isConnected and hasAutoRedial are updated.
mu *sync.Mutex
// hasAutoRedial is set to true by AutoRedial.
hasAutoRedial bool
// amqpServer is the fake server returned by server.Connect; Close resets
// it to nil.
amqpServer *server.AMQPServer
// dialFn performs the actual (re)connection. It is built by Dial and
// called again by AutoRedial.
dialFn func() error
}
// Dial mocks dialing a RabbitMQ broker. It connects to the fake AMQP server
// through server.Connect using amqpuri and returns the established
// connection, or a nil connection together with the error reported by
// server.Connect.
//
// The returned connection keeps the dialing logic in dialFn so that
// AutoRedial can reconnect later. While auto-redial is not enabled, a
// background goroutine discards every error delivered to the connection's own
// error channel; it is stopped by a value sent on defErrDone.
func Dial(amqpuri string) (*Conn, error) {
	conn := &Conn{
		amqpuri:    amqpuri,
		errSpread:  utils.NewErrBroadcast(),
		errChan:    make(chan wabbit.Error),
		defErrDone: make(chan bool),
		mu:         &sync.Mutex{},
	}

	conn.errSpread.Add(conn.errChan)

	conn.dialFn = func() error {
		var err error
		conn.ConnID = uuid.New()
		conn.amqpServer, err = server.Connect(amqpuri, conn.ConnID, conn.errSpread)
		if err != nil {
			return err
		}

		// concurrent access with Close method
		conn.mu.Lock()
		conn.isConnected = true
		redialing := conn.hasAutoRedial
		conn.mu.Unlock()

		if redialing {
			// Auto-redial is already enabled, so this is a reconnection
			// and the AutoRedial goroutine is the consumer of errChan.
			// Starting another discarding goroutine here would compete
			// with it for errors (and for the nil shutdown signal) and
			// would never be stopped, because AutoRedial only sends on
			// defErrDone the first time it is enabled.
			return nil
		}

		// By default every error is discarded. Sending a value on
		// defErrDone destroys this goroutine so that another consumer
		// can start reading the errors.
		go func() {
			for {
				select {
				case <-conn.errChan:
				case <-conn.defErrDone:
					conn.mu.Lock()
					if conn.hasAutoRedial {
						// AutoRedial takes over errChan.
						conn.mu.Unlock()
						return
					}
					conn.mu.Unlock()

					// Stopped by Close: drain errChan until it is
					// closed before exiting.
					for {
						if _, ok := <-conn.errChan; !ok {
							return
						}
					}
				}
			}
		}()

		return nil
	}

	if err := conn.dialFn(); err != nil {
		return nil, err
	}

	return conn, nil
}
// NotifyClose registers c with the connection's error broadcaster so that
// notifications about server or client errors are published on it, and hands
// the very same channel back to the caller.
func (conn *Conn) NotifyClose(c chan wabbit.Error) chan wabbit.Error {
	broadcaster := conn.errSpread
	broadcaster.Add(c)

	return c
}
// AutoRedial mocks automatic reconnection.
//
// The first call flags the connection as auto-redialing and stops the default
// error-discarding goroutine by sending on defErrDone. Every call then starts
// a goroutine that waits for one value on the connection's error channel:
//
//   - a nil error means a graceful close and ends the goroutine;
//   - any other error is sent to outChan before each reconnection attempt.
//     The goroutine sleeps `attempts` seconds before each attempt (no delay
//     on the first one; the counter is reset to 0 once it exceeds 60) and
//     calls dialFn until it succeeds. It then re-arms AutoRedial for the new
//     connection and sends true on done.
//
// Sends on outChan and done are blocking, so the caller must read from both.
func (conn *Conn) AutoRedial(outChan chan wabbit.Error, done chan bool) {
	// Check and set the flag in a single critical section, so that
	// concurrent callers cannot both observe "not enabled" and both send on
	// defErrDone (the second send would block forever).
	conn.mu.Lock()
	firstCall := !conn.hasAutoRedial
	conn.hasAutoRedial = true
	conn.mu.Unlock()

	if firstCall {
		// Sent outside the lock: the receiving goroutine takes mu right
		// after the receive.
		conn.defErrDone <- true
	}

	go func() {
		err := <-conn.errChan
		if err == nil {
			// Graceful connection close
			return
		}

		for attempts := uint(0); ; attempts++ {
			// send the error to client
			outChan <- err

			if attempts > 60 {
				attempts = 0
			}

			// Wait n seconds where n == attempts...
			time.Sleep(time.Duration(attempts) * time.Second)

			if connErr := conn.dialFn(); connErr == nil {
				break
			}
		}

		// enable AutoRedial on the new connection
		conn.AutoRedial(outChan, done)
		done <- true
	}()
}
// Close closes the fake connection.
//
// If the connection is currently connected it is first disconnected from the
// fake server; an error from server.Close is returned as is and leaves the
// connection state untouched. Afterwards:
//
//   - with auto-redial enabled, a nil error is written to the broadcaster,
//     which the AutoRedial goroutine interprets as a graceful shutdown;
//   - otherwise the connection's error channel is unregistered and closed and
//     the default error-discarding goroutine is told to stop.
//
// Calling Close again on a connection without auto-redial is a no-op that
// returns nil.
func (conn *Conn) Close() error {
	conn.mu.Lock()
	defer conn.mu.Unlock()

	wasConnected := conn.isConnected
	if wasConnected {
		// Disconnect from the server.
		if err := server.Close(conn.amqpuri, conn.ConnID); err != nil {
			return err
		}
		conn.isConnected = false
		conn.amqpServer = nil
	}

	// enables AutoRedial to gracefully shutdown
	// This isn't wabbit stuff. It's the rabbitmq/amqp way of notify the shutdown
	if conn.hasAutoRedial {
		conn.errSpread.Write(nil)
		return nil
	}

	if !wasConnected {
		// isConnected is only cleared by a previous successful Close, which
		// already closed errChan and stopped the discarding goroutine.
		// Running the teardown again would panic on the double close (or
		// block on defErrDone with no receiver left).
		return nil
	}

	conn.errSpread.Delete(conn.errChan)
	close(conn.errChan)
	conn.defErrDone <- true

	return nil
}
// Channel asks the fake server this connection is attached to for a new fake
// channel bound to the connection's ID, and returns whatever the server
// reports.
func (conn *Conn) Channel() (wabbit.Channel, error) {
	srv, id := conn.amqpServer, conn.ConnID

	return srv.CreateChannel(id, conn)
}
|