151 lines
3.6 KiB
Go
151 lines
3.6 KiB
Go
|
package sarama
|
||
|
|
||
|
import (
|
||
|
"bufio"
|
||
|
"net"
|
||
|
)
|
||
|
|
||
|
// none is an empty struct type; a value of it carries no data.
// NOTE(review): presumably used as a placeholder value (e.g. set members or
// signal-only channels) — confirm against callers elsewhere in the package.
type none struct{}
|
||
|
|
||
|
// int32Slice gives []int32 the Len/Less/Swap method set that sort.Interface
// requires, so that partition numbers can be sorted.
type int32Slice []int32

// Len reports the number of elements in the slice.
func (s int32Slice) Len() int {
	n := len(s)
	return n
}

// Less reports whether the element at index i is strictly smaller than the
// element at index j.
func (s int32Slice) Less(i, j int) bool {
	left, right := s[i], s[j]
	return left < right
}

// Swap exchanges the elements at indexes i and j in place.
func (s int32Slice) Swap(i, j int) {
	held := s[i]
	s[i] = s[j]
	s[j] = held
}
|
||
|
|
||
|
// dupInt32Slice returns a copy of input that is backed by its own array, so
// writes to the result never show up in input and vice versa.
//
// The result always has length and capacity len(input) and is never nil, even
// when input is nil or empty.
func dupInt32Slice(input []int32) []int32 {
	ret := make([]int32, len(input))
	// The built-in copy moves the whole slice at once instead of appending
	// element by element.
	copy(ret, input)
	return ret
}
|
||
|
|
||
|
func withRecover(fn func()) {
|
||
|
defer func() {
|
||
|
handler := PanicHandler
|
||
|
if handler != nil {
|
||
|
if err := recover(); err != nil {
|
||
|
handler(err)
|
||
|
}
|
||
|
}
|
||
|
}()
|
||
|
|
||
|
fn()
|
||
|
}
|
||
|
|
||
|
func safeAsyncClose(b *Broker) {
|
||
|
tmp := b // local var prevents clobbering in goroutine
|
||
|
go withRecover(func() {
|
||
|
if connected, _ := tmp.Connected(); connected {
|
||
|
if err := tmp.Close(); err != nil {
|
||
|
Logger.Println("Error closing broker", tmp.ID(), ":", err)
|
||
|
}
|
||
|
}
|
||
|
})
|
||
|
}
|
||
|
|
||
|
// Encoder is implemented by any type that can be turned into a byte slice in
// order to be sent as the key or value of a Kafka message.
type Encoder interface {
	// Encode returns the encoded bytes, or an error if encoding fails.
	Encode() ([]byte, error)

	// Length is provided as an optimization. It must return the same value
	// as len() applied to the result of Encode().
	Length() int
}
|
||
|
|
||
|
// make strings and byte slices encodable for convenience so they can be used as keys
|
||
|
// and/or values in kafka messages
|
||
|
|
||
|
// StringEncoder adapts a Go string to the Encoder interface so that it can be
// used as the Key or Value in a ProducerMessage.
type StringEncoder string

// Encode returns the bytes of the string; the error is always nil.
func (s StringEncoder) Encode() ([]byte, error) {
	encoded := []byte(s)
	return encoded, nil
}

// Length returns the length of the string in bytes.
func (s StringEncoder) Length() int {
	size := len(s)
	return size
}
|
||
|
|
||
|
// ByteEncoder adapts a Go byte slice to the Encoder interface so that it can
// be used as the Key or Value in a ProducerMessage.
type ByteEncoder []byte

// Encode returns the underlying slice as-is, without copying it; the error is
// always nil.
func (b ByteEncoder) Encode() ([]byte, error) {
	raw := []byte(b)
	return raw, nil
}

// Length returns the number of bytes in the slice.
func (b ByteEncoder) Length() int {
	size := len(b)
	return size
}
|
||
|
|
||
|
// bufConn wraps a net.Conn and serves reads through a bufio.Reader, reducing
// the number of reads that trigger syscalls. Only Read is overridden; every
// other net.Conn method is promoted from the embedded Conn.
type bufConn struct {
	net.Conn
	buf *bufio.Reader
}

// newBufConn returns a bufConn whose reads are buffered on top of conn.
func newBufConn(conn net.Conn) *bufConn {
	wrapped := new(bufConn)
	wrapped.Conn = conn
	wrapped.buf = bufio.NewReader(conn)
	return wrapped
}

// Read fills b from the read buffer instead of reading from the connection
// directly.
func (bc *bufConn) Read(b []byte) (n int, err error) {
	n, err = bc.buf.Read(b)
	return n, err
}
|
||
|
|
||
|
// KafkaVersion instances represent versions of the upstream Kafka broker.
type KafkaVersion struct {
	// The components are kept in an unexported field, rather than exposing
	// the array type directly, so the type stays opaque and callers cannot
	// generate their own arbitrary versions.
	version [4]uint
}

// newKafkaVersion builds a KafkaVersion from its four components, most
// significant first.
func newKafkaVersion(major, minor, veryMinor, patch uint) KafkaVersion {
	parts := [4]uint{major, minor, veryMinor, patch}
	return KafkaVersion{version: parts}
}

// IsAtLeast returns true if and only if the version it is called on is
// greater than or equal to the version passed in:
//
//	V1.IsAtLeast(V2) // false
//	V2.IsAtLeast(V1) // true
func (v KafkaVersion) IsAtLeast(other KafkaVersion) bool {
	// Compare component by component; the first one that differs decides.
	for i, mine := range v.version {
		theirs := other.version[i]
		if mine != theirs {
			return mine > theirs
		}
	}
	// All four components match, so the versions are equal.
	return true
}
|
||
|
|
||
|
// Effective constants defining the supported kafka versions.
|
||
|
var (
|
||
|
V0_8_2_0 = newKafkaVersion(0, 8, 2, 0)
|
||
|
V0_8_2_1 = newKafkaVersion(0, 8, 2, 1)
|
||
|
V0_8_2_2 = newKafkaVersion(0, 8, 2, 2)
|
||
|
V0_9_0_0 = newKafkaVersion(0, 9, 0, 0)
|
||
|
V0_9_0_1 = newKafkaVersion(0, 9, 0, 1)
|
||
|
V0_10_0_0 = newKafkaVersion(0, 10, 0, 0)
|
||
|
V0_10_0_1 = newKafkaVersion(0, 10, 0, 1)
|
||
|
V0_10_1_0 = newKafkaVersion(0, 10, 1, 0)
|
||
|
V0_10_2_0 = newKafkaVersion(0, 10, 2, 0)
|
||
|
minVersion = V0_8_2_0
|
||
|
)
|