Diffstat (limited to 'k8s/k8splugin/tasks.py')
 k8s/k8splugin/tasks.py | 36 ++++++++++++++++++++++++++++--------
1 file changed, 28 insertions(+), 8 deletions(-)
diff --git a/k8s/k8splugin/tasks.py b/k8s/k8splugin/tasks.py
index 399bc9f..fdb00c5 100644
--- a/k8s/k8splugin/tasks.py
+++ b/k8s/k8splugin/tasks.py
@@ -43,6 +43,7 @@ CONSUL_HOST = plugin_conf.get("consul_host")
CONSUL_INTERNAL_NAME = plugin_conf.get("consul_dns_name")
DCAE_NAMESPACE = plugin_conf.get("namespace")
DEFAULT_MAX_WAIT = plugin_conf.get("max_wait", 1800)
+DEFAULT_K8S_LOCATION = plugin_conf.get("default_k8s_location")
# Used to construct delivery urls for data router subscribers. Data router in FTL
# requires https but this author believes that ONAP is to be defaulted to http.
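
For reference, a sketch of the plugin-level configuration keys this module reads; the values are illustrative only, and the real values come from wherever plugin_conf is loaded:

plugin_conf = {
    "consul_host": "consul",            # CONSUL_HOST
    "consul_dns_name": "consul",        # CONSUL_INTERNAL_NAME
    "namespace": "onap",                # DCAE_NAMESPACE
    "max_wait": 1800,                   # DEFAULT_MAX_WAIT (seconds)
    "default_k8s_location": "central",  # DEFAULT_K8S_LOCATION, read by the line added above
}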
@@ -54,6 +55,7 @@ CONTAINER_ID = "container_id"
APPLICATION_CONFIG = "application_config"
K8S_DEPLOYMENT = "k8s_deployment"
RESOURCE_KW = "resource_config"
+LOCATION_ID = "location_id"
# Utility methods
@@ -106,6 +108,11 @@ def _get_resources(**kwargs):
ctx.logger.info("set resources to None")
return None
+def _get_location():
+ ''' Get the k8s location property. Set to the default if the property is missing, None, or zero-length '''
+ return ctx.node.properties["location_id"] if "location_id" in ctx.node.properties and ctx.node.properties["location_id"] \
+ else DEFAULT_K8S_LOCATION
+
@merge_inputs_for_create
@monkeypatch_loggers
@Policies.gather_policies_to_node()
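
A minimal standalone sketch of the fallback behavior _get_location() implements; the node properties and location names here are hypothetical:

def _get_location_sketch(node_properties, default_location):
    # Use the node's location_id unless it is missing, None, or zero-length
    return node_properties.get("location_id") or default_location

assert _get_location_sketch({"location_id": "edge-site-1"}, "central") == "edge-site-1"
assert _get_location_sketch({"location_id": ""}, "central") == "central"
assert _get_location_sketch({}, "central") == "central"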
@@ -238,11 +245,13 @@ def _lookup_service(service_component_name, consul_host=CONSUL_HOST,
else:
return results[0]["ServiceAddress"]
-def _verify_k8s_deployment(service_component_name, max_wait):
+def _verify_k8s_deployment(location, service_component_name, max_wait):
"""Verify that the k8s Deployment is ready
Args:
-----
+ location (string): location of the k8s cluster where the component was deployed
+ service_component_name: component's service component name
max_wait (integer): limit on how many attempts to make, which translates to
seconds because each sleep is one second. 0 means infinite.
@@ -253,7 +262,7 @@ def _verify_k8s_deployment(service_component_name, max_wait):
num_attempts = 1
while True:
- if k8sclient.is_available(DCAE_NAMESPACE, service_component_name):
+ if k8sclient.is_available(location, DCAE_NAMESPACE, service_component_name):
return True
else:
num_attempts += 1
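
Taken together, the two hunks above make the readiness poll target a specific cluster. A sketch of the loop's behavior, assuming (per the docstring) one-second sleeps, max_wait as an attempt limit, and 0 meaning wait forever; the cut-off logic is outside the hunk and is an assumption here:

import time

def _wait_until_available(check, max_wait):
    # check is e.g. lambda: k8sclient.is_available(location, DCAE_NAMESPACE, name)
    num_attempts = 1
    while True:
        if check():
            return True
        num_attempts += 1
        if max_wait > 0 and num_attempts > max_wait:
            return False
        time.sleep(1)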
@@ -287,6 +296,7 @@ def _create_and_start_container(container_name, image, **kwargs):
- replicas: number of replicas to be launched initially
- readiness: object with information needed to create a readiness check
- liveness: object with information needed to create a liveness check
+ - k8s_location: name of the Kubernetes location (cluster) where the component is to be deployed
'''
env = { "CONSUL_HOST": CONSUL_INTERNAL_NAME,
"CONFIG_BINDING_SERVICE": "config-binding-service" }
@@ -310,7 +320,8 @@ def _create_and_start_container(container_name, image, **kwargs):
labels = kwargs.get("labels", {}),
log_info=kwargs.get("log_info"),
readiness=kwargs.get("readiness"),
- liveness=kwargs.get("liveness"))
+ liveness=kwargs.get("liveness"),
+ k8s_location=kwargs.get("k8s_location"))
# Capture the result of deployment for future use
ctx.instance.runtime_properties[K8S_DEPLOYMENT] = dep
@@ -333,7 +344,7 @@ def _parse_cloudify_context(**kwargs):
"cfynodeinstance": ctx.instance.id[:63]
}
- # Pick up the centralized logging info
+ # Pick up the centralized logging info
if "log_info" in ctx.node.properties and "log_directory" in ctx.node.properties["log_info"]:
kwargs["log_info"] = ctx.node.properties["log_info"]
@@ -347,6 +358,9 @@ def _parse_cloudify_context(**kwargs):
if "always_pull_image" in ctx.node.properties:
kwargs["always_pull_image"] = ctx.node.properties["always_pull_image"]
+ # Pick up location
+ kwargs["k8s_location"] = _get_location()
+
return kwargs
def _enhance_docker_params(**kwargs):
@@ -402,7 +416,8 @@ def _create_and_start_component(**kwargs):
"labels": kwargs.get("labels", {}),
"resource_config": kwargs.get("resource_config",{}),
"readiness": kwargs.get("readiness",{}),
- "liveness": kwargs.get("liveness",{})}
+ "liveness": kwargs.get("liveness",{}),
+ "k8s_location": kwargs.get("k8s_location")}
returned_args = _create_and_start_container(service_component_name, image, **sub_kwargs)
kwargs[K8S_DEPLOYMENT] = returned_args[K8S_DEPLOYMENT]
@@ -415,7 +430,7 @@ def _verify_component(**kwargs):
max_wait = kwargs.get("max_wait", DEFAULT_MAX_WAIT)
ctx.logger.info("Waiting up to {0} secs for {1} to become ready".format(max_wait, service_component_name))
- if _verify_k8s_deployment(service_component_name, max_wait):
+ if _verify_k8s_deployment(kwargs.get("k8s_location"), service_component_name, max_wait):
ctx.logger.info("k8s deployment is ready for: {0}".format(service_component_name))
else:
# The component did not become ready within the "max_wait" interval.
@@ -569,6 +584,10 @@ def create_and_start_container_for_platforms(**kwargs):
kwargs["replicas"] = ctx.node.properties["replicas"]
if "always_pull_image" in ctx.node.properties:
kwargs["always_pull_image"] = ctx.node.properties["always_pull_image"]
+
+ # Pick up location
+ kwargs["k8s_location"] = _get_location()
+
returned_args = _create_and_start_container(service_component_name, image, **kwargs)
# Verify that the k8s deployment is ready
@@ -586,6 +605,7 @@ def create_and_start_container(**kwargs):
ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME] = service_component_name
image = ctx.node.properties["image"]
+ kwargs["k8s_location"] = _get_location()
_create_and_start_container(service_component_name, image,**kwargs)
@@ -624,7 +644,7 @@ def scale(replicas, **kwargs):
# Verify that the scaling took place as expected
max_wait = kwargs.get("max_wait", DEFAULT_MAX_WAIT)
ctx.logger.info("Waiting up to {0} secs for {1} to scale and become ready".format(max_wait, service_component_name))
- if _verify_k8s_deployment(service_component_name, max_wait):
+ if _verify_k8s_deployment(deployment_description["location"], service_component_name, max_wait):
ctx.logger.info("Scaling complete: {0} from {1} to {2} replica(s)".format(service_component_name, current_replicas, replicas))
else:
@@ -647,7 +667,7 @@ def update_image(image, **kwargs):
# Verify that the update took place as expected
max_wait = kwargs.get("max_wait", DEFAULT_MAX_WAIT)
ctx.logger.info("Waiting up to {0} secs for {1} to be updated and become ready".format(max_wait, service_component_name))
- if _verify_k8s_deployment(service_component_name, max_wait):
+ if _verify_k8s_deployment(deployment_description["location"], service_component_name, max_wait):
ctx.logger.info("Update complete: {0} from {1} to {2}".format(service_component_name, current_image, image))
else:
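
Note that scale and update_image take the location from the saved deployment description rather than calling _get_location() again, so verification runs against the cluster the component was actually deployed to. A sketch of the assumed shape of that description, presumably retrieved from the K8S_DEPLOYMENT runtime property saved at create time; keys other than "location" are illustrative:

deployment_description = ctx.instance.runtime_properties[K8S_DEPLOYMENT]
# e.g. {"namespace": "onap", "deployment": "dep-my-component-abc123",
#       "services": ["my-component-abc123"], "location": "edge-site-1"}
location = deployment_description["location"]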