# org.onap.dcae # ============LICENSE_START==================================================== # Copyright (c) 2018 AT&T Intellectual Property. All rights reserved. # ============================================================================= # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # ============LICENSE_END====================================================== # # ECOMP is a trademark and service mark of AT&T Intellectual Property. import base64 import json import os import requests from threading import Thread import uuid from onap_dcae_cbs_docker_client.client import get_config try: from http.server import BaseHTTPRequestHandler, HTTPServer except ImportError: from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer _httpport = int(os.environ['DCAEPORT']) if 'DCAEPORT' in os.environ else 80 _clientid = uuid.uuid4().hex _groupid = uuid.uuid4().hex class _handler(BaseHTTPRequestHandler): def do_GET(self): if '/healthcheck' == self.path: if self.server.env._health(): self.send_response(202) self.end_headers() else: self.send_error(503) elif '/reconfigure' == self.path: self.server.env._loadconfig() self.server.env._reconf() self.send_response(202) self.end_headers() else: self.send_error(404) def _genauth(sinfo): """ Return authentication parameters for stream, if present. """ user = sinfo['aaf_username'] if 'aaf_username' in sinfo else None password = sinfo['aaf_password'] if 'aaf_password' in sinfo else None if user and password: return { 'auth': (user, password) } else: return {} class DcaeEnv: def __init__(self, healthCB = lambda:True, reconfigCB = lambda:None): """ Initialize environment, but don't start web server or invoke any callbacks. """ self._health = healthCB self._reconf = reconfigCB self._unread = {} self._server = None self._loadconfig() def start(self): """ Start web server to receive health checks and reconfigure requests """ if self._server is not None: return self._server = HTTPServer(('', _httpport), _handler) self._server.env = self th = Thread(target=self._server.serve_forever, name='webserver') th.daemon = True th.start() def stop(self): """ Stop web server """ if self._server is None: return self._server.shutdown() self._server.env = None self._server = None def _loadconfig(self): self._config = get_config() def hasdata(self, stream): """ Return whether there is any unprocessed received data for the specified data stream. That is, if an earlier getdata() call returned more than one record, and the additional records have not yet been retrieved. """ return stream in self._unread def getdata(self, stream, timeout_ms = 15000, limit = 10): """ Try to retrieve data from Message Router for the specified data stream. If no data is retrieved, within the specified timeout, return None. """ sinfo = self._config['streams_subscribes'][stream] if stream in self._unread: x = self._unread[stream] ret = x.pop() if len(x) == 0: del self._unread[stream] return ret gid = sinfo['client_id'] if 'client_id' in sinfo and sinfo['client_id'] else _groupid resp = requests.get('{0}/{1}/{2}?timeout={3}&limit={4}'.format(sinfo['dmaap_info']['topic_url'], gid, _clientid, timeout_ms, limit), **_genauth(sinfo)) resp.raise_for_status() x = resp.json() if len(x) == 0: return None if len(x) == 1: return x[0] x.reverse() ret = x.pop() self._unread[stream] = x return ret def senddata(self, stream, partition, data): """ Publish data to the specified stream. """ sinfo = self._config['streams_publishes'][stream] body = '{0}.{1}.{2}{3}'.format(len(partition), len(data), partition, data) resp = requests.post('{0}'.format(sinfo['dmaap_info']['topic_url']), headers={'Content-Type': 'application/cambria'}, data=body, **_genauth(sinfo)) resp.raise_for_status() def getconfig(self): """ Get the latest version of the configuration data. """ return self._config def reconfigure(): """ Make the web request to reconfigure (locally) """ requests.get('http://localhost:{0}/reconfigure'.format(_httpport))