path: root/k8s/k8splugin
Diffstat (limited to 'k8s/k8splugin')
-rw-r--r--  k8s/k8splugin/__init__.py    |  2
-rw-r--r--  k8s/k8splugin/decorators.py  | 14
-rw-r--r--  k8s/k8splugin/tasks.py       | 93
3 files changed, 74 insertions(+), 35 deletions(-)
diff --git a/k8s/k8splugin/__init__.py b/k8s/k8splugin/__init__.py
index 28306ee..7f721b2 100644
--- a/k8s/k8splugin/__init__.py
+++ b/k8s/k8splugin/__init__.py
@@ -27,4 +27,4 @@ from .tasks import create_for_components, create_for_components_with_streams, \
create_and_start_container_for_components_with_streams, \
create_for_platforms, create_and_start_container, \
create_and_start_container_for_components, create_and_start_container_for_platforms, \
-    stop_and_remove_container, cleanup_discovery, policy_update, scale
\ No newline at end of file
+    stop_and_remove_container, cleanup_discovery, policy_update, scale, update_image
\ No newline at end of file
diff --git a/k8s/k8splugin/decorators.py b/k8s/k8splugin/decorators.py
index 2edcc0d..59d14d8 100644
--- a/k8s/k8splugin/decorators.py
+++ b/k8s/k8splugin/decorators.py
@@ -98,3 +98,17 @@ def merge_inputs_for_start(task_start_func):
ctx.instance.runtime_properties, **kwargs)
return wrapper
+
+def wrap_error_handling_update(update_func):
+ """ Wrap error handling for update operations (scale and upgrade) """
+
+ def wrapper(**kwargs):
+ try:
+ return update_func(**kwargs)
+ except DockerPluginDeploymentError:
+            raise NonRecoverableError("Update operation did not complete successfully in the allotted time")
+        except Exception as e:
+            ctx.logger.error("Unexpected error during update operation: {0}".format(str(e)))
+ raise NonRecoverableError(e)
+
+    return wrapper
\ No newline at end of file
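For orientation, here is a minimal, self-contained sketch of how the new decorator composes with an update-style operation such as scale or update_image. The exception classes and the sample operation below are stand-ins for the plugin's real imports, not part of the patch.

# Minimal sketch of wrap_error_handling_update; stand-in classes, not the plugin's imports.
class DockerPluginDeploymentError(Exception):
    pass

class NonRecoverableError(Exception):
    pass

def wrap_error_handling_update(update_func):
    """Translate update failures into a NonRecoverableError, as in the patch."""
    def wrapper(**kwargs):
        try:
            return update_func(**kwargs)
        except DockerPluginDeploymentError:
            raise NonRecoverableError("Update operation did not complete successfully in the allotted time")
        except Exception as e:
            raise NonRecoverableError(e)
    return wrapper

@wrap_error_handling_update
def sample_update(replicas=1, **kwargs):
    # A deployment that never becomes healthy surfaces as NonRecoverableError.
    if replicas < 1:
        raise DockerPluginDeploymentError("nothing to update")
    return replicas

print(sample_update(replicas=3))  # 3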
diff --git a/k8s/k8splugin/tasks.py b/k8s/k8splugin/tasks.py
index 50087fb..4205122 100644
--- a/k8s/k8splugin/tasks.py
+++ b/k8s/k8splugin/tasks.py
@@ -30,11 +30,11 @@ from cloudify.exceptions import NonRecoverableError, RecoverableError
from onap_dcae_dcaepolicy_lib import Policies
from k8splugin import discovery as dis
from k8splugin.decorators import monkeypatch_loggers, wrap_error_handling_start, \
- merge_inputs_for_start, merge_inputs_for_create
+ merge_inputs_for_start, merge_inputs_for_create, wrap_error_handling_update
from k8splugin.exceptions import DockerPluginDeploymentError
from k8splugin import utils
from configure import configure
-from k8sclient import k8sclient
+import k8sclient
# Get configuration
plugin_conf = configure.configure()
@@ -245,7 +245,7 @@ def _verify_container(service_component_name, max_wait):
will be raised.
"""
num_attempts = 1
-
+
while True:
if k8sclient.is_available(DCAE_NAMESPACE, service_component_name):
return True
@@ -256,7 +256,7 @@ def _verify_container(service_component_name, max_wait):
raise DockerPluginDeploymentError("Container never became healthy")
time.sleep(1)
-
+
return True
def _create_and_start_container(container_name, image, **kwargs):
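The hunks above only touch whitespace in _verify_container, but since scale and update_image below reuse it, a sketch of its polling contract may help. is_available here is a stand-in for k8sclient.is_available(namespace, name); the real call also receives the DCAE namespace.

# Sketch of the polling pattern in _verify_container: poll roughly once per second
# until the deployment reports healthy or max_wait attempts have been made.
import time

class DockerPluginDeploymentError(Exception):
    pass

def verify_container(is_available, max_wait=300):
    num_attempts = 1
    while not is_available():
        num_attempts += 1
        if max_wait > 0 and num_attempts > max_wait:
            raise DockerPluginDeploymentError("Container never became healthy")
        time.sleep(1)
    return True

# Example: a fake readiness check that succeeds on the third poll.
state = {"polls": 0}
def ready_on_third_poll():
    state["polls"] += 1
    return state["polls"] >= 3

print(verify_container(ready_on_third_poll, max_wait=10))  # True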
@@ -266,7 +266,7 @@ def _create_and_start_container(container_name, image, **kwargs):
We're not exposing k8s to the component developer and the blueprint author.
This is a conscious choice. We want to use k8s in a controlled, consistent way, and we want to hide
the details from the component developer and the blueprint author.)
-
+
kwargs may have:
- volumes: array of volume objects, where a volume object is:
{"host":{"path": "/path/on/host"}, "container":{"bind":"/path/on/container","mode":"rw_or_ro"}
@@ -287,21 +287,21 @@ def _create_and_start_container(container_name, image, **kwargs):
ctx.logger.info("Deploying {}, image: {}, env: {}, kwargs: {}".format(container_name, image, env, kwargs))
ctx.logger.info("Passing k8sconfig: {}".format(plugin_conf))
replicas = kwargs.get("replicas", 1)
- _,dep = k8sclient.deploy(DCAE_NAMESPACE,
- container_name,
+ _,dep = k8sclient.deploy(DCAE_NAMESPACE,
+ container_name,
image,
- replicas = replicas,
+ replicas = replicas,
always_pull=kwargs.get("always_pull_image", False),
k8sconfig=plugin_conf,
- volumes=kwargs.get("volumes",[]),
+ volumes=kwargs.get("volumes",[]),
ports=kwargs.get("ports",[]),
- msb_list=kwargs.get("msb_list"),
+ msb_list=kwargs.get("msb_list"),
env = env,
labels = kwargs.get("labels", {}),
log_info=kwargs.get("log_info"),
readiness=kwargs.get("readiness"))
- # Capture the result of deployment for future use
+ # Capture the result of deployment for future use
ctx.instance.runtime_properties["k8s_deployment"] = dep
ctx.instance.runtime_properties["replicas"] = replicas
ctx.logger.info ("Deployment complete: {0}".format(dep))
@@ -337,11 +337,11 @@ def _enhance_docker_params(**kwargs):
Set up Docker environment variables and readiness check info
and inject into kwargs.
'''
-
+
# Get info for setting up readiness probe, if present
docker_config = kwargs.get("docker_config", {})
if "healthcheck" in docker_config:
- kwargs["readiness"] = docker_config["healthcheck"]
+ kwargs["readiness"] = docker_config["healthcheck"]
envs = kwargs.get("envs", {})
@@ -364,7 +364,7 @@ def _enhance_docker_params(**kwargs):
# lists together with no deduping.
kwargs = combine_params("ports", docker_config, kwargs)
kwargs = combine_params("volumes", docker_config, kwargs)
-
+
return kwargs
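combine_params itself is not shown in this diff; the sketch below reflects only the behavior the comment above describes (concatenate the list-valued key from docker_config and kwargs, with no deduplication). The port strings are invented for illustration.

# Inferred behavior of combine_params: merge lists from both sources, keep duplicates.
def combine_params(key, docker_config, kwargs):
    kwargs[key] = kwargs.get(key, []) + docker_config.get(key, [])
    return kwargs

merged = combine_params("ports", {"ports": ["8080:0"]}, {"ports": ["8080:0"]})
print(merged["ports"])  # ['8080:0', '8080:0'] -- duplicates are kept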
@@ -375,15 +375,15 @@ def _create_and_start_component(**kwargs):
# Need to be picky and manually select out pieces because just using kwargs
# which contains everything confused the execution of
# _create_and_start_container because duplicate variables exist
- sub_kwargs = {
+ sub_kwargs = {
"volumes": kwargs.get("volumes", []),
"ports": kwargs.get("ports", None),
- "envs": kwargs.get("envs", {}),
+ "envs": kwargs.get("envs", {}),
"log_info": kwargs.get("log_info", {}),
"labels": kwargs.get("labels", {}),
"readiness": kwargs.get("readiness",{})}
_create_and_start_container(service_component_name, image, **sub_kwargs)
-
+
# TODO: Use regular logging here
ctx.logger.info("Container started: {0}".format(service_component_name))
@@ -392,12 +392,7 @@ def _create_and_start_component(**kwargs):
def _verify_component(**kwargs):
"""Verify component (container) is healthy"""
service_component_name = kwargs[SERVICE_COMPONENT_NAME]
- # TODO: "Consul doesn't make its first health check immediately upon registration.
- # Instead it waits for the health check interval to pass."
- # Possible enhancement is to read the interval (and possibly the timeout) from
- # docker_config and multiply that by a number to come up with a more suitable
- # max_wait.
-
+
max_wait = kwargs.get("max_wait", 300)
# Verify that the container is healthy
@@ -407,7 +402,7 @@ def _verify_component(**kwargs):
# TODO: Use regular logging here
ctx.logger.info("Container is healthy: {0}".format(service_component_name))
-
+
return kwargs
def _done_for_start(**kwargs):
@@ -499,7 +494,7 @@ def create_and_start_container_for_platforms(**kwargs):
image = ctx.node.properties["image"]
docker_config = ctx.node.properties.get("docker_config", {})
if "healthcheck" in docker_config:
- kwargs["readiness"] = docker_config["healthcheck"]
+ kwargs["readiness"] = docker_config["healthcheck"]
if "dns_name" in ctx.node.properties:
service_component_name = ctx.node.properties["dns_name"]
else:
@@ -564,7 +559,7 @@ def create_and_start_container(**kwargs):
image = ctx.node.properties["image"]
_create_and_start_container(service_component_name, image,**kwargs)
-
+
ctx.logger.info("Component deployed: {0}".format(service_component_name))
@@ -580,6 +575,7 @@ def stop_and_remove_container(**kwargs):
ctx.logger.error("Unexpected error while stopping container: {0}"
.format(str(e)))
+@wrap_error_handling_update
@monkeypatch_loggers
@operation
def scale(replicas, **kwargs):
@@ -587,15 +583,44 @@ def scale(replicas, **kwargs):
if replicas > 0:
current_replicas = ctx.instance.runtime_properties["replicas"]
ctx.logger.info("Scaling from {0} to {1}".format(current_replicas, replicas))
- try:
- deployment_description = ctx.instance.runtime_properties["k8s_deployment"]
- k8sclient.scale(deployment_description, replicas)
- ctx.instance.runtime_properties["replicas"] = replicas
- except Exception as e:
- ctx.logger.error ("Unexpected error while scaling {0}".format(str(e)))
+ deployment_description = ctx.instance.runtime_properties["k8s_deployment"]
+ k8sclient.scale(deployment_description, replicas)
+ ctx.instance.runtime_properties["replicas"] = replicas
+
+ # Verify that the scaling took place as expected
+ max_wait = kwargs.get("max_wait", 300)
+ service_component_name = ctx.instance.runtime_properties["service_component_name"]
+ if _verify_container(service_component_name, max_wait):
+            ctx.logger.info("Scaling complete: {0} scaled from {1} to {2} instance(s)".format(service_component_name, current_replicas, replicas))
+
else:
ctx.logger.info("Ignoring request to scale to zero replicas")
-
+
+@wrap_error_handling_update
+@monkeypatch_loggers
+@operation
+def update_image(image, **kwargs):
+ if image:
+ current_image = ctx.instance.runtime_properties["image"]
+ ctx.logger.info("Updating application container image from {0} to {1}".format(current_image, image))
+ deployment_description = ctx.instance.runtime_properties["k8s_deployment"]
+ k8sclient.upgrade(deployment_description, image)
+ ctx.instance.runtime_properties["image"] = image
+
+ # Verify that the update took place as expected
+ max_wait = kwargs.get("max_wait", 300)
+ service_component_name = ctx.instance.runtime_properties["service_component_name"]
+ if _verify_container(service_component_name, max_wait):
+            ctx.logger.info("Update complete: {0} image updated from {1} to {2}".format(service_component_name, current_image, image))
+
+ else:
+ ctx.logger.info("Ignoring update_image request with unusable image '{0}'".format(str(image)))
+
+# TODO: implement rollback operation when kubernetes python client fix is available.
+# (See comments in k8sclient.py.)
+# In the meantime, it's possible to undo an update_image operation by doing a second
+# update_image that specifies the older image.
+
@monkeypatch_loggers
@Policies.cleanup_policies_on_node
@operation
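Until the rollback operation mentioned in the TODO exists, undoing an image update means issuing a second update_image with the previously recorded image. A toy illustration follows; the names are stand-ins, and upgrade stands in for k8sclient.upgrade(deployment_description, image).

# Toy model of "rollback by re-update": record the old image, apply the new one,
# and undo by applying the recorded image again.
def update_image(runtime_properties, new_image, upgrade):
    old_image = runtime_properties["image"]
    upgrade(runtime_properties["k8s_deployment"], new_image)
    runtime_properties["image"] = new_image
    return old_image

props = {"k8s_deployment": "dep-123", "image": "registry/app:1.0"}
applied = []
previous = update_image(props, "registry/app:1.1", lambda d, i: applied.append(i))
update_image(props, previous, lambda d, i: applied.append(i))  # the "rollback"
print(props["image"], applied)  # registry/app:1.0 ['registry/app:1.1', 'registry/app:1.0']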
@@ -628,7 +653,7 @@ def _notify_container(**kwargs):
pods could be changing. We can query to get all the pods, but
there's no guarantee the list won't change while we're trying to
execute the script.
-
+
In ONAP R2, all of the policy-driven components rely on polling.
"""
"""