diff options
author | Edan Binshtok <eb578m@intl.att.com> | 2017-10-18 07:56:58 +0300 |
---|---|---|
committer | Edan Binshtok <eb578m@intl.att.com> | 2017-10-18 07:56:58 +0300 |
commit | 433a8256e31f755f5e236491bbe39d3db24d6d6d (patch) | |
tree | 45f483eab1ea1654ee21a3b51c5b8bf1a8ebaffa /services | |
parent | f8907f0c4fc0ba4bb97a1d636a50c5b40c2642f2 (diff) |
Align CI test test and JJB
Add vendor agnostic CI test to align
Add Tox and maven docker
Issue Id: VVP-15
Change-Id: I69f0c1036e6f72b62bddc822544c55200af7b37d
Signed-off-by: Edan Binshtok <eb578m@intl.att.com>
Diffstat (limited to 'services')
-rw-r--r-- | services/api/api_checklist.py | 22 | ||||
-rw-r--r-- | services/api/api_gitlab.py | 52 | ||||
-rw-r--r-- | services/api/api_jenkins.py | 23 | ||||
-rw-r--r-- | services/constants.py | 27 | ||||
-rw-r--r-- | services/database/db_checklist.py | 18 | ||||
-rw-r--r-- | services/frontend/base_actions/get.py | 22 | ||||
-rw-r--r-- | services/frontend/base_actions/wait.py | 19 | ||||
-rw-r--r-- | services/frontend/fe_checklist.py | 3 | ||||
-rw-r--r-- | services/frontend/fe_dashboard.py | 10 | ||||
-rw-r--r-- | services/frontend/fe_detailed_view.py | 8 | ||||
-rw-r--r-- | services/frontend/fe_general.py | 25 | ||||
-rw-r--r-- | services/frontend/fe_invite.py | 9 | ||||
-rw-r--r-- | services/frontend/fe_overview.py | 5 | ||||
-rw-r--r-- | services/frontend/fe_user.py | 15 | ||||
-rw-r--r-- | services/frontend/fe_wizard.py | 13 | ||||
-rw-r--r-- | services/helper.py | 319 |
16 files changed, 328 insertions, 262 deletions
diff --git a/services/api/api_checklist.py b/services/api/api_checklist.py index ef7b8a3..fda0730 100644 --- a/services/api/api_checklist.py +++ b/services/api/api_checklist.py @@ -1,5 +1,5 @@ - -# ============LICENSE_START========================================== + +# ============LICENSE_START========================================== # org.onap.vvp/test-engine # =================================================================== # Copyright © 2017 AT&T Intellectual Property. All rights reserved. @@ -47,6 +47,7 @@ from services.constants import Constants from services.database.db_general import DBGeneral from services.helper import Helper from services.logging_service import LoggingServiceFactory +from services.database.db_checklist import DBChecklist logger = LoggingServiceFactory.get_logger() @@ -208,12 +209,21 @@ class APIChecklist: @staticmethod def move_cl_to_closed(cl_uuid, vf_staff_emails): api_checklist_obj = APIChecklist() - + states = [Constants.ChecklistStates.PeerReview.TEXT, + Constants.ChecklistStates.Approval.TEXT, + Constants.ChecklistStates.Handoff.TEXT, + Constants.ChecklistStates.Closed.TEXT] for i in range(len(vf_staff_emails)): - logger.debug("Trying to jump state for %s [%s]" % (vf_staff_emails[i], i)) + logger.debug( + "Trying to jump state for %s [%s]" % (vf_staff_emails[i], i)) + DBChecklist.update_all_decisions_to_approve(cl_uuid) api_checklist_obj.jump_state(cl_uuid, vf_staff_emails[i]) + logger.debug("Checking state changed to %s" % states[i]) + DBChecklist.state_changed("uuid", cl_uuid, states[i]) # Move CL to closed state. - logger.debug("Trying to jump state 'closed' for %s" % vf_staff_emails[0]) + logger.debug("Trying to jump state 'closed' for %s" % + vf_staff_emails[0]) api_checklist_obj.jump_state(cl_uuid, vf_staff_emails[0]) - + logger.debug("Checking state changed to %s" % states[-1]) + DBChecklist.state_changed("uuid", cl_uuid, states[-1]) diff --git a/services/api/api_gitlab.py b/services/api/api_gitlab.py index c7b25e0..6c5a2ff 100644 --- a/services/api/api_gitlab.py +++ b/services/api/api_gitlab.py @@ -1,5 +1,5 @@ - -# ============LICENSE_START========================================== + +# ============LICENSE_START========================================== # org.onap.vvp/test-engine # =================================================================== # Copyright © 2017 AT&T Intellectual Property. All rights reserved. @@ -56,6 +56,7 @@ from services.session import session logger = LoggingServiceFactory.get_logger() + class APIGitLab: @staticmethod @@ -79,13 +80,13 @@ class APIGitLab: headers['PRIVATE-TOKEN'] = settings.GITLAB_TOKEN try: r1 = requests.get(getURL, headers=headers, verify=False) - Helper.internal_assert(r1.status_code, 200) counter = 0 - while r1.content == b'[]' and counter <= Constants.GitLabConstants.RETRIES_NUMBER: + while r1.status_code == 404 or r1.content == b'[]' and counter <= Constants.GitLabConstants.RETRIES_NUMBER: time.sleep(session.wait_until_time_pause) r1 = requests.get(getURL, headers=headers, verify=False) - Helper.internal_assert(r1.status_code, 200) - + logger.debug( + "trying to get the git project, yet to succeed (try #%s)" % counter) + counter += 1 if r1.content == b'[]': logger.error("Got an empty list as a response.") raise @@ -363,32 +364,15 @@ class APIGitLab: @staticmethod def is_gitlab_ready(user_content): - counter = 1 - gettURL = settings.ICE_EM_URL + '/v1/engmgr/engagement/' + \ - user_content['engagement_uuid'] + '/checklist/new/' - logger.debug( - "Get URL to check if GitLab and Jenkins are ready: " + gettURL) - # Validate with EL - token = "token " + APIBridge.login_user(user_content['el_email']) - headers = dict() # Create header for get request. - headers['Content-type'] = 'application/json' - headers['Authorization'] = token - r1 = requests.get(gettURL, headers=headers, verify=False) - while (r1.content == b'"Create New checklist is not ready yet"' and counter <= - Constants.GitLabConstants.RETRIES_NUMBER): - time.sleep(session.wait_until_time_pause_long) - logger.debug( - "GitLab and Jenkins are not ready yet, trying again (%s of %s)" % - (counter, Constants.GitLabConstants.RETRIES_NUMBER)) - r1 = requests.get(gettURL, headers=headers, verify=False) - counter += 1 - if r1.status_code != 200: - if r1.content == "Create New checklist is not ready yet": - raise Exception("Max retries exceeded, failing test...") - else: - raise Exception("Something went wrong while waiting for GitLab and Jenkins. %s %s" % ( - r1.status_code, r1.reason)) - return False - elif r1.status_code == 200: - logger.debug("Gitlab and Jenkins are ready to continue!") + path_with_namespace = user_content[ + 'engagement_manual_id'] + "%2F" + user_content['vfName'] + # If admin user is in project repo, it means the project exists as + # well. + cont = APIGitLab.validate_git_project_members( + path_with_namespace, Constants.Users.Admin.EMAIL) + if cont: + logger.debug("Gitlab is ready for action, git repo was found!") return True + else: + raise Exception( + "Something went wrong while waiting for GitLab.") diff --git a/services/api/api_jenkins.py b/services/api/api_jenkins.py index e1e1f6e..b63cb66 100644 --- a/services/api/api_jenkins.py +++ b/services/api/api_jenkins.py @@ -1,5 +1,5 @@ - -# ============LICENSE_START========================================== + +# ============LICENSE_START========================================== # org.onap.vvp/test-engine # =================================================================== # Copyright © 2017 AT&T Intellectual Property. All rights reserved. @@ -37,26 +37,36 @@ # # ECOMP is a trademark and service mark of AT&T Intellectual Property. from django.conf import settings -import requests from requests.auth import HTTPBasicAuth - from services.constants import Constants from services.helper import Helper -from services.logging_service import LoggingServiceFactory +import logging +import requests +import time +from services.session import session + +logger = logging.getLogger('ice-ci.logger') -logger = LoggingServiceFactory.get_logger() class APIJenkins: @staticmethod def get_jenkins_job(job_name): r1 = None + counter = 0 getURL = settings.JENKINS_URL + "job/" + job_name logger.debug("Get APIJenkins job URL: " + getURL) try: r1 = requests.get(getURL, auth=HTTPBasicAuth( settings.JENKINS_USERNAME, settings.JENKINS_PASSWORD)) + while r1.status_code != 200 and counter <= Constants.GitLabConstants.RETRIES_NUMBER: + r1 = requests.get(getURL, auth=HTTPBasicAuth( + settings.JENKINS_USERNAME, settings.JENKINS_PASSWORD)) + time.sleep(session.wait_until_time_pause) + logger.debug( + "try to get jenkins job (try #%s)" % counter) + counter += 1 Helper.internal_assert(r1.status_code, 200) logger.debug("Job was created on APIJenkins!") except: @@ -78,4 +88,3 @@ class APIJenkins: if Constants.Dashboard.Checklist.JenkinsLog.Modal.Body.BUILD_IDENTIFIER in line: parts = line.partition('jenkins') return parts[2] - diff --git a/services/constants.py b/services/constants.py index 0be529f..23d7ef1 100644 --- a/services/constants.py +++ b/services/constants.py @@ -42,7 +42,7 @@ from django.conf import settings class ServiceProvider: PROGRAM_NAME = "VVP" MainServiceProvider = "ServiceProvider" - email = "example.com" + email = "example-domain.com" class Constants: @@ -89,9 +89,21 @@ class Constants: class Review: TEXT = "review" + class PeerReview: + TEXT = "peer_review" + + class Approval: + TEXT = "approval" + + class Handoff: + TEXT = "handoff" + class Archive: TEXT = "archive" + class Closed: + TEXT = "closed" + class FEConstants: RETRIES_NUMBER = 120 @@ -108,14 +120,15 @@ class Constants: class Users: class Admin: - EMAIL = "admin@example.com" + EMAIL = "admin@" + ServiceProvider.email FULLNAME = "admin bogus user" class AdminRO: - EMAIL = "admin_ro@example.com" + EMAIL = "admin_ro@" + ServiceProvider.email class LongEmailLengthStandardUser: - EMAIL = "50charslengthemailofstandarduserforinvite@example.com" + EMAIL = "50charslengthemailofstandardus@" + \ + ServiceProvider.email class Toast: ID = "toast-successfully-message" @@ -745,9 +758,9 @@ class Constants: class FilterByFileDropDown: ID = "selected-file-filter-dropdown" ANY_FILE_LINK_TEXT = "Any file" - FILE0_LINK_TEXT = "file0.yaml" - FILE1_LINK_TEXT = "file1.yaml" - FILE2_LINK_TEXT = "file2.yaml" + FILE0_LINK_TEXT = "file0" + FILE1_LINK_TEXT = "file1" + FILE2_LINK_TEXT = "file2" class StateDropDown: ID = "selected-state-filter-dropdown" diff --git a/services/database/db_checklist.py b/services/database/db_checklist.py index 15851cf..04f8a44 100644 --- a/services/database/db_checklist.py +++ b/services/database/db_checklist.py @@ -1,5 +1,5 @@ - -# ============LICENSE_START========================================== + +# ============LICENSE_START========================================== # org.onap.vvp/test-engine # =================================================================== # Copyright © 2017 AT&T Intellectual Property. All rights reserved. @@ -50,6 +50,7 @@ from services.session import session logger = LoggingServiceFactory.get_logger() + class DBChecklist: @staticmethod @@ -361,16 +362,18 @@ class DBChecklist: @staticmethod def state_changed(identify_field, field_value, expected_state): - get_state = str(DBGeneral.select_where( - "state", Constants.DBConstants.IceTables.CHECKLIST, identify_field, field_value, 1)) + get_state = DBGeneral.select_where_order_by_desc( + "state", Constants.DBConstants.IceTables.CHECKLIST, + identify_field, field_value, "create_time")[0] counter = 0 while get_state != expected_state and counter <= Constants.DBConstants.RETRIES_NUMBER: time.sleep(session.wait_until_time_pause_long) logger.debug("Checklist state not changed yet , expecting state: %s, current result: %s (attempt %s of %s)" % ( expected_state, get_state, counter, Constants.DBConstants.RETRIES_NUMBER)) counter += 1 - get_state = str(DBGeneral.select_where( - "state", Constants.DBConstants.IceTables.CHECKLIST, identify_field, field_value, 1)) + get_state = DBGeneral.select_where_order_by_desc( + "state", Constants.DBConstants.IceTables.CHECKLIST, + identify_field, field_value, "create_time")[0] if get_state == expected_state: logger.debug("Checklist state was successfully changed into: " + @@ -382,5 +385,6 @@ class DBChecklist: @staticmethod def get_recent_checklist_uuid(name): required_uuid = DBGeneral.select_where_not_and_order_by_desc( - 'uuid', Constants.DBConstants.IceTables.CHECKLIST, 'name', name, 'state', Constants.ChecklistStates.Archive.TEXT, 'create_time') + 'uuid', Constants.DBConstants.IceTables.CHECKLIST, 'name', name, + 'state', Constants.ChecklistStates.Archive.TEXT, 'create_time') return required_uuid diff --git a/services/frontend/base_actions/get.py b/services/frontend/base_actions/get.py index 8735c1b..5fb801a 100644 --- a/services/frontend/base_actions/get.py +++ b/services/frontend/base_actions/get.py @@ -1,5 +1,5 @@ - -# ============LICENSE_START========================================== + +# ============LICENSE_START========================================== # org.onap.vvp/test-engine # =================================================================== # Copyright © 2017 AT&T Intellectual Property. All rights reserved. @@ -37,6 +37,7 @@ # # ECOMP is a trademark and service mark of AT&T Intellectual Property. from services.frontend.base_actions.wait import Wait +from services.helper import Helper from services.session import session @@ -98,8 +99,10 @@ class Get: raise Exception(errorMsg, attr_name_value) @staticmethod - def value_by_name(attr_name_value): + def value_by_name(attr_name_value, wait_for_page=False): try: + if wait_for_page: + Wait.page_has_loaded() Wait.name(attr_name_value) return session.ice_driver.find_element_by_name(attr_name_value).get_attribute("value") except Exception as e: @@ -125,3 +128,16 @@ class Get: except Exception as e: errorMsg = "Failed to get if it's selected by id:" + attr_id_value raise Exception(errorMsg, attr_id_value) + + @staticmethod + def is_checkbox_selected_by_id(attr_id_value, wait_for_page=False): + try: + if wait_for_page: + Wait.page_has_loaded() + Wait.id(attr_id_value) + return Helper.internal_assert_boolean_true_false( + session.ice_driver.find_element_by_id( + attr_id_value).get_attribute("value"), "on") + except Exception as e: + errorMsg = "Failed to get if it's selected by id:" + attr_id_value + raise Exception(errorMsg, attr_id_value) diff --git a/services/frontend/base_actions/wait.py b/services/frontend/base_actions/wait.py index 50eff08..a699917 100644 --- a/services/frontend/base_actions/wait.py +++ b/services/frontend/base_actions/wait.py @@ -196,17 +196,18 @@ class Wait: @staticmethod def page_has_loaded(): - countwait_untilelement_to_be_presented_by_id = 0 for _ in range(Constants.FEConstants.RETRIES_NUMBER): - httpRequests = session.ice_driver.execute_script( - 'return window.angular ? window.angular.element("body").injector().get("$http").pendingRequests.length : 1;') - if(str(httpRequests) == "0"): + try: + httpRequests = session.ice_driver.execute_script( + 'return window.angular ? window.angular.element("body").injector().get("$http").pendingRequests.length : 1;') + if(str(httpRequests) == "0"): + time.sleep(session.wait_until_time_pause) + return + logger.debug( + "Checking if {} page is loaded. ".format(session.ice_driver.current_url)) time.sleep(session.wait_until_time_pause) - return - logger.debug( - "Checking if {} page is loaded. ".format(session.ice_driver.current_url)) - time.sleep(session.wait_until_time_pause) - countwait_untilelement_to_be_presented_by_id += 1 + except Exception as exception: + continue raise Exception("Page loading took too much time") diff --git a/services/frontend/fe_checklist.py b/services/frontend/fe_checklist.py index 75f957a..3afc472 100644 --- a/services/frontend/fe_checklist.py +++ b/services/frontend/fe_checklist.py @@ -600,7 +600,7 @@ class FEChecklist: if checklist_uuid is None: checklist_uuid = DBGeneral.select_where_not_and_order_by_desc( 'uuid', Constants.DBConstants.IceTables.CHECKLIST, 'name', checklistName, 'state', Constants.ChecklistStates.Archive.TEXT, 'create_time')[0] - Click.id("checklist-" + checklist_uuid) + Click.id("checklist-" + checklist_uuid, True) @staticmethod def validate_reject_is_enabled(): @@ -752,4 +752,3 @@ class FEChecklist: "Jenkins log could not be viewed.") Click.id(Constants.Dashboard.Modal.CLOSE_BUTTON_ID) return log - diff --git a/services/frontend/fe_dashboard.py b/services/frontend/fe_dashboard.py index 0df66d4..b8d51b5 100644 --- a/services/frontend/fe_dashboard.py +++ b/services/frontend/fe_dashboard.py @@ -1,5 +1,5 @@ - -# ============LICENSE_START========================================== + +# ============LICENSE_START========================================== # org.onap.vvp/test-engine # =================================================================== # Copyright © 2017 AT&T Intellectual Property. All rights reserved. @@ -221,21 +221,21 @@ class FEDashboard: eng_manual_id = user_content['engagement_manual_id'] + ":" Wait.text_by_id(engSearchID, eng_manual_id) logger.debug("Engagement found (searched by engagement_manual_id)") - FEGeneral.refresh() + FEGeneral.smart_refresh() logger.debug("Search engagement by VF name") # Search by VF name. Enter.text_by_id( Constants.Dashboard.Statuses.SearchBox.ID, user_content['vfName']) Wait.text_by_id(engSearchID, eng_manual_id) logger.debug("Engagement found (searched by VF name)") - FEGeneral.refresh() + FEGeneral.smart_refresh() logger.debug("Search engagement by VFC") # Search by VFC. Enter.text_by_id( Constants.Dashboard.Statuses.SearchBox.ID, vfcName) Wait.text_by_id(engSearchID, eng_manual_id) logger.debug("Engagement found (searched by VFC)") - FEGeneral.refresh() + FEGeneral.smart_refresh() logger.debug("Negative search: search by random string") # Search by VFC. Enter.text_by_id(Constants.Dashboard.Statuses.SearchBox.ID, diff --git a/services/frontend/fe_detailed_view.py b/services/frontend/fe_detailed_view.py index bf9a5bf..556e7a7 100644 --- a/services/frontend/fe_detailed_view.py +++ b/services/frontend/fe_detailed_view.py @@ -1,5 +1,5 @@ - -# ============LICENSE_START========================================== + +# ============LICENSE_START========================================== # org.onap.vvp/test-engine # =================================================================== # Copyright © 2017 AT&T Intellectual Property. All rights reserved. @@ -40,7 +40,7 @@ import time from selenium.webdriver.support.ui import Select -from services.constants import Constants +from services.constants import Constants, ServiceProvider from services.database.db_general import DBGeneral from services.frontend.base_actions.click import Click from services.frontend.base_actions.enter import Enter @@ -237,7 +237,7 @@ class FEDetailedView: session.ice_driver.find_element_by_name("extRefID").click() Enter.text_by_name("extRefID", Helper.rand_string("randomNumber")) Select(session.ice_driver.find_element_by_id( - Constants.Dashboard.DetailedView.VFC.Choose_Company.ID)).select_by_visible_text("AT&T") + Constants.Dashboard.DetailedView.VFC.Choose_Company.ID)).select_by_visible_text(ServiceProvider.MainServiceProvider) Click.id(Constants.Dashboard.DetailedView.VFC.Save_button.ID) return vfcName diff --git a/services/frontend/fe_general.py b/services/frontend/fe_general.py index c6832cb..b3f97b4 100644 --- a/services/frontend/fe_general.py +++ b/services/frontend/fe_general.py @@ -1,5 +1,5 @@ - -# ============LICENSE_START========================================== + +# ============LICENSE_START========================================== # org.onap.vvp/test-engine # =================================================================== # Copyright © 2017 AT&T Intellectual Property. All rights reserved. @@ -54,6 +54,7 @@ from services.session import session logger = LoggingServiceFactory.get_logger() + class FEGeneral(Helper): @staticmethod @@ -103,16 +104,32 @@ class FEGeneral(Helper): @staticmethod def refresh(): - try: # Click on element in UI, by CSS locator. + try: session.ice_driver.refresh() Wait.page_has_loaded() - # If failed - count the failure and add the error to list of errors. except Exception as e: errorMsg = "Could not refresh the page." logger.error(errorMsg) raise Exception(errorMsg, e) @staticmethod + def smart_refresh(): + session.ice_driver.refresh() + i = 0 + success = False + while not success and i < 2: + try: + Wait.page_has_loaded() + success = True + break + except: + i += 1 + time.sleep(1) + pass + if not success: + raise Exception("Failed to wait for refresh") + + @staticmethod def select_vendor_from_list(vendor): Wait.name(Constants.Signup.Company.NAME) Select(session.ice_driver.find_element_by_name( diff --git a/services/frontend/fe_invite.py b/services/frontend/fe_invite.py index 405581c..cb84262 100644 --- a/services/frontend/fe_invite.py +++ b/services/frontend/fe_invite.py @@ -157,7 +157,8 @@ class FEInvite: user_content['el_email'] = engLeadEmail uuid = DBGeneral.select_where_email( "uuid", "ice_user_profile", user_content['email']) - sponsor = ["AT&T", 'aaaaaa', inviteEmail, '3058000000'] + sponsor = [ServiceProvider.MainServiceProvider, 'aaaaaa', inviteEmail, + '3058000000'] invitation_token = DBUser.select_invitation_token( "invitation_token", "ice_invitation", "engagement_uuid", engagement_id, inviteEmail, 1) signUpURLforContact = DBUser.get_contact_signup_url( @@ -181,7 +182,8 @@ class FEInvite: @staticmethod def invite_x_users_and_verify_VF_appers_for_invited(user_content, engName): - inviteEmail = Helper.rand_string('randomString') + "@intl." + ServiceProvider.email + inviteEmail = Helper.rand_string('randomString') + "@" \ + + ServiceProvider.email vflist = FEInvite.create_x_vfs(user_content, engName, x=3) for vfName in vflist: # Fetch one AT&T user ID. @@ -192,7 +194,8 @@ class FEInvite: engName = engagement_manual_id + ": " + vfName vf_left_nav_id = "clickable-" + engName Click.id(vf_left_nav_id) - FEWizard.invite_team_members_modal(inviteEmail) + FEWizard.invite_team_members_modal(inviteEmail, + wait_modal_to_disappear=False) FEGeneral.refresh() # validations FEInvite.validations_for_user2(user_content, inviteEmail, vflist) diff --git a/services/frontend/fe_overview.py b/services/frontend/fe_overview.py index 8d05f0c..3764e2c 100644 --- a/services/frontend/fe_overview.py +++ b/services/frontend/fe_overview.py @@ -58,6 +58,7 @@ from services.session import session logger = LoggingServiceFactory.get_logger() + class FEOverview: @staticmethod @@ -65,9 +66,9 @@ class FEOverview: vfFullName = user_content[ 'engagement_manual_id'] + ": " + user_content['vfName'] Enter.text_by_id(Constants.Dashboard.LeftPanel.SearchBox.ID, user_content[ - 'vfName']) + 'vfName'], True) Click.id(Constants.Dashboard.LeftPanel.SearchBox.Results.ID % - user_content['vfName']) + user_content['vfName'], True) Wait.text_by_id( Constants.Dashboard.Overview.Title.ID, vfFullName) diff --git a/services/frontend/fe_user.py b/services/frontend/fe_user.py index 91cf7eb..eb25d23 100644 --- a/services/frontend/fe_user.py +++ b/services/frontend/fe_user.py @@ -1,5 +1,5 @@ - -# ============LICENSE_START========================================== + +# ============LICENSE_START========================================== # org.onap.vvp/test-engine # =================================================================== # Copyright © 2017 AT&T Intellectual Property. All rights reserved. @@ -55,6 +55,7 @@ from services.session import session logger = LoggingServiceFactory.get_logger() + class FEUser: @staticmethod @@ -343,24 +344,24 @@ class FEUser: def validate_user_profile_settings_checkboxes(checked): Wait.page_has_loaded() receive_emails = Get.is_selected_by_id( - Constants.Dashboard.Avatar.Account.UserProfileSettings.ReceiveEmailsID, wait_for_page=True) + Constants.Dashboard.Avatar.Account.UserProfileSettings.ReceiveEmailsID, True) Helper.internal_assert(receive_emails, checked) receive_notifications = \ Get.is_selected_by_id( - Constants.Dashboard.Avatar.Account.UserProfileSettings.ReceiveNotificationsID) + Constants.Dashboard.Avatar.Account.UserProfileSettings.ReceiveNotificationsID, True) receive_email_every_time = \ Get.is_selected_by_id( - Constants.Dashboard.Avatar.Account.UserProfileSettings.ReceiveEmailEveryTimeID) + Constants.Dashboard.Avatar.Account.UserProfileSettings.ReceiveEmailEveryTimeID, True) Helper.internal_assert(receive_email_every_time, checked) receive_digest_email = \ Get.is_selected_by_id( - Constants.Dashboard.Avatar.Account.UserProfileSettings.ReceiveDigestEmailID, wait_for_page=True) + Constants.Dashboard.Avatar.Account.UserProfileSettings.ReceiveDigestEmailID, True) Helper.internal_assert(receive_digest_email, not checked) @staticmethod def compare_notifications_count_for_user(expected_count): Wait.text_by_id( - Constants.Dashboard.Avatar.Notifications.Count.ID, expected_count, wait_for_page=True) + Constants.Dashboard.Avatar.Notifications.Count.ID, expected_count, True) @staticmethod def check_notification_number_is_not_presented(): diff --git a/services/frontend/fe_wizard.py b/services/frontend/fe_wizard.py index 777fe52..df5087d 100644 --- a/services/frontend/fe_wizard.py +++ b/services/frontend/fe_wizard.py @@ -51,6 +51,7 @@ from services.session import session logger = LoggingServiceFactory.get_logger() + class FEWizard: E2Edate = None @@ -126,7 +127,7 @@ class FEWizard: Enter.text_by_name("phone", phone) Click.css(Constants.SubmitButton.CSS) Wait.name_to_dissappear("Add AT&T Sponsor") - sponsor = {"company": "AT&T", "full_name": fullname, + sponsor = {"company": ServiceProvider.MainServiceProvider, "full_name": fullname, "email": email, "phone": phone} return sponsor @@ -179,7 +180,7 @@ class FEWizard: raise Exception(errorMsg) @staticmethod - def invite_team_members_modal(email): + def invite_team_members_modal(email, wait_modal_to_disappear=True): try: Click.id( Constants.Dashboard.Overview.TeamMember.ID, wait_for_page=True) @@ -188,11 +189,9 @@ class FEWizard: Enter.text_by_name("email", email) Wait.text_by_css( Constants.SubmitButton.CSS, Constants.Dashboard.Wizard.InviteTeamMembers.Button.TEXT) - Wait.css_to_dissappear( - '.inviteMembers-form button[disabled="disabled"].btn.btn-primary') - Wait.css(".inviteMembers-form button.btn.btn-primary") - Click.css(".inviteMembers-form button.btn.btn-primary") - Wait.modal_to_dissappear() + Click.css(".inviteMembers-form button.btn.btn-primary", True) + if wait_modal_to_disappear: + Wait.modal_to_dissappear() # If failed - count the failure and add the error to list of errors. except Exception as e: errorMsg = "FAILED in PopUp Invite Team Members. Exception=" + \ diff --git a/services/helper.py b/services/helper.py index 58769fe..418b260 100644 --- a/services/helper.py +++ b/services/helper.py @@ -1,5 +1,5 @@ -# ============LICENSE_START========================================== +# ============LICENSE_START========================================== # org.onap.vvp/test-engine # =================================================================== # Copyright © 2017 AT&T Intellectual Property. All rights reserved. @@ -54,157 +54,166 @@ from services.logging_service import LoggingServiceFactory from utils.authentication import JWTAuthentication -logger = LoggingServiceFactory.get_logger()
-
-class Helper:
-
- tc = unittest.TestCase('__init__')
-
- @staticmethod
- def rand_string(type_of_value='randomString', numberOfDigits=0):
- letters_and_numbers = string.ascii_letters + string.digits
- if type_of_value == 'email':
- myEmail = ''.join(random.choice(letters_and_numbers) for _ in range(
- 4)) + "@" + ''.join(random.choice(string.ascii_uppercase) for _ in range(4)) + ".com"
- return "ST" + myEmail
- elif type_of_value == 'randomNumber':
- randomNumber = ''.join("%s" % random.randint(2, 9)
- for _ in range(0, (numberOfDigits + 1)))
- return randomNumber
- elif type_of_value == 'randomString':
- randomString = "".join(random.sample(letters_and_numbers, 5))
- return "ST" + randomString
- else:
- raise Exception("invalid rand type")
-
- @staticmethod
- def rand_invite_email():
- inviteEmail = "automationqatt" + \
- Helper.rand_string("randomString") + "@gmail.com"
- return inviteEmail
-
- @staticmethod
- def generate_sshpub_key():
- try:
- logger.debug("About to generate SSH Public Key")
- key = rsa.generate_private_key(
- public_exponent=65537,
- key_size=2048,
- backend=default_backend()
- )
- private_key = key.private_bytes(
- encoding=crypto_serialization.Encoding.PEM,
- format=crypto_serialization.PrivateFormat.TraditionalOpenSSL,
- encryption_algorithm=crypto_serialization.NoEncryption())
- public_key = key.public_key().public_bytes(
- crypto_serialization.Encoding.OpenSSH,
- crypto_serialization.PublicFormat.OpenSSH
- ).decode("utf-8")
-
- logger.debug("Generated SSH Public Key: " + public_key)
- except Exception as e: # If failed write to log the error and return 'None'.
- logger.error(e)
- logger.error("Failed to generate SSH Public Key.")
- raise e
- return public_key
-
- @staticmethod
- def check_admin_ssh_existence(path, admin_ssh):
- if admin_ssh == open(path).read().rstrip('\n'):
- logger.debug(
- "Admin SSH already defined in DB and equal to the one stored on the local system.")
- return True
- return False
-
- @staticmethod
- def get_or_create_rsa_key_for_admin():
- try: # Create pair of keys for the given user and return his public key.
- ssh_folder = Constants.Paths.SSH.PATH
- public_file = ssh_folder + "id_rsa.pub"
- privateFile = ssh_folder + "id_rsa"
- admin_ssh_exist_and_equal = False
- admin_ssh = None
- if not os.path.exists(ssh_folder):
- os.makedirs(ssh_folder)
- elif os.path.exists(public_file):
- admin_ssh = DBUser.retrieve_admin_ssh_from_db()
- admin_ssh_exist_and_equal = Helper.check_admin_ssh_existence(
- public_file, admin_ssh)
- # TODO find pending gitlab bug causing old ssh key not be updated in gitlab cache
- if False and admin_ssh_exist_and_equal:
- return admin_ssh
- else:
- logger.debug("Private key file: " + privateFile +
- "\nPublice key file: " + public_file)
- key = rsa.generate_private_key(
- backend=crypto_default_backend(),
- public_exponent=65537,
- key_size=2048
- )
- private_key = key.private_bytes(
- crypto_serialization.Encoding.PEM,
- crypto_serialization.PrivateFormat.PKCS8,
- crypto_serialization.NoEncryption()).decode("utf-8")
- public_key = key.public_key().public_bytes(
- crypto_serialization.Encoding.OpenSSH,
- crypto_serialization.PublicFormat.OpenSSH
- ).decode("utf-8")
-
- with open(privateFile, 'w') as content_file:
- os.chmod(privateFile, 0o600)
- content_file.write(private_key) # Save private key to file.
- logger.debug(
- "Private key created successfully for admin")
- user_pub_key = public_key
- with open(public_file, 'w') as content_file:
- content_file.write(public_key) # Save public key to file.
- logger.debug("Public key created successfully for admin")
- cmd = 'ssh-keyscan ' + \
- settings.GITLAB_URL[7:-1] + ' >> ' + \
- ssh_folder + 'known_hosts'
- # Create known_hosts file and add GitLab to it.
- subprocess.Popen(cmd, shell=True, stderr=subprocess.PIPE)
- logger.debug("Added GitLab to known_hosts")
- return user_pub_key
- except Exception as error:
- logger.error(
- "_-_-_-_-_- Unexpected error in get_or_create_rsa_key_for_admin: %s" % error)
- raise Exception("Failed to create SSH keys for user admin", error)
-
- @staticmethod
- def internal_assert(x, y):
- try:
- Helper.tc.assertEqual(str(x), str(y))
- except Exception as e:
- raise Exception("AssertionError: \"" + str(x) +
- "\" != \"" + str(y) + "\"", e)
-
- @staticmethod
- def internal_assert_boolean(x, y):
- try:
- Helper.tc.assertEqual(str(x), str(y))
- return True
- except Exception as e:
- raise Exception("AssertionError: \"" + str(x) +
- "\" != \"" + str(y) + "\"", e)
-
- @staticmethod
- def internal_not_equal(x, y):
- try:
- Helper.tc.assertNotEqual(x, y)
- except Exception as e:
- raise Exception("AssertionError: \"" + str(x) +
- "\" != \"" + str(y) + "\"", e)
-
- @staticmethod
- def get_reset_passw_url(email):
- jwtObj = JWTAuthentication()
- token = jwtObj.createPersonalTokenWithExpiration(email)
- resetPasswURL = Constants.Default.LoginURL.TEXT + "?t=" + token
- return resetPasswURL
-
- @staticmethod
- def assertTrue(expr, msg=None):
- """Check that the expression is true."""
- if expr != True:
- raise Exception("AssertionError: \"not expr")
+logger = LoggingServiceFactory.get_logger() + + +class Helper: + + tc = unittest.TestCase('__init__') + + @staticmethod + def rand_string(type_of_value='randomString', numberOfDigits=0): + letters_and_numbers = string.ascii_letters + string.digits + if type_of_value == 'email': + myEmail = ''.join(random.choice(letters_and_numbers) for _ in range( + 4)) + "@" + ''.join(random.choice(string.ascii_uppercase) for _ in range(4)) + ".com" + return "ST" + myEmail + elif type_of_value == 'randomNumber': + randomNumber = ''.join("%s" % random.randint(2, 9) + for _ in range(0, (numberOfDigits + 1))) + return randomNumber + elif type_of_value == 'randomString': + randomString = "".join(random.sample(letters_and_numbers, 5)) + return "ST" + randomString + else: + raise Exception("invalid rand type") + + @staticmethod + def rand_invite_email(): + inviteEmail = "automationqatt" + \ + Helper.rand_string("randomString") + "@gmail.com" + return inviteEmail + + @staticmethod + def generate_sshpub_key(): + try: + logger.debug("About to generate SSH Public Key") + key = rsa.generate_private_key( + public_exponent=65537, + key_size=2048, + backend=default_backend() + ) + private_key = key.private_bytes( + encoding=crypto_serialization.Encoding.PEM, + format=crypto_serialization.PrivateFormat.TraditionalOpenSSL, + encryption_algorithm=crypto_serialization.NoEncryption()) + public_key = key.public_key().public_bytes( + crypto_serialization.Encoding.OpenSSH, + crypto_serialization.PublicFormat.OpenSSH + ).decode("utf-8") + + logger.debug("Generated SSH Public Key: " + public_key) + # If failed write to log the error and return 'None'. + except Exception as e: + logger.error(e) + logger.error("Failed to generate SSH Public Key.") + raise e + return public_key + + @staticmethod + def check_admin_ssh_existence(path, admin_ssh): + if admin_ssh == open(path).read().rstrip('\n'): + logger.debug( + "Admin SSH already defined in DB and equal to the one stored on the local system.") + return True + return False + + @staticmethod + def get_or_create_rsa_key_for_admin(): + # Create pair of keys for the given user and return his public key. + try: + ssh_folder = Constants.Paths.SSH.PATH + public_file = ssh_folder + "id_rsa.pub" + privateFile = ssh_folder + "id_rsa" + admin_ssh_exist_and_equal = False + admin_ssh = None + if not os.path.exists(ssh_folder): + os.makedirs(ssh_folder) + elif os.path.exists(public_file): + admin_ssh = DBUser.retrieve_admin_ssh_from_db() + admin_ssh_exist_and_equal = Helper.check_admin_ssh_existence( + public_file, admin_ssh) + # TODO find pending gitlab bug causing old ssh key not be updated + # in gitlab cache + if False and admin_ssh_exist_and_equal: + return admin_ssh + else: + logger.debug("Private key file: " + privateFile + + "\nPublice key file: " + public_file) + key = rsa.generate_private_key( + backend=crypto_default_backend(), + public_exponent=65537, + key_size=2048 + ) + private_key = key.private_bytes( + crypto_serialization.Encoding.PEM, + crypto_serialization.PrivateFormat.PKCS8, + crypto_serialization.NoEncryption()).decode("utf-8") + public_key = key.public_key().public_bytes( + crypto_serialization.Encoding.OpenSSH, + crypto_serialization.PublicFormat.OpenSSH + ).decode("utf-8") + + with open(privateFile, 'w') as content_file: + os.chmod(privateFile, 0o600) + # Save private key to file. + content_file.write(private_key) + logger.debug( + "Private key created successfully for admin") + user_pub_key = public_key + with open(public_file, 'w') as content_file: + content_file.write(public_key) # Save public key to file. + logger.debug("Public key created successfully for admin") + cmd = 'ssh-keyscan ' + \ + settings.GITLAB_URL[7:-1] + ' >> ' + \ + ssh_folder + 'known_hosts' + # Create known_hosts file and add GitLab to it. + subprocess.Popen(cmd, shell=True, stderr=subprocess.PIPE) + logger.debug("Added GitLab to known_hosts") + return user_pub_key + except Exception as error: + logger.error( + "_-_-_-_-_- Unexpected error in get_or_create_rsa_key_for_admin: %s" % error) + raise Exception("Failed to create SSH keys for user admin", error) + + @staticmethod + def internal_assert(x, y): + try: + Helper.tc.assertEqual(str(x), str(y)) + except Exception as e: + raise Exception("AssertionError: \"" + str(x) + + "\" != \"" + str(y) + "\"", e) + + @staticmethod + def internal_assert_boolean(x, y): + try: + Helper.tc.assertEqual(str(x), str(y)) + return True + except Exception as e: + raise Exception("AssertionError: \"" + str(x) + + "\" != \"" + str(y) + "\"", e) + + @staticmethod + def internal_assert_boolean_true_false(x, y): + return Helper.tc.assertEqual(str(x), str(y)) + + @staticmethod + def internal_not_equal(x, y): + try: + Helper.tc.assertNotEqual(x, y) + except Exception as e: + raise Exception("AssertionError: \"" + str(x) + + "\" != \"" + str(y) + "\"", e) + + @staticmethod + def get_reset_passw_url(email): + jwtObj = JWTAuthentication() + token = jwtObj.createPersonalTokenWithExpiration(email) + resetPasswURL = Constants.Default.LoginURL.TEXT + "?t=" + token + return resetPasswURL + + @staticmethod + def assertTrue(expr, msg=None): + """Check that the expression is true.""" + if expr != True: + raise Exception("AssertionError: \"not expr") |