Throttle Kubernetes config refresh
This commit is contained in:
parent
6e8138e19b
commit
43af0b051f
8 changed files with 169 additions and 13 deletions
|
@ -168,6 +168,27 @@ Value of `kubernetes.io/ingress.class` annotation that identifies Ingress object
|
||||||
If the parameter is non-empty, only Ingresses containing an annotation with the same value are processed.
|
If the parameter is non-empty, only Ingresses containing an annotation with the same value are processed.
|
||||||
Otherwise, Ingresses missing the annotation, having an empty value, or the value `traefik` are processed.
|
Otherwise, Ingresses missing the annotation, having an empty value, or the value `traefik` are processed.
|
||||||
|
|
||||||
|
### `throttleDuration`
|
||||||
|
|
||||||
|
_Optional, Default: 0 (no throttling)_
|
||||||
|
|
||||||
|
```toml tab="File (TOML)"
|
||||||
|
[providers.kubernetesCRD]
|
||||||
|
throttleDuration = "10s"
|
||||||
|
# ...
|
||||||
|
```
|
||||||
|
|
||||||
|
```yaml tab="File (YAML)"
|
||||||
|
providers:
|
||||||
|
kubernetesCRD:
|
||||||
|
throttleDuration: "10s"
|
||||||
|
# ...
|
||||||
|
```
|
||||||
|
|
||||||
|
```bash tab="CLI"
|
||||||
|
--providers.kubernetescrd.throttleDuration="10s"
|
||||||
|
```
|
||||||
|
|
||||||
## Resource Configuration
|
## Resource Configuration
|
||||||
|
|
||||||
If you're in a hurry, maybe you'd rather go through the [dynamic](../reference/dynamic-configuration/kubernetes-crd.md) configuration reference.
|
If you're in a hurry, maybe you'd rather go through the [dynamic](../reference/dynamic-configuration/kubernetes-crd.md) configuration reference.
|
||||||
|
|
|
@ -305,6 +305,27 @@ providers:
|
||||||
|
|
||||||
Published Kubernetes Service to copy status from.
|
Published Kubernetes Service to copy status from.
|
||||||
|
|
||||||
|
### `throttleDuration`
|
||||||
|
|
||||||
|
_Optional, Default: 0 (no throttling)_
|
||||||
|
|
||||||
|
```toml tab="File (TOML)"
|
||||||
|
[providers.kubernetesIngress]
|
||||||
|
throttleDuration = "10s"
|
||||||
|
# ...
|
||||||
|
```
|
||||||
|
|
||||||
|
```yaml tab="File (YAML)"
|
||||||
|
providers:
|
||||||
|
kubernetesIngress:
|
||||||
|
throttleDuration: "10s"
|
||||||
|
# ...
|
||||||
|
```
|
||||||
|
|
||||||
|
```bash tab="CLI"
|
||||||
|
--providers.kubernetesingress.throttleDuration="10s"
|
||||||
|
```
|
||||||
|
|
||||||
## Further
|
## Further
|
||||||
|
|
||||||
If one wants to know more about the various aspects of the Ingress spec that Traefik supports, many examples of Ingresses definitions are located in the tests [data](https://github.com/containous/traefik/tree/v2.0/pkg/provider/kubernetes/ingress/fixtures) of the Traefik repository.
|
If one wants to know more about the various aspects of the Ingress spec that Traefik supports, many examples of Ingresses definitions are located in the tests [data](https://github.com/containous/traefik/tree/v2.0/pkg/provider/kubernetes/ingress/fixtures) of the Traefik repository.
|
||||||
|
|
|
@ -46,7 +46,7 @@ Activate dashboard. (Default: ```true```)
|
||||||
Enable additional endpoints for debugging and profiling. (Default: ```false```)
|
Enable additional endpoints for debugging and profiling. (Default: ```false```)
|
||||||
|
|
||||||
`--api.insecure`:
|
`--api.insecure`:
|
||||||
Activate API on an insecure entryPoints named traefik. (Default: ```false```)
|
Activate API directly on the entryPoint named traefik. (Default: ```false```)
|
||||||
|
|
||||||
`--certificatesresolvers.<name>`:
|
`--certificatesresolvers.<name>`:
|
||||||
Certificates resolvers configuration. (Default: ```false```)
|
Certificates resolvers configuration. (Default: ```false```)
|
||||||
|
@ -312,6 +312,9 @@ Kubernetes label selector to use.
|
||||||
`--providers.kubernetescrd.namespaces`:
|
`--providers.kubernetescrd.namespaces`:
|
||||||
Kubernetes namespaces.
|
Kubernetes namespaces.
|
||||||
|
|
||||||
|
`--providers.kubernetescrd.throttleduration`:
|
||||||
|
Ingress refresh throttle duration (Default: ```0```)
|
||||||
|
|
||||||
`--providers.kubernetescrd.token`:
|
`--providers.kubernetescrd.token`:
|
||||||
Kubernetes bearer token (not needed for in-cluster client).
|
Kubernetes bearer token (not needed for in-cluster client).
|
||||||
|
|
||||||
|
@ -345,6 +348,9 @@ Kubernetes Ingress label selector to use.
|
||||||
`--providers.kubernetesingress.namespaces`:
|
`--providers.kubernetesingress.namespaces`:
|
||||||
Kubernetes namespaces.
|
Kubernetes namespaces.
|
||||||
|
|
||||||
|
`--providers.kubernetesingress.throttleduration`:
|
||||||
|
Ingress refresh throttle duration (Default: ```0```)
|
||||||
|
|
||||||
`--providers.kubernetesingress.token`:
|
`--providers.kubernetesingress.token`:
|
||||||
Kubernetes bearer token (not needed for in-cluster client).
|
Kubernetes bearer token (not needed for in-cluster client).
|
||||||
|
|
||||||
|
@ -445,7 +451,7 @@ Watch provider. (Default: ```true```)
|
||||||
Enable Rest backend with default settings. (Default: ```false```)
|
Enable Rest backend with default settings. (Default: ```false```)
|
||||||
|
|
||||||
`--providers.rest.insecure`:
|
`--providers.rest.insecure`:
|
||||||
Activate REST Provider on an insecure entryPoints named traefik. (Default: ```false```)
|
Activate REST Provider directly on the entryPoint named traefik. (Default: ```false```)
|
||||||
|
|
||||||
`--serverstransport.forwardingtimeouts.dialtimeout`:
|
`--serverstransport.forwardingtimeouts.dialtimeout`:
|
||||||
The amount of time to wait until a connection to a backend server can be established. If zero, no timeout exists. (Default: ```30```)
|
The amount of time to wait until a connection to a backend server can be established. If zero, no timeout exists. (Default: ```30```)
|
||||||
|
|
|
@ -46,7 +46,7 @@ Activate dashboard. (Default: ```true```)
|
||||||
Enable additional endpoints for debugging and profiling. (Default: ```false```)
|
Enable additional endpoints for debugging and profiling. (Default: ```false```)
|
||||||
|
|
||||||
`TRAEFIK_API_INSECURE`:
|
`TRAEFIK_API_INSECURE`:
|
||||||
Activate API on an insecure entryPoints named traefik. (Default: ```false```)
|
Activate API directly on the entryPoint named traefik. (Default: ```false```)
|
||||||
|
|
||||||
`TRAEFIK_CERTIFICATESRESOLVERS_<NAME>`:
|
`TRAEFIK_CERTIFICATESRESOLVERS_<NAME>`:
|
||||||
Certificates resolvers configuration. (Default: ```false```)
|
Certificates resolvers configuration. (Default: ```false```)
|
||||||
|
@ -312,6 +312,9 @@ Kubernetes label selector to use.
|
||||||
`TRAEFIK_PROVIDERS_KUBERNETESCRD_NAMESPACES`:
|
`TRAEFIK_PROVIDERS_KUBERNETESCRD_NAMESPACES`:
|
||||||
Kubernetes namespaces.
|
Kubernetes namespaces.
|
||||||
|
|
||||||
|
`TRAEFIK_PROVIDERS_KUBERNETESCRD_THROTTLEDURATION`:
|
||||||
|
Ingress refresh throttle duration (Default: ```0```)
|
||||||
|
|
||||||
`TRAEFIK_PROVIDERS_KUBERNETESCRD_TOKEN`:
|
`TRAEFIK_PROVIDERS_KUBERNETESCRD_TOKEN`:
|
||||||
Kubernetes bearer token (not needed for in-cluster client).
|
Kubernetes bearer token (not needed for in-cluster client).
|
||||||
|
|
||||||
|
@ -345,6 +348,9 @@ Kubernetes Ingress label selector to use.
|
||||||
`TRAEFIK_PROVIDERS_KUBERNETESINGRESS_NAMESPACES`:
|
`TRAEFIK_PROVIDERS_KUBERNETESINGRESS_NAMESPACES`:
|
||||||
Kubernetes namespaces.
|
Kubernetes namespaces.
|
||||||
|
|
||||||
|
`TRAEFIK_PROVIDERS_KUBERNETESINGRESS_THROTTLEDURATION`:
|
||||||
|
Ingress refresh throttle duration (Default: ```0```)
|
||||||
|
|
||||||
`TRAEFIK_PROVIDERS_KUBERNETESINGRESS_TOKEN`:
|
`TRAEFIK_PROVIDERS_KUBERNETESINGRESS_TOKEN`:
|
||||||
Kubernetes bearer token (not needed for in-cluster client).
|
Kubernetes bearer token (not needed for in-cluster client).
|
||||||
|
|
||||||
|
@ -445,7 +451,7 @@ Watch provider. (Default: ```true```)
|
||||||
Enable Rest backend with default settings. (Default: ```false```)
|
Enable Rest backend with default settings. (Default: ```false```)
|
||||||
|
|
||||||
`TRAEFIK_PROVIDERS_REST_INSECURE`:
|
`TRAEFIK_PROVIDERS_REST_INSECURE`:
|
||||||
Activate REST Provider on an insecure entryPoints named traefik. (Default: ```false```)
|
Activate REST Provider directly on the entryPoint named traefik. (Default: ```false```)
|
||||||
|
|
||||||
`TRAEFIK_SERVERSTRANSPORT_FORWARDINGTIMEOUTS_DIALTIMEOUT`:
|
`TRAEFIK_SERVERSTRANSPORT_FORWARDINGTIMEOUTS_DIALTIMEOUT`:
|
||||||
The amount of time to wait until a connection to a backend server can be established. If zero, no timeout exists. (Default: ```30```)
|
The amount of time to wait until a connection to a backend server can be established. If zero, no timeout exists. (Default: ```30```)
|
||||||
|
|
|
@ -83,6 +83,7 @@
|
||||||
namespaces = ["foobar", "foobar"]
|
namespaces = ["foobar", "foobar"]
|
||||||
labelSelector = "foobar"
|
labelSelector = "foobar"
|
||||||
ingressClass = "foobar"
|
ingressClass = "foobar"
|
||||||
|
throttleDuration = "10s"
|
||||||
[providers.kubernetesIngress.ingressEndpoint]
|
[providers.kubernetesIngress.ingressEndpoint]
|
||||||
ip = "foobar"
|
ip = "foobar"
|
||||||
hostname = "foobar"
|
hostname = "foobar"
|
||||||
|
@ -95,6 +96,7 @@
|
||||||
namespaces = ["foobar", "foobar"]
|
namespaces = ["foobar", "foobar"]
|
||||||
labelSelector = "foobar"
|
labelSelector = "foobar"
|
||||||
ingressClass = "foobar"
|
ingressClass = "foobar"
|
||||||
|
throttleDuration = "10s"
|
||||||
[providers.rest]
|
[providers.rest]
|
||||||
insecure = true
|
insecure = true
|
||||||
[providers.rancher]
|
[providers.rancher]
|
||||||
|
|
|
@ -88,6 +88,7 @@ providers:
|
||||||
- foobar
|
- foobar
|
||||||
labelSelector: foobar
|
labelSelector: foobar
|
||||||
ingressClass: foobar
|
ingressClass: foobar
|
||||||
|
throttleDuration: 10s
|
||||||
ingressEndpoint:
|
ingressEndpoint:
|
||||||
ip: foobar
|
ip: foobar
|
||||||
hostname: foobar
|
hostname: foobar
|
||||||
|
@ -102,6 +103,7 @@ providers:
|
||||||
- foobar
|
- foobar
|
||||||
labelSelector: foobar
|
labelSelector: foobar
|
||||||
ingressClass: foobar
|
ingressClass: foobar
|
||||||
|
throttleDuration: 10s
|
||||||
rest:
|
rest:
|
||||||
insecure: true
|
insecure: true
|
||||||
rancher:
|
rancher:
|
||||||
|
|
|
@ -20,6 +20,7 @@ import (
|
||||||
"github.com/containous/traefik/v2/pkg/provider/kubernetes/crd/traefik/v1alpha1"
|
"github.com/containous/traefik/v2/pkg/provider/kubernetes/crd/traefik/v1alpha1"
|
||||||
"github.com/containous/traefik/v2/pkg/safe"
|
"github.com/containous/traefik/v2/pkg/safe"
|
||||||
"github.com/containous/traefik/v2/pkg/tls"
|
"github.com/containous/traefik/v2/pkg/tls"
|
||||||
|
"github.com/containous/traefik/v2/pkg/types"
|
||||||
corev1 "k8s.io/api/core/v1"
|
corev1 "k8s.io/api/core/v1"
|
||||||
"k8s.io/apimachinery/pkg/labels"
|
"k8s.io/apimachinery/pkg/labels"
|
||||||
)
|
)
|
||||||
|
@ -38,6 +39,7 @@ type Provider struct {
|
||||||
Namespaces []string `description:"Kubernetes namespaces." json:"namespaces,omitempty" toml:"namespaces,omitempty" yaml:"namespaces,omitempty" export:"true"`
|
Namespaces []string `description:"Kubernetes namespaces." json:"namespaces,omitempty" toml:"namespaces,omitempty" yaml:"namespaces,omitempty" export:"true"`
|
||||||
LabelSelector string `description:"Kubernetes label selector to use." json:"labelSelector,omitempty" toml:"labelSelector,omitempty" yaml:"labelSelector,omitempty" export:"true"`
|
LabelSelector string `description:"Kubernetes label selector to use." json:"labelSelector,omitempty" toml:"labelSelector,omitempty" yaml:"labelSelector,omitempty" export:"true"`
|
||||||
IngressClass string `description:"Value of kubernetes.io/ingress.class annotation to watch for." json:"ingressClass,omitempty" toml:"ingressClass,omitempty" yaml:"ingressClass,omitempty" export:"true"`
|
IngressClass string `description:"Value of kubernetes.io/ingress.class annotation to watch for." json:"ingressClass,omitempty" toml:"ingressClass,omitempty" yaml:"ingressClass,omitempty" export:"true"`
|
||||||
|
ThrottleDuration types.Duration `description:"Ingress refresh throttle duration" json:"throttleDuration,omitempty" toml:"throttleDuration,omitempty" yaml:"throttleDuration,omitempty"`
|
||||||
lastConfiguration safe.Safe
|
lastConfiguration safe.Safe
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -95,6 +97,7 @@ func (p *Provider) Provide(configurationChan chan<- dynamic.Message, pool *safe.
|
||||||
stopWatch := make(chan struct{}, 1)
|
stopWatch := make(chan struct{}, 1)
|
||||||
defer close(stopWatch)
|
defer close(stopWatch)
|
||||||
eventsChan, err := k8sClient.WatchAll(p.Namespaces, stopWatch)
|
eventsChan, err := k8sClient.WatchAll(p.Namespaces, stopWatch)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Errorf("Error watching kubernetes events: %v", err)
|
logger.Errorf("Error watching kubernetes events: %v", err)
|
||||||
timer := time.NewTimer(1 * time.Second)
|
timer := time.NewTimer(1 * time.Second)
|
||||||
|
@ -105,11 +108,20 @@ func (p *Provider) Provide(configurationChan chan<- dynamic.Message, pool *safe.
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
throttleDuration := time.Duration(p.ThrottleDuration)
|
||||||
|
eventsChanToRead := throttleEvents(ctxLog, throttleDuration, stop, eventsChan)
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-stop:
|
case <-stop:
|
||||||
return nil
|
return nil
|
||||||
case event := <-eventsChan:
|
case event := <-eventsChanToRead:
|
||||||
|
// Note that event is the *first* event that came in during this
|
||||||
|
// throttling interval -- if we're hitting our throttle, we may have
|
||||||
|
// dropped events. This is fine, because we don't treat different
|
||||||
|
// event types differently. But if we do in the future, we'll need to
|
||||||
|
// track more information about the dropped events.
|
||||||
conf := p.loadConfigurationFromCRD(ctxLog, k8sClient)
|
conf := p.loadConfigurationFromCRD(ctxLog, k8sClient)
|
||||||
|
|
||||||
if reflect.DeepEqual(p.lastConfiguration.Get(), conf) {
|
if reflect.DeepEqual(p.lastConfiguration.Get(), conf) {
|
||||||
|
@ -121,6 +133,11 @@ func (p *Provider) Provide(configurationChan chan<- dynamic.Message, pool *safe.
|
||||||
Configuration: conf,
|
Configuration: conf,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// If we're throttling, we sleep here for the throttle duration to
|
||||||
|
// enforce that we don't refresh faster than our throttle. time.Sleep
|
||||||
|
// returns immediately if p.ThrottleDuration is 0 (no throttle).
|
||||||
|
time.Sleep(throttleDuration)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -587,3 +604,36 @@ func getCABlocks(secret *corev1.Secret, namespace, secretName string) (string, e
|
||||||
|
|
||||||
return cert, nil
|
return cert, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func throttleEvents(ctx context.Context, throttleDuration time.Duration, stop chan bool, eventsChan <-chan interface{}) chan interface{} {
|
||||||
|
if throttleDuration == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
// Create a buffered channel to hold the pending event (if we're delaying processing the event due to throttling)
|
||||||
|
eventsChanBuffered := make(chan interface{}, 1)
|
||||||
|
|
||||||
|
// Run a goroutine that reads events from eventChan and does a
|
||||||
|
// non-blocking write to pendingEvent. This guarantees that writing to
|
||||||
|
// eventChan will never block, and that pendingEvent will have
|
||||||
|
// something in it if there's been an event since we read from that channel.
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-stop:
|
||||||
|
return
|
||||||
|
case nextEvent := <-eventsChan:
|
||||||
|
select {
|
||||||
|
case eventsChanBuffered <- nextEvent:
|
||||||
|
default:
|
||||||
|
// We already have an event in eventsChanBuffered, so we'll
|
||||||
|
// do a refresh as soon as our throttle allows us to. It's fine
|
||||||
|
// to drop the event and keep whatever's in the buffer -- we
|
||||||
|
// don't do different things for different events
|
||||||
|
log.FromContext(ctx).Debugf("Dropping event kind %T due to throttling", nextEvent)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
return eventsChanBuffered
|
||||||
|
}
|
||||||
|
|
|
@ -18,6 +18,7 @@ import (
|
||||||
"github.com/containous/traefik/v2/pkg/log"
|
"github.com/containous/traefik/v2/pkg/log"
|
||||||
"github.com/containous/traefik/v2/pkg/safe"
|
"github.com/containous/traefik/v2/pkg/safe"
|
||||||
"github.com/containous/traefik/v2/pkg/tls"
|
"github.com/containous/traefik/v2/pkg/tls"
|
||||||
|
"github.com/containous/traefik/v2/pkg/types"
|
||||||
corev1 "k8s.io/api/core/v1"
|
corev1 "k8s.io/api/core/v1"
|
||||||
"k8s.io/api/extensions/v1beta1"
|
"k8s.io/api/extensions/v1beta1"
|
||||||
"k8s.io/apimachinery/pkg/labels"
|
"k8s.io/apimachinery/pkg/labels"
|
||||||
|
@ -39,6 +40,7 @@ type Provider struct {
|
||||||
LabelSelector string `description:"Kubernetes Ingress label selector to use." json:"labelSelector,omitempty" toml:"labelSelector,omitempty" yaml:"labelSelector,omitempty" export:"true"`
|
LabelSelector string `description:"Kubernetes Ingress label selector to use." json:"labelSelector,omitempty" toml:"labelSelector,omitempty" yaml:"labelSelector,omitempty" export:"true"`
|
||||||
IngressClass string `description:"Value of kubernetes.io/ingress.class annotation to watch for." json:"ingressClass,omitempty" toml:"ingressClass,omitempty" yaml:"ingressClass,omitempty" export:"true"`
|
IngressClass string `description:"Value of kubernetes.io/ingress.class annotation to watch for." json:"ingressClass,omitempty" toml:"ingressClass,omitempty" yaml:"ingressClass,omitempty" export:"true"`
|
||||||
IngressEndpoint *EndpointIngress `description:"Kubernetes Ingress Endpoint." json:"ingressEndpoint,omitempty" toml:"ingressEndpoint,omitempty" yaml:"ingressEndpoint,omitempty"`
|
IngressEndpoint *EndpointIngress `description:"Kubernetes Ingress Endpoint." json:"ingressEndpoint,omitempty" toml:"ingressEndpoint,omitempty" yaml:"ingressEndpoint,omitempty"`
|
||||||
|
ThrottleDuration types.Duration `description:"Ingress refresh throttle duration" json:"throttleDuration,omitempty" toml:"throttleDuration,omitempty" yaml:"throttleDuration,omitempty"`
|
||||||
lastConfiguration safe.Safe
|
lastConfiguration safe.Safe
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -118,11 +120,19 @@ func (p *Provider) Provide(configurationChan chan<- dynamic.Message, pool *safe.
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
throttleDuration := time.Duration(p.ThrottleDuration)
|
||||||
|
eventsChanToRead := throttleEvents(ctxLog, throttleDuration, stop, eventsChan)
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-stop:
|
case <-stop:
|
||||||
return nil
|
return nil
|
||||||
case event := <-eventsChan:
|
case event := <-eventsChanToRead:
|
||||||
|
// Note that event is the *first* event that came in during this
|
||||||
|
// throttling interval -- if we're hitting our throttle, we may have
|
||||||
|
// dropped events. This is fine, because we don't treat different
|
||||||
|
// event types differently. But if we do in the future, we'll need to
|
||||||
|
// track more information about the dropped events.
|
||||||
conf := p.loadConfigurationFromIngresses(ctxLog, k8sClient)
|
conf := p.loadConfigurationFromIngresses(ctxLog, k8sClient)
|
||||||
|
|
||||||
if reflect.DeepEqual(p.lastConfiguration.Get(), conf) {
|
if reflect.DeepEqual(p.lastConfiguration.Get(), conf) {
|
||||||
|
@ -134,6 +144,11 @@ func (p *Provider) Provide(configurationChan chan<- dynamic.Message, pool *safe.
|
||||||
Configuration: conf,
|
Configuration: conf,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// If we're throttling, we sleep here for the throttle duration to
|
||||||
|
// enforce that we don't refresh faster than our throttle. time.Sleep
|
||||||
|
// returns immediately if p.ThrottleDuration is 0 (no throttle).
|
||||||
|
time.Sleep(throttleDuration)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -478,3 +493,36 @@ func (p *Provider) updateIngressStatus(i *v1beta1.Ingress, k8sClient Client) err
|
||||||
|
|
||||||
return k8sClient.UpdateIngressStatus(i.Namespace, i.Name, service.Status.LoadBalancer.Ingress[0].IP, service.Status.LoadBalancer.Ingress[0].Hostname)
|
return k8sClient.UpdateIngressStatus(i.Namespace, i.Name, service.Status.LoadBalancer.Ingress[0].IP, service.Status.LoadBalancer.Ingress[0].Hostname)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func throttleEvents(ctx context.Context, throttleDuration time.Duration, stop chan bool, eventsChan <-chan interface{}) chan interface{} {
|
||||||
|
if throttleDuration == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
// Create a buffered channel to hold the pending event (if we're delaying processing the event due to throttling)
|
||||||
|
eventsChanBuffered := make(chan interface{}, 1)
|
||||||
|
|
||||||
|
// Run a goroutine that reads events from eventChan and does a
|
||||||
|
// non-blocking write to pendingEvent. This guarantees that writing to
|
||||||
|
// eventChan will never block, and that pendingEvent will have
|
||||||
|
// something in it if there's been an event since we read from that channel.
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-stop:
|
||||||
|
return
|
||||||
|
case nextEvent := <-eventsChan:
|
||||||
|
select {
|
||||||
|
case eventsChanBuffered <- nextEvent:
|
||||||
|
default:
|
||||||
|
// We already have an event in eventsChanBuffered, so we'll
|
||||||
|
// do a refresh as soon as our throttle allows us to. It's fine
|
||||||
|
// to drop the event and keep whatever's in the buffer -- we
|
||||||
|
// don't do different things for different events
|
||||||
|
log.FromContext(ctx).Debugf("Dropping event kind %T due to throttling", nextEvent)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
return eventsChanBuffered
|
||||||
|
}
|
||||||
|
|
Loading…
Add table
Reference in a new issue