diff options
40 files changed, 2121 insertions, 0 deletions
diff --git a/__init__.py b/__init__.py new file mode 100755 index 0000000..d0993ae --- /dev/null +++ b/__init__.py @@ -0,0 +1,45 @@ +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2017 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + +"""Core functions for OSDF Application, including flask app""" + +from jinja2 import Template + + +end_point_auth_mapping = { # map a URL endpoint to auth group + "cmscheduler": "CMScheduler", + "placement": "Placement", +} + +userid_suffix, passwd_suffix = "Username", "Password" +auth_groups = set(end_point_auth_mapping.values()) + +ERROR_TEMPLATE = Template(""" +{ + "serviceException": { + "text": "{{ description }}" + } +} +""") + +ACCEPTED_MESSAGE_TEMPLATE = Template(""" +{ + "requestId": "{{ request_id }}", + "text": "{{ description }}" +} +""") diff --git a/adapters/__init__.py b/adapters/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/adapters/__init__.py diff --git a/adapters/database/OracleDB.py b/adapters/database/OracleDB.py new file mode 100644 index 0000000..655dd27 --- /dev/null +++ b/adapters/database/OracleDB.py @@ -0,0 +1,32 @@ +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2017 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + +import cx_Oracle + +from osdf.utils.programming_utils import MetaSingleton + + +class OracleDB(metaclass=MetaSingleton): + conn, cur = None, None + + def connect(self, host=None, sid=None, user=None, passwd=None, port=5432): + if self.conn is None: + tns_info = cx_Oracle.makedsn(host=host, port=port, sid=sid) + self.conn = cx_Oracle.connect(user=user, password=passwd, dsn=tns_info, threaded=True) + self.cur = self.conn.cursor() + return self.conn, self.cur diff --git a/adapters/database/PostgresDB.py b/adapters/database/PostgresDB.py new file mode 100644 index 0000000..6689566 --- /dev/null +++ b/adapters/database/PostgresDB.py @@ -0,0 +1,31 @@ +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2017 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + +import psycopg2 + +from osdf.utils.programming_utils import MetaSingleton + + +class PostgresDB(metaclass=MetaSingleton): + conn, cur = None, None + + def connect(self, host=None, db=None, user=None, passwd=None, port=5432): + if self.conn is None: + self.conn = psycopg2.connect(host=host, port=port, user=user, password=passwd, database=db) + self.cur = self.conn.cursor() + return self.conn, self.cur diff --git a/adapters/database/VerticaDB.py b/adapters/database/VerticaDB.py new file mode 100644 index 0000000..ad961d7 --- /dev/null +++ b/adapters/database/VerticaDB.py @@ -0,0 +1,55 @@ +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2017 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + +import jaydebeapi +import sqlalchemy.pool as pool + +from jaydebeapi import _DEFAULT_CONVERTERS, _java_to_py +from osdf.utils.programming_utils import MetaSingleton +from osdf.config.base import osdf_config + +_DEFAULT_CONVERTERS.update({'BIGINT': _java_to_py('longValue')}) + + +class VerticaDB(metaclass=MetaSingleton): + connection_pool = None + + def get_connection(self): + p = self.get_config_params() + c = jaydebeapi.connect( + 'com.vertica.jdbc.Driver', + 'jdbc:vertica://{}:{}/{}'.format(p['host'], p['port'], p['db']), + {'user': p['user'], 'password': p['passwd'], 'CHARSET': 'UTF8'}, + jars=[p['db_driver']] + ) + return c + + def get_config_params(self): + config = osdf_config["deployment"] + host, port, db = config["verticaHost"], config["verticaPort"], config.get("verticaDB") + user, passwd = config["verticaUsername"], config["verticaPassword"] + jar_path = osdf_config['core']['osdf_system']['vertica_jar'] + params = dict(host=host, db=db, user=user, passwd=passwd, port=port, db_driver=jar_path) + return params + + def connect(self): + if self.connection_pool is None: + self.connection_pool = pool.QueuePool(self.get_connection, max_overflow=10, pool_size=5, recycle=600) + conn = self.connection_pool.connect() + cursor = conn.cursor() + return conn, cursor diff --git a/adapters/database/__init__.py b/adapters/database/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/adapters/database/__init__.py diff --git a/adapters/dcae/__init__.py b/adapters/dcae/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/adapters/dcae/__init__.py diff --git a/adapters/dcae/message_router.py b/adapters/dcae/message_router.py new file mode 100755 index 0000000..e495331 --- /dev/null +++ b/adapters/dcae/message_router.py @@ -0,0 +1,100 @@ +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2017 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + +import requests +from osdf.utils.data_types import list_like +from osdf.operation.exceptions import MessageBusConfigurationException + + +class MessageRouterClient(object): + def __init__(self, + dmaap_url=None, + mr_host_base_urls=None, + topic=None, + consumer_group=None, consumer_id=None, + timeout_ms=15000, fetch_limit=1000, + userid=None, passwd=None): + """ + :param dmaap_url: protocol, host and port; mostly for UEB + (e.g. https://dcae-msrt-ftl.homer.att.com:3905/) + :param mr_host_base_urls: for DMaaP, we get a topic URL (base_url + events/topic_name) + (e.g. https://dcae-msrt-ftl.homer.att.com:3905/events/com.att.dcae.dmaap.FTL.SNIRO-CM-SCHEDULER-RESPONSE) + :param consumer_group: DMaaP/UEB consumer group (unique for each subscriber; required for GET) + :param consumer_id: DMaaP/UEB consumer ID (unique for each thread/process for a subscriber; required for GET) + :param timeout_ms: (optional, default 15 seconds or 15,000 ms) server-side timeout for GET request + :param fetch_limit: (optional, default 1000 messages per request for GET), ignored for "POST" + :param userid: (optional, userid for HTTP basic authentication) + :param passwd: (optional, password for HTTP basic authentication) + """ + mr_error = MessageBusConfigurationException + if dmaap_url is None: # definitely not DMaaP, so use UEB mode + self.is_dmaap = False + if not (mr_host_base_urls and list_like(mr_host_base_urls)): + raise mr_error("Not a DMaaP or UEB configuration") + if not topic: + raise mr_error("Invalid topic: '{}'",format(topic)) + self.topic_urls = ["{}/events/{}".format(base_url, topic) for base_url in mr_host_base_urls] + else: + self.is_dmaap = True + self.topic_urls = [dmaap_url] + + self.timeout_ms = timeout_ms + self.fetch_limit = fetch_limit + self.auth = (userid, passwd) if userid and passwd else None + self.consumer_group = consumer_group + self.consumer_id = consumer_id + + def get(self, outputjson=True): + """Fetch messages from message router (DMaaP or UEB) + :param outputjson: (optional, specifies if response is expected to be in json format), ignored for "POST" + :return: response as a json object (if outputjson is True) or as a string + """ + url_fmt = "{topic_url}/{cgroup}/{cid}?timeout={timeout_ms}&limit={limit}" + urls = [url_fmt.format(topic_url=x, timeout_ms=self.timeout_ms, limit=self.fetch_limit, + cgroup=self.consumer_group, cid=self.consumer_id) for x in self.topic_urls] + for url in urls[:-1]: + try: + return self.http_request(method='GET', url=url, outputjson=outputjson) + except: + pass + return self.http_request(method='GET', url=urls[-1], outputjson=outputjson) + + def post(self, msg, inputjson=True): + for url in self.topic_urls[:-1]: + try: + return self.http_request(method='POST', url=url, inputjson=inputjson, msg=msg) + except: + pass + return self.http_request(method='POST', url=self.topic_urls[-1], inputjson=inputjson, msg=msg) + + def http_request(self, url, method, inputjson=True, outputjson=True, msg=None, **kwargs): + """ + Perform the actual URL request (GET or POST), and do error handling + :param url: full URL (including topic, limit, timeout, etc.) + :param method: GET or POST + :param inputjson: Specify whether input is in json format (valid only for POST) + :param outputjson: Is response expected in a json format + :param msg: content to be posted (valid only for POST) + :return: response as a json object (if outputjson or POST) or as a string; None if error + """ + res = requests.request(url=url, method=method, auth=self.auth, **kwargs) + if res.status_code == requests.codes.ok: + return res.json() if outputjson or method == "POST" else res.content + else: + raise Exception("HTTP Response Error: code {}; headers:{}, content: {}".format( + res.status_code, res.headers, res.content)) diff --git a/adapters/local_data/__init__.py b/adapters/local_data/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/adapters/local_data/__init__.py diff --git a/adapters/local_data/local_policies.py b/adapters/local_data/local_policies.py new file mode 100644 index 0000000..c63ae5a --- /dev/null +++ b/adapters/local_data/local_policies.py @@ -0,0 +1,40 @@ +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2017 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + +import json +import os + + +def get_local_policies(local_policy_folder, local_policy_list, policy_id_list=None): + """ + Get policies from a local file system. + Required for the following scenarios: + (a) doing work-arounds (e.g. if we are asked to drop some policies for testing purposes) + (b) work-arounds when policy platform is giving issues (e.g. if dev/IST policies are wiped out in an upgrade) + :param local_policy_folder: where the policy files are present + :param local_policy_list: list of local policies + :param policy_id_list: list of policies to get (if unspecified or None, get all) + :return: get policies + """ + policies = [] + for fname in local_policy_list: # ugly removal of .json from file name + if policy_id_list and fname[:-5] not in policy_id_list: + continue + with open(os.path.join(local_policy_folder, fname)) as fid: + policies.append(json.load(fid)) + return policies diff --git a/adapters/request_parsing/__init__.py b/adapters/request_parsing/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/adapters/request_parsing/__init__.py diff --git a/adapters/request_parsing/placement.py b/adapters/request_parsing/placement.py new file mode 100644 index 0000000..d7a6575 --- /dev/null +++ b/adapters/request_parsing/placement.py @@ -0,0 +1,33 @@ +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2017 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + +import copy +import json +from osdf.utils.programming_utils import list_flatten, dot_notation + + +def json_path_after_expansion(req_json, reference): + """ + Get the child node(s) from the dot-notation [reference] and parent [req_json]. + For placement and other requests, there are encoded JSONs inside the request or policy, + so we need to expand it and then do a search over the parent plus expanded JSON. + """ + req_json_copy = copy.deepcopy(req_json) # since we expand the JSON in place, we work on a copy + req_json_copy['placementInfo']['orderInfo'] = json.loads(req_json_copy['placementInfo']['orderInfo']) + info = dot_notation(req_json_copy, reference) + return list_flatten(info) if isinstance(info, list) else info diff --git a/config/__init__.py b/config/__init__.py new file mode 100644 index 0000000..303a8ce --- /dev/null +++ b/config/__init__.py @@ -0,0 +1,32 @@ +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2017 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- + + +import yaml +import json + +from osdf.utils.programming_utils import MetaSingleton + + +class CoreConfig(metaclass=MetaSingleton): + core_config = None + + def get_core_config(self, config_file=None): + if self.core_config is None: + self.core_config = yaml.load(open(config_file)) + return self.core_config + diff --git a/config/base.py b/config/base.py new file mode 100644 index 0000000..b8aacff --- /dev/null +++ b/config/base.py @@ -0,0 +1,36 @@ +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2017 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + +import os + +import osdf.config.loader as config_loader +import osdf.config.credentials as creds +from osdf.utils.programming_utils import DotDict + +config_spec = { + "deployment": os.environ.get("OSDF_MANAGER_CONFIG_FILE", "config/osdf_config.yaml"), + "core": "config/common_config.yaml" + } + +osdf_config = DotDict(config_loader.all_configs(**config_spec)) + +http_basic_auth_credentials = creds.load_credentials(osdf_config) + +dmaap_creds = creds.dmaap_creds() + +creds_prefixes = {"so": "so", "cm": "cmPortal"} diff --git a/config/credentials.py b/config/credentials.py new file mode 100644 index 0000000..e5a6399 --- /dev/null +++ b/config/credentials.py @@ -0,0 +1,60 @@ +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2017 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + +import json + +from osdf import auth_groups, userid_suffix, passwd_suffix + + +def dmaap_creds(dmaap_file="/etc/dcae/dmaap.conf"): + """Get DMaaP credentials from DCAE for publish and subscribe""" + try: + dmaap_creds = _get_dmaap_creds(dmaap_file) + except: + dmaap_creds = {} + return dmaap_creds + + +def _get_dmaap_creds(dmaap_file): + """Get DMaaP credentials from DCAE for publish and subscribe""" + streams = json.load(open(dmaap_file, 'r')) + pubs = [x for x in streams + if x['dmaapStreamId'] == 'requests' and x['dmaapAction'] == 'publish'] + subs = [x for x in streams + if x['dmaapStreamId'] == 'responses' and x['dmaapAction'] == 'subscribe'] + + def get_dmaap_info(x): + """Get DMaaP credentials from dmaap_object 'x'""" + return dict(url=x.get('dmaapUrl'), userid=x.get('dmaapUserName'), passwd=x.get('dmaapPassword')) + + return {'pub': get_dmaap_info(pubs[0]), 'sub': get_dmaap_info(subs[0])} + + +def load_credentials(osdf_config): + """Get credentials as dictionaries grouped by auth_group (e.g. creds["Placement"]["user1"] = "pass1")""" + creds = dict((x, dict()) for x in auth_groups) # each auth group has userid, passwd dict + suffix_start = len(userid_suffix) + + config = osdf_config.deployment + + for element, username in config.items(): + for x in auth_groups: + if element.startswith("osdf" + x) and element.endswith(userid_suffix): + passwd = config[element[:-suffix_start] + passwd_suffix] + creds[x][username] = passwd + return creds diff --git a/config/loader.py b/config/loader.py new file mode 100644 index 0000000..7cb363a --- /dev/null +++ b/config/loader.py @@ -0,0 +1,51 @@ +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2017 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + +import json + +import yaml + + +def load_config_file(config_file: str, child_name="dockerConfiguration") -> dict: + """ + Load OSDF configuration from a file -- currently only yaml/json are supported + :param config_file: path to config file (.yaml or .json). + :param child_name: if present, return only that child node + :return: config (all or specific child node) + """ + with open(config_file, 'r') as fid: + res = {} + if config_file.endswith(".yaml"): + res = yaml.load(fid) + elif config_file.endswith(".json") or config_file.endswith("json"): + res = json.load(fid) + return res.get(child_name, res) if child_name else res + + +def dcae_config(config_file: str) -> dict: + return load_config_file(config_file, child_name="dockerConfiguration") + + +def all_configs(**kwargs: dict) -> dict: + """ + Load all specified configurations + :param config_file_spec: key-value pairs + (e.g. { "core": "common_config.yaml", "deployment": "/tmp/1452523532json" }) + :return: merged config as a nested dictionary + """ + return {k: load_config_file(fname) for k, fname in kwargs.items()} diff --git a/models/api/common.py b/models/api/common.py new file mode 100755 index 0000000..0d2d0eb --- /dev/null +++ b/models/api/common.py @@ -0,0 +1,54 @@ +# -------------------------------------------------------------------------
+# Copyright (c) 2015-2017 AT&T Intellectual Property
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# -------------------------------------------------------------------------
+#
+
+import datetime
+from pprint import pformat
+
+from dateutil.parser import parse
+from schematics.exceptions import ConversionError
+from schematics.models import Model
+from schematics.types import DateTimeType
+
+
+class OSDFModel(Model):
+ """Extends generic model with a couple of extra methods"""
+ def __str__(self):
+ """Return values of object's attributes -- excluding hidden or callable ones"""
+ def _str_format(x):
+ """Coerce as string for some special objects"""
+ return str(x) if isinstance(x, datetime.datetime) else x
+
+ z1 = dict((x, getattr(self, x)) for x in dir(self)
+ if not x.startswith("_") and not callable(getattr(self, x)))
+ z1 = dict((x, _str_format(y)) for x, y in z1.items())
+ return pformat(z1, depth=4, indent=2, width=1000)
+
+ def __repr__(self):
+ """Return values of object's attributes -- excluding hidden or callable ones"""
+ return self.__str__()
+
+
+class CustomISODateType(DateTimeType):
+ """Schematics doesn't support full ISO, so we use custom one"""
+ def to_native(self, value, context=None):
+ if isinstance(value, datetime.datetime):
+ return value
+ try:
+ return parse(value)
+ except:
+ raise ConversionError(u'Invalid timestamp {}'.format(value))
diff --git a/models/api/placementRequest.py b/models/api/placementRequest.py new file mode 100644 index 0000000..73eac75 --- /dev/null +++ b/models/api/placementRequest.py @@ -0,0 +1,124 @@ +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2017 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + +from .common import OSDFModel +from schematics.types import StringType, URLType, IntType, FloatType +from schematics.types.compound import ModelType, ListType + + +class RequestInfo(OSDFModel): + """Info for northbound request from client such as SO""" + transactionId = StringType(required=True) + requestId = StringType(required=True) + callbackUrl = URLType(required=True) + sourceId = StringType(required=True) + optimizer = ListType(StringType()) + numSolutions = IntType() + timeout = IntType() + requestType = StringType() + + +class CandidateInfo(OSDFModel): + """Preferred candidate for a resource (sent as part of a request from client)""" + candidateType = StringType(required=True) + candidates = ListType(StringType(required=True)) + + +class ResourceModelInfo(OSDFModel): + """Model information for a specific resource""" + modelCustomizationId = StringType(required=True) + modelInvariantId = StringType(required=True) + modelName = StringType() + modelVersion = StringType() + modelVersionId = StringType() + modelType = StringType() + operationalStatus = StringType() + + +class ExistingLicenseInfo(OSDFModel): + entitlementPoolUUID = ListType(StringType()) + licenseKeyGroupUUID = ListType(StringType()) + + +class LicenseDemand(OSDFModel): + resourceInstanceType = StringType(required=True) + serviceResourceId = StringType(required=True) + resourceModuleName = StringType(required=True) + resourceModelInfo = ModelType(ResourceModelInfo) + existingLicense = ModelType(ExistingLicenseInfo) + + +class PlacementDemand(OSDFModel): + resourceInstanceType = StringType(required=True) + serviceResourceId = StringType(required=True) + resourceModuleName = StringType(required=True) + exclusionCandidateInfo = ListType(ModelType(CandidateInfo)) + requiredCandidateInfo = ListType(ModelType(CandidateInfo)) + resourceModelInfo = ModelType(ResourceModelInfo) + tenantId = StringType() + tenantName = StringType() + + +class ExistingPlacementInfo(OSDFModel): + serviceInstanceId = StringType(required=True) + + +class DemandInfo(OSDFModel): + """Requested resources (sent as part of a request from client)""" + placementDemand = ListType(ModelType(PlacementDemand)) + licenseDemand = ListType(ModelType(LicenseDemand)) + + +class SubscriberInfo(OSDFModel): + """Details on the customer that subscribes to the VNFs""" + globalSubscriberId = StringType(required=True) + subscriberName = StringType() + subscriberCommonSiteId = StringType() + + +class ServiceModelInfo(OSDFModel): + """ASDC Service model information""" + modelType = StringType(required=True) + modelInvariantId = StringType(required=True) + modelVersionId = StringType(required=True) + modelName = StringType(required=True) + modelVersion = StringType(required=True) + + +class Location(OSDFModel): + latitude = FloatType(required=True) + longitude = FloatType(required=True) + + +class PlacementInfo(OSDFModel): + """Information specific to placement optimization""" + serviceModelInfo = ModelType(ServiceModelInfo) + subscriberInfo = ModelType(SubscriberInfo) + demandInfo = ModelType(DemandInfo, required=True) + orderInfo = StringType() + policyId = ListType(StringType()) + serviceInstanceId = StringType() + existingPlacement = ModelType(ExistingPlacementInfo) + location = ModelType(Location) + serviceType = StringType() + + +class PlacementAPI(OSDFModel): + """Request for placement optimization (specific to optimization and additional metadata""" + requestInfo = ModelType(RequestInfo, required=True) + placementInfo = ModelType(PlacementInfo, required=True) diff --git a/models/api/placementResponse.py b/models/api/placementResponse.py new file mode 100644 index 0000000..e9746d6 --- /dev/null +++ b/models/api/placementResponse.py @@ -0,0 +1,57 @@ +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2017 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + +from .common import OSDFModel +from schematics.types import StringType +from schematics.types.compound import ModelType, ListType + + +# TODO: update osdf.models + +class LicenseSolution(OSDFModel): + serviceResourceId = StringType(required=True) + resourceModuleName = StringType(required=True) + entitlementPoolList = ListType(StringType(required=True)) + licenseKeyGroupList = ListType(StringType(required=True)) + + +class AssignmentInfo(OSDFModel): + variableName = StringType(required=True) + variableValue = StringType(required=True) + + +class PlacementSolution(OSDFModel): + serviceResourceId = StringType(required=True) + resourceModuleName = StringType(required=True) + inventoryType = StringType(required=True) + serviceInstanceId = StringType() + cloudRegionId = StringType() + assignmentInfo = ListType(ModelType(AssignmentInfo)) + + +class SolutionInfo(OSDFModel): + placement = ListType(ModelType(PlacementSolution), min_size=1) + license = ListType(ModelType(LicenseSolution), min_size=1) + + +class PlacementResponse(OSDFModel): + transactionId = StringType(required=True) + requestId = StringType(required=True) + requestState = StringType(required=True) + statusMessage = StringType(required=True) + solutionInfo = ModelType(SolutionInfo) diff --git a/operation/__init__.py b/operation/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/operation/__init__.py diff --git a/operation/error_handling.py b/operation/error_handling.py new file mode 100644 index 0000000..dfb0848 --- /dev/null +++ b/operation/error_handling.py @@ -0,0 +1,93 @@ +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2017 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + +import json + +from schematics.exceptions import DataError + +from requests import RequestException +from requests import ConnectionError, HTTPError, Timeout +from osdf.operation.exceptions import BusinessException + +import osdf + +ERROR_TEMPLATE = osdf.ERROR_TEMPLATE + +MESSAGE_BASE = "A solution couldn't be determined because an external application" +HTTP_ERROR_MESSAGE = MESSAGE_BASE + " returned a HTTP error" +TIMEOUT_ERROR_MESSAGE = MESSAGE_BASE + " could not respond in time, please check the external application" +CONNECTION_ERROR_MESSAGE = MESSAGE_BASE + " could not be reached" + +internal_error_body = { + "serviceException": { + "text": "Unhandled internal exception, request could not be processed" + } +} + +internal_error_message = json.dumps(internal_error_body) + + +def build_json_error_body(error): + if isinstance(error,RequestException): + return request_exception_to_json_body(error) + elif isinstance(error, DataError): + return data_error_to_json_body(error) + elif type(error) is BusinessException: # return the error message, because it is well formatted + return ERROR_TEMPLATE.render(description=str(error)) + else: + return internal_error_message + + +def data_error_to_json_body(error): + description = str(error).replace('"', '\\"') + error_message = ERROR_TEMPLATE.render(description=description) + return error_message + + +def request_exception_to_json_body(error): + friendly_message = "A request exception has occurred when contacting an external system" + if type(error) is HTTPError: + friendly_message = HTTP_ERROR_MESSAGE + if type(error) is ConnectionError: + friendly_message = CONNECTION_ERROR_MESSAGE + if type(error) is Timeout: + friendly_message = TIMEOUT_ERROR_MESSAGE + + eie_body = { + "serviceException": { + "text": friendly_message, + "errorType": "InterfaceError" + }, + "externalApplicationDetails": { + "httpMethod": error.request.method, + "url": error.request.url + } + } + + response = error.response + + if response is not None: + eie_body['externalApplicationDetails']['httpStatusCode'] = response.status_code + content_type = response.headers.get('content-type') + if content_type is not None: + if 'application/json' in content_type: + eie_body['externalApplicationDetails']['responseMessage'] = response.json() + elif 'text/html' in content_type: + eie_body['externalApplicationDetails']['responseMessage'] = response.text + error_message = json.dumps(eie_body) + return error_message diff --git a/operation/exceptions.py b/operation/exceptions.py new file mode 100644 index 0000000..5277b01 --- /dev/null +++ b/operation/exceptions.py @@ -0,0 +1,40 @@ +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2017 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + +class BusinessException(Exception): + pass + + +class MessageBusConfigurationException(Exception): + pass + + +class CMDataError(Exception): + pass + + +class CMSOExecutionError(Exception): + pass + + +class CMSOCallBackError(Exception): + pass + + +class CMSOInvalidRequestException(Exception): + pass diff --git a/operation/responses.py b/operation/responses.py new file mode 100644 index 0000000..22a94f7 --- /dev/null +++ b/operation/responses.py @@ -0,0 +1,39 @@ +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2017 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + +from flask import Response + +from osdf import ACCEPTED_MESSAGE_TEMPLATE + + +def osdf_response_for_request_accept(req_id="", text="", response_code=202, as_http=True): + """Helper method to create a response object for request acceptance, so that the object can be sent to a client + :param req_id: request ID provided by the caller + :param text: extra text description about accepting the request (e.g. "Request accepted") + :param response_code: the HTTP status code to send -- default is 202 (accepted) + :param as_http: whether to send response as HTTP response object or as a string + :return: if as_http is True, return a HTTP Response object. Otherwise, return json-encoded-message + """ + response_message = ACCEPTED_MESSAGE_TEMPLATE.render(description=text, request_id=req_id) + if not as_http: + return response_message + + response = Response(response_message, content_type='application/json; charset=utf-8') + response.headers.add('content-length', len(response_message)) + response.status_code = response_code + return response diff --git a/optimizers/__init__.py b/optimizers/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/optimizers/__init__.py diff --git a/optimizers/placementopt/__init__.py b/optimizers/placementopt/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/optimizers/placementopt/__init__.py diff --git a/optimizers/placementopt/conductor/__init__.py b/optimizers/placementopt/conductor/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/optimizers/placementopt/conductor/__init__.py diff --git a/optimizers/placementopt/conductor/api_builder.py b/optimizers/placementopt/conductor/api_builder.py new file mode 100644 index 0000000..c0281fe --- /dev/null +++ b/optimizers/placementopt/conductor/api_builder.py @@ -0,0 +1,121 @@ +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2017 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + +import copy +import json +from osdf.utils import data_mapping +from jinja2 import Template +from osdf.utils.programming_utils import list_flatten, dot_notation +import osdf.optimizers.placementopt.conductor.translation as tr +from osdf.adapters.policy.utils import group_policies + + +def conductor_api_builder(request_json, flat_policies: list, local_config, prov_status, + template="templates/conductor_interface.json"): + """Build a SNIRO southbound API call for Conductor/Placement optimization + :param request_json: parameter data received from a client + :param flat_policies: policy data received from the policy platform (flat policies) + :param template: template to generate southbound API call to conductor + :param local_config: local configuration file with pointers for the service specific information + :param prov_status: provStatus retrieved from Subscriber policy + :return: json to be sent to Conductor/placement optimization + """ + templ = Template(open(template).read()) + gp = group_policies(flat_policies) + demand_vnf_name_list = [] + + for placementDemand in request_json['placementInfo']['demandInfo']['placementDemand']: + demand_vnf_name_list.append(placementDemand['resourceModuleName']) + + demand_list = tr.gen_demands(request_json['placementInfo']['demandInfo'], gp['vnfPolicy']) + attribute_policy_list = tr.gen_attribute_policy(demand_vnf_name_list, gp['attribute']) + distance_to_location_policy_list = tr.gen_distance_to_location_policy( + demand_vnf_name_list, gp['distance_to_location']) + inventory_policy_list = tr.gen_inventory_group_policy(demand_vnf_name_list, gp['inventory_group']) + resource_instance_policy_list = tr.gen_resource_instance_policy( + demand_vnf_name_list, gp['instance_fit']) + resource_region_policy_list = tr.gen_resource_region_policy(demand_vnf_name_list, gp['region_fit']) + zone_policy_list = tr.gen_zone_policy(demand_vnf_name_list, gp['zone']) + optimization_policy_list = tr.gen_optimization_policy(demand_vnf_name_list, gp['placementOptimization']) + reservation_policy_list = tr.gen_reservation_policy(demand_vnf_name_list, gp['instance_reservation']) + conductor_policies = [attribute_policy_list, distance_to_location_policy_list, inventory_policy_list, + resource_instance_policy_list, resource_region_policy_list, zone_policy_list] + filtered_policies = [x for x in conductor_policies if len(x) > 0] + policy_groups = list_flatten(filtered_policies) + reservation_policies = [x for x in reservation_policy_list if len(x) > 0] + reservation_groups = list_flatten(reservation_policies) + req_info = request_json['requestInfo'] + model_name = request_json['placementInfo']['serviceModelInfo']['modelName'] + service_type = data_mapping.get_service_type(model_name) + service_info = local_config.get('service_info', {}).get(service_type, {}) + if 'orderInfo' in request_json["placementInfo"]: + order_info = json.loads(request_json["placementInfo"]["orderInfo"]) + request_type = req_info.get('requestType', None) + subs_com_site_id = "" + if 'subscriberInfo' in request_json['placementInfo']: + subs_com_site_id = request_json['placementInfo']['subscriberInfo'].get('subscriberCommonSiteId', "") + if service_type == 'vCPE': + data_mapping.normalize_user_params(order_info) + rendered_req = templ.render( + requestType=request_type, + chosenComplex=subs_com_site_id, + demand_list=demand_list, + policy_groups=policy_groups, + optimization_policies=optimization_policy_list, + name=req_info['requestId'], + timeout=req_info['timeout'], + limit=req_info['numSolutions'], + serviceType=service_type, + serviceInstance=request_json['placementInfo']['serviceInstanceId'], + provStatus = prov_status, + chosenRegion=order_info['requestParameters']['lcpCloudRegionId'], + json=json) + elif service_type == 'UNKNOWN': + rendered_req = templ.render( + requestType=request_type, + chosenComplex=subs_com_site_id, + demand_list=demand_list, + policy_groups=policy_groups, + reservation_groups=reservation_groups, + optimization_policies=optimization_policy_list, + name=req_info['requestId'], + timeout=req_info['timeout'], + limit=req_info['numSolutions'], + serviceType=service_type, + serviceInstance=request_json['placementInfo']['serviceInstanceId'], + provStatus = prov_status, + # process order data + bandwidth=dot_notation(order_info, service_info['bandwidth']), + bandwidth_unit=dot_notation(order_info, service_info['bandwidth_units']), + json=json) + json_payload = json.dumps(json.loads(rendered_req)) # need this because template's JSON is ugly! + return json_payload + + +def retrieve_node(req_json, reference): + """ + Get the child node(s) from the dot-notation [reference] and parent [req_json]. + For placement and other requests, there are encoded JSONs inside the request or policy, + so we need to expand it and then do a search over the parent plus expanded JSON. + """ + req_json_copy = copy.deepcopy(req_json) # since we expand the JSON in place, we work on a copy + if 'orderInfo' in req_json_copy['placementInfo']: + req_json_copy['placementInfo']['orderInfo'] = json.loads(req_json_copy['placementInfo']['orderInfo']) + info = dot_notation(req_json_copy, reference) + return list_flatten(info) if isinstance(info, list) else info + diff --git a/optimizers/placementopt/conductor/conductor.py b/optimizers/placementopt/conductor/conductor.py new file mode 100644 index 0000000..bdc7f17 --- /dev/null +++ b/optimizers/placementopt/conductor/conductor.py @@ -0,0 +1,186 @@ +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2017 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + +""" +This application generates conductor API calls using the information received from SO and Policy platform. +""" + +import json +import time + +from jinja2 import Template +from requests import RequestException + +from osdf.logging.osdf_logging import debug_log +from osdf.optimizers.placementopt.conductor.api_builder import conductor_api_builder +from osdf.utils.interfaces import RestClient +from osdf.operation.exceptions import BusinessException + + +def request(req_object, osdf_config, grouped_policies, prov_status): + """ + Process a placement request from a Client (build Conductor API call, make the call, return result) + :param req_object: Request parameters from the client + :param osdf_config: Configuration specific to SNIRO application (core + deployment) + :param grouped_policies: policies related to placement (fetched based on request, and grouped by policy type) + :param prov_status: provStatus retrieved from Subscriber policy + :return: response from Conductor (accounting for redirects from Conductor service + """ + config = osdf_config.deployment + local_config = osdf_config.core + uid, passwd = config['conductorUsername'], config['conductorPassword'] + conductor_url = config['conductorUrl'] + req_id = req_object['requestInfo']['requestId'] + transaction_id = req_object['requestInfo']['transactionId'] + headers = dict(transaction_id=transaction_id) + + max_retries = config.get('conductorMaxRetries', 30) + ping_wait_time = config.get('conductorPingWaitTime', 60) + + rc = RestClient(userid=uid, passwd=passwd, method="GET", log_func=debug_log.debug, headers=headers) + conductor_req_json_str = conductor_api_builder(req_object, grouped_policies, local_config, prov_status) + conductor_req_json = json.loads(conductor_req_json_str) + + debug_log.debug("Sending first Conductor request for request_id {}".format(req_id)) + resp, raw_resp = initial_request_to_conductor(rc, conductor_url, conductor_req_json) + # Very crude way of keeping track of time. + # We are not counting initial request time, first call back, or time for HTTP request + total_time, ctr = 0, 2 + client_timeout = req_object['requestInfo']['timeout'] + configured_timeout = max_retries * ping_wait_time + max_timeout = min(client_timeout, configured_timeout) + + while True: # keep requesting conductor till we get a result or we run out of time + if resp is not None: + if resp["plans"][0].get("status") in ["error"]: + raise RequestException(response=raw_resp, request=raw_resp.request) + + if resp["plans"][0].get("status") in ["done", "not found"]: + if resp["plans"][0].get("recommendations"): + return conductor_response_processor(resp, raw_resp, req_id) + else: # "solved" but no solutions found + return conductor_no_solution_processor(resp, raw_resp, req_id) + new_url = resp['plans'][0]['links'][0][0]['href'] # TODO: check why a list of lists + + if total_time >= max_timeout: + raise BusinessException("Conductor could not provide a solution within {} seconds, this transaction is timing out".format(max_timeout)) + time.sleep(ping_wait_time) + ctr += 1 + debug_log.debug("Attempt number {} url {}; prior status={}".format(ctr, new_url, resp['plans'][0]['status'])) + total_time += ping_wait_time + + try: + raw_resp = rc.request(new_url, raw_response=True) + resp = raw_resp.json() + except RequestException as e: + debug_log.debug("Conductor attempt {} for request_id {} has failed because {}".format(ctr, req_id, str(e))) + + +def initial_request_to_conductor(rc, conductor_url, conductor_req_json): + """First steps in the request-redirect chain in making a call to Conductor + :param rc: REST client object for calling conductor + :param conductor_url: conductor's base URL to submit a placement request + :param conductor_req_json: request json object to send to Conductor + :return: URL to check for follow up (similar to redirects); we keep checking these till we get a result/error + """ + debug_log.debug("Payload to Conductor: {}".format(json.dumps(conductor_req_json))) + raw_resp = rc.request(url=conductor_url, raw_response=True, method="POST", json=conductor_req_json) + resp = raw_resp.json() + if resp["status"] != "template": + raise RequestException(response=raw_resp, request=raw_resp.request) + time.sleep(10) # 10 seconds wait time to avoid being too quick! + plan_url = resp["links"][0][0]["href"] + debug_log.debug("Attemping to read the plan from the conductor provided url {}".format(plan_url)) + raw_resp = rc.request(raw_response=True, url=plan_url) # TODO: check why a list of lists for links + resp = raw_resp.json() + + if resp["plans"][0]["status"] in ["error"]: + raise RequestException(response=raw_resp, request=raw_resp.request) + return resp, raw_resp # now the caller of this will handle further follow-ups + + +def conductor_response_processor(conductor_response, raw_response, req_id): + """Build a response object to be sent to client's callback URL from Conductor's response + This includes Conductor's placement optimization response, and required ASDC license artifacts + + :param conductor_response: JSON response from Conductor + :param raw_response: Raw HTTP response corresponding to above + :param req_id: Id of a request + :return: JSON object that can be sent to the client's callback URL + """ + composite_solutions = [] + name_map = {"physical-location-id": "cloudClli", "host_id": "vnfHostName", + "cloud_version": "cloudVersion", "cloud_owner": "cloudOwner"} + for reco in conductor_response['plans'][0]['recommendations']: + for resource in reco.keys(): + c = reco[resource]['candidate'] + solution = { + 'resourceModuleName': resource, + 'serviceResourceId': reco[resource]['service_resource_id'], + 'inventoryType': c['inventory_type'], + 'serviceInstanceId': c['candidate_id'] if c['inventory_type'] == "service" else "", + 'cloudRegionId': c['location_id'], + 'assignmentInfo': [] + } + + for key, value in reco[resource]['attributes'].items(): + try: + solution['assignmentInfo'].append({"variableName": name_map[key], "variableValue": value}) + except KeyError: + debug_log.debug("The key[{}] is not mapped and will not be returned in assignment info".format(key)) + + if c.get('host_id'): + solution['assignmentInfo'].append({'variableName': name_map['host_id'], 'variableValue': c['host_id']}) + composite_solutions.append(solution) + + request_state = conductor_response['plans'][0]['status'] + transaction_id = raw_response.headers.get('transaction_id', "") + status_message = conductor_response.get('plans')[0].get('message', "") + + solution_info = {} + if composite_solutions: + solution_info['placementInfo'] = composite_solutions + + resp = { + "transactionId": transaction_id, + "requestId": req_id, + "requestState": request_state, + "statusMessage": status_message, + "solutionInfo": solution_info + } + return resp + + +def conductor_no_solution_processor(conductor_response, raw_response, request_id, + template_placement_response="templates/plc_opt_response.jsont"): + """Build a response object to be sent to client's callback URL from Conductor's response + This is for case where no solution is found + + :param conductor_response: JSON response from Conductor + :param raw_response: Raw HTTP response corresponding to above + :param request_id: request Id associated with the client request (same as conductor response's "name") + :param template_placement_response: the template for generating response to client (plc_opt_response.jsont) + :return: JSON object that can be sent to the client's callback URL + """ + status_message = conductor_response["plans"][0].get("message") + templ = Template(open(template_placement_response).read()) + return json.loads(templ.render(composite_solutions=[], requestId=request_id, + transactionId=raw_response.headers.get('transaction_id', ""), + statusMessage=status_message, json=json)) + + diff --git a/optimizers/placementopt/conductor/remote_opt_processor.py b/optimizers/placementopt/conductor/remote_opt_processor.py new file mode 100644 index 0000000..f753a70 --- /dev/null +++ b/optimizers/placementopt/conductor/remote_opt_processor.py @@ -0,0 +1,79 @@ +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2017 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + +from requests import RequestException + +import traceback +from osdf.operation.error_handling import build_json_error_body +from osdf.logging.osdf_logging import metrics_log, MH, error_log +from osdf.optimizers.placementopt.conductor import conductor +from osdf.optimizers.licenseopt.simple_license_allocation import license_optim +from osdf.utils.interfaces import get_rest_client + + +def process_placement_opt(request_json, policies, osdf_config, prov_status): + """Perform the work for placement optimization (e.g. call SDC artifact and make conductor request) + NOTE: there is scope to make the requests to policy asynchronous to speed up overall performance + :param request_json: json content from original request + :param policies: flattened policies corresponding to this request + :param osdf_config: configuration specific to OSDF app + :param prov_status: provStatus retrieved from Subscriber policy + :return: None, but make a POST to callback URL + """ + + try: + rc = get_rest_client(request_json, service="so") + req_id = request_json["requestInfo"]["requestId"] + transaction_id = request_json['requestInfo']['transactionId'] + + metrics_log.info(MH.inside_worker_thread(req_id)) + license_info = None + if 'licenseDemand' in request_json['placementInfo']['demandInfo']: + license_info = license_optim(request_json) + + # Conductor only handles placement, only call Conductor if placementDemands exist + if 'placementDemand' in request_json['placementInfo']['demandInfo']: + metrics_log.info(MH.requesting("placement/conductor", req_id)) + placement_response = conductor.request(request_json, osdf_config, policies, prov_status) + if license_info: # Attach license solution if it exists + placement_response['solutionInfo']['licenseInfo'] = license_info + else: # License selection only scenario + placement_response = { + "transactionId": transaction_id, + "requestId": req_id, + "requestState": "complete", + "statusMessage": "License selection completed successfully", + "solutionInfo": {"licenseInfo": license_info} + } + except Exception as err: + error_log.error("Error for {} {}".format(req_id, traceback.format_exc())) + + try: + body = build_json_error_body(err) + metrics_log.info(MH.sending_response(req_id, "ERROR")) + rc.request(json=body, noresponse=True) + except RequestException: + error_log.error("Error sending asynchronous notification for {} {}".format(req_id, traceback.format_exc())) + return + + try: + metrics_log.info(MH.calling_back_with_body(req_id, rc.url,placement_response)) + rc.request(json=placement_response, noresponse=True) + except RequestException : # can't do much here but log it and move on + error_log.error("Error sending asynchronous notification for {} {}".format(req_id, traceback.format_exc())) + diff --git a/optimizers/placementopt/conductor/translation.py b/optimizers/placementopt/conductor/translation.py new file mode 100644 index 0000000..036398a --- /dev/null +++ b/optimizers/placementopt/conductor/translation.py @@ -0,0 +1,215 @@ +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2017 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + +import json +from osdf.utils.data_conversion import text_to_symbol +from osdf.utils import data_mapping + +def gen_optimization_policy(vnf_list, optimization_policy): + """Generate optimization policy details to pass to Conductor + :param vnf_list: List of vnf's to used in placement request + :param optimization_policy: optimization policy information provided in the incoming request + :return: List of optimization policies in a format required by Conductor + """ + optimization_policy_list = [] + for policy in optimization_policy: + content = policy['content'] + parameter_list = [] + + for attr in content['objectiveParameter']['parameterAttributes']: + parameter = attr['parameter'] if attr['parameter'] == "cloud_version" else attr['parameter']+"_between" + for res in attr['resource']: + vnf = get_matching_vnf(res, vnf_list) + value = [vnf] if attr['parameter'] == "cloud_version" else [attr['customerLocationInfo'], vnf] + parameter_list.append({ + attr['operator']: [attr['weight'], {parameter: value}] + }) + + optimization_policy_list.append({ + content['objective']: {content['objectiveParameter']['operator']: parameter_list } + }) + return optimization_policy_list + + +def get_matching_vnf(resource, vnf_list): + + for vnf in vnf_list: + if resource in vnf: + return vnf + return resource + + +def get_matching_vnfs(resources, vnf_list, match_type="intersection"): + """Get a list of matching VNFs from the list of resources + :param resources: + :param vnf_list: List of vnf's to used in placement request + :param match_type: "intersection" or "all" or "any" (any => send all_vnfs if there is any intersection) + :return: List of matching VNFs + """ + common_vnfs = [] + for vnf in vnf_list: + for resource in resources: + if resource in vnf: + common_vnfs.append(vnf) + if match_type == "intersection": # specifically requested intersection + return common_vnfs + elif common_vnfs or match_type == "all": # ("any" and common) OR "all" + return resources + return None + + +def gen_policy_instance(vnf_list, resource_policy, match_type="intersection", rtype=None): + """Generate a list of policies + :param vnf_list: List of vnf's to used in placement request + :param resource_policy: policy for this specific resource + :param match_type: How to match the vnf_names with the vnf_list (intersection or "any") + intersection => return intersection; "any" implies return all vnf_names if intersection is not null + :param rtype: resource type (e.g. resourceRegionProperty or resourceInstanceProperty) + None => no controller information added to the policy specification to Conductor + :return: resource policy list in a format required by Conductor + """ + resource_policy_list = [] + related_policies = [] + for policy in resource_policy: + pc = policy['content'] + demands = get_matching_vnfs(pc['resourceInstanceType'], vnf_list, match_type=match_type) + resource = {pc['identity']: {'type': pc['type'], 'demands': demands}} + + if rtype: + resource[pc['identity']]['properties'] = {'controller': pc[rtype]['controller'], + 'request': json.loads(pc[rtype]['request'])} + if demands and len(demands) != 0: + resource_policy_list.append(resource) + related_policies.append(policy) + return resource_policy_list, related_policies + + +def gen_resource_instance_policy(vnf_list, resource_instance_policy): + """Get policies governing resource instances in order to populate the Conductor API call""" + cur_policies, _ = gen_policy_instance(vnf_list, resource_instance_policy, rtype='resourceInstanceProperty') + return cur_policies + + +def gen_resource_region_policy(vnf_list, resource_region_policy): + """Get policies governing resource region in order to populate the Conductor API call""" + cur_policies, _ = gen_policy_instance(vnf_list, resource_region_policy, rtype='resourceRegionProperty') + return cur_policies + + +def gen_inventory_group_policy(vnf_list, inventory_group_policy): + """Get policies governing inventory group in order to populate the Conductor API call""" + cur_policies, _ = gen_policy_instance(vnf_list, inventory_group_policy, rtype=None) + return cur_policies + + +def gen_reservation_policy(vnf_list, reservation_policy): + """Get policies governing resource instances in order to populate the Conductor API call""" + cur_policies, _ = gen_policy_instance(vnf_list, reservation_policy, rtype='instanceReservationProperty') + return cur_policies + + +def gen_distance_to_location_policy(vnf_list, distance_to_location_policy): + """Get policies governing distance-to-location for VNFs in order to populate the Conductor API call""" + cur_policies, related_policies = gen_policy_instance(vnf_list, distance_to_location_policy, rtype=None) + for p_new, p_main in zip(cur_policies, related_policies): # add additional fields to each policy + properties = p_main['content']['distanceToLocationProperty'] + pcp_d = properties['distanceCondition'] + p_new[p_main['content']['identity']]['properties'] = { + 'distance': text_to_symbol[pcp_d['operator']] + " " + pcp_d['value'].lower(), + 'location': properties['locationInfo'] + } + return cur_policies + + +def gen_attribute_policy(vnf_list, attribute_policy): + """Get policies governing attributes of VNFs in order to populate the Conductor API call""" + cur_policies, related_policies = gen_policy_instance(vnf_list, attribute_policy, rtype=None) + for p_new, p_main in zip(cur_policies, related_policies): # add additional fields to each policy + properties = p_main['content']['cloudAttributeProperty'] + p_new[p_main['content']['identity']]['properties'] = { + 'evaluate': { + 'hypervisor': properties.get('hypervisor', ''), + 'cloud_version': properties.get('cloudVersion', ''), + 'cloud_type': properties.get('cloudType', ''), + 'dataplane': properties.get('dataPlane', ''), + 'network_roles': properties.get('networkRoles', ''), + 'complex': properties.get('complex', ''), + 'state': properties.get('state', ''), + 'country': properties.get('country', ''), + 'geo_region': properties.get('geoRegion', ''), + 'exclusivity_groups': properties.get('exclusivityGroups', ''), + 'replication_role': properties.get('replicationRole', '') + } + } + return cur_policies + + +def gen_zone_policy(vnf_list, zone_policy): + """Get zone policies in order to populate the Conductor API call""" + cur_policies, related_policies = gen_policy_instance(vnf_list, zone_policy, rtype=None) + for p_new, p_main in zip(cur_policies, related_policies): # add additional fields to each policy + pmz = p_main['content']['zoneProperty'] + p_new[p_main['content']['identity']]['properties'] = {'category': pmz['category'], 'qualifier': pmz['qualifier']} + return cur_policies + + +def get_demand_properties(demand, policies): + """Get list demand properties objects (named tuples) from policy""" + def _get_candidates(candidate_info): + return [dict(inventory_type=x['candidateType'], candidate_id=x['candidates']) for x in candidate_info] + properties = [] + for policy in policies: + for resourceInstanceType in policy['content']['resourceInstanceType']: + if resourceInstanceType in demand['resourceModuleName']: + for x in policy['content']['property']: + property = dict(inventory_provider=x['inventoryProvider'], + inventory_type=x['inventoryType'], + service_resource_id=demand['serviceResourceId']) + if 'attributes' in x: + attributes = {} + for k,v in x['attributes'].items(): + key=data_mapping.convert(k) + attributes[key] = v + if(key=="model-invariant-id"): + attributes[key]=demand['resourceModelInfo']['modelInvariantId'] + elif(key=="model-version-id"): + attributes[key]=demand['resourceModelInfo']['modelVersionId'] + property.update({"attributes": attributes}) + if x['inventoryType'] == "cloud": + property['region'] = {'get_param': "CHOSEN_REGION"} + if 'exclusionCandidateInfo' in demand: + property['excluded_candidates'] = _get_candidates(demand['exclusionCandidateInfo']) + if 'requiredCandidateInfo' in demand: + property['required_candidates'] = _get_candidates(demand['requiredCandidateInfo']) + properties.append(property) + if len(properties) == 0: + properties.append(dict(customer_id="", service_type="", inventory_provider="", inventory_type="")) + return properties + + +def gen_demands(req_json, vnf_policies): + """Generate list of demands based on request and VNF policies + :param req_json: Request object from the client (e.g. MSO) + :param vnf_policies: Policies associated with demand resources (e.g. from grouped_policies['vnfPolicy']) + :return: list of demand parameters to populate the Conductor API call + """ + demand_dictionary = {} + for placementDemand in req_json['placementDemand']: + demand_dictionary.update({placementDemand['resourceModuleName']: get_demand_properties(placementDemand, vnf_policies)}) + + return demand_dictionary diff --git a/osdfapp.py b/osdfapp.py new file mode 100755 index 0000000..f854dca --- /dev/null +++ b/osdfapp.py @@ -0,0 +1,168 @@ +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2017 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + +""" +OSDF Manager Main Flask Application +""" + +import sys +from threading import Thread # for scaling up, may need celery with RabbitMQ or redis + +from flask import Flask, request, Response, g + +import osdf +import pydevd +import json +import osdf.adapters.policy.interface +import osdf.config.credentials +import osdf.config.loader +import osdf.datasources.aai.aai_local_cached_data +import osdf.operation.error_handling +import osdf.operation.responses +import traceback +from osdf.adapters.policy.interface import get_policies +from osdf.adapters.response_parsing.aots_ueb_cm_data import aots_ds_ueb_listener +from osdf.config.base import osdf_config, DOCKER_CM_OPTIMIZER, AOTS_CM_MESSAGE_BUS +from osdf.optimizers.cmopt.rcscheduler.local_opt_processor import process_local_cm_scheduler_opt +from osdf.optimizers.placementopt.conductor.remote_opt_processor import process_placement_opt +from osdf.webapp.appcontroller import auth_basic +from optparse import OptionParser +from osdf.operation.exceptions import BusinessException +from osdf.operation.error_handling import request_exception_to_json_body, internal_error_message +from requests import RequestException +from schematics.exceptions import DataError +from osdf.logging.osdf_logging import MH, audit_log, error_log +from osdf.models.placementRequest import PlacementAPI +from osdf.models.schedulerRequest import SchedulerAPI + +ERROR_TEMPLATE = osdf.ERROR_TEMPLATE + +app = Flask(__name__) + + + +BAD_CLIENT_REQUEST_MESSAGE = 'Client sent an invalid request' + +# An exception explicitly raised due to some business rule +@app.errorhandler(BusinessException) +def handle_business_exception(e): + error_log.error("Synchronous error for request id {} {}".format(g.request_id, traceback.format_exc())) + err_msg = ERROR_TEMPLATE.render(description=str(e)) + response = Response(err_msg, content_type='application/json; charset=utf-8') + response.status_code = 400 + return response + +# Returns a detailed synchronous message to the calling client when osdf fails due to a remote call to another system +@app.errorhandler(RequestException) +def handle_request_exception(e): + error_log.error("Synchronous error for request id {} {}".format(g.request_id, traceback.format_exc())) + err_msg = request_exception_to_json_body(e) + response = Response(err_msg, content_type='application/json; charset=utf-8') + response.status_code = 400 + return response + +# Returns a detailed message to the calling client when the initial synchronous message is invalid +@app.errorhandler(DataError) +def handle_data_error(e): + error_log.error("Synchronous error for request id {} {}".format(g.request_id, traceback.format_exc())) + + body_dictionary = { + "serviceException": { + "text": BAD_CLIENT_REQUEST_MESSAGE, + "exceptionMessage": str(e.messages), + "errorType": "InvalidClientRequest" + } + } + + body_as_json = json.dumps(body_dictionary) + response = Response(body_as_json, content_type='application/json; charset=utf-8') + response.status_code = 400 + return response + + +@app.route("/osdf/api/v2/placement", methods=["POST"]) +@auth_basic.login_required +def do_placement_opt(): + """Perform placement optimization after validating the request and fetching policies + Make a call to the call-back URL with the output of the placement request. + Note: Call to Conductor for placement optimization may have redirects, so account for them + """ + request_json = request.get_json() + req_id = request_json['requestInfo']['requestId'] + g.request_id = req_id + audit_log.info(MH.received_request(request.url, request.remote_addr, json.dumps(request_json))) + + PlacementAPI(request_json).validate() + + # Currently policies are being used only during placement, so only fetch them if placement demands is not empty + policies = {} + + if 'placementDemand' in request_json['placementInfo']['demandInfo']: + policies, prov_status = get_policies(request_json, "placement") + + audit_log.info(MH.new_worker_thread(req_id, "[for placement]")) + t = Thread(target=process_placement_opt, args=(request_json, policies, osdf_config, prov_status)) + t.start() + audit_log.info(MH.accepted_valid_request(req_id, request)) + return osdf.operation.responses.osdf_response_for_request_accept( + req_id=req_id, text="Accepted placement request. Response will be posted to callback URL") + + +# Returned when unexpected coding errors occur during initial synchronous processing +@app.errorhandler(500) +def interal_failure(error): + error_log.error("Synchronous error for request id {} {}".format(g.request_id, traceback.format_exc())) + response = Response(internal_error_message, content_type='application/json; charset=utf-8') + response.status_code = 500 + return response + + +def getOptions(argv): + program_version_string = '%%prog %s' % ("v1.0") + #program_usage = '''usage: spam two eggs''' # optional - will be autogenerated by optparse + program_longdesc = "" + program_license = "" + + # setup option parser + parser = OptionParser(version=program_version_string, epilog=program_longdesc, description=program_license) + parser.add_option("-l", "--local", dest="local", help="run locally", action="store_true", default=False) + parser.add_option("-t", "--devtest", dest="devtest", help="run in dev/test environment", action="store_true", default=False) + parser.add_option("-d", "--debughost", dest="debughost", help="IP Address of host running debug server", default='') + parser.add_option("-p", "--debugport", dest="debugport", help="Port number of debug server", type=int, default=5678) + (opts, args) = parser.parse_args(argv) + if (opts.debughost != ''): + print('pydevd.settrace(%s, port=%s)' % (opts.debughost, opts.debugport)) + pydevd.settrace(opts.debughost, port=opts.debugport) + return opts + + +if __name__ == "__main__": + + sys_conf = osdf_config['core']['osdf_system'] + ports = sys_conf['osdf_ports'] + internal_port, external_port = ports['internal'], ports['external'] + ssl_context = tuple(sys_conf['ssl_context']) + + common_app_opts = dict(host='0.0.0.0', threaded=True, use_reloader=False) + + opts = getOptions(sys.argv) + if (opts.local == False and opts.devtest == False): # normal deployment + app.run(port=internal_port, ssl_context=ssl_context, debug=False, **common_app_opts) + else: + port = internal_port if opts.local == True else external_port + app.run(port=port, debug=True, **common_app_opts) diff --git a/osdfapp.sh b/osdfapp.sh new file mode 100755 index 0000000..c54d59c --- /dev/null +++ b/osdfapp.sh @@ -0,0 +1,53 @@ +#!/bin/bash + +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2017 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + +# Call osdf app after setting LD_LIBRARY_PATH for oracle client, postgres client, etc. + +cd $(dirname $0) + +# Environment variables below are for ORACLE_HOME and such things, and not needed for 1707 onwards +# . ../dependencies/env.sh + +bash ../etc/make-certs.sh # create the https certificates if they are not present + +set -e + +mkdir -p logs + +if [ ! -e "osdf-optim" ]; then +( + mkdir tmp + cd tmp + tar xzf ../../dependencies/SNIROOptimizationPack.tgz + mv osdf ../osdf-optim + cd ../osdf-optim/pywheels + pip install docopt* jsonschema* +) +cp etc/run-case-local.sh osdf-optim/run/ +fi + +if [ $# -ge 1 ]; then + export SNIRO_MANAGER_CONFIG_FILE="$1" # this file is passed by the DCAE controller +fi + +# export FLASK_APP=osdfapp.py + +# flask run +python osdfapp.py # running the app diff --git a/utils/__init__.py b/utils/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/utils/__init__.py diff --git a/utils/data_conversion.py b/utils/data_conversion.py new file mode 100644 index 0000000..2f678fa --- /dev/null +++ b/utils/data_conversion.py @@ -0,0 +1,62 @@ +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2017 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + +import itertools +from collections import defaultdict + +from dateutil import tz +from dateutil.parser import parse + + +def tuples_to_multi_val_dict(kvw_tuples, colnums=(0, 1)): + """Given a list of k,v tuples, get a dictionary of the form k -> [v1,v2,...,vn] + :param kvw_tuples: list of k,v,w tuples (e.g. [(k1,v1,a1), (k2,v2,a2), (k1,v3,a3), (k1,v4,a4)] + :param colnums: column numbers + :return: a dict of str:set, something like {k1: {v1, v3, v4}, k2: {v2}} or {k1: {a1, a3, a4}, k2: {a2}} + """ + res = defaultdict(set) + for x in kvw_tuples: + key, val = x[colnums[0]], x[colnums[1]] + res[key].add(val) + return dict((k, set(v)) for k, v in res.items()) + + +def tuples_to_dict(kvw_tuples, colnums=(0, 1)): + """Given a list of k,v tuples, get a dictionary of the form k -> v + :param kvw_tuples: list of k,v,w tuples (e.g. [(k1,v1,a1), (k2,v2,a2), (k3,v3,a3), (k1,v4,a4)] + :param colnums: column numbers + :return: a dict; something like {k1: v4, k2: v2, k3: v3} (note, k1 is repeated, so last val is retained) + """ + return dict((x[colnums[0]], x[colnums[1]]) for x in kvw_tuples) + + +def utc_time_from_ts(timestamp): + """Return corresponding UTC timestamp for a given ISO timestamp (or anything that parse accepts)""" + return parse(timestamp).astimezone(tz.tzutc()).strftime('%Y-%m-%d %H:%M:%S') + + +def list_flatten(l): + """Flatten a complex nested list of nested lists into a flat list""" + return itertools.chain(*[list_flatten(j) if isinstance(j, list) else [j] for j in l]) + + +text_to_symbol = { + 'greater': ">", + 'less': "<", + 'equal': "=" +} diff --git a/utils/data_types.py b/utils/data_types.py new file mode 100644 index 0000000..877d4a1 --- /dev/null +++ b/utils/data_types.py @@ -0,0 +1,30 @@ +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2017 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + +import collections + + +def list_like(obj): + """Check if an object is a list-like object, but not a string or dict""" + return isinstance(obj, collections.Sequence) and not isinstance(obj, (str, bytes)) + + +def dict_like(obj): + """Check if an object is a list-like object, but not a string or dict""" + return isinstance(obj, collections.Mapping) + diff --git a/utils/interfaces.py b/utils/interfaces.py new file mode 100644 index 0000000..7a0e3a9 --- /dev/null +++ b/utils/interfaces.py @@ -0,0 +1,90 @@ +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2017 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + +import requests + +from osdf.config.base import osdf_config, creds_prefixes +from osdf.logging.osdf_logging import MH, debug_log + + +def get_rest_client(request_json, service): + """Get a RestClient based on request_json's callback URL and osdf_config's credentials based on service name + :param request_json: + :param service: so or cm + :return: rc -- RestClient + """ + callback_url = request_json["requestInfo"]["callbackUrl"] + prefix = creds_prefixes[service] + config = osdf_config.deployment + c_userid, c_passwd = config[prefix + "Username"], config[prefix + "Password"] + return RestClient(url=callback_url, userid=c_userid, passwd=c_passwd) + + +class RestClient(object): + """Simple REST Client that supports get/post and basic auth""" + + def __init__(self, userid=None, passwd=None, log_func=None, url=None, timeout=None, headers=None, + method="POST", req_id=None): + self.auth = (userid, passwd) if userid and passwd else None + self.headers = headers if headers else {} + self.method = method + self.url = url + self.log_func = log_func + self.timeout = (30, 90) if timeout is None else timeout + self.req_id = req_id + + def add_headers(self, headers): + self.headers.update(headers) + + def request(self, url=None, method=None, asjson=True, ok_codes=(2, ), + raw_response=False, noresponse=False, timeout=None, **kwargs): + """ + :param url: REST end point to query + :param method: GET or POST (default is None => self.method) + :param asjson: whether the expected response is in json format + :param ok_codes: expected codes (prefix matching -- e.g. can be (20, 21, 32) or (2, 3)) + :param noresponse: If no response is expected (as long as response codes are OK) + :param raw_response: If we need just the raw response (e.g. conductor sends transaction IDs in headers) + :param timeout: Connection and read timeouts + :param kwargs: Other parameters + :return: + """ + if not self.req_id: + debug_log.debug("Requesting URL: {}".format(url or self.url)) + else: + debug_log.debug("Requesting URL: {} for request ID: {}".format(url or self.url, self.req_id)) + + res = requests.request(url=url or self.url, method=method or self.method, + auth=self.auth, headers=self.headers, + timeout=timeout or self.timeout, **kwargs) + + if self.log_func: + self.log_func(MH.received_http_response(res)) + + res_code = str(res.status_code) + if not any(res_code.startswith(x) for x in map(str, ok_codes)): + raise res.raise_for_status() + + if raw_response: + return res + elif noresponse: + return None + elif asjson: + return res.json() + else: + return res.content diff --git a/utils/local_processing.py b/utils/local_processing.py new file mode 100644 index 0000000..6768839 --- /dev/null +++ b/utils/local_processing.py @@ -0,0 +1,43 @@ +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2017 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + +import os + +from osdf.logging.osdf_logging import metrics_log, MH, warn_audit_error + + +def local_create_job_file(req_id, json_req, fname='osdf-req-data.json'): + """Creates a "work" folder for local processing and place relevant + job task file in there""" + + work_dir = 'osdf-optim/work/' + req_id + work_file = '{}/{}'.format(work_dir, fname) + try: + cur_task = "Making a local directory in the OSDF manager for req-id: {}".format(req_id) + metrics_log.info(MH.creating_local_env(cur_task)) + os.makedirs(work_dir, exist_ok=True) + except Exception as err: + warn_audit_error(MH.error_local_env(req_id, "Can't create directory {}".format(work_dir), err)) + return None + try: + with open(work_file, 'w') as fid: + fid.write(json_req['payload']) + return work_dir + except Exception as err: + warn_audit_error(MH.error_local_env(req_id, "can't create file {}".format(work_file), err)) + return None diff --git a/utils/programming_utils.py b/utils/programming_utils.py new file mode 100644 index 0000000..a0a8fde --- /dev/null +++ b/utils/programming_utils.py @@ -0,0 +1,105 @@ +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2017 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + +import collections +import itertools + + +class DotDict(dict): + """A dot-dict mixin to be able to access a dictionary via dot notation + source: https://stackoverflow.com/questions/2352181/how-to-use-a-dot-to-access-members-of-dictionary + """ + __getattr__ = dict.get + __setattr__ = dict.__setitem__ + __delattr__ = dict.__delitem__ + + +class MetaSingleton(type): + """Singleton class (2nd Chapter) from Learning Python Design Patterns - 2nd ed. + Chetan Giridhar, Packt Publ. 2016""" + _instances = {} + + def __call__(cls, *args, **kwargs): + if cls not in cls._instances: + cls._instances[cls] = super(MetaSingleton, cls).__call__(*args, **kwargs) + return cls._instances[cls] + + +def namedtuple_with_defaults(typename, field_names, default_values=()): + """A namedtuple with default values -- Stack overflow recipe from Mark Lodato + http://stackoverflow.com/questions/11351032/named-tuple-and-optional-keyword-arguments + :param typename: Name for the class (same as for namedtuple) + :param field_names: Field names (same as for namedtuple) + :param default_values: Can be specified as a dictionary or as a list + :return: New namedtuple object + """ + T = collections.namedtuple(typename, field_names) + T.__new__.__defaults__ = (None,) * len(T._fields) + if isinstance(default_values, collections.Mapping): + prototype = T(**default_values) + else: + prototype = T(*default_values) + T.__new__.__defaults__ = tuple(prototype) + return T + + +def dot_notation(dict_like, dot_spec): + """Return the value corresponding to the dot_spec from a dict_like object + :param dict_like: dictionary, JSON, etc. + :param dot_spec: a dot notation (e.g. a1.b1.c1.d1 => a1["b1"]["c1"]["d1"]) + :return: the value referenced by the dot_spec + """ + attrs = dot_spec.split(".") # we split the path + parent = dict_like.get(attrs[0]) + children = ".".join(attrs[1:]) + if not (parent and children): # if no children or no parent, bail out + return parent + if isinstance(parent, list): # here, we apply remaining path spec to all children + return [dot_notation(j, children) for j in parent] + elif isinstance(parent, dict): + return dot_notation(parent, children) + else: + return None + + +def list_flatten(l): + """ + Flatten a complex nested list of nested lists into a flat list (DFS). + For example, [ [1, 2], [[[2,3,4], [2,3,4]], [3,4,5, 'hello']]] + will produce [1, 2, 2, 3, 4, 2, 3, 4, 3, 4, 5, 'hello'] + """ + return list(itertools.chain(*[list_flatten(j) if isinstance(j, list) else [j] for j in l])) + + +def inverted_dict(keys: list, key_val_dict: dict) -> dict: + """ + Get val -> [keys] mapping for the given keys using key_val_dict + :param keys: the keys we are interested in (a list) + :param key_val_dict: the key -> val mapping + :return: inverted dictionary of val -> [keys] (for the subset dict of given keys) + """ + res = {} + all_tuples = ((k, key_val_dict[k] if k in key_val_dict else 'no-parent-' + k) for k in keys) + for k, v in all_tuples: + if v in res: + res[v].append(k) + else: + res[v] = [k] + # making sure to remove duplicate keys + res = dict((v, list(set(k_list))) for v, k_list in res.items()) + return res diff --git a/webapp/__init__.py b/webapp/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/webapp/__init__.py diff --git a/webapp/appcontroller.py b/webapp/appcontroller.py new file mode 100644 index 0000000..49f84ff --- /dev/null +++ b/webapp/appcontroller.py @@ -0,0 +1,47 @@ +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2017 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + +from flask import request +from flask_httpauth import HTTPBasicAuth +from flask import Response +import json +import osdf +from osdf.config.base import http_basic_auth_credentials + +auth_basic = HTTPBasicAuth() + +error_body = { + "serviceException": { + "text": "Unauthorized, check username and password" + } +} + +unauthorized_message = json.dumps(error_body) + +@auth_basic.get_password +def get_pw(username): + end_point = request.url.split('/')[-1] + auth_group = osdf.end_point_auth_mapping.get(end_point) + return http_basic_auth_credentials[auth_group].get(username) if auth_group else None + +@auth_basic.error_handler +def auth_error(): + response = Response(unauthorized_message, content_type='application/json; charset=utf-8') + response.headers.add('content-length', len(unauthorized_message)) + response.status_code = 401 + return response |