path: root/vnftest/onap/runners
Diffstat (limited to 'vnftest/onap/runners')
-rw-r--r--  vnftest/onap/runners/__init__.py     0
-rwxr-xr-x  vnftest/onap/runners/base.py       250
-rw-r--r--  vnftest/onap/runners/duration.py   145
-rwxr-xr-x  vnftest/onap/runners/dynamictp.py  179
-rw-r--r--  vnftest/onap/runners/iteration.py  169
-rw-r--r--  vnftest/onap/runners/search.py     180
-rw-r--r--  vnftest/onap/runners/sequence.py   149
7 files changed, 0 insertions, 1072 deletions
diff --git a/vnftest/onap/runners/__init__.py b/vnftest/onap/runners/__init__.py
deleted file mode 100644
index e69de29..0000000
--- a/vnftest/onap/runners/__init__.py
+++ /dev/null
diff --git a/vnftest/onap/runners/base.py b/vnftest/onap/runners/base.py
deleted file mode 100755
index 15d8a8d..0000000
--- a/vnftest/onap/runners/base.py
+++ /dev/null
@@ -1,250 +0,0 @@
-##############################################################################
-# Copyright 2018 EuropeanSoftwareMarketingLtd.
-# ===================================================================
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# software distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and limitations under
-# the License
-##############################################################################
-# vnftest comment: this is a modified copy of
-# rally/rally/benchmark/runners/base.py
-
-from __future__ import absolute_import
-import logging
-import multiprocessing
-import subprocess
-import time
-import traceback
-import importlib
-from Queue import Empty
-
-import vnftest.common.utils as utils
-from vnftest.onap.steps import base as base_step
-from vnftest.onap.steps.onap_api_call import OnapApiCall
-
-log = logging.getLogger(__name__)
-
-
-def _execute_shell_command(command):
- """execute shell script with error handling"""
- exitcode = 0
- try:
- output = subprocess.check_output(command, shell=True)
- except Exception:
- exitcode = -1
- output = traceback.format_exc()
- log.error("exec command '%s' error:\n ", command)
- log.error(traceback.format_exc())
-
- return exitcode, output
-
-
-def _single_action(seconds, command, queue):
- """entrypoint for the single action process"""
- log.debug("single action, fires after %d seconds (from now)", seconds)
- time.sleep(seconds)
- log.debug("single action: executing command: '%s'", command)
- ret_code, data = _execute_shell_command(command)
- if ret_code < 0:
- log.error("single action error! command:%s", command)
- queue.put({'single-action-data': data})
- return
- log.debug("single action data: \n%s", data)
- queue.put({'single-action-data': data})
-
-
-def _periodic_action(interval, command, queue):
- """entrypoint for the periodic action process"""
- log.debug("periodic action, fires every: %d seconds", interval)
- time_spent = 0
- while True:
- time.sleep(interval)
- time_spent += interval
- log.debug("periodic action, executing command: '%s'", command)
- ret_code, data = _execute_shell_command(command)
- if ret_code < 0:
- log.error("periodic action error! command:%s", command)
- queue.put({'periodic-action-data': data})
- break
- log.debug("periodic action data: \n%s", data)
- queue.put({'periodic-action-data': data})
-
-
-class Runner(object):
- runners = []
-
- @staticmethod
- def get_cls(runner_type):
- """return class of specified type"""
- for runner in utils.itersubclasses(Runner):
- if runner_type == runner.__execution_type__:
- return runner
- raise RuntimeError("No such runner_type %s" % runner_type)
-
- @staticmethod
- def get_types():
- """return a list of known runner type (class) names"""
- types = []
- for runner in utils.itersubclasses(Runner):
- types.append(runner)
- return types
-
- @staticmethod
- def get(runner_cfg):
- """Returns instance of a step runner for execution type.
- """
- return Runner.get_cls(runner_cfg["type"])(runner_cfg)
-
- @staticmethod
- def release(runner):
- """Release the runner"""
- if runner in Runner.runners:
- Runner.runners.remove(runner)
-
- @staticmethod
- def terminate(runner):
- """Terminate the runner"""
- if runner.process and runner.process.is_alive():
- runner.process.terminate()
-
- @staticmethod
- def terminate_all():
- """Terminate all runners (subprocesses)"""
- log.debug("Terminating all runners", exc_info=True)
-
- # release dumper process as some errors before any runner is created
- if not Runner.runners:
- return
-
- for runner in Runner.runners:
- log.debug("Terminating runner: %s", runner)
- if runner.process:
- runner.process.terminate()
- runner.process.join()
- if runner.periodic_action_process:
- log.debug("Terminating periodic action process")
- runner.periodic_action_process.terminate()
- runner.periodic_action_process = None
- Runner.release(runner)
-
- def __init__(self, config):
- self.config = config
- self.periodic_action_process = None
- self.output_queue = multiprocessing.Queue()
- self.result_queue = multiprocessing.Queue()
- self.process = None
- self.aborted = multiprocessing.Event()
- Runner.runners.append(self)
-
- def run_post_stop_action(self):
- """run a potentially configured post-stop action"""
- if "post-stop-action" in self.config:
- command = self.config["post-stop-action"]["command"]
- log.debug("post stop action: command: '%s'", command)
- ret_code, data = _execute_shell_command(command)
- if ret_code < 0:
- log.error("post action error! command:%s", command)
- self.result_queue.put({'post-stop-action-data': data})
- return
- log.debug("post-stop data: \n%s", data)
- self.result_queue.put({'post-stop-action-data': data})
-
- def _run_step(self, cls, method_name, step_cfg, context_cfg, input_params):
- raise NotImplementedError
-
- def run(self, step_cfg, context_cfg, input_params):
- step_type = step_cfg["type"]
- class_name = base_step.Step.get(step_type)
- path_split = class_name.split(".")
- module_path = ".".join(path_split[:-1])
- module = importlib.import_module(module_path)
- cls = getattr(module, path_split[-1])
-
- self.config['object'] = class_name
- self.aborted.clear()
-
- # run a potentially configured pre-start action
- if "pre-start-action" in self.config:
- command = self.config["pre-start-action"]["command"]
- log.debug("pre start action: command: '%s'", command)
- ret_code, data = _execute_shell_command(command)
- if ret_code < 0:
- log.error("pre-start action error! command:%s", command)
- self.result_queue.put({'pre-start-action-data': data})
- return
- log.debug("pre-start data: \n%s", data)
- self.result_queue.put({'pre-start-action-data': data})
-
- if "single-shot-action" in self.config:
- single_action_process = multiprocessing.Process(
- target=_single_action,
- name="single-shot-action",
- args=(self.config["single-shot-action"]["after"],
- self.config["single-shot-action"]["command"],
- self.result_queue))
- single_action_process.start()
-
- if "periodic-action" in self.config:
- self.periodic_action_process = multiprocessing.Process(
- target=_periodic_action,
- name="periodic-action",
- args=(self.config["periodic-action"]["interval"],
- self.config["periodic-action"]["command"],
- self.result_queue))
- self.periodic_action_process.start()
-
- self._run_step(cls, "run", step_cfg, context_cfg, input_params)
-
- def abort(self):
- """Abort the execution of a step"""
- self.aborted.set()
-
- QUEUE_JOIN_INTERVAL = 5
-
- def poll(self, timeout=QUEUE_JOIN_INTERVAL):
- self.process.join(timeout)
- return self.process.exitcode
-
- def join(self, outputs, result, interval=QUEUE_JOIN_INTERVAL):
- while self.process.exitcode is None:
- # drain the queue while we are running otherwise we won't terminate
- outputs.update(self.get_output())
- result.extend(self.get_result())
- self.process.join(interval)
- # drain after the process has exited
- outputs.update(self.get_output())
- result.extend(self.get_result())
-
- self.process.terminate()
- if self.periodic_action_process:
- self.periodic_action_process.join(1)
- self.periodic_action_process.terminate()
- self.periodic_action_process = None
-
- self.run_post_stop_action()
- return self.process.exitcode
-
- def get_output(self):
- result = {}
- while not self.output_queue.empty():
- log.debug("output_queue size %s", self.output_queue.qsize())
- try:
- result.update(self.output_queue.get(True, 1))
- except Empty:
- pass
- return result
-
- def get_result(self):
- result = []
- while not self.result_queue.empty():
- log.debug("result_queue size %s", self.result_queue.qsize())
- try:
- result.append(self.result_queue.get(True, 1))
- except Empty:
- pass
- return result
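Before its removal, base.py provided the dispatch surface the other runner modules relied on: Runner.get() picks the subclass whose __execution_type__ matches the config, run() resolves the step class and forks a worker process, and join() drains the output and result queues until that process exits. A minimal sketch of driving that API, assuming a hypothetical step type and placeholder option values and commands (nothing below is a real vnftest task definition):

    from vnftest.onap.runners.base import Runner

    runner_cfg = {
        "type": "Iteration",                                # matched against __execution_type__
        "iterations": 3,
        "interval": 1,
        "pre-start-action": {"command": "echo pre-start"},  # optional shell hook
        "post-stop-action": {"command": "echo post-stop"},  # optional shell hook
    }
    step_cfg = {"type": "SomeOnapStep", "runner": runner_cfg, "options": {}}  # hypothetical step name
    context_cfg = {}
    input_params = {}

    runner = Runner.get(runner_cfg)                  # resolves the subclass by __execution_type__
    runner.run(step_cfg, context_cfg, input_params)  # forks the worker process
    outputs, results = {}, []
    exit_code = runner.join(outputs, results)        # drains queues until the worker exits,
                                                     # then runs the post-stop action
    Runner.release(runner)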
diff --git a/vnftest/onap/runners/duration.py b/vnftest/onap/runners/duration.py
deleted file mode 100644
index 7e539e5..0000000
--- a/vnftest/onap/runners/duration.py
+++ /dev/null
@@ -1,145 +0,0 @@
-##############################################################################
-# Copyright 2018 EuropeanSoftwareMarketingLtd.
-# ===================================================================
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# software distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and limitations under
-# the License
-##############################################################################
-# vnftest comment: this is a modified copy of
-# rally/rally/benchmark/runners/duration.py
-
-"""A runner that runs a specific time before it returns
-"""
-
-from __future__ import absolute_import
-import os
-import multiprocessing
-import logging
-import traceback
-import time
-
-from vnftest.onap.runners import base
-
-LOG = logging.getLogger(__name__)
-
-
-QUEUE_PUT_TIMEOUT = 10
-
-
-def _worker_process(queue, cls, method_name, step_cfg,
- context_cfg, aborted, output_queue):
-
- sequence = 1
-
- runner_cfg = step_cfg['runner']
-
- interval = runner_cfg.get("interval", 1)
- duration = runner_cfg.get("duration", 60)
- LOG.info("Worker START, duration is %ds", duration)
- LOG.debug("class is %s", cls)
-
- runner_cfg['runner_id'] = os.getpid()
-
- step = cls(step_cfg, context_cfg)
- step.setup()
- method = getattr(step, method_name)
-
- sla_action = None
- if "sla" in step_cfg:
- sla_action = step_cfg["sla"].get("action", "assert")
-
- start = time.time()
- timeout = start + duration
- while True:
-
- LOG.debug("runner=%(runner)s seq=%(sequence)s START",
- {"runner": runner_cfg["runner_id"], "sequence": sequence})
-
- data = {}
- errors = ""
-
- try:
- result = method(data)
- except AssertionError as assertion:
- # SLA validation failed in scenario, determine what to do now
- if sla_action == "assert":
- raise
- elif sla_action == "monitor":
- LOG.warning("SLA validation failed: %s", assertion.args)
- errors = assertion.args
- # catch all exceptions because with multiprocessing we can have un-picklable exception
- # problems https://bugs.python.org/issue9400
- except Exception:
- errors = traceback.format_exc()
- LOG.exception("")
- else:
- if result:
- # add timeout for put so we don't block test
- # if we do timeout we don't care about dropping individual KPIs
- output_queue.put(result, True, QUEUE_PUT_TIMEOUT)
-
- time.sleep(interval)
-
- step_output = {
- 'timestamp': time.time(),
- 'sequence': sequence,
- 'data': data,
- 'errors': errors
- }
-
- queue.put(step_output, True, QUEUE_PUT_TIMEOUT)
-
- LOG.debug("runner=%(runner)s seq=%(sequence)s END",
- {"runner": runner_cfg["runner_id"], "sequence": sequence})
-
- sequence += 1
-
- if (errors and sla_action is None) or time.time() > timeout or aborted.is_set():
- LOG.info("Worker END")
- break
-
- try:
- step.teardown()
- except Exception:
- # catch any exception in teardown and convert to simple exception
- # never pass exceptions back to multiprocessing, because some exceptions can
- # be unpicklable
- # https://bugs.python.org/issue9400
- LOG.exception("")
- raise SystemExit(1)
-
- LOG.debug("queue.qsize() = %s", queue.qsize())
- LOG.debug("output_queue.qsize() = %s", output_queue.qsize())
-
-
-class DurationRunner(base.Runner):
- """Run a scenario for a certain amount of time
-
-If the scenario ends before the time has elapsed, it will be started again.
-
- Parameters
- duration - amount of time the scenario will be run for
- type: int
- unit: seconds
- default: 60 sec
- interval - time to wait between each scenario invocation
- type: int
- unit: seconds
- default: 1 sec
- """
- __execution_type__ = 'Duration'
-
- def _run_step(self, cls, method, step_cfg, context_cfg):
- name = "{}-{}-{}".format(self.__execution_type__, step_cfg.get("type"), os.getpid())
- self.process = multiprocessing.Process(
- name=name,
- target=_worker_process,
- args=(self.result_queue, cls, method, step_cfg,
- context_cfg, self.aborted, self.output_queue))
- self.process.start()
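duration.py bounded the step loop by wall-clock time: it kept invoking the step method, sleeping `interval` seconds between calls, until `duration` seconds had elapsed, the worker was aborted, or an unhandled SLA error occurred. A reduced sketch of that timing skeleton, with `run_one_step` standing in for the step method the real worker resolved via getattr:

    import time

    def run_for_duration(run_one_step, duration=60, interval=1):
        """Call run_one_step(data) repeatedly until `duration` seconds have elapsed."""
        deadline = time.time() + duration
        sequence = 1
        while True:
            data = {}
            run_one_step(data)       # the real worker wraps this in SLA / exception handling
            time.sleep(interval)
            sequence += 1
            if time.time() > deadline:
                break
        return sequence - 1          # number of completed invocations

    # e.g. run_for_duration(lambda data: data.update(ok=True), duration=5, interval=1)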
diff --git a/vnftest/onap/runners/dynamictp.py b/vnftest/onap/runners/dynamictp.py
deleted file mode 100755
index 5ea0910..0000000
--- a/vnftest/onap/runners/dynamictp.py
+++ /dev/null
@@ -1,179 +0,0 @@
-##############################################################################
-# Copyright 2018 EuropeanSoftwareMarketingLtd.
-# ===================================================================
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# software distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and limitations under
-# the License
-##############################################################################
-# vnftest comment: this is a modified copy of
-# rally/rally/step/runners/dynamictp.py
-
-"""A runner that searches for the max throughput with binary search
-"""
-
-import logging
-import multiprocessing
-import time
-import traceback
-
-import os
-
-from vnftest.onap.runners import base
-
-LOG = logging.getLogger(__name__)
-
-
-def _worker_process(queue, cls, method_name, step_cfg,
- context_cfg, aborted): # pragma: no cover
-
- runner_cfg = step_cfg['runner']
- iterations = runner_cfg.get("iterations", 1)
- interval = runner_cfg.get("interval", 1)
- run_step = runner_cfg.get("run_step", "setup,run,teardown")
- delta = runner_cfg.get("delta", 1000)
- options_cfg = step_cfg['options']
- initial_rate = options_cfg.get("pps", 1000000)
- LOG.info("worker START, class %s", cls)
-
- runner_cfg['runner_id'] = os.getpid()
-
- step = cls(step_cfg, context_cfg)
- if "setup" in run_step:
- step.setup()
-
- method = getattr(step, method_name)
-
- queue.put({'runner_id': runner_cfg['runner_id'],
- 'step_cfg': step_cfg,
- 'context_cfg': context_cfg})
-
- if "run" in run_step:
- iterator = 0
- search_max = initial_rate
- search_min = 0
- while iterator < iterations:
- search_min = int(search_min / 2)
- step_cfg['options']['pps'] = search_max
- search_max_found = False
- max_throuput_found = False
- sequence = 0
-
- last_min_data = {'packets_per_second': 0}
-
- while True:
- sequence += 1
-
- data = {}
- errors = ""
- too_high = False
-
- LOG.debug("sequence: %s search_min: %s search_max: %s",
- sequence, search_min, search_max)
-
- try:
- method(data)
- except AssertionError as assertion:
- LOG.warning("SLA validation failed: %s" % assertion.args)
- too_high = True
- except Exception as e:
- errors = traceback.format_exc()
- LOG.exception(e)
-
- actual_pps = data['packets_per_second']
-
- if too_high:
- search_max = actual_pps
-
- if not search_max_found:
- search_max_found = True
- else:
- last_min_data = data
- search_min = actual_pps
-
- # Check if the actual rate is well below the asked rate
- if step_cfg['options']['pps'] > actual_pps * 1.5:
- search_max = actual_pps
- LOG.debug("Sender reached max tput: %s", search_max)
- elif not search_max_found:
- search_max = int(actual_pps * 1.5)
-
- if ((search_max - search_min) < delta) or \
- (search_max <= search_min) or (10 <= sequence):
- if last_min_data['packets_per_second'] > 0:
- data = last_min_data
-
- step_output = {
- 'timestamp': time.time(),
- 'sequence': sequence,
- 'data': data,
- 'errors': errors
- }
-
- record = {
- 'runner_id': runner_cfg['runner_id'],
- 'step': step_output
- }
-
- queue.put(record)
- max_throuput_found = True
-
- if errors or aborted.is_set() or max_throuput_found:
- LOG.info("worker END")
- break
-
- if not search_max_found:
- step_cfg['options']['pps'] = search_max
- else:
- step_cfg['options']['pps'] = \
- (search_max - search_min) / 2 + search_min
-
- time.sleep(interval)
-
- iterator += 1
- LOG.debug("iterator: %s iterations: %s", iterator, iterations)
-
- if "teardown" in run_step:
- try:
- step.teardown()
- except Exception:
- # catch any exception in teardown and convert to simple exception
- # never pass exceptions back to multiprocessing, because some exceptions can
- # be unpicklable
- # https://bugs.python.org/issue9400
- LOG.exception("")
- raise SystemExit(1)
-
- LOG.debug("queue.qsize() = %s", queue.qsize())
-
-
-class IterationRunner(base.Runner):
- """Run a step to find the max throughput
-
-If the step ends before the time has elapsed, it will be started again.
-
- Parameters
- interval - time to wait between each step invocation
- type: int
- unit: seconds
- default: 1 sec
- delta - stop condition for the search.
- type: int
- unit: pps
- default: 1000 pps
- """
- __execution_type__ = 'Dynamictp'
-
- def _run_step(self, cls, method, step_cfg, context_cfg):
- name = "{}-{}-{}".format(self.__execution_type__, step_cfg.get("type"), os.getpid())
- self.process = multiprocessing.Process(
- name=name,
- target=_worker_process,
- args=(self.result_queue, cls, method, step_cfg,
- context_cfg, self.aborted))
- self.process.start()
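dynamictp.py converged on the highest sustainable packet rate with a bounded binary search: a probe that violated the SLA lowered the upper bound, a passing probe raised the lower bound, and the search stopped once the window shrank below `delta` pps or ten probes had run. The core of that search, reduced to a standalone sketch (`measure` and `sla_ok` are hypothetical stand-ins for the step call and its SLA check):

    def find_max_throughput(measure, sla_ok, initial_rate=1000000, delta=1000):
        """Binary-search the highest rate (pps) for which sla_ok(achieved) still holds."""
        search_min, search_max = 0, initial_rate
        rate = search_max
        best = 0
        for _ in range(10):                      # the removed worker capped each pass at 10 probes
            achieved = measure(rate)             # run the step at `rate`, return achieved pps
            if sla_ok(achieved):
                best = achieved
                search_min = achieved            # SLA held: raise the lower bound
            else:
                search_max = achieved            # SLA violated: lower the upper bound
            if (search_max - search_min) < delta or search_max <= search_min:
                break
            rate = (search_max - search_min) // 2 + search_min
        return best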
diff --git a/vnftest/onap/runners/iteration.py b/vnftest/onap/runners/iteration.py
deleted file mode 100644
index 9c9ab2c..0000000
--- a/vnftest/onap/runners/iteration.py
+++ /dev/null
@@ -1,169 +0,0 @@
-##############################################################################
-# Copyright 2018 EuropeanSoftwareMarketingLtd.
-# ===================================================================
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# software distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and limitations under
-# the License
-##############################################################################
-# vnftest comment: this is a modified copy of
-# rally/rally/benchmark/runners/iteration.py
-
-"""A runner that runs a configurable number of times before it returns
-"""
-
-from __future__ import absolute_import
-
-import logging
-import multiprocessing
-import time
-import traceback
-
-import os
-from vnftest.common.exceptions import VnftestException
-
-from vnftest.onap.runners import base
-
-LOG = logging.getLogger(__name__)
-
-
-QUEUE_PUT_TIMEOUT = 10
-
-
-def _worker_process(result_queue, cls, method_name, step_cfg,
- context_cfg, input_params, aborted, output_queue):
-
- sequence = 1
-
- runner_cfg = step_cfg['runner']
-
- interval = runner_cfg.get("interval", 1)
- iterations = runner_cfg.get("iterations", 1)
- run_step = runner_cfg.get("run_step", "setup,run,teardown")
-
- delta = runner_cfg.get("delta", 2)
- LOG.info("worker START, iterations %d times, class %s", iterations, cls)
-
- runner_cfg['runner_id'] = os.getpid()
-
- step = cls(step_cfg, context_cfg, input_params)
- if "setup" in run_step:
- step.setup()
-
- method = getattr(step, method_name)
-
- sla_action = None
- if "sla" in step_cfg:
- sla_action = step_cfg["sla"].get("action", "assert")
- if "run" in run_step:
- while True:
-
- LOG.debug("runner=%(runner)s seq=%(sequence)s START",
- {"runner": runner_cfg["runner_id"],
- "sequence": sequence})
-
- results = {}
- errors = []
- fatal_error = False
-
- try:
- output = method(results)
- if output:
- # add timeout for put so we don't block test
- # if we do timeout we don't care about dropping individual KPIs
- output_queue.put(output, True, QUEUE_PUT_TIMEOUT)
- except AssertionError as assertion:
- # SLA validation failed in step, determine what to do now
- if sla_action == "assert":
- raise
- elif sla_action == "monitor":
- LOG.warning("SLA validation failed: %s", assertion.args)
- errors.append(assertion.args)
- elif sla_action == "rate-control":
- try:
- step_cfg['options']['rate']
- except KeyError:
- step_cfg.setdefault('options', {})
- step_cfg['options']['rate'] = 100
-
- step_cfg['options']['rate'] -= delta
- sequence = 1
- continue
- except VnftestException:
- errors.append(traceback.format_exc())
- LOG.exception("")
- LOG.info("Abort the task")
- fatal_error = True
-
- except Exception:
- errors.append(traceback.format_exc())
- LOG.exception("")
-
- time.sleep(interval)
-
- step_results = {
- 'timestamp': time.time(),
- 'sequence': sequence,
- 'data': results,
- 'errors': errors
- }
-
- result_queue.put(step_results, True, QUEUE_PUT_TIMEOUT)
-
- LOG.debug("runner=%(runner)s seq=%(sequence)s END",
- {"runner": runner_cfg["runner_id"],
- "sequence": sequence})
-
- sequence += 1
-
- if (errors and sla_action is None) or \
- (sequence > iterations or aborted.is_set()) or fatal_error:
- LOG.info("worker END")
- break
- if "teardown" in run_step:
- try:
- step.teardown()
- except Exception:
- # catch any exception in teardown and convert to simple exception
- # never pass exceptions back to multiprocessing, because some exceptions can
- # be unpicklable
- # https://bugs.python.org/issue9400
- LOG.exception("")
- raise SystemExit(1)
-
- LOG.debug("queue.qsize() = %s", result_queue.qsize())
- LOG.debug("output_queue.qsize() = %s", output_queue.qsize())
- if fatal_error:
- raise SystemExit(1)
-
-
-class IterationRunner(base.Runner):
- """Run a step for a configurable number of times
-
-If the step ends before the time has elapsed, it will be started again.
-
- Parameters
- iterations - amount of times the step will be run for
- type: int
- unit: na
- default: 1
- interval - time to wait between each step invocation
- type: int
- unit: seconds
- default: 1 sec
- """
- __execution_type__ = 'Iteration'
-
- def _run_step(self, cls, method, step_cfg, context_cfg, input_params):
- name = "{}-{}-{}".format(self.__execution_type__, step_cfg.get("type"), os.getpid())
- self.process = multiprocessing.Process(
- name=name,
- target=_worker_process,
- args=(self.result_queue, cls, method, step_cfg,
- context_cfg, input_params, self.aborted, self.output_queue))
- self.process.start()
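iteration.py was the only runner in this tree already ported to the newer _run_step signature (it receives input_params), and the only one with a "rate-control" SLA action, which lowers options['rate'] by `delta` and restarts the iteration count after an SLA failure. An illustrative runner/SLA section, written as the Python dict a task's YAML would deserialize to (the step name and values are examples only):

    step_cfg = {
        "type": "SomeOnapStep",                # hypothetical step name
        "runner": {
            "type": "Iteration",
            "iterations": 10,                  # run the step 10 times
            "interval": 2,                     # sleep 2 s between invocations
            "run_step": "setup,run,teardown",  # phases the worker should execute
        },
        "sla": {
            "action": "monitor",               # "assert" re-raises, "monitor" logs,
                                               # "rate-control" lowers options['rate'] and restarts
        },
        "options": {},
    }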
diff --git a/vnftest/onap/runners/search.py b/vnftest/onap/runners/search.py
deleted file mode 100644
index d5bd417..0000000
--- a/vnftest/onap/runners/search.py
+++ /dev/null
@@ -1,180 +0,0 @@
-##############################################################################
-# Copyright 2018 EuropeanSoftwareMarketingLtd.
-# ===================================================================
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# software distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and limitations under
-# the License
-##############################################################################
-# vnftest comment: this is a modified copy of
-# rally/rally/benchmark/runners/search.py
-
-"""A runner that runs a specific time before it returns
-"""
-
-from __future__ import absolute_import
-
-import logging
-import multiprocessing
-import time
-import traceback
-from contextlib import contextmanager
-from itertools import takewhile
-
-import os
-from collections import Mapping
-from six.moves import zip
-
-from vnftest.onap.runners import base
-
-LOG = logging.getLogger(__name__)
-
-
-class SearchRunnerHelper(object):
-
- def __init__(self, cls, method_name, step_cfg, context_cfg, aborted):
- super(SearchRunnerHelper, self).__init__()
- self.cls = cls
- self.method_name = method_name
- self.step_cfg = step_cfg
- self.context_cfg = context_cfg
- self.aborted = aborted
- self.runner_cfg = step_cfg['runner']
- self.run_step = self.runner_cfg.get("run_step", "setup,run,teardown")
- self.timeout = self.runner_cfg.get("timeout", 60)
- self.interval = self.runner_cfg.get("interval", 1)
- self.step = None
- self.method = None
-
- def __call__(self, *args, **kwargs):
- if self.method is None:
- raise RuntimeError
- return self.method(*args, **kwargs)
-
- @contextmanager
- def get_step_instance(self):
- self.step = self.cls(self.step_cfg, self.context_cfg)
-
- if 'setup' in self.run_step:
- self.step.setup()
-
- self.method = getattr(self.step, self.method_name)
- LOG.info("worker START, timeout %d sec, class %s", self.timeout, self.cls)
- try:
- yield self
- finally:
- if 'teardown' in self.run_step:
- self.step.teardown()
-
- def is_not_done(self):
- if 'run' not in self.run_step:
- raise StopIteration
-
- max_time = time.time() + self.timeout
-
- abort_iter = iter(self.aborted.is_set, True)
- time_iter = takewhile(lambda t_now: t_now <= max_time, iter(time.time, -1))
-
- for seq, _ in enumerate(zip(abort_iter, time_iter), 1):
- yield seq
- time.sleep(self.interval)
-
-
-class SearchRunner(base.Runner):
- """Run a step for a certain amount of time
-
-If the step ends before the time has elapsed, it will be started again.
-
- Parameters
- timeout - amount of time the step will be run for
- type: int
- unit: seconds
- default: 60 sec
- interval - time to wait between each step invocation
- type: int
- unit: seconds
- default: 1 sec
- """
- __execution_type__ = 'Search'
-
- def __init__(self, config):
- super(SearchRunner, self).__init__(config)
- self.runner_cfg = None
- self.runner_id = None
- self.sla_action = None
- self.worker_helper = None
-
- def _worker_run_once(self, sequence):
- LOG.debug("runner=%s seq=%s START", self.runner_id, sequence)
-
- data = {}
- errors = ""
-
- try:
- self.worker_helper(data)
- except AssertionError as assertion:
- # SLA validation failed in step, determine what to do now
- if self.sla_action == "assert":
- raise
- elif self.sla_action == "monitor":
- LOG.warning("SLA validation failed: %s", assertion.args)
- errors = assertion.args
- except Exception as e:
- errors = traceback.format_exc()
- LOG.exception(e)
-
- record = {
- 'runner_id': self.runner_id,
- 'step': {
- 'timestamp': time.time(),
- 'sequence': sequence,
- 'data': data,
- 'errors': errors,
- },
- }
-
- self.result_queue.put(record)
-
- LOG.debug("runner=%s seq=%s END", self.runner_id, sequence)
-
- # Have to search through all the VNF KPIs
- kpi_done = any(kpi.get('done') for kpi in data.values() if isinstance(kpi, Mapping))
-
- return kpi_done or (errors and self.sla_action is None)
-
- def _worker_run(self, cls, method_name, step_cfg, context_cfg):
- self.runner_cfg = step_cfg['runner']
- self.runner_id = self.runner_cfg['runner_id'] = os.getpid()
-
- self.worker_helper = SearchRunnerHelper(cls, method_name, step_cfg,
- context_cfg, self.aborted)
-
- try:
- self.sla_action = step_cfg['sla'].get('action', 'assert')
- except KeyError:
- self.sla_action = None
-
- self.result_queue.put({
- 'runner_id': self.runner_id,
- 'step_cfg': step_cfg,
- 'context_cfg': context_cfg
- })
-
- with self.worker_helper.get_step_instance():
- for sequence in self.worker_helper.is_not_done():
- if self._worker_run_once(sequence):
- LOG.info("worker END")
- break
-
- def _run_step(self, cls, method, step_cfg, context_cfg):
- name = "{}-{}-{}".format(self.__execution_type__, step_cfg.get("type"), os.getpid())
- self.process = multiprocessing.Process(
- name=name,
- target=self._worker_run,
- args=(cls, method, step_cfg, context_cfg))
- self.process.start()
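search.py ran the step inside the forked worker via the bound _worker_run method (rather than a module-level function) and bounded its probe loop with two lazy iterators: iter(aborted.is_set, True) stops once the abort Event is set, and a takewhile over time.time() stops once the timeout passes. The same idiom as a standalone sketch:

    import time
    from itertools import takewhile
    from multiprocessing import Event

    def bounded_sequence(aborted, timeout, interval=1.0):
        """Yield 1, 2, 3, ... until `aborted` is set or `timeout` seconds elapse."""
        deadline = time.time() + timeout
        abort_iter = iter(aborted.is_set, True)      # yields until is_set() returns True
        time_iter = takewhile(lambda now: now <= deadline, iter(time.time, -1))
        for seq, _ in enumerate(zip(abort_iter, time_iter), 1):
            yield seq
            time.sleep(interval)

    # e.g. for seq in bounded_sequence(Event(), timeout=3): print("probe", seq)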
diff --git a/vnftest/onap/runners/sequence.py b/vnftest/onap/runners/sequence.py
deleted file mode 100644
index b341495..0000000
--- a/vnftest/onap/runners/sequence.py
+++ /dev/null
@@ -1,149 +0,0 @@
-##############################################################################
-# Copyright 2018 EuropeanSoftwareMarketingLtd.
-# ===================================================================
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# software distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and limitations under
-# the License
-##############################################################################
-# vnftest comment: this is a modified copy of
-# rally/rally/benchmark/runners/sequence.py
-
-"""A runner that every run changes a specified input value to the step.
-The input value in the sequence is specified in a list in the input file.
-"""
-
-from __future__ import absolute_import
-
-import logging
-import multiprocessing
-import time
-import traceback
-
-import os
-
-from vnftest.onap.runners import base
-
-LOG = logging.getLogger(__name__)
-
-
-def _worker_process(queue, cls, method_name, step_cfg,
- context_cfg, aborted, output_queue):
-
- sequence = 1
-
- runner_cfg = step_cfg['runner']
-
- interval = runner_cfg.get("interval", 1)
- arg_name = runner_cfg.get('step_option_name')
- sequence_values = runner_cfg.get('sequence')
-
- if 'options' not in step_cfg:
- step_cfg['options'] = {}
-
- options = step_cfg['options']
-
- runner_cfg['runner_id'] = os.getpid()
-
- LOG.info("worker START, sequence_values(%s, %s), class %s",
- arg_name, sequence_values, cls)
-
- step = cls(step_cfg, context_cfg)
- step.setup()
- method = getattr(step, method_name)
-
- sla_action = None
- if "sla" in step_cfg:
- sla_action = step_cfg["sla"].get("action", "assert")
-
- for value in sequence_values:
- options[arg_name] = value
-
- LOG.debug("runner=%(runner)s seq=%(sequence)s START",
- {"runner": runner_cfg["runner_id"], "sequence": sequence})
-
- data = {}
- errors = ""
-
- try:
- result = method(data)
- except AssertionError as assertion:
- # SLA validation failed in step, determine what to do now
- if sla_action == "assert":
- raise
- elif sla_action == "monitor":
- LOG.warning("SLA validation failed: %s", assertion.args)
- errors = assertion.args
- except Exception as e:
- errors = traceback.format_exc()
- LOG.exception(e)
- else:
- if result:
- output_queue.put(result)
-
- time.sleep(interval)
-
- step_output = {
- 'timestamp': time.time(),
- 'sequence': sequence,
- 'data': data,
- 'errors': errors
- }
-
- queue.put(step_output)
-
- LOG.debug("runner=%(runner)s seq=%(sequence)s END",
- {"runner": runner_cfg["runner_id"], "sequence": sequence})
-
- sequence += 1
-
- if (errors and sla_action is None) or aborted.is_set():
- break
-
- try:
- step.teardown()
- except Exception:
- # catch any exception in teardown and convert to simple exception
- # never pass exceptions back to multiprocessing, because some exceptions can
- # be unpicklable
- # https://bugs.python.org/issue9400
- LOG.exception("")
- raise SystemExit(1)
- LOG.info("worker END")
- LOG.debug("queue.qsize() = %s", queue.qsize())
- LOG.debug("output_queue.qsize() = %s", output_queue.qsize())
-
-
-class SequenceRunner(base.Runner):
- """Run a step by changing an input value defined in a list
-
- Parameters
- interval - time to wait between each step invocation
- type: int
- unit: seconds
- default: 1 sec
- step_option_name - name of the option that is increased each invocation
- type: string
- unit: na
- default: none
- sequence - list of values which are executed in their respective steps
- type: [int]
- unit: na
- default: none
- """
-
- __execution_type__ = 'Sequence'
-
- def _run_step(self, cls, method, step_cfg, context_cfg):
- name = "{}-{}-{}".format(self.__execution_type__, step_cfg.get("type"), os.getpid())
- self.process = multiprocessing.Process(
- name=name,
- target=_worker_process,
- args=(self.result_queue, cls, method, step_cfg,
- context_cfg, self.aborted, self.output_queue))
- self.process.start()
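sequence.py swept a single step option over a list of values, one step invocation per value, stopping early only on an unhandled error or an abort. The runner section it expected, as the Python dict a task's YAML would deserialize to (the option name and values are illustrative):

    runner_cfg = {
        "type": "Sequence",
        "step_option_name": "packet_size",   # option rewritten before every invocation (example name)
        "sequence": [64, 128, 512, 1024],    # one step run per value
        "interval": 1,                       # seconds to sleep between runs
    }
    # For each value the worker sets step_cfg['options']['packet_size'] = value
    # and calls the step once, so every run sees the next input in the list.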