From a47080445507f1dc444144bfe9c948c9f8f16224 Mon Sep 17 00:00:00 2001 From: Yun Huang Date: Wed, 4 Jul 2018 17:56:16 +0800 Subject: Add logic to handle single backlog for ocata Dispatch the specific backlog to corresponding handler caching the runtime state into cache Change-Id: I3cd54f5e0b15210cf90c8d14b0ec423af858c0ac Issue-ID: MULTICLOUD-230 Signed-off-by: Yun Huang --- ocata/ocata/vesagent/event_domain/fault_vm.py | 9 +++ ocata/ocata/vesagent/tasks.py | 90 ++++++++++++++++++++++++++- 2 files changed, 98 insertions(+), 1 deletion(-) diff --git a/ocata/ocata/vesagent/event_domain/fault_vm.py b/ocata/ocata/vesagent/event_domain/fault_vm.py index 314847c0..5ece71ab 100644 --- a/ocata/ocata/vesagent/event_domain/fault_vm.py +++ b/ocata/ocata/vesagent/event_domain/fault_vm.py @@ -108,3 +108,12 @@ def buildBacklog_fault_vm(vimid, backlog_input): logger.debug("with backlog: %s" % backlog) return backlog + +### process backlog with domain:"fault", type:"vm" + +def processBacklog_fault_vm(vesAgentConfig, vesAgentState, oneBacklog): + logger.debug("vesAgentConfig:%s, vesAgentState:%s, oneBacklog: %s" + % (vesAgentConfig, vesAgentState, oneBacklog)) + + return + diff --git a/ocata/ocata/vesagent/tasks.py b/ocata/ocata/vesagent/tasks.py index 2c94c8e6..21f1e201 100644 --- a/ocata/ocata/vesagent/tasks.py +++ b/ocata/ocata/vesagent/tasks.py @@ -22,6 +22,8 @@ import time from django.core.cache import cache +from ocata.vesagent.event_domain.fault_vm import processBacklog_fault_vm + logger = logging.getLogger(__name__) @@ -83,7 +85,7 @@ def processBacklogsOfOneVIM(vimid): :param vimid: :return: ''' - backlog_count = 3 + backlog_count = 0 next_time_slot = 10 try: @@ -100,9 +102,95 @@ def processBacklogsOfOneVIM(vimid): return 0,next_time_slot + vesAgentStateStr = cache.get("VesAgentBacklogs.state.%s" % (vimid)) + vesAgentState = json.loads(vesAgentStateStr) if vesAgentStateStr is not None else {} + + ves_info = vesAgentConfig.get("subscription", None) + if ves_info is None: + logger.warn("VesAgentBacklogs.config.%s: ves subscription corrupts:%s" % (vimid, vesAgentConfigStr)) + return 0,next_time_slot + + poll_interval_default = vesAgentConfig.get("poll_interval_default", None) + if poll_interval_default is None: + logger.warn("VesAgentBacklogs.config.%s: poll_interval_default corrupts:%s" % (vimid, vesAgentConfigStr)) + return 0,next_time_slot + + if poll_interval_default == 0: + # invalid interval value + logger.warn("VesAgentBacklogs.config.%s: poll_interval_default invalid:%s" % (vimid, vesAgentConfigStr)) + return 0,next_time_slot + + backlogs_list = vesAgentConfig.get("backlogs", None) + if backlogs_list is None: + logger.warn("VesAgentBacklogs.config.%s: backlogs corrupts:%s" % (vimid, vesAgentConfigStr)) + return 0,next_time_slot + + for backlog in backlogs_list: + backlog_count_tmp, next_time_slot_tmp = processOneBacklog( + vesAgentConfig, vesAgentState, poll_interval_default, backlog) + logger.debug("processOneBacklog return with %s,%s" % (backlog_count_tmp, next_time_slot_tmp)) + backlog_count += backlog_count_tmp + next_time_slot = next_time_slot_tmp if next_time_slot > next_time_slot_tmp else next_time_slot + + pass + + # save back the updated backlogs state + vesAgentStateStr = json.dumps(vesAgentState) + cache.set("VesAgentBacklogs.state.%s" % vimid, vesAgentStateStr, None) + + except Exception as e: + logger.error("exception:%s" % str(e)) + + return backlog_count, next_time_slot + + +def processOneBacklog(vesAgentConfig, vesAgentState, poll_interval_default, oneBacklog): + logger.info("Process one backlog") + #logger.debug("vesAgentConfig:%s, vesAgentState:%s, poll_interval_default:%s, oneBacklog: %s" + # % (vesAgentConfig, vesAgentState, poll_interval_default, oneBacklog)) + + backlog_count = 1 + next_time_slot = 10 + try: + timestamp_now = int(time.time()) + backlog_uuid = oneBacklog.get("backlog_uuid", None) + if backlog_uuid is None: + # warning: uuid is None, omit this backlog + logger.warn("backlog without uuid: %s" % oneBacklog) + return 0, next_time_slot + + backlogState = vesAgentState.get("%s" % (backlog_uuid), None) + if backlogState is None: + initialBacklogState = { + "timestamp": timestamp_now + } + vesAgentState["%s" % (backlog_uuid)] = initialBacklogState + backlogState = initialBacklogState + + time_expiration = backlogState["timestamp"] \ + + oneBacklog.get("poll_interval", poll_interval_default) + # check if poll interval expires + if timestamp_now < time_expiration: + # not expired yet + logger.info("return without dispatching, not expired yet") + return backlog_count, next_time_slot + + logger.info("Dispatching backlog") + + # collect data in case of expiration + if oneBacklog["domain"] == "fault" and oneBacklog["type"] == "vm": + processBacklog_fault_vm(vesAgentConfig, vesAgentState, oneBacklog) + else: + logger.warn("Dispatching backlog fails due to unsupported backlog domain %s,type:%s" + % (oneBacklog["domain"], oneBacklog["type"])) + backlog_count = 0 + pass + # update timestamp and internal state + backlogState["timestamp"] = timestamp_now except Exception as e: logger.error("exception:%s" % str(e)) + logger.info("return") return backlog_count, next_time_slot -- cgit 1.2.3-korg