traefik/vendor/github.com/Shopify/sarama/sync_producer.go

165 lines
4.9 KiB
Go
Raw Normal View History

2018-01-10 17:48:04 +01:00
package sarama
import "sync"
// SyncProducer publishes Kafka messages, blocking until they have been acknowledged. It routes messages to the correct
// broker, refreshing metadata as appropriate, and parses responses for errors. You must call Close() on a producer
// to avoid leaks, it may not be garbage-collected automatically when it passes out of scope.
//
// The SyncProducer comes with two caveats: it will generally be less efficient than the AsyncProducer, and the actual
// durability guarantee provided when a message is acknowledged depend on the configured value of `Producer.RequiredAcks`.
// There are configurations where a message acknowledged by the SyncProducer can still sometimes be lost.
//
// For implementation reasons, the SyncProducer requires `Producer.Return.Errors` and `Producer.Return.Successes` to
// be set to true in its configuration.
type SyncProducer interface {
// SendMessage produces a given message, and returns only when it either has
// succeeded or failed to produce. It will return the partition and the offset
// of the produced message, or an error if the message failed to produce.
SendMessage(msg *ProducerMessage) (partition int32, offset int64, err error)
// SendMessages produces a given set of messages, and returns only when all
// messages in the set have either succeeded or failed. Note that messages
// can succeed and fail individually; if some succeed and some fail,
// SendMessages will return an error.
SendMessages(msgs []*ProducerMessage) error
// Close shuts down the producer and waits for any buffered messages to be
// flushed. You must call this function before a producer object passes out of
// scope, as it may otherwise leak memory. You must call this before calling
// Close on the underlying client.
Close() error
}
type syncProducer struct {
producer *asyncProducer
wg sync.WaitGroup
}
// NewSyncProducer creates a new SyncProducer using the given broker addresses and configuration.
func NewSyncProducer(addrs []string, config *Config) (SyncProducer, error) {
if config == nil {
config = NewConfig()
config.Producer.Return.Successes = true
}
if err := verifyProducerConfig(config); err != nil {
return nil, err
}
p, err := NewAsyncProducer(addrs, config)
if err != nil {
return nil, err
}
return newSyncProducerFromAsyncProducer(p.(*asyncProducer)), nil
}
// NewSyncProducerFromClient creates a new SyncProducer using the given client. It is still
// necessary to call Close() on the underlying client when shutting down this producer.
func NewSyncProducerFromClient(client Client) (SyncProducer, error) {
if err := verifyProducerConfig(client.Config()); err != nil {
return nil, err
}
p, err := NewAsyncProducerFromClient(client)
if err != nil {
return nil, err
}
return newSyncProducerFromAsyncProducer(p.(*asyncProducer)), nil
}
func newSyncProducerFromAsyncProducer(p *asyncProducer) *syncProducer {
sp := &syncProducer{producer: p}
sp.wg.Add(2)
go withRecover(sp.handleSuccesses)
go withRecover(sp.handleErrors)
return sp
}
func verifyProducerConfig(config *Config) error {
if !config.Producer.Return.Errors {
return ConfigurationError("Producer.Return.Errors must be true to be used in a SyncProducer")
}
if !config.Producer.Return.Successes {
return ConfigurationError("Producer.Return.Successes must be true to be used in a SyncProducer")
}
return nil
}
func (sp *syncProducer) SendMessage(msg *ProducerMessage) (partition int32, offset int64, err error) {
oldMetadata := msg.Metadata
defer func() {
msg.Metadata = oldMetadata
}()
expectation := make(chan *ProducerError, 1)
msg.Metadata = expectation
sp.producer.Input() <- msg
if err := <-expectation; err != nil {
return -1, -1, err.Err
}
return msg.Partition, msg.Offset, nil
}
func (sp *syncProducer) SendMessages(msgs []*ProducerMessage) error {
savedMetadata := make([]interface{}, len(msgs))
for i := range msgs {
savedMetadata[i] = msgs[i].Metadata
}
defer func() {
for i := range msgs {
msgs[i].Metadata = savedMetadata[i]
}
}()
expectations := make(chan chan *ProducerError, len(msgs))
go func() {
for _, msg := range msgs {
expectation := make(chan *ProducerError, 1)
msg.Metadata = expectation
sp.producer.Input() <- msg
expectations <- expectation
}
close(expectations)
}()
var errors ProducerErrors
for expectation := range expectations {
if err := <-expectation; err != nil {
errors = append(errors, err)
}
}
if len(errors) > 0 {
return errors
}
return nil
}
func (sp *syncProducer) handleSuccesses() {
defer sp.wg.Done()
for msg := range sp.producer.Successes() {
expectation := msg.Metadata.(chan *ProducerError)
expectation <- nil
}
}
func (sp *syncProducer) handleErrors() {
defer sp.wg.Done()
for err := range sp.producer.Errors() {
expectation := err.Msg.Metadata.(chan *ProducerError)
expectation <- err
}
}
func (sp *syncProducer) Close() error {
sp.producer.AsyncClose()
sp.wg.Wait()
return nil
}