path: root/k8s/k8splugin/tasks.py
Diffstat (limited to 'k8s/k8splugin/tasks.py')
-rw-r--r--  k8s/k8splugin/tasks.py  682
1 file changed, 682 insertions(+), 0 deletions(-)
diff --git a/k8s/k8splugin/tasks.py b/k8s/k8splugin/tasks.py
new file mode 100644
index 0000000..1718274
--- /dev/null
+++ b/k8s/k8splugin/tasks.py
@@ -0,0 +1,682 @@
+# ============LICENSE_START=======================================================
+# org.onap.dcae
+# ================================================================================
+# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+
+# Lifecycle interface calls for containerized components
+
+# Needed by Cloudify Manager to load google.auth for the Kubernetes python client
+import cloudify_importer
+
+import time, copy
+from cloudify import ctx
+from cloudify.decorators import operation
+from cloudify.exceptions import NonRecoverableError, RecoverableError
+import dockering as doc
+from onap_dcae_dcaepolicy_lib import Policies
+from k8splugin import discovery as dis
+from k8splugin.decorators import monkeypatch_loggers, wrap_error_handling_start, \
+ merge_inputs_for_start, merge_inputs_for_create
+from k8splugin.exceptions import DockerPluginDeploymentError
+from k8splugin import utils
+from configure import configure
+from k8sclient import k8sclient
+
+# Get configuration
+plugin_conf = configure.configure()
+CONSUL_HOST = plugin_conf.get("consul_host")
+CONSUL_INTERNAL_NAME = plugin_conf.get("consul_dns_name")
+DCAE_NAMESPACE = plugin_conf.get("namespace")
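+
+# For orientation only: the plugin configuration read above is a plain dict. The
+# key names come from the lookups above; the values here are purely illustrative
+# (plugin_conf typically carries additional keys that k8sclient consumes via the
+# k8sconfig argument further below), e.g.:
+#   plugin_conf = {
+#       "consul_host": "consul.example.org:8500",
+#       "consul_dns_name": "consul",
+#       "namespace": "dcae"
+#   }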
+
+# Used to construct delivery urls for data router subscribers. Data router in FTL
+# requires https, but ONAP is expected to default to http.
+DEFAULT_SCHEME = "http"
+
+# Property keys
+SERVICE_COMPONENT_NAME = "service_component_name"
+CONTAINER_ID = "container_id"
+APPLICATION_CONFIG = "application_config"
+
+
+
+# Utility methods
+
+# Lifecycle interface calls for dcae.nodes.DockerContainer
+
+def _setup_for_discovery(**kwargs):
+ """Setup for config discovery"""
+ try:
+ name = kwargs['name']
+ application_config = kwargs[APPLICATION_CONFIG]
+
+        # NOTE: application_config is no longer a json string; it comes in as a
+        # YAML map, which translates to a dict, so no preprocessing is needed
+        # anymore.
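+        # Illustrative shape only (keys and values are hypothetical):
+        #   application_config = {"some.property": "value", "some.port": 8080}
+        # It is stored in Consul keyed by the generated service component name.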
+ conn = dis.create_kv_conn(CONSUL_HOST)
+ dis.push_service_component_config(conn, name, application_config)
+ return kwargs
+ except dis.DiscoveryConnectionError as e:
+ raise RecoverableError(e)
+ except Exception as e:
+ ctx.logger.error("Unexpected error while pushing configuration: {0}"
+ .format(str(e)))
+ raise NonRecoverableError(e)
+
+def _generate_component_name(**kwargs):
+ """Generate component name"""
+ service_component_type = kwargs['service_component_type']
+ name_override = kwargs['service_component_name_override']
+
+ kwargs['name'] = name_override if name_override \
+ else dis.generate_service_component_name(service_component_type)
+ return kwargs
+
+def _done_for_create(**kwargs):
+ """Wrap up create operation"""
+ name = kwargs['name']
+ kwargs[SERVICE_COMPONENT_NAME] = name
+    # All updates to the runtime_properties happen here. Doing so is safe
+    # because the context is not mutated anywhere else, and it keeps the other
+    # functions pure (pure in the sense of not dealing with CloudifyContext).
+ ctx.instance.runtime_properties.update(kwargs)
+ ctx.logger.info("Done setting up: {0}".format(name))
+ return kwargs
+
+
+@merge_inputs_for_create
+@monkeypatch_loggers
+@Policies.gather_policies_to_node()
+@operation
+def create_for_components(**create_inputs):
+ """Create step for Docker containers that are components
+
+ This interface is responsible for:
+
+ 1. Generating service component name
+ 2. Populating config information into Consul
+ """
+ _done_for_create(
+ **_setup_for_discovery(
+ **_generate_component_name(
+ **create_inputs)))
+
+
+def _parse_streams(**kwargs):
+ """Parse streams and setup for DMaaP plugin"""
+ # The DMaaP plugin requires this plugin to set the runtime properties
+ # keyed by the node name.
+    def setup_publishes(s):
+        kwargs[s["name"]] = s
+
+    # Use an explicit loop rather than map() so the side effects run regardless
+    # of whether map() is eager (Python 2) or lazy (Python 3).
+    for stream in kwargs["streams_publishes"]:
+        setup_publishes(stream)
+
+ def setup_subscribes(s):
+ if s["type"] == "data_router":
+            # If username and password have not been provided then generate
+            # them here; the DMaaP plugin doesn't generate them for subscribers.
+            # The generation code and the username/password lengths were lifted
+            # from the DMaaP plugin.
+
+ # Don't want to mutate the source
+ s = copy.deepcopy(s)
+ if not s.get("username", None):
+ s["username"] = utils.random_string(8)
+ if not s.get("password", None):
+ s["password"] = utils.random_string(10)
+
+ kwargs[s["name"]] = s
+
+    # NOTE: The delivery url is constructed and set up in the start operation.
+    for stream in kwargs["streams_subscribes"]:
+        setup_subscribes(stream)
+
+ return kwargs
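+
+# Illustrative example of a data_router entry in streams_subscribes, using only
+# fields referenced in this module; all values are hypothetical:
+#
+#   {
+#       "name": "feed00",
+#       "type": "data_router",
+#       "location": "loc00",
+#       "route": "identity",
+#       "scheme": "https",          # optional; DEFAULT_SCHEME is used if absent
+#       "username": "abc123",       # optional; generated above if absent
+#       "password": "abc123def456"  # optional; generated above if absent
+#   }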
+
+def _setup_for_discovery_streams(**kwargs):
+ """Setup for discovery of streams
+
+ Specifically, there's a race condition this call addresses for data router
+ subscriber case. The component needs its feed subscriber information but the
+ DMaaP plugin doesn't provide this until after the docker plugin start
+ operation.
+ """
+ dr_subs = [kwargs[s["name"]] for s in kwargs["streams_subscribes"] \
+ if s["type"] == "data_router"]
+
+ if dr_subs:
+ dmaap_kv_key = "{0}:dmaap".format(kwargs["name"])
+ conn = dis.create_kv_conn(CONSUL_HOST)
+
+ def add_feed(dr_sub):
+            # The delivery url and subscriber id will be filled in by the DMaaP
+            # plugin later.
+ v = { "location": dr_sub["location"], "delivery_url": None,
+ "username": dr_sub["username"], "password": dr_sub["password"],
+ "subscriber_id": None }
+            return dis.add_to_entry(conn, dmaap_kv_key, dr_sub["name"], v) is not None
+
+ try:
+ if all(map(add_feed, dr_subs)):
+ return kwargs
+ except Exception as e:
+ raise NonRecoverableError(e)
+
+ # You should never get here
+ raise NonRecoverableError("Failure updating feed streams in Consul")
+ else:
+ return kwargs
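+
+# For reference, based on how add_to_entry is called above, a successful run
+# leaves a Consul KV entry keyed "<service_component_name>:dmaap" that maps each
+# data router feed name to a record like (values hypothetical):
+#   {"feed00": {"location": "loc00", "delivery_url": None, "username": "abc123",
+#               "password": "abc123def456", "subscriber_id": None}}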
+
+
+@merge_inputs_for_create
+@monkeypatch_loggers
+@Policies.gather_policies_to_node()
+@operation
+def create_for_components_with_streams(**create_inputs):
+ """Create step for Docker containers that are components that use DMaaP
+
+ This interface is responsible for:
+
+ 1. Generating service component name
+ 2. Setup runtime properties for DMaaP plugin
+ 3. Populating application config into Consul
+ 4. Populating DMaaP config for data router subscribers in Consul
+ """
+ _done_for_create(
+ **_setup_for_discovery(
+ **_setup_for_discovery_streams(
+ **_parse_streams(
+ **_generate_component_name(
+ **create_inputs)))))
+
+
+@merge_inputs_for_create
+@monkeypatch_loggers
+@operation
+def create_for_platforms(**create_inputs):
+ """Create step for Docker containers that are platform components
+
+    This interface is responsible for:
+
+ 1. Populating config information into Consul
+ """
+ _done_for_create(
+ **_setup_for_discovery(
+ **create_inputs))
+
+
+def _lookup_service(service_component_name, consul_host=CONSUL_HOST,
+ with_port=False):
+ conn = dis.create_kv_conn(consul_host)
+ results = dis.lookup_service(conn, service_component_name)
+
+ if with_port:
+ # Just grab first
+ result = results[0]
+ return "{address}:{port}".format(address=result["ServiceAddress"],
+ port=result["ServicePort"])
+ else:
+ return results[0]["ServiceAddress"]
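+
+# Illustrative usage (addresses and ports are hypothetical):
+#   _lookup_service("mycomponent")                 # -> "10.42.5.7"
+#   _lookup_service("mycomponent", with_port=True) # -> "10.42.5.7:8080"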
+
+
+def _verify_container(service_component_name, max_wait):
+ """Verify that the container is healthy
+
+ Args:
+ -----
+    max_wait (integer): limit on how many attempts to make, which roughly
+        translates to seconds because each sleep is one second. 0 means wait
+        indefinitely.
+
+ Return:
+ -------
+    True if the component is healthy; otherwise a DockerPluginDeploymentError
+    exception is raised.
+ """
+ num_attempts = 1
+
+ while True:
+ if k8sclient.is_available(DCAE_NAMESPACE, service_component_name):
+ return True
+ else:
+ num_attempts += 1
+
+ if max_wait > 0 and max_wait < num_attempts:
+ raise DockerPluginDeploymentError("Container never became healthy")
+
+ time.sleep(1)
+
+ return True
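+
+# Illustrative usage (name hypothetical): wait up to roughly five minutes for the
+# component's Kubernetes deployment to become available:
+#   _verify_container("mycomponent-abc123", 300)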
+
+def _create_and_start_container(container_name, image, **kwargs):
+ '''
+ This will create a k8s Deployment and, if needed, a k8s Service or two.
+    (We are being opinionated in our use of k8s: this code decides which k8s
+    abstractions and features to use. We deliberately do not expose k8s to the
+    component developer or the blueprint author, so that k8s is used in a
+    controlled, consistent way and its details stay hidden.)
+
+ kwargs may have:
+ - volumes: array of volume objects, where a volume object is:
+ {"host":{"path": "/path/on/host"}, "container":{"bind":"/path/on/container","mode":"rw_or_ro"}
+ - ports: array of strings in the form "container_port:host_port"
+ - env: map of name-value pairs ( {name0: value0, name1: value1...} )
+ - always_pull: boolean. If true, sets image pull policy to "Always"
+        so that a fresh copy of the image is always pulled. Otherwise, sets
+ image pull policy to "IfNotPresent"
+ - msb_list: array of msb objects, where an msb object is as described in msb/msb.py.
+ - log_info: an object with info for setting up ELK logging, with the form:
+ {"log_directory": "/path/to/container/log/directory", "alternate_fb_path" : "/alternate/sidecar/log/path"}"
+ - replicas: number of replicas to be launched initially
+ '''
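+    # Illustrative call; the image name, ports, volume paths, and env values are
+    # hypothetical:
+    #   _create_and_start_container("mycomponent-abc123",
+    #       "nexus3.onap.org:10001/onap/org.onap.dcaegen2.mycomponent:1.0.0",
+    #       ports=["8080:30235"],
+    #       env={"SOME_VAR": "value"},
+    #       volumes=[{"host": {"path": "/var/log"},
+    #                 "container": {"bind": "/opt/app/logs", "mode": "rw"}}],
+    #       replicas=1)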
+ env = { "CONSUL_HOST": CONSUL_INTERNAL_NAME,
+ "CONFIG_BINDING_SERVICE": "config-binding-service" }
+ env.update(kwargs.get("env", {}))
+ ctx.logger.info("Deploying {}, image: {}, env: {}, kwargs: {}".format(container_name, image, env, kwargs))
+ ctx.logger.info("Passing k8sconfig: {}".format(plugin_conf))
+ replicas = kwargs.get("replicas", 1)
+ _,dep = k8sclient.deploy(DCAE_NAMESPACE,
+ container_name,
+ image,
+ replicas = replicas,
+ always_pull=kwargs.get("always_pull_image", False),
+ k8sconfig=plugin_conf,
+ volumes=kwargs.get("volumes",[]),
+ ports=kwargs.get("ports",[]),
+ msb_list=kwargs.get("msb_list"),
+ env = env,
+ labels = kwargs.get("labels", {}),
+ log_info=kwargs.get("log_info"))
+
+ # Capture the result of deployment for future use
+ ctx.instance.runtime_properties["k8s_deployment"] = dep
+ ctx.instance.runtime_properties["replicas"] = replicas
+    ctx.logger.info("Deployment complete: {0}".format(dep))
+
+def _parse_cloudify_context(**kwargs):
+ """Parse Cloudify context
+
+    Extract what is needed. This is an impure function because it requires ctx.
+ """
+ kwargs["deployment_id"] = ctx.deployment.id
+
+ # Set some labels for the Kubernetes pods
+ kwargs["labels"] = {
+ "cfydeployment" : ctx.deployment.id,
+ "cfynode": ctx.node.name,
+ "cfynodeinstance": ctx.instance.id
+ }
+
+ # Pick up the centralized logging info
+ if "log_info" in ctx.node.properties and "log_directory" in ctx.node.properties["log_info"]:
+ kwargs["log_info"] = ctx.node.properties["log_info"]
+
+ # Pick up replica count and always_pull_image flag
+ if "replicas" in ctx.node.properties:
+ kwargs["replicas"] = ctx.node.properties["replicas"]
+ if "always_pull_image" in ctx.node.properties:
+ kwargs["always_pull_image"] = ctx.node.properties["always_pull_image"]
+
+ return kwargs
+
+def _enhance_docker_params(**kwargs):
+ """Setup Docker envs"""
+ docker_config = kwargs.get("docker_config", {})
+
+ envs = kwargs.get("envs", {})
+ # NOTE: Healthchecks are optional until prepared to handle use cases that
+ # don't necessarily use http
+ envs_healthcheck = doc.create_envs_healthcheck(docker_config) \
+ if "healthcheck" in docker_config else {}
+ envs.update(envs_healthcheck)
+
+ # Set tags on this component for its Consul registration as a service
+ tags = [kwargs.get("deployment_id", None), kwargs["service_id"]]
+ tags = [ str(tag) for tag in tags if tag is not None ]
+ # Registrator will use this to register this component with tags. Must be
+ # comma delimited.
+ envs["SERVICE_TAGS"] = ",".join(tags)
+
+ kwargs["envs"] = envs
+
+ def combine_params(key, docker_config, kwargs):
+ v = docker_config.get(key, []) + kwargs.get(key, [])
+ if v:
+ kwargs[key] = v
+ return kwargs
+
+ # Add the lists of ports and volumes unintelligently - meaning just add the
+ # lists together with no deduping.
+ kwargs = combine_params("ports", docker_config, kwargs)
+ kwargs = combine_params("volumes", docker_config, kwargs)
+
+
+ return kwargs
+
+def _create_and_start_component(**kwargs):
+ """Create and start component (container)"""
+ image = kwargs["image"]
+ service_component_name = kwargs[SERVICE_COMPONENT_NAME]
+    # Be selective about which pieces are passed along: passing all of kwargs,
+    # which contains everything, confused _create_and_start_container because
+    # duplicate/conflicting keys exist.
+ sub_kwargs = {
+ "volumes": kwargs.get("volumes", []),
+ "ports": kwargs.get("ports", None),
+ "envs": kwargs.get("envs", {}),
+ "log_info": kwargs.get("log_info", {}),
+ "labels": kwargs.get("labels", {})}
+ _create_and_start_container(service_component_name, image, **sub_kwargs)
+
+ # TODO: Use regular logging here
+ ctx.logger.info("Container started: {0}".format(service_component_name))
+
+ return kwargs
+
+def _verify_component(**kwargs):
+ """Verify component (container) is healthy"""
+ service_component_name = kwargs[SERVICE_COMPONENT_NAME]
+ # TODO: "Consul doesn't make its first health check immediately upon registration.
+ # Instead it waits for the health check interval to pass."
+ # Possible enhancement is to read the interval (and possibly the timeout) from
+ # docker_config and multiply that by a number to come up with a more suitable
+ # max_wait.
+
+ max_wait = kwargs.get("max_wait", 300)
+
+ # Verify that the container is healthy
+
+    if _verify_container(service_component_name, max_wait):
+        # TODO: Use regular logging here
+        ctx.logger.info("Container is healthy: {0}".format(service_component_name))
+
+ return kwargs
+
+def _done_for_start(**kwargs):
+ ctx.instance.runtime_properties.update(kwargs)
+ ctx.logger.info("Done starting: {0}".format(kwargs["name"]))
+ return kwargs
+
+def _setup_msb_registration(service_name, msb_reg):
+ return {
+ "serviceName" : service_name,
+ "port" : msb_reg.get("port", "80"),
+ "version" : msb_reg.get("version", "v1"),
+ "url" : msb_reg.get("url_path", "/v1"),
+ "protocol" : "REST",
+ "enable_ssl" : msb_reg.get("uses_ssl", False),
+ "visualRange" : "1"
+}
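+
+# Illustrative input/output (values hypothetical): given a node property
+#   msb_registration: {"port": "8080", "version": "v1", "url_path": "/myapi", "uses_ssl": False}
+# this returns
+#   {"serviceName": <service_name>, "port": "8080", "version": "v1", "url": "/myapi",
+#    "protocol": "REST", "enable_ssl": False, "visualRange": "1"}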
+
+@wrap_error_handling_start
+@merge_inputs_for_start
+@monkeypatch_loggers
+@operation
+def create_and_start_container_for_components(**start_inputs):
+ """Create Docker container and start for components
+
+ This operation method is to be used with the DockerContainerForComponents
+ node type. After launching the container, the plugin will verify with Consul
+ that the app is up and healthy before terminating.
+ """
+ _done_for_start(
+ **_verify_component(
+ **_create_and_start_component(
+ **_enhance_docker_params(
+ **_parse_cloudify_context(**start_inputs)))))
+
+
+def _update_delivery_url(**kwargs):
+ """Update the delivery url for data router subscribers"""
+ dr_subs = [kwargs[s["name"]] for s in kwargs["streams_subscribes"] \
+ if s["type"] == "data_router"]
+
+ if dr_subs:
+ service_component_name = kwargs[SERVICE_COMPONENT_NAME]
+        # TODO: Should NOT set up the delivery url with IP addresses, because in
+        # the https case this will not work: data router validates the
+        # certificate against the FQDN.
+ subscriber_host = _lookup_service(service_component_name, with_port=True)
+
+ for dr_sub in dr_subs:
+ scheme = dr_sub["scheme"] if "scheme" in dr_sub else DEFAULT_SCHEME
+ if "route" not in dr_sub:
+ raise NonRecoverableError("'route' key missing from data router subscriber")
+ path = dr_sub["route"]
+ dr_sub["delivery_url"] = "{scheme}://{host}/{path}".format(
+ scheme=scheme, host=subscriber_host, path=path)
+ kwargs[dr_sub["name"]] = dr_sub
+
+ return kwargs
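+
+# Illustrative result (host and route hypothetical): a data router subscriber with
+# route "identity" whose service resolves to 10.42.5.7:8080 would end up with
+#   delivery_url = "http://10.42.5.7:8080/identity"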
+
+@wrap_error_handling_start
+@merge_inputs_for_start
+@monkeypatch_loggers
+@operation
+def create_and_start_container_for_components_with_streams(**start_inputs):
+ """Create Docker container and start for components that have streams
+
+ This operation method is to be used with the DockerContainerForComponents
+ node type. After launching the container, the plugin will verify with Consul
+ that the app is up and healthy before terminating.
+ """
+ _done_for_start(
+ **_update_delivery_url(
+ **_verify_component(
+ **_create_and_start_component(
+ **_enhance_docker_params(
+ **_parse_cloudify_context(**start_inputs))))))
+
+
+@wrap_error_handling_start
+@monkeypatch_loggers
+@operation
+def create_and_start_container_for_platforms(**kwargs):
+ """Create Docker container and start for platform services
+
+ This operation method is to be used with the ContainerizedPlatformComponent
+ node type.
+ """
+ # Capture node properties
+ image = ctx.node.properties["image"]
+ docker_config = ctx.node.properties.get("docker_config", {})
+ if "dns_name" in ctx.node.properties:
+ service_component_name = ctx.node.properties["dns_name"]
+ else:
+ service_component_name = ctx.node.properties["name"]
+
+
+ envs = kwargs.get("envs", {})
+ # NOTE: Healthchecks are optional until prepared to handle use cases that
+ # don't necessarily use http
+ envs_healthcheck = doc.create_envs_healthcheck(docker_config) \
+ if "healthcheck" in docker_config else {}
+ envs.update(envs_healthcheck)
+ kwargs["envs"] = envs
+
+ # Set some labels for the Kubernetes pods
+ kwargs["labels"] = {
+ "cfydeployment" : ctx.deployment.id,
+ "cfynode": ctx.node.name,
+ "cfynodeinstance": ctx.instance.id
+ }
+
+ host_port = ctx.node.properties["host_port"]
+ container_port = ctx.node.properties["container_port"]
+
+    # Cloudify properties are all required, and Cloudify complains that None is
+    # not a valid value for an integer. A value of 0 indicates that no specific
+    # port mapping should be set, for cases like the service change handler.
+ if container_port != 0:
+ # Doing this because other nodes might want to use this property
+ port_mapping = "{cp}:{hp}".format(cp=container_port, hp=host_port)
+ ports = kwargs.get("ports", []) + [ port_mapping ]
+ kwargs["ports"] = ports
+ if "ports" not in kwargs:
+ ctx.logger.warn("No port mappings defined. Will randomly assign port.")
+
+ # All of the new node properties could be handled more DRYly!
+ # If a registration to MSB is required, then set up the registration info
+ if "msb_registration" in ctx.node.properties and "port" in ctx.node.properties["msb_registration"]:
+ kwargs["msb_list"] = [_setup_msb_registration(service_component_name, ctx.node.properties["msb_registration"])]
+
+ # If centralized logging via ELK is desired, then set up the logging info
+ if "log_info" in ctx.node.properties and "log_directory" in ctx.node.properties["log_info"]:
+ kwargs["log_info"] = ctx.node.properties["log_info"]
+
+ # Pick up replica count and always_pull_image flag
+ if "replicas" in ctx.node.properties:
+ kwargs["replicas"] = ctx.node.properties["replicas"]
+ if "always_pull_image" in ctx.node.properties:
+ kwargs["always_pull_image"] = ctx.node.properties["always_pull_image"]
+ _create_and_start_container(service_component_name, image, **kwargs)
+
+ ctx.logger.info("Container started: {0}".format(service_component_name))
+
+ # Verify that the container is healthy
+
+ max_wait = kwargs.get("max_wait", 300)
+
+ if _verify_container(service_component_name, max_wait):
+ ctx.logger.info("Container is healthy: {0}".format(service_component_name))
+
+
+@wrap_error_handling_start
+@monkeypatch_loggers
+@operation
+def create_and_start_container(**kwargs):
+ """Create Docker container and start"""
+ service_component_name = ctx.node.properties["name"]
+ ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME] = service_component_name
+
+ image = ctx.node.properties["image"]
+
+    _create_and_start_container(service_component_name, image, **kwargs)
+
+ ctx.logger.info("Component deployed: {0}".format(service_component_name))
+
+
+@monkeypatch_loggers
+@operation
+def stop_and_remove_container(**kwargs):
+ """Stop and remove Docker container"""
+ try:
+ deployment_description = ctx.instance.runtime_properties["k8s_deployment"]
+ k8sclient.undeploy(deployment_description)
+
+ except Exception as e:
+ ctx.logger.error("Unexpected error while stopping container: {0}"
+ .format(str(e)))
+
+@monkeypatch_loggers
+@operation
+def scale(replicas, **kwargs):
+ """Change number of replicas in the deployment"""
+ if replicas > 0:
+ current_replicas = ctx.instance.runtime_properties["replicas"]
+ ctx.logger.info("Scaling from {0} to {1}".format(current_replicas, replicas))
+ try:
+ deployment_description = ctx.instance.runtime_properties["k8s_deployment"]
+ k8sclient.scale(deployment_description, replicas)
+ ctx.instance.runtime_properties["replicas"] = replicas
+ except Exception as e:
+            ctx.logger.error("Unexpected error while scaling: {0}".format(str(e)))
+ else:
+ ctx.logger.info("Ignoring request to scale to zero replicas")
+
+@monkeypatch_loggers
+@Policies.cleanup_policies_on_node
+@operation
+def cleanup_discovery(**kwargs):
+ """Delete configuration from Consul"""
+ service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME]
+
+ try:
+ conn = dis.create_kv_conn(CONSUL_HOST)
+ dis.remove_service_component_config(conn, service_component_name)
+ except dis.DiscoveryConnectionError as e:
+ raise RecoverableError(e)
+
+
+def _notify_container(**kwargs):
+ """Notify container using the policy section in the docker_config"""
+ dc = kwargs["docker_config"]
+
+ if "policy" in dc:
+ if dc["policy"]["trigger_type"] == "docker":
+ pass
+ """
+ Need replacement for this in kubernetes.
+ Need to find all the pods that have been deployed
+ and execute the script in them.
+ Kubernetes does not appear to have a way to ask for a script
+ to be executed in all of the currently running pods for a
+ Kubernetes Deployment or ReplicaSet. We will have to find
+ each of them and run the script. The problem is that set of
+ pods could be changing. We can query to get all the pods, but
+ there's no guarantee the list won't change while we're trying to
+ execute the script.
+
+ In ONAP R2, all of the policy-driven components rely on polling.
+ """
+ """
+ # REVIEW: Need to finalize on the docker config policy data structure
+ script_path = dc["policy"]["script_path"]
+ updated_policies = kwargs["updated_policies"]
+ removed_policies = kwargs["removed_policies"]
+ policies = kwargs["policies"]
+ cmd = doc.build_policy_update_cmd(script_path, use_sh=False,
+ msg_type="policies",
+ updated_policies=updated_policies,
+ removed_policies=removed_policies,
+ policies=policies
+ )
+
+ docker_host = kwargs[SELECTED_CONTAINER_DESTINATION]
+ docker_host_ip = _lookup_service(docker_host)
+ logins = _get_docker_logins()
+ client = doc.create_client(docker_host_ip, DOCKER_PORT, logins=logins)
+
+ container_id = kwargs["container_id"]
+
+ doc.notify_for_policy_update(client, container_id, cmd)
+ """
+ # else the default is no trigger
+
+ return kwargs
+
+
+@monkeypatch_loggers
+@Policies.update_policies_on_node()
+@operation
+def policy_update(updated_policies, removed_policies=None, policies=None, **kwargs):
+ """Policy update task
+
+ This method is responsible for updating the application configuration and
+ notifying the applications that the change has occurred. This is to be used
+ for the dcae.interfaces.policy.policy_update operation.
+
+    :updated_policies: contains the list of changed policy-configs when
+    configs_only=True (the default). Use configs_only=False to bring the full
+    policy objects into :updated_policies:.
+ """
+ update_inputs = copy.deepcopy(ctx.instance.runtime_properties)
+ update_inputs["updated_policies"] = updated_policies
+ update_inputs["removed_policies"] = removed_policies
+ update_inputs["policies"] = policies
+
+ _notify_container(**update_inputs)