summaryrefslogtreecommitdiffstats
path: root/k8s/k8splugin/tasks.py
diff options
context:
space:
mode:
Diffstat (limited to 'k8s/k8splugin/tasks.py')
-rw-r--r--k8s/k8splugin/tasks.py165
1 files changed, 74 insertions, 91 deletions
diff --git a/k8s/k8splugin/tasks.py b/k8s/k8splugin/tasks.py
index 4205122..c9df9f4 100644
--- a/k8s/k8splugin/tasks.py
+++ b/k8s/k8splugin/tasks.py
@@ -24,6 +24,7 @@
import cloudify_importer
import time, copy
+import json
from cloudify import ctx
from cloudify.decorators import operation
from cloudify.exceptions import NonRecoverableError, RecoverableError
@@ -103,7 +104,7 @@ def _done_for_create(**kwargs):
@Policies.gather_policies_to_node()
@operation
def create_for_components(**create_inputs):
- """Create step for Docker containers that are components
+ """Create step for service components
This interface is responsible for:
@@ -185,7 +186,7 @@ def _setup_for_discovery_streams(**kwargs):
@Policies.gather_policies_to_node()
@operation
def create_for_components_with_streams(**create_inputs):
- """Create step for Docker containers that are components that use DMaaP
+ """Create step for service components that use DMaaP
This interface is responsible for:
@@ -206,7 +207,7 @@ def create_for_components_with_streams(**create_inputs):
@monkeypatch_loggers
@operation
def create_for_platforms(**create_inputs):
- """Create step for Docker containers that are platform components
+ """Create step for platform components
This interface is responible for:
@@ -231,8 +232,8 @@ def _lookup_service(service_component_name, consul_host=CONSUL_HOST,
return results[0]["ServiceAddress"]
-def _verify_container(service_component_name, max_wait):
- """Verify that the container is healthy
+def _verify_k8s_deployment(service_component_name, max_wait):
+ """Verify that the k8s Deployment is ready
Args:
-----
@@ -241,7 +242,7 @@ def _verify_container(service_component_name, max_wait):
Return:
-------
- True if component is healthy else a DockerPluginDeploymentError exception
+ True if deployment is ready else a DockerPluginDeploymentError exception
will be raised.
"""
num_attempts = 1
@@ -253,7 +254,7 @@ def _verify_container(service_component_name, max_wait):
num_attempts += 1
if max_wait > 0 and max_wait < num_attempts:
- raise DockerPluginDeploymentError("Container never became healthy")
+ raise DockerPluginDeploymentError("k8s deployment never became ready for {0}".format(service_component_name))
time.sleep(1)
@@ -284,7 +285,7 @@ def _create_and_start_container(container_name, image, **kwargs):
env = { "CONSUL_HOST": CONSUL_INTERNAL_NAME,
"CONFIG_BINDING_SERVICE": "config-binding-service" }
env.update(kwargs.get("envs", {}))
- ctx.logger.info("Deploying {}, image: {}, env: {}, kwargs: {}".format(container_name, image, env, kwargs))
+ ctx.logger.info("Starting k8s deployment for {}, image: {}, env: {}, kwargs: {}".format(container_name, image, env, kwargs))
ctx.logger.info("Passing k8sconfig: {}".format(plugin_conf))
replicas = kwargs.get("replicas", 1)
_,dep = k8sclient.deploy(DCAE_NAMESPACE,
@@ -304,7 +305,7 @@ def _create_and_start_container(container_name, image, **kwargs):
# Capture the result of deployment for future use
ctx.instance.runtime_properties["k8s_deployment"] = dep
ctx.instance.runtime_properties["replicas"] = replicas
- ctx.logger.info ("Deployment complete: {0}".format(dep))
+ ctx.logger.info ("k8s deployment initiated successfully for {0}: {1}".format(container_name, dep))
def _parse_cloudify_context(**kwargs):
"""Parse Cloudify context
@@ -384,24 +385,17 @@ def _create_and_start_component(**kwargs):
"readiness": kwargs.get("readiness",{})}
_create_and_start_container(service_component_name, image, **sub_kwargs)
- # TODO: Use regular logging here
- ctx.logger.info("Container started: {0}".format(service_component_name))
-
return kwargs
def _verify_component(**kwargs):
- """Verify component (container) is healthy"""
+ """Verify deployment is ready"""
service_component_name = kwargs[SERVICE_COMPONENT_NAME]
max_wait = kwargs.get("max_wait", 300)
+ ctx.logger.info("Waiting up to {0} secs for {1} to become ready".format(max_wait, service_component_name))
- # Verify that the container is healthy
-
- if _verify_container(service_component_name, max_wait):
- service_component_name = kwargs[SERVICE_COMPONENT_NAME]
-
- # TODO: Use regular logging here
- ctx.logger.info("Container is healthy: {0}".format(service_component_name))
+ if _verify_k8s_deployment(service_component_name, max_wait):
+ ctx.logger.info("k8s deployment is ready for: {0}".format(service_component_name))
return kwargs
@@ -426,11 +420,11 @@ def _setup_msb_registration(service_name, msb_reg):
@monkeypatch_loggers
@operation
def create_and_start_container_for_components(**start_inputs):
- """Create Docker container and start for components
+ """Initiate Kubernetes deployment for service components
- This operation method is to be used with the DockerContainerForComponents
- node type. After launching the container, the plugin will verify with Consul
- that the app is up and healthy before terminating.
+ This operation method is to be used with the ContainerizedServiceComponent
+ node type. After initiating a Kubernetes deployment, the plugin will verify with Kubernetes
+ that the app is up and responding successfully to readiness probes.
"""
_done_for_start(
**_verify_component(
@@ -467,11 +461,11 @@ def _update_delivery_url(**kwargs):
@monkeypatch_loggers
@operation
def create_and_start_container_for_components_with_streams(**start_inputs):
- """Create Docker container and start for components that have streams
+ """Initiate Kubernetes deployment for service components that have streams
- This operation method is to be used with the DockerContainerForComponents
- node type. After launching the container, the plugin will verify with Consul
- that the app is up and healthy before terminating.
+ This operation method is to be used with the ContainerizedServiceComponentUsingDmaap
+ node type. After initiating the Kubernetes deployment, the plugin will verify with
+ Kubernetes that the app is up and responding successfully to readiness probes.
"""
_done_for_start(
**_update_delivery_url(
@@ -485,7 +479,7 @@ def create_and_start_container_for_components_with_streams(**start_inputs):
@monkeypatch_loggers
@operation
def create_and_start_container_for_platforms(**kwargs):
- """Create Docker container and start for platform services
+ """Initiate Kubernetes deployment for platform components
This operation method is to be used with the ContainerizedPlatformComponent
node type.
@@ -538,21 +532,20 @@ def create_and_start_container_for_platforms(**kwargs):
kwargs["always_pull_image"] = ctx.node.properties["always_pull_image"]
_create_and_start_container(service_component_name, image, **kwargs)
- ctx.logger.info("Container started: {0}".format(service_component_name))
-
- # Verify that the container is healthy
+ # Verify that the k8s deployment is ready
max_wait = kwargs.get("max_wait", 300)
+ ctx.logger.info("Waiting up to {0} secs for {1} to become ready".format(max_wait, service_component_name))
- if _verify_container(service_component_name, max_wait):
- ctx.logger.info("Container is healthy: {0}".format(service_component_name))
+ if _verify_k8s_deployment(service_component_name, max_wait):
+ ctx.logger.info("k8s deployment ready for: {0}".format(service_component_name))
@wrap_error_handling_start
@monkeypatch_loggers
@operation
def create_and_start_container(**kwargs):
- """Create Docker container and start"""
+ """Initiate a Kubernetes deployment for the generic ContainerizedApplication node type"""
service_component_name = ctx.node.properties["name"]
ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME] = service_component_name
@@ -560,19 +553,16 @@ def create_and_start_container(**kwargs):
_create_and_start_container(service_component_name, image,**kwargs)
- ctx.logger.info("Component deployed: {0}".format(service_component_name))
-
-
@monkeypatch_loggers
@operation
def stop_and_remove_container(**kwargs):
- """Stop and remove Docker container"""
+ """Delete Kubernetes deployment"""
try:
deployment_description = ctx.instance.runtime_properties["k8s_deployment"]
k8sclient.undeploy(deployment_description)
except Exception as e:
- ctx.logger.error("Unexpected error while stopping container: {0}"
+ ctx.logger.error("Unexpected error while deleting k8s deployment: {0}"
.format(str(e)))
@wrap_error_handling_update
@@ -580,41 +570,46 @@ def stop_and_remove_container(**kwargs):
@operation
def scale(replicas, **kwargs):
"""Change number of replicas in the deployment"""
+ service_component_name = ctx.instance.runtime_properties["service_component_name"]
+
if replicas > 0:
current_replicas = ctx.instance.runtime_properties["replicas"]
- ctx.logger.info("Scaling from {0} to {1}".format(current_replicas, replicas))
+ ctx.logger.info("Scaling {0} from {1} to {2} replica(s)".format(service_component_name, current_replicas, replicas))
deployment_description = ctx.instance.runtime_properties["k8s_deployment"]
k8sclient.scale(deployment_description, replicas)
ctx.instance.runtime_properties["replicas"] = replicas
# Verify that the scaling took place as expected
max_wait = kwargs.get("max_wait", 300)
- service_component_name = ctx.instance.runtime_properties["service_component_name"]
- if _verify_container(service_component_name, max_wait):
- ctx.logger.info("Scaling complete : {0} from {1} to {2} instance(s)".format(service_component_name, current_replicas, replicas))
+ ctx.logger.info("Waiting up to {0} secs for {1} to scale and become ready".format(max_wait, service_component_name))
+ if _verify_k8s_deployment(service_component_name, max_wait):
+ ctx.logger.info("Scaling complete: {0} from {1} to {2} replica(s)".format(service_component_name, current_replicas, replicas))
else:
- ctx.logger.info("Ignoring request to scale to zero replicas")
+ ctx.logger.info("Ignoring request to scale {0} to zero replicas".format(service_component_name))
@wrap_error_handling_update
@monkeypatch_loggers
@operation
def update_image(image, **kwargs):
+ """ Restart component with a new Docker image """
+
+ service_component_name = ctx.instance.runtime_properties["service_component_name"]
if image:
current_image = ctx.instance.runtime_properties["image"]
- ctx.logger.info("Updating application container image from {0} to {1}".format(current_image, image))
+ ctx.logger.info("Updating app image for {0} from {1} to {2}".format(service_component_name, current_image, image))
deployment_description = ctx.instance.runtime_properties["k8s_deployment"]
k8sclient.upgrade(deployment_description, image)
ctx.instance.runtime_properties["image"] = image
# Verify that the update took place as expected
max_wait = kwargs.get("max_wait", 300)
- service_component_name = ctx.instance.runtime_properties["service_component_name"]
- if _verify_container(service_component_name, max_wait):
- ctx.logger.info("Update complete : {0} from {1} to {2} instance(s)".format(service_component_name, current_image, image))
+ ctx.logger.info("Waiting up to {0} secs for {1} to be updated and become ready".format(max_wait, service_component_name))
+ if _verify_k8s_deployment(service_component_name, max_wait):
+ ctx.logger.info("Update complete: {0} from {1} to {2}".format(service_component_name, current_image, image))
else:
- ctx.logger.info("Ignoring update_image request with unusable image '{0}'".format(str(image)))
+ ctx.logger.info("Ignoring update_image request for {0} with unusable image '{1}'".format(service_component_name, str(image)))
#TODO: implement rollback operation when kubernetes python client fix is available.
# (See comments in k8sclient.py.)
@@ -636,56 +631,40 @@ def cleanup_discovery(**kwargs):
def _notify_container(**kwargs):
- """Notify container using the policy section in the docker_config"""
+ """
+ Notify container using the policy section in the docker_config.
+ Notification consists of running a script in the application container
+ in each pod in the Kubernetes deployment associated with this node.
+ Return the list of notification results.
+ """
dc = kwargs["docker_config"]
+ resp = []
if "policy" in dc:
if dc["policy"]["trigger_type"] == "docker":
- pass
- """
- Need replacement for this in kubernetes.
- Need to find all the pods that have been deployed
- and execute the script in them.
- Kubernetes does not appear to have a way to ask for a script
- to be executed in all of the currently running pods for a
- Kubernetes Deployment or ReplicaSet. We will have to find
- each of them and run the script. The problem is that set of
- pods could be changing. We can query to get all the pods, but
- there's no guarantee the list won't change while we're trying to
- execute the script.
-
- In ONAP R2, all of the policy-driven components rely on polling.
- """
- """
- # REVIEW: Need to finalize on the docker config policy data structure
+
+ # Build the command to execute in the container
+ # SCRIPT_PATH policies {"policies" : ...., "updated_policies" : ..., "removed_policies": ...}
script_path = dc["policy"]["script_path"]
- updated_policies = kwargs["updated_policies"]
- removed_policies = kwargs["removed_policies"]
- policies = kwargs["policies"]
- cmd = doc.build_policy_update_cmd(script_path, use_sh=False,
- msg_type="policies",
- updated_policies=updated_policies,
- removed_policies=removed_policies,
- policies=policies
- )
-
- docker_host = kwargs[SELECTED_CONTAINER_DESTINATION]
- docker_host_ip = _lookup_service(docker_host)
- logins = _get_docker_logins()
- client = doc.create_client(docker_host_ip, DOCKER_PORT, logins=logins)
-
- container_id = kwargs["container_id"]
-
- doc.notify_for_policy_update(client, container_id, cmd)
- """
- # else the default is no trigger
+ policy_data = {
+ "policies": kwargs["policies"],
+ "updated_policies": kwargs["updated_policies"],
+ "removed_policies": kwargs["removed_policies"]
+ }
- return kwargs
+ command = [script_path, "policies", json.dumps(policy_data)]
+
+ # Execute the command
+ deployment_description = ctx.instance.runtime_properties["k8s_deployment"]
+ resp = k8sclient.execute_command_in_deployment(deployment_description, command)
+ # else the default is no trigger
+
+ return resp
+@operation
@monkeypatch_loggers
@Policies.update_policies_on_node()
-@operation
def policy_update(updated_policies, removed_policies=None, policies=None, **kwargs):
"""Policy update task
@@ -696,9 +675,13 @@ def policy_update(updated_policies, removed_policies=None, policies=None, **kwar
:updated_policies: contains the list of changed policy-configs when configs_only=True
(default) Use configs_only=False to bring the full policy objects in :updated_policies:.
"""
+ service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME]
+ ctx.logger.info("policy_update for {0}-- updated_policies: {1}, removed_policies: {2}, policies: {3}"
+ .format(service_component_name, updated_policies, removed_policies, policies))
update_inputs = copy.deepcopy(ctx.instance.runtime_properties)
update_inputs["updated_policies"] = updated_policies
update_inputs["removed_policies"] = removed_policies
update_inputs["policies"] = policies
- _notify_container(**update_inputs)
+ resp = _notify_container(**update_inputs)
+ ctx.logger.info("policy_update complete for {0}--notification results: {1}".format(service_component_name,json.dumps(resp)))