diff options
Diffstat (limited to 'ms/py-executor/resource_resolution/grpc')
-rw-r--r-- | ms/py-executor/resource_resolution/grpc/__init__.py | 16 | ||||
-rw-r--r-- | ms/py-executor/resource_resolution/grpc/authorization.py | 64 | ||||
-rw-r--r-- | ms/py-executor/resource_resolution/grpc/client.py | 112 |
3 files changed, 192 insertions, 0 deletions
diff --git a/ms/py-executor/resource_resolution/grpc/__init__.py b/ms/py-executor/resource_resolution/grpc/__init__.py new file mode 100644 index 000000000..1894680a6 --- /dev/null +++ b/ms/py-executor/resource_resolution/grpc/__init__.py @@ -0,0 +1,16 @@ +"""Copyright 2020 Deutsche Telekom. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +from .client import Client diff --git a/ms/py-executor/resource_resolution/grpc/authorization.py b/ms/py-executor/resource_resolution/grpc/authorization.py new file mode 100644 index 000000000..ae5954ecc --- /dev/null +++ b/ms/py-executor/resource_resolution/grpc/authorization.py @@ -0,0 +1,64 @@ +"""Copyright 2020 Deutsche Telekom. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" +from collections import namedtuple +from typing import Any, Callable, List + +from grpc import ClientCallDetails, StreamStreamClientInterceptor + + +class NewClientCallDetails( + namedtuple("_ClientCallDetails", ("method", "timeout", "metadata", "credentials")), ClientCallDetails +): + """Namedtuple class to store metadata. + + It's impossible to change original metadata in ClientCallDetails object + passed as a parameter to intercept method, so this class is going to get + original metadata tuple and add the authorization one. + """ + + pass + + +class AuthTokenInterceptor(StreamStreamClientInterceptor): + """Interceptor class to set authorization header. + + Set authorization header (but it can be any header also) for a gRPC call. + """ + + def __init__(self, token: str, header: str = "authorization") -> None: + """Initialize interceptor. + + Set token and header which should be set into call. By default header is "authorization". + Header have to be lowercase. + + Args: + token (str): Token value to be set. + header (str, optional): Header name. It must be lowercase. Defaults to "authorization". + """ + self.token: str = token + if not header.islower(): + raise ValueError("Header must be lowercase.") + self.header: str = header + + def intercept_stream_stream( + self, continuation: Callable, client_call_details: ClientCallDetails, request_iterator: Any + ) -> Any: + """Add header into metadata.""" + metadata: List = list(client_call_details.metadata) if client_call_details.metadata is not None else [] + metadata.append((self.header, self.token,)) + new_client_call_details: NewClientCallDetails = NewClientCallDetails( + client_call_details.method, client_call_details.timeout, metadata, client_call_details.credentials + ) + return continuation(new_client_call_details, request_iterator) diff --git a/ms/py-executor/resource_resolution/grpc/client.py b/ms/py-executor/resource_resolution/grpc/client.py new file mode 100644 index 000000000..fee168628 --- /dev/null +++ b/ms/py-executor/resource_resolution/grpc/client.py @@ -0,0 +1,112 @@ +"""Copyright 2019 Deutsche Telekom. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" +from logging import Logger, getLogger +from types import TracebackType +from typing import Iterable, Optional, Type + +from grpc import ( + Channel, + insecure_channel, + intercept_channel, + secure_channel, + ssl_channel_credentials, +) +from proto.BluePrintProcessing_pb2 import ExecutionServiceInput, ExecutionServiceOutput +from proto.BluePrintProcessing_pb2_grpc import BluePrintProcessingServiceStub + +from .authorization import AuthTokenInterceptor + + +class Client: + """Resource resoulution client class.""" + + def __init__( + self, + server_address: str, + *, + # TLS/SSL configuration + use_ssl: bool = False, + root_certificates: bytes = None, + private_key: bytes = None, + certificate_chain: bytes = None, + # Authentication header configuration + use_header_auth: bool = False, + header_auth_token: str = None, + ) -> None: + """Client class initialization. + + :param server_address: Address to server to connect. + :param use_ssl: Boolean flag to determine if secure channel should be created or not. Keyword argument. + :param root_certificates: The PEM-encoded root certificates. None if it shouldn't be used. Keyword argument. + :param private_key: The PEM-encoded private key as a byte string, or None if no private key should be used. + Keyword argument. + :param certificate_chain: The PEM-encoded certificate chain as a byte string to use or or None if + no certificate chain should be used. Keyword argument. + :param use_header_auth: Boolean flag to determine if authorization headed shoud be added for every call or not. + Keyword argument. + :param header_auth_token: Authorization token value. Keyword argument. + """ + self.logger: Logger = getLogger(__name__) + if use_ssl: + self.channel: Channel = secure_channel( + server_address, ssl_channel_credentials(root_certificates, private_key, certificate_chain) + ) + self.logger.debug(f"Create secure channel to connect with {server_address}") + else: + self.channel: Channel = insecure_channel(server_address) + self.logger.debug(f"Create insecure channel to connect to {server_address}") + if use_header_auth: + self.channel: Channel = intercept_channel(self.channel, AuthTokenInterceptor(header_auth_token)) + self.stub: BluePrintProcessingServiceStub = BluePrintProcessingServiceStub(self.channel) + + def close(self) -> None: + """Close client session. + + Closes client's channel. + """ + self.logger.debug("Close channel connection") + self.channel.close() + + def __enter__(self) -> Channel: + """Enter Client instance context. + + Return Client instance. In the context user can call methods to communicate with server. + On exit connection with the server is going to be closed. + """ + self.logger.debug("Enter Client instance context") + return self + + def __exit__( + self, + unused_exc_type: Optional[Type[BaseException]], + unused_exc_value: Optional[BaseException], + unused_traceback: Optional[TracebackType], + ) -> None: + """Exit Client instance context. + + Close connection with the server. + """ + self.logger.debug("Exit Client instance context") + self.close() + + def process(self, messages: Iterable[ExecutionServiceInput]) -> Iterable[ExecutionServiceOutput]: + """Send messages to server and return responses. + + :param messages: Iterable messages to send + :return: Iterable responses + """ + for message in self.stub.process(messages): + self.logger.debug(f"Get response message: {message}") + yield message |