summaryrefslogtreecommitdiffstats
path: root/vnftest/onap/runners
diff options
context:
space:
mode:
Diffstat (limited to 'vnftest/onap/runners')
-rw-r--r--vnftest/onap/runners/__init__.py0
-rwxr-xr-xvnftest/onap/runners/base.py250
-rw-r--r--vnftest/onap/runners/duration.py145
-rwxr-xr-xvnftest/onap/runners/dynamictp.py179
-rw-r--r--vnftest/onap/runners/iteration.py160
-rw-r--r--vnftest/onap/runners/search.py180
-rw-r--r--vnftest/onap/runners/sequence.py149
7 files changed, 1063 insertions, 0 deletions
diff --git a/vnftest/onap/runners/__init__.py b/vnftest/onap/runners/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/vnftest/onap/runners/__init__.py
diff --git a/vnftest/onap/runners/base.py b/vnftest/onap/runners/base.py
new file mode 100755
index 0000000..5170bbe
--- /dev/null
+++ b/vnftest/onap/runners/base.py
@@ -0,0 +1,250 @@
+##############################################################################
+# Copyright 2018 EuropeanSoftwareMarketingLtd.
+# ===================================================================
+# Licensed under the ApacheLicense, Version2.0 (the"License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# software distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and limitations under
+# the License
+##############################################################################
+# vnftest comment: this is a modified copy of
+# rally/rally/benchmark/runners/base.py
+
+from __future__ import absolute_import
+import logging
+import multiprocessing
+import subprocess
+import time
+import traceback
+import importlib
+from Queue import Empty
+
+import vnftest.common.utils as utils
+from vnftest.onap.steps import base as base_step
+from vnftest.onap.steps.onap_api_call import OnapApiCall
+
+log = logging.getLogger(__name__)
+
+
+def _execute_shell_command(command):
+ """execute shell script with error handling"""
+ exitcode = 0
+ try:
+ output = subprocess.check_output(command, shell=True)
+ except Exception:
+ exitcode = -1
+ output = traceback.format_exc()
+ log.error("exec command '%s' error:\n ", command)
+ log.error(traceback.format_exc())
+
+ return exitcode, output
+
+
+def _single_action(seconds, command, queue):
+ """entrypoint for the single action process"""
+ log.debug("single action, fires after %d seconds (from now)", seconds)
+ time.sleep(seconds)
+ log.debug("single action: executing command: '%s'", command)
+ ret_code, data = _execute_shell_command(command)
+ if ret_code < 0:
+ log.error("single action error! command:%s", command)
+ queue.put({'single-action-data': data})
+ return
+ log.debug("single action data: \n%s", data)
+ queue.put({'single-action-data': data})
+
+
+def _periodic_action(interval, command, queue):
+ """entrypoint for the periodic action process"""
+ log.debug("periodic action, fires every: %d seconds", interval)
+ time_spent = 0
+ while True:
+ time.sleep(interval)
+ time_spent += interval
+ log.debug("periodic action, executing command: '%s'", command)
+ ret_code, data = _execute_shell_command(command)
+ if ret_code < 0:
+ log.error("periodic action error! command:%s", command)
+ queue.put({'periodic-action-data': data})
+ break
+ log.debug("periodic action data: \n%s", data)
+ queue.put({'periodic-action-data': data})
+
+
+class Runner(object):
+ runners = []
+
+ @staticmethod
+ def get_cls(runner_type):
+ """return class of specified type"""
+ for runner in utils.itersubclasses(Runner):
+ if runner_type == runner.__execution_type__:
+ return runner
+ raise RuntimeError("No such runner_type %s" % runner_type)
+
+ @staticmethod
+ def get_types():
+ """return a list of known runner type (class) names"""
+ types = []
+ for runner in utils.itersubclasses(Runner):
+ types.append(runner)
+ return types
+
+ @staticmethod
+ def get(runner_cfg):
+ """Returns instance of a step runner for execution type.
+ """
+ return Runner.get_cls(runner_cfg["type"])(runner_cfg)
+
+ @staticmethod
+ def release(runner):
+ """Release the runner"""
+ if runner in Runner.runners:
+ Runner.runners.remove(runner)
+
+ @staticmethod
+ def terminate(runner):
+ """Terminate the runner"""
+ if runner.process and runner.process.is_alive():
+ runner.process.terminate()
+
+ @staticmethod
+ def terminate_all():
+ """Terminate all runners (subprocesses)"""
+ log.debug("Terminating all runners", exc_info=True)
+
+ # release dumper process as some errors before any runner is created
+ if not Runner.runners:
+ return
+
+ for runner in Runner.runners:
+ log.debug("Terminating runner: %s", runner)
+ if runner.process:
+ runner.process.terminate()
+ runner.process.join()
+ if runner.periodic_action_process:
+ log.debug("Terminating periodic action process")
+ runner.periodic_action_process.terminate()
+ runner.periodic_action_process = None
+ Runner.release(runner)
+
+ def __init__(self, config):
+ self.config = config
+ self.periodic_action_process = None
+ self.output_queue = multiprocessing.Queue()
+ self.result_queue = multiprocessing.Queue()
+ self.process = None
+ self.aborted = multiprocessing.Event()
+ Runner.runners.append(self)
+
+ def run_post_stop_action(self):
+ """run a potentially configured post-stop action"""
+ if "post-stop-action" in self.config:
+ command = self.config["post-stop-action"]["command"]
+ log.debug("post stop action: command: '%s'", command)
+ ret_code, data = _execute_shell_command(command)
+ if ret_code < 0:
+ log.error("post action error! command:%s", command)
+ self.result_queue.put({'post-stop-action-data': data})
+ return
+ log.debug("post-stop data: \n%s", data)
+ self.result_queue.put({'post-stop-action-data': data})
+
+ def _run_step(self, cls, method_name, step_cfg, context_cfg):
+ raise NotImplementedError
+
+ def run(self, step_cfg, context_cfg):
+ step_type = step_cfg["type"]
+ class_name = base_step.Step.get(step_type)
+ path_split = class_name.split(".")
+ module_path = ".".join(path_split[:-1])
+ module = importlib.import_module(module_path)
+ cls = getattr(module, path_split[-1])
+
+ self.config['object'] = class_name
+ self.aborted.clear()
+
+ # run a potentially configured pre-start action
+ if "pre-start-action" in self.config:
+ command = self.config["pre-start-action"]["command"]
+ log.debug("pre start action: command: '%s'", command)
+ ret_code, data = _execute_shell_command(command)
+ if ret_code < 0:
+ log.error("pre-start action error! command:%s", command)
+ self.result_queue.put({'pre-start-action-data': data})
+ return
+ log.debug("pre-start data: \n%s", data)
+ self.result_queue.put({'pre-start-action-data': data})
+
+ if "single-shot-action" in self.config:
+ single_action_process = multiprocessing.Process(
+ target=_single_action,
+ name="single-shot-action",
+ args=(self.config["single-shot-action"]["after"],
+ self.config["single-shot-action"]["command"],
+ self.result_queue))
+ single_action_process.start()
+
+ if "periodic-action" in self.config:
+ self.periodic_action_process = multiprocessing.Process(
+ target=_periodic_action,
+ name="periodic-action",
+ args=(self.config["periodic-action"]["interval"],
+ self.config["periodic-action"]["command"],
+ self.result_queue))
+ self.periodic_action_process.start()
+
+ self._run_step(cls, "run", step_cfg, context_cfg)
+
+ def abort(self):
+ """Abort the execution of a step"""
+ self.aborted.set()
+
+ QUEUE_JOIN_INTERVAL = 5
+
+ def poll(self, timeout=QUEUE_JOIN_INTERVAL):
+ self.process.join(timeout)
+ return self.process.exitcode
+
+ def join(self, outputs, result, interval=QUEUE_JOIN_INTERVAL):
+ while self.process.exitcode is None:
+ # drain the queue while we are running otherwise we won't terminate
+ outputs.update(self.get_output())
+ result.extend(self.get_result())
+ self.process.join(interval)
+ # drain after the process has exited
+ outputs.update(self.get_output())
+ result.extend(self.get_result())
+
+ self.process.terminate()
+ if self.periodic_action_process:
+ self.periodic_action_process.join(1)
+ self.periodic_action_process.terminate()
+ self.periodic_action_process = None
+
+ self.run_post_stop_action()
+ return self.process.exitcode
+
+ def get_output(self):
+ result = {}
+ while not self.output_queue.empty():
+ log.debug("output_queue size %s", self.output_queue.qsize())
+ try:
+ result.update(self.output_queue.get(True, 1))
+ except Empty:
+ pass
+ return result
+
+ def get_result(self):
+ result = []
+ while not self.result_queue.empty():
+ log.debug("result_queue size %s", self.result_queue.qsize())
+ try:
+ result.append(self.result_queue.get(True, 1))
+ except Empty:
+ pass
+ return result
diff --git a/vnftest/onap/runners/duration.py b/vnftest/onap/runners/duration.py
new file mode 100644
index 0000000..7e539e5
--- /dev/null
+++ b/vnftest/onap/runners/duration.py
@@ -0,0 +1,145 @@
+##############################################################################
+# Copyright 2018 EuropeanSoftwareMarketingLtd.
+# ===================================================================
+# Licensed under the ApacheLicense, Version2.0 (the"License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# software distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and limitations under
+# the License
+##############################################################################
+# vnftest comment: this is a modified copy of
+# rally/rally/benchmark/runners/duration.py
+
+"""A runner that runs a specific time before it returns
+"""
+
+from __future__ import absolute_import
+import os
+import multiprocessing
+import logging
+import traceback
+import time
+
+from vnftest.onap.runners import base
+
+LOG = logging.getLogger(__name__)
+
+
+QUEUE_PUT_TIMEOUT = 10
+
+
+def _worker_process(queue, cls, method_name, step_cfg,
+ context_cfg, aborted, output_queue):
+
+ sequence = 1
+
+ runner_cfg = step_cfg['runner']
+
+ interval = runner_cfg.get("interval", 1)
+ duration = runner_cfg.get("duration", 60)
+ LOG.info("Worker START, duration is %ds", duration)
+ LOG.debug("class is %s", cls)
+
+ runner_cfg['runner_id'] = os.getpid()
+
+ step = cls(step_cfg, context_cfg)
+ step.setup()
+ method = getattr(step, method_name)
+
+ sla_action = None
+ if "sla" in step_cfg:
+ sla_action = step_cfg["sla"].get("action", "assert")
+
+ start = time.time()
+ timeout = start + duration
+ while True:
+
+ LOG.debug("runner=%(runner)s seq=%(sequence)s START",
+ {"runner": runner_cfg["runner_id"], "sequence": sequence})
+
+ data = {}
+ errors = ""
+
+ try:
+ result = method(data)
+ except AssertionError as assertion:
+ # SLA validation failed in scenario, determine what to do now
+ if sla_action == "assert":
+ raise
+ elif sla_action == "monitor":
+ LOG.warning("SLA validation failed: %s", assertion.args)
+ errors = assertion.args
+ # catch all exceptions because with multiprocessing we can have un-picklable exception
+ # problems https://bugs.python.org/issue9400
+ except Exception:
+ errors = traceback.format_exc()
+ LOG.exception("")
+ else:
+ if result:
+ # add timeout for put so we don't block test
+ # if we do timeout we don't care about dropping individual KPIs
+ output_queue.put(result, True, QUEUE_PUT_TIMEOUT)
+
+ time.sleep(interval)
+
+ step_output = {
+ 'timestamp': time.time(),
+ 'sequence': sequence,
+ 'data': data,
+ 'errors': errors
+ }
+
+ queue.put(step_output, True, QUEUE_PUT_TIMEOUT)
+
+ LOG.debug("runner=%(runner)s seq=%(sequence)s END",
+ {"runner": runner_cfg["runner_id"], "sequence": sequence})
+
+ sequence += 1
+
+ if (errors and sla_action is None) or time.time() > timeout or aborted.is_set():
+ LOG.info("Worker END")
+ break
+
+ try:
+ step.teardown()
+ except Exception:
+ # catch any exception in teardown and convert to simple exception
+ # never pass exceptions back to multiprocessing, because some exceptions can
+ # be unpicklable
+ # https://bugs.python.org/issue9400
+ LOG.exception("")
+ raise SystemExit(1)
+
+ LOG.debug("queue.qsize() = %s", queue.qsize())
+ LOG.debug("output_queue.qsize() = %s", output_queue.qsize())
+
+
+class DurationRunner(base.Runner):
+ """Run a scenario for a certain amount of time
+
+If the scenario ends before the time has elapsed, it will be started again.
+
+ Parameters
+ duration - amount of time the scenario will be run for
+ type: int
+ unit: seconds
+ default: 1 sec
+ interval - time to wait between each scenario invocation
+ type: int
+ unit: seconds
+ default: 1 sec
+ """
+ __execution_type__ = 'Duration'
+
+ def _run_step(self, cls, method, step_cfg, context_cfg):
+ name = "{}-{}-{}".format(self.__execution_type__, step_cfg.get("type"), os.getpid())
+ self.process = multiprocessing.Process(
+ name=name,
+ target=_worker_process,
+ args=(self.result_queue, cls, method, step_cfg,
+ context_cfg, self.aborted, self.output_queue))
+ self.process.start()
diff --git a/vnftest/onap/runners/dynamictp.py b/vnftest/onap/runners/dynamictp.py
new file mode 100755
index 0000000..5ea0910
--- /dev/null
+++ b/vnftest/onap/runners/dynamictp.py
@@ -0,0 +1,179 @@
+##############################################################################
+# Copyright 2018 EuropeanSoftwareMarketingLtd.
+# ===================================================================
+# Licensed under the ApacheLicense, Version2.0 (the"License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# software distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and limitations under
+# the License
+##############################################################################
+# vnftest comment: this is a modified copy of
+# rally/rally/step/runners/dynamictp.py
+
+"""A runner that searches for the max throughput with binary search
+"""
+
+import logging
+import multiprocessing
+import time
+import traceback
+
+import os
+
+from vnftest.onap.runners import base
+
+LOG = logging.getLogger(__name__)
+
+
+def _worker_process(queue, cls, method_name, step_cfg,
+ context_cfg, aborted): # pragma: no cover
+
+ runner_cfg = step_cfg['runner']
+ iterations = runner_cfg.get("iterations", 1)
+ interval = runner_cfg.get("interval", 1)
+ run_step = runner_cfg.get("run_step", "setup,run,teardown")
+ delta = runner_cfg.get("delta", 1000)
+ options_cfg = step_cfg['options']
+ initial_rate = options_cfg.get("pps", 1000000)
+ LOG.info("worker START, class %s", cls)
+
+ runner_cfg['runner_id'] = os.getpid()
+
+ step = cls(step_cfg, context_cfg)
+ if "setup" in run_step:
+ step.setup()
+
+ method = getattr(step, method_name)
+
+ queue.put({'runner_id': runner_cfg['runner_id'],
+ 'step_cfg': step_cfg,
+ 'context_cfg': context_cfg})
+
+ if "run" in run_step:
+ iterator = 0
+ search_max = initial_rate
+ search_min = 0
+ while iterator < iterations:
+ search_min = int(search_min / 2)
+ step_cfg['options']['pps'] = search_max
+ search_max_found = False
+ max_throuput_found = False
+ sequence = 0
+
+ last_min_data = {'packets_per_second': 0}
+
+ while True:
+ sequence += 1
+
+ data = {}
+ errors = ""
+ too_high = False
+
+ LOG.debug("sequence: %s search_min: %s search_max: %s",
+ sequence, search_min, search_max)
+
+ try:
+ method(data)
+ except AssertionError as assertion:
+ LOG.warning("SLA validation failed: %s" % assertion.args)
+ too_high = True
+ except Exception as e:
+ errors = traceback.format_exc()
+ LOG.exception(e)
+
+ actual_pps = data['packets_per_second']
+
+ if too_high:
+ search_max = actual_pps
+
+ if not search_max_found:
+ search_max_found = True
+ else:
+ last_min_data = data
+ search_min = actual_pps
+
+ # Check if the actual rate is well below the asked rate
+ if step_cfg['options']['pps'] > actual_pps * 1.5:
+ search_max = actual_pps
+ LOG.debug("Sender reached max tput: %s", search_max)
+ elif not search_max_found:
+ search_max = int(actual_pps * 1.5)
+
+ if ((search_max - search_min) < delta) or \
+ (search_max <= search_min) or (10 <= sequence):
+ if last_min_data['packets_per_second'] > 0:
+ data = last_min_data
+
+ step_output = {
+ 'timestamp': time.time(),
+ 'sequence': sequence,
+ 'data': data,
+ 'errors': errors
+ }
+
+ record = {
+ 'runner_id': runner_cfg['runner_id'],
+ 'step': step_output
+ }
+
+ queue.put(record)
+ max_throuput_found = True
+
+ if errors or aborted.is_set() or max_throuput_found:
+ LOG.info("worker END")
+ break
+
+ if not search_max_found:
+ step_cfg['options']['pps'] = search_max
+ else:
+ step_cfg['options']['pps'] = \
+ (search_max - search_min) / 2 + search_min
+
+ time.sleep(interval)
+
+ iterator += 1
+ LOG.debug("iterator: %s iterations: %s", iterator, iterations)
+
+ if "teardown" in run_step:
+ try:
+ step.teardown()
+ except Exception:
+ # catch any exception in teardown and convert to simple exception
+ # never pass exceptions back to multiprocessing, because some exceptions can
+ # be unpicklable
+ # https://bugs.python.org/issue9400
+ LOG.exception("")
+ raise SystemExit(1)
+
+ LOG.debug("queue.qsize() = %s", queue.qsize())
+
+
+class IterationRunner(base.Runner):
+ """Run a step to find the max throughput
+
+If the step ends before the time has elapsed, it will be started again.
+
+ Parameters
+ interval - time to wait between each step invocation
+ type: int
+ unit: seconds
+ default: 1 sec
+ delta - stop condition for the search.
+ type: int
+ unit: pps
+ default: 1000 pps
+ """
+ __execution_type__ = 'Dynamictp'
+
+ def _run_step(self, cls, method, step_cfg, context_cfg):
+ name = "{}-{}-{}".format(self.__execution_type__, step_cfg.get("type"), os.getpid())
+ self.process = multiprocessing.Process(
+ name=name,
+ target=_worker_process,
+ args=(self.result_queue, cls, method, step_cfg,
+ context_cfg, self.aborted))
+ self.process.start()
diff --git a/vnftest/onap/runners/iteration.py b/vnftest/onap/runners/iteration.py
new file mode 100644
index 0000000..9bac92e
--- /dev/null
+++ b/vnftest/onap/runners/iteration.py
@@ -0,0 +1,160 @@
+##############################################################################
+# Copyright 2018 EuropeanSoftwareMarketingLtd.
+# ===================================================================
+# Licensed under the ApacheLicense, Version2.0 (the"License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# software distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and limitations under
+# the License
+##############################################################################
+# vnftest comment: this is a modified copy of
+# rally/rally/benchmark/runners/iteration.py
+
+"""A runner that runs a configurable number of times before it returns
+"""
+
+from __future__ import absolute_import
+
+import logging
+import multiprocessing
+import time
+import traceback
+
+import os
+
+from vnftest.onap.runners import base
+
+LOG = logging.getLogger(__name__)
+
+
+QUEUE_PUT_TIMEOUT = 10
+
+
+def _worker_process(queue, cls, method_name, step_cfg,
+ context_cfg, aborted, output_queue):
+
+ sequence = 1
+
+ runner_cfg = step_cfg['runner']
+
+ interval = runner_cfg.get("interval", 1)
+ iterations = runner_cfg.get("iterations", 1)
+ run_step = runner_cfg.get("run_step", "setup,run,teardown")
+
+ delta = runner_cfg.get("delta", 2)
+ LOG.info("worker START, iterations %d times, class %s", iterations, cls)
+
+ runner_cfg['runner_id'] = os.getpid()
+
+ step = cls(step_cfg, context_cfg)
+ if "setup" in run_step:
+ step.setup()
+
+ method = getattr(step, method_name)
+
+ sla_action = None
+ if "sla" in step_cfg:
+ sla_action = step_cfg["sla"].get("action", "assert")
+ if "run" in run_step:
+ while True:
+
+ LOG.debug("runner=%(runner)s seq=%(sequence)s START",
+ {"runner": runner_cfg["runner_id"],
+ "sequence": sequence})
+
+ data = {}
+ errors = ""
+
+ try:
+ result = method(data)
+ except AssertionError as assertion:
+ # SLA validation failed in step, determine what to do now
+ if sla_action == "assert":
+ raise
+ elif sla_action == "monitor":
+ LOG.warning("SLA validation failed: %s", assertion.args)
+ errors = assertion.args
+ elif sla_action == "rate-control":
+ try:
+ step_cfg['options']['rate']
+ except KeyError:
+ step_cfg.setdefault('options', {})
+ step_cfg['options']['rate'] = 100
+
+ step_cfg['options']['rate'] -= delta
+ sequence = 1
+ continue
+ except Exception:
+ errors = traceback.format_exc()
+ LOG.exception("")
+ else:
+ if result:
+ # add timeout for put so we don't block test
+ # if we do timeout we don't care about dropping individual KPIs
+ output_queue.put(result, True, QUEUE_PUT_TIMEOUT)
+
+ time.sleep(interval)
+
+ step_output = {
+ 'timestamp': time.time(),
+ 'sequence': sequence,
+ 'data': data,
+ 'errors': errors
+ }
+
+ queue.put(step_output, True, QUEUE_PUT_TIMEOUT)
+
+ LOG.debug("runner=%(runner)s seq=%(sequence)s END",
+ {"runner": runner_cfg["runner_id"],
+ "sequence": sequence})
+
+ sequence += 1
+
+ if (errors and sla_action is None) or \
+ (sequence > iterations or aborted.is_set()):
+ LOG.info("worker END")
+ break
+ if "teardown" in run_step:
+ try:
+ step.teardown()
+ except Exception:
+ # catch any exception in teardown and convert to simple exception
+ # never pass exceptions back to multiprocessing, because some exceptions can
+ # be unpicklable
+ # https://bugs.python.org/issue9400
+ LOG.exception("")
+ raise SystemExit(1)
+
+ LOG.debug("queue.qsize() = %s", queue.qsize())
+ LOG.debug("output_queue.qsize() = %s", output_queue.qsize())
+
+
+class IterationRunner(base.Runner):
+ """Run a step for a configurable number of times
+
+If the step ends before the time has elapsed, it will be started again.
+
+ Parameters
+ iterations - amount of times the step will be run for
+ type: int
+ unit: na
+ default: 1
+ interval - time to wait between each step invocation
+ type: int
+ unit: seconds
+ default: 1 sec
+ """
+ __execution_type__ = 'Iteration'
+
+ def _run_step(self, cls, method, step_cfg, context_cfg):
+ name = "{}-{}-{}".format(self.__execution_type__, step_cfg.get("type"), os.getpid())
+ self.process = multiprocessing.Process(
+ name=name,
+ target=_worker_process,
+ args=(self.result_queue, cls, method, step_cfg,
+ context_cfg, self.aborted, self.output_queue))
+ self.process.start()
diff --git a/vnftest/onap/runners/search.py b/vnftest/onap/runners/search.py
new file mode 100644
index 0000000..d5bd417
--- /dev/null
+++ b/vnftest/onap/runners/search.py
@@ -0,0 +1,180 @@
+##############################################################################
+# Copyright 2018 EuropeanSoftwareMarketingLtd.
+# ===================================================================
+# Licensed under the ApacheLicense, Version2.0 (the"License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# software distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and limitations under
+# the License
+##############################################################################
+# vnftest comment: this is a modified copy of
+# rally/rally/benchmark/runners/search.py
+
+"""A runner that runs a specific time before it returns
+"""
+
+from __future__ import absolute_import
+
+import logging
+import multiprocessing
+import time
+import traceback
+from contextlib import contextmanager
+from itertools import takewhile
+
+import os
+from collections import Mapping
+from six.moves import zip
+
+from vnftest.onap.runners import base
+
+LOG = logging.getLogger(__name__)
+
+
+class SearchRunnerHelper(object):
+
+ def __init__(self, cls, method_name, step_cfg, context_cfg, aborted):
+ super(SearchRunnerHelper, self).__init__()
+ self.cls = cls
+ self.method_name = method_name
+ self.step_cfg = step_cfg
+ self.context_cfg = context_cfg
+ self.aborted = aborted
+ self.runner_cfg = step_cfg['runner']
+ self.run_step = self.runner_cfg.get("run_step", "setup,run,teardown")
+ self.timeout = self.runner_cfg.get("timeout", 60)
+ self.interval = self.runner_cfg.get("interval", 1)
+ self.step = None
+ self.method = None
+
+ def __call__(self, *args, **kwargs):
+ if self.method is None:
+ raise RuntimeError
+ return self.method(*args, **kwargs)
+
+ @contextmanager
+ def get_step_instance(self):
+ self.step = self.cls(self.step_cfg, self.context_cfg)
+
+ if 'setup' in self.run_step:
+ self.step.setup()
+
+ self.method = getattr(self.step, self.method_name)
+ LOG.info("worker START, timeout %d sec, class %s", self.timeout, self.cls)
+ try:
+ yield self
+ finally:
+ if 'teardown' in self.run_step:
+ self.step.teardown()
+
+ def is_not_done(self):
+ if 'run' not in self.run_step:
+ raise StopIteration
+
+ max_time = time.time() + self.timeout
+
+ abort_iter = iter(self.aborted.is_set, True)
+ time_iter = takewhile(lambda t_now: t_now <= max_time, iter(time.time, -1))
+
+ for seq, _ in enumerate(zip(abort_iter, time_iter), 1):
+ yield seq
+ time.sleep(self.interval)
+
+
+class SearchRunner(base.Runner):
+ """Run a step for a certain amount of time
+
+If the step ends before the time has elapsed, it will be started again.
+
+ Parameters
+ timeout - amount of time the step will be run for
+ type: int
+ unit: seconds
+ default: 1 sec
+ interval - time to wait between each step invocation
+ type: int
+ unit: seconds
+ default: 1 sec
+ """
+ __execution_type__ = 'Search'
+
+ def __init__(self, config):
+ super(SearchRunner, self).__init__(config)
+ self.runner_cfg = None
+ self.runner_id = None
+ self.sla_action = None
+ self.worker_helper = None
+
+ def _worker_run_once(self, sequence):
+ LOG.debug("runner=%s seq=%s START", self.runner_id, sequence)
+
+ data = {}
+ errors = ""
+
+ try:
+ self.worker_helper(data)
+ except AssertionError as assertion:
+ # SLA validation failed in step, determine what to do now
+ if self.sla_action == "assert":
+ raise
+ elif self.sla_action == "monitor":
+ LOG.warning("SLA validation failed: %s", assertion.args)
+ errors = assertion.args
+ except Exception as e:
+ errors = traceback.format_exc()
+ LOG.exception(e)
+
+ record = {
+ 'runner_id': self.runner_id,
+ 'step': {
+ 'timestamp': time.time(),
+ 'sequence': sequence,
+ 'data': data,
+ 'errors': errors,
+ },
+ }
+
+ self.result_queue.put(record)
+
+ LOG.debug("runner=%s seq=%s END", self.runner_id, sequence)
+
+ # Have to search through all the VNF KPIs
+ kpi_done = any(kpi.get('done') for kpi in data.values() if isinstance(kpi, Mapping))
+
+ return kpi_done or (errors and self.sla_action is None)
+
+ def _worker_run(self, cls, method_name, step_cfg, context_cfg):
+ self.runner_cfg = step_cfg['runner']
+ self.runner_id = self.runner_cfg['runner_id'] = os.getpid()
+
+ self.worker_helper = SearchRunnerHelper(cls, method_name, step_cfg,
+ context_cfg, self.aborted)
+
+ try:
+ self.sla_action = step_cfg['sla'].get('action', 'assert')
+ except KeyError:
+ self.sla_action = None
+
+ self.result_queue.put({
+ 'runner_id': self.runner_id,
+ 'step_cfg': step_cfg,
+ 'context_cfg': context_cfg
+ })
+
+ with self.worker_helper.get_step_instance():
+ for sequence in self.worker_helper.is_not_done():
+ if self._worker_run_once(sequence):
+ LOG.info("worker END")
+ break
+
+ def _run_step(self, cls, method, step_cfg, context_cfg):
+ name = "{}-{}-{}".format(self.__execution_type__, step_cfg.get("type"), os.getpid())
+ self.process = multiprocessing.Process(
+ name=name,
+ target=self._worker_run,
+ args=(cls, method, step_cfg, context_cfg))
+ self.process.start()
diff --git a/vnftest/onap/runners/sequence.py b/vnftest/onap/runners/sequence.py
new file mode 100644
index 0000000..b341495
--- /dev/null
+++ b/vnftest/onap/runners/sequence.py
@@ -0,0 +1,149 @@
+##############################################################################
+# Copyright 2018 EuropeanSoftwareMarketingLtd.
+# ===================================================================
+# Licensed under the ApacheLicense, Version2.0 (the"License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# software distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and limitations under
+# the License
+##############################################################################
+# vnftest comment: this is a modified copy of
+# rally/rally/benchmark/runners/sequence.py
+
+"""A runner that every run changes a specified input value to the step.
+The input value in the sequence is specified in a list in the input file.
+"""
+
+from __future__ import absolute_import
+
+import logging
+import multiprocessing
+import time
+import traceback
+
+import os
+
+from vnftest.onap.runners import base
+
+LOG = logging.getLogger(__name__)
+
+
+def _worker_process(queue, cls, method_name, step_cfg,
+ context_cfg, aborted, output_queue):
+
+ sequence = 1
+
+ runner_cfg = step_cfg['runner']
+
+ interval = runner_cfg.get("interval", 1)
+ arg_name = runner_cfg.get('step_option_name')
+ sequence_values = runner_cfg.get('sequence')
+
+ if 'options' not in step_cfg:
+ step_cfg['options'] = {}
+
+ options = step_cfg['options']
+
+ runner_cfg['runner_id'] = os.getpid()
+
+ LOG.info("worker START, sequence_values(%s, %s), class %s",
+ arg_name, sequence_values, cls)
+
+ step = cls(step_cfg, context_cfg)
+ step.setup()
+ method = getattr(step, method_name)
+
+ sla_action = None
+ if "sla" in step_cfg:
+ sla_action = step_cfg["sla"].get("action", "assert")
+
+ for value in sequence_values:
+ options[arg_name] = value
+
+ LOG.debug("runner=%(runner)s seq=%(sequence)s START",
+ {"runner": runner_cfg["runner_id"], "sequence": sequence})
+
+ data = {}
+ errors = ""
+
+ try:
+ result = method(data)
+ except AssertionError as assertion:
+ # SLA validation failed in step, determine what to do now
+ if sla_action == "assert":
+ raise
+ elif sla_action == "monitor":
+ LOG.warning("SLA validation failed: %s", assertion.args)
+ errors = assertion.args
+ except Exception as e:
+ errors = traceback.format_exc()
+ LOG.exception(e)
+ else:
+ if result:
+ output_queue.put(result)
+
+ time.sleep(interval)
+
+ step_output = {
+ 'timestamp': time.time(),
+ 'sequence': sequence,
+ 'data': data,
+ 'errors': errors
+ }
+
+ queue.put(step_output)
+
+ LOG.debug("runner=%(runner)s seq=%(sequence)s END",
+ {"runner": runner_cfg["runner_id"], "sequence": sequence})
+
+ sequence += 1
+
+ if (errors and sla_action is None) or aborted.is_set():
+ break
+
+ try:
+ step.teardown()
+ except Exception:
+ # catch any exception in teardown and convert to simple exception
+ # never pass exceptions back to multiprocessing, because some exceptions can
+ # be unpicklable
+ # https://bugs.python.org/issue9400
+ LOG.exception("")
+ raise SystemExit(1)
+ LOG.info("worker END")
+ LOG.debug("queue.qsize() = %s", queue.qsize())
+ LOG.debug("output_queue.qsize() = %s", output_queue.qsize())
+
+
+class SequenceRunner(base.Runner):
+ """Run a step by changing an input value defined in a list
+
+ Parameters
+ interval - time to wait between each step invocation
+ type: int
+ unit: seconds
+ default: 1 sec
+ step_option_name - name of the option that is increased each invocation
+ type: string
+ unit: na
+ default: none
+ sequence - list of values which are executed in their respective steps
+ type: [int]
+ unit: na
+ default: none
+ """
+
+ __execution_type__ = 'Sequence'
+
+ def _run_step(self, cls, method, step_cfg, context_cfg):
+ name = "{}-{}-{}".format(self.__execution_type__, step_cfg.get("type"), os.getpid())
+ self.process = multiprocessing.Process(
+ name=name,
+ target=_worker_process,
+ args=(self.result_queue, cls, method, step_cfg,
+ context_cfg, self.aborted, self.output_queue))
+ self.process.start()