blob: 05728ca4ca2576fc0c26bc836310d349067bfdef (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
|
import base64
from kubernetes import client, config
from onapsdk.configuration import settings
from onaptests.utils.exceptions import EnvironmentPreparationException
class KubernetesHelper:
"""Helper class to perform operations on kubernetes cluster"""
@classmethod
def load_config(cls):
"""Load kubernetes client configuration"""
if settings.IN_CLUSTER:
config.load_incluster_config()
else:
config.load_kube_config(config_file=settings.K8S_CONFIG)
@classmethod
def get_credentials_from_secret(cls,
secret_name: str,
login_key: str,
password_key: str,
namespace: str = settings.K8S_ONAP_NAMESPACE):
"""Resolve SDNC datbase credentials from k8s secret.
Args:
secret_name (str): name of the secret to load
login_key (str): key of the login in secret
password_key (str): key of the password in secret
namespace (str): k8s namespace to load key from
"""
cls.load_config()
api_instance = client.CoreV1Api()
try:
secret = api_instance.read_namespaced_secret(secret_name, namespace)
if secret.data:
if (login_key in secret.data and password_key in secret.data):
login_base64 = secret.data[login_key]
login = base64.b64decode(login_base64).decode("utf-8")
password_base64 = secret.data[password_key]
password = base64.b64decode(password_base64).decode("utf-8")
return login, password
raise EnvironmentPreparationException(
"Login key or password key not found in secret")
raise EnvironmentPreparationException("Secret data not found in secret")
except client.rest.ApiException as e:
raise EnvironmentPreparationException("Error accessing secret") from e
|