From fff342d0769e1c9f4e7900fd66b91e82c6966ac3 Mon Sep 17 00:00:00 2001 From: Michal Jagiello Date: Mon, 9 Mar 2020 14:34:14 +0000 Subject: PyExecutor ResourceResoluton helper class. Create a class to call workflow execution requests to gRPC server. Create an interceptor to use header authorization for gRPC calls. Issue-ID: CCSDK-1989 Signed-off-by: Michal Jagiello Change-Id: Ia449a089e02e7a12e31bee5e3b7debee506d8426 --- ms/py-executor/resource_resolution/README | 63 +++++ .../resource_resolution/authorization.py | 64 +++++ ms/py-executor/resource_resolution/client.py | 31 ++- .../resource_resolution/resource_resolution.py | 294 +++++++++++++++++++++ .../tests/authorization_interceptor_test.py | 50 ++++ .../tests/resource_resolution_test.py | 105 ++++++++ 6 files changed, 601 insertions(+), 6 deletions(-) create mode 100644 ms/py-executor/resource_resolution/authorization.py create mode 100644 ms/py-executor/resource_resolution/resource_resolution.py create mode 100644 ms/py-executor/resource_resolution/tests/authorization_interceptor_test.py create mode 100644 ms/py-executor/resource_resolution/tests/resource_resolution_test.py (limited to 'ms') diff --git a/ms/py-executor/resource_resolution/README b/ms/py-executor/resource_resolution/README index 222dae499..353600445 100644 --- a/ms/py-executor/resource_resolution/README +++ b/ms/py-executor/resource_resolution/README @@ -77,4 +77,67 @@ if __name__ == "__main__": for response in client.process(generate_messages()): print(response) +``` + +### Authorizarion header + +``` +from proto.BluePrintCommon_pb2 import ActionIdentifiers, CommonHeader +from proto.BluePrintProcessing_pb2 import ExecutionServiceInput +from resource_resolution.client import Client as ResourceResolutionClient + + +def generate_messages(): + commonHeader = CommonHeader() + commonHeader.requestId = "1234" + commonHeader.subRequestId = "1234-1" + commonHeader.originatorId = "CDS" + + actionIdentifiers = ActionIdentifiers() + actionIdentifiers.blueprintName = "sample-cba" + actionIdentifiers.blueprintVersion = "1.0.0" + actionIdentifiers.actionName = "SampleScript" + + input = ExecutionServiceInput(commonHeader=commonHeader, actionIdentifiers=actionIdentifiers) + + commonHeader2 = CommonHeader() + commonHeader2.requestId = "1235" + commonHeader2.subRequestId = "1234-2" + commonHeader2.originatorId = "CDS" + + input2 = ExecutionServiceInput(commonHeader=commonHeader2, actionIdentifiers=actionIdentifiers) + + yield from [input, input2] + + +if __name__ == "__main__": + with ResourceResolutionClient("127.0.0.1:9111", use_header_auth=True, header_auth_token="Token test") as client: + for response in client.process(generate_messages()): + print(response) + +``` + +# ResourceResoulution helper class + +## How to use examples + +### Insecure channel + +``` +from resource_resolution.resource_resolution import ResourceResolution, WorkflowExecution, WorkflowExecutionResult + + +if __name__ == "__main__": + with ResourceResolution(use_header_auth=True, header_auth_token="Basic token") as rr: + for response in rr.execute_workflows( # type: WorkflowExecutionResult + WorkflowExecution( + blueprint_name="blueprintName", + blueprint_version="1.0", + workflow_name="resource-assignment" + ) + ): + if response.has_error: + print(response.error_message) + else: + print(response.payload) ``` \ No newline at end of file diff --git a/ms/py-executor/resource_resolution/authorization.py b/ms/py-executor/resource_resolution/authorization.py new file mode 100644 index 000000000..ae5954ecc --- /dev/null +++ b/ms/py-executor/resource_resolution/authorization.py @@ -0,0 +1,64 @@ +"""Copyright 2020 Deutsche Telekom. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" +from collections import namedtuple +from typing import Any, Callable, List + +from grpc import ClientCallDetails, StreamStreamClientInterceptor + + +class NewClientCallDetails( + namedtuple("_ClientCallDetails", ("method", "timeout", "metadata", "credentials")), ClientCallDetails +): + """Namedtuple class to store metadata. + + It's impossible to change original metadata in ClientCallDetails object + passed as a parameter to intercept method, so this class is going to get + original metadata tuple and add the authorization one. + """ + + pass + + +class AuthTokenInterceptor(StreamStreamClientInterceptor): + """Interceptor class to set authorization header. + + Set authorization header (but it can be any header also) for a gRPC call. + """ + + def __init__(self, token: str, header: str = "authorization") -> None: + """Initialize interceptor. + + Set token and header which should be set into call. By default header is "authorization". + Header have to be lowercase. + + Args: + token (str): Token value to be set. + header (str, optional): Header name. It must be lowercase. Defaults to "authorization". + """ + self.token: str = token + if not header.islower(): + raise ValueError("Header must be lowercase.") + self.header: str = header + + def intercept_stream_stream( + self, continuation: Callable, client_call_details: ClientCallDetails, request_iterator: Any + ) -> Any: + """Add header into metadata.""" + metadata: List = list(client_call_details.metadata) if client_call_details.metadata is not None else [] + metadata.append((self.header, self.token,)) + new_client_call_details: NewClientCallDetails = NewClientCallDetails( + client_call_details.method, client_call_details.timeout, metadata, client_call_details.credentials + ) + return continuation(new_client_call_details, request_iterator) diff --git a/ms/py-executor/resource_resolution/client.py b/ms/py-executor/resource_resolution/client.py index 89087745c..fee168628 100644 --- a/ms/py-executor/resource_resolution/client.py +++ b/ms/py-executor/resource_resolution/client.py @@ -14,12 +14,20 @@ limitations under the License. """ from logging import Logger, getLogger from types import TracebackType -from typing import Iterable, List, Optional, Type - -from grpc import Channel, insecure_channel, secure_channel, ssl_channel_credentials +from typing import Iterable, Optional, Type + +from grpc import ( + Channel, + insecure_channel, + intercept_channel, + secure_channel, + ssl_channel_credentials, +) from proto.BluePrintProcessing_pb2 import ExecutionServiceInput, ExecutionServiceOutput from proto.BluePrintProcessing_pb2_grpc import BluePrintProcessingServiceStub +from .authorization import AuthTokenInterceptor + class Client: """Resource resoulution client class.""" @@ -28,20 +36,29 @@ class Client: self, server_address: str, *, + # TLS/SSL configuration use_ssl: bool = False, root_certificates: bytes = None, private_key: bytes = None, certificate_chain: bytes = None, + # Authentication header configuration + use_header_auth: bool = False, + header_auth_token: str = None, ) -> None: """Client class initialization. :param server_address: Address to server to connect. :param use_ssl: Boolean flag to determine if secure channel should be created or not. Keyword argument. :param root_certificates: The PEM-encoded root certificates. None if it shouldn't be used. Keyword argument. - :param private_key: The PEM-encoded private key as a byte string, or None if no private key should be used. Keyword argument. - :param certificate_chain: The PEM-encoded certificate chain as a byte string to use or or None if no certificate chain should be used. Keyword argument. + :param private_key: The PEM-encoded private key as a byte string, or None if no private key should be used. + Keyword argument. + :param certificate_chain: The PEM-encoded certificate chain as a byte string to use or or None if + no certificate chain should be used. Keyword argument. + :param use_header_auth: Boolean flag to determine if authorization headed shoud be added for every call or not. + Keyword argument. + :param header_auth_token: Authorization token value. Keyword argument. """ - self.logger = getLogger(__name__) + self.logger: Logger = getLogger(__name__) if use_ssl: self.channel: Channel = secure_channel( server_address, ssl_channel_credentials(root_certificates, private_key, certificate_chain) @@ -50,6 +67,8 @@ class Client: else: self.channel: Channel = insecure_channel(server_address) self.logger.debug(f"Create insecure channel to connect to {server_address}") + if use_header_auth: + self.channel: Channel = intercept_channel(self.channel, AuthTokenInterceptor(header_auth_token)) self.stub: BluePrintProcessingServiceStub = BluePrintProcessingServiceStub(self.channel) def close(self) -> None: diff --git a/ms/py-executor/resource_resolution/resource_resolution.py b/ms/py-executor/resource_resolution/resource_resolution.py new file mode 100644 index 000000000..e4f162f8f --- /dev/null +++ b/ms/py-executor/resource_resolution/resource_resolution.py @@ -0,0 +1,294 @@ +"""Copyright 2020 Deutsche Telekom. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +from enum import Enum, unique +from logging import Logger, getLogger +from types import TracebackType +from typing import Any, Dict, Generator, Optional, Type + +from google.protobuf import json_format + +from proto.BluePrintProcessing_pb2 import ExecutionServiceInput, ExecutionServiceOutput + +from .client import Client + + +@unique +class WorkflowMode(Enum): + """Workflow mode enumerator. + + Workflow can be executed in two modes: synchronously and asynchronously. + This enumerator stores valid values to set the mode: SYNC for synchronously mode and ASYNC for asynchronously. + """ + + SYNC = "sync" + ASYNC = "async" + + +class WorkflowExecution: + """Wokflow execution class. + + Describes workflow to call. Set blueprint name and version and workflow name to execute. + Workflow inputs are optional, by default set to empty directory. + Workflow mode is also optional. It is set by default to call workflow synchronously. + """ + + def __init__( + self, + blueprint_name: str, + blueprint_version: str, + workflow_name: str, + workflow_inputs: Dict[str, Any] = None, + workflow_mode: WorkflowMode = WorkflowMode.SYNC, + ) -> None: + """Initialize workflow execution. + + Get all needed information to execute workflow. + + Args: + blueprint_name (str): Blueprint name to execute workflow from. + blueprint_version (str): Blueprint version. + workflow_name (str): Name of the workflow to execute + workflow_inputs (Dict[str, Any], optional): Key-value workflow inputs. Defaults to None. + workflow_mode (WorkflowMode, optional): Workflow execution mode. It can be run synchronously or + asynchronously. Defaults to WorkflowMode.SYNC. + """ + self.blueprint_name: str = blueprint_name + self.blueprint_version: str = blueprint_version + self.workflow_name: str = workflow_name + if workflow_inputs is None: + workflow_inputs = {} + self.workflow_inputs: Dict[str, Any] = workflow_inputs + self.workflow_mode: WorkflowMode = workflow_mode + + @property + def message(self) -> ExecutionServiceInput: + """Workflow execution protobuf message. + + This message is going to be sent to gRPC server to execute workflow. + + Returns: + ExecutionServiceInput: Properly filled protobuf message. + """ + execution_msg: ExecutionServiceInput = ExecutionServiceInput() + execution_msg.actionIdentifiers.mode = self.workflow_mode.value + execution_msg.actionIdentifiers.blueprintName = self.blueprint_name + execution_msg.actionIdentifiers.blueprintVersion = self.blueprint_version + execution_msg.actionIdentifiers.actionName = self.workflow_name + execution_msg.payload.update({f"{self.workflow_name}-request": self.workflow_inputs}) + return execution_msg + + +class WorkflowExecutionResult: + """Result of workflow execution. + + Store both workflow data and the result returns by server. + """ + + def __init__(self, workflow_execution: WorkflowExecution, execution_output: ExecutionServiceOutput) -> None: + """Initialize workflow execution result object. + + Stores workflow execution data and execution result. + + Args: + workflow_execution (WorkflowExecution): WorkflowExecution object which was used to call request. + execution_output (ExecutionServiceOutput): gRPC server response. + """ + self.workflow_execution: WorkflowExecution = workflow_execution + self.execution_output: ExecutionServiceOutput = execution_output + + @property + def blueprint_name(self) -> str: + """Name of blueprint used to call workflow. + + This value is taken from server response not request (should be the same). + + Returns: + str: Blueprint name + """ + return self.execution_output.actionIdentifiers.blueprintName + + @property + def blueprint_version(self) -> str: + """Blueprint version. + + This value is taken from server response not request (should be the same). + + Returns: + str: Blueprint version + """ + return self.execution_output.actionIdentifiers.blueprintVersion + + @property + def workflow_name(self) -> str: + """Workflow name. + + This value is taken from server response not request (should be the same). + + Returns: + str: Workflow name + """ + return self.execution_output.actionIdentifiers.actionName + + @property + def has_error(self) -> bool: + """Returns bool if request returns error or not. + + Returns: + bool: True if response has status code different than 200 + """ + return self.execution_output.status.code != 200 + + @property + def error_message(self) -> str: + """Error message. + + This property is available only if response has error. Otherwise AttributeError will be raised. + + Raises: + AttributeError: Response has 200 response code and hasn't error message. + + Returns: + str: Error message returned by server + """ + if self.has_error: + return self.execution_output.status.errorMessage + raise AttributeError("Execution does not finish with error") + + @property + def payload(self) -> dict: + """Response payload. + + Payload retured by the server is migrated to Python dict. + + Returns: + dict: Response's payload. + """ + return json_format.MessageToDict(self.execution_output.payload) + + +class ResourceResolution: + """Resource resolution class. + + Helper class to connect to blueprintprocessor's gRPC server, send request to execute workflow and parse responses. + Blueprint with workflow must be deployed before workflow request. + It's possible to create both secre or unsecure connection (without SSL/TLS). + """ + + def __init__( + self, + *, + server_address: str = "127.0.0.1", + server_port: int = "9111", + use_ssl: bool = False, + root_certificates: bytes = None, + private_key: bytes = None, + certificate_chain: bytes = None, + # Authentication header configuration + use_header_auth: bool = False, + header_auth_token: str = None, + ) -> None: + """Resource resolution object initialization. + + Args: + server_address (str, optional): gRPC server address. Defaults to "127.0.0.1". + server_port (int, optional): gRPC server address port. Defaults to "9111". + use_ssl (bool, optional): Boolean flag to determine if secure channel should be created or not. + Defaults to False. + root_certificates (bytes, optional): The PEM-encoded root certificates. None if it shouldn't be used. + Defaults to None. + private_key (bytes, optional): The PEM-encoded private key as a byte string, or None if no private key + should be used. Defaults to None. + certificate_chain (bytes, optional): The PEM-encoded certificate chain as a byte string to use or or None if + no certificate chain should be used. Defaults to None. + use_header_auth (bool, optional): Boolean flag to determine if authorization headed shoud be added for + every call or not. Defaults to False. + header_auth_token (str, optional): Authorization token value. Defaults to None. + """ + # Logger + self.logger: Logger = getLogger(__name__) + # Client settings + self.client_server_address: str = server_address + self.client_server_port: str = server_port + self.client_use_ssl: bool = use_ssl + self.client_root_certificates: bytes = root_certificates + self.client_private_key: bytes = private_key + self.client_certificate_chain: bytes = certificate_chain + self.client_use_header_auth: bool = use_header_auth + self.client_header_auth_token: str = header_auth_token + self.client: Client = None + + def __enter__(self) -> "ResourceResolution": + """Enter ResourceResolution instance context. + + Client connection is created. + """ + self.client = Client( + server_address=f"{self.client_server_address}:{self.client_server_port}", + use_ssl=self.client_use_ssl, + root_certificates=self.client_root_certificates, + private_key=self.client_private_key, + certificate_chain=self.client_certificate_chain, + use_header_auth=self.client_use_header_auth, + header_auth_token=self.client_header_auth_token, + ) + return self + + def __exit__( + self, + unused_exc_type: Optional[Type[BaseException]], + unused_exc_value: Optional[BaseException], + unused_traceback: Optional[TracebackType], + ) -> None: + """Exit ResourceResolution instance context. + + Client connection is closed. + """ + self.client.close() + + def execute_workflows(self, *workflows: WorkflowExecution) -> Generator[WorkflowExecutionResult, None, None]: + """Execute provided workflows. + + Workflows are going to be execured using one gRPC API call. Depends of implementation that may has + some consequences. In some cases if any request fails all requests after that won't be called. + + Responses and zipped with workflows and WorkflowExecutionResult object is initialized and yielded. + + Raises: + AttributeError: Raises if client object is not created. It occurs only if you not uses context manager. + Then user have to create client instance for ResourceResolution object by himself calling: + ``` + resource_resoulution.client = Client( + server_address=f"{resource_resoulution.client_server_address}:{resource_resoulution.client_server_port}", + use_ssl=resource_resoulution.client_use_ssl, + root_certificates=resource_resoulution.client_root_certificates, + private_key=resource_resoulution.client_private_key, + certificate_chain=resource_resoulution.client_certificate_chain, + use_header_auth=resource_resoulution.client_use_header_auth, + header_auth_token=resource_resoulution.client_header_auth_token, + ) + ``` + Remeber also to close client connection. + + Returns: + Generator[WorkflowExecutionResult, None, None]: WorkflowExecutionResult object + with both WorkflowExection object and server response for it's request. + """ + self.logger.debug("Execute workflows") + if not self.client: + raise AttributeError("gRPC client not connected") + + for response, workflow in zip(self.client.process((workflow.message for workflow in workflows)), workflows): + yield WorkflowExecutionResult(workflow, response) diff --git a/ms/py-executor/resource_resolution/tests/authorization_interceptor_test.py b/ms/py-executor/resource_resolution/tests/authorization_interceptor_test.py new file mode 100644 index 000000000..4b03f0b36 --- /dev/null +++ b/ms/py-executor/resource_resolution/tests/authorization_interceptor_test.py @@ -0,0 +1,50 @@ +"""Copyright 2020 Deutsche Telekom. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +from unittest.mock import MagicMock, _Call + +import pytest + +from resource_resolution.authorization import AuthTokenInterceptor, NewClientCallDetails + + +def test_resource_resolution_auth_token_interceptor(): + """Test AuthTokenInterceptor class. + + - Checks if it's correctly set default value. + - Checks if it's correctly set passed values. + - Checks if it's correctly checked if all header characters are lowercase. + - Checks if continuation function is called with headers setted + """ + interceptor: AuthTokenInterceptor = AuthTokenInterceptor("test_token", header="header") + assert interceptor.token == "test_token" + assert interceptor.header == "header" + + interceptor: AuthTokenInterceptor = AuthTokenInterceptor("test_token") + assert interceptor.token == "test_token" + assert interceptor.header == "authorization" + + with pytest.raises(ValueError): + AuthTokenInterceptor("test_token", header="Auth") + + continuation_mock: MagicMock = MagicMock() + client_call_details: MagicMock = MagicMock() + request_iterator: MagicMock = MagicMock() + + interceptor.intercept_stream_stream(continuation_mock, client_call_details, request_iterator) + continuation_mock.assert_called_once() + client_call_details_argument: _Call = continuation_mock.call_args_list[0][0][0] # Get NewClientCallDetails instance + assert isinstance(client_call_details_argument, NewClientCallDetails) + assert client_call_details_argument.metadata[0] == (interceptor.header, interceptor.token) diff --git a/ms/py-executor/resource_resolution/tests/resource_resolution_test.py b/ms/py-executor/resource_resolution/tests/resource_resolution_test.py new file mode 100644 index 000000000..8a41357e6 --- /dev/null +++ b/ms/py-executor/resource_resolution/tests/resource_resolution_test.py @@ -0,0 +1,105 @@ +"""Copyright 2020 Deutsche Telekom. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +from google.protobuf import json_format +from pytest import raises + +from resource_resolution.resource_resolution import ( + ExecutionServiceInput, + ExecutionServiceOutput, + WorkflowExecution, + WorkflowExecutionResult, + WorkflowMode, +) + + +def test_workflow_execution_class(): + """Workflow execution class tests. + + - Test initialization and default values + - Test request message formatting + """ + # Without inputs + workflow_execution: WorkflowExecution = WorkflowExecution("test blueprint", "test version", "test workflow") + assert workflow_execution.blueprint_name == "test blueprint" + assert workflow_execution.blueprint_version == "test version" + assert workflow_execution.workflow_name == "test workflow" + assert workflow_execution.workflow_inputs == {} + assert workflow_execution.workflow_mode == WorkflowMode.SYNC + + msg: ExecutionServiceInput = workflow_execution.message + msg_dict: dict = json_format.MessageToDict(msg) + assert msg_dict["actionIdentifiers"]["blueprintName"] == "test blueprint" + assert msg_dict["actionIdentifiers"]["blueprintVersion"] == "test version" + assert msg_dict["actionIdentifiers"]["actionName"] == "test workflow" + assert msg_dict["actionIdentifiers"]["mode"] == "sync" + assert list(msg_dict["payload"].keys())[0] == "test workflow-request" + assert msg_dict["payload"]["test workflow-request"] == {} + + # With inputs + workflow_execution: WorkflowExecution = WorkflowExecution( + "test blueprint2", + "test version2", + "test workflow2", + workflow_inputs={"test": "test"}, + workflow_mode=WorkflowMode.ASYNC, + ) + assert workflow_execution.blueprint_name == "test blueprint2" + assert workflow_execution.blueprint_version == "test version2" + assert workflow_execution.workflow_name == "test workflow2" + assert workflow_execution.workflow_inputs == {"test": "test"} + assert workflow_execution.workflow_mode == WorkflowMode.ASYNC + + msg: ExecutionServiceInput = workflow_execution.message + msg_dict: dict = json_format.MessageToDict(msg) + assert msg_dict["actionIdentifiers"]["blueprintName"] == "test blueprint2" + assert msg_dict["actionIdentifiers"]["blueprintVersion"] == "test version2" + assert msg_dict["actionIdentifiers"]["actionName"] == "test workflow2" + assert msg_dict["actionIdentifiers"]["mode"] == "async" + assert list(msg_dict["payload"].keys())[0] == "test workflow2-request" + assert msg_dict["payload"]["test workflow2-request"] == {"test": "test"} + + +def test_workflow_execution_result_class(): + """Workflow execution result class tests. + + - Test initizalization and default values + - Test `has_error` property + - Test `error_message` property + - Test payload formatting + """ + workflow_execution: WorkflowExecution = WorkflowExecution("test blueprint", "test version", "test workflow") + execution_output: ExecutionServiceOutput = ExecutionServiceOutput() + execution_output.actionIdentifiers.blueprintName = "test blueprint" + execution_output.actionIdentifiers.blueprintVersion = "test version" + execution_output.actionIdentifiers.actionName = "test workflow" + execution_output.status.code = 200 + + execution_result: WorkflowExecutionResult = WorkflowExecutionResult(workflow_execution, execution_output) + assert not execution_result.has_error + with raises(AttributeError): + execution_result.error_message + assert execution_result.payload == {} + assert execution_result.blueprint_name == "test blueprint" + assert execution_result.blueprint_version == "test version" + assert execution_result.workflow_name == "test workflow" + + execution_output.payload.update({"test_key": "test_value"}) + execution_result: WorkflowExecutionResult = WorkflowExecutionResult(workflow_execution, execution_output) + assert execution_result.payload == {"test_key": "test_value"} + + execution_output.status.code = 500 + assert execution_result.has_error + assert execution_result.error_message == "" -- cgit 1.2.3-korg