aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorEdan Binshtok <eb578m@intl.att.com>2017-10-18 07:56:58 +0300
committerEdan Binshtok <eb578m@intl.att.com>2017-10-18 07:56:58 +0300
commit433a8256e31f755f5e236491bbe39d3db24d6d6d (patch)
tree45f483eab1ea1654ee21a3b51c5b8bf1a8ebaffa
parentf8907f0c4fc0ba4bb97a1d636a50c5b40c2642f2 (diff)
Align CI test test and JJB
Add vendor agnostic CI test to align Add Tox and maven docker Issue Id: VVP-15 Change-Id: I69f0c1036e6f72b62bddc822544c55200af7b37d Signed-off-by: Edan Binshtok <eb578m@intl.att.com>
-rw-r--r--.maven-dockerignore2
-rw-r--r--iceci/mail.py12
-rw-r--r--iceci/views.py5
-rw-r--r--services/api/api_checklist.py22
-rw-r--r--services/api/api_gitlab.py52
-rw-r--r--services/api/api_jenkins.py23
-rw-r--r--services/constants.py27
-rw-r--r--services/database/db_checklist.py18
-rw-r--r--services/frontend/base_actions/get.py22
-rw-r--r--services/frontend/base_actions/wait.py19
-rw-r--r--services/frontend/fe_checklist.py3
-rw-r--r--services/frontend/fe_dashboard.py10
-rw-r--r--services/frontend/fe_detailed_view.py8
-rw-r--r--services/frontend/fe_general.py25
-rw-r--r--services/frontend/fe_invite.py9
-rw-r--r--services/frontend/fe_overview.py5
-rw-r--r--services/frontend/fe_user.py15
-rw-r--r--services/frontend/fe_wizard.py13
-rw-r--r--services/helper.py319
-rw-r--r--tests/signalTests/test_checklist_signal.py64
-rw-r--r--tests/signalTests/test_git_signal.py21
-rw-r--r--tests/uiTests/test_bucket_e2e.py82
-rw-r--r--tests/uiTests/test_checklist_validations.py15
-rw-r--r--tests/uiTests/test_left_nav_panel.py7
-rw-r--r--tests/uiTests/test_login_with_new_user.py35
-rw-r--r--tests/uiTests/test_next_step.py10
-rw-r--r--tests/uiTests/test_overview.py12
-rw-r--r--tests/uiTests/test_sanity.py9
-rw-r--r--tox.ini5
29 files changed, 475 insertions, 394 deletions
diff --git a/.maven-dockerignore b/.maven-dockerignore
new file mode 100644
index 0000000..52d95d7
--- /dev/null
+++ b/.maven-dockerignore
@@ -0,0 +1,2 @@
+target/docker/
+
diff --git a/iceci/mail.py b/iceci/mail.py
index 9f23b90..80660b5 100644
--- a/iceci/mail.py
+++ b/iceci/mail.py
@@ -50,7 +50,7 @@ from django.conf import settings
from django.core.mail import send_mail
from django.utils import timezone
-from services.constants import Constants
+from services.constants import Constants, ServiceProvider
from services.logging_service import LoggingServiceFactory
@@ -59,15 +59,17 @@ admin_mail_from = settings.ICE_CONTACT_FROM_ADDRESS
param = "1"
logger = LoggingServiceFactory.get_logger()
+
def sendMail(param,email, data, mail_body, mail_subject, mail_from=admin_mail_from):
logger.debug("about to send mail to " + email)
- try:
-# lastBuild = param
+ try:
html_msg = mail_body.substitute(data)
mail_subject = mail_subject.substitute(data)
- #send mail with template
- send_mail(mail_subject, '', Constants.FEGeneral.ProgramName.name +"-CI Report Test Team <" + mail_from + ">",settings.ICE_CONTACT_EMAILS , fail_silently=False, html_message=html_msg)
+ send_mail(mail_subject, '', ServiceProvider.PROGRAM_NAME
+ + "-CI Report Test Team <" + mail_from + ">",
+ settings.ICE_CONTACT_EMAILS , fail_silently=False,
+ html_message=html_msg)
logger.debug("Looks like email delivery to "+email+" has succeeded")
except Exception:
traceback.print_exc()
diff --git a/iceci/views.py b/iceci/views.py
index 7e0505d..6a5a9b8 100644
--- a/iceci/views.py
+++ b/iceci/views.py
@@ -47,7 +47,7 @@ from rest_framework.renderers import JSONRenderer
from iceci import mail
from iceci.mail import testsResults_mail_body
-from services.constants import Constants
+from services.constants import Constants, ServiceProvider
from services.logging_service import LoggingServiceFactory
from .models import TestResults
@@ -60,7 +60,8 @@ LAST_BUILD_REPORT_NUM = None
logger = LoggingServiceFactory.get_logger()
def index(request):
- return HttpResponse("Hello, world. You're at the "+Constants.FEGeneral.ProgramName.name+" ci index.")
+ return HttpResponse("Hello, world. You're at the "
+ + ServiceProvider.PROGRAM_NAME + " ci index.")
@csrf_exempt
def testResult_list(request): # List all tests, or create a new test.
diff --git a/services/api/api_checklist.py b/services/api/api_checklist.py
index ef7b8a3..fda0730 100644
--- a/services/api/api_checklist.py
+++ b/services/api/api_checklist.py
@@ -1,5 +1,5 @@
-
-# ============LICENSE_START==========================================
+
+# ============LICENSE_START==========================================
# org.onap.vvp/test-engine
# ===================================================================
# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
@@ -47,6 +47,7 @@ from services.constants import Constants
from services.database.db_general import DBGeneral
from services.helper import Helper
from services.logging_service import LoggingServiceFactory
+from services.database.db_checklist import DBChecklist
logger = LoggingServiceFactory.get_logger()
@@ -208,12 +209,21 @@ class APIChecklist:
@staticmethod
def move_cl_to_closed(cl_uuid, vf_staff_emails):
api_checklist_obj = APIChecklist()
-
+ states = [Constants.ChecklistStates.PeerReview.TEXT,
+ Constants.ChecklistStates.Approval.TEXT,
+ Constants.ChecklistStates.Handoff.TEXT,
+ Constants.ChecklistStates.Closed.TEXT]
for i in range(len(vf_staff_emails)):
- logger.debug("Trying to jump state for %s [%s]" % (vf_staff_emails[i], i))
+ logger.debug(
+ "Trying to jump state for %s [%s]" % (vf_staff_emails[i], i))
+ DBChecklist.update_all_decisions_to_approve(cl_uuid)
api_checklist_obj.jump_state(cl_uuid, vf_staff_emails[i])
+ logger.debug("Checking state changed to %s" % states[i])
+ DBChecklist.state_changed("uuid", cl_uuid, states[i])
# Move CL to closed state.
- logger.debug("Trying to jump state 'closed' for %s" % vf_staff_emails[0])
+ logger.debug("Trying to jump state 'closed' for %s" %
+ vf_staff_emails[0])
api_checklist_obj.jump_state(cl_uuid, vf_staff_emails[0])
-
+ logger.debug("Checking state changed to %s" % states[-1])
+ DBChecklist.state_changed("uuid", cl_uuid, states[-1])
diff --git a/services/api/api_gitlab.py b/services/api/api_gitlab.py
index c7b25e0..6c5a2ff 100644
--- a/services/api/api_gitlab.py
+++ b/services/api/api_gitlab.py
@@ -1,5 +1,5 @@
-
-# ============LICENSE_START==========================================
+
+# ============LICENSE_START==========================================
# org.onap.vvp/test-engine
# ===================================================================
# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
@@ -56,6 +56,7 @@ from services.session import session
logger = LoggingServiceFactory.get_logger()
+
class APIGitLab:
@staticmethod
@@ -79,13 +80,13 @@ class APIGitLab:
headers['PRIVATE-TOKEN'] = settings.GITLAB_TOKEN
try:
r1 = requests.get(getURL, headers=headers, verify=False)
- Helper.internal_assert(r1.status_code, 200)
counter = 0
- while r1.content == b'[]' and counter <= Constants.GitLabConstants.RETRIES_NUMBER:
+ while r1.status_code == 404 or r1.content == b'[]' and counter <= Constants.GitLabConstants.RETRIES_NUMBER:
time.sleep(session.wait_until_time_pause)
r1 = requests.get(getURL, headers=headers, verify=False)
- Helper.internal_assert(r1.status_code, 200)
-
+ logger.debug(
+ "trying to get the git project, yet to succeed (try #%s)" % counter)
+ counter += 1
if r1.content == b'[]':
logger.error("Got an empty list as a response.")
raise
@@ -363,32 +364,15 @@ class APIGitLab:
@staticmethod
def is_gitlab_ready(user_content):
- counter = 1
- gettURL = settings.ICE_EM_URL + '/v1/engmgr/engagement/' + \
- user_content['engagement_uuid'] + '/checklist/new/'
- logger.debug(
- "Get URL to check if GitLab and Jenkins are ready: " + gettURL)
- # Validate with EL
- token = "token " + APIBridge.login_user(user_content['el_email'])
- headers = dict() # Create header for get request.
- headers['Content-type'] = 'application/json'
- headers['Authorization'] = token
- r1 = requests.get(gettURL, headers=headers, verify=False)
- while (r1.content == b'"Create New checklist is not ready yet"' and counter <=
- Constants.GitLabConstants.RETRIES_NUMBER):
- time.sleep(session.wait_until_time_pause_long)
- logger.debug(
- "GitLab and Jenkins are not ready yet, trying again (%s of %s)" %
- (counter, Constants.GitLabConstants.RETRIES_NUMBER))
- r1 = requests.get(gettURL, headers=headers, verify=False)
- counter += 1
- if r1.status_code != 200:
- if r1.content == "Create New checklist is not ready yet":
- raise Exception("Max retries exceeded, failing test...")
- else:
- raise Exception("Something went wrong while waiting for GitLab and Jenkins. %s %s" % (
- r1.status_code, r1.reason))
- return False
- elif r1.status_code == 200:
- logger.debug("Gitlab and Jenkins are ready to continue!")
+ path_with_namespace = user_content[
+ 'engagement_manual_id'] + "%2F" + user_content['vfName']
+ # If admin user is in project repo, it means the project exists as
+ # well.
+ cont = APIGitLab.validate_git_project_members(
+ path_with_namespace, Constants.Users.Admin.EMAIL)
+ if cont:
+ logger.debug("Gitlab is ready for action, git repo was found!")
return True
+ else:
+ raise Exception(
+ "Something went wrong while waiting for GitLab.")
diff --git a/services/api/api_jenkins.py b/services/api/api_jenkins.py
index e1e1f6e..b63cb66 100644
--- a/services/api/api_jenkins.py
+++ b/services/api/api_jenkins.py
@@ -1,5 +1,5 @@
-
-# ============LICENSE_START==========================================
+
+# ============LICENSE_START==========================================
# org.onap.vvp/test-engine
# ===================================================================
# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
@@ -37,26 +37,36 @@
#
# ECOMP is a trademark and service mark of AT&T Intellectual Property.
from django.conf import settings
-import requests
from requests.auth import HTTPBasicAuth
-
from services.constants import Constants
from services.helper import Helper
-from services.logging_service import LoggingServiceFactory
+import logging
+import requests
+import time
+from services.session import session
+
+logger = logging.getLogger('ice-ci.logger')
-logger = LoggingServiceFactory.get_logger()
class APIJenkins:
@staticmethod
def get_jenkins_job(job_name):
r1 = None
+ counter = 0
getURL = settings.JENKINS_URL + "job/" + job_name
logger.debug("Get APIJenkins job URL: " + getURL)
try:
r1 = requests.get(getURL, auth=HTTPBasicAuth(
settings.JENKINS_USERNAME, settings.JENKINS_PASSWORD))
+ while r1.status_code != 200 and counter <= Constants.GitLabConstants.RETRIES_NUMBER:
+ r1 = requests.get(getURL, auth=HTTPBasicAuth(
+ settings.JENKINS_USERNAME, settings.JENKINS_PASSWORD))
+ time.sleep(session.wait_until_time_pause)
+ logger.debug(
+ "try to get jenkins job (try #%s)" % counter)
+ counter += 1
Helper.internal_assert(r1.status_code, 200)
logger.debug("Job was created on APIJenkins!")
except:
@@ -78,4 +88,3 @@ class APIJenkins:
if Constants.Dashboard.Checklist.JenkinsLog.Modal.Body.BUILD_IDENTIFIER in line:
parts = line.partition('jenkins')
return parts[2]
-
diff --git a/services/constants.py b/services/constants.py
index 0be529f..23d7ef1 100644
--- a/services/constants.py
+++ b/services/constants.py
@@ -42,7 +42,7 @@ from django.conf import settings
class ServiceProvider:
PROGRAM_NAME = "VVP"
MainServiceProvider = "ServiceProvider"
- email = "example.com"
+ email = "example-domain.com"
class Constants:
@@ -89,9 +89,21 @@ class Constants:
class Review:
TEXT = "review"
+ class PeerReview:
+ TEXT = "peer_review"
+
+ class Approval:
+ TEXT = "approval"
+
+ class Handoff:
+ TEXT = "handoff"
+
class Archive:
TEXT = "archive"
+ class Closed:
+ TEXT = "closed"
+
class FEConstants:
RETRIES_NUMBER = 120
@@ -108,14 +120,15 @@ class Constants:
class Users:
class Admin:
- EMAIL = "admin@example.com"
+ EMAIL = "admin@" + ServiceProvider.email
FULLNAME = "admin bogus user"
class AdminRO:
- EMAIL = "admin_ro@example.com"
+ EMAIL = "admin_ro@" + ServiceProvider.email
class LongEmailLengthStandardUser:
- EMAIL = "50charslengthemailofstandarduserforinvite@example.com"
+ EMAIL = "50charslengthemailofstandardus@" + \
+ ServiceProvider.email
class Toast:
ID = "toast-successfully-message"
@@ -745,9 +758,9 @@ class Constants:
class FilterByFileDropDown:
ID = "selected-file-filter-dropdown"
ANY_FILE_LINK_TEXT = "Any file"
- FILE0_LINK_TEXT = "file0.yaml"
- FILE1_LINK_TEXT = "file1.yaml"
- FILE2_LINK_TEXT = "file2.yaml"
+ FILE0_LINK_TEXT = "file0"
+ FILE1_LINK_TEXT = "file1"
+ FILE2_LINK_TEXT = "file2"
class StateDropDown:
ID = "selected-state-filter-dropdown"
diff --git a/services/database/db_checklist.py b/services/database/db_checklist.py
index 15851cf..04f8a44 100644
--- a/services/database/db_checklist.py
+++ b/services/database/db_checklist.py
@@ -1,5 +1,5 @@
-
-# ============LICENSE_START==========================================
+
+# ============LICENSE_START==========================================
# org.onap.vvp/test-engine
# ===================================================================
# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
@@ -50,6 +50,7 @@ from services.session import session
logger = LoggingServiceFactory.get_logger()
+
class DBChecklist:
@staticmethod
@@ -361,16 +362,18 @@ class DBChecklist:
@staticmethod
def state_changed(identify_field, field_value, expected_state):
- get_state = str(DBGeneral.select_where(
- "state", Constants.DBConstants.IceTables.CHECKLIST, identify_field, field_value, 1))
+ get_state = DBGeneral.select_where_order_by_desc(
+ "state", Constants.DBConstants.IceTables.CHECKLIST,
+ identify_field, field_value, "create_time")[0]
counter = 0
while get_state != expected_state and counter <= Constants.DBConstants.RETRIES_NUMBER:
time.sleep(session.wait_until_time_pause_long)
logger.debug("Checklist state not changed yet , expecting state: %s, current result: %s (attempt %s of %s)" % (
expected_state, get_state, counter, Constants.DBConstants.RETRIES_NUMBER))
counter += 1
- get_state = str(DBGeneral.select_where(
- "state", Constants.DBConstants.IceTables.CHECKLIST, identify_field, field_value, 1))
+ get_state = DBGeneral.select_where_order_by_desc(
+ "state", Constants.DBConstants.IceTables.CHECKLIST,
+ identify_field, field_value, "create_time")[0]
if get_state == expected_state:
logger.debug("Checklist state was successfully changed into: " +
@@ -382,5 +385,6 @@ class DBChecklist:
@staticmethod
def get_recent_checklist_uuid(name):
required_uuid = DBGeneral.select_where_not_and_order_by_desc(
- 'uuid', Constants.DBConstants.IceTables.CHECKLIST, 'name', name, 'state', Constants.ChecklistStates.Archive.TEXT, 'create_time')
+ 'uuid', Constants.DBConstants.IceTables.CHECKLIST, 'name', name,
+ 'state', Constants.ChecklistStates.Archive.TEXT, 'create_time')
return required_uuid
diff --git a/services/frontend/base_actions/get.py b/services/frontend/base_actions/get.py
index 8735c1b..5fb801a 100644
--- a/services/frontend/base_actions/get.py
+++ b/services/frontend/base_actions/get.py
@@ -1,5 +1,5 @@
-
-# ============LICENSE_START==========================================
+
+# ============LICENSE_START==========================================
# org.onap.vvp/test-engine
# ===================================================================
# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
@@ -37,6 +37,7 @@
#
# ECOMP is a trademark and service mark of AT&T Intellectual Property.
from services.frontend.base_actions.wait import Wait
+from services.helper import Helper
from services.session import session
@@ -98,8 +99,10 @@ class Get:
raise Exception(errorMsg, attr_name_value)
@staticmethod
- def value_by_name(attr_name_value):
+ def value_by_name(attr_name_value, wait_for_page=False):
try:
+ if wait_for_page:
+ Wait.page_has_loaded()
Wait.name(attr_name_value)
return session.ice_driver.find_element_by_name(attr_name_value).get_attribute("value")
except Exception as e:
@@ -125,3 +128,16 @@ class Get:
except Exception as e:
errorMsg = "Failed to get if it's selected by id:" + attr_id_value
raise Exception(errorMsg, attr_id_value)
+
+ @staticmethod
+ def is_checkbox_selected_by_id(attr_id_value, wait_for_page=False):
+ try:
+ if wait_for_page:
+ Wait.page_has_loaded()
+ Wait.id(attr_id_value)
+ return Helper.internal_assert_boolean_true_false(
+ session.ice_driver.find_element_by_id(
+ attr_id_value).get_attribute("value"), "on")
+ except Exception as e:
+ errorMsg = "Failed to get if it's selected by id:" + attr_id_value
+ raise Exception(errorMsg, attr_id_value)
diff --git a/services/frontend/base_actions/wait.py b/services/frontend/base_actions/wait.py
index 50eff08..a699917 100644
--- a/services/frontend/base_actions/wait.py
+++ b/services/frontend/base_actions/wait.py
@@ -196,17 +196,18 @@ class Wait:
@staticmethod
def page_has_loaded():
- countwait_untilelement_to_be_presented_by_id = 0
for _ in range(Constants.FEConstants.RETRIES_NUMBER):
- httpRequests = session.ice_driver.execute_script(
- 'return window.angular ? window.angular.element("body").injector().get("$http").pendingRequests.length : 1;')
- if(str(httpRequests) == "0"):
+ try:
+ httpRequests = session.ice_driver.execute_script(
+ 'return window.angular ? window.angular.element("body").injector().get("$http").pendingRequests.length : 1;')
+ if(str(httpRequests) == "0"):
+ time.sleep(session.wait_until_time_pause)
+ return
+ logger.debug(
+ "Checking if {} page is loaded. ".format(session.ice_driver.current_url))
time.sleep(session.wait_until_time_pause)
- return
- logger.debug(
- "Checking if {} page is loaded. ".format(session.ice_driver.current_url))
- time.sleep(session.wait_until_time_pause)
- countwait_untilelement_to_be_presented_by_id += 1
+ except Exception as exception:
+ continue
raise Exception("Page loading took too much time")
diff --git a/services/frontend/fe_checklist.py b/services/frontend/fe_checklist.py
index 75f957a..3afc472 100644
--- a/services/frontend/fe_checklist.py
+++ b/services/frontend/fe_checklist.py
@@ -600,7 +600,7 @@ class FEChecklist:
if checklist_uuid is None:
checklist_uuid = DBGeneral.select_where_not_and_order_by_desc(
'uuid', Constants.DBConstants.IceTables.CHECKLIST, 'name', checklistName, 'state', Constants.ChecklistStates.Archive.TEXT, 'create_time')[0]
- Click.id("checklist-" + checklist_uuid)
+ Click.id("checklist-" + checklist_uuid, True)
@staticmethod
def validate_reject_is_enabled():
@@ -752,4 +752,3 @@ class FEChecklist:
"Jenkins log could not be viewed.")
Click.id(Constants.Dashboard.Modal.CLOSE_BUTTON_ID)
return log
-
diff --git a/services/frontend/fe_dashboard.py b/services/frontend/fe_dashboard.py
index 0df66d4..b8d51b5 100644
--- a/services/frontend/fe_dashboard.py
+++ b/services/frontend/fe_dashboard.py
@@ -1,5 +1,5 @@
-
-# ============LICENSE_START==========================================
+
+# ============LICENSE_START==========================================
# org.onap.vvp/test-engine
# ===================================================================
# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
@@ -221,21 +221,21 @@ class FEDashboard:
eng_manual_id = user_content['engagement_manual_id'] + ":"
Wait.text_by_id(engSearchID, eng_manual_id)
logger.debug("Engagement found (searched by engagement_manual_id)")
- FEGeneral.refresh()
+ FEGeneral.smart_refresh()
logger.debug("Search engagement by VF name")
# Search by VF name.
Enter.text_by_id(
Constants.Dashboard.Statuses.SearchBox.ID, user_content['vfName'])
Wait.text_by_id(engSearchID, eng_manual_id)
logger.debug("Engagement found (searched by VF name)")
- FEGeneral.refresh()
+ FEGeneral.smart_refresh()
logger.debug("Search engagement by VFC")
# Search by VFC.
Enter.text_by_id(
Constants.Dashboard.Statuses.SearchBox.ID, vfcName)
Wait.text_by_id(engSearchID, eng_manual_id)
logger.debug("Engagement found (searched by VFC)")
- FEGeneral.refresh()
+ FEGeneral.smart_refresh()
logger.debug("Negative search: search by random string")
# Search by VFC.
Enter.text_by_id(Constants.Dashboard.Statuses.SearchBox.ID,
diff --git a/services/frontend/fe_detailed_view.py b/services/frontend/fe_detailed_view.py
index bf9a5bf..556e7a7 100644
--- a/services/frontend/fe_detailed_view.py
+++ b/services/frontend/fe_detailed_view.py
@@ -1,5 +1,5 @@
-
-# ============LICENSE_START==========================================
+
+# ============LICENSE_START==========================================
# org.onap.vvp/test-engine
# ===================================================================
# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
@@ -40,7 +40,7 @@ import time
from selenium.webdriver.support.ui import Select
-from services.constants import Constants
+from services.constants import Constants, ServiceProvider
from services.database.db_general import DBGeneral
from services.frontend.base_actions.click import Click
from services.frontend.base_actions.enter import Enter
@@ -237,7 +237,7 @@ class FEDetailedView:
session.ice_driver.find_element_by_name("extRefID").click()
Enter.text_by_name("extRefID", Helper.rand_string("randomNumber"))
Select(session.ice_driver.find_element_by_id(
- Constants.Dashboard.DetailedView.VFC.Choose_Company.ID)).select_by_visible_text("AT&T")
+ Constants.Dashboard.DetailedView.VFC.Choose_Company.ID)).select_by_visible_text(ServiceProvider.MainServiceProvider)
Click.id(Constants.Dashboard.DetailedView.VFC.Save_button.ID)
return vfcName
diff --git a/services/frontend/fe_general.py b/services/frontend/fe_general.py
index c6832cb..b3f97b4 100644
--- a/services/frontend/fe_general.py
+++ b/services/frontend/fe_general.py
@@ -1,5 +1,5 @@
-
-# ============LICENSE_START==========================================
+
+# ============LICENSE_START==========================================
# org.onap.vvp/test-engine
# ===================================================================
# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
@@ -54,6 +54,7 @@ from services.session import session
logger = LoggingServiceFactory.get_logger()
+
class FEGeneral(Helper):
@staticmethod
@@ -103,16 +104,32 @@ class FEGeneral(Helper):
@staticmethod
def refresh():
- try: # Click on element in UI, by CSS locator.
+ try:
session.ice_driver.refresh()
Wait.page_has_loaded()
- # If failed - count the failure and add the error to list of errors.
except Exception as e:
errorMsg = "Could not refresh the page."
logger.error(errorMsg)
raise Exception(errorMsg, e)
@staticmethod
+ def smart_refresh():
+ session.ice_driver.refresh()
+ i = 0
+ success = False
+ while not success and i < 2:
+ try:
+ Wait.page_has_loaded()
+ success = True
+ break
+ except:
+ i += 1
+ time.sleep(1)
+ pass
+ if not success:
+ raise Exception("Failed to wait for refresh")
+
+ @staticmethod
def select_vendor_from_list(vendor):
Wait.name(Constants.Signup.Company.NAME)
Select(session.ice_driver.find_element_by_name(
diff --git a/services/frontend/fe_invite.py b/services/frontend/fe_invite.py
index 405581c..cb84262 100644
--- a/services/frontend/fe_invite.py
+++ b/services/frontend/fe_invite.py
@@ -157,7 +157,8 @@ class FEInvite:
user_content['el_email'] = engLeadEmail
uuid = DBGeneral.select_where_email(
"uuid", "ice_user_profile", user_content['email'])
- sponsor = ["AT&T", 'aaaaaa', inviteEmail, '3058000000']
+ sponsor = [ServiceProvider.MainServiceProvider, 'aaaaaa', inviteEmail,
+ '3058000000']
invitation_token = DBUser.select_invitation_token(
"invitation_token", "ice_invitation", "engagement_uuid", engagement_id, inviteEmail, 1)
signUpURLforContact = DBUser.get_contact_signup_url(
@@ -181,7 +182,8 @@ class FEInvite:
@staticmethod
def invite_x_users_and_verify_VF_appers_for_invited(user_content, engName):
- inviteEmail = Helper.rand_string('randomString') + "@intl." + ServiceProvider.email
+ inviteEmail = Helper.rand_string('randomString') + "@" \
+ + ServiceProvider.email
vflist = FEInvite.create_x_vfs(user_content, engName, x=3)
for vfName in vflist:
# Fetch one AT&T user ID.
@@ -192,7 +194,8 @@ class FEInvite:
engName = engagement_manual_id + ": " + vfName
vf_left_nav_id = "clickable-" + engName
Click.id(vf_left_nav_id)
- FEWizard.invite_team_members_modal(inviteEmail)
+ FEWizard.invite_team_members_modal(inviteEmail,
+ wait_modal_to_disappear=False)
FEGeneral.refresh()
# validations
FEInvite.validations_for_user2(user_content, inviteEmail, vflist)
diff --git a/services/frontend/fe_overview.py b/services/frontend/fe_overview.py
index 8d05f0c..3764e2c 100644
--- a/services/frontend/fe_overview.py
+++ b/services/frontend/fe_overview.py
@@ -58,6 +58,7 @@ from services.session import session
logger = LoggingServiceFactory.get_logger()
+
class FEOverview:
@staticmethod
@@ -65,9 +66,9 @@ class FEOverview:
vfFullName = user_content[
'engagement_manual_id'] + ": " + user_content['vfName']
Enter.text_by_id(Constants.Dashboard.LeftPanel.SearchBox.ID, user_content[
- 'vfName'])
+ 'vfName'], True)
Click.id(Constants.Dashboard.LeftPanel.SearchBox.Results.ID %
- user_content['vfName'])
+ user_content['vfName'], True)
Wait.text_by_id(
Constants.Dashboard.Overview.Title.ID, vfFullName)
diff --git a/services/frontend/fe_user.py b/services/frontend/fe_user.py
index 91cf7eb..eb25d23 100644
--- a/services/frontend/fe_user.py
+++ b/services/frontend/fe_user.py
@@ -1,5 +1,5 @@
-
-# ============LICENSE_START==========================================
+
+# ============LICENSE_START==========================================
# org.onap.vvp/test-engine
# ===================================================================
# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
@@ -55,6 +55,7 @@ from services.session import session
logger = LoggingServiceFactory.get_logger()
+
class FEUser:
@staticmethod
@@ -343,24 +344,24 @@ class FEUser:
def validate_user_profile_settings_checkboxes(checked):
Wait.page_has_loaded()
receive_emails = Get.is_selected_by_id(
- Constants.Dashboard.Avatar.Account.UserProfileSettings.ReceiveEmailsID, wait_for_page=True)
+ Constants.Dashboard.Avatar.Account.UserProfileSettings.ReceiveEmailsID, True)
Helper.internal_assert(receive_emails, checked)
receive_notifications = \
Get.is_selected_by_id(
- Constants.Dashboard.Avatar.Account.UserProfileSettings.ReceiveNotificationsID)
+ Constants.Dashboard.Avatar.Account.UserProfileSettings.ReceiveNotificationsID, True)
receive_email_every_time = \
Get.is_selected_by_id(
- Constants.Dashboard.Avatar.Account.UserProfileSettings.ReceiveEmailEveryTimeID)
+ Constants.Dashboard.Avatar.Account.UserProfileSettings.ReceiveEmailEveryTimeID, True)
Helper.internal_assert(receive_email_every_time, checked)
receive_digest_email = \
Get.is_selected_by_id(
- Constants.Dashboard.Avatar.Account.UserProfileSettings.ReceiveDigestEmailID, wait_for_page=True)
+ Constants.Dashboard.Avatar.Account.UserProfileSettings.ReceiveDigestEmailID, True)
Helper.internal_assert(receive_digest_email, not checked)
@staticmethod
def compare_notifications_count_for_user(expected_count):
Wait.text_by_id(
- Constants.Dashboard.Avatar.Notifications.Count.ID, expected_count, wait_for_page=True)
+ Constants.Dashboard.Avatar.Notifications.Count.ID, expected_count, True)
@staticmethod
def check_notification_number_is_not_presented():
diff --git a/services/frontend/fe_wizard.py b/services/frontend/fe_wizard.py
index 777fe52..df5087d 100644
--- a/services/frontend/fe_wizard.py
+++ b/services/frontend/fe_wizard.py
@@ -51,6 +51,7 @@ from services.session import session
logger = LoggingServiceFactory.get_logger()
+
class FEWizard:
E2Edate = None
@@ -126,7 +127,7 @@ class FEWizard:
Enter.text_by_name("phone", phone)
Click.css(Constants.SubmitButton.CSS)
Wait.name_to_dissappear("Add AT&T Sponsor")
- sponsor = {"company": "AT&T", "full_name": fullname,
+ sponsor = {"company": ServiceProvider.MainServiceProvider, "full_name": fullname,
"email": email, "phone": phone}
return sponsor
@@ -179,7 +180,7 @@ class FEWizard:
raise Exception(errorMsg)
@staticmethod
- def invite_team_members_modal(email):
+ def invite_team_members_modal(email, wait_modal_to_disappear=True):
try:
Click.id(
Constants.Dashboard.Overview.TeamMember.ID, wait_for_page=True)
@@ -188,11 +189,9 @@ class FEWizard:
Enter.text_by_name("email", email)
Wait.text_by_css(
Constants.SubmitButton.CSS, Constants.Dashboard.Wizard.InviteTeamMembers.Button.TEXT)
- Wait.css_to_dissappear(
- '.inviteMembers-form button[disabled="disabled"].btn.btn-primary')
- Wait.css(".inviteMembers-form button.btn.btn-primary")
- Click.css(".inviteMembers-form button.btn.btn-primary")
- Wait.modal_to_dissappear()
+ Click.css(".inviteMembers-form button.btn.btn-primary", True)
+ if wait_modal_to_disappear:
+ Wait.modal_to_dissappear()
# If failed - count the failure and add the error to list of errors.
except Exception as e:
errorMsg = "FAILED in PopUp Invite Team Members. Exception=" + \
diff --git a/services/helper.py b/services/helper.py
index 58769fe..418b260 100644
--- a/services/helper.py
+++ b/services/helper.py
@@ -1,5 +1,5 @@
-# ============LICENSE_START==========================================
+# ============LICENSE_START==========================================
# org.onap.vvp/test-engine
# ===================================================================
# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
@@ -54,157 +54,166 @@ from services.logging_service import LoggingServiceFactory
from utils.authentication import JWTAuthentication
-logger = LoggingServiceFactory.get_logger()
-
-class Helper:
-
- tc = unittest.TestCase('__init__')
-
- @staticmethod
- def rand_string(type_of_value='randomString', numberOfDigits=0):
- letters_and_numbers = string.ascii_letters + string.digits
- if type_of_value == 'email':
- myEmail = ''.join(random.choice(letters_and_numbers) for _ in range(
- 4)) + "@" + ''.join(random.choice(string.ascii_uppercase) for _ in range(4)) + ".com"
- return "ST" + myEmail
- elif type_of_value == 'randomNumber':
- randomNumber = ''.join("%s" % random.randint(2, 9)
- for _ in range(0, (numberOfDigits + 1)))
- return randomNumber
- elif type_of_value == 'randomString':
- randomString = "".join(random.sample(letters_and_numbers, 5))
- return "ST" + randomString
- else:
- raise Exception("invalid rand type")
-
- @staticmethod
- def rand_invite_email():
- inviteEmail = "automationqatt" + \
- Helper.rand_string("randomString") + "@gmail.com"
- return inviteEmail
-
- @staticmethod
- def generate_sshpub_key():
- try:
- logger.debug("About to generate SSH Public Key")
- key = rsa.generate_private_key(
- public_exponent=65537,
- key_size=2048,
- backend=default_backend()
- )
- private_key = key.private_bytes(
- encoding=crypto_serialization.Encoding.PEM,
- format=crypto_serialization.PrivateFormat.TraditionalOpenSSL,
- encryption_algorithm=crypto_serialization.NoEncryption())
- public_key = key.public_key().public_bytes(
- crypto_serialization.Encoding.OpenSSH,
- crypto_serialization.PublicFormat.OpenSSH
- ).decode("utf-8")
-
- logger.debug("Generated SSH Public Key: " + public_key)
- except Exception as e: # If failed write to log the error and return 'None'.
- logger.error(e)
- logger.error("Failed to generate SSH Public Key.")
- raise e
- return public_key
-
- @staticmethod
- def check_admin_ssh_existence(path, admin_ssh):
- if admin_ssh == open(path).read().rstrip('\n'):
- logger.debug(
- "Admin SSH already defined in DB and equal to the one stored on the local system.")
- return True
- return False
-
- @staticmethod
- def get_or_create_rsa_key_for_admin():
- try: # Create pair of keys for the given user and return his public key.
- ssh_folder = Constants.Paths.SSH.PATH
- public_file = ssh_folder + "id_rsa.pub"
- privateFile = ssh_folder + "id_rsa"
- admin_ssh_exist_and_equal = False
- admin_ssh = None
- if not os.path.exists(ssh_folder):
- os.makedirs(ssh_folder)
- elif os.path.exists(public_file):
- admin_ssh = DBUser.retrieve_admin_ssh_from_db()
- admin_ssh_exist_and_equal = Helper.check_admin_ssh_existence(
- public_file, admin_ssh)
- # TODO find pending gitlab bug causing old ssh key not be updated in gitlab cache
- if False and admin_ssh_exist_and_equal:
- return admin_ssh
- else:
- logger.debug("Private key file: " + privateFile +
- "\nPublice key file: " + public_file)
- key = rsa.generate_private_key(
- backend=crypto_default_backend(),
- public_exponent=65537,
- key_size=2048
- )
- private_key = key.private_bytes(
- crypto_serialization.Encoding.PEM,
- crypto_serialization.PrivateFormat.PKCS8,
- crypto_serialization.NoEncryption()).decode("utf-8")
- public_key = key.public_key().public_bytes(
- crypto_serialization.Encoding.OpenSSH,
- crypto_serialization.PublicFormat.OpenSSH
- ).decode("utf-8")
-
- with open(privateFile, 'w') as content_file:
- os.chmod(privateFile, 0o600)
- content_file.write(private_key) # Save private key to file.
- logger.debug(
- "Private key created successfully for admin")
- user_pub_key = public_key
- with open(public_file, 'w') as content_file:
- content_file.write(public_key) # Save public key to file.
- logger.debug("Public key created successfully for admin")
- cmd = 'ssh-keyscan ' + \
- settings.GITLAB_URL[7:-1] + ' >> ' + \
- ssh_folder + 'known_hosts'
- # Create known_hosts file and add GitLab to it.
- subprocess.Popen(cmd, shell=True, stderr=subprocess.PIPE)
- logger.debug("Added GitLab to known_hosts")
- return user_pub_key
- except Exception as error:
- logger.error(
- "_-_-_-_-_- Unexpected error in get_or_create_rsa_key_for_admin: %s" % error)
- raise Exception("Failed to create SSH keys for user admin", error)
-
- @staticmethod
- def internal_assert(x, y):
- try:
- Helper.tc.assertEqual(str(x), str(y))
- except Exception as e:
- raise Exception("AssertionError: \"" + str(x) +
- "\" != \"" + str(y) + "\"", e)
-
- @staticmethod
- def internal_assert_boolean(x, y):
- try:
- Helper.tc.assertEqual(str(x), str(y))
- return True
- except Exception as e:
- raise Exception("AssertionError: \"" + str(x) +
- "\" != \"" + str(y) + "\"", e)
-
- @staticmethod
- def internal_not_equal(x, y):
- try:
- Helper.tc.assertNotEqual(x, y)
- except Exception as e:
- raise Exception("AssertionError: \"" + str(x) +
- "\" != \"" + str(y) + "\"", e)
-
- @staticmethod
- def get_reset_passw_url(email):
- jwtObj = JWTAuthentication()
- token = jwtObj.createPersonalTokenWithExpiration(email)
- resetPasswURL = Constants.Default.LoginURL.TEXT + "?t=" + token
- return resetPasswURL
-
- @staticmethod
- def assertTrue(expr, msg=None):
- """Check that the expression is true."""
- if expr != True:
- raise Exception("AssertionError: \"not expr")
+logger = LoggingServiceFactory.get_logger()
+
+
+class Helper:
+
+ tc = unittest.TestCase('__init__')
+
+ @staticmethod
+ def rand_string(type_of_value='randomString', numberOfDigits=0):
+ letters_and_numbers = string.ascii_letters + string.digits
+ if type_of_value == 'email':
+ myEmail = ''.join(random.choice(letters_and_numbers) for _ in range(
+ 4)) + "@" + ''.join(random.choice(string.ascii_uppercase) for _ in range(4)) + ".com"
+ return "ST" + myEmail
+ elif type_of_value == 'randomNumber':
+ randomNumber = ''.join("%s" % random.randint(2, 9)
+ for _ in range(0, (numberOfDigits + 1)))
+ return randomNumber
+ elif type_of_value == 'randomString':
+ randomString = "".join(random.sample(letters_and_numbers, 5))
+ return "ST" + randomString
+ else:
+ raise Exception("invalid rand type")
+
+ @staticmethod
+ def rand_invite_email():
+ inviteEmail = "automationqatt" + \
+ Helper.rand_string("randomString") + "@gmail.com"
+ return inviteEmail
+
+ @staticmethod
+ def generate_sshpub_key():
+ try:
+ logger.debug("About to generate SSH Public Key")
+ key = rsa.generate_private_key(
+ public_exponent=65537,
+ key_size=2048,
+ backend=default_backend()
+ )
+ private_key = key.private_bytes(
+ encoding=crypto_serialization.Encoding.PEM,
+ format=crypto_serialization.PrivateFormat.TraditionalOpenSSL,
+ encryption_algorithm=crypto_serialization.NoEncryption())
+ public_key = key.public_key().public_bytes(
+ crypto_serialization.Encoding.OpenSSH,
+ crypto_serialization.PublicFormat.OpenSSH
+ ).decode("utf-8")
+
+ logger.debug("Generated SSH Public Key: " + public_key)
+ # If failed write to log the error and return 'None'.
+ except Exception as e:
+ logger.error(e)
+ logger.error("Failed to generate SSH Public Key.")
+ raise e
+ return public_key
+
+ @staticmethod
+ def check_admin_ssh_existence(path, admin_ssh):
+ if admin_ssh == open(path).read().rstrip('\n'):
+ logger.debug(
+ "Admin SSH already defined in DB and equal to the one stored on the local system.")
+ return True
+ return False
+
+ @staticmethod
+ def get_or_create_rsa_key_for_admin():
+ # Create pair of keys for the given user and return his public key.
+ try:
+ ssh_folder = Constants.Paths.SSH.PATH
+ public_file = ssh_folder + "id_rsa.pub"
+ privateFile = ssh_folder + "id_rsa"
+ admin_ssh_exist_and_equal = False
+ admin_ssh = None
+ if not os.path.exists(ssh_folder):
+ os.makedirs(ssh_folder)
+ elif os.path.exists(public_file):
+ admin_ssh = DBUser.retrieve_admin_ssh_from_db()
+ admin_ssh_exist_and_equal = Helper.check_admin_ssh_existence(
+ public_file, admin_ssh)
+ # TODO find pending gitlab bug causing old ssh key not be updated
+ # in gitlab cache
+ if False and admin_ssh_exist_and_equal:
+ return admin_ssh
+ else:
+ logger.debug("Private key file: " + privateFile +
+ "\nPublice key file: " + public_file)
+ key = rsa.generate_private_key(
+ backend=crypto_default_backend(),
+ public_exponent=65537,
+ key_size=2048
+ )
+ private_key = key.private_bytes(
+ crypto_serialization.Encoding.PEM,
+ crypto_serialization.PrivateFormat.PKCS8,
+ crypto_serialization.NoEncryption()).decode("utf-8")
+ public_key = key.public_key().public_bytes(
+ crypto_serialization.Encoding.OpenSSH,
+ crypto_serialization.PublicFormat.OpenSSH
+ ).decode("utf-8")
+
+ with open(privateFile, 'w') as content_file:
+ os.chmod(privateFile, 0o600)
+ # Save private key to file.
+ content_file.write(private_key)
+ logger.debug(
+ "Private key created successfully for admin")
+ user_pub_key = public_key
+ with open(public_file, 'w') as content_file:
+ content_file.write(public_key) # Save public key to file.
+ logger.debug("Public key created successfully for admin")
+ cmd = 'ssh-keyscan ' + \
+ settings.GITLAB_URL[7:-1] + ' >> ' + \
+ ssh_folder + 'known_hosts'
+ # Create known_hosts file and add GitLab to it.
+ subprocess.Popen(cmd, shell=True, stderr=subprocess.PIPE)
+ logger.debug("Added GitLab to known_hosts")
+ return user_pub_key
+ except Exception as error:
+ logger.error(
+ "_-_-_-_-_- Unexpected error in get_or_create_rsa_key_for_admin: %s" % error)
+ raise Exception("Failed to create SSH keys for user admin", error)
+
+ @staticmethod
+ def internal_assert(x, y):
+ try:
+ Helper.tc.assertEqual(str(x), str(y))
+ except Exception as e:
+ raise Exception("AssertionError: \"" + str(x) +
+ "\" != \"" + str(y) + "\"", e)
+
+ @staticmethod
+ def internal_assert_boolean(x, y):
+ try:
+ Helper.tc.assertEqual(str(x), str(y))
+ return True
+ except Exception as e:
+ raise Exception("AssertionError: \"" + str(x) +
+ "\" != \"" + str(y) + "\"", e)
+
+ @staticmethod
+ def internal_assert_boolean_true_false(x, y):
+ return Helper.tc.assertEqual(str(x), str(y))
+
+ @staticmethod
+ def internal_not_equal(x, y):
+ try:
+ Helper.tc.assertNotEqual(x, y)
+ except Exception as e:
+ raise Exception("AssertionError: \"" + str(x) +
+ "\" != \"" + str(y) + "\"", e)
+
+ @staticmethod
+ def get_reset_passw_url(email):
+ jwtObj = JWTAuthentication()
+ token = jwtObj.createPersonalTokenWithExpiration(email)
+ resetPasswURL = Constants.Default.LoginURL.TEXT + "?t=" + token
+ return resetPasswURL
+
+ @staticmethod
+ def assertTrue(expr, msg=None):
+ """Check that the expression is true."""
+ if expr != True:
+ raise Exception("AssertionError: \"not expr")
diff --git a/tests/signalTests/test_checklist_signal.py b/tests/signalTests/test_checklist_signal.py
index ae22578..7d3cd1c 100644
--- a/tests/signalTests/test_checklist_signal.py
+++ b/tests/signalTests/test_checklist_signal.py
@@ -1,5 +1,5 @@
-
-# ============LICENSE_START==========================================
+
+# ============LICENSE_START==========================================
# org.onap.vvp/test-engine
# ===================================================================
# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
@@ -36,33 +36,33 @@
# ============LICENSE_END============================================
#
# ECOMP is a trademark and service mark of AT&T Intellectual Property.
-'''
-Created on 16 Nov 2016
-'''
-from django.conf import settings
-
-from iceci.decorator.exception_decor import exception
-from services.constants import Constants
-from services.logging_service import LoggingServiceFactory
-from services.types import API, DB
-from tests.signalTests.test_signal_base import TestSignalBase
-
-
-logger = LoggingServiceFactory.get_logger()
-
-
-class TestChecklistSignal(TestSignalBase):
-
- @exception()
- def test_archive_checklist_after_editing_files(self):
- if settings.DATABASE_TYPE == 'local':
- logger.debug("Local environment, skipping test...")
- else:
- user_content = API.VirtualFunction.create_engagement()
- API.GitLab.git_clone_push(user_content, yaml=True)
- token = "token " + API.User.login_user(user_content['el_email'])
- user_content['session_token'] = token
- cl_content = API.Checklist.retrieve_heat_checklist(user_content)
- API.GitLab.git_push_commit(user_content, yaml=True)
- DB.Checklist.state_changed(
- "uuid", cl_content['uuid'], Constants.ChecklistStates.Archive.TEXT)
+'''
+Created on 16 Nov 2016
+'''
+from django.conf import settings
+
+from iceci.decorator.exception_decor import exception
+from services.constants import Constants
+from services.logging_service import LoggingServiceFactory
+from services.types import API, DB
+from tests.signalTests.test_signal_base import TestSignalBase
+
+
+logger = LoggingServiceFactory.get_logger()
+
+
+class TestChecklistSignal(TestSignalBase):
+
+ @exception()
+ def test_archive_checklist_after_editing_files(self):
+ if settings.DATABASE_TYPE == 'local':
+ logger.debug("Local environment, skipping test...")
+ else:
+ user_content = API.VirtualFunction.create_engagement()
+ API.GitLab.git_clone_push(user_content)
+ token = "token " + API.User.login_user(user_content['el_email'])
+ user_content['session_token'] = token
+ cl_content = API.Checklist.create_checklist(user_content)
+ API.GitLab.git_push_commit(user_content)
+ DB.Checklist.state_changed(
+ "uuid", cl_content['uuid'], Constants.ChecklistStates.Archive.TEXT)
diff --git a/tests/signalTests/test_git_signal.py b/tests/signalTests/test_git_signal.py
index d008787..63290d2 100644
--- a/tests/signalTests/test_git_signal.py
+++ b/tests/signalTests/test_git_signal.py
@@ -1,5 +1,5 @@
-
-# ============LICENSE_START==========================================
+
+# ============LICENSE_START==========================================
# org.onap.vvp/test-engine
# ===================================================================
# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
@@ -301,20 +301,3 @@ class TestGitSignal(TestSignalBase):
"Invited user: " + email + " and" + second_invited_email['full_name'] + " found in GitLab.")
logger.debug(
"Inviter and invited users were created successfully on GitLab!")
-
- @exception()
- def test_push_yaml_files_to_repo_check_decline_of_cl(self):
-
- if settings.DATABASE_TYPE == 'local':
- logger.debug("Local environment, skipping test...")
- else:
- user_content = API.VirtualFunction.create_engagement()
- token = "token " + API.User.login_user(user_content['el_email'])
- user_content['session_token'] = token
- cl_content_before_push_and_decline = API.Checklist.retrieve_heat_checklist(
- user_content)
- API.GitLab.git_clone_push(user_content, yaml=True)
- cl_content_after_push_and_decline = API.Checklist.retrieve_heat_checklist(
- user_content)
- Helper.internal_not_equal(
- cl_content_before_push_and_decline, cl_content_after_push_and_decline)
diff --git a/tests/uiTests/test_bucket_e2e.py b/tests/uiTests/test_bucket_e2e.py
index d72af48..73e18ca 100644
--- a/tests/uiTests/test_bucket_e2e.py
+++ b/tests/uiTests/test_bucket_e2e.py
@@ -1,5 +1,5 @@
-
-# ============LICENSE_START==========================================
+
+# ============LICENSE_START==========================================
# org.onap.vvp/test-engine
# ===================================================================
# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
@@ -54,87 +54,95 @@ logger = LoggingServiceFactory.get_logger()
class TestBucketE2E(TestUiBase):
+
def create_bucket_and_validate_users(self):
user_content = API.VirtualFunction.create_engagement(
wait_for_gitlab=True)
-
- print("***********STAGE = ",user_content['vfStage'])
- API.VirtualFunction.set_eng_stage(user_content, Constants.EngagementStages.ACTIVE)
- bucket_id = user_content['engagement_manual_id']+"_"+user_content['vfName'].lower()
- print("***********bucket_id = ",bucket_id)
+ API.VirtualFunction.set_eng_stage(
+ user_content, Constants.EngagementStages.ACTIVE)
+ bucket_id = user_content[
+ 'engagement_manual_id'] + "_" + user_content['vfName'].lower()
bucket = API.Rados.get_bucket(bucket_id)
assertTrue(API.Rados.is_bucket_ready(bucket_id))
assertTrue(bucket != "None")
- assertTrue(API.Rados.users_of_bucket_ready_after_created(bucket_id,user_content['full_name']))
- #validate users added to bucket
+ assertTrue(API.Rados.users_of_bucket_ready_after_created(
+ bucket_id, user_content['full_name']))
+ # validate users added to bucket
grants = API.Rados.get_bucket_grants(bucket_id)
count = 0
for g in grants:
if g.id == user_content['full_name']:
count = +1
-
+
assertTrue(count > 0)
return bucket, user_content
@exception()
def test_validate_bucket_created(self):
bucket, user_content = self.create_bucket_and_validate_users()
- #create upload file
- str_content = Helper.rand_string("randomString") + Helper.rand_string("randomNumber")
+ # create upload file
+ str_content = Helper.rand_string(
+ "randomString") + Helper.rand_string("randomNumber")
fileName = Helper.rand_string("randomString")
- bucket_id = user_content['engagement_manual_id']+"_"+user_content['vfName'].lower()
+ bucket_id = user_content[
+ 'engagement_manual_id'] + "_" + user_content['vfName'].lower()
bucket = API.Rados.get_bucket(bucket_id)
assertTrue(API.Rados.is_bucket_ready(bucket_id))
- key = bucket.new_key(fileName+'.dat')
+ key = bucket.new_key(fileName + '.dat')
key.set_contents_from_string(str_content)
pprint(key.generate_url(expires_in=400))
# DOWNLOAD AN OBJECT (TO A FILE)
- key = bucket.get_key(fileName+'.dat')
- key.get_contents_to_filename('/home/'+fileName+'.dat')
+ key = bucket.get_key(fileName + '.dat')
+ key.get_contents_to_filename('/home/' + fileName + '.dat')
key.delete()
-
+
@exception()
def test_validate_bucket_removed(self):
bucket, user_content = self.create_bucket_and_validate_users()
- #set Completed Stage
+ # set Completed Stage
API.VirtualFunction.set_eng_stage(
user_content, Constants.EngagementStages.COMPLETED)
- #validate users removed from bucket
- bucket_id = user_content['engagement_manual_id']+"_"+user_content['vfName'].lower()
- assertTrue(API.Rados.users_of_bucket_ready_after_complete(bucket_id,user_content['full_name']))
+ # validate users removed from bucket
+ bucket_id = user_content[
+ 'engagement_manual_id'] + "_" + user_content['vfName'].lower()
+ assertTrue(API.Rados.users_of_bucket_ready_after_complete(
+ bucket_id, user_content['full_name']))
assertTrue(API.Rados.is_bucket_ready(bucket_id))
assertTrue(bucket != "None")
- #try create upload file - must failed
- str_content = Helper.rand_string("randomString") + Helper.rand_string("randomNumber")
+ # try create upload file - must failed
+ str_content = Helper.rand_string(
+ "randomString") + Helper.rand_string("randomNumber")
fileName = Helper.rand_string("randomString")
bucket = API.Rados.get_bucket(bucket_id)
assertTrue(API.Rados.is_bucket_ready(bucket_id))
- key = bucket.new_key(fileName+'.dat')
+ key = bucket.new_key(fileName + '.dat')
key.set_contents_from_string(str_content)
pprint(key.generate_url(expires_in=400))
# DOWNLOAD AN OBJECT (TO A FILE)
- key = bucket.get_key(fileName+'.dat')
- key.get_contents_to_filename('/home/'+fileName+'.dat')
+ key = bucket.get_key(fileName + '.dat')
+ key.get_contents_to_filename('/home/' + fileName + '.dat')
key.delete()
-
+
@exception()
def test_validate_upload_download_image_with_bucket_user(self):
bucket, user_content = self.create_bucket_and_validate_users()
- #connect to bucket with specific user
- bucket_id = user_content['engagement_manual_id']+"_"+user_content['vfName'].lower()
+ # connect to bucket with specific user
+ bucket_id = user_content[
+ 'engagement_manual_id'] + "_" + user_content['vfName'].lower()
access_key = DBUser.get_access_key(user_content['uuid'])
secret_key = DBUser.get_access_secret(user_content['uuid'])
secret = CryptographyText.decrypt(secret_key)
- bucket_for_specific_user = API.Rados.get_bucketfor_specific_user(bucket_id,access_key,secret)
- assertTrue(bucket_for_specific_user != None)
- #create upload file with user
- str_content = Helper.rand_string("randomString") + Helper.rand_string("randomNumber")
+ bucket_for_specific_user = API.Rados.get_bucketfor_specific_user(
+ bucket_id, access_key, secret)
+ assertTrue(bucket_for_specific_user != None)
+ # create upload file with user
+ str_content = Helper.rand_string(
+ "randomString") + Helper.rand_string("randomNumber")
fileName = Helper.rand_string("randomString")
- key = bucket_for_specific_user.new_key(fileName+'.dat')
+ key = bucket_for_specific_user.new_key(fileName + '.dat')
key.set_contents_from_string(str_content)
pprint(key.generate_url(expires_in=3600))
# DOWNLOAD AN OBJECT (TO A FILE)
- key = bucket_for_specific_user.get_key(fileName+'.dat')
- key.get_contents_to_filename('/home/'+fileName+'.dat')
+ key = bucket_for_specific_user.get_key(fileName + '.dat')
+ key.get_contents_to_filename('/home/' + fileName + '.dat')
key.delete()
-
diff --git a/tests/uiTests/test_checklist_validations.py b/tests/uiTests/test_checklist_validations.py
index c89fa25..07d303c 100644
--- a/tests/uiTests/test_checklist_validations.py
+++ b/tests/uiTests/test_checklist_validations.py
@@ -105,7 +105,8 @@ class TestChecklistValidations(TestUiBase):
engagement_manual_id = newObjWithChecklist[2]
actualVfNameid = newObjWithChecklist[3]
checklistName = newObjWithChecklist[5]
- DB.Checklist.state_changed("uuid", checklistUuid, 'review')
+ DB.Checklist.state_changed(
+ "uuid", checklistUuid, Constants.ChecklistStates.Review.TEXT)
DB.Checklist.update_decisions(checklistUuid, checklistName)
Frontend.User.relogin(
@@ -154,7 +155,8 @@ class TestChecklistValidations(TestUiBase):
engagement_manual_id = newObjWithChecklist[2]
actualVfNameid = newObjWithChecklist[3]
checklistName = newObjWithChecklist[5]
- DB.Checklist.state_changed("uuid", checklistUuid, 'review')
+ DB.Checklist.state_changed(
+ "uuid", checklistUuid, Constants.ChecklistStates.Review.TEXT)
DB.Checklist.update_decisions(checklistUuid, checklistName)
Frontend.User.relogin(
engLeadEmail, Constants.Default.Password.TEXT, engagement_manual_id)
@@ -186,7 +188,8 @@ class TestChecklistValidations(TestUiBase):
actualVfNameid = newObjWithChecklist[3]
myVfName = newObjWithChecklist[4]
checklistName = newObjWithChecklist[5]
- DB.Checklist.state_changed("uuid", checklistUuid, 'review')
+ DB.Checklist.state_changed(
+ "uuid", checklistUuid, Constants.ChecklistStates.Review.TEXT)
DB.Checklist.update_decisions(checklistUuid, checklistName)
Frontend.User.relogin(
@@ -228,7 +231,8 @@ class TestChecklistValidations(TestUiBase):
@exception()
def test_reject_anytime_checklist(self):
cl_content = API.Checklist.create_checklist(self.user_content_api)
- DB.Checklist.state_changed("name", cl_content['name'], 'review')
+ DB.Checklist.state_changed(
+ "name", cl_content['name'], Constants.ChecklistStates.Review.TEXT)
Frontend.User.login(
self.user_content_api['el_email'], Constants.Default.Password.TEXT)
Frontend.Checklist.search_by_manual_id(
@@ -238,7 +242,8 @@ class TestChecklistValidations(TestUiBase):
Frontend.Checklist.click_on_checklist(
self.user_content_api, cl_content['name'], recent_checklist_uuid)
Frontend.Checklist.reject("Reject checklist on review state.")
- DB.Checklist.state_changed("uuid", recent_checklist_uuid, 'archive')
+ DB.Checklist.state_changed(
+ "uuid", recent_checklist_uuid, Constants.ChecklistStates.Archive.TEXT)
@exception()
def test_clone_decision_auditlogs(self):
diff --git a/tests/uiTests/test_left_nav_panel.py b/tests/uiTests/test_left_nav_panel.py
index 74a78ad..236097d 100644
--- a/tests/uiTests/test_left_nav_panel.py
+++ b/tests/uiTests/test_left_nav_panel.py
@@ -1,5 +1,5 @@
-
-# ============LICENSE_START==========================================
+
+# ============LICENSE_START==========================================
# org.onap.vvp/test-engine
# ===================================================================
# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
@@ -150,7 +150,7 @@ class TestLeftNavPanel(TestUiBase):
myVfName = self.user_content[
'engagement_manual_id'] + ": " + self.user_content['vfName']
actualVfNameid = "clickable-" + myVfName
- actualVfName = Get.by_id(actualVfNameid)
+ actualVfName = Get.by_id(actualVfNameid, True)
Helper.internal_assert(myVfName, actualVfName)
Click.id(actualVfNameid)
uuid = DB.General.select_where_email(
@@ -176,7 +176,6 @@ class TestLeftNavPanel(TestUiBase):
DB.VirtualFunction.remove_engagement_from_recent(
self.user_content['vf_uuid'])
Frontend.General.refresh()
- Wait.id_to_dissappear(self.left_panel_eng_id)
Frontend.Dashboard.statuses_search_vf(
self.user_content['engagement_manual_id'], self.user_content['vfName'])
Wait.text_by_id(self.left_panel_eng_id, self.eng_title)
diff --git a/tests/uiTests/test_login_with_new_user.py b/tests/uiTests/test_login_with_new_user.py
index 8018b1a..d7d1bda 100644
--- a/tests/uiTests/test_login_with_new_user.py
+++ b/tests/uiTests/test_login_with_new_user.py
@@ -1,5 +1,5 @@
-
-# ============LICENSE_START==========================================
+
+# ============LICENSE_START==========================================
# org.onap.vvp/test-engine
# ===================================================================
# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
@@ -137,7 +137,7 @@ class TestLoginPageWithNewUser(TestUiBase):
vfFullName = user_content[
'engagement_manual_id'] + ": " + user_content['vfName']
actualVfNameid = "clickable-" + vfFullName
- Click.id(actualVfNameid,wait_for_page=True)
+ Click.id(actualVfNameid, wait_for_page=True)
Wait.id(Constants.Dashboard.Overview.TeamMember.ID)
Frontend.Wizard.invite_team_members_modal(second_user_content['email'])
enguuid = DB.General.select_where("uuid", "ice_engagement", "engagement_manual_id", user_content[
@@ -203,13 +203,15 @@ class TestLoginPageWithNewUser(TestUiBase):
service_provider_internal["full_name"], service_provider_internal["phone"], service_provider_internal["company"])
Frontend.General.re_open(signUpURLforContact)
actualInvitedEmail = Get.value_by_name(Constants.Signup.Email.NAME)
- Helper.internal_assert(service_provider_internal["email"], actualInvitedEmail)
+ Helper.internal_assert(
+ service_provider_internal["email"], actualInvitedEmail)
Helper.internal_assert(
"+" + service_provider_internal["phone"], Get.value_by_name(Constants.Signup.Phone.NAME))
Helper.internal_assert(
service_provider_internal["full_name"], Get.value_by_name(Constants.Signup.FullName.NAME))
+ signupCompany = Get.value_by_name(Constants.Signup.Company.NAME, True)
Helper.internal_assert(
- service_provider_internal["company"], Get.value_by_name(Constants.Signup.Company.NAME))
+ service_provider_internal["company"], signupCompany)
@exception()
def test_create_2_new_users(self):
@@ -295,11 +297,8 @@ class TestLoginPageWithNewUser(TestUiBase):
engLeadEmail = DB.User.select_el_email(vfName)
engagement_manual_id = DB.General.select_where(
"engagement_manual_id", "ice_engagement", "uuid", engagement_id, 1)
- # Fetch one is_service_provider_contact user ID.
- enguuid = DB.General.select_where(
- "uuid", "ice_engagement", "engagement_manual_id", engagement_manual_id, 1)
invitation_token = DB.User.select_invitation_token("invitation_token", "ice_invitation", "engagement_uuid",
- enguuid, vendor_contact["email"], 1)
+ engagement_id, vendor_contact["email"], 1)
signUpURLforContact = DB.User.get_contact_signup_url(invitation_token, uuid, vendor_contact["email"],
vendor_contact["full_name"], vendor_contact["phone"], vendor_contact["company"])
Frontend.General.re_open(signUpURLforContact)
@@ -313,6 +312,8 @@ class TestLoginPageWithNewUser(TestUiBase):
vendor_contact["company"], Get.value_by_name(Constants.Signup.Company.NAME))
# SignUp for VendorContact
user_content['engagement_uuid'] = engagement_id
+ user_content['engagement_manual_id'] = engagement_manual_id
+ user_content['vfName'] = vfName
user_content['el_email'] = engLeadEmail
API.User.signup_invited_user(vendor_contact["company"], vendor_contact["email"], invitation_token,
signUpURLforContact, user_content, True)
@@ -384,18 +385,24 @@ class TestLoginPageWithNewUser(TestUiBase):
# SignUp for MainServiceProviderSponsorContact
engagement_manual_id = DB.General.select_where(
"engagement_manual_id", "ice_engagement", "uuid", engagement_id, 1)
- # Fetch one is_service_provider_contact user ID.
- enguuid = DB.General.select_where(
- "uuid", "ice_engagement", "engagement_manual_id", engagement_manual_id, 1)
+
invitation_token = DB.User.select_invitation_token("invitation_token", "ice_invitation", "engagement_uuid",
- enguuid, service_provider_internal["email"], 1)
+ engagement_id, service_provider_internal["email"], 1)
engLeadEmail = DB.User.select_el_email(vfName)
user_content['engagement_uuid'] = engagement_id
+ user_content['engagement_manual_id'] = engagement_manual_id
+ user_content['vfName'] = vfName
user_content['el_email'] = engLeadEmail
+
API.User.signup_invited_user(service_provider_internal["company"], service_provider_internal["email"], invitation_token,
signUpURLforContact, user_content, True)
- activationUrl2 = DB.User.get_activation_url(service_provider_internal["email"])
+ activationUrl2 = DB.User.get_activation_url(
+ service_provider_internal["email"])
# Activate for VendorContact
+
+
+
+
engagement_manual_id = DB.General.select_where(
"engagement_manual_id", "ice_engagement", "uuid", engagement_id, 1)
# Validate opened right VF for VendorContact
diff --git a/tests/uiTests/test_next_step.py b/tests/uiTests/test_next_step.py
index 036d466..cf1e00c 100644
--- a/tests/uiTests/test_next_step.py
+++ b/tests/uiTests/test_next_step.py
@@ -1,5 +1,5 @@
-
-# ============LICENSE_START==========================================
+
+# ============LICENSE_START==========================================
# org.onap.vvp/test-engine
# ===================================================================
# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
@@ -204,7 +204,8 @@ class TestNextStep(TestUiBase):
API.User.login_user(user_content['el_email'])
Wait.page_has_loaded()
cl_content = API.Checklist.create_checklist(user_content)
- DB.Checklist.state_changed("name", cl_content['name'], 'review')
+ DB.Checklist.state_changed(
+ "name", cl_content['name'], Constants.ChecklistStates.Review.TEXT)
new_cl_uuid = DB.Checklist.get_recent_checklist_uuid(cl_content['name'])[
0]
API.Checklist.add_checklist_next_step(user_content, new_cl_uuid)
@@ -221,7 +222,8 @@ class TestNextStep(TestUiBase):
user_content['session_token'] = "token " + \
API.User.login_user(user_content['el_email'])
checklist = API.Checklist.create_checklist(user_content)
- DB.Checklist.state_changed("uuid", checklist['uuid'], 'review')
+ DB.Checklist.state_changed(
+ "uuid", checklist['uuid'], Constants.ChecklistStates.Review.TEXT)
Frontend.User.relogin(
user_content['el_email'], 'iceusers')
eng_id = "clickable-%s: %s" % (
diff --git a/tests/uiTests/test_overview.py b/tests/uiTests/test_overview.py
index 2045acc..31db2f7 100644
--- a/tests/uiTests/test_overview.py
+++ b/tests/uiTests/test_overview.py
@@ -1,5 +1,5 @@
-
-# ============LICENSE_START==========================================
+
+# ============LICENSE_START==========================================
# org.onap.vvp/test-engine
# ===================================================================
# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
@@ -45,7 +45,6 @@ from services.logging_service import LoggingServiceFactory
from services.types import API, Frontend, DB
from tests.uiTests.test_ui_base import TestUiBase
-
logger = LoggingServiceFactory.get_logger()
@@ -54,9 +53,10 @@ class TestOverview(TestUiBase):
@exception()
def test_engagement_validation_details_update_when_cl_closed(self):
user_content = API.VirtualFunction.create_engagement()
- API.GitLab.git_clone_push(user_content)
- cl_uuid = DB.General.select_where_and('uuid', Constants.DBConstants.IceTables.CHECKLIST, 'engagement_id', user_content['engagement_uuid'],
- 'name', Constants.Dashboard.Checklist.ChecklistDefaultNames.AIC_INSTANTIATION, 1)
+ cl_name = Constants.Dashboard.Checklist.ChecklistDefaultNames.AIC_INSTANTIATION
+ DB.Checklist.state_changed(
+ "name", cl_name, Constants.ChecklistStates.Review.TEXT)
+ cl_uuid = DB.Checklist.get_recent_checklist_uuid(cl_name)[0]
vf_staff_emails = [user_content['el_email'], user_content[
'pr_email'], Constants.Users.Admin.EMAIL]
API.Checklist.move_cl_to_closed(cl_uuid, vf_staff_emails)
diff --git a/tests/uiTests/test_sanity.py b/tests/uiTests/test_sanity.py
index 8399e12..1a32215 100644
--- a/tests/uiTests/test_sanity.py
+++ b/tests/uiTests/test_sanity.py
@@ -1,5 +1,5 @@
-
-# ============LICENSE_START==========================================
+
+# ============LICENSE_START==========================================
# org.onap.vvp/test-engine
# ===================================================================
# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
@@ -71,7 +71,8 @@ class TestSanity(TestUiBase):
engagement_manual_id = newObjWithChecklist[2]
actualVfNameid = newObjWithChecklist[3]
checklistName = newObjWithChecklist[5]
- DB.Checklist.state_changed("uuid", checklistUuid, 'review')
+ DB.Checklist.state_changed(
+ "uuid", checklistUuid, Constants.ChecklistStates.Review.TEXT)
DB.Checklist.update_decisions(checklistUuid, checklistName)
Frontend.User.relogin(
@@ -79,7 +80,7 @@ class TestSanity(TestUiBase):
Frontend.Checklist.click_on_checklist(user_content, checklistName)
Frontend.Checklist.validate_reject_is_enabled()
Frontend.Checklist.review_state_actions_and_validations(
- checklistName, user_content['vfName'], "review")
+ checklistName, user_content['vfName'], Constants.ChecklistStates.Review.TEXT)
Frontend.Checklist.cl_to_next_stage(actualVfNameid)
engPreeRiviewerLeadEmail = DB.Checklist.get_pr_email(checklistUuid)
diff --git a/tox.ini b/tox.ini
index e22bd20..c52ea41 100644
--- a/tox.ini
+++ b/tox.ini
@@ -17,3 +17,8 @@ basepython=python2.7
[testenv:py3]
basepython=python3.6
+
+[flake8]
+show-source = True
+exclude=venv-tox,.git,.tox,dist,doc,*openstack/common*,*lib/python*,*egg,build
+