d5cb9b50f4
Co-authored-by: Julien Salleyron <julien.salleyron@gmail.com>
853 lines
23 KiB
Go
853 lines
23 KiB
Go
package server
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"strconv"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/traefik/traefik/v2/pkg/config/dynamic"
|
|
"github.com/traefik/traefik/v2/pkg/provider/aggregator"
|
|
"github.com/traefik/traefik/v2/pkg/safe"
|
|
th "github.com/traefik/traefik/v2/pkg/testhelpers"
|
|
"github.com/traefik/traefik/v2/pkg/tls"
|
|
)
|
|
|
|
type mockProvider struct {
|
|
messages []dynamic.Message
|
|
wait time.Duration
|
|
first chan struct{}
|
|
throttleDuration time.Duration
|
|
}
|
|
|
|
func (p *mockProvider) Provide(configurationChan chan<- dynamic.Message, _ *safe.Pool) error {
|
|
wait := p.wait
|
|
if wait == 0 {
|
|
wait = 20 * time.Millisecond
|
|
}
|
|
|
|
if len(p.messages) == 0 {
|
|
return errors.New("no messages available")
|
|
}
|
|
|
|
configurationChan <- p.messages[0]
|
|
|
|
if p.first != nil {
|
|
<-p.first
|
|
}
|
|
|
|
for _, message := range p.messages[1:] {
|
|
time.Sleep(wait)
|
|
configurationChan <- message
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// ThrottleDuration returns the throttle duration.
|
|
func (p *mockProvider) ThrottleDuration() time.Duration {
|
|
return p.throttleDuration
|
|
}
|
|
|
|
func (p *mockProvider) Init() error {
|
|
return nil
|
|
}
|
|
|
|
func TestNewConfigurationWatcher(t *testing.T) {
|
|
routinesPool := safe.NewPool(context.Background())
|
|
t.Cleanup(routinesPool.Stop)
|
|
|
|
pvd := &mockProvider{
|
|
messages: []dynamic.Message{{
|
|
ProviderName: "mock",
|
|
Configuration: &dynamic.Configuration{
|
|
HTTP: th.BuildConfiguration(
|
|
th.WithRouters(
|
|
th.WithRouter("test",
|
|
th.WithEntryPoints("e"),
|
|
th.WithServiceName("scv"))),
|
|
),
|
|
},
|
|
}},
|
|
}
|
|
|
|
watcher := NewConfigurationWatcher(routinesPool, pvd, []string{}, "")
|
|
|
|
run := make(chan struct{})
|
|
|
|
watcher.AddListener(func(conf dynamic.Configuration) {
|
|
expected := dynamic.Configuration{
|
|
HTTP: th.BuildConfiguration(
|
|
th.WithRouters(
|
|
th.WithRouter("test@mock",
|
|
th.WithEntryPoints("e"),
|
|
th.WithServiceName("scv"))),
|
|
th.WithMiddlewares(),
|
|
th.WithLoadBalancerServices(),
|
|
),
|
|
TCP: &dynamic.TCPConfiguration{
|
|
Routers: map[string]*dynamic.TCPRouter{},
|
|
Middlewares: map[string]*dynamic.TCPMiddleware{},
|
|
Services: map[string]*dynamic.TCPService{},
|
|
},
|
|
TLS: &dynamic.TLSConfiguration{
|
|
Options: map[string]tls.Options{
|
|
"default": tls.DefaultTLSOptions,
|
|
},
|
|
Stores: map[string]tls.Store{},
|
|
},
|
|
UDP: &dynamic.UDPConfiguration{
|
|
Routers: map[string]*dynamic.UDPRouter{},
|
|
Services: map[string]*dynamic.UDPService{},
|
|
},
|
|
}
|
|
|
|
assert.Equal(t, expected, conf)
|
|
close(run)
|
|
})
|
|
|
|
watcher.Start()
|
|
<-run
|
|
}
|
|
|
|
func TestWaitForRequiredProvider(t *testing.T) {
|
|
routinesPool := safe.NewPool(context.Background())
|
|
|
|
pvdAggregator := &mockProvider{
|
|
wait: 5 * time.Millisecond,
|
|
}
|
|
|
|
config := &dynamic.Configuration{
|
|
HTTP: th.BuildConfiguration(
|
|
th.WithRouters(th.WithRouter("foo", th.WithEntryPoints("ep"))),
|
|
th.WithLoadBalancerServices(th.WithService("bar")),
|
|
),
|
|
}
|
|
|
|
pvdAggregator.messages = append(pvdAggregator.messages, dynamic.Message{
|
|
ProviderName: "mock",
|
|
Configuration: config,
|
|
})
|
|
|
|
pvdAggregator.messages = append(pvdAggregator.messages, dynamic.Message{
|
|
ProviderName: "required",
|
|
Configuration: config,
|
|
})
|
|
|
|
pvdAggregator.messages = append(pvdAggregator.messages, dynamic.Message{
|
|
ProviderName: "mock2",
|
|
Configuration: config,
|
|
})
|
|
|
|
watcher := NewConfigurationWatcher(routinesPool, pvdAggregator, []string{}, "required")
|
|
|
|
publishedConfigCount := 0
|
|
watcher.AddListener(func(_ dynamic.Configuration) {
|
|
publishedConfigCount++
|
|
})
|
|
|
|
watcher.Start()
|
|
|
|
t.Cleanup(watcher.Stop)
|
|
t.Cleanup(routinesPool.Stop)
|
|
|
|
// give some time so that the configuration can be processed
|
|
time.Sleep(20 * time.Millisecond)
|
|
|
|
// after 20 milliseconds we should have 2 configs published
|
|
assert.Equal(t, 2, publishedConfigCount, "times configs were published")
|
|
}
|
|
|
|
func TestIgnoreTransientConfiguration(t *testing.T) {
|
|
routinesPool := safe.NewPool(context.Background())
|
|
|
|
config := &dynamic.Configuration{
|
|
HTTP: th.BuildConfiguration(
|
|
th.WithRouters(th.WithRouter("foo", th.WithEntryPoints("ep"))),
|
|
th.WithLoadBalancerServices(th.WithService("bar")),
|
|
),
|
|
}
|
|
|
|
expectedConfig := dynamic.Configuration{
|
|
HTTP: th.BuildConfiguration(
|
|
th.WithRouters(th.WithRouter("foo@mock", th.WithEntryPoints("ep"))),
|
|
th.WithLoadBalancerServices(th.WithService("bar@mock")),
|
|
th.WithMiddlewares(),
|
|
),
|
|
TCP: &dynamic.TCPConfiguration{
|
|
Routers: map[string]*dynamic.TCPRouter{},
|
|
Middlewares: map[string]*dynamic.TCPMiddleware{},
|
|
Services: map[string]*dynamic.TCPService{},
|
|
},
|
|
UDP: &dynamic.UDPConfiguration{
|
|
Routers: map[string]*dynamic.UDPRouter{},
|
|
Services: map[string]*dynamic.UDPService{},
|
|
},
|
|
TLS: &dynamic.TLSConfiguration{
|
|
Options: map[string]tls.Options{
|
|
"default": tls.DefaultTLSOptions,
|
|
},
|
|
Stores: map[string]tls.Store{},
|
|
},
|
|
}
|
|
|
|
expectedConfig3 := dynamic.Configuration{
|
|
HTTP: th.BuildConfiguration(
|
|
th.WithRouters(th.WithRouter("foo@mock", th.WithEntryPoints("ep"))),
|
|
th.WithLoadBalancerServices(th.WithService("bar-config3@mock")),
|
|
th.WithMiddlewares(),
|
|
),
|
|
TCP: &dynamic.TCPConfiguration{
|
|
Routers: map[string]*dynamic.TCPRouter{},
|
|
Middlewares: map[string]*dynamic.TCPMiddleware{},
|
|
Services: map[string]*dynamic.TCPService{},
|
|
},
|
|
UDP: &dynamic.UDPConfiguration{
|
|
Routers: map[string]*dynamic.UDPRouter{},
|
|
Services: map[string]*dynamic.UDPService{},
|
|
},
|
|
TLS: &dynamic.TLSConfiguration{
|
|
Options: map[string]tls.Options{
|
|
"default": tls.DefaultTLSOptions,
|
|
},
|
|
Stores: map[string]tls.Store{},
|
|
},
|
|
}
|
|
|
|
config2 := &dynamic.Configuration{
|
|
HTTP: th.BuildConfiguration(
|
|
th.WithRouters(th.WithRouter("baz", th.WithEntryPoints("ep"))),
|
|
th.WithLoadBalancerServices(th.WithService("toto")),
|
|
),
|
|
}
|
|
|
|
config3 := &dynamic.Configuration{
|
|
HTTP: th.BuildConfiguration(
|
|
th.WithRouters(th.WithRouter("foo", th.WithEntryPoints("ep"))),
|
|
th.WithLoadBalancerServices(th.WithService("bar-config3")),
|
|
),
|
|
}
|
|
watcher := NewConfigurationWatcher(routinesPool, &mockProvider{}, []string{}, "")
|
|
|
|
// To be able to "block" the writes, we change the chan to remove buffering.
|
|
watcher.allProvidersConfigs = make(chan dynamic.Message)
|
|
|
|
publishedConfigCount := 0
|
|
|
|
firstConfigHandled := make(chan struct{})
|
|
blockConfConsumer := make(chan struct{})
|
|
blockConfConsumerAssert := make(chan struct{})
|
|
watcher.AddListener(func(config dynamic.Configuration) {
|
|
publishedConfigCount++
|
|
|
|
if publishedConfigCount > 2 {
|
|
t.Fatal("More than 2 published configuration")
|
|
}
|
|
|
|
if publishedConfigCount == 1 {
|
|
assert.Equal(t, expectedConfig, config)
|
|
close(firstConfigHandled)
|
|
|
|
<-blockConfConsumer
|
|
time.Sleep(500 * time.Millisecond)
|
|
}
|
|
|
|
if publishedConfigCount == 2 {
|
|
assert.Equal(t, expectedConfig3, config)
|
|
close(blockConfConsumerAssert)
|
|
}
|
|
})
|
|
|
|
watcher.Start()
|
|
|
|
t.Cleanup(watcher.Stop)
|
|
t.Cleanup(routinesPool.Stop)
|
|
|
|
watcher.allProvidersConfigs <- dynamic.Message{
|
|
ProviderName: "mock",
|
|
Configuration: config,
|
|
}
|
|
|
|
<-firstConfigHandled
|
|
|
|
watcher.allProvidersConfigs <- dynamic.Message{
|
|
ProviderName: "mock",
|
|
Configuration: config2,
|
|
}
|
|
|
|
watcher.allProvidersConfigs <- dynamic.Message{
|
|
ProviderName: "mock",
|
|
Configuration: config,
|
|
}
|
|
|
|
close(blockConfConsumer)
|
|
|
|
watcher.allProvidersConfigs <- dynamic.Message{
|
|
ProviderName: "mock",
|
|
Configuration: config3,
|
|
}
|
|
|
|
select {
|
|
case <-blockConfConsumerAssert:
|
|
case <-time.After(10 * time.Second):
|
|
t.Fatal("Timeout")
|
|
}
|
|
}
|
|
|
|
func TestListenProvidersThrottleProviderConfigReload(t *testing.T) {
|
|
routinesPool := safe.NewPool(context.Background())
|
|
|
|
pvd := &mockProvider{
|
|
wait: 10 * time.Millisecond,
|
|
throttleDuration: 30 * time.Millisecond,
|
|
}
|
|
|
|
for i := 0; i < 5; i++ {
|
|
pvd.messages = append(pvd.messages, dynamic.Message{
|
|
ProviderName: "mock",
|
|
Configuration: &dynamic.Configuration{
|
|
HTTP: th.BuildConfiguration(
|
|
th.WithRouters(th.WithRouter("foo"+strconv.Itoa(i), th.WithEntryPoints("ep"))),
|
|
th.WithLoadBalancerServices(th.WithService("bar")),
|
|
),
|
|
},
|
|
})
|
|
}
|
|
|
|
providerAggregator := aggregator.ProviderAggregator{}
|
|
err := providerAggregator.AddProvider(pvd)
|
|
assert.NoError(t, err)
|
|
|
|
watcher := NewConfigurationWatcher(routinesPool, providerAggregator, []string{}, "")
|
|
|
|
publishedConfigCount := 0
|
|
watcher.AddListener(func(_ dynamic.Configuration) {
|
|
publishedConfigCount++
|
|
})
|
|
|
|
watcher.Start()
|
|
|
|
t.Cleanup(watcher.Stop)
|
|
t.Cleanup(routinesPool.Stop)
|
|
|
|
// Give some time so that the configuration can be processed.
|
|
time.Sleep(100 * time.Millisecond)
|
|
|
|
// To load 5 new configs it would require 150ms (5 configs * 30ms).
|
|
// In 100ms, we should only have time to load 3 configs.
|
|
assert.LessOrEqual(t, publishedConfigCount, 3, "config was applied too many times")
|
|
assert.Greater(t, publishedConfigCount, 0, "config was not applied at least once")
|
|
}
|
|
|
|
func TestListenProvidersSkipsEmptyConfigs(t *testing.T) {
|
|
routinesPool := safe.NewPool(context.Background())
|
|
|
|
pvd := &mockProvider{
|
|
messages: []dynamic.Message{{ProviderName: "mock"}},
|
|
}
|
|
|
|
watcher := NewConfigurationWatcher(routinesPool, pvd, []string{}, "")
|
|
watcher.AddListener(func(_ dynamic.Configuration) {
|
|
t.Error("An empty configuration was published but it should not")
|
|
})
|
|
|
|
watcher.Start()
|
|
|
|
t.Cleanup(watcher.Stop)
|
|
t.Cleanup(routinesPool.Stop)
|
|
|
|
// give some time so that the configuration can be processed
|
|
time.Sleep(100 * time.Millisecond)
|
|
}
|
|
|
|
func TestListenProvidersSkipsSameConfigurationForProvider(t *testing.T) {
|
|
routinesPool := safe.NewPool(context.Background())
|
|
|
|
message := dynamic.Message{
|
|
ProviderName: "mock",
|
|
Configuration: &dynamic.Configuration{
|
|
HTTP: th.BuildConfiguration(
|
|
th.WithRouters(th.WithRouter("foo", th.WithEntryPoints("ep"))),
|
|
th.WithLoadBalancerServices(th.WithService("bar")),
|
|
),
|
|
},
|
|
}
|
|
|
|
pvd := &mockProvider{
|
|
messages: []dynamic.Message{message, message},
|
|
}
|
|
|
|
watcher := NewConfigurationWatcher(routinesPool, pvd, []string{}, "")
|
|
|
|
var configurationReloads int
|
|
watcher.AddListener(func(_ dynamic.Configuration) {
|
|
configurationReloads++
|
|
})
|
|
|
|
watcher.Start()
|
|
|
|
t.Cleanup(watcher.Stop)
|
|
t.Cleanup(routinesPool.Stop)
|
|
|
|
// give some time so that the configuration can be processed
|
|
time.Sleep(100 * time.Millisecond)
|
|
assert.Equal(t, 1, configurationReloads, "Same configuration should not be published multiple times")
|
|
}
|
|
|
|
func TestListenProvidersDoesNotSkipFlappingConfiguration(t *testing.T) {
|
|
routinesPool := safe.NewPool(context.Background())
|
|
|
|
configuration := &dynamic.Configuration{
|
|
HTTP: th.BuildConfiguration(
|
|
th.WithRouters(th.WithRouter("foo", th.WithEntryPoints("ep"))),
|
|
th.WithLoadBalancerServices(th.WithService("bar")),
|
|
),
|
|
}
|
|
|
|
transientConfiguration := &dynamic.Configuration{
|
|
HTTP: th.BuildConfiguration(
|
|
th.WithRouters(th.WithRouter("bad", th.WithEntryPoints("ep"))),
|
|
th.WithLoadBalancerServices(th.WithService("bad")),
|
|
),
|
|
}
|
|
|
|
pvd := &mockProvider{
|
|
wait: 5 * time.Millisecond, // The last message needs to be received before the second has been fully processed
|
|
throttleDuration: 15 * time.Millisecond,
|
|
messages: []dynamic.Message{
|
|
{ProviderName: "mock", Configuration: configuration},
|
|
{ProviderName: "mock", Configuration: transientConfiguration},
|
|
{ProviderName: "mock", Configuration: configuration},
|
|
},
|
|
}
|
|
|
|
watcher := NewConfigurationWatcher(routinesPool, pvd, []string{}, "")
|
|
|
|
var lastConfig dynamic.Configuration
|
|
watcher.AddListener(func(conf dynamic.Configuration) {
|
|
lastConfig = conf
|
|
})
|
|
|
|
watcher.Start()
|
|
|
|
t.Cleanup(watcher.Stop)
|
|
t.Cleanup(routinesPool.Stop)
|
|
|
|
// give some time so that the configuration can be processed
|
|
time.Sleep(100 * time.Millisecond)
|
|
|
|
expected := dynamic.Configuration{
|
|
HTTP: th.BuildConfiguration(
|
|
th.WithRouters(th.WithRouter("foo@mock", th.WithEntryPoints("ep"))),
|
|
th.WithLoadBalancerServices(th.WithService("bar@mock")),
|
|
th.WithMiddlewares(),
|
|
),
|
|
TCP: &dynamic.TCPConfiguration{
|
|
Routers: map[string]*dynamic.TCPRouter{},
|
|
Middlewares: map[string]*dynamic.TCPMiddleware{},
|
|
Services: map[string]*dynamic.TCPService{},
|
|
},
|
|
UDP: &dynamic.UDPConfiguration{
|
|
Routers: map[string]*dynamic.UDPRouter{},
|
|
Services: map[string]*dynamic.UDPService{},
|
|
},
|
|
TLS: &dynamic.TLSConfiguration{
|
|
Options: map[string]tls.Options{
|
|
"default": tls.DefaultTLSOptions,
|
|
},
|
|
Stores: map[string]tls.Store{},
|
|
},
|
|
}
|
|
|
|
assert.Equal(t, expected, lastConfig)
|
|
}
|
|
|
|
func TestListenProvidersIgnoreSameConfig(t *testing.T) {
|
|
routinesPool := safe.NewPool(context.Background())
|
|
|
|
configuration := &dynamic.Configuration{
|
|
HTTP: th.BuildConfiguration(
|
|
th.WithRouters(th.WithRouter("foo", th.WithEntryPoints("ep"))),
|
|
th.WithLoadBalancerServices(th.WithService("bar")),
|
|
),
|
|
}
|
|
|
|
transientConfiguration := &dynamic.Configuration{
|
|
HTTP: th.BuildConfiguration(
|
|
th.WithRouters(th.WithRouter("bad", th.WithEntryPoints("ep"))),
|
|
th.WithLoadBalancerServices(th.WithService("bad")),
|
|
),
|
|
}
|
|
|
|
// The transient configuration is sent alternatively with the configuration we want to be applied.
|
|
// It is intended to show that even if the configurations are different,
|
|
// those transient configurations will be ignored if they are sent in a time frame
|
|
// lower than the provider throttle duration.
|
|
pvd := &mockProvider{
|
|
wait: 1 * time.Microsecond, // Enqueue them fast
|
|
throttleDuration: time.Millisecond,
|
|
first: make(chan struct{}),
|
|
messages: []dynamic.Message{
|
|
{ProviderName: "mock", Configuration: configuration},
|
|
{ProviderName: "mock", Configuration: transientConfiguration},
|
|
{ProviderName: "mock", Configuration: configuration},
|
|
{ProviderName: "mock", Configuration: transientConfiguration},
|
|
{ProviderName: "mock", Configuration: configuration},
|
|
},
|
|
}
|
|
|
|
providerAggregator := aggregator.ProviderAggregator{}
|
|
err := providerAggregator.AddProvider(pvd)
|
|
assert.NoError(t, err)
|
|
|
|
watcher := NewConfigurationWatcher(routinesPool, providerAggregator, []string{}, "")
|
|
|
|
var configurationReloads int
|
|
var lastConfig dynamic.Configuration
|
|
var once sync.Once
|
|
watcher.AddListener(func(conf dynamic.Configuration) {
|
|
configurationReloads++
|
|
lastConfig = conf
|
|
|
|
// Allows next configurations to be sent by the mock provider as soon as the first configuration message is applied.
|
|
once.Do(func() {
|
|
pvd.first <- struct{}{}
|
|
// Wait for all configuration messages to pile in
|
|
time.Sleep(5 * time.Millisecond)
|
|
})
|
|
})
|
|
|
|
watcher.Start()
|
|
|
|
t.Cleanup(watcher.Stop)
|
|
t.Cleanup(routinesPool.Stop)
|
|
|
|
// Wait long enough
|
|
time.Sleep(50 * time.Millisecond)
|
|
|
|
expected := dynamic.Configuration{
|
|
HTTP: th.BuildConfiguration(
|
|
th.WithRouters(th.WithRouter("foo@mock", th.WithEntryPoints("ep"))),
|
|
th.WithLoadBalancerServices(th.WithService("bar@mock")),
|
|
th.WithMiddlewares(),
|
|
),
|
|
TCP: &dynamic.TCPConfiguration{
|
|
Routers: map[string]*dynamic.TCPRouter{},
|
|
Middlewares: map[string]*dynamic.TCPMiddleware{},
|
|
Services: map[string]*dynamic.TCPService{},
|
|
},
|
|
UDP: &dynamic.UDPConfiguration{
|
|
Routers: map[string]*dynamic.UDPRouter{},
|
|
Services: map[string]*dynamic.UDPService{},
|
|
},
|
|
TLS: &dynamic.TLSConfiguration{
|
|
Options: map[string]tls.Options{
|
|
"default": tls.DefaultTLSOptions,
|
|
},
|
|
Stores: map[string]tls.Store{},
|
|
},
|
|
}
|
|
|
|
assert.Equal(t, expected, lastConfig)
|
|
|
|
assert.Equal(t, 1, configurationReloads)
|
|
}
|
|
|
|
func TestApplyConfigUnderStress(t *testing.T) {
|
|
routinesPool := safe.NewPool(context.Background())
|
|
|
|
watcher := NewConfigurationWatcher(routinesPool, &mockProvider{}, []string{}, "")
|
|
|
|
routinesPool.GoCtx(func(ctx context.Context) {
|
|
i := 0
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case watcher.allProvidersConfigs <- dynamic.Message{ProviderName: "mock", Configuration: &dynamic.Configuration{
|
|
HTTP: th.BuildConfiguration(
|
|
th.WithRouters(th.WithRouter("foo"+strconv.Itoa(i), th.WithEntryPoints("ep"))),
|
|
th.WithLoadBalancerServices(th.WithService("bar")),
|
|
),
|
|
}}:
|
|
}
|
|
i++
|
|
}
|
|
})
|
|
|
|
var configurationReloads int
|
|
watcher.AddListener(func(conf dynamic.Configuration) {
|
|
configurationReloads++
|
|
})
|
|
|
|
watcher.Start()
|
|
|
|
t.Cleanup(watcher.Stop)
|
|
t.Cleanup(routinesPool.Stop)
|
|
|
|
time.Sleep(100 * time.Millisecond)
|
|
|
|
// Ensure that at least two configurations have been applied
|
|
// if we simulate being spammed configuration changes by the provider(s).
|
|
// In theory, checking at least one would be sufficient,
|
|
// but checking for two also ensures that we're looping properly,
|
|
// and that the whole algo holds, etc.
|
|
t.Log(configurationReloads)
|
|
assert.GreaterOrEqual(t, configurationReloads, 2)
|
|
}
|
|
|
|
func TestListenProvidersIgnoreIntermediateConfigs(t *testing.T) {
|
|
routinesPool := safe.NewPool(context.Background())
|
|
|
|
configuration := &dynamic.Configuration{
|
|
HTTP: th.BuildConfiguration(
|
|
th.WithRouters(th.WithRouter("foo", th.WithEntryPoints("ep"))),
|
|
th.WithLoadBalancerServices(th.WithService("bar")),
|
|
),
|
|
}
|
|
|
|
transientConfiguration := &dynamic.Configuration{
|
|
HTTP: th.BuildConfiguration(
|
|
th.WithRouters(th.WithRouter("bad", th.WithEntryPoints("ep"))),
|
|
th.WithLoadBalancerServices(th.WithService("bad")),
|
|
),
|
|
}
|
|
|
|
transientConfiguration2 := &dynamic.Configuration{
|
|
HTTP: th.BuildConfiguration(
|
|
th.WithRouters(th.WithRouter("bad2", th.WithEntryPoints("ep"))),
|
|
th.WithLoadBalancerServices(th.WithService("bad2")),
|
|
),
|
|
}
|
|
|
|
finalConfiguration := &dynamic.Configuration{
|
|
HTTP: th.BuildConfiguration(
|
|
th.WithRouters(th.WithRouter("final", th.WithEntryPoints("ep"))),
|
|
th.WithLoadBalancerServices(th.WithService("final")),
|
|
),
|
|
}
|
|
|
|
pvd := &mockProvider{
|
|
wait: 10 * time.Microsecond, // Enqueue them fast
|
|
throttleDuration: 10 * time.Millisecond,
|
|
messages: []dynamic.Message{
|
|
{ProviderName: "mock", Configuration: configuration},
|
|
{ProviderName: "mock", Configuration: transientConfiguration},
|
|
{ProviderName: "mock", Configuration: transientConfiguration2},
|
|
{ProviderName: "mock", Configuration: finalConfiguration},
|
|
},
|
|
}
|
|
|
|
providerAggregator := aggregator.ProviderAggregator{}
|
|
err := providerAggregator.AddProvider(pvd)
|
|
assert.NoError(t, err)
|
|
|
|
watcher := NewConfigurationWatcher(routinesPool, providerAggregator, []string{}, "")
|
|
|
|
var configurationReloads int
|
|
var lastConfig dynamic.Configuration
|
|
watcher.AddListener(func(conf dynamic.Configuration) {
|
|
configurationReloads++
|
|
lastConfig = conf
|
|
})
|
|
|
|
watcher.Start()
|
|
|
|
t.Cleanup(watcher.Stop)
|
|
t.Cleanup(routinesPool.Stop)
|
|
|
|
// Wait long enough
|
|
time.Sleep(500 * time.Millisecond)
|
|
|
|
expected := dynamic.Configuration{
|
|
HTTP: th.BuildConfiguration(
|
|
th.WithRouters(th.WithRouter("final@mock", th.WithEntryPoints("ep"))),
|
|
th.WithLoadBalancerServices(th.WithService("final@mock")),
|
|
th.WithMiddlewares(),
|
|
),
|
|
TCP: &dynamic.TCPConfiguration{
|
|
Routers: map[string]*dynamic.TCPRouter{},
|
|
Middlewares: map[string]*dynamic.TCPMiddleware{},
|
|
Services: map[string]*dynamic.TCPService{},
|
|
},
|
|
UDP: &dynamic.UDPConfiguration{
|
|
Routers: map[string]*dynamic.UDPRouter{},
|
|
Services: map[string]*dynamic.UDPService{},
|
|
},
|
|
TLS: &dynamic.TLSConfiguration{
|
|
Options: map[string]tls.Options{
|
|
"default": tls.DefaultTLSOptions,
|
|
},
|
|
Stores: map[string]tls.Store{},
|
|
},
|
|
}
|
|
|
|
assert.Equal(t, expected, lastConfig)
|
|
|
|
assert.Equal(t, 2, configurationReloads)
|
|
}
|
|
|
|
func TestListenProvidersPublishesConfigForEachProvider(t *testing.T) {
|
|
routinesPool := safe.NewPool(context.Background())
|
|
|
|
configuration := &dynamic.Configuration{
|
|
HTTP: th.BuildConfiguration(
|
|
th.WithRouters(th.WithRouter("foo", th.WithEntryPoints("ep"))),
|
|
th.WithLoadBalancerServices(th.WithService("bar")),
|
|
),
|
|
}
|
|
|
|
pvd := &mockProvider{
|
|
messages: []dynamic.Message{
|
|
{ProviderName: "mock", Configuration: configuration},
|
|
{ProviderName: "mock2", Configuration: configuration},
|
|
},
|
|
}
|
|
|
|
watcher := NewConfigurationWatcher(routinesPool, pvd, []string{}, "")
|
|
|
|
var publishedProviderConfig dynamic.Configuration
|
|
|
|
watcher.AddListener(func(conf dynamic.Configuration) {
|
|
publishedProviderConfig = conf
|
|
})
|
|
|
|
watcher.Start()
|
|
|
|
t.Cleanup(watcher.Stop)
|
|
t.Cleanup(routinesPool.Stop)
|
|
|
|
// give some time so that the configuration can be processed
|
|
time.Sleep(100 * time.Millisecond)
|
|
|
|
expected := dynamic.Configuration{
|
|
HTTP: th.BuildConfiguration(
|
|
th.WithRouters(
|
|
th.WithRouter("foo@mock", th.WithEntryPoints("ep")),
|
|
th.WithRouter("foo@mock2", th.WithEntryPoints("ep")),
|
|
),
|
|
th.WithLoadBalancerServices(
|
|
th.WithService("bar@mock"),
|
|
th.WithService("bar@mock2"),
|
|
),
|
|
th.WithMiddlewares(),
|
|
),
|
|
TCP: &dynamic.TCPConfiguration{
|
|
Routers: map[string]*dynamic.TCPRouter{},
|
|
Middlewares: map[string]*dynamic.TCPMiddleware{},
|
|
Services: map[string]*dynamic.TCPService{},
|
|
},
|
|
TLS: &dynamic.TLSConfiguration{
|
|
Options: map[string]tls.Options{
|
|
"default": tls.DefaultTLSOptions,
|
|
},
|
|
Stores: map[string]tls.Store{},
|
|
},
|
|
UDP: &dynamic.UDPConfiguration{
|
|
Routers: map[string]*dynamic.UDPRouter{},
|
|
Services: map[string]*dynamic.UDPService{},
|
|
},
|
|
}
|
|
|
|
assert.Equal(t, expected, publishedProviderConfig)
|
|
}
|
|
|
|
func TestPublishConfigUpdatedByProvider(t *testing.T) {
|
|
routinesPool := safe.NewPool(context.Background())
|
|
|
|
pvdConfiguration := dynamic.Configuration{
|
|
TCP: &dynamic.TCPConfiguration{
|
|
Routers: map[string]*dynamic.TCPRouter{
|
|
"foo": {},
|
|
},
|
|
},
|
|
}
|
|
|
|
pvd := &mockProvider{
|
|
wait: 10 * time.Millisecond,
|
|
messages: []dynamic.Message{
|
|
{
|
|
ProviderName: "mock",
|
|
Configuration: &pvdConfiguration,
|
|
},
|
|
{
|
|
ProviderName: "mock",
|
|
Configuration: &pvdConfiguration,
|
|
},
|
|
},
|
|
}
|
|
|
|
watcher := NewConfigurationWatcher(routinesPool, pvd, []string{}, "")
|
|
|
|
publishedConfigCount := 0
|
|
watcher.AddListener(func(configuration dynamic.Configuration) {
|
|
publishedConfigCount++
|
|
|
|
// Update the provider configuration published in next dynamic Message which should trigger a new publishing.
|
|
pvdConfiguration.TCP.Routers["bar"] = &dynamic.TCPRouter{}
|
|
})
|
|
|
|
watcher.Start()
|
|
|
|
t.Cleanup(watcher.Stop)
|
|
t.Cleanup(routinesPool.Stop)
|
|
|
|
// give some time so that the configuration can be processed.
|
|
time.Sleep(100 * time.Millisecond)
|
|
|
|
assert.Equal(t, 2, publishedConfigCount)
|
|
}
|
|
|
|
func TestPublishConfigUpdatedByConfigWatcherListener(t *testing.T) {
|
|
routinesPool := safe.NewPool(context.Background())
|
|
|
|
pvd := &mockProvider{
|
|
wait: 10 * time.Millisecond,
|
|
messages: []dynamic.Message{
|
|
{
|
|
ProviderName: "mock",
|
|
Configuration: &dynamic.Configuration{
|
|
TCP: &dynamic.TCPConfiguration{
|
|
Routers: map[string]*dynamic.TCPRouter{
|
|
"foo": {},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
{
|
|
ProviderName: "mock",
|
|
Configuration: &dynamic.Configuration{
|
|
TCP: &dynamic.TCPConfiguration{
|
|
Routers: map[string]*dynamic.TCPRouter{
|
|
"foo": {},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
watcher := NewConfigurationWatcher(routinesPool, pvd, []string{}, "")
|
|
|
|
publishedConfigCount := 0
|
|
watcher.AddListener(func(configuration dynamic.Configuration) {
|
|
publishedConfigCount++
|
|
|
|
// Modify the provided configuration.
|
|
// This should not modify the configuration stored in the configuration watcher and therefore there will be no new publishing.
|
|
configuration.TCP.Routers["foo@mock"].Rule = "bar"
|
|
})
|
|
|
|
watcher.Start()
|
|
|
|
t.Cleanup(watcher.Stop)
|
|
t.Cleanup(routinesPool.Stop)
|
|
|
|
// give some time so that the configuration can be processed.
|
|
time.Sleep(100 * time.Millisecond)
|
|
|
|
assert.Equal(t, 1, publishedConfigCount)
|
|
}
|