72 lines
1.7 KiB
Go
72 lines
1.7 KiB
Go
|
package aggregator
|
||
|
|
||
|
import (
|
||
|
"github.com/traefik/traefik/v2/pkg/config/dynamic"
|
||
|
)
|
||
|
|
||
|
// RingChannel implements a channel in a way that never blocks the writer.
|
||
|
// Specifically, if a value is written to a RingChannel when its buffer is full then the oldest
|
||
|
// value in the buffer is discarded to make room (just like a standard ring-buffer).
|
||
|
// Note that Go's scheduler can cause discarded values when they could be avoided, simply by scheduling
|
||
|
// the writer before the reader, so caveat emptor.
|
||
|
type RingChannel struct {
|
||
|
input, output chan dynamic.Message
|
||
|
buffer *dynamic.Message
|
||
|
}
|
||
|
|
||
|
func newRingChannel() *RingChannel {
|
||
|
ch := &RingChannel{
|
||
|
input: make(chan dynamic.Message),
|
||
|
output: make(chan dynamic.Message),
|
||
|
}
|
||
|
go ch.ringBuffer()
|
||
|
return ch
|
||
|
}
|
||
|
|
||
|
func (ch *RingChannel) in() chan<- dynamic.Message {
|
||
|
return ch.input
|
||
|
}
|
||
|
|
||
|
func (ch *RingChannel) out() <-chan dynamic.Message {
|
||
|
return ch.output
|
||
|
}
|
||
|
|
||
|
// for all buffered cases.
|
||
|
func (ch *RingChannel) ringBuffer() {
|
||
|
var input, output chan dynamic.Message
|
||
|
var next dynamic.Message
|
||
|
input = ch.input
|
||
|
|
||
|
for input != nil || output != nil {
|
||
|
select {
|
||
|
// Prefer to write if possible, which is surprisingly effective in reducing
|
||
|
// dropped elements due to overflow. The naive read/write select chooses randomly
|
||
|
// when both channels are ready, which produces unnecessary drops 50% of the time.
|
||
|
case output <- next:
|
||
|
ch.buffer = nil
|
||
|
default:
|
||
|
select {
|
||
|
case elem, open := <-input:
|
||
|
if !open {
|
||
|
input = nil
|
||
|
break
|
||
|
}
|
||
|
|
||
|
ch.buffer = &elem
|
||
|
case output <- next:
|
||
|
ch.buffer = nil
|
||
|
}
|
||
|
}
|
||
|
|
||
|
if ch.buffer == nil {
|
||
|
output = nil
|
||
|
continue
|
||
|
}
|
||
|
|
||
|
output = ch.output
|
||
|
next = *ch.buffer
|
||
|
}
|
||
|
|
||
|
close(ch.output)
|
||
|
}
|