diff options
author | Sastry Isukapalli <sastry@research.att.com> | 2018-01-11 14:53:14 -0500 |
---|---|---|
committer | Sastry Isukapalli <sastry@research.att.com> | 2018-01-11 14:54:33 -0500 |
commit | e91ad0d899121fb190391865422ea6424ffc29ca (patch) | |
tree | 33dc30947fff1dcec8b24a5506b36701ee0d4b08 /adapters | |
parent | d51c20ced007294d86e560f3bc9f5812cc17f193 (diff) |
Seed code for other adapters
Issue-ID: OPTFRA-47
Change-Id: I33595df23f077b2a9366c45b69afecdf80beaaf8
Signed-off-by: Sastry Isukapalli <sastry@research.att.com>
Diffstat (limited to 'adapters')
-rw-r--r-- | adapters/__init__.py | 0 | ||||
-rw-r--r-- | adapters/database/OracleDB.py | 32 | ||||
-rw-r--r-- | adapters/database/PostgresDB.py | 31 | ||||
-rw-r--r-- | adapters/database/VerticaDB.py | 55 | ||||
-rw-r--r-- | adapters/database/__init__.py | 0 | ||||
-rw-r--r-- | adapters/dcae/__init__.py | 0 | ||||
-rwxr-xr-x | adapters/dcae/message_router.py | 100 | ||||
-rw-r--r-- | adapters/local_data/__init__.py | 0 | ||||
-rw-r--r-- | adapters/local_data/local_policies.py | 40 | ||||
-rw-r--r-- | adapters/request_parsing/__init__.py | 0 | ||||
-rw-r--r-- | adapters/request_parsing/placement.py | 33 |
11 files changed, 291 insertions, 0 deletions
diff --git a/adapters/__init__.py b/adapters/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/adapters/__init__.py diff --git a/adapters/database/OracleDB.py b/adapters/database/OracleDB.py new file mode 100644 index 0000000..655dd27 --- /dev/null +++ b/adapters/database/OracleDB.py @@ -0,0 +1,32 @@ +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2017 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + +import cx_Oracle + +from osdf.utils.programming_utils import MetaSingleton + + +class OracleDB(metaclass=MetaSingleton): + conn, cur = None, None + + def connect(self, host=None, sid=None, user=None, passwd=None, port=5432): + if self.conn is None: + tns_info = cx_Oracle.makedsn(host=host, port=port, sid=sid) + self.conn = cx_Oracle.connect(user=user, password=passwd, dsn=tns_info, threaded=True) + self.cur = self.conn.cursor() + return self.conn, self.cur diff --git a/adapters/database/PostgresDB.py b/adapters/database/PostgresDB.py new file mode 100644 index 0000000..6689566 --- /dev/null +++ b/adapters/database/PostgresDB.py @@ -0,0 +1,31 @@ +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2017 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + +import psycopg2 + +from osdf.utils.programming_utils import MetaSingleton + + +class PostgresDB(metaclass=MetaSingleton): + conn, cur = None, None + + def connect(self, host=None, db=None, user=None, passwd=None, port=5432): + if self.conn is None: + self.conn = psycopg2.connect(host=host, port=port, user=user, password=passwd, database=db) + self.cur = self.conn.cursor() + return self.conn, self.cur diff --git a/adapters/database/VerticaDB.py b/adapters/database/VerticaDB.py new file mode 100644 index 0000000..ad961d7 --- /dev/null +++ b/adapters/database/VerticaDB.py @@ -0,0 +1,55 @@ +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2017 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + +import jaydebeapi +import sqlalchemy.pool as pool + +from jaydebeapi import _DEFAULT_CONVERTERS, _java_to_py +from osdf.utils.programming_utils import MetaSingleton +from osdf.config.base import osdf_config + +_DEFAULT_CONVERTERS.update({'BIGINT': _java_to_py('longValue')}) + + +class VerticaDB(metaclass=MetaSingleton): + connection_pool = None + + def get_connection(self): + p = self.get_config_params() + c = jaydebeapi.connect( + 'com.vertica.jdbc.Driver', + 'jdbc:vertica://{}:{}/{}'.format(p['host'], p['port'], p['db']), + {'user': p['user'], 'password': p['passwd'], 'CHARSET': 'UTF8'}, + jars=[p['db_driver']] + ) + return c + + def get_config_params(self): + config = osdf_config["deployment"] + host, port, db = config["verticaHost"], config["verticaPort"], config.get("verticaDB") + user, passwd = config["verticaUsername"], config["verticaPassword"] + jar_path = osdf_config['core']['osdf_system']['vertica_jar'] + params = dict(host=host, db=db, user=user, passwd=passwd, port=port, db_driver=jar_path) + return params + + def connect(self): + if self.connection_pool is None: + self.connection_pool = pool.QueuePool(self.get_connection, max_overflow=10, pool_size=5, recycle=600) + conn = self.connection_pool.connect() + cursor = conn.cursor() + return conn, cursor diff --git a/adapters/database/__init__.py b/adapters/database/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/adapters/database/__init__.py diff --git a/adapters/dcae/__init__.py b/adapters/dcae/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/adapters/dcae/__init__.py diff --git a/adapters/dcae/message_router.py b/adapters/dcae/message_router.py new file mode 100755 index 0000000..e495331 --- /dev/null +++ b/adapters/dcae/message_router.py @@ -0,0 +1,100 @@ +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2017 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + +import requests +from osdf.utils.data_types import list_like +from osdf.operation.exceptions import MessageBusConfigurationException + + +class MessageRouterClient(object): + def __init__(self, + dmaap_url=None, + mr_host_base_urls=None, + topic=None, + consumer_group=None, consumer_id=None, + timeout_ms=15000, fetch_limit=1000, + userid=None, passwd=None): + """ + :param dmaap_url: protocol, host and port; mostly for UEB + (e.g. https://dcae-msrt-ftl.homer.att.com:3905/) + :param mr_host_base_urls: for DMaaP, we get a topic URL (base_url + events/topic_name) + (e.g. https://dcae-msrt-ftl.homer.att.com:3905/events/com.att.dcae.dmaap.FTL.SNIRO-CM-SCHEDULER-RESPONSE) + :param consumer_group: DMaaP/UEB consumer group (unique for each subscriber; required for GET) + :param consumer_id: DMaaP/UEB consumer ID (unique for each thread/process for a subscriber; required for GET) + :param timeout_ms: (optional, default 15 seconds or 15,000 ms) server-side timeout for GET request + :param fetch_limit: (optional, default 1000 messages per request for GET), ignored for "POST" + :param userid: (optional, userid for HTTP basic authentication) + :param passwd: (optional, password for HTTP basic authentication) + """ + mr_error = MessageBusConfigurationException + if dmaap_url is None: # definitely not DMaaP, so use UEB mode + self.is_dmaap = False + if not (mr_host_base_urls and list_like(mr_host_base_urls)): + raise mr_error("Not a DMaaP or UEB configuration") + if not topic: + raise mr_error("Invalid topic: '{}'",format(topic)) + self.topic_urls = ["{}/events/{}".format(base_url, topic) for base_url in mr_host_base_urls] + else: + self.is_dmaap = True + self.topic_urls = [dmaap_url] + + self.timeout_ms = timeout_ms + self.fetch_limit = fetch_limit + self.auth = (userid, passwd) if userid and passwd else None + self.consumer_group = consumer_group + self.consumer_id = consumer_id + + def get(self, outputjson=True): + """Fetch messages from message router (DMaaP or UEB) + :param outputjson: (optional, specifies if response is expected to be in json format), ignored for "POST" + :return: response as a json object (if outputjson is True) or as a string + """ + url_fmt = "{topic_url}/{cgroup}/{cid}?timeout={timeout_ms}&limit={limit}" + urls = [url_fmt.format(topic_url=x, timeout_ms=self.timeout_ms, limit=self.fetch_limit, + cgroup=self.consumer_group, cid=self.consumer_id) for x in self.topic_urls] + for url in urls[:-1]: + try: + return self.http_request(method='GET', url=url, outputjson=outputjson) + except: + pass + return self.http_request(method='GET', url=urls[-1], outputjson=outputjson) + + def post(self, msg, inputjson=True): + for url in self.topic_urls[:-1]: + try: + return self.http_request(method='POST', url=url, inputjson=inputjson, msg=msg) + except: + pass + return self.http_request(method='POST', url=self.topic_urls[-1], inputjson=inputjson, msg=msg) + + def http_request(self, url, method, inputjson=True, outputjson=True, msg=None, **kwargs): + """ + Perform the actual URL request (GET or POST), and do error handling + :param url: full URL (including topic, limit, timeout, etc.) + :param method: GET or POST + :param inputjson: Specify whether input is in json format (valid only for POST) + :param outputjson: Is response expected in a json format + :param msg: content to be posted (valid only for POST) + :return: response as a json object (if outputjson or POST) or as a string; None if error + """ + res = requests.request(url=url, method=method, auth=self.auth, **kwargs) + if res.status_code == requests.codes.ok: + return res.json() if outputjson or method == "POST" else res.content + else: + raise Exception("HTTP Response Error: code {}; headers:{}, content: {}".format( + res.status_code, res.headers, res.content)) diff --git a/adapters/local_data/__init__.py b/adapters/local_data/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/adapters/local_data/__init__.py diff --git a/adapters/local_data/local_policies.py b/adapters/local_data/local_policies.py new file mode 100644 index 0000000..c63ae5a --- /dev/null +++ b/adapters/local_data/local_policies.py @@ -0,0 +1,40 @@ +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2017 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + +import json +import os + + +def get_local_policies(local_policy_folder, local_policy_list, policy_id_list=None): + """ + Get policies from a local file system. + Required for the following scenarios: + (a) doing work-arounds (e.g. if we are asked to drop some policies for testing purposes) + (b) work-arounds when policy platform is giving issues (e.g. if dev/IST policies are wiped out in an upgrade) + :param local_policy_folder: where the policy files are present + :param local_policy_list: list of local policies + :param policy_id_list: list of policies to get (if unspecified or None, get all) + :return: get policies + """ + policies = [] + for fname in local_policy_list: # ugly removal of .json from file name + if policy_id_list and fname[:-5] not in policy_id_list: + continue + with open(os.path.join(local_policy_folder, fname)) as fid: + policies.append(json.load(fid)) + return policies diff --git a/adapters/request_parsing/__init__.py b/adapters/request_parsing/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/adapters/request_parsing/__init__.py diff --git a/adapters/request_parsing/placement.py b/adapters/request_parsing/placement.py new file mode 100644 index 0000000..d7a6575 --- /dev/null +++ b/adapters/request_parsing/placement.py @@ -0,0 +1,33 @@ +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2017 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + +import copy +import json +from osdf.utils.programming_utils import list_flatten, dot_notation + + +def json_path_after_expansion(req_json, reference): + """ + Get the child node(s) from the dot-notation [reference] and parent [req_json]. + For placement and other requests, there are encoded JSONs inside the request or policy, + so we need to expand it and then do a search over the parent plus expanded JSON. + """ + req_json_copy = copy.deepcopy(req_json) # since we expand the JSON in place, we work on a copy + req_json_copy['placementInfo']['orderInfo'] = json.loads(req_json_copy['placementInfo']['orderInfo']) + info = dot_notation(req_json_copy, reference) + return list_flatten(info) if isinstance(info, list) else info |