summaryrefslogtreecommitdiffstats
path: root/docker/dockerplugin/tasks.py
diff options
context:
space:
mode:
Diffstat (limited to 'docker/dockerplugin/tasks.py')
-rw-r--r--docker/dockerplugin/tasks.py47
1 files changed, 22 insertions, 25 deletions
diff --git a/docker/dockerplugin/tasks.py b/docker/dockerplugin/tasks.py
index 03eba62..8a15319 100644
--- a/docker/dockerplugin/tasks.py
+++ b/docker/dockerplugin/tasks.py
@@ -2,6 +2,7 @@
# org.onap.dcae
# ================================================================================
# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2019 Pantheon.tech. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -136,32 +137,29 @@ def _parse_streams(**kwargs):
"""Parse streams and setup for DMaaP plugin"""
# The DMaaP plugin requires this plugin to set the runtime properties
# keyed by the node name.
- def setup_publishes(s):
- kwargs[s["name"]] = s
+ for stream in kwargs["streams_publishes"]:
+ kwargs[stream["name"]] = stream
- map(setup_publishes, kwargs["streams_publishes"])
-
- def setup_subscribes(s):
- if s["type"] == "data_router":
- # If username and password has been provided then generate it. The
- # DMaaP plugin doesn't generate for subscribers. The generation code
- # and length of username password has been lifted from the DMaaP
+ # NOTE: That the delivery url is constructed and setup in the start operation
+ for stream in kwargs["streams_subscribes"]:
+ if stream["type"] == "data_router":
+ # If either username or password is missing then generate it. The
+ # DMaaP plugin doesn't generate them for subscribers.
+ # The code and length of username/password are lifted from the DMaaP
# plugin.
# Don't want to mutate the source
- s = copy.deepcopy(s)
- if not s.get("username", None):
- s["username"] = utils.random_string(8)
- if not s.get("password", None):
- s["password"] = utils.random_string(10)
+ stream = copy.deepcopy(stream)
+ if not stream.get("username", None):
+ stream["username"] = utils.random_string(8)
+ if not stream.get("password", None):
+ stream["password"] = utils.random_string(10)
- kwargs[s["name"]] = s
-
- # NOTE: That the delivery url is constructed and setup in the start operation
- map(setup_subscribes, kwargs["streams_subscribes"])
+ kwargs[stream["name"]] = stream
return kwargs
+
def _setup_for_discovery_streams(**kwargs):
"""Setup for discovery of streams
@@ -182,18 +180,17 @@ def _setup_for_discovery_streams(**kwargs):
v = { "location": dr_sub["location"], "delivery_url": None,
"username": dr_sub["username"], "password": dr_sub["password"],
"subscriber_id": None }
- return dis.add_to_entry(conn, dmaap_kv_key, dr_sub["name"], v) != None
+ return dis.add_to_entry(conn, dmaap_kv_key, dr_sub["name"], v)
try:
- if all(map(add_feed, dr_subs)):
- return kwargs
+ for dr_sub in dr_subs:
+ if add_feed(dr_sub) is None:
+ raise NonRecoverableError(
+ "Failure updating feed streams in Consul")
except Exception as e:
raise NonRecoverableError(e)
- # You should never get here
- raise NonRecoverableError("Failure updating feed streams in Consul")
- else:
- return kwargs
+ return kwargs
@merge_inputs_for_create