diff options
Diffstat (limited to 'azure/aria/aria-extension-cloudify')
-rw-r--r-- | azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflow_runner.py | 38 |
1 files changed, 16 insertions, 22 deletions
diff --git a/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflow_runner.py b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflow_runner.py index 0c52e32..eb4efeb 100644 --- a/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflow_runner.py +++ b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflow_runner.py @@ -30,6 +30,7 @@ from ..modeling import models from ..modeling import utils as modeling_utils from ..utils.imports import import_fullname + DEFAULT_TASK_MAX_ATTEMPTS = 30 DEFAULT_TASK_RETRY_INTERVAL = 30 @@ -68,30 +69,24 @@ class WorkflowRunner(object): self._resource_storage = resource_storage # the IDs are stored rather than the models themselves, so this module could be used - # by several threads without raising errors on model objects shared between threadsF + # by several threads without raising errors on model objects shared between threads if self._is_resume: - self._service_id = service_id - # self._service_id = self.execution.service.id - # self._workflow_name = model_storage.execution.get(self._execution_id).workflow_name - self._workflow_name = workflow_name - self._validate_workflow_exists_for_service() self._execution_id = execution_id - + self._service_id = self.execution.service.id + self._workflow_name = model_storage.execution.get(self._execution_id).workflow_name else: self._service_id = service_id self._workflow_name = workflow_name self._validate_workflow_exists_for_service() self._execution_id = self._create_execution_model(inputs).id - self._create_execution_model(inputs, execution_id) - self._workflow_context = WorkflowContext( name=self.__class__.__name__, model_storage=self._model_storage, resource_storage=resource_storage, service_id=service_id, - execution_id=execution_id, + execution_id=self._execution_id, workflow_name=self._workflow_name, task_max_attempts=task_max_attempts, task_retry_interval=task_retry_interval) @@ -100,13 +95,13 @@ class WorkflowRunner(object): executor = executor or ProcessExecutor(plugin_manager=plugin_manager) # transforming the execution inputs to dict, to pass them to the workflow function - # execution_inputs_dict = dict(inp.unwrapped for inp in self.execution.inputs.itervalues()) + execution_inputs_dict = dict(inp.unwrapped for inp in self.execution.inputs.itervalues()) - # if not self._is_resume: - # workflow_fn = self._get_workflow_fn() - # self._tasks_graph = workflow_fn(ctx=self._workflow_context, **execution_inputs_dict) - # compiler = graph_compiler.GraphCompiler(self._workflow_context, executor.__class__) - # compiler.compile(self._tasks_graph) + if not self._is_resume: + workflow_fn = self._get_workflow_fn() + self._tasks_graph = workflow_fn(ctx=self._workflow_context, **execution_inputs_dict) + compiler = graph_compiler.GraphCompiler(self._workflow_context, executor.__class__) + compiler.compile(self._tasks_graph) self._engine = engine.Engine(executors={executor.__class__: executor}) @@ -116,7 +111,7 @@ class WorkflowRunner(object): @property def execution(self): - return self._model_storage.execution.get(self._execution_id) + return self._model_storage.execution.get(self.execution_id) @property def service(self): @@ -130,7 +125,7 @@ class WorkflowRunner(object): def cancel(self): self._engine.cancel_execution(ctx=self._workflow_context) - def _create_execution_model(self, inputs, execution_id): + def _create_execution_model(self, inputs): execution = models.Execution( created_at=datetime.utcnow(), service=self.service, @@ -148,7 +143,6 @@ class WorkflowRunner(object): supplied_inputs=inputs or {}) execution.inputs = modeling_utils.merge_parameter_values( inputs, workflow_inputs, model_cls=models.Input) - execution.id = execution_id # TODO: these two following calls should execute atomically self._validate_no_active_executions(execution) self._model_storage.execution.put(execution) @@ -156,17 +150,17 @@ class WorkflowRunner(object): def _validate_workflow_exists_for_service(self): if self._workflow_name not in self.service.workflows and \ - self._workflow_name not in builtin.BUILTIN_WORKFLOWS: + self._workflow_name not in builtin.BUILTIN_WORKFLOWS: raise exceptions.UndeclaredWorkflowError( 'No workflow policy {0} declared in service {1}' - .format(self._workflow_name, self.service.name)) + .format(self._workflow_name, self.service.name)) def _validate_no_active_executions(self, execution): active_executions = [e for e in self.service.executions if e.is_active()] if active_executions: raise exceptions.ActiveExecutionsError( "Can't start execution; Service {0} has an active execution with ID {1}" - .format(self.service.name, active_executions[0].id)) + .format(self.service.name, active_executions[0].id)) def _get_workflow_fn(self): if self._workflow_name in builtin.BUILTIN_WORKFLOWS: |