diff options
Diffstat (limited to 'azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/execution_plugin/ctx_proxy/server.py')
-rw-r--r-- | azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/execution_plugin/ctx_proxy/server.py | 244 |
1 files changed, 244 insertions, 0 deletions
diff --git a/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/execution_plugin/ctx_proxy/server.py b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/execution_plugin/ctx_proxy/server.py new file mode 100644 index 0000000..91b95d9 --- /dev/null +++ b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/execution_plugin/ctx_proxy/server.py @@ -0,0 +1,244 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +``ctx`` proxy server implementation. +""" + +import json +import socket +import Queue +import StringIO +import threading +import traceback +import wsgiref.simple_server + +import bottle +from aria import modeling + +from .. import exceptions + + +class CtxProxy(object): + + def __init__(self, ctx, ctx_patcher=(lambda *args, **kwargs: None)): + self.ctx = ctx + self._ctx_patcher = ctx_patcher + self.port = _get_unused_port() + self.socket_url = 'http://localhost:{0}'.format(self.port) + self.server = None + self._started = Queue.Queue(1) + self.thread = self._start_server() + self._started.get(timeout=5) + + def _start_server(self): + + class BottleServerAdapter(bottle.ServerAdapter): + proxy = self + + def close_session(self): + self.proxy.ctx.model.log._session.remove() + + def run(self, app): + + class Server(wsgiref.simple_server.WSGIServer): + allow_reuse_address = True + bottle_server = self + + def handle_error(self, request, client_address): + pass + + def serve_forever(self, poll_interval=0.5): + try: + wsgiref.simple_server.WSGIServer.serve_forever(self, poll_interval) + finally: + # Once shutdown is called, we need to close the session. + # If the session is not closed properly, it might raise warnings, + # or even lock the database. + self.bottle_server.close_session() + + class Handler(wsgiref.simple_server.WSGIRequestHandler): + def address_string(self): + return self.client_address[0] + + def log_request(*args, **kwargs): # pylint: disable=no-method-argument + if not self.quiet: + return wsgiref.simple_server.WSGIRequestHandler.log_request(*args, + **kwargs) + server = wsgiref.simple_server.make_server( + host=self.host, + port=self.port, + app=app, + server_class=Server, + handler_class=Handler) + self.proxy.server = server + self.proxy._started.put(True) + server.serve_forever(poll_interval=0.1) + + def serve(): + # Since task is a thread_local object, we need to patch it inside the server thread. + self._ctx_patcher(self.ctx) + + bottle_app = bottle.Bottle() + bottle_app.post('/', callback=self._request_handler) + bottle.run( + app=bottle_app, + host='localhost', + port=self.port, + quiet=True, + server=BottleServerAdapter) + thread = threading.Thread(target=serve) + thread.daemon = True + thread.start() + return thread + + def close(self): + if self.server: + self.server.shutdown() + self.server.server_close() + + def _request_handler(self): + request = bottle.request.body.read() # pylint: disable=no-member + response = self._process(request) + return bottle.LocalResponse( + body=json.dumps(response, cls=modeling.utils.ModelJSONEncoder), + status=200, + headers={'content-type': 'application/json'} + ) + + def _process(self, request): + try: + with self.ctx.model.instrument(*self.ctx.INSTRUMENTATION_FIELDS): + payload = _process_request(self.ctx, request) + result_type = 'result' + if isinstance(payload, exceptions.ScriptException): + payload = dict(message=str(payload)) + result_type = 'stop_operation' + result = {'type': result_type, 'payload': payload} + except Exception as e: + traceback_out = StringIO.StringIO() + traceback.print_exc(file=traceback_out) + payload = { + 'type': type(e).__name__, + 'message': str(e), + 'traceback': traceback_out.getvalue() + } + result = {'type': 'error', 'payload': payload} + + return result + + def __enter__(self): + return self + + def __exit__(self, *args, **kwargs): + self.close() + + +class CtxError(RuntimeError): + pass + + +class CtxParsingError(CtxError): + pass + + +def _process_request(ctx, request): + request = json.loads(request) + args = request['args'] + return _process_arguments(ctx, args) + + +def _process_arguments(obj, args): + # Modifying? + try: + # TODO: should there be a way to escape "=" in case it is needed as real argument? + equals_index = args.index('=') # raises ValueError if not found + except ValueError: + equals_index = None + if equals_index is not None: + if equals_index == 0: + raise CtxParsingError('The "=" argument cannot be first') + elif equals_index != len(args) - 2: + raise CtxParsingError('The "=" argument must be penultimate') + modifying = True + modifying_key = args[-3] + modifying_value = args[-1] + args = args[:-3] + else: + modifying = False + modifying_key = None + modifying_value = None + + # Parse all arguments + while len(args) > 0: + obj, args = _process_next_operation(obj, args, modifying) + + if modifying: + if hasattr(obj, '__setitem__'): + # Modify item value (dict, list, and similar) + if isinstance(obj, (list, tuple)): + modifying_key = int(modifying_key) + obj[modifying_key] = modifying_value + elif hasattr(obj, modifying_key): + # Modify object attribute + setattr(obj, modifying_key, modifying_value) + else: + raise CtxError('Cannot modify `{0}` of `{1!r}`'.format(modifying_key, obj)) + + return obj + + +def _process_next_operation(obj, args, modifying): + args = list(args) + arg = args.pop(0) + + # Call? + if arg == '[': + # TODO: should there be a way to escape "[" and "]" in case they are needed as real + # arguments? + try: + closing_index = args.index(']') # raises ValueError if not found + except ValueError: + raise CtxParsingError('Opening "[" without a closing "]') + callable_args = args[:closing_index] + args = args[closing_index + 1:] + if not callable(obj): + raise CtxError('Used "[" and "] on an object that is not callable') + return obj(*callable_args), args + + # Attribute? + if isinstance(arg, basestring): + if hasattr(obj, arg): + return getattr(obj, arg), args + token_sugared = arg.replace('-', '_') + if hasattr(obj, token_sugared): + return getattr(obj, token_sugared), args + + # Item? (dict, lists, and similar) + if hasattr(obj, '__getitem__'): + if modifying and (arg not in obj) and hasattr(obj, '__setitem__'): + # Create nested dict + obj[arg] = {} + return obj[arg], args + + raise CtxParsingError('Cannot parse argument: `{0!r}`'.format(arg)) + + +def _get_unused_port(): + sock = socket.socket() + sock.bind(('127.0.0.1', 0)) + _, port = sock.getsockname() + sock.close() + return port |