diff options
Diffstat (limited to 'ms/py-executor/resource_resolution/resource_resolution.py')
-rw-r--r-- | ms/py-executor/resource_resolution/resource_resolution.py | 294 |
1 files changed, 294 insertions, 0 deletions
diff --git a/ms/py-executor/resource_resolution/resource_resolution.py b/ms/py-executor/resource_resolution/resource_resolution.py new file mode 100644 index 000000000..e4f162f8f --- /dev/null +++ b/ms/py-executor/resource_resolution/resource_resolution.py @@ -0,0 +1,294 @@ +"""Copyright 2020 Deutsche Telekom. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +from enum import Enum, unique +from logging import Logger, getLogger +from types import TracebackType +from typing import Any, Dict, Generator, Optional, Type + +from google.protobuf import json_format + +from proto.BluePrintProcessing_pb2 import ExecutionServiceInput, ExecutionServiceOutput + +from .client import Client + + +@unique +class WorkflowMode(Enum): + """Workflow mode enumerator. + + Workflow can be executed in two modes: synchronously and asynchronously. + This enumerator stores valid values to set the mode: SYNC for synchronously mode and ASYNC for asynchronously. + """ + + SYNC = "sync" + ASYNC = "async" + + +class WorkflowExecution: + """Wokflow execution class. + + Describes workflow to call. Set blueprint name and version and workflow name to execute. + Workflow inputs are optional, by default set to empty directory. + Workflow mode is also optional. It is set by default to call workflow synchronously. + """ + + def __init__( + self, + blueprint_name: str, + blueprint_version: str, + workflow_name: str, + workflow_inputs: Dict[str, Any] = None, + workflow_mode: WorkflowMode = WorkflowMode.SYNC, + ) -> None: + """Initialize workflow execution. + + Get all needed information to execute workflow. + + Args: + blueprint_name (str): Blueprint name to execute workflow from. + blueprint_version (str): Blueprint version. + workflow_name (str): Name of the workflow to execute + workflow_inputs (Dict[str, Any], optional): Key-value workflow inputs. Defaults to None. + workflow_mode (WorkflowMode, optional): Workflow execution mode. It can be run synchronously or + asynchronously. Defaults to WorkflowMode.SYNC. + """ + self.blueprint_name: str = blueprint_name + self.blueprint_version: str = blueprint_version + self.workflow_name: str = workflow_name + if workflow_inputs is None: + workflow_inputs = {} + self.workflow_inputs: Dict[str, Any] = workflow_inputs + self.workflow_mode: WorkflowMode = workflow_mode + + @property + def message(self) -> ExecutionServiceInput: + """Workflow execution protobuf message. + + This message is going to be sent to gRPC server to execute workflow. + + Returns: + ExecutionServiceInput: Properly filled protobuf message. + """ + execution_msg: ExecutionServiceInput = ExecutionServiceInput() + execution_msg.actionIdentifiers.mode = self.workflow_mode.value + execution_msg.actionIdentifiers.blueprintName = self.blueprint_name + execution_msg.actionIdentifiers.blueprintVersion = self.blueprint_version + execution_msg.actionIdentifiers.actionName = self.workflow_name + execution_msg.payload.update({f"{self.workflow_name}-request": self.workflow_inputs}) + return execution_msg + + +class WorkflowExecutionResult: + """Result of workflow execution. + + Store both workflow data and the result returns by server. + """ + + def __init__(self, workflow_execution: WorkflowExecution, execution_output: ExecutionServiceOutput) -> None: + """Initialize workflow execution result object. + + Stores workflow execution data and execution result. + + Args: + workflow_execution (WorkflowExecution): WorkflowExecution object which was used to call request. + execution_output (ExecutionServiceOutput): gRPC server response. + """ + self.workflow_execution: WorkflowExecution = workflow_execution + self.execution_output: ExecutionServiceOutput = execution_output + + @property + def blueprint_name(self) -> str: + """Name of blueprint used to call workflow. + + This value is taken from server response not request (should be the same). + + Returns: + str: Blueprint name + """ + return self.execution_output.actionIdentifiers.blueprintName + + @property + def blueprint_version(self) -> str: + """Blueprint version. + + This value is taken from server response not request (should be the same). + + Returns: + str: Blueprint version + """ + return self.execution_output.actionIdentifiers.blueprintVersion + + @property + def workflow_name(self) -> str: + """Workflow name. + + This value is taken from server response not request (should be the same). + + Returns: + str: Workflow name + """ + return self.execution_output.actionIdentifiers.actionName + + @property + def has_error(self) -> bool: + """Returns bool if request returns error or not. + + Returns: + bool: True if response has status code different than 200 + """ + return self.execution_output.status.code != 200 + + @property + def error_message(self) -> str: + """Error message. + + This property is available only if response has error. Otherwise AttributeError will be raised. + + Raises: + AttributeError: Response has 200 response code and hasn't error message. + + Returns: + str: Error message returned by server + """ + if self.has_error: + return self.execution_output.status.errorMessage + raise AttributeError("Execution does not finish with error") + + @property + def payload(self) -> dict: + """Response payload. + + Payload retured by the server is migrated to Python dict. + + Returns: + dict: Response's payload. + """ + return json_format.MessageToDict(self.execution_output.payload) + + +class ResourceResolution: + """Resource resolution class. + + Helper class to connect to blueprintprocessor's gRPC server, send request to execute workflow and parse responses. + Blueprint with workflow must be deployed before workflow request. + It's possible to create both secre or unsecure connection (without SSL/TLS). + """ + + def __init__( + self, + *, + server_address: str = "127.0.0.1", + server_port: int = "9111", + use_ssl: bool = False, + root_certificates: bytes = None, + private_key: bytes = None, + certificate_chain: bytes = None, + # Authentication header configuration + use_header_auth: bool = False, + header_auth_token: str = None, + ) -> None: + """Resource resolution object initialization. + + Args: + server_address (str, optional): gRPC server address. Defaults to "127.0.0.1". + server_port (int, optional): gRPC server address port. Defaults to "9111". + use_ssl (bool, optional): Boolean flag to determine if secure channel should be created or not. + Defaults to False. + root_certificates (bytes, optional): The PEM-encoded root certificates. None if it shouldn't be used. + Defaults to None. + private_key (bytes, optional): The PEM-encoded private key as a byte string, or None if no private key + should be used. Defaults to None. + certificate_chain (bytes, optional): The PEM-encoded certificate chain as a byte string to use or or None if + no certificate chain should be used. Defaults to None. + use_header_auth (bool, optional): Boolean flag to determine if authorization headed shoud be added for + every call or not. Defaults to False. + header_auth_token (str, optional): Authorization token value. Defaults to None. + """ + # Logger + self.logger: Logger = getLogger(__name__) + # Client settings + self.client_server_address: str = server_address + self.client_server_port: str = server_port + self.client_use_ssl: bool = use_ssl + self.client_root_certificates: bytes = root_certificates + self.client_private_key: bytes = private_key + self.client_certificate_chain: bytes = certificate_chain + self.client_use_header_auth: bool = use_header_auth + self.client_header_auth_token: str = header_auth_token + self.client: Client = None + + def __enter__(self) -> "ResourceResolution": + """Enter ResourceResolution instance context. + + Client connection is created. + """ + self.client = Client( + server_address=f"{self.client_server_address}:{self.client_server_port}", + use_ssl=self.client_use_ssl, + root_certificates=self.client_root_certificates, + private_key=self.client_private_key, + certificate_chain=self.client_certificate_chain, + use_header_auth=self.client_use_header_auth, + header_auth_token=self.client_header_auth_token, + ) + return self + + def __exit__( + self, + unused_exc_type: Optional[Type[BaseException]], + unused_exc_value: Optional[BaseException], + unused_traceback: Optional[TracebackType], + ) -> None: + """Exit ResourceResolution instance context. + + Client connection is closed. + """ + self.client.close() + + def execute_workflows(self, *workflows: WorkflowExecution) -> Generator[WorkflowExecutionResult, None, None]: + """Execute provided workflows. + + Workflows are going to be execured using one gRPC API call. Depends of implementation that may has + some consequences. In some cases if any request fails all requests after that won't be called. + + Responses and zipped with workflows and WorkflowExecutionResult object is initialized and yielded. + + Raises: + AttributeError: Raises if client object is not created. It occurs only if you not uses context manager. + Then user have to create client instance for ResourceResolution object by himself calling: + ``` + resource_resoulution.client = Client( + server_address=f"{resource_resoulution.client_server_address}:{resource_resoulution.client_server_port}", + use_ssl=resource_resoulution.client_use_ssl, + root_certificates=resource_resoulution.client_root_certificates, + private_key=resource_resoulution.client_private_key, + certificate_chain=resource_resoulution.client_certificate_chain, + use_header_auth=resource_resoulution.client_use_header_auth, + header_auth_token=resource_resoulution.client_header_auth_token, + ) + ``` + Remeber also to close client connection. + + Returns: + Generator[WorkflowExecutionResult, None, None]: WorkflowExecutionResult object + with both WorkflowExection object and server response for it's request. + """ + self.logger.debug("Execute workflows") + if not self.client: + raise AttributeError("gRPC client not connected") + + for response, workflow in zip(self.client.process((workflow.message for workflow in workflows)), workflows): + yield WorkflowExecutionResult(workflow, response) |