summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMichal Jagiello <michal.jagiello@t-mobile.pl>2020-03-09 14:34:14 +0000
committerMichal Jagiello <michal.jagiello@t-mobile.pl>2020-03-11 09:30:15 +0000
commitfff342d0769e1c9f4e7900fd66b91e82c6966ac3 (patch)
tree5144bb755abdbe3a5595cc80e23d38d3bbb2da70
parent88929cf6c21023328644fc637627371733b085d8 (diff)
PyExecutor ResourceResoluton helper class.
Create a class to call workflow execution requests to gRPC server. Create an interceptor to use header authorization for gRPC calls. Issue-ID: CCSDK-1989 Signed-off-by: Michal Jagiello <michal.jagiello@t-mobile.pl> Change-Id: Ia449a089e02e7a12e31bee5e3b7debee506d8426
-rw-r--r--ms/py-executor/resource_resolution/README63
-rw-r--r--ms/py-executor/resource_resolution/authorization.py64
-rw-r--r--ms/py-executor/resource_resolution/client.py31
-rw-r--r--ms/py-executor/resource_resolution/resource_resolution.py294
-rw-r--r--ms/py-executor/resource_resolution/tests/authorization_interceptor_test.py50
-rw-r--r--ms/py-executor/resource_resolution/tests/resource_resolution_test.py105
6 files changed, 601 insertions, 6 deletions
diff --git a/ms/py-executor/resource_resolution/README b/ms/py-executor/resource_resolution/README
index 222dae499..353600445 100644
--- a/ms/py-executor/resource_resolution/README
+++ b/ms/py-executor/resource_resolution/README
@@ -77,4 +77,67 @@ if __name__ == "__main__":
for response in client.process(generate_messages()):
print(response)
+```
+
+### Authorizarion header
+
+```
+from proto.BluePrintCommon_pb2 import ActionIdentifiers, CommonHeader
+from proto.BluePrintProcessing_pb2 import ExecutionServiceInput
+from resource_resolution.client import Client as ResourceResolutionClient
+
+
+def generate_messages():
+ commonHeader = CommonHeader()
+ commonHeader.requestId = "1234"
+ commonHeader.subRequestId = "1234-1"
+ commonHeader.originatorId = "CDS"
+
+ actionIdentifiers = ActionIdentifiers()
+ actionIdentifiers.blueprintName = "sample-cba"
+ actionIdentifiers.blueprintVersion = "1.0.0"
+ actionIdentifiers.actionName = "SampleScript"
+
+ input = ExecutionServiceInput(commonHeader=commonHeader, actionIdentifiers=actionIdentifiers)
+
+ commonHeader2 = CommonHeader()
+ commonHeader2.requestId = "1235"
+ commonHeader2.subRequestId = "1234-2"
+ commonHeader2.originatorId = "CDS"
+
+ input2 = ExecutionServiceInput(commonHeader=commonHeader2, actionIdentifiers=actionIdentifiers)
+
+ yield from [input, input2]
+
+
+if __name__ == "__main__":
+ with ResourceResolutionClient("127.0.0.1:9111", use_header_auth=True, header_auth_token="Token test") as client:
+ for response in client.process(generate_messages()):
+ print(response)
+
+```
+
+# ResourceResoulution helper class
+
+## How to use examples
+
+### Insecure channel
+
+```
+from resource_resolution.resource_resolution import ResourceResolution, WorkflowExecution, WorkflowExecutionResult
+
+
+if __name__ == "__main__":
+ with ResourceResolution(use_header_auth=True, header_auth_token="Basic token") as rr:
+ for response in rr.execute_workflows( # type: WorkflowExecutionResult
+ WorkflowExecution(
+ blueprint_name="blueprintName",
+ blueprint_version="1.0",
+ workflow_name="resource-assignment"
+ )
+ ):
+ if response.has_error:
+ print(response.error_message)
+ else:
+ print(response.payload)
``` \ No newline at end of file
diff --git a/ms/py-executor/resource_resolution/authorization.py b/ms/py-executor/resource_resolution/authorization.py
new file mode 100644
index 000000000..ae5954ecc
--- /dev/null
+++ b/ms/py-executor/resource_resolution/authorization.py
@@ -0,0 +1,64 @@
+"""Copyright 2020 Deutsche Telekom.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+from collections import namedtuple
+from typing import Any, Callable, List
+
+from grpc import ClientCallDetails, StreamStreamClientInterceptor
+
+
+class NewClientCallDetails(
+ namedtuple("_ClientCallDetails", ("method", "timeout", "metadata", "credentials")), ClientCallDetails
+):
+ """Namedtuple class to store metadata.
+
+ It's impossible to change original metadata in ClientCallDetails object
+ passed as a parameter to intercept method, so this class is going to get
+ original metadata tuple and add the authorization one.
+ """
+
+ pass
+
+
+class AuthTokenInterceptor(StreamStreamClientInterceptor):
+ """Interceptor class to set authorization header.
+
+ Set authorization header (but it can be any header also) for a gRPC call.
+ """
+
+ def __init__(self, token: str, header: str = "authorization") -> None:
+ """Initialize interceptor.
+
+ Set token and header which should be set into call. By default header is "authorization".
+ Header have to be lowercase.
+
+ Args:
+ token (str): Token value to be set.
+ header (str, optional): Header name. It must be lowercase. Defaults to "authorization".
+ """
+ self.token: str = token
+ if not header.islower():
+ raise ValueError("Header must be lowercase.")
+ self.header: str = header
+
+ def intercept_stream_stream(
+ self, continuation: Callable, client_call_details: ClientCallDetails, request_iterator: Any
+ ) -> Any:
+ """Add header into metadata."""
+ metadata: List = list(client_call_details.metadata) if client_call_details.metadata is not None else []
+ metadata.append((self.header, self.token,))
+ new_client_call_details: NewClientCallDetails = NewClientCallDetails(
+ client_call_details.method, client_call_details.timeout, metadata, client_call_details.credentials
+ )
+ return continuation(new_client_call_details, request_iterator)
diff --git a/ms/py-executor/resource_resolution/client.py b/ms/py-executor/resource_resolution/client.py
index 89087745c..fee168628 100644
--- a/ms/py-executor/resource_resolution/client.py
+++ b/ms/py-executor/resource_resolution/client.py
@@ -14,12 +14,20 @@ limitations under the License.
"""
from logging import Logger, getLogger
from types import TracebackType
-from typing import Iterable, List, Optional, Type
-
-from grpc import Channel, insecure_channel, secure_channel, ssl_channel_credentials
+from typing import Iterable, Optional, Type
+
+from grpc import (
+ Channel,
+ insecure_channel,
+ intercept_channel,
+ secure_channel,
+ ssl_channel_credentials,
+)
from proto.BluePrintProcessing_pb2 import ExecutionServiceInput, ExecutionServiceOutput
from proto.BluePrintProcessing_pb2_grpc import BluePrintProcessingServiceStub
+from .authorization import AuthTokenInterceptor
+
class Client:
"""Resource resoulution client class."""
@@ -28,20 +36,29 @@ class Client:
self,
server_address: str,
*,
+ # TLS/SSL configuration
use_ssl: bool = False,
root_certificates: bytes = None,
private_key: bytes = None,
certificate_chain: bytes = None,
+ # Authentication header configuration
+ use_header_auth: bool = False,
+ header_auth_token: str = None,
) -> None:
"""Client class initialization.
:param server_address: Address to server to connect.
:param use_ssl: Boolean flag to determine if secure channel should be created or not. Keyword argument.
:param root_certificates: The PEM-encoded root certificates. None if it shouldn't be used. Keyword argument.
- :param private_key: The PEM-encoded private key as a byte string, or None if no private key should be used. Keyword argument.
- :param certificate_chain: The PEM-encoded certificate chain as a byte string to use or or None if no certificate chain should be used. Keyword argument.
+ :param private_key: The PEM-encoded private key as a byte string, or None if no private key should be used.
+ Keyword argument.
+ :param certificate_chain: The PEM-encoded certificate chain as a byte string to use or or None if
+ no certificate chain should be used. Keyword argument.
+ :param use_header_auth: Boolean flag to determine if authorization headed shoud be added for every call or not.
+ Keyword argument.
+ :param header_auth_token: Authorization token value. Keyword argument.
"""
- self.logger = getLogger(__name__)
+ self.logger: Logger = getLogger(__name__)
if use_ssl:
self.channel: Channel = secure_channel(
server_address, ssl_channel_credentials(root_certificates, private_key, certificate_chain)
@@ -50,6 +67,8 @@ class Client:
else:
self.channel: Channel = insecure_channel(server_address)
self.logger.debug(f"Create insecure channel to connect to {server_address}")
+ if use_header_auth:
+ self.channel: Channel = intercept_channel(self.channel, AuthTokenInterceptor(header_auth_token))
self.stub: BluePrintProcessingServiceStub = BluePrintProcessingServiceStub(self.channel)
def close(self) -> None:
diff --git a/ms/py-executor/resource_resolution/resource_resolution.py b/ms/py-executor/resource_resolution/resource_resolution.py
new file mode 100644
index 000000000..e4f162f8f
--- /dev/null
+++ b/ms/py-executor/resource_resolution/resource_resolution.py
@@ -0,0 +1,294 @@
+"""Copyright 2020 Deutsche Telekom.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+from enum import Enum, unique
+from logging import Logger, getLogger
+from types import TracebackType
+from typing import Any, Dict, Generator, Optional, Type
+
+from google.protobuf import json_format
+
+from proto.BluePrintProcessing_pb2 import ExecutionServiceInput, ExecutionServiceOutput
+
+from .client import Client
+
+
+@unique
+class WorkflowMode(Enum):
+ """Workflow mode enumerator.
+
+ Workflow can be executed in two modes: synchronously and asynchronously.
+ This enumerator stores valid values to set the mode: SYNC for synchronously mode and ASYNC for asynchronously.
+ """
+
+ SYNC = "sync"
+ ASYNC = "async"
+
+
+class WorkflowExecution:
+ """Wokflow execution class.
+
+ Describes workflow to call. Set blueprint name and version and workflow name to execute.
+ Workflow inputs are optional, by default set to empty directory.
+ Workflow mode is also optional. It is set by default to call workflow synchronously.
+ """
+
+ def __init__(
+ self,
+ blueprint_name: str,
+ blueprint_version: str,
+ workflow_name: str,
+ workflow_inputs: Dict[str, Any] = None,
+ workflow_mode: WorkflowMode = WorkflowMode.SYNC,
+ ) -> None:
+ """Initialize workflow execution.
+
+ Get all needed information to execute workflow.
+
+ Args:
+ blueprint_name (str): Blueprint name to execute workflow from.
+ blueprint_version (str): Blueprint version.
+ workflow_name (str): Name of the workflow to execute
+ workflow_inputs (Dict[str, Any], optional): Key-value workflow inputs. Defaults to None.
+ workflow_mode (WorkflowMode, optional): Workflow execution mode. It can be run synchronously or
+ asynchronously. Defaults to WorkflowMode.SYNC.
+ """
+ self.blueprint_name: str = blueprint_name
+ self.blueprint_version: str = blueprint_version
+ self.workflow_name: str = workflow_name
+ if workflow_inputs is None:
+ workflow_inputs = {}
+ self.workflow_inputs: Dict[str, Any] = workflow_inputs
+ self.workflow_mode: WorkflowMode = workflow_mode
+
+ @property
+ def message(self) -> ExecutionServiceInput:
+ """Workflow execution protobuf message.
+
+ This message is going to be sent to gRPC server to execute workflow.
+
+ Returns:
+ ExecutionServiceInput: Properly filled protobuf message.
+ """
+ execution_msg: ExecutionServiceInput = ExecutionServiceInput()
+ execution_msg.actionIdentifiers.mode = self.workflow_mode.value
+ execution_msg.actionIdentifiers.blueprintName = self.blueprint_name
+ execution_msg.actionIdentifiers.blueprintVersion = self.blueprint_version
+ execution_msg.actionIdentifiers.actionName = self.workflow_name
+ execution_msg.payload.update({f"{self.workflow_name}-request": self.workflow_inputs})
+ return execution_msg
+
+
+class WorkflowExecutionResult:
+ """Result of workflow execution.
+
+ Store both workflow data and the result returns by server.
+ """
+
+ def __init__(self, workflow_execution: WorkflowExecution, execution_output: ExecutionServiceOutput) -> None:
+ """Initialize workflow execution result object.
+
+ Stores workflow execution data and execution result.
+
+ Args:
+ workflow_execution (WorkflowExecution): WorkflowExecution object which was used to call request.
+ execution_output (ExecutionServiceOutput): gRPC server response.
+ """
+ self.workflow_execution: WorkflowExecution = workflow_execution
+ self.execution_output: ExecutionServiceOutput = execution_output
+
+ @property
+ def blueprint_name(self) -> str:
+ """Name of blueprint used to call workflow.
+
+ This value is taken from server response not request (should be the same).
+
+ Returns:
+ str: Blueprint name
+ """
+ return self.execution_output.actionIdentifiers.blueprintName
+
+ @property
+ def blueprint_version(self) -> str:
+ """Blueprint version.
+
+ This value is taken from server response not request (should be the same).
+
+ Returns:
+ str: Blueprint version
+ """
+ return self.execution_output.actionIdentifiers.blueprintVersion
+
+ @property
+ def workflow_name(self) -> str:
+ """Workflow name.
+
+ This value is taken from server response not request (should be the same).
+
+ Returns:
+ str: Workflow name
+ """
+ return self.execution_output.actionIdentifiers.actionName
+
+ @property
+ def has_error(self) -> bool:
+ """Returns bool if request returns error or not.
+
+ Returns:
+ bool: True if response has status code different than 200
+ """
+ return self.execution_output.status.code != 200
+
+ @property
+ def error_message(self) -> str:
+ """Error message.
+
+ This property is available only if response has error. Otherwise AttributeError will be raised.
+
+ Raises:
+ AttributeError: Response has 200 response code and hasn't error message.
+
+ Returns:
+ str: Error message returned by server
+ """
+ if self.has_error:
+ return self.execution_output.status.errorMessage
+ raise AttributeError("Execution does not finish with error")
+
+ @property
+ def payload(self) -> dict:
+ """Response payload.
+
+ Payload retured by the server is migrated to Python dict.
+
+ Returns:
+ dict: Response's payload.
+ """
+ return json_format.MessageToDict(self.execution_output.payload)
+
+
+class ResourceResolution:
+ """Resource resolution class.
+
+ Helper class to connect to blueprintprocessor's gRPC server, send request to execute workflow and parse responses.
+ Blueprint with workflow must be deployed before workflow request.
+ It's possible to create both secre or unsecure connection (without SSL/TLS).
+ """
+
+ def __init__(
+ self,
+ *,
+ server_address: str = "127.0.0.1",
+ server_port: int = "9111",
+ use_ssl: bool = False,
+ root_certificates: bytes = None,
+ private_key: bytes = None,
+ certificate_chain: bytes = None,
+ # Authentication header configuration
+ use_header_auth: bool = False,
+ header_auth_token: str = None,
+ ) -> None:
+ """Resource resolution object initialization.
+
+ Args:
+ server_address (str, optional): gRPC server address. Defaults to "127.0.0.1".
+ server_port (int, optional): gRPC server address port. Defaults to "9111".
+ use_ssl (bool, optional): Boolean flag to determine if secure channel should be created or not.
+ Defaults to False.
+ root_certificates (bytes, optional): The PEM-encoded root certificates. None if it shouldn't be used.
+ Defaults to None.
+ private_key (bytes, optional): The PEM-encoded private key as a byte string, or None if no private key
+ should be used. Defaults to None.
+ certificate_chain (bytes, optional): The PEM-encoded certificate chain as a byte string to use or or None if
+ no certificate chain should be used. Defaults to None.
+ use_header_auth (bool, optional): Boolean flag to determine if authorization headed shoud be added for
+ every call or not. Defaults to False.
+ header_auth_token (str, optional): Authorization token value. Defaults to None.
+ """
+ # Logger
+ self.logger: Logger = getLogger(__name__)
+ # Client settings
+ self.client_server_address: str = server_address
+ self.client_server_port: str = server_port
+ self.client_use_ssl: bool = use_ssl
+ self.client_root_certificates: bytes = root_certificates
+ self.client_private_key: bytes = private_key
+ self.client_certificate_chain: bytes = certificate_chain
+ self.client_use_header_auth: bool = use_header_auth
+ self.client_header_auth_token: str = header_auth_token
+ self.client: Client = None
+
+ def __enter__(self) -> "ResourceResolution":
+ """Enter ResourceResolution instance context.
+
+ Client connection is created.
+ """
+ self.client = Client(
+ server_address=f"{self.client_server_address}:{self.client_server_port}",
+ use_ssl=self.client_use_ssl,
+ root_certificates=self.client_root_certificates,
+ private_key=self.client_private_key,
+ certificate_chain=self.client_certificate_chain,
+ use_header_auth=self.client_use_header_auth,
+ header_auth_token=self.client_header_auth_token,
+ )
+ return self
+
+ def __exit__(
+ self,
+ unused_exc_type: Optional[Type[BaseException]],
+ unused_exc_value: Optional[BaseException],
+ unused_traceback: Optional[TracebackType],
+ ) -> None:
+ """Exit ResourceResolution instance context.
+
+ Client connection is closed.
+ """
+ self.client.close()
+
+ def execute_workflows(self, *workflows: WorkflowExecution) -> Generator[WorkflowExecutionResult, None, None]:
+ """Execute provided workflows.
+
+ Workflows are going to be execured using one gRPC API call. Depends of implementation that may has
+ some consequences. In some cases if any request fails all requests after that won't be called.
+
+ Responses and zipped with workflows and WorkflowExecutionResult object is initialized and yielded.
+
+ Raises:
+ AttributeError: Raises if client object is not created. It occurs only if you not uses context manager.
+ Then user have to create client instance for ResourceResolution object by himself calling:
+ ```
+ resource_resoulution.client = Client(
+ server_address=f"{resource_resoulution.client_server_address}:{resource_resoulution.client_server_port}",
+ use_ssl=resource_resoulution.client_use_ssl,
+ root_certificates=resource_resoulution.client_root_certificates,
+ private_key=resource_resoulution.client_private_key,
+ certificate_chain=resource_resoulution.client_certificate_chain,
+ use_header_auth=resource_resoulution.client_use_header_auth,
+ header_auth_token=resource_resoulution.client_header_auth_token,
+ )
+ ```
+ Remeber also to close client connection.
+
+ Returns:
+ Generator[WorkflowExecutionResult, None, None]: WorkflowExecutionResult object
+ with both WorkflowExection object and server response for it's request.
+ """
+ self.logger.debug("Execute workflows")
+ if not self.client:
+ raise AttributeError("gRPC client not connected")
+
+ for response, workflow in zip(self.client.process((workflow.message for workflow in workflows)), workflows):
+ yield WorkflowExecutionResult(workflow, response)
diff --git a/ms/py-executor/resource_resolution/tests/authorization_interceptor_test.py b/ms/py-executor/resource_resolution/tests/authorization_interceptor_test.py
new file mode 100644
index 000000000..4b03f0b36
--- /dev/null
+++ b/ms/py-executor/resource_resolution/tests/authorization_interceptor_test.py
@@ -0,0 +1,50 @@
+"""Copyright 2020 Deutsche Telekom.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+from unittest.mock import MagicMock, _Call
+
+import pytest
+
+from resource_resolution.authorization import AuthTokenInterceptor, NewClientCallDetails
+
+
+def test_resource_resolution_auth_token_interceptor():
+ """Test AuthTokenInterceptor class.
+
+ - Checks if it's correctly set default value.
+ - Checks if it's correctly set passed values.
+ - Checks if it's correctly checked if all header characters are lowercase.
+ - Checks if continuation function is called with headers setted
+ """
+ interceptor: AuthTokenInterceptor = AuthTokenInterceptor("test_token", header="header")
+ assert interceptor.token == "test_token"
+ assert interceptor.header == "header"
+
+ interceptor: AuthTokenInterceptor = AuthTokenInterceptor("test_token")
+ assert interceptor.token == "test_token"
+ assert interceptor.header == "authorization"
+
+ with pytest.raises(ValueError):
+ AuthTokenInterceptor("test_token", header="Auth")
+
+ continuation_mock: MagicMock = MagicMock()
+ client_call_details: MagicMock = MagicMock()
+ request_iterator: MagicMock = MagicMock()
+
+ interceptor.intercept_stream_stream(continuation_mock, client_call_details, request_iterator)
+ continuation_mock.assert_called_once()
+ client_call_details_argument: _Call = continuation_mock.call_args_list[0][0][0] # Get NewClientCallDetails instance
+ assert isinstance(client_call_details_argument, NewClientCallDetails)
+ assert client_call_details_argument.metadata[0] == (interceptor.header, interceptor.token)
diff --git a/ms/py-executor/resource_resolution/tests/resource_resolution_test.py b/ms/py-executor/resource_resolution/tests/resource_resolution_test.py
new file mode 100644
index 000000000..8a41357e6
--- /dev/null
+++ b/ms/py-executor/resource_resolution/tests/resource_resolution_test.py
@@ -0,0 +1,105 @@
+"""Copyright 2020 Deutsche Telekom.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+from google.protobuf import json_format
+from pytest import raises
+
+from resource_resolution.resource_resolution import (
+ ExecutionServiceInput,
+ ExecutionServiceOutput,
+ WorkflowExecution,
+ WorkflowExecutionResult,
+ WorkflowMode,
+)
+
+
+def test_workflow_execution_class():
+ """Workflow execution class tests.
+
+ - Test initialization and default values
+ - Test request message formatting
+ """
+ # Without inputs
+ workflow_execution: WorkflowExecution = WorkflowExecution("test blueprint", "test version", "test workflow")
+ assert workflow_execution.blueprint_name == "test blueprint"
+ assert workflow_execution.blueprint_version == "test version"
+ assert workflow_execution.workflow_name == "test workflow"
+ assert workflow_execution.workflow_inputs == {}
+ assert workflow_execution.workflow_mode == WorkflowMode.SYNC
+
+ msg: ExecutionServiceInput = workflow_execution.message
+ msg_dict: dict = json_format.MessageToDict(msg)
+ assert msg_dict["actionIdentifiers"]["blueprintName"] == "test blueprint"
+ assert msg_dict["actionIdentifiers"]["blueprintVersion"] == "test version"
+ assert msg_dict["actionIdentifiers"]["actionName"] == "test workflow"
+ assert msg_dict["actionIdentifiers"]["mode"] == "sync"
+ assert list(msg_dict["payload"].keys())[0] == "test workflow-request"
+ assert msg_dict["payload"]["test workflow-request"] == {}
+
+ # With inputs
+ workflow_execution: WorkflowExecution = WorkflowExecution(
+ "test blueprint2",
+ "test version2",
+ "test workflow2",
+ workflow_inputs={"test": "test"},
+ workflow_mode=WorkflowMode.ASYNC,
+ )
+ assert workflow_execution.blueprint_name == "test blueprint2"
+ assert workflow_execution.blueprint_version == "test version2"
+ assert workflow_execution.workflow_name == "test workflow2"
+ assert workflow_execution.workflow_inputs == {"test": "test"}
+ assert workflow_execution.workflow_mode == WorkflowMode.ASYNC
+
+ msg: ExecutionServiceInput = workflow_execution.message
+ msg_dict: dict = json_format.MessageToDict(msg)
+ assert msg_dict["actionIdentifiers"]["blueprintName"] == "test blueprint2"
+ assert msg_dict["actionIdentifiers"]["blueprintVersion"] == "test version2"
+ assert msg_dict["actionIdentifiers"]["actionName"] == "test workflow2"
+ assert msg_dict["actionIdentifiers"]["mode"] == "async"
+ assert list(msg_dict["payload"].keys())[0] == "test workflow2-request"
+ assert msg_dict["payload"]["test workflow2-request"] == {"test": "test"}
+
+
+def test_workflow_execution_result_class():
+ """Workflow execution result class tests.
+
+ - Test initizalization and default values
+ - Test `has_error` property
+ - Test `error_message` property
+ - Test payload formatting
+ """
+ workflow_execution: WorkflowExecution = WorkflowExecution("test blueprint", "test version", "test workflow")
+ execution_output: ExecutionServiceOutput = ExecutionServiceOutput()
+ execution_output.actionIdentifiers.blueprintName = "test blueprint"
+ execution_output.actionIdentifiers.blueprintVersion = "test version"
+ execution_output.actionIdentifiers.actionName = "test workflow"
+ execution_output.status.code = 200
+
+ execution_result: WorkflowExecutionResult = WorkflowExecutionResult(workflow_execution, execution_output)
+ assert not execution_result.has_error
+ with raises(AttributeError):
+ execution_result.error_message
+ assert execution_result.payload == {}
+ assert execution_result.blueprint_name == "test blueprint"
+ assert execution_result.blueprint_version == "test version"
+ assert execution_result.workflow_name == "test workflow"
+
+ execution_output.payload.update({"test_key": "test_value"})
+ execution_result: WorkflowExecutionResult = WorkflowExecutionResult(workflow_execution, execution_output)
+ assert execution_result.payload == {"test_key": "test_value"}
+
+ execution_output.status.code = 500
+ assert execution_result.has_error
+ assert execution_result.error_message == ""