2017-02-07 22:33:23 +01:00
/ *
Copyright 2015 The Kubernetes Authors .
Licensed under the Apache License , Version 2.0 ( the "License" ) ;
you may not use this file except in compliance with the License .
You may obtain a copy of the License at
http : //www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing , software
distributed under the License is distributed on an "AS IS" BASIS ,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND , either express or implied .
See the License for the specific language governing permissions and
limitations under the License .
* /
package cache
import (
"fmt"
"sync"
"time"
2018-02-14 16:56:04 +08:00
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/clock"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
2019-03-14 15:56:06 +01:00
"k8s.io/client-go/util/retry"
2019-08-05 18:24:03 +02:00
"k8s.io/utils/buffer"
2017-02-07 22:33:23 +01:00
2019-08-05 18:24:03 +02:00
"k8s.io/klog"
2017-02-07 22:33:23 +01:00
)
2019-08-05 18:24:03 +02:00
// SharedInformer provides eventually consistent linkage of its
// clients to the authoritative state of a given collection of
// objects. An object is identified by its API group, kind/resource,
// namespace, and name. One SharedInformer provides linkage to objects
// of a particular API group and kind/resource. The linked object
// collection of a SharedInformer may be further restricted to one
// namespace and/or by label selector and/or field selector.
//
// The authoritative state of an object is what apiservers provide
// access to, and an object goes through a strict sequence of states.
// A state is either "absent" or present with a ResourceVersion and
// other appropriate content.
//
// A SharedInformer maintains a local cache, exposed by Store(), of
// the state of each relevant object. This cache is eventually
// consistent with the authoritative state. This means that, unless
// prevented by persistent communication problems, if ever a
// particular object ID X is authoritatively associated with a state S
// then for every SharedInformer I whose collection includes (X, S)
// eventually either (1) I's cache associates X with S or a later
// state of X, (2) I is stopped, or (3) the authoritative state
// service for X terminates. To be formally complete, we say that the
// absent state meets any restriction by label selector or field
// selector.
//
// As a simple example, if a collection of objects is henceforth
// unchanging and a SharedInformer is created that links to that
// collection then that SharedInformer's cache eventually holds an
// exact copy of that collection (unless it is stopped too soon, the
// authoritative state service ends, or communication problems between
// the two persistently thwart achievement).
//
// As another simple example, if the local cache ever holds a
// non-absent state for some object ID and the object is eventually
// removed from the authoritative state then eventually the object is
// removed from the local cache (unless the SharedInformer is stopped
// too soon, the authoritative state service ends, or communication
// problems persistently thwart the desired result).
//
// The keys in Store() are of the form namespace/name for namespaced
// objects, and are simply the name for non-namespaced objects.
//
// A client is identified here by a ResourceEventHandler. For every
// update to the SharedInformer's local cache and for every client,
// eventually either the SharedInformer is stopped or the client is
// notified of the update. These notifications happen after the
// corresponding cache update and, in the case of a
// SharedIndexInformer, after the corresponding index updates. It is
// possible that additional cache and index updates happen before such
// a prescribed notification. For a given SharedInformer and client,
// all notifications are delivered sequentially. For a given
// SharedInformer, client, and object ID, the notifications are
// delivered in order.
//
// A delete notification exposes the last locally known non-absent
// state, except that its ResourceVersion is replaced with a
// ResourceVersion in which the object is actually absent.
2017-02-07 22:33:23 +01:00
type SharedInformer interface {
2018-02-14 16:56:04 +08:00
// AddEventHandler adds an event handler to the shared informer using the shared informer's resync
// period. Events to a single handler are delivered sequentially, but there is no coordination
// between different handlers.
AddEventHandler ( handler ResourceEventHandler )
2019-08-05 18:24:03 +02:00
// AddEventHandlerWithResyncPeriod adds an event handler to the
// shared informer using the specified resync period. The resync
// operation consists of delivering to the handler a create
// notification for every object in the informer's local cache; it
// does not add any interactions with the authoritative storage.
2018-02-14 16:56:04 +08:00
AddEventHandlerWithResyncPeriod ( handler ResourceEventHandler , resyncPeriod time . Duration )
2019-08-05 18:24:03 +02:00
// GetStore returns the informer's local cache as a Store.
2017-02-07 22:33:23 +01:00
GetStore ( ) Store
// GetController gives back a synthetic interface that "votes" to start the informer
2018-02-14 16:56:04 +08:00
GetController ( ) Controller
2019-08-05 18:24:03 +02:00
// Run starts and runs the shared informer, returning after it stops.
// The informer will be stopped when stopCh is closed.
2017-02-07 22:33:23 +01:00
Run ( stopCh <- chan struct { } )
2019-08-05 18:24:03 +02:00
// HasSynced returns true if the shared informer's store has been
// informed by at least one full LIST of the authoritative state
// of the informer's object collection. This is unrelated to "resync".
2017-02-07 22:33:23 +01:00
HasSynced ( ) bool
2018-02-14 16:56:04 +08:00
// LastSyncResourceVersion is the resource version observed when last synced with the underlying
// store. The value returned is not synchronized with access to the underlying store and is not
// thread-safe.
2017-02-07 22:33:23 +01:00
LastSyncResourceVersion ( ) string
}
// SharedIndexInformer is a SharedInformer that additionally gives access to
// the Indexer backing it and allows indexers to be registered.
type SharedIndexInformer interface {
	SharedInformer

	// AddIndexers registers additional indexers with the informer. It must
	// be called before the informer starts.
	AddIndexers(indexers Indexers) error

	// GetIndexer returns the Indexer used by the informer.
	GetIndexer() Indexer
}
// NewSharedInformer builds a SharedInformer for the given list-watcher. It is
// a convenience wrapper around NewSharedIndexInformer that supplies an empty
// set of indexers.
func NewSharedInformer(lw ListerWatcher, objType runtime.Object, resyncPeriod time.Duration) SharedInformer {
	noIndexers := Indexers{}
	return NewSharedIndexInformer(lw, objType, resyncPeriod, noIndexers)
}
// NewSharedIndexInformer creates a new instance for the listwatcher.
2018-02-14 16:56:04 +08:00
func NewSharedIndexInformer ( lw ListerWatcher , objType runtime . Object , defaultEventHandlerResyncPeriod time . Duration , indexers Indexers ) SharedIndexInformer {
realClock := & clock . RealClock { }
2017-02-07 22:33:23 +01:00
sharedIndexInformer := & sharedIndexInformer {
2018-02-14 16:56:04 +08:00
processor : & sharedProcessor { clock : realClock } ,
indexer : NewIndexer ( DeletionHandlingMetaNamespaceKeyFunc , indexers ) ,
listerWatcher : lw ,
objectType : objType ,
resyncCheckPeriod : defaultEventHandlerResyncPeriod ,
defaultEventHandlerResyncPeriod : defaultEventHandlerResyncPeriod ,
cacheMutationDetector : NewCacheMutationDetector ( fmt . Sprintf ( "%T" , objType ) ) ,
2019-08-05 18:24:03 +02:00
clock : realClock ,
2017-02-07 22:33:23 +01:00
}
return sharedIndexInformer
}
// InformerSynced is the signature of a function that reports whether an
// informer has synced. It is useful for determining if caches have synced.
type InformerSynced func() bool
2018-02-14 16:56:04 +08:00
const (
	// syncedPollPeriod is the interval at which the sync functions are
	// polled for their status.
	syncedPollPeriod = 100 * time.Millisecond

	// initialBufferSize is how many event notifications can be buffered
	// initially.
	initialBufferSize = 1024
)
2017-02-07 22:33:23 +01:00
// WaitForCacheSync waits for caches to populate. It returns true if it was successful, false
2018-02-14 16:56:04 +08:00
// if the controller should shutdown
2017-02-07 22:33:23 +01:00
func WaitForCacheSync ( stopCh <- chan struct { } , cacheSyncs ... InformerSynced ) bool {
err := wait . PollUntil ( syncedPollPeriod ,
func ( ) ( bool , error ) {
for _ , syncFunc := range cacheSyncs {
if ! syncFunc ( ) {
return false , nil
}
}
return true , nil
} ,
stopCh )
if err != nil {
2019-08-05 18:24:03 +02:00
klog . V ( 2 ) . Infof ( "stop requested" )
2017-02-07 22:33:23 +01:00
return false
}
2019-08-05 18:24:03 +02:00
klog . V ( 4 ) . Infof ( "caches populated" )
2017-02-07 22:33:23 +01:00
return true
}
type sharedIndexInformer struct {
indexer Indexer
2018-02-14 16:56:04 +08:00
controller Controller
2017-02-07 22:33:23 +01:00
2017-04-07 11:49:53 +01:00
processor * sharedProcessor
cacheMutationDetector CacheMutationDetector
2017-02-07 22:33:23 +01:00
// This block is tracked to handle late initialization of the controller
2018-02-14 16:56:04 +08:00
listerWatcher ListerWatcher
objectType runtime . Object
// resyncCheckPeriod is how often we want the reflector's resync timer to fire so it can call
// shouldResync to check if any of our listeners need a resync.
resyncCheckPeriod time . Duration
// defaultEventHandlerResyncPeriod is the default resync period for any handlers added via
// AddEventHandler (i.e. they don't specify one and just want to use the shared informer's default
// value).
defaultEventHandlerResyncPeriod time . Duration
// clock allows for testability
clock clock . Clock
started , stopped bool
startedLock sync . Mutex
2017-02-07 22:33:23 +01:00
// blockDeltas gives a way to stop all event distribution so that a late event handler
// can safely join the shared informer.
blockDeltas sync . Mutex
}
// dummyController hides the fact that a SharedInformer is different from a dedicated one
2018-02-14 16:56:04 +08:00
// where a caller can `Run`. The run method is disconnected in this case, because higher
2017-02-07 22:33:23 +01:00
// level logic will decide when to start the SharedInformer and related controller.
// Because returning information back is always asynchronous, the legacy callers shouldn't
// notice any change in behavior.
type dummyController struct {
informer * sharedIndexInformer
}
// Run is deliberately a no-op: it neither starts the informer nor reads
// stopCh.
func (v *dummyController) Run(stopCh <-chan struct{}) {}
// HasSynced reports the sync status of the wrapped shared informer.
func (v *dummyController) HasSynced() bool {
	wrapped := v.informer
	return wrapped.HasSynced()
}
2018-02-14 16:56:04 +08:00
// LastSyncResourceVersion always returns the empty string; the wrapped
// informer is not consulted.
func (v *dummyController) LastSyncResourceVersion() string {
	const noVersion = ""
	return noVersion
}
2017-02-07 22:33:23 +01:00
// Notification payloads handed to listeners; the concrete type selects which
// handler callback is invoked.
type (
	// updateNotification carries the previous and the current state of an
	// object.
	updateNotification struct {
		oldObj interface{}
		newObj interface{}
	}

	// addNotification carries an object that has been added.
	addNotification struct {
		newObj interface{}
	}

	// deleteNotification carries an object that has been deleted.
	deleteNotification struct {
		oldObj interface{}
	}
)
func ( s * sharedIndexInformer ) Run ( stopCh <- chan struct { } ) {
defer utilruntime . HandleCrash ( )
2019-03-14 15:56:06 +01:00
fifo := NewDeltaFIFO ( MetaNamespaceKeyFunc , s . indexer )
2017-02-07 22:33:23 +01:00
cfg := & Config {
Queue : fifo ,
ListerWatcher : s . listerWatcher ,
ObjectType : s . objectType ,
2018-02-14 16:56:04 +08:00
FullResyncPeriod : s . resyncCheckPeriod ,
2017-02-07 22:33:23 +01:00
RetryOnError : false ,
2018-02-14 16:56:04 +08:00
ShouldResync : s . processor . shouldResync ,
2017-02-07 22:33:23 +01:00
Process : s . HandleDeltas ,
}
func ( ) {
s . startedLock . Lock ( )
defer s . startedLock . Unlock ( )
s . controller = New ( cfg )
2018-02-14 16:56:04 +08:00
s . controller . ( * controller ) . clock = s . clock
2017-02-07 22:33:23 +01:00
s . started = true
} ( )
2018-02-14 16:56:04 +08:00
// Separate stop channel because Processor should be stopped strictly after controller
processorStopCh := make ( chan struct { } )
var wg wait . Group
defer wg . Wait ( ) // Wait for Processor to stop
defer close ( processorStopCh ) // Tell Processor to stop
wg . StartWithChannel ( processorStopCh , s . cacheMutationDetector . Run )
wg . StartWithChannel ( processorStopCh , s . processor . run )
2017-02-07 22:33:23 +01:00
2018-02-14 16:56:04 +08:00
defer func ( ) {
s . startedLock . Lock ( )
defer s . startedLock . Unlock ( )
s . stopped = true // Don't want any new listeners
} ( )
s . controller . Run ( stopCh )
2017-02-07 22:33:23 +01:00
}
// HasSynced reports the controller's sync status. It returns false while no
// controller has been created yet.
func (s *sharedIndexInformer) HasSynced() bool {
	s.startedLock.Lock()
	defer s.startedLock.Unlock()

	if ctrl := s.controller; ctrl != nil {
		return ctrl.HasSynced()
	}
	return false
}
func ( s * sharedIndexInformer ) LastSyncResourceVersion ( ) string {
s . startedLock . Lock ( )
defer s . startedLock . Unlock ( )
2018-02-14 16:56:04 +08:00
if s . controller == nil {
2017-02-07 22:33:23 +01:00
return ""
}
2018-02-14 16:56:04 +08:00
return s . controller . LastSyncResourceVersion ( )
2017-02-07 22:33:23 +01:00
}
// GetStore returns the informer's indexer, viewed as a Store.
func (s *sharedIndexInformer) GetStore() Store {
	var store Store = s.indexer
	return store
}
// GetIndexer returns the informer's indexer.
func (s *sharedIndexInformer) GetIndexer() Indexer {
	idx := s.indexer
	return idx
}
// AddIndexers registers additional indexers with the informer's indexer. It
// returns an error if the informer has already started; otherwise it returns
// whatever the indexer's AddIndexers returns.
func (s *sharedIndexInformer) AddIndexers(indexers Indexers) error {
	s.startedLock.Lock()
	defer s.startedLock.Unlock()

	if !s.started {
		return s.indexer.AddIndexers(indexers)
	}
	return fmt.Errorf("informer has already started")
}
2018-02-14 16:56:04 +08:00
func ( s * sharedIndexInformer ) GetController ( ) Controller {
2017-02-07 22:33:23 +01:00
return & dummyController { informer : s }
}
2018-02-14 16:56:04 +08:00
// AddEventHandler registers handler using the informer's default event
// handler resync period.
func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) {
	period := s.defaultEventHandlerResyncPeriod
	s.AddEventHandlerWithResyncPeriod(handler, period)
}
func determineResyncPeriod ( desired , check time . Duration ) time . Duration {
if desired == 0 {
return desired
}
if check == 0 {
2019-08-05 18:24:03 +02:00
klog . Warningf ( "The specified resyncPeriod %v is invalid because this shared informer doesn't support resyncing" , desired )
2018-02-14 16:56:04 +08:00
return 0
}
if desired < check {
2019-08-05 18:24:03 +02:00
klog . Warningf ( "The specified resyncPeriod %v is being increased to the minimum resyncCheckPeriod %v" , desired , check )
2018-02-14 16:56:04 +08:00
return check
}
return desired
}
// minimumResyncPeriod is the smallest positive resync period accepted for an
// event handler; smaller positive values are raised to it.
const minimumResyncPeriod = time.Second
func ( s * sharedIndexInformer ) AddEventHandlerWithResyncPeriod ( handler ResourceEventHandler , resyncPeriod time . Duration ) {
2017-02-07 22:33:23 +01:00
s . startedLock . Lock ( )
defer s . startedLock . Unlock ( )
2018-02-14 16:56:04 +08:00
if s . stopped {
2019-08-05 18:24:03 +02:00
klog . V ( 2 ) . Infof ( "Handler %v was not added to shared informer because it has stopped already" , handler )
2018-02-14 16:56:04 +08:00
return
}
if resyncPeriod > 0 {
if resyncPeriod < minimumResyncPeriod {
2019-08-05 18:24:03 +02:00
klog . Warningf ( "resyncPeriod %d is too small. Changing it to the minimum allowed value of %d" , resyncPeriod , minimumResyncPeriod )
2018-02-14 16:56:04 +08:00
resyncPeriod = minimumResyncPeriod
}
if resyncPeriod < s . resyncCheckPeriod {
if s . started {
2019-08-05 18:24:03 +02:00
klog . Warningf ( "resyncPeriod %d is smaller than resyncCheckPeriod %d and the informer has already started. Changing it to %d" , resyncPeriod , s . resyncCheckPeriod , s . resyncCheckPeriod )
2018-02-14 16:56:04 +08:00
resyncPeriod = s . resyncCheckPeriod
} else {
// if the event handler's resyncPeriod is smaller than the current resyncCheckPeriod, update
// resyncCheckPeriod to match resyncPeriod and adjust the resync periods of all the listeners
// accordingly
s . resyncCheckPeriod = resyncPeriod
s . processor . resyncCheckPeriodChanged ( resyncPeriod )
}
}
}
listener := newProcessListener ( handler , resyncPeriod , determineResyncPeriod ( resyncPeriod , s . resyncCheckPeriod ) , s . clock . Now ( ) , initialBufferSize )
2017-02-07 22:33:23 +01:00
if ! s . started {
2018-02-14 16:56:04 +08:00
s . processor . addListener ( listener )
return
2017-02-07 22:33:23 +01:00
}
// in order to safely join, we have to
// 1. stop sending add/update/delete notifications
// 2. do a list against the store
// 3. send synthetic "Add" events to the new handler
// 4. unblock
s . blockDeltas . Lock ( )
defer s . blockDeltas . Unlock ( )
2019-03-14 15:56:06 +01:00
s . processor . addListener ( listener )
2018-02-14 16:56:04 +08:00
for _ , item := range s . indexer . List ( ) {
listener . add ( addNotification { newObj : item } )
2017-02-07 22:33:23 +01:00
}
}
func ( s * sharedIndexInformer ) HandleDeltas ( obj interface { } ) error {
s . blockDeltas . Lock ( )
defer s . blockDeltas . Unlock ( )
// from oldest to newest
for _ , d := range obj . ( Deltas ) {
switch d . Type {
case Sync , Added , Updated :
2018-02-14 16:56:04 +08:00
isSync := d . Type == Sync
2017-04-07 11:49:53 +01:00
s . cacheMutationDetector . AddObject ( d . Object )
2017-02-07 22:33:23 +01:00
if old , exists , err := s . indexer . Get ( d . Object ) ; err == nil && exists {
if err := s . indexer . Update ( d . Object ) ; err != nil {
return err
}
2018-02-14 16:56:04 +08:00
s . processor . distribute ( updateNotification { oldObj : old , newObj : d . Object } , isSync )
2017-02-07 22:33:23 +01:00
} else {
if err := s . indexer . Add ( d . Object ) ; err != nil {
return err
}
2018-02-14 16:56:04 +08:00
s . processor . distribute ( addNotification { newObj : d . Object } , isSync )
2017-02-07 22:33:23 +01:00
}
case Deleted :
if err := s . indexer . Delete ( d . Object ) ; err != nil {
return err
}
2018-02-14 16:56:04 +08:00
s . processor . distribute ( deleteNotification { oldObj : d . Object } , false )
2017-02-07 22:33:23 +01:00
}
}
return nil
}
type sharedProcessor struct {
2019-03-14 15:56:06 +01:00
listenersStarted bool
2018-02-14 16:56:04 +08:00
listenersLock sync . RWMutex
listeners [ ] * processorListener
syncingListeners [ ] * processorListener
clock clock . Clock
wg wait . Group
2017-02-07 22:33:23 +01:00
}
2018-02-14 16:56:04 +08:00
func ( p * sharedProcessor ) addListener ( listener * processorListener ) {
p . listenersLock . Lock ( )
defer p . listenersLock . Unlock ( )
p . addListenerLocked ( listener )
2019-03-14 15:56:06 +01:00
if p . listenersStarted {
p . wg . Start ( listener . run )
p . wg . Start ( listener . pop )
}
2018-02-14 16:56:04 +08:00
}
// addListenerLocked appends listener to both the full listener list and the
// syncing listener list. It does no locking of its own.
func (p *sharedProcessor) addListenerLocked(listener *processorListener) {
	p.listeners, p.syncingListeners = append(p.listeners, listener), append(p.syncingListeners, listener)
}
func ( p * sharedProcessor ) distribute ( obj interface { } , sync bool ) {
p . listenersLock . RLock ( )
defer p . listenersLock . RUnlock ( )
if sync {
for _ , listener := range p . syncingListeners {
listener . add ( obj )
}
} else {
for _ , listener := range p . listeners {
listener . add ( obj )
}
2017-02-07 22:33:23 +01:00
}
}
func ( p * sharedProcessor ) run ( stopCh <- chan struct { } ) {
2018-02-14 16:56:04 +08:00
func ( ) {
p . listenersLock . RLock ( )
defer p . listenersLock . RUnlock ( )
for _ , listener := range p . listeners {
p . wg . Start ( listener . run )
p . wg . Start ( listener . pop )
}
2019-03-14 15:56:06 +01:00
p . listenersStarted = true
2018-02-14 16:56:04 +08:00
} ( )
<- stopCh
p . listenersLock . RLock ( )
defer p . listenersLock . RUnlock ( )
2017-02-07 22:33:23 +01:00
for _ , listener := range p . listeners {
2018-02-14 16:56:04 +08:00
close ( listener . addCh ) // Tell .pop() to stop. .pop() will tell .run() to stop
2017-02-07 22:33:23 +01:00
}
2018-02-14 16:56:04 +08:00
p . wg . Wait ( ) // Wait for all .pop() and .run() to stop
2017-02-07 22:33:23 +01:00
}
2018-02-14 16:56:04 +08:00
// shouldResync queries every listener to determine if any of them need a resync, based on each
// listener's resyncPeriod.
func ( p * sharedProcessor ) shouldResync ( ) bool {
p . listenersLock . Lock ( )
defer p . listenersLock . Unlock ( )
2017-02-07 22:33:23 +01:00
2018-02-14 16:56:04 +08:00
p . syncingListeners = [ ] * processorListener { }
resyncNeeded := false
now := p . clock . Now ( )
for _ , listener := range p . listeners {
// need to loop through all the listeners to see if they need to resync so we can prepare any
// listeners that are going to be resyncing.
if listener . shouldResync ( now ) {
resyncNeeded = true
p . syncingListeners = append ( p . syncingListeners , listener )
listener . determineNextResync ( now )
}
}
return resyncNeeded
}
func ( p * sharedProcessor ) resyncCheckPeriodChanged ( resyncCheckPeriod time . Duration ) {
p . listenersLock . RLock ( )
defer p . listenersLock . RUnlock ( )
2017-02-07 22:33:23 +01:00
2018-02-14 16:56:04 +08:00
for _ , listener := range p . listeners {
resyncPeriod := determineResyncPeriod ( listener . requestedResyncPeriod , resyncCheckPeriod )
listener . setResyncPeriod ( resyncPeriod )
}
}
type processorListener struct {
2017-02-07 22:33:23 +01:00
nextCh chan interface { }
2018-02-14 16:56:04 +08:00
addCh chan interface { }
2017-02-07 22:33:23 +01:00
handler ResourceEventHandler
2018-02-14 16:56:04 +08:00
// pendingNotifications is an unbounded ring buffer that holds all notifications not yet distributed.
// There is one per listener, but a failing/stalled listener will have infinite pendingNotifications
// added until we OOM.
// TODO: This is no worse than before, since reflectors were backed by unbounded DeltaFIFOs, but
// we should try to do something better.
pendingNotifications buffer . RingGrowing
// requestedResyncPeriod is how frequently the listener wants a full resync from the shared informer
requestedResyncPeriod time . Duration
// resyncPeriod is how frequently the listener wants a full resync from the shared informer. This
// value may differ from requestedResyncPeriod if the shared informer adjusts it to align with the
// informer's overall resync check period.
resyncPeriod time . Duration
// nextResync is the earliest time the listener should get a full resync
nextResync time . Time
// resyncLock guards access to resyncPeriod and nextResync
resyncLock sync . Mutex
2017-02-07 22:33:23 +01:00
}
2018-02-14 16:56:04 +08:00
func newProcessListener ( handler ResourceEventHandler , requestedResyncPeriod , resyncPeriod time . Duration , now time . Time , bufferSize int ) * processorListener {
2017-02-07 22:33:23 +01:00
ret := & processorListener {
2018-02-14 16:56:04 +08:00
nextCh : make ( chan interface { } ) ,
addCh : make ( chan interface { } ) ,
handler : handler ,
pendingNotifications : * buffer . NewRingGrowing ( bufferSize ) ,
requestedResyncPeriod : requestedResyncPeriod ,
resyncPeriod : resyncPeriod ,
2017-02-07 22:33:23 +01:00
}
2018-02-14 16:56:04 +08:00
ret . determineNextResync ( now )
2017-02-07 22:33:23 +01:00
return ret
}
func ( p * processorListener ) add ( notification interface { } ) {
2018-02-14 16:56:04 +08:00
p . addCh <- notification
2017-02-07 22:33:23 +01:00
}
2018-02-14 16:56:04 +08:00
func ( p * processorListener ) pop ( ) {
2017-02-07 22:33:23 +01:00
defer utilruntime . HandleCrash ( )
2018-02-14 16:56:04 +08:00
defer close ( p . nextCh ) // Tell .run() to stop
2017-02-07 22:33:23 +01:00
2018-02-14 16:56:04 +08:00
var nextCh chan <- interface { }
var notification interface { }
2017-02-07 22:33:23 +01:00
for {
select {
2018-02-14 16:56:04 +08:00
case nextCh <- notification :
// Notification dispatched
var ok bool
notification , ok = p . pendingNotifications . ReadOne ( )
if ! ok { // Nothing to pop
nextCh = nil // Disable this select case
}
case notificationToAdd , ok := <- p . addCh :
if ! ok {
return
}
if notification == nil { // No notification to pop (and pendingNotifications is empty)
// Optimize the case - skip adding to pendingNotifications
notification = notificationToAdd
nextCh = p . nextCh
} else { // There is already a notification waiting to be dispatched
p . pendingNotifications . WriteOne ( notificationToAdd )
}
2017-02-07 22:33:23 +01:00
}
}
}
2018-02-14 16:56:04 +08:00
func ( p * processorListener ) run ( ) {
2019-03-14 15:56:06 +01:00
// this call blocks until the channel is closed. When a panic happens during the notification
// we will catch it, **the offending item will be skipped!**, and after a short delay (one second)
// the next notification will be attempted. This is usually better than the alternative of never
// delivering again.
stopCh := make ( chan struct { } )
wait . Until ( func ( ) {
// this gives us a few quick retries before a long pause and then a few more quick retries
err := wait . ExponentialBackoff ( retry . DefaultRetry , func ( ) ( bool , error ) {
for next := range p . nextCh {
switch notification := next . ( type ) {
case updateNotification :
p . handler . OnUpdate ( notification . oldObj , notification . newObj )
case addNotification :
p . handler . OnAdd ( notification . newObj )
case deleteNotification :
p . handler . OnDelete ( notification . oldObj )
default :
2019-08-05 18:24:03 +02:00
utilruntime . HandleError ( fmt . Errorf ( "unrecognized notification: %T" , next ) )
2019-03-14 15:56:06 +01:00
}
}
// the only way to get here is if the p.nextCh is empty and closed
return true , nil
} )
2017-02-07 22:33:23 +01:00
2019-03-14 15:56:06 +01:00
// the only way to get here is if the p.nextCh is empty and closed
if err == nil {
close ( stopCh )
2017-02-07 22:33:23 +01:00
}
2019-03-14 15:56:06 +01:00
} , 1 * time . Minute , stopCh )
2017-02-07 22:33:23 +01:00
}
2018-02-14 16:56:04 +08:00
// shouldResync determines if the listener needs a resync, i.e. whether now
// has reached nextResync. If the listener's resyncPeriod is 0, this always
// returns false.
func (p *processorListener) shouldResync(now time.Time) bool {
	p.resyncLock.Lock()
	defer p.resyncLock.Unlock()

	if p.resyncPeriod == 0 {
		return false
	}
	// "not before" is the same as "after or equal".
	return !now.Before(p.nextResync)
}
// determineNextResync schedules the listener's next resync one resyncPeriod
// after now.
func (p *processorListener) determineNextResync(now time.Time) {
	p.resyncLock.Lock()
	defer p.resyncLock.Unlock()

	next := now.Add(p.resyncPeriod)
	p.nextResync = next
}
// setResyncPeriod replaces the listener's resync period. nextResync is left
// untouched.
func (p *processorListener) setResyncPeriod(resyncPeriod time.Duration) {
	p.resyncLock.Lock()
	p.resyncPeriod = resyncPeriod
	p.resyncLock.Unlock()
}