summaryrefslogtreecommitdiffstats
path: root/windriver/titanium_cloud/vesagent/tasks.py
diff options
context:
space:
mode:
Diffstat (limited to 'windriver/titanium_cloud/vesagent/tasks.py')
-rw-r--r--windriver/titanium_cloud/vesagent/tasks.py196
1 files changed, 196 insertions, 0 deletions
diff --git a/windriver/titanium_cloud/vesagent/tasks.py b/windriver/titanium_cloud/vesagent/tasks.py
new file mode 100644
index 00000000..ac760ece
--- /dev/null
+++ b/windriver/titanium_cloud/vesagent/tasks.py
@@ -0,0 +1,196 @@
+# Copyright (c) 2017-2018 Wind River Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+### VES agent workers
+from __future__ import absolute_import, unicode_literals
+from titanium_cloud.celery import app
+import os
+import logging
+import json
+import time
+
+from django.core.cache import cache
+
+from titanium_cloud.vesagent.event_domain.fault_vm import processBacklog_fault_vm
+
+logger = logging.getLogger(__name__)
+
+
+@app.task(bind=True)
+def scheduleBacklogs(self, vimid):
+ # make sure only one task runs here
+ # cannot get vimid ? logger.info("schedule with vimid:%" % (vimid))
+
+ logger.debug("scheduleBacklogs starts")
+ backlog_count, next_time_slot = processBacklogs()
+ logger.debug("processBacklogs return with %s, %s" % (backlog_count, next_time_slot))
+
+ # sleep for next_time_slot
+ while backlog_count > 0:
+ time.sleep(next_time_slot)
+ backlog_count, next_time_slot = processBacklogs()
+
+ logger.debug("scheduleBacklogs stops")
+
+
+def processBacklogs():
+ # find out count of valid backlog and the next time slot
+ backlog_count = 0
+ next_time_slot = 10
+ try:
+ #get the whole list of backlog
+ VesAgentBacklogsVimListStr = cache.get("VesAgentBacklogs.vimlist")
+ if VesAgentBacklogsVimListStr is None:
+ logger.warn("VesAgentBacklogs.vimlist cannot be found in cache")
+ return 0,next_time_slot
+
+ logger.debug("VesAgentBacklogs.vimlist: %s" % (VesAgentBacklogsVimListStr))
+
+ backlogsAllVims = json.loads(VesAgentBacklogsVimListStr)
+ if backlogsAllVims is None:
+ logger.warn("VesAgentBacklogs.vimlist is empty")
+ return 0,next_time_slot
+
+ for vimid in backlogsAllVims:
+ #iterate each backlogs
+ backlog_count_tmp,next_time_slot_tmp = processBacklogsOfOneVIM(vimid)
+ logger.debug("vimid:%s, backlog_count,next_time_slot:%s,%s"
+ %( vimid,backlog_count_tmp,next_time_slot_tmp ))
+ backlog_count += backlog_count_tmp
+ next_time_slot = next_time_slot_tmp if next_time_slot > next_time_slot_tmp else next_time_slot
+ pass
+
+ except Exception as e:
+ logger.error("exception:%s" % str(e))
+
+ return backlog_count, next_time_slot
+
+ pass
+
+
+def processBacklogsOfOneVIM(vimid):
+ '''
+ process all backlogs for a VIM, return count of valid backlogs
+ :param vimid:
+ :return:
+ '''
+ backlog_count = 0
+ next_time_slot = 10
+
+ try:
+ vesAgentConfigStr = cache.get("VesAgentBacklogs.config.%s" % (vimid))
+ if vesAgentConfigStr is None:
+ logger.warn("VesAgentBacklogs.config.%s cannot be found in cache" % (vimid))
+ return 0,next_time_slot
+
+ logger.debug("VesAgentBacklogs.config.%s: %s" % (vimid, vesAgentConfigStr))
+
+ vesAgentConfig = json.loads(vesAgentConfigStr)
+ if vesAgentConfig is None:
+ logger.warn("VesAgentBacklogs.config.%s corrupts" % (vimid))
+ return 0,next_time_slot
+
+
+ vesAgentStateStr = cache.get("VesAgentBacklogs.state.%s" % (vimid))
+ vesAgentState = json.loads(vesAgentStateStr) if vesAgentStateStr is not None else {}
+
+ ves_info = vesAgentConfig.get("subscription", None)
+ if ves_info is None:
+ logger.warn("VesAgentBacklogs.config.%s: ves subscription corrupts:%s" % (vimid, vesAgentConfigStr))
+ return 0,next_time_slot
+
+ poll_interval_default = vesAgentConfig.get("poll_interval_default", None)
+ if poll_interval_default is None:
+ logger.warn("VesAgentBacklogs.config.%s: poll_interval_default corrupts:%s" % (vimid, vesAgentConfigStr))
+ return 0,next_time_slot
+
+ if poll_interval_default == 0:
+ # invalid interval value
+ logger.warn("VesAgentBacklogs.config.%s: poll_interval_default invalid:%s" % (vimid, vesAgentConfigStr))
+ return 0,next_time_slot
+
+ backlogs_list = vesAgentConfig.get("backlogs", None)
+ if backlogs_list is None:
+ logger.warn("VesAgentBacklogs.config.%s: backlogs corrupts:%s" % (vimid, vesAgentConfigStr))
+ return 0,next_time_slot
+
+ for backlog in backlogs_list:
+ backlog_count_tmp, next_time_slot_tmp = processOneBacklog(
+ vesAgentConfig, vesAgentState, poll_interval_default, backlog)
+ logger.debug("processOneBacklog return with %s,%s" % (backlog_count_tmp, next_time_slot_tmp))
+ backlog_count += backlog_count_tmp
+ next_time_slot = next_time_slot_tmp if next_time_slot > next_time_slot_tmp else next_time_slot
+
+ pass
+
+ # save back the updated backlogs state
+ vesAgentStateStr = json.dumps(vesAgentState)
+ cache.set("VesAgentBacklogs.state.%s" % vimid, vesAgentStateStr, None)
+
+ except Exception as e:
+ logger.error("exception:%s" % str(e))
+
+ return backlog_count, next_time_slot
+
+
+def processOneBacklog(vesAgentConfig, vesAgentState, poll_interval_default, oneBacklog):
+ logger.info("Process one backlog")
+ #logger.debug("vesAgentConfig:%s, vesAgentState:%s, poll_interval_default:%s, oneBacklog: %s"
+ # % (vesAgentConfig, vesAgentState, poll_interval_default, oneBacklog))
+
+ backlog_count = 1
+ next_time_slot = 10
+ try:
+ timestamp_now = int(time.time())
+ backlog_uuid = oneBacklog.get("backlog_uuid", None)
+ if backlog_uuid is None:
+ # warning: uuid is None, omit this backlog
+ logger.warn("backlog without uuid: %s" % oneBacklog)
+ return 0, next_time_slot
+
+ backlogState = vesAgentState.get("%s" % (backlog_uuid), None)
+ if backlogState is None:
+ initialBacklogState = {
+ "timestamp": timestamp_now
+ }
+ vesAgentState["%s" % (backlog_uuid)] = initialBacklogState
+ backlogState = initialBacklogState
+
+ time_expiration = backlogState["timestamp"] \
+ + oneBacklog.get("poll_interval", poll_interval_default)
+ # check if poll interval expires
+ if timestamp_now < time_expiration:
+ # not expired yet
+ logger.info("return without dispatching, not expired yet")
+ return backlog_count, next_time_slot
+
+ logger.info("Dispatching backlog")
+
+ # collect data in case of expiration
+ if oneBacklog["domain"] == "fault" and oneBacklog["type"] == "vm":
+ processBacklog_fault_vm(vesAgentConfig, vesAgentState, oneBacklog)
+ else:
+ logger.warn("Dispatching backlog fails due to unsupported backlog domain %s,type:%s"
+ % (oneBacklog["domain"], oneBacklog["type"]))
+ backlog_count = 0
+ pass
+
+ # update timestamp and internal state
+ backlogState["timestamp"] = timestamp_now
+ except Exception as e:
+ logger.error("exception:%s" % str(e))
+
+ logger.info("return")
+ return backlog_count, next_time_slot
+