diff options
author | Lusheng Ji <lji@research.att.com> | 2018-03-23 19:21:59 +0000 |
---|---|---|
committer | Gerrit Code Review <gerrit@onap.org> | 2018-03-23 19:21:59 +0000 |
commit | 7d9895ebb2ed0554ed8fafa5c909a9a930c3f962 (patch) | |
tree | e521755dea9fe4d32e66f3619f8beccedfc83121 /k8s/k8splugin | |
parent | 8534af4f1ef5be3c6f946913824d5ed807e2321c (diff) | |
parent | 1ed29c111f50e33ac89915ba86b530820da3a689 (diff) |
Merge "Add DCAE Kubernetes plugin"
Diffstat (limited to 'k8s/k8splugin')
-rw-r--r-- | k8s/k8splugin/__init__.py | 30 | ||||
-rw-r--r-- | k8s/k8splugin/decorators.py | 102 | ||||
-rw-r--r-- | k8s/k8splugin/discovery.py | 269 | ||||
-rw-r--r-- | k8s/k8splugin/exceptions.py | 29 | ||||
-rw-r--r-- | k8s/k8splugin/tasks.py | 682 | ||||
-rw-r--r-- | k8s/k8splugin/utils.py | 43 |
6 files changed, 1155 insertions, 0 deletions
diff --git a/k8s/k8splugin/__init__.py b/k8s/k8splugin/__init__.py new file mode 100644 index 0000000..28306ee --- /dev/null +++ b/k8s/k8splugin/__init__.py @@ -0,0 +1,30 @@ +# ============LICENSE_START======================================================= +# org.onap.dcae +# ================================================================================ +# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. + +# REVIEW: Tried to source the version from here but you run into import issues +# because "tasks" module is loaded. This method seems to be the PEP 396 +# recommended way and is listed #3 here https://packaging.python.org/single_source_version/ +# __version__ = '0.1.0' + +from .tasks import create_for_components, create_for_components_with_streams, \ + create_and_start_container_for_components_with_streams, \ + create_for_platforms, create_and_start_container, \ + create_and_start_container_for_components, create_and_start_container_for_platforms, \ + stop_and_remove_container, cleanup_discovery, policy_update, scale
\ No newline at end of file diff --git a/k8s/k8splugin/decorators.py b/k8s/k8splugin/decorators.py new file mode 100644 index 0000000..186b212 --- /dev/null +++ b/k8s/k8splugin/decorators.py @@ -0,0 +1,102 @@ +# ============LICENSE_START======================================================= +# org.onap.dcae +# ================================================================================ +# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. + +import copy +from cloudify import ctx +from cloudify.exceptions import NonRecoverableError, RecoverableError +from dockering import utils as doc +from k8splugin import discovery as dis +from k8splugin.exceptions import DockerPluginDeploymentError, \ + DockerPluginDependencyNotReadyError +from k8splugin import utils + + +def monkeypatch_loggers(task_func): + """Sets up the dependent loggers""" + + def wrapper(**kwargs): + # Ouch! Monkeypatch loggers + doc.logger = ctx.logger + dis.logger = ctx.logger + + return task_func(**kwargs) + + return wrapper + + +def wrap_error_handling_start(task_start_func): + """Wrap error handling for the start operations""" + + def wrapper(**kwargs): + try: + return task_start_func(**kwargs) + except DockerPluginDependencyNotReadyError as e: + # You are here because things we need like a working docker host is not + # available yet so let Cloudify try again later. + raise RecoverableError(e) + except DockerPluginDeploymentError as e: + # Container failed to come up in the allotted time. This is deemed + # non-recoverable. + raise NonRecoverableError(e) + except Exception as e: + ctx.logger.error("Unexpected error while starting container: {0}" + .format(str(e))) + raise NonRecoverableError(e) + + return wrapper + + +def _wrapper_merge_inputs(task_func, properties, **kwargs): + """Merge Cloudify properties with input kwargs before calling task func""" + inputs = copy.deepcopy(properties) + # Recursively update + utils.update_dict(inputs, kwargs) + + # Apparently kwargs contains "ctx" which is cloudify.context.CloudifyContext + # This has to be removed and not copied into runtime_properties else you get + # JSON serialization errors. + if "ctx" in inputs: + del inputs["ctx"] + + return task_func(**inputs) + +def merge_inputs_for_create(task_create_func): + """Merge all inputs for start operation into one dict""" + + # Needed to wrap the wrapper because I was seeing issues with + # "RuntimeError: No context set in current execution thread" + def wrapper(**kwargs): + # NOTE: ctx.node.properties is an ImmutableProperties instance which is + # why it is passed into a mutable dict so that it can be deep copied + return _wrapper_merge_inputs(task_create_func, + dict(ctx.node.properties), **kwargs) + + return wrapper + +def merge_inputs_for_start(task_start_func): + """Merge all inputs for start operation into one dict""" + + # Needed to wrap the wrapper because I was seeing issues with + # "RuntimeError: No context set in current execution thread" + def wrapper(**kwargs): + return _wrapper_merge_inputs(task_start_func, + ctx.instance.runtime_properties, **kwargs) + + return wrapper diff --git a/k8s/k8splugin/discovery.py b/k8s/k8splugin/discovery.py new file mode 100644 index 0000000..f3b87b6 --- /dev/null +++ b/k8s/k8splugin/discovery.py @@ -0,0 +1,269 @@ +# ============LICENSE_START======================================================= +# org.onap.dcae +# ================================================================================ +# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. + +from functools import partial +import json +import logging +import uuid +import requests +import consul +import re + + +logger = logging.getLogger("discovery") + + +class DiscoveryError(RuntimeError): + pass + +class DiscoveryConnectionError(RuntimeError): + pass + +class DiscoveryServiceNotFoundError(RuntimeError): + pass + +class DiscoveryKVEntryNotFoundError(RuntimeError): + pass + + +def _wrap_consul_call(consul_func, *args, **kwargs): + """Wrap Consul call to map errors""" + try: + return consul_func(*args, **kwargs) + except requests.exceptions.ConnectionError as e: + raise DiscoveryConnectionError(e) + + +def generate_service_component_name(service_component_type): + """Generate service component id used to pass into the service component + instance and used as the key to the service component configuration. + + Updated for use with Kubernetes. Sometimes the service component name gets + used in Kubernetes in contexts (such as naming a Kubernetes Service) that + requires the name to conform to the RFC1035 DNS "label" syntax: + -- starts with an alpha + -- contains only of alphanumerics and "-" + -- <= 63 characters long + + Format: + s<service component id>-<service component type>, + truncated to 63 characters, "_" replaced with "-" in service_component_type, + other non-conforming characters removed from service_component_type + """ + # Random generated + # Copied from cdap plugin + sct = re.sub('[^A-Za-z0-9-]','',(service_component_type.replace('_','-'))) + return ("s{0}-{1}".format(str(uuid.uuid4()).replace("-",""),sct))[:63] + + +def create_kv_conn(host): + """Create connection to key-value store + + Returns a Consul client to the specified Consul host""" + try: + [hostname, port] = host.split(":") + return consul.Consul(host=hostname, port=int(port)) + except ValueError as e: + return consul.Consul(host=host) + +def push_service_component_config(kv_conn, service_component_name, config): + config_string = config if isinstance(config, str) else json.dumps(config) + kv_put_func = partial(_wrap_consul_call, kv_conn.kv.put) + + if kv_put_func(service_component_name, config_string): + logger.info("Added config for {0}".format(service_component_name)) + else: + raise DiscoveryError("Failed to push configuration") + +def remove_service_component_config(kv_conn, service_component_name): + kv_delete_func = partial(_wrap_consul_call, kv_conn.kv.delete) + kv_delete_func(service_component_name) + + +def get_kv_value(kv_conn, key): + """Get a key-value entry's value from Consul + + Raises DiscoveryKVEntryNotFoundError if entry not found + """ + kv_get_func = partial(_wrap_consul_call, kv_conn.kv.get) + (index, val) = kv_get_func(key) + + if val: + return json.loads(val['Value']) # will raise ValueError if not JSON, let it propagate + else: + raise DiscoveryKVEntryNotFoundError("{0} kv entry not found".format(key)) + + +def _create_rel_key(service_component_name): + return "{0}:rel".format(service_component_name) + +def store_relationship(kv_conn, source_name, target_name): + # TODO: Rel entry may already exist in a one-to-many situation. Need to + # support that. + rel_key = _create_rel_key(source_name) + rel_value = [target_name] if target_name else [] + + kv_put_func = partial(_wrap_consul_call, kv_conn.kv.put) + kv_put_func(rel_key, json.dumps(rel_value)) + logger.info("Added relationship for {0}".format(rel_key)) + +def delete_relationship(kv_conn, service_component_name): + rel_key = _create_rel_key(service_component_name) + kv_get_func = partial(_wrap_consul_call, kv_conn.kv.get) + index, rels = kv_get_func(rel_key) + + if rels: + rels = json.loads(rels["Value"].decode("utf-8")) + kv_delete_func = partial(_wrap_consul_call, kv_conn.kv.delete) + kv_delete_func(rel_key) + return rels + else: + return [] + +def lookup_service(kv_conn, service_component_name): + catalog_get_func = partial(_wrap_consul_call, kv_conn.catalog.service) + index, results = catalog_get_func(service_component_name) + + if results: + return results + else: + raise DiscoveryServiceNotFoundError("Failed to find: {0}".format(service_component_name)) + + +# TODO: Note these functions have been (for the most part) shamelessly lifted from +# dcae-cli and should really be shared. + +def _is_healthy_pure(get_health_func, instance): + """Checks to see if a component instance is running healthy + + Pure function edition + + Args + ---- + get_health_func: func(string) -> complex object + Look at unittests in test_discovery to see examples + instance: (string) fully qualified name of component instance + + Returns + ------- + True if instance has been found and is healthy else False + """ + index, resp = get_health_func(instance) + + if resp: + def is_passing(instance): + return all([check["Status"] == "passing" for check in instance["Checks"]]) + + return any([is_passing(instance) for instance in resp]) + else: + return False + +def is_healthy(consul_host, instance): + """Checks to see if a component instance is running healthy + + Impure function edition + + Args + ---- + consul_host: (string) host string of Consul + instance: (string) fully qualified name of component instance + + Returns + ------- + True if instance has been found and is healthy else False + """ + cons = create_kv_conn(consul_host) + + get_health_func = partial(_wrap_consul_call, cons.health.service) + return _is_healthy_pure(get_health_func, instance) + + +def add_to_entry(conn, key, add_name, add_value): + """ + Find 'key' in consul. + Treat its value as a JSON string representing a dict. + Extend the dict by adding an entry with key 'add_name' and value 'add_value'. + Turn the resulting extended dict into a JSON string. + Store the string back into Consul under 'key'. + Watch out for conflicting concurrent updates. + + Example: + Key 'xyz:dmaap' has the value '{"feed00": {"feed_url" : "http://example.com/feeds/999"}}' + add_to_entry('xyz:dmaap', 'topic00', {'topic_url' : 'http://example.com/topics/1229'}) + should result in the value for key 'xyz:dmaap' in consul being updated to + '{"feed00": {"feed_url" : "http://example.com/feeds/999"}, "topic00" : {"topic_url" : "http://example.com/topics/1229"}}' + """ + while True: # do until update succeeds + (index, val) = conn.kv.get(key) # index gives version of key retrieved + + if val is None: # no key yet + vstring = '{}' + mod_index = 0 # Use 0 as the cas index for initial insertion of the key + else: + vstring = val['Value'] + mod_index = val['ModifyIndex'] + + # Build the updated dict + # Exceptions just propagate + v = json.loads(vstring) + v[add_name] = add_value + new_vstring = json.dumps(v) + + updated = conn.kv.put(key, new_vstring, cas=mod_index) # if the key has changed since retrieval, this will return false + if updated: + return v + + +def _find_matching_services(services, name_search, tags): + """Find matching services given search criteria""" + def is_match(service): + srv_name, srv_tags = service + return name_search in srv_name and \ + all(map(lambda tag: tag in srv_tags, tags)) + + return [ srv[0] for srv in services.items() if is_match(srv) ] + +def search_services(conn, name_search, tags): + """Search for services that match criteria + + Args: + ----- + name_search: (string) Name to search for as a substring + tags: (list) List of strings that are tags. A service must match **all** the + tags in the list. + + Retruns: + -------- + List of names of services that matched + """ + # srvs is dict where key is service name and value is list of tags + catalog_get_services_func = partial(_wrap_consul_call, conn.catalog.services) + index, srvs = catalog_get_services_func() + + if srvs: + matches = _find_matching_services(srvs, name_search, tags) + + if matches: + return matches + + raise DiscoveryServiceNotFoundError( + "No matches found: {0}, {1}".format(name_search, tags)) + else: + raise DiscoveryServiceNotFoundError("No services found") diff --git a/k8s/k8splugin/exceptions.py b/k8s/k8splugin/exceptions.py new file mode 100644 index 0000000..0d8a341 --- /dev/null +++ b/k8s/k8splugin/exceptions.py @@ -0,0 +1,29 @@ +# ============LICENSE_START======================================================= +# org.onap.dcae +# ================================================================================ +# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. + +class DockerPluginDeploymentError(RuntimeError): + pass + + +class DockerPluginDependencyNotReadyError(RuntimeError): + """Error to use when something that this plugin depends upon e.g. docker api, + consul is not ready""" + pass + diff --git a/k8s/k8splugin/tasks.py b/k8s/k8splugin/tasks.py new file mode 100644 index 0000000..1718274 --- /dev/null +++ b/k8s/k8splugin/tasks.py @@ -0,0 +1,682 @@ +# ============LICENSE_START======================================================= +# org.onap.dcae +# ================================================================================ +# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. + +# Lifecycle interface calls for containerized components + +# Needed by Cloudify Manager to load google.auth for the Kubernetes python client +import cloudify_importer + +import time, copy +from cloudify import ctx +from cloudify.decorators import operation +from cloudify.exceptions import NonRecoverableError, RecoverableError +import dockering as doc +from onap_dcae_dcaepolicy_lib import Policies +from k8splugin import discovery as dis +from k8splugin.decorators import monkeypatch_loggers, wrap_error_handling_start, \ + merge_inputs_for_start, merge_inputs_for_create +from k8splugin.exceptions import DockerPluginDeploymentError +from k8splugin import utils +from configure import configure +from k8sclient import k8sclient + +# Get configuration +plugin_conf = configure.configure() +CONSUL_HOST = plugin_conf.get("consul_host") +CONSUL_INTERNAL_NAME = plugin_conf.get("consul_dns_name") +DCAE_NAMESPACE = plugin_conf.get("namespace") + +# Used to construct delivery urls for data router subscribers. Data router in FTL +# requires https but this author believes that ONAP is to be defaulted to http. +DEFAULT_SCHEME = "http" + +# Property keys +SERVICE_COMPONENT_NAME = "service_component_name" +CONTAINER_ID = "container_id" +APPLICATION_CONFIG = "application_config" + + + +# Utility methods + +# Lifecycle interface calls for dcae.nodes.DockerContainer + +def _setup_for_discovery(**kwargs): + """Setup for config discovery""" + try: + name = kwargs['name'] + application_config = kwargs[APPLICATION_CONFIG] + + # NOTE: application_config is no longer a json string and is inputed as a + # YAML map which translates to a dict. We don't have to do any + # preprocessing anymore. + conn = dis.create_kv_conn(CONSUL_HOST) + dis.push_service_component_config(conn, name, application_config) + return kwargs + except dis.DiscoveryConnectionError as e: + raise RecoverableError(e) + except Exception as e: + ctx.logger.error("Unexpected error while pushing configuration: {0}" + .format(str(e))) + raise NonRecoverableError(e) + +def _generate_component_name(**kwargs): + """Generate component name""" + service_component_type = kwargs['service_component_type'] + name_override = kwargs['service_component_name_override'] + + kwargs['name'] = name_override if name_override \ + else dis.generate_service_component_name(service_component_type) + return kwargs + +def _done_for_create(**kwargs): + """Wrap up create operation""" + name = kwargs['name'] + kwargs[SERVICE_COMPONENT_NAME] = name + # All updates to the runtime_properties happens here. I don't see a reason + # why we shouldn't do this because the context is not being mutated by + # something else and will keep the other functions pure (pure in the sense + # not dealing with CloudifyContext). + ctx.instance.runtime_properties.update(kwargs) + ctx.logger.info("Done setting up: {0}".format(name)) + return kwargs + + +@merge_inputs_for_create +@monkeypatch_loggers +@Policies.gather_policies_to_node() +@operation +def create_for_components(**create_inputs): + """Create step for Docker containers that are components + + This interface is responsible for: + + 1. Generating service component name + 2. Populating config information into Consul + """ + _done_for_create( + **_setup_for_discovery( + **_generate_component_name( + **create_inputs))) + + +def _parse_streams(**kwargs): + """Parse streams and setup for DMaaP plugin""" + # The DMaaP plugin requires this plugin to set the runtime properties + # keyed by the node name. + def setup_publishes(s): + kwargs[s["name"]] = s + + map(setup_publishes, kwargs["streams_publishes"]) + + def setup_subscribes(s): + if s["type"] == "data_router": + # If username and password has been provided then generate it. The + # DMaaP plugin doesn't generate for subscribers. The generation code + # and length of username password has been lifted from the DMaaP + # plugin. + + # Don't want to mutate the source + s = copy.deepcopy(s) + if not s.get("username", None): + s["username"] = utils.random_string(8) + if not s.get("password", None): + s["password"] = utils.random_string(10) + + kwargs[s["name"]] = s + + # NOTE: That the delivery url is constructed and setup in the start operation + map(setup_subscribes, kwargs["streams_subscribes"]) + + return kwargs + +def _setup_for_discovery_streams(**kwargs): + """Setup for discovery of streams + + Specifically, there's a race condition this call addresses for data router + subscriber case. The component needs its feed subscriber information but the + DMaaP plugin doesn't provide this until after the docker plugin start + operation. + """ + dr_subs = [kwargs[s["name"]] for s in kwargs["streams_subscribes"] \ + if s["type"] == "data_router"] + + if dr_subs: + dmaap_kv_key = "{0}:dmaap".format(kwargs["name"]) + conn = dis.create_kv_conn(CONSUL_HOST) + + def add_feed(dr_sub): + # delivery url and subscriber id will be fill by the dmaap plugin later + v = { "location": dr_sub["location"], "delivery_url": None, + "username": dr_sub["username"], "password": dr_sub["password"], + "subscriber_id": None } + return dis.add_to_entry(conn, dmaap_kv_key, dr_sub["name"], v) != None + + try: + if all(map(add_feed, dr_subs)): + return kwargs + except Exception as e: + raise NonRecoverableError(e) + + # You should never get here + raise NonRecoverableError("Failure updating feed streams in Consul") + else: + return kwargs + + +@merge_inputs_for_create +@monkeypatch_loggers +@Policies.gather_policies_to_node() +@operation +def create_for_components_with_streams(**create_inputs): + """Create step for Docker containers that are components that use DMaaP + + This interface is responsible for: + + 1. Generating service component name + 2. Setup runtime properties for DMaaP plugin + 3. Populating application config into Consul + 4. Populating DMaaP config for data router subscribers in Consul + """ + _done_for_create( + **_setup_for_discovery( + **_setup_for_discovery_streams( + **_parse_streams( + **_generate_component_name( + **create_inputs))))) + + +@merge_inputs_for_create +@monkeypatch_loggers +@operation +def create_for_platforms(**create_inputs): + """Create step for Docker containers that are platform components + + This interface is responible for: + + 1. Populating config information into Consul + """ + _done_for_create( + **_setup_for_discovery( + **create_inputs)) + + +def _lookup_service(service_component_name, consul_host=CONSUL_HOST, + with_port=False): + conn = dis.create_kv_conn(consul_host) + results = dis.lookup_service(conn, service_component_name) + + if with_port: + # Just grab first + result = results[0] + return "{address}:{port}".format(address=result["ServiceAddress"], + port=result["ServicePort"]) + else: + return results[0]["ServiceAddress"] + + +def _verify_container(service_component_name, max_wait): + """Verify that the container is healthy + + Args: + ----- + max_wait (integer): limit to how may attempts to make which translates to + seconds because each sleep is one second. 0 means infinite. + + Return: + ------- + True if component is healthy else a DockerPluginDeploymentError exception + will be raised. + """ + num_attempts = 1 + + while True: + if k8sclient.is_available(DCAE_NAMESPACE, service_component_name): + return True + else: + num_attempts += 1 + + if max_wait > 0 and max_wait < num_attempts: + raise DockerPluginDeploymentError("Container never became healthy") + + time.sleep(1) + + return True + +def _create_and_start_container(container_name, image, **kwargs): + ''' + This will create a k8s Deployment and, if needed, a k8s Service or two. + (We are being opinionated in our use of k8s... this code decides what k8s abstractions and features to use. + We're not exposing k8s to the component developer and the blueprint author. + This is a conscious choice. We want to use k8s in a controlled, consistent way, and we want to hide + the details from the component developer and the blueprint author.) + + kwargs may have: + - volumes: array of volume objects, where a volume object is: + {"host":{"path": "/path/on/host"}, "container":{"bind":"/path/on/container","mode":"rw_or_ro"} + - ports: array of strings in the form "container_port:host_port" + - env: map of name-value pairs ( {name0: value0, name1: value1...} ) + - always_pull: boolean. If true, sets image pull policy to "Always" + so that a fresh copy of the image is always pull. Otherwise, sets + image pull policy to "IfNotPresent" + - msb_list: array of msb objects, where an msb object is as described in msb/msb.py. + - log_info: an object with info for setting up ELK logging, with the form: + {"log_directory": "/path/to/container/log/directory", "alternate_fb_path" : "/alternate/sidecar/log/path"}" + - replicas: number of replicas to be launched initially + ''' + env = { "CONSUL_HOST": CONSUL_INTERNAL_NAME, + "CONFIG_BINDING_SERVICE": "config-binding-service" } + env.update(kwargs.get("env", {})) + ctx.logger.info("Deploying {}, image: {}, env: {}, kwargs: {}".format(container_name, image, env, kwargs)) + ctx.logger.info("Passing k8sconfig: {}".format(plugin_conf)) + replicas = kwargs.get("replicas", 1) + _,dep = k8sclient.deploy(DCAE_NAMESPACE, + container_name, + image, + replicas = replicas, + always_pull=kwargs.get("always_pull_image", False), + k8sconfig=plugin_conf, + volumes=kwargs.get("volumes",[]), + ports=kwargs.get("ports",[]), + msb_list=kwargs.get("msb_list"), + env = env, + labels = kwargs.get("labels", {}), + log_info=kwargs.get("log_info")) + + # Capture the result of deployment for future use + ctx.instance.runtime_properties["k8s_deployment"] = dep + ctx.instance.runtime_properties["replicas"] = replicas + ctx.logger.info ("Deployment complete: {0}".format(dep)) + +def _parse_cloudify_context(**kwargs): + """Parse Cloudify context + + Extract what is needed. This is impure function because it requires ctx. + """ + kwargs["deployment_id"] = ctx.deployment.id + + # Set some labels for the Kubernetes pods + kwargs["labels"] = { + "cfydeployment" : ctx.deployment.id, + "cfynode": ctx.node.name, + "cfynodeinstance": ctx.instance.id + } + + # Pick up the centralized logging info + if "log_info" in ctx.node.properties and "log_directory" in ctx.node.properties["log_info"]: + kwargs["log_info"] = ctx.node.properties["log_info"] + + # Pick up replica count and always_pull_image flag + if "replicas" in ctx.node.properties: + kwargs["replicas"] = ctx.node.properties["replicas"] + if "always_pull_image" in ctx.node.properties: + kwargs["always_pull_image"] = ctx.node.properties["always_pull_image"] + + return kwargs + +def _enhance_docker_params(**kwargs): + """Setup Docker envs""" + docker_config = kwargs.get("docker_config", {}) + + envs = kwargs.get("envs", {}) + # NOTE: Healthchecks are optional until prepared to handle use cases that + # don't necessarily use http + envs_healthcheck = doc.create_envs_healthcheck(docker_config) \ + if "healthcheck" in docker_config else {} + envs.update(envs_healthcheck) + + # Set tags on this component for its Consul registration as a service + tags = [kwargs.get("deployment_id", None), kwargs["service_id"]] + tags = [ str(tag) for tag in tags if tag is not None ] + # Registrator will use this to register this component with tags. Must be + # comma delimited. + envs["SERVICE_TAGS"] = ",".join(tags) + + kwargs["envs"] = envs + + def combine_params(key, docker_config, kwargs): + v = docker_config.get(key, []) + kwargs.get(key, []) + if v: + kwargs[key] = v + return kwargs + + # Add the lists of ports and volumes unintelligently - meaning just add the + # lists together with no deduping. + kwargs = combine_params("ports", docker_config, kwargs) + kwargs = combine_params("volumes", docker_config, kwargs) + + + return kwargs + +def _create_and_start_component(**kwargs): + """Create and start component (container)""" + image = kwargs["image"] + service_component_name = kwargs[SERVICE_COMPONENT_NAME] + # Need to be picky and manually select out pieces because just using kwargs + # which contains everything confused the execution of + # _create_and_start_container because duplicate variables exist + sub_kwargs = { + "volumes": kwargs.get("volumes", []), + "ports": kwargs.get("ports", None), + "envs": kwargs.get("envs", {}), + "log_info": kwargs.get("log_info", {}), + "labels": kwargs.get("labels", {})} + _create_and_start_container(service_component_name, image, **sub_kwargs) + + # TODO: Use regular logging here + ctx.logger.info("Container started: {0}".format(service_component_name)) + + return kwargs + +def _verify_component(**kwargs): + """Verify component (container) is healthy""" + service_component_name = kwargs[SERVICE_COMPONENT_NAME] + # TODO: "Consul doesn't make its first health check immediately upon registration. + # Instead it waits for the health check interval to pass." + # Possible enhancement is to read the interval (and possibly the timeout) from + # docker_config and multiply that by a number to come up with a more suitable + # max_wait. + + max_wait = kwargs.get("max_wait", 300) + + # Verify that the container is healthy + + if _verify_container(service_component_name, max_wait): + service_component_name = kwargs[SERVICE_COMPONENT_NAME] + + # TODO: Use regular logging here + ctx.logger.info("Container is healthy: {0}".format(service_component_name)) + + return kwargs + +def _done_for_start(**kwargs): + ctx.instance.runtime_properties.update(kwargs) + ctx.logger.info("Done starting: {0}".format(kwargs["name"])) + return kwargs + +def _setup_msb_registration(service_name, msb_reg): + return { + "serviceName" : service_name, + "port" : msb_reg.get("port", "80"), + "version" : msb_reg.get("version", "v1"), + "url" : msb_reg.get("url_path", "/v1"), + "protocol" : "REST", + "enable_ssl" : msb_reg.get("uses_ssl", False), + "visualRange" : "1" +} + +@wrap_error_handling_start +@merge_inputs_for_start +@monkeypatch_loggers +@operation +def create_and_start_container_for_components(**start_inputs): + """Create Docker container and start for components + + This operation method is to be used with the DockerContainerForComponents + node type. After launching the container, the plugin will verify with Consul + that the app is up and healthy before terminating. + """ + _done_for_start( + **_verify_component( + **_create_and_start_component( + **_enhance_docker_params( + **_parse_cloudify_context(**start_inputs))))) + + +def _update_delivery_url(**kwargs): + """Update the delivery url for data router subscribers""" + dr_subs = [kwargs[s["name"]] for s in kwargs["streams_subscribes"] \ + if s["type"] == "data_router"] + + if dr_subs: + service_component_name = kwargs[SERVICE_COMPONENT_NAME] + # TODO: Should NOT be setting up the delivery url with ip addresses + # because in the https case, this will not work because data router does + # a certificate validation using the fqdn. + subscriber_host = _lookup_service(service_component_name, with_port=True) + + for dr_sub in dr_subs: + scheme = dr_sub["scheme"] if "scheme" in dr_sub else DEFAULT_SCHEME + if "route" not in dr_sub: + raise NonRecoverableError("'route' key missing from data router subscriber") + path = dr_sub["route"] + dr_sub["delivery_url"] = "{scheme}://{host}/{path}".format( + scheme=scheme, host=subscriber_host, path=path) + kwargs[dr_sub["name"]] = dr_sub + + return kwargs + +@wrap_error_handling_start +@merge_inputs_for_start +@monkeypatch_loggers +@operation +def create_and_start_container_for_components_with_streams(**start_inputs): + """Create Docker container and start for components that have streams + + This operation method is to be used with the DockerContainerForComponents + node type. After launching the container, the plugin will verify with Consul + that the app is up and healthy before terminating. + """ + _done_for_start( + **_update_delivery_url( + **_verify_component( + **_create_and_start_component( + **_enhance_docker_params( + **_parse_cloudify_context(**start_inputs)))))) + + +@wrap_error_handling_start +@monkeypatch_loggers +@operation +def create_and_start_container_for_platforms(**kwargs): + """Create Docker container and start for platform services + + This operation method is to be used with the ContainerizedPlatformComponent + node type. + """ + # Capture node properties + image = ctx.node.properties["image"] + docker_config = ctx.node.properties.get("docker_config", {}) + if "dns_name" in ctx.node.properties: + service_component_name = ctx.node.properties["dns_name"] + else: + service_component_name = ctx.node.properties["name"] + + + envs = kwargs.get("envs", {}) + # NOTE: Healthchecks are optional until prepared to handle use cases that + # don't necessarily use http + envs_healthcheck = doc.create_envs_healthcheck(docker_config) \ + if "healthcheck" in docker_config else {} + envs.update(envs_healthcheck) + kwargs["envs"] = envs + + # Set some labels for the Kubernetes pods + kwargs["labels"] = { + "cfydeployment" : ctx.deployment.id, + "cfynode": ctx.node.name, + "cfynodeinstance": ctx.instance.id + } + + host_port = ctx.node.properties["host_port"] + container_port = ctx.node.properties["container_port"] + + # Cloudify properties are all required and Cloudify complains that None + # is not a valid type for integer. Defaulting to 0 to indicate to not + # use this and not to set a specific port mapping in cases like service + # change handler. + if container_port != 0: + # Doing this because other nodes might want to use this property + port_mapping = "{cp}:{hp}".format(cp=container_port, hp=host_port) + ports = kwargs.get("ports", []) + [ port_mapping ] + kwargs["ports"] = ports + if "ports" not in kwargs: + ctx.logger.warn("No port mappings defined. Will randomly assign port.") + + # All of the new node properties could be handled more DRYly! + # If a registration to MSB is required, then set up the registration info + if "msb_registration" in ctx.node.properties and "port" in ctx.node.properties["msb_registration"]: + kwargs["msb_list"] = [_setup_msb_registration(service_component_name, ctx.node.properties["msb_registration"])] + + # If centralized logging via ELK is desired, then set up the logging info + if "log_info" in ctx.node.properties and "log_directory" in ctx.node.properties["log_info"]: + kwargs["log_info"] = ctx.node.properties["log_info"] + + # Pick up replica count and always_pull_image flag + if "replicas" in ctx.node.properties: + kwargs["replicas"] = ctx.node.properties["replicas"] + if "always_pull_image" in ctx.node.properties: + kwargs["always_pull_image"] = ctx.node.properties["always_pull_image"] + _create_and_start_container(service_component_name, image, **kwargs) + + ctx.logger.info("Container started: {0}".format(service_component_name)) + + # Verify that the container is healthy + + max_wait = kwargs.get("max_wait", 300) + + if _verify_container(service_component_name, max_wait): + ctx.logger.info("Container is healthy: {0}".format(service_component_name)) + + +@wrap_error_handling_start +@monkeypatch_loggers +@operation +def create_and_start_container(**kwargs): + """Create Docker container and start""" + service_component_name = ctx.node.properties["name"] + ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME] = service_component_name + + image = ctx.node.properties["image"] + + _create_and_start_container(service_component_name, image,**kwargs) + + ctx.logger.info("Component deployed: {0}".format(service_component_name)) + + +@monkeypatch_loggers +@operation +def stop_and_remove_container(**kwargs): + """Stop and remove Docker container""" + try: + deployment_description = ctx.instance.runtime_properties["k8s_deployment"] + k8sclient.undeploy(deployment_description) + + except Exception as e: + ctx.logger.error("Unexpected error while stopping container: {0}" + .format(str(e))) + +@monkeypatch_loggers +@operation +def scale(replicas, **kwargs): + """Change number of replicas in the deployment""" + if replicas > 0: + current_replicas = ctx.instance.runtime_properties["replicas"] + ctx.logger.info("Scaling from {0} to {1}".format(current_replicas, replicas)) + try: + deployment_description = ctx.instance.runtime_properties["k8s_deployment"] + k8sclient.scale(deployment_description, replicas) + ctx.instance.runtime_properties["replicas"] = replicas + except Exception as e: + ctx.logger.error ("Unexpected error while scaling {0}".format(str(e))) + else: + ctx.logger.info("Ignoring request to scale to zero replicas") + +@monkeypatch_loggers +@Policies.cleanup_policies_on_node +@operation +def cleanup_discovery(**kwargs): + """Delete configuration from Consul""" + service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME] + + try: + conn = dis.create_kv_conn(CONSUL_HOST) + dis.remove_service_component_config(conn, service_component_name) + except dis.DiscoveryConnectionError as e: + raise RecoverableError(e) + + +def _notify_container(**kwargs): + """Notify container using the policy section in the docker_config""" + dc = kwargs["docker_config"] + + if "policy" in dc: + if dc["policy"]["trigger_type"] == "docker": + pass + """ + Need replacement for this in kubernetes. + Need to find all the pods that have been deployed + and execute the script in them. + Kubernetes does not appear to have a way to ask for a script + to be executed in all of the currently running pods for a + Kubernetes Deployment or ReplicaSet. We will have to find + each of them and run the script. The problem is that set of + pods could be changing. We can query to get all the pods, but + there's no guarantee the list won't change while we're trying to + execute the script. + + In ONAP R2, all of the policy-driven components rely on polling. + """ + """ + # REVIEW: Need to finalize on the docker config policy data structure + script_path = dc["policy"]["script_path"] + updated_policies = kwargs["updated_policies"] + removed_policies = kwargs["removed_policies"] + policies = kwargs["policies"] + cmd = doc.build_policy_update_cmd(script_path, use_sh=False, + msg_type="policies", + updated_policies=updated_policies, + removed_policies=removed_policies, + policies=policies + ) + + docker_host = kwargs[SELECTED_CONTAINER_DESTINATION] + docker_host_ip = _lookup_service(docker_host) + logins = _get_docker_logins() + client = doc.create_client(docker_host_ip, DOCKER_PORT, logins=logins) + + container_id = kwargs["container_id"] + + doc.notify_for_policy_update(client, container_id, cmd) + """ + # else the default is no trigger + + return kwargs + + +@monkeypatch_loggers +@Policies.update_policies_on_node() +@operation +def policy_update(updated_policies, removed_policies=None, policies=None, **kwargs): + """Policy update task + + This method is responsible for updating the application configuration and + notifying the applications that the change has occurred. This is to be used + for the dcae.interfaces.policy.policy_update operation. + + :updated_policies: contains the list of changed policy-configs when configs_only=True + (default) Use configs_only=False to bring the full policy objects in :updated_policies:. + """ + update_inputs = copy.deepcopy(ctx.instance.runtime_properties) + update_inputs["updated_policies"] = updated_policies + update_inputs["removed_policies"] = removed_policies + update_inputs["policies"] = policies + + _notify_container(**update_inputs) diff --git a/k8s/k8splugin/utils.py b/k8s/k8splugin/utils.py new file mode 100644 index 0000000..c45af68 --- /dev/null +++ b/k8s/k8splugin/utils.py @@ -0,0 +1,43 @@ +# ============LICENSE_START======================================================= +# org.onap.dcae +# ================================================================================ +# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. + +import string +import random +import collections + + +def random_string(n): + """Random generate an ascii string of "n" length""" + corpus = string.ascii_lowercase + string.ascii_uppercase + string.digits + return ''.join(random.choice(corpus) for x in range(n)) + + +def update_dict(d, u): + """Recursively updates dict + + Update dict d with dict u + """ + for k, v in u.iteritems(): + if isinstance(v, collections.Mapping): + r = update_dict(d.get(k, {}), v) + d[k] = r + else: + d[k] = u[k] + return d |