summaryrefslogtreecommitdiffstats
path: root/k8s/k8splugin/tasks.py
diff options
context:
space:
mode:
Diffstat (limited to 'k8s/k8splugin/tasks.py')
-rw-r--r--k8s/k8splugin/tasks.py126
1 files changed, 29 insertions, 97 deletions
diff --git a/k8s/k8splugin/tasks.py b/k8s/k8splugin/tasks.py
index ecd3ffa..108cf31 100644
--- a/k8s/k8splugin/tasks.py
+++ b/k8s/k8splugin/tasks.py
@@ -129,8 +129,9 @@ def create_for_components(**create_inputs):
"""
_done_for_create(
**_setup_for_discovery(
- **_generate_component_name(
- **create_inputs)))
+ **_enhance_docker_params(
+ **_generate_component_name(
+ **create_inputs))))
def _parse_streams(**kwargs):
@@ -144,13 +145,32 @@ def _parse_streams(**kwargs):
def setup_subscribes(s):
if s["type"] == "data_router":
- # If username and password has been provided then generate it. The
- # DMaaP plugin doesn't generate for subscribers. The generation code
- # and length of username password has been lifted from the DMaaP
- # plugin.
# Don't want to mutate the source
s = copy.deepcopy(s)
+
+ # Set up the delivery URL
+ # Using service_component_name as the host name in the subscriber URL
+ # will work in a single-cluster ONAP deployment. Whether it will also work
+ # in a multi-cluster ONAP deployment--with a central location and one or
+ # more remote ("edge") locations depends on how networking and DNS is set
+ # up in a multi-cluster deployment
+ service_component_name = kwargs["name"]
+ ports,_ = k8sclient.parse_ports(kwargs["ports"])
+ (dport, _) = ports[0]
+ subscriber_host = "{host}:{port}".format(host=service_component_name, port=dport)
+
+ scheme = s["scheme"] if "scheme" in s else DEFAULT_SCHEME
+ if "route" not in s:
+ raise NonRecoverableError("'route' key missing from data router subscriber")
+ path = s["route"]
+ s["delivery_url"] = "{scheme}://{host}/{path}".format(
+ scheme=scheme, host=subscriber_host, path=path)
+
+ # If username and password has not been provided then generate it. The
+ # DMaaP plugin doesn't generate for subscribers. The generation code
+ # and length of username password has been lifted from the DMaaP
+ # plugin.
if not s.get("username", None):
s["username"] = utils.random_string(8)
if not s.get("password", None):
@@ -158,45 +178,10 @@ def _parse_streams(**kwargs):
kwargs[s["name"]] = s
- # NOTE: That the delivery url is constructed and setup in the start operation
map(setup_subscribes, kwargs["streams_subscribes"])
return kwargs
-def _setup_for_discovery_streams(**kwargs):
- """Setup for discovery of streams
-
- Specifically, there's a race condition this call addresses for data router
- subscriber case. The component needs its feed subscriber information but the
- DMaaP plugin doesn't provide this until after the docker plugin start
- operation.
- """
- dr_subs = [kwargs[s["name"]] for s in kwargs["streams_subscribes"] \
- if s["type"] == "data_router"]
-
- if dr_subs:
- dmaap_kv_key = "{0}:dmaap".format(kwargs["name"])
- conn = dis.create_kv_conn(CONSUL_HOST)
-
- def add_feed(dr_sub):
- # delivery url and subscriber id will be fill by the dmaap plugin later
- v = { "location": dr_sub["location"], "delivery_url": None,
- "username": dr_sub["username"], "password": dr_sub["password"],
- "subscriber_id": None }
- return dis.add_to_entry(conn, dmaap_kv_key, dr_sub["name"], v) != None
-
- try:
- if all(map(add_feed, dr_subs)):
- return kwargs
- except Exception as e:
- raise NonRecoverableError(e)
-
- # You should never get here
- raise NonRecoverableError("Failure updating feed streams in Consul")
- else:
- return kwargs
-
-
@merge_inputs_for_create
@monkeypatch_loggers
@Policies.gather_policies_to_node()
@@ -209,16 +194,14 @@ def create_for_components_with_streams(**create_inputs):
1. Generating service component name
2. Setup runtime properties for DMaaP plugin
3. Populating application config into Consul
- 4. Populating DMaaP config for data router subscribers in Consul
"""
_done_for_create(
**_setup_for_discovery(
- **_setup_for_discovery_streams(
- **_parse_streams(
+ **_parse_streams(
+ **_enhance_docker_params(
**_generate_component_name(
**create_inputs)))))
-
@merge_inputs_for_create
@monkeypatch_loggers
@operation
@@ -470,58 +453,7 @@ def create_and_start_container_for_components(**start_inputs):
_done_for_start(
**_verify_component(
**_create_and_start_component(
- **_enhance_docker_params(
- **_parse_cloudify_context(**start_inputs)))))
-
-
-def _update_delivery_url(**kwargs):
- """Update the delivery url for data router subscribers"""
- dr_subs = [kwargs[s["name"]] for s in kwargs["streams_subscribes"] \
- if s["type"] == "data_router"]
-
- if dr_subs:
- service_component_name = kwargs[SERVICE_COMPONENT_NAME]
- # TODO: Should NOT be setting up the delivery url with ip addresses
- # because in the https case, this will not work because data router does
- # a certificate validation using the fqdn.
- ports,_ = k8sclient.parse_ports(kwargs["ports"])
- (dport, _) = ports[0]
- # Using service_component_name as the host name in the subscriber URL
- # will work in a single-cluster ONAP deployment. Whether it will also work
- # in a multi-cluster ONAP deployment--with a central location and one or
- # more remote ("edge") locations depends on how networking and DNS is set
- # up in a multi-cluster deployment
- subscriber_host = "{host}:{port}".format(host=service_component_name, port=dport)
-
- for dr_sub in dr_subs:
- scheme = dr_sub["scheme"] if "scheme" in dr_sub else DEFAULT_SCHEME
- if "route" not in dr_sub:
- raise NonRecoverableError("'route' key missing from data router subscriber")
- path = dr_sub["route"]
- dr_sub["delivery_url"] = "{scheme}://{host}/{path}".format(
- scheme=scheme, host=subscriber_host, path=path)
- kwargs[dr_sub["name"]] = dr_sub
-
- return kwargs
-
-@wrap_error_handling_start
-@merge_inputs_for_start
-@monkeypatch_loggers
-@operation
-def create_and_start_container_for_components_with_streams(**start_inputs):
- """Initiate Kubernetes deployment for service components that have streams
-
- This operation method is to be used with the ContainerizedServiceComponentUsingDmaap
- node type. After initiating the Kubernetes deployment, the plugin will verify with
- Kubernetes that the app is up and responding successfully to readiness probes.
- """
- _done_for_start(
- **_update_delivery_url(
- **_verify_component(
- **_create_and_start_component(
- **_enhance_docker_params(
- **_parse_cloudify_context(**start_inputs))))))
-
+ **_parse_cloudify_context(**start_inputs))))
@wrap_error_handling_start
@monkeypatch_loggers