path: root/vnftest/core/task.py
Diffstat (limited to 'vnftest/core/task.py')
-rw-r--r--   vnftest/core/task.py   59
1 file changed, 28 insertions(+), 31 deletions(-)
diff --git a/vnftest/core/task.py b/vnftest/core/task.py
index 91e3d3c..065ad7e 100644
--- a/vnftest/core/task.py
+++ b/vnftest/core/task.py
@@ -72,7 +72,6 @@ class Task(object): # pragma: no cover
output_config['DEFAULT']['dispatcher'] = out_types
def start(self, args, **kwargs):
- Context.initialize(args.vnfdescriptor, args.environment)
atexit.register(self.atexit_handler)
task_id = getattr(args, 'task_id')
@@ -102,15 +101,15 @@ class Task(object): # pragma: no cover
if args.suite:
# 1.parse suite, return suite_params info
- task_files, task_args, task_args_fnames = \
+ task_files, task_args_list, task_args_fnames = \
parser.parse_suite()
else:
task_files = [parser.path]
- task_args = [args.task_args]
+ task_args_list = [args.task_args]
task_args_fnames = [args.task_args_file]
- LOG.debug("task_files:%s, task_args:%s, task_args_fnames:%s",
- task_files, task_args, task_args_fnames)
+ LOG.debug("task_files:%s, task_args_list:%s, task_args_fnames:%s",
+ task_files, task_args_list, task_args_fnames)
if args.parse_only:
sys.exit(0)
@@ -121,10 +120,18 @@ class Task(object): # pragma: no cover
one_task_start_time = time.time()
# the output of the previous task is the input of the new task
inputs = copy.deepcopy(self.outputs)
+ task_args_file = task_args_fnames[i]
+ task_args = task_args_list[i]
+ try:
+ if task_args_file:
+ with open(task_args_file) as f:
+ inputs.update(parse_task_args("task_args_file", f.read()))
+ inputs.update(parse_task_args("task_args", task_args))
+ except TypeError:
+ raise TypeError()
parser.path = task_files[i]
steps, run_in_parallel, meet_precondition, ret_context = \
- parser.parse_task(self.task_id, task_args[i],
- task_args_fnames[i], inputs)
+ parser.parse_task(self.task_id, inputs)
self.context = ret_context
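Note: the caller now resolves task_args and task_args_file into a single inputs dict before handing it to parse_task, instead of letting the parser do it. A minimal sketch of that merge, assuming parse_task_args() YAML-loads an optional string into a dict (the real vnftest helper may differ):

import copy
import yaml

def build_inputs(prev_outputs, task_args, task_args_file=None):
    # start from the outputs of the previous task, as the loop above does
    inputs = copy.deepcopy(prev_outputs)
    if task_args_file:
        with open(task_args_file) as f:
            inputs.update(yaml.safe_load(f.read()) or {})
    if task_args:
        inputs.update(yaml.safe_load(task_args) or {})
    return inputs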
@@ -135,7 +142,7 @@ class Task(object): # pragma: no cover
case_name = os.path.splitext(os.path.basename(task_files[i]))[0]
try:
- data = self._run(steps, run_in_parallel, args.output_file)
+ data = self._run(steps, run_in_parallel, args.output_file, inputs)
except KeyboardInterrupt:
raise
except Exception:
@@ -256,7 +263,7 @@ class Task(object): # pragma: no cover
for dispatcher in dispatchers:
dispatcher.flush_result_data(result)
- def _run(self, steps, run_in_parallel, output_file):
+ def _run(self, steps, run_in_parallel, output_file, inputs):
"""Deploys context and calls runners"""
if self.context:
self.context.deploy()
@@ -266,14 +273,14 @@ class Task(object): # pragma: no cover
# Start all background steps
for step in filter(_is_background_step, steps):
step["runner"] = dict(type="Duration", duration=1000000000)
- runner = self.run_one_step(step, output_file)
+ runner = self.run_one_step(step, output_file, inputs)
background_runners.append(runner)
runners = []
if run_in_parallel:
for step in steps:
if not _is_background_step(step):
- runner = self.run_one_step(step, output_file)
+ runner = self.run_one_step(step, output_file, inputs)
runners.append(runner)
# Wait for runners to finish
@@ -287,7 +294,7 @@ class Task(object): # pragma: no cover
# run serially
for step in steps:
if not _is_background_step(step):
- runner = self.run_one_step(step, output_file)
+ runner = self.run_one_step(step, output_file, inputs)
status = runner_join(runner, background_runners, self.outputs, result)
if status != 0:
LOG.error('Step NO.%s: "%s" ERROR!',
@@ -331,7 +338,7 @@ class Task(object): # pragma: no cover
else:
return op
- def run_one_step(self, step_cfg, output_file):
+ def run_one_step(self, step_cfg, output_file, inputs):
"""run one step using context"""
# default runner is Iteration
if 'runner' not in step_cfg:
@@ -344,8 +351,8 @@ class Task(object): # pragma: no cover
LOG.info("Starting runner of type '%s'", runner_cfg["type"])
# Previous steps output is the input of the next step.
- input_params = copy.deepcopy(self.outputs)
- runner.run(step_cfg, self.context, input_params)
+ inputs.update(self.outputs)
+ runner.run(step_cfg, self.context, inputs)
return runner
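Note: run_one_step now updates the shared inputs dict in place rather than taking a fresh deepcopy of self.outputs per step, so values produced by earlier steps overwrite task-level inputs with the same key and remain visible to later steps. Illustrative only, with hypothetical keys:

inputs = {"image": "cirros-0.4.0", "flavor": "m1.tiny"}   # task-level args
step_outputs = {"image": "ubuntu-16.04"}                  # produced by an earlier step
inputs.update(step_outputs)
assert inputs["image"] == "ubuntu-16.04"                  # step output wins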
@@ -388,12 +395,13 @@ class TaskParser(object): # pragma: no cover
with open(self.path) as stream:
cfg = yaml_load(stream)
except IOError as ioerror:
- sys.exit(ioerror)
+ LOG.error("Open suite file failed", ioerror)
+ raise
self._check_schema(cfg["schema"], "suite")
LOG.info("\nStarting step:%s", cfg["name"])
- cur_pod = os.environ.get('NODE_NAME', None)
+ cur_pod = os.environ.get('NODE_NAME', "default")
cur_installer = os.environ.get('INSTALLER_TYPE', None)
valid_task_files = []
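Note on the new error path: LOG.error("Open suite file failed", ioerror) passes the exception as an extra argument without a %s placeholder, so formatting the record fails ("not all arguments converted during string formatting") and the IOError detail never reaches the log. A corrected form (suggestion only, not part of this commit):

LOG.error("Open suite file failed: %s", ioerror)
raise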
@@ -420,21 +428,10 @@ class TaskParser(object): # pragma: no cover
return valid_task_files, valid_task_args, valid_task_args_fnames
- def parse_task(self, task_id, task_args=None, task_args_file=None, inputs=None):
+ def parse_task(self, task_id, inputs=None):
"""parses the task file and return an context and step instances"""
LOG.info("Parsing task config: %s", self.path)
-
- try:
- kw = {}
- kw.update(inputs)
- if task_args_file:
- with open(task_args_file) as f:
- kw.update(parse_task_args("task_args_file", f.read()))
- kw.update(parse_task_args("task_args", task_args))
- kw['vnf_descriptor'] = Context.vnf_descriptor
- except TypeError:
- raise TypeError()
-
+ kw = inputs
try:
with utils.load_resource(self.path) as f:
try:
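Note: kw = inputs aliases the caller's dict rather than copying it, so any key parse_task adds from here on also mutates the inputs dict built in Task.start(); the old vnf_descriptor injection is gone, matching the removal of Context.initialize() in start(). A small sketch of the aliasing behaviour (illustrative keys only):

inputs = {"task_args": "{}"}
kw = inputs                # same object, no copy
kw["parsed"] = True        # hypothetical key added during parsing
assert "parsed" in inputs  # the caller sees the mutation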
@@ -460,7 +457,7 @@ class TaskParser(object): # pragma: no cover
name_suffix = '-{}'.format(task_id[:8])
try:
context_cfg['name'] = '{}{}'.format(context_cfg['name'],
- name_suffix)
+ name_suffix)
except KeyError:
pass
# default to CSAR context