summaryrefslogtreecommitdiffstats
path: root/azure/aria/aria-extension-cloudify
diff options
context:
space:
mode:
Diffstat (limited to 'azure/aria/aria-extension-cloudify')
-rw-r--r--azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflow_runner.py38
1 files changed, 16 insertions, 22 deletions
diff --git a/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflow_runner.py b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflow_runner.py
index 0c52e32..eb4efeb 100644
--- a/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflow_runner.py
+++ b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflow_runner.py
@@ -30,6 +30,7 @@ from ..modeling import models
from ..modeling import utils as modeling_utils
from ..utils.imports import import_fullname
+
DEFAULT_TASK_MAX_ATTEMPTS = 30
DEFAULT_TASK_RETRY_INTERVAL = 30
@@ -68,30 +69,24 @@ class WorkflowRunner(object):
self._resource_storage = resource_storage
# the IDs are stored rather than the models themselves, so this module could be used
- # by several threads without raising errors on model objects shared between threadsF
+ # by several threads without raising errors on model objects shared between threads
if self._is_resume:
- self._service_id = service_id
- # self._service_id = self.execution.service.id
- # self._workflow_name = model_storage.execution.get(self._execution_id).workflow_name
- self._workflow_name = workflow_name
- self._validate_workflow_exists_for_service()
self._execution_id = execution_id
-
+ self._service_id = self.execution.service.id
+ self._workflow_name = model_storage.execution.get(self._execution_id).workflow_name
else:
self._service_id = service_id
self._workflow_name = workflow_name
self._validate_workflow_exists_for_service()
self._execution_id = self._create_execution_model(inputs).id
- self._create_execution_model(inputs, execution_id)
-
self._workflow_context = WorkflowContext(
name=self.__class__.__name__,
model_storage=self._model_storage,
resource_storage=resource_storage,
service_id=service_id,
- execution_id=execution_id,
+ execution_id=self._execution_id,
workflow_name=self._workflow_name,
task_max_attempts=task_max_attempts,
task_retry_interval=task_retry_interval)
@@ -100,13 +95,13 @@ class WorkflowRunner(object):
executor = executor or ProcessExecutor(plugin_manager=plugin_manager)
# transforming the execution inputs to dict, to pass them to the workflow function
- # execution_inputs_dict = dict(inp.unwrapped for inp in self.execution.inputs.itervalues())
+ execution_inputs_dict = dict(inp.unwrapped for inp in self.execution.inputs.itervalues())
- # if not self._is_resume:
- # workflow_fn = self._get_workflow_fn()
- # self._tasks_graph = workflow_fn(ctx=self._workflow_context, **execution_inputs_dict)
- # compiler = graph_compiler.GraphCompiler(self._workflow_context, executor.__class__)
- # compiler.compile(self._tasks_graph)
+ if not self._is_resume:
+ workflow_fn = self._get_workflow_fn()
+ self._tasks_graph = workflow_fn(ctx=self._workflow_context, **execution_inputs_dict)
+ compiler = graph_compiler.GraphCompiler(self._workflow_context, executor.__class__)
+ compiler.compile(self._tasks_graph)
self._engine = engine.Engine(executors={executor.__class__: executor})
@@ -116,7 +111,7 @@ class WorkflowRunner(object):
@property
def execution(self):
- return self._model_storage.execution.get(self._execution_id)
+ return self._model_storage.execution.get(self.execution_id)
@property
def service(self):
@@ -130,7 +125,7 @@ class WorkflowRunner(object):
def cancel(self):
self._engine.cancel_execution(ctx=self._workflow_context)
- def _create_execution_model(self, inputs, execution_id):
+ def _create_execution_model(self, inputs):
execution = models.Execution(
created_at=datetime.utcnow(),
service=self.service,
@@ -148,7 +143,6 @@ class WorkflowRunner(object):
supplied_inputs=inputs or {})
execution.inputs = modeling_utils.merge_parameter_values(
inputs, workflow_inputs, model_cls=models.Input)
- execution.id = execution_id
# TODO: these two following calls should execute atomically
self._validate_no_active_executions(execution)
self._model_storage.execution.put(execution)
@@ -156,17 +150,17 @@ class WorkflowRunner(object):
def _validate_workflow_exists_for_service(self):
if self._workflow_name not in self.service.workflows and \
- self._workflow_name not in builtin.BUILTIN_WORKFLOWS:
+ self._workflow_name not in builtin.BUILTIN_WORKFLOWS:
raise exceptions.UndeclaredWorkflowError(
'No workflow policy {0} declared in service {1}'
- .format(self._workflow_name, self.service.name))
+ .format(self._workflow_name, self.service.name))
def _validate_no_active_executions(self, execution):
active_executions = [e for e in self.service.executions if e.is_active()]
if active_executions:
raise exceptions.ActiveExecutionsError(
"Can't start execution; Service {0} has an active execution with ID {1}"
- .format(self.service.name, active_executions[0].id))
+ .format(self.service.name, active_executions[0].id))
def _get_workflow_fn(self):
if self._workflow_name in builtin.BUILTIN_WORKFLOWS: