summaryrefslogtreecommitdiffstats
path: root/dcaeapplib/dcaeapplib/__init__.py
diff options
context:
space:
mode:
Diffstat (limited to 'dcaeapplib/dcaeapplib/__init__.py')
-rw-r--r--dcaeapplib/dcaeapplib/__init__.py141
1 files changed, 141 insertions, 0 deletions
diff --git a/dcaeapplib/dcaeapplib/__init__.py b/dcaeapplib/dcaeapplib/__init__.py
new file mode 100644
index 0000000..e81b6fc
--- /dev/null
+++ b/dcaeapplib/dcaeapplib/__init__.py
@@ -0,0 +1,141 @@
+# org.onap.dcae
+# ============LICENSE_START====================================================
+# Copyright (c) 2018 AT&T Intellectual Property. All rights reserved.
+# =============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END======================================================
+#
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+
+import base64
+import json
+import os
+import requests
+from threading import Thread
+import uuid
+from onap_dcae_cbs_docker_client.client import get_config
+
+try:
+ from http.server import BaseHTTPRequestHandler, HTTPServer
+except ImportError:
+ from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
+
+_httpport = int(os.environ['DCAEPORT']) if 'DCAEPORT' in os.environ else 80
+_clientid = uuid.uuid4().hex
+_groupid = uuid.uuid4().hex
+
+class _handler(BaseHTTPRequestHandler):
+ def do_GET(self):
+ if '/healthcheck' == self.path:
+ if self.server.env._health():
+ self.send_response(202)
+ self.end_headers()
+ else:
+ self.send_error(503)
+ elif '/reconfigure' == self.path:
+ self.server.env._loadconfig()
+ self.server.env._reconf()
+ self.send_response(202)
+ self.end_headers()
+ else:
+ self.send_error(404)
+
+class DcaeEnv:
+ def __init__(self, healthCB = lambda:True, reconfigCB = lambda:None):
+ """
+ Initialize environment, but don't start web server or invoke any callbacks.
+ """
+ self._health = healthCB
+ self._reconf = reconfigCB
+ self._unread = {}
+ self._server = None
+ self._loadconfig()
+
+ def start(self):
+ """
+ Start web server to receive health checks and reconfigure requests
+ """
+ if self._server is not None:
+ return
+ self._server = HTTPServer(('', _httpport), _handler)
+ self._server.env = self
+ th = Thread(target=self._server.serve_forever, name='webserver')
+ th.daemon = True
+ th.start()
+
+ def stop(self):
+ """
+ Stop web server
+ """
+ if self._server is None:
+ return
+ self._server.shutdown()
+ self._server.env = None
+ self._server = None
+
+ def _loadconfig(self):
+ self._config = get_config()
+
+ def hasdata(self, stream):
+ """
+ Return whether there is any unprocessed received data for the specified
+ data stream. That is, if an earlier getdata() call returned more than
+ one record, and the additional records have not yet been retrieved.
+ """
+ return stream in self._unread
+
+ def getdata(self, stream, timeout_ms = 15000, limit = 10):
+ """
+ Try to retrieve data from Message Router for the specified data stream.
+ If no data is retrieved, within the specified timeout, return None.
+ """
+ sinfo = self._config['streams_subscribes'][stream]
+ if stream in self._unread:
+ x = self._unread[stream]
+ ret = x.pop()
+ if len(x) == 0:
+ del self._unread[stream]
+ return ret
+ gid = sinfo['client_id'] if 'client_id' in sinfo and sinfo['client_id'] else _groupid
+ resp = requests.get('{0}/{1}/{2}?timeout={3}&limit={4}'.format(sinfo['dmaap_info']['topic_url'], gid, _clientid, timeout_ms, limit), auth=(sinfo['aaf_username'], sinfo['aaf_password']))
+ resp.raise_for_status()
+ x = resp.json()
+ if len(x) == 0:
+ return None
+ if len(x) == 1:
+ return x[0]
+ x.reverse()
+ ret = x.pop()
+ self._unread[stream] = x
+ return ret
+
+ def senddata(self, stream, partition, data):
+ """
+ Publish data to the specified stream.
+ """
+ sinfo = self._config['streams_publishes'][stream]
+ body = '{0}.{1}.{2}{3}'.format(len(partition), len(data), partition, data)
+ resp = requests.post('{0}'.format(sinfo['dmaap_info']['topic_url']), auth=(sinfo['aaf_username'], sinfo['aaf_password']), headers={'Content-Type': 'application/cambria'}, data=body)
+ resp.raise_for_status()
+
+ def getconfig(self):
+ """
+ Get the latest version of the configuration data.
+ """
+ return self._config
+
+def reconfigure():
+ """
+ Make the web request to reconfigure (locally)
+ """
+ requests.get('http://localhost:{0}/reconfigure'.format(_httpport))