From d77dc45e7eee74a7c39e850070103fcbbc8f38b0 Mon Sep 17 00:00:00 2001 From: HuabingZhao Date: Wed, 28 Feb 2018 11:10:50 +0800 Subject: Support IP Hash LB policy Issue-ID: MSB-154 Change-Id: I11b8e3a314c6045183971bf2207b9ccee7df10c2 Signed-off-by: HuabingZhao --- .../nginx/luaext/loadbalance/balancer.lua | 24 ++-- .../nginx/luaext/loadbalance/baseupstream.lua | 55 +++++++-- .../luaext/loadbalance/policy/consistent_hash.lua | 135 +++++++++++++++++++++ 3 files changed, 193 insertions(+), 21 deletions(-) create mode 100644 openresty-ext/src/assembly/resources/openresty/nginx/luaext/loadbalance/policy/consistent_hash.lua (limited to 'openresty-ext/src/assembly/resources/openresty/nginx/luaext/loadbalance') diff --git a/openresty-ext/src/assembly/resources/openresty/nginx/luaext/loadbalance/balancer.lua b/openresty-ext/src/assembly/resources/openresty/nginx/luaext/loadbalance/balancer.lua index 48dc1d8..ac9bb1d 100644 --- a/openresty-ext/src/assembly/resources/openresty/nginx/luaext/loadbalance/balancer.lua +++ b/openresty-ext/src/assembly/resources/openresty/nginx/luaext/loadbalance/balancer.lua @@ -1,6 +1,6 @@ --[[ - Copyright (C) 2016 ZTE, Inc. and others. All rights reserved. (ZTE) + Copyright (C) 2018 ZTE, Inc. and others. All rights reserved. (ZTE) Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -18,14 +18,16 @@ local b = require "ngx.balancer" local baseupstream = require "loadbalance.baseupstream" -local log_util = require('lib.utils.log_util') -local error_handler = require('lib.utils.error_handler') +local stats = require "monitor.stats" +local svc_util = require 'lib.utils.svc_util' + +local servers = ngx.ctx.backservers +local svc_key = ngx.ctx.svc_key +local svc_info = ngx.ctx.svc_info +local svc_get_connect_timeout = svc_util.get_connect_timeout +local svc_get_send_timeout = svc_util.get_send_timeout +local svc_get_read_timeout = svc_util.get_read_timeout -local log = log_util.log -local ngx_ctx = ngx.ctx -local servers = ngx_ctx.backservers -local svc_key = ngx_ctx.svc_key -local error_svc_not_found = error_handler.svc_not_found local status = b.get_last_failure() if status == nil then @@ -35,6 +37,7 @@ elseif status == "failed" then local last_peer = ngx.ctx.last_peer --mark the srv failed one time baseupstream.mark_srv_failed(svc_key,last_peer) + stats.backend_failed() end local server,err = baseupstream.get_backserver(svc_key,servers) @@ -46,5 +49,6 @@ if baseupstream.can_retry(svc_key,servers) then b.set_more_tries(1) end b.set_current_peer(server["ip"],server["port"]) ---log("upstreamserver",server["ip"]..":"..server["port"]) -ngx.ctx.last_peer = { ip=server["ip"], port=server["port"] } \ No newline at end of file +b.set_timeouts(svc_get_connect_timeout(svc_info), svc_get_send_timeout(svc_info), svc_get_read_timeout(svc_info)) +ngx.ctx.last_peer = { ip=server["ip"], port=server["port"] } +stats.forward_backend() diff --git a/openresty-ext/src/assembly/resources/openresty/nginx/luaext/loadbalance/baseupstream.lua b/openresty-ext/src/assembly/resources/openresty/nginx/luaext/loadbalance/baseupstream.lua index 4af6dfa..9361eea 100644 --- a/openresty-ext/src/assembly/resources/openresty/nginx/luaext/loadbalance/baseupstream.lua +++ b/openresty-ext/src/assembly/resources/openresty/nginx/luaext/loadbalance/baseupstream.lua @@ -1,6 +1,6 @@ --[[ - Copyright (C) 2016 ZTE, Inc. and others. All rights reserved. (ZTE) + Copyright (C) 2017-2018 ZTE, Inc. and others. All rights reserved. (ZTE) Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -19,10 +19,14 @@ local _M = { _VERSION = '1.0.0' } -local policymodule = require "loadbalance.policy.roundrobin" + +local roundrobin = require "loadbalance.policy.roundrobin" +local consistent_hash = require "loadbalance.policy.consistent_hash" local tbl_util = require('lib.utils.table_util') -local peerwatcher = require "loadbalance.peerwatcher" +local svc_util = require('lib.utils.svc_util') +local peerwatcher = require "core.peerwatcher" local tbl_isempty = tbl_util.isempty +local svc_use_own_upstream = svc_util.use_own_upstream function _M.get_backserver(svc_key,servers) if tbl_isempty(servers) then return nil,"server list is empty" end @@ -42,14 +46,43 @@ function _M.get_backserver(svc_key,servers) return nil,"only one server but is not available" end end - for i=ngx.ctx.tried_num+1,servers_num do - ngx.ctx.tried_num = ngx.ctx.tried_num+1 - server = policymodule.select_backserver(servers,svc_key) - if peerwatcher.is_server_ok(svc_key,server) then - return server,"" - end + + -- A temporary solution, plase modify it when add lb_policy to svc_info + local svc_info = ngx.ctx.svc_info + if svc_use_own_upstream(svc_info) then + svc_info.lb_policy = "ip_hash" + else + svc_info.lb_policy = "roundrobin" end - return nil,"serveral server but no one is available" + + + local mode = svc_info.lb_policy + if mode ~= nil then + if mode == "ip_hash" then + -- iphash + for i=ngx.ctx.tried_num+1,servers_num do + ngx.ctx.tried_num = ngx.ctx.tried_num+1 + server = consistent_hash.select_backserver(servers,svc_key) + if peerwatcher.is_server_ok(svc_key,server) then + return server,"" + end + end + return nil,"serveral server but no one is available" + + elseif mode == "roundrobin" then + -- roundrobin + for i=ngx.ctx.tried_num+1,servers_num do + ngx.ctx.tried_num = ngx.ctx.tried_num+1 + server = roundrobin.select_backserver(servers,svc_key) + if peerwatcher.is_server_ok(svc_key,server) then + return server,"" + end + end + return nil,"serveral server but no one is available" + end + + end + end function _M.can_retry(svc_key,servers) @@ -63,4 +96,4 @@ end function _M.check_and_reset_srv_status_ifneed(svc_key, servers) peerwatcher.check_and_reset_srv_status_ifneed(svc_key,servers) end -return _M \ No newline at end of file +return _M diff --git a/openresty-ext/src/assembly/resources/openresty/nginx/luaext/loadbalance/policy/consistent_hash.lua b/openresty-ext/src/assembly/resources/openresty/nginx/luaext/loadbalance/policy/consistent_hash.lua new file mode 100644 index 0000000..b3cd46e --- /dev/null +++ b/openresty-ext/src/assembly/resources/openresty/nginx/luaext/loadbalance/policy/consistent_hash.lua @@ -0,0 +1,135 @@ +--[[ + + Copyright (C) 2018 ZTE, Inc. and others. All rights reserved. (ZTE) + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +--]] + +local _M = {} +_M._VERSION = '1.0.0' + +local floor = math.floor +local str_byte = string.byte +local tab_sort = table.sort +local tab_insert = table.insert + +local MOD = 2 ^ 32 +local REPLICAS = 20 +local LUCKY_NUM = 13 + + +local tbl_util = require('lib.utils.table_util') +local tbl_isempty = tbl_util.isempty +local tbl_isequal = require('pl.tablex') +local peerwatcher = require "core.peerwatcher" +local ngx_var = ngx.var +local hash_data = {} + +local function hash_string(str) + local key = 0 + for i = 1, #str do + key = (key * 31 + str_byte(str, i)) % MOD + end + return key +end + + +local function init_consistent_hash_state(servers) + local weight_sum = 0 + local weight = 1 + for _, srv in ipairs(servers) do + if srv.weight and srv.weight ~= 0 then + weight = srv.weight + end + weight_sum = weight_sum + weight + end + + local circle, members = {}, 0 + for index, srv in ipairs(servers) do + local key = ("%s:%s"):format(srv.ip, srv.port) + local base_hash = hash_string(key) + for c = 1, REPLICAS * weight_sum do + local hash = (base_hash * c * LUCKY_NUM) % MOD + tab_insert(circle, { hash, index }) + end + + members = members + 1 + end + tab_sort(circle, function(a, b) return a[1] < b[1] end) + return { circle = circle, members = members } +end + +local function update_consistent_hash_state(hash_data,servers,svckey) + -- compare servers in ctx with servers in cache + -- update the hash data if changes occur + local serverscache = hash_data[svckey].servers + tab_sort(serverscache, function(a, b) return a.ip < b.ip end) + tab_sort(servers, function(a, b) return a.ip < b.ip end) + if not tbl_isequal.deepcompare(serverscache, servers, false) then + local tmp_chash = init_consistent_hash_state(servers) + hash_data[svckey].servers =servers + hash_data[svckey].chash = tmp_chash + end +end + +local function binary_search(circle, key) + local size = #circle + local st, ed, mid = 1, size + + while st <= ed do + mid = floor((st + ed) / 2) + if circle[mid][1] < key then + st = mid + 1 + else + ed = mid - 1 + end + end + + return st == size + 1 and 1 or st +end + + +function _M.select_backserver(servers,svckey) + + if hash_data[svckey] == nil then + local tbl = {} + tbl['servers'] = {} + tbl['chash'] = {} + hash_data[svckey] = tbl + end + + if tbl_isempty(hash_data[svckey].servers) then + local tmp_chash = init_consistent_hash_state(servers) + hash_data[svckey].servers = servers + hash_data[svckey].chash = tmp_chash + else + update_consistent_hash_state(hash_data,servers,svckey) + end + + local chash = hash_data[svckey].chash + local circle = chash.circle + local hash_key = ngx_var.remote_addr + local st = binary_search(circle, hash_string(hash_key)) + local size = #circle + local ed = st + size - 1 + for i = st, ed do + local idx = circle[(i - 1) % size + 1][2] + if peerwatcher.is_server_ok(svckey,hash_data[svckey].servers[idx]) then + return hash_data[svckey].servers[idx] + end + end + return nil, "consistent hash: no servers available" +end + +return _M -- cgit 1.2.3-korg