summaryrefslogtreecommitdiffstats
path: root/ms/py-executor/resource_resolution/grpc
diff options
context:
space:
mode:
authorMichal Jagiello <michal.jagiello@t-mobile.pl>2020-03-27 12:16:22 +0000
committerMichal Jagiello <michal.jagiello@t-mobile.pl>2020-04-28 08:26:32 +0000
commitdaf7bf3b0726c9574f9f1b7aa34af4f199ee32c3 (patch)
tree023d8ce4a7a2e599cdc7a7c9d1a38e57e6d73b79 /ms/py-executor/resource_resolution/grpc
parent4dccd00d6b1399596ed6f1e0c7f08cdb98cbec4f (diff)
PyExecutor ResourceResolution store/retrieve templates
Issue-ID: CCSDK-2156 Signed-off-by: Michal Jagiello <michal.jagiello@t-mobile.pl> Change-Id: I59df2772d004e349532a1b42c4e4abd367c13256
Diffstat (limited to 'ms/py-executor/resource_resolution/grpc')
-rw-r--r--ms/py-executor/resource_resolution/grpc/__init__.py16
-rw-r--r--ms/py-executor/resource_resolution/grpc/authorization.py64
-rw-r--r--ms/py-executor/resource_resolution/grpc/client.py112
3 files changed, 192 insertions, 0 deletions
diff --git a/ms/py-executor/resource_resolution/grpc/__init__.py b/ms/py-executor/resource_resolution/grpc/__init__.py
new file mode 100644
index 000000000..1894680a6
--- /dev/null
+++ b/ms/py-executor/resource_resolution/grpc/__init__.py
@@ -0,0 +1,16 @@
+"""Copyright 2020 Deutsche Telekom.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+from .client import Client
diff --git a/ms/py-executor/resource_resolution/grpc/authorization.py b/ms/py-executor/resource_resolution/grpc/authorization.py
new file mode 100644
index 000000000..ae5954ecc
--- /dev/null
+++ b/ms/py-executor/resource_resolution/grpc/authorization.py
@@ -0,0 +1,64 @@
+"""Copyright 2020 Deutsche Telekom.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+from collections import namedtuple
+from typing import Any, Callable, List
+
+from grpc import ClientCallDetails, StreamStreamClientInterceptor
+
+
+class NewClientCallDetails(
+ namedtuple("_ClientCallDetails", ("method", "timeout", "metadata", "credentials")), ClientCallDetails
+):
+ """Namedtuple class to store metadata.
+
+ It's impossible to change original metadata in ClientCallDetails object
+ passed as a parameter to intercept method, so this class is going to get
+ original metadata tuple and add the authorization one.
+ """
+
+ pass
+
+
+class AuthTokenInterceptor(StreamStreamClientInterceptor):
+ """Interceptor class to set authorization header.
+
+ Set authorization header (but it can be any header also) for a gRPC call.
+ """
+
+ def __init__(self, token: str, header: str = "authorization") -> None:
+ """Initialize interceptor.
+
+ Set token and header which should be set into call. By default header is "authorization".
+ Header have to be lowercase.
+
+ Args:
+ token (str): Token value to be set.
+ header (str, optional): Header name. It must be lowercase. Defaults to "authorization".
+ """
+ self.token: str = token
+ if not header.islower():
+ raise ValueError("Header must be lowercase.")
+ self.header: str = header
+
+ def intercept_stream_stream(
+ self, continuation: Callable, client_call_details: ClientCallDetails, request_iterator: Any
+ ) -> Any:
+ """Add header into metadata."""
+ metadata: List = list(client_call_details.metadata) if client_call_details.metadata is not None else []
+ metadata.append((self.header, self.token,))
+ new_client_call_details: NewClientCallDetails = NewClientCallDetails(
+ client_call_details.method, client_call_details.timeout, metadata, client_call_details.credentials
+ )
+ return continuation(new_client_call_details, request_iterator)
diff --git a/ms/py-executor/resource_resolution/grpc/client.py b/ms/py-executor/resource_resolution/grpc/client.py
new file mode 100644
index 000000000..fee168628
--- /dev/null
+++ b/ms/py-executor/resource_resolution/grpc/client.py
@@ -0,0 +1,112 @@
+"""Copyright 2019 Deutsche Telekom.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+from logging import Logger, getLogger
+from types import TracebackType
+from typing import Iterable, Optional, Type
+
+from grpc import (
+ Channel,
+ insecure_channel,
+ intercept_channel,
+ secure_channel,
+ ssl_channel_credentials,
+)
+from proto.BluePrintProcessing_pb2 import ExecutionServiceInput, ExecutionServiceOutput
+from proto.BluePrintProcessing_pb2_grpc import BluePrintProcessingServiceStub
+
+from .authorization import AuthTokenInterceptor
+
+
+class Client:
+ """Resource resoulution client class."""
+
+ def __init__(
+ self,
+ server_address: str,
+ *,
+ # TLS/SSL configuration
+ use_ssl: bool = False,
+ root_certificates: bytes = None,
+ private_key: bytes = None,
+ certificate_chain: bytes = None,
+ # Authentication header configuration
+ use_header_auth: bool = False,
+ header_auth_token: str = None,
+ ) -> None:
+ """Client class initialization.
+
+ :param server_address: Address to server to connect.
+ :param use_ssl: Boolean flag to determine if secure channel should be created or not. Keyword argument.
+ :param root_certificates: The PEM-encoded root certificates. None if it shouldn't be used. Keyword argument.
+ :param private_key: The PEM-encoded private key as a byte string, or None if no private key should be used.
+ Keyword argument.
+ :param certificate_chain: The PEM-encoded certificate chain as a byte string to use or or None if
+ no certificate chain should be used. Keyword argument.
+ :param use_header_auth: Boolean flag to determine if authorization headed shoud be added for every call or not.
+ Keyword argument.
+ :param header_auth_token: Authorization token value. Keyword argument.
+ """
+ self.logger: Logger = getLogger(__name__)
+ if use_ssl:
+ self.channel: Channel = secure_channel(
+ server_address, ssl_channel_credentials(root_certificates, private_key, certificate_chain)
+ )
+ self.logger.debug(f"Create secure channel to connect with {server_address}")
+ else:
+ self.channel: Channel = insecure_channel(server_address)
+ self.logger.debug(f"Create insecure channel to connect to {server_address}")
+ if use_header_auth:
+ self.channel: Channel = intercept_channel(self.channel, AuthTokenInterceptor(header_auth_token))
+ self.stub: BluePrintProcessingServiceStub = BluePrintProcessingServiceStub(self.channel)
+
+ def close(self) -> None:
+ """Close client session.
+
+ Closes client's channel.
+ """
+ self.logger.debug("Close channel connection")
+ self.channel.close()
+
+ def __enter__(self) -> Channel:
+ """Enter Client instance context.
+
+ Return Client instance. In the context user can call methods to communicate with server.
+ On exit connection with the server is going to be closed.
+ """
+ self.logger.debug("Enter Client instance context")
+ return self
+
+ def __exit__(
+ self,
+ unused_exc_type: Optional[Type[BaseException]],
+ unused_exc_value: Optional[BaseException],
+ unused_traceback: Optional[TracebackType],
+ ) -> None:
+ """Exit Client instance context.
+
+ Close connection with the server.
+ """
+ self.logger.debug("Exit Client instance context")
+ self.close()
+
+ def process(self, messages: Iterable[ExecutionServiceInput]) -> Iterable[ExecutionServiceOutput]:
+ """Send messages to server and return responses.
+
+ :param messages: Iterable messages to send
+ :return: Iterable responses
+ """
+ for message in self.stub.process(messages):
+ self.logger.debug(f"Get response message: {message}")
+ yield message