From 91f452a988c05fc901a5b4ca0ac76eefb6be3a3c Mon Sep 17 00:00:00 2001 From: Jack Lucas Date: Wed, 3 Jul 2019 11:13:25 -0400 Subject: Move DR subscriber creation before container start Also update Kubernetes client library version Fix bug that allowed generating a too-long k8s deployment name. Issue-ID: DCAEGEN2-1651 Issue-ID: DCAEGEN2-1653 Issue-ID: DCAEGEN2-1667 Change-Id: Ied859073fb01d8623278cf9e58c1dcc26fed1712 Signed-off-by: Jack Lucas --- k8s/k8s-node-type.yaml | 17 +++--- k8s/k8sclient/k8sclient.py | 8 +-- k8s/k8splugin/__init__.py | 1 - k8s/k8splugin/tasks.py | 126 +++++++++++---------------------------------- k8s/pom.xml | 2 +- k8s/requirements.txt | 2 +- k8s/setup.py | 4 +- k8s/tests/test_tasks.py | 114 +++++++--------------------------------- 8 files changed, 62 insertions(+), 212 deletions(-) diff --git a/k8s/k8s-node-type.yaml b/k8s/k8s-node-type.yaml index c32d834..86fdb2e 100644 --- a/k8s/k8s-node-type.yaml +++ b/k8s/k8s-node-type.yaml @@ -22,7 +22,7 @@ plugins: k8s: executor: 'central_deployment_agent' package_name: k8splugin - package_version: 1.5.0 + package_version: 1.6.0 data_types: @@ -94,7 +94,7 @@ data_types: node_types: dcae.nodes.ContainerizedComponent: - # Bese type for all containerized components + # Base type for all containerized components # Captures common properties and interfaces derived_from: cloudify.nodes.Root properties: @@ -112,7 +112,7 @@ node_types: docker_config: default: {} description: > - This is what is the auxilary portion of the component spec that contains things + Copied from the auxiliary portion of the component spec that contains things like healthcheck definitions for the Docker component. Health checks are optional. @@ -202,9 +202,9 @@ node_types: type: string description: > Manually override and set the name for this Docker container node. If this - is set, then the name will not be auto-generated. Platform services are the - specific use cases for using this parameter because they have static - names for example the CDAP broker. + is set, then the name will not be auto-generated. Using this feature provides + a service component with a fixed name that's known in advance, but care must be taken + to avoid attempting to deploy two components with the same name. default: Null interfaces: @@ -278,11 +278,8 @@ node_types: interfaces: cloudify.interfaces.lifecycle: create: - # Generate service component name and populate config into Consul + # Generate service component name, populate config into Consul, set up runtime properties for DMaaP plugin implementation: k8s.k8splugin.create_for_components_with_streams - start: - # Create Docker container and start - implementation: k8s.k8splugin.create_and_start_container_for_components_with_streams # ContainerizedPlatformComponent is intended for DCAE platform services. Unlike the components, # platform services have well-known names and well-known ports. diff --git a/k8s/k8sclient/k8sclient.py b/k8s/k8sclient/k8sclient.py index 273b9f3..9a01536 100644 --- a/k8s/k8sclient/k8sclient.py +++ b/k8s/k8sclient/k8sclient.py @@ -43,10 +43,10 @@ FACTORS = {None: 1, "s": 1, "m": 60, "h": 3600} PORTS = re.compile("^([0-9]+)(/(udp|UDP|tcp|TCP))?:([0-9]+)$") def _create_deployment_name(component_name): - return "dep-{0}".format(component_name) + return "dep-{0}".format(component_name)[:63] def _create_service_name(component_name): - return "{0}".format(component_name) + return "{0}".format(component_name)[:63] def _create_exposed_service_name(component_name): return ("x{0}".format(component_name))[:63] @@ -545,7 +545,7 @@ def deploy(namespace, component_name, image, replicas, always_pull, k8sconfig, r core.delete_namespaced_service(_create_service_name(component_name), namespace) # If the deployment was created but not the service, delete the deployment if deployment_ok: - client.ExtensionsV1beta1Api().delete_namespaced_deployment(_create_deployment_name(component_name), namespace, client.V1DeleteOptions()) + client.ExtensionsV1beta1Api().delete_namespaced_deployment(_create_deployment_name(component_name), namespace, body=client.V1DeleteOptions()) raise e return dep, deployment_description @@ -561,7 +561,7 @@ def undeploy(deployment_description): # Have k8s delete the underlying pods and replicaset when deleting the deployment. options = client.V1DeleteOptions(propagation_policy="Foreground") - client.ExtensionsV1beta1Api().delete_namespaced_deployment(deployment_description["deployment"], namespace, options) + client.ExtensionsV1beta1Api().delete_namespaced_deployment(deployment_description["deployment"], namespace, body=options) def is_available(location, namespace, component_name): _configure_api(location) diff --git a/k8s/k8splugin/__init__.py b/k8s/k8splugin/__init__.py index 7f721b2..aa4ceda 100644 --- a/k8s/k8splugin/__init__.py +++ b/k8s/k8splugin/__init__.py @@ -24,7 +24,6 @@ # __version__ = '0.1.0' from .tasks import create_for_components, create_for_components_with_streams, \ - create_and_start_container_for_components_with_streams, \ create_for_platforms, create_and_start_container, \ create_and_start_container_for_components, create_and_start_container_for_platforms, \ stop_and_remove_container, cleanup_discovery, policy_update, scale, update_image \ No newline at end of file diff --git a/k8s/k8splugin/tasks.py b/k8s/k8splugin/tasks.py index ecd3ffa..108cf31 100644 --- a/k8s/k8splugin/tasks.py +++ b/k8s/k8splugin/tasks.py @@ -129,8 +129,9 @@ def create_for_components(**create_inputs): """ _done_for_create( **_setup_for_discovery( - **_generate_component_name( - **create_inputs))) + **_enhance_docker_params( + **_generate_component_name( + **create_inputs)))) def _parse_streams(**kwargs): @@ -144,13 +145,32 @@ def _parse_streams(**kwargs): def setup_subscribes(s): if s["type"] == "data_router": - # If username and password has been provided then generate it. The - # DMaaP plugin doesn't generate for subscribers. The generation code - # and length of username password has been lifted from the DMaaP - # plugin. # Don't want to mutate the source s = copy.deepcopy(s) + + # Set up the delivery URL + # Using service_component_name as the host name in the subscriber URL + # will work in a single-cluster ONAP deployment. Whether it will also work + # in a multi-cluster ONAP deployment--with a central location and one or + # more remote ("edge") locations depends on how networking and DNS is set + # up in a multi-cluster deployment + service_component_name = kwargs["name"] + ports,_ = k8sclient.parse_ports(kwargs["ports"]) + (dport, _) = ports[0] + subscriber_host = "{host}:{port}".format(host=service_component_name, port=dport) + + scheme = s["scheme"] if "scheme" in s else DEFAULT_SCHEME + if "route" not in s: + raise NonRecoverableError("'route' key missing from data router subscriber") + path = s["route"] + s["delivery_url"] = "{scheme}://{host}/{path}".format( + scheme=scheme, host=subscriber_host, path=path) + + # If username and password has not been provided then generate it. The + # DMaaP plugin doesn't generate for subscribers. The generation code + # and length of username password has been lifted from the DMaaP + # plugin. if not s.get("username", None): s["username"] = utils.random_string(8) if not s.get("password", None): @@ -158,45 +178,10 @@ def _parse_streams(**kwargs): kwargs[s["name"]] = s - # NOTE: That the delivery url is constructed and setup in the start operation map(setup_subscribes, kwargs["streams_subscribes"]) return kwargs -def _setup_for_discovery_streams(**kwargs): - """Setup for discovery of streams - - Specifically, there's a race condition this call addresses for data router - subscriber case. The component needs its feed subscriber information but the - DMaaP plugin doesn't provide this until after the docker plugin start - operation. - """ - dr_subs = [kwargs[s["name"]] for s in kwargs["streams_subscribes"] \ - if s["type"] == "data_router"] - - if dr_subs: - dmaap_kv_key = "{0}:dmaap".format(kwargs["name"]) - conn = dis.create_kv_conn(CONSUL_HOST) - - def add_feed(dr_sub): - # delivery url and subscriber id will be fill by the dmaap plugin later - v = { "location": dr_sub["location"], "delivery_url": None, - "username": dr_sub["username"], "password": dr_sub["password"], - "subscriber_id": None } - return dis.add_to_entry(conn, dmaap_kv_key, dr_sub["name"], v) != None - - try: - if all(map(add_feed, dr_subs)): - return kwargs - except Exception as e: - raise NonRecoverableError(e) - - # You should never get here - raise NonRecoverableError("Failure updating feed streams in Consul") - else: - return kwargs - - @merge_inputs_for_create @monkeypatch_loggers @Policies.gather_policies_to_node() @@ -209,16 +194,14 @@ def create_for_components_with_streams(**create_inputs): 1. Generating service component name 2. Setup runtime properties for DMaaP plugin 3. Populating application config into Consul - 4. Populating DMaaP config for data router subscribers in Consul """ _done_for_create( **_setup_for_discovery( - **_setup_for_discovery_streams( - **_parse_streams( + **_parse_streams( + **_enhance_docker_params( **_generate_component_name( **create_inputs))))) - @merge_inputs_for_create @monkeypatch_loggers @operation @@ -470,58 +453,7 @@ def create_and_start_container_for_components(**start_inputs): _done_for_start( **_verify_component( **_create_and_start_component( - **_enhance_docker_params( - **_parse_cloudify_context(**start_inputs))))) - - -def _update_delivery_url(**kwargs): - """Update the delivery url for data router subscribers""" - dr_subs = [kwargs[s["name"]] for s in kwargs["streams_subscribes"] \ - if s["type"] == "data_router"] - - if dr_subs: - service_component_name = kwargs[SERVICE_COMPONENT_NAME] - # TODO: Should NOT be setting up the delivery url with ip addresses - # because in the https case, this will not work because data router does - # a certificate validation using the fqdn. - ports,_ = k8sclient.parse_ports(kwargs["ports"]) - (dport, _) = ports[0] - # Using service_component_name as the host name in the subscriber URL - # will work in a single-cluster ONAP deployment. Whether it will also work - # in a multi-cluster ONAP deployment--with a central location and one or - # more remote ("edge") locations depends on how networking and DNS is set - # up in a multi-cluster deployment - subscriber_host = "{host}:{port}".format(host=service_component_name, port=dport) - - for dr_sub in dr_subs: - scheme = dr_sub["scheme"] if "scheme" in dr_sub else DEFAULT_SCHEME - if "route" not in dr_sub: - raise NonRecoverableError("'route' key missing from data router subscriber") - path = dr_sub["route"] - dr_sub["delivery_url"] = "{scheme}://{host}/{path}".format( - scheme=scheme, host=subscriber_host, path=path) - kwargs[dr_sub["name"]] = dr_sub - - return kwargs - -@wrap_error_handling_start -@merge_inputs_for_start -@monkeypatch_loggers -@operation -def create_and_start_container_for_components_with_streams(**start_inputs): - """Initiate Kubernetes deployment for service components that have streams - - This operation method is to be used with the ContainerizedServiceComponentUsingDmaap - node type. After initiating the Kubernetes deployment, the plugin will verify with - Kubernetes that the app is up and responding successfully to readiness probes. - """ - _done_for_start( - **_update_delivery_url( - **_verify_component( - **_create_and_start_component( - **_enhance_docker_params( - **_parse_cloudify_context(**start_inputs)))))) - + **_parse_cloudify_context(**start_inputs)))) @wrap_error_handling_start @monkeypatch_loggers diff --git a/k8s/pom.xml b/k8s/pom.xml index d96ebe1..5b66d5a 100644 --- a/k8s/pom.xml +++ b/k8s/pom.xml @@ -28,7 +28,7 @@ ECOMP is a trademark and service mark of AT&T Intellectual Property. org.onap.dcaegen2.platform.plugins k8s k8s-plugin - 1.5.0-SNAPSHOT + 1.6.0-SNAPSHOT http://maven.apache.org UTF-8 diff --git a/k8s/requirements.txt b/k8s/requirements.txt index 12d9fea..b2c0986 100644 --- a/k8s/requirements.txt +++ b/k8s/requirements.txt @@ -1,5 +1,5 @@ python-consul>=0.6.0,<1.0.0 uuid==1.30 onap-dcae-dcaepolicy-lib>=2.4.1,<3.0.0 -kubernetes==4.0.0 +kubernetes==9.0.0 cloudify-plugins-common==3.4 diff --git a/k8s/setup.py b/k8s/setup.py index 5d27438..b0e43f6 100644 --- a/k8s/setup.py +++ b/k8s/setup.py @@ -23,7 +23,7 @@ from setuptools import setup setup( name='k8splugin', description='Cloudify plugin for containerized components deployed using Kubernetes', - version="1.5.0", + version="1.6.0", author='J. F. Lucas, Michael Hwang, Tommy Carpenter', packages=['k8splugin','k8sclient','msb','configure'], zip_safe=False, @@ -33,6 +33,6 @@ setup( "onap-dcae-dcaepolicy-lib>=2.4.1,<3.0.0", "cloudify-plugins-common==3.4", "cloudify-python-importer==0.1.0", - "kubernetes==4.0.0" + "kubernetes==9.0.0" ] ) diff --git a/k8s/tests/test_tasks.py b/k8s/tests/test_tasks.py index 933753a..c6781bb 100644 --- a/k8s/tests/test_tasks.py +++ b/k8s/tests/test_tasks.py @@ -53,38 +53,42 @@ def test_parse_streams(monkeypatch, mockconfig): assert expected == tasks._parse_streams(**test_input) # Good case for streams_subscribes (password provided) - test_input = { "streams_publishes": {}, + test_input = { "ports": ["1919:0", "1920:0"],"name": "testcomponent", + "streams_publishes": {}, "streams_subscribes": [{"name": "topic01", "type": "message_router"}, {"name": "feed01", "type": "data_router", "username": "hero", - "password": "123456"}] } + "password": "123456", "route":"test/v0"}] } - expected = {'feed01': {'type': 'data_router', 'name': 'feed01', - 'username': 'hero', 'password': '123456'}, - 'streams_publishes': {}, - 'streams_subscribes': [{'type': 'message_router', 'name': 'topic01'}, + expected = {'ports': ['1919:0', '1920:0'], 'name': 'testcomponent', + 'feed01': {'type': 'data_router', 'name': 'feed01', + 'username': 'hero', 'password': '123456', 'route': 'test/v0', 'delivery_url':'http://testcomponent:1919/test/v0'}, + 'streams_publishes': {}, + 'streams_subscribes': [{'type': 'message_router', 'name': 'topic01'}, {'type': 'data_router', 'name': 'feed01', 'username': 'hero', - 'password': '123456'}], - 'topic01': {'type': 'message_router', 'name': 'topic01'}} + 'password': '123456', 'route':'test/v0'}], + 'topic01': {'type': 'message_router', 'name': 'topic01'}} assert expected == tasks._parse_streams(**test_input) # Good case for streams_subscribes (password generated) - test_input = { "streams_publishes": {}, - "streams_subscribes": [{"name": "topic01", "type": "message_router"}, + test_input = { "ports": ["1919:0", "1920:0"],"name": "testcomponent", + "streams_publishes": {}, + "streams_subscribes": [{"name": "topic01", "type": "message_router"}, {"name": "feed01", "type": "data_router", "username": None, - "password": None}] } + "password": None, "route": "test/v0"}] } def not_so_random(n): return "nosurprise" monkeypatch.setattr(k8splugin.utils, "random_string", not_so_random) - expected = {'feed01': {'type': 'data_router', 'name': 'feed01', - 'username': 'nosurprise', 'password': 'nosurprise'}, + expected = { 'ports': ['1919:0', '1920:0'], 'name': 'testcomponent', + 'feed01': {'type': 'data_router', 'name': 'feed01', + 'username': 'nosurprise', 'password': 'nosurprise', 'route':'test/v0', 'delivery_url':'http://testcomponent:1919/test/v0'}, 'streams_publishes': {}, 'streams_subscribes': [{'type': 'message_router', 'name': 'topic01'}, {'type': 'data_router', 'name': 'feed01', 'username': None, - 'password': None}], + 'password': None, 'route': 'test/v0'}], 'topic01': {'type': 'message_router', 'name': 'topic01'}} assert expected == tasks._parse_streams(**test_input) @@ -114,66 +118,6 @@ def test_setup_for_discovery(monkeypatch, mockconfig): with pytest.raises(RecoverableError): tasks._setup_for_discovery(**test_input) - -def test_setup_for_discovery_streams(monkeypatch, mockconfig): - import k8splugin - from k8splugin import tasks - test_input = {'feed01': {'type': 'data_router', 'name': 'feed01', - 'username': 'hero', 'password': '123456', 'location': 'Bedminster'}, - 'streams_publishes': {}, - 'streams_subscribes': [{'type': 'message_router', 'name': 'topic01'}, - {'type': 'data_router', 'name': 'feed01', 'username': 'hero', - 'password': '123456', 'location': 'Bedminster'}], - 'topic01': {'type': 'message_router', 'name': 'topic01'}} - test_input["name"] = "some-foo-service-component" - - # Good case - def fake_add_to_entry(conn, key, add_name, add_value): - """ - This fake method will check all the pieces that are used to make store - details in Consul - """ - if key != test_input["name"] + ":dmaap": - return None - if add_name != "feed01": - return None - if add_value != {"location": "Bedminster", "delivery_url": None, - "username": "hero", "password": "123456", "subscriber_id": None}: - return None - - return "SUCCESS!" - - monkeypatch.setattr(k8splugin.discovery, "add_to_entry", - fake_add_to_entry) - - assert tasks._setup_for_discovery_streams(**test_input) == test_input - - # Good case - no data router subscribers - test_input = {"streams_publishes": [{"name": "topic00", "type": "message_router"}], - 'streams_subscribes': [{'type': 'message_router', 'name': 'topic01'}]} - test_input["name"] = "some-foo-service-component" - - assert tasks._setup_for_discovery_streams(**test_input) == test_input - - # Bad case - something happened from the Consul call - test_input = {'feed01': {'type': 'data_router', 'name': 'feed01', - 'username': 'hero', 'password': '123456', 'location': 'Bedminster'}, - 'streams_publishes': {}, - 'streams_subscribes': [{'type': 'message_router', 'name': 'topic01'}, - {'type': 'data_router', 'name': 'feed01', 'username': 'hero', - 'password': '123456', 'location': 'Bedminster'}], - 'topic01': {'type': 'message_router', 'name': 'topic01'}} - test_input["name"] = "some-foo-service-component" - - def barf(conn, key, add_name, add_value): - raise RuntimeError("Barf") - - monkeypatch.setattr(k8splugin.discovery, "add_to_entry", - barf) - - with pytest.raises(NonRecoverableError): - tasks._setup_for_discovery_streams(**test_input) - def test_verify_container(monkeypatch, mockconfig): import k8sclient from k8splugin import tasks @@ -195,28 +139,6 @@ def test_verify_container(monkeypatch, mockconfig): assert not tasks._verify_k8s_deployment("some-location", "some-name", 2) - -def test_update_delivery_url(monkeypatch, mockconfig): - import k8splugin - from k8splugin import tasks - test_input = {'feed01': {'type': 'data_router', 'name': 'feed01', - 'username': 'hero', 'password': '123456', 'location': 'Bedminster', - 'route': 'some-path'}, - 'streams_publishes': {}, - 'streams_subscribes': [{'type': 'message_router', 'name': 'topic01'}, - {'type': 'data_router', 'name': 'feed01', 'username': 'hero', - 'password': '123456', 'location': 'Bedminster', - 'route': 'some-path'}], - 'topic01': {'type': 'message_router', 'name': 'topic01'}, - 'ports': ['8080/tcp:0']} - test_input["service_component_name"] = "some-foo-service-component" - - expected = copy.deepcopy(test_input) - expected["feed01"]["delivery_url"] = "http://some-foo-service-component:8080/some-path" - - assert tasks._update_delivery_url(**test_input) == expected - - def test_enhance_docker_params(mockconfig): from k8splugin import tasks # Good - Test empty docker config -- cgit 1.2.3-korg