aboutsummaryrefslogtreecommitdiffstats
path: root/kube2msb/src/vendor/k8s.io/kubernetes/pkg/client/cache/delta_fifo.go
diff options
context:
space:
mode:
Diffstat (limited to 'kube2msb/src/vendor/k8s.io/kubernetes/pkg/client/cache/delta_fifo.go')
-rw-r--r--kube2msb/src/vendor/k8s.io/kubernetes/pkg/client/cache/delta_fifo.go613
1 files changed, 0 insertions, 613 deletions
diff --git a/kube2msb/src/vendor/k8s.io/kubernetes/pkg/client/cache/delta_fifo.go b/kube2msb/src/vendor/k8s.io/kubernetes/pkg/client/cache/delta_fifo.go
deleted file mode 100644
index 5a7f4a9..0000000
--- a/kube2msb/src/vendor/k8s.io/kubernetes/pkg/client/cache/delta_fifo.go
+++ /dev/null
@@ -1,613 +0,0 @@
-/*
-Copyright 2014 The Kubernetes Authors.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package cache
-
-import (
- "errors"
- "fmt"
- "sync"
-
- "k8s.io/kubernetes/pkg/util/sets"
-
- "github.com/golang/glog"
-)
-
-// NewDeltaFIFO returns a Store which can be used process changes to items.
-//
-// keyFunc is used to figure out what key an object should have. (It's
-// exposed in the returned DeltaFIFO's KeyOf() method, with bonus features.)
-//
-// 'compressor' may compress as many or as few items as it wants
-// (including returning an empty slice), but it should do what it
-// does quickly since it is called while the queue is locked.
-// 'compressor' may be nil if you don't want any delta compression.
-//
-// 'keyLister' is expected to return a list of keys that the consumer of
-// this queue "knows about". It is used to decide which items are missing
-// when Replace() is called; 'Deleted' deltas are produced for these items.
-// It may be nil if you don't need to detect all deletions.
-// TODO: consider merging keyLister with this object, tracking a list of
-// "known" keys when Pop() is called. Have to think about how that
-// affects error retrying.
-// TODO(lavalamp): I believe there is a possible race only when using an
-// external known object source that the above TODO would
-// fix.
-//
-// Also see the comment on DeltaFIFO.
-func NewDeltaFIFO(keyFunc KeyFunc, compressor DeltaCompressor, knownObjects KeyListerGetter) *DeltaFIFO {
- f := &DeltaFIFO{
- items: map[string]Deltas{},
- queue: []string{},
- keyFunc: keyFunc,
- deltaCompressor: compressor,
- knownObjects: knownObjects,
- }
- f.cond.L = &f.lock
- return f
-}
-
-// DeltaFIFO is like FIFO, but allows you to process deletes.
-//
-// DeltaFIFO is a producer-consumer queue, where a Reflector is
-// intended to be the producer, and the consumer is whatever calls
-// the Pop() method.
-//
-// DeltaFIFO solves this use case:
-// * You want to process every object change (delta) at most once.
-// * When you process an object, you want to see everything
-// that's happened to it since you last processed it.
-// * You want to process the deletion of objects.
-// * You might want to periodically reprocess objects.
-//
-// DeltaFIFO's Pop(), Get(), and GetByKey() methods return
-// interface{} to satisfy the Store/Queue interfaces, but it
-// will always return an object of type Deltas.
-//
-// A note on threading: If you call Pop() in parallel from multiple
-// threads, you could end up with multiple threads processing slightly
-// different versions of the same object.
-//
-// A note on the KeyLister used by the DeltaFIFO: It's main purpose is
-// to list keys that are "known", for the purpose of figuring out which
-// items have been deleted when Replace() or Delete() are called. The deleted
-// object will be included in the DeleteFinalStateUnknown markers. These objects
-// could be stale.
-//
-// You may provide a function to compress deltas (e.g., represent a
-// series of Updates as a single Update).
-type DeltaFIFO struct {
- // lock/cond protects access to 'items' and 'queue'.
- lock sync.RWMutex
- cond sync.Cond
-
- // We depend on the property that items in the set are in
- // the queue and vice versa, and that all Deltas in this
- // map have at least one Delta.
- items map[string]Deltas
- queue []string
-
- // populated is true if the first batch of items inserted by Replace() has been populated
- // or Delete/Add/Update was called first.
- populated bool
- // initialPopulationCount is the number of items inserted by the first call of Replace()
- initialPopulationCount int
-
- // keyFunc is used to make the key used for queued item
- // insertion and retrieval, and should be deterministic.
- keyFunc KeyFunc
-
- // deltaCompressor tells us how to combine two or more
- // deltas. It may be nil.
- deltaCompressor DeltaCompressor
-
- // knownObjects list keys that are "known", for the
- // purpose of figuring out which items have been deleted
- // when Replace() or Delete() is called.
- knownObjects KeyListerGetter
-}
-
-var (
- _ = Queue(&DeltaFIFO{}) // DeltaFIFO is a Queue
-)
-
-var (
- // ErrZeroLengthDeltasObject is returned in a KeyError if a Deltas
- // object with zero length is encountered (should be impossible,
- // even if such an object is accidentally produced by a DeltaCompressor--
- // but included for completeness).
- ErrZeroLengthDeltasObject = errors.New("0 length Deltas object; can't get key")
-)
-
-// KeyOf exposes f's keyFunc, but also detects the key of a Deltas object or
-// DeletedFinalStateUnknown objects.
-func (f *DeltaFIFO) KeyOf(obj interface{}) (string, error) {
- if d, ok := obj.(Deltas); ok {
- if len(d) == 0 {
- return "", KeyError{obj, ErrZeroLengthDeltasObject}
- }
- obj = d.Newest().Object
- }
- if d, ok := obj.(DeletedFinalStateUnknown); ok {
- return d.Key, nil
- }
- return f.keyFunc(obj)
-}
-
-// Return true if an Add/Update/Delete/AddIfNotPresent are called first,
-// or an Update called first but the first batch of items inserted by Replace() has been popped
-func (f *DeltaFIFO) HasSynced() bool {
- f.lock.Lock()
- defer f.lock.Unlock()
- return f.populated && f.initialPopulationCount == 0
-}
-
-// Add inserts an item, and puts it in the queue. The item is only enqueued
-// if it doesn't already exist in the set.
-func (f *DeltaFIFO) Add(obj interface{}) error {
- f.lock.Lock()
- defer f.lock.Unlock()
- f.populated = true
- return f.queueActionLocked(Added, obj)
-}
-
-// Update is just like Add, but makes an Updated Delta.
-func (f *DeltaFIFO) Update(obj interface{}) error {
- f.lock.Lock()
- defer f.lock.Unlock()
- f.populated = true
- return f.queueActionLocked(Updated, obj)
-}
-
-// Delete is just like Add, but makes an Deleted Delta. If the item does not
-// already exist, it will be ignored. (It may have already been deleted by a
-// Replace (re-list), for example.
-func (f *DeltaFIFO) Delete(obj interface{}) error {
- id, err := f.KeyOf(obj)
- if err != nil {
- return KeyError{obj, err}
- }
- f.lock.Lock()
- defer f.lock.Unlock()
- f.populated = true
- if f.knownObjects == nil {
- if _, exists := f.items[id]; !exists {
- // Presumably, this was deleted when a relist happened.
- // Don't provide a second report of the same deletion.
- return nil
- }
- } else {
- // We only want to skip the "deletion" action if the object doesn't
- // exist in knownObjects and it doesn't have corresponding item in items.
- // Note that even if there is a "deletion" action in items, we can ignore it,
- // because it will be deduped automatically in "queueActionLocked"
- _, exists, err := f.knownObjects.GetByKey(id)
- _, itemsExist := f.items[id]
- if err == nil && !exists && !itemsExist {
- // Presumably, this was deleted when a relist happened.
- // Don't provide a second report of the same deletion.
- // TODO(lavalamp): This may be racy-- we aren't properly locked
- // with knownObjects.
- return nil
- }
- }
-
- return f.queueActionLocked(Deleted, obj)
-}
-
-// AddIfNotPresent inserts an item, and puts it in the queue. If the item is already
-// present in the set, it is neither enqueued nor added to the set.
-//
-// This is useful in a single producer/consumer scenario so that the consumer can
-// safely retry items without contending with the producer and potentially enqueueing
-// stale items.
-//
-// Important: obj must be a Deltas (the output of the Pop() function). Yes, this is
-// different from the Add/Update/Delete functions.
-func (f *DeltaFIFO) AddIfNotPresent(obj interface{}) error {
- deltas, ok := obj.(Deltas)
- if !ok {
- return fmt.Errorf("object must be of type deltas, but got: %#v", obj)
- }
- id, err := f.KeyOf(deltas.Newest().Object)
- if err != nil {
- return KeyError{obj, err}
- }
- f.lock.Lock()
- defer f.lock.Unlock()
- f.addIfNotPresent(id, deltas)
- return nil
-}
-
-// addIfNotPresent inserts deltas under id if it does not exist, and assumes the caller
-// already holds the fifo lock.
-func (f *DeltaFIFO) addIfNotPresent(id string, deltas Deltas) {
- f.populated = true
- if _, exists := f.items[id]; exists {
- return
- }
-
- f.queue = append(f.queue, id)
- f.items[id] = deltas
- f.cond.Broadcast()
-}
-
-// re-listing and watching can deliver the same update multiple times in any
-// order. This will combine the most recent two deltas if they are the same.
-func dedupDeltas(deltas Deltas) Deltas {
- n := len(deltas)
- if n < 2 {
- return deltas
- }
- a := &deltas[n-1]
- b := &deltas[n-2]
- if out := isDup(a, b); out != nil {
- d := append(Deltas{}, deltas[:n-2]...)
- return append(d, *out)
- }
- return deltas
-}
-
-// If a & b represent the same event, returns the delta that ought to be kept.
-// Otherwise, returns nil.
-// TODO: is there anything other than deletions that need deduping?
-func isDup(a, b *Delta) *Delta {
- if out := isDeletionDup(a, b); out != nil {
- return out
- }
- // TODO: Detect other duplicate situations? Are there any?
- return nil
-}
-
-// keep the one with the most information if both are deletions.
-func isDeletionDup(a, b *Delta) *Delta {
- if b.Type != Deleted || a.Type != Deleted {
- return nil
- }
- // Do more sophisticated checks, or is this sufficient?
- if _, ok := b.Object.(DeletedFinalStateUnknown); ok {
- return a
- }
- return b
-}
-
-// willObjectBeDeletedLocked returns true only if the last delta for the
-// given object is Delete. Caller must lock first.
-func (f *DeltaFIFO) willObjectBeDeletedLocked(id string) bool {
- deltas := f.items[id]
- return len(deltas) > 0 && deltas[len(deltas)-1].Type == Deleted
-}
-
-// queueActionLocked appends to the delta list for the object, calling
-// f.deltaCompressor if needed. Caller must lock first.
-func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
- id, err := f.KeyOf(obj)
- if err != nil {
- return KeyError{obj, err}
- }
-
- // If object is supposed to be deleted (last event is Deleted),
- // then we should ignore Sync events, because it would result in
- // recreation of this object.
- if actionType == Sync && f.willObjectBeDeletedLocked(id) {
- return nil
- }
-
- newDeltas := append(f.items[id], Delta{actionType, obj})
- newDeltas = dedupDeltas(newDeltas)
- if f.deltaCompressor != nil {
- newDeltas = f.deltaCompressor.Compress(newDeltas)
- }
-
- _, exists := f.items[id]
- if len(newDeltas) > 0 {
- if !exists {
- f.queue = append(f.queue, id)
- }
- f.items[id] = newDeltas
- f.cond.Broadcast()
- } else if exists {
- // The compression step removed all deltas, so
- // we need to remove this from our map (extra items
- // in the queue are ignored if they are not in the
- // map).
- delete(f.items, id)
- }
- return nil
-}
-
-// List returns a list of all the items; it returns the object
-// from the most recent Delta.
-// You should treat the items returned inside the deltas as immutable.
-func (f *DeltaFIFO) List() []interface{} {
- f.lock.RLock()
- defer f.lock.RUnlock()
- return f.listLocked()
-}
-
-func (f *DeltaFIFO) listLocked() []interface{} {
- list := make([]interface{}, 0, len(f.items))
- for _, item := range f.items {
- // Copy item's slice so operations on this slice (delta
- // compression) won't interfere with the object we return.
- item = copyDeltas(item)
- list = append(list, item.Newest().Object)
- }
- return list
-}
-
-// ListKeys returns a list of all the keys of the objects currently
-// in the FIFO.
-func (f *DeltaFIFO) ListKeys() []string {
- f.lock.RLock()
- defer f.lock.RUnlock()
- list := make([]string, 0, len(f.items))
- for key := range f.items {
- list = append(list, key)
- }
- return list
-}
-
-// Get returns the complete list of deltas for the requested item,
-// or sets exists=false.
-// You should treat the items returned inside the deltas as immutable.
-func (f *DeltaFIFO) Get(obj interface{}) (item interface{}, exists bool, err error) {
- key, err := f.KeyOf(obj)
- if err != nil {
- return nil, false, KeyError{obj, err}
- }
- return f.GetByKey(key)
-}
-
-// GetByKey returns the complete list of deltas for the requested item,
-// setting exists=false if that list is empty.
-// You should treat the items returned inside the deltas as immutable.
-func (f *DeltaFIFO) GetByKey(key string) (item interface{}, exists bool, err error) {
- f.lock.RLock()
- defer f.lock.RUnlock()
- d, exists := f.items[key]
- if exists {
- // Copy item's slice so operations on this slice (delta
- // compression) won't interfere with the object we return.
- d = copyDeltas(d)
- }
- return d, exists, nil
-}
-
-// Pop blocks until an item is added to the queue, and then returns it. If
-// multiple items are ready, they are returned in the order in which they were
-// added/updated. The item is removed from the queue (and the store) before it
-// is returned, so if you don't successfully process it, you need to add it back
-// with AddIfNotPresent().
-// process function is called under lock, so it is safe update data structures
-// in it that need to be in sync with the queue (e.g. knownKeys). The PopProcessFunc
-// may return an instance of ErrRequeue with a nested error to indicate the current
-// item should be requeued (equivalent to calling AddIfNotPresent under the lock).
-//
-// Pop returns a 'Deltas', which has a complete list of all the things
-// that happened to the object (deltas) while it was sitting in the queue.
-func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
- f.lock.Lock()
- defer f.lock.Unlock()
- for {
- for len(f.queue) == 0 {
- f.cond.Wait()
- }
- id := f.queue[0]
- f.queue = f.queue[1:]
- item, ok := f.items[id]
- if f.initialPopulationCount > 0 {
- f.initialPopulationCount--
- }
- if !ok {
- // Item may have been deleted subsequently.
- continue
- }
- delete(f.items, id)
- err := process(item)
- if e, ok := err.(ErrRequeue); ok {
- f.addIfNotPresent(id, item)
- err = e.Err
- }
- // Don't need to copyDeltas here, because we're transferring
- // ownership to the caller.
- return item, err
- }
-}
-
-// Replace will delete the contents of 'f', using instead the given map.
-// 'f' takes ownership of the map, you should not reference the map again
-// after calling this function. f's queue is reset, too; upon return, it
-// will contain the items in the map, in no particular order.
-func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
- f.lock.Lock()
- defer f.lock.Unlock()
- keys := make(sets.String, len(list))
-
- if !f.populated {
- f.populated = true
- f.initialPopulationCount = len(list)
- }
-
- for _, item := range list {
- key, err := f.KeyOf(item)
- if err != nil {
- return KeyError{item, err}
- }
- keys.Insert(key)
- if err := f.queueActionLocked(Sync, item); err != nil {
- return fmt.Errorf("couldn't enqueue object: %v", err)
- }
- }
-
- if f.knownObjects == nil {
- // Do deletion detection against our own list.
- for k, oldItem := range f.items {
- if keys.Has(k) {
- continue
- }
- var deletedObj interface{}
- if n := oldItem.Newest(); n != nil {
- deletedObj = n.Object
- }
- if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
- return err
- }
- }
- return nil
- }
-
- // Detect deletions not already in the queue.
- // TODO(lavalamp): This may be racy-- we aren't properly locked
- // with knownObjects. Unproven.
- knownKeys := f.knownObjects.ListKeys()
- for _, k := range knownKeys {
- if keys.Has(k) {
- continue
- }
-
- deletedObj, exists, err := f.knownObjects.GetByKey(k)
- if err != nil {
- deletedObj = nil
- glog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k)
- } else if !exists {
- deletedObj = nil
- glog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k)
- }
- if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
- return err
- }
- }
- return nil
-}
-
-// Resync will send a sync event for each item
-func (f *DeltaFIFO) Resync() error {
- f.lock.RLock()
- defer f.lock.RUnlock()
- for _, k := range f.knownObjects.ListKeys() {
- obj, exists, err := f.knownObjects.GetByKey(k)
- if err != nil {
- glog.Errorf("Unexpected error %v during lookup of key %v, unable to queue object for sync", err, k)
- continue
- } else if !exists {
- glog.Infof("Key %v does not exist in known objects store, unable to queue object for sync", k)
- continue
- }
-
- if err := f.queueActionLocked(Sync, obj); err != nil {
- return fmt.Errorf("couldn't queue object: %v", err)
- }
- }
- return nil
-}
-
-// A KeyListerGetter is anything that knows how to list its keys and look up by key.
-type KeyListerGetter interface {
- KeyLister
- KeyGetter
-}
-
-// A KeyLister is anything that knows how to list its keys.
-type KeyLister interface {
- ListKeys() []string
-}
-
-// A KeyGetter is anything that knows how to get the value stored under a given key.
-type KeyGetter interface {
- GetByKey(key string) (interface{}, bool, error)
-}
-
-// DeltaCompressor is an algorithm that removes redundant changes.
-type DeltaCompressor interface {
- Compress(Deltas) Deltas
-}
-
-// DeltaCompressorFunc should remove redundant changes; but changes that
-// are redundant depend on one's desired semantics, so this is an
-// injectable function.
-//
-// DeltaCompressorFunc adapts a raw function to be a DeltaCompressor.
-type DeltaCompressorFunc func(Deltas) Deltas
-
-// Compress just calls dc.
-func (dc DeltaCompressorFunc) Compress(d Deltas) Deltas {
- return dc(d)
-}
-
-// DeltaType is the type of a change (addition, deletion, etc)
-type DeltaType string
-
-const (
- Added DeltaType = "Added"
- Updated DeltaType = "Updated"
- Deleted DeltaType = "Deleted"
- // The other types are obvious. You'll get Sync deltas when:
- // * A watch expires/errors out and a new list/watch cycle is started.
- // * You've turned on periodic syncs.
- // (Anything that trigger's DeltaFIFO's Replace() method.)
- Sync DeltaType = "Sync"
-)
-
-// Delta is the type stored by a DeltaFIFO. It tells you what change
-// happened, and the object's state after* that change.
-//
-// [*] Unless the change is a deletion, and then you'll get the final
-// state of the object before it was deleted.
-type Delta struct {
- Type DeltaType
- Object interface{}
-}
-
-// Deltas is a list of one or more 'Delta's to an individual object.
-// The oldest delta is at index 0, the newest delta is the last one.
-type Deltas []Delta
-
-// Oldest is a convenience function that returns the oldest delta, or
-// nil if there are no deltas.
-func (d Deltas) Oldest() *Delta {
- if len(d) > 0 {
- return &d[0]
- }
- return nil
-}
-
-// Newest is a convenience function that returns the newest delta, or
-// nil if there are no deltas.
-func (d Deltas) Newest() *Delta {
- if n := len(d); n > 0 {
- return &d[n-1]
- }
- return nil
-}
-
-// copyDeltas returns a shallow copy of d; that is, it copies the slice but not
-// the objects in the slice. This allows Get/List to return an object that we
-// know won't be clobbered by a subsequent call to a delta compressor.
-func copyDeltas(d Deltas) Deltas {
- d2 := make(Deltas, len(d))
- copy(d2, d)
- return d2
-}
-
-// DeletedFinalStateUnknown is placed into a DeltaFIFO in the case where
-// an object was deleted but the watch deletion event was missed. In this
-// case we don't know the final "resting" state of the object, so there's
-// a chance the included `Obj` is stale.
-type DeletedFinalStateUnknown struct {
- Key string
- Obj interface{}
-}