Merge pull request #239 from goguardian/kv-watch-tree
Support libkv.WatchTree chan errors:
This commit is contained in:
commit
3af21612b6
2 changed files with 77 additions and 21 deletions
|
@ -35,6 +35,27 @@ type KvTLS struct {
|
||||||
InsecureSkipVerify bool
|
InsecureSkipVerify bool
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (provider *Kv) watchKv(configurationChan chan<- types.ConfigMessage, prefix string) {
|
||||||
|
for {
|
||||||
|
chanKeys, err := provider.kvclient.WatchTree(provider.Prefix, make(chan struct{}) /* stop chan */)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Failed to WatchTree %s", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
for range chanKeys {
|
||||||
|
configuration := provider.loadConfig()
|
||||||
|
if configuration != nil {
|
||||||
|
configurationChan <- types.ConfigMessage{
|
||||||
|
ProviderName: string(provider.storeType),
|
||||||
|
Configuration: configuration,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
log.Warnf("Intermittent failure to WatchTree KV. Retrying.")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (provider *Kv) provide(configurationChan chan<- types.ConfigMessage) error {
|
func (provider *Kv) provide(configurationChan chan<- types.ConfigMessage) error {
|
||||||
storeConfig := &store.Config{
|
storeConfig := &store.Config{
|
||||||
ConnectionTimeout: 30 * time.Second,
|
ConnectionTimeout: 30 * time.Second,
|
||||||
|
@ -80,24 +101,7 @@ func (provider *Kv) provide(configurationChan chan<- types.ConfigMessage) error
|
||||||
}
|
}
|
||||||
provider.kvclient = kv
|
provider.kvclient = kv
|
||||||
if provider.Watch {
|
if provider.Watch {
|
||||||
stopCh := make(chan struct{})
|
go provider.watchKv(configurationChan, provider.Prefix)
|
||||||
chanKeys, err := kv.WatchTree(provider.Prefix, stopCh)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
go func() {
|
|
||||||
for {
|
|
||||||
<-chanKeys
|
|
||||||
configuration := provider.loadConfig()
|
|
||||||
if configuration != nil {
|
|
||||||
configurationChan <- types.ConfigMessage{
|
|
||||||
ProviderName: string(provider.storeType),
|
|
||||||
Configuration: configuration,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
defer close(stopCh)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
}
|
}
|
||||||
configuration := provider.loadConfig()
|
configuration := provider.loadConfig()
|
||||||
configurationChan <- types.ConfigMessage{
|
configurationChan <- types.ConfigMessage{
|
||||||
|
|
|
@ -2,8 +2,10 @@ package provider
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
|
"github.com/containous/traefik/types"
|
||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/docker/libkv/store"
|
"github.com/docker/libkv/store"
|
||||||
"reflect"
|
"reflect"
|
||||||
|
@ -231,10 +233,60 @@ func TestKvLast(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type KvMock struct {
|
||||||
|
Kv
|
||||||
|
}
|
||||||
|
|
||||||
|
func (provider *KvMock) loadConfig() *types.Configuration {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestKvWatchTree(t *testing.T) {
|
||||||
|
returnedChans := make(chan chan []*store.KVPair)
|
||||||
|
provider := &KvMock{
|
||||||
|
Kv{
|
||||||
|
kvclient: &Mock{
|
||||||
|
WatchTreeMethod: func() <-chan []*store.KVPair {
|
||||||
|
c := make(chan []*store.KVPair, 10)
|
||||||
|
returnedChans <- c
|
||||||
|
return c
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
configChan := make(chan types.ConfigMessage)
|
||||||
|
go provider.watchKv(configChan, "prefix")
|
||||||
|
|
||||||
|
select {
|
||||||
|
case c1 := <-returnedChans:
|
||||||
|
c1 <- []*store.KVPair{}
|
||||||
|
<-configChan
|
||||||
|
close(c1) // WatchTree chans can close due to error
|
||||||
|
case <-time.After(1 * time.Second):
|
||||||
|
t.Fatalf("Failed to create a new WatchTree chan")
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case c2 := <-returnedChans:
|
||||||
|
c2 <- []*store.KVPair{}
|
||||||
|
<-configChan
|
||||||
|
case <-time.After(1 * time.Second):
|
||||||
|
t.Fatalf("Failed to create a new WatchTree chan")
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case _ = <-configChan:
|
||||||
|
t.Fatalf("configChan should be empty")
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Extremely limited mock store so we can test initialization
|
// Extremely limited mock store so we can test initialization
|
||||||
type Mock struct {
|
type Mock struct {
|
||||||
Error bool
|
Error bool
|
||||||
KVPairs []*store.KVPair
|
KVPairs []*store.KVPair
|
||||||
|
WatchTreeMethod func() <-chan []*store.KVPair
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Mock) Put(key string, value []byte, opts *store.WriteOptions) error {
|
func (s *Mock) Put(key string, value []byte, opts *store.WriteOptions) error {
|
||||||
|
@ -269,7 +321,7 @@ func (s *Mock) Watch(key string, stopCh <-chan struct{}) (<-chan *store.KVPair,
|
||||||
|
|
||||||
// WatchTree mock
|
// WatchTree mock
|
||||||
func (s *Mock) WatchTree(prefix string, stopCh <-chan struct{}) (<-chan []*store.KVPair, error) {
|
func (s *Mock) WatchTree(prefix string, stopCh <-chan struct{}) (<-chan []*store.KVPair, error) {
|
||||||
return nil, errors.New("WatchTree not supported")
|
return s.WatchTreeMethod(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewLock mock
|
// NewLock mock
|
||||||
|
|
Loading…
Reference in a new issue