diff options
Diffstat (limited to 'services')
37 files changed, 2136 insertions, 1031 deletions
diff --git a/services/__init__.py b/services/__init__.py index 30d7152..32b601a 100644 --- a/services/__init__.py +++ b/services/__init__.py @@ -1,5 +1,5 @@ - -# ============LICENSE_START========================================== + +# ============LICENSE_START========================================== # org.onap.vvp/test-engine # =================================================================== # Copyright © 2017 AT&T Intellectual Property. All rights reserved. diff --git a/services/api/__init__.py b/services/api/__init__.py index 30d7152..32b601a 100644 --- a/services/api/__init__.py +++ b/services/api/__init__.py @@ -1,5 +1,5 @@ - -# ============LICENSE_START========================================== + +# ============LICENSE_START========================================== # org.onap.vvp/test-engine # =================================================================== # Copyright © 2017 AT&T Intellectual Property. All rights reserved. diff --git a/services/api/api_bridge.py b/services/api/api_bridge.py index 8926a1d..d8cb46c 100644 --- a/services/api/api_bridge.py +++ b/services/api/api_bridge.py @@ -1,5 +1,5 @@ - -# ============LICENSE_START========================================== + +# ============LICENSE_START========================================== # org.onap.vvp/test-engine # =================================================================== # Copyright © 2017 AT&T Intellectual Property. All rights reserved. @@ -36,11 +36,15 @@ # ============LICENSE_END============================================ # # ECOMP is a trademark and service mark of AT&T Intellectual Property. + + class APIBridge: """ - This class helps to use functions inside classes with circular import (dependencies). - Use this class only when there is circular import in one of the API services. + This class helps to use functions inside + classes with circular import (dependencies). + Use this class only when there is circular + import in one of the API services. """ @staticmethod @@ -63,7 +67,8 @@ class APIBridge: @staticmethod def create_engagement(wait_for_gitlab=True): - """create_engagement: Originally can be found under APIVirtualFunction class.""" + """create_engagement: Originally can be found under + APIVirtualFunction class.""" from services.api.api_virtual_function import APIVirtualFunction return APIVirtualFunction.create_engagement(wait_for_gitlab) diff --git a/services/api/api_checklist.py b/services/api/api_checklist.py index fda0730..02ab311 100644 --- a/services/api/api_checklist.py +++ b/services/api/api_checklist.py @@ -37,7 +37,6 @@ # # ECOMP is a trademark and service mark of AT&T Intellectual Property. import datetime -import logging import requests @@ -56,7 +55,12 @@ logger = LoggingServiceFactory.get_logger() class APIChecklist: @staticmethod - def create_checklist(user_content, files=["file0", "file1"], return_negative_response=False): + def create_checklist( + user_content, + files=[ + "file0", + "file1"], + return_negative_response=False): r1 = None postURL = Constants.Default.URL.Checklist.TEXT + \ user_content['engagement_uuid'] + '/checklist/new/' @@ -69,11 +73,13 @@ class APIChecklist: data['checkListName'] = "checklistAPI" + \ Helper.rand_string('randomString') data['checkListTemplateUuid'] = DBGeneral.select_where( - "uuid", "ice_checklist_template", "name", Constants.Template.Heat.TEXT, 1) + "uuid", "ice_checklist_template", "name", + Constants.Template.Heat.TEXT, 1) try: if not APIGitLab.is_gitlab_ready(user_content): raise Exception( - "Gitlab is not ready and because of that the test is failed.") + "Gitlab is not ready and because " + + "of that the test is failed.") r1 = requests.post(postURL, json=data, headers=headers, verify=False) @@ -82,15 +88,19 @@ class APIChecklist: logger.debug("Checklist was created successfully!") cl_content = r1.json() return cl_content - except: + except BaseException: if return_negative_response: return r1 if r1 is None: logger.error( - "Failed to create checklist for VF " + user_content['vfName']) + "Failed to create checklist for VF " + + user_content['vfName']) else: - logger.error("Failed to create checklist for VF " + user_content[ - 'vfName'] + ", see response >>> %s %s.\nContent: %s" % (r1.status_code, r1.reason, str(r1.content, 'utf-8'))) + logger.error( + "Failed to create checklist for VF " + + user_content['vfName'] + + ", see response >>> %s %s.\nContent: %s" % + (r1.status_code, r1.reason, str(r1.content, 'utf-8'))) raise @staticmethod @@ -107,7 +117,8 @@ class APIChecklist: data['checkListName'] = "UpdateChecklistAPI" + \ Helper.rand_string('randomString') data['checkListTemplateUuid'] = DBGeneral.select_where( - "uuid", "ice_checklist_template", "name", Constants.Template.Heat.TEXT, 1) + "uuid", "ice_checklist_template", "name", + Constants.Template.Heat.TEXT, 1) try: r1 = requests.put( postURL, json=data, headers=headers, verify=False) @@ -115,13 +126,17 @@ class APIChecklist: logger.debug("DBChecklist was created successfully!") cl_content = r1.json() return cl_content['uuid'] - except: + except BaseException: if r1 is None: logger.error( - "Failed to create checklist for VF " + user_content['vfName']) + "Failed to create checklist for VF " + + user_content['vfName']) else: - logger.error("Failed to create checklist for VF " + user_content[ - 'vfName'] + ", see response >>> %s %s.\nContent: %s" % (r1.status_code, r1.reason, str(r1.content, 'utf-8'))) + logger.error( + "Failed to create checklist for VF " + + user_content['vfName'] + + ", see response >>> %s %s.\nContent: %s" % + (r1.status_code, r1.reason, str(r1.content, 'utf-8'))) raise @staticmethod @@ -141,13 +156,16 @@ class APIChecklist: postURL, json=data, headers=headers, verify=False) Helper.internal_assert_boolean(r1.status_code, 200) logger.debug("Audit log was added successfully!") - except: + except BaseException: if r1 is None: logger.error( "Failed to add audit log for checklist uuid: " + cl_uuid) else: - logger.error("Failed to add audit log for checklist uuid: " + cl_uuid + - ", see response >>> %s %s.\nContent: %s" % (r1.status_code, r1.reason, str(r1.content, 'utf-8'))) + logger.error( + "Failed to add audit log for checklist uuid: " + + cl_uuid + + ", see response >>> %s %s.\nContent: %s" % + (r1.status_code, r1.reason, str(r1.content, 'utf-8'))) raise @staticmethod @@ -175,13 +193,16 @@ class APIChecklist: logger.debug("Next step was added successfully!") ns_uuid = r1.json() return ns_uuid[0]['uuid'] - except: + except BaseException: if r1 is None: logger.error( "Failed to add next step for checklist uuid: " + cl_uuid) else: - logger.error("Failed to add next step for checklist uuid: " + cl_uuid + - ", see response >>> %s %s.\nContent: %s" % (r1.status_code, r1.reason, str(r1.content, 'utf-8'))) + logger.error( + "Failed to add next step for checklist uuid: " + + cl_uuid + + ", see response >>> %s %s.\nContent: %s" % + (r1.status_code, r1.reason, str(r1.content, 'utf-8'))) raise @staticmethod @@ -202,8 +223,8 @@ class APIChecklist: logger.debug("go_to_next_state put request result status: %s" % r.status_code) else: - logger.error( - "PUT request failed to change checklist state >>> " + str(r.status_code) + " " + r.reason) + logger.error("PUT request failed to change checklist state >>> " + + str(r.status_code) + " " + r.reason) raise Exception("PUT request failed to change checklist state") @staticmethod @@ -215,7 +236,8 @@ class APIChecklist: Constants.ChecklistStates.Closed.TEXT] for i in range(len(vf_staff_emails)): logger.debug( - "Trying to jump state for %s [%s]" % (vf_staff_emails[i], i)) + "Trying to jump state for %s [%s]" % + (vf_staff_emails[i], i)) DBChecklist.update_all_decisions_to_approve(cl_uuid) api_checklist_obj.jump_state(cl_uuid, vf_staff_emails[i]) logger.debug("Checking state changed to %s" % states[i]) diff --git a/services/api/api_gitlab.py b/services/api/api_gitlab.py index 6c5a2ff..639f67e 100644 --- a/services/api/api_gitlab.py +++ b/services/api/api_gitlab.py @@ -36,7 +36,6 @@ # ============LICENSE_END============================================ # # ECOMP is a trademark and service mark of AT&T Intellectual Property. -import logging import os import subprocess import sys @@ -63,7 +62,7 @@ class APIGitLab: def display_output(p): while True: out = p.stderr.read(1) - if out == b'' and p.poll() != None: + if out == b'' and p.poll() is not None: break if out != '': sys.stdout.write(str(out.decode())) @@ -81,31 +80,42 @@ class APIGitLab: try: r1 = requests.get(getURL, headers=headers, verify=False) counter = 0 - while r1.status_code == 404 or r1.content == b'[]' and counter <= Constants.GitLabConstants.RETRIES_NUMBER: + while r1.status_code == 404 or r1.content == b'[]' and \ + counter <= Constants.\ + GitLabConstants.RETRIES_NUMBER: time.sleep(session.wait_until_time_pause) r1 = requests.get(getURL, headers=headers, verify=False) logger.debug( - "trying to get the git project, yet to succeed (try #%s)" % counter) + "trying to get the git project, " + + "yet to succeed (try #%s)" % counter) counter += 1 + Helper.internal_assert(r1.status_code, 200) if r1.content == b'[]': logger.error("Got an empty list as a response.") raise logger.debug("Project exists on APIGitLab!") content = r1.json() # Change it from list to dict. return content - except: + except BaseException: if r1 is None: logger.error("Failed to get project from APIGitLab.") else: - logger.error("Failed to get project from APIGitLab, see response >>> %s %s \n %s" - % (r1.status_code, r1.reason, str(r1.content, 'utf-8'))) + logger.error( + "Failed to get project from APIGitLab, " + + "see response >>> %s %s \n %s" % + (r1.status_code, r1.reason, str( + r1.content, 'utf-8'))) raise - def are_all_list_users_registered_as_project_members(self, users_emails_list, project_path_with_namespace): + def are_all_list_users_registered_as_project_members( + self, users_emails_list, project_path_with_namespace): for email in users_emails_list: - if not self.validate_git_project_members(project_path_with_namespace, email): + if not self.validate_git_project_members( + project_path_with_namespace, email): raise Exception( - "Couldn't find the invited users: " + email + " in GitLab.") + "Couldn't find the invited users: " + + email + + " in GitLab.") logger.debug( "Invited user: " + email + " found in GitLab.") @@ -121,7 +131,9 @@ class APIGitLab: headers['Content-type'] = 'application/json' headers['PRIVATE-TOKEN'] = settings.GITLAB_TOKEN counter = 0 - while (r1 is None or r1.content == b'[]' or r1.status_code != 200) and counter <= Constants.GitLabConstants.RETRIES_NUMBER: + while (r1 is None or r1.content == b'[]' or r1.status_code != + 200) and counter <= Constants.GitLabConstants.\ + RETRIES_NUMBER: logger.debug( "try to get git project members (try #%s)" % counter) time.sleep(session.wait_until_time_pause) @@ -130,8 +142,11 @@ class APIGitLab: counter += 1 except Exception as e: if counter >= Constants.GitLabConstants.RETRIES_NUMBER: - logger.error("Failed to get project's team members from APIGitLab, see response >>> %s %s \n %s %s" - % (r1.status_code, r1.reason, str(r1.content, 'utf-8'), e.message)) + logger.error( + "Failed to get project's team members from " + + "APIGitLab, see response >>> %s %s \n %s %s" % + (r1.status_code, r1.reason, str( + r1.content, 'utf-8'), e.message)) return False if r1.content == b'[]': logger.error("Got an empty list as a response.") @@ -144,7 +159,8 @@ class APIGitLab: return True @staticmethod - def negative_validate_git_project_member(path_with_namespace, user_email, git_user_id): + def negative_validate_git_project_member( + path_with_namespace, user_email, git_user_id): if settings.DATABASE_TYPE != 'local': r1 = None headers = dict() @@ -154,7 +170,9 @@ class APIGitLab: headers['Content-type'] = 'application/json' headers['PRIVATE-TOKEN'] = settings.GITLAB_TOKEN counter = 0 - while r1 is None or str.encode(user_email) not in r1.content and counter <= Constants.GitLabConstants.RETRIES_NUMBER: + while r1 is None or str.encode( + user_email) not in r1.content and counter <= Constants.\ + GitLabConstants.RETRIES_NUMBER: logger.debug( "try to get git project members (try #%s)" % counter) time.sleep(session.wait_until_time_pause) @@ -163,8 +181,11 @@ class APIGitLab: counter += 1 except Exception as e: if counter >= Constants.GitLabConstants.RETRIES_NUMBER: - logger.error("Failed to get project's team members from APIGitLab, see response >>> %s %s \n %s %s" - % (r1.status_code, r1.reason, str(r1.content, 'utf-8'), e.message)) + logger.error( + "Failed to get project's team members from " + + "APIGitLab, see response >>> %s %s \n %s %s" % + (r1.status_code, r1.reason, str( + r1.content, 'utf-8'), e.message)) return False if r1.content == b'[]': @@ -193,7 +214,9 @@ class APIGitLab: counter = 0 while r1.content == b'[]' and counter <= 60: logger.info( - "Will try to get gitlab user until will be response... #%s" % counter) + "Will try to get gitlab user until " + + "will be response... #%s" % + counter) time.sleep(session.wait_until_time_pause_long) r1 = requests.get(getURL, headers=headers, verify=False) Helper.internal_assert(r1.status_code, 200) @@ -207,12 +230,15 @@ class APIGitLab: (r1.status_code, r1.reason, r1.content)) content = r1.json() return content[0] - except: + except BaseException: if r1 is None: logger.error("Failed to get user from APIGitLab.") else: - logger.error("Failed to get user from APIGitLab, see response >>> %s %s \n %s" - % (r1.status_code, r1.reason, str(r1.content, 'utf-8'))) + logger.error( + "Failed to get user from APIGitLab, see response " + + ">>> %s %s \n %s" % + (r1.status_code, r1.reason, str( + r1.content, 'utf-8'))) raise @staticmethod @@ -235,19 +261,24 @@ class APIGitLab: content = r1.json() # Change it from list to dict. gitPubKey = content[0]['key'] return gitPubKey - except: + except BaseException: if r1 is None: - logger.error("Failed to get user's public key from APIGitLab.") + logger.error("Failed to get user's public key " + + "from APIGitLab.") else: - logger.error("Failed to get user's public key from APIGitLab, see response >>> %s %s \n %s" - % (r1.status_code, r1.reason, str(r1.content, 'utf-8'))) + logger.error( + "Failed to get user's public key from APIGitLab, " + + "see response >>> %s %s \n %s" % + (r1.status_code, r1.reason, str( + r1.content, 'utf-8'))) raise @staticmethod def git_clone_push(user_content): if settings.DATABASE_TYPE != 'local': logger.debug( - "About to push files into project's repository on the local folder(not over origin).") + "About to push files into project's repository on the " + + "local folder(not over origin).") try: user_content['session_token'] = "token " + \ APIBridge.login_user(Constants.Users.Admin.EMAIL) @@ -267,7 +298,8 @@ class APIGitLab: counter = 0 git_user_pub_key = None - while user_pub_key != git_user_pub_key and counter < Constants.GitLabConstants.RETRIES_NUMBER: + while user_pub_key != git_user_pub_key and counter < \ + Constants.GitLabConstants.RETRIES_NUMBER: try: git_user_pub_key = APIGitLab.get_git_user_ssh_key( git_user['id']) @@ -279,17 +311,20 @@ class APIGitLab: # Check that the SSH key was added to user on APIGitLab. if user_pub_key != git_user_pub_key: - raise Exception("The SSH Key received does not equal to the" - " one provided! The key from" - "APIGitLab:\n %s ==<>== %s" - % (git_user_pub_key, user_pub_key)) + raise Exception( + "The SSH Key received does not equal to the" + " one provided! The key from" + "APIGitLab:\n %s ==<>== %s" % + (git_user_pub_key, user_pub_key)) gitRepoURL = "git@gitlab:%s/%s.git" % ( - user_content['engagement_manual_id'], user_content['vfName']) + user_content['engagement_manual_id'], + user_content['vfName']) logger.debug("Clone repo from: " + gitRepoURL) APIGitLab.is_gitlab_ready(user_content) cmd = 'cd ' + repo_dir + \ - '; git config --global user.email \"' + Constants.Users.Admin.EMAIL + \ + '; git config --global user.email \"' + Constants.\ + Users.Admin.EMAIL + \ '\"; git config --global user.name \"' + \ Constants.Users.Admin.FULLNAME + '\";' # Commit all changes. @@ -358,9 +393,11 @@ class APIGitLab: "All edited files were commited and pushed to APIGitLab.") except Exception as e: logger.error( - "_-_-_-_-_- Unexpected error in git_push_commit : " + str(e)) + "_-_-_-_-_- Unexpected error in git_push_commit : " + + str(e)) raise Exception( - "Something went wrong on git_push_commit function, please check logs.") + "Something went wrong on git_push_commit " + + "function, please check logs.") @staticmethod def is_gitlab_ready(user_content): diff --git a/services/api/api_jenkins.py b/services/api/api_jenkins.py index b63cb66..0b57b00 100644 --- a/services/api/api_jenkins.py +++ b/services/api/api_jenkins.py @@ -40,10 +40,10 @@ from django.conf import settings from requests.auth import HTTPBasicAuth from services.constants import Constants from services.helper import Helper +from services.session import session import logging import requests import time -from services.session import session logger = logging.getLogger('ice-ci.logger') @@ -60,7 +60,8 @@ class APIJenkins: try: r1 = requests.get(getURL, auth=HTTPBasicAuth( settings.JENKINS_USERNAME, settings.JENKINS_PASSWORD)) - while r1.status_code != 200 and counter <= Constants.GitLabConstants.RETRIES_NUMBER: + while r1.status_code != 200 and counter <= \ + Constants.GitLabConstants.RETRIES_NUMBER: r1 = requests.get(getURL, auth=HTTPBasicAuth( settings.JENKINS_USERNAME, settings.JENKINS_PASSWORD)) time.sleep(session.wait_until_time_pause) @@ -69,14 +70,15 @@ class APIJenkins: counter += 1 Helper.internal_assert(r1.status_code, 200) logger.debug("Job was created on APIJenkins!") - except: + except BaseException: msg = None if r1 is None: msg = "APIJenkins didn't create job for %s" % job_name else: - msg = "APIJenkins didn't create job for %s, see response >>> %s %s" % ( - job_name, r1.status_code, r1.reason) + msg = "APIJenkins didn't create job for %s, " +\ + "see response >>> %s %s" % ( + job_name, r1.status_code, r1.reason) logger.error(msg) raise Exception(msg) @@ -85,6 +87,7 @@ class APIJenkins: def find_build_num_out_of_jenkins_log(log): lines_array = log.splitlines() for line in lines_array: - if Constants.Dashboard.Checklist.JenkinsLog.Modal.Body.BUILD_IDENTIFIER in line: + if Constants.Dashboard.Checklist.JenkinsLog.\ + Modal.Body.BUILD_IDENTIFIER in line: parts = line.partition('jenkins') return parts[2] diff --git a/services/api/api_rados.py b/services/api/api_rados.py index 61cfa5c..cdad7d4 100644 --- a/services/api/api_rados.py +++ b/services/api/api_rados.py @@ -1,5 +1,5 @@ - -# ============LICENSE_START========================================== + +# ============LICENSE_START========================================== # org.onap.vvp/test-engine # =================================================================== # Copyright © 2017 AT&T Intellectual Property. All rights reserved. @@ -36,7 +36,6 @@ # ============LICENSE_END============================================ # # ECOMP is a trademark and service mark of AT&T Intellectual Property. -import logging import time from boto.s3.connection import S3Connection, OrdinaryCallingFormat @@ -50,6 +49,7 @@ from services.session import session logger = LoggingServiceFactory.get_logger() + class APIRados: @staticmethod @@ -75,77 +75,56 @@ class APIRados: @staticmethod def get_bucket_grants(bucket_name): """Return the Grants.""" - counter = 1 - bucket = APIRados.get_bucket(bucket_name) - while not bucket and counter <= Constants.RGWAConstants.BUCKET_RETRIES_NUMBER: - logger.error("Bucket not found. Retry #%s" % counter) - time.sleep(session.wait_until_time_pause_long) + for counter in range(Constants.RGWAConstants.BUCKET_RETRIES_NUMBER): bucket = APIRados.get_bucket(bucket_name) - counter += 1 - if not bucket: + if bucket: + break + logger.error("Bucket not found. Retry #%s" % counter+1) + time.sleep(session.wait_until_time_pause_long) + else: raise TimeoutError("Max retries exceeded, failing test...") grants = bucket.list_grants() - print("***********grants=", grants) return grants @staticmethod def is_bucket_ready(bucket_id): - counter = 1 - bucket = APIRados.get_bucket(bucket_id) - while (bucket == None and counter <= - Constants.RGWAConstants.BUCKET_RETRIES_NUMBER): - time.sleep(session.wait_until_time_pause_long) - logger.debug( - "bucket are not ready yet, trying again (%s of 180)" % counter) + for counter in range(Constants.RGWAConstants.BUCKET_RETRIES_NUMBER): bucket = APIRados.get_bucket(bucket_id) - counter += 1 - print("****_+__+bucket= ", str(bucket)) - time.sleep(session.wait_until_time_pause_long) - if bucket == None: + if bucket: + break + logger.debug( + "bucket are not ready yet, trying again (%s of %s)" % ( + counter+1, Constants.RGWAConstants.BUCKET_RETRIES_NUMBER)) + time.sleep(session.wait_until_time_pause_long) + else: raise TimeoutError("Max retries exceeded, failing test...") - elif bucket != None: - logger.debug("bucket are ready to continue!") - return True + logger.debug("bucket are ready to continue!") + return True @staticmethod def users_of_bucket_ready_after_complete(bucket_id, user_name): - grants = APIRados.get_bucket_grants(bucket_id) - count = 0 - counter = 1 - while (count != 0 and counter <= - Constants.RGWAConstants.BUCKET_RETRIES_NUMBER): + for counter in range(Constants.RGWAConstants.BUCKET_RETRIES_NUMBER): grants = APIRados.get_bucket_grants(bucket_id) + if not any(user_name == g.id for g in grants): + break time.sleep(session.wait_until_time_pause_long) - for g in grants: - if g.id == user_name: - count = +1 - time.sleep(session.wait_until_time_pause_long) - if count != 0: + else: raise Exception("Max retries exceeded, failing test...") return False - elif count == 0: - logger.debug("users_of_bucket are ready to continue!") - return True + logger.debug("users_of_bucket are ready to continue!") + return True @staticmethod - def users_of_bucket_ready_after_created(bucket_id, user_name): - grants = APIRados.get_bucket_grants(bucket_id) - count = 0 - counter = 1 - while (count == 0 and counter <= - Constants.RGWAConstants.BUCKET_RETRIES_NUMBER): + def users_of_bucket_ready_after_created(bucket_id, user_uuid): + for counter in range(Constants.RGWAConstants.BUCKET_RETRIES_NUMBER): grants = APIRados.get_bucket_grants(bucket_id) + if any(user_uuid == g.id for g in grants): + break time.sleep(session.wait_until_time_pause_long) - for g in grants: - if g.id == user_name: - count = +1 - time.sleep(session.wait_until_time_pause_long) - if count == 0: + else: raise Exception("Max retries exceeded, failing test...") - return False - elif count > 0: - logger.debug("users_of_bucket are ready to continue!") - return True + logger.debug("users_of_bucket are ready to continue!") + return True @staticmethod def specific_client(access_key_id, secret_access_key): diff --git a/services/api/api_user.py b/services/api/api_user.py index 3e38fd2..963280e 100644 --- a/services/api/api_user.py +++ b/services/api/api_user.py @@ -1,5 +1,5 @@ - -# ============LICENSE_START========================================== + +# ============LICENSE_START========================================== # org.onap.vvp/test-engine # =================================================================== # Copyright © 2017 AT&T Intellectual Property. All rights reserved. @@ -50,13 +50,14 @@ from services.logging_service import LoggingServiceFactory logger = LoggingServiceFactory.get_logger() + class APIUser: @staticmethod # Update account API - only adds new SSH key! def update_account(user_content): r1 = None - token = APIUser.login_user(user_content['email']) + token = APIUser.login_user(user_content['email']) user_content['session_token'] = 'token ' + token sshKey = Helper.generate_sshpub_key() putURL = settings.ICE_EM_URL + '/v1/engmgr/users/account' @@ -82,16 +83,20 @@ class APIUser: putURL, json=put_data, headers=headers, verify=False) Helper.internal_assert(r1.status_code, 200) logger.debug( - "SSH Key was added successfully to user " + user_content['full_name']) + "SSH Key was added successfully to user " + + user_content['full_name']) if not APIBridge.is_gitlab_ready(user_content): raise return sshKey - except: + except BaseException: if r1 is None: logger.error("Failed to add public SSH key to user.") else: - logger.error("PUT request failed to add SSH key to user, see response >>> %s %s \n %s" % ( - r1.status_code, r1.reason, str(r1.content, 'utf-8'))) + logger.error( + "PUT request failed to add SSH key to user, see " + + "response >>> %s %s \n %s" % + (r1.status_code, r1.reason, str( + r1.content, 'utf-8'))) raise @staticmethod @@ -125,12 +130,15 @@ class APIUser: if not APIBridge.is_gitlab_ready(user_content): raise return True - except: + except BaseException: if r1 is None: logger.error("Failed to add public SSH key to user.") else: - logger.error("PUT request failed to add SSH key to user, see response >>> %s %s \n %s" % ( - r1.status_code, r1.reason, str(r1.content, 'utf-8'))) + logger.error( + "PUT request failed to add SSH key to user, " + + "see response >>> %s %s \n %s" % + (r1.status_code, r1.reason, str( + r1.content, 'utf-8'))) raise @staticmethod @@ -168,7 +176,7 @@ class APIUser: logger.debug(str(r.status_code) + " " + r.reason) decoded_response = r.json() return decoded_response['token'] - except: + except BaseException: logger.debug("Failed to login.") raise @@ -195,12 +203,14 @@ class APIUser: if not APIBridge.is_gitlab_ready(user_content): raise return sshKey - except: + except BaseException: if r1 is None: logger.error("Failed to add public SSH key.") else: logger.error( - "POST request failed to add SSH key to user, see response >>> %s %s" % (r1.status_code, r1.reason)) + "POST request failed to add SSH key to user, " + + "see response >>> %s %s" % + (r1.status_code, r1.reason)) raise @staticmethod @@ -214,11 +224,11 @@ class APIUser: data = { "company": company, "full_name": Helper.rand_string("randomString"), - "email": Helper.rand_string("randomString") + "@" + email_domain, + "email": Helper.rand_string("randomString") + + "@" + email_domain, "phone_number": Constants.Default.Phone.TEXT, "password": Constants.Default.Password.TEXT, - "regular_email_updates": "True" - } + "regular_email_updates": "True"} return data # If failed - count the failure and add the error to list of errors. @@ -237,10 +247,19 @@ class APIUser: return True else: raise Exception( - "Failed to activate user >>> %s %s" % (r1.status_code, r1.reason)) + "Failed to activate user >>> %s %s" % + (r1.status_code, r1.reason)) @staticmethod - def signup_invited_user(company, invited_email, invite_token, invite_url, user_content, is_contact_user="false", activate=False, wait_for_gitlab=True): + def signup_invited_user( + company, + invited_email, + invite_token, + invite_url, + user_content, + is_contact_user="false", + activate=False, + wait_for_gitlab=True): r1 = None postURL = settings.ICE_EM_URL + '/v1/engmgr/signup' logger.debug("Post signup URL: " + postURL) @@ -279,12 +298,15 @@ class APIUser: if not APIBridge.is_gitlab_ready(user_content): raise return post_data - except: + except BaseException: if r1 is None: logger.error("Failed to sign up the invited team member.") else: - logger.error("POST request failed to sign up the invited team member, see response >>> %s %s \n %s" % ( - r1.status_code, r1.reason, str(r1.content, 'utf-8'))) + logger.error( + "POST request failed to sign up the invited " + + "team member, see response >>> %s %s \n %s" % + (r1.status_code, r1.reason, str( + r1.content, 'utf-8'))) raise @staticmethod diff --git a/services/api/api_virtual_function.py b/services/api/api_virtual_function.py index 46610e8..193d6e2 100644 --- a/services/api/api_virtual_function.py +++ b/services/api/api_virtual_function.py @@ -43,7 +43,6 @@ import time from django.conf import settings import requests -from services.api.api_gitlab import APIGitLab from services.api.api_user import APIUser from services.constants import Constants, ServiceProvider from services.database.db_general import DBGeneral @@ -67,7 +66,7 @@ class APIVirtualFunction: headers['Authorization'] = user_content['session_token'] data = dict() # Create JSON data for post request. files_list = list() - if type(files) is list: + if isinstance(files, list): for file in files: files_list.append(file) else: @@ -85,13 +84,16 @@ class APIVirtualFunction: logger.debug("Next step was added to the engagement!") ns_uuid = r1.json() return ns_uuid[0]['uuid'] - except: + except BaseException: if r1 is None: logger.error( "Failed to add next step to VF " + user_content['vfName']) else: - logger.error("Failed to add next step to VF " + user_content[ - 'vfName'] + ", see response >>> %s %s.\nContent: %s" % (r1.status_code, r1.reason, str(r1.content, 'utf-8'))) + logger.error("Failed to add next step to VF " + + user_content['vfName'] + + ", see response >>> %s %s.\nContent: %s" % + (r1.status_code, r1.reason, str( + r1.content, 'utf-8'))) raise @staticmethod @@ -104,12 +106,15 @@ class APIVirtualFunction: headers = dict() # Create header for post request. headers['Content-type'] = 'application/json' headers['Authorization'] = 'token ' + token - jdata = [{"virtual_function": Helper.rand_string("randomString"), - "version": Helper.rand_string("randomString") + Helper.rand_string("randomNumber"), - "target_lab_entry_date": time.strftime("%Y-%m-%d"), - "target_aic_uuid": targetVersion, - "ecomp_release": ecompRelease, - "is_service_provider_internal": False}] + jdata = [ + { + "virtual_function": Helper.rand_string("randomString"), + "version": Helper.rand_string("randomString") + + Helper.rand_string("randomNumber"), + "target_lab_entry_date": time.strftime("%Y-%m-%d"), + "target_aic_uuid": targetVersion, + "ecomp_release": ecompRelease, + "is_service_provider_internal": False}] try: r1 = requests.post( postUrl, json=jdata, headers=headers, verify=False) @@ -117,12 +122,14 @@ class APIVirtualFunction: logger.debug("Virtual Function created successfully!") content = r1.content[1:-1] return content - except: + except BaseException: if r1 is None: logger.debug("Failed to create VF >>> request failed!") else: logger.debug( - "Failed to create VF >>> %s %s \n %s" % (r1.status_code, r1.reason, str(r1.content, 'utf-8'))) + "Failed to create VF >>> %s %s \n %s" % + (r1.status_code, r1.reason, str( + r1.content, 'utf-8'))) raise @staticmethod @@ -140,13 +147,15 @@ class APIVirtualFunction: logger.debug("Retrieved the Engagement successfully!") content = r1.content return json.loads(content) - except: + except BaseException: if r1 is None: logger.debug( "Failed to Retrieve the Engagement >>> request failed!") else: logger.debug( - "Failed to Retrieve the Engagement >>> %s %s \n %s" % (r1.status_code, r1.reason, str(r1.content, 'utf-8'))) + "Failed to Retrieve the Engagement >>> %s %s \n %s" % + (r1.status_code, r1.reason, str( + r1.content, 'utf-8'))) raise @staticmethod @@ -168,18 +177,25 @@ class APIVirtualFunction: postURL, json=list_data, headers=headers, verify=False) Helper.internal_assert_boolean(r1.status_code, 200) logger.debug("Invite sent successfully to email " + data['email']) - invite_token = DBGeneral.select_where_and("invitation_token", "ice_invitation", "email", data[ - 'email'], "engagement_uuid", user_content['engagement_uuid'], 1) - invite_url = settings.ICE_PORTAL_URL + "/#/signUp?invitation=" + invite_token + \ - "&email=" + data['email'] + invite_token = DBGeneral.select_where_and( + "invitation_token", + "ice_invitation", + "email", + data['email'], + "engagement_uuid", + user_content['engagement_uuid'], + 1) + invite_url = settings.ICE_PORTAL_URL + "/#/signUp?invitation=" + \ + invite_token + "&email=" + data['email'] logger.debug("Invitation URL is: " + invite_url) return data['email'], invite_token, invite_url - except: + except BaseException: if r1 is None: logger.error("Failed to invite team member.") else: logger.error( - "POST request failed to invite team member, see response >>> %s %s" % (r1.status_code, r1.reason)) + "POST request failed to invite team member, " + + "see response >>> %s %s" % (r1.status_code, r1.reason)) raise @staticmethod @@ -202,21 +218,30 @@ class APIVirtualFunction: postURL, json=data, headers=headers, verify=False) Helper.internal_assert_boolean(r1.status_code, 200) logger.debug("Invite sent successfully to email " + data['email']) - invite_token = DBGeneral.select_where_and("invitation_token", "ice_invitation", "email", data[ - 'email'], "engagement_uuid", user_content['engagement_uuid'], 1) - invite_url = settings.ICE_PORTAL_URL + "/#/signUp?invitation=" + invite_token + \ - "&email=" + data['email'] + "&full_name=" + data['full_name'] + \ - "&phone_number=" + \ - data['phone_number'] + "&company=" + \ + invite_token = DBGeneral.select_where_and( + "invitation_token", + "ice_invitation", + "email", + data['email'], + "engagement_uuid", + user_content['engagement_uuid'], + 1) + invite_url = settings.ICE_PORTAL_URL + "/#/signUp?invitation=" +\ + invite_token + "&email=" + data['email'] +\ + "&full_name=" + data['full_name'] + \ + "&phone_number=" + data['phone_number'] + "&company=" + \ data['company'] + "&is_contact_user=true" logger.debug("Invitation URL is: " + invite_url) return data['email'], invite_token, invite_url - except: + except BaseException: if r1 is None: logger.error("Failed to invite vendor contact.") else: - logger.error("POST request failed to invite vendor contact, see response >>> %s %s \n %s" % ( - r1.status_code, r1.reason, str(r1.content, 'utf-8'))) + logger.error( + "POST request failed to invite vendor contact, " + + "see response >>> %s %s \n %s" % + (r1.status_code, r1.reason, str( + r1.content, 'utf-8'))) raise @staticmethod @@ -239,12 +264,16 @@ class APIVirtualFunction: postURL, json=data, headers=headers, verify=False) Helper.internal_assert_boolean(r1.status_code, 202) logger.debug("Next step was edited successfully!") - except: + except BaseException: if r1 is None: logger.error("Failed to edit next step uuid: " + ns_uuid) else: - logger.error("Failed to edit next step uuid: " + ns_uuid + - ", see response >>> %s %s" % (r1.status_code, r1.reason)) + logger.error( + "Failed to edit next step uuid: " + + ns_uuid + + ", see response >>> %s %s" % + (r1.status_code, + r1.reason)) raise @staticmethod @@ -259,7 +288,8 @@ class APIVirtualFunction: return r1.content else: raise Exception( - "Failed to activate user >>> %s %s" % (r1.status_code, r1.reason)) + "Failed to activate user >>> %s %s" % + (r1.status_code, r1.reason)) return False @staticmethod @@ -314,15 +344,19 @@ class APIVirtualFunction: putUrl, headers=headers, verify=False) Helper.internal_assert(r1.status_code, 202) logger.debug( - "Engagement stage was successfully changed to " + str(requested_stage) + "!") + "Engagement stage was successfully changed to " + + str(requested_stage) + + "!") content = r1.content[1:-1] return content - except: + except BaseException: if r1 is None: logger.debug("Failed to set eng stage >>> request failed!") else: logger.debug( - "Failed to set eng stage >>> %s %s \n %s" % (r1.status_code, r1.reason, str(r1.content, 'utf-8'))) + "Failed to set eng stage >>> %s %s \n %s" % + (r1.status_code, r1.reason, str( + r1.content, 'utf-8'))) raise @staticmethod @@ -338,7 +372,7 @@ class APIVirtualFunction: r1 = requests.put(putURL, headers=headers, verify=False) Helper.internal_assert_boolean(r1.status_code, 200) logger.debug("AIC version has changed!") - except: + except BaseException: if r1 is None: msg = "Failed to edit AIC version" else: @@ -359,10 +393,11 @@ class APIVirtualFunction: r1 = requests.put(putURL, headers=headers, verify=False) Helper.internal_assert_boolean(r1.status_code, 200) logger.debug("AIC version has changed!") - except: + except BaseException: if r1 is None: msg = "Failed to update ECOMP release" else: - msg = "Failed to update ECOMP release, see response >>> %s %s" % ( - r1.status_code, r1.reason) + msg = "Failed to update ECOMP release," +\ + " see response >>> %s %s" % ( + r1.status_code, r1.reason) raise msg diff --git a/services/constants.py b/services/constants.py index 23d7ef1..b97ff6b 100644 --- a/services/constants.py +++ b/services/constants.py @@ -40,9 +40,9 @@ from django.conf import settings class ServiceProvider: - PROGRAM_NAME = "VVP" - MainServiceProvider = "ServiceProvider" - email = "example-domain.com" + PROGRAM_NAME = settings.PROGRAM_NAME + MainServiceProvider = settings.SERVICE_PROVIDER + email = settings.SERVICE_PROVIDER_DOMAIN class Constants: @@ -134,6 +134,7 @@ class Constants: ID = "toast-successfully-message" CMS_ID = "announcement-successfully-message" CSS = "html.ng-scope" + TEXT = "Important announcement: " class Cms: Toast_title_id = "toast-title-id" @@ -258,7 +259,8 @@ class Constants: TEXT = "Please fill CAPTCHA!" class NotMainVendor: - TEXT = "Email address should be with service provider domain for signees that their company =" \ + TEXT = "Email address should be with service provider " +\ + "domain for signees that their company =" \ + ServiceProvider.MainServiceProvider class HaveAccount: @@ -273,7 +275,8 @@ class Constants: class SubTitle: CSS = "h2.ng-binding" - TEXT = "Please follow the instructions below to activate your account." + TEXT = "Please follow the instructions below to " +\ + "activate your account." class Toast: TEXT = "Please activate your account first" @@ -283,7 +286,8 @@ class Constants: class Toast: class Success: - TEXT = "An email with detailed instructions on how to reset your password was sent to your Email." + TEXT = "An email with detailed instructions on how " +\ + "to reset your password was sent to your Email." class Title: CSS = "h1.ng-binding" @@ -291,7 +295,8 @@ class Constants: class SubTitle: CSS = "h2.ng-binding" - TEXT = "Please follow the instructions below to reset your password" + TEXT = "Please follow the instructions below to reset " +\ + "your password" class Button: TEXT = "Send Instructions" @@ -307,7 +312,8 @@ class Constants: class SubTitle: CSS = "h2.ng-binding" - TEXT = "Please follow the instructions below to update your password" + TEXT = "Please follow the instructions below to" +\ + " update your password" class Password: NAME = "password" @@ -490,7 +496,6 @@ class Constants: NAME = "ssh_key" class UpdateFailed: - # TEXT = "Something went wrong while trying to update user account" TEXT = "Updating SSH Key failed due to invalid key." class Update: @@ -584,7 +589,8 @@ class Constants: TEXT = "Statistics" class FilterDropdown: - CSS = "#statistics-header > .search-filters > .search-filter-stage" + CSS = "#statistics-header > .search-filters" +\ + " > .search-filter-stage" class ValidationsNumber: ID = "id-validations-num" @@ -751,7 +757,8 @@ class Constants: TEXT = "Remove user from engagement team: %s" class Message: - TEXT = "Are you sure you would like to remove the user out of the team members?" + TEXT = "Are you sure you would like to remove " +\ + "the user out of the team members?" class NextSteps: diff --git a/services/database/__init__.py b/services/database/__init__.py index 30d7152..32b601a 100644 --- a/services/database/__init__.py +++ b/services/database/__init__.py @@ -1,5 +1,5 @@ - -# ============LICENSE_START========================================== + +# ============LICENSE_START========================================== # org.onap.vvp/test-engine # =================================================================== # Copyright © 2017 AT&T Intellectual Property. All rights reserved. diff --git a/services/database/db_bridge.py b/services/database/db_bridge.py index fc765c7..1eb79fa 100644 --- a/services/database/db_bridge.py +++ b/services/database/db_bridge.py @@ -1,5 +1,5 @@ - -# ============LICENSE_START========================================== + +# ============LICENSE_START========================================== # org.onap.vvp/test-engine # =================================================================== # Copyright © 2017 AT&T Intellectual Property. All rights reserved. @@ -36,16 +36,21 @@ # ============LICENSE_END============================================ # # ECOMP is a trademark and service mark of AT&T Intellectual Property. + + class DBBridge: """ - This class helps to use functions inside classes with circular import (dependencies). - Use this class only when there is circular import in one of the DB services. + This class helps to use functions inside classes + with circular import (dependencies). + Use this class only when there is circular + import in one of the DB services. """ @staticmethod def select_personal_next_step(user_email): - """select_personal_next_step: Originally can be found under DBUser class.""" + """select_personal_next_step: Originally """ +\ + """can be found under DBUser class.""" from services.database.db_user import DBUser return DBUser.select_personal_next_step(user_email) diff --git a/services/database/db_checklist.py b/services/database/db_checklist.py index 04f8a44..0f8fd6e 100644 --- a/services/database/db_checklist.py +++ b/services/database/db_checklist.py @@ -54,14 +54,21 @@ logger = LoggingServiceFactory.get_logger() class DBChecklist: @staticmethod - def select_where_approval_state(queryColumnName, queryTableName, whereParametrType, whereParametrValue, fetchNum): + def select_where_approval_state( + queryColumnName, + queryTableName, + whereParametrType, + whereParametrValue, + fetchNum): try: dbConn = psycopg2.connect( DBGeneral.return_db_native_connection('em_db')) dbConn = dbConn cur = dbConn.cursor() - queryStr = "select %s from %s Where %s = '%s' and state = 'approval';" % ( - queryColumnName, queryTableName, whereParametrType, whereParametrValue) + queryStr = \ + "select %s from %s " % (queryColumnName, queryTableName) +\ + "Where %s = '%s'" % (whereParametrType, whereParametrValue) +\ + " and state = 'approval';" logger.debug("Query : " + queryStr) cur.execute(queryStr) if (fetchNum == 0): @@ -74,25 +81,33 @@ class DBChecklist: result = result.partition('(')[-1].rpartition(',')[0] dbConn.close() logger.debug("Query result: " + str(result)) - if result == None: + if result is None: errorMsg = "select_where_approval_state FAILED " logger.error(errorMsg) raise return result # If failed - count the failure and add the error to list of errors. - except: + except BaseException: errorMsg = "select_where_approval_state FAILED " raise Exception(errorMsg, "select_where_approval_state FAILED") @staticmethod - def select_where_pr_state(queryColumnName, queryTableName, whereParametrType, whereParametrValue, fetchNum): + def select_where_pr_state( + queryColumnName, + queryTableName, + whereParametrType, + whereParametrValue, + fetchNum): try: dbConn = psycopg2.connect( DBGeneral.return_db_native_connection('em_db')) dbConn = dbConn cur = dbConn.cursor() - queryStr = "select %s from %s Where %s = '%s' and state = 'peer_review';" % ( - queryColumnName, queryTableName, whereParametrType, whereParametrValue) + queryStr = \ + "select %s from %s " % (queryColumnName, queryTableName) +\ + "Where %s = '%s' and " % ( + whereParametrType, whereParametrValue) +\ + "state = 'peer_review';" logger.debug("Query : " + queryStr) cur.execute(queryStr) if (fetchNum == 0): @@ -104,26 +119,33 @@ class DBChecklist: elif(result.find(",)") != -1): # formatting ints e.g id result = result.partition('(')[-1].rpartition(',')[0] dbConn.close() - if result == None: + if result is None: errorMsg = "select_where_pr_state FAILED " logger.error(errorMsg) raise logger.debug("Query result: " + str(result)) return result # If failed - count the failure and add the error to list of errors. - except: + except BaseException: errorMsg = "select_where FAILED " raise Exception(errorMsg, "select_where") @staticmethod - def select_where_cl_not_archive(queryColumnName, queryTableName, whereParametrType, whereParametrValue, fetchNum): + def select_where_cl_not_archive( + queryColumnName, + queryTableName, + whereParametrType, + whereParametrValue, + fetchNum): try: dbConn = psycopg2.connect( DBGeneral.return_db_native_connection('em_db')) dbConn = dbConn cur = dbConn.cursor() - queryStr = "select %s from %s Where %s = '%s' and state != 'archive';" % ( - queryColumnName, queryTableName, whereParametrType, whereParametrValue) + queryStr = \ + "select %s from %s " % (queryColumnName, queryTableName) +\ + "Where %s = '%s'" % (whereParametrType, whereParametrValue) +\ + "and state != 'archive';" logger.debug("Query : " + queryStr) cur.execute(queryStr) if (fetchNum == 0): @@ -138,19 +160,25 @@ class DBChecklist: logger.debug("Query result: " + str(result)) return result # If failed - count the failure and add the error to list of errors. - except: + except BaseException: errorMsg = "select_where FAILED " raise Exception(errorMsg, "select_where") @staticmethod - def select_native_where(queryColumnName, queryTableName, whereParametrType, whereParametrValue, fetchNum): + def select_native_where( + queryColumnName, + queryTableName, + whereParametrType, + whereParametrValue, + fetchNum): try: dbConn = psycopg2.connect( DBGeneral.return_db_native_connection('em_db')) dbConn = dbConn cur = dbConn.cursor() queryStr = "select %s from %s Where %s = '%s';" % ( - queryColumnName, queryTableName, whereParametrType, whereParametrValue) + queryColumnName, queryTableName, whereParametrType, + whereParametrValue) logger.debug("Query : " + queryStr) cur.execute(queryStr) if (fetchNum == 0): @@ -165,7 +193,7 @@ class DBChecklist: logger.debug("Query result: " + str(result)) return result # If failed - count the failure and add the error to list of errors. - except: + except BaseException: errorMsg = "select_where FAILED " raise Exception(errorMsg, "select_where") @@ -176,14 +204,15 @@ class DBChecklist: DBGeneral.return_db_native_connection('em_db')) dbConn = dbConn cur = dbConn.cursor() - queryStr = "UPDATE ice_checklist SET state='review' Where name= '%s' and state= 'pending';" % ( - queryTableName) + queryStr = "UPDATE ice_checklist SET state='review' Where " +\ + "name= '%s' and state= 'pending';" % ( + queryTableName) logger.debug("Query : " + queryStr) cur.execute(queryStr) dbConn.commit() dbConn.close() # If failed - count the failure and add the error to list of errors. - except: + except BaseException: errorMsg = "Could not Update User" raise Exception(errorMsg, "Update") @@ -194,8 +223,10 @@ class DBChecklist: DBGeneral.return_db_native_connection('em_db')) dbConn = dbConn cur = dbConn.cursor() - queryStr = "UPDATE ice_checklist_decision SET review_value='approved' , peer_review_value='approved' Where checklist_id = '%s';" % ( - whereParametrValue) + queryStr = "UPDATE ice_checklist_decision SET " +\ + "review_value='approved' , peer_review_value='approved' " +\ + "Where checklist_id = '%s';" % ( + whereParametrValue) logger.debug(queryStr) cur.execute(queryStr) dbConn.commit() @@ -226,7 +257,7 @@ class DBChecklist: break return result # If failed - count the failure and add the error to list of errors. - except: + except BaseException: errorMsg = "is_archive FAILED " raise Exception(errorMsg, "is_archive") @@ -248,14 +279,15 @@ class DBChecklist: @staticmethod def get_admin_email(checklistUuid): try: + # Fetch one AT&T user ID. owner_id = DBChecklist.select_where_approval_state( - "owner_id", "ice_checklist", "uuid", checklistUuid, 1) # Fetch one AT&T user ID. + "owner_id", "ice_checklist", "uuid", checklistUuid, 1) engLeadEmail = DBGeneral.select_where( "email", "ice_user_profile", "id", owner_id, 1) logger.debug("get_admin_email = " + engLeadEmail) return engLeadEmail # If failed - count the failure and add the error to list of errors. - except: + except BaseException: errorMsg = "get_admin_email FAILED " raise Exception(errorMsg, "get_admin_email") @@ -270,7 +302,7 @@ class DBChecklist: logger.debug("getPreeReviewerEngLeadEmail = " + engLeadEmail) return engLeadEmail # If failed - count the failure and add the error to list of errors. - except: + except BaseException: errorMsg = "get_admin_email FAILED " raise Exception(errorMsg, "get_owner_email") @@ -279,83 +311,181 @@ class DBChecklist: checklistTempid = DBGeneral.select_where( "template_id", "ice_checklist", "name", checklistName, 1) checklistLineItems = DBGeneral.select_where_and( - "uuid", "ice_checklist_line_item", "line_type", "auto", "template_id", checklistTempid, 0) + "uuid", + "ice_checklist_line_item", + "line_type", + "auto", + "template_id", + checklistTempid, + 0) for lineItem in checklistLineItems: setParametrType2 = "peer_review_value" setParametrValue2 = "approved" whereParametrType2 = "lineitem_id" whereParametrValue2 = lineItem - DBGeneral.update_where_and("ice_checklist_decision", "review_value", checklistUuid, "approved", - "checklist_id", setParametrType2, setParametrValue2, whereParametrType2, whereParametrValue2) + DBGeneral.update_where_and( + "ice_checklist_decision", + "review_value", + checklistUuid, + "approved", + "checklist_id", + setParametrType2, + setParametrValue2, + whereParametrType2, + whereParametrValue2) @staticmethod def checkChecklistIsUpdated(): - query = "select uuid from ice_checklist_section where template_id in (select template_id from ice_checklist_template where name='{template_name}') and name='{section_name}'".format( - template_name=Constants.Dashboard.LeftPanel.EditChecklistTemplate.HEAT, section_name=Constants.Dashboard.LeftPanel.EditChecklistTemplate.HEAT) + query = "select uuid from ice_checklist_section where template_id " +\ + "in (select template_id from ice_checklist_template where " +\ + "name='{template_name}') and name='{section_name}'".format( + template_name=Constants.Dashboard.LeftPanel. + EditChecklistTemplate.HEAT, section_name=Constants. + Dashboard.LeftPanel.EditChecklistTemplate.HEAT) return DBGeneral.select_query(query) @staticmethod def fetchEngByVfName(vfName): # Fetch one AT&T user ID. - return DBGeneral.select_where("engagement_id", "ice_vf", "name", vfName, 1) + return DBGeneral.select_where( + "engagement_id", "ice_vf", "name", vfName, 1) @staticmethod def fetchEngManIdByEngUuid(engagement_id): - return DBGeneral.select_where("engagement_manual_id", "ice_engagement", "uuid", engagement_id, 1) + return DBGeneral.select_where( + "engagement_manual_id", + "ice_engagement", + "uuid", + engagement_id, + 1) @staticmethod def fetchChecklistByName(checklistName): - query = "select uuid from ice_checklist where name='{cl_name}'".format( - cl_name=checklistName) + query = "select uuid from ice_checklist where " +\ + "name='{cl_name}'".format( + cl_name=checklistName) return DBGeneral.select_query(query) @staticmethod def create_default_heat_teampleate(): - template_query = "INSERT INTO public.ice_checklist_template(uuid, name, category, version, create_time)"\ + template_query = "INSERT INTO public.ice_checklist_template(uuid, " +\ + "name, category, version, create_time)"\ "VALUES ('%s', '%s', '%s', '%s', '%s');" % ( - str(uuid4()), 'Editing Heat', 'first category', '1', timezone.now()) + str(uuid4()), 'Editing Heat', 'first category', '1', + timezone.now()) DBGeneral.insert_query(template_query) template_id = DBGeneral.select_query( - "SELECT uuid FROM public.ice_checklist_template where name = 'Editing Heat'") + "SELECT uuid FROM public.ice_checklist_template where " + + "name = 'Editing Heat'") # SECTIONS - section1_query = "INSERT INTO public.ice_checklist_section(uuid, name, weight, description, validation_instructions, create_time, template_id) "\ - "VALUES ('%s', '%s', '%s', '%s', '%s','%s', '%s');" % (str(uuid4()), 'External References', - '1', 'section descripyion', 'valid instructions', timezone.now(), template_id) + section1_query = "INSERT INTO public.ice_checklist_section(uuid, " +\ + "name, weight, description, validation_instructions, " +\ + "create_time, template_id) "\ + "VALUES ('%s', '%s', '%s', '%s', '%s','%s', '%s');" % ( + str(uuid4()), 'External References', + '1', 'section descripyion', 'valid instructions', + timezone.now(), template_id) DBGeneral.insert_query(section1_query) section1_id = DBGeneral.select_query( - ("""SELECT uuid FROM public.ice_checklist_section where name = 'External References' and template_id = '{s}'""").format(s=template_id)) - section2_query = "INSERT INTO public.ice_checklist_section(uuid, name, weight, description, validation_instructions, create_time, template_id) "\ - "VALUES ('%s', '%s', '%s', '%s', '%s','%s', '%s');" % (str(uuid4()), 'Parameter Specification', - '2', 'section descripyion', 'valid instructions', timezone.now(), template_id) + ("""SELECT uuid FROM public.ice_checklist_section """ + + """where name = 'External References' """ + + """and template_id = '{s}'""").format( + s=template_id)) + section2_query = "INSERT INTO public.ice_checklist_section(uuid, " +\ + "name, weight, description, validation_instructions, " +\ + "create_time, template_id) "\ + "VALUES ('%s', '%s', '%s', '%s', '%s','%s', '%s');" % ( + str(uuid4()), 'Parameter Specification', + '2', 'section descripyion', 'valid instructions', + timezone.now(), template_id) DBGeneral.insert_query(section2_query) section2_id = DBGeneral.select_query( - ("""SELECT uuid FROM public.ice_checklist_section where name = 'Parameter Specification' and template_id = '{s}'""").format(s=template_id)) + ("""SELECT uuid FROM public.ice_checklist_section """ + + """where name = """ + + """'Parameter Specification' and template_id = '{s}'""").format( + s=template_id)) # Line items - line_item1 = "INSERT INTO public.ice_checklist_line_item(uuid, name, weight, description, line_type, validation_instructions,create_time,section_id, template_id) "\ - "VALUES ('%s', '%s', '%s', '%s', '%s','%s', '%s', '%s', '%s');" % (str(uuid4()), 'Normal references', '1', 'Numeric parameters should include range and/or allowed values.', 'manual', - 'Here are some useful tips for how to validate this item in the most awesome way:<br><br><ul><li>Here is my awesome tip 1</li><li>Here is my awesome tip 2</li><li>Here is my awesome tip 3</li></ul>', timezone.now(), section1_id, template_id) + line_item1 = \ + "INSERT INTO public.ice_checklist_line_item(uuid, " +\ + "name, weight, description, line_type, validation_instructions," +\ + "create_time,section_id, template_id) "\ + "VALUES ('%s', '%s', " % (str(uuid4()), 'Normal references') +\ + "'%s', " % '1' +\ + "'%s'," % 'Numeric parameters should include ' +\ + 'range and/or allowed values.' +\ + " '%s'," % 'manual', +\ + "'%s'" % 'Here are some useful tips ' +\ + 'for how to validate this item ' +\ + 'in the most awesome way:<br><br><ul><li>Here is my ' +\ + 'awesome tip 1</li><li>Here is my awesome tip 2</li><li>' +\ + 'Here is my awesome tip 3</li></ul>' +\ + ", '%s'" % timezone.now() +\ + ", '%s'," % section1_id +\ + " '%s');" % template_id DBGeneral.insert_query(line_item1) - line_item2 = "INSERT INTO public.ice_checklist_line_item(uuid, name, weight, description, line_type, validation_instructions,create_time, section_id, template_id) "\ - "VALUES ('%s', '%s', '%s', '%s', '%s','%s', '%s', '%s', '%s');" % (str(uuid4()), 'String parameters', '2', 'Numeric parameters should include range and/or allowed values.', 'auto', - 'Here are some useful tips for how to validate this item in the most awesome way:<br><br><ul><li>Here is my awesome tip 1</li><li>Here is my awesome tip 2</li><li>Here is my awesome tip 3</li></ul>', timezone.now(), section2_id, template_id) + line_item2 = "INSERT INTO public.ice_checklist_line_item(uuid, " +\ + "name, weight, description, line_type, validation_instructions," +\ + "create_time, section_id, template_id) "\ + "VALUES ('%s', '%s', '%s', '%s', '%s','%s', '%s', '%s', '%s');" % ( + str(uuid4()), 'String parameters', '2', + 'Numeric parameters should include range ' + + 'and/or allowed values.', 'auto', + 'Here are some useful tips for how to validate this item ' + + 'in the most awesome way:<br><br><ul><li>Here is my ' + + 'awesome tip 1</li><li>Here is my awesome tip 2</li><li>' + + 'Here is my awesome tip 3</li></ul>', timezone.now(), + section2_id, template_id) DBGeneral.insert_query(line_item2) - line_item3 = "INSERT INTO public.ice_checklist_line_item(uuid, name, weight, description, line_type, validation_instructions,create_time,section_id, template_id) "\ - "VALUES ('%s', '%s', '%s', '%s', '%s','%s', '%s', '%s', '%s');" % (str(uuid4()), 'Numeric parameters', '3', 'Numeric parameters should include range and/or allowed values.', 'manual', - 'Here are some useful tips for how to validate this item in the most awesome way:<br><br><ul><li>Here is my awesome tip 1</li><li>Here is my awesome tip 2</li><li>Here is my awesome tip 3</li></ul>', timezone.now(), section2_id, template_id) + line_item3 = "INSERT INTO public.ice_checklist_line_item(uuid, " +\ + "name, weight, description, line_type, validation_instructions," +\ + "create_time,section_id, template_id) "\ + "VALUES ('%s', '%s', '%s', '%s', '%s','%s', " +\ + "'%s', '%s', '%s');" % ( + str(uuid4()), 'Numeric parameters', '3', + 'Numeric parameters should include range and/or ' + + 'allowed values.', 'manual', + 'Here are some useful tips for how to validate this item ' + + 'in the most awesome way:<br><br><ul><li>Here is my ' + + 'awesome tip 1</li><li>Here is my awesome tip 2</li><li>' + + 'Here is my awesome tip 3</li></ul>', timezone.now(), + section2_id, template_id) DBGeneral.insert_query(line_item3) - line_item4 = "INSERT INTO public.ice_checklist_line_item(uuid, name, weight, description, line_type, validation_instructions,create_time, section_id, template_id) "\ - "VALUES ('%s', '%s', '%s', '%s', '%s','%s', '%s', '%s', '%s');" % (str(uuid4()), 'VF image', '2', 'Numeric parameters should include range and/or allowed values.', 'auto', - 'Here are some useful tips for how to validate this item in the most awesome way:<br><br><ul><li>Here is my awesome tip 1</li><li>Here is my awesome tip 2</li><li>Here is my awesome tip 3</li></ul>', timezone.now(), section1_id, template_id) + line_item4 = "INSERT INTO public.ice_checklist_line_item(uuid, " +\ + "name, weight, description, line_type, " +\ + "validation_instructions,create_time, section_id, " +\ + "template_id) "\ + "VALUES ('%s', '%s', '%s', '%s', '%s','%s', '%s', " +\ + "'%s', '%s');" % ( + str(uuid4()), 'VF image', '2', + 'Numeric parameters should include range and/or ' + + 'allowed values.', 'auto', + 'Here are some useful tips for how to validate this ' + + 'item in the most awesome way:<br><br><ul><li>Here is ' + + 'my awesome tip 1</li><li>Here is my awesome tip 2' + + '</li><li>Here is my awesome tip 3</li></ul>', + timezone.now(), section1_id, template_id) DBGeneral.insert_query(line_item4) - line_item5 = "INSERT INTO public.ice_checklist_line_item(uuid, name, weight, description, line_type, validation_instructions,create_time,section_id, template_id) "\ - "VALUES ('%s', '%s', '%s', '%s', '%s','%s', '%s', '%s', '%s');" % (str(uuid4()), 'Parameters', '1', 'Numeric parameters should include range and/or allowed values.', 'auto', - 'Here are some useful tips for how to validate this item in the most awesome way:<br><br><ul><li>Here is my awesome tip 1</li><li>Here is my awesome tip 2</li><li>Here is my awesome tip 3</li></ul>', timezone.now(), section2_id, template_id) + line_item5 = "INSERT INTO public.ice_checklist_line_item(uuid, " +\ + "name, weight, description, line_type, validation_instructions," +\ + "create_time,section_id, template_id) "\ + "VALUES ('%s', '%s', '%s', '%s', '%s','%s', '%s'," +\ + " '%s', '%s');" % (str( + uuid4()), 'Parameters', '1', + 'Numeric parameters should include range ' + + 'and/or allowed values.', 'auto', + 'Here are some useful tips for how to validate this item ' + + 'in the most awesome way:<br><br><ul><li>Here is my awesome ' + + 'tip 1</li><li>Here is my awesome tip 2</li><li>Here is my ' + + 'awesome tip 3</li></ul>', timezone.now(), section2_id, + template_id) DBGeneral.insert_query(line_item5) @staticmethod def create_editing_cl_template_if_not_exist(): - template_id = DBGeneral.select_query(("""SELECT uuid FROM public.ice_checklist_template where name = '{s}'""").format( - s=Constants.Dashboard.LeftPanel.EditChecklistTemplate.HEAT)) + template_id = DBGeneral.select_query( + ("""SELECT uuid FROM public.ice_checklist_template """ + + """where name = '{s}'""").format( + s=Constants.Dashboard.LeftPanel.EditChecklistTemplate.HEAT)) if template_id == 'None': DBChecklist.create_default_heat_teampleate() session.createTemplatecount = True @@ -366,10 +496,14 @@ class DBChecklist: "state", Constants.DBConstants.IceTables.CHECKLIST, identify_field, field_value, "create_time")[0] counter = 0 - while get_state != expected_state and counter <= Constants.DBConstants.RETRIES_NUMBER: + while get_state != expected_state and \ + counter <= Constants.DBConstants.RETRIES_NUMBER: time.sleep(session.wait_until_time_pause_long) - logger.debug("Checklist state not changed yet , expecting state: %s, current result: %s (attempt %s of %s)" % ( - expected_state, get_state, counter, Constants.DBConstants.RETRIES_NUMBER)) + logger.debug( + "Checklist state not changed yet ," + + "expecting state: %s, current result: %s (attempt %s of %s)" % + (expected_state, get_state, counter, + Constants.DBConstants.RETRIES_NUMBER)) counter += 1 get_state = DBGeneral.select_where_order_by_desc( "state", Constants.DBConstants.IceTables.CHECKLIST, @@ -380,7 +514,9 @@ class DBChecklist: expected_state + ", and was verified over the DB") return expected_state raise Exception( - "Expected checklist state never arrived " + expected_state, get_state) + "Expected checklist state never arrived " + + expected_state, + get_state) @staticmethod def get_recent_checklist_uuid(name): diff --git a/services/database/db_cms.py b/services/database/db_cms.py index 3c2b2c6..288121a 100644 --- a/services/database/db_cms.py +++ b/services/database/db_cms.py @@ -1,5 +1,4 @@ - -# ============LICENSE_START========================================== +# ============LICENSE_START========================================== # org.onap.vvp/test-engine # =================================================================== # Copyright © 2017 AT&T Intellectual Property. All rights reserved. @@ -39,17 +38,9 @@ import psycopg2 from wheel.signatures import assertTrue -from services.constants import Constants from services.database.db_general import DBGeneral -from services.frontend.base_actions.click import Click -from services.frontend.base_actions.enter import Enter -from services.frontend.base_actions.wait import Wait -from services.frontend.fe_dashboard import FEDashboard -from services.frontend.fe_general import FEGeneral -from services.frontend.fe_user import FEUser from services.helper import Helper from services.logging_service import LoggingServiceFactory -from services.session import session logger = LoggingServiceFactory.get_logger() @@ -70,7 +61,7 @@ class DBCMS: dbConn.close() logger.debug("Insert query success!") # If failed - count the failure and add the error to list of errors. - except: + except BaseException: raise Exception("Couldn't fetch answer using the given query.") @staticmethod @@ -86,7 +77,7 @@ class DBCMS: dbConn.close() logger.debug("Update query success!") # If failed - count the failure and add the error to list of errors. - except: + except BaseException: raise Exception("Couldn't fetch answer using the given query.") @staticmethod @@ -107,14 +98,14 @@ class DBCMS: logger.debug("Query result: " + str(result)) return result # If failed - count the failure and add the error to list of errors. - except: + except BaseException: raise Exception("Couldn't fetch answer using the given query.") @staticmethod def get_cms_category_id(categoryName): logger.debug("Get DBCMS category id for name: " + categoryName) - queryStr = "SELECT id FROM public.blog_blogcategory WHERE title = '%s' LIMIT 1;" % ( - categoryName) + queryStr = "SELECT id FROM public.blog_blogcategory WHERE " +\ + "title = '%s' LIMIT 1;" % (categoryName) logger.debug("Query : " + queryStr) result = DBCMS.select_query(queryStr) return result @@ -123,9 +114,17 @@ class DBCMS: def insert_cms_new_post(title, description, categoryName): logger.debug("Insert new post : " + title) queryStr = "INSERT INTO public.blog_blogpost" \ - "(comments_count, keywords_string, rating_count, rating_sum, rating_average, title, slug, _meta_title, description, gen_description, created, updated, status, publish_date, expiry_date, short_url, in_sitemap, content, allow_comments, featured_image, site_id, user_id) "\ - "VALUES (0, '', 0, 0, 0, '%s', '%s-slug', '', '%s', true, current_timestamp - interval '1 day', current_timestamp - interval '2 day', 2, current_timestamp - interval '1 day', NULL, '', true, '<p>%s</p>', true, '', 1, 1);" % ( - title, title, description, description) + "(comments_count, keywords_string, rating_count, rating_sum, " +\ + "rating_average, title, slug, _meta_title, description, " +\ + "gen_description, created, updated, status, publish_date, " +\ + "expiry_date, short_url, in_sitemap, content, allow_comments, " +\ + "featured_image, site_id, user_id) "\ + "VALUES (0, '', 0, 0, 0, " +\ + "'%s', '%s-slug', " % (title, title) +\ + "'', '%s', true, " % description +\ + "current_timestamp - interval '1 day', current_timestamp - " +\ + "interval '2 day', 2, current_timestamp - interval '1 day', " +\ + "NULL, '', true, '<p>%s</p>', true, '', 1, 1);" % description logger.debug("Query : " + queryStr) DBCMS.insert_query(queryStr) post_id = DBCMS.get_last_added_post_id() @@ -144,9 +143,9 @@ class DBCMS: @staticmethod def update_days(xdays, title): logger.debug("Get the id of the post inserted") -# queryStr = "select MAX(id) FROM public.blog_blogpost;" - queryStr = "UPDATE public.blog_blogpost SET created=current_timestamp - interval '%s day' WHERE title='%s';" % ( - xdays, title) + queryStr = "UPDATE public.blog_blogpost SET " +\ + "created=current_timestamp - interval '%s day' " % xdays +\ + "WHERE title='%s';" % title logger.debug("Query : " + queryStr) result = DBCMS.update_query(queryStr) return result @@ -154,15 +153,17 @@ class DBCMS: @staticmethod def add_category_to_post(postId, categoryId): logger.debug("bind category into inserted post: " + postId) - queryStr = "INSERT INTO public.blog_blogpost_categories(blogpost_id, blogcategory_id) VALUES (%s, %s);" % ( - postId, categoryId) + queryStr = "INSERT INTO public.blog_blogpost_categories" +\ + "(blogpost_id, blogcategory_id) " +\ + "VALUES (%s, %s);" % (postId, categoryId) logger.debug("Query : " + queryStr) DBCMS.insert_query(queryStr) @staticmethod def get_documentation_page_id(): logger.debug("Retrive id of documentation page: ") - queryStr = "SELECT id FROM public.pages_page WHERE title = 'Documentation' LIMIT 1;" + queryStr = "SELECT id FROM public.pages_page WHERE " +\ + "title = 'Documentation' LIMIT 1;" logger.debug("Query : " + queryStr) result = DBCMS.select_query(queryStr) return result @@ -191,17 +192,27 @@ class DBCMS: if parent_id is None: parent_id = DBCMS.get_documentation_page_id() queryStr = "INSERT INTO public.pages_page(" \ - "keywords_string, title, slug, _meta_title, description, gen_description, created, updated, status, publish_date, expiry_date, short_url, in_sitemap, _order, in_menus, titles, content_model, login_required, parent_id, site_id)" \ - "VALUES ('', '%s', '%s-slug', '', '%s', true, current_timestamp - interval '1 day', current_timestamp - interval '1 day', 2, current_timestamp - interval '1 day', NULL, '', true, 0, '1,2,3', '%s', 'richtextpage', true, %s, 1);" % ( - title, title, content, title, parent_id) + "keywords_string, title, slug, _meta_title, description, " +\ + "gen_description, created, updated, status, publish_date, " +\ + "expiry_date, short_url, in_sitemap, _order, in_menus, titles, " +\ + "content_model, login_required, parent_id, site_id)" \ + "VALUES ('', " +\ + "'%s', '%s-slug'" % (title, title) +\ + ", '', '%s', true, " % content +\ + "current_timestamp - interval '1 day', current_timestamp " +\ + "- interval '1 day', 2, current_timestamp - interval '1 day', " +\ + "NULL, '', true, 0, '1,2,3', " +\ + "'%s', 'richtextpage', " % title +\ + "true, %s, 1);" % parent_id logger.debug("Query : " + queryStr) DBCMS.insert_query(queryStr) createdPageId = DBCMS.get_last_inserted_page_id() logger.debug( "Bind the page with the rich text content related to this page") - queryStr = "INSERT INTO public.pages_richtextpage(page_ptr_id, content) VALUES (%s, '<p>%s</p>');" % ( - createdPageId, content) + queryStr = "INSERT INTO public.pages_richtextpage(page_ptr_id, " +\ + "content) VALUES (%s, '<p>%s</p>');" % ( + createdPageId, content) logger.debug("Query : " + queryStr) DBCMS.insert_query(queryStr) return createdPageId @@ -244,8 +255,10 @@ class DBCMS: @staticmethod def update_X_days_back_post(title, xdays): logger.debug("Get the id of the post inserted") - queryStr = "UPDATE blog_blogpost SET created = current_timestamp - interval '%s day', publish_date=current_timestamp - interval '%s day' WHERE title= '%s' ;" % ( - xdays, xdays, title) + queryStr = "UPDATE blog_blogpost SET created = current_timestamp" +\ + " - interval '%s day', " % xdays +\ + "publish_date=current_timestamp - " +\ + "interval '%s day' WHERE title= '%s' ;" % (xdays, title) logger.debug("Query : " + queryStr) DBCMS.update_query(queryStr) diff --git a/services/database/db_general.py b/services/database/db_general.py index c850d3a..2c83fb0 100755 --- a/services/database/db_general.py +++ b/services/database/db_general.py @@ -1,5 +1,4 @@ - -# ============LICENSE_START========================================== +# ============LICENSE_START========================================== # org.onap.vvp/test-engine # =================================================================== # Copyright © 2017 AT&T Intellectual Property. All rights reserved. @@ -48,22 +47,31 @@ from services.logging_service import LoggingServiceFactory logger = LoggingServiceFactory.get_logger() + class DBGeneral: @staticmethod # desigredDB: Use 'default' for CI General and 'em_db' for EM General # (according to settings.DATABASES). def return_db_native_connection(desigredDB): - dbConnectionStr = "dbname='" + str(settings.SINGLETONE_DB[desigredDB]['NAME']) + \ + dbConnectionStr = "dbname='" + str( + settings.SINGLETONE_DB[desigredDB]['NAME']) + \ "' user='" + str(settings.SINGLETONE_DB[desigredDB]['USER']) + \ "' host='" + str(settings.SINGLETONE_DB[desigredDB]['HOST']) + \ - "' password='" + str(settings.SINGLETONE_DB[desigredDB]['PASSWORD']) + \ + "' password='" + str( + settings.SINGLETONE_DB[desigredDB]['PASSWORD']) + \ "' port='" + \ str(settings.SINGLETONE_DB[desigredDB]['PORT']) + "'" return dbConnectionStr @staticmethod - def insert_results(testType, testFeature, testResult, testName, testDuration, notes=" "): + def insert_results( + testType, + testFeature, + testResult, + testName, + testDuration, + notes=" "): try: if settings.DATABASE_TYPE == 'sqlite': dbfile = str(settings.DATABASES['default']['TEST_NAME']) @@ -80,13 +88,20 @@ class DBGeneral: raise Exception(errorMsg) try: # Create INSERT query. if settings.DATABASE_TYPE == 'sqlite': - query_str = 'INSERT INTO ice_test_results (testType, testFeature, testResult, testName, notes,'\ - 'create_time, build_id, duration) VALUES (?, ?, ?, ?, ?, ?, ?, ?);' + query_str = 'INSERT INTO ice_test_results ' +\ + '(testType, testFeature, testResult, testName, notes,'\ + 'create_time, build_id, duration) VALUES ' +\ + '(?, ?, ?, ?, ?, ?, ?, ?);' else: - query_str = 'INSERT INTO ice_test_results ("testType", "testFeature", "testResult", "testName", notes,'\ - 'create_time, build_id, duration) VALUES (%s, %s, %s, %s, %s, %s, %s, %s);' - cur.execute(query_str, (testType, testFeature, testResult, testName, notes, str(datetime.now()), - settings.ICE_BUILD_REPORT_NUM, testDuration)) + query_str = 'INSERT INTO ice_test_results ("testType", ' +\ + '"testFeature", "testResult", "testName", notes,'\ + 'create_time, build_id, duration) VALUES ' +\ + '(%s, %s, %s, %s, %s, %s, %s, %s);' + cur.execute(query_str, (testType, testFeature, testResult, + testName, notes, + str(datetime.now()), + settings.ICE_BUILD_REPORT_NUM, + testDuration)) dbConn.commit() logger.debug("Test result in DB - " + testResult) except Exception as e: @@ -123,7 +138,7 @@ class DBGeneral: dbConn.close() logger.debug("Query result: " + str(result)) return result - except: + except BaseException: raise Exception("Couldn't fetch answer using the given query.") @staticmethod @@ -182,7 +197,7 @@ class DBGeneral: logger.debug("Query result: " + str(result)) return result # If failed - count the failure and add the error to list of errors. - except: + except BaseException: errorMsg = "select_where_email FAILED " raise Exception(errorMsg, "select_where_email") raise @@ -213,13 +228,19 @@ class DBGeneral: raise Exception(errorMsg, "select_from") @staticmethod - def select_where(queryColumnName, queryTableName, whereParametrType, whereParametrValue, fetchNum): + def select_where( + queryColumnName, + queryTableName, + whereParametrType, + whereParametrValue, + fetchNum): try: dbConn = psycopg2.connect( DBGeneral.return_db_native_connection('em_db')) cur = dbConn.cursor() queryStr = "select %s from %s Where %s = '%s';" % ( - queryColumnName, queryTableName, whereParametrType, whereParametrValue) + queryColumnName, queryTableName, whereParametrType, + whereParametrValue) logger.debug("Query : " + queryStr) cur.execute(queryStr) if (fetchNum == 0): @@ -234,17 +255,24 @@ class DBGeneral: logger.debug("Query result: " + str(result)) return result # If failed - count the failure and add the error to list of errors. - except: + except BaseException: errorMsg = "select_where FAILED " raise Exception(errorMsg, "select_where") @staticmethod - def select_where_order_by_desc(queryColumnName, queryTableName, whereParametrType, whereParametrValue, order_by): + def select_where_order_by_desc( + queryColumnName, + queryTableName, + whereParametrType, + whereParametrValue, + order_by): dbConn = psycopg2.connect( DBGeneral.return_db_native_connection('em_db')) cur = dbConn.cursor() - queryStr = "select %s from %s Where %s = '%s' order by %s desc limit 1;" \ - % (queryColumnName, queryTableName, whereParametrType, whereParametrValue, order_by) + queryStr = \ + "select %s from %s " % (queryColumnName, queryTableName,) +\ + "Where %s = '%s' " % (whereParametrType, whereParametrValue) +\ + "order by %s desc limit 1;" % order_by logger.debug("Query : " + queryStr) cur.execute(queryStr) result = str(cur.fetchall()) @@ -274,14 +302,22 @@ class DBGeneral: return result @staticmethod - def select_where_not_and_order_by_desc(queryColumnName, queryTableName, whereParametrType, - whereParametrValue, parametrTypeAnd, parametrAnd, order_by): + def select_where_not_and_order_by_desc( + queryColumnName, + queryTableName, + whereParametrType, + whereParametrValue, + parametrTypeAnd, + parametrAnd, + order_by): dbConn = psycopg2.connect( DBGeneral.return_db_native_connection('em_db')) cur = dbConn.cursor() - queryStr = "select %s from %s Where %s = '%s' and %s != '%s' order by %s desc limit 1;" \ - % (queryColumnName, queryTableName, whereParametrType, whereParametrValue, - parametrTypeAnd, parametrAnd, order_by) + queryStr = \ + "select %s from %s " % (queryColumnName, queryTableName) +\ + "Where %s = '%s' " % (whereParametrType, whereParametrValue) +\ + "and %s != '%s' " % (parametrTypeAnd, parametrAnd) +\ + "order by %s desc limit 1;" % order_by logger.debug("Query : " + queryStr) cur.execute(queryStr) result = str(cur.fetchall()) @@ -290,15 +326,22 @@ class DBGeneral: return result @staticmethod - def select_where_and(queryColumnName, queryTableName, whereParametrType, whereParametrValue, - parametrTypeAnd, parametrAnd, fetchNum): + def select_where_and( + queryColumnName, + queryTableName, + whereParametrType, + whereParametrValue, + parametrTypeAnd, + parametrAnd, + fetchNum): try: dbConn = psycopg2.connect( DBGeneral.return_db_native_connection('em_db')) dbConn = dbConn cur = dbConn.cursor() queryStr = "select %s from %s Where %s = '%s' and %s = '%s';" % ( - queryColumnName, queryTableName, whereParametrType, whereParametrValue, parametrTypeAnd, parametrAnd) + queryColumnName, queryTableName, whereParametrType, + whereParametrValue, parametrTypeAnd, parametrAnd) logger.debug("Query : " + queryStr) cur.execute(queryStr) if (fetchNum == 0): @@ -314,19 +357,27 @@ class DBGeneral: logger.debug("Query result: " + str(result)) return result # If failed - count the failure and add the error to list of errors. - except: + except BaseException: errorMsg = "select_where_and FAILED " raise Exception(errorMsg, "select_where_and") @staticmethod - def select_where_is_bigger(queryColumnName, queryTableName, whereParametrType, whereParametrValue, parametrTypeAnd, parametrAnd, fetchNum): + def select_where_is_bigger( + queryColumnName, + queryTableName, + whereParametrType, + whereParametrValue, + parametrTypeAnd, + parametrAnd, + fetchNum): try: dbConn = psycopg2.connect( DBGeneral.return_db_native_connection('em_db')) dbConn = dbConn cur = dbConn.cursor() queryStr = "select %s from %s Where %s = '%s' and %s > %s;" % ( - queryColumnName, queryTableName, whereParametrType, whereParametrValue, parametrTypeAnd, parametrAnd) + queryColumnName, queryTableName, whereParametrType, + whereParametrValue, parametrTypeAnd, parametrAnd) logger.debug("Query : " + queryStr) cur.execute(queryStr) if (fetchNum == 0): @@ -340,19 +391,25 @@ class DBGeneral: dbConn.close() return result # If failed - count the failure and add the error to list of errors. - except: + except BaseException: errorMsg = "select_where_is_bigger FAILED " raise Exception(errorMsg, "select_where_is_bigger") @staticmethod - def update_where(queryTableName, setParametrType, setparametrValue, whereParametrType, whereParametrValue): + def update_where( + queryTableName, + setParametrType, + setparametrValue, + whereParametrType, + whereParametrValue): try: dbConn = psycopg2.connect( DBGeneral.return_db_native_connection('em_db')) dbConn = dbConn cur = dbConn.cursor() queryStr = "UPDATE %s SET %s = '%s' Where %s = '%s';" % ( - queryTableName, setParametrType, setparametrValue, whereParametrType, whereParametrValue) + queryTableName, setParametrType, setparametrValue, + whereParametrType, whereParametrValue) cur.execute(queryStr) dbConn.commit() logger.debug("Query : " + queryStr) @@ -376,25 +433,37 @@ class DBGeneral: dbConn.commit() dbConn.close() # If failed - count the failure and add the error to list of errors. - except: + except BaseException: errorMsg = "Could not Update User" raise Exception(errorMsg, "Update") @staticmethod - def update_where_and(queryTableName, setParametrType, parametrValue, changeToValue, whereParametrType, setParametrType2, setParametrValue2, whereParametrType2, whereParametrValue2): + def update_where_and( + queryTableName, + setParametrType, + parametrValue, + changeToValue, + whereParametrType, + setParametrType2, + setParametrValue2, + whereParametrType2, + whereParametrValue2): try: dbConn = psycopg2.connect( DBGeneral.return_db_native_connection('em_db')) dbConn = dbConn cur = dbConn.cursor() - queryStr = "UPDATE %s SET %s = '%s', %s = '%s' Where %s = '%s' and %s = '%s';" % ( - queryTableName, setParametrType, changeToValue, setParametrType2, setParametrValue2, whereParametrType, parametrValue, whereParametrType2, whereParametrValue2) + queryStr = "UPDATE %s SET " % queryTableName +\ + "%s = '%s', " % (setParametrType, changeToValue) +\ + "%s = '%s' Where " % (setParametrType2, setParametrValue2) +\ + "%s = '%s' " % (whereParametrType, parametrValue) +\ + "and %s = '%s';" % (whereParametrType2, whereParametrValue2) logger.debug("Query : " + queryStr) cur.execute(queryStr) dbConn.commit() dbConn.close() # If failed - count the failure and add the error to list of errors. - except: + except BaseException: errorMsg = "Could not Update User" raise Exception(errorMsg, "Update") diff --git a/services/database/db_user.py b/services/database/db_user.py index d347dd2..10d02ff 100644 --- a/services/database/db_user.py +++ b/services/database/db_user.py @@ -1,5 +1,5 @@ - -# ============LICENSE_START========================================== + +# ============LICENSE_START========================================== # org.onap.vvp/test-engine # =================================================================== # Copyright © 2017 AT&T Intellectual Property. All rights reserved. @@ -52,6 +52,7 @@ from services.session import session logger = LoggingServiceFactory.get_logger() + class DBUser: @staticmethod @@ -69,24 +70,39 @@ class DBUser: return activationUrl @staticmethod - def get_contact_signup_url(invite_token, uuid, email, fullName, phoneNum, companyName): + def get_contact_signup_url( + invite_token, + uuid, + email, + fullName, + phoneNum, + companyName): companyId = DBGeneral.select_where( "uuid", "ice_vendor", "name", companyName, 1) - signUpURLforContact = settings.ICE_PORTAL_URL + "#/signUp?invitation=" + invite_token + \ + signUpURLforContact = settings.ICE_PORTAL_URL + \ + "#/signUp?invitation=" + invite_token + \ "&email=" + email + "&full_name=" + fullName + \ "&phone_number=" + phoneNum + "&company=" + companyId logger.debug("SignUpURLforContact :" + signUpURLforContact) return signUpURLforContact @staticmethod - def select_invitation_token(queryColumnName, queryTableName, whereParametrType, whereParametrValue, email, fetchNum): + def select_invitation_token( + queryColumnName, + queryTableName, + whereParametrType, + whereParametrValue, + email, + fetchNum): try: dbConn = psycopg2.connect( DBGeneral.return_db_native_connection('em_db')) dbConn = dbConn cur = dbConn.cursor() - queryStr = "select %s from %s Where %s = '%s' and email = '%s' ;" % ( - queryColumnName, queryTableName, whereParametrType, whereParametrValue, email) + queryStr = \ + "select %s from %s Where %s = '%s' and email = '%s' ;" % ( + queryColumnName, queryTableName, whereParametrType, + whereParametrValue, email) logger.debug("Query : " + queryStr) cur.execute(queryStr) if (fetchNum == 0): @@ -98,14 +114,14 @@ class DBUser: elif(result.find(",)") != -1): # formatting ints e.g id result = result.partition('(')[-1].rpartition(',')[0] dbConn.close() - if result == None: + if result is None: errorMsg = "select_where_pr_state FAILED " logger.error(errorMsg) raise logger.debug("Query result: " + str(result)) return result # If failed - count the failure and add the error to list of errors. - except: + except BaseException: errorMsg = "select_where FAILED " raise Exception(errorMsg, "select_where") @@ -115,27 +131,30 @@ class DBUser: # Fetch one AT&T user ID. engagement_id = DBVirtualFunction.select_eng_uuid(vfName) engagement_manual_id = DBGeneral.select_where( - "engagement_manual_id", "ice_engagement", "uuid", engagement_id, 1) + "engagement_manual_id", "ice_engagement", "uuid", + engagement_id, 1) reviewer_id = DBGeneral.select_where( - "reviewer_id", "ice_engagement", "engagement_manual_id", engagement_manual_id, 1) + "reviewer_id", + "ice_engagement", + "engagement_manual_id", + engagement_manual_id, + 1) engLeadFullName = DBGeneral.select_where_and( - "full_name", "ice_user_profile", "id", reviewer_id, "role_id", "2", 1) + "full_name", "ice_user_profile", "id", reviewer_id, + "role_id", "2", 1) return engLeadFullName # If failed - count the failure and add the error to list of errors. - except: + except BaseException: errorMsg = "get_el_name FAILED " raise Exception(errorMsg, "get_el_name") @staticmethod def get_email_by_full_name(fullname): # try: - query_str = "select email from ice_user_profile where full_name = '%s';" % ( - fullname) + query_str = "select email from ice_user_profile where " +\ + "full_name = '%s';" % (fullname) user_email = DBGeneral.select_query(query_str) return user_email -# except: # If failed - count the failure and add the error to list of errors. -# errorMsg = "get_email_by_full_name FAILED " -# raise Exception(errorMsg, "get_el_name") @staticmethod def select_recent_vf_of_user(user_uuid, fetchNum): @@ -144,8 +163,9 @@ class DBUser: DBGeneral.return_db_native_connection('em_db')) dbConn = dbConn cur = dbConn.cursor() - queryStr = "SELECT vf_id FROM public.ice_recent_engagement where user_uuid = '%s' order by last_update desc limit 20;" % ( - user_uuid) + queryStr = "SELECT vf_id FROM public.ice_recent_engagement " +\ + "where user_uuid = '%s' order by last_update " % user_uuid +\ + "desc limit 20;" logger.debug("Query : " + queryStr) cur.execute(queryStr) if (fetchNum == 0): @@ -160,7 +180,7 @@ class DBUser: logger.debug("Query result: " + str(result)) return result # If failed - count the failure and add the error to list of errors. - except: + except BaseException: errorMsg = "select_where FAILED " raise Exception(errorMsg, "select_where") @@ -170,14 +190,20 @@ class DBUser: # Fetch one AT&T user ID. engagement_id = DBVirtualFunction.select_eng_uuid(vfName) engagement_manual_id = DBGeneral.select_where( - "engagement_manual_id", "ice_engagement", "uuid", engagement_id, 1) + "engagement_manual_id", "ice_engagement", "uuid", + engagement_id, 1) reviewer_id = DBGeneral.select_where( - "reviewer_id", "ice_engagement", "engagement_manual_id", engagement_manual_id, 1) + "reviewer_id", + "ice_engagement", + "engagement_manual_id", + engagement_manual_id, + 1) engLeadEmail = DBGeneral.select_where_and( - "email", "ice_user_profile", "id", reviewer_id, "role_id", "2", 1) + "email", "ice_user_profile", "id", reviewer_id, "role_id", + "2", 1) return engLeadEmail # If failed - count the failure and add the error to list of errors. - except: + except BaseException: errorMsg = "select_el_email FAILED " raise Exception(errorMsg, "select_el_email") @@ -188,14 +214,15 @@ class DBUser: engLeadId = DBUser.select_user_profile_property(email, "id") return engLeadId # If failed - count the failure and add the error to list of errors. - except: + except BaseException: errorMsg = "select_user_native_id FAILED " raise Exception(errorMsg, "select_user_native_id") @staticmethod def select_personal_next_step(email): user_id = DBUser.select_user_native_id(email) - return DBGeneral.select_where("uuid", "ice_next_step", "owner_id", user_id, 1) + return DBGeneral.select_where( + "uuid", "ice_next_step", "owner_id", user_id, 1) @staticmethod def select_pr_email(vfName): @@ -203,14 +230,19 @@ class DBUser: # Fetch one AT&T user ID. engagement_id = DBVirtualFunction.select_eng_uuid(vfName) engagement_manual_id = DBGeneral.select_where( - "engagement_manual_id", "ice_engagement", "uuid", engagement_id, 1) + "engagement_manual_id", "ice_engagement", "uuid", + engagement_id, 1) reviewer_id = DBGeneral.select_where( - "peer_reviewer_id", "ice_engagement", "engagement_manual_id", engagement_manual_id, 1) + "peer_reviewer_id", + "ice_engagement", + "engagement_manual_id", + engagement_manual_id, + 1) engLeadEmail = DBGeneral.select_where( "email", "ice_user_profile", "id", reviewer_id, 1) return engLeadEmail # If failed - count the failure and add the error to list of errors. - except: + except BaseException: errorMsg = "select_el_email FAILED " raise Exception(errorMsg, "select_el_email") @@ -223,16 +255,30 @@ class DBUser: return notifIDs @staticmethod - def get_not_seen_notifications_number_by_email(user_email, is_negative=False): + def get_not_seen_notifications_number_by_email( + user_email, is_negative=False): user_id = DBGeneral.select_where_email( "id", Constants.DBConstants.IceTables.USER_PROFILE, user_email) notifications_number = DBGeneral.select_where_and( - Constants.DBConstants.Queries.COUNT, Constants.DBConstants.IceTables.NOTIFICATION, "user_id", user_id, "is_read", "False", 1) + Constants.DBConstants.Queries.COUNT, + Constants.DBConstants.IceTables.NOTIFICATION, + "user_id", + user_id, + "is_read", + "False", + 1) if is_negative: counter = 0 - while notifications_number != "0" and counter <= Constants.Dashboard.Avatar.Notifications.Count.RETRIES_NUMBER: + while notifications_number != "0" and counter <= Constants.\ + Dashboard.Avatar.Notifications.Count.RETRIES_NUMBER: notifications_number = DBGeneral.select_where_and( - Constants.DBConstants.Queries.COUNT, Constants.DBConstants.IceTables.NOTIFICATION, "user_id", user_id, "is_read", "False", 1) + Constants.DBConstants.Queries.COUNT, + Constants.DBConstants.IceTables.NOTIFICATION, + "user_id", + user_id, + "is_read", + "False", + 1) time.sleep(1) counter += 1 return notifications_number @@ -240,9 +286,14 @@ class DBUser: @staticmethod def get_eng_lead_email_per_enguuid(enguuid): reviewer_id = DBGeneral.select_where( - "reviewer_id", Constants.DBConstants.IceTables.ENGAGEMENT, "uuid", enguuid, 1) + "reviewer_id", + Constants.DBConstants.IceTables.ENGAGEMENT, + "uuid", + enguuid, + 1) engLeadEmail = DBGeneral.select_where( - "email", Constants.DBConstants.IceTables.USER_PROFILE, "id", reviewer_id, 1) + "email", Constants.DBConstants.IceTables.USER_PROFILE, "id", + reviewer_id, 1) return engLeadEmail @staticmethod @@ -252,8 +303,12 @@ class DBUser: DBGeneral.return_db_native_connection('em_db')) dbConn = dbConn cur = dbConn.cursor() - queryStr = "select COUNT(*) from ice_engagement_engagement_team Where iceuserprofile_id = %s and (select engagement_stage from public.ice_engagement where uuid = engagement_id LIMIT 1) != 'Archived';" % ( - engLeadID) + queryStr = "select COUNT(*) from ice_engagement_engagement_team" +\ + " Where iceuserprofile_id = %s" % engLeadID +\ + " and (select " +\ + "engagement_stage from public.ice_engagement " +\ + "where uuid = engagement_id LIMIT 1) != 'Archived';" + logger.debug("Query : " + queryStr) cur.execute(queryStr) result = cur.fetchall() @@ -262,7 +317,7 @@ class DBUser: logger.debug(result[0][0]) return result[0][0] # If failed - count the failure and add the error to list of errors. - except: + except BaseException: errorMsg = "select_user_engagements_by_stage FAILED " raise Exception(errorMsg, "select_user_engagements_by_stage") @@ -273,8 +328,14 @@ class DBUser: DBGeneral.return_db_native_connection('em_db')) dbConn = dbConn cur = dbConn.cursor() - queryStr = "select count(*) from ice_engagement INNER JOIN ice_engagement_engagement_team ON ice_engagement_engagement_team.engagement_id= ice_engagement.uuid Where (ice_engagement.engagement_stage = '%s') and (ice_engagement_engagement_team.iceuserprofile_id = %s );" % ( - stage, engLeadID) + queryStr = "select count(*) from ice_engagement INNER JOIN " +\ + "ice_engagement_engagement_team ON " +\ + "ice_engagement_engagement_team.engagement_id= " +\ + "ice_engagement.uuid Where " +\ + "(ice_engagement.engagement_stage " +\ + "= '%s') and " % stage +\ + "(ice_engagement_engagement_team.iceuserprofile_id = " +\ + "%s );" % engLeadID logger.debug("Query : " + queryStr) cur.execute(queryStr) result = cur.fetchall() @@ -283,7 +344,7 @@ class DBUser: logger.debug(result[0][0]) return result[0][0] # If failed - count the failure and add the error to list of errors. - except: + except BaseException: errorMsg = "select_user_engagements_by_stage FAILED " raise Exception(errorMsg, "select_user_engagements_by_stage") @@ -294,7 +355,11 @@ class DBUser: # Fetch one user ID. index = DBGeneral.select_where_email("id", "auth_user", email) DBGeneral.update_where( - "ice_custom_user", "temp_password", encodePass, "user_ptr_id", index) + "ice_custom_user", + "temp_password", + encodePass, + "user_ptr_id", + index) @staticmethod def set_password_to_default(email): @@ -305,8 +370,9 @@ class DBUser: @staticmethod def select_el_not_in_engagement(el_name, pr_name): - query_str = "select full_name from ice_user_profile where role_id = 2 and full_name != '%s' and full_name != '%s';" % ( - el_name, pr_name) + query_str = "select full_name from ice_user_profile where " +\ + "role_id = 2 and full_name != '%s' and full_name != '%s';" % ( + el_name, pr_name) new_user = DBGeneral.select_query(query_str) if new_user == 'None': new_user = DBUser.update_to_el_not_in_engagement() @@ -316,53 +382,65 @@ class DBUser: def select_user_uuid(email): user_uuid = DBUser.select_user_profile_property(email, "uuid") return user_uuid - + @staticmethod def select_access_key(email): - access_key = DBUser.select_user_profile_property(email, "rgwa_access_key") + access_key = DBUser.select_user_profile_property( + email, "rgwa_access_key") return access_key - + @staticmethod def select_secret_key(email): - secret_key = DBUser.select_user_profile_property(email, "rgwa_secret_key") + secret_key = DBUser.select_user_profile_property( + email, "rgwa_secret_key") return secret_key - + @staticmethod def update_to_el_not_in_engagement(): query_str = "select uuid from ice_user_profile where role_id = 1 ;" user_uuid = DBGeneral.select_query(query_str) - updatequery = "UPDATE ice_user_profile SET role_id=2 ,full_name = 'el_for_test' WHERE uuid = '%s' ;" % ( - user_uuid) + updatequery = "UPDATE ice_user_profile SET role_id=2 ,full_name" +\ + " = 'el_for_test' WHERE uuid = '%s' ;" % ( + user_uuid) DBGeneral.update_query(updatequery) - updatequery = "UPDATE ice_user_profile SET role_id=2 WHERE full_name = '%s' ;" % ( - 'el_for_test') + updatequery = "UPDATE ice_user_profile SET role_id=2 WHERE " +\ + "full_name = '%s' ;" % ( + 'el_for_test') DBGeneral.update_query(updatequery) return 'el_for_test' @staticmethod def rollback_for_el_not_in_engagement(): - query_str = "select uuid from ice_user_profile where full_name = 'el_for_test';" + query_str = "select uuid from ice_user_profile where full_name = " +\ + "'el_for_test';" user_uuid = DBGeneral.select_query(query_str) fullName = DBBridge.helper_rand_string("randomString") - updatequery = "UPDATE ice_user_profile SET role_id=1,full_name = '%s' WHERE uuid = '%s' ;" % ( - fullName, user_uuid) + updatequery = "UPDATE ice_user_profile SET role_id=1,full_name " +\ + "= '%s' WHERE uuid = '%s' ;" % (fullName, user_uuid) DBGeneral.update_query(updatequery) @staticmethod def set_engagement_peer_reviewer(engagement_uuid, email): user_uuid = DBUser.select_user_uuid(email) - update_query = "UPDATE ice_user_profile SET role_id=2 WHERE uuid = '%s';" % user_uuid + update_query = "UPDATE ice_user_profile SET role_id=2 WHERE " +\ + "uuid = '%s';" % user_uuid DBGeneral.update_query(update_query) user_id = DBGeneral.select_query( "SELECT id FROM ice_user_profile WHERE uuid = '%s';" % user_uuid) - update_query = "UPDATE ice_engagement SET peer_reviewer_id=%s WHERE uuid = '%s';" % ( - user_id, engagement_uuid) + update_query = "UPDATE ice_engagement SET peer_reviewer_id=%s " +\ + "WHERE uuid = '%s';" % ( + user_id, engagement_uuid) DBGeneral.update_query(update_query) @staticmethod def select_user_profile_property(user_email, property_name): - return DBGeneral.select_where(property_name, "ice_user_profile", "email", user_email, 1) + return DBGeneral.select_where( + property_name, + "ice_user_profile", + "email", + user_email, + 1) @staticmethod def validate_user_profile_settings_in_db(user_email, checked): @@ -391,13 +469,23 @@ class DBUser: def get_access_key(user_uuid): counter = 0 access_key = DBGeneral.select_where( - "rgwa_access_key", Constants.DBConstants.IceTables.USER_PROFILE, "uuid", user_uuid, 1) - while access_key == "None" and counter <= Constants.RGWAConstants.RETRIES_NUMBER: + "rgwa_access_key", + Constants.DBConstants.IceTables.USER_PROFILE, + "uuid", + user_uuid, + 1) + while access_key == "None" and counter <= \ + Constants.RGWAConstants.RETRIES_NUMBER: time.sleep(session.wait_until_time_pause) logger.debug( - "rgwa_access_key are not ready yet, trying again (%s of 20)" % counter) + "rgwa_access_key are not ready yet, trying again (%s of 20)" % + counter) access_key = DBGeneral.select_where( - "rgwa_access_key", Constants.DBConstants.IceTables.USER_PROFILE, "uuid", user_uuid, 1) + "rgwa_access_key", + Constants.DBConstants.IceTables.USER_PROFILE, + "uuid", + user_uuid, + 1) counter += 1 return access_key @@ -405,13 +493,23 @@ class DBUser: def get_access_secret(user_uuid): counter = 0 access_secret = DBGeneral.select_where( - "rgwa_secret_key", Constants.DBConstants.IceTables.USER_PROFILE, "uuid", user_uuid, 1) - while access_secret == "None" and counter <= Constants.RGWAConstants.RETRIES_NUMBER: + "rgwa_secret_key", + Constants.DBConstants.IceTables.USER_PROFILE, + "uuid", + user_uuid, + 1) + while access_secret == "None" and counter <= Constants.\ + RGWAConstants.RETRIES_NUMBER: time.sleep(session.wait_until_time_pause) logger.debug( - "rgwa_secret_key are not ready yet, trying again (%s of 100)" % counter) + "rgwa_secret_key are not ready yet, trying again (%s of 100)" % + counter) access_secret = DBGeneral.select_where( - "rgwa_secret_key", Constants.DBConstants.IceTables.USER_PROFILE, "uuid", user_uuid, 1) - + "rgwa_secret_key", + Constants.DBConstants.IceTables.USER_PROFILE, + "uuid", + user_uuid, + 1) + counter += 1 return access_secret diff --git a/services/database/db_virtual_function.py b/services/database/db_virtual_function.py index 143bca2..f61d1b7 100644 --- a/services/database/db_virtual_function.py +++ b/services/database/db_virtual_function.py @@ -1,5 +1,5 @@ - -# ============LICENSE_START========================================== + +# ============LICENSE_START========================================== # org.onap.vvp/test-engine # =================================================================== # Copyright © 2017 AT&T Intellectual Property. All rights reserved. @@ -49,6 +49,7 @@ from services.logging_service import LoggingServiceFactory logger = LoggingServiceFactory.get_logger() + class DBVirtualFunction: @staticmethod @@ -67,8 +68,10 @@ class DBVirtualFunction: try: logger.debug("DATABASE_TYPE: " + settings.DATABASE_TYPE) # Create INSERT query. - queryStr = "INSERT INTO %s (""uuid, name, weight, ui_visibility"") VALUES ('%s', '%s', '%s', '%s');" % ( - queryTableName, uuid, name, 0, ui_visibility) + queryStr = "INSERT INTO %s " % queryTableName +\ + "(""uuid, name, weight, ui_visibility"") VALUES " +\ + "('%s', '%s', " % (uuid, name) +\ + "'%s', '%s');" % (0, ui_visibility) logger.debug("Query: " + queryStr) cur.execute(queryStr) # Execute query. dbConn.commit() @@ -100,7 +103,8 @@ class DBVirtualFunction: dbConn.commit() logger.debug("Test results are in General now.") except Exception as e: - errorMsg = "Failed to delete ECOMP release from General . because :" + \ + errorMsg = "Failed to delete ECOMP release from General ." +\ + " because :" + \ str(e) raise Exception(errorMsg) raise @@ -114,8 +118,10 @@ class DBVirtualFunction: @staticmethod def select_next_steps_uuids_by_stage(engagement_uuid, engagement_stage): - query = "SELECT uuid FROM %s WHERE engagement_id='%s' AND engagement_stage='%s' ORDER BY position;" % ( - Constants.DBConstants.IceTables.NEXT_STEP, engagement_uuid, engagement_stage) + query = "SELECT uuid FROM %s WHERE " % ( + Constants.DBConstants.IceTables.NEXT_STEP) + "engagement_id=" +\ + "'%s' AND engagement_stage='%s' ORDER BY position;" % ( + engagement_uuid, engagement_stage) return DBGeneral.select_query(query, "list", 0) @staticmethod @@ -125,11 +131,17 @@ class DBVirtualFunction: @staticmethod def select_next_step_description(next_step_uuid): - return DBGeneral.select_where("description", "ice_next_step", "uuid", next_step_uuid, 1) + return DBGeneral.select_where( + "description", + "ice_next_step", + "uuid", + next_step_uuid, + 1) @staticmethod def select_eng_uuid(vf_name): - return DBGeneral.select_where("engagement_id", "ice_vf", "name", vf_name, 1) + return DBGeneral.select_where( + "engagement_id", "ice_vf", "name", vf_name, 1) @staticmethod def select_engagment_uuid_by_vf_name(vfName): @@ -138,7 +150,11 @@ class DBVirtualFunction: engagement_manual_id = DBGeneral.select_where( "engagement_manual_id", "ice_engagement", "uuid", engagement_id, 1) enguuid = DBGeneral.select_where( - "uuid", "ice_engagement", "engagement_manual_id", engagement_manual_id, 1) + "uuid", + "ice_engagement", + "engagement_manual_id", + engagement_manual_id, + 1) return enguuid @staticmethod @@ -149,7 +165,8 @@ class DBVirtualFunction: @staticmethod def select_vf_name_by_vf_version(version_name): - queryofname = "SELECT name FROM ice_vf WHERE version= '%s';" % version_name + queryofname = "SELECT name FROM ice_vf WHERE " +\ + "version= '%s';" % version_name vfNameDb = str(DBGeneral.select_query(queryofname)) return vfNameDb @@ -164,64 +181,106 @@ class DBVirtualFunction: @staticmethod def get_engagement(): - """Use this function instead of creating a new engagement where no need to""" - queryStr = "SELECT DISTINCT ice_engagement.uuid, engagement_manual_id, ice_vf.name, ice_user_profile.full_name, \ - ice_user_profile.email, reviewer_table.full_name, reviewer_table.email, \ + """Use this function instead of creating a new """ +\ + """engagement where no need to""" + queryStr = "SELECT DISTINCT ice_engagement.uuid, " +\ + "engagement_manual_id, ice_vf.name, ice_user_profile.full_name, \ + ice_user_profile.email, reviewer_table.full_name, " +\ + "reviewer_table.email, \ ice_deployment_target.version, ice_ecomp_release.name \ - FROM ice_engagement LEFT JOIN ice_vf ON engagement_id = ice_engagement.uuid \ - LEFT JOIN ice_user_profile reviewer_table ON reviewer_table.id = ice_engagement.reviewer_id \ - LEFT JOIN ice_user_profile ON ice_user_profile.id = ice_engagement.peer_reviewer_id \ - LEFT JOIN ice_deployment_target ON ice_deployment_target.uuid = ice_vf.deployment_target_id \ - LEFT JOIN ice_ecomp_release ON ice_ecomp_release.uuid = ice_vf.ecomp_release_id \ + FROM ice_engagement LEFT JOIN ice_vf ON engagement_id " +\ + "= ice_engagement.uuid \ + LEFT JOIN ice_user_profile reviewer_table ON " +\ + "reviewer_table.id = ice_engagement.reviewer_id \ + LEFT JOIN ice_user_profile ON ice_user_profile.id = " +\ + "ice_engagement.peer_reviewer_id \ + LEFT JOIN ice_deployment_target ON " +\ + "ice_deployment_target.uuid = " +\ + "ice_vf.deployment_target_id \ + LEFT JOIN ice_ecomp_release ON " +\ + "ice_ecomp_release.uuid = ice_vf.ecomp_release_id \ WHERE ice_user_profile.id IS NOT NULL LIMIT 1;" list_of_values = DBGeneral.select_query(queryStr, return_type="list") - list_of_keys = ["engagement_uuid", "engagement_manual_id", "vfName", "pr_name", - "pr_email", "el_name", "el_email", "target_aic", "ecomp_release"] + list_of_keys = [ + "engagement_uuid", + "engagement_manual_id", + "vfName", + "pr_name", + "pr_email", + "el_name", + "el_email", + "target_aic", + "ecomp_release"] return dict(zip(list_of_keys, list_of_values)) @staticmethod def insert_aic_version(ui_visibility="TRUE"): new_aic_version = { - "uuid": str(uuid.uuid4()), "name": "AIC", "version": DBBridge.helper_rand_string("randomNumber", 2), "ui_visibility": ui_visibility, "weight": 0} + "uuid": str( + uuid.uuid4()), + "name": "AIC", + "version": DBBridge.helper_rand_string( + "randomNumber", + 2), + "ui_visibility": ui_visibility, + "weight": 0} queryStr = "INSERT INTO public.ice_deployment_target( \ uuid, name, version, ui_visibility, weight) \ - VALUES ('%s', '%s', '%s', '%s', %s);" % (new_aic_version['uuid'], new_aic_version['name'], new_aic_version['version'], new_aic_version['ui_visibility'], new_aic_version['weight']) + VALUES " +\ + "('%s', '%s', '%s', '%s', %s);" % ( + new_aic_version['uuid'], + new_aic_version['name'], + new_aic_version['version'], + new_aic_version['ui_visibility'], + new_aic_version['weight']) DBGeneral.insert_query(queryStr) return new_aic_version @staticmethod def delete_aic_version(aic_uuid): DBGeneral.insert_query( - "DELETE FROM public.ice_deployment_target WHERE uuid='%s';" % aic_uuid) + "DELETE FROM public.ice_deployment_target WHERE uuid='%s';" % + aic_uuid) @staticmethod def change_aic_version_weight(new_weight, old_weight): DBGeneral.insert_query( - "UPDATE public.ice_deployment_target SET weight=%s WHERE weight=%s" % (new_weight, old_weight)) + "UPDATE public.ice_deployment_target " + + "SET weight=%s " % new_weight + + "WHERE weight=%s" % old_weight) @staticmethod def change_ecomp_release_weight(new_weight, old_weight): DBGeneral.insert_query( - "UPDATE public.ice_ecomp_release SET weight=%s WHERE weight=%s" % (new_weight, old_weight)) + "UPDATE public.ice_ecomp_release SET weight=%s WHERE weight=%s" % + (new_weight, old_weight)) @staticmethod def select_aic_version_uuid(aic_version): - return DBGeneral.select_where("uuid", "ice_deployment_target", "version", aic_version, 1) + return DBGeneral.select_where( + "uuid", "ice_deployment_target", "version", aic_version, 1) @staticmethod def select_ecomp_release_uuid(ecomp_release): - return DBGeneral.select_where("uuid", "ice_ecomp_release", "name", ecomp_release, 1) + return DBGeneral.select_where( + "uuid", "ice_ecomp_release", "name", ecomp_release, 1) @staticmethod def add_admin_to_eng_team(eng_uuid): admin_db_id = DBGeneral.select_where( - 'id', Constants.DBConstants.IceTables.USER_PROFILE, 'email', Constants.Users.Admin.EMAIL, 1) - queryStr = "INSERT INTO public.ice_engagement_engagement_team(engagement_id, iceuserprofile_id) VALUES ('%s', '%s');" % ( - eng_uuid, admin_db_id) + 'id', + Constants.DBConstants.IceTables.USER_PROFILE, + 'email', + Constants.Users.Admin.EMAIL, + 1) + queryStr = "INSERT INTO public.ice_engagement_engagement_team" +\ + "(engagement_id, iceuserprofile_id) VALUES ('%s', '%s');" % ( + eng_uuid, admin_db_id) logger.debug("add_admin_to_eng_team Query: %s" % queryStr) DBGeneral.insert_query(queryStr) @staticmethod def remove_engagement_from_recent(vf_uuid): DBGeneral.insert_query( - "DELETE FROM %s WHERE vf_id='%s'" % (Constants.DBConstants.IceTables.RECENT, vf_uuid)) + "DELETE FROM %s WHERE vf_id='%s'" % (Constants.DBConstants. + IceTables.RECENT, vf_uuid)) diff --git a/services/frontend/__init__.py b/services/frontend/__init__.py index 30d7152..32b601a 100644 --- a/services/frontend/__init__.py +++ b/services/frontend/__init__.py @@ -1,5 +1,5 @@ - -# ============LICENSE_START========================================== + +# ============LICENSE_START========================================== # org.onap.vvp/test-engine # =================================================================== # Copyright © 2017 AT&T Intellectual Property. All rights reserved. diff --git a/services/frontend/base_actions/__init__.py b/services/frontend/base_actions/__init__.py index 30d7152..32b601a 100644 --- a/services/frontend/base_actions/__init__.py +++ b/services/frontend/base_actions/__init__.py @@ -1,5 +1,5 @@ - -# ============LICENSE_START========================================== + +# ============LICENSE_START========================================== # org.onap.vvp/test-engine # =================================================================== # Copyright © 2017 AT&T Intellectual Property. All rights reserved. diff --git a/services/frontend/base_actions/click.py b/services/frontend/base_actions/click.py index 00470b7..561b2f7 100644 --- a/services/frontend/base_actions/click.py +++ b/services/frontend/base_actions/click.py @@ -1,5 +1,5 @@ - -# ============LICENSE_START========================================== + +# ============LICENSE_START========================================== # org.onap.vvp/test-engine # =================================================================== # Copyright © 2017 AT&T Intellectual Property. All rights reserved. @@ -51,7 +51,8 @@ class Click: Wait.page_has_loaded() Wait.id(element_id) session.ice_driver.find_element_by_id(element_id).click() - except Exception as e: # If failed - count the failure and add the error to list of errors. + # If failed - count the failure and add the error to list of errors. + except Exception as e: errorMsg = "Failed to click_on on ID " + element_id raise Exception(errorMsg, e) @@ -62,18 +63,21 @@ class Click: Wait.page_has_loaded() Wait.name(element_name) session.ice_driver.find_element_by_name(element_name).click() - except Exception as e: # If failed - count the failure and add the error to list of errors. + # If failed - count the failure and add the error to list of errors. + except Exception as e: errorMsg = "Failed to click_on on ID " + element_name raise Exception(errorMsg, e) - + @staticmethod def link_text(link_inner_text, wait_for_page=False): try: if wait_for_page: Wait.page_has_loaded() Wait.link_text(link_inner_text) - session.ice_driver.find_element_by_link_text(link_inner_text).click() - except Exception as e: # If failed - count the failure and add the error to list of errors. + session.ice_driver.find_element_by_link_text( + link_inner_text).click() + # If failed - count the failure and add the error to list of errors. + except Exception as e: errorMsg = "Failed to click_on on LINK TEXT " + link_inner_text raise Exception(errorMsg, e) @@ -83,8 +87,10 @@ class Click: if wait_for_page: Wait.page_has_loaded() Wait.css(element_css) - session.ice_driver.find_element_by_css_selector(element_css).click() - except Exception as e: # If failed - count the failure and add the error to list of errors. + session.ice_driver.find_element_by_css_selector( + element_css).click() + # If failed - count the failure and add the error to list of errors. + except Exception as e: errorMsg = "Failed to click_on on CSS Selector " + element_css raise Exception(errorMsg, e) @@ -95,7 +101,8 @@ class Click: Wait.page_has_loaded() Wait.xpath(element_xpath) session.ice_driver.find_element_by_xpath(element_xpath).click() - except Exception as e: # If failed - count the failure and add the error to list of errors. + # If failed - count the failure and add the error to list of errors. + except Exception as e: errorMsg = "Failed to click_on on XPATH " + element_xpath raise Exception(errorMsg, e) @@ -104,5 +111,10 @@ class Click: ns = session.ice_driver.find_element_by_id("step-description-1") ActionChains(session.ice_driver).move_to_element(ns).perform() Wait.css(source_css) - source_element = session.ice_driver.find_element_by_css_selector(source_css) - ActionChains(session.ice_driver).drag_and_drop_by_offset(source_element, xoffset, yoffset).perform() + source_element = session.ice_driver.find_element_by_css_selector( + source_css) + ActionChains( + session.ice_driver).drag_and_drop_by_offset( + source_element, + xoffset, + yoffset).perform() diff --git a/services/frontend/base_actions/enter.py b/services/frontend/base_actions/enter.py index 4577a3d..2a6992d 100644 --- a/services/frontend/base_actions/enter.py +++ b/services/frontend/base_actions/enter.py @@ -1,5 +1,5 @@ - -# ============LICENSE_START========================================== + +# ============LICENSE_START========================================== # org.onap.vvp/test-engine # =================================================================== # Copyright © 2017 AT&T Intellectual Property. All rights reserved. @@ -45,85 +45,117 @@ from services.session import session logger = LoggingServiceFactory.get_logger() + + class Enter: - + @staticmethod def text_by_name(attr_name_value, typed_text, wait_for_page=False): - try: # Send keys to element in UI, by name locator (e.g. type something in text box). + # Send keys to element in UI, by name locator (e.g. type something in + # text box). + try: if wait_for_page: Wait.page_has_loaded() Wait.name(attr_name_value) session.ice_driver.find_element_by_name(attr_name_value).clear() - session.ice_driver.find_element_by_name(attr_name_value).send_keys(typed_text[:-1]) + session.ice_driver.find_element_by_name( + attr_name_value).send_keys(typed_text[:-1]) time.sleep(session.wait_until_time_pause) - session.ice_driver.find_element_by_name(attr_name_value).send_keys(typed_text[-1:]) - except Exception as e: # If failed - count the failure and add the error to list of errors. - errorMsg = "Failed to type " + typed_text + " in text box" + session.ice_driver.find_element_by_name( + attr_name_value).send_keys(typed_text[-1:]) + # If failed - count the failure and add the error to list of errors. + except Exception as e: + errorMsg = "Failed to type " + typed_text + " in text box" raise Exception(errorMsg, e) - + @staticmethod def text_by_xpath(attr_xpath_value, typed_text, wait_for_page=False): - try: # Send keys to element in UI, by name locator (e.g. type something in text box). + # Send keys to element in UI, by name locator (e.g. type something in + # text box). + try: if wait_for_page: Wait.page_has_loaded() Wait.xpath(attr_xpath_value) session.ice_driver.find_element_by_xpath(attr_xpath_value).clear() - session.ice_driver.find_element_by_xpath(attr_xpath_value).send_keys(typed_text[:-1]) + session.ice_driver.find_element_by_xpath( + attr_xpath_value).send_keys(typed_text[:-1]) time.sleep(session.wait_until_time_pause) - session.ice_driver.find_element_by_xpath(attr_xpath_value).send_keys(typed_text[-1:]) - except Exception as e: # If failed - count the failure and add the error to list of errors. - errorMsg = "Failed to type " + typed_text + " in text box" + session.ice_driver.find_element_by_xpath( + attr_xpath_value).send_keys(typed_text[-1:]) + # If failed - count the failure and add the error to list of errors. + except Exception: + errorMsg = "Failed to type " + typed_text + " in text box" raise Exception(errorMsg, attr_xpath_value) - + @staticmethod def text_by_id(attr_id_value, typed_text, wait_for_page=False): - try: # Send keys to element in UI, by ID locator (e.g. type something in text box). + # Send keys to element in UI, by ID locator (e.g. type something in + # text box). + try: if wait_for_page: Wait.page_has_loaded() Wait.id(attr_id_value) session.ice_driver.find_element_by_id(attr_id_value).clear() - session.ice_driver.find_element_by_id(attr_id_value).send_keys(typed_text[:-1]) + session.ice_driver.find_element_by_id( + attr_id_value).send_keys(typed_text[:-1]) time.sleep(session.wait_until_time_pause) - session.ice_driver.find_element_by_id(attr_id_value).send_keys(typed_text[-1:]) - except Exception as e: # If failed - count the failure and add the error to list of errors. - errorMsg = "Failed to type " + typed_text + " in text box" + session.ice_driver.find_element_by_id( + attr_id_value).send_keys(typed_text[-1:]) + # If failed - count the failure and add the error to list of errors. + except Exception: + errorMsg = "Failed to type " + typed_text + " in text box" raise Exception(errorMsg, attr_id_value) - + @staticmethod def clear(attr_id_value): try: Wait.id(attr_id_value) session.ice_driver.find_element_by_id(attr_id_value).clear() - except Exception as e: - errorMsg = "Failed to clear text box" + except Exception: + errorMsg = "Failed to clear text box" raise Exception(errorMsg, attr_id_value) - + @staticmethod def text_by_css(attr_css_value, typed_text, wait_for_page=False): - try: # Send keys to element in UI, by CSS locator (e.g. type something in text box). + # Send keys to element in UI, by CSS locator (e.g. type something in + # text box). + try: if wait_for_page: Wait.page_has_loaded() Wait.css(attr_css_value) - session.ice_driver.find_element_by_css_selector(attr_css_value).clear() - session.ice_driver.find_element_by_css_selector(attr_css_value).send_keys(typed_text[:-1]) + session.ice_driver.find_element_by_css_selector( + attr_css_value).clear() + session.ice_driver.find_element_by_css_selector( + attr_css_value).send_keys(typed_text[:-1]) time.sleep(session.wait_until_time_pause) - session.ice_driver.find_element_by_css_selector(attr_css_value).send_keys(typed_text[-1:]) - except Exception as e: # If failed - count the failure and add the error to list of errors. - errorMsg = "Failed to type " + typed_text + " in text box" + session.ice_driver.find_element_by_css_selector( + attr_css_value).send_keys(typed_text[-1:]) + # If failed - count the failure and add the error to list of errors. + except Exception: + errorMsg = "Failed to type " + typed_text + " in text box" raise Exception(errorMsg, attr_css_value) - + @staticmethod - def text_by_link_text(attr_link_text_value, typed_text, wait_for_page=False): - try: # Send keys to element in UI, by name locator (e.g. type something in text box). + def text_by_link_text( + attr_link_text_value, + typed_text, + wait_for_page=False): + # Send keys to element in UI, by name locator (e.g. type something in + # text box). + try: if wait_for_page: Wait.page_has_loaded() Wait.link_text(attr_link_text_value) - session.ice_driver.find_element_by_link_text(attr_link_text_value).clear() - session.ice_driver.find_element_by_link_text(attr_link_text_value).send_keys(typed_text[:-1]) + session.ice_driver.find_element_by_link_text( + attr_link_text_value).clear() + session.ice_driver.find_element_by_link_text( + attr_link_text_value).send_keys(typed_text[:-1]) time.sleep(session.wait_until_time_pause) - session.ice_driver.find_element_by_link_text(attr_link_text_value).send_keys(typed_text[-1:]) - except Exception as e: # If failed - count the failure and add the error to list of errors. - errorMsg = "Failed to type " + typed_text + " in text box" + session.ice_driver.find_element_by_link_text( + attr_link_text_value).send_keys(typed_text[-1:]) + # If failed - count the failure and add the error to list of errors. + except Exception: + errorMsg = "Failed to type " + typed_text + " in text box" raise Exception(errorMsg, attr_link_text_value) @staticmethod @@ -132,8 +164,10 @@ class Enter: if wait_for_page: Wait.page_has_loaded() session.ice_driver.execute_script( - "var element = angular.element(document.querySelector('" + selector + "')); element.scope()." + - property_name + " = new Date('" + str(datetime.today().isoformat()) + "')") + "var element = angular.element(document." + + "querySelector('" + selector + "')); element.scope()." + + property_name + " = new Date('" + + str(datetime.today().isoformat()) + "')") except Exception as e: errorMsg = "Failed to select date with datePicker." raise Exception(errorMsg, str(e)) diff --git a/services/frontend/base_actions/get.py b/services/frontend/base_actions/get.py index 5fb801a..0eb7959 100644 --- a/services/frontend/base_actions/get.py +++ b/services/frontend/base_actions/get.py @@ -51,7 +51,7 @@ class Get: Wait.id(attr_id_value) return session.ice_driver.find_element_by_id(attr_id_value).text # If failed - count the failure and add the error to list of errors. - except Exception as e: + except Exception: errorMsg = "Failed to get text of element " + attr_id_value raise Exception(errorMsg, attr_id_value) @@ -61,9 +61,10 @@ class Get: if wait_for_page: Wait.page_has_loaded() Wait.css(attr_css_value) - return session.ice_driver.find_element_by_css_selector(attr_css_value).text + return session.ice_driver.find_element_by_css_selector( + attr_css_value).text # If failed - count the failure and add the error to list of errors. - except Exception as e: + except Exception: errorMsg = "Failed to get text of element " + attr_css_value raise Exception(errorMsg, attr_css_value) @@ -74,7 +75,7 @@ class Get: return session.ice_driver.find_element_by_css_selector( "#" + attr_id_value + ".wysiwyg-textarea") # If failed - count the failure and add the error to list of errors. - except Exception as e: + except Exception: errorMsg = "Failed to get element by id " + attr_id_value raise Exception(errorMsg, attr_id_value) @@ -82,9 +83,10 @@ class Get: def by_name(attr_name_value): try: Wait.name(attr_name_value) - return session.ice_driver.find_element_by_name(attr_name_value).text + return session.ice_driver.find_element_by_name( + attr_name_value).text # If failed - count the failure and add the error to list of errors. - except Exception as e: + except Exception: errorMsg = "Failed to get text of element " + attr_name_value raise Exception(errorMsg, attr_name_value) @@ -92,9 +94,10 @@ class Get: def by_xpath(attr_name_value): try: Wait.xpath(attr_name_value) - return session.ice_driver.find_element_by_xpath(attr_name_value).text + return session.ice_driver.find_element_by_xpath( + attr_name_value).text # If failed - count the failure and add the error to list of errors. - except Exception as e: + except Exception: errorMsg = "Failed to get text of element " + attr_name_value raise Exception(errorMsg, attr_name_value) @@ -104,8 +107,9 @@ class Get: if wait_for_page: Wait.page_has_loaded() Wait.name(attr_name_value) - return session.ice_driver.find_element_by_name(attr_name_value).get_attribute("value") - except Exception as e: + return session.ice_driver.find_element_by_name( + attr_name_value).get_attribute("value") + except Exception: errorMsg = "Failed to get value by name:" + attr_name_value raise Exception(errorMsg, attr_name_value) @@ -113,8 +117,9 @@ class Get: def meta_order_by_id(attr_id_value): try: Wait.id(attr_id_value) - return session.ice_driver.find_element_by_id(attr_id_value).get_attribute("meta-order") - except Exception as e: + return session.ice_driver.find_element_by_id( + attr_id_value).get_attribute("meta-order") + except Exception: errorMsg = "Failed to get meta order by id:" + attr_id_value raise Exception(errorMsg, attr_id_value) @@ -124,8 +129,9 @@ class Get: if wait_for_page: Wait.page_has_loaded() Wait.id(attr_id_value) - return session.ice_driver.find_element_by_id(attr_id_value).is_selected() - except Exception as e: + return session.ice_driver.find_element_by_id( + attr_id_value).is_selected() + except Exception: errorMsg = "Failed to get if it's selected by id:" + attr_id_value raise Exception(errorMsg, attr_id_value) @@ -138,6 +144,6 @@ class Get: return Helper.internal_assert_boolean_true_false( session.ice_driver.find_element_by_id( attr_id_value).get_attribute("value"), "on") - except Exception as e: + except Exception: errorMsg = "Failed to get if it's selected by id:" + attr_id_value raise Exception(errorMsg, attr_id_value) diff --git a/services/frontend/base_actions/wait.py b/services/frontend/base_actions/wait.py index a699917..4434bb1 100644 --- a/services/frontend/base_actions/wait.py +++ b/services/frontend/base_actions/wait.py @@ -58,12 +58,12 @@ class Wait: try: # Wait 4 seconds for element and compare to expected result. if wait_for_page: Wait.page_has_loaded() - WebDriverWait(session.ice_driver, session.wait_until_retires).until( + WebDriverWait( + session.ice_driver, session.wait_until_retires).until( expected_conditions.text_to_be_present_in_element( - (By.XPATH, xpath), text) - ) + (By.XPATH, xpath), text)) # If failed - count the failure and add the error to list of errors. - except: + except Exception: error_msg = "Text - " + text + " not found in xPath " + xpath raise Exception(error_msg, xpath) @@ -72,12 +72,12 @@ class Wait: try: # Wait 4 seconds for element and compare to expected result. if wait_for_page: Wait.page_has_loaded() - WebDriverWait(session.ice_driver, session.wait_until_retires).until( + WebDriverWait( + session.ice_driver, session.wait_until_retires).until( expected_conditions.text_to_be_present_in_element( - (By.ID, element_id), text) - ) + (By.ID, element_id), text)) # If failed - count the failure and add the error to list of errors. - except: + except Exception: error_msg = "Text - " + text + " not found in ID " + element_id raise Exception(error_msg, element_id) @@ -86,10 +86,10 @@ class Wait: try: # Wait 4 seconds for element and compare to expected result. if wait_for_page: Wait.page_has_loaded() - WebDriverWait(session.ice_driver, session.wait_until_retires).until( + WebDriverWait( + session.ice_driver, session.wait_until_retires).until( expected_conditions.text_to_be_present_in_element( - (By.CSS_SELECTOR, css), text) - ) + (By.CSS_SELECTOR, css), text)) # If failed - count the failure and add the error to list of errors. except Exception as e: error_msg = "Text - " + text + " not found in CSS - " + css @@ -100,10 +100,10 @@ class Wait: try: # Wait 4 seconds for element and compare to expected result. if wait_for_page: Wait.page_has_loaded() - WebDriverWait(session.ice_driver, session.wait_until_retires).until( + WebDriverWait( + session.ice_driver, session.wait_until_retires).until( expected_conditions.text_to_be_present_in_element( - (By.NAME, name), text) - ) + (By.NAME, name), text)) # If failed - count the failure and add the error to list of errors. except Exception as e: error_msg = "Text - " + text + " not found by NAME - " + name @@ -114,7 +114,8 @@ class Wait: try: # Wait 4 seconds for element and compare to expected result. if wait_for_page: Wait.page_has_loaded() - WebDriverWait(session.ice_driver, session.wait_until_retires).until( + WebDriverWait(session.ice_driver, + session.wait_until_retires).until( expected_conditions.visibility_of_element_located( (By.ID, element_id)) ) @@ -128,10 +129,10 @@ class Wait: try: # Wait 4 seconds for element and compare to expected result. if wait_for_page: Wait.page_has_loaded() - WebDriverWait(session.ice_driver, session.wait_until_retires).until( + WebDriverWait( + session.ice_driver, session.wait_until_retires).until( expected_conditions.visibility_of_element_located( - (By.CSS_SELECTOR, element_css)) - ) + (By.CSS_SELECTOR, element_css))) # If failed - count the failure and add the error to list of errors. except Exception as e: error_msg = "Didn't find CSS Selector " + element_css @@ -142,7 +143,8 @@ class Wait: try: # Wait 4 seconds for element and compare to expected result. if wait_for_page: Wait.page_has_loaded() - WebDriverWait(session.ice_driver, session.wait_until_implicit_time).until( + WebDriverWait(session.ice_driver, + session.wait_until_implicit_time).until( expected_conditions.visibility_of_element_located( (By.CSS_SELECTOR, element_css)) ) @@ -157,10 +159,10 @@ class Wait: try: if wait_for_page: Wait.page_has_loaded() - WebDriverWait(session.ice_driver, session.wait_until_retires).until( + WebDriverWait( + session.ice_driver, session.wait_until_retires).until( expected_conditions.visibility_of_element_located( - (By.LINK_TEXT, link_inner_text)) - ) + (By.LINK_TEXT, link_inner_text))) # If failed - count the failure and add the error to list of errors. except Exception as e: error_msg = "Didn't find LINK TEXT " + link_inner_text @@ -171,10 +173,10 @@ class Wait: try: if wait_for_page: Wait.page_has_loaded() - WebDriverWait(session.ice_driver, session.wait_until_retires).until( + WebDriverWait( + session.ice_driver, session.wait_until_retires).until( expected_conditions.visibility_of_element_located( - (By.NAME, element_name)) - ) + (By.NAME, element_name))) # If failed - count the failure and add the error to list of errors. except Exception as e: error_msg = "Didn't find NAME " + element_name @@ -185,10 +187,10 @@ class Wait: try: if wait_for_page: Wait.page_has_loaded() - WebDriverWait(session.ice_driver, session.wait_until_retires).until( + WebDriverWait( + session.ice_driver, session.wait_until_retires).until( expected_conditions.visibility_of_element_located( - (By.XPATH, element_xpath)) - ) + (By.XPATH, element_xpath))) # If failed - count the failure and add the error to list of errors. except Exception as e: error_msg = "Didn't find XPath " + element_xpath @@ -199,14 +201,17 @@ class Wait: for _ in range(Constants.FEConstants.RETRIES_NUMBER): try: httpRequests = session.ice_driver.execute_script( - 'return window.angular ? window.angular.element("body").injector().get("$http").pendingRequests.length : 1;') + 'return window.angular ? window.angular.element("body").' + + 'injector().get("$http").pendingRequests.length : 1;') if(str(httpRequests) == "0"): time.sleep(session.wait_until_time_pause) return logger.debug( - "Checking if {} page is loaded. ".format(session.ice_driver.current_url)) + "Checking if {} page is loaded. ".format( + session.ice_driver.current_url)) time.sleep(session.wait_until_time_pause) except Exception as exception: + time.sleep(session.wait_until_time_pause) continue raise Exception("Page loading took too much time") @@ -266,7 +271,10 @@ class Wait: return True else: raise Exception( - "id_to_dissappear " + id_element + " num of retries = " + str(i)) + "id_to_dissappear " + + id_element + + " num of retries = " + + str(i)) @staticmethod def name_to_dissappear(name_element, wait_for_page=False): @@ -292,7 +300,10 @@ class Wait: return True else: raise Exception( - "name_to_dissappear " + name_element + " num of retries = " + str(i)) + "name_to_dissappear " + + name_element + + " num of retries = " + + str(i)) @staticmethod def css_to_dissappear(css_element): @@ -316,3 +327,8 @@ class Wait: return True else: raise Exception("css_to_dissappear" + css_element) + + @staticmethod + def bucket_to_create(bucket_id): + logger.debug("Waiting for %s bucket to be created" % bucket_id) + time.sleep(session.positive_timeout) diff --git a/services/frontend/fe_checklist.py b/services/frontend/fe_checklist.py index 3afc472..133c6d1 100644 --- a/services/frontend/fe_checklist.py +++ b/services/frontend/fe_checklist.py @@ -53,7 +53,9 @@ from services.frontend.fe_user import FEUser from services.frontend.fe_wizard import FEWizard from services.helper import Helper from services.logging_service import LoggingServiceFactory -from tests.uiTests.test_ui_base import * +from services.database.db_general import DBGeneral +from services.constants import Constants +from services.session import session logger = LoggingServiceFactory.get_logger() @@ -86,8 +88,9 @@ class FEChecklist: engagement_id = DBVirtualFunction.select_eng_uuid(vfName) engLeadEmail = DBUser.select_el_email(vfName) logger.debug("EL email: " + engLeadEmail) - engagement_manual_id = DBGeneral.select_where("engagement_manual_id", "ice_engagement", "uuid", - engagement_id, 1) + engagement_manual_id = DBGeneral.select_where( + "engagement_manual_id", "ice_engagement", "uuid", + engagement_id, 1) # Click on all default next steps myVfName = engagement_manual_id + ": " + vfName actualVfNameid = "clickable-" + myVfName @@ -117,8 +120,13 @@ class FEChecklist: engagement_id, vfName, actualVfName, engagement_manual_id) checklistUuid = DBGeneral.select_where( "uuid", "ice_checklist", "name", checklistName, 1) - newObjWithChecklist = [checklistUuid, engLeadEmail, engagement_manual_id, actualVfNameid, myVfName, - checklistName] + newObjWithChecklist = [ + checklistUuid, + engLeadEmail, + engagement_manual_id, + actualVfNameid, + myVfName, + checklistName] return newObjWithChecklist # If failed - count the failure and add the error to list of errors. except Exception as e: @@ -126,7 +134,11 @@ class FEChecklist: raise Exception(errorMsg, "create_new_checklist") @staticmethod - def create_checklist(engagement_id, vfName, actualVfName, engagement_manual_id): + def create_checklist( + engagement_id, + vfName, + actualVfName, + engagement_manual_id): try: checklistName = Helper.rand_string("randomString") Wait.id("checklist-plus-" + engagement_id, wait_for_page=True) @@ -140,12 +152,18 @@ class FEChecklist: "checkListName", checklistName, wait_for_page=True) Wait.xpath("//select") - Select(session.ice_driver.find_element_by_id(Constants.Template.Subtitle.SelectTemplateTitle.TEXT) - ).select_by_visible_text(Constants.Template.Heat.TEXT) + Select( + session.ice_driver.find_element_by_id( + Constants.Template.Subtitle.SelectTemplateTitle.TEXT + )).select_by_visible_text( + Constants.Template.Heat.TEXT) Click.id(Constants.Template.Heat.TEXT, wait_for_page=True) # Click.css("option.ng-binding.ng-scope") Helper.internal_assert( - "Associate Files", Get.by_id("associated-files-title", wait_for_page=True)) + "Associate Files", + Get.by_id( + "associated-files-title", + wait_for_page=True)) Click.xpath("//multiselect/div/button", wait_for_page=True) Click.link_text("file0", wait_for_page=True) Click.link_text("file1") @@ -202,7 +220,7 @@ class FEChecklist: Click.xpath("(//button[@type='button'])[11]") try: Click.xpath("//div[3]/multiselect/div/ul/li/a") - except: + except BaseException: Click.link_text("Homer Simpson") Click.css("div.modal-content") count = 0 @@ -219,7 +237,8 @@ class FEChecklist: Click.css("div.modal-content") Click.xpath("(//button[@type='button'])[23]") Click.css( - "div.btn-group.open > ul.dropdown-menu > li.ng-scope > a.ng-binding") + "div.btn-group.open > ul.dropdown-menu > " + + "li.ng-scope > a.ng-binding") Click.link_text("Add Another Next Step") Click.xpath("(//button[@type='button'])[25]") FEWizard.date_picker_add_ns(count) @@ -243,7 +262,7 @@ class FEChecklist: Click.xpath("(//button[@type='button'])[11]") try: Click.xpath("//div[3]/multiselect/div/ul/li/a") - except: + except BaseException: Wait.link_text("Homer Simpson") Click.link_text("Homer Simpson") Wait.css("div.modal-content") @@ -263,7 +282,8 @@ class FEChecklist: Click.css("div.modal-content") Click.xpath("(//button[@type='button'])[23]") Click.css( - "div.btn-group.open > ul.dropdown-menu > li.ng-scope > a.ng-binding") + "div.btn-group.open > ul.dropdown-menu > " + + "li.ng-scope > a.ng-binding") Click.link_text("Add Another Next Step") Wait.xpath("(//button[@type='button'])[25]") Click.xpath("(//button[@type='button'])[25]") @@ -273,7 +293,6 @@ class FEChecklist: Click.xpath("//div[4]/div/span") Wait.id("btn-submit") Wait.text_by_id("btn-submit", "Submit Next Steps") -# Helper.internal_assert("Submit Next Steps", Get.by_id("btn-submit")) Click.id("btn-submit") @staticmethod @@ -294,38 +313,42 @@ class FEChecklist: Helper.internal_assert( "Numeric parameters", Get.by_xpath("//li[3]/span[2]")) if settings.DATABASE_TYPE == 'local': - Helper.internal_assert("Section 2: External References", - Get.by_xpath("//li[2]/h2")) - # //li[2]/ul/li/span[2] #//ul[@id='line-item-list']/li[2]/ul/li/span[2] + Helper.internal_assert( + "Section 2: External References", + Get.by_xpath("//li[2]/h2")) Helper.internal_assert( - "Normal references", Get.by_xpath("//li[2]/ul/li/span[2]")) + "Normal references", + Get.by_xpath("//li[2]/ul/li/span[2]")) Helper.internal_assert( "VF image", Get.by_xpath("//li[2]/ul/li[2]/span[2]")) - except: + except BaseException: if settings.DATABASE_TYPE == 'local': Wait.text_by_css( "h2.ng-binding", "Section 1: External References") try: Helper.internal_assert( - "Normal references", Get.by_css("span.col-md-9.ng-binding")) - except: + "Normal references", Get.by_css( + "span.col-md-9.ng-binding")) + except BaseException: if "VF image" in Get.by_xpath("//li[2]/span[2]"): logger.debug("All Ok") if settings.DATABASE_TYPE == 'local': Helper.internal_assert( - "Section 2: Parameter Specification", Get.by_xpath("//li[2]/h2")) + "Section 2: Parameter Specification", + Get.by_xpath("//li[2]/h2")) try: if settings.DATABASE_TYPE == 'local': Helper.internal_assert( "1.1 - Parameters", Get.by_xpath("//header/h2")) - except: + except BaseException: if settings.DATABASE_TYPE == 'local': Helper.internal_assert( "1.1 - Normal References", Get.by_xpath("//header/h2")) if settings.DATABASE_TYPE == 'local': elementTxt = Get.by_id("line-item-description") Helper.internal_assert( - "Numeric parameters should include range and/or allowed values.", elementTxt) + "Numeric parameters should include " + + "range and/or allowed values.", elementTxt) Helper.internal_assert("Audit Logs", Get.by_css("h3.col-md-12")) localLogText = "local log" Enter.text_by_id("new-audit-log-text", localLogText) @@ -333,63 +356,72 @@ class FEChecklist: "Add Log Entry", Get.by_id("submit-new-audit-lop-text")) Click.id("submit-new-audit-lop-text") vfName = newObj[0] - engLeadFullName = DBUser.get_el_name(vfName) + DBUser.get_el_name(vfName) Helper.internal_assert(localLogText, Get.by_css( Constants.Dashboard.Checklist.AuditLog.LastLocalAuditLog.CSS)) try: if settings.DATABASE_TYPE == 'local': Helper.internal_assert( "Parameters", Get.by_xpath("//li[2]/ul/li/span[2]")) - except: + except BaseException: if settings.DATABASE_TYPE == 'local': Helper.internal_assert( - "Numeric parameters", Get.by_xpath("//li[2]/ul/li/span[2]")) + "Numeric parameters", + Get.by_xpath("//li[2]/ul/li/span[2]")) -# if Wait.css(Constants.Dashboard.Checklist.LineItem.Deny.CSS) or -# Wait.css(Constants.Dashboard.Checklist.LineItem.Approve.CSS): - session.run_negative(lambda: Wait.css(Constants.Dashboard.Checklist.LineItem.Deny.CSS) or Wait.css( - Constants.Dashboard.Checklist.LineItem.Approve.CSS), "Buttons displayed for Admin it's NOT work") -# logger.debug("Buttons displayed for Admin it's NOT work") -# else: -# print("Buttons not displayed for Admin it's work") + session.run_negative( + lambda: Wait.css( + Constants.Dashboard.Checklist.LineItem.Deny.CSS + ) or Wait.css( + Constants.Dashboard.Checklist.LineItem.Approve.CSS), + "Buttons displayed for Admin it's NOT work") if state == "APPROVAL": if settings.DATABASE_TYPE == 'local': Helper.internal_assert( - "Audit Log (6)", Get.by_id(Constants.Dashboard.Checklist.AuditLog.ID)) + "Audit Log (6)", Get.by_id( + Constants.Dashboard.Checklist.AuditLog.ID)) else: Helper.internal_assert( - "Audit Log (7)", Get.by_id(Constants.Dashboard.Checklist.AuditLog.ID)) + "Audit Log (7)", Get.by_id( + Constants.Dashboard.Checklist.AuditLog.ID)) if state == "HANDOFF": if settings.DATABASE_TYPE == 'local': Helper.internal_assert( - "Audit Log (8)", Get.by_id(Constants.Dashboard.Checklist.AuditLog.ID)) + "Audit Log (8)", Get.by_id( + Constants.Dashboard.Checklist.AuditLog.ID)) else: Helper.internal_assert( - "Audit Log (9)", Get.by_id(Constants.Dashboard.Checklist.AuditLog.ID)) + "Audit Log (9)", Get.by_id( + Constants.Dashboard.Checklist.AuditLog.ID)) Click.id(Constants.Dashboard.Checklist.AuditLog.ID) Wait.text_by_xpath("//span[2]", checklistName) - engLeadFullName = DBUser.select_el_email(vfName) + DBUser.select_el_email(vfName) Enter.text_by_xpath("//textarea", "zdfgsdyh") Click.css(Constants.SubmitButton.CSS) Wait.modal_to_dissappear() if state == "APPROVAL": if settings.DATABASE_TYPE == 'local': Wait.text_by_id( - Constants.Dashboard.Checklist.AuditLog.ID, "Audit Log (7)") + Constants.Dashboard.Checklist.AuditLog.ID, + "Audit Log (7)") else: Wait.text_by_id( - Constants.Dashboard.Checklist.AuditLog.ID, "Audit Log (8)") + Constants.Dashboard.Checklist.AuditLog.ID, + "Audit Log (8)") if state == "HANDOFF": if settings.DATABASE_TYPE == 'local': Wait.text_by_id( - Constants.Dashboard.Checklist.AuditLog.ID, "Audit Log (9)") + Constants.Dashboard.Checklist.AuditLog.ID, + "Audit Log (9)") else: Wait.text_by_id( - Constants.Dashboard.Checklist.AuditLog.ID, "Audit Log (10)") + Constants.Dashboard.Checklist.AuditLog.ID, + "Audit Log (10)") if state == "APPROVAL": Wait.text_by_xpath("//button[3]", "Add Next Steps") - Wait.text_by_id(Constants.Dashboard.Checklist.Reject.ID, - Constants.Dashboard.Checklist.Reject.Modal.Button.TEXT) + Wait.text_by_id( + Constants.Dashboard.Checklist.Reject.ID, + Constants.Dashboard.Checklist.Reject.Modal.Button.TEXT) Wait.text_by_xpath( "//div[@id='state-actions']/button", "Approve") if state == "HANDOFF": @@ -399,9 +431,10 @@ class FEChecklist: # If failed - count the failure and add the error to list of errors. except Exception as e: logger.error( - state + " state FAILED CONNECT TO STAGING MANUAL AND VERIFY WHY! ") - errorMsg = "approval_state_actions_and_validations FAILED because : " + \ - str(e) + state + + " state FAILED CONNECT TO STAGING MANUAL AND VERIFY WHY! ") + errorMsg = "approval_state_actions_and_validations " +\ + "FAILED because : " + str(e) raise Exception(errorMsg, "approval_state_actions_and_validations") @staticmethod @@ -414,7 +447,8 @@ class FEChecklist: try: if settings.DATABASE_TYPE == 'local': Helper.internal_assert( - "Section 1: Parameter Specification", Get.by_css("h2.ng-binding")) + "Section 1: Parameter Specification", + Get.by_css("h2.ng-binding")) Helper.internal_assert( "Parameters", Get.by_css("span.col-md-9.ng-binding")) Helper.internal_assert( @@ -423,33 +457,38 @@ class FEChecklist: "Numeric parameters", Get.by_xpath("//li[3]/span[2]")) if settings.DATABASE_TYPE == 'local': Helper.internal_assert( - "Section 2: External References", Get.by_xpath("//li[2]/h2")) + "Section 2: External References", + Get.by_xpath("//li[2]/h2")) Helper.internal_assert( "Normal references", Get.by_name("Normal references")) Helper.internal_assert( "VF image", Get.by_name("Normal references")) - except: + except BaseException: try: Helper.internal_assert( - "Section 1: External References", Get.by_css("h2.ng-binding")) - except: + "Section 1: External References", + Get.by_css("h2.ng-binding")) + except BaseException: Helper.internal_assert( - "Section 1: Scaling Considerations", Get.by_css("h2.ng-binding")) + "Section 1: Scaling Considerations", + Get.by_css("h2.ng-binding")) try: Helper.internal_assert( - "Normal references", Get.by_css("span.col-md-9.ng-binding")) - except: + "Normal references", + Get.by_css("span.col-md-9.ng-binding")) + except BaseException: if "VF image" in Get.by_xpath("//li[2]/span[2]"): logger.debug("All Ok") if settings.DATABASE_TYPE == 'local': Helper.internal_assert( - "Section 2: Parameter Specification", Get.by_xpath("//li[2]/h2")) + "Section 2: Parameter Specification", + Get.by_xpath("//li[2]/h2")) Click.name("VF image") Click.name("Normal references") try: Helper.internal_assert( "1.1 - Parameters", Get.by_xpath("//header/h2")) - except: + except BaseException: text = Get.by_name("Normal references") Helper.internal_assert("Normal references", text) Helper.internal_assert("Audit Logs", Get.by_css("h3.col-md-12")) @@ -461,16 +500,18 @@ class FEChecklist: # Validate Local AuditLog engLeadFullName = DBUser.get_el_name(vfName) Helper.internal_assert( - engLeadFullName, Get.by_xpath("//ul[@id='audit-log-list']/li/h4")) + engLeadFullName, + Get.by_xpath("//ul[@id='audit-log-list']/li/h4")) Helper.internal_assert(localLogText, Get.by_css( Constants.Dashboard.Checklist.AuditLog.LastLocalAuditLog.CSS)) if settings.DATABASE_TYPE == 'local': try: Helper.internal_assert( "Parameters", Get.by_xpath("//li[2]/ul/li/span[2]")) - except: + except BaseException: Helper.internal_assert( - "Numeric parameters", Get.by_xpath("//li[2]/ul/li/span[2]")) + "Numeric parameters", + Get.by_xpath("//li[2]/ul/li/span[2]")) Click.name("Normal references") Wait.css(Constants.Dashboard.Checklist.LineItem.Deny.CSS) Wait.css(Constants.Dashboard.Checklist.LineItem.Approve.CSS) @@ -486,31 +527,37 @@ class FEChecklist: print("click on V button approve of decision in state = " + state) try: Wait.css("li.not-relevant-btn") - except: + except BaseException: Wait.xpath("//aside/header/ul/li") if state == "review": Wait.id("edit-checklist") if state == "PEER": if settings.DATABASE_TYPE == 'local': Helper.internal_assert( - "Audit Log (4)", Get.by_id(Constants.Dashboard.Checklist.AuditLog.ID)) + "Audit Log (4)", Get.by_id( + Constants.Dashboard.Checklist.AuditLog.ID)) else: Helper.internal_assert( - "Audit Log (5)", Get.by_id(Constants.Dashboard.Checklist.AuditLog.ID)) + "Audit Log (5)", Get.by_id( + Constants.Dashboard.Checklist.AuditLog.ID)) if state == "review": if settings.DATABASE_TYPE == 'local': Helper.internal_assert( - "Audit Log (2)", Get.by_id(Constants.Dashboard.Checklist.AuditLog.ID)) + "Audit Log (2)", Get.by_id( + Constants.Dashboard.Checklist.AuditLog.ID)) else: Helper.internal_assert( - "Audit Log (3)", Get.by_id(Constants.Dashboard.Checklist.AuditLog.ID)) + "Audit Log (3)", Get.by_id( + Constants.Dashboard.Checklist.AuditLog.ID)) if state == "APPROVAL": if settings.DATABASE_TYPE == 'local': Helper.internal_assert( - "Audit Log (8)", Get.by_id(Constants.Dashboard.Checklist.AuditLog.ID)) + "Audit Log (8)", Get.by_id( + Constants.Dashboard.Checklist.AuditLog.ID)) else: Helper.internal_assert( - "Audit Log (9)", Get.by_id(Constants.Dashboard.Checklist.AuditLog.ID)) + "Audit Log (9)", Get.by_id( + Constants.Dashboard.Checklist.AuditLog.ID)) Click.id( Constants.Dashboard.Checklist.AuditLog.ID, wait_for_page=True) Wait.text_by_xpath("//span[2]", checklistName) @@ -520,32 +567,39 @@ class FEChecklist: if state == "review": if settings.DATABASE_TYPE == 'local': Helper.internal_assert( - "Audit Log (3)", Get.by_id(Constants.Dashboard.Checklist.AuditLog.ID)) + "Audit Log (3)", Get.by_id( + Constants.Dashboard.Checklist.AuditLog.ID)) else: Helper.internal_assert( - "Audit Log (4)", Get.by_id(Constants.Dashboard.Checklist.AuditLog.ID)) + "Audit Log (4)", Get.by_id( + Constants.Dashboard.Checklist.AuditLog.ID)) if state == "PEER": if settings.DATABASE_TYPE == 'local': Helper.internal_assert( - "Audit Log (5)", Get.by_id(Constants.Dashboard.Checklist.AuditLog.ID)) + "Audit Log (5)", Get.by_id( + Constants.Dashboard.Checklist.AuditLog.ID)) else: Helper.internal_assert( - "Audit Log (6)", Get.by_id(Constants.Dashboard.Checklist.AuditLog.ID)) + "Audit Log (6)", Get.by_id( + Constants.Dashboard.Checklist.AuditLog.ID)) if state == "APPROVAL": if settings.DATABASE_TYPE == 'local': Helper.internal_assert( - "Audit Log (9)", Get.by_id(Constants.Dashboard.Checklist.AuditLog.ID)) + "Audit Log (9)", Get.by_id( + Constants.Dashboard.Checklist.AuditLog.ID)) else: Helper.internal_assert( - "Audit Log (10)", Get.by_id(Constants.Dashboard.Checklist.AuditLog.ID)) + "Audit Log (10)", Get.by_id( + Constants.Dashboard.Checklist.AuditLog.ID)) # Validate Buttons if settings.DATABASE_TYPE != 'local': FEGeneral.refresh() engagement_id = DBVirtualFunction.select_eng_uuid(vfName) engLeadEmail = DBUser.select_el_email(vfName) logger.debug("EL email: " + engLeadEmail) - engagement_manual_id = DBGeneral.select_where("engagement_manual_id", "ice_engagement", - "uuid", engagement_id, 1) + engagement_manual_id = DBGeneral.select_where( + "engagement_manual_id", "ice_engagement", "uuid", + engagement_id, 1) # Click on all default next steps myVfName = engagement_manual_id + ": " + vfName actualVfNameid = "clickable-" + myVfName @@ -553,18 +607,21 @@ class FEChecklist: Click.id("checklist-" + checklistUuid) Helper.internal_assert( "Add Next Steps", Get.by_xpath("//button[3]")) - Wait.text_by_id(Constants.Dashboard.Checklist.Reject.ID, - Constants.Dashboard.Checklist.Reject.Modal.Button.TEXT, wait_for_page=True) + Wait.text_by_id( + Constants.Dashboard.Checklist.Reject.ID, + Constants.Dashboard.Checklist.Reject.Modal.Button.TEXT, + wait_for_page=True) Helper.internal_assert( "Approve", Get.by_xpath("//div[@id='state-actions']/button")) logger.debug("ALL VALIDATION PASS FOR STATE: " + state) # If failed - count the failure and add the error to list of errors. except Exception as e: - errorMsg = "review_state_actions_and_validations FAILED because: " + \ - str(e) + errorMsg = "review_state_actions_and_validations " +\ + "FAILED because: " + str(e) raise Exception(errorMsg, "review_state_actions_and_validations") logger.error( - state + " state FAILED CONNECT TO STAGING MANUAL AND VERIFY WHY!") + state + + " state FAILED CONNECT TO STAGING MANUAL AND VERIFY WHY!") raise @staticmethod @@ -574,9 +631,12 @@ class FEChecklist: Constants.Dashboard.Checklist.Reject.ID, wait_for_page=True) if rejectMsg: Enter.text_by_name( - Constants.Dashboard.Checklist.Reject.Modal.Comment.NAME, rejectMsg, wait_for_page=True) + Constants.Dashboard.Checklist.Reject.Modal.Comment.NAME, + rejectMsg, + wait_for_page=True) Click.id( - Constants.Dashboard.Checklist.Reject.Modal.Button.ID, wait_for_page=True) + Constants.Dashboard.Checklist.Reject.Modal.Button.ID, + wait_for_page=True) except Exception as e: errorMsg = "Failed to reject checklist." raise Exception(errorMsg, e) @@ -588,7 +648,9 @@ class FEChecklist: Enter.text_by_id("new-audit-log-text", log_txt, wait_for_page=True) Click.id("submit-new-audit-lop-text") Wait.text_by_css( - Constants.Dashboard.Checklist.AuditLog.LastLocalAuditLog.CSS, log_txt, wait_for_page=True) + Constants.Dashboard.Checklist.AuditLog.LastLocalAuditLog.CSS, + log_txt, + wait_for_page=True) return log_txt except Exception as e: errorMsg = "Failed to add audit log to line item." @@ -599,12 +661,20 @@ class FEChecklist: FEOverview.click_on_vf(user_content) if checklist_uuid is None: checklist_uuid = DBGeneral.select_where_not_and_order_by_desc( - 'uuid', Constants.DBConstants.IceTables.CHECKLIST, 'name', checklistName, 'state', Constants.ChecklistStates.Archive.TEXT, 'create_time')[0] + 'uuid', + Constants.DBConstants.IceTables.CHECKLIST, + 'name', + checklistName, + 'state', + Constants.ChecklistStates.Archive.TEXT, + 'create_time')[0] Click.id("checklist-" + checklist_uuid, True) @staticmethod def validate_reject_is_enabled(): - return Wait.id(Constants.Dashboard.Checklist.Reject.ID, wait_for_page=True) + return Wait.id( + Constants.Dashboard.Checklist.Reject.ID, + wait_for_page=True) @staticmethod def cl_to_next_stage(actualVfNameid): @@ -619,7 +689,8 @@ class FEChecklist: 'engagement_manual_id'] + ": " + user_content['vfName'] if settings.DATABASE_TYPE != 'local': Enter.text_by_id( - Constants.Dashboard.LeftPanel.SearchBox.ID, user_content['vfName']) + Constants.Dashboard.LeftPanel.SearchBox.ID, + user_content['vfName']) Click.css(Constants.Dashboard.LeftPanel.SearchBox.Results.CSS) Wait.text_by_id( Constants.Dashboard.Overview.Title.ID, vfFullName) @@ -627,9 +698,12 @@ class FEChecklist: @staticmethod def search_by_manual_id(manual_id): Enter.text_by_id( - Constants.Dashboard.LeftPanel.SearchBox.ID, manual_id, wait_for_page=True) + Constants.Dashboard.LeftPanel.SearchBox.ID, + manual_id, + wait_for_page=True) Click.css( - Constants.Dashboard.LeftPanel.SearchBox.Results.CSS, wait_for_page=True) + Constants.Dashboard.LeftPanel.SearchBox.Results.CSS, + wait_for_page=True) Wait.id(Constants.Dashboard.Overview.Title.ID) @staticmethod @@ -638,16 +712,23 @@ class FEChecklist: vfName = newObj[0] engLeadFullName = DBUser.get_el_name(vfName) Enter.text_by_name( - Constants.Dashboard.Checklist.Reject.Modal.Comment.NAME, "Reject state By :" + engLeadFullName) + Constants.Dashboard.Checklist.Reject.Modal.Comment.NAME, + "Reject state By :" + engLeadFullName) Helper.internal_assert( - "Checklist: " + checklistName, Get.by_css("span.state-title.ng-binding")) + "Checklist: " + checklistName, + Get.by_css("span.state-title.ng-binding")) Wait.text_by_id(Constants.Dashboard.Checklist.Reject.Modal.Button.ID, Constants.Dashboard.Checklist.Reject.Modal.Button.TEXT) Click.id(Constants.Dashboard.Checklist.Reject.Modal.Button.ID) Wait.modal_to_dissappear() @staticmethod - def add_nsteps(checklistUuid, actualVfNameid, myVfName, checklistName, newFileNames): + def add_nsteps( + checklistUuid, + actualVfNameid, + myVfName, + checklistName, + newFileNames): Click.id(actualVfNameid, wait_for_page=True) checklistUuid = DBChecklist.select_where_cl_not_archive( "uuid", "ice_checklist", "name", newFileNames[0], 1) @@ -661,30 +742,46 @@ class FEChecklist: Helper.internal_assert(myVfName, actualVfName) @staticmethod - def validate_multi_eng(user_content, checklist_content, newEL_content, actualVfNameid): + def validate_multi_eng( + user_content, + checklist_content, + newEL_content, + actualVfNameid): query = "UPDATE ice_user_profile SET role_id=2 WHERE email = '" + \ str(newEL_content['email']) + "';" DBGeneral.update_by_query(query) FEWizard.invite_team_members_modal(newEL_content['email']) # Fetch one AT&T user ID. enguuid = DBGeneral.select_where( - "uuid", "ice_engagement", "engagement_manual_id", user_content['engagement_manual_id'], 1) + "uuid", + "ice_engagement", + "engagement_manual_id", + user_content['engagement_manual_id'], + 1) invitation_token = DBUser.select_invitation_token( - "invitation_token", "ice_invitation", "engagement_uuid", enguuid, newEL_content['email'], 1) + "invitation_token", + "ice_invitation", + "engagement_uuid", + enguuid, + newEL_content['email'], + 1) URL = Constants.Default.InviteURL.Login.TEXT + invitation_token FEGeneral.re_open(URL) - FEUser.login(newEL_content[ - 'email'], Constants.Default.Password.TEXT, expected_element=actualVfNameid) + FEUser.login( + newEL_content['email'], + Constants.Default.Password.TEXT, + expected_element=actualVfNameid) Click.id(actualVfNameid, wait_for_page=True) count = None try: session.ice_driver.find_element_by_id( "checklist-" + checklist_content['uuid']) count += 1 - except: + except BaseException: logger.debug( - "check list not visible for EL invited : " + str(newEL_content['email'])) - assertTrue(count == None) + "check list not visible for EL invited : " + + str(newEL_content['email'])) + assertTrue(count is None) query = "UPDATE ice_user_profile SET role_id=1 WHERE email = '" + \ str(newEL_content['email']) + "';" DBGeneral.update_by_query(query) @@ -710,11 +807,12 @@ class FEChecklist: @staticmethod def validate_audit_log(log_txt): audit_log_list_text = Get.by_id( - Constants.Dashboard.Checklist.AuditLog.AuditLogList.ID, wait_for_page=True) + Constants.Dashboard.Checklist.AuditLog.AuditLogList.ID, + wait_for_page=True) try: log_txt in audit_log_list_text logger.debug("validate_audit_log PASS") - except Exception as e: + except Exception: errorMsg = "Failed in validate_audit_log" raise Exception(errorMsg) @@ -748,7 +846,9 @@ class FEChecklist: Constants.Dashboard.Checklist.JenkinsLog.Modal.Title.TEXT, True) log = Get.by_id( Constants.Dashboard.Checklist.JenkinsLog.Modal.Body.ID, True) - Helper.assertTrue(Constants.Dashboard.Checklist.JenkinsLog.Modal.Body.TEXT_SAMPLE in log, - "Jenkins log could not be viewed.") + Helper.assertTrue( + Constants.Dashboard.Checklist.JenkinsLog.Modal.Body. + TEXT_SAMPLE in log, + "Jenkins log could not be viewed.") Click.id(Constants.Dashboard.Modal.CLOSE_BUTTON_ID) return log diff --git a/services/frontend/fe_checklist_template.py b/services/frontend/fe_checklist_template.py index 19e91aa..09a497f 100644 --- a/services/frontend/fe_checklist_template.py +++ b/services/frontend/fe_checklist_template.py @@ -1,5 +1,5 @@ - -# ============LICENSE_START========================================== + +# ============LICENSE_START========================================== # org.onap.vvp/test-engine # =================================================================== # Copyright © 2017 AT&T Intellectual Property. All rights reserved. @@ -37,7 +37,6 @@ # # ECOMP is a trademark and service mark of AT&T Intellectual Property. from selenium.webdriver.common.action_chains import ActionChains -from selenium.webdriver.common.keys import Keys from services.api.api_virtual_function import APIVirtualFunction from services.constants import Constants @@ -58,6 +57,7 @@ from services.session import session logger = LoggingServiceFactory.get_logger() + class FEChecklistTemplate: @staticmethod @@ -74,94 +74,143 @@ class FEChecklistTemplate: @staticmethod def click_on_save_and_assert_success_msg(): Click.id( - Constants.Dashboard.LeftPanel.EditChecklistTemplate.SAVE_BTN_ID, wait_for_page=True) + Constants.Dashboard.LeftPanel.EditChecklistTemplate.SAVE_BTN_ID, + wait_for_page=True) Click.id( - Constants.Dashboard.LeftPanel.EditChecklistTemplate.APPROVE_BTN_ID, wait_for_page=True) - Wait.text_by_id(Constants.Dashboard.LeftPanel.EditChecklistTemplate.SUCCESS_ID, - Constants.Dashboard.LeftPanel.EditChecklistTemplate.SUCCESS_SAVE_MSG) + Constants.Dashboard.LeftPanel.EditChecklistTemplate.APPROVE_BTN_ID, + wait_for_page=True) + Wait.text_by_id( + Constants.Dashboard.LeftPanel.EditChecklistTemplate.SUCCESS_ID, + Constants.Dashboard.LeftPanel. + EditChecklistTemplate.SUCCESS_SAVE_MSG) @staticmethod def click_on_disabled_save_and_assert_for_promp_msg(): Click.id( Constants.Dashboard.LeftPanel.EditChecklistTemplate.SAVE_BTN_ID) - session.run_negative(lambda: Click.id(Constants.Dashboard.LeftPanel.EditChecklistTemplate.APPROVE_BTN_ID), - "Ooops modal window is opened although 'Save' button should have been disabled") + session.run_negative( + lambda: Click.id( + Constants.Dashboard.LeftPanel. + EditChecklistTemplate.APPROVE_BTN_ID), + "Ooops modal window is opened although 'Save' " + + "button should have been disabled") @staticmethod def save_with_no_changes(): - Wait.text_by_id(Constants.Dashboard.LeftPanel.EditChecklistTemplate.SAVE_BTN_ID, - Constants.Dashboard.LeftPanel.EditChecklistTemplate.SAVE_BTN) - Click.id( - Constants.Dashboard.LeftPanel.EditChecklistTemplate.SAVE_BTN_ID, wait_for_page=True) - Wait.text_by_name(Constants.Dashboard.LeftPanel.EditChecklistTemplate.HEAT, - Constants.Dashboard.LeftPanel.EditChecklistTemplate.HEAT) - Wait.text_by_id(Constants.Dashboard.LeftPanel.EditChecklistTemplate.APPROVE_BTN_TITLE_ID, - Constants.Dashboard.LeftPanel.EditChecklistTemplate.APPROVE_BTN_TITLE_TEXT) Wait.text_by_id( - Constants.Dashboard.LeftPanel.EditChecklistTemplate.APPROVE_BTN_ID, "Yes") + Constants.Dashboard.LeftPanel.EditChecklistTemplate.SAVE_BTN_ID, + Constants.Dashboard.LeftPanel.EditChecklistTemplate.SAVE_BTN) Click.id( - Constants.Dashboard.LeftPanel.EditChecklistTemplate.APPROVE_BTN_ID, wait_for_page=True) - Wait.text_by_id(Constants.Dashboard.LeftPanel.EditChecklistTemplate.SUCCESS_ID, - Constants.Dashboard.LeftPanel.EditChecklistTemplate.CL_TEMPLATE_SAVED_TXT) + Constants.Dashboard.LeftPanel.EditChecklistTemplate.SAVE_BTN_ID, + wait_for_page=True) + Wait.text_by_name( + Constants.Dashboard.LeftPanel.EditChecklistTemplate.HEAT, + Constants.Dashboard.LeftPanel.EditChecklistTemplate.HEAT) + Wait.text_by_id( + Constants.Dashboard.LeftPanel. + EditChecklistTemplate.APPROVE_BTN_TITLE_ID, + Constants.Dashboard.LeftPanel. + EditChecklistTemplate.APPROVE_BTN_TITLE_TEXT) + Wait.text_by_id( + Constants.Dashboard.LeftPanel.EditChecklistTemplate.APPROVE_BTN_ID, + "Yes") + Click.id( + Constants.Dashboard.LeftPanel.EditChecklistTemplate.APPROVE_BTN_ID, + wait_for_page=True) + Wait.text_by_id( + Constants.Dashboard.LeftPanel.EditChecklistTemplate.SUCCESS_ID, + Constants.Dashboard.LeftPanel. + EditChecklistTemplate.CL_TEMPLATE_SAVED_TXT) @staticmethod def discard_checklist_after_modification(): Click.id( - Constants.Dashboard.LeftPanel.EditChecklistTemplate.FIRST_SECTION_ID, wait_for_page=True) + Constants.Dashboard.LeftPanel. + EditChecklistTemplate.FIRST_SECTION_ID, + wait_for_page=True) Enter.text_by_id( - Constants.Dashboard.LeftPanel.EditChecklistTemplate.FIRST_SECTION_INPUT_ID, "ttttttt", wait_for_page=True) + Constants.Dashboard.LeftPanel. + EditChecklistTemplate.FIRST_SECTION_INPUT_ID, + "ttttttt", + wait_for_page=True) Click.id( - Constants.Dashboard.LeftPanel.EditChecklistTemplate.FIRST_SECTION_ID) + Constants.Dashboard.LeftPanel. + EditChecklistTemplate.FIRST_SECTION_ID) Click.id( Constants.Dashboard.LeftPanel.EditChecklistTemplate.REJECT_BTN_ID) Click.id( - Constants.Dashboard.LeftPanel.EditChecklistTemplate.APPROVE_BTN_ID, wait_for_page=True) + Constants.Dashboard.LeftPanel.EditChecklistTemplate.APPROVE_BTN_ID, + wait_for_page=True) Wait.text_by_id( - Constants.Dashboard.LeftPanel.EditChecklistTemplate.SUCCESS_ID, "All changes discarded.") + Constants.Dashboard.LeftPanel.EditChecklistTemplate.SUCCESS_ID, + "All changes discarded.") @staticmethod def edit_template_and_save(): Click.id( - Constants.Dashboard.LeftPanel.EditChecklistTemplate.FIRST_SECTION_ID) + Constants.Dashboard.LeftPanel. + EditChecklistTemplate.FIRST_SECTION_ID) Enter.text_by_id( - Constants.Dashboard.LeftPanel.EditChecklistTemplate.FIRST_SECTION_INPUT_ID, "Ros Is My Mentor") + Constants.Dashboard.LeftPanel. + EditChecklistTemplate.FIRST_SECTION_INPUT_ID, + "Ros Is My Mentor") FEChecklistTemplate.click_on_save_and_assert_success_msg() @staticmethod def del_lineitem_and_save(): - Click.id(Constants.Dashboard.LeftPanel.EditChecklistTemplate.FIRST_SECTION_ID, wait_for_page=True) - Enter.text_by_id(Constants.Dashboard.LeftPanel.EditChecklistTemplate.FIRST_SECTION_INPUT_ID, - "Ros Is My Mentor", wait_for_page=True) - Click.id(Constants.Dashboard.LeftPanel.EditChecklistTemplate.FIRST_SECTION_ID) + Click.id( + Constants.Dashboard.LeftPanel. + EditChecklistTemplate.FIRST_SECTION_ID, + wait_for_page=True) + Enter.text_by_id( + Constants.Dashboard.LeftPanel. + EditChecklistTemplate.FIRST_SECTION_INPUT_ID, + "Ros Is My Mentor", + wait_for_page=True) + Click.id( + Constants.Dashboard.LeftPanel. + EditChecklistTemplate.FIRST_SECTION_ID) FEChecklistTemplate.click_on_save_and_assert_success_msg() @staticmethod def add_lineitem_and_save(): Click.id( - Constants.Dashboard.LeftPanel.EditChecklistTemplate.ADD_LINE_ITEM_BTN, wait_for_page=True) + Constants.Dashboard.LeftPanel. + EditChecklistTemplate.ADD_LINE_ITEM_BTN, + wait_for_page=True) Click.xpath("//li[@id='select-lineitem-btn-0.1']/span[2]") Click.id( - Constants.Dashboard.LeftPanel.EditChecklistTemplate.EDIT_LINE_ITEM_BTN) + Constants.Dashboard.LeftPanel. + EditChecklistTemplate.EDIT_LINE_ITEM_BTN) Enter.text_by_id( - Constants.Dashboard.LeftPanel.EditChecklistTemplate.EDIT_LINE_ITEM_NAME, "xxx") + Constants.Dashboard.LeftPanel. + EditChecklistTemplate.EDIT_LINE_ITEM_NAME, + "xxx") Click.id( - Constants.Dashboard.LeftPanel.EditChecklistTemplate.EDIT_LINE_ITEM_BTN, wait_for_page=True) + Constants.Dashboard.LeftPanel. + EditChecklistTemplate.EDIT_LINE_ITEM_BTN, + wait_for_page=True) FEChecklistTemplate.click_on_save_and_assert_success_msg() @staticmethod def edit_description_lineitem_and_save(): - isBold = False desc = Helper.rand_string("randomString") Click.id( - Constants.Dashboard.LeftPanel.EditChecklistTemplate.FIRST_LINE_ITEM_ID) + Constants.Dashboard.LeftPanel. + EditChecklistTemplate.FIRST_LINE_ITEM_ID) Click.id( - Constants.Dashboard.LeftPanel.EditChecklistTemplate.EDIT_LINE_ITEM_BTN) - Enter.text_by_id(Constants.Dashboard.LeftPanel.EditChecklistTemplate.EDIT_LINE_ITEM_NAME, - Helper.rand_string("randomString")) + Constants.Dashboard.LeftPanel. + EditChecklistTemplate.EDIT_LINE_ITEM_BTN) + Enter.text_by_id( + Constants.Dashboard.LeftPanel. + EditChecklistTemplate.EDIT_LINE_ITEM_NAME, + Helper.rand_string("randomString")) Click.id( - Constants.Dashboard.LeftPanel.EditChecklistTemplate.EDIT_LINE_ITEM_DESC) + Constants.Dashboard.LeftPanel. + EditChecklistTemplate.EDIT_LINE_ITEM_DESC) editor_element = Get.wysiwyg_element_by_id( - Constants.Dashboard.LeftPanel.EditChecklistTemplate.LINE_ITEM_DESC_TEXT_BOX) + Constants.Dashboard.LeftPanel. + EditChecklistTemplate.LINE_ITEM_DESC_TEXT_BOX) editor_element.clear() editor_element.send_keys(desc) Wait.page_has_loaded() @@ -169,49 +218,66 @@ class FEChecklistTemplate: actionChains.double_click(editor_element).perform() Wait.page_has_loaded() Click.xpath( - Constants.Dashboard.LeftPanel.EditChecklistTemplate.WYSIWYG_BUTTON_BOLD) + Constants.Dashboard.LeftPanel. + EditChecklistTemplate.WYSIWYG_BUTTON_BOLD) Click.id( - Constants.Dashboard.LeftPanel.EditChecklistTemplate.EDIT_LINE_ITEM_BTN, wait_for_page=True) + Constants.Dashboard.LeftPanel. + EditChecklistTemplate.EDIT_LINE_ITEM_BTN, + wait_for_page=True) isBold = Wait.is_css_exists("b") while not isBold: Click.id( - Constants.Dashboard.LeftPanel.EditChecklistTemplate.EDIT_LINE_ITEM_BTN, wait_for_page=True) + Constants.Dashboard.LeftPanel. + EditChecklistTemplate.EDIT_LINE_ITEM_BTN, + wait_for_page=True) actionChains.double_click(editor_element).perform() Click.xpath( - Constants.Dashboard.LeftPanel.EditChecklistTemplate.WYSIWYG_BUTTON_BOLD, wait_for_page=True) + Constants.Dashboard.LeftPanel. + EditChecklistTemplate.WYSIWYG_BUTTON_BOLD, + wait_for_page=True) Click.id( - Constants.Dashboard.LeftPanel.EditChecklistTemplate.EDIT_LINE_ITEM_BTN, wait_for_page=True) + Constants.Dashboard.LeftPanel. + EditChecklistTemplate.EDIT_LINE_ITEM_BTN, + wait_for_page=True) isBold = Wait.is_css_exists("b") if isBold: FEChecklistTemplate.click_on_save_and_assert_success_msg() FEGeneral.refresh() Click.name( - Constants.Dashboard.LeftPanel.EditChecklistTemplate.HEAT, wait_for_page=True) + Constants.Dashboard.LeftPanel.EditChecklistTemplate.HEAT, + wait_for_page=True) Click.id( - Constants.Dashboard.LeftPanel.EditChecklistTemplate.FIRST_LINE_ITEM_ID, wait_for_page=True) + Constants.Dashboard.LeftPanel. + EditChecklistTemplate.FIRST_LINE_ITEM_ID, + wait_for_page=True) Wait.css("b") Wait.text_by_css("b", desc, wait_for_page=True) @staticmethod def rollback_add_lineitem_and_save(): Click.id( - Constants.Dashboard.LeftPanel.EditChecklistTemplate.DELETE_LINE_ITEM) + Constants.Dashboard.LeftPanel. + EditChecklistTemplate.DELETE_LINE_ITEM) FEChecklistTemplate.click_on_save_and_assert_success_msg() FEChecklistTemplate.rollback_to_heat_teampleate() @staticmethod def add_lineitem_and_check_db(): Click.id( - Constants.Dashboard.LeftPanel.EditChecklistTemplate.FIRST_SECTION_ID) + Constants.Dashboard.LeftPanel. + EditChecklistTemplate.FIRST_SECTION_ID) Enter.text_by_id( - Constants.Dashboard.LeftPanel.EditChecklistTemplate.FIRST_SECTION_INPUT_ID, "Ros Is My Mentor") + Constants.Dashboard.LeftPanel. + EditChecklistTemplate.FIRST_SECTION_INPUT_ID, + "Ros Is My Mentor") FEChecklistTemplate.click_on_save_and_assert_success_msg() result = DBChecklist.checkChecklistIsUpdated() Helper.internal_not_equal(result, None) @staticmethod def check_cl_after_lineitem_added(): - template_name = Constants.Dashboard.LeftPanel.EditChecklistTemplate.HEAT + template_name = Constants.Dashboard.LeftPanel.\ + EditChecklistTemplate.HEAT user_content = APIVirtualFunction.create_engagement() FEUser.login( Constants.Users.Admin.EMAIL, Constants.Default.Password.TEXT) @@ -220,15 +286,17 @@ class FEChecklistTemplate: engLeadEmail = DBUser.select_el_email(vfName) engagement_manual_id = DBChecklist.fetchEngManIdByEngUuid( engagement_id) - myVfName = engagement_manual_id + ": " + vfName FEOverview.click_on_vf(user_content) FEGeneral.re_open(Constants.Default.LoginURL.TEXT) FEUser.login( - engLeadEmail, Constants.Default.Password.TEXT, engagement_manual_id) + engLeadEmail, + Constants.Default.Password.TEXT, + engagement_manual_id) Click.id( Constants.Dashboard.LeftPanel.EditChecklistTemplate.DASHBOARD_ID) Enter.text_by_id( - Constants.Dashboard.LeftPanel.EditChecklistTemplate.SEARCH_ENG_ID, vfName) + Constants.Dashboard.LeftPanel.EditChecklistTemplate.SEARCH_ENG_ID, + vfName) Click.id("test_" + vfName) checklistName = FEChecklist.create_checklist( engagement_id, vfName, None, engagement_manual_id) @@ -236,19 +304,28 @@ class FEChecklistTemplate: result = DBChecklist.fetchChecklistByName(checklistName) FEUser.go_to_admin() FEChecklistTemplate.click_on_template_name_on_navigation( - Constants.Dashboard.LeftPanel.EditChecklistTemplate.HEAT, template_name) + Constants.Dashboard.LeftPanel.EditChecklistTemplate.HEAT, + template_name) Click.id( - Constants.Dashboard.LeftPanel.EditChecklistTemplate.EDIT_LINE_ITEM_BTN) - Enter.text_by_id(Constants.Dashboard.LeftPanel.EditChecklistTemplate.EDIT_LINE_ITEM_NAME, - "test_lineitem_added_and_audit_log_on_dupl_cl-NAME") + Constants.Dashboard.LeftPanel. + EditChecklistTemplate.EDIT_LINE_ITEM_BTN) + Enter.text_by_id( + Constants.Dashboard.LeftPanel. + EditChecklistTemplate.EDIT_LINE_ITEM_NAME, + "test_lineitem_added_and_audit_log_on_dupl_cl-NAME") Click.id( - Constants.Dashboard.LeftPanel.EditChecklistTemplate.EDIT_LINE_ITEM_BTN) + Constants.Dashboard.LeftPanel. + EditChecklistTemplate.EDIT_LINE_ITEM_BTN) FEChecklistTemplate.click_on_save_and_assert_success_msg() Click.id( - Constants.Dashboard.LeftPanel.EditChecklistTemplate.DASHBOARD_ID) + Constants.Dashboard.LeftPanel. + EditChecklistTemplate.DASHBOARD_ID) Enter.text_by_id( - Constants.Dashboard.LeftPanel.EditChecklistTemplate.SEARCH_ENG_ID, vfName) + Constants.Dashboard.LeftPanel. + EditChecklistTemplate.SEARCH_ENG_ID, + vfName) Click.id("test_" + vfName) Click.id("checklist-" + str(result)) Helper.internal_assert( - "1. automation", session.ice_driver.find_element_by_xpath("//li[@id='']").text) + "1. automation", + session.ice_driver.find_element_by_xpath("//li[@id='']").text) diff --git a/services/frontend/fe_cms.py b/services/frontend/fe_cms.py index a86626e..9f5300d 100644 --- a/services/frontend/fe_cms.py +++ b/services/frontend/fe_cms.py @@ -1,5 +1,5 @@ - -# ============LICENSE_START========================================== + +# ============LICENSE_START========================================== # org.onap.vvp/test-engine # =================================================================== # Copyright © 2017 AT&T Intellectual Property. All rights reserved. @@ -40,6 +40,7 @@ from services.constants import Constants from services.database.db_cms import DBCMS from services.frontend.base_actions.click import Click from services.frontend.base_actions.enter import Enter +from services.frontend.base_actions.get import Get from services.frontend.base_actions.wait import Wait from services.frontend.fe_dashboard import FEDashboard from services.frontend.fe_general import FEGeneral @@ -50,10 +51,12 @@ from services.session import session logger = LoggingServiceFactory.get_logger() + class FECms: @staticmethod - def validate_5_last_announcement_displayed(listOfTitleAnDescriptions, user_content, last_title): + def validate_5_last_announcement_displayed( + listOfTitleAnDescriptions, user_content, last_title): last_description = listOfTitleAnDescriptions[ len(listOfTitleAnDescriptions) - 1][1] Wait.text_by_id(Constants.Toast.CMS_ID, last_title + ".") @@ -65,11 +68,19 @@ class FECms: FEUser.logout() # Validate Announcement TOAST not displayed FEUser.login(user_content['email'], Constants.Default.Password.TEXT) - session.run_negative(lambda: Wait.text_by_id(Constants.Cms.Toast_title_id, last_title), - "Last Announcement displayed in News & Announcements sections %s" % last_title) + session.run_negative( + lambda: Wait.text_by_id( + Constants.Cms.Toast_title_id, + last_title), + "Last Announcement displayed in News & Announcements sections %s" % + last_title) @staticmethod - def validate_grandchild_page(parent_title, child_title, grand_child_title, description): + def validate_grandchild_page( + parent_title, + child_title, + grand_child_title, + description): Click.id(Constants.Cms.Documentation) Click.id(parent_title) Click.id(child_title) @@ -99,7 +110,10 @@ class FECms: FEDashboard.open_documentation(title) Wait.text_by_id(title, title) logger.debug("Search Documentation by title") - Enter.text_by_id(Constants.Cms.SearchDocumentation, title, wait_for_page=True) + Enter.text_by_id( + Constants.Cms.SearchDocumentation, + title, + wait_for_page=True) Wait.text_by_id(title, title) Click.id(title, wait_for_page=True) Wait.text_by_id(title, title) @@ -110,23 +124,33 @@ class FECms: FEDashboard.open_documentation(title) Wait.text_by_id(title, title) logger.debug("Search Documentation by content") - Enter.text_by_id(Constants.Cms.SearchDocumentation, content, wait_for_page=True) + Enter.text_by_id( + Constants.Cms.SearchDocumentation, + content, + wait_for_page=True) Wait.text_by_id(title, title) Click.id(title, wait_for_page=True) Wait.text_by_css(Constants.Cms.DocumentationPageContent, content) logger.debug("Documentation found (searched by content)") @staticmethod - def validate_expired_post_Announcement(title, description): - Wait.text_by_id(Constants.Toast.CMS_ID, title + ".") + def validate_expired_post_Announcement(email, title, description): + title2 = Constants.Toast.TEXT + title + "." + Wait.text_by_id( + Constants.Toast.CMS_ID, title2, True) FEDashboard.open_announcement() Wait.text_by_id(Constants.Cms.Toast_title_id, title) Wait.text_by_id(Constants.Cms.Toast_description, description) DBCMS.update_X_days_back_post(title, xdays=3) Click.id(Constants.Cms.Test_addDT_close_modal_button) - FEGeneral.refresh() - session.run_negative(lambda: Wait.text_by_id( - Constants.Toast.CMS_ID, title + "."), "Announcement toast not disappear after 2 days %s" % title) + FEUser.logout() + FEUser.login(email, Constants.Default.Password.TEXT) + session.run_negative( + lambda: Wait.text_by_id( + Constants.Toast.CMS_ID, + title2), + "Announcement toast not disappear after 2 days %s" % + title) @staticmethod def validate_page(title, description): @@ -138,8 +162,8 @@ class FECms: @staticmethod def validate_FAQ(description): - Wait.text_by_id(Constants.Cms.Tooltip_title, "Did you know?") - Wait.text_by_id(Constants.Cms.Tooltip_description, description) + Wait.text_by_id(Constants.Cms.Tooltip_title, "Did you know?", True) + Wait.text_by_id(Constants.Cms.Tooltip_description, description, True) @staticmethod def validate_news(title, description): diff --git a/services/frontend/fe_dashboard.py b/services/frontend/fe_dashboard.py index b8d51b5..cb845cc 100644 --- a/services/frontend/fe_dashboard.py +++ b/services/frontend/fe_dashboard.py @@ -113,10 +113,12 @@ class FEDashboard: Wait.text_by_id("dashboard-title", "Statuses") Wait.id(Constants.Dashboard.Statuses.FilterDropdown.ID) Select(session.ice_driver.find_element_by_id( - Constants.Dashboard.Statuses.FilterDropdown.ID)).select_by_visible_text("Intake") + Constants.Dashboard.Statuses.FilterDropdown.ID + )).select_by_visible_text("Intake") Wait.page_has_loaded() Select(session.ice_driver.find_element_by_id( - Constants.Dashboard.Statuses.FilterDropdown.ID)).select_by_visible_text(stage) + Constants.Dashboard.Statuses.FilterDropdown.ID + )).select_by_visible_text(stage) Wait.id( Constants.Dashboard.Statuses.ExportExcel.ID, wait_for_page=True) countIdsActive = 0 @@ -159,7 +161,8 @@ class FEDashboard: Wait.text_by_id("dashboard-title", "Statuses") Wait.css(Constants.Dashboard.Statuses.Statistics.FilterDropdown.CSS) Select(session.ice_driver.find_element_by_css_selector( - Constants.Dashboard.Statuses.Statistics.FilterDropdown.CSS)).select_by_visible_text("All") + Constants.Dashboard.Statuses.Statistics.FilterDropdown.CSS + )).select_by_visible_text("All") engLeadID = DBUser.select_user_native_id(user_content['el_email']) countOfEngInStagePerUser = DBUser.select_all_user_engagements( engLeadID) # Scroll # @@ -171,13 +174,17 @@ class FEDashboard: # Stage Active Validation # element.location_once_scrolled_into_view Wait.css( - Constants.Dashboard.Statuses.Statistics.FilterDropdown.CSS, wait_for_page=True) + Constants.Dashboard.Statuses.Statistics.FilterDropdown.CSS, + wait_for_page=True) Select(session.ice_driver.find_element_by_css_selector( - Constants.Dashboard.Statuses.Statistics.FilterDropdown.CSS)).select_by_visible_text("Active") + Constants.Dashboard.Statuses.Statistics.FilterDropdown.CSS + )).select_by_visible_text("Active") countOfEngInStagePerUser = DBUser.select_user_engagements_by_stage( "Active", engLeadID) Wait.text_by_id( - Constants.Dashboard.Statuses.Statistics.EngagementsNumber.ID, str(countOfEngInStagePerUser), wait_for_page=True) + Constants.Dashboard.Statuses.Statistics.EngagementsNumber.ID, + str(countOfEngInStagePerUser), + wait_for_page=True) @staticmethod def search_by_vf(user_content): @@ -186,8 +193,10 @@ class FEDashboard: engSearchID = "eng-" + engName FEGeneral.re_open_not_clean_cache(Constants.Default.DashbaordURL.TEXT) logger.debug("Search engagement by engagement_manual_id") - Enter.text_by_id(Constants.Dashboard.Statuses.SearchBox.ID, - user_content['engagement_manual_id'], wait_for_page=True) + Enter.text_by_id( + Constants.Dashboard.Statuses.SearchBox.ID, + user_content['engagement_manual_id'], + wait_for_page=True) eng_manual_id = user_content['engagement_manual_id'] + ":" Wait.text_by_id(engSearchID, eng_manual_id) @@ -203,7 +212,11 @@ class FEDashboard: @staticmethod def check_vnf_version(user_content): current_vnf_value = Get.by_css( - "#progress_bar_" + user_content['engagement_manual_id'] + " ." + Constants.Dashboard.Overview.Progress.VnfVersion.CLASS, wait_for_page=True) + "#progress_bar_" + + user_content['engagement_manual_id'] + + " ." + + Constants.Dashboard.Overview.Progress.VnfVersion.CLASS, + wait_for_page=True) Helper.internal_assert(current_vnf_value, user_content['vnf_version']) @staticmethod @@ -217,7 +230,8 @@ class FEDashboard: FEUser.login(user, Constants.Default.Password.TEXT) logger.debug("Search engagement by engagement_manual_id") Enter.text_by_id( - Constants.Dashboard.Statuses.SearchBox.ID, user_content['engagement_manual_id']) + Constants.Dashboard.Statuses.SearchBox.ID, + user_content['engagement_manual_id']) eng_manual_id = user_content['engagement_manual_id'] + ":" Wait.text_by_id(engSearchID, eng_manual_id) logger.debug("Engagement found (searched by engagement_manual_id)") @@ -225,7 +239,8 @@ class FEDashboard: logger.debug("Search engagement by VF name") # Search by VF name. Enter.text_by_id( - Constants.Dashboard.Statuses.SearchBox.ID, user_content['vfName']) + Constants.Dashboard.Statuses.SearchBox.ID, + user_content['vfName']) Wait.text_by_id(engSearchID, eng_manual_id) logger.debug("Engagement found (searched by VF name)") FEGeneral.smart_refresh() @@ -254,8 +269,10 @@ class FEDashboard: @staticmethod def check_if_creator_of_NS_is_the_EL(user_content): logger.debug( - " > Check if creator of NS is the EL " + user_content['el_name']) - if (user_content['el_name'] not in Get.by_name("creator-full-name-" + user_content['el_name'])): + " > Check if creator of NS is the EL " + + user_content['el_name']) + if (user_content['el_name'] not in Get.by_name( + "creator-full-name-" + user_content['el_name'])): logger.error("EL is not the creator of the NS according to UI.") raise @@ -264,7 +281,9 @@ class FEDashboard: engName = engagement_manual_id + ": " + vf_name # Search by VF name. Enter.text_by_id( - Constants.Dashboard.Statuses.SearchBox.ID, vf_name, wait_for_page=True) + Constants.Dashboard.Statuses.SearchBox.ID, + vf_name, + wait_for_page=True) Wait.id("eng-" + engName, wait_for_page=True) Click.id("eng-" + engName, wait_for_page=True) Wait.text_by_id( @@ -279,8 +298,10 @@ class FEDashboard: # Click.id(Constants.Dashboard.Default.DASHBOARD_ID) Wait.page_has_loaded() if is_negative: - session.run_negative(lambda: Wait.id( - Constants.Dashboard.Default.STATISTICS), "Negative test failed at Statistics appears") + session.run_negative( + lambda: Wait.id( + Constants.Dashboard.Default.STATISTICS), + "Negative test failed at Statistics appears") else: Wait.id(Constants.Dashboard.Default.STATISTICS) diff --git a/services/frontend/fe_detailed_view.py b/services/frontend/fe_detailed_view.py index 556e7a7..49a3523 100644 --- a/services/frontend/fe_detailed_view.py +++ b/services/frontend/fe_detailed_view.py @@ -70,44 +70,61 @@ class FEDetailedView: @staticmethod def update_aic_version(): Click.id( - Constants.Dashboard.DetailedView.ValidationDetails.PLUS, wait_for_page=True) - Wait.text_by_id(Constants.Dashboard.Modal.TITLE_ID, - Constants.Dashboard.DetailedView.ValidationDetails.TITLE, wait_for_page=True) - Select(session.ice_driver.find_element_by_id(Constants.Dashboard.DetailedView.AIC.Dropdown.ID) - ).select_by_visible_text(Constants.Dashboard.DetailedView.ValidationDetails.TargetAICVersion.AIC3) + Constants.Dashboard.DetailedView.ValidationDetails.PLUS, + wait_for_page=True) + Wait.text_by_id( + Constants.Dashboard.Modal.TITLE_ID, + Constants.Dashboard.DetailedView.ValidationDetails.TITLE, + wait_for_page=True) + Select( + session.ice_driver.find_element_by_id( + Constants.Dashboard.DetailedView.AIC.Dropdown.ID + )).select_by_visible_text( + Constants.Dashboard.DetailedView.ValidationDetails. + TargetAICVersion.AIC3) Click.xpath("//option[3]", wait_for_page=True) Click.id( - Constants.Dashboard.DetailedView.ValidationDetails.SAVE, wait_for_page=True) + Constants.Dashboard.DetailedView.ValidationDetails.SAVE, + wait_for_page=True) @staticmethod def open_validation_details(): Click.id( - Constants.Dashboard.DetailedView.ValidationDetails.PLUS, wait_for_page=True) - Wait.text_by_id(Constants.Dashboard.Modal.TITLE_ID, - Constants.Dashboard.DetailedView.ValidationDetails.TITLE, wait_for_page=True) + Constants.Dashboard.DetailedView.ValidationDetails.PLUS, + wait_for_page=True) + Wait.text_by_id( + Constants.Dashboard.Modal.TITLE_ID, + Constants.Dashboard.DetailedView.ValidationDetails.TITLE, + wait_for_page=True) @staticmethod def save_validation_details(): Click.id( - Constants.Dashboard.DetailedView.ValidationDetails.SAVE, wait_for_page=True) + Constants.Dashboard.DetailedView.ValidationDetails.SAVE, + wait_for_page=True) @staticmethod def update_target_lab_entry(): Click.id( - Constants.Dashboard.DetailedView.TargetLabEntry.CHANGE, wait_for_page=True) + Constants.Dashboard.DetailedView.TargetLabEntry.CHANGE, + wait_for_page=True) Enter.date_picker( '#lab-entry-date', 'vm.targetLabDate', wait_for_page=True) Click.css( - Constants.Dashboard.DetailedView.TargetLabEntry.INPUT_CSS, wait_for_page=True) + Constants.Dashboard.DetailedView.TargetLabEntry.INPUT_CSS, + wait_for_page=True) Click.css(Constants.SubmitButton.CSS, wait_for_page=True) actualDate = Get.by_css( - Constants.Dashboard.DetailedView.TargetLabEntry.CONTENT_CSS, wait_for_page=True) + Constants.Dashboard.DetailedView.TargetLabEntry.CONTENT_CSS, + wait_for_page=True) return str(actualDate) @staticmethod def validate_target_lab_entry(date): - Wait.text_by_css(Constants.Dashboard.DetailedView.TargetLabEntry.CSS, - Constants.Dashboard.DetailedView.TargetLabEntry.TEXT, wait_for_page=True) + Wait.text_by_css( + Constants.Dashboard.DetailedView.TargetLabEntry.CSS, + Constants.Dashboard.DetailedView.TargetLabEntry.TEXT, + wait_for_page=True) actualDate = Get.by_css( Constants.Dashboard.DetailedView.TargetLabEntry.CONTENT_CSS) Helper.internal_assert(actualDate, date) @@ -117,27 +134,47 @@ class FEDetailedView: count = 0 try: Click.id(Constants.Dashboard.DetailedView.ValidationDetails.PLUS) - Wait.text_by_id(Constants.Dashboard.Modal.TITLE_ID, - Constants.Dashboard.DetailedView.ValidationDetails.TITLE, wait_for_page=True) + Wait.text_by_id( + Constants.Dashboard.Modal.TITLE_ID, + Constants.Dashboard.DetailedView.ValidationDetails.TITLE, + wait_for_page=True) Click.id( - Constants.Dashboard.DetailedView.ECOMP.Dropdown.ID, wait_for_page=True) + Constants.Dashboard.DetailedView.ECOMP.Dropdown.ID, + wait_for_page=True) Select(session.ice_driver.find_element_by_id( - Constants.Dashboard.DetailedView.ECOMP.Dropdown.ID)).select_by_visible_text(EcompName) - Click.id(Constants.Dashboard.DetailedView.ValidationDetails.ECOMPRelease.ID_ECOMP + - EcompName, wait_for_page=True) + Constants.Dashboard.DetailedView.ECOMP.Dropdown.ID + )).select_by_visible_text(EcompName) + Click.id( + Constants.Dashboard.DetailedView.ValidationDetails. + ECOMPRelease.ID_ECOMP + + EcompName, + wait_for_page=True) count += 1 - Wait.id(Constants.Dashboard.DetailedView.ValidationDetails.ECOMPRelease.ID_ECOMP + - Constants.Dashboard.DetailedView.ValidationDetails.ECOMPRelease.UNKNOW, wait_for_page=True) - Select(session.ice_driver.find_element_by_id(Constants.Dashboard.DetailedView.ECOMP.Dropdown.ID) - ).select_by_visible_text(Constants.Dashboard.DetailedView.ValidationDetails.ECOMPRelease.UNKNOW) - Click.id(Constants.Dashboard.DetailedView.ValidationDetails.ECOMPRelease.ID_ECOMP + - Constants.Dashboard.DetailedView.ValidationDetails.ECOMPRelease.UNKNOW, wait_for_page=True) + Wait.id( + Constants.Dashboard.DetailedView.ValidationDetails. + ECOMPRelease.ID_ECOMP + + Constants.Dashboard.DetailedView.ValidationDetails. + ECOMPRelease.UNKNOW, + wait_for_page=True) + Select( + session.ice_driver.find_element_by_id( + Constants.Dashboard.DetailedView.ECOMP.Dropdown.ID + )).select_by_visible_text( + Constants.Dashboard.DetailedView.ValidationDetails. + ECOMPRelease.UNKNOW) + Click.id( + Constants.Dashboard.DetailedView.ValidationDetails. + ECOMPRelease.ID_ECOMP + + Constants.Dashboard.DetailedView.ValidationDetails. + ECOMPRelease.UNKNOW, + wait_for_page=True) count += 1 Click.id( - Constants.Dashboard.DetailedView.ValidationDetails.SAVE, wait_for_page=True) + Constants.Dashboard.DetailedView.ValidationDetails.SAVE, + wait_for_page=True) Helper.internal_assert(count, 2) # If failed - count the failure and add the error to list of errors. - except: + except BaseException: errorMsg = "Failed in update_ecomp_release ." raise Exception(errorMsg) @@ -145,20 +182,28 @@ class FEDetailedView: def update_vf_version(): try: Click.id( - Constants.Dashboard.DetailedView.ValidationDetails.PLUS, wait_for_page=True) - Wait.text_by_id(Constants.Dashboard.Modal.TITLE_ID, - Constants.Dashboard.DetailedView.ValidationDetails.TITLE, wait_for_page=True) + Constants.Dashboard.DetailedView.ValidationDetails.PLUS, + wait_for_page=True) + Wait.text_by_id( + Constants.Dashboard.Modal.TITLE_ID, + Constants.Dashboard.DetailedView.ValidationDetails.TITLE, + wait_for_page=True) newVFVersionName = "newVFVersionName-" + \ Helper.rand_string("randomString") Click.id( - Constants.Dashboard.DetailedView.ValidationDetails.VFVersion.ID_VERSION) + Constants.Dashboard.DetailedView.ValidationDetails. + VFVersion.ID_VERSION) Enter.text_by_id( - Constants.Dashboard.DetailedView.ValidationDetails.VFVersion.ID_VERSION, newVFVersionName, wait_for_page=True) + Constants.Dashboard.DetailedView.ValidationDetails. + VFVersion.ID_VERSION, + newVFVersionName, + wait_for_page=True) Click.id( - Constants.Dashboard.DetailedView.ValidationDetails.SAVE, wait_for_page=True) + Constants.Dashboard.DetailedView.ValidationDetails.SAVE, + wait_for_page=True) return newVFVersionName # If failed - count the failure and add the error to list of errors. - except: + except BaseException: errorMsg = "Failed in update_ecomp_release ." raise Exception(errorMsg) @@ -166,68 +211,100 @@ class FEDetailedView: def validate_aic_version(): FEGeneral.refresh() Wait.id( - Constants.Dashboard.DetailedView.AIC.ID + "3.0", wait_for_page=True) + Constants.Dashboard.DetailedView.AIC.ID + + "3.0", + wait_for_page=True) @staticmethod def validate_ecomp_version(): FEGeneral.refresh() - Wait.id(Constants.Dashboard.DetailedView.ECOMP.ID + - Constants.Dashboard.DetailedView.ValidationDetails.ECOMPRelease.UNKNOW, wait_for_page=True) + Wait.id( + Constants.Dashboard.DetailedView.ECOMP.ID + + Constants.Dashboard.DetailedView.ValidationDetails. + ECOMPRelease.UNKNOW, + wait_for_page=True) @staticmethod def validate_vf_version(newVFVersionName): FEGeneral.refresh() - Wait.id(Constants.Dashboard.DetailedView.ValidationDetails.VFVersion.VF_VERSION_ID + - newVFVersionName, wait_for_page=True) + Wait.id( + Constants.Dashboard.DetailedView.ValidationDetails. + VFVersion.VF_VERSION_ID + + newVFVersionName, + wait_for_page=True) @staticmethod def validate_all_titles_on_dv_form(): - Wait.text_by_id(Constants.Dashboard.DetailedView.DeploymentTarget.ID, - Constants.Dashboard.DetailedView.DeploymentTarget.TEXT, wait_for_page=True) - Wait.text_by_id(Constants.Dashboard.DetailedView.VirtualFunctionComponents.ID, - Constants.Dashboard.DetailedView.VirtualFunctionComponents.TEXT) + Wait.text_by_id( + Constants.Dashboard.DetailedView.DeploymentTarget.ID, + Constants.Dashboard.DetailedView.DeploymentTarget.TEXT, + wait_for_page=True) + Wait.text_by_id( + Constants.Dashboard.DetailedView.VirtualFunctionComponents.ID, + Constants.Dashboard.DetailedView.VirtualFunctionComponents.TEXT) Wait.text_by_id(Constants.Dashboard.DetailedView.TargetLabEntry.ID, Constants.Dashboard.DetailedView.TargetLabEntry.TEXT) - Wait.text_by_id(Constants.Dashboard.DetailedView.ValidationDetails.ID, - Constants.Dashboard.DetailedView.ValidationDetails.TEXT) - Wait.text_by_id(Constants.Dashboard.DetailedView.ValidationDetails.TargetAICVersion.ID, - Constants.Dashboard.DetailedView.ValidationDetails.TargetAICVersion.TEXT) - Wait.text_by_id(Constants.Dashboard.DetailedView.ValidationDetails.ECOMPRelease.ID, - Constants.Dashboard.DetailedView.ValidationDetails.ECOMPRelease.TEXT) - Wait.text_by_id(Constants.Dashboard.DetailedView.ValidationDetails.VFVersion.ID, - Constants.Dashboard.DetailedView.ValidationDetails.VFVersion.TEXT, wait_for_page=True) + Wait.text_by_id( + Constants.Dashboard.DetailedView.ValidationDetails.ID, + Constants.Dashboard.DetailedView.ValidationDetails.TEXT) + Wait.text_by_id( + Constants.Dashboard.DetailedView.ValidationDetails. + TargetAICVersion.ID, + Constants.Dashboard.DetailedView.ValidationDetails. + TargetAICVersion.TEXT) + Wait.text_by_id( + Constants.Dashboard.DetailedView.ValidationDetails.ECOMPRelease.ID, + Constants.Dashboard.DetailedView.ValidationDetails. + ECOMPRelease.TEXT) + Wait.text_by_id( + Constants.Dashboard.DetailedView.ValidationDetails.VFVersion.ID, + Constants.Dashboard.DetailedView.ValidationDetails.VFVersion.TEXT, + wait_for_page=True) @staticmethod def add_deployment_target(user_content): Click.id(Constants.Dashboard.DetailedView.TargetLabEntry.Add.ID) Wait.text_by_id(Constants.Dashboard.Modal.TITLE_ID, - Constants.Dashboard.DetailedView.DeploymentTarget.TITLE) + Constants.Dashboard.DetailedView.DeploymentTarget. + TITLE) # FIXME: empty drop-down, tests will fail. Select(session.ice_driver.find_element_by_xpath( "//select")).select_by_visible_text("Lisle (DPA3)") Click.id( - Constants.Dashboard.DetailedView.DeploymentTarget.SAVE, wait_for_page=True) + Constants.Dashboard.DetailedView.DeploymentTarget.SAVE, + wait_for_page=True) Wait.text_by_css( - Constants.Dashboard.DetailedView.DeploymentTarget.CSS, "Lisle (DPA3)", wait_for_page=True) + Constants.Dashboard.DetailedView.DeploymentTarget.CSS, + "Lisle (DPA3)", + wait_for_page=True) Wait.text_by_id(Constants.Dashboard.DetailedView.AIC.ID + user_content['target_aic'], user_content['target_aic']) e2edate = FEGeneral.date_short_formatter() Wait.text_by_css( - Constants.Dashboard.DetailedView.TargetLabEntry.CONTENT_CSS, e2edate) + Constants.Dashboard.DetailedView.TargetLabEntry.CONTENT_CSS, + e2edate) @staticmethod def remove_deployment_target(user_content): Wait.text_by_id( "visible-dts-Lisle (DPA3)", "Lisle (DPA3)", wait_for_page=True) dt_site_id = DBGeneral.select_query( - "SELECT uuid FROM public.ice_deployment_target_site where name = 'Lisle (DPA3)'") + "SELECT uuid FROM public.ice_deployment_target_site where name" + + " = 'Lisle (DPA3)'") Click.id("visible-dts-Lisle (DPA3)") Wait.id( - Constants.Dashboard.DetailedView.DeploymentTarget.ID_REMOVE_DTS + dt_site_id) - Click.id(Constants.Dashboard.DetailedView.DeploymentTarget.ID_REMOVE_DTS + - dt_site_id, wait_for_page=True) - session.run_negative(lambda: Wait.text_by_id( - "visible-dts-Lisle (DPA3)", "Lisle (DPA3)", wait_for_page=True), "Negative test failed at wait text Lisle (DPA3)") + Constants.Dashboard.DetailedView.DeploymentTarget.ID_REMOVE_DTS + + dt_site_id) + Click.id( + Constants.Dashboard.DetailedView.DeploymentTarget.ID_REMOVE_DTS + + dt_site_id, + wait_for_page=True) + session.run_negative( + lambda: Wait.text_by_id( + "visible-dts-Lisle (DPA3)", + "Lisle (DPA3)", + wait_for_page=True), + "Negative test failed at wait text Lisle (DPA3)") @staticmethod def add_vfc(): @@ -237,7 +314,8 @@ class FEDetailedView: session.ice_driver.find_element_by_name("extRefID").click() Enter.text_by_name("extRefID", Helper.rand_string("randomNumber")) Select(session.ice_driver.find_element_by_id( - Constants.Dashboard.DetailedView.VFC.Choose_Company.ID)).select_by_visible_text(ServiceProvider.MainServiceProvider) + Constants.Dashboard.DetailedView.VFC.Choose_Company.ID + )).select_by_visible_text(ServiceProvider.MainServiceProvider) Click.id(Constants.Dashboard.DetailedView.VFC.Save_button.ID) return vfcName @@ -251,7 +329,8 @@ class FEDetailedView: Click.name("extRefID", wait_for_page=True) Enter.text_by_name("extRefID", extRefID, wait_for_page=True) Select(session.ice_driver.find_element_by_id( - Constants.Dashboard.DetailedView.VFC.Choose_Company.ID)).select_by_visible_text("Amdocs") + Constants.Dashboard.DetailedView.VFC.Choose_Company.ID + )).select_by_visible_text("Amdocs") Wait.text_by_css("span.add-text", "Add VFC", wait_for_page=True) Click.css("span.add-text", wait_for_page=True) logger.debug("Add VFC no.2") @@ -260,7 +339,8 @@ class FEDetailedView: Enter.text_by_xpath("//div[2]/ng-form/div[2]/input", "loka2") Enter.text_by_xpath("//div[2]/ng-form/div[4]/input", "companyManual2") Click.id( - Constants.Dashboard.DetailedView.VFC.Save_button.ID, wait_for_page=True) + Constants.Dashboard.DetailedView.VFC.Save_button.ID, + wait_for_page=True) @staticmethod def remove_vfc(user_content): @@ -268,7 +348,8 @@ class FEDetailedView: "uuid", "ice_vf", "name", user_content['vfName'], 1) djoni_uuid = None counter = 0 - while not djoni_uuid and counter <= Constants.DBConstants.RETRIES_NUMBER: + while not djoni_uuid and counter <= Constants.DBConstants.\ + RETRIES_NUMBER: time.sleep(session.wait_until_time_pause_long) djoni_uuid = DBGeneral.select_where_and( "uuid", "ice_vfc", "vf_id", vf_id, "name", "djoni", 1) @@ -282,7 +363,9 @@ class FEDetailedView: Wait.text_by_id(Constants.Dashboard.DetailedView.VFC.ID + "djoni2", "djoni2 (loka2)", wait_for_page=True) Click.id( - Constants.Dashboard.DetailedView.VFC.ID + "djoni", wait_for_page=True) + Constants.Dashboard.DetailedView.VFC.ID + + "djoni", + wait_for_page=True) Click.id(Constants.Dashboard.DetailedView.VFC.Remove.ID + djoni_uuid, wait_for_page=True) @@ -295,7 +378,8 @@ class FEDetailedView: FEDetailedView.search_vf_and_go_to_detailed_view( user_content['engagement_manual_id'], user_content['vfName']) Wait.id( - Constants.Dashboard.DetailedView.DeploymentTarget.AddDeploymentTargetButton.ID) + Constants.Dashboard.DetailedView.DeploymentTarget. + AddDeploymentTargetButton.ID) @staticmethod def add_remove_deployment_targets(user_content, users): @@ -316,44 +400,72 @@ class FEDetailedView: FEUser.login(user, Constants.Default.Password.TEXT) FEDetailedView.search_vf_and_go_to_detailed_view( user_content['engagement_manual_id'], user_content['vfName']) - session.run_negative(lambda: Click.id(Constants.Dashboard.DetailedView.DeploymentTarget.AddDeploymentTargetButton.ID), - "Negative test failed at click_on_ deployment-targets with user %s" % user) + session.run_negative( + lambda: Click.id( + Constants.Dashboard.DetailedView.DeploymentTarget. + AddDeploymentTargetButton.ID), + "Negative test failed at click_on_ deployment-targets " + + "with user %s" % user) @staticmethod def click_on_update_aic_version(): Click.id( - Constants.Dashboard.DetailedView.ValidationDetails.PLUS, wait_for_page=True) - Wait.text_by_id(Constants.Dashboard.Modal.TITLE_ID, - Constants.Dashboard.DetailedView.ValidationDetails.TITLE, wait_for_page=True) + Constants.Dashboard.DetailedView.ValidationDetails.PLUS, + wait_for_page=True) + Wait.text_by_id( + Constants.Dashboard.Modal.TITLE_ID, + Constants.Dashboard.DetailedView.ValidationDetails.TITLE, + wait_for_page=True) @staticmethod def click_on_update_ecomp_release(): Click.id( - Constants.Dashboard.DetailedView.ValidationDetails.PLUS, wait_for_page=True) - Wait.text_by_id(Constants.Dashboard.Modal.TITLE_ID, - Constants.Dashboard.DetailedView.ValidationDetails.TITLE, wait_for_page=True) + Constants.Dashboard.DetailedView.ValidationDetails.PLUS, + wait_for_page=True) + Wait.text_by_id( + Constants.Dashboard.Modal.TITLE_ID, + Constants.Dashboard.DetailedView.ValidationDetails.TITLE, + wait_for_page=True) @staticmethod def select_aic_version_from_list(aic_version): Select(session.ice_driver.find_element_by_id( - Constants.Dashboard.DetailedView.AIC.Dropdown.ID)).select_by_visible_text(aic_version) + Constants.Dashboard.DetailedView.AIC.Dropdown.ID + )).select_by_visible_text(aic_version) @staticmethod def compare_aic_selected_version(expected_aic_version): - Helper.internal_assert(Get.by_id( - Constants.Dashboard.DetailedView.AIC.ID + expected_aic_version), expected_aic_version) + Helper.internal_assert( + Get.by_id( + Constants.Dashboard.DetailedView.AIC.ID + + expected_aic_version), + expected_aic_version) @staticmethod def compare_selected_ecomp_release(expected_ecomp_release): - Helper.internal_assert(Get.by_id( - Constants.Dashboard.DetailedView.ECOMP.ID + expected_ecomp_release), expected_ecomp_release) + Helper.internal_assert( + Get.by_id( + Constants.Dashboard.DetailedView.ECOMP.ID + + expected_ecomp_release), + expected_ecomp_release) @staticmethod def validate_deprecated_aic_version_in_dropdown(expected_aic_version): - Helper.internal_assert(Get.by_id(Constants.Dashboard.DetailedView.AIC.Dropdown.UniversalVersion.ID % - expected_aic_version), "AIC " + expected_aic_version + " - Deprecated") + Helper.internal_assert( + Get.by_id( + Constants.Dashboard.DetailedView.AIC.Dropdown. + UniversalVersion.ID % + expected_aic_version), + "AIC " + + expected_aic_version + + " - Deprecated") @staticmethod def validate_deprecated_ecomp_release_in_dropdown(expected_ecomp_release): - Helper.internal_assert(Get.by_id(Constants.Dashboard.DetailedView.ECOMP.Dropdown.UniversalRelease.ID % - expected_ecomp_release), expected_ecomp_release + " - Deprecated") + Helper.internal_assert( + Get.by_id( + Constants.Dashboard.DetailedView.ECOMP.Dropdown. + UniversalRelease.ID % + expected_ecomp_release), + expected_ecomp_release + + " - Deprecated") diff --git a/services/frontend/fe_general.py b/services/frontend/fe_general.py index b3f97b4..10f9c72 100644 --- a/services/frontend/fe_general.py +++ b/services/frontend/fe_general.py @@ -39,7 +39,6 @@ import json import time -from django.conf import settings from selenium.webdriver.support.select import Select from services.constants import Constants @@ -87,7 +86,7 @@ class FEGeneral(Helper): session.ice_driver.get(reopen_url) session.ice_driver.maximize_window() Wait.page_has_loaded() - except Exception as e: + except Exception: errorMsg = "Could not reopen requested page" raise Exception(errorMsg, reopen_url) @@ -97,7 +96,7 @@ class FEGeneral(Helper): # Open FireFox with requested URL. session.ice_driver.get(url) session.ice_driver.maximize_window() - except: + except BaseException: errorMsg = "Could not reopen requested page" raise Exception(errorMsg, url) logger.debug("Moving to next test case") @@ -139,7 +138,9 @@ class FEGeneral(Helper): def go_to_signup_from_login(): Click.link_text(Constants.Login.Signup.LINK_TEXT, wait_for_page=True) Wait.text_by_css( - Constants.Signup.Title.CSS, Constants.Signup.Title.TEXT, wait_for_page=True) + Constants.Signup.Title.CSS, + Constants.Signup.Title.TEXT, + wait_for_page=True) @staticmethod def form_enter_name(name): @@ -213,7 +214,8 @@ class FEGeneral(Helper): def send_reset_password(email): FEGeneral.go_to_reset_password_from_login() Wait.text_by_css( - Constants.ResetPassword.Title.CSS, Constants.ResetPassword.Title.TEXT) + Constants.ResetPassword.Title.CSS, + Constants.ResetPassword.Title.TEXT) Enter.text_by_name(Constants.ResetPassword.Email.NAME, email) Wait.text_by_css( Constants.SubmitButton.CSS, Constants.ResetPassword.Button.TEXT) @@ -275,4 +277,5 @@ class FEGeneral(Helper): Helper.assertTrue( False, "%s does not exist over the client's side" % item) logger.debug( - "verify_existing_files_in_list succeeded, All vf repo files are available for choosing.") + "verify_existing_files_in_list succeeded, " + + "All vf repo files are available for choosing.") diff --git a/services/frontend/fe_invite.py b/services/frontend/fe_invite.py index cb84262..061df73 100644 --- a/services/frontend/fe_invite.py +++ b/services/frontend/fe_invite.py @@ -1,5 +1,5 @@ - -# ============LICENSE_START========================================== + +# ============LICENSE_START========================================== # org.onap.vvp/test-engine # =================================================================== # Copyright © 2017 AT&T Intellectual Property. All rights reserved. @@ -36,12 +36,8 @@ # ============LICENSE_END============================================ # # ECOMP is a trademark and service mark of AT&T Intellectual Property. -from asyncio.tasks import wait - -from selenium.webdriver.support.select import Select from services.api.api_user import APIUser -from services.api.api_virtual_function import APIVirtualFunction from services.constants import Constants, ServiceProvider from services.database.db_general import DBGeneral from services.database.db_user import DBUser @@ -59,6 +55,7 @@ from services.session import session logger = LoggingServiceFactory.get_logger() + class FEInvite: @staticmethod @@ -69,14 +66,21 @@ class FEInvite: Click.id(vf_left_nav_id) FEWizard.invite_team_members_modal(user_content[1]['email']) # self.sleep(1) # TODO need to wait until modal window is closed. - invitation_token = DBUser.select_invitation_token("invitation_token", "ice_invitation", "engagement_uuid", - user_content[0]['engagement_uuid'], user_content[1]['email'], 1) + invitation_token = DBUser.select_invitation_token( + "invitation_token", + "ice_invitation", + "engagement_uuid", + user_content[0]['engagement_uuid'], + user_content[1]['email'], + 1) inviterURL = Constants.Default.InviteURL.Login.TEXT + invitation_token FEGeneral.re_open(inviterURL) # Login with 2nd user # title_id = "title-id-" + engName FEUser.login( - user_content[1]['email'], Constants.Default.Password.TEXT, title_id) + user_content[1]['email'], + Constants.Default.Password.TEXT, + title_id) Click.id(vf_left_nav_id) actualVfName = Get.by_id(vf_left_nav_id) Helper.internal_assert(engName, actualVfName) @@ -103,17 +107,23 @@ class FEInvite: Click.id(vf_left_nav_id) Click.id(Constants.Dashboard.Overview.TeamMember.ID) Wait.text_by_css(Constants.Dashboard.Wizard.Title.CSS, - Constants.Dashboard.Wizard.InviteTeamMembers.Title.TEXT) + Constants.Dashboard.Wizard.InviteTeamMembers. + Title.TEXT) Enter.text_by_name("email", user_content[1]['email']) - Wait.text_by_css(Constants.SubmitButton.CSS, - Constants.Dashboard.Wizard.InviteTeamMembers.Button.TEXT) + Wait.text_by_css( + Constants.SubmitButton.CSS, + Constants.Dashboard.Wizard.InviteTeamMembers.Button.TEXT) Click.css(Constants.SubmitButton.CSS) Wait.id(Constants.Toast.ID) Helper.internal_assert( Get.by_id(Constants.Toast.ID), "Invite couldn't be created") @staticmethod - def invite_x_users_from_tm(list_of_invite_emails, countofUser, countOfem, num): + def invite_x_users_from_tm( + list_of_invite_emails, + countofUser, + countOfem, + num): Enter.text_by_name( "email", list_of_invite_emails[countofUser], wait_for_page=True) for _ in range(num): @@ -121,13 +131,14 @@ class FEInvite: session.run_negative( lambda: Click.css("span.add-icon"), "css appears") break - except: # button exists + except BaseException: # button exists pass countofUser += 1 # Click.css("span.add-icon") Wait.xpath("//fieldset[" + str(countOfem) + "]/div/input") Enter.text_by_xpath( - "//fieldset[" + str(countOfem) + "]/div/input", list_of_invite_emails[countofUser]) + "//fieldset[" + str(countOfem) + "]/div/input", + list_of_invite_emails[countofUser]) countOfem += 1 Click.css(Constants.SubmitButton.CSS, wait_for_page=True) @@ -160,11 +171,23 @@ class FEInvite: sponsor = [ServiceProvider.MainServiceProvider, 'aaaaaa', inviteEmail, '3058000000'] invitation_token = DBUser.select_invitation_token( - "invitation_token", "ice_invitation", "engagement_uuid", engagement_id, inviteEmail, 1) + "invitation_token", + "ice_invitation", + "engagement_uuid", + engagement_id, + inviteEmail, + 1) signUpURLforContact = DBUser.get_contact_signup_url( - invitation_token, uuid, sponsor[2], sponsor[1], sponsor[3], sponsor[0]) + invitation_token, uuid, sponsor[2], sponsor[1], + sponsor[3], sponsor[0]) APIUser.signup_invited_user( - sponsor[0], inviteEmail, invitation_token, signUpURLforContact, user_content, True, wait_for_gitlab=False) + sponsor[0], + inviteEmail, + invitation_token, + signUpURLforContact, + user_content, + True, + wait_for_gitlab=False) activationUrl2 = DBUser.get_activation_url(sponsor[2]) FEGeneral.re_open(activationUrl2) # Login with 2nd user # engName = engagement_manual_id + ": " + vflist[0] @@ -175,22 +198,24 @@ class FEInvite: engagement_id = DBGeneral.select_where( "engagement_id", "ice_vf", "name", vfName, 1) engagement_manual_id = DBGeneral.select_where( - "engagement_manual_id", "ice_engagement", "uuid", engagement_id, 1) + "engagement_manual_id", "ice_engagement", "uuid", + engagement_id, 1) engName = engagement_manual_id + ": " + vfName vf_left_nav_id = "clickable-" + engName Click.id(vf_left_nav_id, wait_for_page=True) @staticmethod def invite_x_users_and_verify_VF_appers_for_invited(user_content, engName): - inviteEmail = Helper.rand_string('randomString') + "@" \ - + ServiceProvider.email + inviteEmail = Helper.rand_string( + 'randomString') + "@" + ServiceProvider.email vflist = FEInvite.create_x_vfs(user_content, engName, x=3) for vfName in vflist: # Fetch one AT&T user ID. engagement_id = DBGeneral.select_where( "engagement_id", "ice_vf", "name", vfName, 1) engagement_manual_id = DBGeneral.select_where( - "engagement_manual_id", "ice_engagement", "uuid", engagement_id, 1) + "engagement_manual_id", "ice_engagement", "uuid", + engagement_id, 1) engName = engagement_manual_id + ": " + vfName vf_left_nav_id = "clickable-" + engName Click.id(vf_left_nav_id) diff --git a/services/frontend/fe_next_step.py b/services/frontend/fe_next_step.py index be59949..b10735d 100644 --- a/services/frontend/fe_next_step.py +++ b/services/frontend/fe_next_step.py @@ -1,5 +1,5 @@ - -# ============LICENSE_START========================================== + +# ============LICENSE_START========================================== # org.onap.vvp/test-engine # =================================================================== # Copyright © 2017 AT&T Intellectual Property. All rights reserved. @@ -50,7 +50,9 @@ class FENextStep(object): def check_select_deselect_all_files(): Click.id(Constants.Dashboard.Overview.NextSteps.Add.AssociatedFiles.ID) Click.link_text( - Constants.Dashboard.Overview.NextSteps.Add.AssociatedFiles.SELECT_ALL_FILES_NAME) + Constants.Dashboard.Overview.NextSteps.Add.AssociatedFiles. + SELECT_ALL_FILES_NAME) Wait.text_by_id( Constants.Dashboard.Overview.NextSteps.Add.AssociatedFiles.ID, - Constants.Dashboard.Overview.NextSteps.Add.AssociatedFiles.ALL_FILES_SELECTED) + Constants.Dashboard.Overview.NextSteps.Add.AssociatedFiles. + ALL_FILES_SELECTED) diff --git a/services/frontend/fe_overview.py b/services/frontend/fe_overview.py index 3764e2c..15f8725 100644 --- a/services/frontend/fe_overview.py +++ b/services/frontend/fe_overview.py @@ -65,8 +65,10 @@ class FEOverview: def click_on_vf(user_content): vfFullName = user_content[ 'engagement_manual_id'] + ": " + user_content['vfName'] - Enter.text_by_id(Constants.Dashboard.LeftPanel.SearchBox.ID, user_content[ - 'vfName'], True) + Enter.text_by_id( + Constants.Dashboard.LeftPanel.SearchBox.ID, + user_content['vfName'], + True) Click.id(Constants.Dashboard.LeftPanel.SearchBox.Results.ID % user_content['vfName'], True) Wait.text_by_id( @@ -78,22 +80,28 @@ class FEOverview: "Go to engagement's overview by clicking on the created Next Step") Click.name(user_content['engagement_manual_id'], wait_for_page=True) Wait.text_by_id( - Constants.Dashboard.Overview.Title.ID, user_content['engagement_manual_id'] + ":", wait_for_page=True) + Constants.Dashboard.Overview.Title.ID, + user_content['engagement_manual_id'] + ":", + wait_for_page=True) FEGeneral.re_open(Constants.Default.LoginURL.TEXT) logger.debug("Login with EL user " + user_content['el_email']) FEUser.login(user_content['el_email'], Constants.Default.Password.TEXT) # Query to select all assigned next steps on TODO state # el_native_id = str(DBGeneral.select_where( "id", "ice_user_profile", "email", user_content['el_email'], 1)) - queryStr = "SELECT count(*) FROM ice_user_profile AS users, ice_next_step_assignees AS assignees, ice_next_step AS ns WHERE users.id=" + \ + queryStr = "SELECT count(*) FROM ice_user_profile AS users, " +\ + "ice_next_step_assignees AS assignees, " +\ + "ice_next_step AS ns WHERE users.id=" + \ el_native_id + \ - " AND users.id=assignees.iceuserprofile_id AND assignees.nextstep_id=ns.uuid AND ns.state='Incomplete';" + " AND users.id=assignees.iceuserprofile_id " +\ + "AND assignees.nextstep_id=ns.uuid AND ns.state='Incomplete';" el_assigned_ns = str(DBGeneral.select_query(queryStr)) logger.debug("el_assigned_ns=" + el_assigned_ns) Wait.page_has_loaded() if (int(el_assigned_ns) >= 5): logger.debug( - "EL has 5 or more assigned next steps, checking that only 5 are shown") + "EL has 5 or more assigned next steps, " + + "checking that only 5 are shown") ns_list = Get.by_id("next-steps-list") if (ns_list.count("Engagement - ") > 5): logger.error("More than 5 next steps are listed in dashboard.") @@ -119,15 +127,25 @@ class FEOverview: @staticmethod def check_stage_next_steps(stage, engagement_uuid): - ns_list = DBGeneral.select_where_and("description", "ice_next_step", - "engagement_id", engagement_uuid, - "engagement_stage", stage, 0) # List of next steps from DB. + ns_list = DBGeneral.select_where_and( + "description", + "ice_next_step", + "engagement_id", + engagement_uuid, + "engagement_stage", + stage, + 0) # List of next steps from DB. logger.debug("Got list of Next Steps for current stage " + stage) for i in range(len(ns_list)): ns_description = ns_list[i] # Value number i from the list. - ns_uuid = DBGeneral.select_where_and("uuid", "ice_next_step", - "engagement_id", engagement_uuid, - "description", ns_description, 1) + ns_uuid = DBGeneral.select_where_and( + "uuid", + "ice_next_step", + "engagement_id", + engagement_uuid, + "description", + ns_description, + 1) logger.debug( "Compare presented text of next step with the text from DB.") portal_ns = Get.by_id("step-" + ns_uuid) @@ -146,10 +164,14 @@ class FEOverview: lambda: Wait.id(txtLine2ID), "Error: modal window opened.") else: Wait.text_by_id( - txtLine2ID, "Are you sure you want to set the Engagement's stage to " + next_stage + "?") + txtLine2ID, + "Are you sure you want to set the Engagement's stage to " + + next_stage + + "?") # Click on Approve (after validations inside window). Click.xpath( - Constants.Dashboard.Overview.Stage.Approve.XPATH, wait_for_page=True) + Constants.Dashboard.Overview.Stage.Approve.XPATH, + wait_for_page=True) @staticmethod def check_progress(expected_progress): @@ -166,12 +188,14 @@ class FEOverview: @staticmethod def set_progress(new_value): Click.id(Constants.Dashboard.Overview.Progress.Change.ID) - Helper.internal_assert(Constants.Dashboard.Overview.Progress.Wizard.Title.TEXT, - Get.by_id(Constants.Dashboard.Modal.TITLE_ID)) + Helper.internal_assert( + Constants.Dashboard.Overview.Progress.Wizard.Title.TEXT, Get.by_id( + Constants.Dashboard.Modal.TITLE_ID)) Enter.text_by_name( Constants.Dashboard.Overview.Progress.Wizard.NAME, new_value) - Wait.text_by_css(Constants.SubmitButton.CSS, - Constants.Dashboard.Overview.Progress.Wizard.Button.TEXT) + Wait.text_by_css( + Constants.SubmitButton.CSS, + Constants.Dashboard.Overview.Progress.Wizard.Button.TEXT) Click.css(Constants.SubmitButton.CSS) Wait.modal_to_dissappear() @@ -182,7 +206,8 @@ class FEOverview: Wait.text_by_id( Constants.Dashboard.GeneralPrompt.Title.ID, "Delete Step") Click.id( - Constants.Dashboard.GeneralPrompt.ApproveButton.ID, wait_for_page=True) + Constants.Dashboard.GeneralPrompt.ApproveButton.ID, + wait_for_page=True) Wait.id_to_dissappear("test_" + next_step_uuid) @staticmethod @@ -194,56 +219,83 @@ class FEOverview: def click_on_archeive_engagement_from_dropdown(): FEOverview.click_on_admin_dropdown() Click.link_text( - Constants.Dashboard.Overview.AdminDropdown.ArchiveEngagement.LINK_TEXT, wait_for_page=True) + Constants.Dashboard.Overview.AdminDropdown. + ArchiveEngagement.LINK_TEXT, + wait_for_page=True) @staticmethod def archive_engagement_modal(engagement_manual_id, vf_name): - Wait.text_by_id(Constants.Dashboard.Overview.AdminDropdown.ArchiveEngagement.Wizard.Title.ID, - Constants.Dashboard.Overview.AdminDropdown.ArchiveEngagement.Wizard.Title.TEXT) + Wait.text_by_id( + Constants.Dashboard.Overview.AdminDropdown. + ArchiveEngagement.Wizard.Title.ID, + Constants.Dashboard.Overview.AdminDropdown. + ArchiveEngagement.Wizard.Title.TEXT) random_reason = Helper.rand_string() - Enter.text_by_name(Constants.Dashboard.Overview.AdminDropdown.ArchiveEngagement.Wizard.Reason.NAME, - random_reason) + Enter.text_by_name( + Constants.Dashboard.Overview.AdminDropdown. + ArchiveEngagement.Wizard.Reason.NAME, + random_reason) Click.id(Constants.SubmitButton.ID) - Wait.text_by_id(Constants.Toast.ID, "Engagement '%s: %s' archived successfully." % - (engagement_manual_id, vf_name)) - query = "select archived_time,archive_reason from ice_engagement where engagement_manual_id='{engagement_manual_id}'".format( - engagement_manual_id=engagement_manual_id) + Wait.text_by_id( + Constants.Toast.ID, + "Engagement '%s: %s' archived successfully." % ( + engagement_manual_id, vf_name)) + query = "select archived_time,archive_reason from " +\ + "ice_engagement where engagement_manual_id" +\ + "='{engagement_manual_id}'".format( + engagement_manual_id=engagement_manual_id) archived_time, db_reason = DBGeneral.select_query(query, "list") - Helper.assertTrue(archived_time != None) + Helper.assertTrue(archived_time is not None) Helper.internal_assert(random_reason, db_reason) @staticmethod def click_on_change_reviewer_from_dropdown(): FEOverview.click_on_admin_dropdown() Click.link_text( - Constants.Dashboard.Overview.AdminDropdown.ChangeReviewer.LINK_TEXT) + Constants.Dashboard.Overview.AdminDropdown. + ChangeReviewer.LINK_TEXT) @staticmethod def select_engagement_lead_from_list(el_name): Wait.name( - Constants.Dashboard.Overview.AdminDropdown.ChangeReviewer.Wizard.Select.NAME, wait_for_page=True) + Constants.Dashboard.Overview.AdminDropdown. + ChangeReviewer.Wizard.Select.NAME, + wait_for_page=True) Select(session.ice_driver.find_element_by_name( - Constants.Dashboard.Overview.AdminDropdown.ChangeReviewer.Wizard.Select.NAME)).select_by_visible_text(el_name) + Constants.Dashboard.Overview.AdminDropdown. + ChangeReviewer.Wizard.Select.NAME)).\ + select_by_visible_text(el_name) @staticmethod def change_engagement_lead_modal(el_name, is_reviewer=True): - Wait.text_by_id(Constants.Dashboard.Overview.AdminDropdown.ChangeReviewer.Wizard.Title.ID, - Constants.Dashboard.Overview.AdminDropdown.ChangeReviewer.Wizard.Title.TEXT) + Wait.text_by_id( + Constants.Dashboard.Overview.AdminDropdown. + ChangeReviewer.Wizard.Title.ID, + Constants.Dashboard.Overview.AdminDropdown. + ChangeReviewer.Wizard.Title.TEXT) FEOverview.select_engagement_lead_from_list(el_name) if is_reviewer: Wait.text_by_id( - Constants.Toast.ID, Constants.Dashboard.Overview.AdminDropdown.ChangeReviewer.Toast.TEXT) + Constants.Toast.ID, + Constants.Dashboard.Overview.AdminDropdown. + ChangeReviewer.Toast.TEXT) else: Wait.text_by_id( - Constants.Toast.ID, Constants.Dashboard.Overview.AdminDropdown.ChangePeerReviewer.Toast.TEXT) + Constants.Toast.ID, + Constants.Dashboard.Overview.AdminDropdown. + ChangePeerReviewer.Toast.TEXT) @staticmethod def click_on_change_peer_reviewer_from_dropdown(): FEOverview.click_on_admin_dropdown() Click.link_text( - Constants.Dashboard.Overview.AdminDropdown.ChangePeerReviewer.LINK_TEXT) - Wait.text_by_id(Constants.Dashboard.Overview.AdminDropdown.ChangePeerReviewer.Wizard.Title.ID, - Constants.Dashboard.Overview.AdminDropdown.ChangePeerReviewer.Wizard.Title.TEXT) + Constants.Dashboard.Overview.AdminDropdown. + ChangePeerReviewer.LINK_TEXT) + Wait.text_by_id( + Constants.Dashboard.Overview.AdminDropdown. + ChangePeerReviewer.Wizard.Title.ID, + Constants.Dashboard.Overview.AdminDropdown. + ChangePeerReviewer.Wizard.Title.TEXT) @staticmethod def click_on_update_status_from_dropdown(): @@ -256,23 +308,39 @@ class FEOverview: def fill_update_status_form_admin_dropdown(): random_string = Helper.rand_string() Enter.text_by_name( - Constants.Dashboard.Overview.AdminDropdown.UpdateStatus.PROGRESS, str(50)) - Enter.date_picker(Constants.Dashboard.Overview.AdminDropdown.UpdateStatus.PROGRESS_CSS, - Constants.Dashboard.Overview.AdminDropdown.UpdateStatus.TARGET) - Enter.date_picker(Constants.Dashboard.Overview.AdminDropdown.UpdateStatus.PROGRESS_CSS, - Constants.Dashboard.Overview.AdminDropdown.UpdateStatus.HEAT) - Enter.date_picker(Constants.Dashboard.Overview.AdminDropdown.UpdateStatus.PROGRESS_CSS, - Constants.Dashboard.Overview.AdminDropdown.UpdateStatus.IMAGE_SACN) - Enter.date_picker(Constants.Dashboard.Overview.AdminDropdown.UpdateStatus.PROGRESS_CSS, - Constants.Dashboard.Overview.AdminDropdown.UpdateStatus.AIC) - Enter.date_picker(Constants.Dashboard.Overview.AdminDropdown.UpdateStatus.PROGRESS_CSS, - Constants.Dashboard.Overview.AdminDropdown.UpdateStatus.ASDC) + Constants.Dashboard.Overview.AdminDropdown.UpdateStatus.PROGRESS, + str(50)) + Enter.date_picker( + Constants.Dashboard.Overview.AdminDropdown. + UpdateStatus.PROGRESS_CSS, + Constants.Dashboard.Overview.AdminDropdown.UpdateStatus.TARGET) + Enter.date_picker( + Constants.Dashboard.Overview.AdminDropdown. + UpdateStatus.PROGRESS_CSS, + Constants.Dashboard.Overview.AdminDropdown.UpdateStatus.HEAT) + Enter.date_picker( + Constants.Dashboard.Overview.AdminDropdown. + UpdateStatus.PROGRESS_CSS, + Constants.Dashboard.Overview.AdminDropdown.UpdateStatus.IMAGE_SACN) + Enter.date_picker( + Constants.Dashboard.Overview.AdminDropdown. + UpdateStatus.PROGRESS_CSS, + Constants.Dashboard.Overview.AdminDropdown.UpdateStatus.AIC) + Enter.date_picker( + Constants.Dashboard.Overview.AdminDropdown. + UpdateStatus.PROGRESS_CSS, + Constants.Dashboard.Overview.AdminDropdown.UpdateStatus.ASDC) Enter.text_by_name( - Constants.Dashboard.Overview.AdminDropdown.UpdateStatus.STATUS, random_string) + Constants.Dashboard.Overview.AdminDropdown.UpdateStatus.STATUS, + random_string) Click.css( - Constants.Dashboard.Overview.AdminDropdown.UpdateStatus.SUBMIT, wait_for_page=True) + Constants.Dashboard.Overview.AdminDropdown.UpdateStatus.SUBMIT, + wait_for_page=True) Wait.text_by_id( - Constants.Toast.ID, Constants.Dashboard.Overview.AdminDropdown.UpdateStatus.SUCCESS_MSG, wait_for_page=True) + Constants.Toast.ID, + Constants.Dashboard.Overview.AdminDropdown. + UpdateStatus.SUCCESS_MSG, + wait_for_page=True) Wait.text_by_id( Constants.Dashboard.Overview.Status.Description.ID, random_string) @@ -285,7 +353,8 @@ class FEOverview: i = 0 ns_list = [] steps_length = len( - session.ice_driver.find_elements_by_css_selector(".step-indication > li")) + session.ice_driver.find_elements_by_css_selector( + ".step-indication > li")) while i < steps_length: ns_list.append(FEOverview.get_next_step_description(i)) i += 1 @@ -297,20 +366,31 @@ class FEOverview: for idx, step_uuid in enumerate(steps_uuids): db_step_text = DBVirtualFunction.select_next_step_description( step_uuid) - Wait.text_by_id(Constants.Dashboard.Overview.NextSteps.Add.Description.STEP_DESC_ID + - str(idx), ui_steps[idx], wait_for_page=True) + Wait.text_by_id( + Constants.Dashboard.Overview.NextSteps.Add.Description. + STEP_DESC_ID + str(idx), + ui_steps[idx], + wait_for_page=True) if db_step_text != ui_steps[idx]: - raise AssertionError("Next step is not located in expected index. db_step_text = " - + db_step_text + " ui_steps[idx] = " + ui_steps[idx] + "|| uuid = " + step_uuid) + raise AssertionError( + "Next step is not located in expected index. " + + "db_step_text = " + + db_step_text + + " ui_steps[idx] = " + + ui_steps[idx] + + "|| uuid = " + + step_uuid) @staticmethod def next_steps_filter_by_files(): Click.id( Constants.Dashboard.Overview.NextSteps.FilterByFileDropDown.ID) Click.link_text( - Constants.Dashboard.Overview.NextSteps.FilterByFileDropDown.ANY_FILE_LINK_TEXT) + Constants.Dashboard.Overview.NextSteps.FilterByFileDropDown. + ANY_FILE_LINK_TEXT) Click.link_text( - Constants.Dashboard.Overview.NextSteps.FilterByFileDropDown.FILE0_LINK_TEXT) + Constants.Dashboard.Overview.NextSteps.FilterByFileDropDown. + FILE0_LINK_TEXT) Click.id( Constants.Dashboard.Overview.NextSteps.FilterByFileDropDown.ID) @@ -327,9 +407,11 @@ class FEOverview: def next_steps_filter_by_states(): Click.id(Constants.Dashboard.Overview.NextSteps.StateDropDown.ID) Click.link_text( - Constants.Dashboard.Overview.NextSteps.StateDropDown.INCOMPLETE_LINK_TEXT) + Constants.Dashboard.Overview.NextSteps.StateDropDown. + INCOMPLETE_LINK_TEXT) Click.link_text( - Constants.Dashboard.Overview.NextSteps.StateDropDown.COMPLETED_LINK_TEXT) + Constants.Dashboard.Overview.NextSteps.StateDropDown. + COMPLETED_LINK_TEXT) Click.id(Constants.Dashboard.Overview.NextSteps.StateDropDown.ID) @staticmethod @@ -341,10 +423,12 @@ class FEOverview: Helper.rand_string("randomString") Click.id(Constants.Dashboard.Overview.NextSteps.Add.Description.ID) Enter.text_by_id( - Constants.Dashboard.Overview.NextSteps.Add.Description.ID, ns_description) + Constants.Dashboard.Overview.NextSteps.Add.Description.ID, + ns_description) FEWizard.date_picker_add_ns(0) - Wait.text_by_css(Constants.SubmitButton.CSS, - Constants.Dashboard.Overview.NextSteps.Add.Button.TEXT) + Wait.text_by_css( + Constants.SubmitButton.CSS, + Constants.Dashboard.Overview.NextSteps.Add.Button.TEXT) Click.css(Constants.SubmitButton.CSS) Wait.modal_to_dissappear() @@ -361,10 +445,14 @@ class FEOverview: Constants.Dashboard.Overview.TeamMember.RemoveUser.ID) else: Click.id(Constants.Dashboard.Overview.TeamMember.RemoveUser.ID) - Wait.text_by_id(Constants.Dashboard.GeneralPrompt.UpperTitle.ID, - Constants.Dashboard.Overview.TeamMember.RemoveUser.Title.TEXT % full_name) - Wait.text_by_id(Constants.Dashboard.GeneralPrompt.Title.ID, - Constants.Dashboard.Overview.TeamMember.RemoveUser.Message.TEXT) + Wait.text_by_id( + Constants.Dashboard.GeneralPrompt.UpperTitle.ID, + Constants.Dashboard.Overview.TeamMember.RemoveUser.Title.TEXT % + full_name) + Wait.text_by_id( + Constants.Dashboard.GeneralPrompt.Title.ID, + Constants.Dashboard.Overview.TeamMember.RemoveUser. + Message.TEXT) Click.id(Constants.Dashboard.GeneralPrompt.ApproveButton.ID) FEGeneral.refresh() Wait.id_to_dissappear( @@ -373,9 +461,18 @@ class FEOverview: @staticmethod def invite_and_reopen_link(user_content, other_el_email): enguuid = DBGeneral.select_where( - "uuid", "ice_engagement", "engagement_manual_id", user_content['engagement_manual_id'], 1) + "uuid", + "ice_engagement", + "engagement_manual_id", + user_content['engagement_manual_id'], + 1) invitation_token = DBUser.select_invitation_token( - "invitation_token", "ice_invitation", "engagement_uuid", enguuid, other_el_email, 1) + "invitation_token", + "ice_invitation", + "engagement_uuid", + enguuid, + other_el_email, + 1) inviterURL = Constants.Default.InviteURL.Login.TEXT + invitation_token FEGeneral.re_open(inviterURL) @@ -392,14 +489,16 @@ class FEOverview: def validate_empty_associated_files(): FEOverview.add_next_step() Click.id(Constants.Dashboard.Overview.NextSteps.AssociatedFiles.ID) - Wait.text_by_id(Constants.Dashboard.Overview.NextSteps.AssociatedFiles.EmptyMsgID, - Constants.Dashboard.Overview.NextSteps.AssociatedFiles.EmptyMsg) + Wait.text_by_id( + Constants.Dashboard.Overview.NextSteps.AssociatedFiles.EmptyMsgID, + Constants.Dashboard.Overview.NextSteps.AssociatedFiles.EmptyMsg) @staticmethod def validate_associated_files(file_name): Click.id(Constants.Dashboard.Overview.NextSteps.AssociatedFiles.ID) Wait.text_by_id( - Constants.Dashboard.Overview.NextSteps.AssociatedFiles.FileId, file_name) + Constants.Dashboard.Overview.NextSteps.AssociatedFiles.FileId, + file_name) @staticmethod def validate_bucket_url(eng_manual_id, vf_name): @@ -411,9 +510,10 @@ class FEOverview: @staticmethod def verify_validation_dates(): validation_date = Get.by_id( - Constants.Dashboard.Overview.Progress.ValidationsDates.AIC_ID, True) - validation_date = datetime.datetime.strptime( - validation_date, "%m/%d/%y").date() + Constants.Dashboard.Overview.Progress.ValidationsDates.AIC_ID, + True) + validation_date = datetime.datetime.strptime(validation_date, + "%m/%d/%y").date() current_date = timezone.now().date() Helper.internal_assert(validation_date, current_date) diff --git a/services/frontend/fe_user.py b/services/frontend/fe_user.py index eb25d23..1438007 100644 --- a/services/frontend/fe_user.py +++ b/services/frontend/fe_user.py @@ -59,7 +59,11 @@ logger = LoggingServiceFactory.get_logger() class FEUser: @staticmethod - def login(email, password, expected_element=Constants.Dashboard.Statuses.Title.ID, element_type="id"): + def login( + email, + password, + expected_element=Constants.Dashboard.Statuses.Title.ID, + element_type="id"): try: logger.debug("Verifying and Insert Login page elements:") logger.debug("Insert Email " + email) @@ -81,7 +85,11 @@ class FEUser: raise Exception(errorMsg, e) @staticmethod - def relogin(email, password, expected_element=Constants.Dashboard.Statuses.Title.ID, element_type="id"): + def relogin( + email, + password, + expected_element=Constants.Dashboard.Statuses.Title.ID, + element_type="id"): FEGeneral.re_open(Constants.Default.LoginURL.TEXT) FEUser.login(email, password, expected_element, element_type) @@ -91,7 +99,11 @@ class FEUser: Click.link_text(Constants.Dashboard.Avatar.Logout.LINK_TEXT) @staticmethod - def activate_and_login(email, password, expected_element=Constants.Dashboard.Statuses.Title.ID, element_type="id"): + def activate_and_login( + email, + password, + expected_element=Constants.Dashboard.Statuses.Title.ID, + element_type="id"): activationUrl = DBUser.get_activation_url(email) FEGeneral.re_open(activationUrl) FEUser.login(email, password, expected_element, element_type) @@ -124,7 +136,7 @@ class FEUser: accountObj = [randomName, phone, password] return accountObj # If failed - count the failure and add the error to list of errors. - except: + except BaseException: errorMsg = "Failed in update accaunt ." raise Exception(errorMsg) raise @@ -169,15 +181,18 @@ class FEUser: def click_on_feedback(): Click.id(Constants.Dashboard.Feedback.ID, wait_for_page=True) Wait.id( - Constants.Dashboard.Feedback.FeedbackModal.SAVE_BTN_ID, wait_for_page=True) + Constants.Dashboard.Feedback.FeedbackModal.SAVE_BTN_ID, + wait_for_page=True) @staticmethod def validate_feedback(description, user_email): - query = "SELECT user_id FROM ice_feedback where description = '{desc}'".format( - desc=description) + query = "SELECT user_id FROM ice_feedback where " +\ + "description = '{desc}'".format( + desc=description) feedback_user_uuid = DBGeneral.select_query(query) - query = "SELECT id FROM ice_user_profile where email = '{email}'".format( - email=user_email) + query = "SELECT id FROM ice_user_profile where " +\ + "email = '{email}'".format( + email=user_email) user_uuid = DBGeneral.select_query(query) Helper.internal_assert(user_uuid, feedback_user_uuid) @@ -187,7 +202,8 @@ class FEUser: description = Helper.rand_string("randomString") Enter.text_by_css("textarea[name=\"description\"]", description) Click.id( - Constants.Dashboard.Feedback.FeedbackModal.SAVE_BTN_ID, wait_for_page=True) + Constants.Dashboard.Feedback.FeedbackModal.SAVE_BTN_ID, + wait_for_page=True) Wait.text_by_id(Constants.Toast.ID, "Feedback was sent successfully.", wait_for_page=True) return description @@ -206,9 +222,12 @@ class FEUser: def click_on_notifications(): try: Click.link_text( - Constants.Dashboard.Avatar.Notifications.LINK_TEXT, wait_for_page=True) - Wait.text_by_id(Constants.Dashboard.Avatar.Notifications.Title.ID, - Constants.Dashboard.Avatar.Notifications.Title.TEXT, wait_for_page=True) + Constants.Dashboard.Avatar.Notifications.LINK_TEXT, + wait_for_page=True) + Wait.text_by_id( + Constants.Dashboard.Avatar.Notifications.Title.ID, + Constants.Dashboard.Avatar.Notifications.Title.TEXT, + wait_for_page=True) except Exception as e: errorMsg = "Failed to click_on on Admin." raise Exception(errorMsg, e) @@ -231,21 +250,28 @@ class FEUser: logger.error( "APIUser should not have assigned next steps at first login.") raise - if (Get.by_id("next-steps-list") != "No next steps are assigned to you."): + if (Get.by_id("next-steps-list") != + "No next steps are assigned to you."): logger.error( - "No assigned next steps and text 'No next steps are assigned to you.' was not found.") + "No assigned next steps and text 'No next steps are " + + "assigned to you.' was not found.") raise token = "token " + APIUser.login_user(user_content['el_email']) user_content['session_token'] = token logger.debug( - "Adding new next step (via api) and assigning it to user " + user_content['full_name']) + "Adding new next step (via api) and assigning it to user " + + user_content['full_name']) APIVirtualFunction.add_next_step(user_content) logger.debug( - "Refresh page and look for changes in assigned next steps section:") + "Refresh page and look for changes in assigned " + + "next steps section:") FEGeneral.refresh() logger.debug(" > Check if number has changed in 'Assigned To You'") - Wait.text_by_id( - "next-steps-header", "Assigned To You (1)", wait_for_page=True) + FEUser.logout() + FEUser.login( + user_content['email'], Constants.Default.Password.TEXT) + text = Get.by_id("next-steps-header", True) + Helper.internal_assert(text, "Assigned To You (1)") @staticmethod def set_ssh_key_from_account(key, is_negative=False): @@ -254,23 +280,29 @@ class FEUser: Click.css(Constants.SubmitButton.CSS) if is_negative: Wait.text_by_id( - Constants.Toast.ID, Constants.Dashboard.Avatar.Account.SSHKey.UpdateFailed.TEXT) + Constants.Toast.ID, + Constants.Dashboard.Avatar.Account.SSHKey.UpdateFailed.TEXT) else: Wait.text_by_id( - Constants.Toast.ID, Constants.Dashboard.Avatar.Account.Update.Success.TEXT) + Constants.Toast.ID, + Constants.Dashboard.Avatar.Account.Update.Success.TEXT) @staticmethod def reset_password(): Wait.text_by_css( - Constants.UpdatePassword.Title.CSS, Constants.UpdatePassword.Title.TEXT) + Constants.UpdatePassword.Title.CSS, + Constants.UpdatePassword.Title.TEXT) Wait.text_by_css( - Constants.UpdatePassword.SubTitle.CSS, Constants.UpdatePassword.SubTitle.TEXT) + Constants.UpdatePassword.SubTitle.CSS, + Constants.UpdatePassword.SubTitle.TEXT) Wait.text_by_css( Constants.SubmitButton.CSS, Constants.UpdatePassword.Button.TEXT) Enter.text_by_name( - Constants.UpdatePassword.Password.NAME, Constants.Default.Password.NewPass.TEXT) + Constants.UpdatePassword.Password.NAME, + Constants.Default.Password.NewPass.TEXT) Enter.text_by_name( - Constants.UpdatePassword.ConfirmPassword.NAME, Constants.Default.Password.NewPass.TEXT) + Constants.UpdatePassword.ConfirmPassword.NAME, + Constants.Default.Password.NewPass.TEXT) Click.css(Constants.SubmitButton.CSS) Wait.text_by_id( Constants.Toast.ID, Constants.UpdatePassword.Toast.TEXT) @@ -279,8 +311,8 @@ class FEUser: def delete_notification(notificationID): if isinstance(notificationID, tuple): notificationID = notificationID[0] - delete_button = Constants.Dashboard.Avatar.Notifications.DeleteNotification.ID + \ - notificationID + delete_button = Constants.Dashboard.Avatar.Notifications.\ + DeleteNotification.ID + notificationID # Click on delete button. Click.id(delete_button, wait_for_page=True) Wait.id_to_dissappear(delete_button) @@ -292,7 +324,8 @@ class FEUser: if isinstance(notifID, tuple): notifID = notifID[0] ui_list.append(str(Get.by_id( - Constants.Dashboard.Avatar.Notifications.NotificationColumn.ID + notifID))) + Constants.Dashboard.Avatar.Notifications. + NotificationColumn.ID + notifID))) for activity in notification_list: if not any(activity in s for s in ui_list): raise AssertionError( @@ -313,8 +346,9 @@ class FEUser: def open_invite_team_member_form(vf_left_nav_id): Click.id(vf_left_nav_id) Click.id(Constants.Dashboard.Overview.TeamMember.ID) - Wait.text_by_name(Constants.Dashboard.Wizard.InviteTeamMembers.Title.NAME, - Constants.Dashboard.Wizard.InviteTeamMembers.Title.TEXT) + Wait.text_by_name( + Constants.Dashboard.Wizard.InviteTeamMembers.Title.NAME, + Constants.Dashboard.Wizard.InviteTeamMembers.Title.TEXT) @staticmethod def invite_single_user_to_team(email): @@ -325,49 +359,64 @@ class FEUser: def go_to_user_profile_settings(): FEUser.go_to_account() Click.id( - Constants.Dashboard.Avatar.Account.UserProfileSettings.ID, wait_for_page=True) - Wait.text_by_id(Constants.Dashboard.Avatar.Account.UserProfileSettings.TitleID, - Constants.Dashboard.Avatar.Account.UserProfileSettings.TitleText, wait_for_page=True) + Constants.Dashboard.Avatar.Account.UserProfileSettings.ID, + wait_for_page=True) + Wait.text_by_id( + Constants.Dashboard.Avatar.Account.UserProfileSettings.TitleID, + Constants.Dashboard.Avatar.Account.UserProfileSettings.TitleText, + wait_for_page=True) @staticmethod def check_user_profile_settings_checkboxes(): Click.id( - Constants.Dashboard.Avatar.Account.UserProfileSettings.ReceiveEmailsID, wait_for_page=True) + Constants.Dashboard.Avatar.Account. + UserProfileSettings.ReceiveEmailsID, + wait_for_page=True) Click.id( - Constants.Dashboard.Avatar.Account.UserProfileSettings.ReceiveEmailEveryTimeID, wait_for_page=True) + Constants.Dashboard.Avatar.Account.UserProfileSettings. + ReceiveEmailEveryTimeID, + wait_for_page=True) Click.id( - Constants.Dashboard.Avatar.Account.UserProfileSettings.ReceiveDigestEmailID, wait_for_page=True) + Constants.Dashboard.Avatar.Account.UserProfileSettings. + ReceiveDigestEmailID, + wait_for_page=True) Click.id( - Constants.Dashboard.Avatar.Account.UserProfileSettings.UpdateButtonID, wait_for_page=True) + Constants.Dashboard.Avatar.Account.UserProfileSettings. + UpdateButtonID, + wait_for_page=True) @staticmethod def validate_user_profile_settings_checkboxes(checked): Wait.page_has_loaded() receive_emails = Get.is_selected_by_id( - Constants.Dashboard.Avatar.Account.UserProfileSettings.ReceiveEmailsID, True) + Constants.Dashboard.Avatar.Account.UserProfileSettings. + ReceiveEmailsID, True) Helper.internal_assert(receive_emails, checked) - receive_notifications = \ - Get.is_selected_by_id( - Constants.Dashboard.Avatar.Account.UserProfileSettings.ReceiveNotificationsID, True) - receive_email_every_time = \ - Get.is_selected_by_id( - Constants.Dashboard.Avatar.Account.UserProfileSettings.ReceiveEmailEveryTimeID, True) + Get.is_selected_by_id( + Constants.Dashboard.Avatar.Account.UserProfileSettings. + ReceiveNotificationsID, True) + receive_email_every_time = Get.is_selected_by_id( + Constants.Dashboard.Avatar.Account.UserProfileSettings. + ReceiveEmailEveryTimeID, True) Helper.internal_assert(receive_email_every_time, checked) - receive_digest_email = \ - Get.is_selected_by_id( - Constants.Dashboard.Avatar.Account.UserProfileSettings.ReceiveDigestEmailID, True) + receive_digest_email = Get.is_selected_by_id( + Constants.Dashboard.Avatar.Account.UserProfileSettings. + ReceiveDigestEmailID, True) Helper.internal_assert(receive_digest_email, not checked) @staticmethod def compare_notifications_count_for_user(expected_count): Wait.text_by_id( - Constants.Dashboard.Avatar.Notifications.Count.ID, expected_count, True) + Constants.Dashboard.Avatar.Notifications.Count.ID, + expected_count, + True) @staticmethod def check_notification_number_is_not_presented(): FEGeneral.refresh() Wait.id_to_dissappear( - Constants.Dashboard.Avatar.Notifications.Count.ID, wait_for_page=True) + Constants.Dashboard.Avatar.Notifications.Count.ID, + wait_for_page=True) @staticmethod def validate_account_details(full_name, phone_number, ssh_key): @@ -375,8 +424,8 @@ class FEUser: Constants.Dashboard.Avatar.Account.FullName.NAME)) Helper.internal_assert(phone_number, Get.value_by_name( Constants.Dashboard.Avatar.Account.Phone.NAME)) - Helper.internal_assert( - ssh_key, Get.value_by_name(Constants.Dashboard.Avatar.Account.SSHKey.NAME)) + Helper.internal_assert(ssh_key, Get.value_by_name( + Constants.Dashboard.Avatar.Account.SSHKey.NAME)) @staticmethod def check_rgwa_access_key(my_key): @@ -387,14 +436,16 @@ class FEUser: def check_rgwa_access_secret(my_secret): Click.id(Constants.Dashboard.Avatar.Account.RGWA.Secret.BUTTON_ID) Wait.text_by_id( - Constants.Dashboard.Avatar.Account.RGWA.Secret.SECRET_ID, my_secret) + Constants.Dashboard.Avatar.Account.RGWA.Secret.SECRET_ID, + my_secret) @staticmethod def get_rgwa_access_secret(): Click.id(Constants.Dashboard.Avatar.Account.RGWA.Secret.BUTTON_ID, wait_for_page=True) secret = Get.by_id( - Constants.Dashboard.Avatar.Account.RGWA.Secret.SECRET_ID, wait_for_page=True) + Constants.Dashboard.Avatar.Account.RGWA.Secret.SECRET_ID, + wait_for_page=True) return secret @staticmethod diff --git a/services/frontend/fe_wizard.py b/services/frontend/fe_wizard.py index df5087d..c299a4b 100644 --- a/services/frontend/fe_wizard.py +++ b/services/frontend/fe_wizard.py @@ -61,7 +61,9 @@ class FEWizard: try: logger.debug("Tab Add Virtual Functions") Wait.text_by_css( - Constants.Dashboard.Wizard.Title.CSS, Constants.Dashboard.Wizard.AddVF.Title.TEXT, wait_for_page=True) + Constants.Dashboard.Wizard.Title.CSS, + Constants.Dashboard.Wizard.AddVF.Title.TEXT, + wait_for_page=True) vfName = "newVF" + Helper.rand_string("randomString") vfVersion = "newVFVersion" + \ Helper.rand_string( @@ -70,9 +72,11 @@ class FEWizard: Enter.text_by_name("VFversion", vfVersion, wait_for_page=True) FEWizard.date_picker_wizard() Select(session.ice_driver.find_element_by_id( - Constants.Dashboard.Wizard.AddVF.AIC_Version.TEXT)).select_by_visible_text("AIC 3.5") + Constants.Dashboard.Wizard.AddVF.AIC_Version.TEXT + )).select_by_visible_text("AIC 3.5") Select(session.ice_driver.find_element_by_id( - Constants.Dashboard.Wizard.AddVF.ECOMP_Release.TEXT)).select_by_visible_text("Unknown") + Constants.Dashboard.Wizard.AddVF.ECOMP_Release.TEXT + )).select_by_visible_text("Unknown") session.E2Edate = FEWizard.get_lab_entry_date() Click.css(Constants.SubmitButton.CSS, wait_for_page=True) Wait.page_has_loaded() @@ -80,7 +84,8 @@ class FEWizard: return vfName # If failed - count the failure and add the error to list of errors. except Exception as e: - errorMsg = "Failed to add a Virtual Function via modal window. Exception " + \ + errorMsg = "Failed to add a Virtual Function via modal window. " +\ + "Exception " +\ str(e) raise Exception(errorMsg) @@ -93,8 +98,10 @@ class FEWizard: @staticmethod def add_vendor_contact(): logger.debug("Tab Add Vendor Contact") - Wait.text_by_css(Constants.Dashboard.Wizard.Title.CSS, - Constants.Dashboard.Wizard.AddVendorContact.Title.TEXT, wait_for_page=True) + Wait.text_by_css( + Constants.Dashboard.Wizard.Title.CSS, + Constants.Dashboard.Wizard.AddVendorContact.Title.TEXT, + wait_for_page=True) Select(session.ice_driver.find_element_by_name( "company")).select_by_visible_text("Ericsson") fullname = Helper.rand_string( @@ -115,7 +122,10 @@ class FEWizard: logger.debug( "Tab Add " + ServiceProvider.MainServiceProvider + " Sponsor") Wait.text_by_css( - Constants.Dashboard.Wizard.Title.CSS, "Add " + ServiceProvider.MainServiceProvider + " Sponsor") + Constants.Dashboard.Wizard.Title.CSS, + "Add " + + ServiceProvider.MainServiceProvider + + " Sponsor") fullname = Helper.rand_string( "randomString") + Helper.rand_string("randomString") Enter.text_by_name("fullname", fullname) @@ -127,19 +137,21 @@ class FEWizard: Enter.text_by_name("phone", phone) Click.css(Constants.SubmitButton.CSS) Wait.name_to_dissappear("Add AT&T Sponsor") - sponsor = {"company": ServiceProvider.MainServiceProvider, "full_name": fullname, - "email": email, "phone": phone} + sponsor = {"company": ServiceProvider.MainServiceProvider, + "full_name": fullname, "email": email, "phone": phone} return sponsor @staticmethod def invite_team_members(email): try: logger.debug("Tab Invite Team Members") - Wait.text_by_name(Constants.Dashboard.Wizard.InviteTeamMembers.Title.NAME, - Constants.Dashboard.Wizard.InviteTeamMembers.Title.TEXT) + Wait.text_by_name( + Constants.Dashboard.Wizard.InviteTeamMembers.Title.NAME, + Constants.Dashboard.Wizard.InviteTeamMembers.Title.TEXT) Enter.text_by_name("email", email) Wait.text_by_css( - Constants.SubmitButton.CSS, Constants.Dashboard.Wizard.InviteTeamMembers.Button.TEXT) + Constants.SubmitButton.CSS, + Constants.Dashboard.Wizard.InviteTeamMembers.Button.TEXT) Click.css(Constants.SubmitButton.CSS) Wait.name_to_dissappear( Constants.Dashboard.Wizard.InviteTeamMembers.Title.NAME) @@ -162,12 +174,15 @@ class FEWizard: # Check that the submit button exists. Wait.text_by_css( - Constants.SubmitButton.CSS, Constants.Dashboard.Wizard.AddSSHKey.Title.TEXT) + Constants.SubmitButton.CSS, + Constants.Dashboard.Wizard.AddSSHKey.Title.TEXT) Click.css(Constants.SubmitButton.CSS) # Click on submit button. if is_negative: Wait.text_by_id( - Constants.Toast.ID, Constants.Dashboard.Avatar.Account.SSHKey.UpdateFailed.TEXT) + Constants.Toast.ID, + Constants.Dashboard.Avatar.Account + .SSHKey.UpdateFailed.TEXT) else: Wait.name_to_dissappear( Constants.Dashboard.Wizard.AddSSHKey.Title.NAME) @@ -175,7 +190,8 @@ class FEWizard: return sshKey # If failed - count the failure and add the error to list of errors. except Exception as e: - errorMsg = "Failed to add an SSH Key in the modal window. Exception=" + \ + errorMsg = "Failed to add an SSH Key in " +\ + "the modal window. Exception=" + \ str(e) raise Exception(errorMsg) @@ -183,12 +199,15 @@ class FEWizard: def invite_team_members_modal(email, wait_modal_to_disappear=True): try: Click.id( - Constants.Dashboard.Overview.TeamMember.ID, wait_for_page=True) - Wait.text_by_css(Constants.Dashboard.Wizard.Title.CSS, - Constants.Dashboard.Wizard.InviteTeamMembers.Title.TEXT) + Constants.Dashboard.Overview.TeamMember.ID, + wait_for_page=True) + Wait.text_by_css( + Constants.Dashboard.Wizard.Title.CSS, + Constants.Dashboard.Wizard.InviteTeamMembers.Title.TEXT) Enter.text_by_name("email", email) Wait.text_by_css( - Constants.SubmitButton.CSS, Constants.Dashboard.Wizard.InviteTeamMembers.Button.TEXT) + Constants.SubmitButton.CSS, + Constants.Dashboard.Wizard.InviteTeamMembers.Button.TEXT) Click.css(".inviteMembers-form button.btn.btn-primary", True) if wait_modal_to_disappear: Wait.modal_to_dissappear() @@ -201,8 +220,14 @@ class FEWizard: @staticmethod def date_picker_add_ns(count): try: - session.ice_driver.execute_script("var el = angular.element(document.querySelector('.addNextSteps')); el.scope().vm.nextSteps[" + str( - count) + "].duedate = new Date('" + str(datetime.today().isoformat()) + "')") + session.ice_driver.execute_script( + "var el = angular.element(document.querySelector" + + "('.addNextSteps')); el.scope().vm.nextSteps[" + + str(count) + + "].duedate = new Date('" + + str( + datetime.today().isoformat()) + + "')") Click.css("div.modal-content", wait_for_page=True) except Exception as e: errorMsg = "Failed to select date with datePicker." diff --git a/services/helper.py b/services/helper.py index 418b260..dd779be 100644 --- a/services/helper.py +++ b/services/helper.py @@ -1,4 +1,3 @@ - # ============LICENSE_START========================================== # org.onap.vvp/test-engine # =================================================================== @@ -42,9 +41,10 @@ import string import subprocess import unittest -from cryptography.hazmat.backends import default_backend as crypto_default_backend, \ - default_backend -from cryptography.hazmat.primitives import serialization as crypto_serialization +from cryptography.hazmat.backends import \ + default_backend as crypto_default_backend, default_backend +from cryptography.hazmat.primitives import \ + serialization as crypto_serialization from cryptography.hazmat.primitives.asymmetric import rsa from django.conf import settings @@ -65,8 +65,11 @@ class Helper: def rand_string(type_of_value='randomString', numberOfDigits=0): letters_and_numbers = string.ascii_letters + string.digits if type_of_value == 'email': - myEmail = ''.join(random.choice(letters_and_numbers) for _ in range( - 4)) + "@" + ''.join(random.choice(string.ascii_uppercase) for _ in range(4)) + ".com" + myEmail = ''.join( + random.choice(letters_and_numbers) for _ in range( + 4)) + "@" + ''.join( + random.choice( + string.ascii_uppercase) for _ in range(4)) + ".com" return "ST" + myEmail elif type_of_value == 'randomNumber': randomNumber = ''.join("%s" % random.randint(2, 9) @@ -93,7 +96,7 @@ class Helper: key_size=2048, backend=default_backend() ) - private_key = key.private_bytes( + key.private_bytes( encoding=crypto_serialization.Encoding.PEM, format=crypto_serialization.PrivateFormat.TraditionalOpenSSL, encryption_algorithm=crypto_serialization.NoEncryption()) @@ -114,7 +117,8 @@ class Helper: def check_admin_ssh_existence(path, admin_ssh): if admin_ssh == open(path).read().rstrip('\n'): logger.debug( - "Admin SSH already defined in DB and equal to the one stored on the local system.") + "Admin SSH already defined in DB and equal " + + "to the one stored on the local system.") return True return False @@ -173,7 +177,8 @@ class Helper: return user_pub_key except Exception as error: logger.error( - "_-_-_-_-_- Unexpected error in get_or_create_rsa_key_for_admin: %s" % error) + "_-_-_-_-_- Unexpected error in " + + "get_or_create_rsa_key_for_admin: %s" % error) raise Exception("Failed to create SSH keys for user admin", error) @staticmethod @@ -215,5 +220,5 @@ class Helper: @staticmethod def assertTrue(expr, msg=None): """Check that the expression is true.""" - if expr != True: + if not expr: raise Exception("AssertionError: \"not expr") diff --git a/services/session.py b/services/session.py index 174f54b..0b8e290 100644 --- a/services/session.py +++ b/services/session.py @@ -1,5 +1,5 @@ - -# ============LICENSE_START========================================== + +# ============LICENSE_START========================================== # org.onap.vvp/test-engine # =================================================================== # Copyright © 2017 AT&T Intellectual Property. All rights reserved. @@ -58,7 +58,8 @@ class SessionSingletone(): def __init__(self): self.wait_until_retires = self.positive_wait_until_retires self.wait_until_time_pause = self.positive_wait_until_time_pause - self.wait_until_time_pause_long = self.positive_wait_until_time_pause_long + self.wait_until_time_pause_long = \ + self.positive_wait_until_time_pause_long self.wait_until_implicit_time = self.positive_wait_until_implicit_time self.test = '123' self.ice_driver = None @@ -94,7 +95,7 @@ class SessionSingletone(): self.start_negative() try: run_me() # click.css - except: + except BaseException: # will run if click does NOT succeed self.errorCounter = 0 self.errorList = "" @@ -103,4 +104,5 @@ class SessionSingletone(): raise Exception(error_msg) self.end_negative() + session = SessionSingletone() diff --git a/services/types.py b/services/types.py index 7dbbe94..db58ca0 100644 --- a/services/types.py +++ b/services/types.py @@ -1,5 +1,5 @@ - -# ============LICENSE_START========================================== + +# ============LICENSE_START========================================== # org.onap.vvp/test-engine # =================================================================== # Copyright © 2017 AT&T Intellectual Property. All rights reserved. |