Diffstat (limited to 'src/kube2msb/vendor/k8s.io/kubernetes/pkg/controller/framework/shared_informer.go')
-rw-r--r--  src/kube2msb/vendor/k8s.io/kubernetes/pkg/controller/framework/shared_informer.go  383
1 file changed, 383 insertions(+), 0 deletions(-)
diff --git a/src/kube2msb/vendor/k8s.io/kubernetes/pkg/controller/framework/shared_informer.go b/src/kube2msb/vendor/k8s.io/kubernetes/pkg/controller/framework/shared_informer.go
new file mode 100644
index 0000000..87dcac6
--- /dev/null
+++ b/src/kube2msb/vendor/k8s.io/kubernetes/pkg/controller/framework/shared_informer.go
@@ -0,0 +1,383 @@
+/*
+Copyright 2015 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package framework
+
+import (
+ "fmt"
+ "sync"
+ "time"
+
+ "k8s.io/kubernetes/pkg/client/cache"
+ "k8s.io/kubernetes/pkg/runtime"
+ utilruntime "k8s.io/kubernetes/pkg/util/runtime"
+)
+
+// SharedInformer has one behavior change compared to a standard Informer: when
+// you receive a notification, the cache will be AT LEAST as fresh as the
+// notification, but it MAY be fresher. You should NOT depend on the contents
+// of the cache exactly matching the notification you've received in handler
+// functions. If there was a create, followed by a delete, the cache may NOT
+// have your item. This has advantages over the broadcaster since it allows us
+// to share a common cache across many controllers. Extending the broadcaster
+// would have required us to keep duplicate caches for each watch.
+type SharedInformer interface {
+ // AddEventHandler registers a handler. Events to a single handler are delivered sequentially,
+ // but there is no coordination between different handlers.
+ // A handler added after the informer has started receives synthetic add notifications for the
+ // current contents of the store before live events resume.
+ AddEventHandler(handler ResourceEventHandler) error
+ GetStore() cache.Store
+ // GetController gives back a synthetic interface that "votes" to start the informer
+ GetController() ControllerInterface
+ Run(stopCh <-chan struct{})
+ HasSynced() bool
+ LastSyncResourceVersion() string
+}
+
+type SharedIndexInformer interface {
+ SharedInformer
+ // AddIndexers adds indexers to the informer; it must be called before the informer starts.
+ AddIndexers(indexers cache.Indexers) error
+ GetIndexer() cache.Indexer
+}
+
+// NewSharedInformer creates a new instance for the listwatcher.
+// TODO: create a cache/factory of these at a higher level for the list all, watch all of a given resource that can
+// be shared amongst all consumers.
+func NewSharedInformer(lw cache.ListerWatcher, objType runtime.Object, resyncPeriod time.Duration) SharedInformer {
+ return NewSharedIndexInformer(lw, objType, resyncPeriod, cache.Indexers{})
+}
+
+// NewSharedIndexInformer creates a new instance for the listwatcher.
+// TODO: create a cache/factory of these at a higher level for the list all, watch all of a given resource that can
+// be shared amongst all consumers.
+func NewSharedIndexInformer(lw cache.ListerWatcher, objType runtime.Object, resyncPeriod time.Duration, indexers cache.Indexers) SharedIndexInformer {
+ sharedIndexInformer := &sharedIndexInformer{
+ processor: &sharedProcessor{},
+ indexer: cache.NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
+ listerWatcher: lw,
+ objectType: objType,
+ fullResyncPeriod: resyncPeriod,
+ }
+ return sharedIndexInformer
+}
+
+type sharedIndexInformer struct {
+ indexer cache.Indexer
+ controller *Controller
+
+ processor *sharedProcessor
+
+ // This block is tracked to handle late initialization of the controller
+ listerWatcher cache.ListerWatcher
+ objectType runtime.Object
+ fullResyncPeriod time.Duration
+
+ started bool
+ startedLock sync.Mutex
+
+ // blockDeltas gives a way to stop all event distribution so that a late event handler
+ // can safely join the shared informer.
+ blockDeltas sync.Mutex
+ // stopCh is the channel used to stop the main Run process. We have to track it so that
+ // late joiners can have a proper stop
+ stopCh <-chan struct{}
+}
+
+// dummyController hides the fact that a SharedInformer is different from a dedicated one
+// where a caller can `Run`. The Run method is disconnected in this case, because higher
+// level logic will decide when to start the SharedInformer and related controller.
+// Because returning information back is always asynchronous, the legacy callers shouldn't
+// notice any change in behavior.
+type dummyController struct {
+ informer *sharedIndexInformer
+}
+
+func (v *dummyController) Run(stopCh <-chan struct{}) {
+}
+
+func (v *dummyController) HasSynced() bool {
+ return v.informer.HasSynced()
+}
+
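+// updateNotification, addNotification, and deleteNotification are the event types handed
+// from HandleDeltas to the sharedProcessor and, ultimately, to each registered handler.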
+type updateNotification struct {
+ oldObj interface{}
+ newObj interface{}
+}
+
+type addNotification struct {
+ newObj interface{}
+}
+
+type deleteNotification struct {
+ oldObj interface{}
+}
+
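+// Run builds a DeltaFIFO on top of the shared indexer, creates the underlying controller,
+// starts the goroutines of every listener registered so far, and then blocks in the
+// controller's Run until stopCh is closed.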
+func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
+ defer utilruntime.HandleCrash()
+
+ fifo := cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, nil, s.indexer)
+
+ cfg := &Config{
+ Queue: fifo,
+ ListerWatcher: s.listerWatcher,
+ ObjectType: s.objectType,
+ FullResyncPeriod: s.fullResyncPeriod,
+ RetryOnError: false,
+
+ Process: s.HandleDeltas,
+ }
+
+ func() {
+ s.startedLock.Lock()
+ defer s.startedLock.Unlock()
+
+ s.controller = New(cfg)
+ s.started = true
+ }()
+
+ s.stopCh = stopCh
+ s.processor.run(stopCh)
+ s.controller.Run(stopCh)
+}
+
+func (s *sharedIndexInformer) isStarted() bool {
+ s.startedLock.Lock()
+ defer s.startedLock.Unlock()
+ return s.started
+}
+
+func (s *sharedIndexInformer) HasSynced() bool {
+ s.startedLock.Lock()
+ defer s.startedLock.Unlock()
+
+ if s.controller == nil {
+ return false
+ }
+ return s.controller.HasSynced()
+}
+
+func (s *sharedIndexInformer) LastSyncResourceVersion() string {
+ s.startedLock.Lock()
+ defer s.startedLock.Unlock()
+
+ if s.controller == nil {
+ return ""
+ }
+ return s.controller.reflector.LastSyncResourceVersion()
+}
+
+func (s *sharedIndexInformer) GetStore() cache.Store {
+ return s.indexer
+}
+
+func (s *sharedIndexInformer) GetIndexer() cache.Indexer {
+ return s.indexer
+}
+
+func (s *sharedIndexInformer) AddIndexers(indexers cache.Indexers) error {
+ s.startedLock.Lock()
+ defer s.startedLock.Unlock()
+
+ if s.started {
+ return fmt.Errorf("informer has already started")
+ }
+
+ return s.indexer.AddIndexers(indexers)
+}
+
+func (s *sharedIndexInformer) GetController() ControllerInterface {
+ return &dummyController{informer: s}
+}
+
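+// AddEventHandler registers a handler with the shared informer. Before the informer has
+// started, the listener is simply queued; afterwards, event distribution is paused via
+// blockDeltas, the listener's goroutines are started, and the current store contents are
+// replayed to it as synthetic add notifications before live events resume.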
+func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) error {
+ s.startedLock.Lock()
+ defer s.startedLock.Unlock()
+
+ if !s.started {
+ listener := newProcessListener(handler)
+ s.processor.listeners = append(s.processor.listeners, listener)
+ return nil
+ }
+
+ // in order to safely join, we have to
+ // 1. stop sending add/update/delete notifications
+ // 2. do a list against the store
+ // 3. send synthetic "Add" events to the new handler
+ // 4. unblock
+ s.blockDeltas.Lock()
+ defer s.blockDeltas.Unlock()
+
+ listener := newProcessListener(handler)
+ s.processor.listeners = append(s.processor.listeners, listener)
+
+ go listener.run(s.stopCh)
+ go listener.pop(s.stopCh)
+
+ items := s.indexer.List()
+ for i := range items {
+ listener.add(addNotification{newObj: items[i]})
+ }
+
+ return nil
+}
+
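+// HandleDeltas applies a batch of Deltas, oldest to newest, to the shared indexer and
+// distributes the corresponding add/update/delete notifications to all listeners.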
+func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
+ s.blockDeltas.Lock()
+ defer s.blockDeltas.Unlock()
+
+ // from oldest to newest
+ for _, d := range obj.(cache.Deltas) {
+ switch d.Type {
+ case cache.Sync, cache.Added, cache.Updated:
+ if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
+ if err := s.indexer.Update(d.Object); err != nil {
+ return err
+ }
+ s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object})
+ } else {
+ if err := s.indexer.Add(d.Object); err != nil {
+ return err
+ }
+ s.processor.distribute(addNotification{newObj: d.Object})
+ }
+ case cache.Deleted:
+ if err := s.indexer.Delete(d.Object); err != nil {
+ return err
+ }
+ s.processor.distribute(deleteNotification{oldObj: d.Object})
+ }
+ }
+ return nil
+}
+
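+// sharedProcessor fans notifications out to every registered processorListener.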
+type sharedProcessor struct {
+ listeners []*processorListener
+}
+
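+// distribute appends obj to the pending queue of every listener.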
+func (p *sharedProcessor) distribute(obj interface{}) {
+ for _, listener := range p.listeners {
+ listener.add(obj)
+ }
+}
+
+func (p *sharedProcessor) run(stopCh <-chan struct{}) {
+ for _, listener := range p.listeners {
+ go listener.run(stopCh)
+ go listener.pop(stopCh)
+ }
+}
+
+type processorListener struct {
+ // lock/cond protects access to 'pendingNotifications'.
+ lock sync.RWMutex
+ cond sync.Cond
+
+ // pendingNotifications is an unbounded slice that holds all notifications not yet distributed.
+ // There is one slice per listener, and a failing or stalled listener will accumulate
+ // pendingNotifications without bound until we OOM.
+ // TODO: This is no worse than before, since reflectors were backed by unbounded DeltaFIFOs,
+ // but we should try to do something better.
+ pendingNotifications []interface{}
+
+ nextCh chan interface{}
+
+ handler ResourceEventHandler
+}
+
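+// newProcessListener builds a listener whose condition variable shares the listener's lock,
+// so that add can wake a pop that is waiting for work.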
+func newProcessListener(handler ResourceEventHandler) *processorListener {
+ ret := &processorListener{
+ pendingNotifications: []interface{}{},
+ nextCh: make(chan interface{}),
+ handler: handler,
+ }
+
+ ret.cond.L = &ret.lock
+ return ret
+}
+
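+// add enqueues a notification and wakes any goroutine blocked in pop.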
+func (p *processorListener) add(notification interface{}) {
+ p.lock.Lock()
+ defer p.lock.Unlock()
+
+ p.pendingNotifications = append(p.pendingNotifications, notification)
+ p.cond.Broadcast()
+}
+
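+// pop waits for pending notifications and forwards them, one at a time, onto nextCh.
+// It returns when stopCh is closed.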
+func (p *processorListener) pop(stopCh <-chan struct{}) {
+ defer utilruntime.HandleCrash()
+
+ for {
+ blockingGet := func() (interface{}, bool) {
+ p.lock.Lock()
+ defer p.lock.Unlock()
+
+ for len(p.pendingNotifications) == 0 {
+ // check if we're shutdown
+ select {
+ case <-stopCh:
+ return nil, true
+ default:
+ }
+ p.cond.Wait()
+ }
+
+ nt := p.pendingNotifications[0]
+ p.pendingNotifications = p.pendingNotifications[1:]
+ return nt, false
+ }
+
+ notification, stopped := blockingGet()
+ if stopped {
+ return
+ }
+
+ select {
+ case <-stopCh:
+ return
+ case p.nextCh <- notification:
+ }
+ }
+}
+
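+// run receives notifications from nextCh and dispatches them to the handler's
+// OnAdd/OnUpdate/OnDelete callbacks until stopCh is closed.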
+func (p *processorListener) run(stopCh <-chan struct{}) {
+ defer utilruntime.HandleCrash()
+
+ for {
+ var next interface{}
+ select {
+ case <-stopCh:
+ func() {
+ p.lock.Lock()
+ defer p.lock.Unlock()
+ p.cond.Broadcast()
+ }()
+ return
+ case next = <-p.nextCh:
+ }
+
+ switch notification := next.(type) {
+ case updateNotification:
+ p.handler.OnUpdate(notification.oldObj, notification.newObj)
+ case addNotification:
+ p.handler.OnAdd(notification.newObj)
+ case deleteNotification:
+ p.handler.OnDelete(notification.oldObj)
+ default:
+ utilruntime.HandleError(fmt.Errorf("unrecognized notification: %#v", next))
+ }
+ }
+}
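
For orientation, here is a minimal consumer sketch; it is not part of the vendored file. It assumes a cache.ListerWatcher supplied by the caller (for example from cache.NewListWatchFromClient), the ResourceEventHandlerFuncs adapter from this vendored framework package, the api.Pod type and cache.MetaNamespaceIndexFunc helper from the same vendored tree, and the hypothetical names example and watchPods; everything else uses only the API defined in the diff above.

package example

import (
	"fmt"
	"time"

	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/client/cache"
	"k8s.io/kubernetes/pkg/controller/framework"
)

// watchPods builds a shared informer for Pods, registers a handler,
// runs it, and waits for the initial sync before reading the store.
func watchPods(lw cache.ListerWatcher, stopCh <-chan struct{}) error {
	informer := framework.NewSharedIndexInformer(lw, &api.Pod{}, 30*time.Second, cache.Indexers{})

	// Indexers may only be added before Run; the namespace index below relies on the
	// cache package's MetaNamespaceIndexFunc helper (an assumption outside this diff).
	if err := informer.AddIndexers(cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc}); err != nil {
		return err
	}

	// Handlers registered before Run take the simple pre-start path in AddEventHandler.
	if err := informer.AddEventHandler(framework.ResourceEventHandlerFuncs{
		AddFunc:    func(obj interface{}) { fmt.Printf("added: %v\n", obj) },
		UpdateFunc: func(oldObj, newObj interface{}) { fmt.Printf("updated: %v\n", newObj) },
		DeleteFunc: func(obj interface{}) { fmt.Printf("deleted: %v\n", obj) },
	}); err != nil {
		return err
	}

	go informer.Run(stopCh)

	// Wait until the initial list has been reflected into the shared store.
	for !informer.HasSynced() {
		time.Sleep(100 * time.Millisecond)
	}
	fmt.Printf("%d pods in the shared cache\n", len(informer.GetStore().List()))
	return nil
}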