165 lines
4.9 KiB
Go
165 lines
4.9 KiB
Go
|
package sarama
|
||
|
|
||
|
import "sync"
|
||
|
|
||
|
// SyncProducer publishes Kafka messages, blocking until they have been acknowledged. It routes messages to the correct
|
||
|
// broker, refreshing metadata as appropriate, and parses responses for errors. You must call Close() on a producer
|
||
|
// to avoid leaks, it may not be garbage-collected automatically when it passes out of scope.
|
||
|
//
|
||
|
// The SyncProducer comes with two caveats: it will generally be less efficient than the AsyncProducer, and the actual
|
||
|
// durability guarantee provided when a message is acknowledged depend on the configured value of `Producer.RequiredAcks`.
|
||
|
// There are configurations where a message acknowledged by the SyncProducer can still sometimes be lost.
|
||
|
//
|
||
|
// For implementation reasons, the SyncProducer requires `Producer.Return.Errors` and `Producer.Return.Successes` to
|
||
|
// be set to true in its configuration.
|
||
|
type SyncProducer interface {
|
||
|
|
||
|
// SendMessage produces a given message, and returns only when it either has
|
||
|
// succeeded or failed to produce. It will return the partition and the offset
|
||
|
// of the produced message, or an error if the message failed to produce.
|
||
|
SendMessage(msg *ProducerMessage) (partition int32, offset int64, err error)
|
||
|
|
||
|
// SendMessages produces a given set of messages, and returns only when all
|
||
|
// messages in the set have either succeeded or failed. Note that messages
|
||
|
// can succeed and fail individually; if some succeed and some fail,
|
||
|
// SendMessages will return an error.
|
||
|
SendMessages(msgs []*ProducerMessage) error
|
||
|
|
||
|
// Close shuts down the producer and waits for any buffered messages to be
|
||
|
// flushed. You must call this function before a producer object passes out of
|
||
|
// scope, as it may otherwise leak memory. You must call this before calling
|
||
|
// Close on the underlying client.
|
||
|
Close() error
|
||
|
}
|
||
|
|
||
|
type syncProducer struct {
|
||
|
producer *asyncProducer
|
||
|
wg sync.WaitGroup
|
||
|
}
|
||
|
|
||
|
// NewSyncProducer creates a new SyncProducer using the given broker addresses and configuration.
|
||
|
func NewSyncProducer(addrs []string, config *Config) (SyncProducer, error) {
|
||
|
if config == nil {
|
||
|
config = NewConfig()
|
||
|
config.Producer.Return.Successes = true
|
||
|
}
|
||
|
|
||
|
if err := verifyProducerConfig(config); err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
p, err := NewAsyncProducer(addrs, config)
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
return newSyncProducerFromAsyncProducer(p.(*asyncProducer)), nil
|
||
|
}
|
||
|
|
||
|
// NewSyncProducerFromClient creates a new SyncProducer using the given client. It is still
|
||
|
// necessary to call Close() on the underlying client when shutting down this producer.
|
||
|
func NewSyncProducerFromClient(client Client) (SyncProducer, error) {
|
||
|
if err := verifyProducerConfig(client.Config()); err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
p, err := NewAsyncProducerFromClient(client)
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
return newSyncProducerFromAsyncProducer(p.(*asyncProducer)), nil
|
||
|
}
|
||
|
|
||
|
func newSyncProducerFromAsyncProducer(p *asyncProducer) *syncProducer {
|
||
|
sp := &syncProducer{producer: p}
|
||
|
|
||
|
sp.wg.Add(2)
|
||
|
go withRecover(sp.handleSuccesses)
|
||
|
go withRecover(sp.handleErrors)
|
||
|
|
||
|
return sp
|
||
|
}
|
||
|
|
||
|
func verifyProducerConfig(config *Config) error {
|
||
|
if !config.Producer.Return.Errors {
|
||
|
return ConfigurationError("Producer.Return.Errors must be true to be used in a SyncProducer")
|
||
|
}
|
||
|
if !config.Producer.Return.Successes {
|
||
|
return ConfigurationError("Producer.Return.Successes must be true to be used in a SyncProducer")
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (sp *syncProducer) SendMessage(msg *ProducerMessage) (partition int32, offset int64, err error) {
|
||
|
oldMetadata := msg.Metadata
|
||
|
defer func() {
|
||
|
msg.Metadata = oldMetadata
|
||
|
}()
|
||
|
|
||
|
expectation := make(chan *ProducerError, 1)
|
||
|
msg.Metadata = expectation
|
||
|
sp.producer.Input() <- msg
|
||
|
|
||
|
if err := <-expectation; err != nil {
|
||
|
return -1, -1, err.Err
|
||
|
}
|
||
|
|
||
|
return msg.Partition, msg.Offset, nil
|
||
|
}
|
||
|
|
||
|
func (sp *syncProducer) SendMessages(msgs []*ProducerMessage) error {
|
||
|
savedMetadata := make([]interface{}, len(msgs))
|
||
|
for i := range msgs {
|
||
|
savedMetadata[i] = msgs[i].Metadata
|
||
|
}
|
||
|
defer func() {
|
||
|
for i := range msgs {
|
||
|
msgs[i].Metadata = savedMetadata[i]
|
||
|
}
|
||
|
}()
|
||
|
|
||
|
expectations := make(chan chan *ProducerError, len(msgs))
|
||
|
go func() {
|
||
|
for _, msg := range msgs {
|
||
|
expectation := make(chan *ProducerError, 1)
|
||
|
msg.Metadata = expectation
|
||
|
sp.producer.Input() <- msg
|
||
|
expectations <- expectation
|
||
|
}
|
||
|
close(expectations)
|
||
|
}()
|
||
|
|
||
|
var errors ProducerErrors
|
||
|
for expectation := range expectations {
|
||
|
if err := <-expectation; err != nil {
|
||
|
errors = append(errors, err)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
if len(errors) > 0 {
|
||
|
return errors
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (sp *syncProducer) handleSuccesses() {
|
||
|
defer sp.wg.Done()
|
||
|
for msg := range sp.producer.Successes() {
|
||
|
expectation := msg.Metadata.(chan *ProducerError)
|
||
|
expectation <- nil
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (sp *syncProducer) handleErrors() {
|
||
|
defer sp.wg.Done()
|
||
|
for err := range sp.producer.Errors() {
|
||
|
expectation := err.Msg.Metadata.(chan *ProducerError)
|
||
|
expectation <- err
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (sp *syncProducer) Close() error {
|
||
|
sp.producer.AsyncClose()
|
||
|
sp.wg.Wait()
|
||
|
return nil
|
||
|
}
|