From 8c776e128cf557490db59525b29a608e30e637be Mon Sep 17 00:00:00 2001 From: Andrew Gauld Date: Fri, 16 Feb 2018 12:22:23 -0500 Subject: Initial version of dcaeapplib package Change-Id: I54db903e59a67eb4423cacf10ea957d148b4f853 Issue-ID: DCAEGEN2-354 Signed-off-by: Andrew Gauld --- dcaeapplib/dcaeapplib/__init__.py | 141 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 141 insertions(+) create mode 100644 dcaeapplib/dcaeapplib/__init__.py (limited to 'dcaeapplib/dcaeapplib/__init__.py') diff --git a/dcaeapplib/dcaeapplib/__init__.py b/dcaeapplib/dcaeapplib/__init__.py new file mode 100644 index 0000000..e81b6fc --- /dev/null +++ b/dcaeapplib/dcaeapplib/__init__.py @@ -0,0 +1,141 @@ +# org.onap.dcae +# ============LICENSE_START==================================================== +# Copyright (c) 2018 AT&T Intellectual Property. All rights reserved. +# ============================================================================= +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END====================================================== +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. + +import base64 +import json +import os +import requests +from threading import Thread +import uuid +from onap_dcae_cbs_docker_client.client import get_config + +try: + from http.server import BaseHTTPRequestHandler, HTTPServer +except ImportError: + from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer + +_httpport = int(os.environ['DCAEPORT']) if 'DCAEPORT' in os.environ else 80 +_clientid = uuid.uuid4().hex +_groupid = uuid.uuid4().hex + +class _handler(BaseHTTPRequestHandler): + def do_GET(self): + if '/healthcheck' == self.path: + if self.server.env._health(): + self.send_response(202) + self.end_headers() + else: + self.send_error(503) + elif '/reconfigure' == self.path: + self.server.env._loadconfig() + self.server.env._reconf() + self.send_response(202) + self.end_headers() + else: + self.send_error(404) + +class DcaeEnv: + def __init__(self, healthCB = lambda:True, reconfigCB = lambda:None): + """ + Initialize environment, but don't start web server or invoke any callbacks. + """ + self._health = healthCB + self._reconf = reconfigCB + self._unread = {} + self._server = None + self._loadconfig() + + def start(self): + """ + Start web server to receive health checks and reconfigure requests + """ + if self._server is not None: + return + self._server = HTTPServer(('', _httpport), _handler) + self._server.env = self + th = Thread(target=self._server.serve_forever, name='webserver') + th.daemon = True + th.start() + + def stop(self): + """ + Stop web server + """ + if self._server is None: + return + self._server.shutdown() + self._server.env = None + self._server = None + + def _loadconfig(self): + self._config = get_config() + + def hasdata(self, stream): + """ + Return whether there is any unprocessed received data for the specified + data stream. That is, if an earlier getdata() call returned more than + one record, and the additional records have not yet been retrieved. + """ + return stream in self._unread + + def getdata(self, stream, timeout_ms = 15000, limit = 10): + """ + Try to retrieve data from Message Router for the specified data stream. + If no data is retrieved, within the specified timeout, return None. + """ + sinfo = self._config['streams_subscribes'][stream] + if stream in self._unread: + x = self._unread[stream] + ret = x.pop() + if len(x) == 0: + del self._unread[stream] + return ret + gid = sinfo['client_id'] if 'client_id' in sinfo and sinfo['client_id'] else _groupid + resp = requests.get('{0}/{1}/{2}?timeout={3}&limit={4}'.format(sinfo['dmaap_info']['topic_url'], gid, _clientid, timeout_ms, limit), auth=(sinfo['aaf_username'], sinfo['aaf_password'])) + resp.raise_for_status() + x = resp.json() + if len(x) == 0: + return None + if len(x) == 1: + return x[0] + x.reverse() + ret = x.pop() + self._unread[stream] = x + return ret + + def senddata(self, stream, partition, data): + """ + Publish data to the specified stream. + """ + sinfo = self._config['streams_publishes'][stream] + body = '{0}.{1}.{2}{3}'.format(len(partition), len(data), partition, data) + resp = requests.post('{0}'.format(sinfo['dmaap_info']['topic_url']), auth=(sinfo['aaf_username'], sinfo['aaf_password']), headers={'Content-Type': 'application/cambria'}, data=body) + resp.raise_for_status() + + def getconfig(self): + """ + Get the latest version of the configuration data. + """ + return self._config + +def reconfigure(): + """ + Make the web request to reconfigure (locally) + """ + requests.get('http://localhost:{0}/reconfigure'.format(_httpport)) -- cgit 1.2.3-korg