Diffstat (limited to 'src/kube2msb/vendor/k8s.io/kubernetes/pkg/client/cache')
-rw-r--r--  src/kube2msb/vendor/k8s.io/kubernetes/pkg/client/cache/delta_fifo.go              613
-rw-r--r--  src/kube2msb/vendor/k8s.io/kubernetes/pkg/client/cache/doc.go                      24
-rw-r--r--  src/kube2msb/vendor/k8s.io/kubernetes/pkg/client/cache/expiration_cache.go        208
-rw-r--r--  src/kube2msb/vendor/k8s.io/kubernetes/pkg/client/cache/expiration_cache_fakes.go   54
-rw-r--r--  src/kube2msb/vendor/k8s.io/kubernetes/pkg/client/cache/fake_custom_store.go       102
-rw-r--r--  src/kube2msb/vendor/k8s.io/kubernetes/pkg/client/cache/fifo.go                    321
-rw-r--r--  src/kube2msb/vendor/k8s.io/kubernetes/pkg/client/cache/index.go                    82
-rw-r--r--  src/kube2msb/vendor/k8s.io/kubernetes/pkg/client/cache/listers.go                 672
-rw-r--r--  src/kube2msb/vendor/k8s.io/kubernetes/pkg/client/cache/listwatch.go                86
-rw-r--r--  src/kube2msb/vendor/k8s.io/kubernetes/pkg/client/cache/reflector.go               423
-rw-r--r--  src/kube2msb/vendor/k8s.io/kubernetes/pkg/client/cache/store.go                   240
-rw-r--r--  src/kube2msb/vendor/k8s.io/kubernetes/pkg/client/cache/thread_safe_store.go       288
-rw-r--r--  src/kube2msb/vendor/k8s.io/kubernetes/pkg/client/cache/undelta_store.go            83
13 files changed, 3196 insertions, 0 deletions
diff --git a/src/kube2msb/vendor/k8s.io/kubernetes/pkg/client/cache/delta_fifo.go b/src/kube2msb/vendor/k8s.io/kubernetes/pkg/client/cache/delta_fifo.go
new file mode 100644
index 0000000..5a7f4a9
--- /dev/null
+++ b/src/kube2msb/vendor/k8s.io/kubernetes/pkg/client/cache/delta_fifo.go
@@ -0,0 +1,613 @@
+/*
+Copyright 2014 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package cache
+
+import (
+ "errors"
+ "fmt"
+ "sync"
+
+ "k8s.io/kubernetes/pkg/util/sets"
+
+ "github.com/golang/glog"
+)
+
+// NewDeltaFIFO returns a Store which can be used to process changes to items.
+//
+// keyFunc is used to figure out what key an object should have. (It's
+// exposed in the returned DeltaFIFO's KeyOf() method, with bonus features.)
+//
+// 'compressor' may compress as many or as few items as it wants
+// (including returning an empty slice), but it should do what it
+// does quickly since it is called while the queue is locked.
+// 'compressor' may be nil if you don't want any delta compression.
+//
+// 'keyLister' is expected to return a list of keys that the consumer of
+// this queue "knows about". It is used to decide which items are missing
+// when Replace() is called; 'Deleted' deltas are produced for these items.
+// It may be nil if you don't need to detect all deletions.
+// TODO: consider merging keyLister with this object, tracking a list of
+// "known" keys when Pop() is called. Have to think about how that
+// affects error retrying.
+// TODO(lavalamp): I believe there is a possible race only when using an
+// external known object source that the above TODO would
+// fix.
+//
+// Also see the comment on DeltaFIFO.
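+//
+// A minimal usage sketch (obj is any object the key function understands;
+// MetaNamespaceKeyFunc is this package's standard key function):
+//
+// fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, nil, nil)
+// fifo.Add(obj)
+// fifo.Pop(func(popped interface{}) error {
+//     for _, d := range popped.(Deltas) {
+//         fmt.Printf("%v: %#v\n", d.Type, d.Object)
+//     }
+//     return nil
+// })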
+func NewDeltaFIFO(keyFunc KeyFunc, compressor DeltaCompressor, knownObjects KeyListerGetter) *DeltaFIFO {
+ f := &DeltaFIFO{
+ items: map[string]Deltas{},
+ queue: []string{},
+ keyFunc: keyFunc,
+ deltaCompressor: compressor,
+ knownObjects: knownObjects,
+ }
+ f.cond.L = &f.lock
+ return f
+}
+
+// DeltaFIFO is like FIFO, but allows you to process deletes.
+//
+// DeltaFIFO is a producer-consumer queue, where a Reflector is
+// intended to be the producer, and the consumer is whatever calls
+// the Pop() method.
+//
+// DeltaFIFO solves this use case:
+// * You want to process every object change (delta) at most once.
+// * When you process an object, you want to see everything
+// that's happened to it since you last processed it.
+// * You want to process the deletion of objects.
+// * You might want to periodically reprocess objects.
+//
+// DeltaFIFO's Pop(), Get(), and GetByKey() methods return
+// interface{} to satisfy the Store/Queue interfaces, but they
+// will always return an object of type Deltas.
+//
+// A note on threading: If you call Pop() in parallel from multiple
+// threads, you could end up with multiple threads processing slightly
+// different versions of the same object.
+//
+// A note on the KeyLister used by the DeltaFIFO: Its main purpose is
+// to list keys that are "known", for the purpose of figuring out which
+// items have been deleted when Replace() or Delete() are called. The deleted
+// object will be included in the DeletedFinalStateUnknown markers. These objects
+// could be stale.
+//
+// You may provide a function to compress deltas (e.g., represent a
+// series of Updates as a single Update).
+type DeltaFIFO struct {
+ // lock/cond protects access to 'items' and 'queue'.
+ lock sync.RWMutex
+ cond sync.Cond
+
+ // We depend on the property that items in the set are in
+ // the queue and vice versa, and that all Deltas in this
+ // map have at least one Delta.
+ items map[string]Deltas
+ queue []string
+
+ // populated is true if the first batch of items has been inserted by
+ // Replace(), or if Delete/Add/Update was called first.
+ populated bool
+ // initialPopulationCount is the number of items inserted by the first call of Replace()
+ initialPopulationCount int
+
+ // keyFunc is used to make the key used for queued item
+ // insertion and retrieval, and should be deterministic.
+ keyFunc KeyFunc
+
+ // deltaCompressor tells us how to combine two or more
+ // deltas. It may be nil.
+ deltaCompressor DeltaCompressor
+
+ // knownObjects lists keys that are "known", for the
+ // purpose of figuring out which items have been deleted
+ // when Replace() or Delete() is called.
+ knownObjects KeyListerGetter
+}
+
+var (
+ _ = Queue(&DeltaFIFO{}) // DeltaFIFO is a Queue
+)
+
+var (
+ // ErrZeroLengthDeltasObject is returned in a KeyError if a Deltas
+ // object with zero length is encountered (should be impossible,
+ // even if such an object is accidentally produced by a DeltaCompressor--
+ // but included for completeness).
+ ErrZeroLengthDeltasObject = errors.New("0 length Deltas object; can't get key")
+)
+
+// KeyOf exposes f's keyFunc, but also detects the key of a Deltas object or
+// DeletedFinalStateUnknown objects.
+func (f *DeltaFIFO) KeyOf(obj interface{}) (string, error) {
+ if d, ok := obj.(Deltas); ok {
+ if len(d) == 0 {
+ return "", KeyError{obj, ErrZeroLengthDeltasObject}
+ }
+ obj = d.Newest().Object
+ }
+ if d, ok := obj.(DeletedFinalStateUnknown); ok {
+ return d.Key, nil
+ }
+ return f.keyFunc(obj)
+}
+
+// HasSynced returns true if an Add/Update/Delete/AddIfNotPresent was called
+// first, or if the first batch of items inserted by Replace() has been popped.
+func (f *DeltaFIFO) HasSynced() bool {
+ f.lock.Lock()
+ defer f.lock.Unlock()
+ return f.populated && f.initialPopulationCount == 0
+}
+
+// Add inserts an item, and puts it in the queue. The item is only enqueued
+// if it doesn't already exist in the set.
+func (f *DeltaFIFO) Add(obj interface{}) error {
+ f.lock.Lock()
+ defer f.lock.Unlock()
+ f.populated = true
+ return f.queueActionLocked(Added, obj)
+}
+
+// Update is just like Add, but makes an Updated Delta.
+func (f *DeltaFIFO) Update(obj interface{}) error {
+ f.lock.Lock()
+ defer f.lock.Unlock()
+ f.populated = true
+ return f.queueActionLocked(Updated, obj)
+}
+
+// Delete is just like Add, but makes a Deleted Delta. If the item does not
+// already exist, it will be ignored. (It may have already been deleted by a
+// Replace (re-list), for example.)
+func (f *DeltaFIFO) Delete(obj interface{}) error {
+ id, err := f.KeyOf(obj)
+ if err != nil {
+ return KeyError{obj, err}
+ }
+ f.lock.Lock()
+ defer f.lock.Unlock()
+ f.populated = true
+ if f.knownObjects == nil {
+ if _, exists := f.items[id]; !exists {
+ // Presumably, this was deleted when a relist happened.
+ // Don't provide a second report of the same deletion.
+ return nil
+ }
+ } else {
+ // We only want to skip the "deletion" action if the object doesn't
+ // exist in knownObjects and it doesn't have corresponding item in items.
+ // Note that even if there is a "deletion" action in items, we can ignore it,
+ // because it will be deduped automatically in "queueActionLocked"
+ _, exists, err := f.knownObjects.GetByKey(id)
+ _, itemsExist := f.items[id]
+ if err == nil && !exists && !itemsExist {
+ // Presumably, this was deleted when a relist happened.
+ // Don't provide a second report of the same deletion.
+ // TODO(lavalamp): This may be racy-- we aren't properly locked
+ // with knownObjects.
+ return nil
+ }
+ }
+
+ return f.queueActionLocked(Deleted, obj)
+}
+
+// AddIfNotPresent inserts an item, and puts it in the queue. If the item is already
+// present in the set, it is neither enqueued nor added to the set.
+//
+// This is useful in a single producer/consumer scenario so that the consumer can
+// safely retry items without contending with the producer and potentially enqueueing
+// stale items.
+//
+// Important: obj must be a Deltas (the output of the Pop() function). Yes, this is
+// different from the Add/Update/Delete functions.
+func (f *DeltaFIFO) AddIfNotPresent(obj interface{}) error {
+ deltas, ok := obj.(Deltas)
+ if !ok {
+ return fmt.Errorf("object must be of type deltas, but got: %#v", obj)
+ }
+ id, err := f.KeyOf(deltas.Newest().Object)
+ if err != nil {
+ return KeyError{obj, err}
+ }
+ f.lock.Lock()
+ defer f.lock.Unlock()
+ f.addIfNotPresent(id, deltas)
+ return nil
+}
+
+// addIfNotPresent inserts deltas under id if it does not exist, and assumes the caller
+// already holds the fifo lock.
+func (f *DeltaFIFO) addIfNotPresent(id string, deltas Deltas) {
+ f.populated = true
+ if _, exists := f.items[id]; exists {
+ return
+ }
+
+ f.queue = append(f.queue, id)
+ f.items[id] = deltas
+ f.cond.Broadcast()
+}
+
+// re-listing and watching can deliver the same update multiple times in any
+// order. This will combine the most recent two deltas if they are the same.
+func dedupDeltas(deltas Deltas) Deltas {
+ n := len(deltas)
+ if n < 2 {
+ return deltas
+ }
+ a := &deltas[n-1]
+ b := &deltas[n-2]
+ if out := isDup(a, b); out != nil {
+ d := append(Deltas{}, deltas[:n-2]...)
+ return append(d, *out)
+ }
+ return deltas
+}
+
+// If a & b represent the same event, returns the delta that ought to be kept.
+// Otherwise, returns nil.
+// TODO: is there anything other than deletions that need deduping?
+func isDup(a, b *Delta) *Delta {
+ if out := isDeletionDup(a, b); out != nil {
+ return out
+ }
+ // TODO: Detect other duplicate situations? Are there any?
+ return nil
+}
+
+// keep the one with the most information if both are deletions.
+func isDeletionDup(a, b *Delta) *Delta {
+ if b.Type != Deleted || a.Type != Deleted {
+ return nil
+ }
+ // Do more sophisticated checks, or is this sufficient?
+ if _, ok := b.Object.(DeletedFinalStateUnknown); ok {
+ return a
+ }
+ return b
+}
+
+// willObjectBeDeletedLocked returns true only if the last delta for the
+// given object is Deleted. Caller must lock first.
+func (f *DeltaFIFO) willObjectBeDeletedLocked(id string) bool {
+ deltas := f.items[id]
+ return len(deltas) > 0 && deltas[len(deltas)-1].Type == Deleted
+}
+
+// queueActionLocked appends to the delta list for the object, calling
+// f.deltaCompressor if needed. Caller must lock first.
+func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
+ id, err := f.KeyOf(obj)
+ if err != nil {
+ return KeyError{obj, err}
+ }
+
+ // If object is supposed to be deleted (last event is Deleted),
+ // then we should ignore Sync events, because it would result in
+ // recreation of this object.
+ if actionType == Sync && f.willObjectBeDeletedLocked(id) {
+ return nil
+ }
+
+ newDeltas := append(f.items[id], Delta{actionType, obj})
+ newDeltas = dedupDeltas(newDeltas)
+ if f.deltaCompressor != nil {
+ newDeltas = f.deltaCompressor.Compress(newDeltas)
+ }
+
+ _, exists := f.items[id]
+ if len(newDeltas) > 0 {
+ if !exists {
+ f.queue = append(f.queue, id)
+ }
+ f.items[id] = newDeltas
+ f.cond.Broadcast()
+ } else if exists {
+ // The compression step removed all deltas, so
+ // we need to remove this from our map (extra items
+ // in the queue are ignored if they are not in the
+ // map).
+ delete(f.items, id)
+ }
+ return nil
+}
+
+// List returns a list of all the items; it returns the object
+// from the most recent Delta.
+// You should treat the items returned inside the deltas as immutable.
+func (f *DeltaFIFO) List() []interface{} {
+ f.lock.RLock()
+ defer f.lock.RUnlock()
+ return f.listLocked()
+}
+
+func (f *DeltaFIFO) listLocked() []interface{} {
+ list := make([]interface{}, 0, len(f.items))
+ for _, item := range f.items {
+ // Copy item's slice so operations on this slice (delta
+ // compression) won't interfere with the object we return.
+ item = copyDeltas(item)
+ list = append(list, item.Newest().Object)
+ }
+ return list
+}
+
+// ListKeys returns a list of all the keys of the objects currently
+// in the FIFO.
+func (f *DeltaFIFO) ListKeys() []string {
+ f.lock.RLock()
+ defer f.lock.RUnlock()
+ list := make([]string, 0, len(f.items))
+ for key := range f.items {
+ list = append(list, key)
+ }
+ return list
+}
+
+// Get returns the complete list of deltas for the requested item,
+// or sets exists=false.
+// You should treat the items returned inside the deltas as immutable.
+func (f *DeltaFIFO) Get(obj interface{}) (item interface{}, exists bool, err error) {
+ key, err := f.KeyOf(obj)
+ if err != nil {
+ return nil, false, KeyError{obj, err}
+ }
+ return f.GetByKey(key)
+}
+
+// GetByKey returns the complete list of deltas for the requested item,
+// setting exists=false if that list is empty.
+// You should treat the items returned inside the deltas as immutable.
+func (f *DeltaFIFO) GetByKey(key string) (item interface{}, exists bool, err error) {
+ f.lock.RLock()
+ defer f.lock.RUnlock()
+ d, exists := f.items[key]
+ if exists {
+ // Copy item's slice so operations on this slice (delta
+ // compression) won't interfere with the object we return.
+ d = copyDeltas(d)
+ }
+ return d, exists, nil
+}
+
+// Pop blocks until an item is added to the queue, and then returns it. If
+// multiple items are ready, they are returned in the order in which they were
+// added/updated. The item is removed from the queue (and the store) before it
+// is returned, so if you don't successfully process it, you need to add it back
+// with AddIfNotPresent().
+// The process function is called under lock, so it is safe to update data
+// structures in it that need to be in sync with the queue (e.g. knownKeys). The PopProcessFunc
+// may return an instance of ErrRequeue with a nested error to indicate the current
+// item should be requeued (equivalent to calling AddIfNotPresent under the lock).
+//
+// Pop returns a 'Deltas', which has a complete list of all the things
+// that happened to the object (deltas) while it was sitting in the queue.
+func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
+ f.lock.Lock()
+ defer f.lock.Unlock()
+ for {
+ for len(f.queue) == 0 {
+ f.cond.Wait()
+ }
+ id := f.queue[0]
+ f.queue = f.queue[1:]
+ item, ok := f.items[id]
+ if f.initialPopulationCount > 0 {
+ f.initialPopulationCount--
+ }
+ if !ok {
+ // Item may have been deleted subsequently.
+ continue
+ }
+ delete(f.items, id)
+ err := process(item)
+ if e, ok := err.(ErrRequeue); ok {
+ f.addIfNotPresent(id, item)
+ err = e.Err
+ }
+ // Don't need to copyDeltas here, because we're transferring
+ // ownership to the caller.
+ return item, err
+ }
+}
+
+// Replace will delete the contents of 'f', using instead the given list.
+// 'f' takes ownership of the list; you should not reference the list again
+// after calling this function. f's queue is reset, too; upon return, it
+// will contain the items in the list, in no particular order.
+func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
+ f.lock.Lock()
+ defer f.lock.Unlock()
+ keys := make(sets.String, len(list))
+
+ if !f.populated {
+ f.populated = true
+ f.initialPopulationCount = len(list)
+ }
+
+ for _, item := range list {
+ key, err := f.KeyOf(item)
+ if err != nil {
+ return KeyError{item, err}
+ }
+ keys.Insert(key)
+ if err := f.queueActionLocked(Sync, item); err != nil {
+ return fmt.Errorf("couldn't enqueue object: %v", err)
+ }
+ }
+
+ if f.knownObjects == nil {
+ // Do deletion detection against our own list.
+ for k, oldItem := range f.items {
+ if keys.Has(k) {
+ continue
+ }
+ var deletedObj interface{}
+ if n := oldItem.Newest(); n != nil {
+ deletedObj = n.Object
+ }
+ if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
+ return err
+ }
+ }
+ return nil
+ }
+
+ // Detect deletions not already in the queue.
+ // TODO(lavalamp): This may be racy-- we aren't properly locked
+ // with knownObjects. Unproven.
+ knownKeys := f.knownObjects.ListKeys()
+ for _, k := range knownKeys {
+ if keys.Has(k) {
+ continue
+ }
+
+ deletedObj, exists, err := f.knownObjects.GetByKey(k)
+ if err != nil {
+ deletedObj = nil
+ glog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k)
+ } else if !exists {
+ deletedObj = nil
+ glog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k)
+ }
+ if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+// Resync will send a sync event for each item
+func (f *DeltaFIFO) Resync() error {
+ f.lock.Lock()
+ defer f.lock.Unlock()
+ if f.knownObjects == nil {
+ // Nothing is known; there is nothing to sync.
+ return nil
+ }
+ for _, k := range f.knownObjects.ListKeys() {
+ obj, exists, err := f.knownObjects.GetByKey(k)
+ if err != nil {
+ glog.Errorf("Unexpected error %v during lookup of key %v, unable to queue object for sync", err, k)
+ continue
+ } else if !exists {
+ glog.Infof("Key %v does not exist in known objects store, unable to queue object for sync", k)
+ continue
+ }
+
+ if err := f.queueActionLocked(Sync, obj); err != nil {
+ return fmt.Errorf("couldn't queue object: %v", err)
+ }
+ }
+ return nil
+}
+
+// A KeyListerGetter is anything that knows how to list its keys and look up by key.
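+// Any Store satisfies it (Store includes both ListKeys and GetByKey), so a
+// consumer's downstream store is a natural knownObjects source for
+// NewDeltaFIFO. A sketch:
+//
+// store := NewStore(MetaNamespaceKeyFunc)
+// fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, nil, store)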
+type KeyListerGetter interface {
+ KeyLister
+ KeyGetter
+}
+
+// A KeyLister is anything that knows how to list its keys.
+type KeyLister interface {
+ ListKeys() []string
+}
+
+// A KeyGetter is anything that knows how to get the value stored under a given key.
+type KeyGetter interface {
+ GetByKey(key string) (interface{}, bool, error)
+}
+
+// DeltaCompressor is an algorithm that removes redundant changes.
+type DeltaCompressor interface {
+ Compress(Deltas) Deltas
+}
+
+// DeltaCompressorFunc should remove redundant changes; but changes that
+// are redundant depend on one's desired semantics, so this is an
+// injectable function.
+//
+// DeltaCompressorFunc adapts a raw function to be a DeltaCompressor.
+type DeltaCompressorFunc func(Deltas) Deltas
+
+// Compress just calls dc.
+func (dc DeltaCompressorFunc) Compress(d Deltas) Deltas {
+ return dc(d)
+}
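+
+// For illustration, a compressor that keeps only the newest delta per object
+// might look like this (a sketch; only safe if the consumer does not need
+// intermediate states):
+//
+// var keepNewest DeltaCompressor = DeltaCompressorFunc(func(d Deltas) Deltas {
+//     if n := d.Newest(); n != nil {
+//         return Deltas{*n}
+//     }
+//     return d
+// })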
+
+// DeltaType is the type of a change (addition, deletion, etc)
+type DeltaType string
+
+const (
+ Added DeltaType = "Added"
+ Updated DeltaType = "Updated"
+ Deleted DeltaType = "Deleted"
+ // The other types are obvious. You'll get Sync deltas when:
+ // * A watch expires/errors out and a new list/watch cycle is started.
+ // * You've turned on periodic syncs.
+ // (Anything that triggers DeltaFIFO's Replace() method.)
+ Sync DeltaType = "Sync"
+)
+
+// Delta is the type stored by a DeltaFIFO. It tells you what change
+// happened, and the object's state after* that change.
+//
+// [*] Unless the change is a deletion, and then you'll get the final
+// state of the object before it was deleted.
+type Delta struct {
+ Type DeltaType
+ Object interface{}
+}
+
+// Deltas is a list of one or more 'Delta's to an individual object.
+// The oldest delta is at index 0, the newest delta is the last one.
+type Deltas []Delta
+
+// Oldest is a convenience function that returns the oldest delta, or
+// nil if there are no deltas.
+func (d Deltas) Oldest() *Delta {
+ if len(d) > 0 {
+ return &d[0]
+ }
+ return nil
+}
+
+// Newest is a convenience function that returns the newest delta, or
+// nil if there are no deltas.
+func (d Deltas) Newest() *Delta {
+ if n := len(d); n > 0 {
+ return &d[n-1]
+ }
+ return nil
+}
+
+// copyDeltas returns a shallow copy of d; that is, it copies the slice but not
+// the objects in the slice. This allows Get/List to return an object that we
+// know won't be clobbered by a subsequent call to a delta compressor.
+func copyDeltas(d Deltas) Deltas {
+ d2 := make(Deltas, len(d))
+ copy(d2, d)
+ return d2
+}
+
+// DeletedFinalStateUnknown is placed into a DeltaFIFO in the case where
+// an object was deleted but the watch deletion event was missed. In this
+// case we don't know the final "resting" state of the object, so there's
+// a chance the included `Obj` is stale.
+type DeletedFinalStateUnknown struct {
+ Key string
+ Obj interface{}
+}
diff --git a/src/kube2msb/vendor/k8s.io/kubernetes/pkg/client/cache/doc.go b/src/kube2msb/vendor/k8s.io/kubernetes/pkg/client/cache/doc.go
new file mode 100644
index 0000000..4f593f0
--- /dev/null
+++ b/src/kube2msb/vendor/k8s.io/kubernetes/pkg/client/cache/doc.go
@@ -0,0 +1,24 @@
+/*
+Copyright 2014 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// Package cache is a client-side caching mechanism. It is useful for
+// reducing the number of server calls you'd otherwise need to make.
+// Reflector watches a server and updates a Store. Two stores are provided;
+// one that simply caches objects (for example, to allow a scheduler to
+// list currently available nodes), and one that additionally acts as
+// a FIFO queue (for example, to allow a scheduler to process incoming
+// pods).
+package cache
diff --git a/src/kube2msb/vendor/k8s.io/kubernetes/pkg/client/cache/expiration_cache.go b/src/kube2msb/vendor/k8s.io/kubernetes/pkg/client/cache/expiration_cache.go
new file mode 100644
index 0000000..8c5c470
--- /dev/null
+++ b/src/kube2msb/vendor/k8s.io/kubernetes/pkg/client/cache/expiration_cache.go
@@ -0,0 +1,208 @@
+/*
+Copyright 2014 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package cache
+
+import (
+ "sync"
+ "time"
+
+ "github.com/golang/glog"
+ "k8s.io/kubernetes/pkg/util"
+)
+
+// ExpirationCache implements the store interface
+// 1. All entries are automatically time stamped on insert
+// a. The key is computed based off the original item/keyFunc
+// b. The value inserted under that key is the timestamped item
+// 2. Expiration happens lazily on read based on the expiration policy
+// a. No item can be inserted into the store while we're expiring
+// *any* item in the cache.
+// 3. Time-stamps are stripped off unexpired entries before return
+// Note that the ExpirationCache is inherently slower than a normal
+// threadSafeStore because it takes a write lock every time it checks if
+// an item has expired.
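+//
+// A usage sketch with the TTL policy (NewTTLStore is defined below; the
+// "default/foo" key assumes MetaNamespaceKeyFunc-style namespace/name keys):
+//
+// store := NewTTLStore(MetaNamespaceKeyFunc, 30*time.Second)
+// store.Add(obj)
+// item, exists, err := store.GetByKey("default/foo")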
+type ExpirationCache struct {
+ cacheStorage ThreadSafeStore
+ keyFunc KeyFunc
+ clock util.Clock
+ expirationPolicy ExpirationPolicy
+ // expirationLock is a write lock used to guarantee that we don't clobber
+ // newly inserted objects because of a stale expiration timestamp comparison
+ expirationLock sync.Mutex
+}
+
+// ExpirationPolicy dictates when an object expires. Currently only abstracted out
+// so unittests don't rely on the system clock.
+type ExpirationPolicy interface {
+ IsExpired(obj *timestampedEntry) bool
+}
+
+// TTLPolicy implements a ttl based ExpirationPolicy.
+type TTLPolicy struct {
+ // >0: Expire entries with an age > ttl
+ // <=0: Don't expire any entry
+ Ttl time.Duration
+
+ // Clock used to calculate ttl expiration
+ Clock util.Clock
+}
+
+// IsExpired returns true if the given object is older than the ttl, or it can't
+// determine its age.
+func (p *TTLPolicy) IsExpired(obj *timestampedEntry) bool {
+ return p.Ttl > 0 && p.Clock.Since(obj.timestamp) > p.Ttl
+}
+
+// timestampedEntry is the only type allowed in a ExpirationCache.
+type timestampedEntry struct {
+ obj interface{}
+ timestamp time.Time
+}
+
+// getTimestampedEntry returns the timestampedEntry stored under the given key.
+func (c *ExpirationCache) getTimestampedEntry(key string) (*timestampedEntry, bool) {
+ item, _ := c.cacheStorage.Get(key)
+ if tsEntry, ok := item.(*timestampedEntry); ok {
+ return tsEntry, true
+ }
+ return nil, false
+}
+
+// getOrExpire retrieves the object from the timestampedEntry if and only if it hasn't
+// already expired. It holds a write lock across deletion.
+func (c *ExpirationCache) getOrExpire(key string) (interface{}, bool) {
+ // Prevent all inserts from the time we deem an item as "expired" to when we
+ // delete it, so an un-expired item doesn't sneak in under the same key, just
+ // before the Delete.
+ c.expirationLock.Lock()
+ defer c.expirationLock.Unlock()
+ timestampedItem, exists := c.getTimestampedEntry(key)
+ if !exists {
+ return nil, false
+ }
+ if c.expirationPolicy.IsExpired(timestampedItem) {
+ glog.V(4).Infof("Entry %v: %+v has expired", key, timestampedItem.obj)
+ c.cacheStorage.Delete(key)
+ return nil, false
+ }
+ return timestampedItem.obj, true
+}
+
+// GetByKey returns the item stored under the key, or sets exists=false.
+func (c *ExpirationCache) GetByKey(key string) (interface{}, bool, error) {
+ obj, exists := c.getOrExpire(key)
+ return obj, exists, nil
+}
+
+// Get returns the requested item if it has not expired. It purges the
+// expired entry from the cache in the process.
+func (c *ExpirationCache) Get(obj interface{}) (interface{}, bool, error) {
+ key, err := c.keyFunc(obj)
+ if err != nil {
+ return nil, false, KeyError{obj, err}
+ }
+ obj, exists := c.getOrExpire(key)
+ return obj, exists, nil
+}
+
+// List retrieves a list of unexpired items. It purges the cache of expired
+// items in the process.
+func (c *ExpirationCache) List() []interface{} {
+ items := c.cacheStorage.List()
+
+ list := make([]interface{}, 0, len(items))
+ for _, item := range items {
+ obj := item.(*timestampedEntry).obj
+ if key, err := c.keyFunc(obj); err != nil {
+ list = append(list, obj)
+ } else if obj, exists := c.getOrExpire(key); exists {
+ list = append(list, obj)
+ }
+ }
+ return list
+}
+
+// ListKeys returns a list of all keys in the expiration cache.
+func (c *ExpirationCache) ListKeys() []string {
+ return c.cacheStorage.ListKeys()
+}
+
+// Add timestamps an item and inserts it into the cache, overwriting entries
+// that might exist under the same key.
+func (c *ExpirationCache) Add(obj interface{}) error {
+ c.expirationLock.Lock()
+ defer c.expirationLock.Unlock()
+
+ key, err := c.keyFunc(obj)
+ if err != nil {
+ return KeyError{obj, err}
+ }
+ c.cacheStorage.Add(key, &timestampedEntry{obj, c.clock.Now()})
+ return nil
+}
+
+// Update has not been implemented yet for lack of a use case, so this method
+// simply calls `Add`. This effectively refreshes the timestamp.
+func (c *ExpirationCache) Update(obj interface{}) error {
+ return c.Add(obj)
+}
+
+// Delete removes an item from the cache.
+func (c *ExpirationCache) Delete(obj interface{}) error {
+ c.expirationLock.Lock()
+ defer c.expirationLock.Unlock()
+ key, err := c.keyFunc(obj)
+ if err != nil {
+ return KeyError{obj, err}
+ }
+ c.cacheStorage.Delete(key)
+ return nil
+}
+
+// Replace will convert all items in the given list to timestampedEntries
+// before attempting the replace operation. The replace operation will
+// delete the contents of the ExpirationCache `c`.
+func (c *ExpirationCache) Replace(list []interface{}, resourceVersion string) error {
+ c.expirationLock.Lock()
+ defer c.expirationLock.Unlock()
+ items := map[string]interface{}{}
+ ts := c.clock.Now()
+ for _, item := range list {
+ key, err := c.keyFunc(item)
+ if err != nil {
+ return KeyError{item, err}
+ }
+ items[key] = &timestampedEntry{item, ts}
+ }
+ c.cacheStorage.Replace(items, resourceVersion)
+ return nil
+}
+
+// Resync will touch all objects to put them into the processing queue
+func (c *ExpirationCache) Resync() error {
+ return c.cacheStorage.Resync()
+}
+
+// NewTTLStore creates and returns an ExpirationCache with a TTLPolicy
+func NewTTLStore(keyFunc KeyFunc, ttl time.Duration) Store {
+ return &ExpirationCache{
+ cacheStorage: NewThreadSafeStore(Indexers{}, Indices{}),
+ keyFunc: keyFunc,
+ clock: util.RealClock{},
+ expirationPolicy: &TTLPolicy{ttl, util.RealClock{}},
+ }
+}
diff --git a/src/kube2msb/vendor/k8s.io/kubernetes/pkg/client/cache/expiration_cache_fakes.go b/src/kube2msb/vendor/k8s.io/kubernetes/pkg/client/cache/expiration_cache_fakes.go
new file mode 100644
index 0000000..eb1d535
--- /dev/null
+++ b/src/kube2msb/vendor/k8s.io/kubernetes/pkg/client/cache/expiration_cache_fakes.go
@@ -0,0 +1,54 @@
+/*
+Copyright 2014 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package cache
+
+import (
+ "k8s.io/kubernetes/pkg/util"
+ "k8s.io/kubernetes/pkg/util/sets"
+)
+
+type fakeThreadSafeMap struct {
+ ThreadSafeStore
+ deletedKeys chan<- string
+}
+
+func (c *fakeThreadSafeMap) Delete(key string) {
+ if c.deletedKeys != nil {
+ c.ThreadSafeStore.Delete(key)
+ c.deletedKeys <- key
+ }
+}
+
+type FakeExpirationPolicy struct {
+ NeverExpire sets.String
+ RetrieveKeyFunc KeyFunc
+}
+
+func (p *FakeExpirationPolicy) IsExpired(obj *timestampedEntry) bool {
+ key, _ := p.RetrieveKeyFunc(obj)
+ return !p.NeverExpire.Has(key)
+}
+
+func NewFakeExpirationStore(keyFunc KeyFunc, deletedKeys chan<- string, expirationPolicy ExpirationPolicy, cacheClock util.Clock) Store {
+ cacheStorage := NewThreadSafeStore(Indexers{}, Indices{})
+ return &ExpirationCache{
+ cacheStorage: &fakeThreadSafeMap{cacheStorage, deletedKeys},
+ keyFunc: keyFunc,
+ clock: cacheClock,
+ expirationPolicy: expirationPolicy,
+ }
+}
diff --git a/src/kube2msb/vendor/k8s.io/kubernetes/pkg/client/cache/fake_custom_store.go b/src/kube2msb/vendor/k8s.io/kubernetes/pkg/client/cache/fake_custom_store.go
new file mode 100644
index 0000000..8d71c24
--- /dev/null
+++ b/src/kube2msb/vendor/k8s.io/kubernetes/pkg/client/cache/fake_custom_store.go
@@ -0,0 +1,102 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package cache
+
+// FakeCustomStore lets you define custom functions for store operations
+type FakeCustomStore struct {
+ AddFunc func(obj interface{}) error
+ UpdateFunc func(obj interface{}) error
+ DeleteFunc func(obj interface{}) error
+ ListFunc func() []interface{}
+ ListKeysFunc func() []string
+ GetFunc func(obj interface{}) (item interface{}, exists bool, err error)
+ GetByKeyFunc func(key string) (item interface{}, exists bool, err error)
+ ReplaceFunc func(list []interface{}, resourceVersion string) error
+ ResyncFunc func() error
+}
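+
+// A sketch of stubbing a single operation for a test; any function left nil
+// falls back to a no-op:
+//
+// var added []interface{}
+// store := &FakeCustomStore{
+//     AddFunc: func(obj interface{}) error {
+//         added = append(added, obj)
+//         return nil
+//     },
+// }
+// store.Add("x")    // recorded in added
+// store.Delete("x") // no-op: DeleteFunc is nil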
+
+// Add calls the custom Add function if defined
+func (f *FakeCustomStore) Add(obj interface{}) error {
+ if f.AddFunc != nil {
+ return f.AddFunc(obj)
+ }
+ return nil
+}
+
+// Update calls the custom Update function if defined
+func (f *FakeCustomStore) Update(obj interface{}) error {
+ if f.UpdateFunc != nil {
+ return f.UpdateFunc(obj)
+ }
+ return nil
+}
+
+// Delete calls the custom Delete function if defined
+func (f *FakeCustomStore) Delete(obj interface{}) error {
+ if f.DeleteFunc != nil {
+ return f.DeleteFunc(obj)
+ }
+ return nil
+}
+
+// List calls the custom List function if defined
+func (f *FakeCustomStore) List() []interface{} {
+ if f.ListFunc != nil {
+ return f.ListFunc()
+ }
+ return nil
+}
+
+// ListKeys calls the custom ListKeys function if defined
+func (f *FakeCustomStore) ListKeys() []string {
+ if f.ListKeysFunc != nil {
+ return f.ListKeysFunc()
+ }
+ return nil
+}
+
+// Get calls the custom Get function if defined
+func (f *FakeCustomStore) Get(obj interface{}) (item interface{}, exists bool, err error) {
+ if f.GetFunc != nil {
+ return f.GetFunc(obj)
+ }
+ return nil, false, nil
+}
+
+// GetByKey calls the custom GetByKey function if defined
+func (f *FakeCustomStore) GetByKey(key string) (item interface{}, exists bool, err error) {
+ if f.GetByKeyFunc != nil {
+ return f.GetByKeyFunc(key)
+ }
+ return nil, false, nil
+}
+
+// Replace calls the custom Replace function if defined
+func (f *FakeCustomStore) Replace(list []interface{}, resourceVersion string) error {
+ if f.ReplaceFunc != nil {
+ return f.ReplaceFunc(list, resourceVersion)
+ }
+ return nil
+}
+
+// Resync calls the custom Resync function if defined
+func (f *FakeCustomStore) Resync() error {
+ if f.ResyncFunc != nil {
+ return f.ResyncFunc()
+ }
+ return nil
+}
diff --git a/src/kube2msb/vendor/k8s.io/kubernetes/pkg/client/cache/fifo.go b/src/kube2msb/vendor/k8s.io/kubernetes/pkg/client/cache/fifo.go
new file mode 100644
index 0000000..a6d5e0a
--- /dev/null
+++ b/src/kube2msb/vendor/k8s.io/kubernetes/pkg/client/cache/fifo.go
@@ -0,0 +1,321 @@
+/*
+Copyright 2014 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package cache
+
+import (
+ "sync"
+
+ "k8s.io/kubernetes/pkg/util/sets"
+)
+
+// PopProcessFunc is passed to Pop() method of Queue interface.
+// It is supposed to process the element popped from the queue.
+type PopProcessFunc func(interface{}) error
+
+// ErrRequeue may be returned by a PopProcessFunc to safely requeue
+// the current item. The value of Err will be returned from Pop.
+type ErrRequeue struct {
+ // Err is returned by the Pop function
+ Err error
+}
+
+func (e ErrRequeue) Error() string {
+ if e.Err == nil {
+ return "the popped item should be requeued without returning an error"
+ }
+ return e.Err.Error()
+}
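+
+// A sketch of requeueing from a Pop processor; handleItem is a caller-supplied
+// function, not part of this package:
+//
+// item, err := queue.Pop(func(obj interface{}) error {
+//     if err := handleItem(obj); err != nil {
+//         return ErrRequeue{Err: err}
+//     }
+//     return nil
+// })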
+
+// Queue is exactly like a Store, but has a Pop() method too.
+type Queue interface {
+ Store
+
+ // Pop blocks until it has something to process.
+ // It returns the object that was processed and the result of processing.
+ // The PopProcessFunc may return an ErrRequeue{...} to indicate the item
+ // should be requeued before releasing the lock on the queue.
+ Pop(PopProcessFunc) (interface{}, error)
+
+ // AddIfNotPresent adds a value previously
+ // returned by Pop back into the queue as long
+ // as nothing else (presumably more recent)
+ // has since been added.
+ AddIfNotPresent(interface{}) error
+
+ // HasSynced returns true if the first batch of items has been popped
+ HasSynced() bool
+}
+
+// Helper function for popping from Queue.
+// WARNING: Do NOT use this function in non-test code to avoid races
+// unless you really really really really know what you are doing.
+func Pop(queue Queue) interface{} {
+ var result interface{}
+ queue.Pop(func(obj interface{}) error {
+ result = obj
+ return nil
+ })
+ return result
+}
+
+// FIFO receives adds and updates from a Reflector, and puts them in a queue for
+// FIFO order processing. If multiple adds/updates of a single item happen while
+// an item is in the queue before it has been processed, it will only be
+// processed once, and when it is processed, the most recent version will be
+// processed. This can't be done with a channel.
+//
+// FIFO solves this use case:
+// * You want to process every object (exactly) once.
+// * You want to process the most recent version of the object when you process it.
+// * You do not want to process deleted objects, they should be removed from the queue.
+// * You do not want to periodically reprocess objects.
+// Compare with DeltaFIFO for other use cases.
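+//
+// A minimal sketch (NewFIFO is defined at the bottom of this file; obj is
+// any object the key function understands):
+//
+// f := NewFIFO(MetaNamespaceKeyFunc)
+// f.Add(obj)
+// item, err := f.Pop(func(obj interface{}) error {
+//     return nil // process the latest version of obj here
+// })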
+type FIFO struct {
+ lock sync.RWMutex
+ cond sync.Cond
+ // We depend on the property that items in the set are in the queue and vice versa.
+ items map[string]interface{}
+ queue []string
+
+ // populated is true if the first batch of items has been inserted by
+ // Replace(), or if Delete/Add/Update was called first.
+ populated bool
+ // initialPopulationCount is the number of items inserted by the first call of Replace()
+ initialPopulationCount int
+
+ // keyFunc is used to make the key used for queued item insertion and retrieval, and
+ // should be deterministic.
+ keyFunc KeyFunc
+}
+
+var (
+ _ = Queue(&FIFO{}) // FIFO is a Queue
+)
+
+// HasSynced returns true if an Add/Update/Delete/AddIfNotPresent was called
+// first, or if the first batch of items inserted by Replace() has been popped.
+func (f *FIFO) HasSynced() bool {
+ f.lock.Lock()
+ defer f.lock.Unlock()
+ return f.populated && f.initialPopulationCount == 0
+}
+
+// Add inserts an item, and puts it in the queue. The item is only enqueued
+// if it doesn't already exist in the set.
+func (f *FIFO) Add(obj interface{}) error {
+ id, err := f.keyFunc(obj)
+ if err != nil {
+ return KeyError{obj, err}
+ }
+ f.lock.Lock()
+ defer f.lock.Unlock()
+ f.populated = true
+ if _, exists := f.items[id]; !exists {
+ f.queue = append(f.queue, id)
+ }
+ f.items[id] = obj
+ f.cond.Broadcast()
+ return nil
+}
+
+// AddIfNotPresent inserts an item, and puts it in the queue. If the item is already
+// present in the set, it is neither enqueued nor added to the set.
+//
+// This is useful in a single producer/consumer scenario so that the consumer can
+// safely retry items without contending with the producer and potentially enqueueing
+// stale items.
+func (f *FIFO) AddIfNotPresent(obj interface{}) error {
+ id, err := f.keyFunc(obj)
+ if err != nil {
+ return KeyError{obj, err}
+ }
+ f.lock.Lock()
+ defer f.lock.Unlock()
+ f.addIfNotPresent(id, obj)
+ return nil
+}
+
+// addIfNotPresent assumes the fifo lock is already held and adds the provided
+// item to the queue under id if it does not already exist.
+func (f *FIFO) addIfNotPresent(id string, obj interface{}) {
+ f.populated = true
+ if _, exists := f.items[id]; exists {
+ return
+ }
+
+ f.queue = append(f.queue, id)
+ f.items[id] = obj
+ f.cond.Broadcast()
+}
+
+// Update is the same as Add in this implementation.
+func (f *FIFO) Update(obj interface{}) error {
+ return f.Add(obj)
+}
+
+// Delete removes an item. It doesn't add it to the queue, because
+// this implementation assumes the consumer only cares about the objects,
+// not the order in which they were created/added.
+func (f *FIFO) Delete(obj interface{}) error {
+ id, err := f.keyFunc(obj)
+ if err != nil {
+ return KeyError{obj, err}
+ }
+ f.lock.Lock()
+ defer f.lock.Unlock()
+ f.populated = true
+ delete(f.items, id)
+ return err
+}
+
+// List returns a list of all the items.
+func (f *FIFO) List() []interface{} {
+ f.lock.RLock()
+ defer f.lock.RUnlock()
+ list := make([]interface{}, 0, len(f.items))
+ for _, item := range f.items {
+ list = append(list, item)
+ }
+ return list
+}
+
+// ListKeys returns a list of all the keys of the objects currently
+// in the FIFO.
+func (f *FIFO) ListKeys() []string {
+ f.lock.RLock()
+ defer f.lock.RUnlock()
+ list := make([]string, 0, len(f.items))
+ for key := range f.items {
+ list = append(list, key)
+ }
+ return list
+}
+
+// Get returns the requested item, or sets exists=false.
+func (f *FIFO) Get(obj interface{}) (item interface{}, exists bool, err error) {
+ key, err := f.keyFunc(obj)
+ if err != nil {
+ return nil, false, KeyError{obj, err}
+ }
+ return f.GetByKey(key)
+}
+
+// GetByKey returns the requested item, or sets exists=false.
+func (f *FIFO) GetByKey(key string) (item interface{}, exists bool, err error) {
+ f.lock.RLock()
+ defer f.lock.RUnlock()
+ item, exists = f.items[key]
+ return item, exists, nil
+}
+
+// Pop waits until an item is ready and processes it. If multiple items are
+// ready, they are returned in the order in which they were added/updated.
+// The item is removed from the queue (and the store) before it is processed,
+// so if you don't successfully process it, it should be added back with
+// AddIfNotPresent(). The process function is called under lock, so it is safe
+// to update data structures in it that need to be in sync with the queue.
+func (f *FIFO) Pop(process PopProcessFunc) (interface{}, error) {
+ f.lock.Lock()
+ defer f.lock.Unlock()
+ for {
+ for len(f.queue) == 0 {
+ f.cond.Wait()
+ }
+ id := f.queue[0]
+ f.queue = f.queue[1:]
+ if f.initialPopulationCount > 0 {
+ f.initialPopulationCount--
+ }
+ item, ok := f.items[id]
+ if !ok {
+ // Item may have been deleted subsequently.
+ continue
+ }
+ delete(f.items, id)
+ err := process(item)
+ if e, ok := err.(ErrRequeue); ok {
+ f.addIfNotPresent(id, item)
+ err = e.Err
+ }
+ return item, err
+ }
+}
+
+// Replace will delete the contents of 'f', using instead the given list.
+// 'f' takes ownership of the list; you should not reference the list again
+// after calling this function. f's queue is reset, too; upon return, it
+// will contain the items in the list, in no particular order.
+func (f *FIFO) Replace(list []interface{}, resourceVersion string) error {
+ items := map[string]interface{}{}
+ for _, item := range list {
+ key, err := f.keyFunc(item)
+ if err != nil {
+ return KeyError{item, err}
+ }
+ items[key] = item
+ }
+
+ f.lock.Lock()
+ defer f.lock.Unlock()
+
+ if !f.populated {
+ f.populated = true
+ f.initialPopulationCount = len(items)
+ }
+
+ f.items = items
+ f.queue = f.queue[:0]
+ for id := range items {
+ f.queue = append(f.queue, id)
+ }
+ if len(f.queue) > 0 {
+ f.cond.Broadcast()
+ }
+ return nil
+}
+
+// Resync will touch all objects to put them into the processing queue
+func (f *FIFO) Resync() error {
+ f.lock.Lock()
+ defer f.lock.Unlock()
+
+ inQueue := sets.NewString()
+ for _, id := range f.queue {
+ inQueue.Insert(id)
+ }
+ for id := range f.items {
+ if !inQueue.Has(id) {
+ f.queue = append(f.queue, id)
+ }
+ }
+ if len(f.queue) > 0 {
+ f.cond.Broadcast()
+ }
+ return nil
+}
+
+// NewFIFO returns a Store which can be used to queue up items to
+// process.
+func NewFIFO(keyFunc KeyFunc) *FIFO {
+ f := &FIFO{
+ items: map[string]interface{}{},
+ queue: []string{},
+ keyFunc: keyFunc,
+ }
+ f.cond.L = &f.lock
+ return f
+}
diff --git a/src/kube2msb/vendor/k8s.io/kubernetes/pkg/client/cache/index.go b/src/kube2msb/vendor/k8s.io/kubernetes/pkg/client/cache/index.go
new file mode 100644
index 0000000..4379880
--- /dev/null
+++ b/src/kube2msb/vendor/k8s.io/kubernetes/pkg/client/cache/index.go
@@ -0,0 +1,82 @@
+/*
+Copyright 2014 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package cache
+
+import (
+ "fmt"
+
+ "k8s.io/kubernetes/pkg/api/meta"
+ "k8s.io/kubernetes/pkg/util/sets"
+)
+
+// Indexer is a storage interface that lets you list objects using multiple indexing functions
+type Indexer interface {
+ Store
+ // Index returns a list of objects that match the given object on the named indexing function
+ Index(indexName string, obj interface{}) ([]interface{}, error)
+ // ListIndexFuncValues returns the list of generated values of an Index func
+ ListIndexFuncValues(indexName string) []string
+ // ByIndex lists objects that match on the named indexing function with the exact key
+ ByIndex(indexName, indexKey string) ([]interface{}, error)
+ // GetIndexers returns the indexers
+ GetIndexers() Indexers
+
+ // AddIndexers adds more indexers to this store. If you call this after you already have data
+ // in the store, the results are undefined.
+ AddIndexers(newIndexers Indexers) error
+}
+
+// IndexFunc knows how to provide an indexed value for an object.
+type IndexFunc func(obj interface{}) ([]string, error)
+
+// IndexFuncToKeyFuncAdapter adapts an indexFunc to a keyFunc. This is only useful if your index function returns
+// unique values for every object. This conversion can return errors when more than one key is found. You
+// should prefer to make proper key and index functions.
+func IndexFuncToKeyFuncAdapter(indexFunc IndexFunc) KeyFunc {
+ return func(obj interface{}) (string, error) {
+ indexKeys, err := indexFunc(obj)
+ if err != nil {
+ return "", err
+ }
+ if len(indexKeys) > 1 {
+ return "", fmt.Errorf("too many keys: %v", indexKeys)
+ }
+ if len(indexKeys) == 0 {
+ return "", fmt.Errorf("unexpected empty indexKeys")
+ }
+ return indexKeys[0], nil
+ }
+}
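+
+// For example, MetaNamespaceIndexFunc (below) always returns exactly one
+// value, so it can be adapted into a KeyFunc that keys objects by namespace
+// alone (a sketch):
+//
+// byNamespace := IndexFuncToKeyFuncAdapter(MetaNamespaceIndexFunc)
+// key, err := byNamespace(pod) // "default" for a pod in the default namespace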
+
+const (
+ NamespaceIndex string = "namespace"
+)
+
+// MetaNamespaceIndexFunc is a default index function that indexes based on an object's namespace
+func MetaNamespaceIndexFunc(obj interface{}) ([]string, error) {
+ meta, err := meta.Accessor(obj)
+ if err != nil {
+ return []string{""}, fmt.Errorf("object has no meta: %v", err)
+ }
+ return []string{meta.GetNamespace()}, nil
+}
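+
+// A sketch of namespace-based lookup through this index (NewIndexer is
+// defined in store.go):
+//
+// indexer := NewIndexer(MetaNamespaceKeyFunc, Indexers{NamespaceIndex: MetaNamespaceIndexFunc})
+// indexer.Add(pod)
+// inNamespace, err := indexer.ByIndex(NamespaceIndex, "kube-system")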
+
+// Index maps the indexed value to a set of keys in the store that match on that value
+type Index map[string]sets.String
+
+// Indexers maps a name to a IndexFunc
+type Indexers map[string]IndexFunc
+
+// Indices maps a name to an Index
+type Indices map[string]Index
diff --git a/src/kube2msb/vendor/k8s.io/kubernetes/pkg/client/cache/listers.go b/src/kube2msb/vendor/k8s.io/kubernetes/pkg/client/cache/listers.go
new file mode 100644
index 0000000..29e5859
--- /dev/null
+++ b/src/kube2msb/vendor/k8s.io/kubernetes/pkg/client/cache/listers.go
@@ -0,0 +1,672 @@
+/*
+Copyright 2014 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package cache
+
+import (
+ "fmt"
+
+ "github.com/golang/glog"
+ "k8s.io/kubernetes/pkg/api"
+ "k8s.io/kubernetes/pkg/api/unversioned"
+ "k8s.io/kubernetes/pkg/apis/apps"
+ "k8s.io/kubernetes/pkg/apis/batch"
+ "k8s.io/kubernetes/pkg/apis/extensions"
+ "k8s.io/kubernetes/pkg/labels"
+)
+
+// TODO: generate these classes and methods for all resources of interest using
+// a script. Can use "go generate" once 1.4 is supported by all users.
+
+// StoreToPodLister makes a Store have the List method of the client.PodInterface
+// The Store must contain (only) Pods.
+//
+// Example:
+// s := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
+// lw := cache.NewListWatchFromClient(c, "pods", api.NamespaceAll, fields.Everything())
+// r := cache.NewReflector(lw, &api.Pod{}, s, 0)
+// r.Run()
+// l := StoreToPodLister{s}
+// l.List()
+type StoreToPodLister struct {
+ Indexer
+}
+
+// Please note that selector is filtering among the pods that have gotten into
+// the store; there may have been some filtering that already happened before
+// that.
+//
+// TODO: converge on the interface in pkg/client.
+func (s *StoreToPodLister) List(selector labels.Selector) (pods []*api.Pod, err error) {
+ // TODO: it'd be great to just call
+ // s.Pods(api.NamespaceAll).List(selector), however then we'd have to
+ // remake the list.Items as a []*api.Pod. So leave this separate for
+ // now.
+ for _, m := range s.Indexer.List() {
+ pod := m.(*api.Pod)
+ if selector.Matches(labels.Set(pod.Labels)) {
+ pods = append(pods, pod)
+ }
+ }
+ return pods, nil
+}
+
+// Pods is taking baby steps to be more like the api in pkg/client
+func (s *StoreToPodLister) Pods(namespace string) storePodsNamespacer {
+ return storePodsNamespacer{s.Indexer, namespace}
+}
+
+type storePodsNamespacer struct {
+ indexer Indexer
+ namespace string
+}
+
+// Please note that selector is filtering among the pods that have gotten into
+// the store; there may have been some filtering that already happened before
+// that.
+func (s storePodsNamespacer) List(selector labels.Selector) (api.PodList, error) {
+ pods := api.PodList{}
+
+ if s.namespace == api.NamespaceAll {
+ for _, m := range s.indexer.List() {
+ pod := m.(*api.Pod)
+ if selector.Matches(labels.Set(pod.Labels)) {
+ pods.Items = append(pods.Items, *pod)
+ }
+ }
+ return pods, nil
+ }
+
+ key := &api.Pod{ObjectMeta: api.ObjectMeta{Namespace: s.namespace}}
+ items, err := s.indexer.Index(NamespaceIndex, key)
+ if err != nil {
+ // Ignore error; do slow search without index.
+ glog.Warningf("can not retrieve list of objects using index : %v", err)
+ for _, m := range s.indexer.List() {
+ pod := m.(*api.Pod)
+ if s.namespace == pod.Namespace && selector.Matches(labels.Set(pod.Labels)) {
+ pods.Items = append(pods.Items, *pod)
+ }
+ }
+ return pods, nil
+ }
+ for _, m := range items {
+ pod := m.(*api.Pod)
+ if selector.Matches(labels.Set(pod.Labels)) {
+ pods.Items = append(pods.Items, *pod)
+ }
+ }
+ return pods, nil
+}
+
+// Exists returns true if a pod matching the namespace/name of the given pod exists in the store.
+func (s *StoreToPodLister) Exists(pod *api.Pod) (bool, error) {
+ _, exists, err := s.Indexer.Get(pod)
+ if err != nil {
+ return false, err
+ }
+ return exists, nil
+}
+
+// NodeConditionPredicate is a function that indicates whether the given node's conditions meet
+// some set of criteria defined by the function.
+type NodeConditionPredicate func(node *api.Node) bool
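+
+// A sketch of a predicate that keeps only schedulable, Ready nodes:
+//
+// readyNodes := NodeConditionPredicate(func(node *api.Node) bool {
+//     if node.Spec.Unschedulable {
+//         return false
+//     }
+//     for _, c := range node.Status.Conditions {
+//         if c.Type == api.NodeReady && c.Status == api.ConditionTrue {
+//             return true
+//         }
+//     }
+//     return false
+// })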
+
+// StoreToNodeLister makes a Store have the List method of the client.NodeInterface
+// The Store must contain (only) Nodes.
+type StoreToNodeLister struct {
+ Store
+}
+
+func (s *StoreToNodeLister) List() (machines api.NodeList, err error) {
+ for _, m := range s.Store.List() {
+ machines.Items = append(machines.Items, *(m.(*api.Node)))
+ }
+ return machines, nil
+}
+
+// NodeCondition returns a storeToNodeConditionLister
+func (s *StoreToNodeLister) NodeCondition(predicate NodeConditionPredicate) storeToNodeConditionLister {
+ // TODO: Move this filtering server side. Currently our selectors don't facilitate searching through a list so we
+ // have the reflector filter out the Unschedulable field and sift through node conditions in the lister.
+ return storeToNodeConditionLister{s.Store, predicate}
+}
+
+// storeToNodeConditionLister filters and returns nodes matching the given type and status from the store.
+type storeToNodeConditionLister struct {
+ store Store
+ predicate NodeConditionPredicate
+}
+
+// List returns a list of nodes that match the conditions defined by the predicate functions in the storeToNodeConditionLister.
+func (s storeToNodeConditionLister) List() (nodes api.NodeList, err error) {
+ for _, m := range s.store.List() {
+ node := m.(*api.Node)
+ if s.predicate(node) {
+ nodes.Items = append(nodes.Items, *node)
+ } else {
+ glog.V(5).Infof("Node %s matches none of the conditions", node.Name)
+ }
+ }
+ return
+}
+
+// StoreToReplicationControllerLister gives a store List and Exists methods. The store must contain only ReplicationControllers.
+type StoreToReplicationControllerLister struct {
+ Indexer
+}
+
+// Exists checks if the given rc exists in the store.
+func (s *StoreToReplicationControllerLister) Exists(controller *api.ReplicationController) (bool, error) {
+ _, exists, err := s.Indexer.Get(controller)
+ if err != nil {
+ return false, err
+ }
+ return exists, nil
+}
+
+// List lists all controllers in the store.
+// TODO: converge on the interface in pkg/client
+func (s *StoreToReplicationControllerLister) List() (controllers []api.ReplicationController, err error) {
+ for _, c := range s.Indexer.List() {
+ controllers = append(controllers, *(c.(*api.ReplicationController)))
+ }
+ return controllers, nil
+}
+
+func (s *StoreToReplicationControllerLister) ReplicationControllers(namespace string) storeReplicationControllersNamespacer {
+ return storeReplicationControllersNamespacer{s.Indexer, namespace}
+}
+
+type storeReplicationControllersNamespacer struct {
+ indexer Indexer
+ namespace string
+}
+
+func (s storeReplicationControllersNamespacer) List(selector labels.Selector) ([]api.ReplicationController, error) {
+ controllers := []api.ReplicationController{}
+
+ if s.namespace == api.NamespaceAll {
+ for _, m := range s.indexer.List() {
+ rc := *(m.(*api.ReplicationController))
+ if selector.Matches(labels.Set(rc.Labels)) {
+ controllers = append(controllers, rc)
+ }
+ }
+ return controllers, nil
+ }
+
+ key := &api.ReplicationController{ObjectMeta: api.ObjectMeta{Namespace: s.namespace}}
+ items, err := s.indexer.Index(NamespaceIndex, key)
+ if err != nil {
+ // Ignore error; do slow search without index.
+ glog.Warningf("can not retrieve list of objects using index : %v", err)
+ for _, m := range s.indexer.List() {
+ rc := *(m.(*api.ReplicationController))
+ if s.namespace == rc.Namespace && selector.Matches(labels.Set(rc.Labels)) {
+ controllers = append(controllers, rc)
+ }
+ }
+ return controllers, nil
+ }
+ for _, m := range items {
+ rc := *(m.(*api.ReplicationController))
+ if selector.Matches(labels.Set(rc.Labels)) {
+ controllers = append(controllers, rc)
+ }
+ }
+ return controllers, nil
+}
+
+// GetPodControllers returns a list of replication controllers managing a pod. Returns an error only if no matching controllers are found.
+func (s *StoreToReplicationControllerLister) GetPodControllers(pod *api.Pod) (controllers []api.ReplicationController, err error) {
+ var selector labels.Selector
+ var rc api.ReplicationController
+
+ if len(pod.Labels) == 0 {
+ err = fmt.Errorf("no controllers found for pod %v because it has no labels", pod.Name)
+ return
+ }
+
+ key := &api.ReplicationController{ObjectMeta: api.ObjectMeta{Namespace: pod.Namespace}}
+ items, err := s.Indexer.Index(NamespaceIndex, key)
+ if err != nil {
+ return
+ }
+
+	for _, m := range items {
+		rc = *m.(*api.ReplicationController)
+		selector = labels.Set(rc.Spec.Selector).AsSelector()
+
+		// If an rc with a nil or empty selector creeps in, it should match nothing, not everything.
+		if selector.Empty() || !selector.Matches(labels.Set(pod.Labels)) {
+			continue
+		}
+ controllers = append(controllers, rc)
+ }
+ if len(controllers) == 0 {
+ err = fmt.Errorf("could not find controller for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels)
+ }
+ return
+}
+
+// StoreToDeploymentLister gives a store List and Exists methods. The store must contain only Deployments.
+type StoreToDeploymentLister struct {
+ Store
+}
+
+// Exists checks if the given deployment exists in the store.
+func (s *StoreToDeploymentLister) Exists(deployment *extensions.Deployment) (bool, error) {
+ _, exists, err := s.Store.Get(deployment)
+ if err != nil {
+ return false, err
+ }
+ return exists, nil
+}
+
+// List lists all deployments in the store.
+// TODO: converge on the interface in pkg/client
+func (s *StoreToDeploymentLister) List() (deployments []extensions.Deployment, err error) {
+ for _, c := range s.Store.List() {
+ deployments = append(deployments, *(c.(*extensions.Deployment)))
+ }
+ return deployments, nil
+}
+
+// GetDeploymentsForReplicaSet returns a list of deployments managing a replica set. Returns an error only if no matching deployments are found.
+func (s *StoreToDeploymentLister) GetDeploymentsForReplicaSet(rs *extensions.ReplicaSet) (deployments []extensions.Deployment, err error) {
+ var d extensions.Deployment
+
+ if len(rs.Labels) == 0 {
+ err = fmt.Errorf("no deployments found for ReplicaSet %v because it has no labels", rs.Name)
+ return
+ }
+
+ // TODO: MODIFY THIS METHOD so that it checks for the podTemplateSpecHash label
+ for _, m := range s.Store.List() {
+ d = *m.(*extensions.Deployment)
+ if d.Namespace != rs.Namespace {
+ continue
+ }
+
+ selector, err := unversioned.LabelSelectorAsSelector(d.Spec.Selector)
+ if err != nil {
+ return nil, fmt.Errorf("invalid label selector: %v", err)
+ }
+ // If a deployment with a nil or empty selector creeps in, it should match nothing, not everything.
+ if selector.Empty() || !selector.Matches(labels.Set(rs.Labels)) {
+ continue
+ }
+ deployments = append(deployments, d)
+ }
+ if len(deployments) == 0 {
+		err = fmt.Errorf("could not find deployments for ReplicaSet %s in namespace %s with labels: %v", rs.Name, rs.Namespace, rs.Labels)
+ }
+ return
+}
+
+// StoreToReplicaSetLister gives a store List and Exists methods. The store must contain only ReplicaSets.
+type StoreToReplicaSetLister struct {
+ Store
+}
+
+// Exists checks if the given ReplicaSet exists in the store.
+func (s *StoreToReplicaSetLister) Exists(rs *extensions.ReplicaSet) (bool, error) {
+ _, exists, err := s.Store.Get(rs)
+ if err != nil {
+ return false, err
+ }
+ return exists, nil
+}
+
+// List lists all ReplicaSets in the store.
+// TODO: converge on the interface in pkg/client
+func (s *StoreToReplicaSetLister) List() (rss []extensions.ReplicaSet, err error) {
+ for _, rs := range s.Store.List() {
+ rss = append(rss, *(rs.(*extensions.ReplicaSet)))
+ }
+ return rss, nil
+}
+
+type storeReplicaSetsNamespacer struct {
+ store Store
+ namespace string
+}
+
+func (s storeReplicaSetsNamespacer) List(selector labels.Selector) (rss []extensions.ReplicaSet, err error) {
+ for _, c := range s.store.List() {
+ rs := *(c.(*extensions.ReplicaSet))
+ if s.namespace == api.NamespaceAll || s.namespace == rs.Namespace {
+ if selector.Matches(labels.Set(rs.Labels)) {
+ rss = append(rss, rs)
+ }
+ }
+ }
+ return
+}
+
+func (s *StoreToReplicaSetLister) ReplicaSets(namespace string) storeReplicaSetsNamespacer {
+ return storeReplicaSetsNamespacer{s.Store, namespace}
+}
+
+// GetPodReplicaSets returns a list of ReplicaSets managing a pod. Returns an error only if no matching ReplicaSets are found.
+func (s *StoreToReplicaSetLister) GetPodReplicaSets(pod *api.Pod) (rss []extensions.ReplicaSet, err error) {
+ var selector labels.Selector
+ var rs extensions.ReplicaSet
+
+ if len(pod.Labels) == 0 {
+ err = fmt.Errorf("no ReplicaSets found for pod %v because it has no labels", pod.Name)
+ return
+ }
+
+ for _, m := range s.Store.List() {
+ rs = *m.(*extensions.ReplicaSet)
+ if rs.Namespace != pod.Namespace {
+ continue
+ }
+ selector, err = unversioned.LabelSelectorAsSelector(rs.Spec.Selector)
+ if err != nil {
+ err = fmt.Errorf("invalid selector: %v", err)
+ return
+ }
+
+ // If a ReplicaSet with a nil or empty selector creeps in, it should match nothing, not everything.
+ if selector.Empty() || !selector.Matches(labels.Set(pod.Labels)) {
+ continue
+ }
+ rss = append(rss, rs)
+ }
+ if len(rss) == 0 {
+ err = fmt.Errorf("could not find ReplicaSet for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels)
+ }
+ return
+}
+
+// StoreToDaemonSetLister gives a store List and Exists methods. The store must contain only DaemonSets.
+type StoreToDaemonSetLister struct {
+ Store
+}
+
+// Exists checks if the given daemon set exists in the store.
+func (s *StoreToDaemonSetLister) Exists(ds *extensions.DaemonSet) (bool, error) {
+ _, exists, err := s.Store.Get(ds)
+ if err != nil {
+ return false, err
+ }
+ return exists, nil
+}
+
+// List lists all daemon sets in the store.
+// TODO: converge on the interface in pkg/client
+func (s *StoreToDaemonSetLister) List() (dss extensions.DaemonSetList, err error) {
+ for _, c := range s.Store.List() {
+ dss.Items = append(dss.Items, *(c.(*extensions.DaemonSet)))
+ }
+ return dss, nil
+}
+
+// GetPodDaemonSets returns a list of daemon sets managing a pod.
+// Returns an error if and only if no matching daemon sets are found.
+func (s *StoreToDaemonSetLister) GetPodDaemonSets(pod *api.Pod) (daemonSets []extensions.DaemonSet, err error) {
+ var selector labels.Selector
+ var daemonSet extensions.DaemonSet
+
+ if len(pod.Labels) == 0 {
+ err = fmt.Errorf("no daemon sets found for pod %v because it has no labels", pod.Name)
+ return
+ }
+
+ for _, m := range s.Store.List() {
+ daemonSet = *m.(*extensions.DaemonSet)
+ if daemonSet.Namespace != pod.Namespace {
+ continue
+ }
+ selector, err = unversioned.LabelSelectorAsSelector(daemonSet.Spec.Selector)
+ if err != nil {
+ // this should not happen if the DaemonSet passed validation
+ return nil, err
+ }
+
+ // If a daemonSet with a nil or empty selector creeps in, it should match nothing, not everything.
+ if selector.Empty() || !selector.Matches(labels.Set(pod.Labels)) {
+ continue
+ }
+ daemonSets = append(daemonSets, daemonSet)
+ }
+ if len(daemonSets) == 0 {
+ err = fmt.Errorf("could not find daemon set for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels)
+ }
+ return
+}
+
+// StoreToServiceLister makes a Store that has the List method of the client.ServiceInterface
+// The Store must contain (only) Services.
+type StoreToServiceLister struct {
+ Store
+}
+
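+// List lists all services in the store.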
+func (s *StoreToServiceLister) List() (services api.ServiceList, err error) {
+ for _, m := range s.Store.List() {
+ services.Items = append(services.Items, *(m.(*api.Service)))
+ }
+ return services, nil
+}
+
+// TODO: Move this back to scheduler as a helper function that takes a Store,
+// rather than a method of StoreToServiceLister.
+func (s *StoreToServiceLister) GetPodServices(pod *api.Pod) (services []api.Service, err error) {
+ var selector labels.Selector
+ var service api.Service
+
+ for _, m := range s.Store.List() {
+ service = *m.(*api.Service)
+ // consider only services that are in the same namespace as the pod
+ if service.Namespace != pod.Namespace {
+ continue
+ }
+ if service.Spec.Selector == nil {
+ // services with nil selectors match nothing, not everything.
+ continue
+ }
+ selector = labels.Set(service.Spec.Selector).AsSelector()
+ if selector.Matches(labels.Set(pod.Labels)) {
+ services = append(services, service)
+ }
+ }
+ if len(services) == 0 {
+ err = fmt.Errorf("could not find service for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels)
+ }
+
+ return
+}
+
+// StoreToEndpointsLister makes a Store that lists endpoints.
+type StoreToEndpointsLister struct {
+ Store
+}
+
+// List lists all endpoints in the store.
+func (s *StoreToEndpointsLister) List() (services api.EndpointsList, err error) {
+ for _, m := range s.Store.List() {
+ services.Items = append(services.Items, *(m.(*api.Endpoints)))
+ }
+ return services, nil
+}
+
+// GetServiceEndpoints returns the endpoints of a service, matched on service name.
+func (s *StoreToEndpointsLister) GetServiceEndpoints(svc *api.Service) (ep api.Endpoints, err error) {
+ for _, m := range s.Store.List() {
+ ep = *m.(*api.Endpoints)
+ if svc.Name == ep.Name && svc.Namespace == ep.Namespace {
+ return ep, nil
+ }
+ }
+ err = fmt.Errorf("could not find endpoints for service: %v", svc.Name)
+ return
+}
+
+// StoreToJobLister gives a store List and Exists methods. The store must contain only Jobs.
+type StoreToJobLister struct {
+ Store
+}
+
+// Exists checks if the given job exists in the store.
+func (s *StoreToJobLister) Exists(job *batch.Job) (bool, error) {
+ _, exists, err := s.Store.Get(job)
+ if err != nil {
+ return false, err
+ }
+ return exists, nil
+}
+
+// List lists all jobs in the store.
+func (s *StoreToJobLister) List() (jobs batch.JobList, err error) {
+ for _, c := range s.Store.List() {
+ jobs.Items = append(jobs.Items, *(c.(*batch.Job)))
+ }
+ return jobs, nil
+}
+
+// GetPodJobs returns a list of jobs managing a pod. Returns an error only if no matching jobs are found.
+func (s *StoreToJobLister) GetPodJobs(pod *api.Pod) (jobs []batch.Job, err error) {
+ var selector labels.Selector
+ var job batch.Job
+
+ if len(pod.Labels) == 0 {
+ err = fmt.Errorf("no jobs found for pod %v because it has no labels", pod.Name)
+ return
+ }
+
+ for _, m := range s.Store.List() {
+ job = *m.(*batch.Job)
+ if job.Namespace != pod.Namespace {
+ continue
+ }
+
+		selector, err = unversioned.LabelSelectorAsSelector(job.Spec.Selector)
+		if err != nil {
+			err = fmt.Errorf("invalid selector: %v", err)
+			return
+		}
+		if !selector.Matches(labels.Set(pod.Labels)) {
+			continue
+		}
+ jobs = append(jobs, job)
+ }
+ if len(jobs) == 0 {
+ err = fmt.Errorf("could not find jobs for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels)
+ }
+ return
+}
+
+// StoreToPVFetcher is a typed wrapper around a store of PersistentVolumes.
+type StoreToPVFetcher struct {
+ Store
+}
+
+// GetPersistentVolumeInfo returns cached data for the PersistentVolume 'id'.
+func (s *StoreToPVFetcher) GetPersistentVolumeInfo(id string) (*api.PersistentVolume, error) {
+ o, exists, err := s.Get(&api.PersistentVolume{ObjectMeta: api.ObjectMeta{Name: id}})
+
+ if err != nil {
+ return nil, fmt.Errorf("error retrieving PersistentVolume '%v' from cache: %v", id, err)
+ }
+
+ if !exists {
+ return nil, fmt.Errorf("PersistentVolume '%v' not found", id)
+ }
+
+ return o.(*api.PersistentVolume), nil
+}
+
+// StoreToPVCFetcher is a typed wrapper around a store of PersistentVolumeClaims.
+type StoreToPVCFetcher struct {
+ Store
+}
+
+// GetPersistentVolumeClaimInfo returns cached data for the PersistentVolumeClaim 'id'.
+func (s *StoreToPVCFetcher) GetPersistentVolumeClaimInfo(namespace string, id string) (*api.PersistentVolumeClaim, error) {
+ o, exists, err := s.Get(&api.PersistentVolumeClaim{ObjectMeta: api.ObjectMeta{Namespace: namespace, Name: id}})
+ if err != nil {
+ return nil, fmt.Errorf("error retrieving PersistentVolumeClaim '%s/%s' from cache: %v", namespace, id, err)
+ }
+
+ if !exists {
+ return nil, fmt.Errorf("PersistentVolumeClaim '%s/%s' not found", namespace, id)
+ }
+
+ return o.(*api.PersistentVolumeClaim), nil
+}
+
+// StoreToPetSetLister gives a store List and Exists methods. The store must contain only PetSets.
+type StoreToPetSetLister struct {
+ Store
+}
+
+// Exists checks if the given PetSet exists in the store.
+func (s *StoreToPetSetLister) Exists(ps *apps.PetSet) (bool, error) {
+ _, exists, err := s.Store.Get(ps)
+ if err != nil {
+ return false, err
+ }
+ return exists, nil
+}
+
+// List lists all PetSets in the store.
+func (s *StoreToPetSetLister) List() (psList []apps.PetSet, err error) {
+ for _, ps := range s.Store.List() {
+ psList = append(psList, *(ps.(*apps.PetSet)))
+ }
+ return psList, nil
+}
+
+type storePetSetsNamespacer struct {
+ store Store
+ namespace string
+}
+
+func (s *StoreToPetSetLister) PetSets(namespace string) storePetSetsNamespacer {
+ return storePetSetsNamespacer{s.Store, namespace}
+}
+
+// GetPodPetSets returns a list of PetSets managing a pod. Returns an error only if no matching PetSets are found.
+func (s *StoreToPetSetLister) GetPodPetSets(pod *api.Pod) (psList []apps.PetSet, err error) {
+ var selector labels.Selector
+ var ps apps.PetSet
+
+ if len(pod.Labels) == 0 {
+ err = fmt.Errorf("no PetSets found for pod %v because it has no labels", pod.Name)
+ return
+ }
+
+ for _, m := range s.Store.List() {
+ ps = *m.(*apps.PetSet)
+ if ps.Namespace != pod.Namespace {
+ continue
+ }
+ selector, err = unversioned.LabelSelectorAsSelector(ps.Spec.Selector)
+ if err != nil {
+ err = fmt.Errorf("invalid selector: %v", err)
+ return
+ }
+
+ // If a PetSet with a nil or empty selector creeps in, it should match nothing, not everything.
+ if selector.Empty() || !selector.Matches(labels.Set(pod.Labels)) {
+ continue
+ }
+ psList = append(psList, ps)
+ }
+ if len(psList) == 0 {
+ err = fmt.Errorf("could not find PetSet for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels)
+ }
+ return
+}
diff --git a/src/kube2msb/vendor/k8s.io/kubernetes/pkg/client/cache/listwatch.go b/src/kube2msb/vendor/k8s.io/kubernetes/pkg/client/cache/listwatch.go
new file mode 100644
index 0000000..ff56c0b
--- /dev/null
+++ b/src/kube2msb/vendor/k8s.io/kubernetes/pkg/client/cache/listwatch.go
@@ -0,0 +1,86 @@
+/*
+Copyright 2015 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package cache
+
+import (
+ "time"
+
+ "k8s.io/kubernetes/pkg/api"
+ "k8s.io/kubernetes/pkg/client/restclient"
+ "k8s.io/kubernetes/pkg/fields"
+ "k8s.io/kubernetes/pkg/runtime"
+ "k8s.io/kubernetes/pkg/watch"
+)
+
+// ListFunc knows how to list resources
+type ListFunc func(options api.ListOptions) (runtime.Object, error)
+
+// WatchFunc knows how to watch resources
+type WatchFunc func(options api.ListOptions) (watch.Interface, error)
+
+// ListWatch knows how to list and watch a set of apiserver resources. It satisfies the ListerWatcher interface.
+// It is a convenience type for users of NewReflector, etc.
+// ListFunc and WatchFunc must not be nil
+type ListWatch struct {
+ ListFunc ListFunc
+ WatchFunc WatchFunc
+}
+
+// Getter interface knows how to access Get method from RESTClient.
+type Getter interface {
+ Get() *restclient.Request
+}
+
+// NewListWatchFromClient creates a new ListWatch from the specified client, resource, namespace and field selector.
+func NewListWatchFromClient(c Getter, resource string, namespace string, fieldSelector fields.Selector) *ListWatch {
+ listFunc := func(options api.ListOptions) (runtime.Object, error) {
+ return c.Get().
+ Namespace(namespace).
+ Resource(resource).
+ VersionedParams(&options, api.ParameterCodec).
+ FieldsSelectorParam(fieldSelector).
+ Do().
+ Get()
+ }
+ watchFunc := func(options api.ListOptions) (watch.Interface, error) {
+ return c.Get().
+ Prefix("watch").
+ Namespace(namespace).
+ Resource(resource).
+ VersionedParams(&options, api.ParameterCodec).
+ FieldsSelectorParam(fieldSelector).
+ Watch()
+ }
+ return &ListWatch{ListFunc: listFunc, WatchFunc: watchFunc}
+}
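+
+// A minimal usage sketch (illustrative; 'coreClient' is an assumed value, any
+// Getter for the API group serving the resource would do):
+//
+//	lw := NewListWatchFromClient(coreClient, "pods", api.NamespaceAll, fields.Everything())
+//	list, err := lw.List(api.ListOptions{})
+//	w, err := lw.Watch(api.ListOptions{})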
+
+func timeoutFromListOptions(options api.ListOptions) time.Duration {
+ if options.TimeoutSeconds != nil {
+ return time.Duration(*options.TimeoutSeconds) * time.Second
+ }
+ return 0
+}
+
+// List a set of apiserver resources
+func (lw *ListWatch) List(options api.ListOptions) (runtime.Object, error) {
+ return lw.ListFunc(options)
+}
+
+// Watch a set of apiserver resources
+func (lw *ListWatch) Watch(options api.ListOptions) (watch.Interface, error) {
+ return lw.WatchFunc(options)
+}
diff --git a/src/kube2msb/vendor/k8s.io/kubernetes/pkg/client/cache/reflector.go b/src/kube2msb/vendor/k8s.io/kubernetes/pkg/client/cache/reflector.go
new file mode 100644
index 0000000..e1af63e
--- /dev/null
+++ b/src/kube2msb/vendor/k8s.io/kubernetes/pkg/client/cache/reflector.go
@@ -0,0 +1,423 @@
+/*
+Copyright 2014 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package cache
+
+import (
+ "errors"
+ "fmt"
+ "io"
+ "math/rand"
+ "net"
+ "net/url"
+ "reflect"
+ "regexp"
+ goruntime "runtime"
+ "runtime/debug"
+ "strconv"
+ "strings"
+ "sync"
+ "syscall"
+ "time"
+
+ "github.com/golang/glog"
+ "k8s.io/kubernetes/pkg/api"
+ apierrs "k8s.io/kubernetes/pkg/api/errors"
+ "k8s.io/kubernetes/pkg/api/meta"
+ "k8s.io/kubernetes/pkg/runtime"
+ utilruntime "k8s.io/kubernetes/pkg/util/runtime"
+ "k8s.io/kubernetes/pkg/util/wait"
+ "k8s.io/kubernetes/pkg/watch"
+)
+
+// ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource.
+type ListerWatcher interface {
+ // List should return a list type object; the Items field will be extracted, and the
+ // ResourceVersion field will be used to start the watch in the right place.
+ List(options api.ListOptions) (runtime.Object, error)
+ // Watch should begin a watch at the specified version.
+ Watch(options api.ListOptions) (watch.Interface, error)
+}
+
+// Reflector watches a specified resource and causes all changes to be reflected in the given store.
+type Reflector struct {
+ // name identifies this reflector. By default it will be a file:line if possible.
+ name string
+
+ // The type of object we expect to place in the store.
+ expectedType reflect.Type
+ // The destination to sync up with the watch source
+ store Store
+ // listerWatcher is used to perform lists and watches.
+ listerWatcher ListerWatcher
+ // period controls timing between one watch ending and
+ // the beginning of the next one.
+ period time.Duration
+ resyncPeriod time.Duration
+ // now() returns current time - exposed for testing purposes
+ now func() time.Time
+ // nextResync is approximate time of next resync (0 if not scheduled)
+ nextResync time.Time
+ // lastSyncResourceVersion is the resource version token last
+ // observed when doing a sync with the underlying store
+ // it is thread safe, but not synchronized with the underlying store
+ lastSyncResourceVersion string
+ // lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion
+ lastSyncResourceVersionMutex sync.RWMutex
+}
+
+var (
+ // We try to spread the load on apiserver by setting timeouts for
+ // watch requests - it is random in [minWatchTimeout, 2*minWatchTimeout].
+	// However, the value may be adjusted so that a periodic resync does not
+	// have to break the TCP connection.
+ minWatchTimeout = 5 * time.Minute
+ // If we are within 'forceResyncThreshold' from the next planned resync
+ // and are just before issuing Watch(), resync will be forced now.
+ forceResyncThreshold = 3 * time.Second
+	// We try to set timeouts for Watch() so that we will finish about
+	// 'timeoutThreshold' before the next planned periodic resync.
+ timeoutThreshold = 1 * time.Second
+)
+
+// NewNamespaceKeyedIndexerAndReflector creates an Indexer and a Reflector
+// The indexer is configured to key on namespace
+func NewNamespaceKeyedIndexerAndReflector(lw ListerWatcher, expectedType interface{}, resyncPeriod time.Duration) (indexer Indexer, reflector *Reflector) {
+ indexer = NewIndexer(MetaNamespaceKeyFunc, Indexers{"namespace": MetaNamespaceIndexFunc})
+ reflector = NewReflector(lw, expectedType, indexer, resyncPeriod)
+ return indexer, reflector
+}
+
+// NewReflector creates a new Reflector object which will keep the given store up to
+// date with the server's contents for the given resource. Reflector promises to
+// only put things in the store that have the type of expectedType, unless expectedType
+// is nil. If resyncPeriod is non-zero, then lists will be executed after every
+// resyncPeriod, so that you can use reflectors to periodically process everything as
+// well as incrementally processing the things that change.
+func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
+ return NewNamedReflector(getDefaultReflectorName(internalPackages...), lw, expectedType, store, resyncPeriod)
+}
+
+// NewNamedReflector is the same as NewReflector, but with a specified name for logging.
+func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
+ r := &Reflector{
+ name: name,
+ listerWatcher: lw,
+ store: store,
+ expectedType: reflect.TypeOf(expectedType),
+ period: time.Second,
+ resyncPeriod: resyncPeriod,
+ now: time.Now,
+ }
+ return r
+}
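+
+// A minimal wiring sketch (illustrative; 'lw' is any ListerWatcher, e.g. one
+// built by NewListWatchFromClient in listwatch.go):
+//
+//	store := NewStore(MetaNamespaceKeyFunc)
+//	r := NewReflector(lw, &api.Pod{}, store, 30*time.Second)
+//	r.Run() // returns immediately; the store is kept in sync in the background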
+
+// internalPackages are packages that are ignored when creating a default reflector name. These packages are in the common
+// call chains to NewReflector, so they'd be low entropy names for reflectors.
+var internalPackages = []string{"kubernetes/pkg/client/cache/", "kubernetes/pkg/controller/framework/", "/runtime/asm_"}
+
+// getDefaultReflectorName walks back through the call stack until we find a caller from outside of the ignoredPackages
+// it returns a shortpath/filename:line to aid in identification of this reflector when it starts logging.
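+// For example (illustrative): "pkg/controller/endpoint/endpoints_controller.go:87".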
+func getDefaultReflectorName(ignoredPackages ...string) string {
+ name := "????"
+ const maxStack = 10
+ for i := 1; i < maxStack; i++ {
+ _, file, line, ok := goruntime.Caller(i)
+ if !ok {
+ file, line, ok = extractStackCreator()
+ if !ok {
+ break
+ }
+ i += maxStack
+ }
+ if hasPackage(file, ignoredPackages) {
+ continue
+ }
+
+ file = trimPackagePrefix(file)
+ name = fmt.Sprintf("%s:%d", file, line)
+ break
+ }
+ return name
+}
+
+// hasPackage returns true if the file is in one of the ignored packages.
+func hasPackage(file string, ignoredPackages []string) bool {
+ for _, ignoredPackage := range ignoredPackages {
+ if strings.Contains(file, ignoredPackage) {
+ return true
+ }
+ }
+ return false
+}
+
+// trimPackagePrefix trims redundant leading path segments from the front of a package file path.
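+// For example (illustrative):
+//
+//	"/home/user/go/src/k8s.io/kubernetes/pkg/client/cache/reflector.go" -> "pkg/client/cache/reflector.go"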
+func trimPackagePrefix(file string) string {
+ if l := strings.LastIndex(file, "k8s.io/kubernetes/pkg/"); l >= 0 {
+ return file[l+len("k8s.io/kubernetes/"):]
+ }
+ if l := strings.LastIndex(file, "/src/"); l >= 0 {
+ return file[l+5:]
+ }
+ if l := strings.LastIndex(file, "/pkg/"); l >= 0 {
+ return file[l+1:]
+ }
+ return file
+}
+
+var stackCreator = regexp.MustCompile(`(?m)^created by (.*)\n\s+(.*):(\d+) \+0x[[:xdigit:]]+$`)
+
+// extractStackCreator retrieves the goroutine file and line that launched this stack. Returns false
+// if the creator cannot be located.
+// TODO: Go does not expose this via runtime https://github.com/golang/go/issues/11440
+func extractStackCreator() (string, int, bool) {
+ stack := debug.Stack()
+ matches := stackCreator.FindStringSubmatch(string(stack))
+ if matches == nil || len(matches) != 4 {
+ return "", 0, false
+ }
+ line, err := strconv.Atoi(matches[3])
+ if err != nil {
+ return "", 0, false
+ }
+ return matches[2], line, true
+}
+
+// Run starts a watch and handles watch events. Will restart the watch if it is closed.
+// Run starts a goroutine and returns immediately.
+func (r *Reflector) Run() {
+ glog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedType, r.resyncPeriod, r.name)
+ go wait.Until(func() {
+ if err := r.ListAndWatch(wait.NeverStop); err != nil {
+ utilruntime.HandleError(err)
+ }
+ }, r.period, wait.NeverStop)
+}
+
+// RunUntil starts a watch and handles watch events. Will restart the watch if it is closed.
+// RunUntil starts a goroutine and returns immediately. It will exit when stopCh is closed.
+func (r *Reflector) RunUntil(stopCh <-chan struct{}) {
+ glog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedType, r.resyncPeriod, r.name)
+ go wait.Until(func() {
+ if err := r.ListAndWatch(stopCh); err != nil {
+ utilruntime.HandleError(err)
+ }
+ }, r.period, stopCh)
+}
+
+var (
+ // nothing will ever be sent down this channel
+ neverExitWatch <-chan time.Time = make(chan time.Time)
+
+ // Used to indicate that watching stopped so that a resync could happen.
+ errorResyncRequested = errors.New("resync channel fired")
+
+ // Used to indicate that watching stopped because of a signal from the stop
+ // channel passed in from a client of the reflector.
+ errorStopRequested = errors.New("Stop requested")
+)
+
+// resyncChan returns a channel which will receive something when a resync is
+// required, and a cleanup function.
+func (r *Reflector) resyncChan() (<-chan time.Time, func() bool) {
+ if r.resyncPeriod == 0 {
+ r.nextResync = time.Time{}
+ return neverExitWatch, func() bool { return false }
+ }
+ // The cleanup function is required: imagine the scenario where watches
+ // always fail so we end up listing frequently. Then, if we don't
+ // manually stop the timer, we could end up with many timers active
+ // concurrently.
+ r.nextResync = r.now().Add(r.resyncPeriod)
+ t := time.NewTimer(r.resyncPeriod)
+ return t.C, t.Stop
+}
+
+// ListAndWatch first lists all items and gets the resource version at the moment of call,
+// and then uses the resource version to watch.
+// It returns an error if ListAndWatch didn't even try to initialize the watch.
+func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
+ glog.V(3).Infof("Listing and watching %v from %s", r.expectedType, r.name)
+ var resourceVersion string
+ resyncCh, cleanup := r.resyncChan()
+ defer cleanup()
+
+ // Explicitly set "0" as resource version - it's fine for the List()
+ // to be served from cache and potentially be delayed relative to
+ // etcd contents. Reflector framework will catch up via Watch() eventually.
+ options := api.ListOptions{ResourceVersion: "0"}
+ list, err := r.listerWatcher.List(options)
+ if err != nil {
+ return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedType, err)
+ }
+ listMetaInterface, err := meta.ListAccessor(list)
+ if err != nil {
+ return fmt.Errorf("%s: Unable to understand list result %#v: %v", r.name, list, err)
+ }
+ resourceVersion = listMetaInterface.GetResourceVersion()
+ items, err := meta.ExtractList(list)
+ if err != nil {
+		return fmt.Errorf("%s: Unable to understand list result %#v: %v", r.name, list, err)
+ }
+ if err := r.syncWith(items, resourceVersion); err != nil {
+ return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
+ }
+ r.setLastSyncResourceVersion(resourceVersion)
+
+ resyncerrc := make(chan error, 1)
+ go func() {
+ for {
+ select {
+ case <-resyncCh:
+ case <-stopCh:
+ return
+ }
+ glog.V(4).Infof("%s: next resync planned for %#v, forcing now", r.name, r.nextResync)
+ if err := r.store.Resync(); err != nil {
+ resyncerrc <- err
+ return
+ }
+ cleanup()
+ resyncCh, cleanup = r.resyncChan()
+ }
+ }()
+
+ for {
+		timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
+		options = api.ListOptions{
+			ResourceVersion: resourceVersion,
+			// We want to avoid situations of hanging watchers. Stop any watchers that do not
+			// receive any events within the timeout window.
+			TimeoutSeconds: &timeoutSeconds,
+ }
+
+ w, err := r.listerWatcher.Watch(options)
+ if err != nil {
+ switch err {
+ case io.EOF:
+ // watch closed normally
+ case io.ErrUnexpectedEOF:
+ glog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedType, err)
+ default:
+ utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedType, err))
+ }
+ // If this is "connection refused" error, it means that most likely apiserver is not responsive.
+ // It doesn't make sense to re-list all objects because most likely we will be able to restart
+ // watch where we ended.
+ // If that's the case wait and resend watch request.
+ if urlError, ok := err.(*url.Error); ok {
+ if opError, ok := urlError.Err.(*net.OpError); ok {
+ if errno, ok := opError.Err.(syscall.Errno); ok && errno == syscall.ECONNREFUSED {
+ time.Sleep(time.Second)
+ continue
+ }
+ }
+ }
+ return nil
+ }
+
+ if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil {
+ if err != errorStopRequested {
+ glog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedType, err)
+ }
+ return nil
+ }
+ }
+}
+
+// syncWith replaces the store's items with the given list.
+func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
+ found := make([]interface{}, 0, len(items))
+ for _, item := range items {
+ found = append(found, item)
+ }
+ return r.store.Replace(found, resourceVersion)
+}
+
+// watchHandler watches w and keeps *resourceVersion up to date.
+func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
+ start := time.Now()
+ eventCount := 0
+
+ // Stopping the watcher should be idempotent and if we return from this function there's no way
+ // we're coming back in with the same watch interface.
+ defer w.Stop()
+
+loop:
+ for {
+ select {
+ case <-stopCh:
+ return errorStopRequested
+ case err := <-errc:
+ return err
+ case event, ok := <-w.ResultChan():
+ if !ok {
+ break loop
+ }
+ if event.Type == watch.Error {
+ return apierrs.FromObject(event.Object)
+ }
+ if e, a := r.expectedType, reflect.TypeOf(event.Object); e != nil && e != a {
+ utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))
+ continue
+ }
+ meta, err := meta.Accessor(event.Object)
+ if err != nil {
+ utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
+ continue
+ }
+ newResourceVersion := meta.GetResourceVersion()
+ switch event.Type {
+ case watch.Added:
+ r.store.Add(event.Object)
+ case watch.Modified:
+ r.store.Update(event.Object)
+ case watch.Deleted:
+ // TODO: Will any consumers need access to the "last known
+ // state", which is passed in event.Object? If so, may need
+ // to change this.
+ r.store.Delete(event.Object)
+ default:
+ utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
+ }
+ *resourceVersion = newResourceVersion
+ r.setLastSyncResourceVersion(newResourceVersion)
+ eventCount++
+ }
+ }
+
+	watchDuration := time.Since(start)
+ if watchDuration < 1*time.Second && eventCount == 0 {
+ glog.V(4).Infof("%s: Unexpected watch close - watch lasted less than a second and no items received", r.name)
+ return errors.New("very short watch")
+ }
+	glog.V(4).Infof("%s: Watch close - %v: %v items received", r.name, r.expectedType, eventCount)
+ return nil
+}
+
+// LastSyncResourceVersion is the resource version observed during the last sync with the underlying store.
+// The value returned is not synchronized with access to the underlying store and is not thread-safe.
+func (r *Reflector) LastSyncResourceVersion() string {
+ r.lastSyncResourceVersionMutex.RLock()
+ defer r.lastSyncResourceVersionMutex.RUnlock()
+ return r.lastSyncResourceVersion
+}
+
+func (r *Reflector) setLastSyncResourceVersion(v string) {
+ r.lastSyncResourceVersionMutex.Lock()
+ defer r.lastSyncResourceVersionMutex.Unlock()
+ r.lastSyncResourceVersion = v
+}
diff --git a/src/kube2msb/vendor/k8s.io/kubernetes/pkg/client/cache/store.go b/src/kube2msb/vendor/k8s.io/kubernetes/pkg/client/cache/store.go
new file mode 100644
index 0000000..4cd2479
--- /dev/null
+++ b/src/kube2msb/vendor/k8s.io/kubernetes/pkg/client/cache/store.go
@@ -0,0 +1,240 @@
+/*
+Copyright 2014 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package cache
+
+import (
+ "fmt"
+ "strings"
+
+ "k8s.io/kubernetes/pkg/api/meta"
+)
+
+// Store is a generic object storage interface. Reflector knows how to watch a server
+// and update a store. A generic store is provided, which allows Reflector to be used
+// as a local caching system, and an LRU store, which allows Reflector to work like a
+// queue of items yet to be processed.
+//
+// Store makes no assumptions about stored object identity; it is the responsibility
+// of a Store implementation to provide a mechanism to correctly key objects and to
+// define the contract for obtaining objects by some arbitrary key type.
+type Store interface {
+ Add(obj interface{}) error
+ Update(obj interface{}) error
+ Delete(obj interface{}) error
+ List() []interface{}
+ ListKeys() []string
+ Get(obj interface{}) (item interface{}, exists bool, err error)
+ GetByKey(key string) (item interface{}, exists bool, err error)
+
+ // Replace will delete the contents of the store, using instead the
+ // given list. Store takes ownership of the list, you should not reference
+ // it after calling this function.
+ Replace([]interface{}, string) error
+ Resync() error
+}
+
+// KeyFunc knows how to make a key from an object. Implementations should be deterministic.
+type KeyFunc func(obj interface{}) (string, error)
+
+// KeyError will be returned any time a KeyFunc gives an error; it includes the object
+// at fault.
+type KeyError struct {
+ Obj interface{}
+ Err error
+}
+
+// Error gives a human-readable description of the error.
+func (k KeyError) Error() string {
+ return fmt.Sprintf("couldn't create key for object %+v: %v", k.Obj, k.Err)
+}
+
+// ExplicitKey can be passed to MetaNamespaceKeyFunc if you have the key for
+// the object but not the object itself.
+type ExplicitKey string
+
+// MetaNamespaceKeyFunc is a convenient default KeyFunc which knows how to make
+// keys for API objects which implement meta.Interface.
+// The key uses the format <namespace>/<name> unless <namespace> is empty, then
+// it's just <name>.
+//
+// TODO: replace key-as-string with a key-as-struct so that this
+// packing/unpacking won't be necessary.
+func MetaNamespaceKeyFunc(obj interface{}) (string, error) {
+ if key, ok := obj.(ExplicitKey); ok {
+ return string(key), nil
+ }
+ meta, err := meta.Accessor(obj)
+ if err != nil {
+ return "", fmt.Errorf("object has no meta: %v", err)
+ }
+ if len(meta.GetNamespace()) > 0 {
+ return meta.GetNamespace() + "/" + meta.GetName(), nil
+ }
+ return meta.GetName(), nil
+}
+
+// SplitMetaNamespaceKey returns the namespace and name that
+// MetaNamespaceKeyFunc encoded into key.
+//
+// TODO: replace key-as-string with a key-as-struct so that this
+// packing/unpacking won't be necessary.
+func SplitMetaNamespaceKey(key string) (namespace, name string, err error) {
+ parts := strings.Split(key, "/")
+ switch len(parts) {
+ case 1:
+ // name only, no namespace
+ return "", parts[0], nil
+ case 2:
+ // namespace and name
+ return parts[0], parts[1], nil
+ }
+
+ return "", "", fmt.Errorf("unexpected key format: %q", key)
+}
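+
+// A minimal round-trip sketch (illustrative; 'pod' stands for any object whose
+// accessor reports namespace "default" and name "mypod"):
+//
+//	key, _ := MetaNamespaceKeyFunc(pod)       // "default/mypod"
+//	ns, name, _ := SplitMetaNamespaceKey(key) // "default", "mypod"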
+
+// cache responsibilities are limited to:
+// 1. Computing keys for objects via keyFunc
+// 2. Invoking methods of a ThreadSafeStore interface
+type cache struct {
+ // cacheStorage bears the burden of thread safety for the cache
+ cacheStorage ThreadSafeStore
+ // keyFunc is used to make the key for objects stored in and retrieved from items, and
+ // should be deterministic.
+ keyFunc KeyFunc
+}
+
+var _ Store = &cache{}
+
+// Add inserts an item into the cache.
+func (c *cache) Add(obj interface{}) error {
+ key, err := c.keyFunc(obj)
+ if err != nil {
+ return KeyError{obj, err}
+ }
+ c.cacheStorage.Add(key, obj)
+ return nil
+}
+
+// Update sets an item in the cache to its updated state.
+func (c *cache) Update(obj interface{}) error {
+ key, err := c.keyFunc(obj)
+ if err != nil {
+ return KeyError{obj, err}
+ }
+ c.cacheStorage.Update(key, obj)
+ return nil
+}
+
+// Delete removes an item from the cache.
+func (c *cache) Delete(obj interface{}) error {
+ key, err := c.keyFunc(obj)
+ if err != nil {
+ return KeyError{obj, err}
+ }
+ c.cacheStorage.Delete(key)
+ return nil
+}
+
+// List returns a list of all the items.
+// List is completely threadsafe as long as you treat all items as immutable.
+func (c *cache) List() []interface{} {
+ return c.cacheStorage.List()
+}
+
+// ListKeys returns a list of all the keys of the objects currently
+// in the cache.
+func (c *cache) ListKeys() []string {
+ return c.cacheStorage.ListKeys()
+}
+
+// GetIndexers returns the indexers of the cache.
+func (c *cache) GetIndexers() Indexers {
+ return c.cacheStorage.GetIndexers()
+}
+
+// Index returns a list of items that match on the index function
+// Index is thread-safe so long as you treat all items as immutable
+func (c *cache) Index(indexName string, obj interface{}) ([]interface{}, error) {
+ return c.cacheStorage.Index(indexName, obj)
+}
+
+// ListIndexFuncValues returns the list of generated values of an Index func
+func (c *cache) ListIndexFuncValues(indexName string) []string {
+ return c.cacheStorage.ListIndexFuncValues(indexName)
+}
+
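+// ByIndex returns the items whose indexed values, for the named index, include indexKey.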
+func (c *cache) ByIndex(indexName, indexKey string) ([]interface{}, error) {
+ return c.cacheStorage.ByIndex(indexName, indexKey)
+}
+
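+// AddIndexers adds more indexers to this cache; see ThreadSafeStore.AddIndexers.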
+func (c *cache) AddIndexers(newIndexers Indexers) error {
+ return c.cacheStorage.AddIndexers(newIndexers)
+}
+
+// Get returns the requested item, or sets exists=false.
+// Get is completely threadsafe as long as you treat all items as immutable.
+func (c *cache) Get(obj interface{}) (item interface{}, exists bool, err error) {
+ key, err := c.keyFunc(obj)
+ if err != nil {
+ return nil, false, KeyError{obj, err}
+ }
+ return c.GetByKey(key)
+}
+
+// GetByKey returns the requested item, or exists=false.
+// GetByKey is completely threadsafe as long as you treat all items as immutable.
+func (c *cache) GetByKey(key string) (item interface{}, exists bool, err error) {
+ item, exists = c.cacheStorage.Get(key)
+ return item, exists, nil
+}
+
+// Replace will delete the contents of 'c', using instead the given list.
+// 'c' takes ownership of the list, you should not reference the list again
+// after calling this function.
+func (c *cache) Replace(list []interface{}, resourceVersion string) error {
+ items := map[string]interface{}{}
+ for _, item := range list {
+ key, err := c.keyFunc(item)
+ if err != nil {
+ return KeyError{item, err}
+ }
+ items[key] = item
+ }
+ c.cacheStorage.Replace(items, resourceVersion)
+ return nil
+}
+
+// Resync touches all items in the store to force processing
+func (c *cache) Resync() error {
+ return c.cacheStorage.Resync()
+}
+
+// NewStore returns a Store implemented simply with a map and a lock.
+func NewStore(keyFunc KeyFunc) Store {
+ return &cache{
+ cacheStorage: NewThreadSafeStore(Indexers{}, Indices{}),
+ keyFunc: keyFunc,
+ }
+}
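+
+// A minimal usage sketch (illustrative; 'pod' stands for any object the
+// KeyFunc understands):
+//
+//	s := NewStore(MetaNamespaceKeyFunc)
+//	_ = s.Add(pod)
+//	obj, exists, _ := s.Get(pod)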
+
+// NewIndexer returns an Indexer implemented simply with a map and a lock.
+func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer {
+ return &cache{
+ cacheStorage: NewThreadSafeStore(indexers, Indices{}),
+ keyFunc: keyFunc,
+ }
+}
diff --git a/src/kube2msb/vendor/k8s.io/kubernetes/pkg/client/cache/thread_safe_store.go b/src/kube2msb/vendor/k8s.io/kubernetes/pkg/client/cache/thread_safe_store.go
new file mode 100644
index 0000000..9d88ce3
--- /dev/null
+++ b/src/kube2msb/vendor/k8s.io/kubernetes/pkg/client/cache/thread_safe_store.go
@@ -0,0 +1,288 @@
+/*
+Copyright 2014 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package cache
+
+import (
+ "fmt"
+ "sync"
+
+ "k8s.io/kubernetes/pkg/util/sets"
+)
+
+// ThreadSafeStore is an interface that allows concurrent access to a storage backend.
+// TL;DR caveats: you must not modify anything returned by Get or List as it will break
+// the indexing feature in addition to not being thread safe.
+//
+// The guarantees of thread safety provided by List/Get are only valid if the caller
+// treats returned items as read-only. For example, a pointer inserted in the store
+// through `Add` will be returned as is by `Get`. Multiple clients might invoke `Get`
+// on the same key and modify the pointer in a non-thread-safe way. Also note that
+// modifying objects stored by the indexers (if any) will *not* automatically lead
+// to a re-index. So it's not a good idea to directly modify the objects returned by
+// Get/List, in general.
+type ThreadSafeStore interface {
+ Add(key string, obj interface{})
+ Update(key string, obj interface{})
+ Delete(key string)
+ Get(key string) (item interface{}, exists bool)
+ List() []interface{}
+ ListKeys() []string
+ Replace(map[string]interface{}, string)
+ Index(indexName string, obj interface{}) ([]interface{}, error)
+ ListIndexFuncValues(name string) []string
+ ByIndex(indexName, indexKey string) ([]interface{}, error)
+ GetIndexers() Indexers
+
+ // AddIndexers adds more indexers to this store. If you call this after you already have data
+ // in the store, the results are undefined.
+ AddIndexers(newIndexers Indexers) error
+ Resync() error
+}
+
+// threadSafeMap implements ThreadSafeStore
+type threadSafeMap struct {
+ lock sync.RWMutex
+ items map[string]interface{}
+
+ // indexers maps a name to an IndexFunc
+ indexers Indexers
+ // indices maps a name to an Index
+ indices Indices
+}
+
+func (c *threadSafeMap) Add(key string, obj interface{}) {
+ c.lock.Lock()
+ defer c.lock.Unlock()
+ oldObject := c.items[key]
+ c.items[key] = obj
+ c.updateIndices(oldObject, obj, key)
+}
+
+func (c *threadSafeMap) Update(key string, obj interface{}) {
+ c.lock.Lock()
+ defer c.lock.Unlock()
+ oldObject := c.items[key]
+ c.items[key] = obj
+ c.updateIndices(oldObject, obj, key)
+}
+
+func (c *threadSafeMap) Delete(key string) {
+ c.lock.Lock()
+ defer c.lock.Unlock()
+ if obj, exists := c.items[key]; exists {
+ c.deleteFromIndices(obj, key)
+ delete(c.items, key)
+ }
+}
+
+func (c *threadSafeMap) Get(key string) (item interface{}, exists bool) {
+ c.lock.RLock()
+ defer c.lock.RUnlock()
+ item, exists = c.items[key]
+ return item, exists
+}
+
+func (c *threadSafeMap) List() []interface{} {
+ c.lock.RLock()
+ defer c.lock.RUnlock()
+ list := make([]interface{}, 0, len(c.items))
+ for _, item := range c.items {
+ list = append(list, item)
+ }
+ return list
+}
+
+// ListKeys returns a list of all the keys of the objects currently
+// in the threadSafeMap.
+func (c *threadSafeMap) ListKeys() []string {
+ c.lock.RLock()
+ defer c.lock.RUnlock()
+ list := make([]string, 0, len(c.items))
+ for key := range c.items {
+ list = append(list, key)
+ }
+ return list
+}
+
+func (c *threadSafeMap) Replace(items map[string]interface{}, resourceVersion string) {
+ c.lock.Lock()
+ defer c.lock.Unlock()
+ c.items = items
+
+	// rebuild all indices
+ c.indices = Indices{}
+ for key, item := range c.items {
+ c.updateIndices(nil, item, key)
+ }
+}
+
+// Index returns a list of items that match on the index function
+// Index is thread-safe so long as you treat all items as immutable
+func (c *threadSafeMap) Index(indexName string, obj interface{}) ([]interface{}, error) {
+ c.lock.RLock()
+ defer c.lock.RUnlock()
+
+ indexFunc := c.indexers[indexName]
+ if indexFunc == nil {
+		return nil, fmt.Errorf("index with name %s does not exist", indexName)
+ }
+
+ indexKeys, err := indexFunc(obj)
+ if err != nil {
+ return nil, err
+ }
+ index := c.indices[indexName]
+
+ // need to de-dupe the return list. Since multiple keys are allowed, this can happen.
+ returnKeySet := sets.String{}
+ for _, indexKey := range indexKeys {
+ set := index[indexKey]
+ for _, key := range set.List() {
+ returnKeySet.Insert(key)
+ }
+ }
+
+ list := make([]interface{}, 0, returnKeySet.Len())
+ for absoluteKey := range returnKeySet {
+ list = append(list, c.items[absoluteKey])
+ }
+ return list, nil
+}
+
+// ByIndex returns a list of items that match an exact value on the index function
+func (c *threadSafeMap) ByIndex(indexName, indexKey string) ([]interface{}, error) {
+ c.lock.RLock()
+ defer c.lock.RUnlock()
+
+ indexFunc := c.indexers[indexName]
+ if indexFunc == nil {
+		return nil, fmt.Errorf("index with name %s does not exist", indexName)
+ }
+
+ index := c.indices[indexName]
+
+ set := index[indexKey]
+ list := make([]interface{}, 0, set.Len())
+ for _, key := range set.List() {
+ list = append(list, c.items[key])
+ }
+
+ return list, nil
+}
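+
+// A hedged illustration of the distinction: Index derives index keys from an
+// example object and unions the matches, while ByIndex takes one literal index
+// key ("namespace" and the example object here are assumptions):
+//
+//	byObject, _ := store.Index("namespace", somePodInDefaultNamespace)
+//	byKey, _ := store.ByIndex("namespace", "default")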
+
+func (c *threadSafeMap) ListIndexFuncValues(indexName string) []string {
+ c.lock.RLock()
+ defer c.lock.RUnlock()
+
+ index := c.indices[indexName]
+ names := make([]string, 0, len(index))
+ for key := range index {
+ names = append(names, key)
+ }
+ return names
+}
+
+func (c *threadSafeMap) GetIndexers() Indexers {
+ return c.indexers
+}
+
+func (c *threadSafeMap) AddIndexers(newIndexers Indexers) error {
+ c.lock.Lock()
+ defer c.lock.Unlock()
+
+ if len(c.items) > 0 {
+ return fmt.Errorf("cannot add indexers to running index")
+ }
+
+ oldKeys := sets.StringKeySet(c.indexers)
+ newKeys := sets.StringKeySet(newIndexers)
+
+ if oldKeys.HasAny(newKeys.List()...) {
+ return fmt.Errorf("indexer conflict: %v", oldKeys.Intersection(newKeys))
+ }
+
+ for k, v := range newIndexers {
+ c.indexers[k] = v
+ }
+ return nil
+}
+
+// updateIndices modifies the object's location in the managed indexes; if this is an update, you must provide an oldObj.
+// updateIndices must be called from a function that already has a lock on the cache.
+func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, key string) error {
+ // if we got an old object, we need to remove it before we add it again
+ if oldObj != nil {
+ c.deleteFromIndices(oldObj, key)
+ }
+ for name, indexFunc := range c.indexers {
+ indexValues, err := indexFunc(newObj)
+ if err != nil {
+ return err
+ }
+ index := c.indices[name]
+ if index == nil {
+ index = Index{}
+ c.indices[name] = index
+ }
+
+ for _, indexValue := range indexValues {
+ set := index[indexValue]
+ if set == nil {
+ set = sets.String{}
+ index[indexValue] = set
+ }
+ set.Insert(key)
+ }
+ }
+ return nil
+}
+
+// deleteFromIndices removes the object from each of the managed indexes.
+// It is intended to be called from a function that already has a lock on the cache.
+func (c *threadSafeMap) deleteFromIndices(obj interface{}, key string) error {
+ for name, indexFunc := range c.indexers {
+ indexValues, err := indexFunc(obj)
+ if err != nil {
+ return err
+ }
+
+ index := c.indices[name]
+ if index == nil {
+ continue
+ }
+ for _, indexValue := range indexValues {
+ set := index[indexValue]
+ if set != nil {
+ set.Delete(key)
+ }
+ }
+ }
+ return nil
+}
+
+func (c *threadSafeMap) Resync() error {
+ // Nothing to do
+ return nil
+}
+
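+// NewThreadSafeStore returns a ThreadSafeStore backed by a map guarded by a RWMutex.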
+func NewThreadSafeStore(indexers Indexers, indices Indices) ThreadSafeStore {
+ return &threadSafeMap{
+ items: map[string]interface{}{},
+ indexers: indexers,
+ indices: indices,
+ }
+}
diff --git a/src/kube2msb/vendor/k8s.io/kubernetes/pkg/client/cache/undelta_store.go b/src/kube2msb/vendor/k8s.io/kubernetes/pkg/client/cache/undelta_store.go
new file mode 100644
index 0000000..117df46
--- /dev/null
+++ b/src/kube2msb/vendor/k8s.io/kubernetes/pkg/client/cache/undelta_store.go
@@ -0,0 +1,83 @@
+/*
+Copyright 2015 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package cache
+
+// UndeltaStore listens to incremental updates and sends complete state on every change.
+// It implements the Store interface so that it can receive a stream of mirrored objects
+// from Reflector. Whenever it receives any complete (Store.Replace) or incremental change
+// (Store.Add, Store.Update, Store.Delete), it sends the complete state by calling PushFunc.
+// It is thread-safe. It guarantees that every change (Add, Update, Replace, Delete) results
+// in one call to PushFunc, but sometimes PushFunc may be called twice with the same values.
+// PushFunc should be thread safe.
+type UndeltaStore struct {
+ Store
+ PushFunc func([]interface{})
+}
+
+// Assert that it implements the Store interface.
+var _ Store = &UndeltaStore{}
+
+// Note about thread safety. The Store implementation (cache.cache) uses a lock for all methods.
+// In the functions below, the lock gets released and reacquired between the {Add,Delete,etc}
+// and the List. So, the following can happen, resulting in two identical calls to PushFunc.
+// time thread 1 thread 2
+// 0 UndeltaStore.Add(a)
+// 1 UndeltaStore.Add(b)
+// 2 Store.Add(a)
+// 3 Store.Add(b)
+// 4 Store.List() -> [a,b]
+// 5 Store.List() -> [a,b]
+
+func (u *UndeltaStore) Add(obj interface{}) error {
+ if err := u.Store.Add(obj); err != nil {
+ return err
+ }
+ u.PushFunc(u.Store.List())
+ return nil
+}
+
+func (u *UndeltaStore) Update(obj interface{}) error {
+ if err := u.Store.Update(obj); err != nil {
+ return err
+ }
+ u.PushFunc(u.Store.List())
+ return nil
+}
+
+func (u *UndeltaStore) Delete(obj interface{}) error {
+ if err := u.Store.Delete(obj); err != nil {
+ return err
+ }
+ u.PushFunc(u.Store.List())
+ return nil
+}
+
+func (u *UndeltaStore) Replace(list []interface{}, resourceVersion string) error {
+ if err := u.Store.Replace(list, resourceVersion); err != nil {
+ return err
+ }
+ u.PushFunc(u.Store.List())
+ return nil
+}
+
+// NewUndeltaStore returns an UndeltaStore implemented with a Store.
+func NewUndeltaStore(pushFunc func([]interface{}), keyFunc KeyFunc) *UndeltaStore {
+ return &UndeltaStore{
+ Store: NewStore(keyFunc),
+ PushFunc: pushFunc,
+ }
+}
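+
+// A minimal usage sketch (illustrative; the push function simply reports the
+// size of the complete state it receives on every change):
+//
+//	u := NewUndeltaStore(func(objs []interface{}) { fmt.Println(len(objs)) }, MetaNamespaceKeyFunc)
+//	_ = u.Add(pod)    // one PushFunc call with the full current state
+//	_ = u.Delete(pod) // another PushFunc call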