diff options
author | Huabing Zhao <zhaohuabing@gmail.com> | 2018-06-29 13:49:52 +0800 |
---|---|---|
committer | Huabing Zhao <zhaohuabing@gmail.com> | 2018-06-29 14:59:18 +0800 |
commit | bead27c83ac3fd97b6c49a2ac0e01e574ce09a18 (patch) | |
tree | 99aacc73fb60031f40ffce14fa3828470f1a2395 /src/kube2msb/vendor/k8s.io/kubernetes | |
parent | 9ddcc8470c7ed6a2cbddfb079b6396bb58436827 (diff) |
Remove juju/ratelimit to avoid LGPL issue
Change-Id: I1bf246a713d07fc9891f23ea85fb40c7864f05ef
Issue-ID: OOM-1271
Signed-off-by: Huabing Zhao <zhaohuabing@gmail.com>
Diffstat (limited to 'src/kube2msb/vendor/k8s.io/kubernetes')
-rw-r--r-- | src/kube2msb/vendor/k8s.io/kubernetes/pkg/util/flowcontrol/throttle.go | 85 |
1 files changed, 56 insertions, 29 deletions
diff --git a/src/kube2msb/vendor/k8s.io/kubernetes/pkg/util/flowcontrol/throttle.go b/src/kube2msb/vendor/k8s.io/kubernetes/pkg/util/flowcontrol/throttle.go index 482ba7d..e671c04 100644 --- a/src/kube2msb/vendor/k8s.io/kubernetes/pkg/util/flowcontrol/throttle.go +++ b/src/kube2msb/vendor/k8s.io/kubernetes/pkg/util/flowcontrol/throttle.go @@ -18,8 +18,9 @@ package flowcontrol import ( "sync" + "time" - "github.com/juju/ratelimit" + "golang.org/x/time/rate" ) type RateLimiter interface { @@ -30,15 +31,14 @@ type RateLimiter interface { Accept() // Stop stops the rate limiter, subsequent calls to CanAccept will return false Stop() - // Saturation returns a percentage number which describes how saturated - // this rate limiter is. - // Usually we use token bucket rate limiter. In that case, - // 1.0 means no tokens are available; 0.0 means we have a full bucket of tokens to use. - Saturation() float64 + // QPS returns QPS of this rate limiter + QPS() float32 } type tokenBucketRateLimiter struct { - limiter *ratelimit.Bucket + limiter *rate.Limiter + clock Clock + qps float32 } // NewTokenBucketRateLimiter creates a rate limiter which implements a token bucket approach. @@ -47,28 +47,57 @@ type tokenBucketRateLimiter struct { // The bucket is initially filled with 'burst' tokens, and refills at a rate of 'qps'. // The maximum number of tokens in the bucket is capped at 'burst'. func NewTokenBucketRateLimiter(qps float32, burst int) RateLimiter { - limiter := ratelimit.NewBucketWithRate(float64(qps), int64(burst)) - return &tokenBucketRateLimiter{limiter} + limiter := rate.NewLimiter(rate.Limit(qps), burst) + return newTokenBucketRateLimiter(limiter, realClock{}, qps) } -func (t *tokenBucketRateLimiter) TryAccept() bool { - return t.limiter.TakeAvailable(1) == 1 +// An injectable, mockable clock interface. +type Clock interface { + Now() time.Time + Sleep(time.Duration) } -func (t *tokenBucketRateLimiter) Saturation() float64 { - capacity := t.limiter.Capacity() - avail := t.limiter.Available() - return float64(capacity-avail) / float64(capacity) +type realClock struct{} + +func (realClock) Now() time.Time { + return time.Now() +} +func (realClock) Sleep(d time.Duration) { + time.Sleep(d) +} + +// NewTokenBucketRateLimiterWithClock is identical to NewTokenBucketRateLimiter +// but allows an injectable clock, for testing. +func NewTokenBucketRateLimiterWithClock(qps float32, burst int, c Clock) RateLimiter { + limiter := rate.NewLimiter(rate.Limit(qps), burst) + return newTokenBucketRateLimiter(limiter, c, qps) +} + +func newTokenBucketRateLimiter(limiter *rate.Limiter, c Clock, qps float32) RateLimiter { + return &tokenBucketRateLimiter{ + limiter: limiter, + clock: c, + qps: qps, + } +} + +func (t *tokenBucketRateLimiter) TryAccept() bool { + return t.limiter.AllowN(t.clock.Now(), 1) } // Accept will block until a token becomes available func (t *tokenBucketRateLimiter) Accept() { - t.limiter.Wait(1) + now := t.clock.Now() + t.clock.Sleep(t.limiter.ReserveN(now, 1).DelayFrom(now)) } func (t *tokenBucketRateLimiter) Stop() { } +func (t *tokenBucketRateLimiter) QPS() float32 { + return t.qps +} + type fakeAlwaysRateLimiter struct{} func NewFakeAlwaysRateLimiter() RateLimiter { @@ -79,34 +108,28 @@ func (t *fakeAlwaysRateLimiter) TryAccept() bool { return true } -func (t *fakeAlwaysRateLimiter) Saturation() float64 { - return 0 -} - func (t *fakeAlwaysRateLimiter) Stop() {} func (t *fakeAlwaysRateLimiter) Accept() {} +func (t *fakeAlwaysRateLimiter) QPS() float32 { + return 1 +} + type fakeNeverRateLimiter struct { wg sync.WaitGroup } func NewFakeNeverRateLimiter() RateLimiter { - wg := sync.WaitGroup{} - wg.Add(1) - return &fakeNeverRateLimiter{ - wg: wg, - } + rl := fakeNeverRateLimiter{} + rl.wg.Add(1) + return &rl } func (t *fakeNeverRateLimiter) TryAccept() bool { return false } -func (t *fakeNeverRateLimiter) Saturation() float64 { - return 1 -} - func (t *fakeNeverRateLimiter) Stop() { t.wg.Done() } @@ -114,3 +137,7 @@ func (t *fakeNeverRateLimiter) Stop() { func (t *fakeNeverRateLimiter) Accept() { t.wg.Wait() } + +func (t *fakeNeverRateLimiter) QPS() float32 { + return 1 +} |