summaryrefslogtreecommitdiffstats
path: root/k8s/k8splugin
diff options
context:
space:
mode:
authorJack Lucas <jflucas@research.att.com>2018-03-22 17:43:01 +0000
committerJack Lucas <jflucas@research.att.com>2018-03-23 12:46:10 +0000
commit1ed29c111f50e33ac89915ba86b530820da3a689 (patch)
tree314ad8dc5dd2cad4ba89bac5662015f8a9c43ced /k8s/k8splugin
parent19a075b395d687753de0f0266842cb9af8f24682 (diff)
Add DCAE Kubernetes plugin
Change-Id: Ic329f77bb09dcccfd49a34a8f84d387e8da929c6 Issue-ID: DCAEGEN2-410 Signed-off-by: Jack Lucas <jflucas@research.att.com>
Diffstat (limited to 'k8s/k8splugin')
-rw-r--r--k8s/k8splugin/__init__.py30
-rw-r--r--k8s/k8splugin/decorators.py102
-rw-r--r--k8s/k8splugin/discovery.py269
-rw-r--r--k8s/k8splugin/exceptions.py29
-rw-r--r--k8s/k8splugin/tasks.py682
-rw-r--r--k8s/k8splugin/utils.py43
6 files changed, 1155 insertions, 0 deletions
diff --git a/k8s/k8splugin/__init__.py b/k8s/k8splugin/__init__.py
new file mode 100644
index 0000000..28306ee
--- /dev/null
+++ b/k8s/k8splugin/__init__.py
@@ -0,0 +1,30 @@
+# ============LICENSE_START=======================================================
+# org.onap.dcae
+# ================================================================================
+# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+
+# REVIEW: Tried to source the version from here but you run into import issues
+# because "tasks" module is loaded. This method seems to be the PEP 396
+# recommended way and is listed #3 here https://packaging.python.org/single_source_version/
+# __version__ = '0.1.0'
+
+from .tasks import create_for_components, create_for_components_with_streams, \
+ create_and_start_container_for_components_with_streams, \
+ create_for_platforms, create_and_start_container, \
+ create_and_start_container_for_components, create_and_start_container_for_platforms, \
+ stop_and_remove_container, cleanup_discovery, policy_update, scale \ No newline at end of file
diff --git a/k8s/k8splugin/decorators.py b/k8s/k8splugin/decorators.py
new file mode 100644
index 0000000..186b212
--- /dev/null
+++ b/k8s/k8splugin/decorators.py
@@ -0,0 +1,102 @@
+# ============LICENSE_START=======================================================
+# org.onap.dcae
+# ================================================================================
+# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+
+import copy
+from cloudify import ctx
+from cloudify.exceptions import NonRecoverableError, RecoverableError
+from dockering import utils as doc
+from k8splugin import discovery as dis
+from k8splugin.exceptions import DockerPluginDeploymentError, \
+ DockerPluginDependencyNotReadyError
+from k8splugin import utils
+
+
+def monkeypatch_loggers(task_func):
+ """Sets up the dependent loggers"""
+
+ def wrapper(**kwargs):
+ # Ouch! Monkeypatch loggers
+ doc.logger = ctx.logger
+ dis.logger = ctx.logger
+
+ return task_func(**kwargs)
+
+ return wrapper
+
+
+def wrap_error_handling_start(task_start_func):
+ """Wrap error handling for the start operations"""
+
+ def wrapper(**kwargs):
+ try:
+ return task_start_func(**kwargs)
+ except DockerPluginDependencyNotReadyError as e:
+ # You are here because things we need like a working docker host is not
+ # available yet so let Cloudify try again later.
+ raise RecoverableError(e)
+ except DockerPluginDeploymentError as e:
+ # Container failed to come up in the allotted time. This is deemed
+ # non-recoverable.
+ raise NonRecoverableError(e)
+ except Exception as e:
+ ctx.logger.error("Unexpected error while starting container: {0}"
+ .format(str(e)))
+ raise NonRecoverableError(e)
+
+ return wrapper
+
+
+def _wrapper_merge_inputs(task_func, properties, **kwargs):
+ """Merge Cloudify properties with input kwargs before calling task func"""
+ inputs = copy.deepcopy(properties)
+ # Recursively update
+ utils.update_dict(inputs, kwargs)
+
+ # Apparently kwargs contains "ctx" which is cloudify.context.CloudifyContext
+ # This has to be removed and not copied into runtime_properties else you get
+ # JSON serialization errors.
+ if "ctx" in inputs:
+ del inputs["ctx"]
+
+ return task_func(**inputs)
+
+def merge_inputs_for_create(task_create_func):
+ """Merge all inputs for start operation into one dict"""
+
+ # Needed to wrap the wrapper because I was seeing issues with
+ # "RuntimeError: No context set in current execution thread"
+ def wrapper(**kwargs):
+ # NOTE: ctx.node.properties is an ImmutableProperties instance which is
+ # why it is passed into a mutable dict so that it can be deep copied
+ return _wrapper_merge_inputs(task_create_func,
+ dict(ctx.node.properties), **kwargs)
+
+ return wrapper
+
+def merge_inputs_for_start(task_start_func):
+ """Merge all inputs for start operation into one dict"""
+
+ # Needed to wrap the wrapper because I was seeing issues with
+ # "RuntimeError: No context set in current execution thread"
+ def wrapper(**kwargs):
+ return _wrapper_merge_inputs(task_start_func,
+ ctx.instance.runtime_properties, **kwargs)
+
+ return wrapper
diff --git a/k8s/k8splugin/discovery.py b/k8s/k8splugin/discovery.py
new file mode 100644
index 0000000..f3b87b6
--- /dev/null
+++ b/k8s/k8splugin/discovery.py
@@ -0,0 +1,269 @@
+# ============LICENSE_START=======================================================
+# org.onap.dcae
+# ================================================================================
+# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+
+from functools import partial
+import json
+import logging
+import uuid
+import requests
+import consul
+import re
+
+
+logger = logging.getLogger("discovery")
+
+
+class DiscoveryError(RuntimeError):
+ pass
+
+class DiscoveryConnectionError(RuntimeError):
+ pass
+
+class DiscoveryServiceNotFoundError(RuntimeError):
+ pass
+
+class DiscoveryKVEntryNotFoundError(RuntimeError):
+ pass
+
+
+def _wrap_consul_call(consul_func, *args, **kwargs):
+ """Wrap Consul call to map errors"""
+ try:
+ return consul_func(*args, **kwargs)
+ except requests.exceptions.ConnectionError as e:
+ raise DiscoveryConnectionError(e)
+
+
+def generate_service_component_name(service_component_type):
+ """Generate service component id used to pass into the service component
+ instance and used as the key to the service component configuration.
+
+ Updated for use with Kubernetes. Sometimes the service component name gets
+ used in Kubernetes in contexts (such as naming a Kubernetes Service) that
+ requires the name to conform to the RFC1035 DNS "label" syntax:
+ -- starts with an alpha
+ -- contains only of alphanumerics and "-"
+ -- <= 63 characters long
+
+ Format:
+ s<service component id>-<service component type>,
+ truncated to 63 characters, "_" replaced with "-" in service_component_type,
+ other non-conforming characters removed from service_component_type
+ """
+ # Random generated
+ # Copied from cdap plugin
+ sct = re.sub('[^A-Za-z0-9-]','',(service_component_type.replace('_','-')))
+ return ("s{0}-{1}".format(str(uuid.uuid4()).replace("-",""),sct))[:63]
+
+
+def create_kv_conn(host):
+ """Create connection to key-value store
+
+ Returns a Consul client to the specified Consul host"""
+ try:
+ [hostname, port] = host.split(":")
+ return consul.Consul(host=hostname, port=int(port))
+ except ValueError as e:
+ return consul.Consul(host=host)
+
+def push_service_component_config(kv_conn, service_component_name, config):
+ config_string = config if isinstance(config, str) else json.dumps(config)
+ kv_put_func = partial(_wrap_consul_call, kv_conn.kv.put)
+
+ if kv_put_func(service_component_name, config_string):
+ logger.info("Added config for {0}".format(service_component_name))
+ else:
+ raise DiscoveryError("Failed to push configuration")
+
+def remove_service_component_config(kv_conn, service_component_name):
+ kv_delete_func = partial(_wrap_consul_call, kv_conn.kv.delete)
+ kv_delete_func(service_component_name)
+
+
+def get_kv_value(kv_conn, key):
+ """Get a key-value entry's value from Consul
+
+ Raises DiscoveryKVEntryNotFoundError if entry not found
+ """
+ kv_get_func = partial(_wrap_consul_call, kv_conn.kv.get)
+ (index, val) = kv_get_func(key)
+
+ if val:
+ return json.loads(val['Value']) # will raise ValueError if not JSON, let it propagate
+ else:
+ raise DiscoveryKVEntryNotFoundError("{0} kv entry not found".format(key))
+
+
+def _create_rel_key(service_component_name):
+ return "{0}:rel".format(service_component_name)
+
+def store_relationship(kv_conn, source_name, target_name):
+ # TODO: Rel entry may already exist in a one-to-many situation. Need to
+ # support that.
+ rel_key = _create_rel_key(source_name)
+ rel_value = [target_name] if target_name else []
+
+ kv_put_func = partial(_wrap_consul_call, kv_conn.kv.put)
+ kv_put_func(rel_key, json.dumps(rel_value))
+ logger.info("Added relationship for {0}".format(rel_key))
+
+def delete_relationship(kv_conn, service_component_name):
+ rel_key = _create_rel_key(service_component_name)
+ kv_get_func = partial(_wrap_consul_call, kv_conn.kv.get)
+ index, rels = kv_get_func(rel_key)
+
+ if rels:
+ rels = json.loads(rels["Value"].decode("utf-8"))
+ kv_delete_func = partial(_wrap_consul_call, kv_conn.kv.delete)
+ kv_delete_func(rel_key)
+ return rels
+ else:
+ return []
+
+def lookup_service(kv_conn, service_component_name):
+ catalog_get_func = partial(_wrap_consul_call, kv_conn.catalog.service)
+ index, results = catalog_get_func(service_component_name)
+
+ if results:
+ return results
+ else:
+ raise DiscoveryServiceNotFoundError("Failed to find: {0}".format(service_component_name))
+
+
+# TODO: Note these functions have been (for the most part) shamelessly lifted from
+# dcae-cli and should really be shared.
+
+def _is_healthy_pure(get_health_func, instance):
+ """Checks to see if a component instance is running healthy
+
+ Pure function edition
+
+ Args
+ ----
+ get_health_func: func(string) -> complex object
+ Look at unittests in test_discovery to see examples
+ instance: (string) fully qualified name of component instance
+
+ Returns
+ -------
+ True if instance has been found and is healthy else False
+ """
+ index, resp = get_health_func(instance)
+
+ if resp:
+ def is_passing(instance):
+ return all([check["Status"] == "passing" for check in instance["Checks"]])
+
+ return any([is_passing(instance) for instance in resp])
+ else:
+ return False
+
+def is_healthy(consul_host, instance):
+ """Checks to see if a component instance is running healthy
+
+ Impure function edition
+
+ Args
+ ----
+ consul_host: (string) host string of Consul
+ instance: (string) fully qualified name of component instance
+
+ Returns
+ -------
+ True if instance has been found and is healthy else False
+ """
+ cons = create_kv_conn(consul_host)
+
+ get_health_func = partial(_wrap_consul_call, cons.health.service)
+ return _is_healthy_pure(get_health_func, instance)
+
+
+def add_to_entry(conn, key, add_name, add_value):
+ """
+ Find 'key' in consul.
+ Treat its value as a JSON string representing a dict.
+ Extend the dict by adding an entry with key 'add_name' and value 'add_value'.
+ Turn the resulting extended dict into a JSON string.
+ Store the string back into Consul under 'key'.
+ Watch out for conflicting concurrent updates.
+
+ Example:
+ Key 'xyz:dmaap' has the value '{"feed00": {"feed_url" : "http://example.com/feeds/999"}}'
+ add_to_entry('xyz:dmaap', 'topic00', {'topic_url' : 'http://example.com/topics/1229'})
+ should result in the value for key 'xyz:dmaap' in consul being updated to
+ '{"feed00": {"feed_url" : "http://example.com/feeds/999"}, "topic00" : {"topic_url" : "http://example.com/topics/1229"}}'
+ """
+ while True: # do until update succeeds
+ (index, val) = conn.kv.get(key) # index gives version of key retrieved
+
+ if val is None: # no key yet
+ vstring = '{}'
+ mod_index = 0 # Use 0 as the cas index for initial insertion of the key
+ else:
+ vstring = val['Value']
+ mod_index = val['ModifyIndex']
+
+ # Build the updated dict
+ # Exceptions just propagate
+ v = json.loads(vstring)
+ v[add_name] = add_value
+ new_vstring = json.dumps(v)
+
+ updated = conn.kv.put(key, new_vstring, cas=mod_index) # if the key has changed since retrieval, this will return false
+ if updated:
+ return v
+
+
+def _find_matching_services(services, name_search, tags):
+ """Find matching services given search criteria"""
+ def is_match(service):
+ srv_name, srv_tags = service
+ return name_search in srv_name and \
+ all(map(lambda tag: tag in srv_tags, tags))
+
+ return [ srv[0] for srv in services.items() if is_match(srv) ]
+
+def search_services(conn, name_search, tags):
+ """Search for services that match criteria
+
+ Args:
+ -----
+ name_search: (string) Name to search for as a substring
+ tags: (list) List of strings that are tags. A service must match **all** the
+ tags in the list.
+
+ Retruns:
+ --------
+ List of names of services that matched
+ """
+ # srvs is dict where key is service name and value is list of tags
+ catalog_get_services_func = partial(_wrap_consul_call, conn.catalog.services)
+ index, srvs = catalog_get_services_func()
+
+ if srvs:
+ matches = _find_matching_services(srvs, name_search, tags)
+
+ if matches:
+ return matches
+
+ raise DiscoveryServiceNotFoundError(
+ "No matches found: {0}, {1}".format(name_search, tags))
+ else:
+ raise DiscoveryServiceNotFoundError("No services found")
diff --git a/k8s/k8splugin/exceptions.py b/k8s/k8splugin/exceptions.py
new file mode 100644
index 0000000..0d8a341
--- /dev/null
+++ b/k8s/k8splugin/exceptions.py
@@ -0,0 +1,29 @@
+# ============LICENSE_START=======================================================
+# org.onap.dcae
+# ================================================================================
+# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+
+class DockerPluginDeploymentError(RuntimeError):
+ pass
+
+
+class DockerPluginDependencyNotReadyError(RuntimeError):
+ """Error to use when something that this plugin depends upon e.g. docker api,
+ consul is not ready"""
+ pass
+
diff --git a/k8s/k8splugin/tasks.py b/k8s/k8splugin/tasks.py
new file mode 100644
index 0000000..1718274
--- /dev/null
+++ b/k8s/k8splugin/tasks.py
@@ -0,0 +1,682 @@
+# ============LICENSE_START=======================================================
+# org.onap.dcae
+# ================================================================================
+# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+
+# Lifecycle interface calls for containerized components
+
+# Needed by Cloudify Manager to load google.auth for the Kubernetes python client
+import cloudify_importer
+
+import time, copy
+from cloudify import ctx
+from cloudify.decorators import operation
+from cloudify.exceptions import NonRecoverableError, RecoverableError
+import dockering as doc
+from onap_dcae_dcaepolicy_lib import Policies
+from k8splugin import discovery as dis
+from k8splugin.decorators import monkeypatch_loggers, wrap_error_handling_start, \
+ merge_inputs_for_start, merge_inputs_for_create
+from k8splugin.exceptions import DockerPluginDeploymentError
+from k8splugin import utils
+from configure import configure
+from k8sclient import k8sclient
+
+# Get configuration
+plugin_conf = configure.configure()
+CONSUL_HOST = plugin_conf.get("consul_host")
+CONSUL_INTERNAL_NAME = plugin_conf.get("consul_dns_name")
+DCAE_NAMESPACE = plugin_conf.get("namespace")
+
+# Used to construct delivery urls for data router subscribers. Data router in FTL
+# requires https but this author believes that ONAP is to be defaulted to http.
+DEFAULT_SCHEME = "http"
+
+# Property keys
+SERVICE_COMPONENT_NAME = "service_component_name"
+CONTAINER_ID = "container_id"
+APPLICATION_CONFIG = "application_config"
+
+
+
+# Utility methods
+
+# Lifecycle interface calls for dcae.nodes.DockerContainer
+
+def _setup_for_discovery(**kwargs):
+ """Setup for config discovery"""
+ try:
+ name = kwargs['name']
+ application_config = kwargs[APPLICATION_CONFIG]
+
+ # NOTE: application_config is no longer a json string and is inputed as a
+ # YAML map which translates to a dict. We don't have to do any
+ # preprocessing anymore.
+ conn = dis.create_kv_conn(CONSUL_HOST)
+ dis.push_service_component_config(conn, name, application_config)
+ return kwargs
+ except dis.DiscoveryConnectionError as e:
+ raise RecoverableError(e)
+ except Exception as e:
+ ctx.logger.error("Unexpected error while pushing configuration: {0}"
+ .format(str(e)))
+ raise NonRecoverableError(e)
+
+def _generate_component_name(**kwargs):
+ """Generate component name"""
+ service_component_type = kwargs['service_component_type']
+ name_override = kwargs['service_component_name_override']
+
+ kwargs['name'] = name_override if name_override \
+ else dis.generate_service_component_name(service_component_type)
+ return kwargs
+
+def _done_for_create(**kwargs):
+ """Wrap up create operation"""
+ name = kwargs['name']
+ kwargs[SERVICE_COMPONENT_NAME] = name
+ # All updates to the runtime_properties happens here. I don't see a reason
+ # why we shouldn't do this because the context is not being mutated by
+ # something else and will keep the other functions pure (pure in the sense
+ # not dealing with CloudifyContext).
+ ctx.instance.runtime_properties.update(kwargs)
+ ctx.logger.info("Done setting up: {0}".format(name))
+ return kwargs
+
+
+@merge_inputs_for_create
+@monkeypatch_loggers
+@Policies.gather_policies_to_node()
+@operation
+def create_for_components(**create_inputs):
+ """Create step for Docker containers that are components
+
+ This interface is responsible for:
+
+ 1. Generating service component name
+ 2. Populating config information into Consul
+ """
+ _done_for_create(
+ **_setup_for_discovery(
+ **_generate_component_name(
+ **create_inputs)))
+
+
+def _parse_streams(**kwargs):
+ """Parse streams and setup for DMaaP plugin"""
+ # The DMaaP plugin requires this plugin to set the runtime properties
+ # keyed by the node name.
+ def setup_publishes(s):
+ kwargs[s["name"]] = s
+
+ map(setup_publishes, kwargs["streams_publishes"])
+
+ def setup_subscribes(s):
+ if s["type"] == "data_router":
+ # If username and password has been provided then generate it. The
+ # DMaaP plugin doesn't generate for subscribers. The generation code
+ # and length of username password has been lifted from the DMaaP
+ # plugin.
+
+ # Don't want to mutate the source
+ s = copy.deepcopy(s)
+ if not s.get("username", None):
+ s["username"] = utils.random_string(8)
+ if not s.get("password", None):
+ s["password"] = utils.random_string(10)
+
+ kwargs[s["name"]] = s
+
+ # NOTE: That the delivery url is constructed and setup in the start operation
+ map(setup_subscribes, kwargs["streams_subscribes"])
+
+ return kwargs
+
+def _setup_for_discovery_streams(**kwargs):
+ """Setup for discovery of streams
+
+ Specifically, there's a race condition this call addresses for data router
+ subscriber case. The component needs its feed subscriber information but the
+ DMaaP plugin doesn't provide this until after the docker plugin start
+ operation.
+ """
+ dr_subs = [kwargs[s["name"]] for s in kwargs["streams_subscribes"] \
+ if s["type"] == "data_router"]
+
+ if dr_subs:
+ dmaap_kv_key = "{0}:dmaap".format(kwargs["name"])
+ conn = dis.create_kv_conn(CONSUL_HOST)
+
+ def add_feed(dr_sub):
+ # delivery url and subscriber id will be fill by the dmaap plugin later
+ v = { "location": dr_sub["location"], "delivery_url": None,
+ "username": dr_sub["username"], "password": dr_sub["password"],
+ "subscriber_id": None }
+ return dis.add_to_entry(conn, dmaap_kv_key, dr_sub["name"], v) != None
+
+ try:
+ if all(map(add_feed, dr_subs)):
+ return kwargs
+ except Exception as e:
+ raise NonRecoverableError(e)
+
+ # You should never get here
+ raise NonRecoverableError("Failure updating feed streams in Consul")
+ else:
+ return kwargs
+
+
+@merge_inputs_for_create
+@monkeypatch_loggers
+@Policies.gather_policies_to_node()
+@operation
+def create_for_components_with_streams(**create_inputs):
+ """Create step for Docker containers that are components that use DMaaP
+
+ This interface is responsible for:
+
+ 1. Generating service component name
+ 2. Setup runtime properties for DMaaP plugin
+ 3. Populating application config into Consul
+ 4. Populating DMaaP config for data router subscribers in Consul
+ """
+ _done_for_create(
+ **_setup_for_discovery(
+ **_setup_for_discovery_streams(
+ **_parse_streams(
+ **_generate_component_name(
+ **create_inputs)))))
+
+
+@merge_inputs_for_create
+@monkeypatch_loggers
+@operation
+def create_for_platforms(**create_inputs):
+ """Create step for Docker containers that are platform components
+
+ This interface is responible for:
+
+ 1. Populating config information into Consul
+ """
+ _done_for_create(
+ **_setup_for_discovery(
+ **create_inputs))
+
+
+def _lookup_service(service_component_name, consul_host=CONSUL_HOST,
+ with_port=False):
+ conn = dis.create_kv_conn(consul_host)
+ results = dis.lookup_service(conn, service_component_name)
+
+ if with_port:
+ # Just grab first
+ result = results[0]
+ return "{address}:{port}".format(address=result["ServiceAddress"],
+ port=result["ServicePort"])
+ else:
+ return results[0]["ServiceAddress"]
+
+
+def _verify_container(service_component_name, max_wait):
+ """Verify that the container is healthy
+
+ Args:
+ -----
+ max_wait (integer): limit to how may attempts to make which translates to
+ seconds because each sleep is one second. 0 means infinite.
+
+ Return:
+ -------
+ True if component is healthy else a DockerPluginDeploymentError exception
+ will be raised.
+ """
+ num_attempts = 1
+
+ while True:
+ if k8sclient.is_available(DCAE_NAMESPACE, service_component_name):
+ return True
+ else:
+ num_attempts += 1
+
+ if max_wait > 0 and max_wait < num_attempts:
+ raise DockerPluginDeploymentError("Container never became healthy")
+
+ time.sleep(1)
+
+ return True
+
+def _create_and_start_container(container_name, image, **kwargs):
+ '''
+ This will create a k8s Deployment and, if needed, a k8s Service or two.
+ (We are being opinionated in our use of k8s... this code decides what k8s abstractions and features to use.
+ We're not exposing k8s to the component developer and the blueprint author.
+ This is a conscious choice. We want to use k8s in a controlled, consistent way, and we want to hide
+ the details from the component developer and the blueprint author.)
+
+ kwargs may have:
+ - volumes: array of volume objects, where a volume object is:
+ {"host":{"path": "/path/on/host"}, "container":{"bind":"/path/on/container","mode":"rw_or_ro"}
+ - ports: array of strings in the form "container_port:host_port"
+ - env: map of name-value pairs ( {name0: value0, name1: value1...} )
+ - always_pull: boolean. If true, sets image pull policy to "Always"
+ so that a fresh copy of the image is always pull. Otherwise, sets
+ image pull policy to "IfNotPresent"
+ - msb_list: array of msb objects, where an msb object is as described in msb/msb.py.
+ - log_info: an object with info for setting up ELK logging, with the form:
+ {"log_directory": "/path/to/container/log/directory", "alternate_fb_path" : "/alternate/sidecar/log/path"}"
+ - replicas: number of replicas to be launched initially
+ '''
+ env = { "CONSUL_HOST": CONSUL_INTERNAL_NAME,
+ "CONFIG_BINDING_SERVICE": "config-binding-service" }
+ env.update(kwargs.get("env", {}))
+ ctx.logger.info("Deploying {}, image: {}, env: {}, kwargs: {}".format(container_name, image, env, kwargs))
+ ctx.logger.info("Passing k8sconfig: {}".format(plugin_conf))
+ replicas = kwargs.get("replicas", 1)
+ _,dep = k8sclient.deploy(DCAE_NAMESPACE,
+ container_name,
+ image,
+ replicas = replicas,
+ always_pull=kwargs.get("always_pull_image", False),
+ k8sconfig=plugin_conf,
+ volumes=kwargs.get("volumes",[]),
+ ports=kwargs.get("ports",[]),
+ msb_list=kwargs.get("msb_list"),
+ env = env,
+ labels = kwargs.get("labels", {}),
+ log_info=kwargs.get("log_info"))
+
+ # Capture the result of deployment for future use
+ ctx.instance.runtime_properties["k8s_deployment"] = dep
+ ctx.instance.runtime_properties["replicas"] = replicas
+ ctx.logger.info ("Deployment complete: {0}".format(dep))
+
+def _parse_cloudify_context(**kwargs):
+ """Parse Cloudify context
+
+ Extract what is needed. This is impure function because it requires ctx.
+ """
+ kwargs["deployment_id"] = ctx.deployment.id
+
+ # Set some labels for the Kubernetes pods
+ kwargs["labels"] = {
+ "cfydeployment" : ctx.deployment.id,
+ "cfynode": ctx.node.name,
+ "cfynodeinstance": ctx.instance.id
+ }
+
+ # Pick up the centralized logging info
+ if "log_info" in ctx.node.properties and "log_directory" in ctx.node.properties["log_info"]:
+ kwargs["log_info"] = ctx.node.properties["log_info"]
+
+ # Pick up replica count and always_pull_image flag
+ if "replicas" in ctx.node.properties:
+ kwargs["replicas"] = ctx.node.properties["replicas"]
+ if "always_pull_image" in ctx.node.properties:
+ kwargs["always_pull_image"] = ctx.node.properties["always_pull_image"]
+
+ return kwargs
+
+def _enhance_docker_params(**kwargs):
+ """Setup Docker envs"""
+ docker_config = kwargs.get("docker_config", {})
+
+ envs = kwargs.get("envs", {})
+ # NOTE: Healthchecks are optional until prepared to handle use cases that
+ # don't necessarily use http
+ envs_healthcheck = doc.create_envs_healthcheck(docker_config) \
+ if "healthcheck" in docker_config else {}
+ envs.update(envs_healthcheck)
+
+ # Set tags on this component for its Consul registration as a service
+ tags = [kwargs.get("deployment_id", None), kwargs["service_id"]]
+ tags = [ str(tag) for tag in tags if tag is not None ]
+ # Registrator will use this to register this component with tags. Must be
+ # comma delimited.
+ envs["SERVICE_TAGS"] = ",".join(tags)
+
+ kwargs["envs"] = envs
+
+ def combine_params(key, docker_config, kwargs):
+ v = docker_config.get(key, []) + kwargs.get(key, [])
+ if v:
+ kwargs[key] = v
+ return kwargs
+
+ # Add the lists of ports and volumes unintelligently - meaning just add the
+ # lists together with no deduping.
+ kwargs = combine_params("ports", docker_config, kwargs)
+ kwargs = combine_params("volumes", docker_config, kwargs)
+
+
+ return kwargs
+
+def _create_and_start_component(**kwargs):
+ """Create and start component (container)"""
+ image = kwargs["image"]
+ service_component_name = kwargs[SERVICE_COMPONENT_NAME]
+ # Need to be picky and manually select out pieces because just using kwargs
+ # which contains everything confused the execution of
+ # _create_and_start_container because duplicate variables exist
+ sub_kwargs = {
+ "volumes": kwargs.get("volumes", []),
+ "ports": kwargs.get("ports", None),
+ "envs": kwargs.get("envs", {}),
+ "log_info": kwargs.get("log_info", {}),
+ "labels": kwargs.get("labels", {})}
+ _create_and_start_container(service_component_name, image, **sub_kwargs)
+
+ # TODO: Use regular logging here
+ ctx.logger.info("Container started: {0}".format(service_component_name))
+
+ return kwargs
+
+def _verify_component(**kwargs):
+ """Verify component (container) is healthy"""
+ service_component_name = kwargs[SERVICE_COMPONENT_NAME]
+ # TODO: "Consul doesn't make its first health check immediately upon registration.
+ # Instead it waits for the health check interval to pass."
+ # Possible enhancement is to read the interval (and possibly the timeout) from
+ # docker_config and multiply that by a number to come up with a more suitable
+ # max_wait.
+
+ max_wait = kwargs.get("max_wait", 300)
+
+ # Verify that the container is healthy
+
+ if _verify_container(service_component_name, max_wait):
+ service_component_name = kwargs[SERVICE_COMPONENT_NAME]
+
+ # TODO: Use regular logging here
+ ctx.logger.info("Container is healthy: {0}".format(service_component_name))
+
+ return kwargs
+
+def _done_for_start(**kwargs):
+ ctx.instance.runtime_properties.update(kwargs)
+ ctx.logger.info("Done starting: {0}".format(kwargs["name"]))
+ return kwargs
+
+def _setup_msb_registration(service_name, msb_reg):
+ return {
+ "serviceName" : service_name,
+ "port" : msb_reg.get("port", "80"),
+ "version" : msb_reg.get("version", "v1"),
+ "url" : msb_reg.get("url_path", "/v1"),
+ "protocol" : "REST",
+ "enable_ssl" : msb_reg.get("uses_ssl", False),
+ "visualRange" : "1"
+}
+
+@wrap_error_handling_start
+@merge_inputs_for_start
+@monkeypatch_loggers
+@operation
+def create_and_start_container_for_components(**start_inputs):
+ """Create Docker container and start for components
+
+ This operation method is to be used with the DockerContainerForComponents
+ node type. After launching the container, the plugin will verify with Consul
+ that the app is up and healthy before terminating.
+ """
+ _done_for_start(
+ **_verify_component(
+ **_create_and_start_component(
+ **_enhance_docker_params(
+ **_parse_cloudify_context(**start_inputs)))))
+
+
+def _update_delivery_url(**kwargs):
+ """Update the delivery url for data router subscribers"""
+ dr_subs = [kwargs[s["name"]] for s in kwargs["streams_subscribes"] \
+ if s["type"] == "data_router"]
+
+ if dr_subs:
+ service_component_name = kwargs[SERVICE_COMPONENT_NAME]
+ # TODO: Should NOT be setting up the delivery url with ip addresses
+ # because in the https case, this will not work because data router does
+ # a certificate validation using the fqdn.
+ subscriber_host = _lookup_service(service_component_name, with_port=True)
+
+ for dr_sub in dr_subs:
+ scheme = dr_sub["scheme"] if "scheme" in dr_sub else DEFAULT_SCHEME
+ if "route" not in dr_sub:
+ raise NonRecoverableError("'route' key missing from data router subscriber")
+ path = dr_sub["route"]
+ dr_sub["delivery_url"] = "{scheme}://{host}/{path}".format(
+ scheme=scheme, host=subscriber_host, path=path)
+ kwargs[dr_sub["name"]] = dr_sub
+
+ return kwargs
+
+@wrap_error_handling_start
+@merge_inputs_for_start
+@monkeypatch_loggers
+@operation
+def create_and_start_container_for_components_with_streams(**start_inputs):
+ """Create Docker container and start for components that have streams
+
+ This operation method is to be used with the DockerContainerForComponents
+ node type. After launching the container, the plugin will verify with Consul
+ that the app is up and healthy before terminating.
+ """
+ _done_for_start(
+ **_update_delivery_url(
+ **_verify_component(
+ **_create_and_start_component(
+ **_enhance_docker_params(
+ **_parse_cloudify_context(**start_inputs))))))
+
+
+@wrap_error_handling_start
+@monkeypatch_loggers
+@operation
+def create_and_start_container_for_platforms(**kwargs):
+ """Create Docker container and start for platform services
+
+ This operation method is to be used with the ContainerizedPlatformComponent
+ node type.
+ """
+ # Capture node properties
+ image = ctx.node.properties["image"]
+ docker_config = ctx.node.properties.get("docker_config", {})
+ if "dns_name" in ctx.node.properties:
+ service_component_name = ctx.node.properties["dns_name"]
+ else:
+ service_component_name = ctx.node.properties["name"]
+
+
+ envs = kwargs.get("envs", {})
+ # NOTE: Healthchecks are optional until prepared to handle use cases that
+ # don't necessarily use http
+ envs_healthcheck = doc.create_envs_healthcheck(docker_config) \
+ if "healthcheck" in docker_config else {}
+ envs.update(envs_healthcheck)
+ kwargs["envs"] = envs
+
+ # Set some labels for the Kubernetes pods
+ kwargs["labels"] = {
+ "cfydeployment" : ctx.deployment.id,
+ "cfynode": ctx.node.name,
+ "cfynodeinstance": ctx.instance.id
+ }
+
+ host_port = ctx.node.properties["host_port"]
+ container_port = ctx.node.properties["container_port"]
+
+ # Cloudify properties are all required and Cloudify complains that None
+ # is not a valid type for integer. Defaulting to 0 to indicate to not
+ # use this and not to set a specific port mapping in cases like service
+ # change handler.
+ if container_port != 0:
+ # Doing this because other nodes might want to use this property
+ port_mapping = "{cp}:{hp}".format(cp=container_port, hp=host_port)
+ ports = kwargs.get("ports", []) + [ port_mapping ]
+ kwargs["ports"] = ports
+ if "ports" not in kwargs:
+ ctx.logger.warn("No port mappings defined. Will randomly assign port.")
+
+ # All of the new node properties could be handled more DRYly!
+ # If a registration to MSB is required, then set up the registration info
+ if "msb_registration" in ctx.node.properties and "port" in ctx.node.properties["msb_registration"]:
+ kwargs["msb_list"] = [_setup_msb_registration(service_component_name, ctx.node.properties["msb_registration"])]
+
+ # If centralized logging via ELK is desired, then set up the logging info
+ if "log_info" in ctx.node.properties and "log_directory" in ctx.node.properties["log_info"]:
+ kwargs["log_info"] = ctx.node.properties["log_info"]
+
+ # Pick up replica count and always_pull_image flag
+ if "replicas" in ctx.node.properties:
+ kwargs["replicas"] = ctx.node.properties["replicas"]
+ if "always_pull_image" in ctx.node.properties:
+ kwargs["always_pull_image"] = ctx.node.properties["always_pull_image"]
+ _create_and_start_container(service_component_name, image, **kwargs)
+
+ ctx.logger.info("Container started: {0}".format(service_component_name))
+
+ # Verify that the container is healthy
+
+ max_wait = kwargs.get("max_wait", 300)
+
+ if _verify_container(service_component_name, max_wait):
+ ctx.logger.info("Container is healthy: {0}".format(service_component_name))
+
+
+@wrap_error_handling_start
+@monkeypatch_loggers
+@operation
+def create_and_start_container(**kwargs):
+ """Create Docker container and start"""
+ service_component_name = ctx.node.properties["name"]
+ ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME] = service_component_name
+
+ image = ctx.node.properties["image"]
+
+ _create_and_start_container(service_component_name, image,**kwargs)
+
+ ctx.logger.info("Component deployed: {0}".format(service_component_name))
+
+
+@monkeypatch_loggers
+@operation
+def stop_and_remove_container(**kwargs):
+ """Stop and remove Docker container"""
+ try:
+ deployment_description = ctx.instance.runtime_properties["k8s_deployment"]
+ k8sclient.undeploy(deployment_description)
+
+ except Exception as e:
+ ctx.logger.error("Unexpected error while stopping container: {0}"
+ .format(str(e)))
+
+@monkeypatch_loggers
+@operation
+def scale(replicas, **kwargs):
+ """Change number of replicas in the deployment"""
+ if replicas > 0:
+ current_replicas = ctx.instance.runtime_properties["replicas"]
+ ctx.logger.info("Scaling from {0} to {1}".format(current_replicas, replicas))
+ try:
+ deployment_description = ctx.instance.runtime_properties["k8s_deployment"]
+ k8sclient.scale(deployment_description, replicas)
+ ctx.instance.runtime_properties["replicas"] = replicas
+ except Exception as e:
+ ctx.logger.error ("Unexpected error while scaling {0}".format(str(e)))
+ else:
+ ctx.logger.info("Ignoring request to scale to zero replicas")
+
+@monkeypatch_loggers
+@Policies.cleanup_policies_on_node
+@operation
+def cleanup_discovery(**kwargs):
+ """Delete configuration from Consul"""
+ service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME]
+
+ try:
+ conn = dis.create_kv_conn(CONSUL_HOST)
+ dis.remove_service_component_config(conn, service_component_name)
+ except dis.DiscoveryConnectionError as e:
+ raise RecoverableError(e)
+
+
+def _notify_container(**kwargs):
+ """Notify container using the policy section in the docker_config"""
+ dc = kwargs["docker_config"]
+
+ if "policy" in dc:
+ if dc["policy"]["trigger_type"] == "docker":
+ pass
+ """
+ Need replacement for this in kubernetes.
+ Need to find all the pods that have been deployed
+ and execute the script in them.
+ Kubernetes does not appear to have a way to ask for a script
+ to be executed in all of the currently running pods for a
+ Kubernetes Deployment or ReplicaSet. We will have to find
+ each of them and run the script. The problem is that set of
+ pods could be changing. We can query to get all the pods, but
+ there's no guarantee the list won't change while we're trying to
+ execute the script.
+
+ In ONAP R2, all of the policy-driven components rely on polling.
+ """
+ """
+ # REVIEW: Need to finalize on the docker config policy data structure
+ script_path = dc["policy"]["script_path"]
+ updated_policies = kwargs["updated_policies"]
+ removed_policies = kwargs["removed_policies"]
+ policies = kwargs["policies"]
+ cmd = doc.build_policy_update_cmd(script_path, use_sh=False,
+ msg_type="policies",
+ updated_policies=updated_policies,
+ removed_policies=removed_policies,
+ policies=policies
+ )
+
+ docker_host = kwargs[SELECTED_CONTAINER_DESTINATION]
+ docker_host_ip = _lookup_service(docker_host)
+ logins = _get_docker_logins()
+ client = doc.create_client(docker_host_ip, DOCKER_PORT, logins=logins)
+
+ container_id = kwargs["container_id"]
+
+ doc.notify_for_policy_update(client, container_id, cmd)
+ """
+ # else the default is no trigger
+
+ return kwargs
+
+
+@monkeypatch_loggers
+@Policies.update_policies_on_node()
+@operation
+def policy_update(updated_policies, removed_policies=None, policies=None, **kwargs):
+ """Policy update task
+
+ This method is responsible for updating the application configuration and
+ notifying the applications that the change has occurred. This is to be used
+ for the dcae.interfaces.policy.policy_update operation.
+
+ :updated_policies: contains the list of changed policy-configs when configs_only=True
+ (default) Use configs_only=False to bring the full policy objects in :updated_policies:.
+ """
+ update_inputs = copy.deepcopy(ctx.instance.runtime_properties)
+ update_inputs["updated_policies"] = updated_policies
+ update_inputs["removed_policies"] = removed_policies
+ update_inputs["policies"] = policies
+
+ _notify_container(**update_inputs)
diff --git a/k8s/k8splugin/utils.py b/k8s/k8splugin/utils.py
new file mode 100644
index 0000000..c45af68
--- /dev/null
+++ b/k8s/k8splugin/utils.py
@@ -0,0 +1,43 @@
+# ============LICENSE_START=======================================================
+# org.onap.dcae
+# ================================================================================
+# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+
+import string
+import random
+import collections
+
+
+def random_string(n):
+ """Random generate an ascii string of "n" length"""
+ corpus = string.ascii_lowercase + string.ascii_uppercase + string.digits
+ return ''.join(random.choice(corpus) for x in range(n))
+
+
+def update_dict(d, u):
+ """Recursively updates dict
+
+ Update dict d with dict u
+ """
+ for k, v in u.iteritems():
+ if isinstance(v, collections.Mapping):
+ r = update_dict(d.get(k, {}), v)
+ d[k] = r
+ else:
+ d[k] = u[k]
+ return d