summaryrefslogtreecommitdiffstats
path: root/dcaeapplib/dcaeapplib/__init__.py
blob: e81b6fce62accded7ab6d51fea6b22ba7b12db9c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
# org.onap.dcae
# ============LICENSE_START====================================================
# Copyright (c) 2018 AT&T Intellectual Property. All rights reserved.
# =============================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============LICENSE_END======================================================
#
# ECOMP is a trademark and service mark of AT&T Intellectual Property.

import base64
import json
import os
import requests
from threading import Thread
import uuid
from onap_dcae_cbs_docker_client.client import get_config

try:
  from http.server import BaseHTTPRequestHandler, HTTPServer
except ImportError:
  from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer

# Port the embedded web server listens on; the DCAEPORT environment
# variable, when set, overrides the default of 80.
_httpport = int(os.environ.get('DCAEPORT', 80))
# Random per-process identifiers placed in the subscribe URL: the client id,
# and the group id used when a stream's config supplies no client_id.
_clientid = uuid.uuid4().hex
_groupid = uuid.uuid4().hex

class _handler(BaseHTTPRequestHandler):
  """
  Request handler for the embedded web server.  It expects the owning
  server object to carry an 'env' attribute exposing _health(),
  _loadconfig() and _reconf().
  """
  def do_GET(self):
    """
    Serve GET requests:
      /healthcheck  -> 202 when env._health() is true, otherwise 503
      /reconfigure  -> reload the configuration, run the reconfigure
                       callback, then answer 202
      anything else -> 404
    """
    route = self.path
    if route == '/healthcheck':
      healthy = self.server.env._health()
      if not healthy:
        self.send_error(503)
        return
    elif route == '/reconfigure':
      env = self.server.env
      # Reload first so the callback observes the fresh configuration
      env._loadconfig()
      env._reconf()
    else:
      self.send_error(404)
      return
    # Both successful routes end with an empty 202 response
    self.send_response(202)
    self.end_headers()

class DcaeEnv:
  """
  Runtime helper for an application: holds the configuration returned by
  get_config(), optionally runs a small web server answering /healthcheck
  and /reconfigure, and wraps the HTTP calls used to fetch records from
  and publish records to the configured streams.
  """

  def __init__(self, healthCB = lambda:True, reconfigCB = lambda:None):
    """
    Initialize environment, but don't start web server or invoke any callbacks.

    healthCB is called for each /healthcheck request; a true result is
    reported as healthy.  reconfigCB is called, with no arguments, after
    the configuration has been reloaded by a /reconfigure request.
    The configuration itself is loaded immediately.
    """
    self._health = healthCB
    self._reconf = reconfigCB
    # stream name -> list of records fetched but not yet handed out,
    # stored in reverse order so that pop() yields them oldest first
    self._unread = {}
    self._server = None
    self._loadconfig()

  def start(self):
    """
    Start web server to receive health checks and reconfigure requests.
    Does nothing if the server is already running.
    """
    if self._server is not None:
      return
    self._server = HTTPServer(('', _httpport), _handler)
    # The request handler reaches the callbacks and config via server.env
    self._server.env = self
    th = Thread(target=self._server.serve_forever, name='webserver')
    # Daemon thread, so a running web server never blocks interpreter exit
    th.daemon = True
    th.start()

  def stop(self):
    """
    Stop web server and release its listening socket.
    Does nothing if the server is not running.
    """
    if self._server is None:
      return
    server = self._server
    server.shutdown()
    # shutdown() only ends the serve_forever() loop; server_close() is what
    # closes the listening socket instead of leaving that to the
    # garbage collector.
    server.server_close()
    server.env = None
    self._server = None

  def _loadconfig(self):
    """
    (Re)load the configuration via get_config() and keep it for later use.
    """
    self._config = get_config()

  def hasdata(self, stream):
    """
    Return whether there is any unprocessed received data for the specified
    data stream.  That is, if an earlier getdata() call returned more than
    one record, and the additional records have not yet been retrieved.
    """
    return stream in self._unread

  def getdata(self, stream, timeout_ms = 15000, limit = 10):
    """
    Try to retrieve data from Message Router for the specified data stream.
    If no data is retrieved, within the specified timeout, return None.

    Records left over from an earlier fetch are returned first, one per
    call, without contacting the server.  Otherwise up to 'limit' records
    are requested; the first is returned and the rest are buffered.

    Raises KeyError if the stream is not listed under 'streams_subscribes'
    in the configuration, and whatever requests raises for a failed
    request or an error status (raise_for_status()).
    """
    sinfo = self._config['streams_subscribes'][stream]
    if stream in self._unread:
      pending = self._unread[stream]
      ret = pending.pop()
      if not pending:
        # Drop drained buffers so hasdata() reports False again
        del self._unread[stream]
      return ret
    # Use the configured client_id as group id when present and non-empty
    gid = sinfo.get('client_id') or _groupid
    url = '{0}/{1}/{2}?timeout={3}&limit={4}'.format(sinfo['dmaap_info']['topic_url'], gid, _clientid, timeout_ms, limit)
    resp = requests.get(url, auth=(sinfo['aaf_username'], sinfo['aaf_password']))
    resp.raise_for_status()
    records = resp.json()
    if len(records) == 0:
      return None
    # Reverse once so that pop() hands records out in arrival order
    records.reverse()
    ret = records.pop()
    if records:
      self._unread[stream] = records
    return ret

  def senddata(self, stream, partition, data):
    """
    Publish data to the specified stream.

    The request body is '<len(partition)>.<len(data)>.<partition><data>',
    posted with content type application/cambria.

    Raises KeyError if the stream is not listed under 'streams_publishes'
    in the configuration, and whatever requests raises for a failed
    request or an error status (raise_for_status()).
    """
    sinfo = self._config['streams_publishes'][stream]
    body = '{0}.{1}.{2}{3}'.format(len(partition), len(data), partition, data)
    resp = requests.post('{0}'.format(sinfo['dmaap_info']['topic_url']), auth=(sinfo['aaf_username'], sinfo['aaf_password']), headers={'Content-Type': 'application/cambria'}, data=body)
    resp.raise_for_status()

  def getconfig(self):
    """
    Get the latest version of the configuration data.
    """
    return self._config

def reconfigure():
  """
  Trigger a reconfigure by sending a GET request to the /reconfigure
  endpoint of the web server on localhost.
  """
  url = 'http://localhost:{0}/reconfigure'.format(_httpport)
  requests.get(url)