Use goroutine pool in throttleProvider
This commit is contained in:
parent
b1e3444798
commit
d88263dbf9
2 changed files with 7 additions and 4 deletions
|
@ -367,7 +367,7 @@ func (s *Server) preLoadConfiguration(configMsg types.ConfigMessage) {
|
||||||
providerConfigUpdateCh = make(chan types.ConfigMessage)
|
providerConfigUpdateCh = make(chan types.ConfigMessage)
|
||||||
s.providerConfigUpdateMap[configMsg.ProviderName] = providerConfigUpdateCh
|
s.providerConfigUpdateMap[configMsg.ProviderName] = providerConfigUpdateCh
|
||||||
s.routinesPool.Go(func(stop chan bool) {
|
s.routinesPool.Go(func(stop chan bool) {
|
||||||
throttleProviderConfigReload(providersThrottleDuration, s.configurationValidatedChan, providerConfigUpdateCh, stop)
|
s.throttleProviderConfigReload(providersThrottleDuration, s.configurationValidatedChan, providerConfigUpdateCh, stop)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
providerConfigUpdateCh <- configMsg
|
providerConfigUpdateCh <- configMsg
|
||||||
|
@ -378,11 +378,11 @@ func (s *Server) preLoadConfiguration(configMsg types.ConfigMessage) {
|
||||||
// It will immediately publish a new configuration and then only publish the next configuration after the throttle duration.
|
// It will immediately publish a new configuration and then only publish the next configuration after the throttle duration.
|
||||||
// Note that in the case it receives N new configs in the timeframe of the throttle duration after publishing,
|
// Note that in the case it receives N new configs in the timeframe of the throttle duration after publishing,
|
||||||
// it will publish the last of the newly received configurations.
|
// it will publish the last of the newly received configurations.
|
||||||
func throttleProviderConfigReload(throttle time.Duration, publish chan<- types.ConfigMessage, in <-chan types.ConfigMessage, stop chan bool) {
|
func (s *Server) throttleProviderConfigReload(throttle time.Duration, publish chan<- types.ConfigMessage, in <-chan types.ConfigMessage, stop chan bool) {
|
||||||
ring := channels.NewRingChannel(1)
|
ring := channels.NewRingChannel(1)
|
||||||
defer ring.Close()
|
defer ring.Close()
|
||||||
|
|
||||||
safe.Go(func() {
|
s.routinesPool.Go(func(stop chan bool) {
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-stop:
|
case <-stop:
|
||||||
|
|
|
@ -297,7 +297,10 @@ func TestThrottleProviderConfigReload(t *testing.T) {
|
||||||
stop <- true
|
stop <- true
|
||||||
}()
|
}()
|
||||||
|
|
||||||
go throttleProviderConfigReload(throttleDuration, publishConfig, providerConfig, stop)
|
globalConfig := configuration.GlobalConfiguration{}
|
||||||
|
server := NewServer(globalConfig)
|
||||||
|
|
||||||
|
go server.throttleProviderConfigReload(throttleDuration, publishConfig, providerConfig, stop)
|
||||||
|
|
||||||
publishedConfigCount := 0
|
publishedConfigCount := 0
|
||||||
stopConsumeConfigs := make(chan bool)
|
stopConsumeConfigs := make(chan bool)
|
||||||
|
|
Loading…
Reference in a new issue