############################################################################## # Copyright 2018 EuropeanSoftwareMarketingLtd. # =================================================================== # Licensed under the ApacheLicense, Version2.0 (the"License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # http://www.apache.org/licenses/LICENSE-2.0 # # software distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and limitations under # the License ############################################################################## # vnftest comment: this is a modified copy of # rally/rally/benchmark/runners/search.py """A runner that runs a specific time before it returns """ from __future__ import absolute_import import logging import multiprocessing import time import traceback from collections import Mapping from contextlib import contextmanager from itertools import takewhile import os from six.moves import zip from vnftest.runners import base LOG = logging.getLogger(__name__) class SearchRunnerHelper(object): def __init__(self, cls, method_name, step_cfg, context_cfg, aborted): super(SearchRunnerHelper, self).__init__() self.cls = cls self.method_name = method_name self.step_cfg = step_cfg self.context_cfg = context_cfg self.aborted = aborted self.runner_cfg = step_cfg['runner'] self.run_step = self.runner_cfg.get("run_step", "setup,run,teardown") self.timeout = self.runner_cfg.get("timeout", 60) self.interval = self.runner_cfg.get("interval", 1) self.step = None self.method = None def __call__(self, *args, **kwargs): if self.method is None: raise RuntimeError return self.method(*args, **kwargs) @contextmanager def get_step_instance(self): self.step = self.cls(self.step_cfg, self.context_cfg) if 'setup' in self.run_step: self.step.setup() self.method = getattr(self.step, self.method_name) LOG.info("worker START, timeout %d sec, class %s", self.timeout, self.cls) try: yield self finally: if 'teardown' in self.run_step: self.step.teardown() def is_not_done(self): if 'run' not in self.run_step: raise StopIteration max_time = time.time() + self.timeout abort_iter = iter(self.aborted.is_set, True) time_iter = takewhile(lambda t_now: t_now <= max_time, iter(time.time, -1)) for seq, _ in enumerate(zip(abort_iter, time_iter), 1): yield seq time.sleep(self.interval) class SearchRunner(base.Runner): """Run a step for a certain amount of time If the step ends before the time has elapsed, it will be started again. Parameters timeout - amount of time the step will be run for type: int unit: seconds default: 1 sec interval - time to wait between each step invocation type: int unit: seconds default: 1 sec """ __execution_type__ = 'Search' def __init__(self, config): super(SearchRunner, self).__init__(config) self.runner_cfg = None self.runner_id = None self.sla_action = None self.worker_helper = None def _worker_run_once(self, sequence): LOG.debug("runner=%s seq=%s START", self.runner_id, sequence) data = {} errors = "" try: self.worker_helper(data) except AssertionError as assertion: # SLA validation failed in step, determine what to do now if self.sla_action == "assert": raise elif self.sla_action == "monitor": LOG.warning("SLA validation failed: %s", assertion.args) errors = assertion.args except Exception as e: errors = traceback.format_exc() LOG.exception(e) record = { 'runner_id': self.runner_id, 'step': { 'timestamp': time.time(), 'sequence': sequence, 'data': data, 'errors': errors, }, } self.result_queue.put(record) LOG.debug("runner=%s seq=%s END", self.runner_id, sequence) # Have to search through all the VNF KPIs kpi_done = any(kpi.get('done') for kpi in data.values() if isinstance(kpi, Mapping)) return kpi_done or (errors and self.sla_action is None) def _worker_run(self, cls, method_name, step_cfg, context_cfg): self.runner_cfg = step_cfg['runner'] self.runner_id = self.runner_cfg['runner_id'] = os.getpid() self.worker_helper = SearchRunnerHelper(cls, method_name, step_cfg, context_cfg, self.aborted) try: self.sla_action = step_cfg['sla'].get('action', 'assert') except KeyError: self.sla_action = None self.result_queue.put({ 'runner_id': self.runner_id, 'step_cfg': step_cfg, 'context_cfg': context_cfg }) with self.worker_helper.get_step_instance(): for sequence in self.worker_helper.is_not_done(): if self._worker_run_once(sequence): LOG.info("worker END") break def _run_step(self, cls, method, step_cfg, context_cfg): name = "{}-{}-{}".format(self.__execution_type__, step_cfg.get("type"), os.getpid()) self.process = multiprocessing.Process( name=name, target=self._worker_run, args=(cls, method, step_cfg, context_cfg)) self.process.start()