1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86
|
package queue
import (
"github.com/valinurovam/garagemq/amqp"
)
type MsgStorageMock struct {
add bool
update bool
del bool
purged bool
messages []*amqp.Message
index map[uint64]int
pos int
}
func NewStorageMock(msgCap int) *MsgStorageMock {
mock := &MsgStorageMock{}
mock.messages = make([]*amqp.Message, msgCap)
mock.index = make(map[uint64]int)
return mock
}
// Add append message into add-queue
func (storage *MsgStorageMock) Add(message *amqp.Message, queue string) error {
storage.add = true
if storage.messages != nil {
storage.messages[storage.pos] = message
storage.index[message.ID] = storage.pos
storage.pos++
}
return nil
}
// Update append message into update-queue
func (storage *MsgStorageMock) Update(message *amqp.Message, queue string) error {
storage.update = true
return nil
}
// Del append message into del-queue
func (storage *MsgStorageMock) Del(message *amqp.Message, queue string) error {
storage.del = true
return nil
}
// PurgeQueue delete messages
func (storage *MsgStorageMock) PurgeQueue(queue string) {
storage.purged = true
}
func (storage *MsgStorageMock) GetQueueLength(queue string) uint64 {
return uint64(len(storage.messages))
}
func (storage *MsgStorageMock) IterateByQueueFromMsgID(queue string, msgID uint64, limit uint64, fn func(message *amqp.Message)) uint64 {
if storage.messages != nil {
var startPos int
var ok bool
if startPos, ok = storage.index[msgID]; !ok {
msgID++
if startPos, ok = storage.index[msgID]; !ok {
return 0
}
}
var iterated uint64
for i := startPos; i < len(storage.messages); i++ {
fn(storage.messages[i])
iterated++
if iterated == limit {
break
}
}
return iterated
}
return 0
}
|