diff options
Diffstat (limited to 'kube2msb/src/vendor/k8s.io/kubernetes/pkg/controller/framework/controller.go')
-rw-r--r-- | kube2msb/src/vendor/k8s.io/kubernetes/pkg/controller/framework/controller.go | 326 |
1 files changed, 326 insertions, 0 deletions
diff --git a/kube2msb/src/vendor/k8s.io/kubernetes/pkg/controller/framework/controller.go b/kube2msb/src/vendor/k8s.io/kubernetes/pkg/controller/framework/controller.go new file mode 100644 index 0000000..8cbd124 --- /dev/null +++ b/kube2msb/src/vendor/k8s.io/kubernetes/pkg/controller/framework/controller.go @@ -0,0 +1,326 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package framework + +import ( + "sync" + "time" + + "k8s.io/kubernetes/pkg/client/cache" + "k8s.io/kubernetes/pkg/runtime" + utilruntime "k8s.io/kubernetes/pkg/util/runtime" + "k8s.io/kubernetes/pkg/util/wait" +) + +// Config contains all the settings for a Controller. +type Config struct { + // The queue for your objects; either a cache.FIFO or + // a cache.DeltaFIFO. Your Process() function should accept + // the output of this Oueue's Pop() method. + cache.Queue + + // Something that can list and watch your objects. + cache.ListerWatcher + + // Something that can process your objects. + Process ProcessFunc + + // The type of your objects. + ObjectType runtime.Object + + // Reprocess everything at least this often. + // Note that if it takes longer for you to clear the queue than this + // period, you will end up processing items in the order determined + // by cache.FIFO.Replace(). Currently, this is random. If this is a + // problem, we can change that replacement policy to append new + // things to the end of the queue instead of replacing the entire + // queue. + FullResyncPeriod time.Duration + + // If true, when Process() returns an error, re-enqueue the object. + // TODO: add interface to let you inject a delay/backoff or drop + // the object completely if desired. Pass the object in + // question to this interface as a parameter. + RetryOnError bool +} + +// ProcessFunc processes a single object. +type ProcessFunc func(obj interface{}) error + +// Controller is a generic controller framework. +type Controller struct { + config Config + reflector *cache.Reflector + reflectorMutex sync.RWMutex +} + +// TODO make the "Controller" private, and convert all references to use ControllerInterface instead +type ControllerInterface interface { + Run(stopCh <-chan struct{}) + HasSynced() bool +} + +// New makes a new Controller from the given Config. +func New(c *Config) *Controller { + ctlr := &Controller{ + config: *c, + } + return ctlr +} + +// Run begins processing items, and will continue until a value is sent down stopCh. +// It's an error to call Run more than once. +// Run blocks; call via go. +func (c *Controller) Run(stopCh <-chan struct{}) { + defer utilruntime.HandleCrash() + r := cache.NewReflector( + c.config.ListerWatcher, + c.config.ObjectType, + c.config.Queue, + c.config.FullResyncPeriod, + ) + + c.reflectorMutex.Lock() + c.reflector = r + c.reflectorMutex.Unlock() + + r.RunUntil(stopCh) + + wait.Until(c.processLoop, time.Second, stopCh) +} + +// Returns true once this controller has completed an initial resource listing +func (c *Controller) HasSynced() bool { + return c.config.Queue.HasSynced() +} + +// Requeue adds the provided object back into the queue if it does not already exist. +func (c *Controller) Requeue(obj interface{}) error { + return c.config.Queue.AddIfNotPresent(cache.Deltas{ + cache.Delta{ + Type: cache.Sync, + Object: obj, + }, + }) +} + +// processLoop drains the work queue. +// TODO: Consider doing the processing in parallel. This will require a little thought +// to make sure that we don't end up processing the same object multiple times +// concurrently. +func (c *Controller) processLoop() { + for { + obj, err := c.config.Queue.Pop(cache.PopProcessFunc(c.config.Process)) + if err != nil { + if c.config.RetryOnError { + // This is the safe way to re-enqueue. + c.config.Queue.AddIfNotPresent(obj) + } + } + } +} + +// ResourceEventHandler can handle notifications for events that happen to a +// resource. The events are informational only, so you can't return an +// error. +// * OnAdd is called when an object is added. +// * OnUpdate is called when an object is modified. Note that oldObj is the +// last known state of the object-- it is possible that several changes +// were combined together, so you can't use this to see every single +// change. OnUpdate is also called when a re-list happens, and it will +// get called even if nothing changed. This is useful for periodically +// evaluating or syncing something. +// * OnDelete will get the final state of the item if it is known, otherwise +// it will get an object of type cache.DeletedFinalStateUnknown. This can +// happen if the watch is closed and misses the delete event and we don't +// notice the deletion until the subsequent re-list. +type ResourceEventHandler interface { + OnAdd(obj interface{}) + OnUpdate(oldObj, newObj interface{}) + OnDelete(obj interface{}) +} + +// ResourceEventHandlerFuncs is an adaptor to let you easily specify as many or +// as few of the notification functions as you want while still implementing +// ResourceEventHandler. +type ResourceEventHandlerFuncs struct { + AddFunc func(obj interface{}) + UpdateFunc func(oldObj, newObj interface{}) + DeleteFunc func(obj interface{}) +} + +// OnAdd calls AddFunc if it's not nil. +func (r ResourceEventHandlerFuncs) OnAdd(obj interface{}) { + if r.AddFunc != nil { + r.AddFunc(obj) + } +} + +// OnUpdate calls UpdateFunc if it's not nil. +func (r ResourceEventHandlerFuncs) OnUpdate(oldObj, newObj interface{}) { + if r.UpdateFunc != nil { + r.UpdateFunc(oldObj, newObj) + } +} + +// OnDelete calls DeleteFunc if it's not nil. +func (r ResourceEventHandlerFuncs) OnDelete(obj interface{}) { + if r.DeleteFunc != nil { + r.DeleteFunc(obj) + } +} + +// DeletionHandlingMetaNamespaceKeyFunc checks for +// cache.DeletedFinalStateUnknown objects before calling +// cache.MetaNamespaceKeyFunc. +func DeletionHandlingMetaNamespaceKeyFunc(obj interface{}) (string, error) { + if d, ok := obj.(cache.DeletedFinalStateUnknown); ok { + return d.Key, nil + } + return cache.MetaNamespaceKeyFunc(obj) +} + +// NewInformer returns a cache.Store and a controller for populating the store +// while also providing event notifications. You should only used the returned +// cache.Store for Get/List operations; Add/Modify/Deletes will cause the event +// notifications to be faulty. +// +// Parameters: +// * lw is list and watch functions for the source of the resource you want to +// be informed of. +// * objType is an object of the type that you expect to receive. +// * resyncPeriod: if non-zero, will re-list this often (you will get OnUpdate +// calls, even if nothing changed). Otherwise, re-list will be delayed as +// long as possible (until the upstream source closes the watch or times out, +// or you stop the controller). +// * h is the object you want notifications sent to. +// +func NewInformer( + lw cache.ListerWatcher, + objType runtime.Object, + resyncPeriod time.Duration, + h ResourceEventHandler, +) (cache.Store, *Controller) { + // This will hold the client state, as we know it. + clientState := cache.NewStore(DeletionHandlingMetaNamespaceKeyFunc) + + // This will hold incoming changes. Note how we pass clientState in as a + // KeyLister, that way resync operations will result in the correct set + // of update/delete deltas. + fifo := cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, nil, clientState) + + cfg := &Config{ + Queue: fifo, + ListerWatcher: lw, + ObjectType: objType, + FullResyncPeriod: resyncPeriod, + RetryOnError: false, + + Process: func(obj interface{}) error { + // from oldest to newest + for _, d := range obj.(cache.Deltas) { + switch d.Type { + case cache.Sync, cache.Added, cache.Updated: + if old, exists, err := clientState.Get(d.Object); err == nil && exists { + if err := clientState.Update(d.Object); err != nil { + return err + } + h.OnUpdate(old, d.Object) + } else { + if err := clientState.Add(d.Object); err != nil { + return err + } + h.OnAdd(d.Object) + } + case cache.Deleted: + if err := clientState.Delete(d.Object); err != nil { + return err + } + h.OnDelete(d.Object) + } + } + return nil + }, + } + return clientState, New(cfg) +} + +// NewIndexerInformer returns a cache.Indexer and a controller for populating the index +// while also providing event notifications. You should only used the returned +// cache.Index for Get/List operations; Add/Modify/Deletes will cause the event +// notifications to be faulty. +// +// Parameters: +// * lw is list and watch functions for the source of the resource you want to +// be informed of. +// * objType is an object of the type that you expect to receive. +// * resyncPeriod: if non-zero, will re-list this often (you will get OnUpdate +// calls, even if nothing changed). Otherwise, re-list will be delayed as +// long as possible (until the upstream source closes the watch or times out, +// or you stop the controller). +// * h is the object you want notifications sent to. +// +func NewIndexerInformer( + lw cache.ListerWatcher, + objType runtime.Object, + resyncPeriod time.Duration, + h ResourceEventHandler, + indexers cache.Indexers, +) (cache.Indexer, *Controller) { + // This will hold the client state, as we know it. + clientState := cache.NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers) + + // This will hold incoming changes. Note how we pass clientState in as a + // KeyLister, that way resync operations will result in the correct set + // of update/delete deltas. + fifo := cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, nil, clientState) + + cfg := &Config{ + Queue: fifo, + ListerWatcher: lw, + ObjectType: objType, + FullResyncPeriod: resyncPeriod, + RetryOnError: false, + + Process: func(obj interface{}) error { + // from oldest to newest + for _, d := range obj.(cache.Deltas) { + switch d.Type { + case cache.Sync, cache.Added, cache.Updated: + if old, exists, err := clientState.Get(d.Object); err == nil && exists { + if err := clientState.Update(d.Object); err != nil { + return err + } + h.OnUpdate(old, d.Object) + } else { + if err := clientState.Add(d.Object); err != nil { + return err + } + h.OnAdd(d.Object) + } + case cache.Deleted: + if err := clientState.Delete(d.Object); err != nil { + return err + } + h.OnDelete(d.Object) + } + } + return nil + }, + } + return clientState, New(cfg) +} |