diff options
Diffstat (limited to 'kube2msb/src/vendor/k8s.io/kubernetes/pkg/controller/framework/controller.go')
-rw-r--r-- | kube2msb/src/vendor/k8s.io/kubernetes/pkg/controller/framework/controller.go | 326 |
1 files changed, 0 insertions, 326 deletions
diff --git a/kube2msb/src/vendor/k8s.io/kubernetes/pkg/controller/framework/controller.go b/kube2msb/src/vendor/k8s.io/kubernetes/pkg/controller/framework/controller.go deleted file mode 100644 index 8cbd124..0000000 --- a/kube2msb/src/vendor/k8s.io/kubernetes/pkg/controller/framework/controller.go +++ /dev/null @@ -1,326 +0,0 @@ -/* -Copyright 2015 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package framework - -import ( - "sync" - "time" - - "k8s.io/kubernetes/pkg/client/cache" - "k8s.io/kubernetes/pkg/runtime" - utilruntime "k8s.io/kubernetes/pkg/util/runtime" - "k8s.io/kubernetes/pkg/util/wait" -) - -// Config contains all the settings for a Controller. -type Config struct { - // The queue for your objects; either a cache.FIFO or - // a cache.DeltaFIFO. Your Process() function should accept - // the output of this Oueue's Pop() method. - cache.Queue - - // Something that can list and watch your objects. - cache.ListerWatcher - - // Something that can process your objects. - Process ProcessFunc - - // The type of your objects. - ObjectType runtime.Object - - // Reprocess everything at least this often. - // Note that if it takes longer for you to clear the queue than this - // period, you will end up processing items in the order determined - // by cache.FIFO.Replace(). Currently, this is random. If this is a - // problem, we can change that replacement policy to append new - // things to the end of the queue instead of replacing the entire - // queue. - FullResyncPeriod time.Duration - - // If true, when Process() returns an error, re-enqueue the object. - // TODO: add interface to let you inject a delay/backoff or drop - // the object completely if desired. Pass the object in - // question to this interface as a parameter. - RetryOnError bool -} - -// ProcessFunc processes a single object. -type ProcessFunc func(obj interface{}) error - -// Controller is a generic controller framework. -type Controller struct { - config Config - reflector *cache.Reflector - reflectorMutex sync.RWMutex -} - -// TODO make the "Controller" private, and convert all references to use ControllerInterface instead -type ControllerInterface interface { - Run(stopCh <-chan struct{}) - HasSynced() bool -} - -// New makes a new Controller from the given Config. -func New(c *Config) *Controller { - ctlr := &Controller{ - config: *c, - } - return ctlr -} - -// Run begins processing items, and will continue until a value is sent down stopCh. -// It's an error to call Run more than once. -// Run blocks; call via go. -func (c *Controller) Run(stopCh <-chan struct{}) { - defer utilruntime.HandleCrash() - r := cache.NewReflector( - c.config.ListerWatcher, - c.config.ObjectType, - c.config.Queue, - c.config.FullResyncPeriod, - ) - - c.reflectorMutex.Lock() - c.reflector = r - c.reflectorMutex.Unlock() - - r.RunUntil(stopCh) - - wait.Until(c.processLoop, time.Second, stopCh) -} - -// Returns true once this controller has completed an initial resource listing -func (c *Controller) HasSynced() bool { - return c.config.Queue.HasSynced() -} - -// Requeue adds the provided object back into the queue if it does not already exist. -func (c *Controller) Requeue(obj interface{}) error { - return c.config.Queue.AddIfNotPresent(cache.Deltas{ - cache.Delta{ - Type: cache.Sync, - Object: obj, - }, - }) -} - -// processLoop drains the work queue. -// TODO: Consider doing the processing in parallel. This will require a little thought -// to make sure that we don't end up processing the same object multiple times -// concurrently. -func (c *Controller) processLoop() { - for { - obj, err := c.config.Queue.Pop(cache.PopProcessFunc(c.config.Process)) - if err != nil { - if c.config.RetryOnError { - // This is the safe way to re-enqueue. - c.config.Queue.AddIfNotPresent(obj) - } - } - } -} - -// ResourceEventHandler can handle notifications for events that happen to a -// resource. The events are informational only, so you can't return an -// error. -// * OnAdd is called when an object is added. -// * OnUpdate is called when an object is modified. Note that oldObj is the -// last known state of the object-- it is possible that several changes -// were combined together, so you can't use this to see every single -// change. OnUpdate is also called when a re-list happens, and it will -// get called even if nothing changed. This is useful for periodically -// evaluating or syncing something. -// * OnDelete will get the final state of the item if it is known, otherwise -// it will get an object of type cache.DeletedFinalStateUnknown. This can -// happen if the watch is closed and misses the delete event and we don't -// notice the deletion until the subsequent re-list. -type ResourceEventHandler interface { - OnAdd(obj interface{}) - OnUpdate(oldObj, newObj interface{}) - OnDelete(obj interface{}) -} - -// ResourceEventHandlerFuncs is an adaptor to let you easily specify as many or -// as few of the notification functions as you want while still implementing -// ResourceEventHandler. -type ResourceEventHandlerFuncs struct { - AddFunc func(obj interface{}) - UpdateFunc func(oldObj, newObj interface{}) - DeleteFunc func(obj interface{}) -} - -// OnAdd calls AddFunc if it's not nil. -func (r ResourceEventHandlerFuncs) OnAdd(obj interface{}) { - if r.AddFunc != nil { - r.AddFunc(obj) - } -} - -// OnUpdate calls UpdateFunc if it's not nil. -func (r ResourceEventHandlerFuncs) OnUpdate(oldObj, newObj interface{}) { - if r.UpdateFunc != nil { - r.UpdateFunc(oldObj, newObj) - } -} - -// OnDelete calls DeleteFunc if it's not nil. -func (r ResourceEventHandlerFuncs) OnDelete(obj interface{}) { - if r.DeleteFunc != nil { - r.DeleteFunc(obj) - } -} - -// DeletionHandlingMetaNamespaceKeyFunc checks for -// cache.DeletedFinalStateUnknown objects before calling -// cache.MetaNamespaceKeyFunc. -func DeletionHandlingMetaNamespaceKeyFunc(obj interface{}) (string, error) { - if d, ok := obj.(cache.DeletedFinalStateUnknown); ok { - return d.Key, nil - } - return cache.MetaNamespaceKeyFunc(obj) -} - -// NewInformer returns a cache.Store and a controller for populating the store -// while also providing event notifications. You should only used the returned -// cache.Store for Get/List operations; Add/Modify/Deletes will cause the event -// notifications to be faulty. -// -// Parameters: -// * lw is list and watch functions for the source of the resource you want to -// be informed of. -// * objType is an object of the type that you expect to receive. -// * resyncPeriod: if non-zero, will re-list this often (you will get OnUpdate -// calls, even if nothing changed). Otherwise, re-list will be delayed as -// long as possible (until the upstream source closes the watch or times out, -// or you stop the controller). -// * h is the object you want notifications sent to. -// -func NewInformer( - lw cache.ListerWatcher, - objType runtime.Object, - resyncPeriod time.Duration, - h ResourceEventHandler, -) (cache.Store, *Controller) { - // This will hold the client state, as we know it. - clientState := cache.NewStore(DeletionHandlingMetaNamespaceKeyFunc) - - // This will hold incoming changes. Note how we pass clientState in as a - // KeyLister, that way resync operations will result in the correct set - // of update/delete deltas. - fifo := cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, nil, clientState) - - cfg := &Config{ - Queue: fifo, - ListerWatcher: lw, - ObjectType: objType, - FullResyncPeriod: resyncPeriod, - RetryOnError: false, - - Process: func(obj interface{}) error { - // from oldest to newest - for _, d := range obj.(cache.Deltas) { - switch d.Type { - case cache.Sync, cache.Added, cache.Updated: - if old, exists, err := clientState.Get(d.Object); err == nil && exists { - if err := clientState.Update(d.Object); err != nil { - return err - } - h.OnUpdate(old, d.Object) - } else { - if err := clientState.Add(d.Object); err != nil { - return err - } - h.OnAdd(d.Object) - } - case cache.Deleted: - if err := clientState.Delete(d.Object); err != nil { - return err - } - h.OnDelete(d.Object) - } - } - return nil - }, - } - return clientState, New(cfg) -} - -// NewIndexerInformer returns a cache.Indexer and a controller for populating the index -// while also providing event notifications. You should only used the returned -// cache.Index for Get/List operations; Add/Modify/Deletes will cause the event -// notifications to be faulty. -// -// Parameters: -// * lw is list and watch functions for the source of the resource you want to -// be informed of. -// * objType is an object of the type that you expect to receive. -// * resyncPeriod: if non-zero, will re-list this often (you will get OnUpdate -// calls, even if nothing changed). Otherwise, re-list will be delayed as -// long as possible (until the upstream source closes the watch or times out, -// or you stop the controller). -// * h is the object you want notifications sent to. -// -func NewIndexerInformer( - lw cache.ListerWatcher, - objType runtime.Object, - resyncPeriod time.Duration, - h ResourceEventHandler, - indexers cache.Indexers, -) (cache.Indexer, *Controller) { - // This will hold the client state, as we know it. - clientState := cache.NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers) - - // This will hold incoming changes. Note how we pass clientState in as a - // KeyLister, that way resync operations will result in the correct set - // of update/delete deltas. - fifo := cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, nil, clientState) - - cfg := &Config{ - Queue: fifo, - ListerWatcher: lw, - ObjectType: objType, - FullResyncPeriod: resyncPeriod, - RetryOnError: false, - - Process: func(obj interface{}) error { - // from oldest to newest - for _, d := range obj.(cache.Deltas) { - switch d.Type { - case cache.Sync, cache.Added, cache.Updated: - if old, exists, err := clientState.Get(d.Object); err == nil && exists { - if err := clientState.Update(d.Object); err != nil { - return err - } - h.OnUpdate(old, d.Object) - } else { - if err := clientState.Add(d.Object); err != nil { - return err - } - h.OnAdd(d.Object) - } - case cache.Deleted: - if err := clientState.Delete(d.Object); err != nil { - return err - } - h.OnDelete(d.Object) - } - } - return nil - }, - } - return clientState, New(cfg) -} |