summaryrefslogtreecommitdiffstats
path: root/vnftest/onap/runners/iteration.py
diff options
context:
space:
mode:
Diffstat (limited to 'vnftest/onap/runners/iteration.py')
-rw-r--r--vnftest/onap/runners/iteration.py38
1 files changed, 24 insertions, 14 deletions
diff --git a/vnftest/onap/runners/iteration.py b/vnftest/onap/runners/iteration.py
index c0bd74f..9c9ab2c 100644
--- a/vnftest/onap/runners/iteration.py
+++ b/vnftest/onap/runners/iteration.py
@@ -25,6 +25,7 @@ import time
import traceback
import os
+from vnftest.common.exceptions import VnftestException
from vnftest.onap.runners import base
@@ -34,8 +35,8 @@ LOG = logging.getLogger(__name__)
QUEUE_PUT_TIMEOUT = 10
-def _worker_process(queue, cls, method_name, step_cfg,
- context_cfg, aborted, output_queue):
+def _worker_process(result_queue, cls, method_name, step_cfg,
+ context_cfg, input_params, aborted, output_queue):
sequence = 1
@@ -50,7 +51,7 @@ def _worker_process(queue, cls, method_name, step_cfg,
runner_cfg['runner_id'] = os.getpid()
- step = cls(step_cfg, context_cfg)
+ step = cls(step_cfg, context_cfg, input_params)
if "setup" in run_step:
step.setup()
@@ -66,15 +67,16 @@ def _worker_process(queue, cls, method_name, step_cfg,
{"runner": runner_cfg["runner_id"],
"sequence": sequence})
- data = {}
+ results = {}
errors = []
+ fatal_error = False
try:
- result = method(data)
- if result:
+ output = method(results)
+ if output:
# add timeout for put so we don't block test
# if we do timeout we don't care about dropping individual KPIs
- output_queue.put(result, True, QUEUE_PUT_TIMEOUT)
+ output_queue.put(output, True, QUEUE_PUT_TIMEOUT)
except AssertionError as assertion:
# SLA validation failed in step, determine what to do now
if sla_action == "assert":
@@ -92,20 +94,26 @@ def _worker_process(queue, cls, method_name, step_cfg,
step_cfg['options']['rate'] -= delta
sequence = 1
continue
+ except VnftestException:
+ errors.append(traceback.format_exc())
+ LOG.exception("")
+ LOG.info("Abort the task")
+ fatal_error = True
+
except Exception:
errors.append(traceback.format_exc())
LOG.exception("")
time.sleep(interval)
- step_output = {
+ step_results = {
'timestamp': time.time(),
'sequence': sequence,
- 'data': data,
+ 'data': results,
'errors': errors
}
- queue.put(step_output, True, QUEUE_PUT_TIMEOUT)
+ result_queue.put(step_results, True, QUEUE_PUT_TIMEOUT)
LOG.debug("runner=%(runner)s seq=%(sequence)s END",
{"runner": runner_cfg["runner_id"],
@@ -114,7 +122,7 @@ def _worker_process(queue, cls, method_name, step_cfg,
sequence += 1
if (errors and sla_action is None) or \
- (sequence > iterations or aborted.is_set()):
+ (sequence > iterations or aborted.is_set()) or fatal_error:
LOG.info("worker END")
break
if "teardown" in run_step:
@@ -128,8 +136,10 @@ def _worker_process(queue, cls, method_name, step_cfg,
LOG.exception("")
raise SystemExit(1)
- LOG.debug("queue.qsize() = %s", queue.qsize())
+ LOG.debug("queue.qsize() = %s", result_queue.qsize())
LOG.debug("output_queue.qsize() = %s", output_queue.qsize())
+ if fatal_error:
+ raise SystemExit(1)
class IterationRunner(base.Runner):
@@ -149,11 +159,11 @@ If the step ends before the time has elapsed, it will be started again.
"""
__execution_type__ = 'Iteration'
- def _run_step(self, cls, method, step_cfg, context_cfg):
+ def _run_step(self, cls, method, step_cfg, context_cfg, input_params):
name = "{}-{}-{}".format(self.__execution_type__, step_cfg.get("type"), os.getpid())
self.process = multiprocessing.Process(
name=name,
target=_worker_process,
args=(self.result_queue, cls, method, step_cfg,
- context_cfg, self.aborted, self.output_queue))
+ context_cfg, input_params, self.aborted, self.output_queue))
self.process.start()