summaryrefslogtreecommitdiffstats
path: root/windriver/titanium_cloud/vesagent/tasks.py
blob: d4e67c957e5dda7c9636b67b5e346da5c34a530f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
# Copyright (c) 2017-2018 Wind River Systems, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# VES agent workers
from __future__ import absolute_import, unicode_literals
from titanium_cloud.celery import app
# import os
import logging
import json
import time

from django.core.cache import cache

from titanium_cloud.vesagent.event_domain.fault_vm import processBacklog_fault_vm

logger = logging.getLogger(__name__)


@app.task(bind=True)
def scheduleBacklogs(self, vimid):
    # make sure only one task runs here
    # cannot get vimid ? logger.info("schedule with vimid:%" % (vimid))

    logger.debug("scheduleBacklogs starts")
    backlog_count, next_time_slot = processBacklogs()
    logger.debug("processBacklogs return with %s, %s" % (backlog_count, next_time_slot))

    # sleep for next_time_slot
    while backlog_count > 0:
        time.sleep(next_time_slot)
        backlog_count, next_time_slot = processBacklogs()

    logger.debug("scheduleBacklogs stops")


def processBacklogs():
    # find out count of valid backlog and the next time slot
    backlog_count = 0
    next_time_slot = 10
    try:
        # get the whole list of backlog
        VesAgentBacklogsVimListStr = cache.get("VesAgentBacklogs.vimlist")
        if VesAgentBacklogsVimListStr is None:
            logger.warn("VesAgentBacklogs.vimlist cannot be found in cache")
            return 0, next_time_slot

        logger.debug("VesAgentBacklogs.vimlist: %s" % VesAgentBacklogsVimListStr)

        backlogsAllVims = json.loads(VesAgentBacklogsVimListStr)
        if backlogsAllVims is None:
            logger.warn("VesAgentBacklogs.vimlist is empty")
            return 0, next_time_slot

        for vimid in backlogsAllVims:
            # iterate each backlogs
            backlog_count_tmp, next_time_slot_tmp = processBacklogsOfOneVIM(vimid)
            logger.debug("vimid:%s, backlog_count,next_time_slot:%s,%s"
                         % (vimid, backlog_count_tmp, next_time_slot_tmp))
            backlog_count += backlog_count_tmp
            next_time_slot = next_time_slot_tmp if next_time_slot > next_time_slot_tmp else next_time_slot
            pass

    except Exception as e:
        logger.error("exception:%s" % str(e))

    return backlog_count, next_time_slot

    pass


def processBacklogsOfOneVIM(vimid):
    '''
    process all backlogs for a VIM, return count of valid backlogs
    :param vimid:
    :return:
    '''
    backlog_count = 0
    next_time_slot = 10

    try:
        vesAgentConfigStr = cache.get("VesAgentBacklogs.config.%s" % vimid)
        if vesAgentConfigStr is None:
            logger.warn("VesAgentBacklogs.config.%s cannot be found in cache" % vimid)
            return 0, next_time_slot

        logger.debug("VesAgentBacklogs.config.%s: %s" % (vimid, vesAgentConfigStr))

        vesAgentConfig = json.loads(vesAgentConfigStr)
        if vesAgentConfig is None:
            logger.warn("VesAgentBacklogs.config.%s corrupts" % vimid)
            return 0, next_time_slot

        vesAgentStateStr = cache.get("VesAgentBacklogs.state.%s" % vimid)
        vesAgentState = json.loads(vesAgentStateStr) if vesAgentStateStr is not None else {}

        ves_info = vesAgentConfig.get("subscription", None)
        if ves_info is None:
            logger.warn("VesAgentBacklogs.config.%s: ves subscription corrupts:%s" % (vimid, vesAgentConfigStr))
            return 0, next_time_slot

        poll_interval_default = vesAgentConfig.get("poll_interval_default", None)
        if poll_interval_default is None:
            logger.warn("VesAgentBacklogs.config.%s: poll_interval_default corrupts:%s" % (vimid, vesAgentConfigStr))
            return 0, next_time_slot

        if poll_interval_default == 0:
            # invalid interval value
            logger.warn("VesAgentBacklogs.config.%s: poll_interval_default invalid:%s" % (vimid, vesAgentConfigStr))
            return 0, next_time_slot

        backlogs_list = vesAgentConfig.get("backlogs", None)
        if backlogs_list is None:
            logger.warn("VesAgentBacklogs.config.%s: backlogs corrupts:%s" % (vimid, vesAgentConfigStr))
            return 0, next_time_slot

        for backlog in backlogs_list:
            backlog_count_tmp, next_time_slot_tmp = \
                processOneBacklog(
                    vesAgentConfig, vesAgentState, poll_interval_default, backlog)
            logger.debug("processOneBacklog return with %s,%s" % (backlog_count_tmp, next_time_slot_tmp))
            backlog_count += backlog_count_tmp
            next_time_slot = next_time_slot_tmp if next_time_slot > next_time_slot_tmp else next_time_slot

            pass

        # save back the updated backlogs state
        vesAgentStateStr = json.dumps(vesAgentState)
        cache.set("VesAgentBacklogs.state.%s" % vimid, vesAgentStateStr, None)

    except Exception as e:
        logger.error("exception:%s" % str(e))

    return backlog_count, next_time_slot


def processOneBacklog(vesAgentConfig, vesAgentState, poll_interval_default, oneBacklog):
    logger.info("Process one backlog")
    # logger.debug("vesAgentConfig:%s, vesAgentState:%s, poll_interval_default:%s, oneBacklog: %s"
    #             % (vesAgentConfig, vesAgentState, poll_interval_default, oneBacklog))

    backlog_count = 1
    next_time_slot = 10
    try:
        timestamp_now = int(time.time())
        backlog_uuid = oneBacklog.get("backlog_uuid", None)
        if backlog_uuid is None:
            # warning: uuid is None, omit this backlog
            logger.warn("backlog without uuid: %s" % oneBacklog)
            return 0, next_time_slot

        backlogState = vesAgentState.get("%s" % (backlog_uuid), None)
        if backlogState is None:
            initialBacklogState = {
                "timestamp": timestamp_now
            }
            vesAgentState["%s" % (backlog_uuid)] = initialBacklogState
            backlogState = initialBacklogState

        time_expiration = \
            backlogState["timestamp"] + \
            oneBacklog.get("poll_interval", poll_interval_default)

        # check if poll interval expires
        if timestamp_now < time_expiration:
            # not expired yet
            logger.info("return without dispatching, not expired yet")
            return backlog_count, next_time_slot

        logger.info("Dispatching backlog")

        # collect data in case of expiration
        if oneBacklog["domain"] == "fault" and oneBacklog["type"] == "vm":
            processBacklog_fault_vm(vesAgentConfig, vesAgentState, oneBacklog)
        else:
            logger.warn("Dispatching backlog fails due to unsupported backlog domain %s,type:%s"
                        % (oneBacklog["domain"], oneBacklog["type"]))
            backlog_count = 0
            pass

        # update timestamp and internal state
        backlogState["timestamp"] = timestamp_now
    except Exception as e:
        logger.error("exception:%s" % str(e))

    logger.info("return")
    return backlog_count, next_time_slot