aboutsummaryrefslogtreecommitdiffstats
path: root/src/onaptests/steps/cloud
diff options
context:
space:
mode:
Diffstat (limited to 'src/onaptests/steps/cloud')
-rw-r--r--src/onaptests/steps/cloud/check_status.py90
-rw-r--r--src/onaptests/steps/cloud/connect_service_subscription_to_cloud_region.py8
-rw-r--r--src/onaptests/steps/cloud/k8s_connectivity_info_create.py3
-rw-r--r--src/onaptests/steps/cloud/register_cloud.py2
-rw-r--r--src/onaptests/steps/cloud/resources.py5
5 files changed, 66 insertions, 42 deletions
diff --git a/src/onaptests/steps/cloud/check_status.py b/src/onaptests/steps/cloud/check_status.py
index 1fcb96b..2b5a8df 100644
--- a/src/onaptests/steps/cloud/check_status.py
+++ b/src/onaptests/steps/cloud/check_status.py
@@ -48,7 +48,7 @@ class CheckK8sResourcesStep(BaseStep):
self.all_resources = []
self.failing_resources = []
self.jinja_env = Environment(autoescape=select_autoescape(['html']),
- loader=PackageLoader('onaptests.templates','status'))
+ loader=PackageLoader('onaptests.templates', 'status'))
@property
def component(self) -> str:
@@ -69,7 +69,7 @@ class CheckK8sResourcesStep(BaseStep):
def _add_failing_resource(self, resource):
if (resource.labels and settings.EXCLUDED_LABELS
- and (resource.labels.keys() and settings.EXCLUDED_LABELS.keys())):
+ and (resource.labels.keys() and settings.EXCLUDED_LABELS.keys())):
for label in resource.labels.items():
for waived_label in settings.EXCLUDED_LABELS.items():
if label[0] in waived_label[0] and label[1] in waived_label[1]:
@@ -84,19 +84,24 @@ class CheckK8sResourcesStep(BaseStep):
try:
self._init_resources()
if len(self.k8s_resources) > 0:
- self.__logger.info("%4s %ss in the namespace", len(self.k8s_resources), self.resource_type)
+ self.__logger.info("%4s %ss in the namespace",
+ len(self.k8s_resources),
+ self.resource_type)
self._parse_resources()
- self.__logger.info("%4s %ss parsed, %s failing", len(self.all_resources), self.resource_type,
- len(self.failing_resources))
+ self.__logger.info("%4s %ss parsed, %s failing",
+ len(self.all_resources),
+ self.resource_type,
+ len(self.failing_resources))
except (ConnectionRefusedError, MaxRetryError, NewConnectionError):
self.__logger.error("Test of k8s %ss failed.", self.resource_type)
self.__logger.error("Cannot connect to Kubernetes.")
+
class CheckBasicK8sResourcesStep(CheckK8sResourcesStep):
__logger = logging.getLogger(__name__)
- def __init__(self, resource_type: str, k8s_res_class, cleanup: bool = False,**kwargs):
+ def __init__(self, resource_type: str, k8s_res_class, cleanup: bool = False, **kwargs):
"""Init CheckBasicK8sResourcesStep."""
super().__init__(resource_type)
self.k8s_res_class = k8s_res_class
@@ -112,11 +117,12 @@ class CheckBasicK8sResourcesStep(CheckK8sResourcesStep):
def execute(self):
super().execute()
+
class CheckK8sConfigMapsStep(CheckBasicK8sResourcesStep):
__logger = logging.getLogger(__name__)
- def __init__(self, cleanup: bool = False,**kwargs):
+ def __init__(self, cleanup: bool = False, **kwargs):
"""Init CheckK8sConfigMapsStep."""
super().__init__("configmap", ConfigMap)
@@ -124,11 +130,12 @@ class CheckK8sConfigMapsStep(CheckBasicK8sResourcesStep):
super()._init_resources()
self.k8s_resources = self.core.list_namespaced_config_map(NAMESPACE).items
+
class CheckK8sSecretsStep(CheckBasicK8sResourcesStep):
__logger = logging.getLogger(__name__)
- def __init__(self, cleanup: bool = False,**kwargs):
+ def __init__(self, cleanup: bool = False, **kwargs):
"""Init CheckK8sSecretsStep."""
super().__init__("secret", Secret)
@@ -136,11 +143,12 @@ class CheckK8sSecretsStep(CheckBasicK8sResourcesStep):
super()._init_resources()
self.k8s_resources = self.core.list_namespaced_secret(NAMESPACE).items
+
class CheckK8sIngressesStep(CheckBasicK8sResourcesStep):
__logger = logging.getLogger(__name__)
- def __init__(self, cleanup: bool = False,**kwargs):
+ def __init__(self, cleanup: bool = False, **kwargs):
"""Init CheckK8sIngressesStep."""
super().__init__("ingress", Ingress)
@@ -148,11 +156,12 @@ class CheckK8sIngressesStep(CheckBasicK8sResourcesStep):
super()._init_resources()
self.k8s_resources = self.networking.list_namespaced_ingress(NAMESPACE).items
+
class CheckK8sPvcsStep(CheckK8sResourcesStep):
__logger = logging.getLogger(__name__)
- def __init__(self, cleanup: bool = False,**kwargs):
+ def __init__(self, cleanup: bool = False, **kwargs):
"""Init CheckK8sPvcsStep."""
super().__init__("pvc")
@@ -165,10 +174,10 @@ class CheckK8sPvcsStep(CheckK8sResourcesStep):
Return a list of Pods that were created to perform jobs.
"""
super()._parse_resources()
- jobs_pods = []
for k8s in self.k8s_resources:
pvc = Pvc(k8s=k8s)
- field_selector = f"involvedObject.name={pvc.name},involvedObject.kind=PersistentVolumeClaim"
+ field_selector = (f"involvedObject.name={pvc.name}, "
+ "involvedObject.kind=PersistentVolumeClaim")
pvc.events = self.core.list_namespaced_event(
NAMESPACE,
field_selector=field_selector).items
@@ -181,11 +190,12 @@ class CheckK8sPvcsStep(CheckK8sResourcesStep):
def execute(self):
super().execute()
+
class CheckK8sResourcesUsingPodsStep(CheckK8sResourcesStep):
__logger = logging.getLogger(__name__)
- def __init__(self, resource_type: str, pods_source, cleanup: bool = False,**kwargs):
+ def __init__(self, resource_type: str, pods_source, cleanup: bool = False, **kwargs):
"""Init CheckK8sResourcesUsingPodsStep."""
super().__init__(resource_type)
self.pods_source = pods_source
@@ -219,11 +229,12 @@ class CheckK8sResourcesUsingPodsStep(CheckK8sResourcesStep):
def execute(self):
super().execute()
+
class CheckK8sJobsStep(CheckK8sResourcesUsingPodsStep):
__logger = logging.getLogger(__name__)
- def __init__(self, cleanup: bool = False,**kwargs):
+ def __init__(self, cleanup: bool = False, **kwargs):
"""Init CheckK8sJobsStep."""
super().__init__("job", None)
@@ -256,19 +267,19 @@ class CheckK8sJobsStep(CheckK8sResourcesUsingPodsStep):
# timemout job
if not k8s.status.completion_time:
- if any(
- waiver_elt not in job.name for waiver_elt in settings.WAIVER_LIST):
+ if any(waiver_elt not in job.name for waiver_elt in settings.WAIVER_LIST):
self._add_failing_resource(job)
# completed job
if any(waiver_elt not in job.name for waiver_elt in settings.WAIVER_LIST):
self.all_resources.append(job)
jobs_pods += job_pods
+
class CheckK8sPodsStep(CheckK8sResourcesUsingPodsStep):
__logger = logging.getLogger(__name__)
- def __init__(self, pods, cleanup: bool = False,**kwargs):
+ def __init__(self, pods, cleanup: bool = False, **kwargs):
"""Init CheckK8sPodsStep."""
super().__init__("pod", pods)
@@ -276,7 +287,7 @@ class CheckK8sPodsStep(CheckK8sResourcesUsingPodsStep):
super()._init_resources()
self.k8s_resources = self.core.list_namespaced_pod(NAMESPACE).items
- def _parse_resources(self):
+ def _parse_resources(self): # noqa
"""Parse the pods."""
super()._parse_resources()
excluded_pods = self._get_used_pods()
@@ -295,9 +306,10 @@ class CheckK8sPodsStep(CheckK8sResourcesUsingPodsStep):
pod_component = k8s.metadata.labels[
'app.kubernetes.io/name']
else:
- self.__logger.error("pod %s has no 'app' or 'app.kubernetes.io/name' in metadata: %s", pod_component, k8s.metadata.labels)
+ self.__logger.error("pod %s has no 'app' or 'app.kubernetes.io/name' "
+ "in metadata: %s", pod_component, k8s.metadata.labels)
- ## looks for docker version
+ # looks for docker version
for container in k8s.spec.containers:
pod_version = {}
pod_container_version = container.image.rsplit(":", 1)
@@ -317,7 +329,7 @@ class CheckK8sPodsStep(CheckK8sResourcesUsingPodsStep):
search_rule = "^(?P<source>[^/]*)/*(?P<container>[^:]*):*(?P<version>.*)$"
search = re.search(search_rule, container.image)
name = "{}/{}".format(search.group('source'),
- search.group('container'))
+ search.group('container'))
version = search.group('version')
if name[-1] == '/':
name = name[0:-1]
@@ -416,7 +428,7 @@ class CheckK8sPodsStep(CheckK8sResourcesUsingPodsStep):
)
return logs
- def _parse_container(self, pod, k8s_container, init=False):
+ def _parse_container(self, pod, k8s_container, init=False): # noqa
"""Get the logs of a container."""
logs = ""
old_logs = ""
@@ -443,7 +455,7 @@ class CheckK8sPodsStep(CheckK8sResourcesUsingPodsStep):
logs = self._get_container_logs(pod=pod, container=container, full=False)
with open(
"{}/pod-{}-{}.log".format(self.res_dir,
- pod.name, container.name),
+ pod.name, container.name),
'w') as log_result:
log_result.write(logs)
if (not container.ready) and container.restart_count > 0:
@@ -451,15 +463,15 @@ class CheckK8sPodsStep(CheckK8sResourcesUsingPodsStep):
previous=True)
with open(
"{}/pod-{}-{}.old.log".format(self.res_dir,
- pod.name,
- container.name),
+ pod.name,
+ container.name),
'w') as log_result:
log_result.write(old_logs)
if (container.name in settings.FULL_LOGS_CONTAINERS):
logs = self._get_container_logs(pod=pod, container=container)
with open(
"{}/pod-{}-{}.log".format(self.res_dir,
- pod.name, container.name),
+ pod.name, container.name),
'w') as log_result:
log_result.write(logs)
if (container.name in settings.SPECIFIC_LOGS_CONTAINERS):
@@ -484,7 +496,7 @@ class CheckK8sPodsStep(CheckK8sResourcesUsingPodsStep):
log_result.write(log_files[log_file])
except client.rest.ApiException as exc:
self.__logger.warning("%scontainer %s of pod %s has an exception: %s",
- prefix, container.name, pod.name, exc.reason)
+ prefix, container.name, pod.name, exc.reason)
self.jinja_env.get_template('container_log.html.j2').stream(
container=container,
pod_name=pod.name,
@@ -501,11 +513,12 @@ class CheckK8sPodsStep(CheckK8sResourcesUsingPodsStep):
return 1
return 0
+
class CheckK8sServicesStep(CheckK8sResourcesUsingPodsStep):
__logger = logging.getLogger(__name__)
- def __init__(self, pods, cleanup: bool = False,**kwargs):
+ def __init__(self, pods, cleanup: bool = False, **kwargs):
"""Init CheckK8sServicesStep."""
super().__init__("service", pods)
@@ -527,11 +540,12 @@ class CheckK8sServicesStep(CheckK8sResourcesUsingPodsStep):
self.res_dir, service.name))
self.all_resources.append(service)
+
class CheckK8sDeploymentsStep(CheckK8sResourcesUsingPodsStep):
__logger = logging.getLogger(__name__)
- def __init__(self, pods, cleanup: bool = False,**kwargs):
+ def __init__(self, pods, cleanup: bool = False, **kwargs):
"""Init CheckK8sDeploymentsStep."""
super().__init__("deployment", pods)
@@ -566,11 +580,12 @@ class CheckK8sDeploymentsStep(CheckK8sResourcesUsingPodsStep):
self.all_resources.append(deployment)
+
class CheckK8sResplicaSetsStep(CheckK8sResourcesUsingPodsStep):
__logger = logging.getLogger(__name__)
- def __init__(self, pods, cleanup: bool = False,**kwargs):
+ def __init__(self, pods, cleanup: bool = False, **kwargs):
"""Init CheckK8sResplicaSetsStep."""
super().__init__("replicaset", pods)
@@ -601,17 +616,18 @@ class CheckK8sResplicaSetsStep(CheckK8sResourcesUsingPodsStep):
replicaset=replicaset).dump('{}/replicaset-{}.html'.format(
self.res_dir, replicaset.name))
- if (not k8s.status.ready_replicas
- or (k8s.status.ready_replicas < k8s.status.replicas)):
+ if (not k8s.status.ready_replicas or
+ (k8s.status.ready_replicas < k8s.status.replicas)):
self._add_failing_resource(replicaset)
self.all_resources.append(replicaset)
+
class CheckK8sStatefulSetsStep(CheckK8sResourcesUsingPodsStep):
__logger = logging.getLogger(__name__)
- def __init__(self, pods, cleanup: bool = False,**kwargs):
+ def __init__(self, pods, cleanup: bool = False, **kwargs):
"""Init CheckK8sStatefulSetsStep."""
super().__init__("statefulset", pods)
@@ -648,11 +664,12 @@ class CheckK8sStatefulSetsStep(CheckK8sResourcesUsingPodsStep):
self.all_resources.append(statefulset)
+
class CheckK8sDaemonSetsStep(CheckK8sResourcesUsingPodsStep):
__logger = logging.getLogger(__name__)
- def __init__(self, pods, cleanup: bool = False,**kwargs):
+ def __init__(self, pods, cleanup: bool = False, **kwargs):
"""Init CheckK8sDaemonSetsStep."""
super().__init__("daemonset", pods)
@@ -688,6 +705,7 @@ class CheckK8sDaemonSetsStep(CheckK8sResourcesUsingPodsStep):
self.all_resources.append(daemonset)
+
class CheckNamespaceStatusStep(CheckK8sResourcesStep):
"""Check status of all k8s resources in the selected namespace."""
@@ -777,7 +795,9 @@ class CheckNamespaceStatusStep(CheckK8sResourcesStep):
for step in self._steps:
if step.failing:
self.failing = True
- self.__logger.info("%s failing: %s", step.resource_type, len(step.failing_resources))
+ self.__logger.info("%s failing: %s",
+ step.resource_type,
+ len(step.failing_resources))
details[step.resource_type] = {
'number_all': len(step.all_resources),
'number_failing': len(step.failing_resources),
diff --git a/src/onaptests/steps/cloud/connect_service_subscription_to_cloud_region.py b/src/onaptests/steps/cloud/connect_service_subscription_to_cloud_region.py
index 97a7823..3bb08c7 100644
--- a/src/onaptests/steps/cloud/connect_service_subscription_to_cloud_region.py
+++ b/src/onaptests/steps/cloud/connect_service_subscription_to_cloud_region.py
@@ -52,8 +52,8 @@ class ConnectServiceSubToCloudRegionStep(BaseStep):
super().execute()
customer: Customer = Customer.get_by_global_customer_id(
settings.GLOBAL_CUSTOMER_ID)
- service_subscription: ServiceSubscription = customer.get_service_subscription_by_service_type(
- settings.SERVICE_NAME)
+ service_subscription: ServiceSubscription = \
+ customer.get_service_subscription_by_service_type(settings.SERVICE_NAME)
cloud_region: CloudRegion = CloudRegion.get_by_id(
cloud_owner=settings.CLOUD_REGION_CLOUD_OWNER,
cloud_region_id=settings.CLOUD_REGION_ID,
@@ -63,4 +63,6 @@ class ConnectServiceSubToCloudRegionStep(BaseStep):
# for which we are sure that an availability zone has been created
tenant: Tenant = cloud_region.get_tenant(settings.TENANT_ID)
- service_subscription.link_to_cloud_region_and_tenant(cloud_region=cloud_region, tenant=tenant)
+ service_subscription.link_to_cloud_region_and_tenant(
+ cloud_region=cloud_region,
+ tenant=tenant)
diff --git a/src/onaptests/steps/cloud/k8s_connectivity_info_create.py b/src/onaptests/steps/cloud/k8s_connectivity_info_create.py
index 8c361e4..adcf862 100644
--- a/src/onaptests/steps/cloud/k8s_connectivity_info_create.py
+++ b/src/onaptests/steps/cloud/k8s_connectivity_info_create.py
@@ -4,7 +4,6 @@ from jinja2 import Environment, PackageLoader, select_autoescape
from onapsdk.configuration import settings
from onapsdk.exceptions import APIError
from onapsdk.k8s import ConnectivityInfo
-from onapsdk.utils.jinja import jinja_env
from onaptests.steps.base import BaseStep
@@ -31,7 +30,7 @@ class K8SConnectivityInfoStep(BaseStep):
- K8S_CONFIG.
"""
super().execute()
- ######## Create Connectivity Info #########################################
+ # Create Connectivity Info #########################################
try:
self._logger.info("Check if k8s connectivity information exists")
ConnectivityInfo.get_connectivity_info_by_region_id(
diff --git a/src/onaptests/steps/cloud/register_cloud.py b/src/onaptests/steps/cloud/register_cloud.py
index 3065461..c6b440f 100644
--- a/src/onaptests/steps/cloud/register_cloud.py
+++ b/src/onaptests/steps/cloud/register_cloud.py
@@ -33,7 +33,7 @@ class RegisterCloudRegionStep(BaseStep):
return "AAI"
@BaseStep.store_state
- def execute(self):
+ def execute(self): # noqa
"""Register cloud region.
Use settings values:
diff --git a/src/onaptests/steps/cloud/resources.py b/src/onaptests/steps/cloud/resources.py
index e8d16f9..f00b60e 100644
--- a/src/onaptests/steps/cloud/resources.py
+++ b/src/onaptests/steps/cloud/resources.py
@@ -37,6 +37,7 @@ class K8sResource():
else:
return False
+
class K8sPodParentResource(K8sResource):
"""K8sPodParentResource class."""
@@ -138,9 +139,11 @@ class Job(K8sPodParentResource):
class Deployment(K8sPodParentResource):
"""Deployment class."""
+
class ReplicaSet(K8sPodParentResource):
"""ReplicaSet class."""
+
class StatefulSet(K8sPodParentResource):
"""StatefulSet class."""
@@ -162,4 +165,4 @@ class Secret(K8sResource):
class Ingress(K8sResource):
- """Ingress class.""" \ No newline at end of file
+ """Ingress class."""