summaryrefslogtreecommitdiffstats
path: root/osdfapp.py
diff options
context:
space:
mode:
Diffstat (limited to 'osdfapp.py')
-rwxr-xr-xosdfapp.py168
1 files changed, 168 insertions, 0 deletions
diff --git a/osdfapp.py b/osdfapp.py
new file mode 100755
index 0000000..f854dca
--- /dev/null
+++ b/osdfapp.py
@@ -0,0 +1,168 @@
+# -------------------------------------------------------------------------
+# Copyright (c) 2015-2017 AT&T Intellectual Property
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# -------------------------------------------------------------------------
+#
+
+"""
+OSDF Manager Main Flask Application
+"""
+
+import sys
+from threading import Thread # for scaling up, may need celery with RabbitMQ or redis
+
+from flask import Flask, request, Response, g
+
+import osdf
+import pydevd
+import json
+import osdf.adapters.policy.interface
+import osdf.config.credentials
+import osdf.config.loader
+import osdf.datasources.aai.aai_local_cached_data
+import osdf.operation.error_handling
+import osdf.operation.responses
+import traceback
+from osdf.adapters.policy.interface import get_policies
+from osdf.adapters.response_parsing.aots_ueb_cm_data import aots_ds_ueb_listener
+from osdf.config.base import osdf_config, DOCKER_CM_OPTIMIZER, AOTS_CM_MESSAGE_BUS
+from osdf.optimizers.cmopt.rcscheduler.local_opt_processor import process_local_cm_scheduler_opt
+from osdf.optimizers.placementopt.conductor.remote_opt_processor import process_placement_opt
+from osdf.webapp.appcontroller import auth_basic
+from optparse import OptionParser
+from osdf.operation.exceptions import BusinessException
+from osdf.operation.error_handling import request_exception_to_json_body, internal_error_message
+from requests import RequestException
+from schematics.exceptions import DataError
+from osdf.logging.osdf_logging import MH, audit_log, error_log
+from osdf.models.placementRequest import PlacementAPI
+from osdf.models.schedulerRequest import SchedulerAPI
+
+ERROR_TEMPLATE = osdf.ERROR_TEMPLATE
+
+app = Flask(__name__)
+
+
+
+BAD_CLIENT_REQUEST_MESSAGE = 'Client sent an invalid request'
+
+# An exception explicitly raised due to some business rule
+@app.errorhandler(BusinessException)
+def handle_business_exception(e):
+ error_log.error("Synchronous error for request id {} {}".format(g.request_id, traceback.format_exc()))
+ err_msg = ERROR_TEMPLATE.render(description=str(e))
+ response = Response(err_msg, content_type='application/json; charset=utf-8')
+ response.status_code = 400
+ return response
+
+# Returns a detailed synchronous message to the calling client when osdf fails due to a remote call to another system
+@app.errorhandler(RequestException)
+def handle_request_exception(e):
+ error_log.error("Synchronous error for request id {} {}".format(g.request_id, traceback.format_exc()))
+ err_msg = request_exception_to_json_body(e)
+ response = Response(err_msg, content_type='application/json; charset=utf-8')
+ response.status_code = 400
+ return response
+
+# Returns a detailed message to the calling client when the initial synchronous message is invalid
+@app.errorhandler(DataError)
+def handle_data_error(e):
+ error_log.error("Synchronous error for request id {} {}".format(g.request_id, traceback.format_exc()))
+
+ body_dictionary = {
+ "serviceException": {
+ "text": BAD_CLIENT_REQUEST_MESSAGE,
+ "exceptionMessage": str(e.messages),
+ "errorType": "InvalidClientRequest"
+ }
+ }
+
+ body_as_json = json.dumps(body_dictionary)
+ response = Response(body_as_json, content_type='application/json; charset=utf-8')
+ response.status_code = 400
+ return response
+
+
+@app.route("/osdf/api/v2/placement", methods=["POST"])
+@auth_basic.login_required
+def do_placement_opt():
+ """Perform placement optimization after validating the request and fetching policies
+ Make a call to the call-back URL with the output of the placement request.
+ Note: Call to Conductor for placement optimization may have redirects, so account for them
+ """
+ request_json = request.get_json()
+ req_id = request_json['requestInfo']['requestId']
+ g.request_id = req_id
+ audit_log.info(MH.received_request(request.url, request.remote_addr, json.dumps(request_json)))
+
+ PlacementAPI(request_json).validate()
+
+ # Currently policies are being used only during placement, so only fetch them if placement demands is not empty
+ policies = {}
+
+ if 'placementDemand' in request_json['placementInfo']['demandInfo']:
+ policies, prov_status = get_policies(request_json, "placement")
+
+ audit_log.info(MH.new_worker_thread(req_id, "[for placement]"))
+ t = Thread(target=process_placement_opt, args=(request_json, policies, osdf_config, prov_status))
+ t.start()
+ audit_log.info(MH.accepted_valid_request(req_id, request))
+ return osdf.operation.responses.osdf_response_for_request_accept(
+ req_id=req_id, text="Accepted placement request. Response will be posted to callback URL")
+
+
+# Returned when unexpected coding errors occur during initial synchronous processing
+@app.errorhandler(500)
+def interal_failure(error):
+ error_log.error("Synchronous error for request id {} {}".format(g.request_id, traceback.format_exc()))
+ response = Response(internal_error_message, content_type='application/json; charset=utf-8')
+ response.status_code = 500
+ return response
+
+
+def getOptions(argv):
+ program_version_string = '%%prog %s' % ("v1.0")
+ #program_usage = '''usage: spam two eggs''' # optional - will be autogenerated by optparse
+ program_longdesc = ""
+ program_license = ""
+
+ # setup option parser
+ parser = OptionParser(version=program_version_string, epilog=program_longdesc, description=program_license)
+ parser.add_option("-l", "--local", dest="local", help="run locally", action="store_true", default=False)
+ parser.add_option("-t", "--devtest", dest="devtest", help="run in dev/test environment", action="store_true", default=False)
+ parser.add_option("-d", "--debughost", dest="debughost", help="IP Address of host running debug server", default='')
+ parser.add_option("-p", "--debugport", dest="debugport", help="Port number of debug server", type=int, default=5678)
+ (opts, args) = parser.parse_args(argv)
+ if (opts.debughost != ''):
+ print('pydevd.settrace(%s, port=%s)' % (opts.debughost, opts.debugport))
+ pydevd.settrace(opts.debughost, port=opts.debugport)
+ return opts
+
+
+if __name__ == "__main__":
+
+ sys_conf = osdf_config['core']['osdf_system']
+ ports = sys_conf['osdf_ports']
+ internal_port, external_port = ports['internal'], ports['external']
+ ssl_context = tuple(sys_conf['ssl_context'])
+
+ common_app_opts = dict(host='0.0.0.0', threaded=True, use_reloader=False)
+
+ opts = getOptions(sys.argv)
+ if (opts.local == False and opts.devtest == False): # normal deployment
+ app.run(port=internal_port, ssl_context=ssl_context, debug=False, **common_app_opts)
+ else:
+ port = internal_port if opts.local == True else external_port
+ app.run(port=port, debug=True, **common_app_opts)