diff options
author | 2018-02-28 11:10:50 +0800 | |
---|---|---|
committer | 2018-02-28 11:10:56 +0800 | |
commit | d77dc45e7eee74a7c39e850070103fcbbc8f38b0 (patch) | |
tree | 42ba2358aa53a99e30d7f493a619a40b67f9ec4a /openresty-ext/src/assembly/resources/openresty/nginx/luaext/loadbalance | |
parent | eba92f2ec4bd3783633fe2408eeae582b811c70a (diff) |
Support IP Hash LB policy
Issue-ID: MSB-154
Change-Id: I11b8e3a314c6045183971bf2207b9ccee7df10c2
Signed-off-by: HuabingZhao <zhao.huabing@zte.com.cn>
Diffstat (limited to 'openresty-ext/src/assembly/resources/openresty/nginx/luaext/loadbalance')
3 files changed, 193 insertions, 21 deletions
diff --git a/openresty-ext/src/assembly/resources/openresty/nginx/luaext/loadbalance/balancer.lua b/openresty-ext/src/assembly/resources/openresty/nginx/luaext/loadbalance/balancer.lua index 48dc1d8..ac9bb1d 100644 --- a/openresty-ext/src/assembly/resources/openresty/nginx/luaext/loadbalance/balancer.lua +++ b/openresty-ext/src/assembly/resources/openresty/nginx/luaext/loadbalance/balancer.lua @@ -1,6 +1,6 @@ --[[
- Copyright (C) 2016 ZTE, Inc. and others. All rights reserved. (ZTE)
+ Copyright (C) 2018 ZTE, Inc. and others. All rights reserved. (ZTE)
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -18,14 +18,16 @@ local b = require "ngx.balancer"
local baseupstream = require "loadbalance.baseupstream"
-local log_util = require('lib.utils.log_util')
-local error_handler = require('lib.utils.error_handler')
+local stats = require "monitor.stats"
+local svc_util = require 'lib.utils.svc_util'
+
+local servers = ngx.ctx.backservers
+local svc_key = ngx.ctx.svc_key
+local svc_info = ngx.ctx.svc_info
+local svc_get_connect_timeout = svc_util.get_connect_timeout
+local svc_get_send_timeout = svc_util.get_send_timeout
+local svc_get_read_timeout = svc_util.get_read_timeout
-local log = log_util.log
-local ngx_ctx = ngx.ctx
-local servers = ngx_ctx.backservers
-local svc_key = ngx_ctx.svc_key
-local error_svc_not_found = error_handler.svc_not_found
local status = b.get_last_failure()
if status == nil then
@@ -35,6 +37,7 @@ elseif status == "failed" then local last_peer = ngx.ctx.last_peer
--mark the srv failed one time
baseupstream.mark_srv_failed(svc_key,last_peer)
+ stats.backend_failed()
end
local server,err = baseupstream.get_backserver(svc_key,servers)
@@ -46,5 +49,6 @@ if baseupstream.can_retry(svc_key,servers) then b.set_more_tries(1)
end
b.set_current_peer(server["ip"],server["port"])
---log("upstreamserver",server["ip"]..":"..server["port"])
-ngx.ctx.last_peer = { ip=server["ip"], port=server["port"] }
\ No newline at end of file +b.set_timeouts(svc_get_connect_timeout(svc_info), svc_get_send_timeout(svc_info), svc_get_read_timeout(svc_info))
+ngx.ctx.last_peer = { ip=server["ip"], port=server["port"] }
+stats.forward_backend()
diff --git a/openresty-ext/src/assembly/resources/openresty/nginx/luaext/loadbalance/baseupstream.lua b/openresty-ext/src/assembly/resources/openresty/nginx/luaext/loadbalance/baseupstream.lua index 4af6dfa..9361eea 100644 --- a/openresty-ext/src/assembly/resources/openresty/nginx/luaext/loadbalance/baseupstream.lua +++ b/openresty-ext/src/assembly/resources/openresty/nginx/luaext/loadbalance/baseupstream.lua @@ -1,6 +1,6 @@ --[[
- Copyright (C) 2016 ZTE, Inc. and others. All rights reserved. (ZTE)
+ Copyright (C) 2017-2018 ZTE, Inc. and others. All rights reserved. (ZTE)
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -19,10 +19,14 @@ local _M = {
_VERSION = '1.0.0'
}
-local policymodule = require "loadbalance.policy.roundrobin"
+
+local roundrobin = require "loadbalance.policy.roundrobin"
+local consistent_hash = require "loadbalance.policy.consistent_hash"
local tbl_util = require('lib.utils.table_util')
-local peerwatcher = require "loadbalance.peerwatcher"
+local svc_util = require('lib.utils.svc_util')
+local peerwatcher = require "core.peerwatcher"
local tbl_isempty = tbl_util.isempty
+local svc_use_own_upstream = svc_util.use_own_upstream
function _M.get_backserver(svc_key,servers)
if tbl_isempty(servers) then return nil,"server list is empty" end
@@ -42,14 +46,43 @@ function _M.get_backserver(svc_key,servers) return nil,"only one server but is not available"
end
end
- for i=ngx.ctx.tried_num+1,servers_num do
- ngx.ctx.tried_num = ngx.ctx.tried_num+1
- server = policymodule.select_backserver(servers,svc_key)
- if peerwatcher.is_server_ok(svc_key,server) then
- return server,""
- end
+
+ -- A temporary solution, plase modify it when add lb_policy to svc_info
+ local svc_info = ngx.ctx.svc_info
+ if svc_use_own_upstream(svc_info) then
+ svc_info.lb_policy = "ip_hash"
+ else
+ svc_info.lb_policy = "roundrobin"
end
- return nil,"serveral server but no one is available"
+
+
+ local mode = svc_info.lb_policy
+ if mode ~= nil then
+ if mode == "ip_hash" then
+ -- iphash
+ for i=ngx.ctx.tried_num+1,servers_num do
+ ngx.ctx.tried_num = ngx.ctx.tried_num+1
+ server = consistent_hash.select_backserver(servers,svc_key)
+ if peerwatcher.is_server_ok(svc_key,server) then
+ return server,""
+ end
+ end
+ return nil,"serveral server but no one is available"
+
+ elseif mode == "roundrobin" then
+ -- roundrobin
+ for i=ngx.ctx.tried_num+1,servers_num do
+ ngx.ctx.tried_num = ngx.ctx.tried_num+1
+ server = roundrobin.select_backserver(servers,svc_key)
+ if peerwatcher.is_server_ok(svc_key,server) then
+ return server,""
+ end
+ end
+ return nil,"serveral server but no one is available"
+ end
+
+ end
+
end
function _M.can_retry(svc_key,servers)
@@ -63,4 +96,4 @@ end function _M.check_and_reset_srv_status_ifneed(svc_key, servers)
peerwatcher.check_and_reset_srv_status_ifneed(svc_key,servers)
end
-return _M
\ No newline at end of file +return _M
diff --git a/openresty-ext/src/assembly/resources/openresty/nginx/luaext/loadbalance/policy/consistent_hash.lua b/openresty-ext/src/assembly/resources/openresty/nginx/luaext/loadbalance/policy/consistent_hash.lua new file mode 100644 index 0000000..b3cd46e --- /dev/null +++ b/openresty-ext/src/assembly/resources/openresty/nginx/luaext/loadbalance/policy/consistent_hash.lua @@ -0,0 +1,135 @@ +--[[ + + Copyright (C) 2018 ZTE, Inc. and others. All rights reserved. (ZTE) + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +--]] + +local _M = {} +_M._VERSION = '1.0.0' + +local floor = math.floor +local str_byte = string.byte +local tab_sort = table.sort +local tab_insert = table.insert + +local MOD = 2 ^ 32 +local REPLICAS = 20 +local LUCKY_NUM = 13 + + +local tbl_util = require('lib.utils.table_util') +local tbl_isempty = tbl_util.isempty +local tbl_isequal = require('pl.tablex') +local peerwatcher = require "core.peerwatcher" +local ngx_var = ngx.var +local hash_data = {} + +local function hash_string(str) + local key = 0 + for i = 1, #str do + key = (key * 31 + str_byte(str, i)) % MOD + end + return key +end + + +local function init_consistent_hash_state(servers) + local weight_sum = 0 + local weight = 1 + for _, srv in ipairs(servers) do + if srv.weight and srv.weight ~= 0 then + weight = srv.weight + end + weight_sum = weight_sum + weight + end + + local circle, members = {}, 0 + for index, srv in ipairs(servers) do + local key = ("%s:%s"):format(srv.ip, srv.port) + local base_hash = hash_string(key) + for c = 1, REPLICAS * weight_sum do + local hash = (base_hash * c * LUCKY_NUM) % MOD + tab_insert(circle, { hash, index }) + end + + members = members + 1 + end + tab_sort(circle, function(a, b) return a[1] < b[1] end) + return { circle = circle, members = members } +end + +local function update_consistent_hash_state(hash_data,servers,svckey) + -- compare servers in ctx with servers in cache + -- update the hash data if changes occur + local serverscache = hash_data[svckey].servers + tab_sort(serverscache, function(a, b) return a.ip < b.ip end) + tab_sort(servers, function(a, b) return a.ip < b.ip end) + if not tbl_isequal.deepcompare(serverscache, servers, false) then + local tmp_chash = init_consistent_hash_state(servers) + hash_data[svckey].servers =servers + hash_data[svckey].chash = tmp_chash + end +end + +local function binary_search(circle, key) + local size = #circle + local st, ed, mid = 1, size + + while st <= ed do + mid = floor((st + ed) / 2) + if circle[mid][1] < key then + st = mid + 1 + else + ed = mid - 1 + end + end + + return st == size + 1 and 1 or st +end + + +function _M.select_backserver(servers,svckey) + + if hash_data[svckey] == nil then + local tbl = {} + tbl['servers'] = {} + tbl['chash'] = {} + hash_data[svckey] = tbl + end + + if tbl_isempty(hash_data[svckey].servers) then + local tmp_chash = init_consistent_hash_state(servers) + hash_data[svckey].servers = servers + hash_data[svckey].chash = tmp_chash + else + update_consistent_hash_state(hash_data,servers,svckey) + end + + local chash = hash_data[svckey].chash + local circle = chash.circle + local hash_key = ngx_var.remote_addr + local st = binary_search(circle, hash_string(hash_key)) + local size = #circle + local ed = st + size - 1 + for i = st, ed do + local idx = circle[(i - 1) % size + 1][2] + if peerwatcher.is_server_ok(svckey,hash_data[svckey].servers[idx]) then + return hash_data[svckey].servers[idx] + end + end + return nil, "consistent hash: no servers available" +end + +return _M |