path: root/src/kube2msb/vendor/k8s.io/kubernetes/pkg/controller/framework/controller.go
Diffstat (limited to 'src/kube2msb/vendor/k8s.io/kubernetes/pkg/controller/framework/controller.go')
-rw-r--r--  src/kube2msb/vendor/k8s.io/kubernetes/pkg/controller/framework/controller.go  326
1 files changed, 326 insertions, 0 deletions
diff --git a/src/kube2msb/vendor/k8s.io/kubernetes/pkg/controller/framework/controller.go b/src/kube2msb/vendor/k8s.io/kubernetes/pkg/controller/framework/controller.go
new file mode 100644
index 0000000..8cbd124
--- /dev/null
+++ b/src/kube2msb/vendor/k8s.io/kubernetes/pkg/controller/framework/controller.go
@@ -0,0 +1,326 @@
+/*
+Copyright 2015 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package framework
+
+import (
+ "sync"
+ "time"
+
+ "k8s.io/kubernetes/pkg/client/cache"
+ "k8s.io/kubernetes/pkg/runtime"
+ utilruntime "k8s.io/kubernetes/pkg/util/runtime"
+ "k8s.io/kubernetes/pkg/util/wait"
+)
+
+// Config contains all the settings for a Controller.
+type Config struct {
+ // The queue for your objects; either a cache.FIFO or
+ // a cache.DeltaFIFO. Your Process() function should accept
+// the output of this Queue's Pop() method.
+ cache.Queue
+
+ // Something that can list and watch your objects.
+ cache.ListerWatcher
+
+ // Something that can process your objects.
+ Process ProcessFunc
+
+ // The type of your objects.
+ ObjectType runtime.Object
+
+ // Reprocess everything at least this often.
+ // Note that if it takes longer for you to clear the queue than this
+ // period, you will end up processing items in the order determined
+ // by cache.FIFO.Replace(). Currently, this is random. If this is a
+ // problem, we can change that replacement policy to append new
+ // things to the end of the queue instead of replacing the entire
+ // queue.
+ FullResyncPeriod time.Duration
+
+ // If true, when Process() returns an error, re-enqueue the object.
+ // TODO: add interface to let you inject a delay/backoff or drop
+ // the object completely if desired. Pass the object in
+ // question to this interface as a parameter.
+ RetryOnError bool
+}
+
+// ProcessFunc processes a single object.
+type ProcessFunc func(obj interface{}) error
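+
+// An illustrative sketch (not part of the upstream file) of wiring a Config
+// by hand. "lw" is assumed to be a cache.ListerWatcher you already have;
+// because the queue is a DeltaFIFO, Pop hands the Process func a
+// cache.Deltas slice:
+//
+//	fifo := cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, nil, nil)
+//	cfg := &framework.Config{
+//		Queue:            fifo,
+//		ListerWatcher:    lw,
+//		ObjectType:       &api.Pod{},
+//		FullResyncPeriod: 30 * time.Second,
+//		RetryOnError:     false,
+//		Process: func(obj interface{}) error {
+//			for _, d := range obj.(cache.Deltas) {
+//				fmt.Printf("saw a %v delta\n", d.Type)
+//			}
+//			return nil
+//		},
+//	}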
+
+// Controller is a generic controller framework.
+type Controller struct {
+ config Config
+ reflector *cache.Reflector
+ reflectorMutex sync.RWMutex
+}
+
+// TODO make the "Controller" private, and convert all references to use ControllerInterface instead
+type ControllerInterface interface {
+ Run(stopCh <-chan struct{})
+ HasSynced() bool
+}
+
+// New makes a new Controller from the given Config.
+func New(c *Config) *Controller {
+ ctlr := &Controller{
+ config: *c,
+ }
+ return ctlr
+}
+
+// Run begins processing items, and will continue until a value is sent down stopCh.
+// It's an error to call Run more than once.
+// Run blocks; call via go.
+func (c *Controller) Run(stopCh <-chan struct{}) {
+ defer utilruntime.HandleCrash()
+ r := cache.NewReflector(
+ c.config.ListerWatcher,
+ c.config.ObjectType,
+ c.config.Queue,
+ c.config.FullResyncPeriod,
+ )
+
+ c.reflectorMutex.Lock()
+ c.reflector = r
+ c.reflectorMutex.Unlock()
+
+ r.RunUntil(stopCh)
+
+ wait.Until(c.processLoop, time.Second, stopCh)
+}
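+
+// A minimal run/stop sketch (illustrative only), assuming "cfg" is a
+// populated Config as above. Run blocks, so it is started on its own
+// goroutine and stopped by closing the channel:
+//
+//	ctrl := framework.New(cfg)
+//	stopCh := make(chan struct{})
+//	go ctrl.Run(stopCh)
+//	// ... later, to shut the controller down:
+//	close(stopCh)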
+
+// HasSynced returns true once this controller has completed an initial resource listing.
+func (c *Controller) HasSynced() bool {
+ return c.config.Queue.HasSynced()
+}
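+
+// Callers usually wait for HasSynced before trusting the downstream store.
+// A sketch using wait.Poll from k8s.io/kubernetes/pkg/util/wait (interval
+// and timeout are arbitrary choices):
+//
+//	err := wait.Poll(100*time.Millisecond, 30*time.Second, func() (bool, error) {
+//		return ctrl.HasSynced(), nil
+//	})
+//	if err != nil {
+//		// the initial listing did not complete within the timeout
+//	}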
+
+// Requeue adds the provided object back into the queue if it does not already exist.
+func (c *Controller) Requeue(obj interface{}) error {
+ return c.config.Queue.AddIfNotPresent(cache.Deltas{
+ cache.Delta{
+ Type: cache.Sync,
+ Object: obj,
+ },
+ })
+}
+
+// processLoop drains the work queue.
+// TODO: Consider doing the processing in parallel. This will require a little thought
+// to make sure that we don't end up processing the same object multiple times
+// concurrently.
+func (c *Controller) processLoop() {
+ for {
+ obj, err := c.config.Queue.Pop(cache.PopProcessFunc(c.config.Process))
+ if err != nil {
+ if c.config.RetryOnError {
+ // This is the safe way to re-enqueue.
+ c.config.Queue.AddIfNotPresent(obj)
+ }
+ }
+ }
+}
+
+// ResourceEventHandler can handle notifications for events that happen to a
+// resource. The events are informational only, so you can't return an
+// error.
+// * OnAdd is called when an object is added.
+// * OnUpdate is called when an object is modified. Note that oldObj is the
+// last known state of the object-- it is possible that several changes
+// were combined together, so you can't use this to see every single
+// change. OnUpdate is also called when a re-list happens, and it will
+// get called even if nothing changed. This is useful for periodically
+// evaluating or syncing something.
+// * OnDelete will get the final state of the item if it is known, otherwise
+// it will get an object of type cache.DeletedFinalStateUnknown. This can
+// happen if the watch is closed and misses the delete event and we don't
+// notice the deletion until the subsequent re-list.
+type ResourceEventHandler interface {
+ OnAdd(obj interface{})
+ OnUpdate(oldObj, newObj interface{})
+ OnDelete(obj interface{})
+}
+
+// ResourceEventHandlerFuncs is an adaptor to let you easily specify as many or
+// as few of the notification functions as you want while still implementing
+// ResourceEventHandler.
+type ResourceEventHandlerFuncs struct {
+ AddFunc func(obj interface{})
+ UpdateFunc func(oldObj, newObj interface{})
+ DeleteFunc func(obj interface{})
+}
+
+// OnAdd calls AddFunc if it's not nil.
+func (r ResourceEventHandlerFuncs) OnAdd(obj interface{}) {
+ if r.AddFunc != nil {
+ r.AddFunc(obj)
+ }
+}
+
+// OnUpdate calls UpdateFunc if it's not nil.
+func (r ResourceEventHandlerFuncs) OnUpdate(oldObj, newObj interface{}) {
+ if r.UpdateFunc != nil {
+ r.UpdateFunc(oldObj, newObj)
+ }
+}
+
+// OnDelete calls DeleteFunc if it's not nil.
+func (r ResourceEventHandlerFuncs) OnDelete(obj interface{}) {
+ if r.DeleteFunc != nil {
+ r.DeleteFunc(obj)
+ }
+}
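+
+// Because each method is nil-safe, a handler can supply only the callbacks
+// it needs. An illustrative sketch that reacts to adds and deletes and
+// ignores updates:
+//
+//	handler := framework.ResourceEventHandlerFuncs{
+//		AddFunc:    func(obj interface{}) { fmt.Printf("added: %v\n", obj) },
+//		DeleteFunc: func(obj interface{}) { fmt.Printf("deleted: %v\n", obj) },
+//		// UpdateFunc left nil: updates are silently dropped.
+//	}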
+
+// DeletionHandlingMetaNamespaceKeyFunc checks for
+// cache.DeletedFinalStateUnknown objects before calling
+// cache.MetaNamespaceKeyFunc.
+func DeletionHandlingMetaNamespaceKeyFunc(obj interface{}) (string, error) {
+ if d, ok := obj.(cache.DeletedFinalStateUnknown); ok {
+ return d.Key, nil
+ }
+ return cache.MetaNamespaceKeyFunc(obj)
+}
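+
+// The same tombstone check belongs in DeleteFunc callbacks, since the
+// object delivered there may be a cache.DeletedFinalStateUnknown instead
+// of the resource itself. A sketch:
+//
+//	DeleteFunc: func(obj interface{}) {
+//		if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
+//			// tombstone.Obj may be nil if the final state was never observed
+//			obj = tombstone.Obj
+//		}
+//		// ... handle the deletion
+//	},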
+
+// NewInformer returns a cache.Store and a controller for populating the store
+// while also providing event notifications. You should only use the returned
+// cache.Store for Get/List operations; Add/Modify/Deletes will cause the event
+// notifications to be faulty.
+//
+// Parameters:
+// * lw is list and watch functions for the source of the resource you want to
+// be informed of.
+// * objType is an object of the type that you expect to receive.
+// * resyncPeriod: if non-zero, will re-list this often (you will get OnUpdate
+// calls, even if nothing changed). Otherwise, re-list will be delayed as
+// long as possible (until the upstream source closes the watch or times out,
+// or you stop the controller).
+// * h is the object you want notifications sent to.
+//
+func NewInformer(
+ lw cache.ListerWatcher,
+ objType runtime.Object,
+ resyncPeriod time.Duration,
+ h ResourceEventHandler,
+) (cache.Store, *Controller) {
+ // This will hold the client state, as we know it.
+ clientState := cache.NewStore(DeletionHandlingMetaNamespaceKeyFunc)
+
+ // This will hold incoming changes. Note how we pass clientState in as a
+ // KeyLister, that way resync operations will result in the correct set
+ // of update/delete deltas.
+ fifo := cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, nil, clientState)
+
+ cfg := &Config{
+ Queue: fifo,
+ ListerWatcher: lw,
+ ObjectType: objType,
+ FullResyncPeriod: resyncPeriod,
+ RetryOnError: false,
+
+ Process: func(obj interface{}) error {
+ // from oldest to newest
+ for _, d := range obj.(cache.Deltas) {
+ switch d.Type {
+ case cache.Sync, cache.Added, cache.Updated:
+ if old, exists, err := clientState.Get(d.Object); err == nil && exists {
+ if err := clientState.Update(d.Object); err != nil {
+ return err
+ }
+ h.OnUpdate(old, d.Object)
+ } else {
+ if err := clientState.Add(d.Object); err != nil {
+ return err
+ }
+ h.OnAdd(d.Object)
+ }
+ case cache.Deleted:
+ if err := clientState.Delete(d.Object); err != nil {
+ return err
+ }
+ h.OnDelete(d.Object)
+ }
+ }
+ return nil
+ },
+ }
+ return clientState, New(cfg)
+}
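+
+// A usage sketch (illustrative only): "client" is assumed to satisfy
+// cache.Getter, and pods are watched across all namespaces via
+// cache.NewListWatchFromClient:
+//
+//	lw := cache.NewListWatchFromClient(client, "pods", api.NamespaceAll, fields.Everything())
+//	store, ctrl := framework.NewInformer(lw, &api.Pod{}, 30*time.Second,
+//		framework.ResourceEventHandlerFuncs{
+//			AddFunc: func(obj interface{}) { fmt.Printf("pod added: %v\n", obj) },
+//		})
+//	stopCh := make(chan struct{})
+//	go ctrl.Run(stopCh)
+//	// read the store only via Get/List, e.g. store.List()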
+
+// NewIndexerInformer returns a cache.Indexer and a controller for populating the index
+// while also providing event notifications. You should only use the returned
+// cache.Indexer for Get/List operations; Add/Modify/Deletes will cause the event
+// notifications to be faulty.
+//
+// Parameters:
+// * lw is list and watch functions for the source of the resource you want to
+// be informed of.
+// * objType is an object of the type that you expect to receive.
+// * resyncPeriod: if non-zero, will re-list this often (you will get OnUpdate
+// calls, even if nothing changed). Otherwise, re-list will be delayed as
+// long as possible (until the upstream source closes the watch or times out,
+// or you stop the controller).
+// * h is the object you want notifications sent to.
+//
+func NewIndexerInformer(
+ lw cache.ListerWatcher,
+ objType runtime.Object,
+ resyncPeriod time.Duration,
+ h ResourceEventHandler,
+ indexers cache.Indexers,
+) (cache.Indexer, *Controller) {
+ // This will hold the client state, as we know it.
+ clientState := cache.NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers)
+
+ // This will hold incoming changes. Note how we pass clientState in as a
+ // KeyLister, that way resync operations will result in the correct set
+ // of update/delete deltas.
+ fifo := cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, nil, clientState)
+
+ cfg := &Config{
+ Queue: fifo,
+ ListerWatcher: lw,
+ ObjectType: objType,
+ FullResyncPeriod: resyncPeriod,
+ RetryOnError: false,
+
+ Process: func(obj interface{}) error {
+ // from oldest to newest
+ for _, d := range obj.(cache.Deltas) {
+ switch d.Type {
+ case cache.Sync, cache.Added, cache.Updated:
+ if old, exists, err := clientState.Get(d.Object); err == nil && exists {
+ if err := clientState.Update(d.Object); err != nil {
+ return err
+ }
+ h.OnUpdate(old, d.Object)
+ } else {
+ if err := clientState.Add(d.Object); err != nil {
+ return err
+ }
+ h.OnAdd(d.Object)
+ }
+ case cache.Deleted:
+ if err := clientState.Delete(d.Object); err != nil {
+ return err
+ }
+ h.OnDelete(d.Object)
+ }
+ }
+ return nil
+ },
+ }
+ return clientState, New(cfg)
+}
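+
+// A usage sketch mirroring the NewInformer example but with a namespace
+// index, so cached objects can be listed per namespace ("lw" as above):
+//
+//	indexer, ctrl := framework.NewIndexerInformer(lw, &api.Pod{}, 30*time.Second,
+//		framework.ResourceEventHandlerFuncs{},
+//		cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc})
+//	go ctrl.Run(make(chan struct{}))
+//	// later, once synced:
+//	pods, err := indexer.ByIndex("namespace", "kube-system")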