diff options
author | Sastry Isukapalli <sastry@research.att.com> | 2018-01-11 14:37:39 -0500 |
---|---|---|
committer | Sastry Isukapalli <sastry@research.att.com> | 2018-01-11 14:38:50 -0500 |
commit | 7167d1a0c3c30afbba61edb593580cbf7244e52c (patch) | |
tree | 3686d1878449ff552687539dc5869ee5ea2697bb | |
parent | d51c20ced007294d86e560f3bc9f5812cc17f193 (diff) |
OOF design framework seed code
Issue-ID: OPTFRA-18
Change-Id: Ib44b06f7184cb49f90d5936edd82cba44068b1c8
Signed-off-by: Sastry Isukapalli <sastry@research.att.com>
-rw-r--r-- | config/__init__.py | 32 | ||||
-rw-r--r-- | config/base.py | 36 | ||||
-rw-r--r-- | config/credentials.py | 60 | ||||
-rw-r--r-- | config/loader.py | 51 | ||||
-rw-r--r-- | operation/__init__.py | 0 | ||||
-rw-r--r-- | operation/error_handling.py | 93 | ||||
-rw-r--r-- | operation/exceptions.py | 40 | ||||
-rw-r--r-- | operation/responses.py | 39 | ||||
-rwxr-xr-x | osdfapp.py | 168 | ||||
-rwxr-xr-x | osdfapp.sh | 53 | ||||
-rw-r--r-- | utils/__init__.py | 0 | ||||
-rw-r--r-- | utils/data_conversion.py | 62 | ||||
-rw-r--r-- | utils/data_types.py | 30 | ||||
-rw-r--r-- | utils/interfaces.py | 90 | ||||
-rw-r--r-- | utils/local_processing.py | 43 | ||||
-rw-r--r-- | utils/programming_utils.py | 105 | ||||
-rw-r--r-- | webapp/__init__.py | 0 | ||||
-rw-r--r-- | webapp/appcontroller.py | 47 |
18 files changed, 949 insertions, 0 deletions
diff --git a/config/__init__.py b/config/__init__.py new file mode 100644 index 0000000..303a8ce --- /dev/null +++ b/config/__init__.py @@ -0,0 +1,32 @@ +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2017 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- + + +import yaml +import json + +from osdf.utils.programming_utils import MetaSingleton + + +class CoreConfig(metaclass=MetaSingleton): + core_config = None + + def get_core_config(self, config_file=None): + if self.core_config is None: + self.core_config = yaml.load(open(config_file)) + return self.core_config + diff --git a/config/base.py b/config/base.py new file mode 100644 index 0000000..b8aacff --- /dev/null +++ b/config/base.py @@ -0,0 +1,36 @@ +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2017 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + +import os + +import osdf.config.loader as config_loader +import osdf.config.credentials as creds +from osdf.utils.programming_utils import DotDict + +config_spec = { + "deployment": os.environ.get("OSDF_MANAGER_CONFIG_FILE", "config/osdf_config.yaml"), + "core": "config/common_config.yaml" + } + +osdf_config = DotDict(config_loader.all_configs(**config_spec)) + +http_basic_auth_credentials = creds.load_credentials(osdf_config) + +dmaap_creds = creds.dmaap_creds() + +creds_prefixes = {"so": "so", "cm": "cmPortal"} diff --git a/config/credentials.py b/config/credentials.py new file mode 100644 index 0000000..e5a6399 --- /dev/null +++ b/config/credentials.py @@ -0,0 +1,60 @@ +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2017 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + +import json + +from osdf import auth_groups, userid_suffix, passwd_suffix + + +def dmaap_creds(dmaap_file="/etc/dcae/dmaap.conf"): + """Get DMaaP credentials from DCAE for publish and subscribe""" + try: + dmaap_creds = _get_dmaap_creds(dmaap_file) + except: + dmaap_creds = {} + return dmaap_creds + + +def _get_dmaap_creds(dmaap_file): + """Get DMaaP credentials from DCAE for publish and subscribe""" + streams = json.load(open(dmaap_file, 'r')) + pubs = [x for x in streams + if x['dmaapStreamId'] == 'requests' and x['dmaapAction'] == 'publish'] + subs = [x for x in streams + if x['dmaapStreamId'] == 'responses' and x['dmaapAction'] == 'subscribe'] + + def get_dmaap_info(x): + """Get DMaaP credentials from dmaap_object 'x'""" + return dict(url=x.get('dmaapUrl'), userid=x.get('dmaapUserName'), passwd=x.get('dmaapPassword')) + + return {'pub': get_dmaap_info(pubs[0]), 'sub': get_dmaap_info(subs[0])} + + +def load_credentials(osdf_config): + """Get credentials as dictionaries grouped by auth_group (e.g. creds["Placement"]["user1"] = "pass1")""" + creds = dict((x, dict()) for x in auth_groups) # each auth group has userid, passwd dict + suffix_start = len(userid_suffix) + + config = osdf_config.deployment + + for element, username in config.items(): + for x in auth_groups: + if element.startswith("osdf" + x) and element.endswith(userid_suffix): + passwd = config[element[:-suffix_start] + passwd_suffix] + creds[x][username] = passwd + return creds diff --git a/config/loader.py b/config/loader.py new file mode 100644 index 0000000..7cb363a --- /dev/null +++ b/config/loader.py @@ -0,0 +1,51 @@ +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2017 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + +import json + +import yaml + + +def load_config_file(config_file: str, child_name="dockerConfiguration") -> dict: + """ + Load OSDF configuration from a file -- currently only yaml/json are supported + :param config_file: path to config file (.yaml or .json). + :param child_name: if present, return only that child node + :return: config (all or specific child node) + """ + with open(config_file, 'r') as fid: + res = {} + if config_file.endswith(".yaml"): + res = yaml.load(fid) + elif config_file.endswith(".json") or config_file.endswith("json"): + res = json.load(fid) + return res.get(child_name, res) if child_name else res + + +def dcae_config(config_file: str) -> dict: + return load_config_file(config_file, child_name="dockerConfiguration") + + +def all_configs(**kwargs: dict) -> dict: + """ + Load all specified configurations + :param config_file_spec: key-value pairs + (e.g. { "core": "common_config.yaml", "deployment": "/tmp/1452523532json" }) + :return: merged config as a nested dictionary + """ + return {k: load_config_file(fname) for k, fname in kwargs.items()} diff --git a/operation/__init__.py b/operation/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/operation/__init__.py diff --git a/operation/error_handling.py b/operation/error_handling.py new file mode 100644 index 0000000..dfb0848 --- /dev/null +++ b/operation/error_handling.py @@ -0,0 +1,93 @@ +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2017 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + +import json + +from schematics.exceptions import DataError + +from requests import RequestException +from requests import ConnectionError, HTTPError, Timeout +from osdf.operation.exceptions import BusinessException + +import osdf + +ERROR_TEMPLATE = osdf.ERROR_TEMPLATE + +MESSAGE_BASE = "A solution couldn't be determined because an external application" +HTTP_ERROR_MESSAGE = MESSAGE_BASE + " returned a HTTP error" +TIMEOUT_ERROR_MESSAGE = MESSAGE_BASE + " could not respond in time, please check the external application" +CONNECTION_ERROR_MESSAGE = MESSAGE_BASE + " could not be reached" + +internal_error_body = { + "serviceException": { + "text": "Unhandled internal exception, request could not be processed" + } +} + +internal_error_message = json.dumps(internal_error_body) + + +def build_json_error_body(error): + if isinstance(error,RequestException): + return request_exception_to_json_body(error) + elif isinstance(error, DataError): + return data_error_to_json_body(error) + elif type(error) is BusinessException: # return the error message, because it is well formatted + return ERROR_TEMPLATE.render(description=str(error)) + else: + return internal_error_message + + +def data_error_to_json_body(error): + description = str(error).replace('"', '\\"') + error_message = ERROR_TEMPLATE.render(description=description) + return error_message + + +def request_exception_to_json_body(error): + friendly_message = "A request exception has occurred when contacting an external system" + if type(error) is HTTPError: + friendly_message = HTTP_ERROR_MESSAGE + if type(error) is ConnectionError: + friendly_message = CONNECTION_ERROR_MESSAGE + if type(error) is Timeout: + friendly_message = TIMEOUT_ERROR_MESSAGE + + eie_body = { + "serviceException": { + "text": friendly_message, + "errorType": "InterfaceError" + }, + "externalApplicationDetails": { + "httpMethod": error.request.method, + "url": error.request.url + } + } + + response = error.response + + if response is not None: + eie_body['externalApplicationDetails']['httpStatusCode'] = response.status_code + content_type = response.headers.get('content-type') + if content_type is not None: + if 'application/json' in content_type: + eie_body['externalApplicationDetails']['responseMessage'] = response.json() + elif 'text/html' in content_type: + eie_body['externalApplicationDetails']['responseMessage'] = response.text + error_message = json.dumps(eie_body) + return error_message diff --git a/operation/exceptions.py b/operation/exceptions.py new file mode 100644 index 0000000..5277b01 --- /dev/null +++ b/operation/exceptions.py @@ -0,0 +1,40 @@ +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2017 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + +class BusinessException(Exception): + pass + + +class MessageBusConfigurationException(Exception): + pass + + +class CMDataError(Exception): + pass + + +class CMSOExecutionError(Exception): + pass + + +class CMSOCallBackError(Exception): + pass + + +class CMSOInvalidRequestException(Exception): + pass diff --git a/operation/responses.py b/operation/responses.py new file mode 100644 index 0000000..22a94f7 --- /dev/null +++ b/operation/responses.py @@ -0,0 +1,39 @@ +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2017 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + +from flask import Response + +from osdf import ACCEPTED_MESSAGE_TEMPLATE + + +def osdf_response_for_request_accept(req_id="", text="", response_code=202, as_http=True): + """Helper method to create a response object for request acceptance, so that the object can be sent to a client + :param req_id: request ID provided by the caller + :param text: extra text description about accepting the request (e.g. "Request accepted") + :param response_code: the HTTP status code to send -- default is 202 (accepted) + :param as_http: whether to send response as HTTP response object or as a string + :return: if as_http is True, return a HTTP Response object. Otherwise, return json-encoded-message + """ + response_message = ACCEPTED_MESSAGE_TEMPLATE.render(description=text, request_id=req_id) + if not as_http: + return response_message + + response = Response(response_message, content_type='application/json; charset=utf-8') + response.headers.add('content-length', len(response_message)) + response.status_code = response_code + return response diff --git a/osdfapp.py b/osdfapp.py new file mode 100755 index 0000000..f854dca --- /dev/null +++ b/osdfapp.py @@ -0,0 +1,168 @@ +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2017 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + +""" +OSDF Manager Main Flask Application +""" + +import sys +from threading import Thread # for scaling up, may need celery with RabbitMQ or redis + +from flask import Flask, request, Response, g + +import osdf +import pydevd +import json +import osdf.adapters.policy.interface +import osdf.config.credentials +import osdf.config.loader +import osdf.datasources.aai.aai_local_cached_data +import osdf.operation.error_handling +import osdf.operation.responses +import traceback +from osdf.adapters.policy.interface import get_policies +from osdf.adapters.response_parsing.aots_ueb_cm_data import aots_ds_ueb_listener +from osdf.config.base import osdf_config, DOCKER_CM_OPTIMIZER, AOTS_CM_MESSAGE_BUS +from osdf.optimizers.cmopt.rcscheduler.local_opt_processor import process_local_cm_scheduler_opt +from osdf.optimizers.placementopt.conductor.remote_opt_processor import process_placement_opt +from osdf.webapp.appcontroller import auth_basic +from optparse import OptionParser +from osdf.operation.exceptions import BusinessException +from osdf.operation.error_handling import request_exception_to_json_body, internal_error_message +from requests import RequestException +from schematics.exceptions import DataError +from osdf.logging.osdf_logging import MH, audit_log, error_log +from osdf.models.placementRequest import PlacementAPI +from osdf.models.schedulerRequest import SchedulerAPI + +ERROR_TEMPLATE = osdf.ERROR_TEMPLATE + +app = Flask(__name__) + + + +BAD_CLIENT_REQUEST_MESSAGE = 'Client sent an invalid request' + +# An exception explicitly raised due to some business rule +@app.errorhandler(BusinessException) +def handle_business_exception(e): + error_log.error("Synchronous error for request id {} {}".format(g.request_id, traceback.format_exc())) + err_msg = ERROR_TEMPLATE.render(description=str(e)) + response = Response(err_msg, content_type='application/json; charset=utf-8') + response.status_code = 400 + return response + +# Returns a detailed synchronous message to the calling client when osdf fails due to a remote call to another system +@app.errorhandler(RequestException) +def handle_request_exception(e): + error_log.error("Synchronous error for request id {} {}".format(g.request_id, traceback.format_exc())) + err_msg = request_exception_to_json_body(e) + response = Response(err_msg, content_type='application/json; charset=utf-8') + response.status_code = 400 + return response + +# Returns a detailed message to the calling client when the initial synchronous message is invalid +@app.errorhandler(DataError) +def handle_data_error(e): + error_log.error("Synchronous error for request id {} {}".format(g.request_id, traceback.format_exc())) + + body_dictionary = { + "serviceException": { + "text": BAD_CLIENT_REQUEST_MESSAGE, + "exceptionMessage": str(e.messages), + "errorType": "InvalidClientRequest" + } + } + + body_as_json = json.dumps(body_dictionary) + response = Response(body_as_json, content_type='application/json; charset=utf-8') + response.status_code = 400 + return response + + +@app.route("/osdf/api/v2/placement", methods=["POST"]) +@auth_basic.login_required +def do_placement_opt(): + """Perform placement optimization after validating the request and fetching policies + Make a call to the call-back URL with the output of the placement request. + Note: Call to Conductor for placement optimization may have redirects, so account for them + """ + request_json = request.get_json() + req_id = request_json['requestInfo']['requestId'] + g.request_id = req_id + audit_log.info(MH.received_request(request.url, request.remote_addr, json.dumps(request_json))) + + PlacementAPI(request_json).validate() + + # Currently policies are being used only during placement, so only fetch them if placement demands is not empty + policies = {} + + if 'placementDemand' in request_json['placementInfo']['demandInfo']: + policies, prov_status = get_policies(request_json, "placement") + + audit_log.info(MH.new_worker_thread(req_id, "[for placement]")) + t = Thread(target=process_placement_opt, args=(request_json, policies, osdf_config, prov_status)) + t.start() + audit_log.info(MH.accepted_valid_request(req_id, request)) + return osdf.operation.responses.osdf_response_for_request_accept( + req_id=req_id, text="Accepted placement request. Response will be posted to callback URL") + + +# Returned when unexpected coding errors occur during initial synchronous processing +@app.errorhandler(500) +def interal_failure(error): + error_log.error("Synchronous error for request id {} {}".format(g.request_id, traceback.format_exc())) + response = Response(internal_error_message, content_type='application/json; charset=utf-8') + response.status_code = 500 + return response + + +def getOptions(argv): + program_version_string = '%%prog %s' % ("v1.0") + #program_usage = '''usage: spam two eggs''' # optional - will be autogenerated by optparse + program_longdesc = "" + program_license = "" + + # setup option parser + parser = OptionParser(version=program_version_string, epilog=program_longdesc, description=program_license) + parser.add_option("-l", "--local", dest="local", help="run locally", action="store_true", default=False) + parser.add_option("-t", "--devtest", dest="devtest", help="run in dev/test environment", action="store_true", default=False) + parser.add_option("-d", "--debughost", dest="debughost", help="IP Address of host running debug server", default='') + parser.add_option("-p", "--debugport", dest="debugport", help="Port number of debug server", type=int, default=5678) + (opts, args) = parser.parse_args(argv) + if (opts.debughost != ''): + print('pydevd.settrace(%s, port=%s)' % (opts.debughost, opts.debugport)) + pydevd.settrace(opts.debughost, port=opts.debugport) + return opts + + +if __name__ == "__main__": + + sys_conf = osdf_config['core']['osdf_system'] + ports = sys_conf['osdf_ports'] + internal_port, external_port = ports['internal'], ports['external'] + ssl_context = tuple(sys_conf['ssl_context']) + + common_app_opts = dict(host='0.0.0.0', threaded=True, use_reloader=False) + + opts = getOptions(sys.argv) + if (opts.local == False and opts.devtest == False): # normal deployment + app.run(port=internal_port, ssl_context=ssl_context, debug=False, **common_app_opts) + else: + port = internal_port if opts.local == True else external_port + app.run(port=port, debug=True, **common_app_opts) diff --git a/osdfapp.sh b/osdfapp.sh new file mode 100755 index 0000000..c54d59c --- /dev/null +++ b/osdfapp.sh @@ -0,0 +1,53 @@ +#!/bin/bash + +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2017 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + +# Call osdf app after setting LD_LIBRARY_PATH for oracle client, postgres client, etc. + +cd $(dirname $0) + +# Environment variables below are for ORACLE_HOME and such things, and not needed for 1707 onwards +# . ../dependencies/env.sh + +bash ../etc/make-certs.sh # create the https certificates if they are not present + +set -e + +mkdir -p logs + +if [ ! -e "osdf-optim" ]; then +( + mkdir tmp + cd tmp + tar xzf ../../dependencies/SNIROOptimizationPack.tgz + mv osdf ../osdf-optim + cd ../osdf-optim/pywheels + pip install docopt* jsonschema* +) +cp etc/run-case-local.sh osdf-optim/run/ +fi + +if [ $# -ge 1 ]; then + export SNIRO_MANAGER_CONFIG_FILE="$1" # this file is passed by the DCAE controller +fi + +# export FLASK_APP=osdfapp.py + +# flask run +python osdfapp.py # running the app diff --git a/utils/__init__.py b/utils/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/utils/__init__.py diff --git a/utils/data_conversion.py b/utils/data_conversion.py new file mode 100644 index 0000000..2f678fa --- /dev/null +++ b/utils/data_conversion.py @@ -0,0 +1,62 @@ +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2017 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + +import itertools +from collections import defaultdict + +from dateutil import tz +from dateutil.parser import parse + + +def tuples_to_multi_val_dict(kvw_tuples, colnums=(0, 1)): + """Given a list of k,v tuples, get a dictionary of the form k -> [v1,v2,...,vn] + :param kvw_tuples: list of k,v,w tuples (e.g. [(k1,v1,a1), (k2,v2,a2), (k1,v3,a3), (k1,v4,a4)] + :param colnums: column numbers + :return: a dict of str:set, something like {k1: {v1, v3, v4}, k2: {v2}} or {k1: {a1, a3, a4}, k2: {a2}} + """ + res = defaultdict(set) + for x in kvw_tuples: + key, val = x[colnums[0]], x[colnums[1]] + res[key].add(val) + return dict((k, set(v)) for k, v in res.items()) + + +def tuples_to_dict(kvw_tuples, colnums=(0, 1)): + """Given a list of k,v tuples, get a dictionary of the form k -> v + :param kvw_tuples: list of k,v,w tuples (e.g. [(k1,v1,a1), (k2,v2,a2), (k3,v3,a3), (k1,v4,a4)] + :param colnums: column numbers + :return: a dict; something like {k1: v4, k2: v2, k3: v3} (note, k1 is repeated, so last val is retained) + """ + return dict((x[colnums[0]], x[colnums[1]]) for x in kvw_tuples) + + +def utc_time_from_ts(timestamp): + """Return corresponding UTC timestamp for a given ISO timestamp (or anything that parse accepts)""" + return parse(timestamp).astimezone(tz.tzutc()).strftime('%Y-%m-%d %H:%M:%S') + + +def list_flatten(l): + """Flatten a complex nested list of nested lists into a flat list""" + return itertools.chain(*[list_flatten(j) if isinstance(j, list) else [j] for j in l]) + + +text_to_symbol = { + 'greater': ">", + 'less': "<", + 'equal': "=" +} diff --git a/utils/data_types.py b/utils/data_types.py new file mode 100644 index 0000000..877d4a1 --- /dev/null +++ b/utils/data_types.py @@ -0,0 +1,30 @@ +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2017 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + +import collections + + +def list_like(obj): + """Check if an object is a list-like object, but not a string or dict""" + return isinstance(obj, collections.Sequence) and not isinstance(obj, (str, bytes)) + + +def dict_like(obj): + """Check if an object is a list-like object, but not a string or dict""" + return isinstance(obj, collections.Mapping) + diff --git a/utils/interfaces.py b/utils/interfaces.py new file mode 100644 index 0000000..7a0e3a9 --- /dev/null +++ b/utils/interfaces.py @@ -0,0 +1,90 @@ +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2017 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + +import requests + +from osdf.config.base import osdf_config, creds_prefixes +from osdf.logging.osdf_logging import MH, debug_log + + +def get_rest_client(request_json, service): + """Get a RestClient based on request_json's callback URL and osdf_config's credentials based on service name + :param request_json: + :param service: so or cm + :return: rc -- RestClient + """ + callback_url = request_json["requestInfo"]["callbackUrl"] + prefix = creds_prefixes[service] + config = osdf_config.deployment + c_userid, c_passwd = config[prefix + "Username"], config[prefix + "Password"] + return RestClient(url=callback_url, userid=c_userid, passwd=c_passwd) + + +class RestClient(object): + """Simple REST Client that supports get/post and basic auth""" + + def __init__(self, userid=None, passwd=None, log_func=None, url=None, timeout=None, headers=None, + method="POST", req_id=None): + self.auth = (userid, passwd) if userid and passwd else None + self.headers = headers if headers else {} + self.method = method + self.url = url + self.log_func = log_func + self.timeout = (30, 90) if timeout is None else timeout + self.req_id = req_id + + def add_headers(self, headers): + self.headers.update(headers) + + def request(self, url=None, method=None, asjson=True, ok_codes=(2, ), + raw_response=False, noresponse=False, timeout=None, **kwargs): + """ + :param url: REST end point to query + :param method: GET or POST (default is None => self.method) + :param asjson: whether the expected response is in json format + :param ok_codes: expected codes (prefix matching -- e.g. can be (20, 21, 32) or (2, 3)) + :param noresponse: If no response is expected (as long as response codes are OK) + :param raw_response: If we need just the raw response (e.g. conductor sends transaction IDs in headers) + :param timeout: Connection and read timeouts + :param kwargs: Other parameters + :return: + """ + if not self.req_id: + debug_log.debug("Requesting URL: {}".format(url or self.url)) + else: + debug_log.debug("Requesting URL: {} for request ID: {}".format(url or self.url, self.req_id)) + + res = requests.request(url=url or self.url, method=method or self.method, + auth=self.auth, headers=self.headers, + timeout=timeout or self.timeout, **kwargs) + + if self.log_func: + self.log_func(MH.received_http_response(res)) + + res_code = str(res.status_code) + if not any(res_code.startswith(x) for x in map(str, ok_codes)): + raise res.raise_for_status() + + if raw_response: + return res + elif noresponse: + return None + elif asjson: + return res.json() + else: + return res.content diff --git a/utils/local_processing.py b/utils/local_processing.py new file mode 100644 index 0000000..6768839 --- /dev/null +++ b/utils/local_processing.py @@ -0,0 +1,43 @@ +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2017 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + +import os + +from osdf.logging.osdf_logging import metrics_log, MH, warn_audit_error + + +def local_create_job_file(req_id, json_req, fname='osdf-req-data.json'): + """Creates a "work" folder for local processing and place relevant + job task file in there""" + + work_dir = 'osdf-optim/work/' + req_id + work_file = '{}/{}'.format(work_dir, fname) + try: + cur_task = "Making a local directory in the OSDF manager for req-id: {}".format(req_id) + metrics_log.info(MH.creating_local_env(cur_task)) + os.makedirs(work_dir, exist_ok=True) + except Exception as err: + warn_audit_error(MH.error_local_env(req_id, "Can't create directory {}".format(work_dir), err)) + return None + try: + with open(work_file, 'w') as fid: + fid.write(json_req['payload']) + return work_dir + except Exception as err: + warn_audit_error(MH.error_local_env(req_id, "can't create file {}".format(work_file), err)) + return None diff --git a/utils/programming_utils.py b/utils/programming_utils.py new file mode 100644 index 0000000..a0a8fde --- /dev/null +++ b/utils/programming_utils.py @@ -0,0 +1,105 @@ +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2017 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + +import collections +import itertools + + +class DotDict(dict): + """A dot-dict mixin to be able to access a dictionary via dot notation + source: https://stackoverflow.com/questions/2352181/how-to-use-a-dot-to-access-members-of-dictionary + """ + __getattr__ = dict.get + __setattr__ = dict.__setitem__ + __delattr__ = dict.__delitem__ + + +class MetaSingleton(type): + """Singleton class (2nd Chapter) from Learning Python Design Patterns - 2nd ed. + Chetan Giridhar, Packt Publ. 2016""" + _instances = {} + + def __call__(cls, *args, **kwargs): + if cls not in cls._instances: + cls._instances[cls] = super(MetaSingleton, cls).__call__(*args, **kwargs) + return cls._instances[cls] + + +def namedtuple_with_defaults(typename, field_names, default_values=()): + """A namedtuple with default values -- Stack overflow recipe from Mark Lodato + http://stackoverflow.com/questions/11351032/named-tuple-and-optional-keyword-arguments + :param typename: Name for the class (same as for namedtuple) + :param field_names: Field names (same as for namedtuple) + :param default_values: Can be specified as a dictionary or as a list + :return: New namedtuple object + """ + T = collections.namedtuple(typename, field_names) + T.__new__.__defaults__ = (None,) * len(T._fields) + if isinstance(default_values, collections.Mapping): + prototype = T(**default_values) + else: + prototype = T(*default_values) + T.__new__.__defaults__ = tuple(prototype) + return T + + +def dot_notation(dict_like, dot_spec): + """Return the value corresponding to the dot_spec from a dict_like object + :param dict_like: dictionary, JSON, etc. + :param dot_spec: a dot notation (e.g. a1.b1.c1.d1 => a1["b1"]["c1"]["d1"]) + :return: the value referenced by the dot_spec + """ + attrs = dot_spec.split(".") # we split the path + parent = dict_like.get(attrs[0]) + children = ".".join(attrs[1:]) + if not (parent and children): # if no children or no parent, bail out + return parent + if isinstance(parent, list): # here, we apply remaining path spec to all children + return [dot_notation(j, children) for j in parent] + elif isinstance(parent, dict): + return dot_notation(parent, children) + else: + return None + + +def list_flatten(l): + """ + Flatten a complex nested list of nested lists into a flat list (DFS). + For example, [ [1, 2], [[[2,3,4], [2,3,4]], [3,4,5, 'hello']]] + will produce [1, 2, 2, 3, 4, 2, 3, 4, 3, 4, 5, 'hello'] + """ + return list(itertools.chain(*[list_flatten(j) if isinstance(j, list) else [j] for j in l])) + + +def inverted_dict(keys: list, key_val_dict: dict) -> dict: + """ + Get val -> [keys] mapping for the given keys using key_val_dict + :param keys: the keys we are interested in (a list) + :param key_val_dict: the key -> val mapping + :return: inverted dictionary of val -> [keys] (for the subset dict of given keys) + """ + res = {} + all_tuples = ((k, key_val_dict[k] if k in key_val_dict else 'no-parent-' + k) for k in keys) + for k, v in all_tuples: + if v in res: + res[v].append(k) + else: + res[v] = [k] + # making sure to remove duplicate keys + res = dict((v, list(set(k_list))) for v, k_list in res.items()) + return res diff --git a/webapp/__init__.py b/webapp/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/webapp/__init__.py diff --git a/webapp/appcontroller.py b/webapp/appcontroller.py new file mode 100644 index 0000000..49f84ff --- /dev/null +++ b/webapp/appcontroller.py @@ -0,0 +1,47 @@ +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2017 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + +from flask import request +from flask_httpauth import HTTPBasicAuth +from flask import Response +import json +import osdf +from osdf.config.base import http_basic_auth_credentials + +auth_basic = HTTPBasicAuth() + +error_body = { + "serviceException": { + "text": "Unauthorized, check username and password" + } +} + +unauthorized_message = json.dumps(error_body) + +@auth_basic.get_password +def get_pw(username): + end_point = request.url.split('/')[-1] + auth_group = osdf.end_point_auth_mapping.get(end_point) + return http_basic_auth_credentials[auth_group].get(username) if auth_group else None + +@auth_basic.error_handler +def auth_error(): + response = Response(unauthorized_message, content_type='application/json; charset=utf-8') + response.headers.add('content-length', len(unauthorized_message)) + response.status_code = 401 + return response |