diff options
Diffstat (limited to 'docker/dockerplugin/tasks.py')
-rw-r--r-- | docker/dockerplugin/tasks.py | 47 |
1 files changed, 22 insertions, 25 deletions
diff --git a/docker/dockerplugin/tasks.py b/docker/dockerplugin/tasks.py index 03eba62..8a15319 100644 --- a/docker/dockerplugin/tasks.py +++ b/docker/dockerplugin/tasks.py @@ -2,6 +2,7 @@ # org.onap.dcae # ================================================================================ # Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2019 Pantheon.tech. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -136,32 +137,29 @@ def _parse_streams(**kwargs): """Parse streams and setup for DMaaP plugin""" # The DMaaP plugin requires this plugin to set the runtime properties # keyed by the node name. - def setup_publishes(s): - kwargs[s["name"]] = s + for stream in kwargs["streams_publishes"]: + kwargs[stream["name"]] = stream - map(setup_publishes, kwargs["streams_publishes"]) - - def setup_subscribes(s): - if s["type"] == "data_router": - # If username and password has been provided then generate it. The - # DMaaP plugin doesn't generate for subscribers. The generation code - # and length of username password has been lifted from the DMaaP + # NOTE: That the delivery url is constructed and setup in the start operation + for stream in kwargs["streams_subscribes"]: + if stream["type"] == "data_router": + # If either username or password is missing then generate it. The + # DMaaP plugin doesn't generate them for subscribers. + # The code and length of username/password are lifted from the DMaaP # plugin. # Don't want to mutate the source - s = copy.deepcopy(s) - if not s.get("username", None): - s["username"] = utils.random_string(8) - if not s.get("password", None): - s["password"] = utils.random_string(10) + stream = copy.deepcopy(stream) + if not stream.get("username", None): + stream["username"] = utils.random_string(8) + if not stream.get("password", None): + stream["password"] = utils.random_string(10) - kwargs[s["name"]] = s - - # NOTE: That the delivery url is constructed and setup in the start operation - map(setup_subscribes, kwargs["streams_subscribes"]) + kwargs[stream["name"]] = stream return kwargs + def _setup_for_discovery_streams(**kwargs): """Setup for discovery of streams @@ -182,18 +180,17 @@ def _setup_for_discovery_streams(**kwargs): v = { "location": dr_sub["location"], "delivery_url": None, "username": dr_sub["username"], "password": dr_sub["password"], "subscriber_id": None } - return dis.add_to_entry(conn, dmaap_kv_key, dr_sub["name"], v) != None + return dis.add_to_entry(conn, dmaap_kv_key, dr_sub["name"], v) try: - if all(map(add_feed, dr_subs)): - return kwargs + for dr_sub in dr_subs: + if add_feed(dr_sub) is None: + raise NonRecoverableError( + "Failure updating feed streams in Consul") except Exception as e: raise NonRecoverableError(e) - # You should never get here - raise NonRecoverableError("Failure updating feed streams in Consul") - else: - return kwargs + return kwargs @merge_inputs_for_create |