Fix memory leak in listenProviders
Signed-off-by: Emile Vauge <emile@vauge.com>
This commit is contained in:
parent
29e647763a
commit
56ef678c09
2 changed files with 16 additions and 20 deletions
|
@ -107,6 +107,15 @@ func (provider *Docker) Provide(configurationChan chan<- types.ConfigMessage, po
|
||||||
}
|
}
|
||||||
if provider.Watch {
|
if provider.Watch {
|
||||||
ctx, cancel := context.WithCancel(ctx)
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
|
pool.Go(func(stop chan bool) {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-stop:
|
||||||
|
cancel()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
f := filters.NewArgs()
|
f := filters.NewArgs()
|
||||||
f.Add("type", "container")
|
f.Add("type", "container")
|
||||||
options := dockertypes.EventsOptions{
|
options := dockertypes.EventsOptions{
|
||||||
|
@ -134,15 +143,6 @@ func (provider *Docker) Provide(configurationChan chan<- types.ConfigMessage, po
|
||||||
eventHandler.Handle("die", startStopHandle)
|
eventHandler.Handle("die", startStopHandle)
|
||||||
|
|
||||||
errChan := events.MonitorWithHandler(ctx, dockerClient, options, eventHandler)
|
errChan := events.MonitorWithHandler(ctx, dockerClient, options, eventHandler)
|
||||||
pool.Go(func(stop chan bool) {
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-stop:
|
|
||||||
cancel()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
})
|
|
||||||
if err := <-errChan; err != nil {
|
if err := <-errChan; err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
18
server.go
18
server.go
|
@ -149,17 +149,13 @@ func (server *Server) listenProviders(stop chan bool) {
|
||||||
server.configurationValidatedChan <- configMsg
|
server.configurationValidatedChan <- configMsg
|
||||||
} else {
|
} else {
|
||||||
log.Debugf("Last %s config received less than %s, waiting...", configMsg.ProviderName, server.globalConfiguration.ProvidersThrottleDuration)
|
log.Debugf("Last %s config received less than %s, waiting...", configMsg.ProviderName, server.globalConfiguration.ProvidersThrottleDuration)
|
||||||
server.routinesPool.Go(func(stop chan bool) {
|
safe.Go(func() {
|
||||||
select {
|
<-time.After(server.globalConfiguration.ProvidersThrottleDuration)
|
||||||
case <-stop:
|
lastReceivedConfigurationValue := lastReceivedConfiguration.Get().(time.Time)
|
||||||
return
|
if time.Now().After(lastReceivedConfigurationValue.Add(time.Duration(server.globalConfiguration.ProvidersThrottleDuration))) {
|
||||||
case <-time.After(server.globalConfiguration.ProvidersThrottleDuration):
|
log.Debugf("Waited for %s config, OK", configMsg.ProviderName)
|
||||||
lastReceivedConfigurationValue := lastReceivedConfiguration.Get().(time.Time)
|
if lastConfig, ok := lastConfigs.Get(configMsg.ProviderName); ok {
|
||||||
if time.Now().After(lastReceivedConfigurationValue.Add(time.Duration(server.globalConfiguration.ProvidersThrottleDuration))) {
|
server.configurationValidatedChan <- *lastConfig.(*types.ConfigMessage)
|
||||||
log.Debugf("Waited for %s config, OK", configMsg.ProviderName)
|
|
||||||
if lastConfig, ok := lastConfigs.Get(configMsg.ProviderName); ok {
|
|
||||||
server.configurationValidatedChan <- *lastConfig.(*types.ConfigMessage)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
Loading…
Reference in a new issue