aboutsummaryrefslogtreecommitdiffstats
path: root/src/onaptests/steps
diff options
context:
space:
mode:
authorMichal Jagiello <michal.jagiello@t-mobile.pl>2023-07-17 13:30:55 +0000
committerMichal Jagiello <michal.jagiello@t-mobile.pl>2023-07-17 13:50:59 +0000
commit939f1933c8635a18de428c696c97cc5d8600abff (patch)
treea58cb9342f1a9d2cec80fc6a792eae0daae7bc49 /src/onaptests/steps
parentb1e1a5fa99e29b3ffc2bc48d11b163e28cbb7744 (diff)
Add pylama into checks
Checks code with pylama Issue-ID: INT-2222 Signed-off-by: Michal Jagiello <michal.jagiello@t-mobile.pl> Change-Id: If275ef90f21e3049abe72bcf373473b40846005c
Diffstat (limited to 'src/onaptests/steps')
-rw-r--r--src/onaptests/steps/base.py22
-rw-r--r--src/onaptests/steps/cloud/check_status.py90
-rw-r--r--src/onaptests/steps/cloud/connect_service_subscription_to_cloud_region.py8
-rw-r--r--src/onaptests/steps/cloud/k8s_connectivity_info_create.py3
-rw-r--r--src/onaptests/steps/cloud/register_cloud.py2
-rw-r--r--src/onaptests/steps/cloud/resources.py5
-rw-r--r--src/onaptests/steps/instantiate/k8s_profile_create.py15
-rw-r--r--src/onaptests/steps/instantiate/service_ala_carte.py15
-rw-r--r--src/onaptests/steps/instantiate/service_macro.py50
-rw-r--r--src/onaptests/steps/instantiate/vf_module_ala_carte.py7
-rw-r--r--src/onaptests/steps/instantiate/vl_ala_carte.py12
-rw-r--r--src/onaptests/steps/instantiate/vnf_ala_carte.py6
-rw-r--r--src/onaptests/steps/loop/clamp.py15
-rw-r--r--src/onaptests/steps/loop/instantiate_loop.py13
-rw-r--r--src/onaptests/steps/onboard/clamp.py1
-rw-r--r--src/onaptests/steps/onboard/msb_k8s.py9
-rw-r--r--src/onaptests/steps/onboard/service.py9
-rw-r--r--src/onaptests/steps/onboard/vsp.py8
-rw-r--r--src/onaptests/steps/reports_collection.py7
-rw-r--r--src/onaptests/steps/simulator/cds_mockserver.py4
-rw-r--r--src/onaptests/steps/simulator/pnf_simulator_cnf/pnf_register.py24
-rw-r--r--src/onaptests/steps/wrapper/helm_charts.py25
-rw-r--r--src/onaptests/steps/wrapper/start.py1
23 files changed, 211 insertions, 140 deletions
diff --git a/src/onaptests/steps/base.py b/src/onaptests/steps/base.py
index 8ebe247..2c5fb29 100644
--- a/src/onaptests/steps/base.py
+++ b/src/onaptests/steps/base.py
@@ -163,13 +163,15 @@ class BaseStep(ABC):
def store_state(cls, fun=None, *, cleanup=False):
if fun is None:
return functools.partial(cls.store_state, cleanup=cleanup)
+
@functools.wraps(fun)
def wrapper(self, *args, **kwargs):
try:
if cleanup:
self._start_cleanup_time = time.time()
self._logger.info("*****************************************************")
- self._logger.info(f"START [{self.component}] {self.name} cleanup: {self.description}")
+ self._logger.info(f"START [{self.component}] {self.name} cleanup: "
+ f"{self.description}")
self._logger.info("*****************************************************")
else:
self._logger.info("*****************************************************")
@@ -180,7 +182,8 @@ class BaseStep(ABC):
execution_status = ReportStepStatus.PASS
return ret
except SubstepExecutionException:
- execution_status = ReportStepStatus.PASS if cleanup else ReportStepStatus.NOT_EXECUTED
+ execution_status = (ReportStepStatus.PASS if cleanup else
+ ReportStepStatus.NOT_EXECUTED)
raise
except (OnapTestException, SDKException):
execution_status = ReportStepStatus.FAIL
@@ -188,10 +191,12 @@ class BaseStep(ABC):
finally:
if cleanup:
self._logger.info("*****************************************************")
- self._logger.info(f"STOP [{self.component}] {self.name} cleanup: {self.description}")
+ self._logger.info(f"STOP [{self.component}] {self.name} cleanup: "
+ f"{self.description}")
self._logger.info("*****************************************************")
self._cleanup_report = Report(
- step_description=f"[{self.component}] {self.name} cleanup: {self.description}",
+ step_description=(f"[{self.component}] {self.name} cleanup: "
+ f"{self.description}"),
step_execution_status=execution_status,
step_execution_duration=time.time() - self._start_cleanup_time,
step_component=self.component
@@ -202,12 +207,15 @@ class BaseStep(ABC):
self._logger.info("*****************************************************")
if not self._start_execution_time:
if execution_status != ReportStepStatus.NOT_EXECUTED:
- self._logger.error("No execution start time saved for %s step. Fix it by call `super.execute()` "
- "in step class `execute()` method definition", self.name)
+ self._logger.error("No execution start time saved for %s step. "
+ "Fix it by call `super.execute()` "
+ "in step class `execute()` method definition",
+ self.name)
self._start_execution_time = time.time()
self._execution_report = Report(
step_description=f"[{self.component}] {self.name}: {self.description}",
- step_execution_status=execution_status if execution_status else ReportStepStatus.FAIL,
+ step_execution_status=(execution_status if execution_status else
+ ReportStepStatus.FAIL),
step_execution_duration=time.time() - self._start_execution_time,
step_component=self.component
)
diff --git a/src/onaptests/steps/cloud/check_status.py b/src/onaptests/steps/cloud/check_status.py
index 1fcb96b..2b5a8df 100644
--- a/src/onaptests/steps/cloud/check_status.py
+++ b/src/onaptests/steps/cloud/check_status.py
@@ -48,7 +48,7 @@ class CheckK8sResourcesStep(BaseStep):
self.all_resources = []
self.failing_resources = []
self.jinja_env = Environment(autoescape=select_autoescape(['html']),
- loader=PackageLoader('onaptests.templates','status'))
+ loader=PackageLoader('onaptests.templates', 'status'))
@property
def component(self) -> str:
@@ -69,7 +69,7 @@ class CheckK8sResourcesStep(BaseStep):
def _add_failing_resource(self, resource):
if (resource.labels and settings.EXCLUDED_LABELS
- and (resource.labels.keys() and settings.EXCLUDED_LABELS.keys())):
+ and (resource.labels.keys() and settings.EXCLUDED_LABELS.keys())):
for label in resource.labels.items():
for waived_label in settings.EXCLUDED_LABELS.items():
if label[0] in waived_label[0] and label[1] in waived_label[1]:
@@ -84,19 +84,24 @@ class CheckK8sResourcesStep(BaseStep):
try:
self._init_resources()
if len(self.k8s_resources) > 0:
- self.__logger.info("%4s %ss in the namespace", len(self.k8s_resources), self.resource_type)
+ self.__logger.info("%4s %ss in the namespace",
+ len(self.k8s_resources),
+ self.resource_type)
self._parse_resources()
- self.__logger.info("%4s %ss parsed, %s failing", len(self.all_resources), self.resource_type,
- len(self.failing_resources))
+ self.__logger.info("%4s %ss parsed, %s failing",
+ len(self.all_resources),
+ self.resource_type,
+ len(self.failing_resources))
except (ConnectionRefusedError, MaxRetryError, NewConnectionError):
self.__logger.error("Test of k8s %ss failed.", self.resource_type)
self.__logger.error("Cannot connect to Kubernetes.")
+
class CheckBasicK8sResourcesStep(CheckK8sResourcesStep):
__logger = logging.getLogger(__name__)
- def __init__(self, resource_type: str, k8s_res_class, cleanup: bool = False,**kwargs):
+ def __init__(self, resource_type: str, k8s_res_class, cleanup: bool = False, **kwargs):
"""Init CheckBasicK8sResourcesStep."""
super().__init__(resource_type)
self.k8s_res_class = k8s_res_class
@@ -112,11 +117,12 @@ class CheckBasicK8sResourcesStep(CheckK8sResourcesStep):
def execute(self):
super().execute()
+
class CheckK8sConfigMapsStep(CheckBasicK8sResourcesStep):
__logger = logging.getLogger(__name__)
- def __init__(self, cleanup: bool = False,**kwargs):
+ def __init__(self, cleanup: bool = False, **kwargs):
"""Init CheckK8sConfigMapsStep."""
super().__init__("configmap", ConfigMap)
@@ -124,11 +130,12 @@ class CheckK8sConfigMapsStep(CheckBasicK8sResourcesStep):
super()._init_resources()
self.k8s_resources = self.core.list_namespaced_config_map(NAMESPACE).items
+
class CheckK8sSecretsStep(CheckBasicK8sResourcesStep):
__logger = logging.getLogger(__name__)
- def __init__(self, cleanup: bool = False,**kwargs):
+ def __init__(self, cleanup: bool = False, **kwargs):
"""Init CheckK8sSecretsStep."""
super().__init__("secret", Secret)
@@ -136,11 +143,12 @@ class CheckK8sSecretsStep(CheckBasicK8sResourcesStep):
super()._init_resources()
self.k8s_resources = self.core.list_namespaced_secret(NAMESPACE).items
+
class CheckK8sIngressesStep(CheckBasicK8sResourcesStep):
__logger = logging.getLogger(__name__)
- def __init__(self, cleanup: bool = False,**kwargs):
+ def __init__(self, cleanup: bool = False, **kwargs):
"""Init CheckK8sIngressesStep."""
super().__init__("ingress", Ingress)
@@ -148,11 +156,12 @@ class CheckK8sIngressesStep(CheckBasicK8sResourcesStep):
super()._init_resources()
self.k8s_resources = self.networking.list_namespaced_ingress(NAMESPACE).items
+
class CheckK8sPvcsStep(CheckK8sResourcesStep):
__logger = logging.getLogger(__name__)
- def __init__(self, cleanup: bool = False,**kwargs):
+ def __init__(self, cleanup: bool = False, **kwargs):
"""Init CheckK8sPvcsStep."""
super().__init__("pvc")
@@ -165,10 +174,10 @@ class CheckK8sPvcsStep(CheckK8sResourcesStep):
Return a list of Pods that were created to perform jobs.
"""
super()._parse_resources()
- jobs_pods = []
for k8s in self.k8s_resources:
pvc = Pvc(k8s=k8s)
- field_selector = f"involvedObject.name={pvc.name},involvedObject.kind=PersistentVolumeClaim"
+ field_selector = (f"involvedObject.name={pvc.name}, "
+ "involvedObject.kind=PersistentVolumeClaim")
pvc.events = self.core.list_namespaced_event(
NAMESPACE,
field_selector=field_selector).items
@@ -181,11 +190,12 @@ class CheckK8sPvcsStep(CheckK8sResourcesStep):
def execute(self):
super().execute()
+
class CheckK8sResourcesUsingPodsStep(CheckK8sResourcesStep):
__logger = logging.getLogger(__name__)
- def __init__(self, resource_type: str, pods_source, cleanup: bool = False,**kwargs):
+ def __init__(self, resource_type: str, pods_source, cleanup: bool = False, **kwargs):
"""Init CheckK8sResourcesUsingPodsStep."""
super().__init__(resource_type)
self.pods_source = pods_source
@@ -219,11 +229,12 @@ class CheckK8sResourcesUsingPodsStep(CheckK8sResourcesStep):
def execute(self):
super().execute()
+
class CheckK8sJobsStep(CheckK8sResourcesUsingPodsStep):
__logger = logging.getLogger(__name__)
- def __init__(self, cleanup: bool = False,**kwargs):
+ def __init__(self, cleanup: bool = False, **kwargs):
"""Init CheckK8sJobsStep."""
super().__init__("job", None)
@@ -256,19 +267,19 @@ class CheckK8sJobsStep(CheckK8sResourcesUsingPodsStep):
# timemout job
if not k8s.status.completion_time:
- if any(
- waiver_elt not in job.name for waiver_elt in settings.WAIVER_LIST):
+ if any(waiver_elt not in job.name for waiver_elt in settings.WAIVER_LIST):
self._add_failing_resource(job)
# completed job
if any(waiver_elt not in job.name for waiver_elt in settings.WAIVER_LIST):
self.all_resources.append(job)
jobs_pods += job_pods
+
class CheckK8sPodsStep(CheckK8sResourcesUsingPodsStep):
__logger = logging.getLogger(__name__)
- def __init__(self, pods, cleanup: bool = False,**kwargs):
+ def __init__(self, pods, cleanup: bool = False, **kwargs):
"""Init CheckK8sPodsStep."""
super().__init__("pod", pods)
@@ -276,7 +287,7 @@ class CheckK8sPodsStep(CheckK8sResourcesUsingPodsStep):
super()._init_resources()
self.k8s_resources = self.core.list_namespaced_pod(NAMESPACE).items
- def _parse_resources(self):
+ def _parse_resources(self): # noqa
"""Parse the pods."""
super()._parse_resources()
excluded_pods = self._get_used_pods()
@@ -295,9 +306,10 @@ class CheckK8sPodsStep(CheckK8sResourcesUsingPodsStep):
pod_component = k8s.metadata.labels[
'app.kubernetes.io/name']
else:
- self.__logger.error("pod %s has no 'app' or 'app.kubernetes.io/name' in metadata: %s", pod_component, k8s.metadata.labels)
+ self.__logger.error("pod %s has no 'app' or 'app.kubernetes.io/name' "
+ "in metadata: %s", pod_component, k8s.metadata.labels)
- ## looks for docker version
+ # looks for docker version
for container in k8s.spec.containers:
pod_version = {}
pod_container_version = container.image.rsplit(":", 1)
@@ -317,7 +329,7 @@ class CheckK8sPodsStep(CheckK8sResourcesUsingPodsStep):
search_rule = "^(?P<source>[^/]*)/*(?P<container>[^:]*):*(?P<version>.*)$"
search = re.search(search_rule, container.image)
name = "{}/{}".format(search.group('source'),
- search.group('container'))
+ search.group('container'))
version = search.group('version')
if name[-1] == '/':
name = name[0:-1]
@@ -416,7 +428,7 @@ class CheckK8sPodsStep(CheckK8sResourcesUsingPodsStep):
)
return logs
- def _parse_container(self, pod, k8s_container, init=False):
+ def _parse_container(self, pod, k8s_container, init=False): # noqa
"""Get the logs of a container."""
logs = ""
old_logs = ""
@@ -443,7 +455,7 @@ class CheckK8sPodsStep(CheckK8sResourcesUsingPodsStep):
logs = self._get_container_logs(pod=pod, container=container, full=False)
with open(
"{}/pod-{}-{}.log".format(self.res_dir,
- pod.name, container.name),
+ pod.name, container.name),
'w') as log_result:
log_result.write(logs)
if (not container.ready) and container.restart_count > 0:
@@ -451,15 +463,15 @@ class CheckK8sPodsStep(CheckK8sResourcesUsingPodsStep):
previous=True)
with open(
"{}/pod-{}-{}.old.log".format(self.res_dir,
- pod.name,
- container.name),
+ pod.name,
+ container.name),
'w') as log_result:
log_result.write(old_logs)
if (container.name in settings.FULL_LOGS_CONTAINERS):
logs = self._get_container_logs(pod=pod, container=container)
with open(
"{}/pod-{}-{}.log".format(self.res_dir,
- pod.name, container.name),
+ pod.name, container.name),
'w') as log_result:
log_result.write(logs)
if (container.name in settings.SPECIFIC_LOGS_CONTAINERS):
@@ -484,7 +496,7 @@ class CheckK8sPodsStep(CheckK8sResourcesUsingPodsStep):
log_result.write(log_files[log_file])
except client.rest.ApiException as exc:
self.__logger.warning("%scontainer %s of pod %s has an exception: %s",
- prefix, container.name, pod.name, exc.reason)
+ prefix, container.name, pod.name, exc.reason)
self.jinja_env.get_template('container_log.html.j2').stream(
container=container,
pod_name=pod.name,
@@ -501,11 +513,12 @@ class CheckK8sPodsStep(CheckK8sResourcesUsingPodsStep):
return 1
return 0
+
class CheckK8sServicesStep(CheckK8sResourcesUsingPodsStep):
__logger = logging.getLogger(__name__)
- def __init__(self, pods, cleanup: bool = False,**kwargs):
+ def __init__(self, pods, cleanup: bool = False, **kwargs):
"""Init CheckK8sServicesStep."""
super().__init__("service", pods)
@@ -527,11 +540,12 @@ class CheckK8sServicesStep(CheckK8sResourcesUsingPodsStep):
self.res_dir, service.name))
self.all_resources.append(service)
+
class CheckK8sDeploymentsStep(CheckK8sResourcesUsingPodsStep):
__logger = logging.getLogger(__name__)
- def __init__(self, pods, cleanup: bool = False,**kwargs):
+ def __init__(self, pods, cleanup: bool = False, **kwargs):
"""Init CheckK8sDeploymentsStep."""
super().__init__("deployment", pods)
@@ -566,11 +580,12 @@ class CheckK8sDeploymentsStep(CheckK8sResourcesUsingPodsStep):
self.all_resources.append(deployment)
+
class CheckK8sResplicaSetsStep(CheckK8sResourcesUsingPodsStep):
__logger = logging.getLogger(__name__)
- def __init__(self, pods, cleanup: bool = False,**kwargs):
+ def __init__(self, pods, cleanup: bool = False, **kwargs):
"""Init CheckK8sResplicaSetsStep."""
super().__init__("replicaset", pods)
@@ -601,17 +616,18 @@ class CheckK8sResplicaSetsStep(CheckK8sResourcesUsingPodsStep):
replicaset=replicaset).dump('{}/replicaset-{}.html'.format(
self.res_dir, replicaset.name))
- if (not k8s.status.ready_replicas
- or (k8s.status.ready_replicas < k8s.status.replicas)):
+ if (not k8s.status.ready_replicas or
+ (k8s.status.ready_replicas < k8s.status.replicas)):
self._add_failing_resource(replicaset)
self.all_resources.append(replicaset)
+
class CheckK8sStatefulSetsStep(CheckK8sResourcesUsingPodsStep):
__logger = logging.getLogger(__name__)
- def __init__(self, pods, cleanup: bool = False,**kwargs):
+ def __init__(self, pods, cleanup: bool = False, **kwargs):
"""Init CheckK8sStatefulSetsStep."""
super().__init__("statefulset", pods)
@@ -648,11 +664,12 @@ class CheckK8sStatefulSetsStep(CheckK8sResourcesUsingPodsStep):
self.all_resources.append(statefulset)
+
class CheckK8sDaemonSetsStep(CheckK8sResourcesUsingPodsStep):
__logger = logging.getLogger(__name__)
- def __init__(self, pods, cleanup: bool = False,**kwargs):
+ def __init__(self, pods, cleanup: bool = False, **kwargs):
"""Init CheckK8sDaemonSetsStep."""
super().__init__("daemonset", pods)
@@ -688,6 +705,7 @@ class CheckK8sDaemonSetsStep(CheckK8sResourcesUsingPodsStep):
self.all_resources.append(daemonset)
+
class CheckNamespaceStatusStep(CheckK8sResourcesStep):
"""Check status of all k8s resources in the selected namespace."""
@@ -777,7 +795,9 @@ class CheckNamespaceStatusStep(CheckK8sResourcesStep):
for step in self._steps:
if step.failing:
self.failing = True
- self.__logger.info("%s failing: %s", step.resource_type, len(step.failing_resources))
+ self.__logger.info("%s failing: %s",
+ step.resource_type,
+ len(step.failing_resources))
details[step.resource_type] = {
'number_all': len(step.all_resources),
'number_failing': len(step.failing_resources),
diff --git a/src/onaptests/steps/cloud/connect_service_subscription_to_cloud_region.py b/src/onaptests/steps/cloud/connect_service_subscription_to_cloud_region.py
index 97a7823..3bb08c7 100644
--- a/src/onaptests/steps/cloud/connect_service_subscription_to_cloud_region.py
+++ b/src/onaptests/steps/cloud/connect_service_subscription_to_cloud_region.py
@@ -52,8 +52,8 @@ class ConnectServiceSubToCloudRegionStep(BaseStep):
super().execute()
customer: Customer = Customer.get_by_global_customer_id(
settings.GLOBAL_CUSTOMER_ID)
- service_subscription: ServiceSubscription = customer.get_service_subscription_by_service_type(
- settings.SERVICE_NAME)
+ service_subscription: ServiceSubscription = \
+ customer.get_service_subscription_by_service_type(settings.SERVICE_NAME)
cloud_region: CloudRegion = CloudRegion.get_by_id(
cloud_owner=settings.CLOUD_REGION_CLOUD_OWNER,
cloud_region_id=settings.CLOUD_REGION_ID,
@@ -63,4 +63,6 @@ class ConnectServiceSubToCloudRegionStep(BaseStep):
# for which we are sure that an availability zone has been created
tenant: Tenant = cloud_region.get_tenant(settings.TENANT_ID)
- service_subscription.link_to_cloud_region_and_tenant(cloud_region=cloud_region, tenant=tenant)
+ service_subscription.link_to_cloud_region_and_tenant(
+ cloud_region=cloud_region,
+ tenant=tenant)
diff --git a/src/onaptests/steps/cloud/k8s_connectivity_info_create.py b/src/onaptests/steps/cloud/k8s_connectivity_info_create.py
index 8c361e4..adcf862 100644
--- a/src/onaptests/steps/cloud/k8s_connectivity_info_create.py
+++ b/src/onaptests/steps/cloud/k8s_connectivity_info_create.py
@@ -4,7 +4,6 @@ from jinja2 import Environment, PackageLoader, select_autoescape
from onapsdk.configuration import settings
from onapsdk.exceptions import APIError
from onapsdk.k8s import ConnectivityInfo
-from onapsdk.utils.jinja import jinja_env
from onaptests.steps.base import BaseStep
@@ -31,7 +30,7 @@ class K8SConnectivityInfoStep(BaseStep):
- K8S_CONFIG.
"""
super().execute()
- ######## Create Connectivity Info #########################################
+ # Create Connectivity Info #########################################
try:
self._logger.info("Check if k8s connectivity information exists")
ConnectivityInfo.get_connectivity_info_by_region_id(
diff --git a/src/onaptests/steps/cloud/register_cloud.py b/src/onaptests/steps/cloud/register_cloud.py
index 3065461..c6b440f 100644
--- a/src/onaptests/steps/cloud/register_cloud.py
+++ b/src/onaptests/steps/cloud/register_cloud.py
@@ -33,7 +33,7 @@ class RegisterCloudRegionStep(BaseStep):
return "AAI"
@BaseStep.store_state
- def execute(self):
+ def execute(self): # noqa
"""Register cloud region.
Use settings values:
diff --git a/src/onaptests/steps/cloud/resources.py b/src/onaptests/steps/cloud/resources.py
index e8d16f9..f00b60e 100644
--- a/src/onaptests/steps/cloud/resources.py
+++ b/src/onaptests/steps/cloud/resources.py
@@ -37,6 +37,7 @@ class K8sResource():
else:
return False
+
class K8sPodParentResource(K8sResource):
"""K8sPodParentResource class."""
@@ -138,9 +139,11 @@ class Job(K8sPodParentResource):
class Deployment(K8sPodParentResource):
"""Deployment class."""
+
class ReplicaSet(K8sPodParentResource):
"""ReplicaSet class."""
+
class StatefulSet(K8sPodParentResource):
"""StatefulSet class."""
@@ -162,4 +165,4 @@ class Secret(K8sResource):
class Ingress(K8sResource):
- """Ingress class.""" \ No newline at end of file
+ """Ingress class."""
diff --git a/src/onaptests/steps/instantiate/k8s_profile_create.py b/src/onaptests/steps/instantiate/k8s_profile_create.py
index f0ad61b..27831c5 100644
--- a/src/onaptests/steps/instantiate/k8s_profile_create.py
+++ b/src/onaptests/steps/instantiate/k8s_profile_create.py
@@ -12,6 +12,7 @@ import onaptests.utils.exceptions as onap_test_exceptions
from ..base import BaseStep
from .vnf_ala_carte import YamlTemplateVnfAlaCarteInstantiateStep
+
class K8SProfileStep(BaseStep):
"""CreateK8sProfileStep."""
@@ -116,8 +117,10 @@ class K8SProfileStep(BaseStep):
self._logger.info("Create the k8s profile if it doesn't exist")
super().execute()
customer: Customer = Customer.get_by_global_customer_id(settings.GLOBAL_CUSTOMER_ID)
- service_subscription: ServiceSubscription = customer.get_service_subscription_by_service_type(self.service_name)
- self._service_instance: ServiceInstance = service_subscription.get_service_instance_by_name(self.service_instance_name)
+ service_subscription: ServiceSubscription = \
+ customer.get_service_subscription_by_service_type(self.service_name)
+ self._service_instance: ServiceInstance = \
+ service_subscription.get_service_instance_by_name(self.service_instance_name)
for vnf_instance in self._service_instance.vnf_instances:
# possible to have several modules for 1 VNF
@@ -140,15 +143,15 @@ class K8SProfileStep(BaseStep):
self._logger.error("Missing rb profile information")
raise onap_test_exceptions.ProfileInformationException
- ######## Check profile for Definition ###################################
+ # Check profile for Definition
try:
rbdef.get_profile_by_name(k8s_profile_name)
except ResourceNotFound:
- ######## Create profile for Definition ###################################
+ # Create profile for Definition
profile = rbdef.create_profile(k8s_profile_name,
k8s_profile_namespace,
settings.K8S_PROFILE_K8S_VERSION)
- ####### Upload artifact for created profile ##############################
+ # Upload artifact for created profile
profile.upload_artifact(open(settings.K8S_PROFILE_ARTIFACT_PATH, 'rb').read())
@BaseStep.store_state(cleanup=True)
@@ -172,7 +175,7 @@ class K8SProfileStep(BaseStep):
if k8s_profile_name == "":
self._logger.error("K8s profile deletion failed, missing rb profile name")
raise onap_test_exceptions.ProfileInformationException
- ######## Delete profile for Definition ###################################
+ # Delete profile for Definition
try:
profile = rbdef.get_profile_by_name(k8s_profile_name)
profile.delete()
diff --git a/src/onaptests/steps/instantiate/service_ala_carte.py b/src/onaptests/steps/instantiate/service_ala_carte.py
index a2e1812..9908b6d 100644
--- a/src/onaptests/steps/instantiate/service_ala_carte.py
+++ b/src/onaptests/steps/instantiate/service_ala_carte.py
@@ -188,7 +188,8 @@ class YamlTemplateServiceAlaCarteInstantiateStep(YamlTemplateBaseStep):
super().execute()
service = Service(self.service_name)
customer: Customer = Customer.get_by_global_customer_id(settings.GLOBAL_CUSTOMER_ID)
- service_subscription: ServiceSubscription = customer.get_service_subscription_by_service_type(service.name)
+ service_subscription: ServiceSubscription = \
+ customer.get_service_subscription_by_service_type(service.name)
cloud_region: CloudRegion = CloudRegion.get_by_id(
cloud_owner=settings.CLOUD_REGION_CLOUD_OWNER,
cloud_region_id=settings.CLOUD_REGION_ID,
@@ -210,8 +211,8 @@ class YamlTemplateServiceAlaCarteInstantiateStep(YamlTemplateBaseStep):
distribution_completed = service.distributed
if distribution_completed is True:
self._logger.info(
- "Service Distribution for %s is sucessfully finished",
- service.name)
+ "Service Distribution for %s is sucessfully finished",
+ service.name)
break
self._logger.info(
"Service Distribution for %s ongoing, Wait for 60 s",
@@ -221,7 +222,7 @@ class YamlTemplateServiceAlaCarteInstantiateStep(YamlTemplateBaseStep):
if distribution_completed is False:
self._logger.error(
- "Service Distribution for %s failed !!",service.name)
+ "Service Distribution for %s failed !!", service.name)
raise onap_test_exceptions.ServiceDistributionException
service_instantiation = ServiceInstantiation.instantiate_ala_carte(
@@ -243,8 +244,10 @@ class YamlTemplateServiceAlaCarteInstantiateStep(YamlTemplateBaseStep):
self._logger.error("Service instantiation %s failed", self.service_instance_name)
raise onap_test_exceptions.ServiceInstantiateException
else:
- service_subscription: ServiceSubscription = customer.get_service_subscription_by_service_type(self.service_name)
- self._service_instance: ServiceInstance = service_subscription.get_service_instance_by_name(self.service_instance_name)
+ service_subscription: ServiceSubscription = \
+ customer.get_service_subscription_by_service_type(self.service_name)
+ self._service_instance: ServiceInstance = \
+ service_subscription.get_service_instance_by_name(self.service_instance_name)
@YamlTemplateBaseStep.store_state(cleanup=True)
def cleanup(self) -> None:
diff --git a/src/onaptests/steps/instantiate/service_macro.py b/src/onaptests/steps/instantiate/service_macro.py
index b32d936..d97a109 100644
--- a/src/onaptests/steps/instantiate/service_macro.py
+++ b/src/onaptests/steps/instantiate/service_macro.py
@@ -12,13 +12,23 @@ from onapsdk.aai.cloud_infrastructure.tenant import Tenant
from onapsdk.configuration import settings
from onapsdk.exceptions import ResourceNotFound
from onapsdk.sdc.service import Service
-from onapsdk.so.instantiation import InstantiationParameter, ServiceInstantiation, VfmoduleParameters, VnfParameters, SoService
-from onaptests.steps.cloud.customer_service_subscription_create import CustomerServiceSubscriptionCreateStep
+from onapsdk.so.instantiation import (
+ InstantiationParameter,
+ ServiceInstantiation,
+ VfmoduleParameters,
+ VnfParameters,
+ SoService
+)
+from onaptests.steps.cloud.customer_service_subscription_create import (
+ CustomerServiceSubscriptionCreateStep
+)
import onaptests.utils.exceptions as onap_test_exceptions
from onaptests.steps.base import YamlTemplateBaseStep
from onaptests.steps.onboard.service import YamlTemplateServiceOnboardStep
-from onaptests.steps.cloud.connect_service_subscription_to_cloud_region import ConnectServiceSubToCloudRegionStep
+from onaptests.steps.cloud.connect_service_subscription_to_cloud_region import (
+ ConnectServiceSubToCloudRegionStep
+)
class YamlTemplateServiceMacroInstantiateStep(YamlTemplateBaseStep):
@@ -48,7 +58,6 @@ class YamlTemplateServiceMacroInstantiateStep(YamlTemplateBaseStep):
else: # only pnfs
self.add_step(CustomerServiceSubscriptionCreateStep(cleanup))
-
@property
def description(self) -> str:
"""Step description."""
@@ -125,7 +134,7 @@ class YamlTemplateServiceMacroInstantiateStep(YamlTemplateBaseStep):
return self.parent.service_instance_name
@YamlTemplateBaseStep.store_state
- def execute(self):
+ def execute(self): # noqa
"""Instantiate service.
Use settings values:
@@ -143,7 +152,8 @@ class YamlTemplateServiceMacroInstantiateStep(YamlTemplateBaseStep):
super().execute()
service = Service(self.service_name)
customer: Customer = Customer.get_by_global_customer_id(settings.GLOBAL_CUSTOMER_ID)
- service_subscription: ServiceSubscription = customer.get_service_subscription_by_service_type(service.name)
+ service_subscription: ServiceSubscription = \
+ customer.get_service_subscription_by_service_type(service.name)
if any(
filter(lambda x: x in self.yaml_template[self.service_name].keys(),
["vnfs", "networks"])):
@@ -153,8 +163,9 @@ class YamlTemplateServiceMacroInstantiateStep(YamlTemplateBaseStep):
)
tenant: Tenant = cloud_region.get_tenant(settings.TENANT_ID)
else:
- cloud_region, tenant = None, None # Only PNF is going to be instantiated so
- # neither cloud_region nor tenant are needed
+ # Only PNF is going to be instantiated so
+ # neither cloud_region nor tenant are needed
+ cloud_region, tenant = None, None
try:
owning_entity = OwningEntity.get_by_owning_entity_name(
settings.OWNING_ENTITY)
@@ -171,12 +182,12 @@ class YamlTemplateServiceMacroInstantiateStep(YamlTemplateBaseStep):
distribution_completed = service.distributed
if distribution_completed is True:
self._logger.info(
- "Service Distribution for %s is sucessfully finished",
- service.name)
+ "Service Distribution for %s is sucessfully finished",
+ service.name)
break
self._logger.info(
"Service Distribution for %s ongoing, Wait for %d s",
- service.name,settings.SERVICE_DISTRIBUTION_SLEEP_TIME)
+ service.name, settings.SERVICE_DISTRIBUTION_SLEEP_TIME)
time.sleep(settings.SERVICE_DISTRIBUTION_SLEEP_TIME)
nb_try += 1
@@ -190,16 +201,19 @@ class YamlTemplateServiceMacroInstantiateStep(YamlTemplateBaseStep):
if settings.MODEL_YAML_TEMPLATE:
so_data = self.yaml_template[self.service_name]
so_service = SoService(vnfs=so_data.get("vnfs", []),
- subscription_service_type=so_data.get('subscription_service_type'))
+ subscription_service_type=so_data.get(
+ 'subscription_service_type'))
else:
for vnf_data in self.yaml_template[self.service_name].get("vnfs", []):
vnf_params_list.append(VnfParameters(
vnf_data["vnf_name"],
- [InstantiationParameter(name=parameter["name"], value=parameter["value"]) for parameter in
+ [InstantiationParameter(name=parameter["name"],
+ value=parameter["value"]) for parameter in
vnf_data.get("vnf_parameters", [])],
[VfmoduleParameters(vf_module_data["vf_module_name"],
- [InstantiationParameter(name=parameter["name"], value=parameter["value"]) for
- parameter in vf_module_data.get("parameters", [])]) \
+ [InstantiationParameter(name=parameter["name"],
+ value=parameter["value"]) for
+ parameter in vf_module_data.get("parameters", [])])
for vf_module_data in vnf_data.get("vf_module_parameters", [])]
))
@@ -227,8 +241,10 @@ class YamlTemplateServiceMacroInstantiateStep(YamlTemplateBaseStep):
self._logger.error("Service instantiation %s failed", self.service_instance_name)
raise onap_test_exceptions.ServiceInstantiateException
- service_subscription: ServiceSubscription = customer.get_service_subscription_by_service_type(self.service_name)
- self._service_instance: ServiceInstance = service_subscription.get_service_instance_by_name(self.service_instance_name)
+ service_subscription: ServiceSubscription = \
+ customer.get_service_subscription_by_service_type(self.service_name)
+ self._service_instance: ServiceInstance = \
+ service_subscription.get_service_instance_by_name(self.service_instance_name)
@YamlTemplateBaseStep.store_state(cleanup=True)
def cleanup(self) -> None:
diff --git a/src/onaptests/steps/instantiate/vf_module_ala_carte.py b/src/onaptests/steps/instantiate/vf_module_ala_carte.py
index 38bdee9..b4a7c77 100644
--- a/src/onaptests/steps/instantiate/vf_module_ala_carte.py
+++ b/src/onaptests/steps/instantiate/vf_module_ala_carte.py
@@ -12,6 +12,7 @@ from ..base import YamlTemplateBaseStep
from .vnf_ala_carte import YamlTemplateVnfAlaCarteInstantiateStep
from .k8s_profile_create import K8SProfileStep
+
class YamlTemplateVfModuleAlaCarteInstantiateStep(YamlTemplateBaseStep):
"""Instantiate vf module a'la carte using YAML template."""
@@ -129,8 +130,10 @@ class YamlTemplateVfModuleAlaCarteInstantiateStep(YamlTemplateBaseStep):
"""
super().execute()
customer: Customer = Customer.get_by_global_customer_id(settings.GLOBAL_CUSTOMER_ID)
- service_subscription: ServiceSubscription = customer.get_service_subscription_by_service_type(self.service_name)
- self._service_instance: ServiceInstance = service_subscription.get_service_instance_by_name(self.service_instance_name)
+ service_subscription: ServiceSubscription = \
+ customer.get_service_subscription_by_service_type(self.service_name)
+ self._service_instance: ServiceInstance = \
+ service_subscription.get_service_instance_by_name(self.service_instance_name)
cloud_region: CloudRegion = CloudRegion.get_by_id(
cloud_owner=settings.CLOUD_REGION_CLOUD_OWNER,
cloud_region_id=settings.CLOUD_REGION_ID,
diff --git a/src/onaptests/steps/instantiate/vl_ala_carte.py b/src/onaptests/steps/instantiate/vl_ala_carte.py
index 7186c24..022a8b2 100644
--- a/src/onaptests/steps/instantiate/vl_ala_carte.py
+++ b/src/onaptests/steps/instantiate/vl_ala_carte.py
@@ -101,7 +101,7 @@ class YamlTemplateVlAlaCarteInstantiateStep(YamlTemplateBaseStep):
"""
# workaround, as Network name differs from model name (added " 0")
- network_name=re.sub(r"\s\d$", r"", network_name)
+ network_name = re.sub(r"\s\d$", r"", network_name)
for net in self.yaml_template[self.service_name]["networks"]:
if net["vl_name"] == network_name:
if net['subnets'] is None:
@@ -130,11 +130,13 @@ class YamlTemplateVlAlaCarteInstantiateStep(YamlTemplateBaseStep):
super().execute()
service: Service = Service(self.service_name)
customer: Customer = Customer.get_by_global_customer_id(settings.GLOBAL_CUSTOMER_ID)
- service_subscription: ServiceSubscription = customer.get_service_subscription_by_service_type(self.service_name)
- service_instance: ServiceInstance = service_subscription.get_service_instance_by_name(self.service_instance_name)
+ service_subscription: ServiceSubscription = \
+ customer.get_service_subscription_by_service_type(self.service_name)
+ service_instance: ServiceInstance = \
+ service_subscription.get_service_instance_by_name(self.service_instance_name)
self._service_instance = service_instance
for idx, network in enumerate(service.networks):
- #for network in self.yaml_template[self.service_name]["networks"]:
+ # for network in self.yaml_template[self.service_name]["networks"]:
net_instantiation = service_instance.add_network(
network,
settings.LINE_OF_BUSINESS,
@@ -159,7 +161,7 @@ class YamlTemplateVlAlaCarteInstantiateStep(YamlTemplateBaseStep):
"""
if self._cleanup:
for net_instance in self._service_instance.network_instances:
- self._logger.info("Start network deletion %s",net_instance.name)
+ self._logger.info("Start network deletion %s", net_instance.name)
net_deletion = net_instance.delete(a_la_carte=True)
try:
net_deletion.wait_for_finish(settings.ORCHESTRATION_REQUEST_TIMEOUT)
diff --git a/src/onaptests/steps/instantiate/vnf_ala_carte.py b/src/onaptests/steps/instantiate/vnf_ala_carte.py
index eb9896f..379e285 100644
--- a/src/onaptests/steps/instantiate/vnf_ala_carte.py
+++ b/src/onaptests/steps/instantiate/vnf_ala_carte.py
@@ -103,8 +103,10 @@ class YamlTemplateVnfAlaCarteInstantiateStep(YamlTemplateBaseStep):
super().execute()
service: Service = Service(self.service_name)
customer: Customer = Customer.get_by_global_customer_id(settings.GLOBAL_CUSTOMER_ID)
- service_subscription: ServiceSubscription = customer.get_service_subscription_by_service_type(self.service_name)
- self._service_instance: ServiceInstance = service_subscription.get_service_instance_by_name(self.service_instance_name)
+ service_subscription: ServiceSubscription = \
+ customer.get_service_subscription_by_service_type(self.service_name)
+ self._service_instance: ServiceInstance = \
+ service_subscription.get_service_instance_by_name(self.service_instance_name)
cloud_region: CloudRegion = CloudRegion.get_by_id(
cloud_owner=settings.CLOUD_REGION_CLOUD_OWNER,
cloud_region_id=settings.CLOUD_REGION_ID,
diff --git a/src/onaptests/steps/loop/clamp.py b/src/onaptests/steps/loop/clamp.py
index c614857..502a7f0 100644
--- a/src/onaptests/steps/loop/clamp.py
+++ b/src/onaptests/steps/loop/clamp.py
@@ -80,13 +80,12 @@ class ClampStep(YamlTemplateBaseStep):
else:
return self.parent.service_name
-
def check(self, operational_policies: list, is_template: bool = False) -> str:
"""Check CLAMP requirements to create a loop."""
self._logger.info("Check operational policy")
for policy in operational_policies:
exist = Clamp.check_policies(policy_name=policy["name"],
- req_policies=30)# 30 required policy
+ req_policies=30) # 30 required policy
self._logger.info("Operational policy found.")
if not exist:
raise ValueError("Couldn't load the policy %s", policy)
@@ -109,14 +108,14 @@ class ClampStep(YamlTemplateBaseStep):
def loop_counter(self, action: str) -> None:
""" Count number of loop instances."""
- if action == "plus":
+ if action == "plus":
self.count += 1
- if action == "minus":
+ if action == "minus":
self.count -= 1
@YamlTemplateBaseStep.store_state
def execute(self):
- super().execute() # TODO work only the 1st time, not if already onboarded
+ super().execute() # TODO work only the 1st time, not if already onboarded
# Before instantiating, be sure that the service has been distributed
service = Service(self.service_name)
@@ -128,8 +127,8 @@ class ClampStep(YamlTemplateBaseStep):
distribution_completed = service.distributed
if distribution_completed is True:
self._logger.info(
- "Service Distribution for %s is sucessfully finished",
- service.name)
+ "Service Distribution for %s is sucessfully finished",
+ service.name)
break
self._logger.info(
"Service Distribution for %s ongoing, Wait for 60 s",
@@ -139,7 +138,7 @@ class ClampStep(YamlTemplateBaseStep):
if distribution_completed is False:
self._logger.error(
- "Service Distribution for %s failed !!",service.name)
+ "Service Distribution for %s failed !!", service.name)
raise onap_test_exceptions.ServiceDistributionException
# time to wait for template load in CLAMP
diff --git a/src/onaptests/steps/loop/instantiate_loop.py b/src/onaptests/steps/loop/instantiate_loop.py
index 07586ac..fab70ad 100644
--- a/src/onaptests/steps/loop/instantiate_loop.py
+++ b/src/onaptests/steps/loop/instantiate_loop.py
@@ -13,9 +13,9 @@ class InstantiateLoop():
"""class instantiating a closed loop in clamp."""
def __init__(self, template: str, loop_name: str, operational_policies: list):
- self.template=template
- self.loop_name=loop_name
- self.operational_policies=operational_policies
+ self.template = template
+ self.loop_name = loop_name
+ self.operational_policies = operational_policies
self._logger: logging.Logger = logging.getLogger("")
logging.config.dictConfig(settings.LOG_CONFIG)
@@ -30,7 +30,6 @@ class InstantiateLoop():
self._logger.error("an error occured while adding an operational policy")
self._logger.info("ADD OPERATION SUCCESSFULY DONE")
-
def configure_policies(self, loop: LoopInstance) -> None:
"""Configure all policies."""
self._logger.info("******** UPDATE MICROSERVICE POLICY *******")
@@ -38,8 +37,8 @@ class InstantiateLoop():
loop.update_microservice_policy()
self._logger.info("******** UPDATE OPERATIONAL POLICIES CONFIG *******")
for policy in self.operational_policies:
- #loop.add_op_policy_config(loop.LoopInstance.__dict__[policy["config_function"]])
- #possible configurations for the moment
+ # loop.add_op_policy_config(loop.LoopInstance.__dict__[policy["config_function"]])
+ # possible configurations for the moment
if policy["name"] == "MinMax":
loop.add_op_policy_config(loop.add_minmax_config)
if policy["name"] == "Drools":
@@ -53,7 +52,7 @@ class InstantiateLoop():
self._logger.info("******** SUBMIT POLICIES TO PE *******")
submit = loop.act_on_loop_policy(loop.submit)
self._logger.info("******** CHECK POLICIES SUBMITION *******")
- if submit :
+ if submit:
self._logger.info("Policies successfully submited to PE")
else:
diff --git a/src/onaptests/steps/onboard/clamp.py b/src/onaptests/steps/onboard/clamp.py
index 72daba2..c8984ba 100644
--- a/src/onaptests/steps/onboard/clamp.py
+++ b/src/onaptests/steps/onboard/clamp.py
@@ -10,6 +10,7 @@ from onapsdk.configuration import settings
from ..base import YamlTemplateBaseStep
from .service import YamlTemplateVfOnboardStep
+
class OnboardClampStep(YamlTemplateBaseStep):
"""Onboard class to create CLAMP templates."""
diff --git a/src/onaptests/steps/onboard/msb_k8s.py b/src/onaptests/steps/onboard/msb_k8s.py
index 3605c29..666de33 100644
--- a/src/onaptests/steps/onboard/msb_k8s.py
+++ b/src/onaptests/steps/onboard/msb_k8s.py
@@ -69,11 +69,12 @@ class CreateProfileStep(BaseStep):
def execute(self) -> None:
"""Create profile."""
super().execute()
- definition: Definition = Definition.get_definition_by_name_version(\
+ definition: Definition = Definition.get_definition_by_name_version(
rb_name=settings.MSB_K8S_RB_NAME,
rb_version=settings.MSB_K8S_RB_VERSION)
with open(settings.MSB_K8S_PROFILE_ARTIFACT_FILE_PATH, "rb") as profile_file:
- self.profile = definition.create_profile(profile_name=settings.MSB_K8S_PROFILE_NAME,
- namespace=settings.K8S_ADDITIONAL_RESOURCES_NAMESPACE,
- kubernetes_version=settings.K8S_VERSION)
+ self.profile = definition.create_profile(
+ profile_name=settings.MSB_K8S_PROFILE_NAME,
+ namespace=settings.K8S_ADDITIONAL_RESOURCES_NAMESPACE,
+ kubernetes_version=settings.K8S_VERSION)
self.profile.upload_artifact(profile_file.read())
diff --git a/src/onaptests/steps/onboard/service.py b/src/onaptests/steps/onboard/service.py
index b1e7d4d..c0bec0d 100644
--- a/src/onaptests/steps/onboard/service.py
+++ b/src/onaptests/steps/onboard/service.py
@@ -55,7 +55,8 @@ class ServiceOnboardStep(BaseStep):
"""
super().execute()
- service: Service = Service(name=settings.SERVICE_NAME, instantiation_type=settings.SERVICE_INSTANTIATION_TYPE)
+ service: Service = Service(name=settings.SERVICE_NAME,
+ instantiation_type=settings.SERVICE_INSTANTIATION_TYPE)
if not service.created():
service.create()
if settings.VL_NAME != "":
@@ -70,7 +71,8 @@ class ServiceOnboardStep(BaseStep):
# If the service is already distributed, do not try to checkin/onboard (replay of tests)
# checkin is done if needed
# If service is replayed, no need to try to re-onboard the model
- # Double check because of: https://gitlab.com/Orange-OpenSource/lfn/onap/python-onapsdk/-/issues/176
+ # Double check because of:
+ # https://gitlab.com/Orange-OpenSource/lfn/onap/python-onapsdk/-/issues/176
if not service.distributed and service.status != onapsdk_const.DISTRIBUTED:
if service.status == onapsdk_const.DRAFT:
try:
@@ -186,7 +188,8 @@ class YamlTemplateServiceOnboardStep(YamlTemplateBaseStep):
# If the service is already distributed, do not try to checkin/onboard (replay of tests)
# checkin is done if needed
# If service is replayed, no need to try to re-onboard the model
- # Double check because of: https://gitlab.com/Orange-OpenSource/lfn/onap/python-onapsdk/-/issues/176
+ # Double check because of:
+ # https://gitlab.com/Orange-OpenSource/lfn/onap/python-onapsdk/-/issues/176
if not service.distributed and service.status != onapsdk_const.DISTRIBUTED:
if service.status == onapsdk_const.DRAFT:
try:
diff --git a/src/onaptests/steps/onboard/vsp.py b/src/onaptests/steps/onboard/vsp.py
index 49b766b..577b1cf 100644
--- a/src/onaptests/steps/onboard/vsp.py
+++ b/src/onaptests/steps/onboard/vsp.py
@@ -41,7 +41,9 @@ class VspOnboardStep(BaseStep):
"""
super().execute()
vendor: Vendor = Vendor(name=settings.VENDOR_NAME)
- vsp: Vsp = Vsp(name=settings.VSP_NAME, vendor=vendor, package=open(settings.VSP_FILE_PATH, "rb"))
+ vsp: Vsp = Vsp(name=settings.VSP_NAME,
+ vendor=vendor,
+ package=open(settings.VSP_FILE_PATH, "rb"))
vsp.onboard()
@BaseStep.store_state(cleanup=True)
@@ -123,8 +125,8 @@ class YamlTemplateVspOnboardStep(YamlTemplateBaseStep):
if "heat_files_to_upload" in pnf:
with open(get_resource_location(pnf["heat_files_to_upload"]), "rb") as package:
vsp: Vsp = Vsp(name=f"{pnf['pnf_name']}_VSP",
- vendor=vendor,
- package=package)
+ vendor=vendor,
+ package=package)
vsp.onboard()
@YamlTemplateBaseStep.store_state(cleanup=True)
diff --git a/src/onaptests/steps/reports_collection.py b/src/onaptests/steps/reports_collection.py
index be06151..99a6ef2 100644
--- a/src/onaptests/steps/reports_collection.py
+++ b/src/onaptests/steps/reports_collection.py
@@ -15,6 +15,7 @@ class ReportStepStatus(Enum):
FAIL = "FAIL"
NOT_EXECUTED = "NOT EXECUTED"
+
@dataclass
class Report:
"""Step execution report."""
@@ -85,7 +86,8 @@ class ReportsCollection:
details=details,
components=components,
log_path="./pythonsdk.debug.log").dump(
- str(Path(settings.REPORTING_FILE_DIRECTORY).joinpath(settings.HTML_REPORTING_FILE_NAME)))
+ str(Path(settings.REPORTING_FILE_DIRECTORY).joinpath(
+ settings.HTML_REPORTING_FILE_NAME)))
report_dict = {
'usecase': usecase,
@@ -101,5 +103,6 @@ class ReportsCollection:
for step_report in reversed(self.report)
]
}
- with (Path(settings.REPORTING_FILE_DIRECTORY).joinpath(settings.JSON_REPORTING_FILE_NAME)).open('w') as file:
+ with (Path(settings.REPORTING_FILE_DIRECTORY).joinpath(
+ settings.JSON_REPORTING_FILE_NAME)).open('w') as file:
json.dump(report_dict, file, indent=4)
diff --git a/src/onaptests/steps/simulator/cds_mockserver.py b/src/onaptests/steps/simulator/cds_mockserver.py
index 6933fa0..9fc4162 100644
--- a/src/onaptests/steps/simulator/cds_mockserver.py
+++ b/src/onaptests/steps/simulator/cds_mockserver.py
@@ -47,11 +47,11 @@ class CdsMockserverCnfConfigureStep(BaseStep):
response = requests.put(
"http://portal.api.simpledemo.onap.org:30726/mockserver/expectation",
json={
- "httpRequest" : {
+ "httpRequest": {
"method": expectation["method"],
"path": expectation["path"]
},
- "httpResponse" : {
+ "httpResponse": {
"body": expectation["response"]
}
}
diff --git a/src/onaptests/steps/simulator/pnf_simulator_cnf/pnf_register.py b/src/onaptests/steps/simulator/pnf_simulator_cnf/pnf_register.py
index 3a846e3..2a4f47e 100644
--- a/src/onaptests/steps/simulator/pnf_simulator_cnf/pnf_register.py
+++ b/src/onaptests/steps/simulator/pnf_simulator_cnf/pnf_register.py
@@ -50,13 +50,13 @@ class PnfSimulatorCnfRegisterStep(BaseStep):
config.load_incluster_config()
else:
config.load_kube_config(config_file=settings.K8S_CONFIG)
- k8s_client: "CoreV1API" = client.CoreV1Api()
- k8s_watch: "Watch" = watch.Watch()
+ k8s_client: "client.CoreV1Api" = client.CoreV1Api()
+ k8s_watch: "watch.Watch" = watch.Watch()
status = False
try:
for event in k8s_watch.stream(k8s_client.list_namespaced_pod,
- namespace=settings.K8S_ADDITIONAL_RESOURCES_NAMESPACE,
- timeout_seconds=timeout_seconds):
+ namespace=settings.K8S_ADDITIONAL_RESOURCES_NAMESPACE,
+ timeout_seconds=timeout_seconds):
if event["object"].metadata.name == "pnf-macro-test-simulator":
if not event["object"].status.phase in ["Pending", "Running"]:
# Invalid pod state
@@ -82,9 +82,10 @@ class PnfSimulatorCnfRegisterStep(BaseStep):
config.load_incluster_config()
else:
config.load_kube_config(config_file=settings.K8S_CONFIG)
- k8s_client: "CoreV1API" = client.CoreV1Api()
+ k8s_client: "client.CoreV1Api" = client.CoreV1Api()
try:
- for service in k8s_client.list_namespaced_service(namespace=settings.K8S_ONAP_NAMESPACE).items:
+ for service in k8s_client.list_namespaced_service(
+ namespace=settings.K8S_ONAP_NAMESPACE).items:
if service.metadata.name == settings.DCAE_VES_COLLECTOR_POD_NAME:
proto = "http"
if "443" in str(service.spec.ports[0].port):
@@ -102,11 +103,13 @@ class PnfSimulatorCnfRegisterStep(BaseStep):
status = self.is_pnf_pod_running()
if not status:
raise EnvironmentPreparationException("PNF simulator is not running")
- time.sleep(settings.PNF_WAIT_TIME) # Let's still wait for PNF simulator to make sure it's initialized
+ # Let's still wait for PNF simulator to make sure it's initialized
+ time.sleep(settings.PNF_WAIT_TIME)
ves_proto, ves_ip, ves_port = self.get_ves_protocol_ip_and_port()
registration_number: int = 0
registered_successfully: bool = False
- while registration_number < settings.PNF_REGISTRATION_NUMBER_OF_TRIES and not registered_successfully:
+ while (registration_number < settings.PNF_REGISTRATION_NUMBER_OF_TRIES and
+ not registered_successfully):
try:
response = requests.post(
"http://portal.api.simpledemo.onap.org:30999/simulator/start",
@@ -114,7 +117,7 @@ class PnfSimulatorCnfRegisterStep(BaseStep):
"simulatorParams": {
"repeatCount": 9999,
"repeatInterval": 30,
- "vesServerUrl": f"{ves_proto}://sample1:sample1@{ves_ip}:{ves_port}/eventListener/v7"
+ "vesServerUrl": f"{ves_proto}://sample1:sample1@{ves_ip}:{ves_port}/eventListener/v7" # noqa
},
"templateName": "registration.json",
"patch": {
@@ -132,7 +135,8 @@ class PnfSimulatorCnfRegisterStep(BaseStep):
)
response.raise_for_status()
registered_successfully = True
- self._logger.info(f"PNF registered with {settings.SERVICE_INSTANCE_NAME} source name")
+ self._logger.info(f"PNF registered with {settings.SERVICE_INSTANCE_NAME} "
+ "source name")
except (requests.ConnectionError, requests.HTTPError) as http_error:
self._logger.debug(f"Can't connect with PNF simulator: {str(http_error)}")
registration_number = registration_number + 1
diff --git a/src/onaptests/steps/wrapper/helm_charts.py b/src/onaptests/steps/wrapper/helm_charts.py
index b8a6b21..f745150 100644
--- a/src/onaptests/steps/wrapper/helm_charts.py
+++ b/src/onaptests/steps/wrapper/helm_charts.py
@@ -4,12 +4,11 @@ from avionix import ChartBuilder, ChartDependency, ChartInfo
from avionix.errors import HelmError
from onaptests.steps.base import BaseStep
from onaptests.utils.resources import get_local_dir
-from onaptests.utils.exceptions import (
+from onaptests.utils.exceptions import (
EnvironmentPreparationException,
EnvironmentCleanupException)
-
class HelmChartStep(BaseStep):
"""Basic operations on a docker container."""
@@ -36,7 +35,6 @@ class HelmChartStep(BaseStep):
msg = f"{chart_info_file} not found."
raise EnvironmentPreparationException(msg) from err
-
try:
for dependency in chart_info["dependencies"]:
dep = ChartDependency(
@@ -48,16 +46,16 @@ class HelmChartStep(BaseStep):
dependencies.append(dep)
self.builder = ChartBuilder(
- chart_info=ChartInfo(
- api_version=chart_info["api_version"],
- name=chart_info["chart_name"],
- version=chart_info["version"], # SemVer 2 version
- app_version=chart_info["app_version"],
- dependencies=dependencies
- ),
- kubernetes_objects=[],
- keep_chart=False
- )
+ chart_info=ChartInfo(
+ api_version=chart_info["api_version"],
+ name=chart_info["chart_name"],
+ version=chart_info["version"], # SemVer 2 version
+ app_version=chart_info["app_version"],
+ dependencies=dependencies
+ ),
+ kubernetes_objects=[],
+ keep_chart=False
+ )
except KeyError as err:
msg = f"{chart_info_file} does not contain required keys."
raise EnvironmentPreparationException(msg) from err
@@ -82,7 +80,6 @@ class HelmChartStep(BaseStep):
msg = "Error during helm release installation."
raise EnvironmentPreparationException(msg) from err
-
def cleanup(self) -> None:
"""Uninstall helm release."""
try:
diff --git a/src/onaptests/steps/wrapper/start.py b/src/onaptests/steps/wrapper/start.py
index 18ba5df..8b950da 100644
--- a/src/onaptests/steps/wrapper/start.py
+++ b/src/onaptests/steps/wrapper/start.py
@@ -4,6 +4,7 @@ import requests
from onaptests.steps.base import BaseStep
from onaptests.utils.exceptions import TestConfigurationException
+
class SimulatorStartStep(BaseStep):
"""Basic operations on a docker container."""