summaryrefslogtreecommitdiffstats
path: root/dcaeapplib/dcaeapplib/__init__.py
diff options
context:
space:
mode:
Diffstat (limited to 'dcaeapplib/dcaeapplib/__init__.py')
-rw-r--r--dcaeapplib/dcaeapplib/__init__.py15
1 files changed, 13 insertions, 2 deletions
diff --git a/dcaeapplib/dcaeapplib/__init__.py b/dcaeapplib/dcaeapplib/__init__.py
index e81b6fc..18b60fe 100644
--- a/dcaeapplib/dcaeapplib/__init__.py
+++ b/dcaeapplib/dcaeapplib/__init__.py
@@ -50,6 +50,17 @@ class _handler(BaseHTTPRequestHandler):
else:
self.send_error(404)
+def _genauth(sinfo):
+ """
+ Return authentication parameters for stream, if present.
+ """
+ user = sinfo['aaf_username'] if 'aaf_username' in sinfo else None
+ password = sinfo['aaf_password'] if 'aaf_password' in sinfo else None
+ if user and password:
+ return { 'auth': (user, password) }
+ else:
+ return {}
+
class DcaeEnv:
def __init__(self, healthCB = lambda:True, reconfigCB = lambda:None):
"""
@@ -107,7 +118,7 @@ class DcaeEnv:
del self._unread[stream]
return ret
gid = sinfo['client_id'] if 'client_id' in sinfo and sinfo['client_id'] else _groupid
- resp = requests.get('{0}/{1}/{2}?timeout={3}&limit={4}'.format(sinfo['dmaap_info']['topic_url'], gid, _clientid, timeout_ms, limit), auth=(sinfo['aaf_username'], sinfo['aaf_password']))
+ resp = requests.get('{0}/{1}/{2}?timeout={3}&limit={4}'.format(sinfo['dmaap_info']['topic_url'], gid, _clientid, timeout_ms, limit), **_genauth(sinfo))
resp.raise_for_status()
x = resp.json()
if len(x) == 0:
@@ -125,7 +136,7 @@ class DcaeEnv:
"""
sinfo = self._config['streams_publishes'][stream]
body = '{0}.{1}.{2}{3}'.format(len(partition), len(data), partition, data)
- resp = requests.post('{0}'.format(sinfo['dmaap_info']['topic_url']), auth=(sinfo['aaf_username'], sinfo['aaf_password']), headers={'Content-Type': 'application/cambria'}, data=body)
+ resp = requests.post('{0}'.format(sinfo['dmaap_info']['topic_url']), headers={'Content-Type': 'application/cambria'}, data=body, **_genauth(sinfo))
resp.raise_for_status()
def getconfig(self):