diff options
29 files changed, 475 insertions, 394 deletions
diff --git a/.maven-dockerignore b/.maven-dockerignore new file mode 100644 index 0000000..52d95d7 --- /dev/null +++ b/.maven-dockerignore @@ -0,0 +1,2 @@ +target/docker/ + diff --git a/iceci/mail.py b/iceci/mail.py index 9f23b90..80660b5 100644 --- a/iceci/mail.py +++ b/iceci/mail.py @@ -50,7 +50,7 @@ from django.conf import settings from django.core.mail import send_mail from django.utils import timezone -from services.constants import Constants +from services.constants import Constants, ServiceProvider from services.logging_service import LoggingServiceFactory @@ -59,15 +59,17 @@ admin_mail_from = settings.ICE_CONTACT_FROM_ADDRESS param = "1" logger = LoggingServiceFactory.get_logger() + def sendMail(param,email, data, mail_body, mail_subject, mail_from=admin_mail_from): logger.debug("about to send mail to " + email) - try: -# lastBuild = param + try: html_msg = mail_body.substitute(data) mail_subject = mail_subject.substitute(data) - #send mail with template - send_mail(mail_subject, '', Constants.FEGeneral.ProgramName.name +"-CI Report Test Team <" + mail_from + ">",settings.ICE_CONTACT_EMAILS , fail_silently=False, html_message=html_msg) + send_mail(mail_subject, '', ServiceProvider.PROGRAM_NAME + + "-CI Report Test Team <" + mail_from + ">", + settings.ICE_CONTACT_EMAILS , fail_silently=False, + html_message=html_msg) logger.debug("Looks like email delivery to "+email+" has succeeded") except Exception: traceback.print_exc() diff --git a/iceci/views.py b/iceci/views.py index 7e0505d..6a5a9b8 100644 --- a/iceci/views.py +++ b/iceci/views.py @@ -47,7 +47,7 @@ from rest_framework.renderers import JSONRenderer from iceci import mail from iceci.mail import testsResults_mail_body -from services.constants import Constants +from services.constants import Constants, ServiceProvider from services.logging_service import LoggingServiceFactory from .models import TestResults @@ -60,7 +60,8 @@ LAST_BUILD_REPORT_NUM = None logger = LoggingServiceFactory.get_logger() def index(request): - return HttpResponse("Hello, world. You're at the "+Constants.FEGeneral.ProgramName.name+" ci index.") + return HttpResponse("Hello, world. You're at the " + + ServiceProvider.PROGRAM_NAME + " ci index.") @csrf_exempt def testResult_list(request): # List all tests, or create a new test. diff --git a/services/api/api_checklist.py b/services/api/api_checklist.py index ef7b8a3..fda0730 100644 --- a/services/api/api_checklist.py +++ b/services/api/api_checklist.py @@ -1,5 +1,5 @@ - -# ============LICENSE_START========================================== + +# ============LICENSE_START========================================== # org.onap.vvp/test-engine # =================================================================== # Copyright © 2017 AT&T Intellectual Property. All rights reserved. @@ -47,6 +47,7 @@ from services.constants import Constants from services.database.db_general import DBGeneral from services.helper import Helper from services.logging_service import LoggingServiceFactory +from services.database.db_checklist import DBChecklist logger = LoggingServiceFactory.get_logger() @@ -208,12 +209,21 @@ class APIChecklist: @staticmethod def move_cl_to_closed(cl_uuid, vf_staff_emails): api_checklist_obj = APIChecklist() - + states = [Constants.ChecklistStates.PeerReview.TEXT, + Constants.ChecklistStates.Approval.TEXT, + Constants.ChecklistStates.Handoff.TEXT, + Constants.ChecklistStates.Closed.TEXT] for i in range(len(vf_staff_emails)): - logger.debug("Trying to jump state for %s [%s]" % (vf_staff_emails[i], i)) + logger.debug( + "Trying to jump state for %s [%s]" % (vf_staff_emails[i], i)) + DBChecklist.update_all_decisions_to_approve(cl_uuid) api_checklist_obj.jump_state(cl_uuid, vf_staff_emails[i]) + logger.debug("Checking state changed to %s" % states[i]) + DBChecklist.state_changed("uuid", cl_uuid, states[i]) # Move CL to closed state. - logger.debug("Trying to jump state 'closed' for %s" % vf_staff_emails[0]) + logger.debug("Trying to jump state 'closed' for %s" % + vf_staff_emails[0]) api_checklist_obj.jump_state(cl_uuid, vf_staff_emails[0]) - + logger.debug("Checking state changed to %s" % states[-1]) + DBChecklist.state_changed("uuid", cl_uuid, states[-1]) diff --git a/services/api/api_gitlab.py b/services/api/api_gitlab.py index c7b25e0..6c5a2ff 100644 --- a/services/api/api_gitlab.py +++ b/services/api/api_gitlab.py @@ -1,5 +1,5 @@ - -# ============LICENSE_START========================================== + +# ============LICENSE_START========================================== # org.onap.vvp/test-engine # =================================================================== # Copyright © 2017 AT&T Intellectual Property. All rights reserved. @@ -56,6 +56,7 @@ from services.session import session logger = LoggingServiceFactory.get_logger() + class APIGitLab: @staticmethod @@ -79,13 +80,13 @@ class APIGitLab: headers['PRIVATE-TOKEN'] = settings.GITLAB_TOKEN try: r1 = requests.get(getURL, headers=headers, verify=False) - Helper.internal_assert(r1.status_code, 200) counter = 0 - while r1.content == b'[]' and counter <= Constants.GitLabConstants.RETRIES_NUMBER: + while r1.status_code == 404 or r1.content == b'[]' and counter <= Constants.GitLabConstants.RETRIES_NUMBER: time.sleep(session.wait_until_time_pause) r1 = requests.get(getURL, headers=headers, verify=False) - Helper.internal_assert(r1.status_code, 200) - + logger.debug( + "trying to get the git project, yet to succeed (try #%s)" % counter) + counter += 1 if r1.content == b'[]': logger.error("Got an empty list as a response.") raise @@ -363,32 +364,15 @@ class APIGitLab: @staticmethod def is_gitlab_ready(user_content): - counter = 1 - gettURL = settings.ICE_EM_URL + '/v1/engmgr/engagement/' + \ - user_content['engagement_uuid'] + '/checklist/new/' - logger.debug( - "Get URL to check if GitLab and Jenkins are ready: " + gettURL) - # Validate with EL - token = "token " + APIBridge.login_user(user_content['el_email']) - headers = dict() # Create header for get request. - headers['Content-type'] = 'application/json' - headers['Authorization'] = token - r1 = requests.get(gettURL, headers=headers, verify=False) - while (r1.content == b'"Create New checklist is not ready yet"' and counter <= - Constants.GitLabConstants.RETRIES_NUMBER): - time.sleep(session.wait_until_time_pause_long) - logger.debug( - "GitLab and Jenkins are not ready yet, trying again (%s of %s)" % - (counter, Constants.GitLabConstants.RETRIES_NUMBER)) - r1 = requests.get(gettURL, headers=headers, verify=False) - counter += 1 - if r1.status_code != 200: - if r1.content == "Create New checklist is not ready yet": - raise Exception("Max retries exceeded, failing test...") - else: - raise Exception("Something went wrong while waiting for GitLab and Jenkins. %s %s" % ( - r1.status_code, r1.reason)) - return False - elif r1.status_code == 200: - logger.debug("Gitlab and Jenkins are ready to continue!") + path_with_namespace = user_content[ + 'engagement_manual_id'] + "%2F" + user_content['vfName'] + # If admin user is in project repo, it means the project exists as + # well. + cont = APIGitLab.validate_git_project_members( + path_with_namespace, Constants.Users.Admin.EMAIL) + if cont: + logger.debug("Gitlab is ready for action, git repo was found!") return True + else: + raise Exception( + "Something went wrong while waiting for GitLab.") diff --git a/services/api/api_jenkins.py b/services/api/api_jenkins.py index e1e1f6e..b63cb66 100644 --- a/services/api/api_jenkins.py +++ b/services/api/api_jenkins.py @@ -1,5 +1,5 @@ - -# ============LICENSE_START========================================== + +# ============LICENSE_START========================================== # org.onap.vvp/test-engine # =================================================================== # Copyright © 2017 AT&T Intellectual Property. All rights reserved. @@ -37,26 +37,36 @@ # # ECOMP is a trademark and service mark of AT&T Intellectual Property. from django.conf import settings -import requests from requests.auth import HTTPBasicAuth - from services.constants import Constants from services.helper import Helper -from services.logging_service import LoggingServiceFactory +import logging +import requests +import time +from services.session import session + +logger = logging.getLogger('ice-ci.logger') -logger = LoggingServiceFactory.get_logger() class APIJenkins: @staticmethod def get_jenkins_job(job_name): r1 = None + counter = 0 getURL = settings.JENKINS_URL + "job/" + job_name logger.debug("Get APIJenkins job URL: " + getURL) try: r1 = requests.get(getURL, auth=HTTPBasicAuth( settings.JENKINS_USERNAME, settings.JENKINS_PASSWORD)) + while r1.status_code != 200 and counter <= Constants.GitLabConstants.RETRIES_NUMBER: + r1 = requests.get(getURL, auth=HTTPBasicAuth( + settings.JENKINS_USERNAME, settings.JENKINS_PASSWORD)) + time.sleep(session.wait_until_time_pause) + logger.debug( + "try to get jenkins job (try #%s)" % counter) + counter += 1 Helper.internal_assert(r1.status_code, 200) logger.debug("Job was created on APIJenkins!") except: @@ -78,4 +88,3 @@ class APIJenkins: if Constants.Dashboard.Checklist.JenkinsLog.Modal.Body.BUILD_IDENTIFIER in line: parts = line.partition('jenkins') return parts[2] - diff --git a/services/constants.py b/services/constants.py index 0be529f..23d7ef1 100644 --- a/services/constants.py +++ b/services/constants.py @@ -42,7 +42,7 @@ from django.conf import settings class ServiceProvider: PROGRAM_NAME = "VVP" MainServiceProvider = "ServiceProvider" - email = "example.com" + email = "example-domain.com" class Constants: @@ -89,9 +89,21 @@ class Constants: class Review: TEXT = "review" + class PeerReview: + TEXT = "peer_review" + + class Approval: + TEXT = "approval" + + class Handoff: + TEXT = "handoff" + class Archive: TEXT = "archive" + class Closed: + TEXT = "closed" + class FEConstants: RETRIES_NUMBER = 120 @@ -108,14 +120,15 @@ class Constants: class Users: class Admin: - EMAIL = "admin@example.com" + EMAIL = "admin@" + ServiceProvider.email FULLNAME = "admin bogus user" class AdminRO: - EMAIL = "admin_ro@example.com" + EMAIL = "admin_ro@" + ServiceProvider.email class LongEmailLengthStandardUser: - EMAIL = "50charslengthemailofstandarduserforinvite@example.com" + EMAIL = "50charslengthemailofstandardus@" + \ + ServiceProvider.email class Toast: ID = "toast-successfully-message" @@ -745,9 +758,9 @@ class Constants: class FilterByFileDropDown: ID = "selected-file-filter-dropdown" ANY_FILE_LINK_TEXT = "Any file" - FILE0_LINK_TEXT = "file0.yaml" - FILE1_LINK_TEXT = "file1.yaml" - FILE2_LINK_TEXT = "file2.yaml" + FILE0_LINK_TEXT = "file0" + FILE1_LINK_TEXT = "file1" + FILE2_LINK_TEXT = "file2" class StateDropDown: ID = "selected-state-filter-dropdown" diff --git a/services/database/db_checklist.py b/services/database/db_checklist.py index 15851cf..04f8a44 100644 --- a/services/database/db_checklist.py +++ b/services/database/db_checklist.py @@ -1,5 +1,5 @@ - -# ============LICENSE_START========================================== + +# ============LICENSE_START========================================== # org.onap.vvp/test-engine # =================================================================== # Copyright © 2017 AT&T Intellectual Property. All rights reserved. @@ -50,6 +50,7 @@ from services.session import session logger = LoggingServiceFactory.get_logger() + class DBChecklist: @staticmethod @@ -361,16 +362,18 @@ class DBChecklist: @staticmethod def state_changed(identify_field, field_value, expected_state): - get_state = str(DBGeneral.select_where( - "state", Constants.DBConstants.IceTables.CHECKLIST, identify_field, field_value, 1)) + get_state = DBGeneral.select_where_order_by_desc( + "state", Constants.DBConstants.IceTables.CHECKLIST, + identify_field, field_value, "create_time")[0] counter = 0 while get_state != expected_state and counter <= Constants.DBConstants.RETRIES_NUMBER: time.sleep(session.wait_until_time_pause_long) logger.debug("Checklist state not changed yet , expecting state: %s, current result: %s (attempt %s of %s)" % ( expected_state, get_state, counter, Constants.DBConstants.RETRIES_NUMBER)) counter += 1 - get_state = str(DBGeneral.select_where( - "state", Constants.DBConstants.IceTables.CHECKLIST, identify_field, field_value, 1)) + get_state = DBGeneral.select_where_order_by_desc( + "state", Constants.DBConstants.IceTables.CHECKLIST, + identify_field, field_value, "create_time")[0] if get_state == expected_state: logger.debug("Checklist state was successfully changed into: " + @@ -382,5 +385,6 @@ class DBChecklist: @staticmethod def get_recent_checklist_uuid(name): required_uuid = DBGeneral.select_where_not_and_order_by_desc( - 'uuid', Constants.DBConstants.IceTables.CHECKLIST, 'name', name, 'state', Constants.ChecklistStates.Archive.TEXT, 'create_time') + 'uuid', Constants.DBConstants.IceTables.CHECKLIST, 'name', name, + 'state', Constants.ChecklistStates.Archive.TEXT, 'create_time') return required_uuid diff --git a/services/frontend/base_actions/get.py b/services/frontend/base_actions/get.py index 8735c1b..5fb801a 100644 --- a/services/frontend/base_actions/get.py +++ b/services/frontend/base_actions/get.py @@ -1,5 +1,5 @@ - -# ============LICENSE_START========================================== + +# ============LICENSE_START========================================== # org.onap.vvp/test-engine # =================================================================== # Copyright © 2017 AT&T Intellectual Property. All rights reserved. @@ -37,6 +37,7 @@ # # ECOMP is a trademark and service mark of AT&T Intellectual Property. from services.frontend.base_actions.wait import Wait +from services.helper import Helper from services.session import session @@ -98,8 +99,10 @@ class Get: raise Exception(errorMsg, attr_name_value) @staticmethod - def value_by_name(attr_name_value): + def value_by_name(attr_name_value, wait_for_page=False): try: + if wait_for_page: + Wait.page_has_loaded() Wait.name(attr_name_value) return session.ice_driver.find_element_by_name(attr_name_value).get_attribute("value") except Exception as e: @@ -125,3 +128,16 @@ class Get: except Exception as e: errorMsg = "Failed to get if it's selected by id:" + attr_id_value raise Exception(errorMsg, attr_id_value) + + @staticmethod + def is_checkbox_selected_by_id(attr_id_value, wait_for_page=False): + try: + if wait_for_page: + Wait.page_has_loaded() + Wait.id(attr_id_value) + return Helper.internal_assert_boolean_true_false( + session.ice_driver.find_element_by_id( + attr_id_value).get_attribute("value"), "on") + except Exception as e: + errorMsg = "Failed to get if it's selected by id:" + attr_id_value + raise Exception(errorMsg, attr_id_value) diff --git a/services/frontend/base_actions/wait.py b/services/frontend/base_actions/wait.py index 50eff08..a699917 100644 --- a/services/frontend/base_actions/wait.py +++ b/services/frontend/base_actions/wait.py @@ -196,17 +196,18 @@ class Wait: @staticmethod def page_has_loaded(): - countwait_untilelement_to_be_presented_by_id = 0 for _ in range(Constants.FEConstants.RETRIES_NUMBER): - httpRequests = session.ice_driver.execute_script( - 'return window.angular ? window.angular.element("body").injector().get("$http").pendingRequests.length : 1;') - if(str(httpRequests) == "0"): + try: + httpRequests = session.ice_driver.execute_script( + 'return window.angular ? window.angular.element("body").injector().get("$http").pendingRequests.length : 1;') + if(str(httpRequests) == "0"): + time.sleep(session.wait_until_time_pause) + return + logger.debug( + "Checking if {} page is loaded. ".format(session.ice_driver.current_url)) time.sleep(session.wait_until_time_pause) - return - logger.debug( - "Checking if {} page is loaded. ".format(session.ice_driver.current_url)) - time.sleep(session.wait_until_time_pause) - countwait_untilelement_to_be_presented_by_id += 1 + except Exception as exception: + continue raise Exception("Page loading took too much time") diff --git a/services/frontend/fe_checklist.py b/services/frontend/fe_checklist.py index 75f957a..3afc472 100644 --- a/services/frontend/fe_checklist.py +++ b/services/frontend/fe_checklist.py @@ -600,7 +600,7 @@ class FEChecklist: if checklist_uuid is None: checklist_uuid = DBGeneral.select_where_not_and_order_by_desc( 'uuid', Constants.DBConstants.IceTables.CHECKLIST, 'name', checklistName, 'state', Constants.ChecklistStates.Archive.TEXT, 'create_time')[0] - Click.id("checklist-" + checklist_uuid) + Click.id("checklist-" + checklist_uuid, True) @staticmethod def validate_reject_is_enabled(): @@ -752,4 +752,3 @@ class FEChecklist: "Jenkins log could not be viewed.") Click.id(Constants.Dashboard.Modal.CLOSE_BUTTON_ID) return log - diff --git a/services/frontend/fe_dashboard.py b/services/frontend/fe_dashboard.py index 0df66d4..b8d51b5 100644 --- a/services/frontend/fe_dashboard.py +++ b/services/frontend/fe_dashboard.py @@ -1,5 +1,5 @@ - -# ============LICENSE_START========================================== + +# ============LICENSE_START========================================== # org.onap.vvp/test-engine # =================================================================== # Copyright © 2017 AT&T Intellectual Property. All rights reserved. @@ -221,21 +221,21 @@ class FEDashboard: eng_manual_id = user_content['engagement_manual_id'] + ":" Wait.text_by_id(engSearchID, eng_manual_id) logger.debug("Engagement found (searched by engagement_manual_id)") - FEGeneral.refresh() + FEGeneral.smart_refresh() logger.debug("Search engagement by VF name") # Search by VF name. Enter.text_by_id( Constants.Dashboard.Statuses.SearchBox.ID, user_content['vfName']) Wait.text_by_id(engSearchID, eng_manual_id) logger.debug("Engagement found (searched by VF name)") - FEGeneral.refresh() + FEGeneral.smart_refresh() logger.debug("Search engagement by VFC") # Search by VFC. Enter.text_by_id( Constants.Dashboard.Statuses.SearchBox.ID, vfcName) Wait.text_by_id(engSearchID, eng_manual_id) logger.debug("Engagement found (searched by VFC)") - FEGeneral.refresh() + FEGeneral.smart_refresh() logger.debug("Negative search: search by random string") # Search by VFC. Enter.text_by_id(Constants.Dashboard.Statuses.SearchBox.ID, diff --git a/services/frontend/fe_detailed_view.py b/services/frontend/fe_detailed_view.py index bf9a5bf..556e7a7 100644 --- a/services/frontend/fe_detailed_view.py +++ b/services/frontend/fe_detailed_view.py @@ -1,5 +1,5 @@ - -# ============LICENSE_START========================================== + +# ============LICENSE_START========================================== # org.onap.vvp/test-engine # =================================================================== # Copyright © 2017 AT&T Intellectual Property. All rights reserved. @@ -40,7 +40,7 @@ import time from selenium.webdriver.support.ui import Select -from services.constants import Constants +from services.constants import Constants, ServiceProvider from services.database.db_general import DBGeneral from services.frontend.base_actions.click import Click from services.frontend.base_actions.enter import Enter @@ -237,7 +237,7 @@ class FEDetailedView: session.ice_driver.find_element_by_name("extRefID").click() Enter.text_by_name("extRefID", Helper.rand_string("randomNumber")) Select(session.ice_driver.find_element_by_id( - Constants.Dashboard.DetailedView.VFC.Choose_Company.ID)).select_by_visible_text("AT&T") + Constants.Dashboard.DetailedView.VFC.Choose_Company.ID)).select_by_visible_text(ServiceProvider.MainServiceProvider) Click.id(Constants.Dashboard.DetailedView.VFC.Save_button.ID) return vfcName diff --git a/services/frontend/fe_general.py b/services/frontend/fe_general.py index c6832cb..b3f97b4 100644 --- a/services/frontend/fe_general.py +++ b/services/frontend/fe_general.py @@ -1,5 +1,5 @@ - -# ============LICENSE_START========================================== + +# ============LICENSE_START========================================== # org.onap.vvp/test-engine # =================================================================== # Copyright © 2017 AT&T Intellectual Property. All rights reserved. @@ -54,6 +54,7 @@ from services.session import session logger = LoggingServiceFactory.get_logger() + class FEGeneral(Helper): @staticmethod @@ -103,16 +104,32 @@ class FEGeneral(Helper): @staticmethod def refresh(): - try: # Click on element in UI, by CSS locator. + try: session.ice_driver.refresh() Wait.page_has_loaded() - # If failed - count the failure and add the error to list of errors. except Exception as e: errorMsg = "Could not refresh the page." logger.error(errorMsg) raise Exception(errorMsg, e) @staticmethod + def smart_refresh(): + session.ice_driver.refresh() + i = 0 + success = False + while not success and i < 2: + try: + Wait.page_has_loaded() + success = True + break + except: + i += 1 + time.sleep(1) + pass + if not success: + raise Exception("Failed to wait for refresh") + + @staticmethod def select_vendor_from_list(vendor): Wait.name(Constants.Signup.Company.NAME) Select(session.ice_driver.find_element_by_name( diff --git a/services/frontend/fe_invite.py b/services/frontend/fe_invite.py index 405581c..cb84262 100644 --- a/services/frontend/fe_invite.py +++ b/services/frontend/fe_invite.py @@ -157,7 +157,8 @@ class FEInvite: user_content['el_email'] = engLeadEmail uuid = DBGeneral.select_where_email( "uuid", "ice_user_profile", user_content['email']) - sponsor = ["AT&T", 'aaaaaa', inviteEmail, '3058000000'] + sponsor = [ServiceProvider.MainServiceProvider, 'aaaaaa', inviteEmail, + '3058000000'] invitation_token = DBUser.select_invitation_token( "invitation_token", "ice_invitation", "engagement_uuid", engagement_id, inviteEmail, 1) signUpURLforContact = DBUser.get_contact_signup_url( @@ -181,7 +182,8 @@ class FEInvite: @staticmethod def invite_x_users_and_verify_VF_appers_for_invited(user_content, engName): - inviteEmail = Helper.rand_string('randomString') + "@intl." + ServiceProvider.email + inviteEmail = Helper.rand_string('randomString') + "@" \ + + ServiceProvider.email vflist = FEInvite.create_x_vfs(user_content, engName, x=3) for vfName in vflist: # Fetch one AT&T user ID. @@ -192,7 +194,8 @@ class FEInvite: engName = engagement_manual_id + ": " + vfName vf_left_nav_id = "clickable-" + engName Click.id(vf_left_nav_id) - FEWizard.invite_team_members_modal(inviteEmail) + FEWizard.invite_team_members_modal(inviteEmail, + wait_modal_to_disappear=False) FEGeneral.refresh() # validations FEInvite.validations_for_user2(user_content, inviteEmail, vflist) diff --git a/services/frontend/fe_overview.py b/services/frontend/fe_overview.py index 8d05f0c..3764e2c 100644 --- a/services/frontend/fe_overview.py +++ b/services/frontend/fe_overview.py @@ -58,6 +58,7 @@ from services.session import session logger = LoggingServiceFactory.get_logger() + class FEOverview: @staticmethod @@ -65,9 +66,9 @@ class FEOverview: vfFullName = user_content[ 'engagement_manual_id'] + ": " + user_content['vfName'] Enter.text_by_id(Constants.Dashboard.LeftPanel.SearchBox.ID, user_content[ - 'vfName']) + 'vfName'], True) Click.id(Constants.Dashboard.LeftPanel.SearchBox.Results.ID % - user_content['vfName']) + user_content['vfName'], True) Wait.text_by_id( Constants.Dashboard.Overview.Title.ID, vfFullName) diff --git a/services/frontend/fe_user.py b/services/frontend/fe_user.py index 91cf7eb..eb25d23 100644 --- a/services/frontend/fe_user.py +++ b/services/frontend/fe_user.py @@ -1,5 +1,5 @@ - -# ============LICENSE_START========================================== + +# ============LICENSE_START========================================== # org.onap.vvp/test-engine # =================================================================== # Copyright © 2017 AT&T Intellectual Property. All rights reserved. @@ -55,6 +55,7 @@ from services.session import session logger = LoggingServiceFactory.get_logger() + class FEUser: @staticmethod @@ -343,24 +344,24 @@ class FEUser: def validate_user_profile_settings_checkboxes(checked): Wait.page_has_loaded() receive_emails = Get.is_selected_by_id( - Constants.Dashboard.Avatar.Account.UserProfileSettings.ReceiveEmailsID, wait_for_page=True) + Constants.Dashboard.Avatar.Account.UserProfileSettings.ReceiveEmailsID, True) Helper.internal_assert(receive_emails, checked) receive_notifications = \ Get.is_selected_by_id( - Constants.Dashboard.Avatar.Account.UserProfileSettings.ReceiveNotificationsID) + Constants.Dashboard.Avatar.Account.UserProfileSettings.ReceiveNotificationsID, True) receive_email_every_time = \ Get.is_selected_by_id( - Constants.Dashboard.Avatar.Account.UserProfileSettings.ReceiveEmailEveryTimeID) + Constants.Dashboard.Avatar.Account.UserProfileSettings.ReceiveEmailEveryTimeID, True) Helper.internal_assert(receive_email_every_time, checked) receive_digest_email = \ Get.is_selected_by_id( - Constants.Dashboard.Avatar.Account.UserProfileSettings.ReceiveDigestEmailID, wait_for_page=True) + Constants.Dashboard.Avatar.Account.UserProfileSettings.ReceiveDigestEmailID, True) Helper.internal_assert(receive_digest_email, not checked) @staticmethod def compare_notifications_count_for_user(expected_count): Wait.text_by_id( - Constants.Dashboard.Avatar.Notifications.Count.ID, expected_count, wait_for_page=True) + Constants.Dashboard.Avatar.Notifications.Count.ID, expected_count, True) @staticmethod def check_notification_number_is_not_presented(): diff --git a/services/frontend/fe_wizard.py b/services/frontend/fe_wizard.py index 777fe52..df5087d 100644 --- a/services/frontend/fe_wizard.py +++ b/services/frontend/fe_wizard.py @@ -51,6 +51,7 @@ from services.session import session logger = LoggingServiceFactory.get_logger() + class FEWizard: E2Edate = None @@ -126,7 +127,7 @@ class FEWizard: Enter.text_by_name("phone", phone) Click.css(Constants.SubmitButton.CSS) Wait.name_to_dissappear("Add AT&T Sponsor") - sponsor = {"company": "AT&T", "full_name": fullname, + sponsor = {"company": ServiceProvider.MainServiceProvider, "full_name": fullname, "email": email, "phone": phone} return sponsor @@ -179,7 +180,7 @@ class FEWizard: raise Exception(errorMsg) @staticmethod - def invite_team_members_modal(email): + def invite_team_members_modal(email, wait_modal_to_disappear=True): try: Click.id( Constants.Dashboard.Overview.TeamMember.ID, wait_for_page=True) @@ -188,11 +189,9 @@ class FEWizard: Enter.text_by_name("email", email) Wait.text_by_css( Constants.SubmitButton.CSS, Constants.Dashboard.Wizard.InviteTeamMembers.Button.TEXT) - Wait.css_to_dissappear( - '.inviteMembers-form button[disabled="disabled"].btn.btn-primary') - Wait.css(".inviteMembers-form button.btn.btn-primary") - Click.css(".inviteMembers-form button.btn.btn-primary") - Wait.modal_to_dissappear() + Click.css(".inviteMembers-form button.btn.btn-primary", True) + if wait_modal_to_disappear: + Wait.modal_to_dissappear() # If failed - count the failure and add the error to list of errors. except Exception as e: errorMsg = "FAILED in PopUp Invite Team Members. Exception=" + \ diff --git a/services/helper.py b/services/helper.py index 58769fe..418b260 100644 --- a/services/helper.py +++ b/services/helper.py @@ -1,5 +1,5 @@ -# ============LICENSE_START========================================== +# ============LICENSE_START========================================== # org.onap.vvp/test-engine # =================================================================== # Copyright © 2017 AT&T Intellectual Property. All rights reserved. @@ -54,157 +54,166 @@ from services.logging_service import LoggingServiceFactory from utils.authentication import JWTAuthentication -logger = LoggingServiceFactory.get_logger()
-
-class Helper:
-
- tc = unittest.TestCase('__init__')
-
- @staticmethod
- def rand_string(type_of_value='randomString', numberOfDigits=0):
- letters_and_numbers = string.ascii_letters + string.digits
- if type_of_value == 'email':
- myEmail = ''.join(random.choice(letters_and_numbers) for _ in range(
- 4)) + "@" + ''.join(random.choice(string.ascii_uppercase) for _ in range(4)) + ".com"
- return "ST" + myEmail
- elif type_of_value == 'randomNumber':
- randomNumber = ''.join("%s" % random.randint(2, 9)
- for _ in range(0, (numberOfDigits + 1)))
- return randomNumber
- elif type_of_value == 'randomString':
- randomString = "".join(random.sample(letters_and_numbers, 5))
- return "ST" + randomString
- else:
- raise Exception("invalid rand type")
-
- @staticmethod
- def rand_invite_email():
- inviteEmail = "automationqatt" + \
- Helper.rand_string("randomString") + "@gmail.com"
- return inviteEmail
-
- @staticmethod
- def generate_sshpub_key():
- try:
- logger.debug("About to generate SSH Public Key")
- key = rsa.generate_private_key(
- public_exponent=65537,
- key_size=2048,
- backend=default_backend()
- )
- private_key = key.private_bytes(
- encoding=crypto_serialization.Encoding.PEM,
- format=crypto_serialization.PrivateFormat.TraditionalOpenSSL,
- encryption_algorithm=crypto_serialization.NoEncryption())
- public_key = key.public_key().public_bytes(
- crypto_serialization.Encoding.OpenSSH,
- crypto_serialization.PublicFormat.OpenSSH
- ).decode("utf-8")
-
- logger.debug("Generated SSH Public Key: " + public_key)
- except Exception as e: # If failed write to log the error and return 'None'.
- logger.error(e)
- logger.error("Failed to generate SSH Public Key.")
- raise e
- return public_key
-
- @staticmethod
- def check_admin_ssh_existence(path, admin_ssh):
- if admin_ssh == open(path).read().rstrip('\n'):
- logger.debug(
- "Admin SSH already defined in DB and equal to the one stored on the local system.")
- return True
- return False
-
- @staticmethod
- def get_or_create_rsa_key_for_admin():
- try: # Create pair of keys for the given user and return his public key.
- ssh_folder = Constants.Paths.SSH.PATH
- public_file = ssh_folder + "id_rsa.pub"
- privateFile = ssh_folder + "id_rsa"
- admin_ssh_exist_and_equal = False
- admin_ssh = None
- if not os.path.exists(ssh_folder):
- os.makedirs(ssh_folder)
- elif os.path.exists(public_file):
- admin_ssh = DBUser.retrieve_admin_ssh_from_db()
- admin_ssh_exist_and_equal = Helper.check_admin_ssh_existence(
- public_file, admin_ssh)
- # TODO find pending gitlab bug causing old ssh key not be updated in gitlab cache
- if False and admin_ssh_exist_and_equal:
- return admin_ssh
- else:
- logger.debug("Private key file: " + privateFile +
- "\nPublice key file: " + public_file)
- key = rsa.generate_private_key(
- backend=crypto_default_backend(),
- public_exponent=65537,
- key_size=2048
- )
- private_key = key.private_bytes(
- crypto_serialization.Encoding.PEM,
- crypto_serialization.PrivateFormat.PKCS8,
- crypto_serialization.NoEncryption()).decode("utf-8")
- public_key = key.public_key().public_bytes(
- crypto_serialization.Encoding.OpenSSH,
- crypto_serialization.PublicFormat.OpenSSH
- ).decode("utf-8")
-
- with open(privateFile, 'w') as content_file:
- os.chmod(privateFile, 0o600)
- content_file.write(private_key) # Save private key to file.
- logger.debug(
- "Private key created successfully for admin")
- user_pub_key = public_key
- with open(public_file, 'w') as content_file:
- content_file.write(public_key) # Save public key to file.
- logger.debug("Public key created successfully for admin")
- cmd = 'ssh-keyscan ' + \
- settings.GITLAB_URL[7:-1] + ' >> ' + \
- ssh_folder + 'known_hosts'
- # Create known_hosts file and add GitLab to it.
- subprocess.Popen(cmd, shell=True, stderr=subprocess.PIPE)
- logger.debug("Added GitLab to known_hosts")
- return user_pub_key
- except Exception as error:
- logger.error(
- "_-_-_-_-_- Unexpected error in get_or_create_rsa_key_for_admin: %s" % error)
- raise Exception("Failed to create SSH keys for user admin", error)
-
- @staticmethod
- def internal_assert(x, y):
- try:
- Helper.tc.assertEqual(str(x), str(y))
- except Exception as e:
- raise Exception("AssertionError: \"" + str(x) +
- "\" != \"" + str(y) + "\"", e)
-
- @staticmethod
- def internal_assert_boolean(x, y):
- try:
- Helper.tc.assertEqual(str(x), str(y))
- return True
- except Exception as e:
- raise Exception("AssertionError: \"" + str(x) +
- "\" != \"" + str(y) + "\"", e)
-
- @staticmethod
- def internal_not_equal(x, y):
- try:
- Helper.tc.assertNotEqual(x, y)
- except Exception as e:
- raise Exception("AssertionError: \"" + str(x) +
- "\" != \"" + str(y) + "\"", e)
-
- @staticmethod
- def get_reset_passw_url(email):
- jwtObj = JWTAuthentication()
- token = jwtObj.createPersonalTokenWithExpiration(email)
- resetPasswURL = Constants.Default.LoginURL.TEXT + "?t=" + token
- return resetPasswURL
-
- @staticmethod
- def assertTrue(expr, msg=None):
- """Check that the expression is true."""
- if expr != True:
- raise Exception("AssertionError: \"not expr")
+logger = LoggingServiceFactory.get_logger() + + +class Helper: + + tc = unittest.TestCase('__init__') + + @staticmethod + def rand_string(type_of_value='randomString', numberOfDigits=0): + letters_and_numbers = string.ascii_letters + string.digits + if type_of_value == 'email': + myEmail = ''.join(random.choice(letters_and_numbers) for _ in range( + 4)) + "@" + ''.join(random.choice(string.ascii_uppercase) for _ in range(4)) + ".com" + return "ST" + myEmail + elif type_of_value == 'randomNumber': + randomNumber = ''.join("%s" % random.randint(2, 9) + for _ in range(0, (numberOfDigits + 1))) + return randomNumber + elif type_of_value == 'randomString': + randomString = "".join(random.sample(letters_and_numbers, 5)) + return "ST" + randomString + else: + raise Exception("invalid rand type") + + @staticmethod + def rand_invite_email(): + inviteEmail = "automationqatt" + \ + Helper.rand_string("randomString") + "@gmail.com" + return inviteEmail + + @staticmethod + def generate_sshpub_key(): + try: + logger.debug("About to generate SSH Public Key") + key = rsa.generate_private_key( + public_exponent=65537, + key_size=2048, + backend=default_backend() + ) + private_key = key.private_bytes( + encoding=crypto_serialization.Encoding.PEM, + format=crypto_serialization.PrivateFormat.TraditionalOpenSSL, + encryption_algorithm=crypto_serialization.NoEncryption()) + public_key = key.public_key().public_bytes( + crypto_serialization.Encoding.OpenSSH, + crypto_serialization.PublicFormat.OpenSSH + ).decode("utf-8") + + logger.debug("Generated SSH Public Key: " + public_key) + # If failed write to log the error and return 'None'. + except Exception as e: + logger.error(e) + logger.error("Failed to generate SSH Public Key.") + raise e + return public_key + + @staticmethod + def check_admin_ssh_existence(path, admin_ssh): + if admin_ssh == open(path).read().rstrip('\n'): + logger.debug( + "Admin SSH already defined in DB and equal to the one stored on the local system.") + return True + return False + + @staticmethod + def get_or_create_rsa_key_for_admin(): + # Create pair of keys for the given user and return his public key. + try: + ssh_folder = Constants.Paths.SSH.PATH + public_file = ssh_folder + "id_rsa.pub" + privateFile = ssh_folder + "id_rsa" + admin_ssh_exist_and_equal = False + admin_ssh = None + if not os.path.exists(ssh_folder): + os.makedirs(ssh_folder) + elif os.path.exists(public_file): + admin_ssh = DBUser.retrieve_admin_ssh_from_db() + admin_ssh_exist_and_equal = Helper.check_admin_ssh_existence( + public_file, admin_ssh) + # TODO find pending gitlab bug causing old ssh key not be updated + # in gitlab cache + if False and admin_ssh_exist_and_equal: + return admin_ssh + else: + logger.debug("Private key file: " + privateFile + + "\nPublice key file: " + public_file) + key = rsa.generate_private_key( + backend=crypto_default_backend(), + public_exponent=65537, + key_size=2048 + ) + private_key = key.private_bytes( + crypto_serialization.Encoding.PEM, + crypto_serialization.PrivateFormat.PKCS8, + crypto_serialization.NoEncryption()).decode("utf-8") + public_key = key.public_key().public_bytes( + crypto_serialization.Encoding.OpenSSH, + crypto_serialization.PublicFormat.OpenSSH + ).decode("utf-8") + + with open(privateFile, 'w') as content_file: + os.chmod(privateFile, 0o600) + # Save private key to file. + content_file.write(private_key) + logger.debug( + "Private key created successfully for admin") + user_pub_key = public_key + with open(public_file, 'w') as content_file: + content_file.write(public_key) # Save public key to file. + logger.debug("Public key created successfully for admin") + cmd = 'ssh-keyscan ' + \ + settings.GITLAB_URL[7:-1] + ' >> ' + \ + ssh_folder + 'known_hosts' + # Create known_hosts file and add GitLab to it. + subprocess.Popen(cmd, shell=True, stderr=subprocess.PIPE) + logger.debug("Added GitLab to known_hosts") + return user_pub_key + except Exception as error: + logger.error( + "_-_-_-_-_- Unexpected error in get_or_create_rsa_key_for_admin: %s" % error) + raise Exception("Failed to create SSH keys for user admin", error) + + @staticmethod + def internal_assert(x, y): + try: + Helper.tc.assertEqual(str(x), str(y)) + except Exception as e: + raise Exception("AssertionError: \"" + str(x) + + "\" != \"" + str(y) + "\"", e) + + @staticmethod + def internal_assert_boolean(x, y): + try: + Helper.tc.assertEqual(str(x), str(y)) + return True + except Exception as e: + raise Exception("AssertionError: \"" + str(x) + + "\" != \"" + str(y) + "\"", e) + + @staticmethod + def internal_assert_boolean_true_false(x, y): + return Helper.tc.assertEqual(str(x), str(y)) + + @staticmethod + def internal_not_equal(x, y): + try: + Helper.tc.assertNotEqual(x, y) + except Exception as e: + raise Exception("AssertionError: \"" + str(x) + + "\" != \"" + str(y) + "\"", e) + + @staticmethod + def get_reset_passw_url(email): + jwtObj = JWTAuthentication() + token = jwtObj.createPersonalTokenWithExpiration(email) + resetPasswURL = Constants.Default.LoginURL.TEXT + "?t=" + token + return resetPasswURL + + @staticmethod + def assertTrue(expr, msg=None): + """Check that the expression is true.""" + if expr != True: + raise Exception("AssertionError: \"not expr") diff --git a/tests/signalTests/test_checklist_signal.py b/tests/signalTests/test_checklist_signal.py index ae22578..7d3cd1c 100644 --- a/tests/signalTests/test_checklist_signal.py +++ b/tests/signalTests/test_checklist_signal.py @@ -1,5 +1,5 @@ - -# ============LICENSE_START========================================== + +# ============LICENSE_START========================================== # org.onap.vvp/test-engine # =================================================================== # Copyright © 2017 AT&T Intellectual Property. All rights reserved. @@ -36,33 +36,33 @@ # ============LICENSE_END============================================ # # ECOMP is a trademark and service mark of AT&T Intellectual Property. -'''
-Created on 16 Nov 2016
-'''
-from django.conf import settings
-
-from iceci.decorator.exception_decor import exception
-from services.constants import Constants
-from services.logging_service import LoggingServiceFactory
-from services.types import API, DB
-from tests.signalTests.test_signal_base import TestSignalBase
-
-
-logger = LoggingServiceFactory.get_logger()
-
-
-class TestChecklistSignal(TestSignalBase):
-
- @exception()
- def test_archive_checklist_after_editing_files(self):
- if settings.DATABASE_TYPE == 'local':
- logger.debug("Local environment, skipping test...")
- else:
- user_content = API.VirtualFunction.create_engagement()
- API.GitLab.git_clone_push(user_content, yaml=True)
- token = "token " + API.User.login_user(user_content['el_email'])
- user_content['session_token'] = token
- cl_content = API.Checklist.retrieve_heat_checklist(user_content)
- API.GitLab.git_push_commit(user_content, yaml=True)
- DB.Checklist.state_changed(
- "uuid", cl_content['uuid'], Constants.ChecklistStates.Archive.TEXT)
+''' +Created on 16 Nov 2016 +''' +from django.conf import settings + +from iceci.decorator.exception_decor import exception +from services.constants import Constants +from services.logging_service import LoggingServiceFactory +from services.types import API, DB +from tests.signalTests.test_signal_base import TestSignalBase + + +logger = LoggingServiceFactory.get_logger() + + +class TestChecklistSignal(TestSignalBase): + + @exception() + def test_archive_checklist_after_editing_files(self): + if settings.DATABASE_TYPE == 'local': + logger.debug("Local environment, skipping test...") + else: + user_content = API.VirtualFunction.create_engagement() + API.GitLab.git_clone_push(user_content) + token = "token " + API.User.login_user(user_content['el_email']) + user_content['session_token'] = token + cl_content = API.Checklist.create_checklist(user_content) + API.GitLab.git_push_commit(user_content) + DB.Checklist.state_changed( + "uuid", cl_content['uuid'], Constants.ChecklistStates.Archive.TEXT) diff --git a/tests/signalTests/test_git_signal.py b/tests/signalTests/test_git_signal.py index d008787..63290d2 100644 --- a/tests/signalTests/test_git_signal.py +++ b/tests/signalTests/test_git_signal.py @@ -1,5 +1,5 @@ - -# ============LICENSE_START========================================== + +# ============LICENSE_START========================================== # org.onap.vvp/test-engine # =================================================================== # Copyright © 2017 AT&T Intellectual Property. All rights reserved. @@ -301,20 +301,3 @@ class TestGitSignal(TestSignalBase): "Invited user: " + email + " and" + second_invited_email['full_name'] + " found in GitLab.") logger.debug( "Inviter and invited users were created successfully on GitLab!") - - @exception() - def test_push_yaml_files_to_repo_check_decline_of_cl(self): - - if settings.DATABASE_TYPE == 'local': - logger.debug("Local environment, skipping test...") - else: - user_content = API.VirtualFunction.create_engagement() - token = "token " + API.User.login_user(user_content['el_email']) - user_content['session_token'] = token - cl_content_before_push_and_decline = API.Checklist.retrieve_heat_checklist( - user_content) - API.GitLab.git_clone_push(user_content, yaml=True) - cl_content_after_push_and_decline = API.Checklist.retrieve_heat_checklist( - user_content) - Helper.internal_not_equal( - cl_content_before_push_and_decline, cl_content_after_push_and_decline) diff --git a/tests/uiTests/test_bucket_e2e.py b/tests/uiTests/test_bucket_e2e.py index d72af48..73e18ca 100644 --- a/tests/uiTests/test_bucket_e2e.py +++ b/tests/uiTests/test_bucket_e2e.py @@ -1,5 +1,5 @@ - -# ============LICENSE_START========================================== + +# ============LICENSE_START========================================== # org.onap.vvp/test-engine # =================================================================== # Copyright © 2017 AT&T Intellectual Property. All rights reserved. @@ -54,87 +54,95 @@ logger = LoggingServiceFactory.get_logger() class TestBucketE2E(TestUiBase): + def create_bucket_and_validate_users(self): user_content = API.VirtualFunction.create_engagement( wait_for_gitlab=True) - - print("***********STAGE = ",user_content['vfStage']) - API.VirtualFunction.set_eng_stage(user_content, Constants.EngagementStages.ACTIVE) - bucket_id = user_content['engagement_manual_id']+"_"+user_content['vfName'].lower() - print("***********bucket_id = ",bucket_id) + API.VirtualFunction.set_eng_stage( + user_content, Constants.EngagementStages.ACTIVE) + bucket_id = user_content[ + 'engagement_manual_id'] + "_" + user_content['vfName'].lower() bucket = API.Rados.get_bucket(bucket_id) assertTrue(API.Rados.is_bucket_ready(bucket_id)) assertTrue(bucket != "None") - assertTrue(API.Rados.users_of_bucket_ready_after_created(bucket_id,user_content['full_name'])) - #validate users added to bucket + assertTrue(API.Rados.users_of_bucket_ready_after_created( + bucket_id, user_content['full_name'])) + # validate users added to bucket grants = API.Rados.get_bucket_grants(bucket_id) count = 0 for g in grants: if g.id == user_content['full_name']: count = +1 - + assertTrue(count > 0) return bucket, user_content @exception() def test_validate_bucket_created(self): bucket, user_content = self.create_bucket_and_validate_users() - #create upload file - str_content = Helper.rand_string("randomString") + Helper.rand_string("randomNumber") + # create upload file + str_content = Helper.rand_string( + "randomString") + Helper.rand_string("randomNumber") fileName = Helper.rand_string("randomString") - bucket_id = user_content['engagement_manual_id']+"_"+user_content['vfName'].lower() + bucket_id = user_content[ + 'engagement_manual_id'] + "_" + user_content['vfName'].lower() bucket = API.Rados.get_bucket(bucket_id) assertTrue(API.Rados.is_bucket_ready(bucket_id)) - key = bucket.new_key(fileName+'.dat') + key = bucket.new_key(fileName + '.dat') key.set_contents_from_string(str_content) pprint(key.generate_url(expires_in=400)) # DOWNLOAD AN OBJECT (TO A FILE) - key = bucket.get_key(fileName+'.dat') - key.get_contents_to_filename('/home/'+fileName+'.dat') + key = bucket.get_key(fileName + '.dat') + key.get_contents_to_filename('/home/' + fileName + '.dat') key.delete() - + @exception() def test_validate_bucket_removed(self): bucket, user_content = self.create_bucket_and_validate_users() - #set Completed Stage + # set Completed Stage API.VirtualFunction.set_eng_stage( user_content, Constants.EngagementStages.COMPLETED) - #validate users removed from bucket - bucket_id = user_content['engagement_manual_id']+"_"+user_content['vfName'].lower() - assertTrue(API.Rados.users_of_bucket_ready_after_complete(bucket_id,user_content['full_name'])) + # validate users removed from bucket + bucket_id = user_content[ + 'engagement_manual_id'] + "_" + user_content['vfName'].lower() + assertTrue(API.Rados.users_of_bucket_ready_after_complete( + bucket_id, user_content['full_name'])) assertTrue(API.Rados.is_bucket_ready(bucket_id)) assertTrue(bucket != "None") - #try create upload file - must failed - str_content = Helper.rand_string("randomString") + Helper.rand_string("randomNumber") + # try create upload file - must failed + str_content = Helper.rand_string( + "randomString") + Helper.rand_string("randomNumber") fileName = Helper.rand_string("randomString") bucket = API.Rados.get_bucket(bucket_id) assertTrue(API.Rados.is_bucket_ready(bucket_id)) - key = bucket.new_key(fileName+'.dat') + key = bucket.new_key(fileName + '.dat') key.set_contents_from_string(str_content) pprint(key.generate_url(expires_in=400)) # DOWNLOAD AN OBJECT (TO A FILE) - key = bucket.get_key(fileName+'.dat') - key.get_contents_to_filename('/home/'+fileName+'.dat') + key = bucket.get_key(fileName + '.dat') + key.get_contents_to_filename('/home/' + fileName + '.dat') key.delete() - + @exception() def test_validate_upload_download_image_with_bucket_user(self): bucket, user_content = self.create_bucket_and_validate_users() - #connect to bucket with specific user - bucket_id = user_content['engagement_manual_id']+"_"+user_content['vfName'].lower() + # connect to bucket with specific user + bucket_id = user_content[ + 'engagement_manual_id'] + "_" + user_content['vfName'].lower() access_key = DBUser.get_access_key(user_content['uuid']) secret_key = DBUser.get_access_secret(user_content['uuid']) secret = CryptographyText.decrypt(secret_key) - bucket_for_specific_user = API.Rados.get_bucketfor_specific_user(bucket_id,access_key,secret) - assertTrue(bucket_for_specific_user != None) - #create upload file with user - str_content = Helper.rand_string("randomString") + Helper.rand_string("randomNumber") + bucket_for_specific_user = API.Rados.get_bucketfor_specific_user( + bucket_id, access_key, secret) + assertTrue(bucket_for_specific_user != None) + # create upload file with user + str_content = Helper.rand_string( + "randomString") + Helper.rand_string("randomNumber") fileName = Helper.rand_string("randomString") - key = bucket_for_specific_user.new_key(fileName+'.dat') + key = bucket_for_specific_user.new_key(fileName + '.dat') key.set_contents_from_string(str_content) pprint(key.generate_url(expires_in=3600)) # DOWNLOAD AN OBJECT (TO A FILE) - key = bucket_for_specific_user.get_key(fileName+'.dat') - key.get_contents_to_filename('/home/'+fileName+'.dat') + key = bucket_for_specific_user.get_key(fileName + '.dat') + key.get_contents_to_filename('/home/' + fileName + '.dat') key.delete() - diff --git a/tests/uiTests/test_checklist_validations.py b/tests/uiTests/test_checklist_validations.py index c89fa25..07d303c 100644 --- a/tests/uiTests/test_checklist_validations.py +++ b/tests/uiTests/test_checklist_validations.py @@ -105,7 +105,8 @@ class TestChecklistValidations(TestUiBase): engagement_manual_id = newObjWithChecklist[2] actualVfNameid = newObjWithChecklist[3] checklistName = newObjWithChecklist[5] - DB.Checklist.state_changed("uuid", checklistUuid, 'review') + DB.Checklist.state_changed( + "uuid", checklistUuid, Constants.ChecklistStates.Review.TEXT) DB.Checklist.update_decisions(checklistUuid, checklistName) Frontend.User.relogin( @@ -154,7 +155,8 @@ class TestChecklistValidations(TestUiBase): engagement_manual_id = newObjWithChecklist[2] actualVfNameid = newObjWithChecklist[3] checklistName = newObjWithChecklist[5] - DB.Checklist.state_changed("uuid", checklistUuid, 'review') + DB.Checklist.state_changed( + "uuid", checklistUuid, Constants.ChecklistStates.Review.TEXT) DB.Checklist.update_decisions(checklistUuid, checklistName) Frontend.User.relogin( engLeadEmail, Constants.Default.Password.TEXT, engagement_manual_id) @@ -186,7 +188,8 @@ class TestChecklistValidations(TestUiBase): actualVfNameid = newObjWithChecklist[3] myVfName = newObjWithChecklist[4] checklistName = newObjWithChecklist[5] - DB.Checklist.state_changed("uuid", checklistUuid, 'review') + DB.Checklist.state_changed( + "uuid", checklistUuid, Constants.ChecklistStates.Review.TEXT) DB.Checklist.update_decisions(checklistUuid, checklistName) Frontend.User.relogin( @@ -228,7 +231,8 @@ class TestChecklistValidations(TestUiBase): @exception() def test_reject_anytime_checklist(self): cl_content = API.Checklist.create_checklist(self.user_content_api) - DB.Checklist.state_changed("name", cl_content['name'], 'review') + DB.Checklist.state_changed( + "name", cl_content['name'], Constants.ChecklistStates.Review.TEXT) Frontend.User.login( self.user_content_api['el_email'], Constants.Default.Password.TEXT) Frontend.Checklist.search_by_manual_id( @@ -238,7 +242,8 @@ class TestChecklistValidations(TestUiBase): Frontend.Checklist.click_on_checklist( self.user_content_api, cl_content['name'], recent_checklist_uuid) Frontend.Checklist.reject("Reject checklist on review state.") - DB.Checklist.state_changed("uuid", recent_checklist_uuid, 'archive') + DB.Checklist.state_changed( + "uuid", recent_checklist_uuid, Constants.ChecklistStates.Archive.TEXT) @exception() def test_clone_decision_auditlogs(self): diff --git a/tests/uiTests/test_left_nav_panel.py b/tests/uiTests/test_left_nav_panel.py index 74a78ad..236097d 100644 --- a/tests/uiTests/test_left_nav_panel.py +++ b/tests/uiTests/test_left_nav_panel.py @@ -1,5 +1,5 @@ - -# ============LICENSE_START========================================== + +# ============LICENSE_START========================================== # org.onap.vvp/test-engine # =================================================================== # Copyright © 2017 AT&T Intellectual Property. All rights reserved. @@ -150,7 +150,7 @@ class TestLeftNavPanel(TestUiBase): myVfName = self.user_content[ 'engagement_manual_id'] + ": " + self.user_content['vfName'] actualVfNameid = "clickable-" + myVfName - actualVfName = Get.by_id(actualVfNameid) + actualVfName = Get.by_id(actualVfNameid, True) Helper.internal_assert(myVfName, actualVfName) Click.id(actualVfNameid) uuid = DB.General.select_where_email( @@ -176,7 +176,6 @@ class TestLeftNavPanel(TestUiBase): DB.VirtualFunction.remove_engagement_from_recent( self.user_content['vf_uuid']) Frontend.General.refresh() - Wait.id_to_dissappear(self.left_panel_eng_id) Frontend.Dashboard.statuses_search_vf( self.user_content['engagement_manual_id'], self.user_content['vfName']) Wait.text_by_id(self.left_panel_eng_id, self.eng_title) diff --git a/tests/uiTests/test_login_with_new_user.py b/tests/uiTests/test_login_with_new_user.py index 8018b1a..d7d1bda 100644 --- a/tests/uiTests/test_login_with_new_user.py +++ b/tests/uiTests/test_login_with_new_user.py @@ -1,5 +1,5 @@ - -# ============LICENSE_START========================================== + +# ============LICENSE_START========================================== # org.onap.vvp/test-engine # =================================================================== # Copyright © 2017 AT&T Intellectual Property. All rights reserved. @@ -137,7 +137,7 @@ class TestLoginPageWithNewUser(TestUiBase): vfFullName = user_content[ 'engagement_manual_id'] + ": " + user_content['vfName'] actualVfNameid = "clickable-" + vfFullName - Click.id(actualVfNameid,wait_for_page=True) + Click.id(actualVfNameid, wait_for_page=True) Wait.id(Constants.Dashboard.Overview.TeamMember.ID) Frontend.Wizard.invite_team_members_modal(second_user_content['email']) enguuid = DB.General.select_where("uuid", "ice_engagement", "engagement_manual_id", user_content[ @@ -203,13 +203,15 @@ class TestLoginPageWithNewUser(TestUiBase): service_provider_internal["full_name"], service_provider_internal["phone"], service_provider_internal["company"]) Frontend.General.re_open(signUpURLforContact) actualInvitedEmail = Get.value_by_name(Constants.Signup.Email.NAME) - Helper.internal_assert(service_provider_internal["email"], actualInvitedEmail) + Helper.internal_assert( + service_provider_internal["email"], actualInvitedEmail) Helper.internal_assert( "+" + service_provider_internal["phone"], Get.value_by_name(Constants.Signup.Phone.NAME)) Helper.internal_assert( service_provider_internal["full_name"], Get.value_by_name(Constants.Signup.FullName.NAME)) + signupCompany = Get.value_by_name(Constants.Signup.Company.NAME, True) Helper.internal_assert( - service_provider_internal["company"], Get.value_by_name(Constants.Signup.Company.NAME)) + service_provider_internal["company"], signupCompany) @exception() def test_create_2_new_users(self): @@ -295,11 +297,8 @@ class TestLoginPageWithNewUser(TestUiBase): engLeadEmail = DB.User.select_el_email(vfName) engagement_manual_id = DB.General.select_where( "engagement_manual_id", "ice_engagement", "uuid", engagement_id, 1) - # Fetch one is_service_provider_contact user ID. - enguuid = DB.General.select_where( - "uuid", "ice_engagement", "engagement_manual_id", engagement_manual_id, 1) invitation_token = DB.User.select_invitation_token("invitation_token", "ice_invitation", "engagement_uuid", - enguuid, vendor_contact["email"], 1) + engagement_id, vendor_contact["email"], 1) signUpURLforContact = DB.User.get_contact_signup_url(invitation_token, uuid, vendor_contact["email"], vendor_contact["full_name"], vendor_contact["phone"], vendor_contact["company"]) Frontend.General.re_open(signUpURLforContact) @@ -313,6 +312,8 @@ class TestLoginPageWithNewUser(TestUiBase): vendor_contact["company"], Get.value_by_name(Constants.Signup.Company.NAME)) # SignUp for VendorContact user_content['engagement_uuid'] = engagement_id + user_content['engagement_manual_id'] = engagement_manual_id + user_content['vfName'] = vfName user_content['el_email'] = engLeadEmail API.User.signup_invited_user(vendor_contact["company"], vendor_contact["email"], invitation_token, signUpURLforContact, user_content, True) @@ -384,18 +385,24 @@ class TestLoginPageWithNewUser(TestUiBase): # SignUp for MainServiceProviderSponsorContact engagement_manual_id = DB.General.select_where( "engagement_manual_id", "ice_engagement", "uuid", engagement_id, 1) - # Fetch one is_service_provider_contact user ID. - enguuid = DB.General.select_where( - "uuid", "ice_engagement", "engagement_manual_id", engagement_manual_id, 1) + invitation_token = DB.User.select_invitation_token("invitation_token", "ice_invitation", "engagement_uuid", - enguuid, service_provider_internal["email"], 1) + engagement_id, service_provider_internal["email"], 1) engLeadEmail = DB.User.select_el_email(vfName) user_content['engagement_uuid'] = engagement_id + user_content['engagement_manual_id'] = engagement_manual_id + user_content['vfName'] = vfName user_content['el_email'] = engLeadEmail + API.User.signup_invited_user(service_provider_internal["company"], service_provider_internal["email"], invitation_token, signUpURLforContact, user_content, True) - activationUrl2 = DB.User.get_activation_url(service_provider_internal["email"]) + activationUrl2 = DB.User.get_activation_url( + service_provider_internal["email"]) # Activate for VendorContact + + + + engagement_manual_id = DB.General.select_where( "engagement_manual_id", "ice_engagement", "uuid", engagement_id, 1) # Validate opened right VF for VendorContact diff --git a/tests/uiTests/test_next_step.py b/tests/uiTests/test_next_step.py index 036d466..cf1e00c 100644 --- a/tests/uiTests/test_next_step.py +++ b/tests/uiTests/test_next_step.py @@ -1,5 +1,5 @@ - -# ============LICENSE_START========================================== + +# ============LICENSE_START========================================== # org.onap.vvp/test-engine # =================================================================== # Copyright © 2017 AT&T Intellectual Property. All rights reserved. @@ -204,7 +204,8 @@ class TestNextStep(TestUiBase): API.User.login_user(user_content['el_email']) Wait.page_has_loaded() cl_content = API.Checklist.create_checklist(user_content) - DB.Checklist.state_changed("name", cl_content['name'], 'review') + DB.Checklist.state_changed( + "name", cl_content['name'], Constants.ChecklistStates.Review.TEXT) new_cl_uuid = DB.Checklist.get_recent_checklist_uuid(cl_content['name'])[ 0] API.Checklist.add_checklist_next_step(user_content, new_cl_uuid) @@ -221,7 +222,8 @@ class TestNextStep(TestUiBase): user_content['session_token'] = "token " + \ API.User.login_user(user_content['el_email']) checklist = API.Checklist.create_checklist(user_content) - DB.Checklist.state_changed("uuid", checklist['uuid'], 'review') + DB.Checklist.state_changed( + "uuid", checklist['uuid'], Constants.ChecklistStates.Review.TEXT) Frontend.User.relogin( user_content['el_email'], 'iceusers') eng_id = "clickable-%s: %s" % ( diff --git a/tests/uiTests/test_overview.py b/tests/uiTests/test_overview.py index 2045acc..31db2f7 100644 --- a/tests/uiTests/test_overview.py +++ b/tests/uiTests/test_overview.py @@ -1,5 +1,5 @@ - -# ============LICENSE_START========================================== + +# ============LICENSE_START========================================== # org.onap.vvp/test-engine # =================================================================== # Copyright © 2017 AT&T Intellectual Property. All rights reserved. @@ -45,7 +45,6 @@ from services.logging_service import LoggingServiceFactory from services.types import API, Frontend, DB from tests.uiTests.test_ui_base import TestUiBase - logger = LoggingServiceFactory.get_logger() @@ -54,9 +53,10 @@ class TestOverview(TestUiBase): @exception() def test_engagement_validation_details_update_when_cl_closed(self): user_content = API.VirtualFunction.create_engagement() - API.GitLab.git_clone_push(user_content) - cl_uuid = DB.General.select_where_and('uuid', Constants.DBConstants.IceTables.CHECKLIST, 'engagement_id', user_content['engagement_uuid'], - 'name', Constants.Dashboard.Checklist.ChecklistDefaultNames.AIC_INSTANTIATION, 1) + cl_name = Constants.Dashboard.Checklist.ChecklistDefaultNames.AIC_INSTANTIATION + DB.Checklist.state_changed( + "name", cl_name, Constants.ChecklistStates.Review.TEXT) + cl_uuid = DB.Checklist.get_recent_checklist_uuid(cl_name)[0] vf_staff_emails = [user_content['el_email'], user_content[ 'pr_email'], Constants.Users.Admin.EMAIL] API.Checklist.move_cl_to_closed(cl_uuid, vf_staff_emails) diff --git a/tests/uiTests/test_sanity.py b/tests/uiTests/test_sanity.py index 8399e12..1a32215 100644 --- a/tests/uiTests/test_sanity.py +++ b/tests/uiTests/test_sanity.py @@ -1,5 +1,5 @@ - -# ============LICENSE_START========================================== + +# ============LICENSE_START========================================== # org.onap.vvp/test-engine # =================================================================== # Copyright © 2017 AT&T Intellectual Property. All rights reserved. @@ -71,7 +71,8 @@ class TestSanity(TestUiBase): engagement_manual_id = newObjWithChecklist[2] actualVfNameid = newObjWithChecklist[3] checklistName = newObjWithChecklist[5] - DB.Checklist.state_changed("uuid", checklistUuid, 'review') + DB.Checklist.state_changed( + "uuid", checklistUuid, Constants.ChecklistStates.Review.TEXT) DB.Checklist.update_decisions(checklistUuid, checklistName) Frontend.User.relogin( @@ -79,7 +80,7 @@ class TestSanity(TestUiBase): Frontend.Checklist.click_on_checklist(user_content, checklistName) Frontend.Checklist.validate_reject_is_enabled() Frontend.Checklist.review_state_actions_and_validations( - checklistName, user_content['vfName'], "review") + checklistName, user_content['vfName'], Constants.ChecklistStates.Review.TEXT) Frontend.Checklist.cl_to_next_stage(actualVfNameid) engPreeRiviewerLeadEmail = DB.Checklist.get_pr_email(checklistUuid) @@ -17,3 +17,8 @@ basepython=python2.7 [testenv:py3] basepython=python3.6 + +[flake8] +show-source = True +exclude=venv-tox,.git,.tox,dist,doc,*openstack/common*,*lib/python*,*egg,build + |