From 30497ac60f062adba7dae751110dd9fc87ef04db Mon Sep 17 00:00:00 2001 From: Moshe Date: Wed, 14 Mar 2018 14:22:13 +0200 Subject: Onboard package test case Issue-ID: VNFSDK-196 Change-Id: I7ba6de4fe6c55b370f9787c23ad16f1afc26f678 Signed-off-by: Moshe --- vnftest/onap/runners/__init__.py | 0 vnftest/onap/runners/base.py | 250 -------------------------------------- vnftest/onap/runners/duration.py | 145 ---------------------- vnftest/onap/runners/dynamictp.py | 179 --------------------------- vnftest/onap/runners/iteration.py | 169 -------------------------- vnftest/onap/runners/search.py | 180 --------------------------- vnftest/onap/runners/sequence.py | 149 ----------------------- 7 files changed, 1072 deletions(-) delete mode 100644 vnftest/onap/runners/__init__.py delete mode 100755 vnftest/onap/runners/base.py delete mode 100644 vnftest/onap/runners/duration.py delete mode 100755 vnftest/onap/runners/dynamictp.py delete mode 100644 vnftest/onap/runners/iteration.py delete mode 100644 vnftest/onap/runners/search.py delete mode 100644 vnftest/onap/runners/sequence.py (limited to 'vnftest/onap/runners') diff --git a/vnftest/onap/runners/__init__.py b/vnftest/onap/runners/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/vnftest/onap/runners/base.py b/vnftest/onap/runners/base.py deleted file mode 100755 index 15d8a8d..0000000 --- a/vnftest/onap/runners/base.py +++ /dev/null @@ -1,250 +0,0 @@ -############################################################################## -# Copyright 2018 EuropeanSoftwareMarketingLtd. -# =================================================================== -# Licensed under the ApacheLicense, Version2.0 (the"License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# http://www.apache.org/licenses/LICENSE-2.0 -# -# software distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and limitations under -# the License -############################################################################## -# vnftest comment: this is a modified copy of -# rally/rally/benchmark/runners/base.py - -from __future__ import absolute_import -import logging -import multiprocessing -import subprocess -import time -import traceback -import importlib -from Queue import Empty - -import vnftest.common.utils as utils -from vnftest.onap.steps import base as base_step -from vnftest.onap.steps.onap_api_call import OnapApiCall - -log = logging.getLogger(__name__) - - -def _execute_shell_command(command): - """execute shell script with error handling""" - exitcode = 0 - try: - output = subprocess.check_output(command, shell=True) - except Exception: - exitcode = -1 - output = traceback.format_exc() - log.error("exec command '%s' error:\n ", command) - log.error(traceback.format_exc()) - - return exitcode, output - - -def _single_action(seconds, command, queue): - """entrypoint for the single action process""" - log.debug("single action, fires after %d seconds (from now)", seconds) - time.sleep(seconds) - log.debug("single action: executing command: '%s'", command) - ret_code, data = _execute_shell_command(command) - if ret_code < 0: - log.error("single action error! command:%s", command) - queue.put({'single-action-data': data}) - return - log.debug("single action data: \n%s", data) - queue.put({'single-action-data': data}) - - -def _periodic_action(interval, command, queue): - """entrypoint for the periodic action process""" - log.debug("periodic action, fires every: %d seconds", interval) - time_spent = 0 - while True: - time.sleep(interval) - time_spent += interval - log.debug("periodic action, executing command: '%s'", command) - ret_code, data = _execute_shell_command(command) - if ret_code < 0: - log.error("periodic action error! command:%s", command) - queue.put({'periodic-action-data': data}) - break - log.debug("periodic action data: \n%s", data) - queue.put({'periodic-action-data': data}) - - -class Runner(object): - runners = [] - - @staticmethod - def get_cls(runner_type): - """return class of specified type""" - for runner in utils.itersubclasses(Runner): - if runner_type == runner.__execution_type__: - return runner - raise RuntimeError("No such runner_type %s" % runner_type) - - @staticmethod - def get_types(): - """return a list of known runner type (class) names""" - types = [] - for runner in utils.itersubclasses(Runner): - types.append(runner) - return types - - @staticmethod - def get(runner_cfg): - """Returns instance of a step runner for execution type. - """ - return Runner.get_cls(runner_cfg["type"])(runner_cfg) - - @staticmethod - def release(runner): - """Release the runner""" - if runner in Runner.runners: - Runner.runners.remove(runner) - - @staticmethod - def terminate(runner): - """Terminate the runner""" - if runner.process and runner.process.is_alive(): - runner.process.terminate() - - @staticmethod - def terminate_all(): - """Terminate all runners (subprocesses)""" - log.debug("Terminating all runners", exc_info=True) - - # release dumper process as some errors before any runner is created - if not Runner.runners: - return - - for runner in Runner.runners: - log.debug("Terminating runner: %s", runner) - if runner.process: - runner.process.terminate() - runner.process.join() - if runner.periodic_action_process: - log.debug("Terminating periodic action process") - runner.periodic_action_process.terminate() - runner.periodic_action_process = None - Runner.release(runner) - - def __init__(self, config): - self.config = config - self.periodic_action_process = None - self.output_queue = multiprocessing.Queue() - self.result_queue = multiprocessing.Queue() - self.process = None - self.aborted = multiprocessing.Event() - Runner.runners.append(self) - - def run_post_stop_action(self): - """run a potentially configured post-stop action""" - if "post-stop-action" in self.config: - command = self.config["post-stop-action"]["command"] - log.debug("post stop action: command: '%s'", command) - ret_code, data = _execute_shell_command(command) - if ret_code < 0: - log.error("post action error! command:%s", command) - self.result_queue.put({'post-stop-action-data': data}) - return - log.debug("post-stop data: \n%s", data) - self.result_queue.put({'post-stop-action-data': data}) - - def _run_step(self, cls, method_name, step_cfg, context_cfg, input_params): - raise NotImplementedError - - def run(self, step_cfg, context_cfg, input_params): - step_type = step_cfg["type"] - class_name = base_step.Step.get(step_type) - path_split = class_name.split(".") - module_path = ".".join(path_split[:-1]) - module = importlib.import_module(module_path) - cls = getattr(module, path_split[-1]) - - self.config['object'] = class_name - self.aborted.clear() - - # run a potentially configured pre-start action - if "pre-start-action" in self.config: - command = self.config["pre-start-action"]["command"] - log.debug("pre start action: command: '%s'", command) - ret_code, data = _execute_shell_command(command) - if ret_code < 0: - log.error("pre-start action error! command:%s", command) - self.result_queue.put({'pre-start-action-data': data}) - return - log.debug("pre-start data: \n%s", data) - self.result_queue.put({'pre-start-action-data': data}) - - if "single-shot-action" in self.config: - single_action_process = multiprocessing.Process( - target=_single_action, - name="single-shot-action", - args=(self.config["single-shot-action"]["after"], - self.config["single-shot-action"]["command"], - self.result_queue)) - single_action_process.start() - - if "periodic-action" in self.config: - self.periodic_action_process = multiprocessing.Process( - target=_periodic_action, - name="periodic-action", - args=(self.config["periodic-action"]["interval"], - self.config["periodic-action"]["command"], - self.result_queue)) - self.periodic_action_process.start() - - self._run_step(cls, "run", step_cfg, context_cfg, input_params) - - def abort(self): - """Abort the execution of a step""" - self.aborted.set() - - QUEUE_JOIN_INTERVAL = 5 - - def poll(self, timeout=QUEUE_JOIN_INTERVAL): - self.process.join(timeout) - return self.process.exitcode - - def join(self, outputs, result, interval=QUEUE_JOIN_INTERVAL): - while self.process.exitcode is None: - # drain the queue while we are running otherwise we won't terminate - outputs.update(self.get_output()) - result.extend(self.get_result()) - self.process.join(interval) - # drain after the process has exited - outputs.update(self.get_output()) - result.extend(self.get_result()) - - self.process.terminate() - if self.periodic_action_process: - self.periodic_action_process.join(1) - self.periodic_action_process.terminate() - self.periodic_action_process = None - - self.run_post_stop_action() - return self.process.exitcode - - def get_output(self): - result = {} - while not self.output_queue.empty(): - log.debug("output_queue size %s", self.output_queue.qsize()) - try: - result.update(self.output_queue.get(True, 1)) - except Empty: - pass - return result - - def get_result(self): - result = [] - while not self.result_queue.empty(): - log.debug("result_queue size %s", self.result_queue.qsize()) - try: - result.append(self.result_queue.get(True, 1)) - except Empty: - pass - return result diff --git a/vnftest/onap/runners/duration.py b/vnftest/onap/runners/duration.py deleted file mode 100644 index 7e539e5..0000000 --- a/vnftest/onap/runners/duration.py +++ /dev/null @@ -1,145 +0,0 @@ -############################################################################## -# Copyright 2018 EuropeanSoftwareMarketingLtd. -# =================================================================== -# Licensed under the ApacheLicense, Version2.0 (the"License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# http://www.apache.org/licenses/LICENSE-2.0 -# -# software distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and limitations under -# the License -############################################################################## -# vnftest comment: this is a modified copy of -# rally/rally/benchmark/runners/duration.py - -"""A runner that runs a specific time before it returns -""" - -from __future__ import absolute_import -import os -import multiprocessing -import logging -import traceback -import time - -from vnftest.onap.runners import base - -LOG = logging.getLogger(__name__) - - -QUEUE_PUT_TIMEOUT = 10 - - -def _worker_process(queue, cls, method_name, step_cfg, - context_cfg, aborted, output_queue): - - sequence = 1 - - runner_cfg = step_cfg['runner'] - - interval = runner_cfg.get("interval", 1) - duration = runner_cfg.get("duration", 60) - LOG.info("Worker START, duration is %ds", duration) - LOG.debug("class is %s", cls) - - runner_cfg['runner_id'] = os.getpid() - - step = cls(step_cfg, context_cfg) - step.setup() - method = getattr(step, method_name) - - sla_action = None - if "sla" in step_cfg: - sla_action = step_cfg["sla"].get("action", "assert") - - start = time.time() - timeout = start + duration - while True: - - LOG.debug("runner=%(runner)s seq=%(sequence)s START", - {"runner": runner_cfg["runner_id"], "sequence": sequence}) - - data = {} - errors = "" - - try: - result = method(data) - except AssertionError as assertion: - # SLA validation failed in scenario, determine what to do now - if sla_action == "assert": - raise - elif sla_action == "monitor": - LOG.warning("SLA validation failed: %s", assertion.args) - errors = assertion.args - # catch all exceptions because with multiprocessing we can have un-picklable exception - # problems https://bugs.python.org/issue9400 - except Exception: - errors = traceback.format_exc() - LOG.exception("") - else: - if result: - # add timeout for put so we don't block test - # if we do timeout we don't care about dropping individual KPIs - output_queue.put(result, True, QUEUE_PUT_TIMEOUT) - - time.sleep(interval) - - step_output = { - 'timestamp': time.time(), - 'sequence': sequence, - 'data': data, - 'errors': errors - } - - queue.put(step_output, True, QUEUE_PUT_TIMEOUT) - - LOG.debug("runner=%(runner)s seq=%(sequence)s END", - {"runner": runner_cfg["runner_id"], "sequence": sequence}) - - sequence += 1 - - if (errors and sla_action is None) or time.time() > timeout or aborted.is_set(): - LOG.info("Worker END") - break - - try: - step.teardown() - except Exception: - # catch any exception in teardown and convert to simple exception - # never pass exceptions back to multiprocessing, because some exceptions can - # be unpicklable - # https://bugs.python.org/issue9400 - LOG.exception("") - raise SystemExit(1) - - LOG.debug("queue.qsize() = %s", queue.qsize()) - LOG.debug("output_queue.qsize() = %s", output_queue.qsize()) - - -class DurationRunner(base.Runner): - """Run a scenario for a certain amount of time - -If the scenario ends before the time has elapsed, it will be started again. - - Parameters - duration - amount of time the scenario will be run for - type: int - unit: seconds - default: 1 sec - interval - time to wait between each scenario invocation - type: int - unit: seconds - default: 1 sec - """ - __execution_type__ = 'Duration' - - def _run_step(self, cls, method, step_cfg, context_cfg): - name = "{}-{}-{}".format(self.__execution_type__, step_cfg.get("type"), os.getpid()) - self.process = multiprocessing.Process( - name=name, - target=_worker_process, - args=(self.result_queue, cls, method, step_cfg, - context_cfg, self.aborted, self.output_queue)) - self.process.start() diff --git a/vnftest/onap/runners/dynamictp.py b/vnftest/onap/runners/dynamictp.py deleted file mode 100755 index 5ea0910..0000000 --- a/vnftest/onap/runners/dynamictp.py +++ /dev/null @@ -1,179 +0,0 @@ -############################################################################## -# Copyright 2018 EuropeanSoftwareMarketingLtd. -# =================================================================== -# Licensed under the ApacheLicense, Version2.0 (the"License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# http://www.apache.org/licenses/LICENSE-2.0 -# -# software distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and limitations under -# the License -############################################################################## -# vnftest comment: this is a modified copy of -# rally/rally/step/runners/dynamictp.py - -"""A runner that searches for the max throughput with binary search -""" - -import logging -import multiprocessing -import time -import traceback - -import os - -from vnftest.onap.runners import base - -LOG = logging.getLogger(__name__) - - -def _worker_process(queue, cls, method_name, step_cfg, - context_cfg, aborted): # pragma: no cover - - runner_cfg = step_cfg['runner'] - iterations = runner_cfg.get("iterations", 1) - interval = runner_cfg.get("interval", 1) - run_step = runner_cfg.get("run_step", "setup,run,teardown") - delta = runner_cfg.get("delta", 1000) - options_cfg = step_cfg['options'] - initial_rate = options_cfg.get("pps", 1000000) - LOG.info("worker START, class %s", cls) - - runner_cfg['runner_id'] = os.getpid() - - step = cls(step_cfg, context_cfg) - if "setup" in run_step: - step.setup() - - method = getattr(step, method_name) - - queue.put({'runner_id': runner_cfg['runner_id'], - 'step_cfg': step_cfg, - 'context_cfg': context_cfg}) - - if "run" in run_step: - iterator = 0 - search_max = initial_rate - search_min = 0 - while iterator < iterations: - search_min = int(search_min / 2) - step_cfg['options']['pps'] = search_max - search_max_found = False - max_throuput_found = False - sequence = 0 - - last_min_data = {'packets_per_second': 0} - - while True: - sequence += 1 - - data = {} - errors = "" - too_high = False - - LOG.debug("sequence: %s search_min: %s search_max: %s", - sequence, search_min, search_max) - - try: - method(data) - except AssertionError as assertion: - LOG.warning("SLA validation failed: %s" % assertion.args) - too_high = True - except Exception as e: - errors = traceback.format_exc() - LOG.exception(e) - - actual_pps = data['packets_per_second'] - - if too_high: - search_max = actual_pps - - if not search_max_found: - search_max_found = True - else: - last_min_data = data - search_min = actual_pps - - # Check if the actual rate is well below the asked rate - if step_cfg['options']['pps'] > actual_pps * 1.5: - search_max = actual_pps - LOG.debug("Sender reached max tput: %s", search_max) - elif not search_max_found: - search_max = int(actual_pps * 1.5) - - if ((search_max - search_min) < delta) or \ - (search_max <= search_min) or (10 <= sequence): - if last_min_data['packets_per_second'] > 0: - data = last_min_data - - step_output = { - 'timestamp': time.time(), - 'sequence': sequence, - 'data': data, - 'errors': errors - } - - record = { - 'runner_id': runner_cfg['runner_id'], - 'step': step_output - } - - queue.put(record) - max_throuput_found = True - - if errors or aborted.is_set() or max_throuput_found: - LOG.info("worker END") - break - - if not search_max_found: - step_cfg['options']['pps'] = search_max - else: - step_cfg['options']['pps'] = \ - (search_max - search_min) / 2 + search_min - - time.sleep(interval) - - iterator += 1 - LOG.debug("iterator: %s iterations: %s", iterator, iterations) - - if "teardown" in run_step: - try: - step.teardown() - except Exception: - # catch any exception in teardown and convert to simple exception - # never pass exceptions back to multiprocessing, because some exceptions can - # be unpicklable - # https://bugs.python.org/issue9400 - LOG.exception("") - raise SystemExit(1) - - LOG.debug("queue.qsize() = %s", queue.qsize()) - - -class IterationRunner(base.Runner): - """Run a step to find the max throughput - -If the step ends before the time has elapsed, it will be started again. - - Parameters - interval - time to wait between each step invocation - type: int - unit: seconds - default: 1 sec - delta - stop condition for the search. - type: int - unit: pps - default: 1000 pps - """ - __execution_type__ = 'Dynamictp' - - def _run_step(self, cls, method, step_cfg, context_cfg): - name = "{}-{}-{}".format(self.__execution_type__, step_cfg.get("type"), os.getpid()) - self.process = multiprocessing.Process( - name=name, - target=_worker_process, - args=(self.result_queue, cls, method, step_cfg, - context_cfg, self.aborted)) - self.process.start() diff --git a/vnftest/onap/runners/iteration.py b/vnftest/onap/runners/iteration.py deleted file mode 100644 index 9c9ab2c..0000000 --- a/vnftest/onap/runners/iteration.py +++ /dev/null @@ -1,169 +0,0 @@ -############################################################################## -# Copyright 2018 EuropeanSoftwareMarketingLtd. -# =================================================================== -# Licensed under the ApacheLicense, Version2.0 (the"License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# http://www.apache.org/licenses/LICENSE-2.0 -# -# software distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and limitations under -# the License -############################################################################## -# vnftest comment: this is a modified copy of -# rally/rally/benchmark/runners/iteration.py - -"""A runner that runs a configurable number of times before it returns -""" - -from __future__ import absolute_import - -import logging -import multiprocessing -import time -import traceback - -import os -from vnftest.common.exceptions import VnftestException - -from vnftest.onap.runners import base - -LOG = logging.getLogger(__name__) - - -QUEUE_PUT_TIMEOUT = 10 - - -def _worker_process(result_queue, cls, method_name, step_cfg, - context_cfg, input_params, aborted, output_queue): - - sequence = 1 - - runner_cfg = step_cfg['runner'] - - interval = runner_cfg.get("interval", 1) - iterations = runner_cfg.get("iterations", 1) - run_step = runner_cfg.get("run_step", "setup,run,teardown") - - delta = runner_cfg.get("delta", 2) - LOG.info("worker START, iterations %d times, class %s", iterations, cls) - - runner_cfg['runner_id'] = os.getpid() - - step = cls(step_cfg, context_cfg, input_params) - if "setup" in run_step: - step.setup() - - method = getattr(step, method_name) - - sla_action = None - if "sla" in step_cfg: - sla_action = step_cfg["sla"].get("action", "assert") - if "run" in run_step: - while True: - - LOG.debug("runner=%(runner)s seq=%(sequence)s START", - {"runner": runner_cfg["runner_id"], - "sequence": sequence}) - - results = {} - errors = [] - fatal_error = False - - try: - output = method(results) - if output: - # add timeout for put so we don't block test - # if we do timeout we don't care about dropping individual KPIs - output_queue.put(output, True, QUEUE_PUT_TIMEOUT) - except AssertionError as assertion: - # SLA validation failed in step, determine what to do now - if sla_action == "assert": - raise - elif sla_action == "monitor": - LOG.warning("SLA validation failed: %s", assertion.args) - errors.append(assertion.args) - elif sla_action == "rate-control": - try: - step_cfg['options']['rate'] - except KeyError: - step_cfg.setdefault('options', {}) - step_cfg['options']['rate'] = 100 - - step_cfg['options']['rate'] -= delta - sequence = 1 - continue - except VnftestException: - errors.append(traceback.format_exc()) - LOG.exception("") - LOG.info("Abort the task") - fatal_error = True - - except Exception: - errors.append(traceback.format_exc()) - LOG.exception("") - - time.sleep(interval) - - step_results = { - 'timestamp': time.time(), - 'sequence': sequence, - 'data': results, - 'errors': errors - } - - result_queue.put(step_results, True, QUEUE_PUT_TIMEOUT) - - LOG.debug("runner=%(runner)s seq=%(sequence)s END", - {"runner": runner_cfg["runner_id"], - "sequence": sequence}) - - sequence += 1 - - if (errors and sla_action is None) or \ - (sequence > iterations or aborted.is_set()) or fatal_error: - LOG.info("worker END") - break - if "teardown" in run_step: - try: - step.teardown() - except Exception: - # catch any exception in teardown and convert to simple exception - # never pass exceptions back to multiprocessing, because some exceptions can - # be unpicklable - # https://bugs.python.org/issue9400 - LOG.exception("") - raise SystemExit(1) - - LOG.debug("queue.qsize() = %s", result_queue.qsize()) - LOG.debug("output_queue.qsize() = %s", output_queue.qsize()) - if fatal_error: - raise SystemExit(1) - - -class IterationRunner(base.Runner): - """Run a step for a configurable number of times - -If the step ends before the time has elapsed, it will be started again. - - Parameters - iterations - amount of times the step will be run for - type: int - unit: na - default: 1 - interval - time to wait between each step invocation - type: int - unit: seconds - default: 1 sec - """ - __execution_type__ = 'Iteration' - - def _run_step(self, cls, method, step_cfg, context_cfg, input_params): - name = "{}-{}-{}".format(self.__execution_type__, step_cfg.get("type"), os.getpid()) - self.process = multiprocessing.Process( - name=name, - target=_worker_process, - args=(self.result_queue, cls, method, step_cfg, - context_cfg, input_params, self.aborted, self.output_queue)) - self.process.start() diff --git a/vnftest/onap/runners/search.py b/vnftest/onap/runners/search.py deleted file mode 100644 index d5bd417..0000000 --- a/vnftest/onap/runners/search.py +++ /dev/null @@ -1,180 +0,0 @@ -############################################################################## -# Copyright 2018 EuropeanSoftwareMarketingLtd. -# =================================================================== -# Licensed under the ApacheLicense, Version2.0 (the"License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# http://www.apache.org/licenses/LICENSE-2.0 -# -# software distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and limitations under -# the License -############################################################################## -# vnftest comment: this is a modified copy of -# rally/rally/benchmark/runners/search.py - -"""A runner that runs a specific time before it returns -""" - -from __future__ import absolute_import - -import logging -import multiprocessing -import time -import traceback -from contextlib import contextmanager -from itertools import takewhile - -import os -from collections import Mapping -from six.moves import zip - -from vnftest.onap.runners import base - -LOG = logging.getLogger(__name__) - - -class SearchRunnerHelper(object): - - def __init__(self, cls, method_name, step_cfg, context_cfg, aborted): - super(SearchRunnerHelper, self).__init__() - self.cls = cls - self.method_name = method_name - self.step_cfg = step_cfg - self.context_cfg = context_cfg - self.aborted = aborted - self.runner_cfg = step_cfg['runner'] - self.run_step = self.runner_cfg.get("run_step", "setup,run,teardown") - self.timeout = self.runner_cfg.get("timeout", 60) - self.interval = self.runner_cfg.get("interval", 1) - self.step = None - self.method = None - - def __call__(self, *args, **kwargs): - if self.method is None: - raise RuntimeError - return self.method(*args, **kwargs) - - @contextmanager - def get_step_instance(self): - self.step = self.cls(self.step_cfg, self.context_cfg) - - if 'setup' in self.run_step: - self.step.setup() - - self.method = getattr(self.step, self.method_name) - LOG.info("worker START, timeout %d sec, class %s", self.timeout, self.cls) - try: - yield self - finally: - if 'teardown' in self.run_step: - self.step.teardown() - - def is_not_done(self): - if 'run' not in self.run_step: - raise StopIteration - - max_time = time.time() + self.timeout - - abort_iter = iter(self.aborted.is_set, True) - time_iter = takewhile(lambda t_now: t_now <= max_time, iter(time.time, -1)) - - for seq, _ in enumerate(zip(abort_iter, time_iter), 1): - yield seq - time.sleep(self.interval) - - -class SearchRunner(base.Runner): - """Run a step for a certain amount of time - -If the step ends before the time has elapsed, it will be started again. - - Parameters - timeout - amount of time the step will be run for - type: int - unit: seconds - default: 1 sec - interval - time to wait between each step invocation - type: int - unit: seconds - default: 1 sec - """ - __execution_type__ = 'Search' - - def __init__(self, config): - super(SearchRunner, self).__init__(config) - self.runner_cfg = None - self.runner_id = None - self.sla_action = None - self.worker_helper = None - - def _worker_run_once(self, sequence): - LOG.debug("runner=%s seq=%s START", self.runner_id, sequence) - - data = {} - errors = "" - - try: - self.worker_helper(data) - except AssertionError as assertion: - # SLA validation failed in step, determine what to do now - if self.sla_action == "assert": - raise - elif self.sla_action == "monitor": - LOG.warning("SLA validation failed: %s", assertion.args) - errors = assertion.args - except Exception as e: - errors = traceback.format_exc() - LOG.exception(e) - - record = { - 'runner_id': self.runner_id, - 'step': { - 'timestamp': time.time(), - 'sequence': sequence, - 'data': data, - 'errors': errors, - }, - } - - self.result_queue.put(record) - - LOG.debug("runner=%s seq=%s END", self.runner_id, sequence) - - # Have to search through all the VNF KPIs - kpi_done = any(kpi.get('done') for kpi in data.values() if isinstance(kpi, Mapping)) - - return kpi_done or (errors and self.sla_action is None) - - def _worker_run(self, cls, method_name, step_cfg, context_cfg): - self.runner_cfg = step_cfg['runner'] - self.runner_id = self.runner_cfg['runner_id'] = os.getpid() - - self.worker_helper = SearchRunnerHelper(cls, method_name, step_cfg, - context_cfg, self.aborted) - - try: - self.sla_action = step_cfg['sla'].get('action', 'assert') - except KeyError: - self.sla_action = None - - self.result_queue.put({ - 'runner_id': self.runner_id, - 'step_cfg': step_cfg, - 'context_cfg': context_cfg - }) - - with self.worker_helper.get_step_instance(): - for sequence in self.worker_helper.is_not_done(): - if self._worker_run_once(sequence): - LOG.info("worker END") - break - - def _run_step(self, cls, method, step_cfg, context_cfg): - name = "{}-{}-{}".format(self.__execution_type__, step_cfg.get("type"), os.getpid()) - self.process = multiprocessing.Process( - name=name, - target=self._worker_run, - args=(cls, method, step_cfg, context_cfg)) - self.process.start() diff --git a/vnftest/onap/runners/sequence.py b/vnftest/onap/runners/sequence.py deleted file mode 100644 index b341495..0000000 --- a/vnftest/onap/runners/sequence.py +++ /dev/null @@ -1,149 +0,0 @@ -############################################################################## -# Copyright 2018 EuropeanSoftwareMarketingLtd. -# =================================================================== -# Licensed under the ApacheLicense, Version2.0 (the"License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# http://www.apache.org/licenses/LICENSE-2.0 -# -# software distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and limitations under -# the License -############################################################################## -# vnftest comment: this is a modified copy of -# rally/rally/benchmark/runners/sequence.py - -"""A runner that every run changes a specified input value to the step. -The input value in the sequence is specified in a list in the input file. -""" - -from __future__ import absolute_import - -import logging -import multiprocessing -import time -import traceback - -import os - -from vnftest.onap.runners import base - -LOG = logging.getLogger(__name__) - - -def _worker_process(queue, cls, method_name, step_cfg, - context_cfg, aborted, output_queue): - - sequence = 1 - - runner_cfg = step_cfg['runner'] - - interval = runner_cfg.get("interval", 1) - arg_name = runner_cfg.get('step_option_name') - sequence_values = runner_cfg.get('sequence') - - if 'options' not in step_cfg: - step_cfg['options'] = {} - - options = step_cfg['options'] - - runner_cfg['runner_id'] = os.getpid() - - LOG.info("worker START, sequence_values(%s, %s), class %s", - arg_name, sequence_values, cls) - - step = cls(step_cfg, context_cfg) - step.setup() - method = getattr(step, method_name) - - sla_action = None - if "sla" in step_cfg: - sla_action = step_cfg["sla"].get("action", "assert") - - for value in sequence_values: - options[arg_name] = value - - LOG.debug("runner=%(runner)s seq=%(sequence)s START", - {"runner": runner_cfg["runner_id"], "sequence": sequence}) - - data = {} - errors = "" - - try: - result = method(data) - except AssertionError as assertion: - # SLA validation failed in step, determine what to do now - if sla_action == "assert": - raise - elif sla_action == "monitor": - LOG.warning("SLA validation failed: %s", assertion.args) - errors = assertion.args - except Exception as e: - errors = traceback.format_exc() - LOG.exception(e) - else: - if result: - output_queue.put(result) - - time.sleep(interval) - - step_output = { - 'timestamp': time.time(), - 'sequence': sequence, - 'data': data, - 'errors': errors - } - - queue.put(step_output) - - LOG.debug("runner=%(runner)s seq=%(sequence)s END", - {"runner": runner_cfg["runner_id"], "sequence": sequence}) - - sequence += 1 - - if (errors and sla_action is None) or aborted.is_set(): - break - - try: - step.teardown() - except Exception: - # catch any exception in teardown and convert to simple exception - # never pass exceptions back to multiprocessing, because some exceptions can - # be unpicklable - # https://bugs.python.org/issue9400 - LOG.exception("") - raise SystemExit(1) - LOG.info("worker END") - LOG.debug("queue.qsize() = %s", queue.qsize()) - LOG.debug("output_queue.qsize() = %s", output_queue.qsize()) - - -class SequenceRunner(base.Runner): - """Run a step by changing an input value defined in a list - - Parameters - interval - time to wait between each step invocation - type: int - unit: seconds - default: 1 sec - step_option_name - name of the option that is increased each invocation - type: string - unit: na - default: none - sequence - list of values which are executed in their respective steps - type: [int] - unit: na - default: none - """ - - __execution_type__ = 'Sequence' - - def _run_step(self, cls, method, step_cfg, context_cfg): - name = "{}-{}-{}".format(self.__execution_type__, step_cfg.get("type"), os.getpid()) - self.process = multiprocessing.Process( - name=name, - target=_worker_process, - args=(self.result_queue, cls, method, step_cfg, - context_cfg, self.aborted, self.output_queue)) - self.process.start() -- cgit 1.2.3-korg