summaryrefslogtreecommitdiffstats
path: root/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/execution_plugin/ctx_proxy/server.py
diff options
context:
space:
mode:
Diffstat (limited to 'azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/execution_plugin/ctx_proxy/server.py')
-rw-r--r--azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/execution_plugin/ctx_proxy/server.py244
1 files changed, 244 insertions, 0 deletions
diff --git a/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/execution_plugin/ctx_proxy/server.py b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/execution_plugin/ctx_proxy/server.py
new file mode 100644
index 0000000..91b95d9
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/execution_plugin/ctx_proxy/server.py
@@ -0,0 +1,244 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+``ctx`` proxy server implementation.
+"""
+
+import json
+import socket
+import Queue
+import StringIO
+import threading
+import traceback
+import wsgiref.simple_server
+
+import bottle
+from aria import modeling
+
+from .. import exceptions
+
+
+class CtxProxy(object):
+
+ def __init__(self, ctx, ctx_patcher=(lambda *args, **kwargs: None)):
+ self.ctx = ctx
+ self._ctx_patcher = ctx_patcher
+ self.port = _get_unused_port()
+ self.socket_url = 'http://localhost:{0}'.format(self.port)
+ self.server = None
+ self._started = Queue.Queue(1)
+ self.thread = self._start_server()
+ self._started.get(timeout=5)
+
+ def _start_server(self):
+
+ class BottleServerAdapter(bottle.ServerAdapter):
+ proxy = self
+
+ def close_session(self):
+ self.proxy.ctx.model.log._session.remove()
+
+ def run(self, app):
+
+ class Server(wsgiref.simple_server.WSGIServer):
+ allow_reuse_address = True
+ bottle_server = self
+
+ def handle_error(self, request, client_address):
+ pass
+
+ def serve_forever(self, poll_interval=0.5):
+ try:
+ wsgiref.simple_server.WSGIServer.serve_forever(self, poll_interval)
+ finally:
+ # Once shutdown is called, we need to close the session.
+ # If the session is not closed properly, it might raise warnings,
+ # or even lock the database.
+ self.bottle_server.close_session()
+
+ class Handler(wsgiref.simple_server.WSGIRequestHandler):
+ def address_string(self):
+ return self.client_address[0]
+
+ def log_request(*args, **kwargs): # pylint: disable=no-method-argument
+ if not self.quiet:
+ return wsgiref.simple_server.WSGIRequestHandler.log_request(*args,
+ **kwargs)
+ server = wsgiref.simple_server.make_server(
+ host=self.host,
+ port=self.port,
+ app=app,
+ server_class=Server,
+ handler_class=Handler)
+ self.proxy.server = server
+ self.proxy._started.put(True)
+ server.serve_forever(poll_interval=0.1)
+
+ def serve():
+ # Since task is a thread_local object, we need to patch it inside the server thread.
+ self._ctx_patcher(self.ctx)
+
+ bottle_app = bottle.Bottle()
+ bottle_app.post('/', callback=self._request_handler)
+ bottle.run(
+ app=bottle_app,
+ host='localhost',
+ port=self.port,
+ quiet=True,
+ server=BottleServerAdapter)
+ thread = threading.Thread(target=serve)
+ thread.daemon = True
+ thread.start()
+ return thread
+
+ def close(self):
+ if self.server:
+ self.server.shutdown()
+ self.server.server_close()
+
+ def _request_handler(self):
+ request = bottle.request.body.read() # pylint: disable=no-member
+ response = self._process(request)
+ return bottle.LocalResponse(
+ body=json.dumps(response, cls=modeling.utils.ModelJSONEncoder),
+ status=200,
+ headers={'content-type': 'application/json'}
+ )
+
+ def _process(self, request):
+ try:
+ with self.ctx.model.instrument(*self.ctx.INSTRUMENTATION_FIELDS):
+ payload = _process_request(self.ctx, request)
+ result_type = 'result'
+ if isinstance(payload, exceptions.ScriptException):
+ payload = dict(message=str(payload))
+ result_type = 'stop_operation'
+ result = {'type': result_type, 'payload': payload}
+ except Exception as e:
+ traceback_out = StringIO.StringIO()
+ traceback.print_exc(file=traceback_out)
+ payload = {
+ 'type': type(e).__name__,
+ 'message': str(e),
+ 'traceback': traceback_out.getvalue()
+ }
+ result = {'type': 'error', 'payload': payload}
+
+ return result
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, *args, **kwargs):
+ self.close()
+
+
+class CtxError(RuntimeError):
+ pass
+
+
+class CtxParsingError(CtxError):
+ pass
+
+
+def _process_request(ctx, request):
+ request = json.loads(request)
+ args = request['args']
+ return _process_arguments(ctx, args)
+
+
+def _process_arguments(obj, args):
+ # Modifying?
+ try:
+ # TODO: should there be a way to escape "=" in case it is needed as real argument?
+ equals_index = args.index('=') # raises ValueError if not found
+ except ValueError:
+ equals_index = None
+ if equals_index is not None:
+ if equals_index == 0:
+ raise CtxParsingError('The "=" argument cannot be first')
+ elif equals_index != len(args) - 2:
+ raise CtxParsingError('The "=" argument must be penultimate')
+ modifying = True
+ modifying_key = args[-3]
+ modifying_value = args[-1]
+ args = args[:-3]
+ else:
+ modifying = False
+ modifying_key = None
+ modifying_value = None
+
+ # Parse all arguments
+ while len(args) > 0:
+ obj, args = _process_next_operation(obj, args, modifying)
+
+ if modifying:
+ if hasattr(obj, '__setitem__'):
+ # Modify item value (dict, list, and similar)
+ if isinstance(obj, (list, tuple)):
+ modifying_key = int(modifying_key)
+ obj[modifying_key] = modifying_value
+ elif hasattr(obj, modifying_key):
+ # Modify object attribute
+ setattr(obj, modifying_key, modifying_value)
+ else:
+ raise CtxError('Cannot modify `{0}` of `{1!r}`'.format(modifying_key, obj))
+
+ return obj
+
+
+def _process_next_operation(obj, args, modifying):
+ args = list(args)
+ arg = args.pop(0)
+
+ # Call?
+ if arg == '[':
+ # TODO: should there be a way to escape "[" and "]" in case they are needed as real
+ # arguments?
+ try:
+ closing_index = args.index(']') # raises ValueError if not found
+ except ValueError:
+ raise CtxParsingError('Opening "[" without a closing "]')
+ callable_args = args[:closing_index]
+ args = args[closing_index + 1:]
+ if not callable(obj):
+ raise CtxError('Used "[" and "] on an object that is not callable')
+ return obj(*callable_args), args
+
+ # Attribute?
+ if isinstance(arg, basestring):
+ if hasattr(obj, arg):
+ return getattr(obj, arg), args
+ token_sugared = arg.replace('-', '_')
+ if hasattr(obj, token_sugared):
+ return getattr(obj, token_sugared), args
+
+ # Item? (dict, lists, and similar)
+ if hasattr(obj, '__getitem__'):
+ if modifying and (arg not in obj) and hasattr(obj, '__setitem__'):
+ # Create nested dict
+ obj[arg] = {}
+ return obj[arg], args
+
+ raise CtxParsingError('Cannot parse argument: `{0!r}`'.format(arg))
+
+
+def _get_unused_port():
+ sock = socket.socket()
+ sock.bind(('127.0.0.1', 0))
+ _, port = sock.getsockname()
+ sock.close()
+ return port