Reload configuration when port change for one service
This commit is contained in:
parent
48b4eb5c0d
commit
b17d5b80b8
3 changed files with 145 additions and 16 deletions
|
@ -419,6 +419,46 @@ func (s *ConsulCatalogSuite) TestCircuitBreaker(c *check.C) {
|
|||
c.Assert(err, checker.IsNil)
|
||||
}
|
||||
|
||||
func (s *ConsulCatalogSuite) TestRefreshConfigPortChange(c *check.C) {
|
||||
cmd, display := s.traefikCmd(
|
||||
withConfigFile("fixtures/consul_catalog/simple.toml"),
|
||||
"--consulCatalog",
|
||||
"--consulCatalog.exposedByDefault=false",
|
||||
"--consulCatalog.watch=true",
|
||||
"--consulCatalog.endpoint="+s.consulIP+":8500",
|
||||
"--consulCatalog.domain=consul.localhost")
|
||||
defer display(c)
|
||||
err := cmd.Start()
|
||||
c.Assert(err, checker.IsNil)
|
||||
defer cmd.Process.Kill()
|
||||
|
||||
nginx := s.composeProject.Container(c, "nginx1")
|
||||
|
||||
err = s.registerService("test", nginx.NetworkSettings.IPAddress, 81, []string{"name=nginx1", "traefik.enable=true"})
|
||||
c.Assert(err, checker.IsNil, check.Commentf("Error registering service"))
|
||||
|
||||
req, err := http.NewRequest(http.MethodGet, "http://127.0.0.1:8000/", nil)
|
||||
c.Assert(err, checker.IsNil)
|
||||
req.Host = "test.consul.localhost"
|
||||
|
||||
err = try.Request(req, 20*time.Second, try.StatusCodeIs(http.StatusBadGateway))
|
||||
c.Assert(err, checker.IsNil)
|
||||
|
||||
err = try.GetRequest("http://127.0.0.1:8080/api/providers/consul_catalog/backends", 5*time.Second, try.BodyContains("nginx1"))
|
||||
c.Assert(err, checker.IsNil)
|
||||
|
||||
err = s.registerService("test", nginx.NetworkSettings.IPAddress, 80, []string{"name=nginx1", "traefik.enable=true"})
|
||||
c.Assert(err, checker.IsNil, check.Commentf("Error registering service"))
|
||||
|
||||
defer s.deregisterService("test", nginx.NetworkSettings.IPAddress)
|
||||
|
||||
err = try.GetRequest("http://127.0.0.1:8080/api/providers/consul_catalog/backends", 60*time.Second, try.BodyContains("nginx1"))
|
||||
c.Assert(err, checker.IsNil)
|
||||
|
||||
err = try.Request(req, 20*time.Second, try.StatusCodeIs(http.StatusOK), try.HasBody())
|
||||
c.Assert(err, checker.IsNil)
|
||||
}
|
||||
|
||||
func (s *ConsulCatalogSuite) TestRetryWithConsulServer(c *check.C) {
|
||||
//Scale consul to 0 to be able to start traefik before and test retry
|
||||
s.composeProject.Scale(c, "consul", 0)
|
||||
|
|
|
@ -78,8 +78,11 @@ func (a nodeSorter) Less(i int, j int) bool {
|
|||
}
|
||||
|
||||
func hasChanged(current map[string]Service, previous map[string]Service) bool {
|
||||
if len(current) != len(previous) {
|
||||
return true
|
||||
}
|
||||
addedServiceKeys, removedServiceKeys := getChangedServiceKeys(current, previous)
|
||||
return len(removedServiceKeys) > 0 || len(addedServiceKeys) > 0 || hasNodeOrTagsChanged(current, previous)
|
||||
return len(removedServiceKeys) > 0 || len(addedServiceKeys) > 0 || hasServiceChanged(current, previous)
|
||||
}
|
||||
|
||||
func getChangedServiceKeys(current map[string]Service, previous map[string]Service) ([]string, []string) {
|
||||
|
@ -92,20 +95,24 @@ func getChangedServiceKeys(current map[string]Service, previous map[string]Servi
|
|||
return fun.Keys(addedKeys).([]string), fun.Keys(removedKeys).([]string)
|
||||
}
|
||||
|
||||
func hasNodeOrTagsChanged(current map[string]Service, previous map[string]Service) bool {
|
||||
var added []string
|
||||
var removed []string
|
||||
func hasServiceChanged(current map[string]Service, previous map[string]Service) bool {
|
||||
for key, value := range current {
|
||||
if prevValue, ok := previous[key]; ok {
|
||||
addedNodesKeys, removedNodesKeys := getChangedStringKeys(value.Nodes, prevValue.Nodes)
|
||||
added = append(added, addedNodesKeys...)
|
||||
removed = append(removed, removedNodesKeys...)
|
||||
if len(addedNodesKeys) > 0 || len(removedNodesKeys) > 0 {
|
||||
return true
|
||||
}
|
||||
addedTagsKeys, removedTagsKeys := getChangedStringKeys(value.Tags, prevValue.Tags)
|
||||
added = append(added, addedTagsKeys...)
|
||||
removed = append(removed, removedTagsKeys...)
|
||||
if len(addedTagsKeys) > 0 || len(removedTagsKeys) > 0 {
|
||||
return true
|
||||
}
|
||||
addedPortsKeys, removedPortsKeys := getChangedIntKeys(value.Ports, prevValue.Ports)
|
||||
if len(addedPortsKeys) > 0 || len(removedPortsKeys) > 0 {
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
return len(added) > 0 || len(removed) > 0
|
||||
return false
|
||||
}
|
||||
|
||||
func getChangedStringKeys(currState []string, prevState []string) ([]string, []string) {
|
||||
|
@ -118,6 +125,16 @@ func getChangedStringKeys(currState []string, prevState []string) ([]string, []s
|
|||
return fun.Keys(addedKeys).([]string), fun.Keys(removedKeys).([]string)
|
||||
}
|
||||
|
||||
func getChangedIntKeys(currState []int, prevState []int) ([]int, []int) {
|
||||
currKeySet := fun.Set(currState).(map[int]bool)
|
||||
prevKeySet := fun.Set(prevState).(map[int]bool)
|
||||
|
||||
addedKeys := fun.Difference(currKeySet, prevKeySet).(map[int]bool)
|
||||
removedKeys := fun.Difference(prevKeySet, currKeySet).(map[int]bool)
|
||||
|
||||
return fun.Keys(addedKeys).([]int), fun.Keys(removedKeys).([]int)
|
||||
}
|
||||
|
||||
func (p *CatalogProvider) watchHealthState(stopCh <-chan struct{}, watchCh chan<- map[string][]string, errorCh chan<- error) {
|
||||
health := p.client.Health()
|
||||
catalog := p.client.Catalog()
|
||||
|
@ -194,6 +211,7 @@ type Service struct {
|
|||
Name string
|
||||
Tags []string
|
||||
Nodes []string
|
||||
Ports []int
|
||||
}
|
||||
|
||||
func (p *CatalogProvider) watchCatalogServices(stopCh <-chan struct{}, watchCh chan<- map[string][]string, errorCh chan<- error) {
|
||||
|
@ -235,14 +253,17 @@ func (p *CatalogProvider) watchCatalogServices(stopCh <-chan struct{}, watchCh c
|
|||
return
|
||||
}
|
||||
nodesID := getServiceIds(nodes)
|
||||
ports := getServicePorts(nodes)
|
||||
if service, ok := current[key]; ok {
|
||||
service.Tags = value
|
||||
service.Nodes = nodesID
|
||||
service.Ports = ports
|
||||
} else {
|
||||
service := Service{
|
||||
Name: key,
|
||||
Tags: value,
|
||||
Nodes: nodesID,
|
||||
Ports: ports,
|
||||
}
|
||||
current[key] = service
|
||||
}
|
||||
|
@ -267,6 +288,14 @@ func getServiceIds(services []*api.CatalogService) []string {
|
|||
return serviceIds
|
||||
}
|
||||
|
||||
func getServicePorts(services []*api.CatalogService) []int {
|
||||
var servicePorts []int
|
||||
for _, service := range services {
|
||||
servicePorts = append(servicePorts, service.ServicePort)
|
||||
}
|
||||
return servicePorts
|
||||
}
|
||||
|
||||
func (p *CatalogProvider) healthyNodes(service string) (catalogUpdate, error) {
|
||||
health := p.client.Health()
|
||||
opts := &api.QueryOptions{}
|
||||
|
@ -279,7 +308,7 @@ func (p *CatalogProvider) healthyNodes(service string) (catalogUpdate, error) {
|
|||
return p.nodeFilter(service, node)
|
||||
}, data).([]*api.ServiceEntry)
|
||||
|
||||
//Merge tags of nodes matching constraints, in a single slice.
|
||||
// Merge tags of nodes matching constraints, in a single slice.
|
||||
tags := fun.Foldl(func(node *api.ServiceEntry, set []string) []string {
|
||||
return fun.Keys(fun.Union(
|
||||
fun.Set(set),
|
||||
|
@ -480,8 +509,8 @@ func (p *CatalogProvider) buildConfig(catalog []catalogUpdate) *types.Configurat
|
|||
"hasMaxconnAttributes": p.hasMaxconnAttributes,
|
||||
}
|
||||
|
||||
allNodes := []*api.ServiceEntry{}
|
||||
services := []*serviceUpdate{}
|
||||
var allNodes []*api.ServiceEntry
|
||||
var services []*serviceUpdate
|
||||
for _, info := range catalog {
|
||||
if len(info.Nodes) > 0 {
|
||||
services = append(services, info.Service)
|
||||
|
@ -519,7 +548,7 @@ func (p *CatalogProvider) hasMaxconnAttributes(attributes []string) bool {
|
|||
func (p *CatalogProvider) getNodes(index map[string][]string) ([]catalogUpdate, error) {
|
||||
visited := make(map[string]bool)
|
||||
|
||||
nodes := []catalogUpdate{}
|
||||
var nodes []catalogUpdate
|
||||
for service := range index {
|
||||
name := strings.ToLower(service)
|
||||
if !strings.Contains(name, " ") && !visited[name] {
|
||||
|
@ -555,7 +584,7 @@ func (p *CatalogProvider) watch(configurationChan chan<- types.ConfigMessage, st
|
|||
return nil
|
||||
case index, ok := <-watchCh:
|
||||
if !ok {
|
||||
return errors.New("Consul service list nil")
|
||||
return errors.New("consul service list nil")
|
||||
}
|
||||
log.Debug("List of services changed")
|
||||
nodes, err := p.getNodes(index)
|
||||
|
|
|
@ -1094,7 +1094,7 @@ func TestConsulCatalogHasNodeOrTagschanged(t *testing.T) {
|
|||
expected: false,
|
||||
},
|
||||
{
|
||||
desc: "Change detected con tags",
|
||||
desc: "Change detected on tags",
|
||||
current: map[string]Service{
|
||||
"foo-service": {
|
||||
Name: "foo",
|
||||
|
@ -1111,6 +1111,66 @@ func TestConsulCatalogHasNodeOrTagschanged(t *testing.T) {
|
|||
},
|
||||
expected: true,
|
||||
},
|
||||
{
|
||||
desc: "Change detected on ports",
|
||||
current: map[string]Service{
|
||||
"foo-service": {
|
||||
Name: "foo",
|
||||
Nodes: []string{"node1"},
|
||||
Tags: []string{"foo=bar"},
|
||||
Ports: []int{80},
|
||||
},
|
||||
},
|
||||
previous: map[string]Service{
|
||||
"foo-service": {
|
||||
Name: "foo",
|
||||
Nodes: []string{"node1"},
|
||||
Tags: []string{"foo"},
|
||||
Ports: []int{81},
|
||||
},
|
||||
},
|
||||
expected: true,
|
||||
},
|
||||
{
|
||||
desc: "Change detected on ports",
|
||||
current: map[string]Service{
|
||||
"foo-service": {
|
||||
Name: "foo",
|
||||
Nodes: []string{"node1"},
|
||||
Tags: []string{"foo=bar"},
|
||||
Ports: []int{80},
|
||||
},
|
||||
},
|
||||
previous: map[string]Service{
|
||||
"foo-service": {
|
||||
Name: "foo",
|
||||
Nodes: []string{"node1"},
|
||||
Tags: []string{"foo"},
|
||||
Ports: []int{81, 82},
|
||||
},
|
||||
},
|
||||
expected: true,
|
||||
},
|
||||
{
|
||||
desc: "No Change detected",
|
||||
current: map[string]Service{
|
||||
"foo-service": {
|
||||
Name: "foo",
|
||||
Nodes: []string{"node1"},
|
||||
Tags: []string{"foo"},
|
||||
Ports: []int{80},
|
||||
},
|
||||
},
|
||||
previous: map[string]Service{
|
||||
"foo-service": {
|
||||
Name: "foo",
|
||||
Nodes: []string{"node1"},
|
||||
Tags: []string{"foo"},
|
||||
Ports: []int{80},
|
||||
},
|
||||
},
|
||||
expected: false,
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range testCases {
|
||||
|
@ -1118,7 +1178,7 @@ func TestConsulCatalogHasNodeOrTagschanged(t *testing.T) {
|
|||
t.Run(test.desc, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
actual := hasNodeOrTagsChanged(test.current, test.previous)
|
||||
actual := hasServiceChanged(test.current, test.previous)
|
||||
assert.Equal(t, test.expected, actual)
|
||||
})
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue