aboutsummaryrefslogtreecommitdiffstats
path: root/src/kube2msb/vendor/k8s.io/kubernetes/pkg/util/flowcontrol/throttle.go
diff options
context:
space:
mode:
Diffstat (limited to 'src/kube2msb/vendor/k8s.io/kubernetes/pkg/util/flowcontrol/throttle.go')
-rw-r--r--src/kube2msb/vendor/k8s.io/kubernetes/pkg/util/flowcontrol/throttle.go85
1 files changed, 56 insertions, 29 deletions
diff --git a/src/kube2msb/vendor/k8s.io/kubernetes/pkg/util/flowcontrol/throttle.go b/src/kube2msb/vendor/k8s.io/kubernetes/pkg/util/flowcontrol/throttle.go
index 482ba7d..e671c04 100644
--- a/src/kube2msb/vendor/k8s.io/kubernetes/pkg/util/flowcontrol/throttle.go
+++ b/src/kube2msb/vendor/k8s.io/kubernetes/pkg/util/flowcontrol/throttle.go
@@ -18,8 +18,9 @@ package flowcontrol
import (
"sync"
+ "time"
- "github.com/juju/ratelimit"
+ "golang.org/x/time/rate"
)
type RateLimiter interface {
@@ -30,15 +31,14 @@ type RateLimiter interface {
Accept()
// Stop stops the rate limiter, subsequent calls to CanAccept will return false
Stop()
- // Saturation returns a percentage number which describes how saturated
- // this rate limiter is.
- // Usually we use token bucket rate limiter. In that case,
- // 1.0 means no tokens are available; 0.0 means we have a full bucket of tokens to use.
- Saturation() float64
+ // QPS returns QPS of this rate limiter
+ QPS() float32
}
type tokenBucketRateLimiter struct {
- limiter *ratelimit.Bucket
+ limiter *rate.Limiter
+ clock Clock
+ qps float32
}
// NewTokenBucketRateLimiter creates a rate limiter which implements a token bucket approach.
@@ -47,28 +47,57 @@ type tokenBucketRateLimiter struct {
// The bucket is initially filled with 'burst' tokens, and refills at a rate of 'qps'.
// The maximum number of tokens in the bucket is capped at 'burst'.
func NewTokenBucketRateLimiter(qps float32, burst int) RateLimiter {
- limiter := ratelimit.NewBucketWithRate(float64(qps), int64(burst))
- return &tokenBucketRateLimiter{limiter}
+ limiter := rate.NewLimiter(rate.Limit(qps), burst)
+ return newTokenBucketRateLimiter(limiter, realClock{}, qps)
}
-func (t *tokenBucketRateLimiter) TryAccept() bool {
- return t.limiter.TakeAvailable(1) == 1
+// An injectable, mockable clock interface.
+type Clock interface {
+ Now() time.Time
+ Sleep(time.Duration)
}
-func (t *tokenBucketRateLimiter) Saturation() float64 {
- capacity := t.limiter.Capacity()
- avail := t.limiter.Available()
- return float64(capacity-avail) / float64(capacity)
+type realClock struct{}
+
+func (realClock) Now() time.Time {
+ return time.Now()
+}
+func (realClock) Sleep(d time.Duration) {
+ time.Sleep(d)
+}
+
+// NewTokenBucketRateLimiterWithClock is identical to NewTokenBucketRateLimiter
+// but allows an injectable clock, for testing.
+func NewTokenBucketRateLimiterWithClock(qps float32, burst int, c Clock) RateLimiter {
+ limiter := rate.NewLimiter(rate.Limit(qps), burst)
+ return newTokenBucketRateLimiter(limiter, c, qps)
+}
+
+func newTokenBucketRateLimiter(limiter *rate.Limiter, c Clock, qps float32) RateLimiter {
+ return &tokenBucketRateLimiter{
+ limiter: limiter,
+ clock: c,
+ qps: qps,
+ }
+}
+
+func (t *tokenBucketRateLimiter) TryAccept() bool {
+ return t.limiter.AllowN(t.clock.Now(), 1)
}
// Accept will block until a token becomes available
func (t *tokenBucketRateLimiter) Accept() {
- t.limiter.Wait(1)
+ now := t.clock.Now()
+ t.clock.Sleep(t.limiter.ReserveN(now, 1).DelayFrom(now))
}
func (t *tokenBucketRateLimiter) Stop() {
}
+func (t *tokenBucketRateLimiter) QPS() float32 {
+ return t.qps
+}
+
type fakeAlwaysRateLimiter struct{}
func NewFakeAlwaysRateLimiter() RateLimiter {
@@ -79,34 +108,28 @@ func (t *fakeAlwaysRateLimiter) TryAccept() bool {
return true
}
-func (t *fakeAlwaysRateLimiter) Saturation() float64 {
- return 0
-}
-
func (t *fakeAlwaysRateLimiter) Stop() {}
func (t *fakeAlwaysRateLimiter) Accept() {}
+func (t *fakeAlwaysRateLimiter) QPS() float32 {
+ return 1
+}
+
type fakeNeverRateLimiter struct {
wg sync.WaitGroup
}
func NewFakeNeverRateLimiter() RateLimiter {
- wg := sync.WaitGroup{}
- wg.Add(1)
- return &fakeNeverRateLimiter{
- wg: wg,
- }
+ rl := fakeNeverRateLimiter{}
+ rl.wg.Add(1)
+ return &rl
}
func (t *fakeNeverRateLimiter) TryAccept() bool {
return false
}
-func (t *fakeNeverRateLimiter) Saturation() float64 {
- return 1
-}
-
func (t *fakeNeverRateLimiter) Stop() {
t.wg.Done()
}
@@ -114,3 +137,7 @@ func (t *fakeNeverRateLimiter) Stop() {
func (t *fakeNeverRateLimiter) Accept() {
t.wg.Wait()
}
+
+func (t *fakeNeverRateLimiter) QPS() float32 {
+ return 1
+}