diff options
Diffstat (limited to 'src/kube2msb/vendor/k8s.io/kubernetes/pkg/client/cache')
13 files changed, 3196 insertions, 0 deletions
diff --git a/src/kube2msb/vendor/k8s.io/kubernetes/pkg/client/cache/delta_fifo.go b/src/kube2msb/vendor/k8s.io/kubernetes/pkg/client/cache/delta_fifo.go new file mode 100644 index 0000000..5a7f4a9 --- /dev/null +++ b/src/kube2msb/vendor/k8s.io/kubernetes/pkg/client/cache/delta_fifo.go @@ -0,0 +1,613 @@ +/* +Copyright 2014 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "errors" + "fmt" + "sync" + + "k8s.io/kubernetes/pkg/util/sets" + + "github.com/golang/glog" +) + +// NewDeltaFIFO returns a Store which can be used process changes to items. +// +// keyFunc is used to figure out what key an object should have. (It's +// exposed in the returned DeltaFIFO's KeyOf() method, with bonus features.) +// +// 'compressor' may compress as many or as few items as it wants +// (including returning an empty slice), but it should do what it +// does quickly since it is called while the queue is locked. +// 'compressor' may be nil if you don't want any delta compression. +// +// 'keyLister' is expected to return a list of keys that the consumer of +// this queue "knows about". It is used to decide which items are missing +// when Replace() is called; 'Deleted' deltas are produced for these items. +// It may be nil if you don't need to detect all deletions. +// TODO: consider merging keyLister with this object, tracking a list of +// "known" keys when Pop() is called. Have to think about how that +// affects error retrying. +// TODO(lavalamp): I believe there is a possible race only when using an +// external known object source that the above TODO would +// fix. +// +// Also see the comment on DeltaFIFO. +func NewDeltaFIFO(keyFunc KeyFunc, compressor DeltaCompressor, knownObjects KeyListerGetter) *DeltaFIFO { + f := &DeltaFIFO{ + items: map[string]Deltas{}, + queue: []string{}, + keyFunc: keyFunc, + deltaCompressor: compressor, + knownObjects: knownObjects, + } + f.cond.L = &f.lock + return f +} + +// DeltaFIFO is like FIFO, but allows you to process deletes. +// +// DeltaFIFO is a producer-consumer queue, where a Reflector is +// intended to be the producer, and the consumer is whatever calls +// the Pop() method. +// +// DeltaFIFO solves this use case: +// * You want to process every object change (delta) at most once. +// * When you process an object, you want to see everything +// that's happened to it since you last processed it. +// * You want to process the deletion of objects. +// * You might want to periodically reprocess objects. +// +// DeltaFIFO's Pop(), Get(), and GetByKey() methods return +// interface{} to satisfy the Store/Queue interfaces, but it +// will always return an object of type Deltas. +// +// A note on threading: If you call Pop() in parallel from multiple +// threads, you could end up with multiple threads processing slightly +// different versions of the same object. +// +// A note on the KeyLister used by the DeltaFIFO: It's main purpose is +// to list keys that are "known", for the purpose of figuring out which +// items have been deleted when Replace() or Delete() are called. The deleted +// object will be included in the DeleteFinalStateUnknown markers. These objects +// could be stale. +// +// You may provide a function to compress deltas (e.g., represent a +// series of Updates as a single Update). +type DeltaFIFO struct { + // lock/cond protects access to 'items' and 'queue'. + lock sync.RWMutex + cond sync.Cond + + // We depend on the property that items in the set are in + // the queue and vice versa, and that all Deltas in this + // map have at least one Delta. + items map[string]Deltas + queue []string + + // populated is true if the first batch of items inserted by Replace() has been populated + // or Delete/Add/Update was called first. + populated bool + // initialPopulationCount is the number of items inserted by the first call of Replace() + initialPopulationCount int + + // keyFunc is used to make the key used for queued item + // insertion and retrieval, and should be deterministic. + keyFunc KeyFunc + + // deltaCompressor tells us how to combine two or more + // deltas. It may be nil. + deltaCompressor DeltaCompressor + + // knownObjects list keys that are "known", for the + // purpose of figuring out which items have been deleted + // when Replace() or Delete() is called. + knownObjects KeyListerGetter +} + +var ( + _ = Queue(&DeltaFIFO{}) // DeltaFIFO is a Queue +) + +var ( + // ErrZeroLengthDeltasObject is returned in a KeyError if a Deltas + // object with zero length is encountered (should be impossible, + // even if such an object is accidentally produced by a DeltaCompressor-- + // but included for completeness). + ErrZeroLengthDeltasObject = errors.New("0 length Deltas object; can't get key") +) + +// KeyOf exposes f's keyFunc, but also detects the key of a Deltas object or +// DeletedFinalStateUnknown objects. +func (f *DeltaFIFO) KeyOf(obj interface{}) (string, error) { + if d, ok := obj.(Deltas); ok { + if len(d) == 0 { + return "", KeyError{obj, ErrZeroLengthDeltasObject} + } + obj = d.Newest().Object + } + if d, ok := obj.(DeletedFinalStateUnknown); ok { + return d.Key, nil + } + return f.keyFunc(obj) +} + +// Return true if an Add/Update/Delete/AddIfNotPresent are called first, +// or an Update called first but the first batch of items inserted by Replace() has been popped +func (f *DeltaFIFO) HasSynced() bool { + f.lock.Lock() + defer f.lock.Unlock() + return f.populated && f.initialPopulationCount == 0 +} + +// Add inserts an item, and puts it in the queue. The item is only enqueued +// if it doesn't already exist in the set. +func (f *DeltaFIFO) Add(obj interface{}) error { + f.lock.Lock() + defer f.lock.Unlock() + f.populated = true + return f.queueActionLocked(Added, obj) +} + +// Update is just like Add, but makes an Updated Delta. +func (f *DeltaFIFO) Update(obj interface{}) error { + f.lock.Lock() + defer f.lock.Unlock() + f.populated = true + return f.queueActionLocked(Updated, obj) +} + +// Delete is just like Add, but makes an Deleted Delta. If the item does not +// already exist, it will be ignored. (It may have already been deleted by a +// Replace (re-list), for example. +func (f *DeltaFIFO) Delete(obj interface{}) error { + id, err := f.KeyOf(obj) + if err != nil { + return KeyError{obj, err} + } + f.lock.Lock() + defer f.lock.Unlock() + f.populated = true + if f.knownObjects == nil { + if _, exists := f.items[id]; !exists { + // Presumably, this was deleted when a relist happened. + // Don't provide a second report of the same deletion. + return nil + } + } else { + // We only want to skip the "deletion" action if the object doesn't + // exist in knownObjects and it doesn't have corresponding item in items. + // Note that even if there is a "deletion" action in items, we can ignore it, + // because it will be deduped automatically in "queueActionLocked" + _, exists, err := f.knownObjects.GetByKey(id) + _, itemsExist := f.items[id] + if err == nil && !exists && !itemsExist { + // Presumably, this was deleted when a relist happened. + // Don't provide a second report of the same deletion. + // TODO(lavalamp): This may be racy-- we aren't properly locked + // with knownObjects. + return nil + } + } + + return f.queueActionLocked(Deleted, obj) +} + +// AddIfNotPresent inserts an item, and puts it in the queue. If the item is already +// present in the set, it is neither enqueued nor added to the set. +// +// This is useful in a single producer/consumer scenario so that the consumer can +// safely retry items without contending with the producer and potentially enqueueing +// stale items. +// +// Important: obj must be a Deltas (the output of the Pop() function). Yes, this is +// different from the Add/Update/Delete functions. +func (f *DeltaFIFO) AddIfNotPresent(obj interface{}) error { + deltas, ok := obj.(Deltas) + if !ok { + return fmt.Errorf("object must be of type deltas, but got: %#v", obj) + } + id, err := f.KeyOf(deltas.Newest().Object) + if err != nil { + return KeyError{obj, err} + } + f.lock.Lock() + defer f.lock.Unlock() + f.addIfNotPresent(id, deltas) + return nil +} + +// addIfNotPresent inserts deltas under id if it does not exist, and assumes the caller +// already holds the fifo lock. +func (f *DeltaFIFO) addIfNotPresent(id string, deltas Deltas) { + f.populated = true + if _, exists := f.items[id]; exists { + return + } + + f.queue = append(f.queue, id) + f.items[id] = deltas + f.cond.Broadcast() +} + +// re-listing and watching can deliver the same update multiple times in any +// order. This will combine the most recent two deltas if they are the same. +func dedupDeltas(deltas Deltas) Deltas { + n := len(deltas) + if n < 2 { + return deltas + } + a := &deltas[n-1] + b := &deltas[n-2] + if out := isDup(a, b); out != nil { + d := append(Deltas{}, deltas[:n-2]...) + return append(d, *out) + } + return deltas +} + +// If a & b represent the same event, returns the delta that ought to be kept. +// Otherwise, returns nil. +// TODO: is there anything other than deletions that need deduping? +func isDup(a, b *Delta) *Delta { + if out := isDeletionDup(a, b); out != nil { + return out + } + // TODO: Detect other duplicate situations? Are there any? + return nil +} + +// keep the one with the most information if both are deletions. +func isDeletionDup(a, b *Delta) *Delta { + if b.Type != Deleted || a.Type != Deleted { + return nil + } + // Do more sophisticated checks, or is this sufficient? + if _, ok := b.Object.(DeletedFinalStateUnknown); ok { + return a + } + return b +} + +// willObjectBeDeletedLocked returns true only if the last delta for the +// given object is Delete. Caller must lock first. +func (f *DeltaFIFO) willObjectBeDeletedLocked(id string) bool { + deltas := f.items[id] + return len(deltas) > 0 && deltas[len(deltas)-1].Type == Deleted +} + +// queueActionLocked appends to the delta list for the object, calling +// f.deltaCompressor if needed. Caller must lock first. +func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error { + id, err := f.KeyOf(obj) + if err != nil { + return KeyError{obj, err} + } + + // If object is supposed to be deleted (last event is Deleted), + // then we should ignore Sync events, because it would result in + // recreation of this object. + if actionType == Sync && f.willObjectBeDeletedLocked(id) { + return nil + } + + newDeltas := append(f.items[id], Delta{actionType, obj}) + newDeltas = dedupDeltas(newDeltas) + if f.deltaCompressor != nil { + newDeltas = f.deltaCompressor.Compress(newDeltas) + } + + _, exists := f.items[id] + if len(newDeltas) > 0 { + if !exists { + f.queue = append(f.queue, id) + } + f.items[id] = newDeltas + f.cond.Broadcast() + } else if exists { + // The compression step removed all deltas, so + // we need to remove this from our map (extra items + // in the queue are ignored if they are not in the + // map). + delete(f.items, id) + } + return nil +} + +// List returns a list of all the items; it returns the object +// from the most recent Delta. +// You should treat the items returned inside the deltas as immutable. +func (f *DeltaFIFO) List() []interface{} { + f.lock.RLock() + defer f.lock.RUnlock() + return f.listLocked() +} + +func (f *DeltaFIFO) listLocked() []interface{} { + list := make([]interface{}, 0, len(f.items)) + for _, item := range f.items { + // Copy item's slice so operations on this slice (delta + // compression) won't interfere with the object we return. + item = copyDeltas(item) + list = append(list, item.Newest().Object) + } + return list +} + +// ListKeys returns a list of all the keys of the objects currently +// in the FIFO. +func (f *DeltaFIFO) ListKeys() []string { + f.lock.RLock() + defer f.lock.RUnlock() + list := make([]string, 0, len(f.items)) + for key := range f.items { + list = append(list, key) + } + return list +} + +// Get returns the complete list of deltas for the requested item, +// or sets exists=false. +// You should treat the items returned inside the deltas as immutable. +func (f *DeltaFIFO) Get(obj interface{}) (item interface{}, exists bool, err error) { + key, err := f.KeyOf(obj) + if err != nil { + return nil, false, KeyError{obj, err} + } + return f.GetByKey(key) +} + +// GetByKey returns the complete list of deltas for the requested item, +// setting exists=false if that list is empty. +// You should treat the items returned inside the deltas as immutable. +func (f *DeltaFIFO) GetByKey(key string) (item interface{}, exists bool, err error) { + f.lock.RLock() + defer f.lock.RUnlock() + d, exists := f.items[key] + if exists { + // Copy item's slice so operations on this slice (delta + // compression) won't interfere with the object we return. + d = copyDeltas(d) + } + return d, exists, nil +} + +// Pop blocks until an item is added to the queue, and then returns it. If +// multiple items are ready, they are returned in the order in which they were +// added/updated. The item is removed from the queue (and the store) before it +// is returned, so if you don't successfully process it, you need to add it back +// with AddIfNotPresent(). +// process function is called under lock, so it is safe update data structures +// in it that need to be in sync with the queue (e.g. knownKeys). The PopProcessFunc +// may return an instance of ErrRequeue with a nested error to indicate the current +// item should be requeued (equivalent to calling AddIfNotPresent under the lock). +// +// Pop returns a 'Deltas', which has a complete list of all the things +// that happened to the object (deltas) while it was sitting in the queue. +func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) { + f.lock.Lock() + defer f.lock.Unlock() + for { + for len(f.queue) == 0 { + f.cond.Wait() + } + id := f.queue[0] + f.queue = f.queue[1:] + item, ok := f.items[id] + if f.initialPopulationCount > 0 { + f.initialPopulationCount-- + } + if !ok { + // Item may have been deleted subsequently. + continue + } + delete(f.items, id) + err := process(item) + if e, ok := err.(ErrRequeue); ok { + f.addIfNotPresent(id, item) + err = e.Err + } + // Don't need to copyDeltas here, because we're transferring + // ownership to the caller. + return item, err + } +} + +// Replace will delete the contents of 'f', using instead the given map. +// 'f' takes ownership of the map, you should not reference the map again +// after calling this function. f's queue is reset, too; upon return, it +// will contain the items in the map, in no particular order. +func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error { + f.lock.Lock() + defer f.lock.Unlock() + keys := make(sets.String, len(list)) + + if !f.populated { + f.populated = true + f.initialPopulationCount = len(list) + } + + for _, item := range list { + key, err := f.KeyOf(item) + if err != nil { + return KeyError{item, err} + } + keys.Insert(key) + if err := f.queueActionLocked(Sync, item); err != nil { + return fmt.Errorf("couldn't enqueue object: %v", err) + } + } + + if f.knownObjects == nil { + // Do deletion detection against our own list. + for k, oldItem := range f.items { + if keys.Has(k) { + continue + } + var deletedObj interface{} + if n := oldItem.Newest(); n != nil { + deletedObj = n.Object + } + if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil { + return err + } + } + return nil + } + + // Detect deletions not already in the queue. + // TODO(lavalamp): This may be racy-- we aren't properly locked + // with knownObjects. Unproven. + knownKeys := f.knownObjects.ListKeys() + for _, k := range knownKeys { + if keys.Has(k) { + continue + } + + deletedObj, exists, err := f.knownObjects.GetByKey(k) + if err != nil { + deletedObj = nil + glog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k) + } else if !exists { + deletedObj = nil + glog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k) + } + if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil { + return err + } + } + return nil +} + +// Resync will send a sync event for each item +func (f *DeltaFIFO) Resync() error { + f.lock.RLock() + defer f.lock.RUnlock() + for _, k := range f.knownObjects.ListKeys() { + obj, exists, err := f.knownObjects.GetByKey(k) + if err != nil { + glog.Errorf("Unexpected error %v during lookup of key %v, unable to queue object for sync", err, k) + continue + } else if !exists { + glog.Infof("Key %v does not exist in known objects store, unable to queue object for sync", k) + continue + } + + if err := f.queueActionLocked(Sync, obj); err != nil { + return fmt.Errorf("couldn't queue object: %v", err) + } + } + return nil +} + +// A KeyListerGetter is anything that knows how to list its keys and look up by key. +type KeyListerGetter interface { + KeyLister + KeyGetter +} + +// A KeyLister is anything that knows how to list its keys. +type KeyLister interface { + ListKeys() []string +} + +// A KeyGetter is anything that knows how to get the value stored under a given key. +type KeyGetter interface { + GetByKey(key string) (interface{}, bool, error) +} + +// DeltaCompressor is an algorithm that removes redundant changes. +type DeltaCompressor interface { + Compress(Deltas) Deltas +} + +// DeltaCompressorFunc should remove redundant changes; but changes that +// are redundant depend on one's desired semantics, so this is an +// injectable function. +// +// DeltaCompressorFunc adapts a raw function to be a DeltaCompressor. +type DeltaCompressorFunc func(Deltas) Deltas + +// Compress just calls dc. +func (dc DeltaCompressorFunc) Compress(d Deltas) Deltas { + return dc(d) +} + +// DeltaType is the type of a change (addition, deletion, etc) +type DeltaType string + +const ( + Added DeltaType = "Added" + Updated DeltaType = "Updated" + Deleted DeltaType = "Deleted" + // The other types are obvious. You'll get Sync deltas when: + // * A watch expires/errors out and a new list/watch cycle is started. + // * You've turned on periodic syncs. + // (Anything that trigger's DeltaFIFO's Replace() method.) + Sync DeltaType = "Sync" +) + +// Delta is the type stored by a DeltaFIFO. It tells you what change +// happened, and the object's state after* that change. +// +// [*] Unless the change is a deletion, and then you'll get the final +// state of the object before it was deleted. +type Delta struct { + Type DeltaType + Object interface{} +} + +// Deltas is a list of one or more 'Delta's to an individual object. +// The oldest delta is at index 0, the newest delta is the last one. +type Deltas []Delta + +// Oldest is a convenience function that returns the oldest delta, or +// nil if there are no deltas. +func (d Deltas) Oldest() *Delta { + if len(d) > 0 { + return &d[0] + } + return nil +} + +// Newest is a convenience function that returns the newest delta, or +// nil if there are no deltas. +func (d Deltas) Newest() *Delta { + if n := len(d); n > 0 { + return &d[n-1] + } + return nil +} + +// copyDeltas returns a shallow copy of d; that is, it copies the slice but not +// the objects in the slice. This allows Get/List to return an object that we +// know won't be clobbered by a subsequent call to a delta compressor. +func copyDeltas(d Deltas) Deltas { + d2 := make(Deltas, len(d)) + copy(d2, d) + return d2 +} + +// DeletedFinalStateUnknown is placed into a DeltaFIFO in the case where +// an object was deleted but the watch deletion event was missed. In this +// case we don't know the final "resting" state of the object, so there's +// a chance the included `Obj` is stale. +type DeletedFinalStateUnknown struct { + Key string + Obj interface{} +} diff --git a/src/kube2msb/vendor/k8s.io/kubernetes/pkg/client/cache/doc.go b/src/kube2msb/vendor/k8s.io/kubernetes/pkg/client/cache/doc.go new file mode 100644 index 0000000..4f593f0 --- /dev/null +++ b/src/kube2msb/vendor/k8s.io/kubernetes/pkg/client/cache/doc.go @@ -0,0 +1,24 @@ +/* +Copyright 2014 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package cache is a client-side caching mechanism. It is useful for +// reducing the number of server calls you'd otherwise need to make. +// Reflector watches a server and updates a Store. Two stores are provided; +// one that simply caches objects (for example, to allow a scheduler to +// list currently available nodes), and one that additionally acts as +// a FIFO queue (for example, to allow a scheduler to process incoming +// pods). +package cache diff --git a/src/kube2msb/vendor/k8s.io/kubernetes/pkg/client/cache/expiration_cache.go b/src/kube2msb/vendor/k8s.io/kubernetes/pkg/client/cache/expiration_cache.go new file mode 100644 index 0000000..8c5c470 --- /dev/null +++ b/src/kube2msb/vendor/k8s.io/kubernetes/pkg/client/cache/expiration_cache.go @@ -0,0 +1,208 @@ +/* +Copyright 2014 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "sync" + "time" + + "github.com/golang/glog" + "k8s.io/kubernetes/pkg/util" +) + +// ExpirationCache implements the store interface +// 1. All entries are automatically time stamped on insert +// a. The key is computed based off the original item/keyFunc +// b. The value inserted under that key is the timestamped item +// 2. Expiration happens lazily on read based on the expiration policy +// a. No item can be inserted into the store while we're expiring +// *any* item in the cache. +// 3. Time-stamps are stripped off unexpired entries before return +// Note that the ExpirationCache is inherently slower than a normal +// threadSafeStore because it takes a write lock every time it checks if +// an item has expired. +type ExpirationCache struct { + cacheStorage ThreadSafeStore + keyFunc KeyFunc + clock util.Clock + expirationPolicy ExpirationPolicy + // expirationLock is a write lock used to guarantee that we don't clobber + // newly inserted objects because of a stale expiration timestamp comparison + expirationLock sync.Mutex +} + +// ExpirationPolicy dictates when an object expires. Currently only abstracted out +// so unittests don't rely on the system clock. +type ExpirationPolicy interface { + IsExpired(obj *timestampedEntry) bool +} + +// TTLPolicy implements a ttl based ExpirationPolicy. +type TTLPolicy struct { + // >0: Expire entries with an age > ttl + // <=0: Don't expire any entry + Ttl time.Duration + + // Clock used to calculate ttl expiration + Clock util.Clock +} + +// IsExpired returns true if the given object is older than the ttl, or it can't +// determine its age. +func (p *TTLPolicy) IsExpired(obj *timestampedEntry) bool { + return p.Ttl > 0 && p.Clock.Since(obj.timestamp) > p.Ttl +} + +// timestampedEntry is the only type allowed in a ExpirationCache. +type timestampedEntry struct { + obj interface{} + timestamp time.Time +} + +// getTimestampedEntry returnes the timestampedEntry stored under the given key. +func (c *ExpirationCache) getTimestampedEntry(key string) (*timestampedEntry, bool) { + item, _ := c.cacheStorage.Get(key) + if tsEntry, ok := item.(*timestampedEntry); ok { + return tsEntry, true + } + return nil, false +} + +// getOrExpire retrieves the object from the timestampedEntry if and only if it hasn't +// already expired. It holds a write lock across deletion. +func (c *ExpirationCache) getOrExpire(key string) (interface{}, bool) { + // Prevent all inserts from the time we deem an item as "expired" to when we + // delete it, so an un-expired item doesn't sneak in under the same key, just + // before the Delete. + c.expirationLock.Lock() + defer c.expirationLock.Unlock() + timestampedItem, exists := c.getTimestampedEntry(key) + if !exists { + return nil, false + } + if c.expirationPolicy.IsExpired(timestampedItem) { + glog.V(4).Infof("Entry %v: %+v has expired", key, timestampedItem.obj) + c.cacheStorage.Delete(key) + return nil, false + } + return timestampedItem.obj, true +} + +// GetByKey returns the item stored under the key, or sets exists=false. +func (c *ExpirationCache) GetByKey(key string) (interface{}, bool, error) { + obj, exists := c.getOrExpire(key) + return obj, exists, nil +} + +// Get returns unexpired items. It purges the cache of expired items in the +// process. +func (c *ExpirationCache) Get(obj interface{}) (interface{}, bool, error) { + key, err := c.keyFunc(obj) + if err != nil { + return nil, false, KeyError{obj, err} + } + obj, exists := c.getOrExpire(key) + return obj, exists, nil +} + +// List retrieves a list of unexpired items. It purges the cache of expired +// items in the process. +func (c *ExpirationCache) List() []interface{} { + items := c.cacheStorage.List() + + list := make([]interface{}, 0, len(items)) + for _, item := range items { + obj := item.(*timestampedEntry).obj + if key, err := c.keyFunc(obj); err != nil { + list = append(list, obj) + } else if obj, exists := c.getOrExpire(key); exists { + list = append(list, obj) + } + } + return list +} + +// ListKeys returns a list of all keys in the expiration cache. +func (c *ExpirationCache) ListKeys() []string { + return c.cacheStorage.ListKeys() +} + +// Add timestamps an item and inserts it into the cache, overwriting entries +// that might exist under the same key. +func (c *ExpirationCache) Add(obj interface{}) error { + c.expirationLock.Lock() + defer c.expirationLock.Unlock() + + key, err := c.keyFunc(obj) + if err != nil { + return KeyError{obj, err} + } + c.cacheStorage.Add(key, ×tampedEntry{obj, c.clock.Now()}) + return nil +} + +// Update has not been implemented yet for lack of a use case, so this method +// simply calls `Add`. This effectively refreshes the timestamp. +func (c *ExpirationCache) Update(obj interface{}) error { + return c.Add(obj) +} + +// Delete removes an item from the cache. +func (c *ExpirationCache) Delete(obj interface{}) error { + c.expirationLock.Lock() + defer c.expirationLock.Unlock() + key, err := c.keyFunc(obj) + if err != nil { + return KeyError{obj, err} + } + c.cacheStorage.Delete(key) + return nil +} + +// Replace will convert all items in the given list to TimestampedEntries +// before attempting the replace operation. The replace operation will +// delete the contents of the ExpirationCache `c`. +func (c *ExpirationCache) Replace(list []interface{}, resourceVersion string) error { + c.expirationLock.Lock() + defer c.expirationLock.Unlock() + items := map[string]interface{}{} + ts := c.clock.Now() + for _, item := range list { + key, err := c.keyFunc(item) + if err != nil { + return KeyError{item, err} + } + items[key] = ×tampedEntry{item, ts} + } + c.cacheStorage.Replace(items, resourceVersion) + return nil +} + +// Resync will touch all objects to put them into the processing queue +func (c *ExpirationCache) Resync() error { + return c.cacheStorage.Resync() +} + +// NewTTLStore creates and returns a ExpirationCache with a TTLPolicy +func NewTTLStore(keyFunc KeyFunc, ttl time.Duration) Store { + return &ExpirationCache{ + cacheStorage: NewThreadSafeStore(Indexers{}, Indices{}), + keyFunc: keyFunc, + clock: util.RealClock{}, + expirationPolicy: &TTLPolicy{ttl, util.RealClock{}}, + } +} diff --git a/src/kube2msb/vendor/k8s.io/kubernetes/pkg/client/cache/expiration_cache_fakes.go b/src/kube2msb/vendor/k8s.io/kubernetes/pkg/client/cache/expiration_cache_fakes.go new file mode 100644 index 0000000..eb1d535 --- /dev/null +++ b/src/kube2msb/vendor/k8s.io/kubernetes/pkg/client/cache/expiration_cache_fakes.go @@ -0,0 +1,54 @@ +/* +Copyright 2014 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "k8s.io/kubernetes/pkg/util" + "k8s.io/kubernetes/pkg/util/sets" +) + +type fakeThreadSafeMap struct { + ThreadSafeStore + deletedKeys chan<- string +} + +func (c *fakeThreadSafeMap) Delete(key string) { + if c.deletedKeys != nil { + c.ThreadSafeStore.Delete(key) + c.deletedKeys <- key + } +} + +type FakeExpirationPolicy struct { + NeverExpire sets.String + RetrieveKeyFunc KeyFunc +} + +func (p *FakeExpirationPolicy) IsExpired(obj *timestampedEntry) bool { + key, _ := p.RetrieveKeyFunc(obj) + return !p.NeverExpire.Has(key) +} + +func NewFakeExpirationStore(keyFunc KeyFunc, deletedKeys chan<- string, expirationPolicy ExpirationPolicy, cacheClock util.Clock) Store { + cacheStorage := NewThreadSafeStore(Indexers{}, Indices{}) + return &ExpirationCache{ + cacheStorage: &fakeThreadSafeMap{cacheStorage, deletedKeys}, + keyFunc: keyFunc, + clock: cacheClock, + expirationPolicy: expirationPolicy, + } +} diff --git a/src/kube2msb/vendor/k8s.io/kubernetes/pkg/client/cache/fake_custom_store.go b/src/kube2msb/vendor/k8s.io/kubernetes/pkg/client/cache/fake_custom_store.go new file mode 100644 index 0000000..8d71c24 --- /dev/null +++ b/src/kube2msb/vendor/k8s.io/kubernetes/pkg/client/cache/fake_custom_store.go @@ -0,0 +1,102 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +// FakeStore lets you define custom functions for store operations +type FakeCustomStore struct { + AddFunc func(obj interface{}) error + UpdateFunc func(obj interface{}) error + DeleteFunc func(obj interface{}) error + ListFunc func() []interface{} + ListKeysFunc func() []string + GetFunc func(obj interface{}) (item interface{}, exists bool, err error) + GetByKeyFunc func(key string) (item interface{}, exists bool, err error) + ReplaceFunc func(list []interface{}, resourceVerion string) error + ResyncFunc func() error +} + +// Add calls the custom Add function if defined +func (f *FakeCustomStore) Add(obj interface{}) error { + if f.AddFunc != nil { + return f.AddFunc(obj) + } + return nil +} + +// Update calls the custom Update function if defined +func (f *FakeCustomStore) Update(obj interface{}) error { + if f.UpdateFunc != nil { + return f.Update(obj) + } + return nil +} + +// Delete calls the custom Delete function if defined +func (f *FakeCustomStore) Delete(obj interface{}) error { + if f.DeleteFunc != nil { + return f.DeleteFunc(obj) + } + return nil +} + +// List calls the custom List function if defined +func (f *FakeCustomStore) List() []interface{} { + if f.ListFunc != nil { + return f.ListFunc() + } + return nil +} + +// ListKeys calls the custom ListKeys function if defined +func (f *FakeCustomStore) ListKeys() []string { + if f.ListKeysFunc != nil { + return f.ListKeysFunc() + } + return nil +} + +// Get calls the custom Get function if defined +func (f *FakeCustomStore) Get(obj interface{}) (item interface{}, exists bool, err error) { + if f.GetFunc != nil { + return f.GetFunc(obj) + } + return nil, false, nil +} + +// GetByKey calls the custom GetByKey function if defined +func (f *FakeCustomStore) GetByKey(key string) (item interface{}, exists bool, err error) { + if f.GetByKeyFunc != nil { + return f.GetByKeyFunc(key) + } + return nil, false, nil +} + +// Replace calls the custom Replace function if defined +func (f *FakeCustomStore) Replace(list []interface{}, resourceVersion string) error { + if f.ReplaceFunc != nil { + return f.ReplaceFunc(list, resourceVersion) + } + return nil +} + +// Resync calls the custom Resync function if defined +func (f *FakeCustomStore) Resync() error { + if f.ResyncFunc != nil { + return f.ResyncFunc() + } + return nil +} diff --git a/src/kube2msb/vendor/k8s.io/kubernetes/pkg/client/cache/fifo.go b/src/kube2msb/vendor/k8s.io/kubernetes/pkg/client/cache/fifo.go new file mode 100644 index 0000000..a6d5e0a --- /dev/null +++ b/src/kube2msb/vendor/k8s.io/kubernetes/pkg/client/cache/fifo.go @@ -0,0 +1,321 @@ +/* +Copyright 2014 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "sync" + + "k8s.io/kubernetes/pkg/util/sets" +) + +// PopProcessFunc is passed to Pop() method of Queue interface. +// It is supposed to process the element popped from the queue. +type PopProcessFunc func(interface{}) error + +// ErrRequeue may be returned by a PopProcessFunc to safely requeue +// the current item. The value of Err will be returned from Pop. +type ErrRequeue struct { + // Err is returned by the Pop function + Err error +} + +func (e ErrRequeue) Error() string { + if e.Err == nil { + return "the popped item should be requeued without returning an error" + } + return e.Err.Error() +} + +// Queue is exactly like a Store, but has a Pop() method too. +type Queue interface { + Store + + // Pop blocks until it has something to process. + // It returns the object that was process and the result of processing. + // The PopProcessFunc may return an ErrRequeue{...} to indicate the item + // should be requeued before releasing the lock on the queue. + Pop(PopProcessFunc) (interface{}, error) + + // AddIfNotPresent adds a value previously + // returned by Pop back into the queue as long + // as nothing else (presumably more recent) + // has since been added. + AddIfNotPresent(interface{}) error + + // Return true if the first batch of items has been popped + HasSynced() bool +} + +// Helper function for popping from Queue. +// WARNING: Do NOT use this function in non-test code to avoid races +// unless you really really really really know what you are doing. +func Pop(queue Queue) interface{} { + var result interface{} + queue.Pop(func(obj interface{}) error { + result = obj + return nil + }) + return result +} + +// FIFO receives adds and updates from a Reflector, and puts them in a queue for +// FIFO order processing. If multiple adds/updates of a single item happen while +// an item is in the queue before it has been processed, it will only be +// processed once, and when it is processed, the most recent version will be +// processed. This can't be done with a channel. +// +// FIFO solves this use case: +// * You want to process every object (exactly) once. +// * You want to process the most recent version of the object when you process it. +// * You do not want to process deleted objects, they should be removed from the queue. +// * You do not want to periodically reprocess objects. +// Compare with DeltaFIFO for other use cases. +type FIFO struct { + lock sync.RWMutex + cond sync.Cond + // We depend on the property that items in the set are in the queue and vice versa. + items map[string]interface{} + queue []string + + // populated is true if the first batch of items inserted by Replace() has been populated + // or Delete/Add/Update was called first. + populated bool + // initialPopulationCount is the number of items inserted by the first call of Replace() + initialPopulationCount int + + // keyFunc is used to make the key used for queued item insertion and retrieval, and + // should be deterministic. + keyFunc KeyFunc +} + +var ( + _ = Queue(&FIFO{}) // FIFO is a Queue +) + +// Return true if an Add/Update/Delete/AddIfNotPresent are called first, +// or an Update called first but the first batch of items inserted by Replace() has been popped +func (f *FIFO) HasSynced() bool { + f.lock.Lock() + defer f.lock.Unlock() + return f.populated && f.initialPopulationCount == 0 +} + +// Add inserts an item, and puts it in the queue. The item is only enqueued +// if it doesn't already exist in the set. +func (f *FIFO) Add(obj interface{}) error { + id, err := f.keyFunc(obj) + if err != nil { + return KeyError{obj, err} + } + f.lock.Lock() + defer f.lock.Unlock() + f.populated = true + if _, exists := f.items[id]; !exists { + f.queue = append(f.queue, id) + } + f.items[id] = obj + f.cond.Broadcast() + return nil +} + +// AddIfNotPresent inserts an item, and puts it in the queue. If the item is already +// present in the set, it is neither enqueued nor added to the set. +// +// This is useful in a single producer/consumer scenario so that the consumer can +// safely retry items without contending with the producer and potentially enqueueing +// stale items. +func (f *FIFO) AddIfNotPresent(obj interface{}) error { + id, err := f.keyFunc(obj) + if err != nil { + return KeyError{obj, err} + } + f.lock.Lock() + defer f.lock.Unlock() + f.addIfNotPresent(id, obj) + return nil +} + +// addIfNotPresent assumes the fifo lock is already held and adds the the provided +// item to the queue under id if it does not already exist. +func (f *FIFO) addIfNotPresent(id string, obj interface{}) { + f.populated = true + if _, exists := f.items[id]; exists { + return + } + + f.queue = append(f.queue, id) + f.items[id] = obj + f.cond.Broadcast() +} + +// Update is the same as Add in this implementation. +func (f *FIFO) Update(obj interface{}) error { + return f.Add(obj) +} + +// Delete removes an item. It doesn't add it to the queue, because +// this implementation assumes the consumer only cares about the objects, +// not the order in which they were created/added. +func (f *FIFO) Delete(obj interface{}) error { + id, err := f.keyFunc(obj) + if err != nil { + return KeyError{obj, err} + } + f.lock.Lock() + defer f.lock.Unlock() + f.populated = true + delete(f.items, id) + return err +} + +// List returns a list of all the items. +func (f *FIFO) List() []interface{} { + f.lock.RLock() + defer f.lock.RUnlock() + list := make([]interface{}, 0, len(f.items)) + for _, item := range f.items { + list = append(list, item) + } + return list +} + +// ListKeys returns a list of all the keys of the objects currently +// in the FIFO. +func (f *FIFO) ListKeys() []string { + f.lock.RLock() + defer f.lock.RUnlock() + list := make([]string, 0, len(f.items)) + for key := range f.items { + list = append(list, key) + } + return list +} + +// Get returns the requested item, or sets exists=false. +func (f *FIFO) Get(obj interface{}) (item interface{}, exists bool, err error) { + key, err := f.keyFunc(obj) + if err != nil { + return nil, false, KeyError{obj, err} + } + return f.GetByKey(key) +} + +// GetByKey returns the requested item, or sets exists=false. +func (f *FIFO) GetByKey(key string) (item interface{}, exists bool, err error) { + f.lock.RLock() + defer f.lock.RUnlock() + item, exists = f.items[key] + return item, exists, nil +} + +// Pop waits until an item is ready and processes it. If multiple items are +// ready, they are returned in the order in which they were added/updated. +// The item is removed from the queue (and the store) before it is processed, +// so if you don't successfully process it, it should be added back with +// AddIfNotPresent(). process function is called under lock, so it is safe +// update data structures in it that need to be in sync with the queue. +func (f *FIFO) Pop(process PopProcessFunc) (interface{}, error) { + f.lock.Lock() + defer f.lock.Unlock() + for { + for len(f.queue) == 0 { + f.cond.Wait() + } + id := f.queue[0] + f.queue = f.queue[1:] + if f.initialPopulationCount > 0 { + f.initialPopulationCount-- + } + item, ok := f.items[id] + if !ok { + // Item may have been deleted subsequently. + continue + } + delete(f.items, id) + err := process(item) + if e, ok := err.(ErrRequeue); ok { + f.addIfNotPresent(id, item) + err = e.Err + } + return item, err + } +} + +// Replace will delete the contents of 'f', using instead the given map. +// 'f' takes ownership of the map, you should not reference the map again +// after calling this function. f's queue is reset, too; upon return, it +// will contain the items in the map, in no particular order. +func (f *FIFO) Replace(list []interface{}, resourceVersion string) error { + items := map[string]interface{}{} + for _, item := range list { + key, err := f.keyFunc(item) + if err != nil { + return KeyError{item, err} + } + items[key] = item + } + + f.lock.Lock() + defer f.lock.Unlock() + + if !f.populated { + f.populated = true + f.initialPopulationCount = len(items) + } + + f.items = items + f.queue = f.queue[:0] + for id := range items { + f.queue = append(f.queue, id) + } + if len(f.queue) > 0 { + f.cond.Broadcast() + } + return nil +} + +// Resync will touch all objects to put them into the processing queue +func (f *FIFO) Resync() error { + f.lock.Lock() + defer f.lock.Unlock() + + inQueue := sets.NewString() + for _, id := range f.queue { + inQueue.Insert(id) + } + for id := range f.items { + if !inQueue.Has(id) { + f.queue = append(f.queue, id) + } + } + if len(f.queue) > 0 { + f.cond.Broadcast() + } + return nil +} + +// NewFIFO returns a Store which can be used to queue up items to +// process. +func NewFIFO(keyFunc KeyFunc) *FIFO { + f := &FIFO{ + items: map[string]interface{}{}, + queue: []string{}, + keyFunc: keyFunc, + } + f.cond.L = &f.lock + return f +} diff --git a/src/kube2msb/vendor/k8s.io/kubernetes/pkg/client/cache/index.go b/src/kube2msb/vendor/k8s.io/kubernetes/pkg/client/cache/index.go new file mode 100644 index 0000000..4379880 --- /dev/null +++ b/src/kube2msb/vendor/k8s.io/kubernetes/pkg/client/cache/index.go @@ -0,0 +1,82 @@ +/* +Copyright 2014 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "fmt" + + "k8s.io/kubernetes/pkg/api/meta" + "k8s.io/kubernetes/pkg/util/sets" +) + +// Indexer is a storage interface that lets you list objects using multiple indexing functions +type Indexer interface { + Store + // Retrieve list of objects that match on the named indexing function + Index(indexName string, obj interface{}) ([]interface{}, error) + // ListIndexFuncValues returns the list of generated values of an Index func + ListIndexFuncValues(indexName string) []string + // ByIndex lists object that match on the named indexing function with the exact key + ByIndex(indexName, indexKey string) ([]interface{}, error) + // GetIndexer return the indexers + GetIndexers() Indexers + + // AddIndexers adds more indexers to this store. If you call this after you already have data + // in the store, the results are undefined. + AddIndexers(newIndexers Indexers) error +} + +// IndexFunc knows how to provide an indexed value for an object. +type IndexFunc func(obj interface{}) ([]string, error) + +// IndexFuncToKeyFuncAdapter adapts an indexFunc to a keyFunc. This is only useful if your index function returns +// unique values for every object. This is conversion can create errors when more than one key is found. You +// should prefer to make proper key and index functions. +func IndexFuncToKeyFuncAdapter(indexFunc IndexFunc) KeyFunc { + return func(obj interface{}) (string, error) { + indexKeys, err := indexFunc(obj) + if err != nil { + return "", err + } + if len(indexKeys) > 1 { + return "", fmt.Errorf("too many keys: %v", indexKeys) + } + return indexKeys[0], nil + } +} + +const ( + NamespaceIndex string = "namespace" +) + +// MetaNamespaceIndexFunc is a default index function that indexes based on an object's namespace +func MetaNamespaceIndexFunc(obj interface{}) ([]string, error) { + meta, err := meta.Accessor(obj) + if err != nil { + return []string{""}, fmt.Errorf("object has no meta: %v", err) + } + return []string{meta.GetNamespace()}, nil +} + +// Index maps the indexed value to a set of keys in the store that match on that value +type Index map[string]sets.String + +// Indexers maps a name to a IndexFunc +type Indexers map[string]IndexFunc + +// Indices maps a name to an Index +type Indices map[string]Index diff --git a/src/kube2msb/vendor/k8s.io/kubernetes/pkg/client/cache/listers.go b/src/kube2msb/vendor/k8s.io/kubernetes/pkg/client/cache/listers.go new file mode 100644 index 0000000..29e5859 --- /dev/null +++ b/src/kube2msb/vendor/k8s.io/kubernetes/pkg/client/cache/listers.go @@ -0,0 +1,672 @@ +/* +Copyright 2014 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "fmt" + + "github.com/golang/glog" + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/apis/apps" + "k8s.io/kubernetes/pkg/apis/batch" + "k8s.io/kubernetes/pkg/apis/extensions" + "k8s.io/kubernetes/pkg/labels" +) + +// TODO: generate these classes and methods for all resources of interest using +// a script. Can use "go generate" once 1.4 is supported by all users. + +// StoreToPodLister makes a Store have the List method of the client.PodInterface +// The Store must contain (only) Pods. +// +// Example: +// s := cache.NewStore() +// lw := cache.ListWatch{Client: c, FieldSelector: sel, Resource: "pods"} +// r := cache.NewReflector(lw, &api.Pod{}, s).Run() +// l := StoreToPodLister{s} +// l.List() +type StoreToPodLister struct { + Indexer +} + +// Please note that selector is filtering among the pods that have gotten into +// the store; there may have been some filtering that already happened before +// that. +// +// TODO: converge on the interface in pkg/client. +func (s *StoreToPodLister) List(selector labels.Selector) (pods []*api.Pod, err error) { + // TODO: it'd be great to just call + // s.Pods(api.NamespaceAll).List(selector), however then we'd have to + // remake the list.Items as a []*api.Pod. So leave this separate for + // now. + for _, m := range s.Indexer.List() { + pod := m.(*api.Pod) + if selector.Matches(labels.Set(pod.Labels)) { + pods = append(pods, pod) + } + } + return pods, nil +} + +// Pods is taking baby steps to be more like the api in pkg/client +func (s *StoreToPodLister) Pods(namespace string) storePodsNamespacer { + return storePodsNamespacer{s.Indexer, namespace} +} + +type storePodsNamespacer struct { + indexer Indexer + namespace string +} + +// Please note that selector is filtering among the pods that have gotten into +// the store; there may have been some filtering that already happened before +// that. +func (s storePodsNamespacer) List(selector labels.Selector) (api.PodList, error) { + pods := api.PodList{} + + if s.namespace == api.NamespaceAll { + for _, m := range s.indexer.List() { + pod := m.(*api.Pod) + if selector.Matches(labels.Set(pod.Labels)) { + pods.Items = append(pods.Items, *pod) + } + } + return pods, nil + } + + key := &api.Pod{ObjectMeta: api.ObjectMeta{Namespace: s.namespace}} + items, err := s.indexer.Index(NamespaceIndex, key) + if err != nil { + // Ignore error; do slow search without index. + glog.Warningf("can not retrieve list of objects using index : %v", err) + for _, m := range s.indexer.List() { + pod := m.(*api.Pod) + if s.namespace == pod.Namespace && selector.Matches(labels.Set(pod.Labels)) { + pods.Items = append(pods.Items, *pod) + } + } + return pods, nil + } + for _, m := range items { + pod := m.(*api.Pod) + if selector.Matches(labels.Set(pod.Labels)) { + pods.Items = append(pods.Items, *pod) + } + } + return pods, nil +} + +// Exists returns true if a pod matching the namespace/name of the given pod exists in the store. +func (s *StoreToPodLister) Exists(pod *api.Pod) (bool, error) { + _, exists, err := s.Indexer.Get(pod) + if err != nil { + return false, err + } + return exists, nil +} + +// NodeConditionPredicate is a function that indicates whether the given node's conditions meet +// some set of criteria defined by the function. +type NodeConditionPredicate func(node *api.Node) bool + +// StoreToNodeLister makes a Store have the List method of the client.NodeInterface +// The Store must contain (only) Nodes. +type StoreToNodeLister struct { + Store +} + +func (s *StoreToNodeLister) List() (machines api.NodeList, err error) { + for _, m := range s.Store.List() { + machines.Items = append(machines.Items, *(m.(*api.Node))) + } + return machines, nil +} + +// NodeCondition returns a storeToNodeConditionLister +func (s *StoreToNodeLister) NodeCondition(predicate NodeConditionPredicate) storeToNodeConditionLister { + // TODO: Move this filtering server side. Currently our selectors don't facilitate searching through a list so we + // have the reflector filter out the Unschedulable field and sift through node conditions in the lister. + return storeToNodeConditionLister{s.Store, predicate} +} + +// storeToNodeConditionLister filters and returns nodes matching the given type and status from the store. +type storeToNodeConditionLister struct { + store Store + predicate NodeConditionPredicate +} + +// List returns a list of nodes that match the conditions defined by the predicate functions in the storeToNodeConditionLister. +func (s storeToNodeConditionLister) List() (nodes api.NodeList, err error) { + for _, m := range s.store.List() { + node := m.(*api.Node) + if s.predicate(node) { + nodes.Items = append(nodes.Items, *node) + } else { + glog.V(5).Infof("Node %s matches none of the conditions", node.Name) + } + } + return +} + +// StoreToReplicationControllerLister gives a store List and Exists methods. The store must contain only ReplicationControllers. +type StoreToReplicationControllerLister struct { + Indexer +} + +// Exists checks if the given rc exists in the store. +func (s *StoreToReplicationControllerLister) Exists(controller *api.ReplicationController) (bool, error) { + _, exists, err := s.Indexer.Get(controller) + if err != nil { + return false, err + } + return exists, nil +} + +// StoreToReplicationControllerLister lists all controllers in the store. +// TODO: converge on the interface in pkg/client +func (s *StoreToReplicationControllerLister) List() (controllers []api.ReplicationController, err error) { + for _, c := range s.Indexer.List() { + controllers = append(controllers, *(c.(*api.ReplicationController))) + } + return controllers, nil +} + +func (s *StoreToReplicationControllerLister) ReplicationControllers(namespace string) storeReplicationControllersNamespacer { + return storeReplicationControllersNamespacer{s.Indexer, namespace} +} + +type storeReplicationControllersNamespacer struct { + indexer Indexer + namespace string +} + +func (s storeReplicationControllersNamespacer) List(selector labels.Selector) ([]api.ReplicationController, error) { + controllers := []api.ReplicationController{} + + if s.namespace == api.NamespaceAll { + for _, m := range s.indexer.List() { + rc := *(m.(*api.ReplicationController)) + if selector.Matches(labels.Set(rc.Labels)) { + controllers = append(controllers, rc) + } + } + return controllers, nil + } + + key := &api.ReplicationController{ObjectMeta: api.ObjectMeta{Namespace: s.namespace}} + items, err := s.indexer.Index(NamespaceIndex, key) + if err != nil { + // Ignore error; do slow search without index. + glog.Warningf("can not retrieve list of objects using index : %v", err) + for _, m := range s.indexer.List() { + rc := *(m.(*api.ReplicationController)) + if s.namespace == rc.Namespace && selector.Matches(labels.Set(rc.Labels)) { + controllers = append(controllers, rc) + } + } + return controllers, nil + } + for _, m := range items { + rc := *(m.(*api.ReplicationController)) + if selector.Matches(labels.Set(rc.Labels)) { + controllers = append(controllers, rc) + } + } + return controllers, nil +} + +// GetPodControllers returns a list of replication controllers managing a pod. Returns an error only if no matching controllers are found. +func (s *StoreToReplicationControllerLister) GetPodControllers(pod *api.Pod) (controllers []api.ReplicationController, err error) { + var selector labels.Selector + var rc api.ReplicationController + + if len(pod.Labels) == 0 { + err = fmt.Errorf("no controllers found for pod %v because it has no labels", pod.Name) + return + } + + key := &api.ReplicationController{ObjectMeta: api.ObjectMeta{Namespace: pod.Namespace}} + items, err := s.Indexer.Index(NamespaceIndex, key) + if err != nil { + return + } + + for _, m := range items { + rc = *m.(*api.ReplicationController) + labelSet := labels.Set(rc.Spec.Selector) + selector = labels.Set(rc.Spec.Selector).AsSelector() + + // If an rc with a nil or empty selector creeps in, it should match nothing, not everything. + if labelSet.AsSelector().Empty() || !selector.Matches(labels.Set(pod.Labels)) { + continue + } + controllers = append(controllers, rc) + } + if len(controllers) == 0 { + err = fmt.Errorf("could not find controller for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels) + } + return +} + +// StoreToDeploymentLister gives a store List and Exists methods. The store must contain only Deployments. +type StoreToDeploymentLister struct { + Store +} + +// Exists checks if the given deployment exists in the store. +func (s *StoreToDeploymentLister) Exists(deployment *extensions.Deployment) (bool, error) { + _, exists, err := s.Store.Get(deployment) + if err != nil { + return false, err + } + return exists, nil +} + +// StoreToDeploymentLister lists all deployments in the store. +// TODO: converge on the interface in pkg/client +func (s *StoreToDeploymentLister) List() (deployments []extensions.Deployment, err error) { + for _, c := range s.Store.List() { + deployments = append(deployments, *(c.(*extensions.Deployment))) + } + return deployments, nil +} + +// GetDeploymentsForReplicaSet returns a list of deployments managing a replica set. Returns an error only if no matching deployments are found. +func (s *StoreToDeploymentLister) GetDeploymentsForReplicaSet(rs *extensions.ReplicaSet) (deployments []extensions.Deployment, err error) { + var d extensions.Deployment + + if len(rs.Labels) == 0 { + err = fmt.Errorf("no deployments found for ReplicaSet %v because it has no labels", rs.Name) + return + } + + // TODO: MODIFY THIS METHOD so that it checks for the podTemplateSpecHash label + for _, m := range s.Store.List() { + d = *m.(*extensions.Deployment) + if d.Namespace != rs.Namespace { + continue + } + + selector, err := unversioned.LabelSelectorAsSelector(d.Spec.Selector) + if err != nil { + return nil, fmt.Errorf("invalid label selector: %v", err) + } + // If a deployment with a nil or empty selector creeps in, it should match nothing, not everything. + if selector.Empty() || !selector.Matches(labels.Set(rs.Labels)) { + continue + } + deployments = append(deployments, d) + } + if len(deployments) == 0 { + err = fmt.Errorf("could not find deployments set for ReplicaSet %s in namespace %s with labels: %v", rs.Name, rs.Namespace, rs.Labels) + } + return +} + +// StoreToReplicaSetLister gives a store List and Exists methods. The store must contain only ReplicaSets. +type StoreToReplicaSetLister struct { + Store +} + +// Exists checks if the given ReplicaSet exists in the store. +func (s *StoreToReplicaSetLister) Exists(rs *extensions.ReplicaSet) (bool, error) { + _, exists, err := s.Store.Get(rs) + if err != nil { + return false, err + } + return exists, nil +} + +// List lists all ReplicaSets in the store. +// TODO: converge on the interface in pkg/client +func (s *StoreToReplicaSetLister) List() (rss []extensions.ReplicaSet, err error) { + for _, rs := range s.Store.List() { + rss = append(rss, *(rs.(*extensions.ReplicaSet))) + } + return rss, nil +} + +type storeReplicaSetsNamespacer struct { + store Store + namespace string +} + +func (s storeReplicaSetsNamespacer) List(selector labels.Selector) (rss []extensions.ReplicaSet, err error) { + for _, c := range s.store.List() { + rs := *(c.(*extensions.ReplicaSet)) + if s.namespace == api.NamespaceAll || s.namespace == rs.Namespace { + if selector.Matches(labels.Set(rs.Labels)) { + rss = append(rss, rs) + } + } + } + return +} + +func (s *StoreToReplicaSetLister) ReplicaSets(namespace string) storeReplicaSetsNamespacer { + return storeReplicaSetsNamespacer{s.Store, namespace} +} + +// GetPodReplicaSets returns a list of ReplicaSets managing a pod. Returns an error only if no matching ReplicaSets are found. +func (s *StoreToReplicaSetLister) GetPodReplicaSets(pod *api.Pod) (rss []extensions.ReplicaSet, err error) { + var selector labels.Selector + var rs extensions.ReplicaSet + + if len(pod.Labels) == 0 { + err = fmt.Errorf("no ReplicaSets found for pod %v because it has no labels", pod.Name) + return + } + + for _, m := range s.Store.List() { + rs = *m.(*extensions.ReplicaSet) + if rs.Namespace != pod.Namespace { + continue + } + selector, err = unversioned.LabelSelectorAsSelector(rs.Spec.Selector) + if err != nil { + err = fmt.Errorf("invalid selector: %v", err) + return + } + + // If a ReplicaSet with a nil or empty selector creeps in, it should match nothing, not everything. + if selector.Empty() || !selector.Matches(labels.Set(pod.Labels)) { + continue + } + rss = append(rss, rs) + } + if len(rss) == 0 { + err = fmt.Errorf("could not find ReplicaSet for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels) + } + return +} + +// StoreToDaemonSetLister gives a store List and Exists methods. The store must contain only DaemonSets. +type StoreToDaemonSetLister struct { + Store +} + +// Exists checks if the given daemon set exists in the store. +func (s *StoreToDaemonSetLister) Exists(ds *extensions.DaemonSet) (bool, error) { + _, exists, err := s.Store.Get(ds) + if err != nil { + return false, err + } + return exists, nil +} + +// List lists all daemon sets in the store. +// TODO: converge on the interface in pkg/client +func (s *StoreToDaemonSetLister) List() (dss extensions.DaemonSetList, err error) { + for _, c := range s.Store.List() { + dss.Items = append(dss.Items, *(c.(*extensions.DaemonSet))) + } + return dss, nil +} + +// GetPodDaemonSets returns a list of daemon sets managing a pod. +// Returns an error if and only if no matching daemon sets are found. +func (s *StoreToDaemonSetLister) GetPodDaemonSets(pod *api.Pod) (daemonSets []extensions.DaemonSet, err error) { + var selector labels.Selector + var daemonSet extensions.DaemonSet + + if len(pod.Labels) == 0 { + err = fmt.Errorf("no daemon sets found for pod %v because it has no labels", pod.Name) + return + } + + for _, m := range s.Store.List() { + daemonSet = *m.(*extensions.DaemonSet) + if daemonSet.Namespace != pod.Namespace { + continue + } + selector, err = unversioned.LabelSelectorAsSelector(daemonSet.Spec.Selector) + if err != nil { + // this should not happen if the DaemonSet passed validation + return nil, err + } + + // If a daemonSet with a nil or empty selector creeps in, it should match nothing, not everything. + if selector.Empty() || !selector.Matches(labels.Set(pod.Labels)) { + continue + } + daemonSets = append(daemonSets, daemonSet) + } + if len(daemonSets) == 0 { + err = fmt.Errorf("could not find daemon set for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels) + } + return +} + +// StoreToServiceLister makes a Store that has the List method of the client.ServiceInterface +// The Store must contain (only) Services. +type StoreToServiceLister struct { + Store +} + +func (s *StoreToServiceLister) List() (services api.ServiceList, err error) { + for _, m := range s.Store.List() { + services.Items = append(services.Items, *(m.(*api.Service))) + } + return services, nil +} + +// TODO: Move this back to scheduler as a helper function that takes a Store, +// rather than a method of StoreToServiceLister. +func (s *StoreToServiceLister) GetPodServices(pod *api.Pod) (services []api.Service, err error) { + var selector labels.Selector + var service api.Service + + for _, m := range s.Store.List() { + service = *m.(*api.Service) + // consider only services that are in the same namespace as the pod + if service.Namespace != pod.Namespace { + continue + } + if service.Spec.Selector == nil { + // services with nil selectors match nothing, not everything. + continue + } + selector = labels.Set(service.Spec.Selector).AsSelector() + if selector.Matches(labels.Set(pod.Labels)) { + services = append(services, service) + } + } + if len(services) == 0 { + err = fmt.Errorf("could not find service for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels) + } + + return +} + +// StoreToEndpointsLister makes a Store that lists endpoints. +type StoreToEndpointsLister struct { + Store +} + +// List lists all endpoints in the store. +func (s *StoreToEndpointsLister) List() (services api.EndpointsList, err error) { + for _, m := range s.Store.List() { + services.Items = append(services.Items, *(m.(*api.Endpoints))) + } + return services, nil +} + +// GetServiceEndpoints returns the endpoints of a service, matched on service name. +func (s *StoreToEndpointsLister) GetServiceEndpoints(svc *api.Service) (ep api.Endpoints, err error) { + for _, m := range s.Store.List() { + ep = *m.(*api.Endpoints) + if svc.Name == ep.Name && svc.Namespace == ep.Namespace { + return ep, nil + } + } + err = fmt.Errorf("could not find endpoints for service: %v", svc.Name) + return +} + +// StoreToJobLister gives a store List and Exists methods. The store must contain only Jobs. +type StoreToJobLister struct { + Store +} + +// Exists checks if the given job exists in the store. +func (s *StoreToJobLister) Exists(job *batch.Job) (bool, error) { + _, exists, err := s.Store.Get(job) + if err != nil { + return false, err + } + return exists, nil +} + +// StoreToJobLister lists all jobs in the store. +func (s *StoreToJobLister) List() (jobs batch.JobList, err error) { + for _, c := range s.Store.List() { + jobs.Items = append(jobs.Items, *(c.(*batch.Job))) + } + return jobs, nil +} + +// GetPodJobs returns a list of jobs managing a pod. Returns an error only if no matching jobs are found. +func (s *StoreToJobLister) GetPodJobs(pod *api.Pod) (jobs []batch.Job, err error) { + var selector labels.Selector + var job batch.Job + + if len(pod.Labels) == 0 { + err = fmt.Errorf("no jobs found for pod %v because it has no labels", pod.Name) + return + } + + for _, m := range s.Store.List() { + job = *m.(*batch.Job) + if job.Namespace != pod.Namespace { + continue + } + + selector, _ = unversioned.LabelSelectorAsSelector(job.Spec.Selector) + if !selector.Matches(labels.Set(pod.Labels)) { + continue + } + jobs = append(jobs, job) + } + if len(jobs) == 0 { + err = fmt.Errorf("could not find jobs for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels) + } + return +} + +// Typed wrapper around a store of PersistentVolumes +type StoreToPVFetcher struct { + Store +} + +// GetPersistentVolumeInfo returns cached data for the PersistentVolume 'id'. +func (s *StoreToPVFetcher) GetPersistentVolumeInfo(id string) (*api.PersistentVolume, error) { + o, exists, err := s.Get(&api.PersistentVolume{ObjectMeta: api.ObjectMeta{Name: id}}) + + if err != nil { + return nil, fmt.Errorf("error retrieving PersistentVolume '%v' from cache: %v", id, err) + } + + if !exists { + return nil, fmt.Errorf("PersistentVolume '%v' not found", id) + } + + return o.(*api.PersistentVolume), nil +} + +// Typed wrapper around a store of PersistentVolumeClaims +type StoreToPVCFetcher struct { + Store +} + +// GetPersistentVolumeClaimInfo returns cached data for the PersistentVolumeClaim 'id'. +func (s *StoreToPVCFetcher) GetPersistentVolumeClaimInfo(namespace string, id string) (*api.PersistentVolumeClaim, error) { + o, exists, err := s.Get(&api.PersistentVolumeClaim{ObjectMeta: api.ObjectMeta{Namespace: namespace, Name: id}}) + if err != nil { + return nil, fmt.Errorf("error retrieving PersistentVolumeClaim '%s/%s' from cache: %v", namespace, id, err) + } + + if !exists { + return nil, fmt.Errorf("PersistentVolumeClaim '%s/%s' not found", namespace, id) + } + + return o.(*api.PersistentVolumeClaim), nil +} + +// StoreToPetSetLister gives a store List and Exists methods. The store must contain only PetSets. +type StoreToPetSetLister struct { + Store +} + +// Exists checks if the given PetSet exists in the store. +func (s *StoreToPetSetLister) Exists(ps *apps.PetSet) (bool, error) { + _, exists, err := s.Store.Get(ps) + if err != nil { + return false, err + } + return exists, nil +} + +// List lists all PetSets in the store. +func (s *StoreToPetSetLister) List() (psList []apps.PetSet, err error) { + for _, ps := range s.Store.List() { + psList = append(psList, *(ps.(*apps.PetSet))) + } + return psList, nil +} + +type storePetSetsNamespacer struct { + store Store + namespace string +} + +func (s *StoreToPetSetLister) PetSets(namespace string) storePetSetsNamespacer { + return storePetSetsNamespacer{s.Store, namespace} +} + +// GetPodPetSets returns a list of PetSets managing a pod. Returns an error only if no matching PetSets are found. +func (s *StoreToPetSetLister) GetPodPetSets(pod *api.Pod) (psList []apps.PetSet, err error) { + var selector labels.Selector + var ps apps.PetSet + + if len(pod.Labels) == 0 { + err = fmt.Errorf("no PetSets found for pod %v because it has no labels", pod.Name) + return + } + + for _, m := range s.Store.List() { + ps = *m.(*apps.PetSet) + if ps.Namespace != pod.Namespace { + continue + } + selector, err = unversioned.LabelSelectorAsSelector(ps.Spec.Selector) + if err != nil { + err = fmt.Errorf("invalid selector: %v", err) + return + } + + // If a PetSet with a nil or empty selector creeps in, it should match nothing, not everything. + if selector.Empty() || !selector.Matches(labels.Set(pod.Labels)) { + continue + } + psList = append(psList, ps) + } + if len(psList) == 0 { + err = fmt.Errorf("could not find PetSet for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels) + } + return +} diff --git a/src/kube2msb/vendor/k8s.io/kubernetes/pkg/client/cache/listwatch.go b/src/kube2msb/vendor/k8s.io/kubernetes/pkg/client/cache/listwatch.go new file mode 100644 index 0000000..ff56c0b --- /dev/null +++ b/src/kube2msb/vendor/k8s.io/kubernetes/pkg/client/cache/listwatch.go @@ -0,0 +1,86 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "time" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/client/restclient" + "k8s.io/kubernetes/pkg/fields" + "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/watch" +) + +// ListFunc knows how to list resources +type ListFunc func(options api.ListOptions) (runtime.Object, error) + +// WatchFunc knows how to watch resources +type WatchFunc func(options api.ListOptions) (watch.Interface, error) + +// ListWatch knows how to list and watch a set of apiserver resources. It satisfies the ListerWatcher interface. +// It is a convenience function for users of NewReflector, etc. +// ListFunc and WatchFunc must not be nil +type ListWatch struct { + ListFunc ListFunc + WatchFunc WatchFunc +} + +// Getter interface knows how to access Get method from RESTClient. +type Getter interface { + Get() *restclient.Request +} + +// NewListWatchFromClient creates a new ListWatch from the specified client, resource, namespace and field selector. +func NewListWatchFromClient(c Getter, resource string, namespace string, fieldSelector fields.Selector) *ListWatch { + listFunc := func(options api.ListOptions) (runtime.Object, error) { + return c.Get(). + Namespace(namespace). + Resource(resource). + VersionedParams(&options, api.ParameterCodec). + FieldsSelectorParam(fieldSelector). + Do(). + Get() + } + watchFunc := func(options api.ListOptions) (watch.Interface, error) { + return c.Get(). + Prefix("watch"). + Namespace(namespace). + Resource(resource). + VersionedParams(&options, api.ParameterCodec). + FieldsSelectorParam(fieldSelector). + Watch() + } + return &ListWatch{ListFunc: listFunc, WatchFunc: watchFunc} +} + +func timeoutFromListOptions(options api.ListOptions) time.Duration { + if options.TimeoutSeconds != nil { + return time.Duration(*options.TimeoutSeconds) * time.Second + } + return 0 +} + +// List a set of apiserver resources +func (lw *ListWatch) List(options api.ListOptions) (runtime.Object, error) { + return lw.ListFunc(options) +} + +// Watch a set of apiserver resources +func (lw *ListWatch) Watch(options api.ListOptions) (watch.Interface, error) { + return lw.WatchFunc(options) +} diff --git a/src/kube2msb/vendor/k8s.io/kubernetes/pkg/client/cache/reflector.go b/src/kube2msb/vendor/k8s.io/kubernetes/pkg/client/cache/reflector.go new file mode 100644 index 0000000..e1af63e --- /dev/null +++ b/src/kube2msb/vendor/k8s.io/kubernetes/pkg/client/cache/reflector.go @@ -0,0 +1,423 @@ +/* +Copyright 2014 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "errors" + "fmt" + "io" + "math/rand" + "net" + "net/url" + "reflect" + "regexp" + goruntime "runtime" + "runtime/debug" + "strconv" + "strings" + "sync" + "syscall" + "time" + + "github.com/golang/glog" + "k8s.io/kubernetes/pkg/api" + apierrs "k8s.io/kubernetes/pkg/api/errors" + "k8s.io/kubernetes/pkg/api/meta" + "k8s.io/kubernetes/pkg/runtime" + utilruntime "k8s.io/kubernetes/pkg/util/runtime" + "k8s.io/kubernetes/pkg/util/wait" + "k8s.io/kubernetes/pkg/watch" +) + +// ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource. +type ListerWatcher interface { + // List should return a list type object; the Items field will be extracted, and the + // ResourceVersion field will be used to start the watch in the right place. + List(options api.ListOptions) (runtime.Object, error) + // Watch should begin a watch at the specified version. + Watch(options api.ListOptions) (watch.Interface, error) +} + +// Reflector watches a specified resource and causes all changes to be reflected in the given store. +type Reflector struct { + // name identifies this reflector. By default it will be a file:line if possible. + name string + + // The type of object we expect to place in the store. + expectedType reflect.Type + // The destination to sync up with the watch source + store Store + // listerWatcher is used to perform lists and watches. + listerWatcher ListerWatcher + // period controls timing between one watch ending and + // the beginning of the next one. + period time.Duration + resyncPeriod time.Duration + // now() returns current time - exposed for testing purposes + now func() time.Time + // nextResync is approximate time of next resync (0 if not scheduled) + nextResync time.Time + // lastSyncResourceVersion is the resource version token last + // observed when doing a sync with the underlying store + // it is thread safe, but not synchronized with the underlying store + lastSyncResourceVersion string + // lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion + lastSyncResourceVersionMutex sync.RWMutex +} + +var ( + // We try to spread the load on apiserver by setting timeouts for + // watch requests - it is random in [minWatchTimeout, 2*minWatchTimeout]. + // However, it can be modified to avoid periodic resync to break the + // TCP connection. + minWatchTimeout = 5 * time.Minute + // If we are within 'forceResyncThreshold' from the next planned resync + // and are just before issuing Watch(), resync will be forced now. + forceResyncThreshold = 3 * time.Second + // We try to set timeouts for Watch() so that we will finish about + // than 'timeoutThreshold' from next planned periodic resync. + timeoutThreshold = 1 * time.Second +) + +// NewNamespaceKeyedIndexerAndReflector creates an Indexer and a Reflector +// The indexer is configured to key on namespace +func NewNamespaceKeyedIndexerAndReflector(lw ListerWatcher, expectedType interface{}, resyncPeriod time.Duration) (indexer Indexer, reflector *Reflector) { + indexer = NewIndexer(MetaNamespaceKeyFunc, Indexers{"namespace": MetaNamespaceIndexFunc}) + reflector = NewReflector(lw, expectedType, indexer, resyncPeriod) + return indexer, reflector +} + +// NewReflector creates a new Reflector object which will keep the given store up to +// date with the server's contents for the given resource. Reflector promises to +// only put things in the store that have the type of expectedType, unless expectedType +// is nil. If resyncPeriod is non-zero, then lists will be executed after every +// resyncPeriod, so that you can use reflectors to periodically process everything as +// well as incrementally processing the things that change. +func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector { + return NewNamedReflector(getDefaultReflectorName(internalPackages...), lw, expectedType, store, resyncPeriod) +} + +// NewNamedReflector same as NewReflector, but with a specified name for logging +func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector { + r := &Reflector{ + name: name, + listerWatcher: lw, + store: store, + expectedType: reflect.TypeOf(expectedType), + period: time.Second, + resyncPeriod: resyncPeriod, + now: time.Now, + } + return r +} + +// internalPackages are packages that ignored when creating a default reflector name. These packages are in the common +// call chains to NewReflector, so they'd be low entropy names for reflectors +var internalPackages = []string{"kubernetes/pkg/client/cache/", "kubernetes/pkg/controller/framework/", "/runtime/asm_"} + +// getDefaultReflectorName walks back through the call stack until we find a caller from outside of the ignoredPackages +// it returns back a shortpath/filename:line to aid in identification of this reflector when it starts logging +func getDefaultReflectorName(ignoredPackages ...string) string { + name := "????" + const maxStack = 10 + for i := 1; i < maxStack; i++ { + _, file, line, ok := goruntime.Caller(i) + if !ok { + file, line, ok = extractStackCreator() + if !ok { + break + } + i += maxStack + } + if hasPackage(file, ignoredPackages) { + continue + } + + file = trimPackagePrefix(file) + name = fmt.Sprintf("%s:%d", file, line) + break + } + return name +} + +// hasPackage returns true if the file is in one of the ignored packages. +func hasPackage(file string, ignoredPackages []string) bool { + for _, ignoredPackage := range ignoredPackages { + if strings.Contains(file, ignoredPackage) { + return true + } + } + return false +} + +// trimPackagePrefix reduces dulpicate values off the front of a package name. +func trimPackagePrefix(file string) string { + if l := strings.LastIndex(file, "k8s.io/kubernetes/pkg/"); l >= 0 { + return file[l+len("k8s.io/kubernetes/"):] + } + if l := strings.LastIndex(file, "/src/"); l >= 0 { + return file[l+5:] + } + if l := strings.LastIndex(file, "/pkg/"); l >= 0 { + return file[l+1:] + } + return file +} + +var stackCreator = regexp.MustCompile(`(?m)^created by (.*)\n\s+(.*):(\d+) \+0x[[:xdigit:]]+$`) + +// extractStackCreator retrieves the goroutine file and line that launched this stack. Returns false +// if the creator cannot be located. +// TODO: Go does not expose this via runtime https://github.com/golang/go/issues/11440 +func extractStackCreator() (string, int, bool) { + stack := debug.Stack() + matches := stackCreator.FindStringSubmatch(string(stack)) + if matches == nil || len(matches) != 4 { + return "", 0, false + } + line, err := strconv.Atoi(matches[3]) + if err != nil { + return "", 0, false + } + return matches[2], line, true +} + +// Run starts a watch and handles watch events. Will restart the watch if it is closed. +// Run starts a goroutine and returns immediately. +func (r *Reflector) Run() { + glog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedType, r.resyncPeriod, r.name) + go wait.Until(func() { + if err := r.ListAndWatch(wait.NeverStop); err != nil { + utilruntime.HandleError(err) + } + }, r.period, wait.NeverStop) +} + +// RunUntil starts a watch and handles watch events. Will restart the watch if it is closed. +// RunUntil starts a goroutine and returns immediately. It will exit when stopCh is closed. +func (r *Reflector) RunUntil(stopCh <-chan struct{}) { + glog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedType, r.resyncPeriod, r.name) + go wait.Until(func() { + if err := r.ListAndWatch(stopCh); err != nil { + utilruntime.HandleError(err) + } + }, r.period, stopCh) +} + +var ( + // nothing will ever be sent down this channel + neverExitWatch <-chan time.Time = make(chan time.Time) + + // Used to indicate that watching stopped so that a resync could happen. + errorResyncRequested = errors.New("resync channel fired") + + // Used to indicate that watching stopped because of a signal from the stop + // channel passed in from a client of the reflector. + errorStopRequested = errors.New("Stop requested") +) + +// resyncChan returns a channel which will receive something when a resync is +// required, and a cleanup function. +func (r *Reflector) resyncChan() (<-chan time.Time, func() bool) { + if r.resyncPeriod == 0 { + r.nextResync = time.Time{} + return neverExitWatch, func() bool { return false } + } + // The cleanup function is required: imagine the scenario where watches + // always fail so we end up listing frequently. Then, if we don't + // manually stop the timer, we could end up with many timers active + // concurrently. + r.nextResync = r.now().Add(r.resyncPeriod) + t := time.NewTimer(r.resyncPeriod) + return t.C, t.Stop +} + +// ListAndWatch first lists all items and get the resource version at the moment of call, +// and then use the resource version to watch. +// It returns error if ListAndWatch didn't even try to initialize watch. +func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { + glog.V(3).Infof("Listing and watching %v from %s", r.expectedType, r.name) + var resourceVersion string + resyncCh, cleanup := r.resyncChan() + defer cleanup() + + // Explicitly set "0" as resource version - it's fine for the List() + // to be served from cache and potentially be delayed relative to + // etcd contents. Reflector framework will catch up via Watch() eventually. + options := api.ListOptions{ResourceVersion: "0"} + list, err := r.listerWatcher.List(options) + if err != nil { + return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedType, err) + } + listMetaInterface, err := meta.ListAccessor(list) + if err != nil { + return fmt.Errorf("%s: Unable to understand list result %#v: %v", r.name, list, err) + } + resourceVersion = listMetaInterface.GetResourceVersion() + items, err := meta.ExtractList(list) + if err != nil { + return fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err) + } + if err := r.syncWith(items, resourceVersion); err != nil { + return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err) + } + r.setLastSyncResourceVersion(resourceVersion) + + resyncerrc := make(chan error, 1) + go func() { + for { + select { + case <-resyncCh: + case <-stopCh: + return + } + glog.V(4).Infof("%s: next resync planned for %#v, forcing now", r.name, r.nextResync) + if err := r.store.Resync(); err != nil { + resyncerrc <- err + return + } + cleanup() + resyncCh, cleanup = r.resyncChan() + } + }() + + for { + timemoutseconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0)) + options = api.ListOptions{ + ResourceVersion: resourceVersion, + // We want to avoid situations of hanging watchers. Stop any wachers that do not + // receive any events within the timeout window. + TimeoutSeconds: &timemoutseconds, + } + + w, err := r.listerWatcher.Watch(options) + if err != nil { + switch err { + case io.EOF: + // watch closed normally + case io.ErrUnexpectedEOF: + glog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedType, err) + default: + utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedType, err)) + } + // If this is "connection refused" error, it means that most likely apiserver is not responsive. + // It doesn't make sense to re-list all objects because most likely we will be able to restart + // watch where we ended. + // If that's the case wait and resend watch request. + if urlError, ok := err.(*url.Error); ok { + if opError, ok := urlError.Err.(*net.OpError); ok { + if errno, ok := opError.Err.(syscall.Errno); ok && errno == syscall.ECONNREFUSED { + time.Sleep(time.Second) + continue + } + } + } + return nil + } + + if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil { + if err != errorStopRequested { + glog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedType, err) + } + return nil + } + } +} + +// syncWith replaces the store's items with the given list. +func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error { + found := make([]interface{}, 0, len(items)) + for _, item := range items { + found = append(found, item) + } + return r.store.Replace(found, resourceVersion) +} + +// watchHandler watches w and keeps *resourceVersion up to date. +func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error { + start := time.Now() + eventCount := 0 + + // Stopping the watcher should be idempotent and if we return from this function there's no way + // we're coming back in with the same watch interface. + defer w.Stop() + +loop: + for { + select { + case <-stopCh: + return errorStopRequested + case err := <-errc: + return err + case event, ok := <-w.ResultChan(): + if !ok { + break loop + } + if event.Type == watch.Error { + return apierrs.FromObject(event.Object) + } + if e, a := r.expectedType, reflect.TypeOf(event.Object); e != nil && e != a { + utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a)) + continue + } + meta, err := meta.Accessor(event.Object) + if err != nil { + utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event)) + continue + } + newResourceVersion := meta.GetResourceVersion() + switch event.Type { + case watch.Added: + r.store.Add(event.Object) + case watch.Modified: + r.store.Update(event.Object) + case watch.Deleted: + // TODO: Will any consumers need access to the "last known + // state", which is passed in event.Object? If so, may need + // to change this. + r.store.Delete(event.Object) + default: + utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event)) + } + *resourceVersion = newResourceVersion + r.setLastSyncResourceVersion(newResourceVersion) + eventCount++ + } + } + + watchDuration := time.Now().Sub(start) + if watchDuration < 1*time.Second && eventCount == 0 { + glog.V(4).Infof("%s: Unexpected watch close - watch lasted less than a second and no items received", r.name) + return errors.New("very short watch") + } + glog.V(4).Infof("%s: Watch close - %v total %v items received", r.name, r.expectedType, eventCount) + return nil +} + +// LastSyncResourceVersion is the resource version observed when last sync with the underlying store +// The value returned is not synchronized with access to the underlying store and is not thread-safe +func (r *Reflector) LastSyncResourceVersion() string { + r.lastSyncResourceVersionMutex.RLock() + defer r.lastSyncResourceVersionMutex.RUnlock() + return r.lastSyncResourceVersion +} + +func (r *Reflector) setLastSyncResourceVersion(v string) { + r.lastSyncResourceVersionMutex.Lock() + defer r.lastSyncResourceVersionMutex.Unlock() + r.lastSyncResourceVersion = v +} diff --git a/src/kube2msb/vendor/k8s.io/kubernetes/pkg/client/cache/store.go b/src/kube2msb/vendor/k8s.io/kubernetes/pkg/client/cache/store.go new file mode 100644 index 0000000..4cd2479 --- /dev/null +++ b/src/kube2msb/vendor/k8s.io/kubernetes/pkg/client/cache/store.go @@ -0,0 +1,240 @@ +/* +Copyright 2014 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "fmt" + "strings" + + "k8s.io/kubernetes/pkg/api/meta" +) + +// Store is a generic object storage interface. Reflector knows how to watch a server +// and update a store. A generic store is provided, which allows Reflector to be used +// as a local caching system, and an LRU store, which allows Reflector to work like a +// queue of items yet to be processed. +// +// Store makes no assumptions about stored object identity; it is the responsibility +// of a Store implementation to provide a mechanism to correctly key objects and to +// define the contract for obtaining objects by some arbitrary key type. +type Store interface { + Add(obj interface{}) error + Update(obj interface{}) error + Delete(obj interface{}) error + List() []interface{} + ListKeys() []string + Get(obj interface{}) (item interface{}, exists bool, err error) + GetByKey(key string) (item interface{}, exists bool, err error) + + // Replace will delete the contents of the store, using instead the + // given list. Store takes ownership of the list, you should not reference + // it after calling this function. + Replace([]interface{}, string) error + Resync() error +} + +// KeyFunc knows how to make a key from an object. Implementations should be deterministic. +type KeyFunc func(obj interface{}) (string, error) + +// KeyError will be returned any time a KeyFunc gives an error; it includes the object +// at fault. +type KeyError struct { + Obj interface{} + Err error +} + +// Error gives a human-readable description of the error. +func (k KeyError) Error() string { + return fmt.Sprintf("couldn't create key for object %+v: %v", k.Obj, k.Err) +} + +// ExplicitKey can be passed to MetaNamespaceKeyFunc if you have the key for +// the object but not the object itself. +type ExplicitKey string + +// MetaNamespaceKeyFunc is a convenient default KeyFunc which knows how to make +// keys for API objects which implement meta.Interface. +// The key uses the format <namespace>/<name> unless <namespace> is empty, then +// it's just <name>. +// +// TODO: replace key-as-string with a key-as-struct so that this +// packing/unpacking won't be necessary. +func MetaNamespaceKeyFunc(obj interface{}) (string, error) { + if key, ok := obj.(ExplicitKey); ok { + return string(key), nil + } + meta, err := meta.Accessor(obj) + if err != nil { + return "", fmt.Errorf("object has no meta: %v", err) + } + if len(meta.GetNamespace()) > 0 { + return meta.GetNamespace() + "/" + meta.GetName(), nil + } + return meta.GetName(), nil +} + +// SplitMetaNamespaceKey returns the namespace and name that +// MetaNamespaceKeyFunc encoded into key. +// +// TODO: replace key-as-string with a key-as-struct so that this +// packing/unpacking won't be necessary. +func SplitMetaNamespaceKey(key string) (namespace, name string, err error) { + parts := strings.Split(key, "/") + switch len(parts) { + case 1: + // name only, no namespace + return "", parts[0], nil + case 2: + // namespace and name + return parts[0], parts[1], nil + } + + return "", "", fmt.Errorf("unexpected key format: %q", key) +} + +// cache responsibilities are limited to: +// 1. Computing keys for objects via keyFunc +// 2. Invoking methods of a ThreadSafeStorage interface +type cache struct { + // cacheStorage bears the burden of thread safety for the cache + cacheStorage ThreadSafeStore + // keyFunc is used to make the key for objects stored in and retrieved from items, and + // should be deterministic. + keyFunc KeyFunc +} + +var _ Store = &cache{} + +// Add inserts an item into the cache. +func (c *cache) Add(obj interface{}) error { + key, err := c.keyFunc(obj) + if err != nil { + return KeyError{obj, err} + } + c.cacheStorage.Add(key, obj) + return nil +} + +// Update sets an item in the cache to its updated state. +func (c *cache) Update(obj interface{}) error { + key, err := c.keyFunc(obj) + if err != nil { + return KeyError{obj, err} + } + c.cacheStorage.Update(key, obj) + return nil +} + +// Delete removes an item from the cache. +func (c *cache) Delete(obj interface{}) error { + key, err := c.keyFunc(obj) + if err != nil { + return KeyError{obj, err} + } + c.cacheStorage.Delete(key) + return nil +} + +// List returns a list of all the items. +// List is completely threadsafe as long as you treat all items as immutable. +func (c *cache) List() []interface{} { + return c.cacheStorage.List() +} + +// ListKeys returns a list of all the keys of the objects currently +// in the cache. +func (c *cache) ListKeys() []string { + return c.cacheStorage.ListKeys() +} + +// GetIndexers returns the indexers of cache +func (c *cache) GetIndexers() Indexers { + return c.cacheStorage.GetIndexers() +} + +// Index returns a list of items that match on the index function +// Index is thread-safe so long as you treat all items as immutable +func (c *cache) Index(indexName string, obj interface{}) ([]interface{}, error) { + return c.cacheStorage.Index(indexName, obj) +} + +// ListIndexFuncValues returns the list of generated values of an Index func +func (c *cache) ListIndexFuncValues(indexName string) []string { + return c.cacheStorage.ListIndexFuncValues(indexName) +} + +func (c *cache) ByIndex(indexName, indexKey string) ([]interface{}, error) { + return c.cacheStorage.ByIndex(indexName, indexKey) +} + +func (c *cache) AddIndexers(newIndexers Indexers) error { + return c.cacheStorage.AddIndexers(newIndexers) +} + +// Get returns the requested item, or sets exists=false. +// Get is completely threadsafe as long as you treat all items as immutable. +func (c *cache) Get(obj interface{}) (item interface{}, exists bool, err error) { + key, err := c.keyFunc(obj) + if err != nil { + return nil, false, KeyError{obj, err} + } + return c.GetByKey(key) +} + +// GetByKey returns the request item, or exists=false. +// GetByKey is completely threadsafe as long as you treat all items as immutable. +func (c *cache) GetByKey(key string) (item interface{}, exists bool, err error) { + item, exists = c.cacheStorage.Get(key) + return item, exists, nil +} + +// Replace will delete the contents of 'c', using instead the given list. +// 'c' takes ownership of the list, you should not reference the list again +// after calling this function. +func (c *cache) Replace(list []interface{}, resourceVersion string) error { + items := map[string]interface{}{} + for _, item := range list { + key, err := c.keyFunc(item) + if err != nil { + return KeyError{item, err} + } + items[key] = item + } + c.cacheStorage.Replace(items, resourceVersion) + return nil +} + +// Resync touches all items in the store to force processing +func (c *cache) Resync() error { + return c.cacheStorage.Resync() +} + +// NewStore returns a Store implemented simply with a map and a lock. +func NewStore(keyFunc KeyFunc) Store { + return &cache{ + cacheStorage: NewThreadSafeStore(Indexers{}, Indices{}), + keyFunc: keyFunc, + } +} + +// NewIndexer returns an Indexer implemented simply with a map and a lock. +func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer { + return &cache{ + cacheStorage: NewThreadSafeStore(indexers, Indices{}), + keyFunc: keyFunc, + } +} diff --git a/src/kube2msb/vendor/k8s.io/kubernetes/pkg/client/cache/thread_safe_store.go b/src/kube2msb/vendor/k8s.io/kubernetes/pkg/client/cache/thread_safe_store.go new file mode 100644 index 0000000..9d88ce3 --- /dev/null +++ b/src/kube2msb/vendor/k8s.io/kubernetes/pkg/client/cache/thread_safe_store.go @@ -0,0 +1,288 @@ +/* +Copyright 2014 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "fmt" + "sync" + + "k8s.io/kubernetes/pkg/util/sets" +) + +// ThreadSafeStore is an interface that allows concurrent access to a storage backend. +// TL;DR caveats: you must not modify anything returned by Get or List as it will break +// the indexing feature in addition to not being thread safe. +// +// The guarantees of thread safety provided by List/Get are only valid if the caller +// treats returned items as read-only. For example, a pointer inserted in the store +// through `Add` will be returned as is by `Get`. Multiple clients might invoke `Get` +// on the same key and modify the pointer in a non-thread-safe way. Also note that +// modifying objects stored by the indexers (if any) will *not* automatically lead +// to a re-index. So it's not a good idea to directly modify the objects returned by +// Get/List, in general. +type ThreadSafeStore interface { + Add(key string, obj interface{}) + Update(key string, obj interface{}) + Delete(key string) + Get(key string) (item interface{}, exists bool) + List() []interface{} + ListKeys() []string + Replace(map[string]interface{}, string) + Index(indexName string, obj interface{}) ([]interface{}, error) + ListIndexFuncValues(name string) []string + ByIndex(indexName, indexKey string) ([]interface{}, error) + GetIndexers() Indexers + + // AddIndexers adds more indexers to this store. If you call this after you already have data + // in the store, the results are undefined. + AddIndexers(newIndexers Indexers) error + Resync() error +} + +// threadSafeMap implements ThreadSafeStore +type threadSafeMap struct { + lock sync.RWMutex + items map[string]interface{} + + // indexers maps a name to an IndexFunc + indexers Indexers + // indices maps a name to an Index + indices Indices +} + +func (c *threadSafeMap) Add(key string, obj interface{}) { + c.lock.Lock() + defer c.lock.Unlock() + oldObject := c.items[key] + c.items[key] = obj + c.updateIndices(oldObject, obj, key) +} + +func (c *threadSafeMap) Update(key string, obj interface{}) { + c.lock.Lock() + defer c.lock.Unlock() + oldObject := c.items[key] + c.items[key] = obj + c.updateIndices(oldObject, obj, key) +} + +func (c *threadSafeMap) Delete(key string) { + c.lock.Lock() + defer c.lock.Unlock() + if obj, exists := c.items[key]; exists { + c.deleteFromIndices(obj, key) + delete(c.items, key) + } +} + +func (c *threadSafeMap) Get(key string) (item interface{}, exists bool) { + c.lock.RLock() + defer c.lock.RUnlock() + item, exists = c.items[key] + return item, exists +} + +func (c *threadSafeMap) List() []interface{} { + c.lock.RLock() + defer c.lock.RUnlock() + list := make([]interface{}, 0, len(c.items)) + for _, item := range c.items { + list = append(list, item) + } + return list +} + +// ListKeys returns a list of all the keys of the objects currently +// in the threadSafeMap. +func (c *threadSafeMap) ListKeys() []string { + c.lock.RLock() + defer c.lock.RUnlock() + list := make([]string, 0, len(c.items)) + for key := range c.items { + list = append(list, key) + } + return list +} + +func (c *threadSafeMap) Replace(items map[string]interface{}, resourceVersion string) { + c.lock.Lock() + defer c.lock.Unlock() + c.items = items + + // rebuild any index + c.indices = Indices{} + for key, item := range c.items { + c.updateIndices(nil, item, key) + } +} + +// Index returns a list of items that match on the index function +// Index is thread-safe so long as you treat all items as immutable +func (c *threadSafeMap) Index(indexName string, obj interface{}) ([]interface{}, error) { + c.lock.RLock() + defer c.lock.RUnlock() + + indexFunc := c.indexers[indexName] + if indexFunc == nil { + return nil, fmt.Errorf("Index with name %s does not exist", indexName) + } + + indexKeys, err := indexFunc(obj) + if err != nil { + return nil, err + } + index := c.indices[indexName] + + // need to de-dupe the return list. Since multiple keys are allowed, this can happen. + returnKeySet := sets.String{} + for _, indexKey := range indexKeys { + set := index[indexKey] + for _, key := range set.List() { + returnKeySet.Insert(key) + } + } + + list := make([]interface{}, 0, returnKeySet.Len()) + for absoluteKey := range returnKeySet { + list = append(list, c.items[absoluteKey]) + } + return list, nil +} + +// ByIndex returns a list of items that match an exact value on the index function +func (c *threadSafeMap) ByIndex(indexName, indexKey string) ([]interface{}, error) { + c.lock.RLock() + defer c.lock.RUnlock() + + indexFunc := c.indexers[indexName] + if indexFunc == nil { + return nil, fmt.Errorf("Index with name %s does not exist", indexName) + } + + index := c.indices[indexName] + + set := index[indexKey] + list := make([]interface{}, 0, set.Len()) + for _, key := range set.List() { + list = append(list, c.items[key]) + } + + return list, nil +} + +func (c *threadSafeMap) ListIndexFuncValues(indexName string) []string { + c.lock.RLock() + defer c.lock.RUnlock() + + index := c.indices[indexName] + names := make([]string, 0, len(index)) + for key := range index { + names = append(names, key) + } + return names +} + +func (c *threadSafeMap) GetIndexers() Indexers { + return c.indexers +} + +func (c *threadSafeMap) AddIndexers(newIndexers Indexers) error { + c.lock.Lock() + defer c.lock.Unlock() + + if len(c.items) > 0 { + return fmt.Errorf("cannot add indexers to running index") + } + + oldKeys := sets.StringKeySet(c.indexers) + newKeys := sets.StringKeySet(newIndexers) + + if oldKeys.HasAny(newKeys.List()...) { + return fmt.Errorf("indexer conflict: %v", oldKeys.Intersection(newKeys)) + } + + for k, v := range newIndexers { + c.indexers[k] = v + } + return nil +} + +// updateIndices modifies the objects location in the managed indexes, if this is an update, you must provide an oldObj +// updateIndices must be called from a function that already has a lock on the cache +func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, key string) error { + // if we got an old object, we need to remove it before we add it again + if oldObj != nil { + c.deleteFromIndices(oldObj, key) + } + for name, indexFunc := range c.indexers { + indexValues, err := indexFunc(newObj) + if err != nil { + return err + } + index := c.indices[name] + if index == nil { + index = Index{} + c.indices[name] = index + } + + for _, indexValue := range indexValues { + set := index[indexValue] + if set == nil { + set = sets.String{} + index[indexValue] = set + } + set.Insert(key) + } + } + return nil +} + +// deleteFromIndices removes the object from each of the managed indexes +// it is intended to be called from a function that already has a lock on the cache +func (c *threadSafeMap) deleteFromIndices(obj interface{}, key string) error { + for name, indexFunc := range c.indexers { + indexValues, err := indexFunc(obj) + if err != nil { + return err + } + + index := c.indices[name] + if index == nil { + continue + } + for _, indexValue := range indexValues { + set := index[indexValue] + if set != nil { + set.Delete(key) + } + } + } + return nil +} + +func (c *threadSafeMap) Resync() error { + // Nothing to do + return nil +} + +func NewThreadSafeStore(indexers Indexers, indices Indices) ThreadSafeStore { + return &threadSafeMap{ + items: map[string]interface{}{}, + indexers: indexers, + indices: indices, + } +} diff --git a/src/kube2msb/vendor/k8s.io/kubernetes/pkg/client/cache/undelta_store.go b/src/kube2msb/vendor/k8s.io/kubernetes/pkg/client/cache/undelta_store.go new file mode 100644 index 0000000..117df46 --- /dev/null +++ b/src/kube2msb/vendor/k8s.io/kubernetes/pkg/client/cache/undelta_store.go @@ -0,0 +1,83 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +// UndeltaStore listens to incremental updates and sends complete state on every change. +// It implements the Store interface so that it can receive a stream of mirrored objects +// from Reflector. Whenever it receives any complete (Store.Replace) or incremental change +// (Store.Add, Store.Update, Store.Delete), it sends the complete state by calling PushFunc. +// It is thread-safe. It guarantees that every change (Add, Update, Replace, Delete) results +// in one call to PushFunc, but sometimes PushFunc may be called twice with the same values. +// PushFunc should be thread safe. +type UndeltaStore struct { + Store + PushFunc func([]interface{}) +} + +// Assert that it implements the Store interface. +var _ Store = &UndeltaStore{} + +// Note about thread safety. The Store implementation (cache.cache) uses a lock for all methods. +// In the functions below, the lock gets released and reacquired betweend the {Add,Delete,etc} +// and the List. So, the following can happen, resulting in two identical calls to PushFunc. +// time thread 1 thread 2 +// 0 UndeltaStore.Add(a) +// 1 UndeltaStore.Add(b) +// 2 Store.Add(a) +// 3 Store.Add(b) +// 4 Store.List() -> [a,b] +// 5 Store.List() -> [a,b] + +func (u *UndeltaStore) Add(obj interface{}) error { + if err := u.Store.Add(obj); err != nil { + return err + } + u.PushFunc(u.Store.List()) + return nil +} + +func (u *UndeltaStore) Update(obj interface{}) error { + if err := u.Store.Update(obj); err != nil { + return err + } + u.PushFunc(u.Store.List()) + return nil +} + +func (u *UndeltaStore) Delete(obj interface{}) error { + if err := u.Store.Delete(obj); err != nil { + return err + } + u.PushFunc(u.Store.List()) + return nil +} + +func (u *UndeltaStore) Replace(list []interface{}, resourceVersion string) error { + if err := u.Store.Replace(list, resourceVersion); err != nil { + return err + } + u.PushFunc(u.Store.List()) + return nil +} + +// NewUndeltaStore returns an UndeltaStore implemented with a Store. +func NewUndeltaStore(pushFunc func([]interface{}), keyFunc KeyFunc) *UndeltaStore { + return &UndeltaStore{ + Store: NewStore(keyFunc), + PushFunc: pushFunc, + } +} |