aboutsummaryrefslogtreecommitdiffstats
path: root/openresty-ext/src/assembly/resources/openresty/nginx/luaext/loadbalance
diff options
context:
space:
mode:
authorHuabingZhao <zhao.huabing@zte.com.cn>2018-02-28 11:10:50 +0800
committerHuabingZhao <zhao.huabing@zte.com.cn>2018-02-28 11:10:56 +0800
commitd77dc45e7eee74a7c39e850070103fcbbc8f38b0 (patch)
tree42ba2358aa53a99e30d7f493a619a40b67f9ec4a /openresty-ext/src/assembly/resources/openresty/nginx/luaext/loadbalance
parenteba92f2ec4bd3783633fe2408eeae582b811c70a (diff)
Support IP Hash LB policy
Issue-ID: MSB-154 Change-Id: I11b8e3a314c6045183971bf2207b9ccee7df10c2 Signed-off-by: HuabingZhao <zhao.huabing@zte.com.cn>
Diffstat (limited to 'openresty-ext/src/assembly/resources/openresty/nginx/luaext/loadbalance')
-rw-r--r--openresty-ext/src/assembly/resources/openresty/nginx/luaext/loadbalance/balancer.lua24
-rw-r--r--openresty-ext/src/assembly/resources/openresty/nginx/luaext/loadbalance/baseupstream.lua55
-rw-r--r--openresty-ext/src/assembly/resources/openresty/nginx/luaext/loadbalance/policy/consistent_hash.lua135
3 files changed, 193 insertions, 21 deletions
diff --git a/openresty-ext/src/assembly/resources/openresty/nginx/luaext/loadbalance/balancer.lua b/openresty-ext/src/assembly/resources/openresty/nginx/luaext/loadbalance/balancer.lua
index 48dc1d8..ac9bb1d 100644
--- a/openresty-ext/src/assembly/resources/openresty/nginx/luaext/loadbalance/balancer.lua
+++ b/openresty-ext/src/assembly/resources/openresty/nginx/luaext/loadbalance/balancer.lua
@@ -1,6 +1,6 @@
--[[
- Copyright (C) 2016 ZTE, Inc. and others. All rights reserved. (ZTE)
+ Copyright (C) 2018 ZTE, Inc. and others. All rights reserved. (ZTE)
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -18,14 +18,16 @@
local b = require "ngx.balancer"
local baseupstream = require "loadbalance.baseupstream"
-local log_util = require('lib.utils.log_util')
-local error_handler = require('lib.utils.error_handler')
+local stats = require "monitor.stats"
+local svc_util = require 'lib.utils.svc_util'
+
+local servers = ngx.ctx.backservers
+local svc_key = ngx.ctx.svc_key
+local svc_info = ngx.ctx.svc_info
+local svc_get_connect_timeout = svc_util.get_connect_timeout
+local svc_get_send_timeout = svc_util.get_send_timeout
+local svc_get_read_timeout = svc_util.get_read_timeout
-local log = log_util.log
-local ngx_ctx = ngx.ctx
-local servers = ngx_ctx.backservers
-local svc_key = ngx_ctx.svc_key
-local error_svc_not_found = error_handler.svc_not_found
local status = b.get_last_failure()
if status == nil then
@@ -35,6 +37,7 @@ elseif status == "failed" then
local last_peer = ngx.ctx.last_peer
--mark the srv failed one time
baseupstream.mark_srv_failed(svc_key,last_peer)
+ stats.backend_failed()
end
local server,err = baseupstream.get_backserver(svc_key,servers)
@@ -46,5 +49,6 @@ if baseupstream.can_retry(svc_key,servers) then
b.set_more_tries(1)
end
b.set_current_peer(server["ip"],server["port"])
---log("upstreamserver",server["ip"]..":"..server["port"])
-ngx.ctx.last_peer = { ip=server["ip"], port=server["port"] } \ No newline at end of file
+b.set_timeouts(svc_get_connect_timeout(svc_info), svc_get_send_timeout(svc_info), svc_get_read_timeout(svc_info))
+ngx.ctx.last_peer = { ip=server["ip"], port=server["port"] }
+stats.forward_backend()
diff --git a/openresty-ext/src/assembly/resources/openresty/nginx/luaext/loadbalance/baseupstream.lua b/openresty-ext/src/assembly/resources/openresty/nginx/luaext/loadbalance/baseupstream.lua
index 4af6dfa..9361eea 100644
--- a/openresty-ext/src/assembly/resources/openresty/nginx/luaext/loadbalance/baseupstream.lua
+++ b/openresty-ext/src/assembly/resources/openresty/nginx/luaext/loadbalance/baseupstream.lua
@@ -1,6 +1,6 @@
--[[
- Copyright (C) 2016 ZTE, Inc. and others. All rights reserved. (ZTE)
+ Copyright (C) 2017-2018 ZTE, Inc. and others. All rights reserved. (ZTE)
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -19,10 +19,14 @@
local _M = {
_VERSION = '1.0.0'
}
-local policymodule = require "loadbalance.policy.roundrobin"
+
+local roundrobin = require "loadbalance.policy.roundrobin"
+local consistent_hash = require "loadbalance.policy.consistent_hash"
local tbl_util = require('lib.utils.table_util')
-local peerwatcher = require "loadbalance.peerwatcher"
+local svc_util = require('lib.utils.svc_util')
+local peerwatcher = require "core.peerwatcher"
local tbl_isempty = tbl_util.isempty
+local svc_use_own_upstream = svc_util.use_own_upstream
function _M.get_backserver(svc_key,servers)
if tbl_isempty(servers) then return nil,"server list is empty" end
@@ -42,14 +46,43 @@ function _M.get_backserver(svc_key,servers)
return nil,"only one server but is not available"
end
end
- for i=ngx.ctx.tried_num+1,servers_num do
- ngx.ctx.tried_num = ngx.ctx.tried_num+1
- server = policymodule.select_backserver(servers,svc_key)
- if peerwatcher.is_server_ok(svc_key,server) then
- return server,""
- end
+
+ -- A temporary solution, please modify it when adding lb_policy to svc_info
+ local svc_info = ngx.ctx.svc_info
+ if svc_use_own_upstream(svc_info) then
+ svc_info.lb_policy = "ip_hash"
+ else
+ svc_info.lb_policy = "roundrobin"
end
- return nil,"serveral server but no one is available"
+
+
+ local mode = svc_info.lb_policy
+ if mode ~= nil then
+ if mode == "ip_hash" then
+ -- iphash
+ for i=ngx.ctx.tried_num+1,servers_num do
+ ngx.ctx.tried_num = ngx.ctx.tried_num+1
+ server = consistent_hash.select_backserver(servers,svc_key)
+ if peerwatcher.is_server_ok(svc_key,server) then
+ return server,""
+ end
+ end
+ return nil,"several servers but no one is available"
+
+ elseif mode == "roundrobin" then
+ -- roundrobin
+ for i=ngx.ctx.tried_num+1,servers_num do
+ ngx.ctx.tried_num = ngx.ctx.tried_num+1
+ server = roundrobin.select_backserver(servers,svc_key)
+ if peerwatcher.is_server_ok(svc_key,server) then
+ return server,""
+ end
+ end
+ return nil,"several servers but no one is available"
+ end
+
+ end
+
end
function _M.can_retry(svc_key,servers)
@@ -63,4 +96,4 @@ end
function _M.check_and_reset_srv_status_ifneed(svc_key, servers)
peerwatcher.check_and_reset_srv_status_ifneed(svc_key,servers)
end
-return _M \ No newline at end of file
+return _M
diff --git a/openresty-ext/src/assembly/resources/openresty/nginx/luaext/loadbalance/policy/consistent_hash.lua b/openresty-ext/src/assembly/resources/openresty/nginx/luaext/loadbalance/policy/consistent_hash.lua
new file mode 100644
index 0000000..b3cd46e
--- /dev/null
+++ b/openresty-ext/src/assembly/resources/openresty/nginx/luaext/loadbalance/policy/consistent_hash.lua
@@ -0,0 +1,135 @@
+--[[
+
+ Copyright (C) 2018 ZTE, Inc. and others. All rights reserved. (ZTE)
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+--]]
+
+local _M = {}
+_M._VERSION = '1.0.0'
+
+local floor = math.floor
+local str_byte = string.byte
+local tab_sort = table.sort
+local tab_insert = table.insert
+
+local MOD = 2 ^ 32
+local REPLICAS = 20
+local LUCKY_NUM = 13
+
+
+local tbl_util = require('lib.utils.table_util')
+local tbl_isempty = tbl_util.isempty
+local tbl_isequal = require('pl.tablex')
+local peerwatcher = require "core.peerwatcher"
+local ngx_var = ngx.var
+local hash_data = {}
+
+local function hash_string(str)
+ local key = 0
+ for i = 1, #str do
+ key = (key * 31 + str_byte(str, i)) % MOD
+ end
+ return key
+end
+
+
+local function init_consistent_hash_state(servers)
+ local weight_sum = 0
+ local weight = 1
+ for _, srv in ipairs(servers) do
+ if srv.weight and srv.weight ~= 0 then
+ weight = srv.weight
+ end
+ weight_sum = weight_sum + weight
+ end
+
+ local circle, members = {}, 0
+ for index, srv in ipairs(servers) do
+ local key = ("%s:%s"):format(srv.ip, srv.port)
+ local base_hash = hash_string(key)
+ for c = 1, REPLICAS * weight_sum do
+ local hash = (base_hash * c * LUCKY_NUM) % MOD
+ tab_insert(circle, { hash, index })
+ end
+
+ members = members + 1
+ end
+ tab_sort(circle, function(a, b) return a[1] < b[1] end)
+ return { circle = circle, members = members }
+end
+
+local function update_consistent_hash_state(hash_data,servers,svckey)
+ -- compare servers in ctx with servers in cache
+ -- update the hash data if changes occur
+ local serverscache = hash_data[svckey].servers
+ tab_sort(serverscache, function(a, b) return a.ip < b.ip end)
+ tab_sort(servers, function(a, b) return a.ip < b.ip end)
+ if not tbl_isequal.deepcompare(serverscache, servers, false) then
+ local tmp_chash = init_consistent_hash_state(servers)
+ hash_data[svckey].servers =servers
+ hash_data[svckey].chash = tmp_chash
+ end
+end
+
+local function binary_search(circle, key)
+ local size = #circle
+ local st, ed, mid = 1, size
+
+ while st <= ed do
+ mid = floor((st + ed) / 2)
+ if circle[mid][1] < key then
+ st = mid + 1
+ else
+ ed = mid - 1
+ end
+ end
+
+ return st == size + 1 and 1 or st
+end
+
+
+function _M.select_backserver(servers,svckey)
+
+ if hash_data[svckey] == nil then
+ local tbl = {}
+ tbl['servers'] = {}
+ tbl['chash'] = {}
+ hash_data[svckey] = tbl
+ end
+
+ if tbl_isempty(hash_data[svckey].servers) then
+ local tmp_chash = init_consistent_hash_state(servers)
+ hash_data[svckey].servers = servers
+ hash_data[svckey].chash = tmp_chash
+ else
+ update_consistent_hash_state(hash_data,servers,svckey)
+ end
+
+ local chash = hash_data[svckey].chash
+ local circle = chash.circle
+ local hash_key = ngx_var.remote_addr
+ local st = binary_search(circle, hash_string(hash_key))
+ local size = #circle
+ local ed = st + size - 1
+ for i = st, ed do
+ local idx = circle[(i - 1) % size + 1][2]
+ if peerwatcher.is_server_ok(svckey,hash_data[svckey].servers[idx]) then
+ return hash_data[svckey].servers[idx]
+ end
+ end
+ return nil, "consistent hash: no servers available"
+end
+
+return _M