diff options
Diffstat (limited to 'k8s/k8splugin')
-rw-r--r-- | k8s/k8splugin/__init__.py | 1 | ||||
-rw-r--r-- | k8s/k8splugin/tasks.py | 126 |
2 files changed, 29 insertions, 98 deletions
diff --git a/k8s/k8splugin/__init__.py b/k8s/k8splugin/__init__.py index 7f721b2..aa4ceda 100644 --- a/k8s/k8splugin/__init__.py +++ b/k8s/k8splugin/__init__.py @@ -24,7 +24,6 @@ # __version__ = '0.1.0' from .tasks import create_for_components, create_for_components_with_streams, \ - create_and_start_container_for_components_with_streams, \ create_for_platforms, create_and_start_container, \ create_and_start_container_for_components, create_and_start_container_for_platforms, \ stop_and_remove_container, cleanup_discovery, policy_update, scale, update_image
\ No newline at end of file diff --git a/k8s/k8splugin/tasks.py b/k8s/k8splugin/tasks.py index ecd3ffa..108cf31 100644 --- a/k8s/k8splugin/tasks.py +++ b/k8s/k8splugin/tasks.py @@ -129,8 +129,9 @@ def create_for_components(**create_inputs): """ _done_for_create( **_setup_for_discovery( - **_generate_component_name( - **create_inputs))) + **_enhance_docker_params( + **_generate_component_name( + **create_inputs)))) def _parse_streams(**kwargs): @@ -144,13 +145,32 @@ def _parse_streams(**kwargs): def setup_subscribes(s): if s["type"] == "data_router": - # If username and password has been provided then generate it. The - # DMaaP plugin doesn't generate for subscribers. The generation code - # and length of username password has been lifted from the DMaaP - # plugin. # Don't want to mutate the source s = copy.deepcopy(s) + + # Set up the delivery URL + # Using service_component_name as the host name in the subscriber URL + # will work in a single-cluster ONAP deployment. Whether it will also work + # in a multi-cluster ONAP deployment--with a central location and one or + # more remote ("edge") locations depends on how networking and DNS is set + # up in a multi-cluster deployment + service_component_name = kwargs["name"] + ports,_ = k8sclient.parse_ports(kwargs["ports"]) + (dport, _) = ports[0] + subscriber_host = "{host}:{port}".format(host=service_component_name, port=dport) + + scheme = s["scheme"] if "scheme" in s else DEFAULT_SCHEME + if "route" not in s: + raise NonRecoverableError("'route' key missing from data router subscriber") + path = s["route"] + s["delivery_url"] = "{scheme}://{host}/{path}".format( + scheme=scheme, host=subscriber_host, path=path) + + # If username and password has not been provided then generate it. The + # DMaaP plugin doesn't generate for subscribers. The generation code + # and length of username password has been lifted from the DMaaP + # plugin. if not s.get("username", None): s["username"] = utils.random_string(8) if not s.get("password", None): @@ -158,45 +178,10 @@ def _parse_streams(**kwargs): kwargs[s["name"]] = s - # NOTE: That the delivery url is constructed and setup in the start operation map(setup_subscribes, kwargs["streams_subscribes"]) return kwargs -def _setup_for_discovery_streams(**kwargs): - """Setup for discovery of streams - - Specifically, there's a race condition this call addresses for data router - subscriber case. The component needs its feed subscriber information but the - DMaaP plugin doesn't provide this until after the docker plugin start - operation. - """ - dr_subs = [kwargs[s["name"]] for s in kwargs["streams_subscribes"] \ - if s["type"] == "data_router"] - - if dr_subs: - dmaap_kv_key = "{0}:dmaap".format(kwargs["name"]) - conn = dis.create_kv_conn(CONSUL_HOST) - - def add_feed(dr_sub): - # delivery url and subscriber id will be fill by the dmaap plugin later - v = { "location": dr_sub["location"], "delivery_url": None, - "username": dr_sub["username"], "password": dr_sub["password"], - "subscriber_id": None } - return dis.add_to_entry(conn, dmaap_kv_key, dr_sub["name"], v) != None - - try: - if all(map(add_feed, dr_subs)): - return kwargs - except Exception as e: - raise NonRecoverableError(e) - - # You should never get here - raise NonRecoverableError("Failure updating feed streams in Consul") - else: - return kwargs - - @merge_inputs_for_create @monkeypatch_loggers @Policies.gather_policies_to_node() @@ -209,16 +194,14 @@ def create_for_components_with_streams(**create_inputs): 1. Generating service component name 2. Setup runtime properties for DMaaP plugin 3. Populating application config into Consul - 4. Populating DMaaP config for data router subscribers in Consul """ _done_for_create( **_setup_for_discovery( - **_setup_for_discovery_streams( - **_parse_streams( + **_parse_streams( + **_enhance_docker_params( **_generate_component_name( **create_inputs))))) - @merge_inputs_for_create @monkeypatch_loggers @operation @@ -470,58 +453,7 @@ def create_and_start_container_for_components(**start_inputs): _done_for_start( **_verify_component( **_create_and_start_component( - **_enhance_docker_params( - **_parse_cloudify_context(**start_inputs))))) - - -def _update_delivery_url(**kwargs): - """Update the delivery url for data router subscribers""" - dr_subs = [kwargs[s["name"]] for s in kwargs["streams_subscribes"] \ - if s["type"] == "data_router"] - - if dr_subs: - service_component_name = kwargs[SERVICE_COMPONENT_NAME] - # TODO: Should NOT be setting up the delivery url with ip addresses - # because in the https case, this will not work because data router does - # a certificate validation using the fqdn. - ports,_ = k8sclient.parse_ports(kwargs["ports"]) - (dport, _) = ports[0] - # Using service_component_name as the host name in the subscriber URL - # will work in a single-cluster ONAP deployment. Whether it will also work - # in a multi-cluster ONAP deployment--with a central location and one or - # more remote ("edge") locations depends on how networking and DNS is set - # up in a multi-cluster deployment - subscriber_host = "{host}:{port}".format(host=service_component_name, port=dport) - - for dr_sub in dr_subs: - scheme = dr_sub["scheme"] if "scheme" in dr_sub else DEFAULT_SCHEME - if "route" not in dr_sub: - raise NonRecoverableError("'route' key missing from data router subscriber") - path = dr_sub["route"] - dr_sub["delivery_url"] = "{scheme}://{host}/{path}".format( - scheme=scheme, host=subscriber_host, path=path) - kwargs[dr_sub["name"]] = dr_sub - - return kwargs - -@wrap_error_handling_start -@merge_inputs_for_start -@monkeypatch_loggers -@operation -def create_and_start_container_for_components_with_streams(**start_inputs): - """Initiate Kubernetes deployment for service components that have streams - - This operation method is to be used with the ContainerizedServiceComponentUsingDmaap - node type. After initiating the Kubernetes deployment, the plugin will verify with - Kubernetes that the app is up and responding successfully to readiness probes. - """ - _done_for_start( - **_update_delivery_url( - **_verify_component( - **_create_and_start_component( - **_enhance_docker_params( - **_parse_cloudify_context(**start_inputs)))))) - + **_parse_cloudify_context(**start_inputs)))) @wrap_error_handling_start @monkeypatch_loggers |