From 8c776e128cf557490db59525b29a608e30e637be Mon Sep 17 00:00:00 2001 From: Andrew Gauld Date: Fri, 16 Feb 2018 12:22:23 -0500 Subject: Initial version of dcaeapplib package Change-Id: I54db903e59a67eb4423cacf10ea957d148b4f853 Issue-ID: DCAEGEN2-354 Signed-off-by: Andrew Gauld --- dcaeapplib/ChangeLog.md | 6 + dcaeapplib/LICENSE.txt | 17 +++ dcaeapplib/README.md | 64 ++++++++++ dcaeapplib/dcaeapplib/__init__.py | 141 +++++++++++++++++++++ dcaeapplib/pom.xml | 241 ++++++++++++++++++++++++++++++++++++ dcaeapplib/setup.py | 39 ++++++ dcaeapplib/tests/test_dcaeapplib.py | 107 ++++++++++++++++ dcaeapplib/tox-local.ini | 11 ++ dcaeapplib/tox.ini | 11 ++ pom.xml | 1 + 10 files changed, 638 insertions(+) create mode 100644 dcaeapplib/ChangeLog.md create mode 100644 dcaeapplib/LICENSE.txt create mode 100644 dcaeapplib/README.md create mode 100644 dcaeapplib/dcaeapplib/__init__.py create mode 100644 dcaeapplib/pom.xml create mode 100644 dcaeapplib/setup.py create mode 100644 dcaeapplib/tests/test_dcaeapplib.py create mode 100644 dcaeapplib/tox-local.ini create mode 100644 dcaeapplib/tox.ini diff --git a/dcaeapplib/ChangeLog.md b/dcaeapplib/ChangeLog.md new file mode 100644 index 0000000..0942537 --- /dev/null +++ b/dcaeapplib/ChangeLog.md @@ -0,0 +1,6 @@ +# Change Log + +All notable changes to this project will be documented in this file. + +The format is based on [Keep a Changelog](http://keepachangelog.com/) +and this project adheres to [Semantic Versioning](http://semver.org/). diff --git a/dcaeapplib/LICENSE.txt b/dcaeapplib/LICENSE.txt new file mode 100644 index 0000000..8bdab89 --- /dev/null +++ b/dcaeapplib/LICENSE.txt @@ -0,0 +1,17 @@ +============LICENSE_START======================================================= +Copyright (c) 2018 AT&T Intellectual Property. All rights reserved. +================================================================================ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +============LICENSE_END========================================================= + +ECOMP is a trademark and service mark of AT&T Intellectual Property. diff --git a/dcaeapplib/README.md b/dcaeapplib/README.md new file mode 100644 index 0000000..81fd825 --- /dev/null +++ b/dcaeapplib/README.md @@ -0,0 +1,64 @@ +# dcaeapplib + +Library for building DCAE analytics applications + +# Example use: + +``` + +class myapp: + def __init__(self): + # get the environment, and register callbacks for health checks and + # configuration changes + self.dcaeenv = dcaeapplib.DcaeEnv(healthCB=self.isHealthy, reconfigCB=self.reconfigure) + # simulate a configuration change - we want the initial configuration + self.reconfigure() + # start the environment (to wait for health checks and config changes) + self.dcaeenv.start() + # begin processing loop + while True: + if self.configchanged: + # fetch the updated configuration + self.conig = self.dcaeenv.getconfig() + self.configchanged = False + # do any setup relevant to the configuration + data = self.dcaeenv.getdata('myinputstream') + # Data is a UTF-8 string, 'myinputstream' is this application's logical + # name for this (one of potentially several) data sources. + # Can also specify timeout_ms and limit as arguments. + # timeout_ms (default 15,000) is the maximum time getdata() will block + # limit is the maximum number of records retrieved at a time. + # If no data is retrieved (timeout) the return will be None. + if data is not None: + # do something to process the data + self.dcaeenv.senddata('myoutputstream', 'somepartitionkey', data) + # data is a string, 'myoutputstream' is the application's logical + # name for this (one of potentially several) data sinks. + # somepartitionkey is used, by Kafka, to assign the data to a partition. + + def isHealthy(self): + # return the health of the application (True/False) + return True + + def reconfigure(self): + # Do whatever needs to be done to handle a configuration change + self.configchanged = True +``` + +# Environment Variables + +This library uses the onap-dcae-cbs-docker-client library to fetch +configuration. That library relies on the HOSTNAME and CONSUL_HOST +environment variables to find the configuration. + +This library provides an HTTP interface for health checks. By default this +listens on port 80 but can be overridden to use another port by setting the +DCAEPORT environment variable. The HTTP interface supports getting 2 URLs: +/healthcheck, which will return a status of 202 (Accepted) for healthy, +and 503 (Service Unavailable) for unhealthy, and /reconfigure, which triggers +the library to check for updated configuration. + +# Console Commands + +This library provides a single console command "reconfigure.sh" which +performs an HTTP get of the /reconfigure URL diff --git a/dcaeapplib/dcaeapplib/__init__.py b/dcaeapplib/dcaeapplib/__init__.py new file mode 100644 index 0000000..e81b6fc --- /dev/null +++ b/dcaeapplib/dcaeapplib/__init__.py @@ -0,0 +1,141 @@ +# org.onap.dcae +# ============LICENSE_START==================================================== +# Copyright (c) 2018 AT&T Intellectual Property. All rights reserved. +# ============================================================================= +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END====================================================== +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. + +import base64 +import json +import os +import requests +from threading import Thread +import uuid +from onap_dcae_cbs_docker_client.client import get_config + +try: + from http.server import BaseHTTPRequestHandler, HTTPServer +except ImportError: + from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer + +_httpport = int(os.environ['DCAEPORT']) if 'DCAEPORT' in os.environ else 80 +_clientid = uuid.uuid4().hex +_groupid = uuid.uuid4().hex + +class _handler(BaseHTTPRequestHandler): + def do_GET(self): + if '/healthcheck' == self.path: + if self.server.env._health(): + self.send_response(202) + self.end_headers() + else: + self.send_error(503) + elif '/reconfigure' == self.path: + self.server.env._loadconfig() + self.server.env._reconf() + self.send_response(202) + self.end_headers() + else: + self.send_error(404) + +class DcaeEnv: + def __init__(self, healthCB = lambda:True, reconfigCB = lambda:None): + """ + Initialize environment, but don't start web server or invoke any callbacks. + """ + self._health = healthCB + self._reconf = reconfigCB + self._unread = {} + self._server = None + self._loadconfig() + + def start(self): + """ + Start web server to receive health checks and reconfigure requests + """ + if self._server is not None: + return + self._server = HTTPServer(('', _httpport), _handler) + self._server.env = self + th = Thread(target=self._server.serve_forever, name='webserver') + th.daemon = True + th.start() + + def stop(self): + """ + Stop web server + """ + if self._server is None: + return + self._server.shutdown() + self._server.env = None + self._server = None + + def _loadconfig(self): + self._config = get_config() + + def hasdata(self, stream): + """ + Return whether there is any unprocessed received data for the specified + data stream. That is, if an earlier getdata() call returned more than + one record, and the additional records have not yet been retrieved. + """ + return stream in self._unread + + def getdata(self, stream, timeout_ms = 15000, limit = 10): + """ + Try to retrieve data from Message Router for the specified data stream. + If no data is retrieved, within the specified timeout, return None. + """ + sinfo = self._config['streams_subscribes'][stream] + if stream in self._unread: + x = self._unread[stream] + ret = x.pop() + if len(x) == 0: + del self._unread[stream] + return ret + gid = sinfo['client_id'] if 'client_id' in sinfo and sinfo['client_id'] else _groupid + resp = requests.get('{0}/{1}/{2}?timeout={3}&limit={4}'.format(sinfo['dmaap_info']['topic_url'], gid, _clientid, timeout_ms, limit), auth=(sinfo['aaf_username'], sinfo['aaf_password'])) + resp.raise_for_status() + x = resp.json() + if len(x) == 0: + return None + if len(x) == 1: + return x[0] + x.reverse() + ret = x.pop() + self._unread[stream] = x + return ret + + def senddata(self, stream, partition, data): + """ + Publish data to the specified stream. + """ + sinfo = self._config['streams_publishes'][stream] + body = '{0}.{1}.{2}{3}'.format(len(partition), len(data), partition, data) + resp = requests.post('{0}'.format(sinfo['dmaap_info']['topic_url']), auth=(sinfo['aaf_username'], sinfo['aaf_password']), headers={'Content-Type': 'application/cambria'}, data=body) + resp.raise_for_status() + + def getconfig(self): + """ + Get the latest version of the configuration data. + """ + return self._config + +def reconfigure(): + """ + Make the web request to reconfigure (locally) + """ + requests.get('http://localhost:{0}/reconfigure'.format(_httpport)) diff --git a/dcaeapplib/pom.xml b/dcaeapplib/pom.xml new file mode 100644 index 0000000..878e7c6 --- /dev/null +++ b/dcaeapplib/pom.xml @@ -0,0 +1,241 @@ + + + + 4.0.0 + + org.onap.dcaegen2.utils + utils + 1.2.0-SNAPSHOT + + org.onap.dcaegen2.utils + dcaeapplib + dcaeapplib + 0.0.3-SNAPSHOT + http://maven.apache.org + + + UTF-8 + + . + xunit-results.xml + coverage.xml + py + Python + **/*.py + tests/* + + + ${project.artifactId}-${project.version} + + + + + org.sonatype.plugins + nexus-staging-maven-plugin + 1.6.7 + + true + + + + org.apache.maven.plugins + maven-deploy-plugin + + 2.8 + + true + + + + + + org.apache.maven.plugins + maven-resources-plugin + 2.6 + + true + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.1 + + true + + + + + org.apache.maven.plugins + maven-jar-plugin + 2.4 + + + default-jar + + + + + + + org.apache.maven.plugins + maven-install-plugin + 2.4 + + true + + + + + org.apache.maven.plugins + maven-surefire-plugin + 2.12.4 + + true + + + + + + + + + org.codehaus.mojo + exec-maven-plugin + 1.2.1 + + + clean phase script + clean + + exec + + + + ${project.artifactId} + clean + + + + + generate-sources script + generate-sources + + exec + + + + ${project.artifactId} + generate-sources + + + + + compile script + compile + + exec + + + + ${project.artifactId} + compile + + + + + package script + package + + exec + + + + ${project.artifactId} + package + + + + + test script + test + + exec + + + + ${project.artifactId} + test + + + + + install script + install + + exec + + + + ${project.artifactId} + install + + + + + deploy script + deploy + + exec + + + + ${project.artifactId} + deploy + + + + + + + + diff --git a/dcaeapplib/setup.py b/dcaeapplib/setup.py new file mode 100644 index 0000000..a19fa7c --- /dev/null +++ b/dcaeapplib/setup.py @@ -0,0 +1,39 @@ +# org.onap.dcae +# ============LICENSE_START==================================================== +# Copyright (c) 2018 AT&T Intellectual Property. All rights reserved. +# ============================================================================= +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END====================================================== +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. + +from setuptools import setup, find_packages + +setup( + name='dcaeapplib', + version='0.0.3', + packages=find_packages(), + author = 'Andrew Gauld', + author_email = 'ag1282@att.com', + description = ('Library for building DCAE analytics applications'), + license = 'Apache 2.0', + keywords = '', + url = '', + zip_safe = True, + install_requires=[ 'onap-dcae-cbs-docker-client>=0.0.2' ], + entry_points = { + 'console_scripts': [ + 'reconfigure.sh=dcaeapplib:reconfigure' + ] + } +) diff --git a/dcaeapplib/tests/test_dcaeapplib.py b/dcaeapplib/tests/test_dcaeapplib.py new file mode 100644 index 0000000..fabec41 --- /dev/null +++ b/dcaeapplib/tests/test_dcaeapplib.py @@ -0,0 +1,107 @@ +# org.onap.dcae +# ============LICENSE_START==================================================== +# Copyright (c) 2018 AT&T Intellectual Property. All rights reserved. +# ============================================================================= +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END====================================================== +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. + +import dcaeapplib +import requests +import json + +class Stubs: + def __init__(self): + self.toreturn = [ 's1', 's2', 's3' ] + self.config = { + "anotherparameter": 1, + "streams_subscribes": { + "myinputstream": { + "aaf_username": "user1", + "aaf_password": "pass1", + "dmaap_info": { + "topic_url": "http://messagerouter.example.com:3904/events/topic1" + } + } + }, + "streams_publishes": { + "myoutputstream": { + "aaf_username": "user2", + "aaf_password": "pass2", + "dmaap_info": { + "topic_url": "http://messagerouter.example.com:3904/events/topic2" + } + } + } + } + self.health = True + self.cc = False + + def raise_for_status(self): + pass + + def json(self): + return self.toreturn + +def test_todo(monkeypatch): + stuff = Stubs() + def stub_config(): + return json.loads(json.dumps(stuff.config)) + def stub_hc(): + return stuff.health + def stub_cc(): + stuff.cc = True + monkeypatch.setattr(dcaeapplib, 'get_config', stub_config) + monkeypatch.setattr(dcaeapplib, '_httpport', 0) + env = dcaeapplib.DcaeEnv(healthCB=stub_hc, reconfigCB=stub_cc) + env.stop() # exercise stop when not running + env.start() + print('Port is {0}'.format(env._server.server_port)) + monkeypatch.setattr(dcaeapplib, '_httpport', env._server.server_port) + stuff.config['anotherparameter'] = 2 + assert env.getconfig()['anotherparameter'] == 1 + dcaeapplib.reconfigure() + assert stuff.cc is True + assert env.getconfig()['anotherparameter'] == 2 + resp = requests.get('http://localhost:{0}/healthcheck'.format(dcaeapplib._httpport)) + assert resp.status_code < 300 + stuff.health = False + resp = requests.get('http://localhost:{0}/healthcheck'.format(dcaeapplib._httpport)) + assert resp.status_code >= 500 + resp = requests.get('http://localhost:{0}/invalid'.format(dcaeapplib._httpport)) + assert resp.status_code == 404 + env.start() # exercise start when already running + env.stop() + def stub_get(*args, **kwargs): + return stuff + def stub_post(url, data, *args, **kwargs): + assert data == '4.11.asdfhello world' + stuff.posted = True + return stuff + monkeypatch.setattr(requests, 'post', stub_post) + stuff.posted = False + env.senddata('myoutputstream', 'asdf', 'hello world') + assert stuff.posted == True + monkeypatch.setattr(requests, 'get', stub_get) + assert env.hasdata('myinputstream') is False + assert env.getdata('myinputstream') == 's1' + stuff.toreturn = [ 'a1' ] + assert env.getdata('myinputstream') == 's2' + assert env.hasdata('myinputstream') is True + assert env.getdata('myinputstream') == 's3' + assert env.hasdata('myinputstream') is False + assert env.getdata('myinputstream') == 'a1' + stuff.toreturn = [] + assert env.hasdata('myinputstream') is False + assert env.getdata('myinputstream') is None diff --git a/dcaeapplib/tox-local.ini b/dcaeapplib/tox-local.ini new file mode 100644 index 0000000..935a5d6 --- /dev/null +++ b/dcaeapplib/tox-local.ini @@ -0,0 +1,11 @@ +[tox] +envlist = py27 +[testenv] +deps= + onap-dcae-cbs-docker-client>=0.0.2 + pytest + coverage + pytest-cov +setenv = + PYTHONPATH={toxinidir} +commands=pytest --cov dcaeapplib --cov-report html diff --git a/dcaeapplib/tox.ini b/dcaeapplib/tox.ini new file mode 100644 index 0000000..9eb453c --- /dev/null +++ b/dcaeapplib/tox.ini @@ -0,0 +1,11 @@ +[tox] +envlist = py27,py35 +[testenv] +deps= + onap-dcae-cbs-docker-client>=0.0.2 + pytest + coverage + pytest-cov +setenv = + PYTHONPATH={toxinidir} +commands=pytest --junitxml xunit-results.xml --cov dcaeapplib --cov-report xml diff --git a/pom.xml b/pom.xml index c03e65a..6af5fc4 100644 --- a/pom.xml +++ b/pom.xml @@ -38,6 +38,7 @@ ECOMP is a trademark and service mark of AT&T Intellectual Property. onap-dcae-dcaepolicy-lib python-discovery-client python-dockering + dcaeapplib -- cgit 1.2.3-korg