diff options
Diffstat (limited to 'src/kube2msb/vendor/k8s.io/kubernetes/pkg/controller')
4 files changed, 989 insertions, 0 deletions
diff --git a/src/kube2msb/vendor/k8s.io/kubernetes/pkg/controller/framework/controller.go b/src/kube2msb/vendor/k8s.io/kubernetes/pkg/controller/framework/controller.go new file mode 100644 index 0000000..8cbd124 --- /dev/null +++ b/src/kube2msb/vendor/k8s.io/kubernetes/pkg/controller/framework/controller.go @@ -0,0 +1,326 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package framework + +import ( + "sync" + "time" + + "k8s.io/kubernetes/pkg/client/cache" + "k8s.io/kubernetes/pkg/runtime" + utilruntime "k8s.io/kubernetes/pkg/util/runtime" + "k8s.io/kubernetes/pkg/util/wait" +) + +// Config contains all the settings for a Controller. +type Config struct { + // The queue for your objects; either a cache.FIFO or + // a cache.DeltaFIFO. Your Process() function should accept + // the output of this Oueue's Pop() method. + cache.Queue + + // Something that can list and watch your objects. + cache.ListerWatcher + + // Something that can process your objects. + Process ProcessFunc + + // The type of your objects. + ObjectType runtime.Object + + // Reprocess everything at least this often. + // Note that if it takes longer for you to clear the queue than this + // period, you will end up processing items in the order determined + // by cache.FIFO.Replace(). Currently, this is random. If this is a + // problem, we can change that replacement policy to append new + // things to the end of the queue instead of replacing the entire + // queue. + FullResyncPeriod time.Duration + + // If true, when Process() returns an error, re-enqueue the object. + // TODO: add interface to let you inject a delay/backoff or drop + // the object completely if desired. Pass the object in + // question to this interface as a parameter. + RetryOnError bool +} + +// ProcessFunc processes a single object. +type ProcessFunc func(obj interface{}) error + +// Controller is a generic controller framework. +type Controller struct { + config Config + reflector *cache.Reflector + reflectorMutex sync.RWMutex +} + +// TODO make the "Controller" private, and convert all references to use ControllerInterface instead +type ControllerInterface interface { + Run(stopCh <-chan struct{}) + HasSynced() bool +} + +// New makes a new Controller from the given Config. +func New(c *Config) *Controller { + ctlr := &Controller{ + config: *c, + } + return ctlr +} + +// Run begins processing items, and will continue until a value is sent down stopCh. +// It's an error to call Run more than once. +// Run blocks; call via go. +func (c *Controller) Run(stopCh <-chan struct{}) { + defer utilruntime.HandleCrash() + r := cache.NewReflector( + c.config.ListerWatcher, + c.config.ObjectType, + c.config.Queue, + c.config.FullResyncPeriod, + ) + + c.reflectorMutex.Lock() + c.reflector = r + c.reflectorMutex.Unlock() + + r.RunUntil(stopCh) + + wait.Until(c.processLoop, time.Second, stopCh) +} + +// Returns true once this controller has completed an initial resource listing +func (c *Controller) HasSynced() bool { + return c.config.Queue.HasSynced() +} + +// Requeue adds the provided object back into the queue if it does not already exist. +func (c *Controller) Requeue(obj interface{}) error { + return c.config.Queue.AddIfNotPresent(cache.Deltas{ + cache.Delta{ + Type: cache.Sync, + Object: obj, + }, + }) +} + +// processLoop drains the work queue. +// TODO: Consider doing the processing in parallel. This will require a little thought +// to make sure that we don't end up processing the same object multiple times +// concurrently. +func (c *Controller) processLoop() { + for { + obj, err := c.config.Queue.Pop(cache.PopProcessFunc(c.config.Process)) + if err != nil { + if c.config.RetryOnError { + // This is the safe way to re-enqueue. + c.config.Queue.AddIfNotPresent(obj) + } + } + } +} + +// ResourceEventHandler can handle notifications for events that happen to a +// resource. The events are informational only, so you can't return an +// error. +// * OnAdd is called when an object is added. +// * OnUpdate is called when an object is modified. Note that oldObj is the +// last known state of the object-- it is possible that several changes +// were combined together, so you can't use this to see every single +// change. OnUpdate is also called when a re-list happens, and it will +// get called even if nothing changed. This is useful for periodically +// evaluating or syncing something. +// * OnDelete will get the final state of the item if it is known, otherwise +// it will get an object of type cache.DeletedFinalStateUnknown. This can +// happen if the watch is closed and misses the delete event and we don't +// notice the deletion until the subsequent re-list. +type ResourceEventHandler interface { + OnAdd(obj interface{}) + OnUpdate(oldObj, newObj interface{}) + OnDelete(obj interface{}) +} + +// ResourceEventHandlerFuncs is an adaptor to let you easily specify as many or +// as few of the notification functions as you want while still implementing +// ResourceEventHandler. +type ResourceEventHandlerFuncs struct { + AddFunc func(obj interface{}) + UpdateFunc func(oldObj, newObj interface{}) + DeleteFunc func(obj interface{}) +} + +// OnAdd calls AddFunc if it's not nil. +func (r ResourceEventHandlerFuncs) OnAdd(obj interface{}) { + if r.AddFunc != nil { + r.AddFunc(obj) + } +} + +// OnUpdate calls UpdateFunc if it's not nil. +func (r ResourceEventHandlerFuncs) OnUpdate(oldObj, newObj interface{}) { + if r.UpdateFunc != nil { + r.UpdateFunc(oldObj, newObj) + } +} + +// OnDelete calls DeleteFunc if it's not nil. +func (r ResourceEventHandlerFuncs) OnDelete(obj interface{}) { + if r.DeleteFunc != nil { + r.DeleteFunc(obj) + } +} + +// DeletionHandlingMetaNamespaceKeyFunc checks for +// cache.DeletedFinalStateUnknown objects before calling +// cache.MetaNamespaceKeyFunc. +func DeletionHandlingMetaNamespaceKeyFunc(obj interface{}) (string, error) { + if d, ok := obj.(cache.DeletedFinalStateUnknown); ok { + return d.Key, nil + } + return cache.MetaNamespaceKeyFunc(obj) +} + +// NewInformer returns a cache.Store and a controller for populating the store +// while also providing event notifications. You should only used the returned +// cache.Store for Get/List operations; Add/Modify/Deletes will cause the event +// notifications to be faulty. +// +// Parameters: +// * lw is list and watch functions for the source of the resource you want to +// be informed of. +// * objType is an object of the type that you expect to receive. +// * resyncPeriod: if non-zero, will re-list this often (you will get OnUpdate +// calls, even if nothing changed). Otherwise, re-list will be delayed as +// long as possible (until the upstream source closes the watch or times out, +// or you stop the controller). +// * h is the object you want notifications sent to. +// +func NewInformer( + lw cache.ListerWatcher, + objType runtime.Object, + resyncPeriod time.Duration, + h ResourceEventHandler, +) (cache.Store, *Controller) { + // This will hold the client state, as we know it. + clientState := cache.NewStore(DeletionHandlingMetaNamespaceKeyFunc) + + // This will hold incoming changes. Note how we pass clientState in as a + // KeyLister, that way resync operations will result in the correct set + // of update/delete deltas. + fifo := cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, nil, clientState) + + cfg := &Config{ + Queue: fifo, + ListerWatcher: lw, + ObjectType: objType, + FullResyncPeriod: resyncPeriod, + RetryOnError: false, + + Process: func(obj interface{}) error { + // from oldest to newest + for _, d := range obj.(cache.Deltas) { + switch d.Type { + case cache.Sync, cache.Added, cache.Updated: + if old, exists, err := clientState.Get(d.Object); err == nil && exists { + if err := clientState.Update(d.Object); err != nil { + return err + } + h.OnUpdate(old, d.Object) + } else { + if err := clientState.Add(d.Object); err != nil { + return err + } + h.OnAdd(d.Object) + } + case cache.Deleted: + if err := clientState.Delete(d.Object); err != nil { + return err + } + h.OnDelete(d.Object) + } + } + return nil + }, + } + return clientState, New(cfg) +} + +// NewIndexerInformer returns a cache.Indexer and a controller for populating the index +// while also providing event notifications. You should only used the returned +// cache.Index for Get/List operations; Add/Modify/Deletes will cause the event +// notifications to be faulty. +// +// Parameters: +// * lw is list and watch functions for the source of the resource you want to +// be informed of. +// * objType is an object of the type that you expect to receive. +// * resyncPeriod: if non-zero, will re-list this often (you will get OnUpdate +// calls, even if nothing changed). Otherwise, re-list will be delayed as +// long as possible (until the upstream source closes the watch or times out, +// or you stop the controller). +// * h is the object you want notifications sent to. +// +func NewIndexerInformer( + lw cache.ListerWatcher, + objType runtime.Object, + resyncPeriod time.Duration, + h ResourceEventHandler, + indexers cache.Indexers, +) (cache.Indexer, *Controller) { + // This will hold the client state, as we know it. + clientState := cache.NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers) + + // This will hold incoming changes. Note how we pass clientState in as a + // KeyLister, that way resync operations will result in the correct set + // of update/delete deltas. + fifo := cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, nil, clientState) + + cfg := &Config{ + Queue: fifo, + ListerWatcher: lw, + ObjectType: objType, + FullResyncPeriod: resyncPeriod, + RetryOnError: false, + + Process: func(obj interface{}) error { + // from oldest to newest + for _, d := range obj.(cache.Deltas) { + switch d.Type { + case cache.Sync, cache.Added, cache.Updated: + if old, exists, err := clientState.Get(d.Object); err == nil && exists { + if err := clientState.Update(d.Object); err != nil { + return err + } + h.OnUpdate(old, d.Object) + } else { + if err := clientState.Add(d.Object); err != nil { + return err + } + h.OnAdd(d.Object) + } + case cache.Deleted: + if err := clientState.Delete(d.Object); err != nil { + return err + } + h.OnDelete(d.Object) + } + } + return nil + }, + } + return clientState, New(cfg) +} diff --git a/src/kube2msb/vendor/k8s.io/kubernetes/pkg/controller/framework/doc.go b/src/kube2msb/vendor/k8s.io/kubernetes/pkg/controller/framework/doc.go new file mode 100644 index 0000000..feceba3 --- /dev/null +++ b/src/kube2msb/vendor/k8s.io/kubernetes/pkg/controller/framework/doc.go @@ -0,0 +1,18 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package framework implements all the grunt work involved in running a simple controller. +package framework diff --git a/src/kube2msb/vendor/k8s.io/kubernetes/pkg/controller/framework/fake_controller_source.go b/src/kube2msb/vendor/k8s.io/kubernetes/pkg/controller/framework/fake_controller_source.go new file mode 100644 index 0000000..ee00c05 --- /dev/null +++ b/src/kube2msb/vendor/k8s.io/kubernetes/pkg/controller/framework/fake_controller_source.go @@ -0,0 +1,262 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package framework + +import ( + "errors" + "math/rand" + "strconv" + "sync" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/meta" + "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/types" + "k8s.io/kubernetes/pkg/watch" +) + +func NewFakeControllerSource() *FakeControllerSource { + return &FakeControllerSource{ + Items: map[nnu]runtime.Object{}, + Broadcaster: watch.NewBroadcaster(100, watch.WaitIfChannelFull), + } +} + +func NewFakePVControllerSource() *FakePVControllerSource { + return &FakePVControllerSource{ + FakeControllerSource{ + Items: map[nnu]runtime.Object{}, + Broadcaster: watch.NewBroadcaster(100, watch.WaitIfChannelFull), + }} +} + +func NewFakePVCControllerSource() *FakePVCControllerSource { + return &FakePVCControllerSource{ + FakeControllerSource{ + Items: map[nnu]runtime.Object{}, + Broadcaster: watch.NewBroadcaster(100, watch.WaitIfChannelFull), + }} +} + +// FakeControllerSource implements listing/watching for testing. +type FakeControllerSource struct { + lock sync.RWMutex + Items map[nnu]runtime.Object + changes []watch.Event // one change per resourceVersion + Broadcaster *watch.Broadcaster +} + +type FakePVControllerSource struct { + FakeControllerSource +} + +type FakePVCControllerSource struct { + FakeControllerSource +} + +// namespace, name, uid to be used as a key. +type nnu struct { + namespace, name string + uid types.UID +} + +// Add adds an object to the set and sends an add event to watchers. +// obj's ResourceVersion is set. +func (f *FakeControllerSource) Add(obj runtime.Object) { + f.Change(watch.Event{Type: watch.Added, Object: obj}, 1) +} + +// Modify updates an object in the set and sends a modified event to watchers. +// obj's ResourceVersion is set. +func (f *FakeControllerSource) Modify(obj runtime.Object) { + f.Change(watch.Event{Type: watch.Modified, Object: obj}, 1) +} + +// Delete deletes an object from the set and sends a delete event to watchers. +// obj's ResourceVersion is set. +func (f *FakeControllerSource) Delete(lastValue runtime.Object) { + f.Change(watch.Event{Type: watch.Deleted, Object: lastValue}, 1) +} + +// AddDropWatch adds an object to the set but forgets to send an add event to +// watchers. +// obj's ResourceVersion is set. +func (f *FakeControllerSource) AddDropWatch(obj runtime.Object) { + f.Change(watch.Event{Type: watch.Added, Object: obj}, 0) +} + +// ModifyDropWatch updates an object in the set but forgets to send a modify +// event to watchers. +// obj's ResourceVersion is set. +func (f *FakeControllerSource) ModifyDropWatch(obj runtime.Object) { + f.Change(watch.Event{Type: watch.Modified, Object: obj}, 0) +} + +// DeleteDropWatch deletes an object from the set but forgets to send a delete +// event to watchers. +// obj's ResourceVersion is set. +func (f *FakeControllerSource) DeleteDropWatch(lastValue runtime.Object) { + f.Change(watch.Event{Type: watch.Deleted, Object: lastValue}, 0) +} + +func (f *FakeControllerSource) key(accessor meta.Object) nnu { + return nnu{accessor.GetNamespace(), accessor.GetName(), accessor.GetUID()} +} + +// Change records the given event (setting the object's resource version) and +// sends a watch event with the specified probability. +func (f *FakeControllerSource) Change(e watch.Event, watchProbability float64) { + f.lock.Lock() + defer f.lock.Unlock() + + accessor, err := meta.Accessor(e.Object) + if err != nil { + panic(err) // this is test code only + } + + resourceVersion := len(f.changes) + 1 + accessor.SetResourceVersion(strconv.Itoa(resourceVersion)) + f.changes = append(f.changes, e) + key := f.key(accessor) + switch e.Type { + case watch.Added, watch.Modified: + f.Items[key] = e.Object + case watch.Deleted: + delete(f.Items, key) + } + + if rand.Float64() < watchProbability { + f.Broadcaster.Action(e.Type, e.Object) + } +} + +func (f *FakeControllerSource) getListItemsLocked() ([]runtime.Object, error) { + list := make([]runtime.Object, 0, len(f.Items)) + for _, obj := range f.Items { + // Must make a copy to allow clients to modify the object. + // Otherwise, if they make a change and write it back, they + // will inadvertently change our canonical copy (in + // addition to racing with other clients). + objCopy, err := api.Scheme.DeepCopy(obj) + if err != nil { + return nil, err + } + list = append(list, objCopy.(runtime.Object)) + } + return list, nil +} + +// List returns a list object, with its resource version set. +func (f *FakeControllerSource) List(options api.ListOptions) (runtime.Object, error) { + f.lock.RLock() + defer f.lock.RUnlock() + list, err := f.getListItemsLocked() + if err != nil { + return nil, err + } + listObj := &api.List{} + if err := meta.SetList(listObj, list); err != nil { + return nil, err + } + objMeta, err := api.ListMetaFor(listObj) + if err != nil { + return nil, err + } + resourceVersion := len(f.changes) + objMeta.ResourceVersion = strconv.Itoa(resourceVersion) + return listObj, nil +} + +// List returns a list object, with its resource version set. +func (f *FakePVControllerSource) List(options api.ListOptions) (runtime.Object, error) { + f.lock.RLock() + defer f.lock.RUnlock() + list, err := f.FakeControllerSource.getListItemsLocked() + if err != nil { + return nil, err + } + listObj := &api.PersistentVolumeList{} + if err := meta.SetList(listObj, list); err != nil { + return nil, err + } + objMeta, err := api.ListMetaFor(listObj) + if err != nil { + return nil, err + } + resourceVersion := len(f.changes) + objMeta.ResourceVersion = strconv.Itoa(resourceVersion) + return listObj, nil +} + +// List returns a list object, with its resource version set. +func (f *FakePVCControllerSource) List(options api.ListOptions) (runtime.Object, error) { + f.lock.RLock() + defer f.lock.RUnlock() + list, err := f.FakeControllerSource.getListItemsLocked() + if err != nil { + return nil, err + } + listObj := &api.PersistentVolumeClaimList{} + if err := meta.SetList(listObj, list); err != nil { + return nil, err + } + objMeta, err := api.ListMetaFor(listObj) + if err != nil { + return nil, err + } + resourceVersion := len(f.changes) + objMeta.ResourceVersion = strconv.Itoa(resourceVersion) + return listObj, nil +} + +// Watch returns a watch, which will be pre-populated with all changes +// after resourceVersion. +func (f *FakeControllerSource) Watch(options api.ListOptions) (watch.Interface, error) { + f.lock.RLock() + defer f.lock.RUnlock() + rc, err := strconv.Atoi(options.ResourceVersion) + if err != nil { + return nil, err + } + if rc < len(f.changes) { + changes := []watch.Event{} + for _, c := range f.changes[rc:] { + // Must make a copy to allow clients to modify the + // object. Otherwise, if they make a change and write + // it back, they will inadvertently change the our + // canonical copy (in addition to racing with other + // clients). + objCopy, err := api.Scheme.DeepCopy(c.Object) + if err != nil { + return nil, err + } + changes = append(changes, watch.Event{Type: c.Type, Object: objCopy.(runtime.Object)}) + } + return f.Broadcaster.WatchWithPrefix(changes), nil + } else if rc > len(f.changes) { + return nil, errors.New("resource version in the future not supported by this fake") + } + return f.Broadcaster.Watch(), nil +} + +// Shutdown closes the underlying broadcaster, waiting for events to be +// delivered. It's an error to call any method after calling shutdown. This is +// enforced by Shutdown() leaving f locked. +func (f *FakeControllerSource) Shutdown() { + f.lock.Lock() // Purposely no unlock. + f.Broadcaster.Shutdown() +} diff --git a/src/kube2msb/vendor/k8s.io/kubernetes/pkg/controller/framework/shared_informer.go b/src/kube2msb/vendor/k8s.io/kubernetes/pkg/controller/framework/shared_informer.go new file mode 100644 index 0000000..87dcac6 --- /dev/null +++ b/src/kube2msb/vendor/k8s.io/kubernetes/pkg/controller/framework/shared_informer.go @@ -0,0 +1,383 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package framework + +import ( + "fmt" + "sync" + "time" + + "k8s.io/kubernetes/pkg/client/cache" + "k8s.io/kubernetes/pkg/runtime" + utilruntime "k8s.io/kubernetes/pkg/util/runtime" +) + +// if you use this, there is one behavior change compared to a standard Informer. +// When you receive a notification, the cache will be AT LEAST as fresh as the +// notification, but it MAY be more fresh. You should NOT depend on the contents +// of the cache exactly matching the notification you've received in handler +// functions. If there was a create, followed by a delete, the cache may NOT +// have your item. This has advantages over the broadcaster since it allows us +// to share a common cache across many controllers. Extending the broadcaster +// would have required us keep duplicate caches for each watch. +type SharedInformer interface { + // events to a single handler are delivered sequentially, but there is no coordination between different handlers + // You may NOT add a handler *after* the SharedInformer is running. That will result in an error being returned. + // TODO we should try to remove this restriction eventually. + AddEventHandler(handler ResourceEventHandler) error + GetStore() cache.Store + // GetController gives back a synthetic interface that "votes" to start the informer + GetController() ControllerInterface + Run(stopCh <-chan struct{}) + HasSynced() bool + LastSyncResourceVersion() string +} + +type SharedIndexInformer interface { + SharedInformer + // AddIndexers add indexers to the informer before it starts. + AddIndexers(indexers cache.Indexers) error + GetIndexer() cache.Indexer +} + +// NewSharedInformer creates a new instance for the listwatcher. +// TODO: create a cache/factory of these at a higher level for the list all, watch all of a given resource that can +// be shared amongst all consumers. +func NewSharedInformer(lw cache.ListerWatcher, objType runtime.Object, resyncPeriod time.Duration) SharedInformer { + return NewSharedIndexInformer(lw, objType, resyncPeriod, cache.Indexers{}) +} + +// NewSharedIndexInformer creates a new instance for the listwatcher. +// TODO: create a cache/factory of these at a higher level for the list all, watch all of a given resource that can +// be shared amongst all consumers. +func NewSharedIndexInformer(lw cache.ListerWatcher, objType runtime.Object, resyncPeriod time.Duration, indexers cache.Indexers) SharedIndexInformer { + sharedIndexInformer := &sharedIndexInformer{ + processor: &sharedProcessor{}, + indexer: cache.NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers), + listerWatcher: lw, + objectType: objType, + fullResyncPeriod: resyncPeriod, + } + return sharedIndexInformer +} + +type sharedIndexInformer struct { + indexer cache.Indexer + controller *Controller + + processor *sharedProcessor + + // This block is tracked to handle late initialization of the controller + listerWatcher cache.ListerWatcher + objectType runtime.Object + fullResyncPeriod time.Duration + + started bool + startedLock sync.Mutex + + // blockDeltas gives a way to stop all event distribution so that a late event handler + // can safely join the shared informer. + blockDeltas sync.Mutex + // stopCh is the channel used to stop the main Run process. We have to track it so that + // late joiners can have a proper stop + stopCh <-chan struct{} +} + +// dummyController hides the fact that a SharedInformer is different from a dedicated one +// where a caller can `Run`. The run method is disonnected in this case, because higher +// level logic will decide when to start the SharedInformer and related controller. +// Because returning information back is always asynchronous, the legacy callers shouldn't +// notice any change in behavior. +type dummyController struct { + informer *sharedIndexInformer +} + +func (v *dummyController) Run(stopCh <-chan struct{}) { +} + +func (v *dummyController) HasSynced() bool { + return v.informer.HasSynced() +} + +type updateNotification struct { + oldObj interface{} + newObj interface{} +} + +type addNotification struct { + newObj interface{} +} + +type deleteNotification struct { + oldObj interface{} +} + +func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) { + defer utilruntime.HandleCrash() + + fifo := cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, nil, s.indexer) + + cfg := &Config{ + Queue: fifo, + ListerWatcher: s.listerWatcher, + ObjectType: s.objectType, + FullResyncPeriod: s.fullResyncPeriod, + RetryOnError: false, + + Process: s.HandleDeltas, + } + + func() { + s.startedLock.Lock() + defer s.startedLock.Unlock() + + s.controller = New(cfg) + s.started = true + }() + + s.stopCh = stopCh + s.processor.run(stopCh) + s.controller.Run(stopCh) +} + +func (s *sharedIndexInformer) isStarted() bool { + s.startedLock.Lock() + defer s.startedLock.Unlock() + return s.started +} + +func (s *sharedIndexInformer) HasSynced() bool { + s.startedLock.Lock() + defer s.startedLock.Unlock() + + if s.controller == nil { + return false + } + return s.controller.HasSynced() +} + +func (s *sharedIndexInformer) LastSyncResourceVersion() string { + s.startedLock.Lock() + defer s.startedLock.Unlock() + + if s.controller == nil { + return "" + } + return s.controller.reflector.LastSyncResourceVersion() +} + +func (s *sharedIndexInformer) GetStore() cache.Store { + return s.indexer +} + +func (s *sharedIndexInformer) GetIndexer() cache.Indexer { + return s.indexer +} + +func (s *sharedIndexInformer) AddIndexers(indexers cache.Indexers) error { + s.startedLock.Lock() + defer s.startedLock.Unlock() + + if s.started { + return fmt.Errorf("informer has already started") + } + + return s.indexer.AddIndexers(indexers) +} + +func (s *sharedIndexInformer) GetController() ControllerInterface { + return &dummyController{informer: s} +} + +func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) error { + s.startedLock.Lock() + defer s.startedLock.Unlock() + + if !s.started { + listener := newProcessListener(handler) + s.processor.listeners = append(s.processor.listeners, listener) + return nil + } + + // in order to safely join, we have to + // 1. stop sending add/update/delete notifications + // 2. do a list against the store + // 3. send synthetic "Add" events to the new handler + // 4. unblock + s.blockDeltas.Lock() + defer s.blockDeltas.Unlock() + + listener := newProcessListener(handler) + s.processor.listeners = append(s.processor.listeners, listener) + + go listener.run(s.stopCh) + go listener.pop(s.stopCh) + + items := s.indexer.List() + for i := range items { + listener.add(addNotification{newObj: items[i]}) + } + + return nil +} + +func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error { + s.blockDeltas.Lock() + defer s.blockDeltas.Unlock() + + // from oldest to newest + for _, d := range obj.(cache.Deltas) { + switch d.Type { + case cache.Sync, cache.Added, cache.Updated: + if old, exists, err := s.indexer.Get(d.Object); err == nil && exists { + if err := s.indexer.Update(d.Object); err != nil { + return err + } + s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}) + } else { + if err := s.indexer.Add(d.Object); err != nil { + return err + } + s.processor.distribute(addNotification{newObj: d.Object}) + } + case cache.Deleted: + if err := s.indexer.Delete(d.Object); err != nil { + return err + } + s.processor.distribute(deleteNotification{oldObj: d.Object}) + } + } + return nil +} + +type sharedProcessor struct { + listeners []*processorListener +} + +func (p *sharedProcessor) distribute(obj interface{}) { + for _, listener := range p.listeners { + listener.add(obj) + } +} + +func (p *sharedProcessor) run(stopCh <-chan struct{}) { + for _, listener := range p.listeners { + go listener.run(stopCh) + go listener.pop(stopCh) + } +} + +type processorListener struct { + // lock/cond protects access to 'pendingNotifications'. + lock sync.RWMutex + cond sync.Cond + + // pendingNotifications is an unbounded slice that holds all notifications not yet distributed + // there is one per listener, but a failing/stalled listener will have infinite pendingNotifications + // added until we OOM. + // TODO This is no worse that before, since reflectors were backed by unbounded DeltaFIFOs, but + // we should try to do something better + pendingNotifications []interface{} + + nextCh chan interface{} + + handler ResourceEventHandler +} + +func newProcessListener(handler ResourceEventHandler) *processorListener { + ret := &processorListener{ + pendingNotifications: []interface{}{}, + nextCh: make(chan interface{}), + handler: handler, + } + + ret.cond.L = &ret.lock + return ret +} + +func (p *processorListener) add(notification interface{}) { + p.lock.Lock() + defer p.lock.Unlock() + + p.pendingNotifications = append(p.pendingNotifications, notification) + p.cond.Broadcast() +} + +func (p *processorListener) pop(stopCh <-chan struct{}) { + defer utilruntime.HandleCrash() + + for { + blockingGet := func() (interface{}, bool) { + p.lock.Lock() + defer p.lock.Unlock() + + for len(p.pendingNotifications) == 0 { + // check if we're shutdown + select { + case <-stopCh: + return nil, true + default: + } + p.cond.Wait() + } + + nt := p.pendingNotifications[0] + p.pendingNotifications = p.pendingNotifications[1:] + return nt, false + } + + notification, stopped := blockingGet() + if stopped { + return + } + + select { + case <-stopCh: + return + case p.nextCh <- notification: + } + } +} + +func (p *processorListener) run(stopCh <-chan struct{}) { + defer utilruntime.HandleCrash() + + for { + var next interface{} + select { + case <-stopCh: + func() { + p.lock.Lock() + defer p.lock.Unlock() + p.cond.Broadcast() + }() + return + case next = <-p.nextCh: + } + + switch notification := next.(type) { + case updateNotification: + p.handler.OnUpdate(notification.oldObj, notification.newObj) + case addNotification: + p.handler.OnAdd(notification.newObj) + case deleteNotification: + p.handler.OnDelete(notification.oldObj) + default: + utilruntime.HandleError(fmt.Errorf("unrecognized notification: %#v", next)) + } + } +} |