aboutsummaryrefslogtreecommitdiffstats
path: root/services
diff options
context:
space:
mode:
Diffstat (limited to 'services')
-rw-r--r--services/api/api_checklist.py22
-rw-r--r--services/api/api_gitlab.py52
-rw-r--r--services/api/api_jenkins.py23
-rw-r--r--services/constants.py27
-rw-r--r--services/database/db_checklist.py18
-rw-r--r--services/frontend/base_actions/get.py22
-rw-r--r--services/frontend/base_actions/wait.py19
-rw-r--r--services/frontend/fe_checklist.py3
-rw-r--r--services/frontend/fe_dashboard.py10
-rw-r--r--services/frontend/fe_detailed_view.py8
-rw-r--r--services/frontend/fe_general.py25
-rw-r--r--services/frontend/fe_invite.py9
-rw-r--r--services/frontend/fe_overview.py5
-rw-r--r--services/frontend/fe_user.py15
-rw-r--r--services/frontend/fe_wizard.py13
-rw-r--r--services/helper.py319
16 files changed, 328 insertions, 262 deletions
diff --git a/services/api/api_checklist.py b/services/api/api_checklist.py
index ef7b8a3..fda0730 100644
--- a/services/api/api_checklist.py
+++ b/services/api/api_checklist.py
@@ -1,5 +1,5 @@
-
-# ============LICENSE_START==========================================
+
+# ============LICENSE_START==========================================
# org.onap.vvp/test-engine
# ===================================================================
# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
@@ -47,6 +47,7 @@ from services.constants import Constants
from services.database.db_general import DBGeneral
from services.helper import Helper
from services.logging_service import LoggingServiceFactory
+from services.database.db_checklist import DBChecklist
logger = LoggingServiceFactory.get_logger()
@@ -208,12 +209,21 @@ class APIChecklist:
@staticmethod
def move_cl_to_closed(cl_uuid, vf_staff_emails):
api_checklist_obj = APIChecklist()
-
+ states = [Constants.ChecklistStates.PeerReview.TEXT,
+ Constants.ChecklistStates.Approval.TEXT,
+ Constants.ChecklistStates.Handoff.TEXT,
+ Constants.ChecklistStates.Closed.TEXT]
for i in range(len(vf_staff_emails)):
- logger.debug("Trying to jump state for %s [%s]" % (vf_staff_emails[i], i))
+ logger.debug(
+ "Trying to jump state for %s [%s]" % (vf_staff_emails[i], i))
+ DBChecklist.update_all_decisions_to_approve(cl_uuid)
api_checklist_obj.jump_state(cl_uuid, vf_staff_emails[i])
+ logger.debug("Checking state changed to %s" % states[i])
+ DBChecklist.state_changed("uuid", cl_uuid, states[i])
# Move CL to closed state.
- logger.debug("Trying to jump state 'closed' for %s" % vf_staff_emails[0])
+ logger.debug("Trying to jump state 'closed' for %s" %
+ vf_staff_emails[0])
api_checklist_obj.jump_state(cl_uuid, vf_staff_emails[0])
-
+ logger.debug("Checking state changed to %s" % states[-1])
+ DBChecklist.state_changed("uuid", cl_uuid, states[-1])
diff --git a/services/api/api_gitlab.py b/services/api/api_gitlab.py
index c7b25e0..6c5a2ff 100644
--- a/services/api/api_gitlab.py
+++ b/services/api/api_gitlab.py
@@ -1,5 +1,5 @@
-
-# ============LICENSE_START==========================================
+
+# ============LICENSE_START==========================================
# org.onap.vvp/test-engine
# ===================================================================
# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
@@ -56,6 +56,7 @@ from services.session import session
logger = LoggingServiceFactory.get_logger()
+
class APIGitLab:
@staticmethod
@@ -79,13 +80,13 @@ class APIGitLab:
headers['PRIVATE-TOKEN'] = settings.GITLAB_TOKEN
try:
r1 = requests.get(getURL, headers=headers, verify=False)
- Helper.internal_assert(r1.status_code, 200)
counter = 0
- while r1.content == b'[]' and counter <= Constants.GitLabConstants.RETRIES_NUMBER:
+ while r1.status_code == 404 or r1.content == b'[]' and counter <= Constants.GitLabConstants.RETRIES_NUMBER:
time.sleep(session.wait_until_time_pause)
r1 = requests.get(getURL, headers=headers, verify=False)
- Helper.internal_assert(r1.status_code, 200)
-
+ logger.debug(
+ "trying to get the git project, yet to succeed (try #%s)" % counter)
+ counter += 1
if r1.content == b'[]':
logger.error("Got an empty list as a response.")
raise
@@ -363,32 +364,15 @@ class APIGitLab:
@staticmethod
def is_gitlab_ready(user_content):
- counter = 1
- gettURL = settings.ICE_EM_URL + '/v1/engmgr/engagement/' + \
- user_content['engagement_uuid'] + '/checklist/new/'
- logger.debug(
- "Get URL to check if GitLab and Jenkins are ready: " + gettURL)
- # Validate with EL
- token = "token " + APIBridge.login_user(user_content['el_email'])
- headers = dict() # Create header for get request.
- headers['Content-type'] = 'application/json'
- headers['Authorization'] = token
- r1 = requests.get(gettURL, headers=headers, verify=False)
- while (r1.content == b'"Create New checklist is not ready yet"' and counter <=
- Constants.GitLabConstants.RETRIES_NUMBER):
- time.sleep(session.wait_until_time_pause_long)
- logger.debug(
- "GitLab and Jenkins are not ready yet, trying again (%s of %s)" %
- (counter, Constants.GitLabConstants.RETRIES_NUMBER))
- r1 = requests.get(gettURL, headers=headers, verify=False)
- counter += 1
- if r1.status_code != 200:
- if r1.content == "Create New checklist is not ready yet":
- raise Exception("Max retries exceeded, failing test...")
- else:
- raise Exception("Something went wrong while waiting for GitLab and Jenkins. %s %s" % (
- r1.status_code, r1.reason))
- return False
- elif r1.status_code == 200:
- logger.debug("Gitlab and Jenkins are ready to continue!")
+ path_with_namespace = user_content[
+ 'engagement_manual_id'] + "%2F" + user_content['vfName']
+ # If admin user is in project repo, it means the project exists as
+ # well.
+ cont = APIGitLab.validate_git_project_members(
+ path_with_namespace, Constants.Users.Admin.EMAIL)
+ if cont:
+ logger.debug("Gitlab is ready for action, git repo was found!")
return True
+ else:
+ raise Exception(
+ "Something went wrong while waiting for GitLab.")
diff --git a/services/api/api_jenkins.py b/services/api/api_jenkins.py
index e1e1f6e..b63cb66 100644
--- a/services/api/api_jenkins.py
+++ b/services/api/api_jenkins.py
@@ -1,5 +1,5 @@
-
-# ============LICENSE_START==========================================
+
+# ============LICENSE_START==========================================
# org.onap.vvp/test-engine
# ===================================================================
# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
@@ -37,26 +37,36 @@
#
# ECOMP is a trademark and service mark of AT&T Intellectual Property.
from django.conf import settings
-import requests
from requests.auth import HTTPBasicAuth
-
from services.constants import Constants
from services.helper import Helper
-from services.logging_service import LoggingServiceFactory
+import logging
+import requests
+import time
+from services.session import session
+
+logger = logging.getLogger('ice-ci.logger')
-logger = LoggingServiceFactory.get_logger()
class APIJenkins:
@staticmethod
def get_jenkins_job(job_name):
r1 = None
+ counter = 0
getURL = settings.JENKINS_URL + "job/" + job_name
logger.debug("Get APIJenkins job URL: " + getURL)
try:
r1 = requests.get(getURL, auth=HTTPBasicAuth(
settings.JENKINS_USERNAME, settings.JENKINS_PASSWORD))
+ while r1.status_code != 200 and counter <= Constants.GitLabConstants.RETRIES_NUMBER:
+ r1 = requests.get(getURL, auth=HTTPBasicAuth(
+ settings.JENKINS_USERNAME, settings.JENKINS_PASSWORD))
+ time.sleep(session.wait_until_time_pause)
+ logger.debug(
+ "try to get jenkins job (try #%s)" % counter)
+ counter += 1
Helper.internal_assert(r1.status_code, 200)
logger.debug("Job was created on APIJenkins!")
except:
@@ -78,4 +88,3 @@ class APIJenkins:
if Constants.Dashboard.Checklist.JenkinsLog.Modal.Body.BUILD_IDENTIFIER in line:
parts = line.partition('jenkins')
return parts[2]
-
diff --git a/services/constants.py b/services/constants.py
index 0be529f..23d7ef1 100644
--- a/services/constants.py
+++ b/services/constants.py
@@ -42,7 +42,7 @@ from django.conf import settings
class ServiceProvider:
PROGRAM_NAME = "VVP"
MainServiceProvider = "ServiceProvider"
- email = "example.com"
+ email = "example-domain.com"
class Constants:
@@ -89,9 +89,21 @@ class Constants:
class Review:
TEXT = "review"
+ class PeerReview:
+ TEXT = "peer_review"
+
+ class Approval:
+ TEXT = "approval"
+
+ class Handoff:
+ TEXT = "handoff"
+
class Archive:
TEXT = "archive"
+ class Closed:
+ TEXT = "closed"
+
class FEConstants:
RETRIES_NUMBER = 120
@@ -108,14 +120,15 @@ class Constants:
class Users:
class Admin:
- EMAIL = "admin@example.com"
+ EMAIL = "admin@" + ServiceProvider.email
FULLNAME = "admin bogus user"
class AdminRO:
- EMAIL = "admin_ro@example.com"
+ EMAIL = "admin_ro@" + ServiceProvider.email
class LongEmailLengthStandardUser:
- EMAIL = "50charslengthemailofstandarduserforinvite@example.com"
+ EMAIL = "50charslengthemailofstandardus@" + \
+ ServiceProvider.email
class Toast:
ID = "toast-successfully-message"
@@ -745,9 +758,9 @@ class Constants:
class FilterByFileDropDown:
ID = "selected-file-filter-dropdown"
ANY_FILE_LINK_TEXT = "Any file"
- FILE0_LINK_TEXT = "file0.yaml"
- FILE1_LINK_TEXT = "file1.yaml"
- FILE2_LINK_TEXT = "file2.yaml"
+ FILE0_LINK_TEXT = "file0"
+ FILE1_LINK_TEXT = "file1"
+ FILE2_LINK_TEXT = "file2"
class StateDropDown:
ID = "selected-state-filter-dropdown"
diff --git a/services/database/db_checklist.py b/services/database/db_checklist.py
index 15851cf..04f8a44 100644
--- a/services/database/db_checklist.py
+++ b/services/database/db_checklist.py
@@ -1,5 +1,5 @@
-
-# ============LICENSE_START==========================================
+
+# ============LICENSE_START==========================================
# org.onap.vvp/test-engine
# ===================================================================
# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
@@ -50,6 +50,7 @@ from services.session import session
logger = LoggingServiceFactory.get_logger()
+
class DBChecklist:
@staticmethod
@@ -361,16 +362,18 @@ class DBChecklist:
@staticmethod
def state_changed(identify_field, field_value, expected_state):
- get_state = str(DBGeneral.select_where(
- "state", Constants.DBConstants.IceTables.CHECKLIST, identify_field, field_value, 1))
+ get_state = DBGeneral.select_where_order_by_desc(
+ "state", Constants.DBConstants.IceTables.CHECKLIST,
+ identify_field, field_value, "create_time")[0]
counter = 0
while get_state != expected_state and counter <= Constants.DBConstants.RETRIES_NUMBER:
time.sleep(session.wait_until_time_pause_long)
logger.debug("Checklist state not changed yet , expecting state: %s, current result: %s (attempt %s of %s)" % (
expected_state, get_state, counter, Constants.DBConstants.RETRIES_NUMBER))
counter += 1
- get_state = str(DBGeneral.select_where(
- "state", Constants.DBConstants.IceTables.CHECKLIST, identify_field, field_value, 1))
+ get_state = DBGeneral.select_where_order_by_desc(
+ "state", Constants.DBConstants.IceTables.CHECKLIST,
+ identify_field, field_value, "create_time")[0]
if get_state == expected_state:
logger.debug("Checklist state was successfully changed into: " +
@@ -382,5 +385,6 @@ class DBChecklist:
@staticmethod
def get_recent_checklist_uuid(name):
required_uuid = DBGeneral.select_where_not_and_order_by_desc(
- 'uuid', Constants.DBConstants.IceTables.CHECKLIST, 'name', name, 'state', Constants.ChecklistStates.Archive.TEXT, 'create_time')
+ 'uuid', Constants.DBConstants.IceTables.CHECKLIST, 'name', name,
+ 'state', Constants.ChecklistStates.Archive.TEXT, 'create_time')
return required_uuid
diff --git a/services/frontend/base_actions/get.py b/services/frontend/base_actions/get.py
index 8735c1b..5fb801a 100644
--- a/services/frontend/base_actions/get.py
+++ b/services/frontend/base_actions/get.py
@@ -1,5 +1,5 @@
-
-# ============LICENSE_START==========================================
+
+# ============LICENSE_START==========================================
# org.onap.vvp/test-engine
# ===================================================================
# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
@@ -37,6 +37,7 @@
#
# ECOMP is a trademark and service mark of AT&T Intellectual Property.
from services.frontend.base_actions.wait import Wait
+from services.helper import Helper
from services.session import session
@@ -98,8 +99,10 @@ class Get:
raise Exception(errorMsg, attr_name_value)
@staticmethod
- def value_by_name(attr_name_value):
+ def value_by_name(attr_name_value, wait_for_page=False):
try:
+ if wait_for_page:
+ Wait.page_has_loaded()
Wait.name(attr_name_value)
return session.ice_driver.find_element_by_name(attr_name_value).get_attribute("value")
except Exception as e:
@@ -125,3 +128,16 @@ class Get:
except Exception as e:
errorMsg = "Failed to get if it's selected by id:" + attr_id_value
raise Exception(errorMsg, attr_id_value)
+
+ @staticmethod
+ def is_checkbox_selected_by_id(attr_id_value, wait_for_page=False):
+ try:
+ if wait_for_page:
+ Wait.page_has_loaded()
+ Wait.id(attr_id_value)
+ return Helper.internal_assert_boolean_true_false(
+ session.ice_driver.find_element_by_id(
+ attr_id_value).get_attribute("value"), "on")
+ except Exception as e:
+ errorMsg = "Failed to get if it's selected by id:" + attr_id_value
+ raise Exception(errorMsg, attr_id_value)
diff --git a/services/frontend/base_actions/wait.py b/services/frontend/base_actions/wait.py
index 50eff08..a699917 100644
--- a/services/frontend/base_actions/wait.py
+++ b/services/frontend/base_actions/wait.py
@@ -196,17 +196,18 @@ class Wait:
@staticmethod
def page_has_loaded():
- countwait_untilelement_to_be_presented_by_id = 0
for _ in range(Constants.FEConstants.RETRIES_NUMBER):
- httpRequests = session.ice_driver.execute_script(
- 'return window.angular ? window.angular.element("body").injector().get("$http").pendingRequests.length : 1;')
- if(str(httpRequests) == "0"):
+ try:
+ httpRequests = session.ice_driver.execute_script(
+ 'return window.angular ? window.angular.element("body").injector().get("$http").pendingRequests.length : 1;')
+ if(str(httpRequests) == "0"):
+ time.sleep(session.wait_until_time_pause)
+ return
+ logger.debug(
+ "Checking if {} page is loaded. ".format(session.ice_driver.current_url))
time.sleep(session.wait_until_time_pause)
- return
- logger.debug(
- "Checking if {} page is loaded. ".format(session.ice_driver.current_url))
- time.sleep(session.wait_until_time_pause)
- countwait_untilelement_to_be_presented_by_id += 1
+ except Exception as exception:
+ continue
raise Exception("Page loading took too much time")
diff --git a/services/frontend/fe_checklist.py b/services/frontend/fe_checklist.py
index 75f957a..3afc472 100644
--- a/services/frontend/fe_checklist.py
+++ b/services/frontend/fe_checklist.py
@@ -600,7 +600,7 @@ class FEChecklist:
if checklist_uuid is None:
checklist_uuid = DBGeneral.select_where_not_and_order_by_desc(
'uuid', Constants.DBConstants.IceTables.CHECKLIST, 'name', checklistName, 'state', Constants.ChecklistStates.Archive.TEXT, 'create_time')[0]
- Click.id("checklist-" + checklist_uuid)
+ Click.id("checklist-" + checklist_uuid, True)
@staticmethod
def validate_reject_is_enabled():
@@ -752,4 +752,3 @@ class FEChecklist:
"Jenkins log could not be viewed.")
Click.id(Constants.Dashboard.Modal.CLOSE_BUTTON_ID)
return log
-
diff --git a/services/frontend/fe_dashboard.py b/services/frontend/fe_dashboard.py
index 0df66d4..b8d51b5 100644
--- a/services/frontend/fe_dashboard.py
+++ b/services/frontend/fe_dashboard.py
@@ -1,5 +1,5 @@
-
-# ============LICENSE_START==========================================
+
+# ============LICENSE_START==========================================
# org.onap.vvp/test-engine
# ===================================================================
# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
@@ -221,21 +221,21 @@ class FEDashboard:
eng_manual_id = user_content['engagement_manual_id'] + ":"
Wait.text_by_id(engSearchID, eng_manual_id)
logger.debug("Engagement found (searched by engagement_manual_id)")
- FEGeneral.refresh()
+ FEGeneral.smart_refresh()
logger.debug("Search engagement by VF name")
# Search by VF name.
Enter.text_by_id(
Constants.Dashboard.Statuses.SearchBox.ID, user_content['vfName'])
Wait.text_by_id(engSearchID, eng_manual_id)
logger.debug("Engagement found (searched by VF name)")
- FEGeneral.refresh()
+ FEGeneral.smart_refresh()
logger.debug("Search engagement by VFC")
# Search by VFC.
Enter.text_by_id(
Constants.Dashboard.Statuses.SearchBox.ID, vfcName)
Wait.text_by_id(engSearchID, eng_manual_id)
logger.debug("Engagement found (searched by VFC)")
- FEGeneral.refresh()
+ FEGeneral.smart_refresh()
logger.debug("Negative search: search by random string")
# Search by VFC.
Enter.text_by_id(Constants.Dashboard.Statuses.SearchBox.ID,
diff --git a/services/frontend/fe_detailed_view.py b/services/frontend/fe_detailed_view.py
index bf9a5bf..556e7a7 100644
--- a/services/frontend/fe_detailed_view.py
+++ b/services/frontend/fe_detailed_view.py
@@ -1,5 +1,5 @@
-
-# ============LICENSE_START==========================================
+
+# ============LICENSE_START==========================================
# org.onap.vvp/test-engine
# ===================================================================
# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
@@ -40,7 +40,7 @@ import time
from selenium.webdriver.support.ui import Select
-from services.constants import Constants
+from services.constants import Constants, ServiceProvider
from services.database.db_general import DBGeneral
from services.frontend.base_actions.click import Click
from services.frontend.base_actions.enter import Enter
@@ -237,7 +237,7 @@ class FEDetailedView:
session.ice_driver.find_element_by_name("extRefID").click()
Enter.text_by_name("extRefID", Helper.rand_string("randomNumber"))
Select(session.ice_driver.find_element_by_id(
- Constants.Dashboard.DetailedView.VFC.Choose_Company.ID)).select_by_visible_text("AT&T")
+ Constants.Dashboard.DetailedView.VFC.Choose_Company.ID)).select_by_visible_text(ServiceProvider.MainServiceProvider)
Click.id(Constants.Dashboard.DetailedView.VFC.Save_button.ID)
return vfcName
diff --git a/services/frontend/fe_general.py b/services/frontend/fe_general.py
index c6832cb..b3f97b4 100644
--- a/services/frontend/fe_general.py
+++ b/services/frontend/fe_general.py
@@ -1,5 +1,5 @@
-
-# ============LICENSE_START==========================================
+
+# ============LICENSE_START==========================================
# org.onap.vvp/test-engine
# ===================================================================
# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
@@ -54,6 +54,7 @@ from services.session import session
logger = LoggingServiceFactory.get_logger()
+
class FEGeneral(Helper):
@staticmethod
@@ -103,16 +104,32 @@ class FEGeneral(Helper):
@staticmethod
def refresh():
- try: # Click on element in UI, by CSS locator.
+ try:
session.ice_driver.refresh()
Wait.page_has_loaded()
- # If failed - count the failure and add the error to list of errors.
except Exception as e:
errorMsg = "Could not refresh the page."
logger.error(errorMsg)
raise Exception(errorMsg, e)
@staticmethod
+ def smart_refresh():
+ session.ice_driver.refresh()
+ i = 0
+ success = False
+ while not success and i < 2:
+ try:
+ Wait.page_has_loaded()
+ success = True
+ break
+ except:
+ i += 1
+ time.sleep(1)
+ pass
+ if not success:
+ raise Exception("Failed to wait for refresh")
+
+ @staticmethod
def select_vendor_from_list(vendor):
Wait.name(Constants.Signup.Company.NAME)
Select(session.ice_driver.find_element_by_name(
diff --git a/services/frontend/fe_invite.py b/services/frontend/fe_invite.py
index 405581c..cb84262 100644
--- a/services/frontend/fe_invite.py
+++ b/services/frontend/fe_invite.py
@@ -157,7 +157,8 @@ class FEInvite:
user_content['el_email'] = engLeadEmail
uuid = DBGeneral.select_where_email(
"uuid", "ice_user_profile", user_content['email'])
- sponsor = ["AT&T", 'aaaaaa', inviteEmail, '3058000000']
+ sponsor = [ServiceProvider.MainServiceProvider, 'aaaaaa', inviteEmail,
+ '3058000000']
invitation_token = DBUser.select_invitation_token(
"invitation_token", "ice_invitation", "engagement_uuid", engagement_id, inviteEmail, 1)
signUpURLforContact = DBUser.get_contact_signup_url(
@@ -181,7 +182,8 @@ class FEInvite:
@staticmethod
def invite_x_users_and_verify_VF_appers_for_invited(user_content, engName):
- inviteEmail = Helper.rand_string('randomString') + "@intl." + ServiceProvider.email
+ inviteEmail = Helper.rand_string('randomString') + "@" \
+ + ServiceProvider.email
vflist = FEInvite.create_x_vfs(user_content, engName, x=3)
for vfName in vflist:
# Fetch one AT&T user ID.
@@ -192,7 +194,8 @@ class FEInvite:
engName = engagement_manual_id + ": " + vfName
vf_left_nav_id = "clickable-" + engName
Click.id(vf_left_nav_id)
- FEWizard.invite_team_members_modal(inviteEmail)
+ FEWizard.invite_team_members_modal(inviteEmail,
+ wait_modal_to_disappear=False)
FEGeneral.refresh()
# validations
FEInvite.validations_for_user2(user_content, inviteEmail, vflist)
diff --git a/services/frontend/fe_overview.py b/services/frontend/fe_overview.py
index 8d05f0c..3764e2c 100644
--- a/services/frontend/fe_overview.py
+++ b/services/frontend/fe_overview.py
@@ -58,6 +58,7 @@ from services.session import session
logger = LoggingServiceFactory.get_logger()
+
class FEOverview:
@staticmethod
@@ -65,9 +66,9 @@ class FEOverview:
vfFullName = user_content[
'engagement_manual_id'] + ": " + user_content['vfName']
Enter.text_by_id(Constants.Dashboard.LeftPanel.SearchBox.ID, user_content[
- 'vfName'])
+ 'vfName'], True)
Click.id(Constants.Dashboard.LeftPanel.SearchBox.Results.ID %
- user_content['vfName'])
+ user_content['vfName'], True)
Wait.text_by_id(
Constants.Dashboard.Overview.Title.ID, vfFullName)
diff --git a/services/frontend/fe_user.py b/services/frontend/fe_user.py
index 91cf7eb..eb25d23 100644
--- a/services/frontend/fe_user.py
+++ b/services/frontend/fe_user.py
@@ -1,5 +1,5 @@
-
-# ============LICENSE_START==========================================
+
+# ============LICENSE_START==========================================
# org.onap.vvp/test-engine
# ===================================================================
# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
@@ -55,6 +55,7 @@ from services.session import session
logger = LoggingServiceFactory.get_logger()
+
class FEUser:
@staticmethod
@@ -343,24 +344,24 @@ class FEUser:
def validate_user_profile_settings_checkboxes(checked):
Wait.page_has_loaded()
receive_emails = Get.is_selected_by_id(
- Constants.Dashboard.Avatar.Account.UserProfileSettings.ReceiveEmailsID, wait_for_page=True)
+ Constants.Dashboard.Avatar.Account.UserProfileSettings.ReceiveEmailsID, True)
Helper.internal_assert(receive_emails, checked)
receive_notifications = \
Get.is_selected_by_id(
- Constants.Dashboard.Avatar.Account.UserProfileSettings.ReceiveNotificationsID)
+ Constants.Dashboard.Avatar.Account.UserProfileSettings.ReceiveNotificationsID, True)
receive_email_every_time = \
Get.is_selected_by_id(
- Constants.Dashboard.Avatar.Account.UserProfileSettings.ReceiveEmailEveryTimeID)
+ Constants.Dashboard.Avatar.Account.UserProfileSettings.ReceiveEmailEveryTimeID, True)
Helper.internal_assert(receive_email_every_time, checked)
receive_digest_email = \
Get.is_selected_by_id(
- Constants.Dashboard.Avatar.Account.UserProfileSettings.ReceiveDigestEmailID, wait_for_page=True)
+ Constants.Dashboard.Avatar.Account.UserProfileSettings.ReceiveDigestEmailID, True)
Helper.internal_assert(receive_digest_email, not checked)
@staticmethod
def compare_notifications_count_for_user(expected_count):
Wait.text_by_id(
- Constants.Dashboard.Avatar.Notifications.Count.ID, expected_count, wait_for_page=True)
+ Constants.Dashboard.Avatar.Notifications.Count.ID, expected_count, True)
@staticmethod
def check_notification_number_is_not_presented():
diff --git a/services/frontend/fe_wizard.py b/services/frontend/fe_wizard.py
index 777fe52..df5087d 100644
--- a/services/frontend/fe_wizard.py
+++ b/services/frontend/fe_wizard.py
@@ -51,6 +51,7 @@ from services.session import session
logger = LoggingServiceFactory.get_logger()
+
class FEWizard:
E2Edate = None
@@ -126,7 +127,7 @@ class FEWizard:
Enter.text_by_name("phone", phone)
Click.css(Constants.SubmitButton.CSS)
Wait.name_to_dissappear("Add AT&T Sponsor")
- sponsor = {"company": "AT&T", "full_name": fullname,
+ sponsor = {"company": ServiceProvider.MainServiceProvider, "full_name": fullname,
"email": email, "phone": phone}
return sponsor
@@ -179,7 +180,7 @@ class FEWizard:
raise Exception(errorMsg)
@staticmethod
- def invite_team_members_modal(email):
+ def invite_team_members_modal(email, wait_modal_to_disappear=True):
try:
Click.id(
Constants.Dashboard.Overview.TeamMember.ID, wait_for_page=True)
@@ -188,11 +189,9 @@ class FEWizard:
Enter.text_by_name("email", email)
Wait.text_by_css(
Constants.SubmitButton.CSS, Constants.Dashboard.Wizard.InviteTeamMembers.Button.TEXT)
- Wait.css_to_dissappear(
- '.inviteMembers-form button[disabled="disabled"].btn.btn-primary')
- Wait.css(".inviteMembers-form button.btn.btn-primary")
- Click.css(".inviteMembers-form button.btn.btn-primary")
- Wait.modal_to_dissappear()
+ Click.css(".inviteMembers-form button.btn.btn-primary", True)
+ if wait_modal_to_disappear:
+ Wait.modal_to_dissappear()
# If failed - count the failure and add the error to list of errors.
except Exception as e:
errorMsg = "FAILED in PopUp Invite Team Members. Exception=" + \
diff --git a/services/helper.py b/services/helper.py
index 58769fe..418b260 100644
--- a/services/helper.py
+++ b/services/helper.py
@@ -1,5 +1,5 @@
-# ============LICENSE_START==========================================
+# ============LICENSE_START==========================================
# org.onap.vvp/test-engine
# ===================================================================
# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
@@ -54,157 +54,166 @@ from services.logging_service import LoggingServiceFactory
from utils.authentication import JWTAuthentication
-logger = LoggingServiceFactory.get_logger()
-
-class Helper:
-
- tc = unittest.TestCase('__init__')
-
- @staticmethod
- def rand_string(type_of_value='randomString', numberOfDigits=0):
- letters_and_numbers = string.ascii_letters + string.digits
- if type_of_value == 'email':
- myEmail = ''.join(random.choice(letters_and_numbers) for _ in range(
- 4)) + "@" + ''.join(random.choice(string.ascii_uppercase) for _ in range(4)) + ".com"
- return "ST" + myEmail
- elif type_of_value == 'randomNumber':
- randomNumber = ''.join("%s" % random.randint(2, 9)
- for _ in range(0, (numberOfDigits + 1)))
- return randomNumber
- elif type_of_value == 'randomString':
- randomString = "".join(random.sample(letters_and_numbers, 5))
- return "ST" + randomString
- else:
- raise Exception("invalid rand type")
-
- @staticmethod
- def rand_invite_email():
- inviteEmail = "automationqatt" + \
- Helper.rand_string("randomString") + "@gmail.com"
- return inviteEmail
-
- @staticmethod
- def generate_sshpub_key():
- try:
- logger.debug("About to generate SSH Public Key")
- key = rsa.generate_private_key(
- public_exponent=65537,
- key_size=2048,
- backend=default_backend()
- )
- private_key = key.private_bytes(
- encoding=crypto_serialization.Encoding.PEM,
- format=crypto_serialization.PrivateFormat.TraditionalOpenSSL,
- encryption_algorithm=crypto_serialization.NoEncryption())
- public_key = key.public_key().public_bytes(
- crypto_serialization.Encoding.OpenSSH,
- crypto_serialization.PublicFormat.OpenSSH
- ).decode("utf-8")
-
- logger.debug("Generated SSH Public Key: " + public_key)
- except Exception as e: # If failed write to log the error and return 'None'.
- logger.error(e)
- logger.error("Failed to generate SSH Public Key.")
- raise e
- return public_key
-
- @staticmethod
- def check_admin_ssh_existence(path, admin_ssh):
- if admin_ssh == open(path).read().rstrip('\n'):
- logger.debug(
- "Admin SSH already defined in DB and equal to the one stored on the local system.")
- return True
- return False
-
- @staticmethod
- def get_or_create_rsa_key_for_admin():
- try: # Create pair of keys for the given user and return his public key.
- ssh_folder = Constants.Paths.SSH.PATH
- public_file = ssh_folder + "id_rsa.pub"
- privateFile = ssh_folder + "id_rsa"
- admin_ssh_exist_and_equal = False
- admin_ssh = None
- if not os.path.exists(ssh_folder):
- os.makedirs(ssh_folder)
- elif os.path.exists(public_file):
- admin_ssh = DBUser.retrieve_admin_ssh_from_db()
- admin_ssh_exist_and_equal = Helper.check_admin_ssh_existence(
- public_file, admin_ssh)
- # TODO find pending gitlab bug causing old ssh key not be updated in gitlab cache
- if False and admin_ssh_exist_and_equal:
- return admin_ssh
- else:
- logger.debug("Private key file: " + privateFile +
- "\nPublice key file: " + public_file)
- key = rsa.generate_private_key(
- backend=crypto_default_backend(),
- public_exponent=65537,
- key_size=2048
- )
- private_key = key.private_bytes(
- crypto_serialization.Encoding.PEM,
- crypto_serialization.PrivateFormat.PKCS8,
- crypto_serialization.NoEncryption()).decode("utf-8")
- public_key = key.public_key().public_bytes(
- crypto_serialization.Encoding.OpenSSH,
- crypto_serialization.PublicFormat.OpenSSH
- ).decode("utf-8")
-
- with open(privateFile, 'w') as content_file:
- os.chmod(privateFile, 0o600)
- content_file.write(private_key) # Save private key to file.
- logger.debug(
- "Private key created successfully for admin")
- user_pub_key = public_key
- with open(public_file, 'w') as content_file:
- content_file.write(public_key) # Save public key to file.
- logger.debug("Public key created successfully for admin")
- cmd = 'ssh-keyscan ' + \
- settings.GITLAB_URL[7:-1] + ' >> ' + \
- ssh_folder + 'known_hosts'
- # Create known_hosts file and add GitLab to it.
- subprocess.Popen(cmd, shell=True, stderr=subprocess.PIPE)
- logger.debug("Added GitLab to known_hosts")
- return user_pub_key
- except Exception as error:
- logger.error(
- "_-_-_-_-_- Unexpected error in get_or_create_rsa_key_for_admin: %s" % error)
- raise Exception("Failed to create SSH keys for user admin", error)
-
- @staticmethod
- def internal_assert(x, y):
- try:
- Helper.tc.assertEqual(str(x), str(y))
- except Exception as e:
- raise Exception("AssertionError: \"" + str(x) +
- "\" != \"" + str(y) + "\"", e)
-
- @staticmethod
- def internal_assert_boolean(x, y):
- try:
- Helper.tc.assertEqual(str(x), str(y))
- return True
- except Exception as e:
- raise Exception("AssertionError: \"" + str(x) +
- "\" != \"" + str(y) + "\"", e)
-
- @staticmethod
- def internal_not_equal(x, y):
- try:
- Helper.tc.assertNotEqual(x, y)
- except Exception as e:
- raise Exception("AssertionError: \"" + str(x) +
- "\" != \"" + str(y) + "\"", e)
-
- @staticmethod
- def get_reset_passw_url(email):
- jwtObj = JWTAuthentication()
- token = jwtObj.createPersonalTokenWithExpiration(email)
- resetPasswURL = Constants.Default.LoginURL.TEXT + "?t=" + token
- return resetPasswURL
-
- @staticmethod
- def assertTrue(expr, msg=None):
- """Check that the expression is true."""
- if expr != True:
- raise Exception("AssertionError: \"not expr")
+logger = LoggingServiceFactory.get_logger()
+
+
+class Helper:
+
+ tc = unittest.TestCase('__init__')
+
+ @staticmethod
+ def rand_string(type_of_value='randomString', numberOfDigits=0):
+ letters_and_numbers = string.ascii_letters + string.digits
+ if type_of_value == 'email':
+ myEmail = ''.join(random.choice(letters_and_numbers) for _ in range(
+ 4)) + "@" + ''.join(random.choice(string.ascii_uppercase) for _ in range(4)) + ".com"
+ return "ST" + myEmail
+ elif type_of_value == 'randomNumber':
+ randomNumber = ''.join("%s" % random.randint(2, 9)
+ for _ in range(0, (numberOfDigits + 1)))
+ return randomNumber
+ elif type_of_value == 'randomString':
+ randomString = "".join(random.sample(letters_and_numbers, 5))
+ return "ST" + randomString
+ else:
+ raise Exception("invalid rand type")
+
+ @staticmethod
+ def rand_invite_email():
+ inviteEmail = "automationqatt" + \
+ Helper.rand_string("randomString") + "@gmail.com"
+ return inviteEmail
+
+ @staticmethod
+ def generate_sshpub_key():
+ try:
+ logger.debug("About to generate SSH Public Key")
+ key = rsa.generate_private_key(
+ public_exponent=65537,
+ key_size=2048,
+ backend=default_backend()
+ )
+ private_key = key.private_bytes(
+ encoding=crypto_serialization.Encoding.PEM,
+ format=crypto_serialization.PrivateFormat.TraditionalOpenSSL,
+ encryption_algorithm=crypto_serialization.NoEncryption())
+ public_key = key.public_key().public_bytes(
+ crypto_serialization.Encoding.OpenSSH,
+ crypto_serialization.PublicFormat.OpenSSH
+ ).decode("utf-8")
+
+ logger.debug("Generated SSH Public Key: " + public_key)
+ # If failed write to log the error and return 'None'.
+ except Exception as e:
+ logger.error(e)
+ logger.error("Failed to generate SSH Public Key.")
+ raise e
+ return public_key
+
+ @staticmethod
+ def check_admin_ssh_existence(path, admin_ssh):
+ if admin_ssh == open(path).read().rstrip('\n'):
+ logger.debug(
+ "Admin SSH already defined in DB and equal to the one stored on the local system.")
+ return True
+ return False
+
+ @staticmethod
+ def get_or_create_rsa_key_for_admin():
+ # Create pair of keys for the given user and return his public key.
+ try:
+ ssh_folder = Constants.Paths.SSH.PATH
+ public_file = ssh_folder + "id_rsa.pub"
+ privateFile = ssh_folder + "id_rsa"
+ admin_ssh_exist_and_equal = False
+ admin_ssh = None
+ if not os.path.exists(ssh_folder):
+ os.makedirs(ssh_folder)
+ elif os.path.exists(public_file):
+ admin_ssh = DBUser.retrieve_admin_ssh_from_db()
+ admin_ssh_exist_and_equal = Helper.check_admin_ssh_existence(
+ public_file, admin_ssh)
+ # TODO find pending gitlab bug causing old ssh key not be updated
+ # in gitlab cache
+ if False and admin_ssh_exist_and_equal:
+ return admin_ssh
+ else:
+ logger.debug("Private key file: " + privateFile +
+ "\nPublice key file: " + public_file)
+ key = rsa.generate_private_key(
+ backend=crypto_default_backend(),
+ public_exponent=65537,
+ key_size=2048
+ )
+ private_key = key.private_bytes(
+ crypto_serialization.Encoding.PEM,
+ crypto_serialization.PrivateFormat.PKCS8,
+ crypto_serialization.NoEncryption()).decode("utf-8")
+ public_key = key.public_key().public_bytes(
+ crypto_serialization.Encoding.OpenSSH,
+ crypto_serialization.PublicFormat.OpenSSH
+ ).decode("utf-8")
+
+ with open(privateFile, 'w') as content_file:
+ os.chmod(privateFile, 0o600)
+ # Save private key to file.
+ content_file.write(private_key)
+ logger.debug(
+ "Private key created successfully for admin")
+ user_pub_key = public_key
+ with open(public_file, 'w') as content_file:
+ content_file.write(public_key) # Save public key to file.
+ logger.debug("Public key created successfully for admin")
+ cmd = 'ssh-keyscan ' + \
+ settings.GITLAB_URL[7:-1] + ' >> ' + \
+ ssh_folder + 'known_hosts'
+ # Create known_hosts file and add GitLab to it.
+ subprocess.Popen(cmd, shell=True, stderr=subprocess.PIPE)
+ logger.debug("Added GitLab to known_hosts")
+ return user_pub_key
+ except Exception as error:
+ logger.error(
+ "_-_-_-_-_- Unexpected error in get_or_create_rsa_key_for_admin: %s" % error)
+ raise Exception("Failed to create SSH keys for user admin", error)
+
+ @staticmethod
+ def internal_assert(x, y):
+ try:
+ Helper.tc.assertEqual(str(x), str(y))
+ except Exception as e:
+ raise Exception("AssertionError: \"" + str(x) +
+ "\" != \"" + str(y) + "\"", e)
+
+ @staticmethod
+ def internal_assert_boolean(x, y):
+ try:
+ Helper.tc.assertEqual(str(x), str(y))
+ return True
+ except Exception as e:
+ raise Exception("AssertionError: \"" + str(x) +
+ "\" != \"" + str(y) + "\"", e)
+
+ @staticmethod
+ def internal_assert_boolean_true_false(x, y):
+ return Helper.tc.assertEqual(str(x), str(y))
+
+ @staticmethod
+ def internal_not_equal(x, y):
+ try:
+ Helper.tc.assertNotEqual(x, y)
+ except Exception as e:
+ raise Exception("AssertionError: \"" + str(x) +
+ "\" != \"" + str(y) + "\"", e)
+
+ @staticmethod
+ def get_reset_passw_url(email):
+ jwtObj = JWTAuthentication()
+ token = jwtObj.createPersonalTokenWithExpiration(email)
+ resetPasswURL = Constants.Default.LoginURL.TEXT + "?t=" + token
+ return resetPasswURL
+
+ @staticmethod
+ def assertTrue(expr, msg=None):
+ """Check that the expression is true."""
+ if expr != True:
+ raise Exception("AssertionError: \"not expr")