summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJack Lucas <jflucas@research.att.com>2019-07-03 11:13:25 -0400
committerJack Lucas <jflucas@research.att.com>2019-07-17 10:07:18 -0400
commit91f452a988c05fc901a5b4ca0ac76eefb6be3a3c (patch)
tree9e7da92ec82b42da5bfe07ec497ac507187a6a62
parent9c094d0581c46d3d107facdc55cb2cc7a1d9f765 (diff)
Move DR subscriber creation before container start5.0.2-ONAP5.0.1-ONAPelalto
Also update Kubernetes client library version Fix bug that allowed generating a too-long k8s deployment name. Issue-ID: DCAEGEN2-1651 Issue-ID: DCAEGEN2-1653 Issue-ID: DCAEGEN2-1667 Change-Id: Ied859073fb01d8623278cf9e58c1dcc26fed1712 Signed-off-by: Jack Lucas <jflucas@research.att.com>
-rw-r--r--k8s/k8s-node-type.yaml17
-rw-r--r--k8s/k8sclient/k8sclient.py8
-rw-r--r--k8s/k8splugin/__init__.py1
-rw-r--r--k8s/k8splugin/tasks.py126
-rw-r--r--k8s/pom.xml2
-rw-r--r--k8s/requirements.txt2
-rw-r--r--k8s/setup.py4
-rw-r--r--k8s/tests/test_tasks.py114
8 files changed, 62 insertions, 212 deletions
diff --git a/k8s/k8s-node-type.yaml b/k8s/k8s-node-type.yaml
index c32d834..86fdb2e 100644
--- a/k8s/k8s-node-type.yaml
+++ b/k8s/k8s-node-type.yaml
@@ -22,7 +22,7 @@ plugins:
k8s:
executor: 'central_deployment_agent'
package_name: k8splugin
- package_version: 1.5.0
+ package_version: 1.6.0
data_types:
@@ -94,7 +94,7 @@ data_types:
node_types:
dcae.nodes.ContainerizedComponent:
- # Bese type for all containerized components
+ # Base type for all containerized components
# Captures common properties and interfaces
derived_from: cloudify.nodes.Root
properties:
@@ -112,7 +112,7 @@ node_types:
docker_config:
default: {}
description: >
- This is what is the auxilary portion of the component spec that contains things
+ Copied from the auxiliary portion of the component spec that contains things
like healthcheck definitions for the Docker component. Health checks are
optional.
@@ -202,9 +202,9 @@ node_types:
type: string
description: >
Manually override and set the name for this Docker container node. If this
- is set, then the name will not be auto-generated. Platform services are the
- specific use cases for using this parameter because they have static
- names for example the CDAP broker.
+ is set, then the name will not be auto-generated. Using this feature provides
+ a service component with a fixed name that's known in advance, but care must be taken
+ to avoid attempting to deploy two components with the same name.
default: Null
interfaces:
@@ -278,11 +278,8 @@ node_types:
interfaces:
cloudify.interfaces.lifecycle:
create:
- # Generate service component name and populate config into Consul
+ # Generate service component name, populate config into Consul, set up runtime properties for DMaaP plugin
implementation: k8s.k8splugin.create_for_components_with_streams
- start:
- # Create Docker container and start
- implementation: k8s.k8splugin.create_and_start_container_for_components_with_streams
# ContainerizedPlatformComponent is intended for DCAE platform services. Unlike the components,
# platform services have well-known names and well-known ports.
diff --git a/k8s/k8sclient/k8sclient.py b/k8s/k8sclient/k8sclient.py
index 273b9f3..9a01536 100644
--- a/k8s/k8sclient/k8sclient.py
+++ b/k8s/k8sclient/k8sclient.py
@@ -43,10 +43,10 @@ FACTORS = {None: 1, "s": 1, "m": 60, "h": 3600}
PORTS = re.compile("^([0-9]+)(/(udp|UDP|tcp|TCP))?:([0-9]+)$")
def _create_deployment_name(component_name):
- return "dep-{0}".format(component_name)
+ return "dep-{0}".format(component_name)[:63]
def _create_service_name(component_name):
- return "{0}".format(component_name)
+ return "{0}".format(component_name)[:63]
def _create_exposed_service_name(component_name):
return ("x{0}".format(component_name))[:63]
@@ -545,7 +545,7 @@ def deploy(namespace, component_name, image, replicas, always_pull, k8sconfig, r
core.delete_namespaced_service(_create_service_name(component_name), namespace)
# If the deployment was created but not the service, delete the deployment
if deployment_ok:
- client.ExtensionsV1beta1Api().delete_namespaced_deployment(_create_deployment_name(component_name), namespace, client.V1DeleteOptions())
+ client.ExtensionsV1beta1Api().delete_namespaced_deployment(_create_deployment_name(component_name), namespace, body=client.V1DeleteOptions())
raise e
return dep, deployment_description
@@ -561,7 +561,7 @@ def undeploy(deployment_description):
# Have k8s delete the underlying pods and replicaset when deleting the deployment.
options = client.V1DeleteOptions(propagation_policy="Foreground")
- client.ExtensionsV1beta1Api().delete_namespaced_deployment(deployment_description["deployment"], namespace, options)
+ client.ExtensionsV1beta1Api().delete_namespaced_deployment(deployment_description["deployment"], namespace, body=options)
def is_available(location, namespace, component_name):
_configure_api(location)
diff --git a/k8s/k8splugin/__init__.py b/k8s/k8splugin/__init__.py
index 7f721b2..aa4ceda 100644
--- a/k8s/k8splugin/__init__.py
+++ b/k8s/k8splugin/__init__.py
@@ -24,7 +24,6 @@
# __version__ = '0.1.0'
from .tasks import create_for_components, create_for_components_with_streams, \
- create_and_start_container_for_components_with_streams, \
create_for_platforms, create_and_start_container, \
create_and_start_container_for_components, create_and_start_container_for_platforms, \
stop_and_remove_container, cleanup_discovery, policy_update, scale, update_image \ No newline at end of file
diff --git a/k8s/k8splugin/tasks.py b/k8s/k8splugin/tasks.py
index ecd3ffa..108cf31 100644
--- a/k8s/k8splugin/tasks.py
+++ b/k8s/k8splugin/tasks.py
@@ -129,8 +129,9 @@ def create_for_components(**create_inputs):
"""
_done_for_create(
**_setup_for_discovery(
- **_generate_component_name(
- **create_inputs)))
+ **_enhance_docker_params(
+ **_generate_component_name(
+ **create_inputs))))
def _parse_streams(**kwargs):
@@ -144,13 +145,32 @@ def _parse_streams(**kwargs):
def setup_subscribes(s):
if s["type"] == "data_router":
- # If username and password has been provided then generate it. The
- # DMaaP plugin doesn't generate for subscribers. The generation code
- # and length of username password has been lifted from the DMaaP
- # plugin.
# Don't want to mutate the source
s = copy.deepcopy(s)
+
+ # Set up the delivery URL
+ # Using service_component_name as the host name in the subscriber URL
+ # will work in a single-cluster ONAP deployment. Whether it will also work
+ # in a multi-cluster ONAP deployment--with a central location and one or
+ # more remote ("edge") locations depends on how networking and DNS is set
+ # up in a multi-cluster deployment
+ service_component_name = kwargs["name"]
+ ports,_ = k8sclient.parse_ports(kwargs["ports"])
+ (dport, _) = ports[0]
+ subscriber_host = "{host}:{port}".format(host=service_component_name, port=dport)
+
+ scheme = s["scheme"] if "scheme" in s else DEFAULT_SCHEME
+ if "route" not in s:
+ raise NonRecoverableError("'route' key missing from data router subscriber")
+ path = s["route"]
+ s["delivery_url"] = "{scheme}://{host}/{path}".format(
+ scheme=scheme, host=subscriber_host, path=path)
+
+ # If username and password has not been provided then generate it. The
+ # DMaaP plugin doesn't generate for subscribers. The generation code
+ # and length of username password has been lifted from the DMaaP
+ # plugin.
if not s.get("username", None):
s["username"] = utils.random_string(8)
if not s.get("password", None):
@@ -158,45 +178,10 @@ def _parse_streams(**kwargs):
kwargs[s["name"]] = s
- # NOTE: That the delivery url is constructed and setup in the start operation
map(setup_subscribes, kwargs["streams_subscribes"])
return kwargs
-def _setup_for_discovery_streams(**kwargs):
- """Setup for discovery of streams
-
- Specifically, there's a race condition this call addresses for data router
- subscriber case. The component needs its feed subscriber information but the
- DMaaP plugin doesn't provide this until after the docker plugin start
- operation.
- """
- dr_subs = [kwargs[s["name"]] for s in kwargs["streams_subscribes"] \
- if s["type"] == "data_router"]
-
- if dr_subs:
- dmaap_kv_key = "{0}:dmaap".format(kwargs["name"])
- conn = dis.create_kv_conn(CONSUL_HOST)
-
- def add_feed(dr_sub):
- # delivery url and subscriber id will be fill by the dmaap plugin later
- v = { "location": dr_sub["location"], "delivery_url": None,
- "username": dr_sub["username"], "password": dr_sub["password"],
- "subscriber_id": None }
- return dis.add_to_entry(conn, dmaap_kv_key, dr_sub["name"], v) != None
-
- try:
- if all(map(add_feed, dr_subs)):
- return kwargs
- except Exception as e:
- raise NonRecoverableError(e)
-
- # You should never get here
- raise NonRecoverableError("Failure updating feed streams in Consul")
- else:
- return kwargs
-
-
@merge_inputs_for_create
@monkeypatch_loggers
@Policies.gather_policies_to_node()
@@ -209,16 +194,14 @@ def create_for_components_with_streams(**create_inputs):
1. Generating service component name
2. Setup runtime properties for DMaaP plugin
3. Populating application config into Consul
- 4. Populating DMaaP config for data router subscribers in Consul
"""
_done_for_create(
**_setup_for_discovery(
- **_setup_for_discovery_streams(
- **_parse_streams(
+ **_parse_streams(
+ **_enhance_docker_params(
**_generate_component_name(
**create_inputs)))))
-
@merge_inputs_for_create
@monkeypatch_loggers
@operation
@@ -470,58 +453,7 @@ def create_and_start_container_for_components(**start_inputs):
_done_for_start(
**_verify_component(
**_create_and_start_component(
- **_enhance_docker_params(
- **_parse_cloudify_context(**start_inputs)))))
-
-
-def _update_delivery_url(**kwargs):
- """Update the delivery url for data router subscribers"""
- dr_subs = [kwargs[s["name"]] for s in kwargs["streams_subscribes"] \
- if s["type"] == "data_router"]
-
- if dr_subs:
- service_component_name = kwargs[SERVICE_COMPONENT_NAME]
- # TODO: Should NOT be setting up the delivery url with ip addresses
- # because in the https case, this will not work because data router does
- # a certificate validation using the fqdn.
- ports,_ = k8sclient.parse_ports(kwargs["ports"])
- (dport, _) = ports[0]
- # Using service_component_name as the host name in the subscriber URL
- # will work in a single-cluster ONAP deployment. Whether it will also work
- # in a multi-cluster ONAP deployment--with a central location and one or
- # more remote ("edge") locations depends on how networking and DNS is set
- # up in a multi-cluster deployment
- subscriber_host = "{host}:{port}".format(host=service_component_name, port=dport)
-
- for dr_sub in dr_subs:
- scheme = dr_sub["scheme"] if "scheme" in dr_sub else DEFAULT_SCHEME
- if "route" not in dr_sub:
- raise NonRecoverableError("'route' key missing from data router subscriber")
- path = dr_sub["route"]
- dr_sub["delivery_url"] = "{scheme}://{host}/{path}".format(
- scheme=scheme, host=subscriber_host, path=path)
- kwargs[dr_sub["name"]] = dr_sub
-
- return kwargs
-
-@wrap_error_handling_start
-@merge_inputs_for_start
-@monkeypatch_loggers
-@operation
-def create_and_start_container_for_components_with_streams(**start_inputs):
- """Initiate Kubernetes deployment for service components that have streams
-
- This operation method is to be used with the ContainerizedServiceComponentUsingDmaap
- node type. After initiating the Kubernetes deployment, the plugin will verify with
- Kubernetes that the app is up and responding successfully to readiness probes.
- """
- _done_for_start(
- **_update_delivery_url(
- **_verify_component(
- **_create_and_start_component(
- **_enhance_docker_params(
- **_parse_cloudify_context(**start_inputs))))))
-
+ **_parse_cloudify_context(**start_inputs))))
@wrap_error_handling_start
@monkeypatch_loggers
diff --git a/k8s/pom.xml b/k8s/pom.xml
index d96ebe1..5b66d5a 100644
--- a/k8s/pom.xml
+++ b/k8s/pom.xml
@@ -28,7 +28,7 @@ ECOMP is a trademark and service mark of AT&T Intellectual Property.
<groupId>org.onap.dcaegen2.platform.plugins</groupId>
<artifactId>k8s</artifactId>
<name>k8s-plugin</name>
- <version>1.5.0-SNAPSHOT</version>
+ <version>1.6.0-SNAPSHOT</version>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
diff --git a/k8s/requirements.txt b/k8s/requirements.txt
index 12d9fea..b2c0986 100644
--- a/k8s/requirements.txt
+++ b/k8s/requirements.txt
@@ -1,5 +1,5 @@
python-consul>=0.6.0,<1.0.0
uuid==1.30
onap-dcae-dcaepolicy-lib>=2.4.1,<3.0.0
-kubernetes==4.0.0
+kubernetes==9.0.0
cloudify-plugins-common==3.4
diff --git a/k8s/setup.py b/k8s/setup.py
index 5d27438..b0e43f6 100644
--- a/k8s/setup.py
+++ b/k8s/setup.py
@@ -23,7 +23,7 @@ from setuptools import setup
setup(
name='k8splugin',
description='Cloudify plugin for containerized components deployed using Kubernetes',
- version="1.5.0",
+ version="1.6.0",
author='J. F. Lucas, Michael Hwang, Tommy Carpenter',
packages=['k8splugin','k8sclient','msb','configure'],
zip_safe=False,
@@ -33,6 +33,6 @@ setup(
"onap-dcae-dcaepolicy-lib>=2.4.1,<3.0.0",
"cloudify-plugins-common==3.4",
"cloudify-python-importer==0.1.0",
- "kubernetes==4.0.0"
+ "kubernetes==9.0.0"
]
)
diff --git a/k8s/tests/test_tasks.py b/k8s/tests/test_tasks.py
index 933753a..c6781bb 100644
--- a/k8s/tests/test_tasks.py
+++ b/k8s/tests/test_tasks.py
@@ -53,38 +53,42 @@ def test_parse_streams(monkeypatch, mockconfig):
assert expected == tasks._parse_streams(**test_input)
# Good case for streams_subscribes (password provided)
- test_input = { "streams_publishes": {},
+ test_input = { "ports": ["1919:0", "1920:0"],"name": "testcomponent",
+ "streams_publishes": {},
"streams_subscribes": [{"name": "topic01", "type": "message_router"},
{"name": "feed01", "type": "data_router", "username": "hero",
- "password": "123456"}] }
+ "password": "123456", "route":"test/v0"}] }
- expected = {'feed01': {'type': 'data_router', 'name': 'feed01',
- 'username': 'hero', 'password': '123456'},
- 'streams_publishes': {},
- 'streams_subscribes': [{'type': 'message_router', 'name': 'topic01'},
+ expected = {'ports': ['1919:0', '1920:0'], 'name': 'testcomponent',
+ 'feed01': {'type': 'data_router', 'name': 'feed01',
+ 'username': 'hero', 'password': '123456', 'route': 'test/v0', 'delivery_url':'http://testcomponent:1919/test/v0'},
+ 'streams_publishes': {},
+ 'streams_subscribes': [{'type': 'message_router', 'name': 'topic01'},
{'type': 'data_router', 'name': 'feed01', 'username': 'hero',
- 'password': '123456'}],
- 'topic01': {'type': 'message_router', 'name': 'topic01'}}
+ 'password': '123456', 'route':'test/v0'}],
+ 'topic01': {'type': 'message_router', 'name': 'topic01'}}
assert expected == tasks._parse_streams(**test_input)
# Good case for streams_subscribes (password generated)
- test_input = { "streams_publishes": {},
- "streams_subscribes": [{"name": "topic01", "type": "message_router"},
+ test_input = { "ports": ["1919:0", "1920:0"],"name": "testcomponent",
+ "streams_publishes": {},
+ "streams_subscribes": [{"name": "topic01", "type": "message_router"},
{"name": "feed01", "type": "data_router", "username": None,
- "password": None}] }
+ "password": None, "route": "test/v0"}] }
def not_so_random(n):
return "nosurprise"
monkeypatch.setattr(k8splugin.utils, "random_string", not_so_random)
- expected = {'feed01': {'type': 'data_router', 'name': 'feed01',
- 'username': 'nosurprise', 'password': 'nosurprise'},
+ expected = { 'ports': ['1919:0', '1920:0'], 'name': 'testcomponent',
+ 'feed01': {'type': 'data_router', 'name': 'feed01',
+ 'username': 'nosurprise', 'password': 'nosurprise', 'route':'test/v0', 'delivery_url':'http://testcomponent:1919/test/v0'},
'streams_publishes': {},
'streams_subscribes': [{'type': 'message_router', 'name': 'topic01'},
{'type': 'data_router', 'name': 'feed01', 'username': None,
- 'password': None}],
+ 'password': None, 'route': 'test/v0'}],
'topic01': {'type': 'message_router', 'name': 'topic01'}}
assert expected == tasks._parse_streams(**test_input)
@@ -114,66 +118,6 @@ def test_setup_for_discovery(monkeypatch, mockconfig):
with pytest.raises(RecoverableError):
tasks._setup_for_discovery(**test_input)
-
-def test_setup_for_discovery_streams(monkeypatch, mockconfig):
- import k8splugin
- from k8splugin import tasks
- test_input = {'feed01': {'type': 'data_router', 'name': 'feed01',
- 'username': 'hero', 'password': '123456', 'location': 'Bedminster'},
- 'streams_publishes': {},
- 'streams_subscribes': [{'type': 'message_router', 'name': 'topic01'},
- {'type': 'data_router', 'name': 'feed01', 'username': 'hero',
- 'password': '123456', 'location': 'Bedminster'}],
- 'topic01': {'type': 'message_router', 'name': 'topic01'}}
- test_input["name"] = "some-foo-service-component"
-
- # Good case
- def fake_add_to_entry(conn, key, add_name, add_value):
- """
- This fake method will check all the pieces that are used to make store
- details in Consul
- """
- if key != test_input["name"] + ":dmaap":
- return None
- if add_name != "feed01":
- return None
- if add_value != {"location": "Bedminster", "delivery_url": None,
- "username": "hero", "password": "123456", "subscriber_id": None}:
- return None
-
- return "SUCCESS!"
-
- monkeypatch.setattr(k8splugin.discovery, "add_to_entry",
- fake_add_to_entry)
-
- assert tasks._setup_for_discovery_streams(**test_input) == test_input
-
- # Good case - no data router subscribers
- test_input = {"streams_publishes": [{"name": "topic00", "type": "message_router"}],
- 'streams_subscribes': [{'type': 'message_router', 'name': 'topic01'}]}
- test_input["name"] = "some-foo-service-component"
-
- assert tasks._setup_for_discovery_streams(**test_input) == test_input
-
- # Bad case - something happened from the Consul call
- test_input = {'feed01': {'type': 'data_router', 'name': 'feed01',
- 'username': 'hero', 'password': '123456', 'location': 'Bedminster'},
- 'streams_publishes': {},
- 'streams_subscribes': [{'type': 'message_router', 'name': 'topic01'},
- {'type': 'data_router', 'name': 'feed01', 'username': 'hero',
- 'password': '123456', 'location': 'Bedminster'}],
- 'topic01': {'type': 'message_router', 'name': 'topic01'}}
- test_input["name"] = "some-foo-service-component"
-
- def barf(conn, key, add_name, add_value):
- raise RuntimeError("Barf")
-
- monkeypatch.setattr(k8splugin.discovery, "add_to_entry",
- barf)
-
- with pytest.raises(NonRecoverableError):
- tasks._setup_for_discovery_streams(**test_input)
-
def test_verify_container(monkeypatch, mockconfig):
import k8sclient
from k8splugin import tasks
@@ -195,28 +139,6 @@ def test_verify_container(monkeypatch, mockconfig):
assert not tasks._verify_k8s_deployment("some-location", "some-name", 2)
-
-def test_update_delivery_url(monkeypatch, mockconfig):
- import k8splugin
- from k8splugin import tasks
- test_input = {'feed01': {'type': 'data_router', 'name': 'feed01',
- 'username': 'hero', 'password': '123456', 'location': 'Bedminster',
- 'route': 'some-path'},
- 'streams_publishes': {},
- 'streams_subscribes': [{'type': 'message_router', 'name': 'topic01'},
- {'type': 'data_router', 'name': 'feed01', 'username': 'hero',
- 'password': '123456', 'location': 'Bedminster',
- 'route': 'some-path'}],
- 'topic01': {'type': 'message_router', 'name': 'topic01'},
- 'ports': ['8080/tcp:0']}
- test_input["service_component_name"] = "some-foo-service-component"
-
- expected = copy.deepcopy(test_input)
- expected["feed01"]["delivery_url"] = "http://some-foo-service-component:8080/some-path"
-
- assert tasks._update_delivery_url(**test_input) == expected
-
-
def test_enhance_docker_params(mockconfig):
from k8splugin import tasks
# Good - Test empty docker config