summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ocata/ocata/vesagent/event_domain/fault_vm.py9
-rw-r--r--ocata/ocata/vesagent/tasks.py90
2 files changed, 98 insertions, 1 deletions
diff --git a/ocata/ocata/vesagent/event_domain/fault_vm.py b/ocata/ocata/vesagent/event_domain/fault_vm.py
index 314847c0..5ece71ab 100644
--- a/ocata/ocata/vesagent/event_domain/fault_vm.py
+++ b/ocata/ocata/vesagent/event_domain/fault_vm.py
@@ -108,3 +108,12 @@ def buildBacklog_fault_vm(vimid, backlog_input):
logger.debug("with backlog: %s" % backlog)
return backlog
+
+### process backlog with domain:"fault", type:"vm"
+
+def processBacklog_fault_vm(vesAgentConfig, vesAgentState, oneBacklog):
+ logger.debug("vesAgentConfig:%s, vesAgentState:%s, oneBacklog: %s"
+ % (vesAgentConfig, vesAgentState, oneBacklog))
+
+ return
+
diff --git a/ocata/ocata/vesagent/tasks.py b/ocata/ocata/vesagent/tasks.py
index 2c94c8e6..21f1e201 100644
--- a/ocata/ocata/vesagent/tasks.py
+++ b/ocata/ocata/vesagent/tasks.py
@@ -22,6 +22,8 @@ import time
from django.core.cache import cache
+from ocata.vesagent.event_domain.fault_vm import processBacklog_fault_vm
+
logger = logging.getLogger(__name__)
@@ -83,7 +85,7 @@ def processBacklogsOfOneVIM(vimid):
:param vimid:
:return:
'''
- backlog_count = 3
+ backlog_count = 0
next_time_slot = 10
try:
@@ -100,9 +102,95 @@ def processBacklogsOfOneVIM(vimid):
return 0,next_time_slot
+ vesAgentStateStr = cache.get("VesAgentBacklogs.state.%s" % (vimid))
+ vesAgentState = json.loads(vesAgentStateStr) if vesAgentStateStr is not None else {}
+
+ ves_info = vesAgentConfig.get("subscription", None)
+ if ves_info is None:
+ logger.warn("VesAgentBacklogs.config.%s: ves subscription corrupts:%s" % (vimid, vesAgentConfigStr))
+ return 0,next_time_slot
+
+ poll_interval_default = vesAgentConfig.get("poll_interval_default", None)
+ if poll_interval_default is None:
+ logger.warn("VesAgentBacklogs.config.%s: poll_interval_default corrupts:%s" % (vimid, vesAgentConfigStr))
+ return 0,next_time_slot
+
+ if poll_interval_default == 0:
+ # invalid interval value
+ logger.warn("VesAgentBacklogs.config.%s: poll_interval_default invalid:%s" % (vimid, vesAgentConfigStr))
+ return 0,next_time_slot
+
+ backlogs_list = vesAgentConfig.get("backlogs", None)
+ if backlogs_list is None:
+ logger.warn("VesAgentBacklogs.config.%s: backlogs corrupts:%s" % (vimid, vesAgentConfigStr))
+ return 0,next_time_slot
+
+ for backlog in backlogs_list:
+ backlog_count_tmp, next_time_slot_tmp = processOneBacklog(
+ vesAgentConfig, vesAgentState, poll_interval_default, backlog)
+ logger.debug("processOneBacklog return with %s,%s" % (backlog_count_tmp, next_time_slot_tmp))
+ backlog_count += backlog_count_tmp
+ next_time_slot = next_time_slot_tmp if next_time_slot > next_time_slot_tmp else next_time_slot
+
+ pass
+
+ # save back the updated backlogs state
+ vesAgentStateStr = json.dumps(vesAgentState)
+ cache.set("VesAgentBacklogs.state.%s" % vimid, vesAgentStateStr, None)
+
+ except Exception as e:
+ logger.error("exception:%s" % str(e))
+
+ return backlog_count, next_time_slot
+
+
+def processOneBacklog(vesAgentConfig, vesAgentState, poll_interval_default, oneBacklog):
+ logger.info("Process one backlog")
+ #logger.debug("vesAgentConfig:%s, vesAgentState:%s, poll_interval_default:%s, oneBacklog: %s"
+ # % (vesAgentConfig, vesAgentState, poll_interval_default, oneBacklog))
+
+ backlog_count = 1
+ next_time_slot = 10
+ try:
+ timestamp_now = int(time.time())
+ backlog_uuid = oneBacklog.get("backlog_uuid", None)
+ if backlog_uuid is None:
+ # warning: uuid is None, omit this backlog
+ logger.warn("backlog without uuid: %s" % oneBacklog)
+ return 0, next_time_slot
+
+ backlogState = vesAgentState.get("%s" % (backlog_uuid), None)
+ if backlogState is None:
+ initialBacklogState = {
+ "timestamp": timestamp_now
+ }
+ vesAgentState["%s" % (backlog_uuid)] = initialBacklogState
+ backlogState = initialBacklogState
+
+ time_expiration = backlogState["timestamp"] \
+ + oneBacklog.get("poll_interval", poll_interval_default)
+ # check if poll interval expires
+ if timestamp_now < time_expiration:
+ # not expired yet
+ logger.info("return without dispatching, not expired yet")
+ return backlog_count, next_time_slot
+
+ logger.info("Dispatching backlog")
+
+ # collect data in case of expiration
+ if oneBacklog["domain"] == "fault" and oneBacklog["type"] == "vm":
+ processBacklog_fault_vm(vesAgentConfig, vesAgentState, oneBacklog)
+ else:
+ logger.warn("Dispatching backlog fails due to unsupported backlog domain %s,type:%s"
+ % (oneBacklog["domain"], oneBacklog["type"]))
+ backlog_count = 0
+ pass
+ # update timestamp and internal state
+ backlogState["timestamp"] = timestamp_now
except Exception as e:
logger.error("exception:%s" % str(e))
+ logger.info("return")
return backlog_count, next_time_slot