From a3f62fa65e34b2dd1130ea8dd647a98e0575a330 Mon Sep 17 00:00:00 2001 From: Jack Lucas Date: Mon, 11 Mar 2019 17:43:37 -0400 Subject: Add multi-cluster support Issue-ID: DCAEGEN2-1136 Change-Id: I314e5d8c501198b3e87c45813201498935c7bacc Signed-off-by: Jack Lucas --- k8s/k8sclient/k8sclient.py | 53 ++++++++++++++++++++++++++-------------------- 1 file changed, 30 insertions(+), 23 deletions(-) (limited to 'k8s/k8sclient/k8sclient.py') diff --git a/k8s/k8sclient/k8sclient.py b/k8s/k8sclient/k8sclient.py index d3417a7..ee4250d 100644 --- a/k8s/k8sclient/k8sclient.py +++ b/k8s/k8sclient/k8sclient.py @@ -27,6 +27,9 @@ from kubernetes import config, client, stream PROBE_DEFAULT_PERIOD = 15 PROBE_DEFAULT_TIMEOUT = 1 +# Location of k8s cluster config file ("kubeconfig") +K8S_CONFIG_PATH="/opt/onap/kubeconfig" + # Regular expression for interval/timeout specification INTERVAL_SPEC = re.compile("^([0-9]+)(s|m|h)?$") # Conversion factors to seconds @@ -48,11 +51,10 @@ def _create_service_name(component_name): def _create_exposed_service_name(component_name): return ("x{0}".format(component_name))[:63] -def _configure_api(): - # Look for a kubernetes config file in ~/.kube/config - kubepath = os.path.join(os.environ["HOME"], '.kube/config') - if os.path.exists(kubepath): - config.load_kube_config(kubepath) +def _configure_api(location=None): + # Look for a kubernetes config file + if os.path.exists(K8S_CONFIG_PATH): + config.load_kube_config(config_file=K8S_CONFIG_PATH, context=location, persist_config=False) else: # Maybe we're running in a k8s container and we can use info provided by k8s # We would like to use: @@ -271,10 +273,10 @@ def _parse_volumes(volume_list): return volumes, volume_mounts -def _service_exists(namespace, component_name): +def _service_exists(location, namespace, component_name): exists = False try: - _configure_api() + _configure_api(location) client.CoreV1Api().read_namespaced_service(_create_service_name(component_name), namespace) exists = True except client.rest.ApiException: @@ -282,13 +284,14 @@ def _service_exists(namespace, component_name): return exists -def _patch_deployment(namespace, deployment, modify): +def _patch_deployment(location, namespace, deployment, modify): ''' - Gets the current spec for 'deployment' in 'namespace', + Gets the current spec for 'deployment' in 'namespace' + in the k8s cluster at 'location', uses the 'modify' function to change the spec, then sends the updated spec to k8s. ''' - _configure_api() + _configure_api(location) # Get deployment spec spec = client.ExtensionsV1beta1Api().read_namespaced_deployment(deployment, namespace) @@ -299,10 +302,11 @@ def _patch_deployment(namespace, deployment, modify): # Patch the deploy with updated spec client.ExtensionsV1beta1Api().patch_namespaced_deployment(deployment, namespace, spec) -def _execute_command_in_pod(namespace, pod_name, command): +def _execute_command_in_pod(location, namespace, pod_name, command): ''' Execute the command (specified by an argv-style list in the "command" parameter) in - the specified pod in the specified namespace. For now at least, we use this only to + the specified pod in the specified namespace at the specified location. + For now at least, we use this only to run a notification script in a pod after a configuration change. The straightforward way to do this is with the V1 Core API function "connect_get_namespaced_pod_exec". @@ -323,7 +327,7 @@ def _execute_command_in_pod(namespace, pod_name, command): The "stream" approach returns a string containing any output sent by the command to stdout or stderr. We'll return that so it can logged. ''' - _configure_api() + _configure_api(location) try: output = stream.stream(client.CoreV1Api().connect_get_namespaced_pod_exec, name=pod_name, @@ -399,6 +403,8 @@ def deploy(namespace, component_name, image, replicas, always_pull, k8sconfig, r - timeout: time (in seconds) to allow a probe to complete - endpoint: the path portion of the URL that points to the liveness endpoint for "http" and "https" types - path: the full path to the script to be executed in the container for "script" and "docker" types + - k8s_location: name of the Kubernetes location (cluster) where the component is to be deployed + ''' @@ -406,6 +412,7 @@ def deploy(namespace, component_name, image, replicas, always_pull, k8sconfig, r cip_service_created = False deployment_description = { "namespace": namespace, + "location" : kwargs.get("k8s_location"), "deployment": '', "services": [] } @@ -413,7 +420,7 @@ def deploy(namespace, component_name, image, replicas, always_pull, k8sconfig, r try: # Get API handles - _configure_api() + _configure_api(kwargs.get("k8s_location")) core = client.CoreV1Api() ext = client.ExtensionsV1beta1Api() @@ -523,7 +530,7 @@ def deploy(namespace, component_name, image, replicas, always_pull, k8sconfig, r return dep, deployment_description def undeploy(deployment_description): - _configure_api() + _configure_api(deployment_description["location"]) namespace = deployment_description["namespace"] @@ -535,8 +542,8 @@ def undeploy(deployment_description): options = client.V1DeleteOptions(propagation_policy="Foreground") client.ExtensionsV1beta1Api().delete_namespaced_deployment(deployment_description["deployment"], namespace, options) -def is_available(namespace, component_name): - _configure_api() +def is_available(location, namespace, component_name): + _configure_api(location) dep_status = client.AppsV1beta1Api().read_namespaced_deployment_status(_create_deployment_name(component_name), namespace) # Check if the number of available replicas is equal to the number requested and that the replicas match the current spec # This check can be used to verify completion of an initial deployment, a scale operation, or an update operation @@ -549,7 +556,7 @@ def scale(deployment_description, replicas): spec.spec.replicas = replicas return spec - _patch_deployment(deployment_description["namespace"], deployment_description["deployment"], update_replica_count) + _patch_deployment(deployment_description["location"], deployment_description["namespace"], deployment_description["deployment"], update_replica_count) def upgrade(deployment_description, image, container_index = 0): ''' Trigger a rolling upgrade by sending a new image name/tag to k8s ''' @@ -558,7 +565,7 @@ def upgrade(deployment_description, image, container_index = 0): spec.spec.template.spec.containers[container_index].image = image return spec - _patch_deployment(deployment_description["namespace"], deployment_description["deployment"], update_image) + _patch_deployment(deployment_description["location"], deployment_description["namespace"], deployment_description["deployment"], update_image) def rollback(deployment_description, rollback_to=0): ''' @@ -576,7 +583,7 @@ def rollback(deployment_description, rollback_to=0): - https://github.com/kubernetes/kubernetes/pull/63837 The fix has been merged into the master branch but is not in the latest release. ''' - _configure_api() + _configure_api(deployment_description["location"]) deployment = deployment_description["deployment"] namespace = deployment_description["namespace"] @@ -620,8 +627,8 @@ def execute_command_in_deployment(deployment_description, command): the pod that has the k8s deployment name. To list the pods, the code below queries for pods with the label carrying the deployment name. ''' - - _configure_api() + location = deployment_description["location"] + _configure_api(location) deployment = deployment_description["deployment"] namespace = deployment_description["namespace"] @@ -633,7 +640,7 @@ def execute_command_in_deployment(deployment_description, command): ).items] def do_execute(pod_name): - return _execute_command_in_pod(namespace, pod_name, command) + return _execute_command_in_pod(location, namespace, pod_name, command) # Execute command in the running pods return map(do_execute, pod_names) -- cgit 1.2.3-korg