Support libkv.WatchTree chan errors:

- libkv.WatchTree returns a channel whose messages represent changes
    to the watched tree. In situations where libkv cannot read from the
    underlying store, libkv will close the provided channel.
  - This PR handles this edge case and fixes #238.
This commit is contained in:
Advait Shinde 2016-03-04 22:52:08 +00:00
parent a458018aa2
commit d63d2a8a26
2 changed files with 75 additions and 21 deletions

View file

@ -35,6 +35,27 @@ type KvTLS struct {
InsecureSkipVerify bool
}
func (provider *Kv) watchKv(configurationChan chan<- types.ConfigMessage, prefix string) {
for {
chanKeys, err := provider.kvclient.WatchTree(provider.Prefix, make(chan struct{}) /* stop chan */)
if err != nil {
log.Errorf("Failed to WatchTree %s", err)
continue
}
for range chanKeys {
configuration := provider.loadConfig()
if configuration != nil {
configurationChan <- types.ConfigMessage{
ProviderName: string(provider.storeType),
Configuration: configuration,
}
}
}
log.Warnf("Intermittent failure to WatchTree KV. Retrying.")
}
}
func (provider *Kv) provide(configurationChan chan<- types.ConfigMessage) error {
storeConfig := &store.Config{
ConnectionTimeout: 30 * time.Second,
@ -80,24 +101,7 @@ func (provider *Kv) provide(configurationChan chan<- types.ConfigMessage) error
}
provider.kvclient = kv
if provider.Watch {
stopCh := make(chan struct{})
chanKeys, err := kv.WatchTree(provider.Prefix, stopCh)
if err != nil {
return err
}
go func() {
for {
<-chanKeys
configuration := provider.loadConfig()
if configuration != nil {
configurationChan <- types.ConfigMessage{
ProviderName: string(provider.storeType),
Configuration: configuration,
}
}
defer close(stopCh)
}
}()
go provider.watchKv(configurationChan, provider.Prefix)
}
configuration := provider.loadConfig()
configurationChan <- types.ConfigMessage{

View file

@ -2,8 +2,10 @@ package provider
import (
"errors"
"github.com/containous/traefik/types"
"strings"
"testing"
"time"
"github.com/docker/libkv/store"
"reflect"
@ -231,10 +233,58 @@ func TestKvLast(t *testing.T) {
}
}
type KvMock struct {
Kv
}
func (provider *KvMock) loadConfig() *types.Configuration {
return nil
}
func TestKvWatchTree(t *testing.T) {
returnedChans := make(chan chan []*store.KVPair)
provider := &KvMock{
Kv{
kvclient: &Mock{
WatchTreeMethod: func() <-chan []*store.KVPair {
c := make(chan []*store.KVPair, 10)
returnedChans <- c
return c
},
},
},
}
configChan := make(chan types.ConfigMessage)
go provider.watchKv(configChan, "prefix")
select {
case c1 := <-returnedChans:
c1 <- []*store.KVPair{}
<-configChan
close(c1) // WatchTree chans can close due to error
case <-time.After(1 * time.Second):
}
select {
case c2 := <-returnedChans:
c2 <- []*store.KVPair{}
<-configChan
case <-time.After(1 * time.Second):
}
select {
case _ = <-configChan:
t.Fatalf("configChan should be empty")
default:
}
}
// Extremely limited mock store so we can test initialization
type Mock struct {
Error bool
KVPairs []*store.KVPair
Error bool
KVPairs []*store.KVPair
WatchTreeMethod func() <-chan []*store.KVPair
}
func (s *Mock) Put(key string, value []byte, opts *store.WriteOptions) error {
@ -269,7 +319,7 @@ func (s *Mock) Watch(key string, stopCh <-chan struct{}) (<-chan *store.KVPair,
// WatchTree mock
func (s *Mock) WatchTree(prefix string, stopCh <-chan struct{}) (<-chan []*store.KVPair, error) {
return nil, errors.New("WatchTree not supported")
return s.WatchTreeMethod(), nil
}
// NewLock mock