author     Jack Lucas <jflucas@research.att.com>    2018-07-25 20:33:50 +0000
committer  Jack Lucas <jflucas@research.att.com>    2018-07-25 20:34:34 +0000
commit     03ac66c04143f712378a92ef258253b8ce6b1e8b (patch)
tree       3fd27e57217ece214b1e602cfd046cbacb099c9e
parent     926eb8c21979a6972838dae8e9de29903d1b26f1 (diff)
Add policy update notification support
Use k8s terminology instead of Docker terminology.
Remove some redundant logging.

Change-Id: Ic8e581ebde9ea062a7d0e145465425eec1db57c3
Issue-ID: DCAEGEN2-504
Signed-off-by: Jack Lucas <jflucas@research.att.com>
-rw-r--r--  k8s/k8s-node-type.yaml         2
-rw-r--r--  k8s/k8sclient/__init__.py      2
-rw-r--r--  k8s/k8sclient/k8sclient.py   110
-rw-r--r--  k8s/k8splugin/decorators.py    8
-rw-r--r--  k8s/k8splugin/discovery.py    16
-rw-r--r--  k8s/k8splugin/tasks.py       165
-rw-r--r--  k8s/pom.xml                    2
-rw-r--r--  k8s/setup.py                   2
-rw-r--r--  k8s/tests/test_tasks.py        8
9 files changed, 201 insertions, 114 deletions
diff --git a/k8s/k8s-node-type.yaml b/k8s/k8s-node-type.yaml
index 4810f3a..7d64500 100644
--- a/k8s/k8s-node-type.yaml
+++ b/k8s/k8s-node-type.yaml
@@ -25,7 +25,7 @@ plugins:
k8s:
executor: 'central_deployment_agent'
package_name: k8splugin
- package_version: 1.3.0
+ package_version: 1.4.0
data_types:
diff --git a/k8s/k8sclient/__init__.py b/k8s/k8sclient/__init__.py
index 1ba4553..3cc19f2 100644
--- a/k8s/k8sclient/__init__.py
+++ b/k8s/k8sclient/__init__.py
@@ -17,4 +17,4 @@
# ============LICENSE_END=========================================================
#
# ECOMP is a trademark and service mark of AT&T Intellectual Property.
-from .k8sclient import deploy, undeploy, is_available, scale, upgrade, rollback
\ No newline at end of file
+from .k8sclient import deploy, undeploy, is_available, scale, upgrade, rollback, execute_command_in_deployment
\ No newline at end of file
diff --git a/k8s/k8sclient/k8sclient.py b/k8s/k8sclient/k8sclient.py
index c4ba67d..e388fb5 100644
--- a/k8s/k8sclient/k8sclient.py
+++ b/k8s/k8sclient/k8sclient.py
@@ -20,7 +20,7 @@
import os
import uuid
from msb import msb
-from kubernetes import config, client
+from kubernetes import config, client, stream
# Default values for readiness probe
PROBE_DEFAULT_PERIOD = 15
@@ -121,9 +121,14 @@ def _create_deployment_object(component_name,
containers,
replicas,
volumes,
- labels,
+ labels={},
pull_secrets=[]):
+ deployment_name = _create_deployment_name(component_name)
+
+ # Label the pod with the deployment name, so we can find it easily
+ labels.update({"k8sdeployment" : deployment_name})
+
# pull_secrets is a list of the names of the k8s secrets containing docker registry credentials
# See https://kubernetes.io/docs/concepts/containers/images/#specifying-imagepullsecrets-on-a-pod
ips = []
@@ -148,7 +153,7 @@ def _create_deployment_object(component_name,
# Create deployment object
deployment = client.ExtensionsV1beta1Deployment(
kind="Deployment",
- metadata=client.V1ObjectMeta(name=_create_deployment_name(component_name)),
+ metadata=client.V1ObjectMeta(name=deployment_name),
spec=spec
)
@@ -229,6 +234,55 @@ def _patch_deployment(namespace, deployment, modify):
# Patch the deploy with updated spec
client.ExtensionsV1beta1Api().patch_namespaced_deployment(deployment, namespace, spec)
+def _execute_command_in_pod(namespace, pod_name, command):
+ '''
+ Execute the command (specified by an argv-style list in the "command" parameter) in
+ the specified pod in the specified namespace. For now at least, we use this only to
+ run a notification script in a pod after a configuration change.
+
+ The straightforward way to do this is with the V1 Core API function "connect_get_namespaced_pod_exec".
+ Unfortunately due to a bug/limitation in the Python client library, we can't call it directly.
+ We have to make the API call through a Websocket connection using the kubernetes.stream wrapper API.
+ I'm following the example code at https://github.com/kubernetes-client/python/blob/master/examples/exec.py.
+ There are several issues tracking this, in various states. It isn't clear that there will ever
+ be a fix.
+ - https://github.com/kubernetes-client/python/issues/58
+ - https://github.com/kubernetes-client/python/issues/409
+ - https://github.com/kubernetes-client/python/issues/526
+
+ The main consequence of the workaround using "stream" is that the caller does not get an indication
+ of the exit code returned by the command when it completes execution. It turns out that the
+ original implementation of notification in the Docker plugin did not use this result, so we can
+ still match the original notification functionality.
+
+ The "stream" approach returns a string containing any output sent by the command to stdout or stderr.
+ We'll return that so it can be logged.
+ '''
+ _configure_api()
+ try:
+ output = stream.stream(client.CoreV1Api().connect_get_namespaced_pod_exec,
+ name=pod_name,
+ namespace=namespace,
+ command=command,
+ stdout=True,
+ stderr=True,
+ stdin=False,
+ tty=False)
+ except client.rest.ApiException as e:
+ # If the exception indicates the pod wasn't found, it's not a fatal error.
+ # It existed when we enumerated the pods for the deployment but no longer exists.
+ # Unfortunately, the only way to distinguish a pod not found from any other error
+ # is by looking at the reason text.
+ # (The ApiException's "status" field should contain the HTTP status code, which would
+ # be 404 if the pod isn't found, but empirical testing reveals that "status" is set
+ # to zero.)
+ if "404 not found" in e.reason.lower():
+ output = "Pod not found"
+ else:
+ raise e
+
+ return {"pod" : pod_name, "output" : output}
+
def deploy(namespace, component_name, image, replicas, always_pull, k8sconfig, **kwargs):
'''
This will create a k8s Deployment and, if needed, one or two k8s Services.
@@ -441,4 +495,52 @@ def rollback(deployment_description, rollback_to=0):
# Read back the spec for the rolled-back deployment
spec = client.ExtensionsV1beta1Api().read_namespaced_deployment(deployment, namespace)
- return spec.spec.template.spec.containers[0].image, spec.spec.replicas
\ No newline at end of file
+ return spec.spec.template.spec.containers[0].image, spec.spec.replicas
+
+def execute_command_in_deployment(deployment_description, command):
+ '''
+ Enumerates the pods in the k8s deployment identified by "deployment_description",
+ then executes the command (represented as an argv-style list) in "command" in
+ container 0 (the main application container) in each of those pods.
+
+ Note that the set of pods associated with a deployment can change over time. The
+ enumeration is a snapshot at one point in time. The command will not be executed in
+ pods that are created after the initial enumeration. If a pod disappears after the
+ initial enumeration and before the command is executed, the attempt to execute the
+ command will fail. This is not treated as a fatal error.
+
+ This approach is reasonable for the one current use case for "execute_command": running a
+ script to notify a container that its configuration has changed as a result of a
+ policy change. In this use case, the new configuration information is stored into
+ the configuration store (Consul), the pods are enumerated, and the command is executed.
+ If a pod disappears after the enumeration, the fact that the command cannot be run
+ doesn't matter--a nonexistent pod doesn't need to be reconfigured. Similarly, a pod that
+ comes up after the enumeration will get its initial configuration from the updated version
+ in Consul.
+
+ The optimal solution here would be for k8s to provide an API call to execute a command in
+ all of the pods for a deployment. Unfortunately, k8s does not provide such a call--the
+ only call provided by k8s operates at the pod level, not the deployment level.
+
+ Another interesting k8s factoid: there's no direct way to list the pods belonging to a
+ particular k8s deployment. The deployment code above sets a label ("k8sdeployment") on
+ the pod that has the k8s deployment name. To list the pods, the code below queries for
+ pods with the label carrying the deployment name.
+ '''
+
+ _configure_api()
+ deployment = deployment_description["deployment"]
+ namespace = deployment_description["namespace"]
+
+ # Get names of all the running pods belonging to the deployment
+ pod_names = [pod.metadata.name for pod in client.CoreV1Api().list_namespaced_pod(
+ namespace = namespace,
+ label_selector = "k8sdeployment={0}".format(deployment),
+ field_selector = "status.phase=Running"
+ ).items]
+
+ def do_execute(pod_name):
+ return _execute_command_in_pod(namespace, pod_name, command)
+
+ # Execute command in the running pods
+ return map(do_execute, pod_names)
\ No newline at end of file
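For reference, here is a minimal standalone sketch of the exec-in-all-pods pattern that the new k8sclient code implements (pod labeling via "k8sdeployment", enumeration by label selector, and exec through the kubernetes.stream wrapper). It assumes a reachable cluster and the official kubernetes Python client; the function name run_in_deployment_pods and the example arguments are illustrative only, not part of the plugin.

    # Minimal sketch, not plugin code: run a command in every running pod of a deployment.
    from kubernetes import config, client, stream

    def run_in_deployment_pods(namespace, deployment_name, command):
        # Load credentials from ~/.kube/config; inside a pod use config.load_incluster_config().
        config.load_kube_config()
        core = client.CoreV1Api()

        # The plugin labels each pod with "k8sdeployment=<deployment name>", so a
        # label selector is enough to enumerate the deployment's running pods.
        pods = core.list_namespaced_pod(
            namespace=namespace,
            label_selector="k8sdeployment={0}".format(deployment_name),
            field_selector="status.phase=Running").items

        results = []
        for pod in pods:
            # connect_get_namespaced_pod_exec must be called through the stream wrapper;
            # the return value is the command's combined stdout/stderr as a string.
            output = stream.stream(core.connect_get_namespaced_pod_exec,
                                   name=pod.metadata.name,
                                   namespace=namespace,
                                   command=command,
                                   stdout=True, stderr=True, stdin=False, tty=False)
            results.append({"pod": pod.metadata.name, "output": output})
        return results

    # Example (hypothetical names): run_in_deployment_pods("onap", "dep-myservice", ["ls", "/opt/app"])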
diff --git a/k8s/k8splugin/decorators.py b/k8s/k8splugin/decorators.py
index 59d14d8..b9b32bf 100644
--- a/k8s/k8splugin/decorators.py
+++ b/k8s/k8splugin/decorators.py
@@ -19,12 +19,14 @@
# ECOMP is a trademark and service mark of AT&T Intellectual Property.
import copy
+
from cloudify import ctx
from cloudify.exceptions import NonRecoverableError, RecoverableError
+
from k8splugin import discovery as dis
-from k8splugin.exceptions import DockerPluginDeploymentError, \
- DockerPluginDependencyNotReadyError
from k8splugin import utils
+from k8splugin.exceptions import (DockerPluginDependencyNotReadyError,
+ DockerPluginDeploymentError)
def monkeypatch_loggers(task_func):
@@ -111,4 +113,4 @@ def wrap_error_handling_update(update_func):
ctx.logger.error ("Unexpected error during update operation: {0}".format(str(e)))
raise NonRecoverableError(e)
- return wrapper
\ No newline at end of file
+ return wrapper
diff --git a/k8s/k8splugin/discovery.py b/k8s/k8splugin/discovery.py
index f3b87b6..56f8260 100644
--- a/k8s/k8splugin/discovery.py
+++ b/k8s/k8splugin/discovery.py
@@ -18,14 +18,14 @@
#
# ECOMP is a trademark and service mark of AT&T Intellectual Property.
-from functools import partial
import json
import logging
-import uuid
-import requests
-import consul
import re
+import uuid
+from functools import partial
+import consul
+import requests
logger = logging.getLogger("discovery")
@@ -54,16 +54,16 @@ def _wrap_consul_call(consul_func, *args, **kwargs):
def generate_service_component_name(service_component_type):
"""Generate service component id used to pass into the service component
instance and used as the key to the service component configuration.
-
+
Updated for use with Kubernetes. Sometimes the service component name gets
used in Kubernetes in contexts (such as naming a Kubernetes Service) that
- requires the name to conform to the RFC1035 DNS "label" syntax:
+ requires the name to conform to the RFC1035 DNS "label" syntax:
-- starts with an alpha
-- contains only of alphanumerics and "-"
-- <= 63 characters long
Format:
- s<service component id>-<service component type>,
+ s<service component id>-<service component type>,
truncated to 63 characters, "_" replaced with "-" in service_component_type,
other non-conforming characters removed from service_component_type
"""
@@ -197,7 +197,7 @@ def is_healthy(consul_host, instance):
def add_to_entry(conn, key, add_name, add_value):
"""
- Find 'key' in consul.
+ Find 'key' in consul.
Treat its value as a JSON string representing a dict.
Extend the dict by adding an entry with key 'add_name' and value 'add_value'.
Turn the resulting extended dict into a JSON string.
diff --git a/k8s/k8splugin/tasks.py b/k8s/k8splugin/tasks.py
index 4205122..c9df9f4 100644
--- a/k8s/k8splugin/tasks.py
+++ b/k8s/k8splugin/tasks.py
@@ -24,6 +24,7 @@
import cloudify_importer
import time, copy
+import json
from cloudify import ctx
from cloudify.decorators import operation
from cloudify.exceptions import NonRecoverableError, RecoverableError
@@ -103,7 +104,7 @@ def _done_for_create(**kwargs):
@Policies.gather_policies_to_node()
@operation
def create_for_components(**create_inputs):
- """Create step for Docker containers that are components
+ """Create step for service components
This interface is responsible for:
@@ -185,7 +186,7 @@ def _setup_for_discovery_streams(**kwargs):
@Policies.gather_policies_to_node()
@operation
def create_for_components_with_streams(**create_inputs):
- """Create step for Docker containers that are components that use DMaaP
+ """Create step for service components that use DMaaP
This interface is responsible for:
@@ -206,7 +207,7 @@ def create_for_components_with_streams(**create_inputs):
@monkeypatch_loggers
@operation
def create_for_platforms(**create_inputs):
- """Create step for Docker containers that are platform components
+ """Create step for platform components
This interface is responsible for:
@@ -231,8 +232,8 @@ def _lookup_service(service_component_name, consul_host=CONSUL_HOST,
return results[0]["ServiceAddress"]
-def _verify_container(service_component_name, max_wait):
- """Verify that the container is healthy
+def _verify_k8s_deployment(service_component_name, max_wait):
+ """Verify that the k8s Deployment is ready
Args:
-----
@@ -241,7 +242,7 @@ def _verify_container(service_component_name, max_wait):
Return:
-------
- True if component is healthy else a DockerPluginDeploymentError exception
+ True if deployment is ready else a DockerPluginDeploymentError exception
will be raised.
"""
num_attempts = 1
@@ -253,7 +254,7 @@ def _verify_container(service_component_name, max_wait):
num_attempts += 1
if max_wait > 0 and max_wait < num_attempts:
- raise DockerPluginDeploymentError("Container never became healthy")
+ raise DockerPluginDeploymentError("k8s deployment never became ready for {0}".format(service_component_name))
time.sleep(1)
@@ -284,7 +285,7 @@ def _create_and_start_container(container_name, image, **kwargs):
env = { "CONSUL_HOST": CONSUL_INTERNAL_NAME,
"CONFIG_BINDING_SERVICE": "config-binding-service" }
env.update(kwargs.get("envs", {}))
- ctx.logger.info("Deploying {}, image: {}, env: {}, kwargs: {}".format(container_name, image, env, kwargs))
+ ctx.logger.info("Starting k8s deployment for {}, image: {}, env: {}, kwargs: {}".format(container_name, image, env, kwargs))
ctx.logger.info("Passing k8sconfig: {}".format(plugin_conf))
replicas = kwargs.get("replicas", 1)
_,dep = k8sclient.deploy(DCAE_NAMESPACE,
@@ -304,7 +305,7 @@ def _create_and_start_container(container_name, image, **kwargs):
# Capture the result of deployment for future use
ctx.instance.runtime_properties["k8s_deployment"] = dep
ctx.instance.runtime_properties["replicas"] = replicas
- ctx.logger.info ("Deployment complete: {0}".format(dep))
+ ctx.logger.info ("k8s deployment initiated successfully for {0}: {1}".format(container_name, dep))
def _parse_cloudify_context(**kwargs):
"""Parse Cloudify context
@@ -384,24 +385,17 @@ def _create_and_start_component(**kwargs):
"readiness": kwargs.get("readiness",{})}
_create_and_start_container(service_component_name, image, **sub_kwargs)
- # TODO: Use regular logging here
- ctx.logger.info("Container started: {0}".format(service_component_name))
-
return kwargs
def _verify_component(**kwargs):
- """Verify component (container) is healthy"""
+ """Verify deployment is ready"""
service_component_name = kwargs[SERVICE_COMPONENT_NAME]
max_wait = kwargs.get("max_wait", 300)
+ ctx.logger.info("Waiting up to {0} secs for {1} to become ready".format(max_wait, service_component_name))
- # Verify that the container is healthy
-
- if _verify_container(service_component_name, max_wait):
- service_component_name = kwargs[SERVICE_COMPONENT_NAME]
-
- # TODO: Use regular logging here
- ctx.logger.info("Container is healthy: {0}".format(service_component_name))
+ if _verify_k8s_deployment(service_component_name, max_wait):
+ ctx.logger.info("k8s deployment is ready for: {0}".format(service_component_name))
return kwargs
@@ -426,11 +420,11 @@ def _setup_msb_registration(service_name, msb_reg):
@monkeypatch_loggers
@operation
def create_and_start_container_for_components(**start_inputs):
- """Create Docker container and start for components
+ """Initiate Kubernetes deployment for service components
- This operation method is to be used with the DockerContainerForComponents
- node type. After launching the container, the plugin will verify with Consul
- that the app is up and healthy before terminating.
+ This operation method is to be used with the ContainerizedServiceComponent
+ node type. After initiating a Kubernetes deployment, the plugin will verify with Kubernetes
+ that the app is up and responding successfully to readiness probes.
"""
_done_for_start(
**_verify_component(
@@ -467,11 +461,11 @@ def _update_delivery_url(**kwargs):
@monkeypatch_loggers
@operation
def create_and_start_container_for_components_with_streams(**start_inputs):
- """Create Docker container and start for components that have streams
+ """Initiate Kubernetes deployment for service components that have streams
- This operation method is to be used with the DockerContainerForComponents
- node type. After launching the container, the plugin will verify with Consul
- that the app is up and healthy before terminating.
+ This operation method is to be used with the ContainerizedServiceComponentUsingDmaap
+ node type. After initiating the Kubernetes deployment, the plugin will verify with
+ Kubernetes that the app is up and responding successfully to readiness probes.
"""
_done_for_start(
**_update_delivery_url(
@@ -485,7 +479,7 @@ def create_and_start_container_for_components_with_streams(**start_inputs):
@monkeypatch_loggers
@operation
def create_and_start_container_for_platforms(**kwargs):
- """Create Docker container and start for platform services
+ """Initiate Kubernetes deployment for platform components
This operation method is to be used with the ContainerizedPlatformComponent
node type.
@@ -538,21 +532,20 @@ def create_and_start_container_for_platforms(**kwargs):
kwargs["always_pull_image"] = ctx.node.properties["always_pull_image"]
_create_and_start_container(service_component_name, image, **kwargs)
- ctx.logger.info("Container started: {0}".format(service_component_name))
-
- # Verify that the container is healthy
+ # Verify that the k8s deployment is ready
max_wait = kwargs.get("max_wait", 300)
+ ctx.logger.info("Waiting up to {0} secs for {1} to become ready".format(max_wait, service_component_name))
- if _verify_container(service_component_name, max_wait):
- ctx.logger.info("Container is healthy: {0}".format(service_component_name))
+ if _verify_k8s_deployment(service_component_name, max_wait):
+ ctx.logger.info("k8s deployment ready for: {0}".format(service_component_name))
@wrap_error_handling_start
@monkeypatch_loggers
@operation
def create_and_start_container(**kwargs):
- """Create Docker container and start"""
+ """Initiate a Kubernetes deployment for the generic ContainerizedApplication node type"""
service_component_name = ctx.node.properties["name"]
ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME] = service_component_name
@@ -560,19 +553,16 @@ def create_and_start_container(**kwargs):
_create_and_start_container(service_component_name, image,**kwargs)
- ctx.logger.info("Component deployed: {0}".format(service_component_name))
-
-
@monkeypatch_loggers
@operation
def stop_and_remove_container(**kwargs):
- """Stop and remove Docker container"""
+ """Delete Kubernetes deployment"""
try:
deployment_description = ctx.instance.runtime_properties["k8s_deployment"]
k8sclient.undeploy(deployment_description)
except Exception as e:
- ctx.logger.error("Unexpected error while stopping container: {0}"
+ ctx.logger.error("Unexpected error while deleting k8s deployment: {0}"
.format(str(e)))
@wrap_error_handling_update
@@ -580,41 +570,46 @@ def stop_and_remove_container(**kwargs):
@operation
def scale(replicas, **kwargs):
"""Change number of replicas in the deployment"""
+ service_component_name = ctx.instance.runtime_properties["service_component_name"]
+
if replicas > 0:
current_replicas = ctx.instance.runtime_properties["replicas"]
- ctx.logger.info("Scaling from {0} to {1}".format(current_replicas, replicas))
+ ctx.logger.info("Scaling {0} from {1} to {2} replica(s)".format(service_component_name, current_replicas, replicas))
deployment_description = ctx.instance.runtime_properties["k8s_deployment"]
k8sclient.scale(deployment_description, replicas)
ctx.instance.runtime_properties["replicas"] = replicas
# Verify that the scaling took place as expected
max_wait = kwargs.get("max_wait", 300)
- service_component_name = ctx.instance.runtime_properties["service_component_name"]
- if _verify_container(service_component_name, max_wait):
- ctx.logger.info("Scaling complete : {0} from {1} to {2} instance(s)".format(service_component_name, current_replicas, replicas))
+ ctx.logger.info("Waiting up to {0} secs for {1} to scale and become ready".format(max_wait, service_component_name))
+ if _verify_k8s_deployment(service_component_name, max_wait):
+ ctx.logger.info("Scaling complete: {0} from {1} to {2} replica(s)".format(service_component_name, current_replicas, replicas))
else:
- ctx.logger.info("Ignoring request to scale to zero replicas")
+ ctx.logger.info("Ignoring request to scale {0} to zero replicas".format(service_component_name))
@wrap_error_handling_update
@monkeypatch_loggers
@operation
def update_image(image, **kwargs):
+ """ Restart component with a new Docker image """
+
+ service_component_name = ctx.instance.runtime_properties["service_component_name"]
if image:
current_image = ctx.instance.runtime_properties["image"]
- ctx.logger.info("Updating application container image from {0} to {1}".format(current_image, image))
+ ctx.logger.info("Updating app image for {0} from {1} to {2}".format(service_component_name, current_image, image))
deployment_description = ctx.instance.runtime_properties["k8s_deployment"]
k8sclient.upgrade(deployment_description, image)
ctx.instance.runtime_properties["image"] = image
# Verify that the update took place as expected
max_wait = kwargs.get("max_wait", 300)
- service_component_name = ctx.instance.runtime_properties["service_component_name"]
- if _verify_container(service_component_name, max_wait):
- ctx.logger.info("Update complete : {0} from {1} to {2} instance(s)".format(service_component_name, current_image, image))
+ ctx.logger.info("Waiting up to {0} secs for {1} to be updated and become ready".format(max_wait, service_component_name))
+ if _verify_k8s_deployment(service_component_name, max_wait):
+ ctx.logger.info("Update complete: {0} from {1} to {2}".format(service_component_name, current_image, image))
else:
- ctx.logger.info("Ignoring update_image request with unusable image '{0}'".format(str(image)))
+ ctx.logger.info("Ignoring update_image request for {0} with unusable image '{1}'".format(service_component_name, str(image)))
#TODO: implement rollback operation when kubernetes python client fix is available.
# (See comments in k8sclient.py.)
@@ -636,56 +631,40 @@ def cleanup_discovery(**kwargs):
def _notify_container(**kwargs):
- """Notify container using the policy section in the docker_config"""
+ """
+ Notify container using the policy section in the docker_config.
+ Notification consists of running a script in the application container
+ in each pod in the Kubernetes deployment associated with this node.
+ Return the list of notification results.
+ """
dc = kwargs["docker_config"]
+ resp = []
if "policy" in dc:
if dc["policy"]["trigger_type"] == "docker":
- pass
- """
- Need replacement for this in kubernetes.
- Need to find all the pods that have been deployed
- and execute the script in them.
- Kubernetes does not appear to have a way to ask for a script
- to be executed in all of the currently running pods for a
- Kubernetes Deployment or ReplicaSet. We will have to find
- each of them and run the script. The problem is that set of
- pods could be changing. We can query to get all the pods, but
- there's no guarantee the list won't change while we're trying to
- execute the script.
-
- In ONAP R2, all of the policy-driven components rely on polling.
- """
- """
- # REVIEW: Need to finalize on the docker config policy data structure
+
+ # Build the command to execute in the container
+ # SCRIPT_PATH policies {"policies" : ...., "updated_policies" : ..., "removed_policies": ...}
script_path = dc["policy"]["script_path"]
- updated_policies = kwargs["updated_policies"]
- removed_policies = kwargs["removed_policies"]
- policies = kwargs["policies"]
- cmd = doc.build_policy_update_cmd(script_path, use_sh=False,
- msg_type="policies",
- updated_policies=updated_policies,
- removed_policies=removed_policies,
- policies=policies
- )
-
- docker_host = kwargs[SELECTED_CONTAINER_DESTINATION]
- docker_host_ip = _lookup_service(docker_host)
- logins = _get_docker_logins()
- client = doc.create_client(docker_host_ip, DOCKER_PORT, logins=logins)
-
- container_id = kwargs["container_id"]
-
- doc.notify_for_policy_update(client, container_id, cmd)
- """
- # else the default is no trigger
+ policy_data = {
+ "policies": kwargs["policies"],
+ "updated_policies": kwargs["updated_policies"],
+ "removed_policies": kwargs["removed_policies"]
+ }
- return kwargs
+ command = [script_path, "policies", json.dumps(policy_data)]
+
+ # Execute the command
+ deployment_description = ctx.instance.runtime_properties["k8s_deployment"]
+ resp = k8sclient.execute_command_in_deployment(deployment_description, command)
+ # else the default is no trigger
+
+ return resp
+@operation
@monkeypatch_loggers
@Policies.update_policies_on_node()
-@operation
def policy_update(updated_policies, removed_policies=None, policies=None, **kwargs):
"""Policy update task
@@ -696,9 +675,13 @@ def policy_update(updated_policies, removed_policies=None, policies=None, **kwar
:updated_policies: contains the list of changed policy-configs when configs_only=True
(default) Use configs_only=False to bring the full policy objects in :updated_policies:.
"""
+ service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME]
+ ctx.logger.info("policy_update for {0}-- updated_policies: {1}, removed_policies: {2}, policies: {3}"
+ .format(service_component_name, updated_policies, removed_policies, policies))
update_inputs = copy.deepcopy(ctx.instance.runtime_properties)
update_inputs["updated_policies"] = updated_policies
update_inputs["removed_policies"] = removed_policies
update_inputs["policies"] = policies
- _notify_container(**update_inputs)
+ resp = _notify_container(**update_inputs)
+ ctx.logger.info("policy_update complete for {0}--notification results: {1}".format(service_component_name,json.dumps(resp)))
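For reference, a short sketch of the notification command that the reworked _notify_container builds and hands to k8sclient.execute_command_in_deployment. The script path, namespace, deployment name, and policy contents below are placeholder values, not taken from the commit.

    import json

    # Placeholder values for illustration only.
    script_path = "/opt/app/reconfigure.sh"
    policy_data = {
        "policies": [{"policyName": "example.policy.1"}],
        "updated_policies": [{"policyName": "example.policy.1"}],
        "removed_policies": []
    }

    # _notify_container builds an argv-style command: the script, the message type,
    # and the full policy payload serialized as a single JSON argument.
    command = [script_path, "policies", json.dumps(policy_data)]

    # The deployment_description stored in runtime_properties["k8s_deployment"]
    # carries the namespace and deployment name used to locate the pods, e.g.:
    deployment_description = {"namespace": "onap", "deployment": "dep-myservice"}
    # k8sclient.execute_command_in_deployment(deployment_description, command)
    # would then run the script once in the main container of each running pod.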
diff --git a/k8s/pom.xml b/k8s/pom.xml
index bfaa3f5..cd5a8d2 100644
--- a/k8s/pom.xml
+++ b/k8s/pom.xml
@@ -28,7 +28,7 @@ ECOMP is a trademark and service mark of AT&T Intellectual Property.
<groupId>org.onap.dcaegen2.platform.plugins</groupId>
<artifactId>k8s</artifactId>
<name>k8s-plugin</name>
- <version>1.3.0-SNAPSHOT</version>
+ <version>1.4.0-SNAPSHOT</version>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
diff --git a/k8s/setup.py b/k8s/setup.py
index 190d603..7991584 100644
--- a/k8s/setup.py
+++ b/k8s/setup.py
@@ -23,7 +23,7 @@ from setuptools import setup
setup(
name='k8splugin',
description='Cloudify plugin for containerized components deployed using Kubernetes',
- version="1.3.0",
+ version="1.4.0",
author='J. F. Lucas, Michael Hwang, Tommy Carpenter',
packages=['k8splugin','k8sclient','msb','configure'],
zip_safe=False,
diff --git a/k8s/tests/test_tasks.py b/k8s/tests/test_tasks.py
index 077a940..69e866d 100644
--- a/k8s/tests/test_tasks.py
+++ b/k8s/tests/test_tasks.py
@@ -200,7 +200,7 @@ def test_verify_container(monkeypatch, mockconfig):
monkeypatch.setattr(k8sclient, "is_available",
fake_is_available_success)
- assert tasks._verify_container("some-name", 3)
+ assert tasks._verify_k8s_deployment("some-name", 3)
def fake_is_available_never_good(ch, scn):
return False
@@ -209,7 +209,7 @@ def test_verify_container(monkeypatch, mockconfig):
fake_is_available_never_good)
with pytest.raises(DockerPluginDeploymentError):
- tasks._verify_container("some-name", 2)
+ tasks._verify_k8s_deployment("some-name", 2)
def test_update_delivery_url(monkeypatch, mockconfig):
@@ -289,5 +289,5 @@ def test_enhance_docker_params(mockconfig):
def test_notify_container(mockconfig):
from k8splugin import tasks
- test_input = { "docker_config": { "trigger_type": "unknown" } }
- assert test_input == tasks._notify_container(**test_input)
\ No newline at end of file
+ test_input = { "docker_config": { "policy": { "trigger_type": "unknown" } } }
+ assert [] == tasks._notify_container(**test_input)
\ No newline at end of file