summaryrefslogtreecommitdiffstats
path: root/services
diff options
context:
space:
mode:
authorEdan Binshtok <eb578m@intl.att.com>2017-11-19 11:50:50 +0200
committerEdan Binshtok <eb578m@intl.att.com>2017-11-19 11:51:04 +0200
commit71891b3040605103397ffa7fb349215eced3a2c3 (patch)
tree22dda04edd1b8ee0d0de2a6a260d6d60e42cba25 /services
parent433a8256e31f755f5e236491bbe39d3db24d6d6d (diff)
Pep8 another fixes
Add more fixes to pep8 Issue-ID: VVP-25 Change-Id: Icc3bd3977ced2b537c858a9801e04a6bf6d1db05 Signed-off-by: Edan Binshtok <eb578m@intl.att.com>
Diffstat (limited to 'services')
-rw-r--r--services/__init__.py4
-rw-r--r--services/api/__init__.py4
-rw-r--r--services/api/api_bridge.py15
-rw-r--r--services/api/api_checklist.py66
-rw-r--r--services/api/api_gitlab.py107
-rw-r--r--services/api/api_jenkins.py15
-rw-r--r--services/api/api_rados.py85
-rw-r--r--services/api/api_user.py64
-rw-r--r--services/api/api_virtual_function.py117
-rw-r--r--services/constants.py29
-rw-r--r--services/database/__init__.py4
-rw-r--r--services/database/db_bridge.py15
-rw-r--r--services/database/db_checklist.py270
-rw-r--r--services/database/db_cms.py75
-rwxr-xr-xservices/database/db_general.py145
-rw-r--r--services/database/db_user.py238
-rw-r--r--services/database/db_virtual_function.py123
-rw-r--r--services/frontend/__init__.py4
-rw-r--r--services/frontend/base_actions/__init__.py4
-rw-r--r--services/frontend/base_actions/click.py36
-rw-r--r--services/frontend/base_actions/enter.py114
-rw-r--r--services/frontend/base_actions/get.py36
-rw-r--r--services/frontend/base_actions/wait.py80
-rw-r--r--services/frontend/fe_checklist.py308
-rw-r--r--services/frontend/fe_checklist_template.py207
-rw-r--r--services/frontend/fe_cms.py54
-rw-r--r--services/frontend/fe_dashboard.py53
-rw-r--r--services/frontend/fe_detailed_view.py284
-rw-r--r--services/frontend/fe_general.py15
-rw-r--r--services/frontend/fe_invite.py69
-rw-r--r--services/frontend/fe_next_step.py10
-rw-r--r--services/frontend/fe_overview.py254
-rw-r--r--services/frontend/fe_user.py157
-rw-r--r--services/frontend/fe_wizard.py67
-rw-r--r--services/helper.py25
-rw-r--r--services/session.py10
-rw-r--r--services/types.py4
37 files changed, 2136 insertions, 1031 deletions
diff --git a/services/__init__.py b/services/__init__.py
index 30d7152..32b601a 100644
--- a/services/__init__.py
+++ b/services/__init__.py
@@ -1,5 +1,5 @@
-
-# ============LICENSE_START==========================================
+
+# ============LICENSE_START==========================================
# org.onap.vvp/test-engine
# ===================================================================
# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
diff --git a/services/api/__init__.py b/services/api/__init__.py
index 30d7152..32b601a 100644
--- a/services/api/__init__.py
+++ b/services/api/__init__.py
@@ -1,5 +1,5 @@
-
-# ============LICENSE_START==========================================
+
+# ============LICENSE_START==========================================
# org.onap.vvp/test-engine
# ===================================================================
# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
diff --git a/services/api/api_bridge.py b/services/api/api_bridge.py
index 8926a1d..d8cb46c 100644
--- a/services/api/api_bridge.py
+++ b/services/api/api_bridge.py
@@ -1,5 +1,5 @@
-
-# ============LICENSE_START==========================================
+
+# ============LICENSE_START==========================================
# org.onap.vvp/test-engine
# ===================================================================
# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
@@ -36,11 +36,15 @@
# ============LICENSE_END============================================
#
# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+
+
class APIBridge:
"""
- This class helps to use functions inside classes with circular import (dependencies).
- Use this class only when there is circular import in one of the API services.
+ This class helps to use functions inside
+ classes with circular import (dependencies).
+ Use this class only when there is circular
+ import in one of the API services.
"""
@staticmethod
@@ -63,7 +67,8 @@ class APIBridge:
@staticmethod
def create_engagement(wait_for_gitlab=True):
- """create_engagement: Originally can be found under APIVirtualFunction class."""
+ """create_engagement: Originally can be found under
+ APIVirtualFunction class."""
from services.api.api_virtual_function import APIVirtualFunction
return APIVirtualFunction.create_engagement(wait_for_gitlab)
diff --git a/services/api/api_checklist.py b/services/api/api_checklist.py
index fda0730..02ab311 100644
--- a/services/api/api_checklist.py
+++ b/services/api/api_checklist.py
@@ -37,7 +37,6 @@
#
# ECOMP is a trademark and service mark of AT&T Intellectual Property.
import datetime
-import logging
import requests
@@ -56,7 +55,12 @@ logger = LoggingServiceFactory.get_logger()
class APIChecklist:
@staticmethod
- def create_checklist(user_content, files=["file0", "file1"], return_negative_response=False):
+ def create_checklist(
+ user_content,
+ files=[
+ "file0",
+ "file1"],
+ return_negative_response=False):
r1 = None
postURL = Constants.Default.URL.Checklist.TEXT + \
user_content['engagement_uuid'] + '/checklist/new/'
@@ -69,11 +73,13 @@ class APIChecklist:
data['checkListName'] = "checklistAPI" + \
Helper.rand_string('randomString')
data['checkListTemplateUuid'] = DBGeneral.select_where(
- "uuid", "ice_checklist_template", "name", Constants.Template.Heat.TEXT, 1)
+ "uuid", "ice_checklist_template", "name",
+ Constants.Template.Heat.TEXT, 1)
try:
if not APIGitLab.is_gitlab_ready(user_content):
raise Exception(
- "Gitlab is not ready and because of that the test is failed.")
+ "Gitlab is not ready and because " +
+ "of that the test is failed.")
r1 = requests.post(postURL, json=data,
headers=headers, verify=False)
@@ -82,15 +88,19 @@ class APIChecklist:
logger.debug("Checklist was created successfully!")
cl_content = r1.json()
return cl_content
- except:
+ except BaseException:
if return_negative_response:
return r1
if r1 is None:
logger.error(
- "Failed to create checklist for VF " + user_content['vfName'])
+ "Failed to create checklist for VF " +
+ user_content['vfName'])
else:
- logger.error("Failed to create checklist for VF " + user_content[
- 'vfName'] + ", see response >>> %s %s.\nContent: %s" % (r1.status_code, r1.reason, str(r1.content, 'utf-8')))
+ logger.error(
+ "Failed to create checklist for VF " +
+ user_content['vfName'] +
+ ", see response >>> %s %s.\nContent: %s" %
+ (r1.status_code, r1.reason, str(r1.content, 'utf-8')))
raise
@staticmethod
@@ -107,7 +117,8 @@ class APIChecklist:
data['checkListName'] = "UpdateChecklistAPI" + \
Helper.rand_string('randomString')
data['checkListTemplateUuid'] = DBGeneral.select_where(
- "uuid", "ice_checklist_template", "name", Constants.Template.Heat.TEXT, 1)
+ "uuid", "ice_checklist_template", "name",
+ Constants.Template.Heat.TEXT, 1)
try:
r1 = requests.put(
postURL, json=data, headers=headers, verify=False)
@@ -115,13 +126,17 @@ class APIChecklist:
logger.debug("DBChecklist was created successfully!")
cl_content = r1.json()
return cl_content['uuid']
- except:
+ except BaseException:
if r1 is None:
logger.error(
- "Failed to create checklist for VF " + user_content['vfName'])
+ "Failed to create checklist for VF " +
+ user_content['vfName'])
else:
- logger.error("Failed to create checklist for VF " + user_content[
- 'vfName'] + ", see response >>> %s %s.\nContent: %s" % (r1.status_code, r1.reason, str(r1.content, 'utf-8')))
+ logger.error(
+ "Failed to create checklist for VF " +
+ user_content['vfName'] +
+ ", see response >>> %s %s.\nContent: %s" %
+ (r1.status_code, r1.reason, str(r1.content, 'utf-8')))
raise
@staticmethod
@@ -141,13 +156,16 @@ class APIChecklist:
postURL, json=data, headers=headers, verify=False)
Helper.internal_assert_boolean(r1.status_code, 200)
logger.debug("Audit log was added successfully!")
- except:
+ except BaseException:
if r1 is None:
logger.error(
"Failed to add audit log for checklist uuid: " + cl_uuid)
else:
- logger.error("Failed to add audit log for checklist uuid: " + cl_uuid +
- ", see response >>> %s %s.\nContent: %s" % (r1.status_code, r1.reason, str(r1.content, 'utf-8')))
+ logger.error(
+ "Failed to add audit log for checklist uuid: " +
+ cl_uuid +
+ ", see response >>> %s %s.\nContent: %s" %
+ (r1.status_code, r1.reason, str(r1.content, 'utf-8')))
raise
@staticmethod
@@ -175,13 +193,16 @@ class APIChecklist:
logger.debug("Next step was added successfully!")
ns_uuid = r1.json()
return ns_uuid[0]['uuid']
- except:
+ except BaseException:
if r1 is None:
logger.error(
"Failed to add next step for checklist uuid: " + cl_uuid)
else:
- logger.error("Failed to add next step for checklist uuid: " + cl_uuid +
- ", see response >>> %s %s.\nContent: %s" % (r1.status_code, r1.reason, str(r1.content, 'utf-8')))
+ logger.error(
+ "Failed to add next step for checklist uuid: " +
+ cl_uuid +
+ ", see response >>> %s %s.\nContent: %s" %
+ (r1.status_code, r1.reason, str(r1.content, 'utf-8')))
raise
@staticmethod
@@ -202,8 +223,8 @@ class APIChecklist:
logger.debug("go_to_next_state put request result status: %s" %
r.status_code)
else:
- logger.error(
- "PUT request failed to change checklist state >>> " + str(r.status_code) + " " + r.reason)
+ logger.error("PUT request failed to change checklist state >>> " +
+ str(r.status_code) + " " + r.reason)
raise Exception("PUT request failed to change checklist state")
@staticmethod
@@ -215,7 +236,8 @@ class APIChecklist:
Constants.ChecklistStates.Closed.TEXT]
for i in range(len(vf_staff_emails)):
logger.debug(
- "Trying to jump state for %s [%s]" % (vf_staff_emails[i], i))
+ "Trying to jump state for %s [%s]" %
+ (vf_staff_emails[i], i))
DBChecklist.update_all_decisions_to_approve(cl_uuid)
api_checklist_obj.jump_state(cl_uuid, vf_staff_emails[i])
logger.debug("Checking state changed to %s" % states[i])
diff --git a/services/api/api_gitlab.py b/services/api/api_gitlab.py
index 6c5a2ff..639f67e 100644
--- a/services/api/api_gitlab.py
+++ b/services/api/api_gitlab.py
@@ -36,7 +36,6 @@
# ============LICENSE_END============================================
#
# ECOMP is a trademark and service mark of AT&T Intellectual Property.
-import logging
import os
import subprocess
import sys
@@ -63,7 +62,7 @@ class APIGitLab:
def display_output(p):
while True:
out = p.stderr.read(1)
- if out == b'' and p.poll() != None:
+ if out == b'' and p.poll() is not None:
break
if out != '':
sys.stdout.write(str(out.decode()))
@@ -81,31 +80,42 @@ class APIGitLab:
try:
r1 = requests.get(getURL, headers=headers, verify=False)
counter = 0
- while r1.status_code == 404 or r1.content == b'[]' and counter <= Constants.GitLabConstants.RETRIES_NUMBER:
+ while r1.status_code == 404 or r1.content == b'[]' and \
+ counter <= Constants.\
+ GitLabConstants.RETRIES_NUMBER:
time.sleep(session.wait_until_time_pause)
r1 = requests.get(getURL, headers=headers, verify=False)
logger.debug(
- "trying to get the git project, yet to succeed (try #%s)" % counter)
+ "trying to get the git project, " +
+ "yet to succeed (try #%s)" % counter)
counter += 1
+ Helper.internal_assert(r1.status_code, 200)
if r1.content == b'[]':
logger.error("Got an empty list as a response.")
raise
logger.debug("Project exists on APIGitLab!")
content = r1.json() # Change it from list to dict.
return content
- except:
+ except BaseException:
if r1 is None:
logger.error("Failed to get project from APIGitLab.")
else:
- logger.error("Failed to get project from APIGitLab, see response >>> %s %s \n %s"
- % (r1.status_code, r1.reason, str(r1.content, 'utf-8')))
+ logger.error(
+ "Failed to get project from APIGitLab, " +
+ "see response >>> %s %s \n %s" %
+ (r1.status_code, r1.reason, str(
+ r1.content, 'utf-8')))
raise
- def are_all_list_users_registered_as_project_members(self, users_emails_list, project_path_with_namespace):
+ def are_all_list_users_registered_as_project_members(
+ self, users_emails_list, project_path_with_namespace):
for email in users_emails_list:
- if not self.validate_git_project_members(project_path_with_namespace, email):
+ if not self.validate_git_project_members(
+ project_path_with_namespace, email):
raise Exception(
- "Couldn't find the invited users: " + email + " in GitLab.")
+ "Couldn't find the invited users: " +
+ email +
+ " in GitLab.")
logger.debug(
"Invited user: " + email + " found in GitLab.")
@@ -121,7 +131,9 @@ class APIGitLab:
headers['Content-type'] = 'application/json'
headers['PRIVATE-TOKEN'] = settings.GITLAB_TOKEN
counter = 0
- while (r1 is None or r1.content == b'[]' or r1.status_code != 200) and counter <= Constants.GitLabConstants.RETRIES_NUMBER:
+ while (r1 is None or r1.content == b'[]' or r1.status_code !=
+ 200) and counter <= Constants.GitLabConstants.\
+ RETRIES_NUMBER:
logger.debug(
"try to get git project members (try #%s)" % counter)
time.sleep(session.wait_until_time_pause)
@@ -130,8 +142,11 @@ class APIGitLab:
counter += 1
except Exception as e:
if counter >= Constants.GitLabConstants.RETRIES_NUMBER:
- logger.error("Failed to get project's team members from APIGitLab, see response >>> %s %s \n %s %s"
- % (r1.status_code, r1.reason, str(r1.content, 'utf-8'), e.message))
+ logger.error(
+ "Failed to get project's team members from " +
+ "APIGitLab, see response >>> %s %s \n %s %s" %
+ (r1.status_code, r1.reason, str(
+ r1.content, 'utf-8'), e.message))
return False
if r1.content == b'[]':
logger.error("Got an empty list as a response.")
@@ -144,7 +159,8 @@ class APIGitLab:
return True
@staticmethod
- def negative_validate_git_project_member(path_with_namespace, user_email, git_user_id):
+ def negative_validate_git_project_member(
+ path_with_namespace, user_email, git_user_id):
if settings.DATABASE_TYPE != 'local':
r1 = None
headers = dict()
@@ -154,7 +170,9 @@ class APIGitLab:
headers['Content-type'] = 'application/json'
headers['PRIVATE-TOKEN'] = settings.GITLAB_TOKEN
counter = 0
- while r1 is None or str.encode(user_email) not in r1.content and counter <= Constants.GitLabConstants.RETRIES_NUMBER:
+ while r1 is None or str.encode(
+ user_email) not in r1.content and counter <= Constants.\
+ GitLabConstants.RETRIES_NUMBER:
logger.debug(
"try to get git project members (try #%s)" % counter)
time.sleep(session.wait_until_time_pause)
@@ -163,8 +181,11 @@ class APIGitLab:
counter += 1
except Exception as e:
if counter >= Constants.GitLabConstants.RETRIES_NUMBER:
- logger.error("Failed to get project's team members from APIGitLab, see response >>> %s %s \n %s %s"
- % (r1.status_code, r1.reason, str(r1.content, 'utf-8'), e.message))
+ logger.error(
+ "Failed to get project's team members from " +
+ "APIGitLab, see response >>> %s %s \n %s %s" %
+ (r1.status_code, r1.reason, str(
+ r1.content, 'utf-8'), e.message))
return False
if r1.content == b'[]':
@@ -193,7 +214,9 @@ class APIGitLab:
counter = 0
while r1.content == b'[]' and counter <= 60:
logger.info(
- "Will try to get gitlab user until will be response... #%s" % counter)
+ "Will try to get gitlab user until " +
+ "will be response... #%s" %
+ counter)
time.sleep(session.wait_until_time_pause_long)
r1 = requests.get(getURL, headers=headers, verify=False)
Helper.internal_assert(r1.status_code, 200)
@@ -207,12 +230,15 @@ class APIGitLab:
(r1.status_code, r1.reason, r1.content))
content = r1.json()
return content[0]
- except:
+ except BaseException:
if r1 is None:
logger.error("Failed to get user from APIGitLab.")
else:
- logger.error("Failed to get user from APIGitLab, see response >>> %s %s \n %s"
- % (r1.status_code, r1.reason, str(r1.content, 'utf-8')))
+ logger.error(
+ "Failed to get user from APIGitLab, see response " +
+ ">>> %s %s \n %s" %
+ (r1.status_code, r1.reason, str(
+ r1.content, 'utf-8')))
raise
@staticmethod
@@ -235,19 +261,24 @@ class APIGitLab:
content = r1.json() # Change it from list to dict.
gitPubKey = content[0]['key']
return gitPubKey
- except:
+ except BaseException:
if r1 is None:
- logger.error("Failed to get user's public key from APIGitLab.")
+ logger.error("Failed to get user's public key " +
+ "from APIGitLab.")
else:
- logger.error("Failed to get user's public key from APIGitLab, see response >>> %s %s \n %s"
- % (r1.status_code, r1.reason, str(r1.content, 'utf-8')))
+ logger.error(
+ "Failed to get user's public key from APIGitLab, " +
+ "see response >>> %s %s \n %s" %
+ (r1.status_code, r1.reason, str(
+ r1.content, 'utf-8')))
raise
@staticmethod
def git_clone_push(user_content):
if settings.DATABASE_TYPE != 'local':
logger.debug(
- "About to push files into project's repository on the local folder(not over origin).")
+ "About to push files into project's repository on the " +
+ "local folder(not over origin).")
try:
user_content['session_token'] = "token " + \
APIBridge.login_user(Constants.Users.Admin.EMAIL)
@@ -267,7 +298,8 @@ class APIGitLab:
counter = 0
git_user_pub_key = None
- while user_pub_key != git_user_pub_key and counter < Constants.GitLabConstants.RETRIES_NUMBER:
+ while user_pub_key != git_user_pub_key and counter < \
+ Constants.GitLabConstants.RETRIES_NUMBER:
try:
git_user_pub_key = APIGitLab.get_git_user_ssh_key(
git_user['id'])
@@ -279,17 +311,20 @@ class APIGitLab:
# Check that the SSH key was added to user on APIGitLab.
if user_pub_key != git_user_pub_key:
- raise Exception("The SSH Key received does not equal to the"
- " one provided! The key from"
- "APIGitLab:\n %s ==<>== %s"
- % (git_user_pub_key, user_pub_key))
+ raise Exception(
+ "The SSH Key received does not equal to the"
+ " one provided! The key from"
+ "APIGitLab:\n %s ==<>== %s" %
+ (git_user_pub_key, user_pub_key))
gitRepoURL = "git@gitlab:%s/%s.git" % (
- user_content['engagement_manual_id'], user_content['vfName'])
+ user_content['engagement_manual_id'],
+ user_content['vfName'])
logger.debug("Clone repo from: " + gitRepoURL)
APIGitLab.is_gitlab_ready(user_content)
cmd = 'cd ' + repo_dir + \
- '; git config --global user.email \"' + Constants.Users.Admin.EMAIL + \
+ '; git config --global user.email \"' + Constants.\
+ Users.Admin.EMAIL + \
'\"; git config --global user.name \"' + \
Constants.Users.Admin.FULLNAME + '\";'
# Commit all changes.
@@ -358,9 +393,11 @@ class APIGitLab:
"All edited files were commited and pushed to APIGitLab.")
except Exception as e:
logger.error(
- "_-_-_-_-_- Unexpected error in git_push_commit : " + str(e))
+ "_-_-_-_-_- Unexpected error in git_push_commit : " +
+ str(e))
raise Exception(
- "Something went wrong on git_push_commit function, please check logs.")
+ "Something went wrong on git_push_commit " +
+ "function, please check logs.")
@staticmethod
def is_gitlab_ready(user_content):
diff --git a/services/api/api_jenkins.py b/services/api/api_jenkins.py
index b63cb66..0b57b00 100644
--- a/services/api/api_jenkins.py
+++ b/services/api/api_jenkins.py
@@ -40,10 +40,10 @@ from django.conf import settings
from requests.auth import HTTPBasicAuth
from services.constants import Constants
from services.helper import Helper
+from services.session import session
import logging
import requests
import time
-from services.session import session
logger = logging.getLogger('ice-ci.logger')
@@ -60,7 +60,8 @@ class APIJenkins:
try:
r1 = requests.get(getURL, auth=HTTPBasicAuth(
settings.JENKINS_USERNAME, settings.JENKINS_PASSWORD))
- while r1.status_code != 200 and counter <= Constants.GitLabConstants.RETRIES_NUMBER:
+ while r1.status_code != 200 and counter <= \
+ Constants.GitLabConstants.RETRIES_NUMBER:
r1 = requests.get(getURL, auth=HTTPBasicAuth(
settings.JENKINS_USERNAME, settings.JENKINS_PASSWORD))
time.sleep(session.wait_until_time_pause)
@@ -69,14 +70,15 @@ class APIJenkins:
counter += 1
Helper.internal_assert(r1.status_code, 200)
logger.debug("Job was created on APIJenkins!")
- except:
+ except BaseException:
msg = None
if r1 is None:
msg = "APIJenkins didn't create job for %s" % job_name
else:
- msg = "APIJenkins didn't create job for %s, see response >>> %s %s" % (
- job_name, r1.status_code, r1.reason)
+ msg = "APIJenkins didn't create job for %s, " +\
+ "see response >>> %s %s" % (
+ job_name, r1.status_code, r1.reason)
logger.error(msg)
raise Exception(msg)
@@ -85,6 +87,7 @@ class APIJenkins:
def find_build_num_out_of_jenkins_log(log):
lines_array = log.splitlines()
for line in lines_array:
- if Constants.Dashboard.Checklist.JenkinsLog.Modal.Body.BUILD_IDENTIFIER in line:
+ if Constants.Dashboard.Checklist.JenkinsLog.\
+ Modal.Body.BUILD_IDENTIFIER in line:
parts = line.partition('jenkins')
return parts[2]
diff --git a/services/api/api_rados.py b/services/api/api_rados.py
index 61cfa5c..cdad7d4 100644
--- a/services/api/api_rados.py
+++ b/services/api/api_rados.py
@@ -1,5 +1,5 @@
-
-# ============LICENSE_START==========================================
+
+# ============LICENSE_START==========================================
# org.onap.vvp/test-engine
# ===================================================================
# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
@@ -36,7 +36,6 @@
# ============LICENSE_END============================================
#
# ECOMP is a trademark and service mark of AT&T Intellectual Property.
-import logging
import time
from boto.s3.connection import S3Connection, OrdinaryCallingFormat
@@ -50,6 +49,7 @@ from services.session import session
logger = LoggingServiceFactory.get_logger()
+
class APIRados:
@staticmethod
@@ -75,77 +75,56 @@ class APIRados:
@staticmethod
def get_bucket_grants(bucket_name):
"""Return the Grants."""
- counter = 1
- bucket = APIRados.get_bucket(bucket_name)
- while not bucket and counter <= Constants.RGWAConstants.BUCKET_RETRIES_NUMBER:
- logger.error("Bucket not found. Retry #%s" % counter)
- time.sleep(session.wait_until_time_pause_long)
+ for counter in range(Constants.RGWAConstants.BUCKET_RETRIES_NUMBER):
bucket = APIRados.get_bucket(bucket_name)
- counter += 1
- if not bucket:
+ if bucket:
+ break
+ logger.error("Bucket not found. Retry #%s" % counter+1)
+ time.sleep(session.wait_until_time_pause_long)
+ else:
raise TimeoutError("Max retries exceeded, failing test...")
grants = bucket.list_grants()
- print("***********grants=", grants)
return grants
@staticmethod
def is_bucket_ready(bucket_id):
- counter = 1
- bucket = APIRados.get_bucket(bucket_id)
- while (bucket == None and counter <=
- Constants.RGWAConstants.BUCKET_RETRIES_NUMBER):
- time.sleep(session.wait_until_time_pause_long)
- logger.debug(
- "bucket are not ready yet, trying again (%s of 180)" % counter)
+ for counter in range(Constants.RGWAConstants.BUCKET_RETRIES_NUMBER):
bucket = APIRados.get_bucket(bucket_id)
- counter += 1
- print("****_+__+bucket= ", str(bucket))
- time.sleep(session.wait_until_time_pause_long)
- if bucket == None:
+ if bucket:
+ break
+ logger.debug(
+ "bucket are not ready yet, trying again (%s of %s)" % (
+ counter+1, Constants.RGWAConstants.BUCKET_RETRIES_NUMBER))
+ time.sleep(session.wait_until_time_pause_long)
+ else:
raise TimeoutError("Max retries exceeded, failing test...")
- elif bucket != None:
- logger.debug("bucket are ready to continue!")
- return True
+ logger.debug("bucket are ready to continue!")
+ return True
@staticmethod
def users_of_bucket_ready_after_complete(bucket_id, user_name):
- grants = APIRados.get_bucket_grants(bucket_id)
- count = 0
- counter = 1
- while (count != 0 and counter <=
- Constants.RGWAConstants.BUCKET_RETRIES_NUMBER):
+ for counter in range(Constants.RGWAConstants.BUCKET_RETRIES_NUMBER):
grants = APIRados.get_bucket_grants(bucket_id)
+ if not any(user_name == g.id for g in grants):
+ break
time.sleep(session.wait_until_time_pause_long)
- for g in grants:
- if g.id == user_name:
- count = +1
- time.sleep(session.wait_until_time_pause_long)
- if count != 0:
+ else:
raise Exception("Max retries exceeded, failing test...")
return False
- elif count == 0:
- logger.debug("users_of_bucket are ready to continue!")
- return True
+ logger.debug("users_of_bucket are ready to continue!")
+ return True
@staticmethod
- def users_of_bucket_ready_after_created(bucket_id, user_name):
- grants = APIRados.get_bucket_grants(bucket_id)
- count = 0
- counter = 1
- while (count == 0 and counter <=
- Constants.RGWAConstants.BUCKET_RETRIES_NUMBER):
+ def users_of_bucket_ready_after_created(bucket_id, user_uuid):
+ for counter in range(Constants.RGWAConstants.BUCKET_RETRIES_NUMBER):
grants = APIRados.get_bucket_grants(bucket_id)
+ if any(user_uuid == g.id for g in grants):
+ break
time.sleep(session.wait_until_time_pause_long)
- for g in grants:
- if g.id == user_name:
- count = +1
- time.sleep(session.wait_until_time_pause_long)
- if count == 0:
+ else:
raise Exception("Max retries exceeded, failing test...")
- return False
- elif count > 0:
- logger.debug("users_of_bucket are ready to continue!")
- return True
+ logger.debug("users_of_bucket are ready to continue!")
+ return True
@staticmethod
def specific_client(access_key_id, secret_access_key):
diff --git a/services/api/api_user.py b/services/api/api_user.py
index 3e38fd2..963280e 100644
--- a/services/api/api_user.py
+++ b/services/api/api_user.py
@@ -1,5 +1,5 @@
-
-# ============LICENSE_START==========================================
+
+# ============LICENSE_START==========================================
# org.onap.vvp/test-engine
# ===================================================================
# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
@@ -50,13 +50,14 @@ from services.logging_service import LoggingServiceFactory
logger = LoggingServiceFactory.get_logger()
+
class APIUser:
@staticmethod
# Update account API - only adds new SSH key!
def update_account(user_content):
r1 = None
- token = APIUser.login_user(user_content['email'])
+ token = APIUser.login_user(user_content['email'])
user_content['session_token'] = 'token ' + token
sshKey = Helper.generate_sshpub_key()
putURL = settings.ICE_EM_URL + '/v1/engmgr/users/account'
@@ -82,16 +83,20 @@ class APIUser:
putURL, json=put_data, headers=headers, verify=False)
Helper.internal_assert(r1.status_code, 200)
logger.debug(
- "SSH Key was added successfully to user " + user_content['full_name'])
+ "SSH Key was added successfully to user " +
+ user_content['full_name'])
if not APIBridge.is_gitlab_ready(user_content):
raise
return sshKey
- except:
+ except BaseException:
if r1 is None:
logger.error("Failed to add public SSH key to user.")
else:
- logger.error("PUT request failed to add SSH key to user, see response >>> %s %s \n %s" % (
- r1.status_code, r1.reason, str(r1.content, 'utf-8')))
+ logger.error(
+ "PUT request failed to add SSH key to user, see " +
+ "response >>> %s %s \n %s" %
+ (r1.status_code, r1.reason, str(
+ r1.content, 'utf-8')))
raise
@staticmethod
@@ -125,12 +130,15 @@ class APIUser:
if not APIBridge.is_gitlab_ready(user_content):
raise
return True
- except:
+ except BaseException:
if r1 is None:
logger.error("Failed to add public SSH key to user.")
else:
- logger.error("PUT request failed to add SSH key to user, see response >>> %s %s \n %s" % (
- r1.status_code, r1.reason, str(r1.content, 'utf-8')))
+ logger.error(
+ "PUT request failed to add SSH key to user, " +
+ "see response >>> %s %s \n %s" %
+ (r1.status_code, r1.reason, str(
+ r1.content, 'utf-8')))
raise
@staticmethod
@@ -168,7 +176,7 @@ class APIUser:
logger.debug(str(r.status_code) + " " + r.reason)
decoded_response = r.json()
return decoded_response['token']
- except:
+ except BaseException:
logger.debug("Failed to login.")
raise
@@ -195,12 +203,14 @@ class APIUser:
if not APIBridge.is_gitlab_ready(user_content):
raise
return sshKey
- except:
+ except BaseException:
if r1 is None:
logger.error("Failed to add public SSH key.")
else:
logger.error(
- "POST request failed to add SSH key to user, see response >>> %s %s" % (r1.status_code, r1.reason))
+ "POST request failed to add SSH key to user, " +
+ "see response >>> %s %s" %
+ (r1.status_code, r1.reason))
raise
@staticmethod
@@ -214,11 +224,11 @@ class APIUser:
data = {
"company": company,
"full_name": Helper.rand_string("randomString"),
- "email": Helper.rand_string("randomString") + "@" + email_domain,
+ "email": Helper.rand_string("randomString") +
+ "@" + email_domain,
"phone_number": Constants.Default.Phone.TEXT,
"password": Constants.Default.Password.TEXT,
- "regular_email_updates": "True"
- }
+ "regular_email_updates": "True"}
return data
# If failed - count the failure and add the error to list of errors.
@@ -237,10 +247,19 @@ class APIUser:
return True
else:
raise Exception(
- "Failed to activate user >>> %s %s" % (r1.status_code, r1.reason))
+ "Failed to activate user >>> %s %s" %
+ (r1.status_code, r1.reason))
@staticmethod
- def signup_invited_user(company, invited_email, invite_token, invite_url, user_content, is_contact_user="false", activate=False, wait_for_gitlab=True):
+ def signup_invited_user(
+ company,
+ invited_email,
+ invite_token,
+ invite_url,
+ user_content,
+ is_contact_user="false",
+ activate=False,
+ wait_for_gitlab=True):
r1 = None
postURL = settings.ICE_EM_URL + '/v1/engmgr/signup'
logger.debug("Post signup URL: " + postURL)
@@ -279,12 +298,15 @@ class APIUser:
if not APIBridge.is_gitlab_ready(user_content):
raise
return post_data
- except:
+ except BaseException:
if r1 is None:
logger.error("Failed to sign up the invited team member.")
else:
- logger.error("POST request failed to sign up the invited team member, see response >>> %s %s \n %s" % (
- r1.status_code, r1.reason, str(r1.content, 'utf-8')))
+ logger.error(
+ "POST request failed to sign up the invited " +
+ "team member, see response >>> %s %s \n %s" %
+ (r1.status_code, r1.reason, str(
+ r1.content, 'utf-8')))
raise
@staticmethod
diff --git a/services/api/api_virtual_function.py b/services/api/api_virtual_function.py
index 46610e8..193d6e2 100644
--- a/services/api/api_virtual_function.py
+++ b/services/api/api_virtual_function.py
@@ -43,7 +43,6 @@ import time
from django.conf import settings
import requests
-from services.api.api_gitlab import APIGitLab
from services.api.api_user import APIUser
from services.constants import Constants, ServiceProvider
from services.database.db_general import DBGeneral
@@ -67,7 +66,7 @@ class APIVirtualFunction:
headers['Authorization'] = user_content['session_token']
data = dict() # Create JSON data for post request.
files_list = list()
- if type(files) is list:
+ if isinstance(files, list):
for file in files:
files_list.append(file)
else:
@@ -85,13 +84,16 @@ class APIVirtualFunction:
logger.debug("Next step was added to the engagement!")
ns_uuid = r1.json()
return ns_uuid[0]['uuid']
- except:
+ except BaseException:
if r1 is None:
logger.error(
"Failed to add next step to VF " + user_content['vfName'])
else:
- logger.error("Failed to add next step to VF " + user_content[
- 'vfName'] + ", see response >>> %s %s.\nContent: %s" % (r1.status_code, r1.reason, str(r1.content, 'utf-8')))
+ logger.error("Failed to add next step to VF " +
+ user_content['vfName'] +
+ ", see response >>> %s %s.\nContent: %s" %
+ (r1.status_code, r1.reason, str(
+ r1.content, 'utf-8')))
raise
@staticmethod
@@ -104,12 +106,15 @@ class APIVirtualFunction:
headers = dict() # Create header for post request.
headers['Content-type'] = 'application/json'
headers['Authorization'] = 'token ' + token
- jdata = [{"virtual_function": Helper.rand_string("randomString"),
- "version": Helper.rand_string("randomString") + Helper.rand_string("randomNumber"),
- "target_lab_entry_date": time.strftime("%Y-%m-%d"),
- "target_aic_uuid": targetVersion,
- "ecomp_release": ecompRelease,
- "is_service_provider_internal": False}]
+ jdata = [
+ {
+ "virtual_function": Helper.rand_string("randomString"),
+ "version": Helper.rand_string("randomString") +
+ Helper.rand_string("randomNumber"),
+ "target_lab_entry_date": time.strftime("%Y-%m-%d"),
+ "target_aic_uuid": targetVersion,
+ "ecomp_release": ecompRelease,
+ "is_service_provider_internal": False}]
try:
r1 = requests.post(
postUrl, json=jdata, headers=headers, verify=False)
@@ -117,12 +122,14 @@ class APIVirtualFunction:
logger.debug("Virtual Function created successfully!")
content = r1.content[1:-1]
return content
- except:
+ except BaseException:
if r1 is None:
logger.debug("Failed to create VF >>> request failed!")
else:
logger.debug(
- "Failed to create VF >>> %s %s \n %s" % (r1.status_code, r1.reason, str(r1.content, 'utf-8')))
+ "Failed to create VF >>> %s %s \n %s" %
+ (r1.status_code, r1.reason, str(
+ r1.content, 'utf-8')))
raise
@staticmethod
@@ -140,13 +147,15 @@ class APIVirtualFunction:
logger.debug("Retrieved the Engagement successfully!")
content = r1.content
return json.loads(content)
- except:
+ except BaseException:
if r1 is None:
logger.debug(
"Failed to Retrieve the Engagement >>> request failed!")
else:
logger.debug(
- "Failed to Retrieve the Engagement >>> %s %s \n %s" % (r1.status_code, r1.reason, str(r1.content, 'utf-8')))
+ "Failed to Retrieve the Engagement >>> %s %s \n %s" %
+ (r1.status_code, r1.reason, str(
+ r1.content, 'utf-8')))
raise
@staticmethod
@@ -168,18 +177,25 @@ class APIVirtualFunction:
postURL, json=list_data, headers=headers, verify=False)
Helper.internal_assert_boolean(r1.status_code, 200)
logger.debug("Invite sent successfully to email " + data['email'])
- invite_token = DBGeneral.select_where_and("invitation_token", "ice_invitation", "email", data[
- 'email'], "engagement_uuid", user_content['engagement_uuid'], 1)
- invite_url = settings.ICE_PORTAL_URL + "/#/signUp?invitation=" + invite_token + \
- "&email=" + data['email']
+ invite_token = DBGeneral.select_where_and(
+ "invitation_token",
+ "ice_invitation",
+ "email",
+ data['email'],
+ "engagement_uuid",
+ user_content['engagement_uuid'],
+ 1)
+ invite_url = settings.ICE_PORTAL_URL + "/#/signUp?invitation=" + \
+ invite_token + "&email=" + data['email']
logger.debug("Invitation URL is: " + invite_url)
return data['email'], invite_token, invite_url
- except:
+ except BaseException:
if r1 is None:
logger.error("Failed to invite team member.")
else:
logger.error(
- "POST request failed to invite team member, see response >>> %s %s" % (r1.status_code, r1.reason))
+ "POST request failed to invite team member, " +
+ "see response >>> %s %s" % (r1.status_code, r1.reason))
raise
@staticmethod
@@ -202,21 +218,30 @@ class APIVirtualFunction:
postURL, json=data, headers=headers, verify=False)
Helper.internal_assert_boolean(r1.status_code, 200)
logger.debug("Invite sent successfully to email " + data['email'])
- invite_token = DBGeneral.select_where_and("invitation_token", "ice_invitation", "email", data[
- 'email'], "engagement_uuid", user_content['engagement_uuid'], 1)
- invite_url = settings.ICE_PORTAL_URL + "/#/signUp?invitation=" + invite_token + \
- "&email=" + data['email'] + "&full_name=" + data['full_name'] + \
- "&phone_number=" + \
- data['phone_number'] + "&company=" + \
+ invite_token = DBGeneral.select_where_and(
+ "invitation_token",
+ "ice_invitation",
+ "email",
+ data['email'],
+ "engagement_uuid",
+ user_content['engagement_uuid'],
+ 1)
+ invite_url = settings.ICE_PORTAL_URL + "/#/signUp?invitation=" +\
+ invite_token + "&email=" + data['email'] +\
+ "&full_name=" + data['full_name'] + \
+ "&phone_number=" + data['phone_number'] + "&company=" + \
data['company'] + "&is_contact_user=true"
logger.debug("Invitation URL is: " + invite_url)
return data['email'], invite_token, invite_url
- except:
+ except BaseException:
if r1 is None:
logger.error("Failed to invite vendor contact.")
else:
- logger.error("POST request failed to invite vendor contact, see response >>> %s %s \n %s" % (
- r1.status_code, r1.reason, str(r1.content, 'utf-8')))
+ logger.error(
+ "POST request failed to invite vendor contact, " +
+ "see response >>> %s %s \n %s" %
+ (r1.status_code, r1.reason, str(
+ r1.content, 'utf-8')))
raise
@staticmethod
@@ -239,12 +264,16 @@ class APIVirtualFunction:
postURL, json=data, headers=headers, verify=False)
Helper.internal_assert_boolean(r1.status_code, 202)
logger.debug("Next step was edited successfully!")
- except:
+ except BaseException:
if r1 is None:
logger.error("Failed to edit next step uuid: " + ns_uuid)
else:
- logger.error("Failed to edit next step uuid: " + ns_uuid +
- ", see response >>> %s %s" % (r1.status_code, r1.reason))
+ logger.error(
+ "Failed to edit next step uuid: " +
+ ns_uuid +
+ ", see response >>> %s %s" %
+ (r1.status_code,
+ r1.reason))
raise
@staticmethod
@@ -259,7 +288,8 @@ class APIVirtualFunction:
return r1.content
else:
raise Exception(
- "Failed to activate user >>> %s %s" % (r1.status_code, r1.reason))
+ "Failed to activate user >>> %s %s" %
+ (r1.status_code, r1.reason))
return False
@staticmethod
@@ -314,15 +344,19 @@ class APIVirtualFunction:
putUrl, headers=headers, verify=False)
Helper.internal_assert(r1.status_code, 202)
logger.debug(
- "Engagement stage was successfully changed to " + str(requested_stage) + "!")
+ "Engagement stage was successfully changed to " +
+ str(requested_stage) +
+ "!")
content = r1.content[1:-1]
return content
- except:
+ except BaseException:
if r1 is None:
logger.debug("Failed to set eng stage >>> request failed!")
else:
logger.debug(
- "Failed to set eng stage >>> %s %s \n %s" % (r1.status_code, r1.reason, str(r1.content, 'utf-8')))
+ "Failed to set eng stage >>> %s %s \n %s" %
+ (r1.status_code, r1.reason, str(
+ r1.content, 'utf-8')))
raise
@staticmethod
@@ -338,7 +372,7 @@ class APIVirtualFunction:
r1 = requests.put(putURL, headers=headers, verify=False)
Helper.internal_assert_boolean(r1.status_code, 200)
logger.debug("AIC version has changed!")
- except:
+ except BaseException:
if r1 is None:
msg = "Failed to edit AIC version"
else:
@@ -359,10 +393,11 @@ class APIVirtualFunction:
r1 = requests.put(putURL, headers=headers, verify=False)
Helper.internal_assert_boolean(r1.status_code, 200)
logger.debug("AIC version has changed!")
- except:
+ except BaseException:
if r1 is None:
msg = "Failed to update ECOMP release"
else:
- msg = "Failed to update ECOMP release, see response >>> %s %s" % (
- r1.status_code, r1.reason)
+ msg = "Failed to update ECOMP release," +\
+ " see response >>> %s %s" % (
+ r1.status_code, r1.reason)
raise msg
diff --git a/services/constants.py b/services/constants.py
index 23d7ef1..b97ff6b 100644
--- a/services/constants.py
+++ b/services/constants.py
@@ -40,9 +40,9 @@ from django.conf import settings
class ServiceProvider:
- PROGRAM_NAME = "VVP"
- MainServiceProvider = "ServiceProvider"
- email = "example-domain.com"
+ PROGRAM_NAME = settings.PROGRAM_NAME
+ MainServiceProvider = settings.SERVICE_PROVIDER
+ email = settings.SERVICE_PROVIDER_DOMAIN
class Constants:
@@ -134,6 +134,7 @@ class Constants:
ID = "toast-successfully-message"
CMS_ID = "announcement-successfully-message"
CSS = "html.ng-scope"
+ TEXT = "Important announcement: "
class Cms:
Toast_title_id = "toast-title-id"
@@ -258,7 +259,8 @@ class Constants:
TEXT = "Please fill CAPTCHA!"
class NotMainVendor:
- TEXT = "Email address should be with service provider domain for signees that their company =" \
+ TEXT = "Email address should be with service provider " +\
+ "domain for signees that their company =" \
+ ServiceProvider.MainServiceProvider
class HaveAccount:
@@ -273,7 +275,8 @@ class Constants:
class SubTitle:
CSS = "h2.ng-binding"
- TEXT = "Please follow the instructions below to activate your account."
+ TEXT = "Please follow the instructions below to " +\
+ "activate your account."
class Toast:
TEXT = "Please activate your account first"
@@ -283,7 +286,8 @@ class Constants:
class Toast:
class Success:
- TEXT = "An email with detailed instructions on how to reset your password was sent to your Email."
+ TEXT = "An email with detailed instructions on how " +\
+ "to reset your password was sent to your Email."
class Title:
CSS = "h1.ng-binding"
@@ -291,7 +295,8 @@ class Constants:
class SubTitle:
CSS = "h2.ng-binding"
- TEXT = "Please follow the instructions below to reset your password"
+ TEXT = "Please follow the instructions below to reset " +\
+ "your password"
class Button:
TEXT = "Send Instructions"
@@ -307,7 +312,8 @@ class Constants:
class SubTitle:
CSS = "h2.ng-binding"
- TEXT = "Please follow the instructions below to update your password"
+ TEXT = "Please follow the instructions below to" +\
+ " update your password"
class Password:
NAME = "password"
@@ -490,7 +496,6 @@ class Constants:
NAME = "ssh_key"
class UpdateFailed:
- # TEXT = "Something went wrong while trying to update user account"
TEXT = "Updating SSH Key failed due to invalid key."
class Update:
@@ -584,7 +589,8 @@ class Constants:
TEXT = "Statistics"
class FilterDropdown:
- CSS = "#statistics-header > .search-filters > .search-filter-stage"
+ CSS = "#statistics-header > .search-filters" +\
+ " > .search-filter-stage"
class ValidationsNumber:
ID = "id-validations-num"
@@ -751,7 +757,8 @@ class Constants:
TEXT = "Remove user from engagement team: %s"
class Message:
- TEXT = "Are you sure you would like to remove the user out of the team members?"
+ TEXT = "Are you sure you would like to remove " +\
+ "the user out of the team members?"
class NextSteps:
diff --git a/services/database/__init__.py b/services/database/__init__.py
index 30d7152..32b601a 100644
--- a/services/database/__init__.py
+++ b/services/database/__init__.py
@@ -1,5 +1,5 @@
-
-# ============LICENSE_START==========================================
+
+# ============LICENSE_START==========================================
# org.onap.vvp/test-engine
# ===================================================================
# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
diff --git a/services/database/db_bridge.py b/services/database/db_bridge.py
index fc765c7..1eb79fa 100644
--- a/services/database/db_bridge.py
+++ b/services/database/db_bridge.py
@@ -1,5 +1,5 @@
-
-# ============LICENSE_START==========================================
+
+# ============LICENSE_START==========================================
# org.onap.vvp/test-engine
# ===================================================================
# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
@@ -36,16 +36,21 @@
# ============LICENSE_END============================================
#
# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+
+
class DBBridge:
"""
- This class helps to use functions inside classes with circular import (dependencies).
- Use this class only when there is circular import in one of the DB services.
+ This class helps to use functions inside classes
+ with circular import (dependencies).
+ Use this class only when there is circular
+ import in one of the DB services.
"""
@staticmethod
def select_personal_next_step(user_email):
- """select_personal_next_step: Originally can be found under DBUser class."""
+ """select_personal_next_step: Originally """ +\
+ """can be found under DBUser class."""
from services.database.db_user import DBUser
return DBUser.select_personal_next_step(user_email)
diff --git a/services/database/db_checklist.py b/services/database/db_checklist.py
index 04f8a44..0f8fd6e 100644
--- a/services/database/db_checklist.py
+++ b/services/database/db_checklist.py
@@ -54,14 +54,21 @@ logger = LoggingServiceFactory.get_logger()
class DBChecklist:
@staticmethod
- def select_where_approval_state(queryColumnName, queryTableName, whereParametrType, whereParametrValue, fetchNum):
+ def select_where_approval_state(
+ queryColumnName,
+ queryTableName,
+ whereParametrType,
+ whereParametrValue,
+ fetchNum):
try:
dbConn = psycopg2.connect(
DBGeneral.return_db_native_connection('em_db'))
dbConn = dbConn
cur = dbConn.cursor()
- queryStr = "select %s from %s Where %s = '%s' and state = 'approval';" % (
- queryColumnName, queryTableName, whereParametrType, whereParametrValue)
+ queryStr = \
+ "select %s from %s " % (queryColumnName, queryTableName) +\
+ "Where %s = '%s'" % (whereParametrType, whereParametrValue) +\
+ " and state = 'approval';"
logger.debug("Query : " + queryStr)
cur.execute(queryStr)
if (fetchNum == 0):
@@ -74,25 +81,33 @@ class DBChecklist:
result = result.partition('(')[-1].rpartition(',')[0]
dbConn.close()
logger.debug("Query result: " + str(result))
- if result == None:
+ if result is None:
errorMsg = "select_where_approval_state FAILED "
logger.error(errorMsg)
raise
return result
# If failed - count the failure and add the error to list of errors.
- except:
+ except BaseException:
errorMsg = "select_where_approval_state FAILED "
raise Exception(errorMsg, "select_where_approval_state FAILED")
@staticmethod
- def select_where_pr_state(queryColumnName, queryTableName, whereParametrType, whereParametrValue, fetchNum):
+ def select_where_pr_state(
+ queryColumnName,
+ queryTableName,
+ whereParametrType,
+ whereParametrValue,
+ fetchNum):
try:
dbConn = psycopg2.connect(
DBGeneral.return_db_native_connection('em_db'))
dbConn = dbConn
cur = dbConn.cursor()
- queryStr = "select %s from %s Where %s = '%s' and state = 'peer_review';" % (
- queryColumnName, queryTableName, whereParametrType, whereParametrValue)
+ queryStr = \
+ "select %s from %s " % (queryColumnName, queryTableName) +\
+ "Where %s = '%s' and " % (
+ whereParametrType, whereParametrValue) +\
+ "state = 'peer_review';"
logger.debug("Query : " + queryStr)
cur.execute(queryStr)
if (fetchNum == 0):
@@ -104,26 +119,33 @@ class DBChecklist:
elif(result.find(",)") != -1): # formatting ints e.g id
result = result.partition('(')[-1].rpartition(',')[0]
dbConn.close()
- if result == None:
+ if result is None:
errorMsg = "select_where_pr_state FAILED "
logger.error(errorMsg)
raise
logger.debug("Query result: " + str(result))
return result
# If failed - count the failure and add the error to list of errors.
- except:
+ except BaseException:
errorMsg = "select_where FAILED "
raise Exception(errorMsg, "select_where")
@staticmethod
- def select_where_cl_not_archive(queryColumnName, queryTableName, whereParametrType, whereParametrValue, fetchNum):
+ def select_where_cl_not_archive(
+ queryColumnName,
+ queryTableName,
+ whereParametrType,
+ whereParametrValue,
+ fetchNum):
try:
dbConn = psycopg2.connect(
DBGeneral.return_db_native_connection('em_db'))
dbConn = dbConn
cur = dbConn.cursor()
- queryStr = "select %s from %s Where %s = '%s' and state != 'archive';" % (
- queryColumnName, queryTableName, whereParametrType, whereParametrValue)
+ queryStr = \
+ "select %s from %s " % (queryColumnName, queryTableName) +\
+ "Where %s = '%s'" % (whereParametrType, whereParametrValue) +\
+ "and state != 'archive';"
logger.debug("Query : " + queryStr)
cur.execute(queryStr)
if (fetchNum == 0):
@@ -138,19 +160,25 @@ class DBChecklist:
logger.debug("Query result: " + str(result))
return result
# If failed - count the failure and add the error to list of errors.
- except:
+ except BaseException:
errorMsg = "select_where FAILED "
raise Exception(errorMsg, "select_where")
@staticmethod
- def select_native_where(queryColumnName, queryTableName, whereParametrType, whereParametrValue, fetchNum):
+ def select_native_where(
+ queryColumnName,
+ queryTableName,
+ whereParametrType,
+ whereParametrValue,
+ fetchNum):
try:
dbConn = psycopg2.connect(
DBGeneral.return_db_native_connection('em_db'))
dbConn = dbConn
cur = dbConn.cursor()
queryStr = "select %s from %s Where %s = '%s';" % (
- queryColumnName, queryTableName, whereParametrType, whereParametrValue)
+ queryColumnName, queryTableName, whereParametrType,
+ whereParametrValue)
logger.debug("Query : " + queryStr)
cur.execute(queryStr)
if (fetchNum == 0):
@@ -165,7 +193,7 @@ class DBChecklist:
logger.debug("Query result: " + str(result))
return result
# If failed - count the failure and add the error to list of errors.
- except:
+ except BaseException:
errorMsg = "select_where FAILED "
raise Exception(errorMsg, "select_where")
@@ -176,14 +204,15 @@ class DBChecklist:
DBGeneral.return_db_native_connection('em_db'))
dbConn = dbConn
cur = dbConn.cursor()
- queryStr = "UPDATE ice_checklist SET state='review' Where name= '%s' and state= 'pending';" % (
- queryTableName)
+ queryStr = "UPDATE ice_checklist SET state='review' Where " +\
+ "name= '%s' and state= 'pending';" % (
+ queryTableName)
logger.debug("Query : " + queryStr)
cur.execute(queryStr)
dbConn.commit()
dbConn.close()
# If failed - count the failure and add the error to list of errors.
- except:
+ except BaseException:
errorMsg = "Could not Update User"
raise Exception(errorMsg, "Update")
@@ -194,8 +223,10 @@ class DBChecklist:
DBGeneral.return_db_native_connection('em_db'))
dbConn = dbConn
cur = dbConn.cursor()
- queryStr = "UPDATE ice_checklist_decision SET review_value='approved' , peer_review_value='approved' Where checklist_id = '%s';" % (
- whereParametrValue)
+ queryStr = "UPDATE ice_checklist_decision SET " +\
+ "review_value='approved' , peer_review_value='approved' " +\
+ "Where checklist_id = '%s';" % (
+ whereParametrValue)
logger.debug(queryStr)
cur.execute(queryStr)
dbConn.commit()
@@ -226,7 +257,7 @@ class DBChecklist:
break
return result
# If failed - count the failure and add the error to list of errors.
- except:
+ except BaseException:
errorMsg = "is_archive FAILED "
raise Exception(errorMsg, "is_archive")
@@ -248,14 +279,15 @@ class DBChecklist:
@staticmethod
def get_admin_email(checklistUuid):
try:
+ # Fetch one AT&T user ID.
owner_id = DBChecklist.select_where_approval_state(
- "owner_id", "ice_checklist", "uuid", checklistUuid, 1) # Fetch one AT&T user ID.
+ "owner_id", "ice_checklist", "uuid", checklistUuid, 1)
engLeadEmail = DBGeneral.select_where(
"email", "ice_user_profile", "id", owner_id, 1)
logger.debug("get_admin_email = " + engLeadEmail)
return engLeadEmail
# If failed - count the failure and add the error to list of errors.
- except:
+ except BaseException:
errorMsg = "get_admin_email FAILED "
raise Exception(errorMsg, "get_admin_email")
@@ -270,7 +302,7 @@ class DBChecklist:
logger.debug("getPreeReviewerEngLeadEmail = " + engLeadEmail)
return engLeadEmail
# If failed - count the failure and add the error to list of errors.
- except:
+ except BaseException:
errorMsg = "get_admin_email FAILED "
raise Exception(errorMsg, "get_owner_email")
@@ -279,83 +311,181 @@ class DBChecklist:
checklistTempid = DBGeneral.select_where(
"template_id", "ice_checklist", "name", checklistName, 1)
checklistLineItems = DBGeneral.select_where_and(
- "uuid", "ice_checklist_line_item", "line_type", "auto", "template_id", checklistTempid, 0)
+ "uuid",
+ "ice_checklist_line_item",
+ "line_type",
+ "auto",
+ "template_id",
+ checklistTempid,
+ 0)
for lineItem in checklistLineItems:
setParametrType2 = "peer_review_value"
setParametrValue2 = "approved"
whereParametrType2 = "lineitem_id"
whereParametrValue2 = lineItem
- DBGeneral.update_where_and("ice_checklist_decision", "review_value", checklistUuid, "approved",
- "checklist_id", setParametrType2, setParametrValue2, whereParametrType2, whereParametrValue2)
+ DBGeneral.update_where_and(
+ "ice_checklist_decision",
+ "review_value",
+ checklistUuid,
+ "approved",
+ "checklist_id",
+ setParametrType2,
+ setParametrValue2,
+ whereParametrType2,
+ whereParametrValue2)
@staticmethod
def checkChecklistIsUpdated():
- query = "select uuid from ice_checklist_section where template_id in (select template_id from ice_checklist_template where name='{template_name}') and name='{section_name}'".format(
- template_name=Constants.Dashboard.LeftPanel.EditChecklistTemplate.HEAT, section_name=Constants.Dashboard.LeftPanel.EditChecklistTemplate.HEAT)
+ query = "select uuid from ice_checklist_section where template_id " +\
+ "in (select template_id from ice_checklist_template where " +\
+ "name='{template_name}') and name='{section_name}'".format(
+ template_name=Constants.Dashboard.LeftPanel.
+ EditChecklistTemplate.HEAT, section_name=Constants.
+ Dashboard.LeftPanel.EditChecklistTemplate.HEAT)
return DBGeneral.select_query(query)
@staticmethod
def fetchEngByVfName(vfName):
# Fetch one AT&T user ID.
- return DBGeneral.select_where("engagement_id", "ice_vf", "name", vfName, 1)
+ return DBGeneral.select_where(
+ "engagement_id", "ice_vf", "name", vfName, 1)
@staticmethod
def fetchEngManIdByEngUuid(engagement_id):
- return DBGeneral.select_where("engagement_manual_id", "ice_engagement", "uuid", engagement_id, 1)
+ return DBGeneral.select_where(
+ "engagement_manual_id",
+ "ice_engagement",
+ "uuid",
+ engagement_id,
+ 1)
@staticmethod
def fetchChecklistByName(checklistName):
- query = "select uuid from ice_checklist where name='{cl_name}'".format(
- cl_name=checklistName)
+ query = "select uuid from ice_checklist where " +\
+ "name='{cl_name}'".format(
+ cl_name=checklistName)
return DBGeneral.select_query(query)
@staticmethod
def create_default_heat_teampleate():
- template_query = "INSERT INTO public.ice_checklist_template(uuid, name, category, version, create_time)"\
+ template_query = "INSERT INTO public.ice_checklist_template(uuid, " +\
+ "name, category, version, create_time)"\
"VALUES ('%s', '%s', '%s', '%s', '%s');" % (
- str(uuid4()), 'Editing Heat', 'first category', '1', timezone.now())
+ str(uuid4()), 'Editing Heat', 'first category', '1',
+ timezone.now())
DBGeneral.insert_query(template_query)
template_id = DBGeneral.select_query(
- "SELECT uuid FROM public.ice_checklist_template where name = 'Editing Heat'")
+ "SELECT uuid FROM public.ice_checklist_template where " +
+ "name = 'Editing Heat'")
# SECTIONS
- section1_query = "INSERT INTO public.ice_checklist_section(uuid, name, weight, description, validation_instructions, create_time, template_id) "\
- "VALUES ('%s', '%s', '%s', '%s', '%s','%s', '%s');" % (str(uuid4()), 'External References',
- '1', 'section descripyion', 'valid instructions', timezone.now(), template_id)
+ section1_query = "INSERT INTO public.ice_checklist_section(uuid, " +\
+ "name, weight, description, validation_instructions, " +\
+ "create_time, template_id) "\
+ "VALUES ('%s', '%s', '%s', '%s', '%s','%s', '%s');" % (
+ str(uuid4()), 'External References',
+ '1', 'section descripyion', 'valid instructions',
+ timezone.now(), template_id)
DBGeneral.insert_query(section1_query)
section1_id = DBGeneral.select_query(
- ("""SELECT uuid FROM public.ice_checklist_section where name = 'External References' and template_id = '{s}'""").format(s=template_id))
- section2_query = "INSERT INTO public.ice_checklist_section(uuid, name, weight, description, validation_instructions, create_time, template_id) "\
- "VALUES ('%s', '%s', '%s', '%s', '%s','%s', '%s');" % (str(uuid4()), 'Parameter Specification',
- '2', 'section descripyion', 'valid instructions', timezone.now(), template_id)
+ ("""SELECT uuid FROM public.ice_checklist_section """ +
+ """where name = 'External References' """ +
+ """and template_id = '{s}'""").format(
+ s=template_id))
+ section2_query = "INSERT INTO public.ice_checklist_section(uuid, " +\
+ "name, weight, description, validation_instructions, " +\
+ "create_time, template_id) "\
+ "VALUES ('%s', '%s', '%s', '%s', '%s','%s', '%s');" % (
+ str(uuid4()), 'Parameter Specification',
+ '2', 'section descripyion', 'valid instructions',
+ timezone.now(), template_id)
DBGeneral.insert_query(section2_query)
section2_id = DBGeneral.select_query(
- ("""SELECT uuid FROM public.ice_checklist_section where name = 'Parameter Specification' and template_id = '{s}'""").format(s=template_id))
+ ("""SELECT uuid FROM public.ice_checklist_section """ +
+ """where name = """ +
+ """'Parameter Specification' and template_id = '{s}'""").format(
+ s=template_id))
# Line items
- line_item1 = "INSERT INTO public.ice_checklist_line_item(uuid, name, weight, description, line_type, validation_instructions,create_time,section_id, template_id) "\
- "VALUES ('%s', '%s', '%s', '%s', '%s','%s', '%s', '%s', '%s');" % (str(uuid4()), 'Normal references', '1', 'Numeric parameters should include range and/or allowed values.', 'manual',
- 'Here are some useful tips for how to validate this item in the most awesome way:<br><br><ul><li>Here is my awesome tip 1</li><li>Here is my awesome tip 2</li><li>Here is my awesome tip 3</li></ul>', timezone.now(), section1_id, template_id)
+ line_item1 = \
+ "INSERT INTO public.ice_checklist_line_item(uuid, " +\
+ "name, weight, description, line_type, validation_instructions," +\
+ "create_time,section_id, template_id) "\
+ "VALUES ('%s', '%s', " % (str(uuid4()), 'Normal references') +\
+ "'%s', " % '1' +\
+ "'%s'," % 'Numeric parameters should include ' +\
+ 'range and/or allowed values.' +\
+ " '%s'," % 'manual', +\
+ "'%s'" % 'Here are some useful tips ' +\
+ 'for how to validate this item ' +\
+ 'in the most awesome way:<br><br><ul><li>Here is my ' +\
+ 'awesome tip 1</li><li>Here is my awesome tip 2</li><li>' +\
+ 'Here is my awesome tip 3</li></ul>' +\
+ ", '%s'" % timezone.now() +\
+ ", '%s'," % section1_id +\
+ " '%s');" % template_id
DBGeneral.insert_query(line_item1)
- line_item2 = "INSERT INTO public.ice_checklist_line_item(uuid, name, weight, description, line_type, validation_instructions,create_time, section_id, template_id) "\
- "VALUES ('%s', '%s', '%s', '%s', '%s','%s', '%s', '%s', '%s');" % (str(uuid4()), 'String parameters', '2', 'Numeric parameters should include range and/or allowed values.', 'auto',
- 'Here are some useful tips for how to validate this item in the most awesome way:<br><br><ul><li>Here is my awesome tip 1</li><li>Here is my awesome tip 2</li><li>Here is my awesome tip 3</li></ul>', timezone.now(), section2_id, template_id)
+ line_item2 = "INSERT INTO public.ice_checklist_line_item(uuid, " +\
+ "name, weight, description, line_type, validation_instructions," +\
+ "create_time, section_id, template_id) "\
+ "VALUES ('%s', '%s', '%s', '%s', '%s','%s', '%s', '%s', '%s');" % (
+ str(uuid4()), 'String parameters', '2',
+ 'Numeric parameters should include range ' +
+ 'and/or allowed values.', 'auto',
+ 'Here are some useful tips for how to validate this item ' +
+ 'in the most awesome way:<br><br><ul><li>Here is my ' +
+ 'awesome tip 1</li><li>Here is my awesome tip 2</li><li>' +
+ 'Here is my awesome tip 3</li></ul>', timezone.now(),
+ section2_id, template_id)
DBGeneral.insert_query(line_item2)
- line_item3 = "INSERT INTO public.ice_checklist_line_item(uuid, name, weight, description, line_type, validation_instructions,create_time,section_id, template_id) "\
- "VALUES ('%s', '%s', '%s', '%s', '%s','%s', '%s', '%s', '%s');" % (str(uuid4()), 'Numeric parameters', '3', 'Numeric parameters should include range and/or allowed values.', 'manual',
- 'Here are some useful tips for how to validate this item in the most awesome way:<br><br><ul><li>Here is my awesome tip 1</li><li>Here is my awesome tip 2</li><li>Here is my awesome tip 3</li></ul>', timezone.now(), section2_id, template_id)
+ line_item3 = "INSERT INTO public.ice_checklist_line_item(uuid, " +\
+ "name, weight, description, line_type, validation_instructions," +\
+ "create_time,section_id, template_id) "\
+ "VALUES ('%s', '%s', '%s', '%s', '%s','%s', " +\
+ "'%s', '%s', '%s');" % (
+ str(uuid4()), 'Numeric parameters', '3',
+ 'Numeric parameters should include range and/or ' +
+ 'allowed values.', 'manual',
+ 'Here are some useful tips for how to validate this item ' +
+ 'in the most awesome way:<br><br><ul><li>Here is my ' +
+ 'awesome tip 1</li><li>Here is my awesome tip 2</li><li>' +
+ 'Here is my awesome tip 3</li></ul>', timezone.now(),
+ section2_id, template_id)
DBGeneral.insert_query(line_item3)
- line_item4 = "INSERT INTO public.ice_checklist_line_item(uuid, name, weight, description, line_type, validation_instructions,create_time, section_id, template_id) "\
- "VALUES ('%s', '%s', '%s', '%s', '%s','%s', '%s', '%s', '%s');" % (str(uuid4()), 'VF image', '2', 'Numeric parameters should include range and/or allowed values.', 'auto',
- 'Here are some useful tips for how to validate this item in the most awesome way:<br><br><ul><li>Here is my awesome tip 1</li><li>Here is my awesome tip 2</li><li>Here is my awesome tip 3</li></ul>', timezone.now(), section1_id, template_id)
+ line_item4 = "INSERT INTO public.ice_checklist_line_item(uuid, " +\
+ "name, weight, description, line_type, " +\
+ "validation_instructions,create_time, section_id, " +\
+ "template_id) "\
+ "VALUES ('%s', '%s', '%s', '%s', '%s','%s', '%s', " +\
+ "'%s', '%s');" % (
+ str(uuid4()), 'VF image', '2',
+ 'Numeric parameters should include range and/or ' +
+ 'allowed values.', 'auto',
+ 'Here are some useful tips for how to validate this ' +
+ 'item in the most awesome way:<br><br><ul><li>Here is ' +
+ 'my awesome tip 1</li><li>Here is my awesome tip 2' +
+ '</li><li>Here is my awesome tip 3</li></ul>',
+ timezone.now(), section1_id, template_id)
DBGeneral.insert_query(line_item4)
- line_item5 = "INSERT INTO public.ice_checklist_line_item(uuid, name, weight, description, line_type, validation_instructions,create_time,section_id, template_id) "\
- "VALUES ('%s', '%s', '%s', '%s', '%s','%s', '%s', '%s', '%s');" % (str(uuid4()), 'Parameters', '1', 'Numeric parameters should include range and/or allowed values.', 'auto',
- 'Here are some useful tips for how to validate this item in the most awesome way:<br><br><ul><li>Here is my awesome tip 1</li><li>Here is my awesome tip 2</li><li>Here is my awesome tip 3</li></ul>', timezone.now(), section2_id, template_id)
+ line_item5 = "INSERT INTO public.ice_checklist_line_item(uuid, " +\
+ "name, weight, description, line_type, validation_instructions," +\
+ "create_time,section_id, template_id) "\
+ "VALUES ('%s', '%s', '%s', '%s', '%s','%s', '%s'," +\
+ " '%s', '%s');" % (str(
+ uuid4()), 'Parameters', '1',
+ 'Numeric parameters should include range ' +
+ 'and/or allowed values.', 'auto',
+ 'Here are some useful tips for how to validate this item ' +
+ 'in the most awesome way:<br><br><ul><li>Here is my awesome ' +
+ 'tip 1</li><li>Here is my awesome tip 2</li><li>Here is my ' +
+ 'awesome tip 3</li></ul>', timezone.now(), section2_id,
+ template_id)
DBGeneral.insert_query(line_item5)
@staticmethod
def create_editing_cl_template_if_not_exist():
- template_id = DBGeneral.select_query(("""SELECT uuid FROM public.ice_checklist_template where name = '{s}'""").format(
- s=Constants.Dashboard.LeftPanel.EditChecklistTemplate.HEAT))
+ template_id = DBGeneral.select_query(
+ ("""SELECT uuid FROM public.ice_checklist_template """ +
+ """where name = '{s}'""").format(
+ s=Constants.Dashboard.LeftPanel.EditChecklistTemplate.HEAT))
if template_id == 'None':
DBChecklist.create_default_heat_teampleate()
session.createTemplatecount = True
@@ -366,10 +496,14 @@ class DBChecklist:
"state", Constants.DBConstants.IceTables.CHECKLIST,
identify_field, field_value, "create_time")[0]
counter = 0
- while get_state != expected_state and counter <= Constants.DBConstants.RETRIES_NUMBER:
+ while get_state != expected_state and \
+ counter <= Constants.DBConstants.RETRIES_NUMBER:
time.sleep(session.wait_until_time_pause_long)
- logger.debug("Checklist state not changed yet , expecting state: %s, current result: %s (attempt %s of %s)" % (
- expected_state, get_state, counter, Constants.DBConstants.RETRIES_NUMBER))
+ logger.debug(
+ "Checklist state not changed yet ," +
+ "expecting state: %s, current result: %s (attempt %s of %s)" %
+ (expected_state, get_state, counter,
+ Constants.DBConstants.RETRIES_NUMBER))
counter += 1
get_state = DBGeneral.select_where_order_by_desc(
"state", Constants.DBConstants.IceTables.CHECKLIST,
@@ -380,7 +514,9 @@ class DBChecklist:
expected_state + ", and was verified over the DB")
return expected_state
raise Exception(
- "Expected checklist state never arrived " + expected_state, get_state)
+ "Expected checklist state never arrived " +
+ expected_state,
+ get_state)
@staticmethod
def get_recent_checklist_uuid(name):
diff --git a/services/database/db_cms.py b/services/database/db_cms.py
index 3c2b2c6..288121a 100644
--- a/services/database/db_cms.py
+++ b/services/database/db_cms.py
@@ -1,5 +1,4 @@
-
-# ============LICENSE_START==========================================
+# ============LICENSE_START==========================================
# org.onap.vvp/test-engine
# ===================================================================
# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
@@ -39,17 +38,9 @@
import psycopg2
from wheel.signatures import assertTrue
-from services.constants import Constants
from services.database.db_general import DBGeneral
-from services.frontend.base_actions.click import Click
-from services.frontend.base_actions.enter import Enter
-from services.frontend.base_actions.wait import Wait
-from services.frontend.fe_dashboard import FEDashboard
-from services.frontend.fe_general import FEGeneral
-from services.frontend.fe_user import FEUser
from services.helper import Helper
from services.logging_service import LoggingServiceFactory
-from services.session import session
logger = LoggingServiceFactory.get_logger()
@@ -70,7 +61,7 @@ class DBCMS:
dbConn.close()
logger.debug("Insert query success!")
# If failed - count the failure and add the error to list of errors.
- except:
+ except BaseException:
raise Exception("Couldn't fetch answer using the given query.")
@staticmethod
@@ -86,7 +77,7 @@ class DBCMS:
dbConn.close()
logger.debug("Update query success!")
# If failed - count the failure and add the error to list of errors.
- except:
+ except BaseException:
raise Exception("Couldn't fetch answer using the given query.")
@staticmethod
@@ -107,14 +98,14 @@ class DBCMS:
logger.debug("Query result: " + str(result))
return result
# If failed - count the failure and add the error to list of errors.
- except:
+ except BaseException:
raise Exception("Couldn't fetch answer using the given query.")
@staticmethod
def get_cms_category_id(categoryName):
logger.debug("Get DBCMS category id for name: " + categoryName)
- queryStr = "SELECT id FROM public.blog_blogcategory WHERE title = '%s' LIMIT 1;" % (
- categoryName)
+ queryStr = "SELECT id FROM public.blog_blogcategory WHERE " +\
+ "title = '%s' LIMIT 1;" % (categoryName)
logger.debug("Query : " + queryStr)
result = DBCMS.select_query(queryStr)
return result
@@ -123,9 +114,17 @@ class DBCMS:
def insert_cms_new_post(title, description, categoryName):
logger.debug("Insert new post : " + title)
queryStr = "INSERT INTO public.blog_blogpost" \
- "(comments_count, keywords_string, rating_count, rating_sum, rating_average, title, slug, _meta_title, description, gen_description, created, updated, status, publish_date, expiry_date, short_url, in_sitemap, content, allow_comments, featured_image, site_id, user_id) "\
- "VALUES (0, '', 0, 0, 0, '%s', '%s-slug', '', '%s', true, current_timestamp - interval '1 day', current_timestamp - interval '2 day', 2, current_timestamp - interval '1 day', NULL, '', true, '<p>%s</p>', true, '', 1, 1);" % (
- title, title, description, description)
+ "(comments_count, keywords_string, rating_count, rating_sum, " +\
+ "rating_average, title, slug, _meta_title, description, " +\
+ "gen_description, created, updated, status, publish_date, " +\
+ "expiry_date, short_url, in_sitemap, content, allow_comments, " +\
+ "featured_image, site_id, user_id) "\
+ "VALUES (0, '', 0, 0, 0, " +\
+ "'%s', '%s-slug', " % (title, title) +\
+ "'', '%s', true, " % description +\
+ "current_timestamp - interval '1 day', current_timestamp - " +\
+ "interval '2 day', 2, current_timestamp - interval '1 day', " +\
+ "NULL, '', true, '<p>%s</p>', true, '', 1, 1);" % description
logger.debug("Query : " + queryStr)
DBCMS.insert_query(queryStr)
post_id = DBCMS.get_last_added_post_id()
@@ -144,9 +143,9 @@ class DBCMS:
@staticmethod
def update_days(xdays, title):
logger.debug("Get the id of the post inserted")
-# queryStr = "select MAX(id) FROM public.blog_blogpost;"
- queryStr = "UPDATE public.blog_blogpost SET created=current_timestamp - interval '%s day' WHERE title='%s';" % (
- xdays, title)
+ queryStr = "UPDATE public.blog_blogpost SET " +\
+ "created=current_timestamp - interval '%s day' " % xdays +\
+ "WHERE title='%s';" % title
logger.debug("Query : " + queryStr)
result = DBCMS.update_query(queryStr)
return result
@@ -154,15 +153,17 @@ class DBCMS:
@staticmethod
def add_category_to_post(postId, categoryId):
logger.debug("bind category into inserted post: " + postId)
- queryStr = "INSERT INTO public.blog_blogpost_categories(blogpost_id, blogcategory_id) VALUES (%s, %s);" % (
- postId, categoryId)
+ queryStr = "INSERT INTO public.blog_blogpost_categories" +\
+ "(blogpost_id, blogcategory_id) " +\
+ "VALUES (%s, %s);" % (postId, categoryId)
logger.debug("Query : " + queryStr)
DBCMS.insert_query(queryStr)
@staticmethod
def get_documentation_page_id():
logger.debug("Retrive id of documentation page: ")
- queryStr = "SELECT id FROM public.pages_page WHERE title = 'Documentation' LIMIT 1;"
+ queryStr = "SELECT id FROM public.pages_page WHERE " +\
+ "title = 'Documentation' LIMIT 1;"
logger.debug("Query : " + queryStr)
result = DBCMS.select_query(queryStr)
return result
@@ -191,17 +192,27 @@ class DBCMS:
if parent_id is None:
parent_id = DBCMS.get_documentation_page_id()
queryStr = "INSERT INTO public.pages_page(" \
- "keywords_string, title, slug, _meta_title, description, gen_description, created, updated, status, publish_date, expiry_date, short_url, in_sitemap, _order, in_menus, titles, content_model, login_required, parent_id, site_id)" \
- "VALUES ('', '%s', '%s-slug', '', '%s', true, current_timestamp - interval '1 day', current_timestamp - interval '1 day', 2, current_timestamp - interval '1 day', NULL, '', true, 0, '1,2,3', '%s', 'richtextpage', true, %s, 1);" % (
- title, title, content, title, parent_id)
+ "keywords_string, title, slug, _meta_title, description, " +\
+ "gen_description, created, updated, status, publish_date, " +\
+ "expiry_date, short_url, in_sitemap, _order, in_menus, titles, " +\
+ "content_model, login_required, parent_id, site_id)" \
+ "VALUES ('', " +\
+ "'%s', '%s-slug'" % (title, title) +\
+ ", '', '%s', true, " % content +\
+ "current_timestamp - interval '1 day', current_timestamp " +\
+ "- interval '1 day', 2, current_timestamp - interval '1 day', " +\
+ "NULL, '', true, 0, '1,2,3', " +\
+ "'%s', 'richtextpage', " % title +\
+ "true, %s, 1);" % parent_id
logger.debug("Query : " + queryStr)
DBCMS.insert_query(queryStr)
createdPageId = DBCMS.get_last_inserted_page_id()
logger.debug(
"Bind the page with the rich text content related to this page")
- queryStr = "INSERT INTO public.pages_richtextpage(page_ptr_id, content) VALUES (%s, '<p>%s</p>');" % (
- createdPageId, content)
+ queryStr = "INSERT INTO public.pages_richtextpage(page_ptr_id, " +\
+ "content) VALUES (%s, '<p>%s</p>');" % (
+ createdPageId, content)
logger.debug("Query : " + queryStr)
DBCMS.insert_query(queryStr)
return createdPageId
@@ -244,8 +255,10 @@ class DBCMS:
@staticmethod
def update_X_days_back_post(title, xdays):
logger.debug("Get the id of the post inserted")
- queryStr = "UPDATE blog_blogpost SET created = current_timestamp - interval '%s day', publish_date=current_timestamp - interval '%s day' WHERE title= '%s' ;" % (
- xdays, xdays, title)
+ queryStr = "UPDATE blog_blogpost SET created = current_timestamp" +\
+ " - interval '%s day', " % xdays +\
+ "publish_date=current_timestamp - " +\
+ "interval '%s day' WHERE title= '%s' ;" % (xdays, title)
logger.debug("Query : " + queryStr)
DBCMS.update_query(queryStr)
diff --git a/services/database/db_general.py b/services/database/db_general.py
index c850d3a..2c83fb0 100755
--- a/services/database/db_general.py
+++ b/services/database/db_general.py
@@ -1,5 +1,4 @@
-
-# ============LICENSE_START==========================================
+# ============LICENSE_START==========================================
# org.onap.vvp/test-engine
# ===================================================================
# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
@@ -48,22 +47,31 @@ from services.logging_service import LoggingServiceFactory
logger = LoggingServiceFactory.get_logger()
+
class DBGeneral:
@staticmethod
# desigredDB: Use 'default' for CI General and 'em_db' for EM General
# (according to settings.DATABASES).
def return_db_native_connection(desigredDB):
- dbConnectionStr = "dbname='" + str(settings.SINGLETONE_DB[desigredDB]['NAME']) + \
+ dbConnectionStr = "dbname='" + str(
+ settings.SINGLETONE_DB[desigredDB]['NAME']) + \
"' user='" + str(settings.SINGLETONE_DB[desigredDB]['USER']) + \
"' host='" + str(settings.SINGLETONE_DB[desigredDB]['HOST']) + \
- "' password='" + str(settings.SINGLETONE_DB[desigredDB]['PASSWORD']) + \
+ "' password='" + str(
+ settings.SINGLETONE_DB[desigredDB]['PASSWORD']) + \
"' port='" + \
str(settings.SINGLETONE_DB[desigredDB]['PORT']) + "'"
return dbConnectionStr
@staticmethod
- def insert_results(testType, testFeature, testResult, testName, testDuration, notes=" "):
+ def insert_results(
+ testType,
+ testFeature,
+ testResult,
+ testName,
+ testDuration,
+ notes=" "):
try:
if settings.DATABASE_TYPE == 'sqlite':
dbfile = str(settings.DATABASES['default']['TEST_NAME'])
@@ -80,13 +88,20 @@ class DBGeneral:
raise Exception(errorMsg)
try: # Create INSERT query.
if settings.DATABASE_TYPE == 'sqlite':
- query_str = 'INSERT INTO ice_test_results (testType, testFeature, testResult, testName, notes,'\
- 'create_time, build_id, duration) VALUES (?, ?, ?, ?, ?, ?, ?, ?);'
+ query_str = 'INSERT INTO ice_test_results ' +\
+ '(testType, testFeature, testResult, testName, notes,'\
+ 'create_time, build_id, duration) VALUES ' +\
+ '(?, ?, ?, ?, ?, ?, ?, ?);'
else:
- query_str = 'INSERT INTO ice_test_results ("testType", "testFeature", "testResult", "testName", notes,'\
- 'create_time, build_id, duration) VALUES (%s, %s, %s, %s, %s, %s, %s, %s);'
- cur.execute(query_str, (testType, testFeature, testResult, testName, notes, str(datetime.now()),
- settings.ICE_BUILD_REPORT_NUM, testDuration))
+ query_str = 'INSERT INTO ice_test_results ("testType", ' +\
+ '"testFeature", "testResult", "testName", notes,'\
+ 'create_time, build_id, duration) VALUES ' +\
+ '(%s, %s, %s, %s, %s, %s, %s, %s);'
+ cur.execute(query_str, (testType, testFeature, testResult,
+ testName, notes,
+ str(datetime.now()),
+ settings.ICE_BUILD_REPORT_NUM,
+ testDuration))
dbConn.commit()
logger.debug("Test result in DB - " + testResult)
except Exception as e:
@@ -123,7 +138,7 @@ class DBGeneral:
dbConn.close()
logger.debug("Query result: " + str(result))
return result
- except:
+ except BaseException:
raise Exception("Couldn't fetch answer using the given query.")
@staticmethod
@@ -182,7 +197,7 @@ class DBGeneral:
logger.debug("Query result: " + str(result))
return result
# If failed - count the failure and add the error to list of errors.
- except:
+ except BaseException:
errorMsg = "select_where_email FAILED "
raise Exception(errorMsg, "select_where_email")
raise
@@ -213,13 +228,19 @@ class DBGeneral:
raise Exception(errorMsg, "select_from")
@staticmethod
- def select_where(queryColumnName, queryTableName, whereParametrType, whereParametrValue, fetchNum):
+ def select_where(
+ queryColumnName,
+ queryTableName,
+ whereParametrType,
+ whereParametrValue,
+ fetchNum):
try:
dbConn = psycopg2.connect(
DBGeneral.return_db_native_connection('em_db'))
cur = dbConn.cursor()
queryStr = "select %s from %s Where %s = '%s';" % (
- queryColumnName, queryTableName, whereParametrType, whereParametrValue)
+ queryColumnName, queryTableName, whereParametrType,
+ whereParametrValue)
logger.debug("Query : " + queryStr)
cur.execute(queryStr)
if (fetchNum == 0):
@@ -234,17 +255,24 @@ class DBGeneral:
logger.debug("Query result: " + str(result))
return result
# If failed - count the failure and add the error to list of errors.
- except:
+ except BaseException:
errorMsg = "select_where FAILED "
raise Exception(errorMsg, "select_where")
@staticmethod
- def select_where_order_by_desc(queryColumnName, queryTableName, whereParametrType, whereParametrValue, order_by):
+ def select_where_order_by_desc(
+ queryColumnName,
+ queryTableName,
+ whereParametrType,
+ whereParametrValue,
+ order_by):
dbConn = psycopg2.connect(
DBGeneral.return_db_native_connection('em_db'))
cur = dbConn.cursor()
- queryStr = "select %s from %s Where %s = '%s' order by %s desc limit 1;" \
- % (queryColumnName, queryTableName, whereParametrType, whereParametrValue, order_by)
+ queryStr = \
+ "select %s from %s " % (queryColumnName, queryTableName,) +\
+ "Where %s = '%s' " % (whereParametrType, whereParametrValue) +\
+ "order by %s desc limit 1;" % order_by
logger.debug("Query : " + queryStr)
cur.execute(queryStr)
result = str(cur.fetchall())
@@ -274,14 +302,22 @@ class DBGeneral:
return result
@staticmethod
- def select_where_not_and_order_by_desc(queryColumnName, queryTableName, whereParametrType,
- whereParametrValue, parametrTypeAnd, parametrAnd, order_by):
+ def select_where_not_and_order_by_desc(
+ queryColumnName,
+ queryTableName,
+ whereParametrType,
+ whereParametrValue,
+ parametrTypeAnd,
+ parametrAnd,
+ order_by):
dbConn = psycopg2.connect(
DBGeneral.return_db_native_connection('em_db'))
cur = dbConn.cursor()
- queryStr = "select %s from %s Where %s = '%s' and %s != '%s' order by %s desc limit 1;" \
- % (queryColumnName, queryTableName, whereParametrType, whereParametrValue,
- parametrTypeAnd, parametrAnd, order_by)
+ queryStr = \
+ "select %s from %s " % (queryColumnName, queryTableName) +\
+ "Where %s = '%s' " % (whereParametrType, whereParametrValue) +\
+ "and %s != '%s' " % (parametrTypeAnd, parametrAnd) +\
+ "order by %s desc limit 1;" % order_by
logger.debug("Query : " + queryStr)
cur.execute(queryStr)
result = str(cur.fetchall())
@@ -290,15 +326,22 @@ class DBGeneral:
return result
@staticmethod
- def select_where_and(queryColumnName, queryTableName, whereParametrType, whereParametrValue,
- parametrTypeAnd, parametrAnd, fetchNum):
+ def select_where_and(
+ queryColumnName,
+ queryTableName,
+ whereParametrType,
+ whereParametrValue,
+ parametrTypeAnd,
+ parametrAnd,
+ fetchNum):
try:
dbConn = psycopg2.connect(
DBGeneral.return_db_native_connection('em_db'))
dbConn = dbConn
cur = dbConn.cursor()
queryStr = "select %s from %s Where %s = '%s' and %s = '%s';" % (
- queryColumnName, queryTableName, whereParametrType, whereParametrValue, parametrTypeAnd, parametrAnd)
+ queryColumnName, queryTableName, whereParametrType,
+ whereParametrValue, parametrTypeAnd, parametrAnd)
logger.debug("Query : " + queryStr)
cur.execute(queryStr)
if (fetchNum == 0):
@@ -314,19 +357,27 @@ class DBGeneral:
logger.debug("Query result: " + str(result))
return result
# If failed - count the failure and add the error to list of errors.
- except:
+ except BaseException:
errorMsg = "select_where_and FAILED "
raise Exception(errorMsg, "select_where_and")
@staticmethod
- def select_where_is_bigger(queryColumnName, queryTableName, whereParametrType, whereParametrValue, parametrTypeAnd, parametrAnd, fetchNum):
+ def select_where_is_bigger(
+ queryColumnName,
+ queryTableName,
+ whereParametrType,
+ whereParametrValue,
+ parametrTypeAnd,
+ parametrAnd,
+ fetchNum):
try:
dbConn = psycopg2.connect(
DBGeneral.return_db_native_connection('em_db'))
dbConn = dbConn
cur = dbConn.cursor()
queryStr = "select %s from %s Where %s = '%s' and %s > %s;" % (
- queryColumnName, queryTableName, whereParametrType, whereParametrValue, parametrTypeAnd, parametrAnd)
+ queryColumnName, queryTableName, whereParametrType,
+ whereParametrValue, parametrTypeAnd, parametrAnd)
logger.debug("Query : " + queryStr)
cur.execute(queryStr)
if (fetchNum == 0):
@@ -340,19 +391,25 @@ class DBGeneral:
dbConn.close()
return result
# If failed - count the failure and add the error to list of errors.
- except:
+ except BaseException:
errorMsg = "select_where_is_bigger FAILED "
raise Exception(errorMsg, "select_where_is_bigger")
@staticmethod
- def update_where(queryTableName, setParametrType, setparametrValue, whereParametrType, whereParametrValue):
+ def update_where(
+ queryTableName,
+ setParametrType,
+ setparametrValue,
+ whereParametrType,
+ whereParametrValue):
try:
dbConn = psycopg2.connect(
DBGeneral.return_db_native_connection('em_db'))
dbConn = dbConn
cur = dbConn.cursor()
queryStr = "UPDATE %s SET %s = '%s' Where %s = '%s';" % (
- queryTableName, setParametrType, setparametrValue, whereParametrType, whereParametrValue)
+ queryTableName, setParametrType, setparametrValue,
+ whereParametrType, whereParametrValue)
cur.execute(queryStr)
dbConn.commit()
logger.debug("Query : " + queryStr)
@@ -376,25 +433,37 @@ class DBGeneral:
dbConn.commit()
dbConn.close()
# If failed - count the failure and add the error to list of errors.
- except:
+ except BaseException:
errorMsg = "Could not Update User"
raise Exception(errorMsg, "Update")
@staticmethod
- def update_where_and(queryTableName, setParametrType, parametrValue, changeToValue, whereParametrType, setParametrType2, setParametrValue2, whereParametrType2, whereParametrValue2):
+ def update_where_and(
+ queryTableName,
+ setParametrType,
+ parametrValue,
+ changeToValue,
+ whereParametrType,
+ setParametrType2,
+ setParametrValue2,
+ whereParametrType2,
+ whereParametrValue2):
try:
dbConn = psycopg2.connect(
DBGeneral.return_db_native_connection('em_db'))
dbConn = dbConn
cur = dbConn.cursor()
- queryStr = "UPDATE %s SET %s = '%s', %s = '%s' Where %s = '%s' and %s = '%s';" % (
- queryTableName, setParametrType, changeToValue, setParametrType2, setParametrValue2, whereParametrType, parametrValue, whereParametrType2, whereParametrValue2)
+ queryStr = "UPDATE %s SET " % queryTableName +\
+ "%s = '%s', " % (setParametrType, changeToValue) +\
+ "%s = '%s' Where " % (setParametrType2, setParametrValue2) +\
+ "%s = '%s' " % (whereParametrType, parametrValue) +\
+ "and %s = '%s';" % (whereParametrType2, whereParametrValue2)
logger.debug("Query : " + queryStr)
cur.execute(queryStr)
dbConn.commit()
dbConn.close()
# If failed - count the failure and add the error to list of errors.
- except:
+ except BaseException:
errorMsg = "Could not Update User"
raise Exception(errorMsg, "Update")
diff --git a/services/database/db_user.py b/services/database/db_user.py
index d347dd2..10d02ff 100644
--- a/services/database/db_user.py
+++ b/services/database/db_user.py
@@ -1,5 +1,5 @@
-
-# ============LICENSE_START==========================================
+
+# ============LICENSE_START==========================================
# org.onap.vvp/test-engine
# ===================================================================
# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
@@ -52,6 +52,7 @@ from services.session import session
logger = LoggingServiceFactory.get_logger()
+
class DBUser:
@staticmethod
@@ -69,24 +70,39 @@ class DBUser:
return activationUrl
@staticmethod
- def get_contact_signup_url(invite_token, uuid, email, fullName, phoneNum, companyName):
+ def get_contact_signup_url(
+ invite_token,
+ uuid,
+ email,
+ fullName,
+ phoneNum,
+ companyName):
companyId = DBGeneral.select_where(
"uuid", "ice_vendor", "name", companyName, 1)
- signUpURLforContact = settings.ICE_PORTAL_URL + "#/signUp?invitation=" + invite_token + \
+ signUpURLforContact = settings.ICE_PORTAL_URL + \
+ "#/signUp?invitation=" + invite_token + \
"&email=" + email + "&full_name=" + fullName + \
"&phone_number=" + phoneNum + "&company=" + companyId
logger.debug("SignUpURLforContact :" + signUpURLforContact)
return signUpURLforContact
@staticmethod
- def select_invitation_token(queryColumnName, queryTableName, whereParametrType, whereParametrValue, email, fetchNum):
+ def select_invitation_token(
+ queryColumnName,
+ queryTableName,
+ whereParametrType,
+ whereParametrValue,
+ email,
+ fetchNum):
try:
dbConn = psycopg2.connect(
DBGeneral.return_db_native_connection('em_db'))
dbConn = dbConn
cur = dbConn.cursor()
- queryStr = "select %s from %s Where %s = '%s' and email = '%s' ;" % (
- queryColumnName, queryTableName, whereParametrType, whereParametrValue, email)
+ queryStr = \
+ "select %s from %s Where %s = '%s' and email = '%s' ;" % (
+ queryColumnName, queryTableName, whereParametrType,
+ whereParametrValue, email)
logger.debug("Query : " + queryStr)
cur.execute(queryStr)
if (fetchNum == 0):
@@ -98,14 +114,14 @@ class DBUser:
elif(result.find(",)") != -1): # formatting ints e.g id
result = result.partition('(')[-1].rpartition(',')[0]
dbConn.close()
- if result == None:
+ if result is None:
errorMsg = "select_where_pr_state FAILED "
logger.error(errorMsg)
raise
logger.debug("Query result: " + str(result))
return result
# If failed - count the failure and add the error to list of errors.
- except:
+ except BaseException:
errorMsg = "select_where FAILED "
raise Exception(errorMsg, "select_where")
@@ -115,27 +131,30 @@ class DBUser:
# Fetch one AT&T user ID.
engagement_id = DBVirtualFunction.select_eng_uuid(vfName)
engagement_manual_id = DBGeneral.select_where(
- "engagement_manual_id", "ice_engagement", "uuid", engagement_id, 1)
+ "engagement_manual_id", "ice_engagement", "uuid",
+ engagement_id, 1)
reviewer_id = DBGeneral.select_where(
- "reviewer_id", "ice_engagement", "engagement_manual_id", engagement_manual_id, 1)
+ "reviewer_id",
+ "ice_engagement",
+ "engagement_manual_id",
+ engagement_manual_id,
+ 1)
engLeadFullName = DBGeneral.select_where_and(
- "full_name", "ice_user_profile", "id", reviewer_id, "role_id", "2", 1)
+ "full_name", "ice_user_profile", "id", reviewer_id,
+ "role_id", "2", 1)
return engLeadFullName
# If failed - count the failure and add the error to list of errors.
- except:
+ except BaseException:
errorMsg = "get_el_name FAILED "
raise Exception(errorMsg, "get_el_name")
@staticmethod
def get_email_by_full_name(fullname):
# try:
- query_str = "select email from ice_user_profile where full_name = '%s';" % (
- fullname)
+ query_str = "select email from ice_user_profile where " +\
+ "full_name = '%s';" % (fullname)
user_email = DBGeneral.select_query(query_str)
return user_email
-# except: # If failed - count the failure and add the error to list of errors.
-# errorMsg = "get_email_by_full_name FAILED "
-# raise Exception(errorMsg, "get_el_name")
@staticmethod
def select_recent_vf_of_user(user_uuid, fetchNum):
@@ -144,8 +163,9 @@ class DBUser:
DBGeneral.return_db_native_connection('em_db'))
dbConn = dbConn
cur = dbConn.cursor()
- queryStr = "SELECT vf_id FROM public.ice_recent_engagement where user_uuid = '%s' order by last_update desc limit 20;" % (
- user_uuid)
+ queryStr = "SELECT vf_id FROM public.ice_recent_engagement " +\
+ "where user_uuid = '%s' order by last_update " % user_uuid +\
+ "desc limit 20;"
logger.debug("Query : " + queryStr)
cur.execute(queryStr)
if (fetchNum == 0):
@@ -160,7 +180,7 @@ class DBUser:
logger.debug("Query result: " + str(result))
return result
# If failed - count the failure and add the error to list of errors.
- except:
+ except BaseException:
errorMsg = "select_where FAILED "
raise Exception(errorMsg, "select_where")
@@ -170,14 +190,20 @@ class DBUser:
# Fetch one AT&T user ID.
engagement_id = DBVirtualFunction.select_eng_uuid(vfName)
engagement_manual_id = DBGeneral.select_where(
- "engagement_manual_id", "ice_engagement", "uuid", engagement_id, 1)
+ "engagement_manual_id", "ice_engagement", "uuid",
+ engagement_id, 1)
reviewer_id = DBGeneral.select_where(
- "reviewer_id", "ice_engagement", "engagement_manual_id", engagement_manual_id, 1)
+ "reviewer_id",
+ "ice_engagement",
+ "engagement_manual_id",
+ engagement_manual_id,
+ 1)
engLeadEmail = DBGeneral.select_where_and(
- "email", "ice_user_profile", "id", reviewer_id, "role_id", "2", 1)
+ "email", "ice_user_profile", "id", reviewer_id, "role_id",
+ "2", 1)
return engLeadEmail
# If failed - count the failure and add the error to list of errors.
- except:
+ except BaseException:
errorMsg = "select_el_email FAILED "
raise Exception(errorMsg, "select_el_email")
@@ -188,14 +214,15 @@ class DBUser:
engLeadId = DBUser.select_user_profile_property(email, "id")
return engLeadId
# If failed - count the failure and add the error to list of errors.
- except:
+ except BaseException:
errorMsg = "select_user_native_id FAILED "
raise Exception(errorMsg, "select_user_native_id")
@staticmethod
def select_personal_next_step(email):
user_id = DBUser.select_user_native_id(email)
- return DBGeneral.select_where("uuid", "ice_next_step", "owner_id", user_id, 1)
+ return DBGeneral.select_where(
+ "uuid", "ice_next_step", "owner_id", user_id, 1)
@staticmethod
def select_pr_email(vfName):
@@ -203,14 +230,19 @@ class DBUser:
# Fetch one AT&T user ID.
engagement_id = DBVirtualFunction.select_eng_uuid(vfName)
engagement_manual_id = DBGeneral.select_where(
- "engagement_manual_id", "ice_engagement", "uuid", engagement_id, 1)
+ "engagement_manual_id", "ice_engagement", "uuid",
+ engagement_id, 1)
reviewer_id = DBGeneral.select_where(
- "peer_reviewer_id", "ice_engagement", "engagement_manual_id", engagement_manual_id, 1)
+ "peer_reviewer_id",
+ "ice_engagement",
+ "engagement_manual_id",
+ engagement_manual_id,
+ 1)
engLeadEmail = DBGeneral.select_where(
"email", "ice_user_profile", "id", reviewer_id, 1)
return engLeadEmail
# If failed - count the failure and add the error to list of errors.
- except:
+ except BaseException:
errorMsg = "select_el_email FAILED "
raise Exception(errorMsg, "select_el_email")
@@ -223,16 +255,30 @@ class DBUser:
return notifIDs
@staticmethod
- def get_not_seen_notifications_number_by_email(user_email, is_negative=False):
+ def get_not_seen_notifications_number_by_email(
+ user_email, is_negative=False):
user_id = DBGeneral.select_where_email(
"id", Constants.DBConstants.IceTables.USER_PROFILE, user_email)
notifications_number = DBGeneral.select_where_and(
- Constants.DBConstants.Queries.COUNT, Constants.DBConstants.IceTables.NOTIFICATION, "user_id", user_id, "is_read", "False", 1)
+ Constants.DBConstants.Queries.COUNT,
+ Constants.DBConstants.IceTables.NOTIFICATION,
+ "user_id",
+ user_id,
+ "is_read",
+ "False",
+ 1)
if is_negative:
counter = 0
- while notifications_number != "0" and counter <= Constants.Dashboard.Avatar.Notifications.Count.RETRIES_NUMBER:
+ while notifications_number != "0" and counter <= Constants.\
+ Dashboard.Avatar.Notifications.Count.RETRIES_NUMBER:
notifications_number = DBGeneral.select_where_and(
- Constants.DBConstants.Queries.COUNT, Constants.DBConstants.IceTables.NOTIFICATION, "user_id", user_id, "is_read", "False", 1)
+ Constants.DBConstants.Queries.COUNT,
+ Constants.DBConstants.IceTables.NOTIFICATION,
+ "user_id",
+ user_id,
+ "is_read",
+ "False",
+ 1)
time.sleep(1)
counter += 1
return notifications_number
@@ -240,9 +286,14 @@ class DBUser:
@staticmethod
def get_eng_lead_email_per_enguuid(enguuid):
reviewer_id = DBGeneral.select_where(
- "reviewer_id", Constants.DBConstants.IceTables.ENGAGEMENT, "uuid", enguuid, 1)
+ "reviewer_id",
+ Constants.DBConstants.IceTables.ENGAGEMENT,
+ "uuid",
+ enguuid,
+ 1)
engLeadEmail = DBGeneral.select_where(
- "email", Constants.DBConstants.IceTables.USER_PROFILE, "id", reviewer_id, 1)
+ "email", Constants.DBConstants.IceTables.USER_PROFILE, "id",
+ reviewer_id, 1)
return engLeadEmail
@staticmethod
@@ -252,8 +303,12 @@ class DBUser:
DBGeneral.return_db_native_connection('em_db'))
dbConn = dbConn
cur = dbConn.cursor()
- queryStr = "select COUNT(*) from ice_engagement_engagement_team Where iceuserprofile_id = %s and (select engagement_stage from public.ice_engagement where uuid = engagement_id LIMIT 1) != 'Archived';" % (
- engLeadID)
+ queryStr = "select COUNT(*) from ice_engagement_engagement_team" +\
+ " Where iceuserprofile_id = %s" % engLeadID +\
+ " and (select " +\
+ "engagement_stage from public.ice_engagement " +\
+ "where uuid = engagement_id LIMIT 1) != 'Archived';"
+
logger.debug("Query : " + queryStr)
cur.execute(queryStr)
result = cur.fetchall()
@@ -262,7 +317,7 @@ class DBUser:
logger.debug(result[0][0])
return result[0][0]
# If failed - count the failure and add the error to list of errors.
- except:
+ except BaseException:
errorMsg = "select_user_engagements_by_stage FAILED "
raise Exception(errorMsg, "select_user_engagements_by_stage")
@@ -273,8 +328,14 @@ class DBUser:
DBGeneral.return_db_native_connection('em_db'))
dbConn = dbConn
cur = dbConn.cursor()
- queryStr = "select count(*) from ice_engagement INNER JOIN ice_engagement_engagement_team ON ice_engagement_engagement_team.engagement_id= ice_engagement.uuid Where (ice_engagement.engagement_stage = '%s') and (ice_engagement_engagement_team.iceuserprofile_id = %s );" % (
- stage, engLeadID)
+ queryStr = "select count(*) from ice_engagement INNER JOIN " +\
+ "ice_engagement_engagement_team ON " +\
+ "ice_engagement_engagement_team.engagement_id= " +\
+ "ice_engagement.uuid Where " +\
+ "(ice_engagement.engagement_stage " +\
+ "= '%s') and " % stage +\
+ "(ice_engagement_engagement_team.iceuserprofile_id = " +\
+ "%s );" % engLeadID
logger.debug("Query : " + queryStr)
cur.execute(queryStr)
result = cur.fetchall()
@@ -283,7 +344,7 @@ class DBUser:
logger.debug(result[0][0])
return result[0][0]
# If failed - count the failure and add the error to list of errors.
- except:
+ except BaseException:
errorMsg = "select_user_engagements_by_stage FAILED "
raise Exception(errorMsg, "select_user_engagements_by_stage")
@@ -294,7 +355,11 @@ class DBUser:
# Fetch one user ID.
index = DBGeneral.select_where_email("id", "auth_user", email)
DBGeneral.update_where(
- "ice_custom_user", "temp_password", encodePass, "user_ptr_id", index)
+ "ice_custom_user",
+ "temp_password",
+ encodePass,
+ "user_ptr_id",
+ index)
@staticmethod
def set_password_to_default(email):
@@ -305,8 +370,9 @@ class DBUser:
@staticmethod
def select_el_not_in_engagement(el_name, pr_name):
- query_str = "select full_name from ice_user_profile where role_id = 2 and full_name != '%s' and full_name != '%s';" % (
- el_name, pr_name)
+ query_str = "select full_name from ice_user_profile where " +\
+ "role_id = 2 and full_name != '%s' and full_name != '%s';" % (
+ el_name, pr_name)
new_user = DBGeneral.select_query(query_str)
if new_user == 'None':
new_user = DBUser.update_to_el_not_in_engagement()
@@ -316,53 +382,65 @@ class DBUser:
def select_user_uuid(email):
user_uuid = DBUser.select_user_profile_property(email, "uuid")
return user_uuid
-
+
@staticmethod
def select_access_key(email):
- access_key = DBUser.select_user_profile_property(email, "rgwa_access_key")
+ access_key = DBUser.select_user_profile_property(
+ email, "rgwa_access_key")
return access_key
-
+
@staticmethod
def select_secret_key(email):
- secret_key = DBUser.select_user_profile_property(email, "rgwa_secret_key")
+ secret_key = DBUser.select_user_profile_property(
+ email, "rgwa_secret_key")
return secret_key
-
+
@staticmethod
def update_to_el_not_in_engagement():
query_str = "select uuid from ice_user_profile where role_id = 1 ;"
user_uuid = DBGeneral.select_query(query_str)
- updatequery = "UPDATE ice_user_profile SET role_id=2 ,full_name = 'el_for_test' WHERE uuid = '%s' ;" % (
- user_uuid)
+ updatequery = "UPDATE ice_user_profile SET role_id=2 ,full_name" +\
+ " = 'el_for_test' WHERE uuid = '%s' ;" % (
+ user_uuid)
DBGeneral.update_query(updatequery)
- updatequery = "UPDATE ice_user_profile SET role_id=2 WHERE full_name = '%s' ;" % (
- 'el_for_test')
+ updatequery = "UPDATE ice_user_profile SET role_id=2 WHERE " +\
+ "full_name = '%s' ;" % (
+ 'el_for_test')
DBGeneral.update_query(updatequery)
return 'el_for_test'
@staticmethod
def rollback_for_el_not_in_engagement():
- query_str = "select uuid from ice_user_profile where full_name = 'el_for_test';"
+ query_str = "select uuid from ice_user_profile where full_name = " +\
+ "'el_for_test';"
user_uuid = DBGeneral.select_query(query_str)
fullName = DBBridge.helper_rand_string("randomString")
- updatequery = "UPDATE ice_user_profile SET role_id=1,full_name = '%s' WHERE uuid = '%s' ;" % (
- fullName, user_uuid)
+ updatequery = "UPDATE ice_user_profile SET role_id=1,full_name " +\
+ "= '%s' WHERE uuid = '%s' ;" % (fullName, user_uuid)
DBGeneral.update_query(updatequery)
@staticmethod
def set_engagement_peer_reviewer(engagement_uuid, email):
user_uuid = DBUser.select_user_uuid(email)
- update_query = "UPDATE ice_user_profile SET role_id=2 WHERE uuid = '%s';" % user_uuid
+ update_query = "UPDATE ice_user_profile SET role_id=2 WHERE " +\
+ "uuid = '%s';" % user_uuid
DBGeneral.update_query(update_query)
user_id = DBGeneral.select_query(
"SELECT id FROM ice_user_profile WHERE uuid = '%s';" % user_uuid)
- update_query = "UPDATE ice_engagement SET peer_reviewer_id=%s WHERE uuid = '%s';" % (
- user_id, engagement_uuid)
+ update_query = "UPDATE ice_engagement SET peer_reviewer_id=%s " +\
+ "WHERE uuid = '%s';" % (
+ user_id, engagement_uuid)
DBGeneral.update_query(update_query)
@staticmethod
def select_user_profile_property(user_email, property_name):
- return DBGeneral.select_where(property_name, "ice_user_profile", "email", user_email, 1)
+ return DBGeneral.select_where(
+ property_name,
+ "ice_user_profile",
+ "email",
+ user_email,
+ 1)
@staticmethod
def validate_user_profile_settings_in_db(user_email, checked):
@@ -391,13 +469,23 @@ class DBUser:
def get_access_key(user_uuid):
counter = 0
access_key = DBGeneral.select_where(
- "rgwa_access_key", Constants.DBConstants.IceTables.USER_PROFILE, "uuid", user_uuid, 1)
- while access_key == "None" and counter <= Constants.RGWAConstants.RETRIES_NUMBER:
+ "rgwa_access_key",
+ Constants.DBConstants.IceTables.USER_PROFILE,
+ "uuid",
+ user_uuid,
+ 1)
+ while access_key == "None" and counter <= \
+ Constants.RGWAConstants.RETRIES_NUMBER:
time.sleep(session.wait_until_time_pause)
logger.debug(
- "rgwa_access_key are not ready yet, trying again (%s of 20)" % counter)
+ "rgwa_access_key are not ready yet, trying again (%s of 20)" %
+ counter)
access_key = DBGeneral.select_where(
- "rgwa_access_key", Constants.DBConstants.IceTables.USER_PROFILE, "uuid", user_uuid, 1)
+ "rgwa_access_key",
+ Constants.DBConstants.IceTables.USER_PROFILE,
+ "uuid",
+ user_uuid,
+ 1)
counter += 1
return access_key
@@ -405,13 +493,23 @@ class DBUser:
def get_access_secret(user_uuid):
counter = 0
access_secret = DBGeneral.select_where(
- "rgwa_secret_key", Constants.DBConstants.IceTables.USER_PROFILE, "uuid", user_uuid, 1)
- while access_secret == "None" and counter <= Constants.RGWAConstants.RETRIES_NUMBER:
+ "rgwa_secret_key",
+ Constants.DBConstants.IceTables.USER_PROFILE,
+ "uuid",
+ user_uuid,
+ 1)
+ while access_secret == "None" and counter <= Constants.\
+ RGWAConstants.RETRIES_NUMBER:
time.sleep(session.wait_until_time_pause)
logger.debug(
- "rgwa_secret_key are not ready yet, trying again (%s of 100)" % counter)
+ "rgwa_secret_key are not ready yet, trying again (%s of 100)" %
+ counter)
access_secret = DBGeneral.select_where(
- "rgwa_secret_key", Constants.DBConstants.IceTables.USER_PROFILE, "uuid", user_uuid, 1)
-
+ "rgwa_secret_key",
+ Constants.DBConstants.IceTables.USER_PROFILE,
+ "uuid",
+ user_uuid,
+ 1)
+
counter += 1
return access_secret
diff --git a/services/database/db_virtual_function.py b/services/database/db_virtual_function.py
index 143bca2..f61d1b7 100644
--- a/services/database/db_virtual_function.py
+++ b/services/database/db_virtual_function.py
@@ -1,5 +1,5 @@
-
-# ============LICENSE_START==========================================
+
+# ============LICENSE_START==========================================
# org.onap.vvp/test-engine
# ===================================================================
# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
@@ -49,6 +49,7 @@ from services.logging_service import LoggingServiceFactory
logger = LoggingServiceFactory.get_logger()
+
class DBVirtualFunction:
@staticmethod
@@ -67,8 +68,10 @@ class DBVirtualFunction:
try:
logger.debug("DATABASE_TYPE: " + settings.DATABASE_TYPE)
# Create INSERT query.
- queryStr = "INSERT INTO %s (""uuid, name, weight, ui_visibility"") VALUES ('%s', '%s', '%s', '%s');" % (
- queryTableName, uuid, name, 0, ui_visibility)
+ queryStr = "INSERT INTO %s " % queryTableName +\
+ "(""uuid, name, weight, ui_visibility"") VALUES " +\
+ "('%s', '%s', " % (uuid, name) +\
+ "'%s', '%s');" % (0, ui_visibility)
logger.debug("Query: " + queryStr)
cur.execute(queryStr) # Execute query.
dbConn.commit()
@@ -100,7 +103,8 @@ class DBVirtualFunction:
dbConn.commit()
logger.debug("Test results are in General now.")
except Exception as e:
- errorMsg = "Failed to delete ECOMP release from General . because :" + \
+ errorMsg = "Failed to delete ECOMP release from General ." +\
+ " because :" + \
str(e)
raise Exception(errorMsg)
raise
@@ -114,8 +118,10 @@ class DBVirtualFunction:
@staticmethod
def select_next_steps_uuids_by_stage(engagement_uuid, engagement_stage):
- query = "SELECT uuid FROM %s WHERE engagement_id='%s' AND engagement_stage='%s' ORDER BY position;" % (
- Constants.DBConstants.IceTables.NEXT_STEP, engagement_uuid, engagement_stage)
+ query = "SELECT uuid FROM %s WHERE " % (
+ Constants.DBConstants.IceTables.NEXT_STEP) + "engagement_id=" +\
+ "'%s' AND engagement_stage='%s' ORDER BY position;" % (
+ engagement_uuid, engagement_stage)
return DBGeneral.select_query(query, "list", 0)
@staticmethod
@@ -125,11 +131,17 @@ class DBVirtualFunction:
@staticmethod
def select_next_step_description(next_step_uuid):
- return DBGeneral.select_where("description", "ice_next_step", "uuid", next_step_uuid, 1)
+ return DBGeneral.select_where(
+ "description",
+ "ice_next_step",
+ "uuid",
+ next_step_uuid,
+ 1)
@staticmethod
def select_eng_uuid(vf_name):
- return DBGeneral.select_where("engagement_id", "ice_vf", "name", vf_name, 1)
+ return DBGeneral.select_where(
+ "engagement_id", "ice_vf", "name", vf_name, 1)
@staticmethod
def select_engagment_uuid_by_vf_name(vfName):
@@ -138,7 +150,11 @@ class DBVirtualFunction:
engagement_manual_id = DBGeneral.select_where(
"engagement_manual_id", "ice_engagement", "uuid", engagement_id, 1)
enguuid = DBGeneral.select_where(
- "uuid", "ice_engagement", "engagement_manual_id", engagement_manual_id, 1)
+ "uuid",
+ "ice_engagement",
+ "engagement_manual_id",
+ engagement_manual_id,
+ 1)
return enguuid
@staticmethod
@@ -149,7 +165,8 @@ class DBVirtualFunction:
@staticmethod
def select_vf_name_by_vf_version(version_name):
- queryofname = "SELECT name FROM ice_vf WHERE version= '%s';" % version_name
+ queryofname = "SELECT name FROM ice_vf WHERE " +\
+ "version= '%s';" % version_name
vfNameDb = str(DBGeneral.select_query(queryofname))
return vfNameDb
@@ -164,64 +181,106 @@ class DBVirtualFunction:
@staticmethod
def get_engagement():
- """Use this function instead of creating a new engagement where no need to"""
- queryStr = "SELECT DISTINCT ice_engagement.uuid, engagement_manual_id, ice_vf.name, ice_user_profile.full_name, \
- ice_user_profile.email, reviewer_table.full_name, reviewer_table.email, \
+ """Use this function instead of creating a new """ +\
+ """engagement where no need to"""
+ queryStr = "SELECT DISTINCT ice_engagement.uuid, " +\
+ "engagement_manual_id, ice_vf.name, ice_user_profile.full_name, \
+ ice_user_profile.email, reviewer_table.full_name, " +\
+ "reviewer_table.email, \
ice_deployment_target.version, ice_ecomp_release.name \
- FROM ice_engagement LEFT JOIN ice_vf ON engagement_id = ice_engagement.uuid \
- LEFT JOIN ice_user_profile reviewer_table ON reviewer_table.id = ice_engagement.reviewer_id \
- LEFT JOIN ice_user_profile ON ice_user_profile.id = ice_engagement.peer_reviewer_id \
- LEFT JOIN ice_deployment_target ON ice_deployment_target.uuid = ice_vf.deployment_target_id \
- LEFT JOIN ice_ecomp_release ON ice_ecomp_release.uuid = ice_vf.ecomp_release_id \
+ FROM ice_engagement LEFT JOIN ice_vf ON engagement_id " +\
+ "= ice_engagement.uuid \
+ LEFT JOIN ice_user_profile reviewer_table ON " +\
+ "reviewer_table.id = ice_engagement.reviewer_id \
+ LEFT JOIN ice_user_profile ON ice_user_profile.id = " +\
+ "ice_engagement.peer_reviewer_id \
+ LEFT JOIN ice_deployment_target ON " +\
+ "ice_deployment_target.uuid = " +\
+ "ice_vf.deployment_target_id \
+ LEFT JOIN ice_ecomp_release ON " +\
+ "ice_ecomp_release.uuid = ice_vf.ecomp_release_id \
WHERE ice_user_profile.id IS NOT NULL LIMIT 1;"
list_of_values = DBGeneral.select_query(queryStr, return_type="list")
- list_of_keys = ["engagement_uuid", "engagement_manual_id", "vfName", "pr_name",
- "pr_email", "el_name", "el_email", "target_aic", "ecomp_release"]
+ list_of_keys = [
+ "engagement_uuid",
+ "engagement_manual_id",
+ "vfName",
+ "pr_name",
+ "pr_email",
+ "el_name",
+ "el_email",
+ "target_aic",
+ "ecomp_release"]
return dict(zip(list_of_keys, list_of_values))
@staticmethod
def insert_aic_version(ui_visibility="TRUE"):
new_aic_version = {
- "uuid": str(uuid.uuid4()), "name": "AIC", "version": DBBridge.helper_rand_string("randomNumber", 2), "ui_visibility": ui_visibility, "weight": 0}
+ "uuid": str(
+ uuid.uuid4()),
+ "name": "AIC",
+ "version": DBBridge.helper_rand_string(
+ "randomNumber",
+ 2),
+ "ui_visibility": ui_visibility,
+ "weight": 0}
queryStr = "INSERT INTO public.ice_deployment_target( \
uuid, name, version, ui_visibility, weight) \
- VALUES ('%s', '%s', '%s', '%s', %s);" % (new_aic_version['uuid'], new_aic_version['name'], new_aic_version['version'], new_aic_version['ui_visibility'], new_aic_version['weight'])
+ VALUES " +\
+ "('%s', '%s', '%s', '%s', %s);" % (
+ new_aic_version['uuid'],
+ new_aic_version['name'],
+ new_aic_version['version'],
+ new_aic_version['ui_visibility'],
+ new_aic_version['weight'])
DBGeneral.insert_query(queryStr)
return new_aic_version
@staticmethod
def delete_aic_version(aic_uuid):
DBGeneral.insert_query(
- "DELETE FROM public.ice_deployment_target WHERE uuid='%s';" % aic_uuid)
+ "DELETE FROM public.ice_deployment_target WHERE uuid='%s';" %
+ aic_uuid)
@staticmethod
def change_aic_version_weight(new_weight, old_weight):
DBGeneral.insert_query(
- "UPDATE public.ice_deployment_target SET weight=%s WHERE weight=%s" % (new_weight, old_weight))
+ "UPDATE public.ice_deployment_target " +
+ "SET weight=%s " % new_weight +
+ "WHERE weight=%s" % old_weight)
@staticmethod
def change_ecomp_release_weight(new_weight, old_weight):
DBGeneral.insert_query(
- "UPDATE public.ice_ecomp_release SET weight=%s WHERE weight=%s" % (new_weight, old_weight))
+ "UPDATE public.ice_ecomp_release SET weight=%s WHERE weight=%s" %
+ (new_weight, old_weight))
@staticmethod
def select_aic_version_uuid(aic_version):
- return DBGeneral.select_where("uuid", "ice_deployment_target", "version", aic_version, 1)
+ return DBGeneral.select_where(
+ "uuid", "ice_deployment_target", "version", aic_version, 1)
@staticmethod
def select_ecomp_release_uuid(ecomp_release):
- return DBGeneral.select_where("uuid", "ice_ecomp_release", "name", ecomp_release, 1)
+ return DBGeneral.select_where(
+ "uuid", "ice_ecomp_release", "name", ecomp_release, 1)
@staticmethod
def add_admin_to_eng_team(eng_uuid):
admin_db_id = DBGeneral.select_where(
- 'id', Constants.DBConstants.IceTables.USER_PROFILE, 'email', Constants.Users.Admin.EMAIL, 1)
- queryStr = "INSERT INTO public.ice_engagement_engagement_team(engagement_id, iceuserprofile_id) VALUES ('%s', '%s');" % (
- eng_uuid, admin_db_id)
+ 'id',
+ Constants.DBConstants.IceTables.USER_PROFILE,
+ 'email',
+ Constants.Users.Admin.EMAIL,
+ 1)
+ queryStr = "INSERT INTO public.ice_engagement_engagement_team" +\
+ "(engagement_id, iceuserprofile_id) VALUES ('%s', '%s');" % (
+ eng_uuid, admin_db_id)
logger.debug("add_admin_to_eng_team Query: %s" % queryStr)
DBGeneral.insert_query(queryStr)
@staticmethod
def remove_engagement_from_recent(vf_uuid):
DBGeneral.insert_query(
- "DELETE FROM %s WHERE vf_id='%s'" % (Constants.DBConstants.IceTables.RECENT, vf_uuid))
+ "DELETE FROM %s WHERE vf_id='%s'" % (Constants.DBConstants.
+ IceTables.RECENT, vf_uuid))
diff --git a/services/frontend/__init__.py b/services/frontend/__init__.py
index 30d7152..32b601a 100644
--- a/services/frontend/__init__.py
+++ b/services/frontend/__init__.py
@@ -1,5 +1,5 @@
-
-# ============LICENSE_START==========================================
+
+# ============LICENSE_START==========================================
# org.onap.vvp/test-engine
# ===================================================================
# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
diff --git a/services/frontend/base_actions/__init__.py b/services/frontend/base_actions/__init__.py
index 30d7152..32b601a 100644
--- a/services/frontend/base_actions/__init__.py
+++ b/services/frontend/base_actions/__init__.py
@@ -1,5 +1,5 @@
-
-# ============LICENSE_START==========================================
+
+# ============LICENSE_START==========================================
# org.onap.vvp/test-engine
# ===================================================================
# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
diff --git a/services/frontend/base_actions/click.py b/services/frontend/base_actions/click.py
index 00470b7..561b2f7 100644
--- a/services/frontend/base_actions/click.py
+++ b/services/frontend/base_actions/click.py
@@ -1,5 +1,5 @@
-
-# ============LICENSE_START==========================================
+
+# ============LICENSE_START==========================================
# org.onap.vvp/test-engine
# ===================================================================
# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
@@ -51,7 +51,8 @@ class Click:
Wait.page_has_loaded()
Wait.id(element_id)
session.ice_driver.find_element_by_id(element_id).click()
- except Exception as e: # If failed - count the failure and add the error to list of errors.
+ # If failed - count the failure and add the error to list of errors.
+ except Exception as e:
errorMsg = "Failed to click_on on ID " + element_id
raise Exception(errorMsg, e)
@@ -62,18 +63,21 @@ class Click:
Wait.page_has_loaded()
Wait.name(element_name)
session.ice_driver.find_element_by_name(element_name).click()
- except Exception as e: # If failed - count the failure and add the error to list of errors.
+ # If failed - count the failure and add the error to list of errors.
+ except Exception as e:
errorMsg = "Failed to click_on on ID " + element_name
raise Exception(errorMsg, e)
-
+
@staticmethod
def link_text(link_inner_text, wait_for_page=False):
try:
if wait_for_page:
Wait.page_has_loaded()
Wait.link_text(link_inner_text)
- session.ice_driver.find_element_by_link_text(link_inner_text).click()
- except Exception as e: # If failed - count the failure and add the error to list of errors.
+ session.ice_driver.find_element_by_link_text(
+ link_inner_text).click()
+ # If failed - count the failure and add the error to list of errors.
+ except Exception as e:
errorMsg = "Failed to click_on on LINK TEXT " + link_inner_text
raise Exception(errorMsg, e)
@@ -83,8 +87,10 @@ class Click:
if wait_for_page:
Wait.page_has_loaded()
Wait.css(element_css)
- session.ice_driver.find_element_by_css_selector(element_css).click()
- except Exception as e: # If failed - count the failure and add the error to list of errors.
+ session.ice_driver.find_element_by_css_selector(
+ element_css).click()
+ # If failed - count the failure and add the error to list of errors.
+ except Exception as e:
errorMsg = "Failed to click_on on CSS Selector " + element_css
raise Exception(errorMsg, e)
@@ -95,7 +101,8 @@ class Click:
Wait.page_has_loaded()
Wait.xpath(element_xpath)
session.ice_driver.find_element_by_xpath(element_xpath).click()
- except Exception as e: # If failed - count the failure and add the error to list of errors.
+ # If failed - count the failure and add the error to list of errors.
+ except Exception as e:
errorMsg = "Failed to click_on on XPATH " + element_xpath
raise Exception(errorMsg, e)
@@ -104,5 +111,10 @@ class Click:
ns = session.ice_driver.find_element_by_id("step-description-1")
ActionChains(session.ice_driver).move_to_element(ns).perform()
Wait.css(source_css)
- source_element = session.ice_driver.find_element_by_css_selector(source_css)
- ActionChains(session.ice_driver).drag_and_drop_by_offset(source_element, xoffset, yoffset).perform()
+ source_element = session.ice_driver.find_element_by_css_selector(
+ source_css)
+ ActionChains(
+ session.ice_driver).drag_and_drop_by_offset(
+ source_element,
+ xoffset,
+ yoffset).perform()
diff --git a/services/frontend/base_actions/enter.py b/services/frontend/base_actions/enter.py
index 4577a3d..2a6992d 100644
--- a/services/frontend/base_actions/enter.py
+++ b/services/frontend/base_actions/enter.py
@@ -1,5 +1,5 @@
-
-# ============LICENSE_START==========================================
+
+# ============LICENSE_START==========================================
# org.onap.vvp/test-engine
# ===================================================================
# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
@@ -45,85 +45,117 @@ from services.session import session
logger = LoggingServiceFactory.get_logger()
+
+
class Enter:
-
+
@staticmethod
def text_by_name(attr_name_value, typed_text, wait_for_page=False):
- try: # Send keys to element in UI, by name locator (e.g. type something in text box).
+ # Send keys to element in UI, by name locator (e.g. type something in
+ # text box).
+ try:
if wait_for_page:
Wait.page_has_loaded()
Wait.name(attr_name_value)
session.ice_driver.find_element_by_name(attr_name_value).clear()
- session.ice_driver.find_element_by_name(attr_name_value).send_keys(typed_text[:-1])
+ session.ice_driver.find_element_by_name(
+ attr_name_value).send_keys(typed_text[:-1])
time.sleep(session.wait_until_time_pause)
- session.ice_driver.find_element_by_name(attr_name_value).send_keys(typed_text[-1:])
- except Exception as e: # If failed - count the failure and add the error to list of errors.
- errorMsg = "Failed to type " + typed_text + " in text box"
+ session.ice_driver.find_element_by_name(
+ attr_name_value).send_keys(typed_text[-1:])
+ # If failed - count the failure and add the error to list of errors.
+ except Exception as e:
+ errorMsg = "Failed to type " + typed_text + " in text box"
raise Exception(errorMsg, e)
-
+
@staticmethod
def text_by_xpath(attr_xpath_value, typed_text, wait_for_page=False):
- try: # Send keys to element in UI, by name locator (e.g. type something in text box).
+ # Send keys to element in UI, by name locator (e.g. type something in
+ # text box).
+ try:
if wait_for_page:
Wait.page_has_loaded()
Wait.xpath(attr_xpath_value)
session.ice_driver.find_element_by_xpath(attr_xpath_value).clear()
- session.ice_driver.find_element_by_xpath(attr_xpath_value).send_keys(typed_text[:-1])
+ session.ice_driver.find_element_by_xpath(
+ attr_xpath_value).send_keys(typed_text[:-1])
time.sleep(session.wait_until_time_pause)
- session.ice_driver.find_element_by_xpath(attr_xpath_value).send_keys(typed_text[-1:])
- except Exception as e: # If failed - count the failure and add the error to list of errors.
- errorMsg = "Failed to type " + typed_text + " in text box"
+ session.ice_driver.find_element_by_xpath(
+ attr_xpath_value).send_keys(typed_text[-1:])
+ # If failed - count the failure and add the error to list of errors.
+ except Exception:
+ errorMsg = "Failed to type " + typed_text + " in text box"
raise Exception(errorMsg, attr_xpath_value)
-
+
@staticmethod
def text_by_id(attr_id_value, typed_text, wait_for_page=False):
- try: # Send keys to element in UI, by ID locator (e.g. type something in text box).
+ # Send keys to element in UI, by ID locator (e.g. type something in
+ # text box).
+ try:
if wait_for_page:
Wait.page_has_loaded()
Wait.id(attr_id_value)
session.ice_driver.find_element_by_id(attr_id_value).clear()
- session.ice_driver.find_element_by_id(attr_id_value).send_keys(typed_text[:-1])
+ session.ice_driver.find_element_by_id(
+ attr_id_value).send_keys(typed_text[:-1])
time.sleep(session.wait_until_time_pause)
- session.ice_driver.find_element_by_id(attr_id_value).send_keys(typed_text[-1:])
- except Exception as e: # If failed - count the failure and add the error to list of errors.
- errorMsg = "Failed to type " + typed_text + " in text box"
+ session.ice_driver.find_element_by_id(
+ attr_id_value).send_keys(typed_text[-1:])
+ # If failed - count the failure and add the error to list of errors.
+ except Exception:
+ errorMsg = "Failed to type " + typed_text + " in text box"
raise Exception(errorMsg, attr_id_value)
-
+
@staticmethod
def clear(attr_id_value):
try:
Wait.id(attr_id_value)
session.ice_driver.find_element_by_id(attr_id_value).clear()
- except Exception as e:
- errorMsg = "Failed to clear text box"
+ except Exception:
+ errorMsg = "Failed to clear text box"
raise Exception(errorMsg, attr_id_value)
-
+
@staticmethod
def text_by_css(attr_css_value, typed_text, wait_for_page=False):
- try: # Send keys to element in UI, by CSS locator (e.g. type something in text box).
+ # Send keys to element in UI, by CSS locator (e.g. type something in
+ # text box).
+ try:
if wait_for_page:
Wait.page_has_loaded()
Wait.css(attr_css_value)
- session.ice_driver.find_element_by_css_selector(attr_css_value).clear()
- session.ice_driver.find_element_by_css_selector(attr_css_value).send_keys(typed_text[:-1])
+ session.ice_driver.find_element_by_css_selector(
+ attr_css_value).clear()
+ session.ice_driver.find_element_by_css_selector(
+ attr_css_value).send_keys(typed_text[:-1])
time.sleep(session.wait_until_time_pause)
- session.ice_driver.find_element_by_css_selector(attr_css_value).send_keys(typed_text[-1:])
- except Exception as e: # If failed - count the failure and add the error to list of errors.
- errorMsg = "Failed to type " + typed_text + " in text box"
+ session.ice_driver.find_element_by_css_selector(
+ attr_css_value).send_keys(typed_text[-1:])
+ # If failed - count the failure and add the error to list of errors.
+ except Exception:
+ errorMsg = "Failed to type " + typed_text + " in text box"
raise Exception(errorMsg, attr_css_value)
-
+
@staticmethod
- def text_by_link_text(attr_link_text_value, typed_text, wait_for_page=False):
- try: # Send keys to element in UI, by name locator (e.g. type something in text box).
+ def text_by_link_text(
+ attr_link_text_value,
+ typed_text,
+ wait_for_page=False):
+ # Send keys to element in UI, by name locator (e.g. type something in
+ # text box).
+ try:
if wait_for_page:
Wait.page_has_loaded()
Wait.link_text(attr_link_text_value)
- session.ice_driver.find_element_by_link_text(attr_link_text_value).clear()
- session.ice_driver.find_element_by_link_text(attr_link_text_value).send_keys(typed_text[:-1])
+ session.ice_driver.find_element_by_link_text(
+ attr_link_text_value).clear()
+ session.ice_driver.find_element_by_link_text(
+ attr_link_text_value).send_keys(typed_text[:-1])
time.sleep(session.wait_until_time_pause)
- session.ice_driver.find_element_by_link_text(attr_link_text_value).send_keys(typed_text[-1:])
- except Exception as e: # If failed - count the failure and add the error to list of errors.
- errorMsg = "Failed to type " + typed_text + " in text box"
+ session.ice_driver.find_element_by_link_text(
+ attr_link_text_value).send_keys(typed_text[-1:])
+ # If failed - count the failure and add the error to list of errors.
+ except Exception:
+ errorMsg = "Failed to type " + typed_text + " in text box"
raise Exception(errorMsg, attr_link_text_value)
@staticmethod
@@ -132,8 +164,10 @@ class Enter:
if wait_for_page:
Wait.page_has_loaded()
session.ice_driver.execute_script(
- "var element = angular.element(document.querySelector('" + selector + "')); element.scope()." +
- property_name + " = new Date('" + str(datetime.today().isoformat()) + "')")
+ "var element = angular.element(document." +
+ "querySelector('" + selector + "')); element.scope()." +
+ property_name + " = new Date('" +
+ str(datetime.today().isoformat()) + "')")
except Exception as e:
errorMsg = "Failed to select date with datePicker."
raise Exception(errorMsg, str(e))
diff --git a/services/frontend/base_actions/get.py b/services/frontend/base_actions/get.py
index 5fb801a..0eb7959 100644
--- a/services/frontend/base_actions/get.py
+++ b/services/frontend/base_actions/get.py
@@ -51,7 +51,7 @@ class Get:
Wait.id(attr_id_value)
return session.ice_driver.find_element_by_id(attr_id_value).text
# If failed - count the failure and add the error to list of errors.
- except Exception as e:
+ except Exception:
errorMsg = "Failed to get text of element " + attr_id_value
raise Exception(errorMsg, attr_id_value)
@@ -61,9 +61,10 @@ class Get:
if wait_for_page:
Wait.page_has_loaded()
Wait.css(attr_css_value)
- return session.ice_driver.find_element_by_css_selector(attr_css_value).text
+ return session.ice_driver.find_element_by_css_selector(
+ attr_css_value).text
# If failed - count the failure and add the error to list of errors.
- except Exception as e:
+ except Exception:
errorMsg = "Failed to get text of element " + attr_css_value
raise Exception(errorMsg, attr_css_value)
@@ -74,7 +75,7 @@ class Get:
return session.ice_driver.find_element_by_css_selector(
"#" + attr_id_value + ".wysiwyg-textarea")
# If failed - count the failure and add the error to list of errors.
- except Exception as e:
+ except Exception:
errorMsg = "Failed to get element by id " + attr_id_value
raise Exception(errorMsg, attr_id_value)
@@ -82,9 +83,10 @@ class Get:
def by_name(attr_name_value):
try:
Wait.name(attr_name_value)
- return session.ice_driver.find_element_by_name(attr_name_value).text
+ return session.ice_driver.find_element_by_name(
+ attr_name_value).text
# If failed - count the failure and add the error to list of errors.
- except Exception as e:
+ except Exception:
errorMsg = "Failed to get text of element " + attr_name_value
raise Exception(errorMsg, attr_name_value)
@@ -92,9 +94,10 @@ class Get:
def by_xpath(attr_name_value):
try:
Wait.xpath(attr_name_value)
- return session.ice_driver.find_element_by_xpath(attr_name_value).text
+ return session.ice_driver.find_element_by_xpath(
+ attr_name_value).text
# If failed - count the failure and add the error to list of errors.
- except Exception as e:
+ except Exception:
errorMsg = "Failed to get text of element " + attr_name_value
raise Exception(errorMsg, attr_name_value)
@@ -104,8 +107,9 @@ class Get:
if wait_for_page:
Wait.page_has_loaded()
Wait.name(attr_name_value)
- return session.ice_driver.find_element_by_name(attr_name_value).get_attribute("value")
- except Exception as e:
+ return session.ice_driver.find_element_by_name(
+ attr_name_value).get_attribute("value")
+ except Exception:
errorMsg = "Failed to get value by name:" + attr_name_value
raise Exception(errorMsg, attr_name_value)
@@ -113,8 +117,9 @@ class Get:
def meta_order_by_id(attr_id_value):
try:
Wait.id(attr_id_value)
- return session.ice_driver.find_element_by_id(attr_id_value).get_attribute("meta-order")
- except Exception as e:
+ return session.ice_driver.find_element_by_id(
+ attr_id_value).get_attribute("meta-order")
+ except Exception:
errorMsg = "Failed to get meta order by id:" + attr_id_value
raise Exception(errorMsg, attr_id_value)
@@ -124,8 +129,9 @@ class Get:
if wait_for_page:
Wait.page_has_loaded()
Wait.id(attr_id_value)
- return session.ice_driver.find_element_by_id(attr_id_value).is_selected()
- except Exception as e:
+ return session.ice_driver.find_element_by_id(
+ attr_id_value).is_selected()
+ except Exception:
errorMsg = "Failed to get if it's selected by id:" + attr_id_value
raise Exception(errorMsg, attr_id_value)
@@ -138,6 +144,6 @@ class Get:
return Helper.internal_assert_boolean_true_false(
session.ice_driver.find_element_by_id(
attr_id_value).get_attribute("value"), "on")
- except Exception as e:
+ except Exception:
errorMsg = "Failed to get if it's selected by id:" + attr_id_value
raise Exception(errorMsg, attr_id_value)
diff --git a/services/frontend/base_actions/wait.py b/services/frontend/base_actions/wait.py
index a699917..4434bb1 100644
--- a/services/frontend/base_actions/wait.py
+++ b/services/frontend/base_actions/wait.py
@@ -58,12 +58,12 @@ class Wait:
try: # Wait 4 seconds for element and compare to expected result.
if wait_for_page:
Wait.page_has_loaded()
- WebDriverWait(session.ice_driver, session.wait_until_retires).until(
+ WebDriverWait(
+ session.ice_driver, session.wait_until_retires).until(
expected_conditions.text_to_be_present_in_element(
- (By.XPATH, xpath), text)
- )
+ (By.XPATH, xpath), text))
# If failed - count the failure and add the error to list of errors.
- except:
+ except Exception:
error_msg = "Text - " + text + " not found in xPath " + xpath
raise Exception(error_msg, xpath)
@@ -72,12 +72,12 @@ class Wait:
try: # Wait 4 seconds for element and compare to expected result.
if wait_for_page:
Wait.page_has_loaded()
- WebDriverWait(session.ice_driver, session.wait_until_retires).until(
+ WebDriverWait(
+ session.ice_driver, session.wait_until_retires).until(
expected_conditions.text_to_be_present_in_element(
- (By.ID, element_id), text)
- )
+ (By.ID, element_id), text))
# If failed - count the failure and add the error to list of errors.
- except:
+ except Exception:
error_msg = "Text - " + text + " not found in ID " + element_id
raise Exception(error_msg, element_id)
@@ -86,10 +86,10 @@ class Wait:
try: # Wait 4 seconds for element and compare to expected result.
if wait_for_page:
Wait.page_has_loaded()
- WebDriverWait(session.ice_driver, session.wait_until_retires).until(
+ WebDriverWait(
+ session.ice_driver, session.wait_until_retires).until(
expected_conditions.text_to_be_present_in_element(
- (By.CSS_SELECTOR, css), text)
- )
+ (By.CSS_SELECTOR, css), text))
# If failed - count the failure and add the error to list of errors.
except Exception as e:
error_msg = "Text - " + text + " not found in CSS - " + css
@@ -100,10 +100,10 @@ class Wait:
try: # Wait 4 seconds for element and compare to expected result.
if wait_for_page:
Wait.page_has_loaded()
- WebDriverWait(session.ice_driver, session.wait_until_retires).until(
+ WebDriverWait(
+ session.ice_driver, session.wait_until_retires).until(
expected_conditions.text_to_be_present_in_element(
- (By.NAME, name), text)
- )
+ (By.NAME, name), text))
# If failed - count the failure and add the error to list of errors.
except Exception as e:
error_msg = "Text - " + text + " not found by NAME - " + name
@@ -114,7 +114,8 @@ class Wait:
try: # Wait 4 seconds for element and compare to expected result.
if wait_for_page:
Wait.page_has_loaded()
- WebDriverWait(session.ice_driver, session.wait_until_retires).until(
+ WebDriverWait(session.ice_driver,
+ session.wait_until_retires).until(
expected_conditions.visibility_of_element_located(
(By.ID, element_id))
)
@@ -128,10 +129,10 @@ class Wait:
try: # Wait 4 seconds for element and compare to expected result.
if wait_for_page:
Wait.page_has_loaded()
- WebDriverWait(session.ice_driver, session.wait_until_retires).until(
+ WebDriverWait(
+ session.ice_driver, session.wait_until_retires).until(
expected_conditions.visibility_of_element_located(
- (By.CSS_SELECTOR, element_css))
- )
+ (By.CSS_SELECTOR, element_css)))
# If failed - count the failure and add the error to list of errors.
except Exception as e:
error_msg = "Didn't find CSS Selector " + element_css
@@ -142,7 +143,8 @@ class Wait:
try: # Wait 4 seconds for element and compare to expected result.
if wait_for_page:
Wait.page_has_loaded()
- WebDriverWait(session.ice_driver, session.wait_until_implicit_time).until(
+ WebDriverWait(session.ice_driver,
+ session.wait_until_implicit_time).until(
expected_conditions.visibility_of_element_located(
(By.CSS_SELECTOR, element_css))
)
@@ -157,10 +159,10 @@ class Wait:
try:
if wait_for_page:
Wait.page_has_loaded()
- WebDriverWait(session.ice_driver, session.wait_until_retires).until(
+ WebDriverWait(
+ session.ice_driver, session.wait_until_retires).until(
expected_conditions.visibility_of_element_located(
- (By.LINK_TEXT, link_inner_text))
- )
+ (By.LINK_TEXT, link_inner_text)))
# If failed - count the failure and add the error to list of errors.
except Exception as e:
error_msg = "Didn't find LINK TEXT " + link_inner_text
@@ -171,10 +173,10 @@ class Wait:
try:
if wait_for_page:
Wait.page_has_loaded()
- WebDriverWait(session.ice_driver, session.wait_until_retires).until(
+ WebDriverWait(
+ session.ice_driver, session.wait_until_retires).until(
expected_conditions.visibility_of_element_located(
- (By.NAME, element_name))
- )
+ (By.NAME, element_name)))
# If failed - count the failure and add the error to list of errors.
except Exception as e:
error_msg = "Didn't find NAME " + element_name
@@ -185,10 +187,10 @@ class Wait:
try:
if wait_for_page:
Wait.page_has_loaded()
- WebDriverWait(session.ice_driver, session.wait_until_retires).until(
+ WebDriverWait(
+ session.ice_driver, session.wait_until_retires).until(
expected_conditions.visibility_of_element_located(
- (By.XPATH, element_xpath))
- )
+ (By.XPATH, element_xpath)))
# If failed - count the failure and add the error to list of errors.
except Exception as e:
error_msg = "Didn't find XPath " + element_xpath
@@ -199,14 +201,17 @@ class Wait:
for _ in range(Constants.FEConstants.RETRIES_NUMBER):
try:
httpRequests = session.ice_driver.execute_script(
- 'return window.angular ? window.angular.element("body").injector().get("$http").pendingRequests.length : 1;')
+ 'return window.angular ? window.angular.element("body").' +
+ 'injector().get("$http").pendingRequests.length : 1;')
if(str(httpRequests) == "0"):
time.sleep(session.wait_until_time_pause)
return
logger.debug(
- "Checking if {} page is loaded. ".format(session.ice_driver.current_url))
+ "Checking if {} page is loaded. ".format(
+ session.ice_driver.current_url))
time.sleep(session.wait_until_time_pause)
except Exception as exception:
+ time.sleep(session.wait_until_time_pause)
continue
raise Exception("Page loading took too much time")
@@ -266,7 +271,10 @@ class Wait:
return True
else:
raise Exception(
- "id_to_dissappear " + id_element + " num of retries = " + str(i))
+ "id_to_dissappear " +
+ id_element +
+ " num of retries = " +
+ str(i))
@staticmethod
def name_to_dissappear(name_element, wait_for_page=False):
@@ -292,7 +300,10 @@ class Wait:
return True
else:
raise Exception(
- "name_to_dissappear " + name_element + " num of retries = " + str(i))
+ "name_to_dissappear " +
+ name_element +
+ " num of retries = " +
+ str(i))
@staticmethod
def css_to_dissappear(css_element):
@@ -316,3 +327,8 @@ class Wait:
return True
else:
raise Exception("css_to_dissappear" + css_element)
+
+ @staticmethod
+ def bucket_to_create(bucket_id):
+ logger.debug("Waiting for %s bucket to be created" % bucket_id)
+ time.sleep(session.positive_timeout)
diff --git a/services/frontend/fe_checklist.py b/services/frontend/fe_checklist.py
index 3afc472..133c6d1 100644
--- a/services/frontend/fe_checklist.py
+++ b/services/frontend/fe_checklist.py
@@ -53,7 +53,9 @@ from services.frontend.fe_user import FEUser
from services.frontend.fe_wizard import FEWizard
from services.helper import Helper
from services.logging_service import LoggingServiceFactory
-from tests.uiTests.test_ui_base import *
+from services.database.db_general import DBGeneral
+from services.constants import Constants
+from services.session import session
logger = LoggingServiceFactory.get_logger()
@@ -86,8 +88,9 @@ class FEChecklist:
engagement_id = DBVirtualFunction.select_eng_uuid(vfName)
engLeadEmail = DBUser.select_el_email(vfName)
logger.debug("EL email: " + engLeadEmail)
- engagement_manual_id = DBGeneral.select_where("engagement_manual_id", "ice_engagement", "uuid",
- engagement_id, 1)
+ engagement_manual_id = DBGeneral.select_where(
+ "engagement_manual_id", "ice_engagement", "uuid",
+ engagement_id, 1)
# Click on all default next steps
myVfName = engagement_manual_id + ": " + vfName
actualVfNameid = "clickable-" + myVfName
@@ -117,8 +120,13 @@ class FEChecklist:
engagement_id, vfName, actualVfName, engagement_manual_id)
checklistUuid = DBGeneral.select_where(
"uuid", "ice_checklist", "name", checklistName, 1)
- newObjWithChecklist = [checklistUuid, engLeadEmail, engagement_manual_id, actualVfNameid, myVfName,
- checklistName]
+ newObjWithChecklist = [
+ checklistUuid,
+ engLeadEmail,
+ engagement_manual_id,
+ actualVfNameid,
+ myVfName,
+ checklistName]
return newObjWithChecklist
# If failed - count the failure and add the error to list of errors.
except Exception as e:
@@ -126,7 +134,11 @@ class FEChecklist:
raise Exception(errorMsg, "create_new_checklist")
@staticmethod
- def create_checklist(engagement_id, vfName, actualVfName, engagement_manual_id):
+ def create_checklist(
+ engagement_id,
+ vfName,
+ actualVfName,
+ engagement_manual_id):
try:
checklistName = Helper.rand_string("randomString")
Wait.id("checklist-plus-" + engagement_id, wait_for_page=True)
@@ -140,12 +152,18 @@ class FEChecklist:
"checkListName", checklistName, wait_for_page=True)
Wait.xpath("//select")
- Select(session.ice_driver.find_element_by_id(Constants.Template.Subtitle.SelectTemplateTitle.TEXT)
- ).select_by_visible_text(Constants.Template.Heat.TEXT)
+ Select(
+ session.ice_driver.find_element_by_id(
+ Constants.Template.Subtitle.SelectTemplateTitle.TEXT
+ )).select_by_visible_text(
+ Constants.Template.Heat.TEXT)
Click.id(Constants.Template.Heat.TEXT, wait_for_page=True)
# Click.css("option.ng-binding.ng-scope")
Helper.internal_assert(
- "Associate Files", Get.by_id("associated-files-title", wait_for_page=True))
+ "Associate Files",
+ Get.by_id(
+ "associated-files-title",
+ wait_for_page=True))
Click.xpath("//multiselect/div/button", wait_for_page=True)
Click.link_text("file0", wait_for_page=True)
Click.link_text("file1")
@@ -202,7 +220,7 @@ class FEChecklist:
Click.xpath("(//button[@type='button'])[11]")
try:
Click.xpath("//div[3]/multiselect/div/ul/li/a")
- except:
+ except BaseException:
Click.link_text("Homer Simpson")
Click.css("div.modal-content")
count = 0
@@ -219,7 +237,8 @@ class FEChecklist:
Click.css("div.modal-content")
Click.xpath("(//button[@type='button'])[23]")
Click.css(
- "div.btn-group.open > ul.dropdown-menu > li.ng-scope > a.ng-binding")
+ "div.btn-group.open > ul.dropdown-menu > " +
+ "li.ng-scope > a.ng-binding")
Click.link_text("Add Another Next Step")
Click.xpath("(//button[@type='button'])[25]")
FEWizard.date_picker_add_ns(count)
@@ -243,7 +262,7 @@ class FEChecklist:
Click.xpath("(//button[@type='button'])[11]")
try:
Click.xpath("//div[3]/multiselect/div/ul/li/a")
- except:
+ except BaseException:
Wait.link_text("Homer Simpson")
Click.link_text("Homer Simpson")
Wait.css("div.modal-content")
@@ -263,7 +282,8 @@ class FEChecklist:
Click.css("div.modal-content")
Click.xpath("(//button[@type='button'])[23]")
Click.css(
- "div.btn-group.open > ul.dropdown-menu > li.ng-scope > a.ng-binding")
+ "div.btn-group.open > ul.dropdown-menu > " +
+ "li.ng-scope > a.ng-binding")
Click.link_text("Add Another Next Step")
Wait.xpath("(//button[@type='button'])[25]")
Click.xpath("(//button[@type='button'])[25]")
@@ -273,7 +293,6 @@ class FEChecklist:
Click.xpath("//div[4]/div/span")
Wait.id("btn-submit")
Wait.text_by_id("btn-submit", "Submit Next Steps")
-# Helper.internal_assert("Submit Next Steps", Get.by_id("btn-submit"))
Click.id("btn-submit")
@staticmethod
@@ -294,38 +313,42 @@ class FEChecklist:
Helper.internal_assert(
"Numeric parameters", Get.by_xpath("//li[3]/span[2]"))
if settings.DATABASE_TYPE == 'local':
- Helper.internal_assert("Section 2: External References",
- Get.by_xpath("//li[2]/h2"))
- # //li[2]/ul/li/span[2] #//ul[@id='line-item-list']/li[2]/ul/li/span[2]
+ Helper.internal_assert(
+ "Section 2: External References",
+ Get.by_xpath("//li[2]/h2"))
Helper.internal_assert(
- "Normal references", Get.by_xpath("//li[2]/ul/li/span[2]"))
+ "Normal references",
+ Get.by_xpath("//li[2]/ul/li/span[2]"))
Helper.internal_assert(
"VF image", Get.by_xpath("//li[2]/ul/li[2]/span[2]"))
- except:
+ except BaseException:
if settings.DATABASE_TYPE == 'local':
Wait.text_by_css(
"h2.ng-binding", "Section 1: External References")
try:
Helper.internal_assert(
- "Normal references", Get.by_css("span.col-md-9.ng-binding"))
- except:
+ "Normal references", Get.by_css(
+ "span.col-md-9.ng-binding"))
+ except BaseException:
if "VF image" in Get.by_xpath("//li[2]/span[2]"):
logger.debug("All Ok")
if settings.DATABASE_TYPE == 'local':
Helper.internal_assert(
- "Section 2: Parameter Specification", Get.by_xpath("//li[2]/h2"))
+ "Section 2: Parameter Specification",
+ Get.by_xpath("//li[2]/h2"))
try:
if settings.DATABASE_TYPE == 'local':
Helper.internal_assert(
"1.1 - Parameters", Get.by_xpath("//header/h2"))
- except:
+ except BaseException:
if settings.DATABASE_TYPE == 'local':
Helper.internal_assert(
"1.1 - Normal References", Get.by_xpath("//header/h2"))
if settings.DATABASE_TYPE == 'local':
elementTxt = Get.by_id("line-item-description")
Helper.internal_assert(
- "Numeric parameters should include range and/or allowed values.", elementTxt)
+ "Numeric parameters should include " +
+ "range and/or allowed values.", elementTxt)
Helper.internal_assert("Audit Logs", Get.by_css("h3.col-md-12"))
localLogText = "local log"
Enter.text_by_id("new-audit-log-text", localLogText)
@@ -333,63 +356,72 @@ class FEChecklist:
"Add Log Entry", Get.by_id("submit-new-audit-lop-text"))
Click.id("submit-new-audit-lop-text")
vfName = newObj[0]
- engLeadFullName = DBUser.get_el_name(vfName)
+ DBUser.get_el_name(vfName)
Helper.internal_assert(localLogText, Get.by_css(
Constants.Dashboard.Checklist.AuditLog.LastLocalAuditLog.CSS))
try:
if settings.DATABASE_TYPE == 'local':
Helper.internal_assert(
"Parameters", Get.by_xpath("//li[2]/ul/li/span[2]"))
- except:
+ except BaseException:
if settings.DATABASE_TYPE == 'local':
Helper.internal_assert(
- "Numeric parameters", Get.by_xpath("//li[2]/ul/li/span[2]"))
+ "Numeric parameters",
+ Get.by_xpath("//li[2]/ul/li/span[2]"))
-# if Wait.css(Constants.Dashboard.Checklist.LineItem.Deny.CSS) or
-# Wait.css(Constants.Dashboard.Checklist.LineItem.Approve.CSS):
- session.run_negative(lambda: Wait.css(Constants.Dashboard.Checklist.LineItem.Deny.CSS) or Wait.css(
- Constants.Dashboard.Checklist.LineItem.Approve.CSS), "Buttons displayed for Admin it's NOT work")
-# logger.debug("Buttons displayed for Admin it's NOT work")
-# else:
-# print("Buttons not displayed for Admin it's work")
+ session.run_negative(
+ lambda: Wait.css(
+ Constants.Dashboard.Checklist.LineItem.Deny.CSS
+ ) or Wait.css(
+ Constants.Dashboard.Checklist.LineItem.Approve.CSS),
+ "Buttons displayed for Admin it's NOT work")
if state == "APPROVAL":
if settings.DATABASE_TYPE == 'local':
Helper.internal_assert(
- "Audit Log (6)", Get.by_id(Constants.Dashboard.Checklist.AuditLog.ID))
+ "Audit Log (6)", Get.by_id(
+ Constants.Dashboard.Checklist.AuditLog.ID))
else:
Helper.internal_assert(
- "Audit Log (7)", Get.by_id(Constants.Dashboard.Checklist.AuditLog.ID))
+ "Audit Log (7)", Get.by_id(
+ Constants.Dashboard.Checklist.AuditLog.ID))
if state == "HANDOFF":
if settings.DATABASE_TYPE == 'local':
Helper.internal_assert(
- "Audit Log (8)", Get.by_id(Constants.Dashboard.Checklist.AuditLog.ID))
+ "Audit Log (8)", Get.by_id(
+ Constants.Dashboard.Checklist.AuditLog.ID))
else:
Helper.internal_assert(
- "Audit Log (9)", Get.by_id(Constants.Dashboard.Checklist.AuditLog.ID))
+ "Audit Log (9)", Get.by_id(
+ Constants.Dashboard.Checklist.AuditLog.ID))
Click.id(Constants.Dashboard.Checklist.AuditLog.ID)
Wait.text_by_xpath("//span[2]", checklistName)
- engLeadFullName = DBUser.select_el_email(vfName)
+ DBUser.select_el_email(vfName)
Enter.text_by_xpath("//textarea", "zdfgsdyh")
Click.css(Constants.SubmitButton.CSS)
Wait.modal_to_dissappear()
if state == "APPROVAL":
if settings.DATABASE_TYPE == 'local':
Wait.text_by_id(
- Constants.Dashboard.Checklist.AuditLog.ID, "Audit Log (7)")
+ Constants.Dashboard.Checklist.AuditLog.ID,
+ "Audit Log (7)")
else:
Wait.text_by_id(
- Constants.Dashboard.Checklist.AuditLog.ID, "Audit Log (8)")
+ Constants.Dashboard.Checklist.AuditLog.ID,
+ "Audit Log (8)")
if state == "HANDOFF":
if settings.DATABASE_TYPE == 'local':
Wait.text_by_id(
- Constants.Dashboard.Checklist.AuditLog.ID, "Audit Log (9)")
+ Constants.Dashboard.Checklist.AuditLog.ID,
+ "Audit Log (9)")
else:
Wait.text_by_id(
- Constants.Dashboard.Checklist.AuditLog.ID, "Audit Log (10)")
+ Constants.Dashboard.Checklist.AuditLog.ID,
+ "Audit Log (10)")
if state == "APPROVAL":
Wait.text_by_xpath("//button[3]", "Add Next Steps")
- Wait.text_by_id(Constants.Dashboard.Checklist.Reject.ID,
- Constants.Dashboard.Checklist.Reject.Modal.Button.TEXT)
+ Wait.text_by_id(
+ Constants.Dashboard.Checklist.Reject.ID,
+ Constants.Dashboard.Checklist.Reject.Modal.Button.TEXT)
Wait.text_by_xpath(
"//div[@id='state-actions']/button", "Approve")
if state == "HANDOFF":
@@ -399,9 +431,10 @@ class FEChecklist:
# If failed - count the failure and add the error to list of errors.
except Exception as e:
logger.error(
- state + " state FAILED CONNECT TO STAGING MANUAL AND VERIFY WHY! ")
- errorMsg = "approval_state_actions_and_validations FAILED because : " + \
- str(e)
+ state +
+ " state FAILED CONNECT TO STAGING MANUAL AND VERIFY WHY! ")
+ errorMsg = "approval_state_actions_and_validations " +\
+ "FAILED because : " + str(e)
raise Exception(errorMsg, "approval_state_actions_and_validations")
@staticmethod
@@ -414,7 +447,8 @@ class FEChecklist:
try:
if settings.DATABASE_TYPE == 'local':
Helper.internal_assert(
- "Section 1: Parameter Specification", Get.by_css("h2.ng-binding"))
+ "Section 1: Parameter Specification",
+ Get.by_css("h2.ng-binding"))
Helper.internal_assert(
"Parameters", Get.by_css("span.col-md-9.ng-binding"))
Helper.internal_assert(
@@ -423,33 +457,38 @@ class FEChecklist:
"Numeric parameters", Get.by_xpath("//li[3]/span[2]"))
if settings.DATABASE_TYPE == 'local':
Helper.internal_assert(
- "Section 2: External References", Get.by_xpath("//li[2]/h2"))
+ "Section 2: External References",
+ Get.by_xpath("//li[2]/h2"))
Helper.internal_assert(
"Normal references", Get.by_name("Normal references"))
Helper.internal_assert(
"VF image", Get.by_name("Normal references"))
- except:
+ except BaseException:
try:
Helper.internal_assert(
- "Section 1: External References", Get.by_css("h2.ng-binding"))
- except:
+ "Section 1: External References",
+ Get.by_css("h2.ng-binding"))
+ except BaseException:
Helper.internal_assert(
- "Section 1: Scaling Considerations", Get.by_css("h2.ng-binding"))
+ "Section 1: Scaling Considerations",
+ Get.by_css("h2.ng-binding"))
try:
Helper.internal_assert(
- "Normal references", Get.by_css("span.col-md-9.ng-binding"))
- except:
+ "Normal references",
+ Get.by_css("span.col-md-9.ng-binding"))
+ except BaseException:
if "VF image" in Get.by_xpath("//li[2]/span[2]"):
logger.debug("All Ok")
if settings.DATABASE_TYPE == 'local':
Helper.internal_assert(
- "Section 2: Parameter Specification", Get.by_xpath("//li[2]/h2"))
+ "Section 2: Parameter Specification",
+ Get.by_xpath("//li[2]/h2"))
Click.name("VF image")
Click.name("Normal references")
try:
Helper.internal_assert(
"1.1 - Parameters", Get.by_xpath("//header/h2"))
- except:
+ except BaseException:
text = Get.by_name("Normal references")
Helper.internal_assert("Normal references", text)
Helper.internal_assert("Audit Logs", Get.by_css("h3.col-md-12"))
@@ -461,16 +500,18 @@ class FEChecklist:
# Validate Local AuditLog
engLeadFullName = DBUser.get_el_name(vfName)
Helper.internal_assert(
- engLeadFullName, Get.by_xpath("//ul[@id='audit-log-list']/li/h4"))
+ engLeadFullName,
+ Get.by_xpath("//ul[@id='audit-log-list']/li/h4"))
Helper.internal_assert(localLogText, Get.by_css(
Constants.Dashboard.Checklist.AuditLog.LastLocalAuditLog.CSS))
if settings.DATABASE_TYPE == 'local':
try:
Helper.internal_assert(
"Parameters", Get.by_xpath("//li[2]/ul/li/span[2]"))
- except:
+ except BaseException:
Helper.internal_assert(
- "Numeric parameters", Get.by_xpath("//li[2]/ul/li/span[2]"))
+ "Numeric parameters",
+ Get.by_xpath("//li[2]/ul/li/span[2]"))
Click.name("Normal references")
Wait.css(Constants.Dashboard.Checklist.LineItem.Deny.CSS)
Wait.css(Constants.Dashboard.Checklist.LineItem.Approve.CSS)
@@ -486,31 +527,37 @@ class FEChecklist:
print("click on V button approve of decision in state = " + state)
try:
Wait.css("li.not-relevant-btn")
- except:
+ except BaseException:
Wait.xpath("//aside/header/ul/li")
if state == "review":
Wait.id("edit-checklist")
if state == "PEER":
if settings.DATABASE_TYPE == 'local':
Helper.internal_assert(
- "Audit Log (4)", Get.by_id(Constants.Dashboard.Checklist.AuditLog.ID))
+ "Audit Log (4)", Get.by_id(
+ Constants.Dashboard.Checklist.AuditLog.ID))
else:
Helper.internal_assert(
- "Audit Log (5)", Get.by_id(Constants.Dashboard.Checklist.AuditLog.ID))
+ "Audit Log (5)", Get.by_id(
+ Constants.Dashboard.Checklist.AuditLog.ID))
if state == "review":
if settings.DATABASE_TYPE == 'local':
Helper.internal_assert(
- "Audit Log (2)", Get.by_id(Constants.Dashboard.Checklist.AuditLog.ID))
+ "Audit Log (2)", Get.by_id(
+ Constants.Dashboard.Checklist.AuditLog.ID))
else:
Helper.internal_assert(
- "Audit Log (3)", Get.by_id(Constants.Dashboard.Checklist.AuditLog.ID))
+ "Audit Log (3)", Get.by_id(
+ Constants.Dashboard.Checklist.AuditLog.ID))
if state == "APPROVAL":
if settings.DATABASE_TYPE == 'local':
Helper.internal_assert(
- "Audit Log (8)", Get.by_id(Constants.Dashboard.Checklist.AuditLog.ID))
+ "Audit Log (8)", Get.by_id(
+ Constants.Dashboard.Checklist.AuditLog.ID))
else:
Helper.internal_assert(
- "Audit Log (9)", Get.by_id(Constants.Dashboard.Checklist.AuditLog.ID))
+ "Audit Log (9)", Get.by_id(
+ Constants.Dashboard.Checklist.AuditLog.ID))
Click.id(
Constants.Dashboard.Checklist.AuditLog.ID, wait_for_page=True)
Wait.text_by_xpath("//span[2]", checklistName)
@@ -520,32 +567,39 @@ class FEChecklist:
if state == "review":
if settings.DATABASE_TYPE == 'local':
Helper.internal_assert(
- "Audit Log (3)", Get.by_id(Constants.Dashboard.Checklist.AuditLog.ID))
+ "Audit Log (3)", Get.by_id(
+ Constants.Dashboard.Checklist.AuditLog.ID))
else:
Helper.internal_assert(
- "Audit Log (4)", Get.by_id(Constants.Dashboard.Checklist.AuditLog.ID))
+ "Audit Log (4)", Get.by_id(
+ Constants.Dashboard.Checklist.AuditLog.ID))
if state == "PEER":
if settings.DATABASE_TYPE == 'local':
Helper.internal_assert(
- "Audit Log (5)", Get.by_id(Constants.Dashboard.Checklist.AuditLog.ID))
+ "Audit Log (5)", Get.by_id(
+ Constants.Dashboard.Checklist.AuditLog.ID))
else:
Helper.internal_assert(
- "Audit Log (6)", Get.by_id(Constants.Dashboard.Checklist.AuditLog.ID))
+ "Audit Log (6)", Get.by_id(
+ Constants.Dashboard.Checklist.AuditLog.ID))
if state == "APPROVAL":
if settings.DATABASE_TYPE == 'local':
Helper.internal_assert(
- "Audit Log (9)", Get.by_id(Constants.Dashboard.Checklist.AuditLog.ID))
+ "Audit Log (9)", Get.by_id(
+ Constants.Dashboard.Checklist.AuditLog.ID))
else:
Helper.internal_assert(
- "Audit Log (10)", Get.by_id(Constants.Dashboard.Checklist.AuditLog.ID))
+ "Audit Log (10)", Get.by_id(
+ Constants.Dashboard.Checklist.AuditLog.ID))
# Validate Buttons
if settings.DATABASE_TYPE != 'local':
FEGeneral.refresh()
engagement_id = DBVirtualFunction.select_eng_uuid(vfName)
engLeadEmail = DBUser.select_el_email(vfName)
logger.debug("EL email: " + engLeadEmail)
- engagement_manual_id = DBGeneral.select_where("engagement_manual_id", "ice_engagement",
- "uuid", engagement_id, 1)
+ engagement_manual_id = DBGeneral.select_where(
+ "engagement_manual_id", "ice_engagement", "uuid",
+ engagement_id, 1)
# Click on all default next steps
myVfName = engagement_manual_id + ": " + vfName
actualVfNameid = "clickable-" + myVfName
@@ -553,18 +607,21 @@ class FEChecklist:
Click.id("checklist-" + checklistUuid)
Helper.internal_assert(
"Add Next Steps", Get.by_xpath("//button[3]"))
- Wait.text_by_id(Constants.Dashboard.Checklist.Reject.ID,
- Constants.Dashboard.Checklist.Reject.Modal.Button.TEXT, wait_for_page=True)
+ Wait.text_by_id(
+ Constants.Dashboard.Checklist.Reject.ID,
+ Constants.Dashboard.Checklist.Reject.Modal.Button.TEXT,
+ wait_for_page=True)
Helper.internal_assert(
"Approve", Get.by_xpath("//div[@id='state-actions']/button"))
logger.debug("ALL VALIDATION PASS FOR STATE: " + state)
# If failed - count the failure and add the error to list of errors.
except Exception as e:
- errorMsg = "review_state_actions_and_validations FAILED because: " + \
- str(e)
+ errorMsg = "review_state_actions_and_validations " +\
+ "FAILED because: " + str(e)
raise Exception(errorMsg, "review_state_actions_and_validations")
logger.error(
- state + " state FAILED CONNECT TO STAGING MANUAL AND VERIFY WHY!")
+ state +
+ " state FAILED CONNECT TO STAGING MANUAL AND VERIFY WHY!")
raise
@staticmethod
@@ -574,9 +631,12 @@ class FEChecklist:
Constants.Dashboard.Checklist.Reject.ID, wait_for_page=True)
if rejectMsg:
Enter.text_by_name(
- Constants.Dashboard.Checklist.Reject.Modal.Comment.NAME, rejectMsg, wait_for_page=True)
+ Constants.Dashboard.Checklist.Reject.Modal.Comment.NAME,
+ rejectMsg,
+ wait_for_page=True)
Click.id(
- Constants.Dashboard.Checklist.Reject.Modal.Button.ID, wait_for_page=True)
+ Constants.Dashboard.Checklist.Reject.Modal.Button.ID,
+ wait_for_page=True)
except Exception as e:
errorMsg = "Failed to reject checklist."
raise Exception(errorMsg, e)
@@ -588,7 +648,9 @@ class FEChecklist:
Enter.text_by_id("new-audit-log-text", log_txt, wait_for_page=True)
Click.id("submit-new-audit-lop-text")
Wait.text_by_css(
- Constants.Dashboard.Checklist.AuditLog.LastLocalAuditLog.CSS, log_txt, wait_for_page=True)
+ Constants.Dashboard.Checklist.AuditLog.LastLocalAuditLog.CSS,
+ log_txt,
+ wait_for_page=True)
return log_txt
except Exception as e:
errorMsg = "Failed to add audit log to line item."
@@ -599,12 +661,20 @@ class FEChecklist:
FEOverview.click_on_vf(user_content)
if checklist_uuid is None:
checklist_uuid = DBGeneral.select_where_not_and_order_by_desc(
- 'uuid', Constants.DBConstants.IceTables.CHECKLIST, 'name', checklistName, 'state', Constants.ChecklistStates.Archive.TEXT, 'create_time')[0]
+ 'uuid',
+ Constants.DBConstants.IceTables.CHECKLIST,
+ 'name',
+ checklistName,
+ 'state',
+ Constants.ChecklistStates.Archive.TEXT,
+ 'create_time')[0]
Click.id("checklist-" + checklist_uuid, True)
@staticmethod
def validate_reject_is_enabled():
- return Wait.id(Constants.Dashboard.Checklist.Reject.ID, wait_for_page=True)
+ return Wait.id(
+ Constants.Dashboard.Checklist.Reject.ID,
+ wait_for_page=True)
@staticmethod
def cl_to_next_stage(actualVfNameid):
@@ -619,7 +689,8 @@ class FEChecklist:
'engagement_manual_id'] + ": " + user_content['vfName']
if settings.DATABASE_TYPE != 'local':
Enter.text_by_id(
- Constants.Dashboard.LeftPanel.SearchBox.ID, user_content['vfName'])
+ Constants.Dashboard.LeftPanel.SearchBox.ID,
+ user_content['vfName'])
Click.css(Constants.Dashboard.LeftPanel.SearchBox.Results.CSS)
Wait.text_by_id(
Constants.Dashboard.Overview.Title.ID, vfFullName)
@@ -627,9 +698,12 @@ class FEChecklist:
@staticmethod
def search_by_manual_id(manual_id):
Enter.text_by_id(
- Constants.Dashboard.LeftPanel.SearchBox.ID, manual_id, wait_for_page=True)
+ Constants.Dashboard.LeftPanel.SearchBox.ID,
+ manual_id,
+ wait_for_page=True)
Click.css(
- Constants.Dashboard.LeftPanel.SearchBox.Results.CSS, wait_for_page=True)
+ Constants.Dashboard.LeftPanel.SearchBox.Results.CSS,
+ wait_for_page=True)
Wait.id(Constants.Dashboard.Overview.Title.ID)
@staticmethod
@@ -638,16 +712,23 @@ class FEChecklist:
vfName = newObj[0]
engLeadFullName = DBUser.get_el_name(vfName)
Enter.text_by_name(
- Constants.Dashboard.Checklist.Reject.Modal.Comment.NAME, "Reject state By :" + engLeadFullName)
+ Constants.Dashboard.Checklist.Reject.Modal.Comment.NAME,
+ "Reject state By :" + engLeadFullName)
Helper.internal_assert(
- "Checklist: " + checklistName, Get.by_css("span.state-title.ng-binding"))
+ "Checklist: " + checklistName,
+ Get.by_css("span.state-title.ng-binding"))
Wait.text_by_id(Constants.Dashboard.Checklist.Reject.Modal.Button.ID,
Constants.Dashboard.Checklist.Reject.Modal.Button.TEXT)
Click.id(Constants.Dashboard.Checklist.Reject.Modal.Button.ID)
Wait.modal_to_dissappear()
@staticmethod
- def add_nsteps(checklistUuid, actualVfNameid, myVfName, checklistName, newFileNames):
+ def add_nsteps(
+ checklistUuid,
+ actualVfNameid,
+ myVfName,
+ checklistName,
+ newFileNames):
Click.id(actualVfNameid, wait_for_page=True)
checklistUuid = DBChecklist.select_where_cl_not_archive(
"uuid", "ice_checklist", "name", newFileNames[0], 1)
@@ -661,30 +742,46 @@ class FEChecklist:
Helper.internal_assert(myVfName, actualVfName)
@staticmethod
- def validate_multi_eng(user_content, checklist_content, newEL_content, actualVfNameid):
+ def validate_multi_eng(
+ user_content,
+ checklist_content,
+ newEL_content,
+ actualVfNameid):
query = "UPDATE ice_user_profile SET role_id=2 WHERE email = '" + \
str(newEL_content['email']) + "';"
DBGeneral.update_by_query(query)
FEWizard.invite_team_members_modal(newEL_content['email'])
# Fetch one AT&T user ID.
enguuid = DBGeneral.select_where(
- "uuid", "ice_engagement", "engagement_manual_id", user_content['engagement_manual_id'], 1)
+ "uuid",
+ "ice_engagement",
+ "engagement_manual_id",
+ user_content['engagement_manual_id'],
+ 1)
invitation_token = DBUser.select_invitation_token(
- "invitation_token", "ice_invitation", "engagement_uuid", enguuid, newEL_content['email'], 1)
+ "invitation_token",
+ "ice_invitation",
+ "engagement_uuid",
+ enguuid,
+ newEL_content['email'],
+ 1)
URL = Constants.Default.InviteURL.Login.TEXT + invitation_token
FEGeneral.re_open(URL)
- FEUser.login(newEL_content[
- 'email'], Constants.Default.Password.TEXT, expected_element=actualVfNameid)
+ FEUser.login(
+ newEL_content['email'],
+ Constants.Default.Password.TEXT,
+ expected_element=actualVfNameid)
Click.id(actualVfNameid, wait_for_page=True)
count = None
try:
session.ice_driver.find_element_by_id(
"checklist-" + checklist_content['uuid'])
count += 1
- except:
+ except BaseException:
logger.debug(
- "check list not visible for EL invited : " + str(newEL_content['email']))
- assertTrue(count == None)
+ "check list not visible for EL invited : " +
+ str(newEL_content['email']))
+ assertTrue(count is None)
query = "UPDATE ice_user_profile SET role_id=1 WHERE email = '" + \
str(newEL_content['email']) + "';"
DBGeneral.update_by_query(query)
@@ -710,11 +807,12 @@ class FEChecklist:
@staticmethod
def validate_audit_log(log_txt):
audit_log_list_text = Get.by_id(
- Constants.Dashboard.Checklist.AuditLog.AuditLogList.ID, wait_for_page=True)
+ Constants.Dashboard.Checklist.AuditLog.AuditLogList.ID,
+ wait_for_page=True)
try:
log_txt in audit_log_list_text
logger.debug("validate_audit_log PASS")
- except Exception as e:
+ except Exception:
errorMsg = "Failed in validate_audit_log"
raise Exception(errorMsg)
@@ -748,7 +846,9 @@ class FEChecklist:
Constants.Dashboard.Checklist.JenkinsLog.Modal.Title.TEXT, True)
log = Get.by_id(
Constants.Dashboard.Checklist.JenkinsLog.Modal.Body.ID, True)
- Helper.assertTrue(Constants.Dashboard.Checklist.JenkinsLog.Modal.Body.TEXT_SAMPLE in log,
- "Jenkins log could not be viewed.")
+ Helper.assertTrue(
+ Constants.Dashboard.Checklist.JenkinsLog.Modal.Body.
+ TEXT_SAMPLE in log,
+ "Jenkins log could not be viewed.")
Click.id(Constants.Dashboard.Modal.CLOSE_BUTTON_ID)
return log
diff --git a/services/frontend/fe_checklist_template.py b/services/frontend/fe_checklist_template.py
index 19e91aa..09a497f 100644
--- a/services/frontend/fe_checklist_template.py
+++ b/services/frontend/fe_checklist_template.py
@@ -1,5 +1,5 @@
-
-# ============LICENSE_START==========================================
+
+# ============LICENSE_START==========================================
# org.onap.vvp/test-engine
# ===================================================================
# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
@@ -37,7 +37,6 @@
#
# ECOMP is a trademark and service mark of AT&T Intellectual Property.
from selenium.webdriver.common.action_chains import ActionChains
-from selenium.webdriver.common.keys import Keys
from services.api.api_virtual_function import APIVirtualFunction
from services.constants import Constants
@@ -58,6 +57,7 @@ from services.session import session
logger = LoggingServiceFactory.get_logger()
+
class FEChecklistTemplate:
@staticmethod
@@ -74,94 +74,143 @@ class FEChecklistTemplate:
@staticmethod
def click_on_save_and_assert_success_msg():
Click.id(
- Constants.Dashboard.LeftPanel.EditChecklistTemplate.SAVE_BTN_ID, wait_for_page=True)
+ Constants.Dashboard.LeftPanel.EditChecklistTemplate.SAVE_BTN_ID,
+ wait_for_page=True)
Click.id(
- Constants.Dashboard.LeftPanel.EditChecklistTemplate.APPROVE_BTN_ID, wait_for_page=True)
- Wait.text_by_id(Constants.Dashboard.LeftPanel.EditChecklistTemplate.SUCCESS_ID,
- Constants.Dashboard.LeftPanel.EditChecklistTemplate.SUCCESS_SAVE_MSG)
+ Constants.Dashboard.LeftPanel.EditChecklistTemplate.APPROVE_BTN_ID,
+ wait_for_page=True)
+ Wait.text_by_id(
+ Constants.Dashboard.LeftPanel.EditChecklistTemplate.SUCCESS_ID,
+ Constants.Dashboard.LeftPanel.
+ EditChecklistTemplate.SUCCESS_SAVE_MSG)
@staticmethod
def click_on_disabled_save_and_assert_for_promp_msg():
Click.id(
Constants.Dashboard.LeftPanel.EditChecklistTemplate.SAVE_BTN_ID)
- session.run_negative(lambda: Click.id(Constants.Dashboard.LeftPanel.EditChecklistTemplate.APPROVE_BTN_ID),
- "Ooops modal window is opened although 'Save' button should have been disabled")
+ session.run_negative(
+ lambda: Click.id(
+ Constants.Dashboard.LeftPanel.
+ EditChecklistTemplate.APPROVE_BTN_ID),
+ "Ooops modal window is opened although 'Save' " +
+ "button should have been disabled")
@staticmethod
def save_with_no_changes():
- Wait.text_by_id(Constants.Dashboard.LeftPanel.EditChecklistTemplate.SAVE_BTN_ID,
- Constants.Dashboard.LeftPanel.EditChecklistTemplate.SAVE_BTN)
- Click.id(
- Constants.Dashboard.LeftPanel.EditChecklistTemplate.SAVE_BTN_ID, wait_for_page=True)
- Wait.text_by_name(Constants.Dashboard.LeftPanel.EditChecklistTemplate.HEAT,
- Constants.Dashboard.LeftPanel.EditChecklistTemplate.HEAT)
- Wait.text_by_id(Constants.Dashboard.LeftPanel.EditChecklistTemplate.APPROVE_BTN_TITLE_ID,
- Constants.Dashboard.LeftPanel.EditChecklistTemplate.APPROVE_BTN_TITLE_TEXT)
Wait.text_by_id(
- Constants.Dashboard.LeftPanel.EditChecklistTemplate.APPROVE_BTN_ID, "Yes")
+ Constants.Dashboard.LeftPanel.EditChecklistTemplate.SAVE_BTN_ID,
+ Constants.Dashboard.LeftPanel.EditChecklistTemplate.SAVE_BTN)
Click.id(
- Constants.Dashboard.LeftPanel.EditChecklistTemplate.APPROVE_BTN_ID, wait_for_page=True)
- Wait.text_by_id(Constants.Dashboard.LeftPanel.EditChecklistTemplate.SUCCESS_ID,
- Constants.Dashboard.LeftPanel.EditChecklistTemplate.CL_TEMPLATE_SAVED_TXT)
+ Constants.Dashboard.LeftPanel.EditChecklistTemplate.SAVE_BTN_ID,
+ wait_for_page=True)
+ Wait.text_by_name(
+ Constants.Dashboard.LeftPanel.EditChecklistTemplate.HEAT,
+ Constants.Dashboard.LeftPanel.EditChecklistTemplate.HEAT)
+ Wait.text_by_id(
+ Constants.Dashboard.LeftPanel.
+ EditChecklistTemplate.APPROVE_BTN_TITLE_ID,
+ Constants.Dashboard.LeftPanel.
+ EditChecklistTemplate.APPROVE_BTN_TITLE_TEXT)
+ Wait.text_by_id(
+ Constants.Dashboard.LeftPanel.EditChecklistTemplate.APPROVE_BTN_ID,
+ "Yes")
+ Click.id(
+ Constants.Dashboard.LeftPanel.EditChecklistTemplate.APPROVE_BTN_ID,
+ wait_for_page=True)
+ Wait.text_by_id(
+ Constants.Dashboard.LeftPanel.EditChecklistTemplate.SUCCESS_ID,
+ Constants.Dashboard.LeftPanel.
+ EditChecklistTemplate.CL_TEMPLATE_SAVED_TXT)
@staticmethod
def discard_checklist_after_modification():
Click.id(
- Constants.Dashboard.LeftPanel.EditChecklistTemplate.FIRST_SECTION_ID, wait_for_page=True)
+ Constants.Dashboard.LeftPanel.
+ EditChecklistTemplate.FIRST_SECTION_ID,
+ wait_for_page=True)
Enter.text_by_id(
- Constants.Dashboard.LeftPanel.EditChecklistTemplate.FIRST_SECTION_INPUT_ID, "ttttttt", wait_for_page=True)
+ Constants.Dashboard.LeftPanel.
+ EditChecklistTemplate.FIRST_SECTION_INPUT_ID,
+ "ttttttt",
+ wait_for_page=True)
Click.id(
- Constants.Dashboard.LeftPanel.EditChecklistTemplate.FIRST_SECTION_ID)
+ Constants.Dashboard.LeftPanel.
+ EditChecklistTemplate.FIRST_SECTION_ID)
Click.id(
Constants.Dashboard.LeftPanel.EditChecklistTemplate.REJECT_BTN_ID)
Click.id(
- Constants.Dashboard.LeftPanel.EditChecklistTemplate.APPROVE_BTN_ID, wait_for_page=True)
+ Constants.Dashboard.LeftPanel.EditChecklistTemplate.APPROVE_BTN_ID,
+ wait_for_page=True)
Wait.text_by_id(
- Constants.Dashboard.LeftPanel.EditChecklistTemplate.SUCCESS_ID, "All changes discarded.")
+ Constants.Dashboard.LeftPanel.EditChecklistTemplate.SUCCESS_ID,
+ "All changes discarded.")
@staticmethod
def edit_template_and_save():
Click.id(
- Constants.Dashboard.LeftPanel.EditChecklistTemplate.FIRST_SECTION_ID)
+ Constants.Dashboard.LeftPanel.
+ EditChecklistTemplate.FIRST_SECTION_ID)
Enter.text_by_id(
- Constants.Dashboard.LeftPanel.EditChecklistTemplate.FIRST_SECTION_INPUT_ID, "Ros Is My Mentor")
+ Constants.Dashboard.LeftPanel.
+ EditChecklistTemplate.FIRST_SECTION_INPUT_ID,
+ "Ros Is My Mentor")
FEChecklistTemplate.click_on_save_and_assert_success_msg()
@staticmethod
def del_lineitem_and_save():
- Click.id(Constants.Dashboard.LeftPanel.EditChecklistTemplate.FIRST_SECTION_ID, wait_for_page=True)
- Enter.text_by_id(Constants.Dashboard.LeftPanel.EditChecklistTemplate.FIRST_SECTION_INPUT_ID,
- "Ros Is My Mentor", wait_for_page=True)
- Click.id(Constants.Dashboard.LeftPanel.EditChecklistTemplate.FIRST_SECTION_ID)
+ Click.id(
+ Constants.Dashboard.LeftPanel.
+ EditChecklistTemplate.FIRST_SECTION_ID,
+ wait_for_page=True)
+ Enter.text_by_id(
+ Constants.Dashboard.LeftPanel.
+ EditChecklistTemplate.FIRST_SECTION_INPUT_ID,
+ "Ros Is My Mentor",
+ wait_for_page=True)
+ Click.id(
+ Constants.Dashboard.LeftPanel.
+ EditChecklistTemplate.FIRST_SECTION_ID)
FEChecklistTemplate.click_on_save_and_assert_success_msg()
@staticmethod
def add_lineitem_and_save():
Click.id(
- Constants.Dashboard.LeftPanel.EditChecklistTemplate.ADD_LINE_ITEM_BTN, wait_for_page=True)
+ Constants.Dashboard.LeftPanel.
+ EditChecklistTemplate.ADD_LINE_ITEM_BTN,
+ wait_for_page=True)
Click.xpath("//li[@id='select-lineitem-btn-0.1']/span[2]")
Click.id(
- Constants.Dashboard.LeftPanel.EditChecklistTemplate.EDIT_LINE_ITEM_BTN)
+ Constants.Dashboard.LeftPanel.
+ EditChecklistTemplate.EDIT_LINE_ITEM_BTN)
Enter.text_by_id(
- Constants.Dashboard.LeftPanel.EditChecklistTemplate.EDIT_LINE_ITEM_NAME, "xxx")
+ Constants.Dashboard.LeftPanel.
+ EditChecklistTemplate.EDIT_LINE_ITEM_NAME,
+ "xxx")
Click.id(
- Constants.Dashboard.LeftPanel.EditChecklistTemplate.EDIT_LINE_ITEM_BTN, wait_for_page=True)
+ Constants.Dashboard.LeftPanel.
+ EditChecklistTemplate.EDIT_LINE_ITEM_BTN,
+ wait_for_page=True)
FEChecklistTemplate.click_on_save_and_assert_success_msg()
@staticmethod
def edit_description_lineitem_and_save():
- isBold = False
desc = Helper.rand_string("randomString")
Click.id(
- Constants.Dashboard.LeftPanel.EditChecklistTemplate.FIRST_LINE_ITEM_ID)
+ Constants.Dashboard.LeftPanel.
+ EditChecklistTemplate.FIRST_LINE_ITEM_ID)
Click.id(
- Constants.Dashboard.LeftPanel.EditChecklistTemplate.EDIT_LINE_ITEM_BTN)
- Enter.text_by_id(Constants.Dashboard.LeftPanel.EditChecklistTemplate.EDIT_LINE_ITEM_NAME,
- Helper.rand_string("randomString"))
+ Constants.Dashboard.LeftPanel.
+ EditChecklistTemplate.EDIT_LINE_ITEM_BTN)
+ Enter.text_by_id(
+ Constants.Dashboard.LeftPanel.
+ EditChecklistTemplate.EDIT_LINE_ITEM_NAME,
+ Helper.rand_string("randomString"))
Click.id(
- Constants.Dashboard.LeftPanel.EditChecklistTemplate.EDIT_LINE_ITEM_DESC)
+ Constants.Dashboard.LeftPanel.
+ EditChecklistTemplate.EDIT_LINE_ITEM_DESC)
editor_element = Get.wysiwyg_element_by_id(
- Constants.Dashboard.LeftPanel.EditChecklistTemplate.LINE_ITEM_DESC_TEXT_BOX)
+ Constants.Dashboard.LeftPanel.
+ EditChecklistTemplate.LINE_ITEM_DESC_TEXT_BOX)
editor_element.clear()
editor_element.send_keys(desc)
Wait.page_has_loaded()
@@ -169,49 +218,66 @@ class FEChecklistTemplate:
actionChains.double_click(editor_element).perform()
Wait.page_has_loaded()
Click.xpath(
- Constants.Dashboard.LeftPanel.EditChecklistTemplate.WYSIWYG_BUTTON_BOLD)
+ Constants.Dashboard.LeftPanel.
+ EditChecklistTemplate.WYSIWYG_BUTTON_BOLD)
Click.id(
- Constants.Dashboard.LeftPanel.EditChecklistTemplate.EDIT_LINE_ITEM_BTN, wait_for_page=True)
+ Constants.Dashboard.LeftPanel.
+ EditChecklistTemplate.EDIT_LINE_ITEM_BTN,
+ wait_for_page=True)
isBold = Wait.is_css_exists("b")
while not isBold:
Click.id(
- Constants.Dashboard.LeftPanel.EditChecklistTemplate.EDIT_LINE_ITEM_BTN, wait_for_page=True)
+ Constants.Dashboard.LeftPanel.
+ EditChecklistTemplate.EDIT_LINE_ITEM_BTN,
+ wait_for_page=True)
actionChains.double_click(editor_element).perform()
Click.xpath(
- Constants.Dashboard.LeftPanel.EditChecklistTemplate.WYSIWYG_BUTTON_BOLD, wait_for_page=True)
+ Constants.Dashboard.LeftPanel.
+ EditChecklistTemplate.WYSIWYG_BUTTON_BOLD,
+ wait_for_page=True)
Click.id(
- Constants.Dashboard.LeftPanel.EditChecklistTemplate.EDIT_LINE_ITEM_BTN, wait_for_page=True)
+ Constants.Dashboard.LeftPanel.
+ EditChecklistTemplate.EDIT_LINE_ITEM_BTN,
+ wait_for_page=True)
isBold = Wait.is_css_exists("b")
if isBold:
FEChecklistTemplate.click_on_save_and_assert_success_msg()
FEGeneral.refresh()
Click.name(
- Constants.Dashboard.LeftPanel.EditChecklistTemplate.HEAT, wait_for_page=True)
+ Constants.Dashboard.LeftPanel.EditChecklistTemplate.HEAT,
+ wait_for_page=True)
Click.id(
- Constants.Dashboard.LeftPanel.EditChecklistTemplate.FIRST_LINE_ITEM_ID, wait_for_page=True)
+ Constants.Dashboard.LeftPanel.
+ EditChecklistTemplate.FIRST_LINE_ITEM_ID,
+ wait_for_page=True)
Wait.css("b")
Wait.text_by_css("b", desc, wait_for_page=True)
@staticmethod
def rollback_add_lineitem_and_save():
Click.id(
- Constants.Dashboard.LeftPanel.EditChecklistTemplate.DELETE_LINE_ITEM)
+ Constants.Dashboard.LeftPanel.
+ EditChecklistTemplate.DELETE_LINE_ITEM)
FEChecklistTemplate.click_on_save_and_assert_success_msg()
FEChecklistTemplate.rollback_to_heat_teampleate()
@staticmethod
def add_lineitem_and_check_db():
Click.id(
- Constants.Dashboard.LeftPanel.EditChecklistTemplate.FIRST_SECTION_ID)
+ Constants.Dashboard.LeftPanel.
+ EditChecklistTemplate.FIRST_SECTION_ID)
Enter.text_by_id(
- Constants.Dashboard.LeftPanel.EditChecklistTemplate.FIRST_SECTION_INPUT_ID, "Ros Is My Mentor")
+ Constants.Dashboard.LeftPanel.
+ EditChecklistTemplate.FIRST_SECTION_INPUT_ID,
+ "Ros Is My Mentor")
FEChecklistTemplate.click_on_save_and_assert_success_msg()
result = DBChecklist.checkChecklistIsUpdated()
Helper.internal_not_equal(result, None)
@staticmethod
def check_cl_after_lineitem_added():
- template_name = Constants.Dashboard.LeftPanel.EditChecklistTemplate.HEAT
+ template_name = Constants.Dashboard.LeftPanel.\
+ EditChecklistTemplate.HEAT
user_content = APIVirtualFunction.create_engagement()
FEUser.login(
Constants.Users.Admin.EMAIL, Constants.Default.Password.TEXT)
@@ -220,15 +286,17 @@ class FEChecklistTemplate:
engLeadEmail = DBUser.select_el_email(vfName)
engagement_manual_id = DBChecklist.fetchEngManIdByEngUuid(
engagement_id)
- myVfName = engagement_manual_id + ": " + vfName
FEOverview.click_on_vf(user_content)
FEGeneral.re_open(Constants.Default.LoginURL.TEXT)
FEUser.login(
- engLeadEmail, Constants.Default.Password.TEXT, engagement_manual_id)
+ engLeadEmail,
+ Constants.Default.Password.TEXT,
+ engagement_manual_id)
Click.id(
Constants.Dashboard.LeftPanel.EditChecklistTemplate.DASHBOARD_ID)
Enter.text_by_id(
- Constants.Dashboard.LeftPanel.EditChecklistTemplate.SEARCH_ENG_ID, vfName)
+ Constants.Dashboard.LeftPanel.EditChecklistTemplate.SEARCH_ENG_ID,
+ vfName)
Click.id("test_" + vfName)
checklistName = FEChecklist.create_checklist(
engagement_id, vfName, None, engagement_manual_id)
@@ -236,19 +304,28 @@ class FEChecklistTemplate:
result = DBChecklist.fetchChecklistByName(checklistName)
FEUser.go_to_admin()
FEChecklistTemplate.click_on_template_name_on_navigation(
- Constants.Dashboard.LeftPanel.EditChecklistTemplate.HEAT, template_name)
+ Constants.Dashboard.LeftPanel.EditChecklistTemplate.HEAT,
+ template_name)
Click.id(
- Constants.Dashboard.LeftPanel.EditChecklistTemplate.EDIT_LINE_ITEM_BTN)
- Enter.text_by_id(Constants.Dashboard.LeftPanel.EditChecklistTemplate.EDIT_LINE_ITEM_NAME,
- "test_lineitem_added_and_audit_log_on_dupl_cl-NAME")
+ Constants.Dashboard.LeftPanel.
+ EditChecklistTemplate.EDIT_LINE_ITEM_BTN)
+ Enter.text_by_id(
+ Constants.Dashboard.LeftPanel.
+ EditChecklistTemplate.EDIT_LINE_ITEM_NAME,
+ "test_lineitem_added_and_audit_log_on_dupl_cl-NAME")
Click.id(
- Constants.Dashboard.LeftPanel.EditChecklistTemplate.EDIT_LINE_ITEM_BTN)
+ Constants.Dashboard.LeftPanel.
+ EditChecklistTemplate.EDIT_LINE_ITEM_BTN)
FEChecklistTemplate.click_on_save_and_assert_success_msg()
Click.id(
- Constants.Dashboard.LeftPanel.EditChecklistTemplate.DASHBOARD_ID)
+ Constants.Dashboard.LeftPanel.
+ EditChecklistTemplate.DASHBOARD_ID)
Enter.text_by_id(
- Constants.Dashboard.LeftPanel.EditChecklistTemplate.SEARCH_ENG_ID, vfName)
+ Constants.Dashboard.LeftPanel.
+ EditChecklistTemplate.SEARCH_ENG_ID,
+ vfName)
Click.id("test_" + vfName)
Click.id("checklist-" + str(result))
Helper.internal_assert(
- "1. automation", session.ice_driver.find_element_by_xpath("//li[@id='']").text)
+ "1. automation",
+ session.ice_driver.find_element_by_xpath("//li[@id='']").text)
diff --git a/services/frontend/fe_cms.py b/services/frontend/fe_cms.py
index a86626e..9f5300d 100644
--- a/services/frontend/fe_cms.py
+++ b/services/frontend/fe_cms.py
@@ -1,5 +1,5 @@
-
-# ============LICENSE_START==========================================
+
+# ============LICENSE_START==========================================
# org.onap.vvp/test-engine
# ===================================================================
# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
@@ -40,6 +40,7 @@ from services.constants import Constants
from services.database.db_cms import DBCMS
from services.frontend.base_actions.click import Click
from services.frontend.base_actions.enter import Enter
+from services.frontend.base_actions.get import Get
from services.frontend.base_actions.wait import Wait
from services.frontend.fe_dashboard import FEDashboard
from services.frontend.fe_general import FEGeneral
@@ -50,10 +51,12 @@ from services.session import session
logger = LoggingServiceFactory.get_logger()
+
class FECms:
@staticmethod
- def validate_5_last_announcement_displayed(listOfTitleAnDescriptions, user_content, last_title):
+ def validate_5_last_announcement_displayed(
+ listOfTitleAnDescriptions, user_content, last_title):
last_description = listOfTitleAnDescriptions[
len(listOfTitleAnDescriptions) - 1][1]
Wait.text_by_id(Constants.Toast.CMS_ID, last_title + ".")
@@ -65,11 +68,19 @@ class FECms:
FEUser.logout()
# Validate Announcement TOAST not displayed
FEUser.login(user_content['email'], Constants.Default.Password.TEXT)
- session.run_negative(lambda: Wait.text_by_id(Constants.Cms.Toast_title_id, last_title),
- "Last Announcement displayed in News & Announcements sections %s" % last_title)
+ session.run_negative(
+ lambda: Wait.text_by_id(
+ Constants.Cms.Toast_title_id,
+ last_title),
+ "Last Announcement displayed in News & Announcements sections %s" %
+ last_title)
@staticmethod
- def validate_grandchild_page(parent_title, child_title, grand_child_title, description):
+ def validate_grandchild_page(
+ parent_title,
+ child_title,
+ grand_child_title,
+ description):
Click.id(Constants.Cms.Documentation)
Click.id(parent_title)
Click.id(child_title)
@@ -99,7 +110,10 @@ class FECms:
FEDashboard.open_documentation(title)
Wait.text_by_id(title, title)
logger.debug("Search Documentation by title")
- Enter.text_by_id(Constants.Cms.SearchDocumentation, title, wait_for_page=True)
+ Enter.text_by_id(
+ Constants.Cms.SearchDocumentation,
+ title,
+ wait_for_page=True)
Wait.text_by_id(title, title)
Click.id(title, wait_for_page=True)
Wait.text_by_id(title, title)
@@ -110,23 +124,33 @@ class FECms:
FEDashboard.open_documentation(title)
Wait.text_by_id(title, title)
logger.debug("Search Documentation by content")
- Enter.text_by_id(Constants.Cms.SearchDocumentation, content, wait_for_page=True)
+ Enter.text_by_id(
+ Constants.Cms.SearchDocumentation,
+ content,
+ wait_for_page=True)
Wait.text_by_id(title, title)
Click.id(title, wait_for_page=True)
Wait.text_by_css(Constants.Cms.DocumentationPageContent, content)
logger.debug("Documentation found (searched by content)")
@staticmethod
- def validate_expired_post_Announcement(title, description):
- Wait.text_by_id(Constants.Toast.CMS_ID, title + ".")
+ def validate_expired_post_Announcement(email, title, description):
+ title2 = Constants.Toast.TEXT + title + "."
+ Wait.text_by_id(
+ Constants.Toast.CMS_ID, title2, True)
FEDashboard.open_announcement()
Wait.text_by_id(Constants.Cms.Toast_title_id, title)
Wait.text_by_id(Constants.Cms.Toast_description, description)
DBCMS.update_X_days_back_post(title, xdays=3)
Click.id(Constants.Cms.Test_addDT_close_modal_button)
- FEGeneral.refresh()
- session.run_negative(lambda: Wait.text_by_id(
- Constants.Toast.CMS_ID, title + "."), "Announcement toast not disappear after 2 days %s" % title)
+ FEUser.logout()
+ FEUser.login(email, Constants.Default.Password.TEXT)
+ session.run_negative(
+ lambda: Wait.text_by_id(
+ Constants.Toast.CMS_ID,
+ title2),
+ "Announcement toast not disappear after 2 days %s" %
+ title)
@staticmethod
def validate_page(title, description):
@@ -138,8 +162,8 @@ class FECms:
@staticmethod
def validate_FAQ(description):
- Wait.text_by_id(Constants.Cms.Tooltip_title, "Did you know?")
- Wait.text_by_id(Constants.Cms.Tooltip_description, description)
+ Wait.text_by_id(Constants.Cms.Tooltip_title, "Did you know?", True)
+ Wait.text_by_id(Constants.Cms.Tooltip_description, description, True)
@staticmethod
def validate_news(title, description):
diff --git a/services/frontend/fe_dashboard.py b/services/frontend/fe_dashboard.py
index b8d51b5..cb845cc 100644
--- a/services/frontend/fe_dashboard.py
+++ b/services/frontend/fe_dashboard.py
@@ -113,10 +113,12 @@ class FEDashboard:
Wait.text_by_id("dashboard-title", "Statuses")
Wait.id(Constants.Dashboard.Statuses.FilterDropdown.ID)
Select(session.ice_driver.find_element_by_id(
- Constants.Dashboard.Statuses.FilterDropdown.ID)).select_by_visible_text("Intake")
+ Constants.Dashboard.Statuses.FilterDropdown.ID
+ )).select_by_visible_text("Intake")
Wait.page_has_loaded()
Select(session.ice_driver.find_element_by_id(
- Constants.Dashboard.Statuses.FilterDropdown.ID)).select_by_visible_text(stage)
+ Constants.Dashboard.Statuses.FilterDropdown.ID
+ )).select_by_visible_text(stage)
Wait.id(
Constants.Dashboard.Statuses.ExportExcel.ID, wait_for_page=True)
countIdsActive = 0
@@ -159,7 +161,8 @@ class FEDashboard:
Wait.text_by_id("dashboard-title", "Statuses")
Wait.css(Constants.Dashboard.Statuses.Statistics.FilterDropdown.CSS)
Select(session.ice_driver.find_element_by_css_selector(
- Constants.Dashboard.Statuses.Statistics.FilterDropdown.CSS)).select_by_visible_text("All")
+ Constants.Dashboard.Statuses.Statistics.FilterDropdown.CSS
+ )).select_by_visible_text("All")
engLeadID = DBUser.select_user_native_id(user_content['el_email'])
countOfEngInStagePerUser = DBUser.select_all_user_engagements(
engLeadID) # Scroll #
@@ -171,13 +174,17 @@ class FEDashboard:
# Stage Active Validation #
element.location_once_scrolled_into_view
Wait.css(
- Constants.Dashboard.Statuses.Statistics.FilterDropdown.CSS, wait_for_page=True)
+ Constants.Dashboard.Statuses.Statistics.FilterDropdown.CSS,
+ wait_for_page=True)
Select(session.ice_driver.find_element_by_css_selector(
- Constants.Dashboard.Statuses.Statistics.FilterDropdown.CSS)).select_by_visible_text("Active")
+ Constants.Dashboard.Statuses.Statistics.FilterDropdown.CSS
+ )).select_by_visible_text("Active")
countOfEngInStagePerUser = DBUser.select_user_engagements_by_stage(
"Active", engLeadID)
Wait.text_by_id(
- Constants.Dashboard.Statuses.Statistics.EngagementsNumber.ID, str(countOfEngInStagePerUser), wait_for_page=True)
+ Constants.Dashboard.Statuses.Statistics.EngagementsNumber.ID,
+ str(countOfEngInStagePerUser),
+ wait_for_page=True)
@staticmethod
def search_by_vf(user_content):
@@ -186,8 +193,10 @@ class FEDashboard:
engSearchID = "eng-" + engName
FEGeneral.re_open_not_clean_cache(Constants.Default.DashbaordURL.TEXT)
logger.debug("Search engagement by engagement_manual_id")
- Enter.text_by_id(Constants.Dashboard.Statuses.SearchBox.ID,
- user_content['engagement_manual_id'], wait_for_page=True)
+ Enter.text_by_id(
+ Constants.Dashboard.Statuses.SearchBox.ID,
+ user_content['engagement_manual_id'],
+ wait_for_page=True)
eng_manual_id = user_content['engagement_manual_id'] + ":"
Wait.text_by_id(engSearchID, eng_manual_id)
@@ -203,7 +212,11 @@ class FEDashboard:
@staticmethod
def check_vnf_version(user_content):
current_vnf_value = Get.by_css(
- "#progress_bar_" + user_content['engagement_manual_id'] + " ." + Constants.Dashboard.Overview.Progress.VnfVersion.CLASS, wait_for_page=True)
+ "#progress_bar_" +
+ user_content['engagement_manual_id'] +
+ " ." +
+ Constants.Dashboard.Overview.Progress.VnfVersion.CLASS,
+ wait_for_page=True)
Helper.internal_assert(current_vnf_value, user_content['vnf_version'])
@staticmethod
@@ -217,7 +230,8 @@ class FEDashboard:
FEUser.login(user, Constants.Default.Password.TEXT)
logger.debug("Search engagement by engagement_manual_id")
Enter.text_by_id(
- Constants.Dashboard.Statuses.SearchBox.ID, user_content['engagement_manual_id'])
+ Constants.Dashboard.Statuses.SearchBox.ID,
+ user_content['engagement_manual_id'])
eng_manual_id = user_content['engagement_manual_id'] + ":"
Wait.text_by_id(engSearchID, eng_manual_id)
logger.debug("Engagement found (searched by engagement_manual_id)")
@@ -225,7 +239,8 @@ class FEDashboard:
logger.debug("Search engagement by VF name")
# Search by VF name.
Enter.text_by_id(
- Constants.Dashboard.Statuses.SearchBox.ID, user_content['vfName'])
+ Constants.Dashboard.Statuses.SearchBox.ID,
+ user_content['vfName'])
Wait.text_by_id(engSearchID, eng_manual_id)
logger.debug("Engagement found (searched by VF name)")
FEGeneral.smart_refresh()
@@ -254,8 +269,10 @@ class FEDashboard:
@staticmethod
def check_if_creator_of_NS_is_the_EL(user_content):
logger.debug(
- " > Check if creator of NS is the EL " + user_content['el_name'])
- if (user_content['el_name'] not in Get.by_name("creator-full-name-" + user_content['el_name'])):
+ " > Check if creator of NS is the EL " +
+ user_content['el_name'])
+ if (user_content['el_name'] not in Get.by_name(
+ "creator-full-name-" + user_content['el_name'])):
logger.error("EL is not the creator of the NS according to UI.")
raise
@@ -264,7 +281,9 @@ class FEDashboard:
engName = engagement_manual_id + ": " + vf_name
# Search by VF name.
Enter.text_by_id(
- Constants.Dashboard.Statuses.SearchBox.ID, vf_name, wait_for_page=True)
+ Constants.Dashboard.Statuses.SearchBox.ID,
+ vf_name,
+ wait_for_page=True)
Wait.id("eng-" + engName, wait_for_page=True)
Click.id("eng-" + engName, wait_for_page=True)
Wait.text_by_id(
@@ -279,8 +298,10 @@ class FEDashboard:
# Click.id(Constants.Dashboard.Default.DASHBOARD_ID)
Wait.page_has_loaded()
if is_negative:
- session.run_negative(lambda: Wait.id(
- Constants.Dashboard.Default.STATISTICS), "Negative test failed at Statistics appears")
+ session.run_negative(
+ lambda: Wait.id(
+ Constants.Dashboard.Default.STATISTICS),
+ "Negative test failed at Statistics appears")
else:
Wait.id(Constants.Dashboard.Default.STATISTICS)
diff --git a/services/frontend/fe_detailed_view.py b/services/frontend/fe_detailed_view.py
index 556e7a7..49a3523 100644
--- a/services/frontend/fe_detailed_view.py
+++ b/services/frontend/fe_detailed_view.py
@@ -70,44 +70,61 @@ class FEDetailedView:
@staticmethod
def update_aic_version():
Click.id(
- Constants.Dashboard.DetailedView.ValidationDetails.PLUS, wait_for_page=True)
- Wait.text_by_id(Constants.Dashboard.Modal.TITLE_ID,
- Constants.Dashboard.DetailedView.ValidationDetails.TITLE, wait_for_page=True)
- Select(session.ice_driver.find_element_by_id(Constants.Dashboard.DetailedView.AIC.Dropdown.ID)
- ).select_by_visible_text(Constants.Dashboard.DetailedView.ValidationDetails.TargetAICVersion.AIC3)
+ Constants.Dashboard.DetailedView.ValidationDetails.PLUS,
+ wait_for_page=True)
+ Wait.text_by_id(
+ Constants.Dashboard.Modal.TITLE_ID,
+ Constants.Dashboard.DetailedView.ValidationDetails.TITLE,
+ wait_for_page=True)
+ Select(
+ session.ice_driver.find_element_by_id(
+ Constants.Dashboard.DetailedView.AIC.Dropdown.ID
+ )).select_by_visible_text(
+ Constants.Dashboard.DetailedView.ValidationDetails.
+ TargetAICVersion.AIC3)
Click.xpath("//option[3]", wait_for_page=True)
Click.id(
- Constants.Dashboard.DetailedView.ValidationDetails.SAVE, wait_for_page=True)
+ Constants.Dashboard.DetailedView.ValidationDetails.SAVE,
+ wait_for_page=True)
@staticmethod
def open_validation_details():
Click.id(
- Constants.Dashboard.DetailedView.ValidationDetails.PLUS, wait_for_page=True)
- Wait.text_by_id(Constants.Dashboard.Modal.TITLE_ID,
- Constants.Dashboard.DetailedView.ValidationDetails.TITLE, wait_for_page=True)
+ Constants.Dashboard.DetailedView.ValidationDetails.PLUS,
+ wait_for_page=True)
+ Wait.text_by_id(
+ Constants.Dashboard.Modal.TITLE_ID,
+ Constants.Dashboard.DetailedView.ValidationDetails.TITLE,
+ wait_for_page=True)
@staticmethod
def save_validation_details():
Click.id(
- Constants.Dashboard.DetailedView.ValidationDetails.SAVE, wait_for_page=True)
+ Constants.Dashboard.DetailedView.ValidationDetails.SAVE,
+ wait_for_page=True)
@staticmethod
def update_target_lab_entry():
Click.id(
- Constants.Dashboard.DetailedView.TargetLabEntry.CHANGE, wait_for_page=True)
+ Constants.Dashboard.DetailedView.TargetLabEntry.CHANGE,
+ wait_for_page=True)
Enter.date_picker(
'#lab-entry-date', 'vm.targetLabDate', wait_for_page=True)
Click.css(
- Constants.Dashboard.DetailedView.TargetLabEntry.INPUT_CSS, wait_for_page=True)
+ Constants.Dashboard.DetailedView.TargetLabEntry.INPUT_CSS,
+ wait_for_page=True)
Click.css(Constants.SubmitButton.CSS, wait_for_page=True)
actualDate = Get.by_css(
- Constants.Dashboard.DetailedView.TargetLabEntry.CONTENT_CSS, wait_for_page=True)
+ Constants.Dashboard.DetailedView.TargetLabEntry.CONTENT_CSS,
+ wait_for_page=True)
return str(actualDate)
@staticmethod
def validate_target_lab_entry(date):
- Wait.text_by_css(Constants.Dashboard.DetailedView.TargetLabEntry.CSS,
- Constants.Dashboard.DetailedView.TargetLabEntry.TEXT, wait_for_page=True)
+ Wait.text_by_css(
+ Constants.Dashboard.DetailedView.TargetLabEntry.CSS,
+ Constants.Dashboard.DetailedView.TargetLabEntry.TEXT,
+ wait_for_page=True)
actualDate = Get.by_css(
Constants.Dashboard.DetailedView.TargetLabEntry.CONTENT_CSS)
Helper.internal_assert(actualDate, date)
@@ -117,27 +134,47 @@ class FEDetailedView:
count = 0
try:
Click.id(Constants.Dashboard.DetailedView.ValidationDetails.PLUS)
- Wait.text_by_id(Constants.Dashboard.Modal.TITLE_ID,
- Constants.Dashboard.DetailedView.ValidationDetails.TITLE, wait_for_page=True)
+ Wait.text_by_id(
+ Constants.Dashboard.Modal.TITLE_ID,
+ Constants.Dashboard.DetailedView.ValidationDetails.TITLE,
+ wait_for_page=True)
Click.id(
- Constants.Dashboard.DetailedView.ECOMP.Dropdown.ID, wait_for_page=True)
+ Constants.Dashboard.DetailedView.ECOMP.Dropdown.ID,
+ wait_for_page=True)
Select(session.ice_driver.find_element_by_id(
- Constants.Dashboard.DetailedView.ECOMP.Dropdown.ID)).select_by_visible_text(EcompName)
- Click.id(Constants.Dashboard.DetailedView.ValidationDetails.ECOMPRelease.ID_ECOMP +
- EcompName, wait_for_page=True)
+ Constants.Dashboard.DetailedView.ECOMP.Dropdown.ID
+ )).select_by_visible_text(EcompName)
+ Click.id(
+ Constants.Dashboard.DetailedView.ValidationDetails.
+ ECOMPRelease.ID_ECOMP +
+ EcompName,
+ wait_for_page=True)
count += 1
- Wait.id(Constants.Dashboard.DetailedView.ValidationDetails.ECOMPRelease.ID_ECOMP +
- Constants.Dashboard.DetailedView.ValidationDetails.ECOMPRelease.UNKNOW, wait_for_page=True)
- Select(session.ice_driver.find_element_by_id(Constants.Dashboard.DetailedView.ECOMP.Dropdown.ID)
- ).select_by_visible_text(Constants.Dashboard.DetailedView.ValidationDetails.ECOMPRelease.UNKNOW)
- Click.id(Constants.Dashboard.DetailedView.ValidationDetails.ECOMPRelease.ID_ECOMP +
- Constants.Dashboard.DetailedView.ValidationDetails.ECOMPRelease.UNKNOW, wait_for_page=True)
+ Wait.id(
+ Constants.Dashboard.DetailedView.ValidationDetails.
+ ECOMPRelease.ID_ECOMP +
+ Constants.Dashboard.DetailedView.ValidationDetails.
+ ECOMPRelease.UNKNOW,
+ wait_for_page=True)
+ Select(
+ session.ice_driver.find_element_by_id(
+ Constants.Dashboard.DetailedView.ECOMP.Dropdown.ID
+ )).select_by_visible_text(
+ Constants.Dashboard.DetailedView.ValidationDetails.
+ ECOMPRelease.UNKNOW)
+ Click.id(
+ Constants.Dashboard.DetailedView.ValidationDetails.
+ ECOMPRelease.ID_ECOMP +
+ Constants.Dashboard.DetailedView.ValidationDetails.
+ ECOMPRelease.UNKNOW,
+ wait_for_page=True)
count += 1
Click.id(
- Constants.Dashboard.DetailedView.ValidationDetails.SAVE, wait_for_page=True)
+ Constants.Dashboard.DetailedView.ValidationDetails.SAVE,
+ wait_for_page=True)
Helper.internal_assert(count, 2)
# If failed - count the failure and add the error to list of errors.
- except:
+ except BaseException:
errorMsg = "Failed in update_ecomp_release ."
raise Exception(errorMsg)
@@ -145,20 +182,28 @@ class FEDetailedView:
def update_vf_version():
try:
Click.id(
- Constants.Dashboard.DetailedView.ValidationDetails.PLUS, wait_for_page=True)
- Wait.text_by_id(Constants.Dashboard.Modal.TITLE_ID,
- Constants.Dashboard.DetailedView.ValidationDetails.TITLE, wait_for_page=True)
+ Constants.Dashboard.DetailedView.ValidationDetails.PLUS,
+ wait_for_page=True)
+ Wait.text_by_id(
+ Constants.Dashboard.Modal.TITLE_ID,
+ Constants.Dashboard.DetailedView.ValidationDetails.TITLE,
+ wait_for_page=True)
newVFVersionName = "newVFVersionName-" + \
Helper.rand_string("randomString")
Click.id(
- Constants.Dashboard.DetailedView.ValidationDetails.VFVersion.ID_VERSION)
+ Constants.Dashboard.DetailedView.ValidationDetails.
+ VFVersion.ID_VERSION)
Enter.text_by_id(
- Constants.Dashboard.DetailedView.ValidationDetails.VFVersion.ID_VERSION, newVFVersionName, wait_for_page=True)
+ Constants.Dashboard.DetailedView.ValidationDetails.
+ VFVersion.ID_VERSION,
+ newVFVersionName,
+ wait_for_page=True)
Click.id(
- Constants.Dashboard.DetailedView.ValidationDetails.SAVE, wait_for_page=True)
+ Constants.Dashboard.DetailedView.ValidationDetails.SAVE,
+ wait_for_page=True)
return newVFVersionName
# If failed - count the failure and add the error to list of errors.
- except:
+ except BaseException:
errorMsg = "Failed in update_ecomp_release ."
raise Exception(errorMsg)
@@ -166,68 +211,100 @@ class FEDetailedView:
def validate_aic_version():
FEGeneral.refresh()
Wait.id(
- Constants.Dashboard.DetailedView.AIC.ID + "3.0", wait_for_page=True)
+ Constants.Dashboard.DetailedView.AIC.ID +
+ "3.0",
+ wait_for_page=True)
@staticmethod
def validate_ecomp_version():
FEGeneral.refresh()
- Wait.id(Constants.Dashboard.DetailedView.ECOMP.ID +
- Constants.Dashboard.DetailedView.ValidationDetails.ECOMPRelease.UNKNOW, wait_for_page=True)
+ Wait.id(
+ Constants.Dashboard.DetailedView.ECOMP.ID +
+ Constants.Dashboard.DetailedView.ValidationDetails.
+ ECOMPRelease.UNKNOW,
+ wait_for_page=True)
@staticmethod
def validate_vf_version(newVFVersionName):
FEGeneral.refresh()
- Wait.id(Constants.Dashboard.DetailedView.ValidationDetails.VFVersion.VF_VERSION_ID +
- newVFVersionName, wait_for_page=True)
+ Wait.id(
+ Constants.Dashboard.DetailedView.ValidationDetails.
+ VFVersion.VF_VERSION_ID +
+ newVFVersionName,
+ wait_for_page=True)
@staticmethod
def validate_all_titles_on_dv_form():
- Wait.text_by_id(Constants.Dashboard.DetailedView.DeploymentTarget.ID,
- Constants.Dashboard.DetailedView.DeploymentTarget.TEXT, wait_for_page=True)
- Wait.text_by_id(Constants.Dashboard.DetailedView.VirtualFunctionComponents.ID,
- Constants.Dashboard.DetailedView.VirtualFunctionComponents.TEXT)
+ Wait.text_by_id(
+ Constants.Dashboard.DetailedView.DeploymentTarget.ID,
+ Constants.Dashboard.DetailedView.DeploymentTarget.TEXT,
+ wait_for_page=True)
+ Wait.text_by_id(
+ Constants.Dashboard.DetailedView.VirtualFunctionComponents.ID,
+ Constants.Dashboard.DetailedView.VirtualFunctionComponents.TEXT)
Wait.text_by_id(Constants.Dashboard.DetailedView.TargetLabEntry.ID,
Constants.Dashboard.DetailedView.TargetLabEntry.TEXT)
- Wait.text_by_id(Constants.Dashboard.DetailedView.ValidationDetails.ID,
- Constants.Dashboard.DetailedView.ValidationDetails.TEXT)
- Wait.text_by_id(Constants.Dashboard.DetailedView.ValidationDetails.TargetAICVersion.ID,
- Constants.Dashboard.DetailedView.ValidationDetails.TargetAICVersion.TEXT)
- Wait.text_by_id(Constants.Dashboard.DetailedView.ValidationDetails.ECOMPRelease.ID,
- Constants.Dashboard.DetailedView.ValidationDetails.ECOMPRelease.TEXT)
- Wait.text_by_id(Constants.Dashboard.DetailedView.ValidationDetails.VFVersion.ID,
- Constants.Dashboard.DetailedView.ValidationDetails.VFVersion.TEXT, wait_for_page=True)
+ Wait.text_by_id(
+ Constants.Dashboard.DetailedView.ValidationDetails.ID,
+ Constants.Dashboard.DetailedView.ValidationDetails.TEXT)
+ Wait.text_by_id(
+ Constants.Dashboard.DetailedView.ValidationDetails.
+ TargetAICVersion.ID,
+ Constants.Dashboard.DetailedView.ValidationDetails.
+ TargetAICVersion.TEXT)
+ Wait.text_by_id(
+ Constants.Dashboard.DetailedView.ValidationDetails.ECOMPRelease.ID,
+ Constants.Dashboard.DetailedView.ValidationDetails.
+ ECOMPRelease.TEXT)
+ Wait.text_by_id(
+ Constants.Dashboard.DetailedView.ValidationDetails.VFVersion.ID,
+ Constants.Dashboard.DetailedView.ValidationDetails.VFVersion.TEXT,
+ wait_for_page=True)
@staticmethod
def add_deployment_target(user_content):
Click.id(Constants.Dashboard.DetailedView.TargetLabEntry.Add.ID)
Wait.text_by_id(Constants.Dashboard.Modal.TITLE_ID,
- Constants.Dashboard.DetailedView.DeploymentTarget.TITLE)
+ Constants.Dashboard.DetailedView.DeploymentTarget.
+ TITLE)
# FIXME: empty drop-down, tests will fail.
Select(session.ice_driver.find_element_by_xpath(
"//select")).select_by_visible_text("Lisle (DPA3)")
Click.id(
- Constants.Dashboard.DetailedView.DeploymentTarget.SAVE, wait_for_page=True)
+ Constants.Dashboard.DetailedView.DeploymentTarget.SAVE,
+ wait_for_page=True)
Wait.text_by_css(
- Constants.Dashboard.DetailedView.DeploymentTarget.CSS, "Lisle (DPA3)", wait_for_page=True)
+ Constants.Dashboard.DetailedView.DeploymentTarget.CSS,
+ "Lisle (DPA3)",
+ wait_for_page=True)
Wait.text_by_id(Constants.Dashboard.DetailedView.AIC.ID +
user_content['target_aic'], user_content['target_aic'])
e2edate = FEGeneral.date_short_formatter()
Wait.text_by_css(
- Constants.Dashboard.DetailedView.TargetLabEntry.CONTENT_CSS, e2edate)
+ Constants.Dashboard.DetailedView.TargetLabEntry.CONTENT_CSS,
+ e2edate)
@staticmethod
def remove_deployment_target(user_content):
Wait.text_by_id(
"visible-dts-Lisle (DPA3)", "Lisle (DPA3)", wait_for_page=True)
dt_site_id = DBGeneral.select_query(
- "SELECT uuid FROM public.ice_deployment_target_site where name = 'Lisle (DPA3)'")
+ "SELECT uuid FROM public.ice_deployment_target_site where name" +
+ " = 'Lisle (DPA3)'")
Click.id("visible-dts-Lisle (DPA3)")
Wait.id(
- Constants.Dashboard.DetailedView.DeploymentTarget.ID_REMOVE_DTS + dt_site_id)
- Click.id(Constants.Dashboard.DetailedView.DeploymentTarget.ID_REMOVE_DTS +
- dt_site_id, wait_for_page=True)
- session.run_negative(lambda: Wait.text_by_id(
- "visible-dts-Lisle (DPA3)", "Lisle (DPA3)", wait_for_page=True), "Negative test failed at wait text Lisle (DPA3)")
+ Constants.Dashboard.DetailedView.DeploymentTarget.ID_REMOVE_DTS +
+ dt_site_id)
+ Click.id(
+ Constants.Dashboard.DetailedView.DeploymentTarget.ID_REMOVE_DTS +
+ dt_site_id,
+ wait_for_page=True)
+ session.run_negative(
+ lambda: Wait.text_by_id(
+ "visible-dts-Lisle (DPA3)",
+ "Lisle (DPA3)",
+ wait_for_page=True),
+ "Negative test failed at wait text Lisle (DPA3)")
@staticmethod
def add_vfc():
@@ -237,7 +314,8 @@ class FEDetailedView:
session.ice_driver.find_element_by_name("extRefID").click()
Enter.text_by_name("extRefID", Helper.rand_string("randomNumber"))
Select(session.ice_driver.find_element_by_id(
- Constants.Dashboard.DetailedView.VFC.Choose_Company.ID)).select_by_visible_text(ServiceProvider.MainServiceProvider)
+ Constants.Dashboard.DetailedView.VFC.Choose_Company.ID
+ )).select_by_visible_text(ServiceProvider.MainServiceProvider)
Click.id(Constants.Dashboard.DetailedView.VFC.Save_button.ID)
return vfcName
@@ -251,7 +329,8 @@ class FEDetailedView:
Click.name("extRefID", wait_for_page=True)
Enter.text_by_name("extRefID", extRefID, wait_for_page=True)
Select(session.ice_driver.find_element_by_id(
- Constants.Dashboard.DetailedView.VFC.Choose_Company.ID)).select_by_visible_text("Amdocs")
+ Constants.Dashboard.DetailedView.VFC.Choose_Company.ID
+ )).select_by_visible_text("Amdocs")
Wait.text_by_css("span.add-text", "Add VFC", wait_for_page=True)
Click.css("span.add-text", wait_for_page=True)
logger.debug("Add VFC no.2")
@@ -260,7 +339,8 @@ class FEDetailedView:
Enter.text_by_xpath("//div[2]/ng-form/div[2]/input", "loka2")
Enter.text_by_xpath("//div[2]/ng-form/div[4]/input", "companyManual2")
Click.id(
- Constants.Dashboard.DetailedView.VFC.Save_button.ID, wait_for_page=True)
+ Constants.Dashboard.DetailedView.VFC.Save_button.ID,
+ wait_for_page=True)
@staticmethod
def remove_vfc(user_content):
@@ -268,7 +348,8 @@ class FEDetailedView:
"uuid", "ice_vf", "name", user_content['vfName'], 1)
djoni_uuid = None
counter = 0
- while not djoni_uuid and counter <= Constants.DBConstants.RETRIES_NUMBER:
+ while not djoni_uuid and counter <= Constants.DBConstants.\
+ RETRIES_NUMBER:
time.sleep(session.wait_until_time_pause_long)
djoni_uuid = DBGeneral.select_where_and(
"uuid", "ice_vfc", "vf_id", vf_id, "name", "djoni", 1)
@@ -282,7 +363,9 @@ class FEDetailedView:
Wait.text_by_id(Constants.Dashboard.DetailedView.VFC.ID +
"djoni2", "djoni2 (loka2)", wait_for_page=True)
Click.id(
- Constants.Dashboard.DetailedView.VFC.ID + "djoni", wait_for_page=True)
+ Constants.Dashboard.DetailedView.VFC.ID +
+ "djoni",
+ wait_for_page=True)
Click.id(Constants.Dashboard.DetailedView.VFC.Remove.ID +
djoni_uuid, wait_for_page=True)
@@ -295,7 +378,8 @@ class FEDetailedView:
FEDetailedView.search_vf_and_go_to_detailed_view(
user_content['engagement_manual_id'], user_content['vfName'])
Wait.id(
- Constants.Dashboard.DetailedView.DeploymentTarget.AddDeploymentTargetButton.ID)
+ Constants.Dashboard.DetailedView.DeploymentTarget.
+ AddDeploymentTargetButton.ID)
@staticmethod
def add_remove_deployment_targets(user_content, users):
@@ -316,44 +400,72 @@ class FEDetailedView:
FEUser.login(user, Constants.Default.Password.TEXT)
FEDetailedView.search_vf_and_go_to_detailed_view(
user_content['engagement_manual_id'], user_content['vfName'])
- session.run_negative(lambda: Click.id(Constants.Dashboard.DetailedView.DeploymentTarget.AddDeploymentTargetButton.ID),
- "Negative test failed at click_on_ deployment-targets with user %s" % user)
+ session.run_negative(
+ lambda: Click.id(
+ Constants.Dashboard.DetailedView.DeploymentTarget.
+ AddDeploymentTargetButton.ID),
+ "Negative test failed at click_on_ deployment-targets " +
+ "with user %s" % user)
@staticmethod
def click_on_update_aic_version():
Click.id(
- Constants.Dashboard.DetailedView.ValidationDetails.PLUS, wait_for_page=True)
- Wait.text_by_id(Constants.Dashboard.Modal.TITLE_ID,
- Constants.Dashboard.DetailedView.ValidationDetails.TITLE, wait_for_page=True)
+ Constants.Dashboard.DetailedView.ValidationDetails.PLUS,
+ wait_for_page=True)
+ Wait.text_by_id(
+ Constants.Dashboard.Modal.TITLE_ID,
+ Constants.Dashboard.DetailedView.ValidationDetails.TITLE,
+ wait_for_page=True)
@staticmethod
def click_on_update_ecomp_release():
Click.id(
- Constants.Dashboard.DetailedView.ValidationDetails.PLUS, wait_for_page=True)
- Wait.text_by_id(Constants.Dashboard.Modal.TITLE_ID,
- Constants.Dashboard.DetailedView.ValidationDetails.TITLE, wait_for_page=True)
+ Constants.Dashboard.DetailedView.ValidationDetails.PLUS,
+ wait_for_page=True)
+ Wait.text_by_id(
+ Constants.Dashboard.Modal.TITLE_ID,
+ Constants.Dashboard.DetailedView.ValidationDetails.TITLE,
+ wait_for_page=True)
@staticmethod
def select_aic_version_from_list(aic_version):
Select(session.ice_driver.find_element_by_id(
- Constants.Dashboard.DetailedView.AIC.Dropdown.ID)).select_by_visible_text(aic_version)
+ Constants.Dashboard.DetailedView.AIC.Dropdown.ID
+ )).select_by_visible_text(aic_version)
@staticmethod
def compare_aic_selected_version(expected_aic_version):
- Helper.internal_assert(Get.by_id(
- Constants.Dashboard.DetailedView.AIC.ID + expected_aic_version), expected_aic_version)
+ Helper.internal_assert(
+ Get.by_id(
+ Constants.Dashboard.DetailedView.AIC.ID +
+ expected_aic_version),
+ expected_aic_version)
@staticmethod
def compare_selected_ecomp_release(expected_ecomp_release):
- Helper.internal_assert(Get.by_id(
- Constants.Dashboard.DetailedView.ECOMP.ID + expected_ecomp_release), expected_ecomp_release)
+ Helper.internal_assert(
+ Get.by_id(
+ Constants.Dashboard.DetailedView.ECOMP.ID +
+ expected_ecomp_release),
+ expected_ecomp_release)
@staticmethod
def validate_deprecated_aic_version_in_dropdown(expected_aic_version):
- Helper.internal_assert(Get.by_id(Constants.Dashboard.DetailedView.AIC.Dropdown.UniversalVersion.ID %
- expected_aic_version), "AIC " + expected_aic_version + " - Deprecated")
+ Helper.internal_assert(
+ Get.by_id(
+ Constants.Dashboard.DetailedView.AIC.Dropdown.
+ UniversalVersion.ID %
+ expected_aic_version),
+ "AIC " +
+ expected_aic_version +
+ " - Deprecated")
@staticmethod
def validate_deprecated_ecomp_release_in_dropdown(expected_ecomp_release):
- Helper.internal_assert(Get.by_id(Constants.Dashboard.DetailedView.ECOMP.Dropdown.UniversalRelease.ID %
- expected_ecomp_release), expected_ecomp_release + " - Deprecated")
+ Helper.internal_assert(
+ Get.by_id(
+ Constants.Dashboard.DetailedView.ECOMP.Dropdown.
+ UniversalRelease.ID %
+ expected_ecomp_release),
+ expected_ecomp_release +
+ " - Deprecated")
diff --git a/services/frontend/fe_general.py b/services/frontend/fe_general.py
index b3f97b4..10f9c72 100644
--- a/services/frontend/fe_general.py
+++ b/services/frontend/fe_general.py
@@ -39,7 +39,6 @@
import json
import time
-from django.conf import settings
from selenium.webdriver.support.select import Select
from services.constants import Constants
@@ -87,7 +86,7 @@ class FEGeneral(Helper):
session.ice_driver.get(reopen_url)
session.ice_driver.maximize_window()
Wait.page_has_loaded()
- except Exception as e:
+ except Exception:
errorMsg = "Could not reopen requested page"
raise Exception(errorMsg, reopen_url)
@@ -97,7 +96,7 @@ class FEGeneral(Helper):
# Open FireFox with requested URL.
session.ice_driver.get(url)
session.ice_driver.maximize_window()
- except:
+ except BaseException:
errorMsg = "Could not reopen requested page"
raise Exception(errorMsg, url)
logger.debug("Moving to next test case")
@@ -139,7 +138,9 @@ class FEGeneral(Helper):
def go_to_signup_from_login():
Click.link_text(Constants.Login.Signup.LINK_TEXT, wait_for_page=True)
Wait.text_by_css(
- Constants.Signup.Title.CSS, Constants.Signup.Title.TEXT, wait_for_page=True)
+ Constants.Signup.Title.CSS,
+ Constants.Signup.Title.TEXT,
+ wait_for_page=True)
@staticmethod
def form_enter_name(name):
@@ -213,7 +214,8 @@ class FEGeneral(Helper):
def send_reset_password(email):
FEGeneral.go_to_reset_password_from_login()
Wait.text_by_css(
- Constants.ResetPassword.Title.CSS, Constants.ResetPassword.Title.TEXT)
+ Constants.ResetPassword.Title.CSS,
+ Constants.ResetPassword.Title.TEXT)
Enter.text_by_name(Constants.ResetPassword.Email.NAME, email)
Wait.text_by_css(
Constants.SubmitButton.CSS, Constants.ResetPassword.Button.TEXT)
@@ -275,4 +277,5 @@ class FEGeneral(Helper):
Helper.assertTrue(
False, "%s does not exist over the client's side" % item)
logger.debug(
- "verify_existing_files_in_list succeeded, All vf repo files are available for choosing.")
+ "verify_existing_files_in_list succeeded, " +
+ "All vf repo files are available for choosing.")
diff --git a/services/frontend/fe_invite.py b/services/frontend/fe_invite.py
index cb84262..061df73 100644
--- a/services/frontend/fe_invite.py
+++ b/services/frontend/fe_invite.py
@@ -1,5 +1,5 @@
-
-# ============LICENSE_START==========================================
+
+# ============LICENSE_START==========================================
# org.onap.vvp/test-engine
# ===================================================================
# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
@@ -36,12 +36,8 @@
# ============LICENSE_END============================================
#
# ECOMP is a trademark and service mark of AT&T Intellectual Property.
-from asyncio.tasks import wait
-
-from selenium.webdriver.support.select import Select
from services.api.api_user import APIUser
-from services.api.api_virtual_function import APIVirtualFunction
from services.constants import Constants, ServiceProvider
from services.database.db_general import DBGeneral
from services.database.db_user import DBUser
@@ -59,6 +55,7 @@ from services.session import session
logger = LoggingServiceFactory.get_logger()
+
class FEInvite:
@staticmethod
@@ -69,14 +66,21 @@ class FEInvite:
Click.id(vf_left_nav_id)
FEWizard.invite_team_members_modal(user_content[1]['email'])
# self.sleep(1) # TODO need to wait until modal window is closed.
- invitation_token = DBUser.select_invitation_token("invitation_token", "ice_invitation", "engagement_uuid",
- user_content[0]['engagement_uuid'], user_content[1]['email'], 1)
+ invitation_token = DBUser.select_invitation_token(
+ "invitation_token",
+ "ice_invitation",
+ "engagement_uuid",
+ user_content[0]['engagement_uuid'],
+ user_content[1]['email'],
+ 1)
inviterURL = Constants.Default.InviteURL.Login.TEXT + invitation_token
FEGeneral.re_open(inviterURL)
# Login with 2nd user #
title_id = "title-id-" + engName
FEUser.login(
- user_content[1]['email'], Constants.Default.Password.TEXT, title_id)
+ user_content[1]['email'],
+ Constants.Default.Password.TEXT,
+ title_id)
Click.id(vf_left_nav_id)
actualVfName = Get.by_id(vf_left_nav_id)
Helper.internal_assert(engName, actualVfName)
@@ -103,17 +107,23 @@ class FEInvite:
Click.id(vf_left_nav_id)
Click.id(Constants.Dashboard.Overview.TeamMember.ID)
Wait.text_by_css(Constants.Dashboard.Wizard.Title.CSS,
- Constants.Dashboard.Wizard.InviteTeamMembers.Title.TEXT)
+ Constants.Dashboard.Wizard.InviteTeamMembers.
+ Title.TEXT)
Enter.text_by_name("email", user_content[1]['email'])
- Wait.text_by_css(Constants.SubmitButton.CSS,
- Constants.Dashboard.Wizard.InviteTeamMembers.Button.TEXT)
+ Wait.text_by_css(
+ Constants.SubmitButton.CSS,
+ Constants.Dashboard.Wizard.InviteTeamMembers.Button.TEXT)
Click.css(Constants.SubmitButton.CSS)
Wait.id(Constants.Toast.ID)
Helper.internal_assert(
Get.by_id(Constants.Toast.ID), "Invite couldn't be created")
@staticmethod
- def invite_x_users_from_tm(list_of_invite_emails, countofUser, countOfem, num):
+ def invite_x_users_from_tm(
+ list_of_invite_emails,
+ countofUser,
+ countOfem,
+ num):
Enter.text_by_name(
"email", list_of_invite_emails[countofUser], wait_for_page=True)
for _ in range(num):
@@ -121,13 +131,14 @@ class FEInvite:
session.run_negative(
lambda: Click.css("span.add-icon"), "css appears")
break
- except: # button exists
+ except BaseException: # button exists
pass
countofUser += 1
# Click.css("span.add-icon")
Wait.xpath("//fieldset[" + str(countOfem) + "]/div/input")
Enter.text_by_xpath(
- "//fieldset[" + str(countOfem) + "]/div/input", list_of_invite_emails[countofUser])
+ "//fieldset[" + str(countOfem) + "]/div/input",
+ list_of_invite_emails[countofUser])
countOfem += 1
Click.css(Constants.SubmitButton.CSS, wait_for_page=True)
@@ -160,11 +171,23 @@ class FEInvite:
sponsor = [ServiceProvider.MainServiceProvider, 'aaaaaa', inviteEmail,
'3058000000']
invitation_token = DBUser.select_invitation_token(
- "invitation_token", "ice_invitation", "engagement_uuid", engagement_id, inviteEmail, 1)
+ "invitation_token",
+ "ice_invitation",
+ "engagement_uuid",
+ engagement_id,
+ inviteEmail,
+ 1)
signUpURLforContact = DBUser.get_contact_signup_url(
- invitation_token, uuid, sponsor[2], sponsor[1], sponsor[3], sponsor[0])
+ invitation_token, uuid, sponsor[2], sponsor[1],
+ sponsor[3], sponsor[0])
APIUser.signup_invited_user(
- sponsor[0], inviteEmail, invitation_token, signUpURLforContact, user_content, True, wait_for_gitlab=False)
+ sponsor[0],
+ inviteEmail,
+ invitation_token,
+ signUpURLforContact,
+ user_content,
+ True,
+ wait_for_gitlab=False)
activationUrl2 = DBUser.get_activation_url(sponsor[2])
FEGeneral.re_open(activationUrl2) # Login with 2nd user #
engName = engagement_manual_id + ": " + vflist[0]
@@ -175,22 +198,24 @@ class FEInvite:
engagement_id = DBGeneral.select_where(
"engagement_id", "ice_vf", "name", vfName, 1)
engagement_manual_id = DBGeneral.select_where(
- "engagement_manual_id", "ice_engagement", "uuid", engagement_id, 1)
+ "engagement_manual_id", "ice_engagement", "uuid",
+ engagement_id, 1)
engName = engagement_manual_id + ": " + vfName
vf_left_nav_id = "clickable-" + engName
Click.id(vf_left_nav_id, wait_for_page=True)
@staticmethod
def invite_x_users_and_verify_VF_appers_for_invited(user_content, engName):
- inviteEmail = Helper.rand_string('randomString') + "@" \
- + ServiceProvider.email
+ inviteEmail = Helper.rand_string(
+ 'randomString') + "@" + ServiceProvider.email
vflist = FEInvite.create_x_vfs(user_content, engName, x=3)
for vfName in vflist:
# Fetch one AT&T user ID.
engagement_id = DBGeneral.select_where(
"engagement_id", "ice_vf", "name", vfName, 1)
engagement_manual_id = DBGeneral.select_where(
- "engagement_manual_id", "ice_engagement", "uuid", engagement_id, 1)
+ "engagement_manual_id", "ice_engagement", "uuid",
+ engagement_id, 1)
engName = engagement_manual_id + ": " + vfName
vf_left_nav_id = "clickable-" + engName
Click.id(vf_left_nav_id)
diff --git a/services/frontend/fe_next_step.py b/services/frontend/fe_next_step.py
index be59949..b10735d 100644
--- a/services/frontend/fe_next_step.py
+++ b/services/frontend/fe_next_step.py
@@ -1,5 +1,5 @@
-
-# ============LICENSE_START==========================================
+
+# ============LICENSE_START==========================================
# org.onap.vvp/test-engine
# ===================================================================
# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
@@ -50,7 +50,9 @@ class FENextStep(object):
def check_select_deselect_all_files():
Click.id(Constants.Dashboard.Overview.NextSteps.Add.AssociatedFiles.ID)
Click.link_text(
- Constants.Dashboard.Overview.NextSteps.Add.AssociatedFiles.SELECT_ALL_FILES_NAME)
+ Constants.Dashboard.Overview.NextSteps.Add.AssociatedFiles.
+ SELECT_ALL_FILES_NAME)
Wait.text_by_id(
Constants.Dashboard.Overview.NextSteps.Add.AssociatedFiles.ID,
- Constants.Dashboard.Overview.NextSteps.Add.AssociatedFiles.ALL_FILES_SELECTED)
+ Constants.Dashboard.Overview.NextSteps.Add.AssociatedFiles.
+ ALL_FILES_SELECTED)
diff --git a/services/frontend/fe_overview.py b/services/frontend/fe_overview.py
index 3764e2c..15f8725 100644
--- a/services/frontend/fe_overview.py
+++ b/services/frontend/fe_overview.py
@@ -65,8 +65,10 @@ class FEOverview:
def click_on_vf(user_content):
vfFullName = user_content[
'engagement_manual_id'] + ": " + user_content['vfName']
- Enter.text_by_id(Constants.Dashboard.LeftPanel.SearchBox.ID, user_content[
- 'vfName'], True)
+ Enter.text_by_id(
+ Constants.Dashboard.LeftPanel.SearchBox.ID,
+ user_content['vfName'],
+ True)
Click.id(Constants.Dashboard.LeftPanel.SearchBox.Results.ID %
user_content['vfName'], True)
Wait.text_by_id(
@@ -78,22 +80,28 @@ class FEOverview:
"Go to engagement's overview by clicking on the created Next Step")
Click.name(user_content['engagement_manual_id'], wait_for_page=True)
Wait.text_by_id(
- Constants.Dashboard.Overview.Title.ID, user_content['engagement_manual_id'] + ":", wait_for_page=True)
+ Constants.Dashboard.Overview.Title.ID,
+ user_content['engagement_manual_id'] + ":",
+ wait_for_page=True)
FEGeneral.re_open(Constants.Default.LoginURL.TEXT)
logger.debug("Login with EL user " + user_content['el_email'])
FEUser.login(user_content['el_email'], Constants.Default.Password.TEXT)
# Query to select all assigned next steps on TODO state #
el_native_id = str(DBGeneral.select_where(
"id", "ice_user_profile", "email", user_content['el_email'], 1))
- queryStr = "SELECT count(*) FROM ice_user_profile AS users, ice_next_step_assignees AS assignees, ice_next_step AS ns WHERE users.id=" + \
+ queryStr = "SELECT count(*) FROM ice_user_profile AS users, " +\
+ "ice_next_step_assignees AS assignees, " +\
+ "ice_next_step AS ns WHERE users.id=" + \
el_native_id + \
- " AND users.id=assignees.iceuserprofile_id AND assignees.nextstep_id=ns.uuid AND ns.state='Incomplete';"
+ " AND users.id=assignees.iceuserprofile_id " +\
+ "AND assignees.nextstep_id=ns.uuid AND ns.state='Incomplete';"
el_assigned_ns = str(DBGeneral.select_query(queryStr))
logger.debug("el_assigned_ns=" + el_assigned_ns)
Wait.page_has_loaded()
if (int(el_assigned_ns) >= 5):
logger.debug(
- "EL has 5 or more assigned next steps, checking that only 5 are shown")
+ "EL has 5 or more assigned next steps, " +
+ "checking that only 5 are shown")
ns_list = Get.by_id("next-steps-list")
if (ns_list.count("Engagement - ") > 5):
logger.error("More than 5 next steps are listed in dashboard.")
@@ -119,15 +127,25 @@ class FEOverview:
@staticmethod
def check_stage_next_steps(stage, engagement_uuid):
- ns_list = DBGeneral.select_where_and("description", "ice_next_step",
- "engagement_id", engagement_uuid,
- "engagement_stage", stage, 0) # List of next steps from DB.
+ ns_list = DBGeneral.select_where_and(
+ "description",
+ "ice_next_step",
+ "engagement_id",
+ engagement_uuid,
+ "engagement_stage",
+ stage,
+ 0) # List of next steps from DB.
logger.debug("Got list of Next Steps for current stage " + stage)
for i in range(len(ns_list)):
ns_description = ns_list[i] # Value number i from the list.
- ns_uuid = DBGeneral.select_where_and("uuid", "ice_next_step",
- "engagement_id", engagement_uuid,
- "description", ns_description, 1)
+ ns_uuid = DBGeneral.select_where_and(
+ "uuid",
+ "ice_next_step",
+ "engagement_id",
+ engagement_uuid,
+ "description",
+ ns_description,
+ 1)
logger.debug(
"Compare presented text of next step with the text from DB.")
portal_ns = Get.by_id("step-" + ns_uuid)
@@ -146,10 +164,14 @@ class FEOverview:
lambda: Wait.id(txtLine2ID), "Error: modal window opened.")
else:
Wait.text_by_id(
- txtLine2ID, "Are you sure you want to set the Engagement's stage to " + next_stage + "?")
+ txtLine2ID,
+ "Are you sure you want to set the Engagement's stage to " +
+ next_stage +
+ "?")
# Click on Approve (after validations inside window).
Click.xpath(
- Constants.Dashboard.Overview.Stage.Approve.XPATH, wait_for_page=True)
+ Constants.Dashboard.Overview.Stage.Approve.XPATH,
+ wait_for_page=True)
@staticmethod
def check_progress(expected_progress):
@@ -166,12 +188,14 @@ class FEOverview:
@staticmethod
def set_progress(new_value):
Click.id(Constants.Dashboard.Overview.Progress.Change.ID)
- Helper.internal_assert(Constants.Dashboard.Overview.Progress.Wizard.Title.TEXT,
- Get.by_id(Constants.Dashboard.Modal.TITLE_ID))
+ Helper.internal_assert(
+ Constants.Dashboard.Overview.Progress.Wizard.Title.TEXT, Get.by_id(
+ Constants.Dashboard.Modal.TITLE_ID))
Enter.text_by_name(
Constants.Dashboard.Overview.Progress.Wizard.NAME, new_value)
- Wait.text_by_css(Constants.SubmitButton.CSS,
- Constants.Dashboard.Overview.Progress.Wizard.Button.TEXT)
+ Wait.text_by_css(
+ Constants.SubmitButton.CSS,
+ Constants.Dashboard.Overview.Progress.Wizard.Button.TEXT)
Click.css(Constants.SubmitButton.CSS)
Wait.modal_to_dissappear()
@@ -182,7 +206,8 @@ class FEOverview:
Wait.text_by_id(
Constants.Dashboard.GeneralPrompt.Title.ID, "Delete Step")
Click.id(
- Constants.Dashboard.GeneralPrompt.ApproveButton.ID, wait_for_page=True)
+ Constants.Dashboard.GeneralPrompt.ApproveButton.ID,
+ wait_for_page=True)
Wait.id_to_dissappear("test_" + next_step_uuid)
@staticmethod
@@ -194,56 +219,83 @@ class FEOverview:
def click_on_archeive_engagement_from_dropdown():
FEOverview.click_on_admin_dropdown()
Click.link_text(
- Constants.Dashboard.Overview.AdminDropdown.ArchiveEngagement.LINK_TEXT, wait_for_page=True)
+ Constants.Dashboard.Overview.AdminDropdown.
+ ArchiveEngagement.LINK_TEXT,
+ wait_for_page=True)
@staticmethod
def archive_engagement_modal(engagement_manual_id, vf_name):
- Wait.text_by_id(Constants.Dashboard.Overview.AdminDropdown.ArchiveEngagement.Wizard.Title.ID,
- Constants.Dashboard.Overview.AdminDropdown.ArchiveEngagement.Wizard.Title.TEXT)
+ Wait.text_by_id(
+ Constants.Dashboard.Overview.AdminDropdown.
+ ArchiveEngagement.Wizard.Title.ID,
+ Constants.Dashboard.Overview.AdminDropdown.
+ ArchiveEngagement.Wizard.Title.TEXT)
random_reason = Helper.rand_string()
- Enter.text_by_name(Constants.Dashboard.Overview.AdminDropdown.ArchiveEngagement.Wizard.Reason.NAME,
- random_reason)
+ Enter.text_by_name(
+ Constants.Dashboard.Overview.AdminDropdown.
+ ArchiveEngagement.Wizard.Reason.NAME,
+ random_reason)
Click.id(Constants.SubmitButton.ID)
- Wait.text_by_id(Constants.Toast.ID, "Engagement '%s: %s' archived successfully." %
- (engagement_manual_id, vf_name))
- query = "select archived_time,archive_reason from ice_engagement where engagement_manual_id='{engagement_manual_id}'".format(
- engagement_manual_id=engagement_manual_id)
+ Wait.text_by_id(
+ Constants.Toast.ID,
+ "Engagement '%s: %s' archived successfully." % (
+ engagement_manual_id, vf_name))
+ query = "select archived_time,archive_reason from " +\
+ "ice_engagement where engagement_manual_id" +\
+ "='{engagement_manual_id}'".format(
+ engagement_manual_id=engagement_manual_id)
archived_time, db_reason = DBGeneral.select_query(query, "list")
- Helper.assertTrue(archived_time != None)
+ Helper.assertTrue(archived_time is not None)
Helper.internal_assert(random_reason, db_reason)
@staticmethod
def click_on_change_reviewer_from_dropdown():
FEOverview.click_on_admin_dropdown()
Click.link_text(
- Constants.Dashboard.Overview.AdminDropdown.ChangeReviewer.LINK_TEXT)
+ Constants.Dashboard.Overview.AdminDropdown.
+ ChangeReviewer.LINK_TEXT)
@staticmethod
def select_engagement_lead_from_list(el_name):
Wait.name(
- Constants.Dashboard.Overview.AdminDropdown.ChangeReviewer.Wizard.Select.NAME, wait_for_page=True)
+ Constants.Dashboard.Overview.AdminDropdown.
+ ChangeReviewer.Wizard.Select.NAME,
+ wait_for_page=True)
Select(session.ice_driver.find_element_by_name(
- Constants.Dashboard.Overview.AdminDropdown.ChangeReviewer.Wizard.Select.NAME)).select_by_visible_text(el_name)
+ Constants.Dashboard.Overview.AdminDropdown.
+ ChangeReviewer.Wizard.Select.NAME)).\
+ select_by_visible_text(el_name)
@staticmethod
def change_engagement_lead_modal(el_name, is_reviewer=True):
- Wait.text_by_id(Constants.Dashboard.Overview.AdminDropdown.ChangeReviewer.Wizard.Title.ID,
- Constants.Dashboard.Overview.AdminDropdown.ChangeReviewer.Wizard.Title.TEXT)
+ Wait.text_by_id(
+ Constants.Dashboard.Overview.AdminDropdown.
+ ChangeReviewer.Wizard.Title.ID,
+ Constants.Dashboard.Overview.AdminDropdown.
+ ChangeReviewer.Wizard.Title.TEXT)
FEOverview.select_engagement_lead_from_list(el_name)
if is_reviewer:
Wait.text_by_id(
- Constants.Toast.ID, Constants.Dashboard.Overview.AdminDropdown.ChangeReviewer.Toast.TEXT)
+ Constants.Toast.ID,
+ Constants.Dashboard.Overview.AdminDropdown.
+ ChangeReviewer.Toast.TEXT)
else:
Wait.text_by_id(
- Constants.Toast.ID, Constants.Dashboard.Overview.AdminDropdown.ChangePeerReviewer.Toast.TEXT)
+ Constants.Toast.ID,
+ Constants.Dashboard.Overview.AdminDropdown.
+ ChangePeerReviewer.Toast.TEXT)
@staticmethod
def click_on_change_peer_reviewer_from_dropdown():
FEOverview.click_on_admin_dropdown()
Click.link_text(
- Constants.Dashboard.Overview.AdminDropdown.ChangePeerReviewer.LINK_TEXT)
- Wait.text_by_id(Constants.Dashboard.Overview.AdminDropdown.ChangePeerReviewer.Wizard.Title.ID,
- Constants.Dashboard.Overview.AdminDropdown.ChangePeerReviewer.Wizard.Title.TEXT)
+ Constants.Dashboard.Overview.AdminDropdown.
+ ChangePeerReviewer.LINK_TEXT)
+ Wait.text_by_id(
+ Constants.Dashboard.Overview.AdminDropdown.
+ ChangePeerReviewer.Wizard.Title.ID,
+ Constants.Dashboard.Overview.AdminDropdown.
+ ChangePeerReviewer.Wizard.Title.TEXT)
@staticmethod
def click_on_update_status_from_dropdown():
@@ -256,23 +308,39 @@ class FEOverview:
def fill_update_status_form_admin_dropdown():
random_string = Helper.rand_string()
Enter.text_by_name(
- Constants.Dashboard.Overview.AdminDropdown.UpdateStatus.PROGRESS, str(50))
- Enter.date_picker(Constants.Dashboard.Overview.AdminDropdown.UpdateStatus.PROGRESS_CSS,
- Constants.Dashboard.Overview.AdminDropdown.UpdateStatus.TARGET)
- Enter.date_picker(Constants.Dashboard.Overview.AdminDropdown.UpdateStatus.PROGRESS_CSS,
- Constants.Dashboard.Overview.AdminDropdown.UpdateStatus.HEAT)
- Enter.date_picker(Constants.Dashboard.Overview.AdminDropdown.UpdateStatus.PROGRESS_CSS,
- Constants.Dashboard.Overview.AdminDropdown.UpdateStatus.IMAGE_SACN)
- Enter.date_picker(Constants.Dashboard.Overview.AdminDropdown.UpdateStatus.PROGRESS_CSS,
- Constants.Dashboard.Overview.AdminDropdown.UpdateStatus.AIC)
- Enter.date_picker(Constants.Dashboard.Overview.AdminDropdown.UpdateStatus.PROGRESS_CSS,
- Constants.Dashboard.Overview.AdminDropdown.UpdateStatus.ASDC)
+ Constants.Dashboard.Overview.AdminDropdown.UpdateStatus.PROGRESS,
+ str(50))
+ Enter.date_picker(
+ Constants.Dashboard.Overview.AdminDropdown.
+ UpdateStatus.PROGRESS_CSS,
+ Constants.Dashboard.Overview.AdminDropdown.UpdateStatus.TARGET)
+ Enter.date_picker(
+ Constants.Dashboard.Overview.AdminDropdown.
+ UpdateStatus.PROGRESS_CSS,
+ Constants.Dashboard.Overview.AdminDropdown.UpdateStatus.HEAT)
+ Enter.date_picker(
+ Constants.Dashboard.Overview.AdminDropdown.
+ UpdateStatus.PROGRESS_CSS,
+ Constants.Dashboard.Overview.AdminDropdown.UpdateStatus.IMAGE_SACN)
+ Enter.date_picker(
+ Constants.Dashboard.Overview.AdminDropdown.
+ UpdateStatus.PROGRESS_CSS,
+ Constants.Dashboard.Overview.AdminDropdown.UpdateStatus.AIC)
+ Enter.date_picker(
+ Constants.Dashboard.Overview.AdminDropdown.
+ UpdateStatus.PROGRESS_CSS,
+ Constants.Dashboard.Overview.AdminDropdown.UpdateStatus.ASDC)
Enter.text_by_name(
- Constants.Dashboard.Overview.AdminDropdown.UpdateStatus.STATUS, random_string)
+ Constants.Dashboard.Overview.AdminDropdown.UpdateStatus.STATUS,
+ random_string)
Click.css(
- Constants.Dashboard.Overview.AdminDropdown.UpdateStatus.SUBMIT, wait_for_page=True)
+ Constants.Dashboard.Overview.AdminDropdown.UpdateStatus.SUBMIT,
+ wait_for_page=True)
Wait.text_by_id(
- Constants.Toast.ID, Constants.Dashboard.Overview.AdminDropdown.UpdateStatus.SUCCESS_MSG, wait_for_page=True)
+ Constants.Toast.ID,
+ Constants.Dashboard.Overview.AdminDropdown.
+ UpdateStatus.SUCCESS_MSG,
+ wait_for_page=True)
Wait.text_by_id(
Constants.Dashboard.Overview.Status.Description.ID, random_string)
@@ -285,7 +353,8 @@ class FEOverview:
i = 0
ns_list = []
steps_length = len(
- session.ice_driver.find_elements_by_css_selector(".step-indication > li"))
+ session.ice_driver.find_elements_by_css_selector(
+ ".step-indication > li"))
while i < steps_length:
ns_list.append(FEOverview.get_next_step_description(i))
i += 1
@@ -297,20 +366,31 @@ class FEOverview:
for idx, step_uuid in enumerate(steps_uuids):
db_step_text = DBVirtualFunction.select_next_step_description(
step_uuid)
- Wait.text_by_id(Constants.Dashboard.Overview.NextSteps.Add.Description.STEP_DESC_ID +
- str(idx), ui_steps[idx], wait_for_page=True)
+ Wait.text_by_id(
+ Constants.Dashboard.Overview.NextSteps.Add.Description.
+ STEP_DESC_ID + str(idx),
+ ui_steps[idx],
+ wait_for_page=True)
if db_step_text != ui_steps[idx]:
- raise AssertionError("Next step is not located in expected index. db_step_text = "
- + db_step_text + " ui_steps[idx] = " + ui_steps[idx] + "|| uuid = " + step_uuid)
+ raise AssertionError(
+ "Next step is not located in expected index. " +
+ "db_step_text = " +
+ db_step_text +
+ " ui_steps[idx] = " +
+ ui_steps[idx] +
+ "|| uuid = " +
+ step_uuid)
@staticmethod
def next_steps_filter_by_files():
Click.id(
Constants.Dashboard.Overview.NextSteps.FilterByFileDropDown.ID)
Click.link_text(
- Constants.Dashboard.Overview.NextSteps.FilterByFileDropDown.ANY_FILE_LINK_TEXT)
+ Constants.Dashboard.Overview.NextSteps.FilterByFileDropDown.
+ ANY_FILE_LINK_TEXT)
Click.link_text(
- Constants.Dashboard.Overview.NextSteps.FilterByFileDropDown.FILE0_LINK_TEXT)
+ Constants.Dashboard.Overview.NextSteps.FilterByFileDropDown.
+ FILE0_LINK_TEXT)
Click.id(
Constants.Dashboard.Overview.NextSteps.FilterByFileDropDown.ID)
@@ -327,9 +407,11 @@ class FEOverview:
def next_steps_filter_by_states():
Click.id(Constants.Dashboard.Overview.NextSteps.StateDropDown.ID)
Click.link_text(
- Constants.Dashboard.Overview.NextSteps.StateDropDown.INCOMPLETE_LINK_TEXT)
+ Constants.Dashboard.Overview.NextSteps.StateDropDown.
+ INCOMPLETE_LINK_TEXT)
Click.link_text(
- Constants.Dashboard.Overview.NextSteps.StateDropDown.COMPLETED_LINK_TEXT)
+ Constants.Dashboard.Overview.NextSteps.StateDropDown.
+ COMPLETED_LINK_TEXT)
Click.id(Constants.Dashboard.Overview.NextSteps.StateDropDown.ID)
@staticmethod
@@ -341,10 +423,12 @@ class FEOverview:
Helper.rand_string("randomString")
Click.id(Constants.Dashboard.Overview.NextSteps.Add.Description.ID)
Enter.text_by_id(
- Constants.Dashboard.Overview.NextSteps.Add.Description.ID, ns_description)
+ Constants.Dashboard.Overview.NextSteps.Add.Description.ID,
+ ns_description)
FEWizard.date_picker_add_ns(0)
- Wait.text_by_css(Constants.SubmitButton.CSS,
- Constants.Dashboard.Overview.NextSteps.Add.Button.TEXT)
+ Wait.text_by_css(
+ Constants.SubmitButton.CSS,
+ Constants.Dashboard.Overview.NextSteps.Add.Button.TEXT)
Click.css(Constants.SubmitButton.CSS)
Wait.modal_to_dissappear()
@@ -361,10 +445,14 @@ class FEOverview:
Constants.Dashboard.Overview.TeamMember.RemoveUser.ID)
else:
Click.id(Constants.Dashboard.Overview.TeamMember.RemoveUser.ID)
- Wait.text_by_id(Constants.Dashboard.GeneralPrompt.UpperTitle.ID,
- Constants.Dashboard.Overview.TeamMember.RemoveUser.Title.TEXT % full_name)
- Wait.text_by_id(Constants.Dashboard.GeneralPrompt.Title.ID,
- Constants.Dashboard.Overview.TeamMember.RemoveUser.Message.TEXT)
+ Wait.text_by_id(
+ Constants.Dashboard.GeneralPrompt.UpperTitle.ID,
+ Constants.Dashboard.Overview.TeamMember.RemoveUser.Title.TEXT %
+ full_name)
+ Wait.text_by_id(
+ Constants.Dashboard.GeneralPrompt.Title.ID,
+ Constants.Dashboard.Overview.TeamMember.RemoveUser.
+ Message.TEXT)
Click.id(Constants.Dashboard.GeneralPrompt.ApproveButton.ID)
FEGeneral.refresh()
Wait.id_to_dissappear(
@@ -373,9 +461,18 @@ class FEOverview:
@staticmethod
def invite_and_reopen_link(user_content, other_el_email):
enguuid = DBGeneral.select_where(
- "uuid", "ice_engagement", "engagement_manual_id", user_content['engagement_manual_id'], 1)
+ "uuid",
+ "ice_engagement",
+ "engagement_manual_id",
+ user_content['engagement_manual_id'],
+ 1)
invitation_token = DBUser.select_invitation_token(
- "invitation_token", "ice_invitation", "engagement_uuid", enguuid, other_el_email, 1)
+ "invitation_token",
+ "ice_invitation",
+ "engagement_uuid",
+ enguuid,
+ other_el_email,
+ 1)
inviterURL = Constants.Default.InviteURL.Login.TEXT + invitation_token
FEGeneral.re_open(inviterURL)
@@ -392,14 +489,16 @@ class FEOverview:
def validate_empty_associated_files():
FEOverview.add_next_step()
Click.id(Constants.Dashboard.Overview.NextSteps.AssociatedFiles.ID)
- Wait.text_by_id(Constants.Dashboard.Overview.NextSteps.AssociatedFiles.EmptyMsgID,
- Constants.Dashboard.Overview.NextSteps.AssociatedFiles.EmptyMsg)
+ Wait.text_by_id(
+ Constants.Dashboard.Overview.NextSteps.AssociatedFiles.EmptyMsgID,
+ Constants.Dashboard.Overview.NextSteps.AssociatedFiles.EmptyMsg)
@staticmethod
def validate_associated_files(file_name):
Click.id(Constants.Dashboard.Overview.NextSteps.AssociatedFiles.ID)
Wait.text_by_id(
- Constants.Dashboard.Overview.NextSteps.AssociatedFiles.FileId, file_name)
+ Constants.Dashboard.Overview.NextSteps.AssociatedFiles.FileId,
+ file_name)
@staticmethod
def validate_bucket_url(eng_manual_id, vf_name):
@@ -411,9 +510,10 @@ class FEOverview:
@staticmethod
def verify_validation_dates():
validation_date = Get.by_id(
- Constants.Dashboard.Overview.Progress.ValidationsDates.AIC_ID, True)
- validation_date = datetime.datetime.strptime(
- validation_date, "%m/%d/%y").date()
+ Constants.Dashboard.Overview.Progress.ValidationsDates.AIC_ID,
+ True)
+ validation_date = datetime.datetime.strptime(validation_date,
+ "%m/%d/%y").date()
current_date = timezone.now().date()
Helper.internal_assert(validation_date, current_date)
diff --git a/services/frontend/fe_user.py b/services/frontend/fe_user.py
index eb25d23..1438007 100644
--- a/services/frontend/fe_user.py
+++ b/services/frontend/fe_user.py
@@ -59,7 +59,11 @@ logger = LoggingServiceFactory.get_logger()
class FEUser:
@staticmethod
- def login(email, password, expected_element=Constants.Dashboard.Statuses.Title.ID, element_type="id"):
+ def login(
+ email,
+ password,
+ expected_element=Constants.Dashboard.Statuses.Title.ID,
+ element_type="id"):
try:
logger.debug("Verifying and Insert Login page elements:")
logger.debug("Insert Email " + email)
@@ -81,7 +85,11 @@ class FEUser:
raise Exception(errorMsg, e)
@staticmethod
- def relogin(email, password, expected_element=Constants.Dashboard.Statuses.Title.ID, element_type="id"):
+ def relogin(
+ email,
+ password,
+ expected_element=Constants.Dashboard.Statuses.Title.ID,
+ element_type="id"):
FEGeneral.re_open(Constants.Default.LoginURL.TEXT)
FEUser.login(email, password, expected_element, element_type)
@@ -91,7 +99,11 @@ class FEUser:
Click.link_text(Constants.Dashboard.Avatar.Logout.LINK_TEXT)
@staticmethod
- def activate_and_login(email, password, expected_element=Constants.Dashboard.Statuses.Title.ID, element_type="id"):
+ def activate_and_login(
+ email,
+ password,
+ expected_element=Constants.Dashboard.Statuses.Title.ID,
+ element_type="id"):
activationUrl = DBUser.get_activation_url(email)
FEGeneral.re_open(activationUrl)
FEUser.login(email, password, expected_element, element_type)
@@ -124,7 +136,7 @@ class FEUser:
accountObj = [randomName, phone, password]
return accountObj
# If failed - count the failure and add the error to list of errors.
- except:
+ except BaseException:
errorMsg = "Failed in update accaunt ."
raise Exception(errorMsg)
raise
@@ -169,15 +181,18 @@ class FEUser:
def click_on_feedback():
Click.id(Constants.Dashboard.Feedback.ID, wait_for_page=True)
Wait.id(
- Constants.Dashboard.Feedback.FeedbackModal.SAVE_BTN_ID, wait_for_page=True)
+ Constants.Dashboard.Feedback.FeedbackModal.SAVE_BTN_ID,
+ wait_for_page=True)
@staticmethod
def validate_feedback(description, user_email):
- query = "SELECT user_id FROM ice_feedback where description = '{desc}'".format(
- desc=description)
+ query = "SELECT user_id FROM ice_feedback where " +\
+ "description = '{desc}'".format(
+ desc=description)
feedback_user_uuid = DBGeneral.select_query(query)
- query = "SELECT id FROM ice_user_profile where email = '{email}'".format(
- email=user_email)
+ query = "SELECT id FROM ice_user_profile where " +\
+ "email = '{email}'".format(
+ email=user_email)
user_uuid = DBGeneral.select_query(query)
Helper.internal_assert(user_uuid, feedback_user_uuid)
@@ -187,7 +202,8 @@ class FEUser:
description = Helper.rand_string("randomString")
Enter.text_by_css("textarea[name=\"description\"]", description)
Click.id(
- Constants.Dashboard.Feedback.FeedbackModal.SAVE_BTN_ID, wait_for_page=True)
+ Constants.Dashboard.Feedback.FeedbackModal.SAVE_BTN_ID,
+ wait_for_page=True)
Wait.text_by_id(Constants.Toast.ID,
"Feedback was sent successfully.", wait_for_page=True)
return description
@@ -206,9 +222,12 @@ class FEUser:
def click_on_notifications():
try:
Click.link_text(
- Constants.Dashboard.Avatar.Notifications.LINK_TEXT, wait_for_page=True)
- Wait.text_by_id(Constants.Dashboard.Avatar.Notifications.Title.ID,
- Constants.Dashboard.Avatar.Notifications.Title.TEXT, wait_for_page=True)
+ Constants.Dashboard.Avatar.Notifications.LINK_TEXT,
+ wait_for_page=True)
+ Wait.text_by_id(
+ Constants.Dashboard.Avatar.Notifications.Title.ID,
+ Constants.Dashboard.Avatar.Notifications.Title.TEXT,
+ wait_for_page=True)
except Exception as e:
errorMsg = "Failed to click_on on Admin."
raise Exception(errorMsg, e)
@@ -231,21 +250,28 @@ class FEUser:
logger.error(
"APIUser should not have assigned next steps at first login.")
raise
- if (Get.by_id("next-steps-list") != "No next steps are assigned to you."):
+ if (Get.by_id("next-steps-list") !=
+ "No next steps are assigned to you."):
logger.error(
- "No assigned next steps and text 'No next steps are assigned to you.' was not found.")
+ "No assigned next steps and text 'No next steps are " +
+ "assigned to you.' was not found.")
raise
token = "token " + APIUser.login_user(user_content['el_email'])
user_content['session_token'] = token
logger.debug(
- "Adding new next step (via api) and assigning it to user " + user_content['full_name'])
+ "Adding new next step (via api) and assigning it to user " +
+ user_content['full_name'])
APIVirtualFunction.add_next_step(user_content)
logger.debug(
- "Refresh page and look for changes in assigned next steps section:")
+ "Refresh page and look for changes in assigned " +
+ "next steps section:")
FEGeneral.refresh()
logger.debug(" > Check if number has changed in 'Assigned To You'")
- Wait.text_by_id(
- "next-steps-header", "Assigned To You (1)", wait_for_page=True)
+ FEUser.logout()
+ FEUser.login(
+ user_content['email'], Constants.Default.Password.TEXT)
+ text = Get.by_id("next-steps-header", True)
+ Helper.internal_assert(text, "Assigned To You (1)")
@staticmethod
def set_ssh_key_from_account(key, is_negative=False):
@@ -254,23 +280,29 @@ class FEUser:
Click.css(Constants.SubmitButton.CSS)
if is_negative:
Wait.text_by_id(
- Constants.Toast.ID, Constants.Dashboard.Avatar.Account.SSHKey.UpdateFailed.TEXT)
+ Constants.Toast.ID,
+ Constants.Dashboard.Avatar.Account.SSHKey.UpdateFailed.TEXT)
else:
Wait.text_by_id(
- Constants.Toast.ID, Constants.Dashboard.Avatar.Account.Update.Success.TEXT)
+ Constants.Toast.ID,
+ Constants.Dashboard.Avatar.Account.Update.Success.TEXT)
@staticmethod
def reset_password():
Wait.text_by_css(
- Constants.UpdatePassword.Title.CSS, Constants.UpdatePassword.Title.TEXT)
+ Constants.UpdatePassword.Title.CSS,
+ Constants.UpdatePassword.Title.TEXT)
Wait.text_by_css(
- Constants.UpdatePassword.SubTitle.CSS, Constants.UpdatePassword.SubTitle.TEXT)
+ Constants.UpdatePassword.SubTitle.CSS,
+ Constants.UpdatePassword.SubTitle.TEXT)
Wait.text_by_css(
Constants.SubmitButton.CSS, Constants.UpdatePassword.Button.TEXT)
Enter.text_by_name(
- Constants.UpdatePassword.Password.NAME, Constants.Default.Password.NewPass.TEXT)
+ Constants.UpdatePassword.Password.NAME,
+ Constants.Default.Password.NewPass.TEXT)
Enter.text_by_name(
- Constants.UpdatePassword.ConfirmPassword.NAME, Constants.Default.Password.NewPass.TEXT)
+ Constants.UpdatePassword.ConfirmPassword.NAME,
+ Constants.Default.Password.NewPass.TEXT)
Click.css(Constants.SubmitButton.CSS)
Wait.text_by_id(
Constants.Toast.ID, Constants.UpdatePassword.Toast.TEXT)
@@ -279,8 +311,8 @@ class FEUser:
def delete_notification(notificationID):
if isinstance(notificationID, tuple):
notificationID = notificationID[0]
- delete_button = Constants.Dashboard.Avatar.Notifications.DeleteNotification.ID + \
- notificationID
+ delete_button = Constants.Dashboard.Avatar.Notifications.\
+ DeleteNotification.ID + notificationID
# Click on delete button.
Click.id(delete_button, wait_for_page=True)
Wait.id_to_dissappear(delete_button)
@@ -292,7 +324,8 @@ class FEUser:
if isinstance(notifID, tuple):
notifID = notifID[0]
ui_list.append(str(Get.by_id(
- Constants.Dashboard.Avatar.Notifications.NotificationColumn.ID + notifID)))
+ Constants.Dashboard.Avatar.Notifications.
+ NotificationColumn.ID + notifID)))
for activity in notification_list:
if not any(activity in s for s in ui_list):
raise AssertionError(
@@ -313,8 +346,9 @@ class FEUser:
def open_invite_team_member_form(vf_left_nav_id):
Click.id(vf_left_nav_id)
Click.id(Constants.Dashboard.Overview.TeamMember.ID)
- Wait.text_by_name(Constants.Dashboard.Wizard.InviteTeamMembers.Title.NAME,
- Constants.Dashboard.Wizard.InviteTeamMembers.Title.TEXT)
+ Wait.text_by_name(
+ Constants.Dashboard.Wizard.InviteTeamMembers.Title.NAME,
+ Constants.Dashboard.Wizard.InviteTeamMembers.Title.TEXT)
@staticmethod
def invite_single_user_to_team(email):
@@ -325,49 +359,64 @@ class FEUser:
def go_to_user_profile_settings():
FEUser.go_to_account()
Click.id(
- Constants.Dashboard.Avatar.Account.UserProfileSettings.ID, wait_for_page=True)
- Wait.text_by_id(Constants.Dashboard.Avatar.Account.UserProfileSettings.TitleID,
- Constants.Dashboard.Avatar.Account.UserProfileSettings.TitleText, wait_for_page=True)
+ Constants.Dashboard.Avatar.Account.UserProfileSettings.ID,
+ wait_for_page=True)
+ Wait.text_by_id(
+ Constants.Dashboard.Avatar.Account.UserProfileSettings.TitleID,
+ Constants.Dashboard.Avatar.Account.UserProfileSettings.TitleText,
+ wait_for_page=True)
@staticmethod
def check_user_profile_settings_checkboxes():
Click.id(
- Constants.Dashboard.Avatar.Account.UserProfileSettings.ReceiveEmailsID, wait_for_page=True)
+ Constants.Dashboard.Avatar.Account.
+ UserProfileSettings.ReceiveEmailsID,
+ wait_for_page=True)
Click.id(
- Constants.Dashboard.Avatar.Account.UserProfileSettings.ReceiveEmailEveryTimeID, wait_for_page=True)
+ Constants.Dashboard.Avatar.Account.UserProfileSettings.
+ ReceiveEmailEveryTimeID,
+ wait_for_page=True)
Click.id(
- Constants.Dashboard.Avatar.Account.UserProfileSettings.ReceiveDigestEmailID, wait_for_page=True)
+ Constants.Dashboard.Avatar.Account.UserProfileSettings.
+ ReceiveDigestEmailID,
+ wait_for_page=True)
Click.id(
- Constants.Dashboard.Avatar.Account.UserProfileSettings.UpdateButtonID, wait_for_page=True)
+ Constants.Dashboard.Avatar.Account.UserProfileSettings.
+ UpdateButtonID,
+ wait_for_page=True)
@staticmethod
def validate_user_profile_settings_checkboxes(checked):
Wait.page_has_loaded()
receive_emails = Get.is_selected_by_id(
- Constants.Dashboard.Avatar.Account.UserProfileSettings.ReceiveEmailsID, True)
+ Constants.Dashboard.Avatar.Account.UserProfileSettings.
+ ReceiveEmailsID, True)
Helper.internal_assert(receive_emails, checked)
- receive_notifications = \
- Get.is_selected_by_id(
- Constants.Dashboard.Avatar.Account.UserProfileSettings.ReceiveNotificationsID, True)
- receive_email_every_time = \
- Get.is_selected_by_id(
- Constants.Dashboard.Avatar.Account.UserProfileSettings.ReceiveEmailEveryTimeID, True)
+ Get.is_selected_by_id(
+ Constants.Dashboard.Avatar.Account.UserProfileSettings.
+ ReceiveNotificationsID, True)
+ receive_email_every_time = Get.is_selected_by_id(
+ Constants.Dashboard.Avatar.Account.UserProfileSettings.
+ ReceiveEmailEveryTimeID, True)
Helper.internal_assert(receive_email_every_time, checked)
- receive_digest_email = \
- Get.is_selected_by_id(
- Constants.Dashboard.Avatar.Account.UserProfileSettings.ReceiveDigestEmailID, True)
+ receive_digest_email = Get.is_selected_by_id(
+ Constants.Dashboard.Avatar.Account.UserProfileSettings.
+ ReceiveDigestEmailID, True)
Helper.internal_assert(receive_digest_email, not checked)
@staticmethod
def compare_notifications_count_for_user(expected_count):
Wait.text_by_id(
- Constants.Dashboard.Avatar.Notifications.Count.ID, expected_count, True)
+ Constants.Dashboard.Avatar.Notifications.Count.ID,
+ expected_count,
+ True)
@staticmethod
def check_notification_number_is_not_presented():
FEGeneral.refresh()
Wait.id_to_dissappear(
- Constants.Dashboard.Avatar.Notifications.Count.ID, wait_for_page=True)
+ Constants.Dashboard.Avatar.Notifications.Count.ID,
+ wait_for_page=True)
@staticmethod
def validate_account_details(full_name, phone_number, ssh_key):
@@ -375,8 +424,8 @@ class FEUser:
Constants.Dashboard.Avatar.Account.FullName.NAME))
Helper.internal_assert(phone_number, Get.value_by_name(
Constants.Dashboard.Avatar.Account.Phone.NAME))
- Helper.internal_assert(
- ssh_key, Get.value_by_name(Constants.Dashboard.Avatar.Account.SSHKey.NAME))
+ Helper.internal_assert(ssh_key, Get.value_by_name(
+ Constants.Dashboard.Avatar.Account.SSHKey.NAME))
@staticmethod
def check_rgwa_access_key(my_key):
@@ -387,14 +436,16 @@ class FEUser:
def check_rgwa_access_secret(my_secret):
Click.id(Constants.Dashboard.Avatar.Account.RGWA.Secret.BUTTON_ID)
Wait.text_by_id(
- Constants.Dashboard.Avatar.Account.RGWA.Secret.SECRET_ID, my_secret)
+ Constants.Dashboard.Avatar.Account.RGWA.Secret.SECRET_ID,
+ my_secret)
@staticmethod
def get_rgwa_access_secret():
Click.id(Constants.Dashboard.Avatar.Account.RGWA.Secret.BUTTON_ID,
wait_for_page=True)
secret = Get.by_id(
- Constants.Dashboard.Avatar.Account.RGWA.Secret.SECRET_ID, wait_for_page=True)
+ Constants.Dashboard.Avatar.Account.RGWA.Secret.SECRET_ID,
+ wait_for_page=True)
return secret
@staticmethod
diff --git a/services/frontend/fe_wizard.py b/services/frontend/fe_wizard.py
index df5087d..c299a4b 100644
--- a/services/frontend/fe_wizard.py
+++ b/services/frontend/fe_wizard.py
@@ -61,7 +61,9 @@ class FEWizard:
try:
logger.debug("Tab Add Virtual Functions")
Wait.text_by_css(
- Constants.Dashboard.Wizard.Title.CSS, Constants.Dashboard.Wizard.AddVF.Title.TEXT, wait_for_page=True)
+ Constants.Dashboard.Wizard.Title.CSS,
+ Constants.Dashboard.Wizard.AddVF.Title.TEXT,
+ wait_for_page=True)
vfName = "newVF" + Helper.rand_string("randomString")
vfVersion = "newVFVersion" + \
Helper.rand_string(
@@ -70,9 +72,11 @@ class FEWizard:
Enter.text_by_name("VFversion", vfVersion, wait_for_page=True)
FEWizard.date_picker_wizard()
Select(session.ice_driver.find_element_by_id(
- Constants.Dashboard.Wizard.AddVF.AIC_Version.TEXT)).select_by_visible_text("AIC 3.5")
+ Constants.Dashboard.Wizard.AddVF.AIC_Version.TEXT
+ )).select_by_visible_text("AIC 3.5")
Select(session.ice_driver.find_element_by_id(
- Constants.Dashboard.Wizard.AddVF.ECOMP_Release.TEXT)).select_by_visible_text("Unknown")
+ Constants.Dashboard.Wizard.AddVF.ECOMP_Release.TEXT
+ )).select_by_visible_text("Unknown")
session.E2Edate = FEWizard.get_lab_entry_date()
Click.css(Constants.SubmitButton.CSS, wait_for_page=True)
Wait.page_has_loaded()
@@ -80,7 +84,8 @@ class FEWizard:
return vfName
# If failed - count the failure and add the error to list of errors.
except Exception as e:
- errorMsg = "Failed to add a Virtual Function via modal window. Exception " + \
+ errorMsg = "Failed to add a Virtual Function via modal window. " +\
+ "Exception " +\
str(e)
raise Exception(errorMsg)
@@ -93,8 +98,10 @@ class FEWizard:
@staticmethod
def add_vendor_contact():
logger.debug("Tab Add Vendor Contact")
- Wait.text_by_css(Constants.Dashboard.Wizard.Title.CSS,
- Constants.Dashboard.Wizard.AddVendorContact.Title.TEXT, wait_for_page=True)
+ Wait.text_by_css(
+ Constants.Dashboard.Wizard.Title.CSS,
+ Constants.Dashboard.Wizard.AddVendorContact.Title.TEXT,
+ wait_for_page=True)
Select(session.ice_driver.find_element_by_name(
"company")).select_by_visible_text("Ericsson")
fullname = Helper.rand_string(
@@ -115,7 +122,10 @@ class FEWizard:
logger.debug(
"Tab Add " + ServiceProvider.MainServiceProvider + " Sponsor")
Wait.text_by_css(
- Constants.Dashboard.Wizard.Title.CSS, "Add " + ServiceProvider.MainServiceProvider + " Sponsor")
+ Constants.Dashboard.Wizard.Title.CSS,
+ "Add " +
+ ServiceProvider.MainServiceProvider +
+ " Sponsor")
fullname = Helper.rand_string(
"randomString") + Helper.rand_string("randomString")
Enter.text_by_name("fullname", fullname)
@@ -127,19 +137,21 @@ class FEWizard:
Enter.text_by_name("phone", phone)
Click.css(Constants.SubmitButton.CSS)
Wait.name_to_dissappear("Add AT&T Sponsor")
- sponsor = {"company": ServiceProvider.MainServiceProvider, "full_name": fullname,
- "email": email, "phone": phone}
+ sponsor = {"company": ServiceProvider.MainServiceProvider,
+ "full_name": fullname, "email": email, "phone": phone}
return sponsor
@staticmethod
def invite_team_members(email):
try:
logger.debug("Tab Invite Team Members")
- Wait.text_by_name(Constants.Dashboard.Wizard.InviteTeamMembers.Title.NAME,
- Constants.Dashboard.Wizard.InviteTeamMembers.Title.TEXT)
+ Wait.text_by_name(
+ Constants.Dashboard.Wizard.InviteTeamMembers.Title.NAME,
+ Constants.Dashboard.Wizard.InviteTeamMembers.Title.TEXT)
Enter.text_by_name("email", email)
Wait.text_by_css(
- Constants.SubmitButton.CSS, Constants.Dashboard.Wizard.InviteTeamMembers.Button.TEXT)
+ Constants.SubmitButton.CSS,
+ Constants.Dashboard.Wizard.InviteTeamMembers.Button.TEXT)
Click.css(Constants.SubmitButton.CSS)
Wait.name_to_dissappear(
Constants.Dashboard.Wizard.InviteTeamMembers.Title.NAME)
@@ -162,12 +174,15 @@ class FEWizard:
# Check that the submit button exists.
Wait.text_by_css(
- Constants.SubmitButton.CSS, Constants.Dashboard.Wizard.AddSSHKey.Title.TEXT)
+ Constants.SubmitButton.CSS,
+ Constants.Dashboard.Wizard.AddSSHKey.Title.TEXT)
Click.css(Constants.SubmitButton.CSS) # Click on submit button.
if is_negative:
Wait.text_by_id(
- Constants.Toast.ID, Constants.Dashboard.Avatar.Account.SSHKey.UpdateFailed.TEXT)
+ Constants.Toast.ID,
+ Constants.Dashboard.Avatar.Account
+ .SSHKey.UpdateFailed.TEXT)
else:
Wait.name_to_dissappear(
Constants.Dashboard.Wizard.AddSSHKey.Title.NAME)
@@ -175,7 +190,8 @@ class FEWizard:
return sshKey
# If failed - count the failure and add the error to list of errors.
except Exception as e:
- errorMsg = "Failed to add an SSH Key in the modal window. Exception=" + \
+ errorMsg = "Failed to add an SSH Key in " +\
+ "the modal window. Exception=" + \
str(e)
raise Exception(errorMsg)
@@ -183,12 +199,15 @@ class FEWizard:
def invite_team_members_modal(email, wait_modal_to_disappear=True):
try:
Click.id(
- Constants.Dashboard.Overview.TeamMember.ID, wait_for_page=True)
- Wait.text_by_css(Constants.Dashboard.Wizard.Title.CSS,
- Constants.Dashboard.Wizard.InviteTeamMembers.Title.TEXT)
+ Constants.Dashboard.Overview.TeamMember.ID,
+ wait_for_page=True)
+ Wait.text_by_css(
+ Constants.Dashboard.Wizard.Title.CSS,
+ Constants.Dashboard.Wizard.InviteTeamMembers.Title.TEXT)
Enter.text_by_name("email", email)
Wait.text_by_css(
- Constants.SubmitButton.CSS, Constants.Dashboard.Wizard.InviteTeamMembers.Button.TEXT)
+ Constants.SubmitButton.CSS,
+ Constants.Dashboard.Wizard.InviteTeamMembers.Button.TEXT)
Click.css(".inviteMembers-form button.btn.btn-primary", True)
if wait_modal_to_disappear:
Wait.modal_to_dissappear()
@@ -201,8 +220,14 @@ class FEWizard:
@staticmethod
def date_picker_add_ns(count):
try:
- session.ice_driver.execute_script("var el = angular.element(document.querySelector('.addNextSteps')); el.scope().vm.nextSteps[" + str(
- count) + "].duedate = new Date('" + str(datetime.today().isoformat()) + "')")
+ session.ice_driver.execute_script(
+ "var el = angular.element(document.querySelector" +
+ "('.addNextSteps')); el.scope().vm.nextSteps[" +
+ str(count) +
+ "].duedate = new Date('" +
+ str(
+ datetime.today().isoformat()) +
+ "')")
Click.css("div.modal-content", wait_for_page=True)
except Exception as e:
errorMsg = "Failed to select date with datePicker."
diff --git a/services/helper.py b/services/helper.py
index 418b260..dd779be 100644
--- a/services/helper.py
+++ b/services/helper.py
@@ -1,4 +1,3 @@
-
# ============LICENSE_START==========================================
# org.onap.vvp/test-engine
# ===================================================================
@@ -42,9 +41,10 @@ import string
import subprocess
import unittest
-from cryptography.hazmat.backends import default_backend as crypto_default_backend, \
- default_backend
-from cryptography.hazmat.primitives import serialization as crypto_serialization
+from cryptography.hazmat.backends import \
+ default_backend as crypto_default_backend, default_backend
+from cryptography.hazmat.primitives import \
+ serialization as crypto_serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from django.conf import settings
@@ -65,8 +65,11 @@ class Helper:
def rand_string(type_of_value='randomString', numberOfDigits=0):
letters_and_numbers = string.ascii_letters + string.digits
if type_of_value == 'email':
- myEmail = ''.join(random.choice(letters_and_numbers) for _ in range(
- 4)) + "@" + ''.join(random.choice(string.ascii_uppercase) for _ in range(4)) + ".com"
+ myEmail = ''.join(
+ random.choice(letters_and_numbers) for _ in range(
+ 4)) + "@" + ''.join(
+ random.choice(
+ string.ascii_uppercase) for _ in range(4)) + ".com"
return "ST" + myEmail
elif type_of_value == 'randomNumber':
randomNumber = ''.join("%s" % random.randint(2, 9)
@@ -93,7 +96,7 @@ class Helper:
key_size=2048,
backend=default_backend()
)
- private_key = key.private_bytes(
+ key.private_bytes(
encoding=crypto_serialization.Encoding.PEM,
format=crypto_serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=crypto_serialization.NoEncryption())
@@ -114,7 +117,8 @@ class Helper:
def check_admin_ssh_existence(path, admin_ssh):
if admin_ssh == open(path).read().rstrip('\n'):
logger.debug(
- "Admin SSH already defined in DB and equal to the one stored on the local system.")
+ "Admin SSH already defined in DB and equal " +
+ "to the one stored on the local system.")
return True
return False
@@ -173,7 +177,8 @@ class Helper:
return user_pub_key
except Exception as error:
logger.error(
- "_-_-_-_-_- Unexpected error in get_or_create_rsa_key_for_admin: %s" % error)
+ "_-_-_-_-_- Unexpected error in " +
+ "get_or_create_rsa_key_for_admin: %s" % error)
raise Exception("Failed to create SSH keys for user admin", error)
@staticmethod
@@ -215,5 +220,5 @@ class Helper:
@staticmethod
def assertTrue(expr, msg=None):
"""Check that the expression is true."""
- if expr != True:
+ if not expr:
raise Exception("AssertionError: \"not expr")
diff --git a/services/session.py b/services/session.py
index 174f54b..0b8e290 100644
--- a/services/session.py
+++ b/services/session.py
@@ -1,5 +1,5 @@
-
-# ============LICENSE_START==========================================
+
+# ============LICENSE_START==========================================
# org.onap.vvp/test-engine
# ===================================================================
# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
@@ -58,7 +58,8 @@ class SessionSingletone():
def __init__(self):
self.wait_until_retires = self.positive_wait_until_retires
self.wait_until_time_pause = self.positive_wait_until_time_pause
- self.wait_until_time_pause_long = self.positive_wait_until_time_pause_long
+ self.wait_until_time_pause_long = \
+ self.positive_wait_until_time_pause_long
self.wait_until_implicit_time = self.positive_wait_until_implicit_time
self.test = '123'
self.ice_driver = None
@@ -94,7 +95,7 @@ class SessionSingletone():
self.start_negative()
try:
run_me() # click.css
- except:
+ except BaseException:
# will run if click does NOT succeed
self.errorCounter = 0
self.errorList = ""
@@ -103,4 +104,5 @@ class SessionSingletone():
raise Exception(error_msg)
self.end_negative()
+
session = SessionSingletone()
diff --git a/services/types.py b/services/types.py
index 7dbbe94..db58ca0 100644
--- a/services/types.py
+++ b/services/types.py
@@ -1,5 +1,5 @@
-
-# ============LICENSE_START==========================================
+
+# ============LICENSE_START==========================================
# org.onap.vvp/test-engine
# ===================================================================
# Copyright © 2017 AT&T Intellectual Property. All rights reserved.