1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209
|
package sarama
import "sync"
// expectationsPool recycles the one-slot reply channels that SendMessage and
// SendMessages attach to each outgoing message, so a fresh channel is only
// allocated when the pool has none to hand out.
var expectationsPool = sync.Pool{
	New: func() interface{} {
		// Capacity 1 lets the handler goroutine deposit the outcome without
		// waiting for the sender to be ready to receive.
		reply := make(chan *ProducerError, 1)
		return reply
	},
}
// SyncProducer publishes Kafka messages, blocking until they have been acknowledged. It routes messages to the correct
// broker, refreshing metadata as appropriate, and parses responses for errors. You must call Close() on a producer
// to avoid leaks; it may not be garbage-collected automatically when it passes out of scope.
//
// The SyncProducer comes with two caveats: it will generally be less efficient than the AsyncProducer, and the actual
// durability guarantee provided when a message is acknowledged depends on the configured value of `Producer.RequiredAcks`.
// There are configurations where a message acknowledged by the SyncProducer can still sometimes be lost.
//
// For implementation reasons, the SyncProducer requires `Producer.Return.Errors` and `Producer.Return.Successes` to
// be set to true in its configuration.
type SyncProducer interface {
// SendMessage produces a given message, and returns only when it has either
// succeeded or failed to produce. It will return the partition and the offset
// of the produced message, or an error if the message failed to produce.
SendMessage(msg *ProducerMessage) (partition int32, offset int64, err error)
// SendMessages produces a given set of messages, and returns only when all
// messages in the set have either succeeded or failed. Note that messages
// can succeed and fail individually; if some succeed and some fail,
// SendMessages will return an error.
SendMessages(msgs []*ProducerMessage) error
// Close shuts down the producer; you must call this function before a producer
// object passes out of scope, as it may otherwise leak memory.
// You must call this before calling Close on the underlying client.
Close() error
// TxnStatus returns the current producer transaction status.
TxnStatus() ProducerTxnStatusFlag
// IsTransactional returns true when the current producer is transactional.
IsTransactional() bool
// BeginTxn marks the current transaction as ready.
BeginTxn() error
// CommitTxn commits the current transaction.
CommitTxn() error
// AbortTxn aborts the current transaction.
AbortTxn() error
// AddOffsetsToTxn adds the associated offsets to the current transaction.
AddOffsetsToTxn(offsets map[string][]*PartitionOffsetMetadata, groupId string) error
// AddMessageToTxn adds the message's offsets to the current transaction.
AddMessageToTxn(msg *ConsumerMessage, groupId string, metadata *string) error
}
// syncProducer is the SyncProducer returned by the constructors in this file:
// a blocking wrapper around an asyncProducer.
type syncProducer struct {
// producer is the wrapped async producer that every method forwards to.
producer *asyncProducer
// wg tracks the handleSuccesses and handleErrors goroutines so that Close
// can wait for both of them to finish.
wg sync.WaitGroup
}
// NewSyncProducer builds a SyncProducer that talks to the brokers listed in addrs.
//
// A nil config is replaced by NewConfig() with Producer.Return.Successes
// switched on. Whichever config ends up in use must pass verifyProducerConfig,
// otherwise its error is returned and no producer is created.
func NewSyncProducer(addrs []string, config *Config) (SyncProducer, error) {
	if config == nil {
		config = NewConfig()
		config.Producer.Return.Successes = true
	}

	err := verifyProducerConfig(config)
	if err != nil {
		return nil, err
	}

	async, err := NewAsyncProducer(addrs, config)
	if err != nil {
		return nil, err
	}

	// newSyncProducerFromAsyncProducer takes the concrete *asyncProducer,
	// hence the type assertion on the interface value.
	return newSyncProducerFromAsyncProducer(async.(*asyncProducer)), nil
}
// NewSyncProducerFromClient builds a SyncProducer on top of an existing client.
// The client's configuration must pass verifyProducerConfig. The caller still
// has to Close() the underlying client itself when shutting this producer down.
func NewSyncProducerFromClient(client Client) (SyncProducer, error) {
	cfg := client.Config()
	if cfgErr := verifyProducerConfig(cfg); cfgErr != nil {
		return nil, cfgErr
	}

	async, err := NewAsyncProducerFromClient(client)
	if err != nil {
		return nil, err
	}

	// newSyncProducerFromAsyncProducer takes the concrete *asyncProducer,
	// hence the type assertion on the interface value.
	return newSyncProducerFromAsyncProducer(async.(*asyncProducer)), nil
}
// newSyncProducerFromAsyncProducer wraps p in a syncProducer and starts the two
// goroutines that forward p's success and error notifications to waiting senders.
func newSyncProducerFromAsyncProducer(p *asyncProducer) *syncProducer {
	wrapper := new(syncProducer)
	wrapper.producer = p

	// One WaitGroup slot per forwarding goroutine; each handler calls Done
	// when its loop ends, which is what Close waits on.
	wrapper.wg.Add(2)
	go withRecover(wrapper.handleSuccesses)
	go withRecover(wrapper.handleErrors)

	return wrapper
}
// verifyProducerConfig checks that config enables both Producer.Return.Errors
// and Producer.Return.Successes. It returns a ConfigurationError naming the
// first missing setting (Errors is checked first), or nil when both are set.
func verifyProducerConfig(config *Config) error {
	switch {
	case !config.Producer.Return.Errors:
		return ConfigurationError("Producer.Return.Errors must be true to be used in a SyncProducer")
	case !config.Producer.Return.Successes:
		return ConfigurationError("Producer.Return.Successes must be true to be used in a SyncProducer")
	default:
		return nil
	}
}
// SendMessage hands msg to the wrapped async producer and blocks until the
// outcome arrives on the message's reply channel.
//
// On success it returns msg.Partition and msg.Offset with a nil error; on
// failure it returns -1, -1 and the error carried by the ProducerError.
func (sp *syncProducer) SendMessage(msg *ProducerMessage) (partition int32, offset int64, err error) {
	reply := expectationsPool.Get().(chan *ProducerError)
	msg.expectation = reply
	sp.producer.Input() <- msg

	// handleSuccesses delivers nil here, handleErrors delivers the failure.
	outcome := <-reply

	// Detach the channel from the message and recycle it before returning.
	msg.expectation = nil
	expectationsPool.Put(reply)

	if outcome == nil {
		return msg.Partition, msg.Offset, nil
	}
	return -1, -1, outcome.Err
}
// SendMessages feeds every message in msgs to the wrapped async producer and
// blocks until an outcome has been received for each of them.
//
// It returns nil when every message succeeded; otherwise it returns a
// ProducerErrors holding one entry per failed message.
func (sp *syncProducer) SendMessages(msgs []*ProducerMessage) error {
	// sent carries the index of each message once it has been queued. Its
	// buffer holds len(msgs) entries, so the feeder never blocks on it.
	sent := make(chan int, len(msgs))

	go func() {
		for idx := range msgs {
			msg := msgs[idx]
			reply := expectationsPool.Get().(chan *ProducerError)
			msg.expectation = reply
			sp.producer.Input() <- msg
			sent <- idx
		}
		close(sent)
	}()

	var failed ProducerErrors
	for idx := range sent {
		msg := msgs[idx]
		reply := msg.expectation
		outcome := <-reply

		// Detach the channel from the message and recycle it.
		msg.expectation = nil
		expectationsPool.Put(reply)

		if outcome != nil {
			failed = append(failed, outcome)
		}
	}

	if len(failed) == 0 {
		return nil
	}
	return failed
}
// handleSuccesses drains the async producer's Successes channel, signalling
// each delivered message's waiting sender with a nil ProducerError. It marks
// its WaitGroup slot done once the loop over that channel ends.
func (sp *syncProducer) handleSuccesses() {
	defer sp.wg.Done()

	successes := sp.producer.Successes()
	for delivered := range successes {
		delivered.expectation <- nil
	}
}
// handleErrors drains the async producer's Errors channel, passing each
// ProducerError to the sender waiting on the failed message's reply channel.
// It marks its WaitGroup slot done once the loop over that channel ends.
func (sp *syncProducer) handleErrors() {
	defer sp.wg.Done()

	failures := sp.producer.Errors()
	for failure := range failures {
		failure.Msg.expectation <- failure
	}
}
// Close shuts the producer down: it calls AsyncClose on the wrapped async
// producer, then waits for the handleSuccesses and handleErrors goroutines to
// finish. It always returns nil.
func (sp *syncProducer) Close() error {
sp.producer.AsyncClose()
// Block until both handler goroutines have called wg.Done.
sp.wg.Wait()
return nil
}
// IsTransactional returns the wrapped async producer's IsTransactional result.
func (sp *syncProducer) IsTransactional() bool {
return sp.producer.IsTransactional()
}
// BeginTxn forwards to the wrapped async producer's BeginTxn and returns its error.
func (sp *syncProducer) BeginTxn() error {
return sp.producer.BeginTxn()
}
// CommitTxn forwards to the wrapped async producer's CommitTxn and returns its error.
func (sp *syncProducer) CommitTxn() error {
return sp.producer.CommitTxn()
}
// AbortTxn forwards to the wrapped async producer's AbortTxn and returns its error.
func (sp *syncProducer) AbortTxn() error {
return sp.producer.AbortTxn()
}
// AddOffsetsToTxn forwards offsets and groupId unchanged to the wrapped async
// producer's AddOffsetsToTxn and returns its error.
func (sp *syncProducer) AddOffsetsToTxn(offsets map[string][]*PartitionOffsetMetadata, groupId string) error {
return sp.producer.AddOffsetsToTxn(offsets, groupId)
}
// AddMessageToTxn forwards msg, groupId and metadata unchanged to the wrapped
// async producer's AddMessageToTxn and returns its error.
func (sp *syncProducer) AddMessageToTxn(msg *ConsumerMessage, groupId string, metadata *string) error {
return sp.producer.AddMessageToTxn(msg, groupId, metadata)
}
// TxnStatus returns the transaction status reported by the wrapped async
// producer's TxnStatus.
func (sp *syncProducer) TxnStatus() ProducerTxnStatusFlag {
	return sp.producer.TxnStatus()
}
|