aboutsummaryrefslogtreecommitdiffstats
path: root/src/kube2msb/vendor/k8s.io/kubernetes/pkg/controller
diff options
context:
space:
mode:
Diffstat (limited to 'src/kube2msb/vendor/k8s.io/kubernetes/pkg/controller')
-rw-r--r--src/kube2msb/vendor/k8s.io/kubernetes/pkg/controller/framework/controller.go326
-rw-r--r--src/kube2msb/vendor/k8s.io/kubernetes/pkg/controller/framework/doc.go18
-rw-r--r--src/kube2msb/vendor/k8s.io/kubernetes/pkg/controller/framework/fake_controller_source.go262
-rw-r--r--src/kube2msb/vendor/k8s.io/kubernetes/pkg/controller/framework/shared_informer.go383
4 files changed, 989 insertions, 0 deletions
diff --git a/src/kube2msb/vendor/k8s.io/kubernetes/pkg/controller/framework/controller.go b/src/kube2msb/vendor/k8s.io/kubernetes/pkg/controller/framework/controller.go
new file mode 100644
index 0000000..8cbd124
--- /dev/null
+++ b/src/kube2msb/vendor/k8s.io/kubernetes/pkg/controller/framework/controller.go
@@ -0,0 +1,326 @@
+/*
+Copyright 2015 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package framework
+
+import (
+ "sync"
+ "time"
+
+ "k8s.io/kubernetes/pkg/client/cache"
+ "k8s.io/kubernetes/pkg/runtime"
+ utilruntime "k8s.io/kubernetes/pkg/util/runtime"
+ "k8s.io/kubernetes/pkg/util/wait"
+)
+
+// Config contains all the settings for a Controller.
+type Config struct {
+ // The queue for your objects; either a cache.FIFO or
+ // a cache.DeltaFIFO. Your Process() function should accept
+ // the output of this Oueue's Pop() method.
+ cache.Queue
+
+ // Something that can list and watch your objects.
+ cache.ListerWatcher
+
+ // Something that can process your objects.
+ Process ProcessFunc
+
+ // The type of your objects.
+ ObjectType runtime.Object
+
+ // Reprocess everything at least this often.
+ // Note that if it takes longer for you to clear the queue than this
+ // period, you will end up processing items in the order determined
+ // by cache.FIFO.Replace(). Currently, this is random. If this is a
+ // problem, we can change that replacement policy to append new
+ // things to the end of the queue instead of replacing the entire
+ // queue.
+ FullResyncPeriod time.Duration
+
+ // If true, when Process() returns an error, re-enqueue the object.
+ // TODO: add interface to let you inject a delay/backoff or drop
+ // the object completely if desired. Pass the object in
+ // question to this interface as a parameter.
+ RetryOnError bool
+}
+
+// ProcessFunc processes a single object.
+type ProcessFunc func(obj interface{}) error
+
+// Controller is a generic controller framework.
+type Controller struct {
+ config Config
+ reflector *cache.Reflector
+ reflectorMutex sync.RWMutex
+}
+
+// TODO make the "Controller" private, and convert all references to use ControllerInterface instead
+type ControllerInterface interface {
+ Run(stopCh <-chan struct{})
+ HasSynced() bool
+}
+
+// New makes a new Controller from the given Config.
+func New(c *Config) *Controller {
+ ctlr := &Controller{
+ config: *c,
+ }
+ return ctlr
+}
+
+// Run begins processing items, and will continue until a value is sent down stopCh.
+// It's an error to call Run more than once.
+// Run blocks; call via go.
+func (c *Controller) Run(stopCh <-chan struct{}) {
+ defer utilruntime.HandleCrash()
+ r := cache.NewReflector(
+ c.config.ListerWatcher,
+ c.config.ObjectType,
+ c.config.Queue,
+ c.config.FullResyncPeriod,
+ )
+
+ c.reflectorMutex.Lock()
+ c.reflector = r
+ c.reflectorMutex.Unlock()
+
+ r.RunUntil(stopCh)
+
+ wait.Until(c.processLoop, time.Second, stopCh)
+}
+
+// Returns true once this controller has completed an initial resource listing
+func (c *Controller) HasSynced() bool {
+ return c.config.Queue.HasSynced()
+}
+
+// Requeue adds the provided object back into the queue if it does not already exist.
+func (c *Controller) Requeue(obj interface{}) error {
+ return c.config.Queue.AddIfNotPresent(cache.Deltas{
+ cache.Delta{
+ Type: cache.Sync,
+ Object: obj,
+ },
+ })
+}
+
+// processLoop drains the work queue.
+// TODO: Consider doing the processing in parallel. This will require a little thought
+// to make sure that we don't end up processing the same object multiple times
+// concurrently.
+func (c *Controller) processLoop() {
+ for {
+ obj, err := c.config.Queue.Pop(cache.PopProcessFunc(c.config.Process))
+ if err != nil {
+ if c.config.RetryOnError {
+ // This is the safe way to re-enqueue.
+ c.config.Queue.AddIfNotPresent(obj)
+ }
+ }
+ }
+}
+
+// ResourceEventHandler can handle notifications for events that happen to a
+// resource. The events are informational only, so you can't return an
+// error.
+// * OnAdd is called when an object is added.
+// * OnUpdate is called when an object is modified. Note that oldObj is the
+// last known state of the object-- it is possible that several changes
+// were combined together, so you can't use this to see every single
+// change. OnUpdate is also called when a re-list happens, and it will
+// get called even if nothing changed. This is useful for periodically
+// evaluating or syncing something.
+// * OnDelete will get the final state of the item if it is known, otherwise
+// it will get an object of type cache.DeletedFinalStateUnknown. This can
+// happen if the watch is closed and misses the delete event and we don't
+// notice the deletion until the subsequent re-list.
+type ResourceEventHandler interface {
+ OnAdd(obj interface{})
+ OnUpdate(oldObj, newObj interface{})
+ OnDelete(obj interface{})
+}
+
+// ResourceEventHandlerFuncs is an adaptor to let you easily specify as many or
+// as few of the notification functions as you want while still implementing
+// ResourceEventHandler.
+type ResourceEventHandlerFuncs struct {
+ AddFunc func(obj interface{})
+ UpdateFunc func(oldObj, newObj interface{})
+ DeleteFunc func(obj interface{})
+}
+
+// OnAdd calls AddFunc if it's not nil.
+func (r ResourceEventHandlerFuncs) OnAdd(obj interface{}) {
+ if r.AddFunc != nil {
+ r.AddFunc(obj)
+ }
+}
+
+// OnUpdate calls UpdateFunc if it's not nil.
+func (r ResourceEventHandlerFuncs) OnUpdate(oldObj, newObj interface{}) {
+ if r.UpdateFunc != nil {
+ r.UpdateFunc(oldObj, newObj)
+ }
+}
+
+// OnDelete calls DeleteFunc if it's not nil.
+func (r ResourceEventHandlerFuncs) OnDelete(obj interface{}) {
+ if r.DeleteFunc != nil {
+ r.DeleteFunc(obj)
+ }
+}
+
+// DeletionHandlingMetaNamespaceKeyFunc checks for
+// cache.DeletedFinalStateUnknown objects before calling
+// cache.MetaNamespaceKeyFunc.
+func DeletionHandlingMetaNamespaceKeyFunc(obj interface{}) (string, error) {
+ if d, ok := obj.(cache.DeletedFinalStateUnknown); ok {
+ return d.Key, nil
+ }
+ return cache.MetaNamespaceKeyFunc(obj)
+}
+
+// NewInformer returns a cache.Store and a controller for populating the store
+// while also providing event notifications. You should only used the returned
+// cache.Store for Get/List operations; Add/Modify/Deletes will cause the event
+// notifications to be faulty.
+//
+// Parameters:
+// * lw is list and watch functions for the source of the resource you want to
+// be informed of.
+// * objType is an object of the type that you expect to receive.
+// * resyncPeriod: if non-zero, will re-list this often (you will get OnUpdate
+// calls, even if nothing changed). Otherwise, re-list will be delayed as
+// long as possible (until the upstream source closes the watch or times out,
+// or you stop the controller).
+// * h is the object you want notifications sent to.
+//
+func NewInformer(
+ lw cache.ListerWatcher,
+ objType runtime.Object,
+ resyncPeriod time.Duration,
+ h ResourceEventHandler,
+) (cache.Store, *Controller) {
+ // This will hold the client state, as we know it.
+ clientState := cache.NewStore(DeletionHandlingMetaNamespaceKeyFunc)
+
+ // This will hold incoming changes. Note how we pass clientState in as a
+ // KeyLister, that way resync operations will result in the correct set
+ // of update/delete deltas.
+ fifo := cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, nil, clientState)
+
+ cfg := &Config{
+ Queue: fifo,
+ ListerWatcher: lw,
+ ObjectType: objType,
+ FullResyncPeriod: resyncPeriod,
+ RetryOnError: false,
+
+ Process: func(obj interface{}) error {
+ // from oldest to newest
+ for _, d := range obj.(cache.Deltas) {
+ switch d.Type {
+ case cache.Sync, cache.Added, cache.Updated:
+ if old, exists, err := clientState.Get(d.Object); err == nil && exists {
+ if err := clientState.Update(d.Object); err != nil {
+ return err
+ }
+ h.OnUpdate(old, d.Object)
+ } else {
+ if err := clientState.Add(d.Object); err != nil {
+ return err
+ }
+ h.OnAdd(d.Object)
+ }
+ case cache.Deleted:
+ if err := clientState.Delete(d.Object); err != nil {
+ return err
+ }
+ h.OnDelete(d.Object)
+ }
+ }
+ return nil
+ },
+ }
+ return clientState, New(cfg)
+}
+
+// NewIndexerInformer returns a cache.Indexer and a controller for populating the index
+// while also providing event notifications. You should only used the returned
+// cache.Index for Get/List operations; Add/Modify/Deletes will cause the event
+// notifications to be faulty.
+//
+// Parameters:
+// * lw is list and watch functions for the source of the resource you want to
+// be informed of.
+// * objType is an object of the type that you expect to receive.
+// * resyncPeriod: if non-zero, will re-list this often (you will get OnUpdate
+// calls, even if nothing changed). Otherwise, re-list will be delayed as
+// long as possible (until the upstream source closes the watch or times out,
+// or you stop the controller).
+// * h is the object you want notifications sent to.
+//
+func NewIndexerInformer(
+ lw cache.ListerWatcher,
+ objType runtime.Object,
+ resyncPeriod time.Duration,
+ h ResourceEventHandler,
+ indexers cache.Indexers,
+) (cache.Indexer, *Controller) {
+ // This will hold the client state, as we know it.
+ clientState := cache.NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers)
+
+ // This will hold incoming changes. Note how we pass clientState in as a
+ // KeyLister, that way resync operations will result in the correct set
+ // of update/delete deltas.
+ fifo := cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, nil, clientState)
+
+ cfg := &Config{
+ Queue: fifo,
+ ListerWatcher: lw,
+ ObjectType: objType,
+ FullResyncPeriod: resyncPeriod,
+ RetryOnError: false,
+
+ Process: func(obj interface{}) error {
+ // from oldest to newest
+ for _, d := range obj.(cache.Deltas) {
+ switch d.Type {
+ case cache.Sync, cache.Added, cache.Updated:
+ if old, exists, err := clientState.Get(d.Object); err == nil && exists {
+ if err := clientState.Update(d.Object); err != nil {
+ return err
+ }
+ h.OnUpdate(old, d.Object)
+ } else {
+ if err := clientState.Add(d.Object); err != nil {
+ return err
+ }
+ h.OnAdd(d.Object)
+ }
+ case cache.Deleted:
+ if err := clientState.Delete(d.Object); err != nil {
+ return err
+ }
+ h.OnDelete(d.Object)
+ }
+ }
+ return nil
+ },
+ }
+ return clientState, New(cfg)
+}
diff --git a/src/kube2msb/vendor/k8s.io/kubernetes/pkg/controller/framework/doc.go b/src/kube2msb/vendor/k8s.io/kubernetes/pkg/controller/framework/doc.go
new file mode 100644
index 0000000..feceba3
--- /dev/null
+++ b/src/kube2msb/vendor/k8s.io/kubernetes/pkg/controller/framework/doc.go
@@ -0,0 +1,18 @@
+/*
+Copyright 2015 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// Package framework implements all the grunt work involved in running a simple controller.
+package framework
diff --git a/src/kube2msb/vendor/k8s.io/kubernetes/pkg/controller/framework/fake_controller_source.go b/src/kube2msb/vendor/k8s.io/kubernetes/pkg/controller/framework/fake_controller_source.go
new file mode 100644
index 0000000..ee00c05
--- /dev/null
+++ b/src/kube2msb/vendor/k8s.io/kubernetes/pkg/controller/framework/fake_controller_source.go
@@ -0,0 +1,262 @@
+/*
+Copyright 2015 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package framework
+
+import (
+ "errors"
+ "math/rand"
+ "strconv"
+ "sync"
+
+ "k8s.io/kubernetes/pkg/api"
+ "k8s.io/kubernetes/pkg/api/meta"
+ "k8s.io/kubernetes/pkg/runtime"
+ "k8s.io/kubernetes/pkg/types"
+ "k8s.io/kubernetes/pkg/watch"
+)
+
+func NewFakeControllerSource() *FakeControllerSource {
+ return &FakeControllerSource{
+ Items: map[nnu]runtime.Object{},
+ Broadcaster: watch.NewBroadcaster(100, watch.WaitIfChannelFull),
+ }
+}
+
+func NewFakePVControllerSource() *FakePVControllerSource {
+ return &FakePVControllerSource{
+ FakeControllerSource{
+ Items: map[nnu]runtime.Object{},
+ Broadcaster: watch.NewBroadcaster(100, watch.WaitIfChannelFull),
+ }}
+}
+
+func NewFakePVCControllerSource() *FakePVCControllerSource {
+ return &FakePVCControllerSource{
+ FakeControllerSource{
+ Items: map[nnu]runtime.Object{},
+ Broadcaster: watch.NewBroadcaster(100, watch.WaitIfChannelFull),
+ }}
+}
+
+// FakeControllerSource implements listing/watching for testing.
+type FakeControllerSource struct {
+ lock sync.RWMutex
+ Items map[nnu]runtime.Object
+ changes []watch.Event // one change per resourceVersion
+ Broadcaster *watch.Broadcaster
+}
+
+type FakePVControllerSource struct {
+ FakeControllerSource
+}
+
+type FakePVCControllerSource struct {
+ FakeControllerSource
+}
+
+// namespace, name, uid to be used as a key.
+type nnu struct {
+ namespace, name string
+ uid types.UID
+}
+
+// Add adds an object to the set and sends an add event to watchers.
+// obj's ResourceVersion is set.
+func (f *FakeControllerSource) Add(obj runtime.Object) {
+ f.Change(watch.Event{Type: watch.Added, Object: obj}, 1)
+}
+
+// Modify updates an object in the set and sends a modified event to watchers.
+// obj's ResourceVersion is set.
+func (f *FakeControllerSource) Modify(obj runtime.Object) {
+ f.Change(watch.Event{Type: watch.Modified, Object: obj}, 1)
+}
+
+// Delete deletes an object from the set and sends a delete event to watchers.
+// obj's ResourceVersion is set.
+func (f *FakeControllerSource) Delete(lastValue runtime.Object) {
+ f.Change(watch.Event{Type: watch.Deleted, Object: lastValue}, 1)
+}
+
+// AddDropWatch adds an object to the set but forgets to send an add event to
+// watchers.
+// obj's ResourceVersion is set.
+func (f *FakeControllerSource) AddDropWatch(obj runtime.Object) {
+ f.Change(watch.Event{Type: watch.Added, Object: obj}, 0)
+}
+
+// ModifyDropWatch updates an object in the set but forgets to send a modify
+// event to watchers.
+// obj's ResourceVersion is set.
+func (f *FakeControllerSource) ModifyDropWatch(obj runtime.Object) {
+ f.Change(watch.Event{Type: watch.Modified, Object: obj}, 0)
+}
+
+// DeleteDropWatch deletes an object from the set but forgets to send a delete
+// event to watchers.
+// obj's ResourceVersion is set.
+func (f *FakeControllerSource) DeleteDropWatch(lastValue runtime.Object) {
+ f.Change(watch.Event{Type: watch.Deleted, Object: lastValue}, 0)
+}
+
+func (f *FakeControllerSource) key(accessor meta.Object) nnu {
+ return nnu{accessor.GetNamespace(), accessor.GetName(), accessor.GetUID()}
+}
+
+// Change records the given event (setting the object's resource version) and
+// sends a watch event with the specified probability.
+func (f *FakeControllerSource) Change(e watch.Event, watchProbability float64) {
+ f.lock.Lock()
+ defer f.lock.Unlock()
+
+ accessor, err := meta.Accessor(e.Object)
+ if err != nil {
+ panic(err) // this is test code only
+ }
+
+ resourceVersion := len(f.changes) + 1
+ accessor.SetResourceVersion(strconv.Itoa(resourceVersion))
+ f.changes = append(f.changes, e)
+ key := f.key(accessor)
+ switch e.Type {
+ case watch.Added, watch.Modified:
+ f.Items[key] = e.Object
+ case watch.Deleted:
+ delete(f.Items, key)
+ }
+
+ if rand.Float64() < watchProbability {
+ f.Broadcaster.Action(e.Type, e.Object)
+ }
+}
+
+func (f *FakeControllerSource) getListItemsLocked() ([]runtime.Object, error) {
+ list := make([]runtime.Object, 0, len(f.Items))
+ for _, obj := range f.Items {
+ // Must make a copy to allow clients to modify the object.
+ // Otherwise, if they make a change and write it back, they
+ // will inadvertently change our canonical copy (in
+ // addition to racing with other clients).
+ objCopy, err := api.Scheme.DeepCopy(obj)
+ if err != nil {
+ return nil, err
+ }
+ list = append(list, objCopy.(runtime.Object))
+ }
+ return list, nil
+}
+
+// List returns a list object, with its resource version set.
+func (f *FakeControllerSource) List(options api.ListOptions) (runtime.Object, error) {
+ f.lock.RLock()
+ defer f.lock.RUnlock()
+ list, err := f.getListItemsLocked()
+ if err != nil {
+ return nil, err
+ }
+ listObj := &api.List{}
+ if err := meta.SetList(listObj, list); err != nil {
+ return nil, err
+ }
+ objMeta, err := api.ListMetaFor(listObj)
+ if err != nil {
+ return nil, err
+ }
+ resourceVersion := len(f.changes)
+ objMeta.ResourceVersion = strconv.Itoa(resourceVersion)
+ return listObj, nil
+}
+
+// List returns a list object, with its resource version set.
+func (f *FakePVControllerSource) List(options api.ListOptions) (runtime.Object, error) {
+ f.lock.RLock()
+ defer f.lock.RUnlock()
+ list, err := f.FakeControllerSource.getListItemsLocked()
+ if err != nil {
+ return nil, err
+ }
+ listObj := &api.PersistentVolumeList{}
+ if err := meta.SetList(listObj, list); err != nil {
+ return nil, err
+ }
+ objMeta, err := api.ListMetaFor(listObj)
+ if err != nil {
+ return nil, err
+ }
+ resourceVersion := len(f.changes)
+ objMeta.ResourceVersion = strconv.Itoa(resourceVersion)
+ return listObj, nil
+}
+
+// List returns a list object, with its resource version set.
+func (f *FakePVCControllerSource) List(options api.ListOptions) (runtime.Object, error) {
+ f.lock.RLock()
+ defer f.lock.RUnlock()
+ list, err := f.FakeControllerSource.getListItemsLocked()
+ if err != nil {
+ return nil, err
+ }
+ listObj := &api.PersistentVolumeClaimList{}
+ if err := meta.SetList(listObj, list); err != nil {
+ return nil, err
+ }
+ objMeta, err := api.ListMetaFor(listObj)
+ if err != nil {
+ return nil, err
+ }
+ resourceVersion := len(f.changes)
+ objMeta.ResourceVersion = strconv.Itoa(resourceVersion)
+ return listObj, nil
+}
+
+// Watch returns a watch, which will be pre-populated with all changes
+// after resourceVersion.
+func (f *FakeControllerSource) Watch(options api.ListOptions) (watch.Interface, error) {
+ f.lock.RLock()
+ defer f.lock.RUnlock()
+ rc, err := strconv.Atoi(options.ResourceVersion)
+ if err != nil {
+ return nil, err
+ }
+ if rc < len(f.changes) {
+ changes := []watch.Event{}
+ for _, c := range f.changes[rc:] {
+ // Must make a copy to allow clients to modify the
+ // object. Otherwise, if they make a change and write
+ // it back, they will inadvertently change the our
+ // canonical copy (in addition to racing with other
+ // clients).
+ objCopy, err := api.Scheme.DeepCopy(c.Object)
+ if err != nil {
+ return nil, err
+ }
+ changes = append(changes, watch.Event{Type: c.Type, Object: objCopy.(runtime.Object)})
+ }
+ return f.Broadcaster.WatchWithPrefix(changes), nil
+ } else if rc > len(f.changes) {
+ return nil, errors.New("resource version in the future not supported by this fake")
+ }
+ return f.Broadcaster.Watch(), nil
+}
+
+// Shutdown closes the underlying broadcaster, waiting for events to be
+// delivered. It's an error to call any method after calling shutdown. This is
+// enforced by Shutdown() leaving f locked.
+func (f *FakeControllerSource) Shutdown() {
+ f.lock.Lock() // Purposely no unlock.
+ f.Broadcaster.Shutdown()
+}
diff --git a/src/kube2msb/vendor/k8s.io/kubernetes/pkg/controller/framework/shared_informer.go b/src/kube2msb/vendor/k8s.io/kubernetes/pkg/controller/framework/shared_informer.go
new file mode 100644
index 0000000..87dcac6
--- /dev/null
+++ b/src/kube2msb/vendor/k8s.io/kubernetes/pkg/controller/framework/shared_informer.go
@@ -0,0 +1,383 @@
+/*
+Copyright 2015 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package framework
+
+import (
+ "fmt"
+ "sync"
+ "time"
+
+ "k8s.io/kubernetes/pkg/client/cache"
+ "k8s.io/kubernetes/pkg/runtime"
+ utilruntime "k8s.io/kubernetes/pkg/util/runtime"
+)
+
+// if you use this, there is one behavior change compared to a standard Informer.
+// When you receive a notification, the cache will be AT LEAST as fresh as the
+// notification, but it MAY be more fresh. You should NOT depend on the contents
+// of the cache exactly matching the notification you've received in handler
+// functions. If there was a create, followed by a delete, the cache may NOT
+// have your item. This has advantages over the broadcaster since it allows us
+// to share a common cache across many controllers. Extending the broadcaster
+// would have required us keep duplicate caches for each watch.
+type SharedInformer interface {
+ // events to a single handler are delivered sequentially, but there is no coordination between different handlers
+ // You may NOT add a handler *after* the SharedInformer is running. That will result in an error being returned.
+ // TODO we should try to remove this restriction eventually.
+ AddEventHandler(handler ResourceEventHandler) error
+ GetStore() cache.Store
+ // GetController gives back a synthetic interface that "votes" to start the informer
+ GetController() ControllerInterface
+ Run(stopCh <-chan struct{})
+ HasSynced() bool
+ LastSyncResourceVersion() string
+}
+
+type SharedIndexInformer interface {
+ SharedInformer
+ // AddIndexers add indexers to the informer before it starts.
+ AddIndexers(indexers cache.Indexers) error
+ GetIndexer() cache.Indexer
+}
+
+// NewSharedInformer creates a new instance for the listwatcher.
+// TODO: create a cache/factory of these at a higher level for the list all, watch all of a given resource that can
+// be shared amongst all consumers.
+func NewSharedInformer(lw cache.ListerWatcher, objType runtime.Object, resyncPeriod time.Duration) SharedInformer {
+ return NewSharedIndexInformer(lw, objType, resyncPeriod, cache.Indexers{})
+}
+
+// NewSharedIndexInformer creates a new instance for the listwatcher.
+// TODO: create a cache/factory of these at a higher level for the list all, watch all of a given resource that can
+// be shared amongst all consumers.
+func NewSharedIndexInformer(lw cache.ListerWatcher, objType runtime.Object, resyncPeriod time.Duration, indexers cache.Indexers) SharedIndexInformer {
+ sharedIndexInformer := &sharedIndexInformer{
+ processor: &sharedProcessor{},
+ indexer: cache.NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
+ listerWatcher: lw,
+ objectType: objType,
+ fullResyncPeriod: resyncPeriod,
+ }
+ return sharedIndexInformer
+}
+
+type sharedIndexInformer struct {
+ indexer cache.Indexer
+ controller *Controller
+
+ processor *sharedProcessor
+
+ // This block is tracked to handle late initialization of the controller
+ listerWatcher cache.ListerWatcher
+ objectType runtime.Object
+ fullResyncPeriod time.Duration
+
+ started bool
+ startedLock sync.Mutex
+
+ // blockDeltas gives a way to stop all event distribution so that a late event handler
+ // can safely join the shared informer.
+ blockDeltas sync.Mutex
+ // stopCh is the channel used to stop the main Run process. We have to track it so that
+ // late joiners can have a proper stop
+ stopCh <-chan struct{}
+}
+
+// dummyController hides the fact that a SharedInformer is different from a dedicated one
+// where a caller can `Run`. The run method is disonnected in this case, because higher
+// level logic will decide when to start the SharedInformer and related controller.
+// Because returning information back is always asynchronous, the legacy callers shouldn't
+// notice any change in behavior.
+type dummyController struct {
+ informer *sharedIndexInformer
+}
+
+func (v *dummyController) Run(stopCh <-chan struct{}) {
+}
+
+func (v *dummyController) HasSynced() bool {
+ return v.informer.HasSynced()
+}
+
+type updateNotification struct {
+ oldObj interface{}
+ newObj interface{}
+}
+
+type addNotification struct {
+ newObj interface{}
+}
+
+type deleteNotification struct {
+ oldObj interface{}
+}
+
+func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
+ defer utilruntime.HandleCrash()
+
+ fifo := cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, nil, s.indexer)
+
+ cfg := &Config{
+ Queue: fifo,
+ ListerWatcher: s.listerWatcher,
+ ObjectType: s.objectType,
+ FullResyncPeriod: s.fullResyncPeriod,
+ RetryOnError: false,
+
+ Process: s.HandleDeltas,
+ }
+
+ func() {
+ s.startedLock.Lock()
+ defer s.startedLock.Unlock()
+
+ s.controller = New(cfg)
+ s.started = true
+ }()
+
+ s.stopCh = stopCh
+ s.processor.run(stopCh)
+ s.controller.Run(stopCh)
+}
+
+func (s *sharedIndexInformer) isStarted() bool {
+ s.startedLock.Lock()
+ defer s.startedLock.Unlock()
+ return s.started
+}
+
+func (s *sharedIndexInformer) HasSynced() bool {
+ s.startedLock.Lock()
+ defer s.startedLock.Unlock()
+
+ if s.controller == nil {
+ return false
+ }
+ return s.controller.HasSynced()
+}
+
+func (s *sharedIndexInformer) LastSyncResourceVersion() string {
+ s.startedLock.Lock()
+ defer s.startedLock.Unlock()
+
+ if s.controller == nil {
+ return ""
+ }
+ return s.controller.reflector.LastSyncResourceVersion()
+}
+
+func (s *sharedIndexInformer) GetStore() cache.Store {
+ return s.indexer
+}
+
+func (s *sharedIndexInformer) GetIndexer() cache.Indexer {
+ return s.indexer
+}
+
+func (s *sharedIndexInformer) AddIndexers(indexers cache.Indexers) error {
+ s.startedLock.Lock()
+ defer s.startedLock.Unlock()
+
+ if s.started {
+ return fmt.Errorf("informer has already started")
+ }
+
+ return s.indexer.AddIndexers(indexers)
+}
+
+func (s *sharedIndexInformer) GetController() ControllerInterface {
+ return &dummyController{informer: s}
+}
+
+func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) error {
+ s.startedLock.Lock()
+ defer s.startedLock.Unlock()
+
+ if !s.started {
+ listener := newProcessListener(handler)
+ s.processor.listeners = append(s.processor.listeners, listener)
+ return nil
+ }
+
+ // in order to safely join, we have to
+ // 1. stop sending add/update/delete notifications
+ // 2. do a list against the store
+ // 3. send synthetic "Add" events to the new handler
+ // 4. unblock
+ s.blockDeltas.Lock()
+ defer s.blockDeltas.Unlock()
+
+ listener := newProcessListener(handler)
+ s.processor.listeners = append(s.processor.listeners, listener)
+
+ go listener.run(s.stopCh)
+ go listener.pop(s.stopCh)
+
+ items := s.indexer.List()
+ for i := range items {
+ listener.add(addNotification{newObj: items[i]})
+ }
+
+ return nil
+}
+
+func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
+ s.blockDeltas.Lock()
+ defer s.blockDeltas.Unlock()
+
+ // from oldest to newest
+ for _, d := range obj.(cache.Deltas) {
+ switch d.Type {
+ case cache.Sync, cache.Added, cache.Updated:
+ if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
+ if err := s.indexer.Update(d.Object); err != nil {
+ return err
+ }
+ s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object})
+ } else {
+ if err := s.indexer.Add(d.Object); err != nil {
+ return err
+ }
+ s.processor.distribute(addNotification{newObj: d.Object})
+ }
+ case cache.Deleted:
+ if err := s.indexer.Delete(d.Object); err != nil {
+ return err
+ }
+ s.processor.distribute(deleteNotification{oldObj: d.Object})
+ }
+ }
+ return nil
+}
+
+type sharedProcessor struct {
+ listeners []*processorListener
+}
+
+func (p *sharedProcessor) distribute(obj interface{}) {
+ for _, listener := range p.listeners {
+ listener.add(obj)
+ }
+}
+
+func (p *sharedProcessor) run(stopCh <-chan struct{}) {
+ for _, listener := range p.listeners {
+ go listener.run(stopCh)
+ go listener.pop(stopCh)
+ }
+}
+
+type processorListener struct {
+ // lock/cond protects access to 'pendingNotifications'.
+ lock sync.RWMutex
+ cond sync.Cond
+
+ // pendingNotifications is an unbounded slice that holds all notifications not yet distributed
+ // there is one per listener, but a failing/stalled listener will have infinite pendingNotifications
+ // added until we OOM.
+ // TODO This is no worse that before, since reflectors were backed by unbounded DeltaFIFOs, but
+ // we should try to do something better
+ pendingNotifications []interface{}
+
+ nextCh chan interface{}
+
+ handler ResourceEventHandler
+}
+
+func newProcessListener(handler ResourceEventHandler) *processorListener {
+ ret := &processorListener{
+ pendingNotifications: []interface{}{},
+ nextCh: make(chan interface{}),
+ handler: handler,
+ }
+
+ ret.cond.L = &ret.lock
+ return ret
+}
+
+func (p *processorListener) add(notification interface{}) {
+ p.lock.Lock()
+ defer p.lock.Unlock()
+
+ p.pendingNotifications = append(p.pendingNotifications, notification)
+ p.cond.Broadcast()
+}
+
+func (p *processorListener) pop(stopCh <-chan struct{}) {
+ defer utilruntime.HandleCrash()
+
+ for {
+ blockingGet := func() (interface{}, bool) {
+ p.lock.Lock()
+ defer p.lock.Unlock()
+
+ for len(p.pendingNotifications) == 0 {
+ // check if we're shutdown
+ select {
+ case <-stopCh:
+ return nil, true
+ default:
+ }
+ p.cond.Wait()
+ }
+
+ nt := p.pendingNotifications[0]
+ p.pendingNotifications = p.pendingNotifications[1:]
+ return nt, false
+ }
+
+ notification, stopped := blockingGet()
+ if stopped {
+ return
+ }
+
+ select {
+ case <-stopCh:
+ return
+ case p.nextCh <- notification:
+ }
+ }
+}
+
+func (p *processorListener) run(stopCh <-chan struct{}) {
+ defer utilruntime.HandleCrash()
+
+ for {
+ var next interface{}
+ select {
+ case <-stopCh:
+ func() {
+ p.lock.Lock()
+ defer p.lock.Unlock()
+ p.cond.Broadcast()
+ }()
+ return
+ case next = <-p.nextCh:
+ }
+
+ switch notification := next.(type) {
+ case updateNotification:
+ p.handler.OnUpdate(notification.oldObj, notification.newObj)
+ case addNotification:
+ p.handler.OnAdd(notification.newObj)
+ case deleteNotification:
+ p.handler.OnDelete(notification.oldObj)
+ default:
+ utilruntime.HandleError(fmt.Errorf("unrecognized notification: %#v", next))
+ }
+ }
+}