Don't throw away valid configuration updates
This commit is contained in:
parent
8bb625adb7
commit
6e43ab5897
2 changed files with 67 additions and 7 deletions
|
@ -143,8 +143,6 @@ func (c *ConfigurationWatcher) loadMessage(configMsg dynamic.Message) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *ConfigurationWatcher) preLoadConfiguration(configMsg dynamic.Message) {
|
func (c *ConfigurationWatcher) preLoadConfiguration(configMsg dynamic.Message) {
|
||||||
currentConfigurations := c.currentConfigurations.Get().(dynamic.Configurations)
|
|
||||||
|
|
||||||
logger := log.WithoutContext().WithField(log.ProviderName, configMsg.ProviderName)
|
logger := log.WithoutContext().WithField(log.ProviderName, configMsg.ProviderName)
|
||||||
if log.GetLevel() == logrus.DebugLevel {
|
if log.GetLevel() == logrus.DebugLevel {
|
||||||
copyConf := configMsg.Configuration.DeepCopy()
|
copyConf := configMsg.Configuration.DeepCopy()
|
||||||
|
@ -172,11 +170,6 @@ func (c *ConfigurationWatcher) preLoadConfiguration(configMsg dynamic.Message) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if reflect.DeepEqual(currentConfigurations[configMsg.ProviderName], configMsg.Configuration) {
|
|
||||||
logger.Infof("Skipping same configuration for provider %s", configMsg.ProviderName)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
providerConfigUpdateCh, ok := c.providerConfigUpdateMap[configMsg.ProviderName]
|
providerConfigUpdateCh, ok := c.providerConfigUpdateMap[configMsg.ProviderName]
|
||||||
if !ok {
|
if !ok {
|
||||||
providerConfigUpdateCh = make(chan dynamic.Message)
|
providerConfigUpdateCh = make(chan dynamic.Message)
|
||||||
|
@ -211,11 +204,18 @@ func (c *ConfigurationWatcher) throttleProviderConfigReload(ctx context.Context,
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
|
var previousConfig dynamic.Message
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return
|
return
|
||||||
case nextConfig := <-in:
|
case nextConfig := <-in:
|
||||||
|
if reflect.DeepEqual(previousConfig, nextConfig) {
|
||||||
|
logger := log.WithoutContext().WithField(log.ProviderName, nextConfig.ProviderName)
|
||||||
|
logger.Info("Skipping same configuration")
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
previousConfig = nextConfig
|
||||||
ring.In() <- nextConfig
|
ring.In() <- nextConfig
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -175,6 +175,66 @@ func TestListenProvidersSkipsSameConfigurationForProvider(t *testing.T) {
|
||||||
time.Sleep(100 * time.Millisecond)
|
time.Sleep(100 * time.Millisecond)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestListenProvidersDoesNotSkipFlappingConfiguration(t *testing.T) {
|
||||||
|
routinesPool := safe.NewPool(context.Background())
|
||||||
|
|
||||||
|
configuration := &dynamic.Configuration{
|
||||||
|
HTTP: th.BuildConfiguration(
|
||||||
|
th.WithRouters(th.WithRouter("foo")),
|
||||||
|
th.WithLoadBalancerServices(th.WithService("bar")),
|
||||||
|
),
|
||||||
|
}
|
||||||
|
|
||||||
|
transientConfiguration := &dynamic.Configuration{
|
||||||
|
HTTP: th.BuildConfiguration(
|
||||||
|
th.WithRouters(th.WithRouter("bad")),
|
||||||
|
th.WithLoadBalancerServices(th.WithService("bad")),
|
||||||
|
),
|
||||||
|
}
|
||||||
|
|
||||||
|
pvd := &mockProvider{
|
||||||
|
wait: 5 * time.Millisecond, // The last message needs to be received before the second has been fully processed
|
||||||
|
messages: []dynamic.Message{
|
||||||
|
{ProviderName: "mock", Configuration: configuration},
|
||||||
|
{ProviderName: "mock", Configuration: transientConfiguration},
|
||||||
|
{ProviderName: "mock", Configuration: configuration},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
watcher := NewConfigurationWatcher(routinesPool, pvd, 15*time.Millisecond)
|
||||||
|
|
||||||
|
var lastConfig dynamic.Configuration
|
||||||
|
watcher.AddListener(func(conf dynamic.Configuration) {
|
||||||
|
lastConfig = conf
|
||||||
|
})
|
||||||
|
|
||||||
|
watcher.Start()
|
||||||
|
defer watcher.Stop()
|
||||||
|
|
||||||
|
// give some time so that the configuration can be processed
|
||||||
|
time.Sleep(40 * time.Millisecond)
|
||||||
|
|
||||||
|
expected := dynamic.Configuration{
|
||||||
|
HTTP: th.BuildConfiguration(
|
||||||
|
th.WithRouters(th.WithRouter("foo@mock")),
|
||||||
|
th.WithLoadBalancerServices(th.WithService("bar@mock")),
|
||||||
|
th.WithMiddlewares(),
|
||||||
|
),
|
||||||
|
TCP: &dynamic.TCPConfiguration{
|
||||||
|
Routers: map[string]*dynamic.TCPRouter{},
|
||||||
|
Services: map[string]*dynamic.TCPService{},
|
||||||
|
},
|
||||||
|
TLS: &dynamic.TLSConfiguration{
|
||||||
|
Options: map[string]tls.Options{
|
||||||
|
"default": {},
|
||||||
|
},
|
||||||
|
Stores: map[string]tls.Store{},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
assert.Equal(t, expected, lastConfig)
|
||||||
|
}
|
||||||
|
|
||||||
func TestListenProvidersPublishesConfigForEachProvider(t *testing.T) {
|
func TestListenProvidersPublishesConfigForEachProvider(t *testing.T) {
|
||||||
routinesPool := safe.NewPool(context.Background())
|
routinesPool := safe.NewPool(context.Background())
|
||||||
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue