From d7d8722ce27e308defb6764d8d76f85ce7d63927 Mon Sep 17 00:00:00 2001 From: "Lovett, Trevor" Date: Tue, 3 Mar 2020 14:04:39 -0600 Subject: [VVP] Removing outdated test-engine code Issue-ID: VVP-381 Signed-off-by: Lovett, Trevor Change-Id: I8622794a24deb83e14334e3454194914b835d37d --- services/api/__init__.py | 38 ---- services/api/api_bridge.py | 79 ------- services/api/api_checklist.py | 251 --------------------- services/api/api_gitlab.py | 415 ----------------------------------- services/api/api_jenkins.py | 93 -------- services/api/api_rados.py | 140 ------------ services/api/api_user.py | 330 ---------------------------- services/api/api_virtual_function.py | 403 ---------------------------------- 8 files changed, 1749 deletions(-) delete mode 100644 services/api/__init__.py delete mode 100644 services/api/api_bridge.py delete mode 100644 services/api/api_checklist.py delete mode 100644 services/api/api_gitlab.py delete mode 100644 services/api/api_jenkins.py delete mode 100644 services/api/api_rados.py delete mode 100644 services/api/api_user.py delete mode 100644 services/api/api_virtual_function.py (limited to 'services/api') diff --git a/services/api/__init__.py b/services/api/__init__.py deleted file mode 100644 index 32b601a..0000000 --- a/services/api/__init__.py +++ /dev/null @@ -1,38 +0,0 @@ - -# ============LICENSE_START========================================== -# org.onap.vvp/test-engine -# =================================================================== -# Copyright © 2017 AT&T Intellectual Property. All rights reserved. -# =================================================================== -# -# Unless otherwise specified, all software contained herein is licensed -# under the Apache License, Version 2.0 (the “License”); -# you may not use this software except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -# -# -# Unless otherwise specified, all documentation contained herein is licensed -# under the Creative Commons License, Attribution 4.0 Intl. (the “License”); -# you may not use this documentation except in compliance with the License. -# You may obtain a copy of the License at -# -# https://creativecommons.org/licenses/by/4.0/ -# -# Unless required by applicable law or agreed to in writing, documentation -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -# ============LICENSE_END============================================ -# -# ECOMP is a trademark and service mark of AT&T Intellectual Property. diff --git a/services/api/api_bridge.py b/services/api/api_bridge.py deleted file mode 100644 index d8cb46c..0000000 --- a/services/api/api_bridge.py +++ /dev/null @@ -1,79 +0,0 @@ - -# ============LICENSE_START========================================== -# org.onap.vvp/test-engine -# =================================================================== -# Copyright © 2017 AT&T Intellectual Property. All rights reserved. -# =================================================================== -# -# Unless otherwise specified, all software contained herein is licensed -# under the Apache License, Version 2.0 (the “License”); -# you may not use this software except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -# -# -# Unless otherwise specified, all documentation contained herein is licensed -# under the Creative Commons License, Attribution 4.0 Intl. (the “License”); -# you may not use this documentation except in compliance with the License. -# You may obtain a copy of the License at -# -# https://creativecommons.org/licenses/by/4.0/ -# -# Unless required by applicable law or agreed to in writing, documentation -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -# ============LICENSE_END============================================ -# -# ECOMP is a trademark and service mark of AT&T Intellectual Property. - - -class APIBridge: - - """ - This class helps to use functions inside - classes with circular import (dependencies). - Use this class only when there is circular - import in one of the API services. - """ - - @staticmethod - def is_gitlab_ready(user_content): - """is_gitlab_ready: Originally can be found under APIGitLab class.""" - from services.api.api_gitlab import APIGitLab - return APIGitLab.is_gitlab_ready(user_content) - - @staticmethod - def login_user(email): - """login_user: Originally can be found under APIUser class.""" - from services.api.api_user import APIUser - return APIUser.login_user(email) - - @staticmethod - def set_ssh(user_content, sshKey): - """set_ssh: Originally can be found under APIUser class.""" - from services.api.api_user import APIUser - return APIUser.set_ssh(user_content, sshKey) - - @staticmethod - def create_engagement(wait_for_gitlab=True): - """create_engagement: Originally can be found under - APIVirtualFunction class.""" - from services.api.api_virtual_function import APIVirtualFunction - return APIVirtualFunction.create_engagement(wait_for_gitlab) - - @staticmethod - def frontend_login(email, password): - """login: Originally can be found under FEUser class.""" - from services.frontend.fe_user import FEUser - return FEUser.login(email, password) diff --git a/services/api/api_checklist.py b/services/api/api_checklist.py deleted file mode 100644 index 02ab311..0000000 --- a/services/api/api_checklist.py +++ /dev/null @@ -1,251 +0,0 @@ - -# ============LICENSE_START========================================== -# org.onap.vvp/test-engine -# =================================================================== -# Copyright © 2017 AT&T Intellectual Property. All rights reserved. -# =================================================================== -# -# Unless otherwise specified, all software contained herein is licensed -# under the Apache License, Version 2.0 (the “License”); -# you may not use this software except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -# -# -# Unless otherwise specified, all documentation contained herein is licensed -# under the Creative Commons License, Attribution 4.0 Intl. (the “License”); -# you may not use this documentation except in compliance with the License. -# You may obtain a copy of the License at -# -# https://creativecommons.org/licenses/by/4.0/ -# -# Unless required by applicable law or agreed to in writing, documentation -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -# ============LICENSE_END============================================ -# -# ECOMP is a trademark and service mark of AT&T Intellectual Property. -import datetime - -import requests - -from services.api.api_gitlab import APIGitLab -from services.api.api_user import APIUser -from services.constants import Constants -from services.database.db_general import DBGeneral -from services.helper import Helper -from services.logging_service import LoggingServiceFactory -from services.database.db_checklist import DBChecklist - - -logger = LoggingServiceFactory.get_logger() - - -class APIChecklist: - - @staticmethod - def create_checklist( - user_content, - files=[ - "file0", - "file1"], - return_negative_response=False): - r1 = None - postURL = Constants.Default.URL.Checklist.TEXT + \ - user_content['engagement_uuid'] + '/checklist/new/' - logger.debug("Post create checklist URL: " + postURL) - headers = dict() - headers['Content-type'] = 'application/json' - headers['Authorization'] = user_content['session_token'] - data = dict() - data['checkListAssociatedFiles'] = files - data['checkListName'] = "checklistAPI" + \ - Helper.rand_string('randomString') - data['checkListTemplateUuid'] = DBGeneral.select_where( - "uuid", "ice_checklist_template", "name", - Constants.Template.Heat.TEXT, 1) - try: - if not APIGitLab.is_gitlab_ready(user_content): - raise Exception( - "Gitlab is not ready and because " + - "of that the test is failed.") - - r1 = requests.post(postURL, json=data, - headers=headers, verify=False) - - Helper.internal_assert_boolean(r1.status_code, 200) - logger.debug("Checklist was created successfully!") - cl_content = r1.json() - return cl_content - except BaseException: - if return_negative_response: - return r1 - if r1 is None: - logger.error( - "Failed to create checklist for VF " + - user_content['vfName']) - else: - logger.error( - "Failed to create checklist for VF " + - user_content['vfName'] + - ", see response >>> %s %s.\nContent: %s" % - (r1.status_code, r1.reason, str(r1.content, 'utf-8'))) - raise - - @staticmethod - def update_checklist(user_content, cl_uuid): - r1 = None - postURL = Constants.Default.URL.Checklist.Update.TEXT + '/' + cl_uuid - logger.debug("Post create checklist URL: " + postURL + '/' + cl_uuid) - headers = dict() # Create header for post request. - headers['Content-type'] = 'application/json' - headers['Authorization'] = user_content['session_token'] - data = dict() # Create JSON data for post request. - data['checklistUuid'] = cl_uuid - data['checkListAssociatedFiles'] = ["file1", "file2"] - data['checkListName'] = "UpdateChecklistAPI" + \ - Helper.rand_string('randomString') - data['checkListTemplateUuid'] = DBGeneral.select_where( - "uuid", "ice_checklist_template", "name", - Constants.Template.Heat.TEXT, 1) - try: - r1 = requests.put( - postURL, json=data, headers=headers, verify=False) - Helper.internal_assert(r1.status_code, 200) - logger.debug("DBChecklist was created successfully!") - cl_content = r1.json() - return cl_content['uuid'] - except BaseException: - if r1 is None: - logger.error( - "Failed to create checklist for VF " + - user_content['vfName']) - else: - logger.error( - "Failed to create checklist for VF " + - user_content['vfName'] + - ", see response >>> %s %s.\nContent: %s" % - (r1.status_code, r1.reason, str(r1.content, 'utf-8'))) - raise - - @staticmethod - def add_checklist_audit_log(user_content, cl_uuid): - r1 = None - postURL = Constants.Default.URL.Checklist.Update + \ - cl_uuid + '/auditlog/' - logger.debug("Post checklist audit log URL: " + postURL) - headers = dict() # Create header for post request. - headers['Content-type'] = 'application/json' - headers['Authorization'] = user_content['session_token'] - data = dict() # Create JSON data for post request. - data['description'] = "API audit log test " + \ - Helper.rand_string('randomString') - try: - r1 = requests.post( - postURL, json=data, headers=headers, verify=False) - Helper.internal_assert_boolean(r1.status_code, 200) - logger.debug("Audit log was added successfully!") - except BaseException: - if r1 is None: - logger.error( - "Failed to add audit log for checklist uuid: " + cl_uuid) - else: - logger.error( - "Failed to add audit log for checklist uuid: " + - cl_uuid + - ", see response >>> %s %s.\nContent: %s" % - (r1.status_code, r1.reason, str(r1.content, 'utf-8'))) - raise - - @staticmethod - def add_checklist_next_step(user_content, cl_uuid): - r1 = None - postURL = Constants.Default.URL.Checklist.TEXT + \ - user_content['engagement_uuid'] + \ - '/checklist/' + cl_uuid + '/nextstep/' - logger.debug("Post checklist next step URL: " + postURL) - headers = dict() # Create header for post request. - headers['Content-type'] = 'application/json' - headers['Authorization'] = user_content['session_token'] - data = dict() # Create JSON data for post request. - data['files'] = ["file0"] - data['assigneesUuids'] = [user_content['uuid']] - data['duedate'] = str(datetime.date.today()) - data['description'] = "API next step test " + \ - Helper.rand_string('randomString') - list_data = [] - list_data.append(data) - try: - r1 = requests.post( - postURL, json=list_data, headers=headers, verify=False) - Helper.internal_assert(r1.status_code, 200) - logger.debug("Next step was added successfully!") - ns_uuid = r1.json() - return ns_uuid[0]['uuid'] - except BaseException: - if r1 is None: - logger.error( - "Failed to add next step for checklist uuid: " + cl_uuid) - else: - logger.error( - "Failed to add next step for checklist uuid: " + - cl_uuid + - ", see response >>> %s %s.\nContent: %s" % - (r1.status_code, r1.reason, str(r1.content, 'utf-8'))) - raise - - @staticmethod - def jump_state(checklistUUID, engLeadEmail): - token = APIUser.login_user(engLeadEmail) - stateURL = Constants.Default.URL.Checklist.Rest.TEXT + \ - str(checklistUUID) + '/state/' - APIChecklist.go_to_next_state(stateURL, token) - - @staticmethod - def go_to_next_state(url, token): - headers = {"Authorization": "token " + token} - body = dict() - body['description'] = "" - body['decline'] = "False" - r = requests.put(url, headers=headers, json=body, verify=False) - if (r.status_code == 201 or r.status_code == 200): - logger.debug("go_to_next_state put request result status: %s" % - r.status_code) - else: - logger.error("PUT request failed to change checklist state >>> " + - str(r.status_code) + " " + r.reason) - raise Exception("PUT request failed to change checklist state") - - @staticmethod - def move_cl_to_closed(cl_uuid, vf_staff_emails): - api_checklist_obj = APIChecklist() - states = [Constants.ChecklistStates.PeerReview.TEXT, - Constants.ChecklistStates.Approval.TEXT, - Constants.ChecklistStates.Handoff.TEXT, - Constants.ChecklistStates.Closed.TEXT] - for i in range(len(vf_staff_emails)): - logger.debug( - "Trying to jump state for %s [%s]" % - (vf_staff_emails[i], i)) - DBChecklist.update_all_decisions_to_approve(cl_uuid) - api_checklist_obj.jump_state(cl_uuid, vf_staff_emails[i]) - logger.debug("Checking state changed to %s" % states[i]) - DBChecklist.state_changed("uuid", cl_uuid, states[i]) - - # Move CL to closed state. - logger.debug("Trying to jump state 'closed' for %s" % - vf_staff_emails[0]) - api_checklist_obj.jump_state(cl_uuid, vf_staff_emails[0]) - logger.debug("Checking state changed to %s" % states[-1]) - DBChecklist.state_changed("uuid", cl_uuid, states[-1]) diff --git a/services/api/api_gitlab.py b/services/api/api_gitlab.py deleted file mode 100644 index 639f67e..0000000 --- a/services/api/api_gitlab.py +++ /dev/null @@ -1,415 +0,0 @@ - -# ============LICENSE_START========================================== -# org.onap.vvp/test-engine -# =================================================================== -# Copyright © 2017 AT&T Intellectual Property. All rights reserved. -# =================================================================== -# -# Unless otherwise specified, all software contained herein is licensed -# under the Apache License, Version 2.0 (the “License”); -# you may not use this software except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -# -# -# Unless otherwise specified, all documentation contained herein is licensed -# under the Creative Commons License, Attribution 4.0 Intl. (the “License”); -# you may not use this documentation except in compliance with the License. -# You may obtain a copy of the License at -# -# https://creativecommons.org/licenses/by/4.0/ -# -# Unless required by applicable law or agreed to in writing, documentation -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -# ============LICENSE_END============================================ -# -# ECOMP is a trademark and service mark of AT&T Intellectual Property. -import os -import subprocess -import sys -import time - -from django.conf import settings -import git -import requests - -from services.api.api_bridge import APIBridge -from services.constants import Constants -from services.database.db_virtual_function import DBVirtualFunction -from services.helper import Helper -from services.logging_service import LoggingServiceFactory -from services.session import session - - -logger = LoggingServiceFactory.get_logger() - - -class APIGitLab: - - @staticmethod - def display_output(p): - while True: - out = p.stderr.read(1) - if out == b'' and p.poll() is not None: - break - if out != '': - sys.stdout.write(str(out.decode())) - sys.stdout.flush() - - @staticmethod - def get_git_project(path_with_namespace): - r1 = None - getURL = Constants.Default.URL.GitLab.Projects.TEXT + \ - path_with_namespace - logger.debug("Get project URL: " + getURL) - headers = dict() # Create header for post request. - headers['Content-type'] = 'application/json' - headers['PRIVATE-TOKEN'] = settings.GITLAB_TOKEN - try: - r1 = requests.get(getURL, headers=headers, verify=False) - counter = 0 - while r1.status_code == 404 or r1.content == b'[]' and \ - counter <= Constants.\ - GitLabConstants.RETRIES_NUMBER: - time.sleep(session.wait_until_time_pause) - r1 = requests.get(getURL, headers=headers, verify=False) - logger.debug( - "trying to get the git project, " + - "yet to succeed (try #%s)" % counter) - counter += 1 - Helper.internal_assert(r1.status_code, 200) - if r1.content == b'[]': - logger.error("Got an empty list as a response.") - raise - logger.debug("Project exists on APIGitLab!") - content = r1.json() # Change it from list to dict. - return content - except BaseException: - if r1 is None: - logger.error("Failed to get project from APIGitLab.") - else: - logger.error( - "Failed to get project from APIGitLab, " + - "see response >>> %s %s \n %s" % - (r1.status_code, r1.reason, str( - r1.content, 'utf-8'))) - raise - - def are_all_list_users_registered_as_project_members( - self, users_emails_list, project_path_with_namespace): - for email in users_emails_list: - if not self.validate_git_project_members( - project_path_with_namespace, email): - raise Exception( - "Couldn't find the invited users: " + - email + - " in GitLab.") - logger.debug( - "Invited user: " + email + " found in GitLab.") - - @staticmethod - def validate_git_project_members(path_with_namespace, user_email): - if settings.DATABASE_TYPE != 'local': - r1 = None - headers = dict() - git_user = APIGitLab.get_git_user(user_email) - getURL = Constants.Default.URL.GitLab.Projects.TEXT + \ - path_with_namespace + "/members/" + str(git_user['id']) - logger.debug("Get project members URL: " + getURL) - headers['Content-type'] = 'application/json' - headers['PRIVATE-TOKEN'] = settings.GITLAB_TOKEN - counter = 0 - while (r1 is None or r1.content == b'[]' or r1.status_code != - 200) and counter <= Constants.GitLabConstants.\ - RETRIES_NUMBER: - logger.debug( - "try to get git project members (try #%s)" % counter) - time.sleep(session.wait_until_time_pause) - try: - r1 = requests.get(getURL, headers=headers, verify=False) - counter += 1 - except Exception as e: - if counter >= Constants.GitLabConstants.RETRIES_NUMBER: - logger.error( - "Failed to get project's team members from " + - "APIGitLab, see response >>> %s %s \n %s %s" % - (r1.status_code, r1.reason, str( - r1.content, 'utf-8'), e.message)) - return False - if r1.content == b'[]': - logger.error("Got an empty list as a response.") - return False - elif r1.status_code != 200: - logger.error("Got %s %s." % (r1.status_code, r1.reason)) - return False - logger.debug("Got %s %s, user found in project." % - (r1.status_code, r1.reason)) - return True - - @staticmethod - def negative_validate_git_project_member( - path_with_namespace, user_email, git_user_id): - if settings.DATABASE_TYPE != 'local': - r1 = None - headers = dict() - getURL = Constants.Default.URL.GitLab.Projects.TEXT + \ - path_with_namespace + "/members/" + git_user_id - logger.debug("Get project members URL: " + getURL) - headers['Content-type'] = 'application/json' - headers['PRIVATE-TOKEN'] = settings.GITLAB_TOKEN - counter = 0 - while r1 is None or str.encode( - user_email) not in r1.content and counter <= Constants.\ - GitLabConstants.RETRIES_NUMBER: - logger.debug( - "try to get git project members (try #%s)" % counter) - time.sleep(session.wait_until_time_pause) - try: - r1 = requests.get(getURL, headers=headers, verify=False) - counter += 1 - except Exception as e: - if counter >= Constants.GitLabConstants.RETRIES_NUMBER: - logger.error( - "Failed to get project's team members from " + - "APIGitLab, see response >>> %s %s \n %s %s" % - (r1.status_code, r1.reason, str( - r1.content, 'utf-8'), e.message)) - return False - - if r1.content == b'[]': - logger.debug("Got %s %s, user not found in project." % - (r1.status_code, r1.reason)) - return True - else: - logger.debug("Got %s %s, user found in project." % - (r1.status_code, r1.reason)) - return False - - @staticmethod - def get_git_user(user_email): - if settings.DATABASE_TYPE != 'local': - r1 = None - user_email = user_email.replace("@", "_at_") - getURL = settings.GITLAB_URL + \ - "api/v3/users?username=" + user_email - logger.debug("Get user URL: " + getURL) - headers = dict() # Create header for post request. - headers['Content-type'] = 'application/json' - headers['PRIVATE-TOKEN'] = settings.GITLAB_TOKEN - try: - r1 = requests.get(getURL, headers=headers, verify=False) - Helper.internal_assert(r1.status_code, 200) - counter = 0 - while r1.content == b'[]' and counter <= 60: - logger.info( - "Will try to get gitlab user until " + - "will be response... #%s" % - counter) - time.sleep(session.wait_until_time_pause_long) - r1 = requests.get(getURL, headers=headers, verify=False) - Helper.internal_assert(r1.status_code, 200) - counter += 1 - - if r1.content == b'[]': - logger.error("Got an empty user from gitlab.") - raise Exception("Got an empty user from gitlab.") - - logger.debug("Got %s %s and received user data: %s." % - (r1.status_code, r1.reason, r1.content)) - content = r1.json() - return content[0] - except BaseException: - if r1 is None: - logger.error("Failed to get user from APIGitLab.") - else: - logger.error( - "Failed to get user from APIGitLab, see response " + - ">>> %s %s \n %s" % - (r1.status_code, r1.reason, str( - r1.content, 'utf-8'))) - raise - - @staticmethod - def get_git_user_ssh_key(git_user_id): - r1 = None - getURL = Constants.Default.URL.GitLab.Users.TEXT + \ - str(git_user_id) + "/keys" - logger.debug("Get user URL: " + getURL) - headers = dict() # Create header for post request. - headers['Content-type'] = 'application/json' - headers['PRIVATE-TOKEN'] = settings.GITLAB_TOKEN - try: - r1 = requests.get(getURL, headers=headers, verify=False) - Helper.internal_assert(r1.status_code, 200) - if r1.content == '[]': - logger.error("Got an empty list as a response.") - raise - logger.debug("Got %s %s and received user's public key." % - (r1.status_code, r1.reason)) - content = r1.json() # Change it from list to dict. - gitPubKey = content[0]['key'] - return gitPubKey - except BaseException: - if r1 is None: - logger.error("Failed to get user's public key " + - "from APIGitLab.") - else: - logger.error( - "Failed to get user's public key from APIGitLab, " + - "see response >>> %s %s \n %s" % - (r1.status_code, r1.reason, str( - r1.content, 'utf-8'))) - raise - - @staticmethod - def git_clone_push(user_content): - if settings.DATABASE_TYPE != 'local': - logger.debug( - "About to push files into project's repository on the " + - "local folder(not over origin).") - try: - user_content['session_token'] = "token " + \ - APIBridge.login_user(Constants.Users.Admin.EMAIL) - used_email_for_actions = Constants.Users.Admin.EMAIL - repo_dir = Constants.Paths.LocalGitFolder.PATH + \ - user_content['vfName'] - if not os.path.exists(repo_dir): - os.makedirs(repo_dir) - logger.debug("Created the following folder: %s" % repo_dir) - # Create pair of keys for user. - user_pub_key = Helper.get_or_create_rsa_key_for_admin() - DBVirtualFunction.add_admin_to_eng_team( - user_content['engagement_uuid']) - # Set SSH Key for the user. - APIBridge.set_ssh(user_content, user_pub_key) - git_user = APIGitLab.get_git_user(used_email_for_actions) - - counter = 0 - git_user_pub_key = None - while user_pub_key != git_user_pub_key and counter < \ - Constants.GitLabConstants.RETRIES_NUMBER: - try: - git_user_pub_key = APIGitLab.get_git_user_ssh_key( - git_user['id']) - except Exception as e: - pass - - counter += 1 - time.sleep(session.wait_until_time_pause) - - # Check that the SSH key was added to user on APIGitLab. - if user_pub_key != git_user_pub_key: - raise Exception( - "The SSH Key received does not equal to the" - " one provided! The key from" - "APIGitLab:\n %s ==<>== %s" % - (git_user_pub_key, user_pub_key)) - - gitRepoURL = "git@gitlab:%s/%s.git" % ( - user_content['engagement_manual_id'], - user_content['vfName']) - logger.debug("Clone repo from: " + gitRepoURL) - APIGitLab.is_gitlab_ready(user_content) - cmd = 'cd ' + repo_dir + \ - '; git config --global user.email \"' + Constants.\ - Users.Admin.EMAIL + \ - '\"; git config --global user.name \"' + \ - Constants.Users.Admin.FULLNAME + '\";' - # Commit all changes. - p = subprocess_popen = subprocess.Popen( - cmd, shell=True, stderr=subprocess.PIPE) - while subprocess_popen is None: - logger.debug( - "waiting to subprocess command to complete...") - APIGitLab.display_output(p) - # Clone project from APIGitLab. - repo = git.Repo.clone_from(gitRepoURL, repo_dir) - logger.debug("Successfully cloned repo to " + repo_dir) - # Create three files (file0, file1, file2) and add them to git - # index. - for i in range(3): - fileName = repo_dir + '/file' + str(i) - with open(fileName, 'w') as content_file: - os.chmod(fileName, 0o600) - content_file.write("Test file " + fileName) - repo.index.add([fileName]) - logger.debug( - fileName + " was created and added to commit list.") - cmd = 'cd ' + repo_dir + \ - '; git commit -a -m \"Create and add 3 files to git.\"' - # Commit all changes. - p = subprocess.Popen(cmd, shell=True, stderr=subprocess.PIPE) - APIGitLab.display_output(p) - logger.debug("All files added to commit list.") - cmd = 'cd ' + repo_dir + '; git push' - # Push commit to APIGitLab. - p = subprocess.Popen(cmd, shell=True, stderr=subprocess.PIPE) - APIGitLab.display_output(p) - logger.debug("All files were pushed to APIGitLab.") - except Exception as e: - logger.error( - "_-_-_-_-_- Unexpected error in git_clone_push: " + str(e)) - raise Exception(e) - - @staticmethod - def git_push_commit(user_content): - if settings.DATABASE_TYPE != 'local': - logger.debug( - "About to push files into project's repository on APIGitLab") - try: - git_work = '/tmp/git_work/' - repo_dir = git_work + user_content['vfName'] - # Create three files (file0, file1, file2) and add them to git - # index. - for i in range(3): - fileName = repo_dir + '/file' + str(i) - with open(fileName, 'w') as content_file: - os.chmod(fileName, 0o600) - content_file.write("Edit test file " + fileName) - logger.debug(fileName + " was edited.") - cmd = 'cd ' + repo_dir + \ - '; git commit -a -m \"Create and add 3 files to git.\"' - # Commit all changes. - p = subprocess.Popen(cmd, shell=True, stderr=subprocess.PIPE) - APIGitLab.display_output(p) - logger.debug("All edited files were committed.") - cmd = 'cd ' + repo_dir + '; git push' - # Push commit to APIGitLab. - p = subprocess.Popen(cmd, shell=True, stderr=subprocess.PIPE) - APIGitLab.display_output(p) - logger.debug( - "All edited files were commited and pushed to APIGitLab.") - except Exception as e: - logger.error( - "_-_-_-_-_- Unexpected error in git_push_commit : " + - str(e)) - raise Exception( - "Something went wrong on git_push_commit " + - "function, please check logs.") - - @staticmethod - def is_gitlab_ready(user_content): - path_with_namespace = user_content[ - 'engagement_manual_id'] + "%2F" + user_content['vfName'] - # If admin user is in project repo, it means the project exists as - # well. - cont = APIGitLab.validate_git_project_members( - path_with_namespace, Constants.Users.Admin.EMAIL) - if cont: - logger.debug("Gitlab is ready for action, git repo was found!") - return True - else: - raise Exception( - "Something went wrong while waiting for GitLab.") diff --git a/services/api/api_jenkins.py b/services/api/api_jenkins.py deleted file mode 100644 index 0b57b00..0000000 --- a/services/api/api_jenkins.py +++ /dev/null @@ -1,93 +0,0 @@ - -# ============LICENSE_START========================================== -# org.onap.vvp/test-engine -# =================================================================== -# Copyright © 2017 AT&T Intellectual Property. All rights reserved. -# =================================================================== -# -# Unless otherwise specified, all software contained herein is licensed -# under the Apache License, Version 2.0 (the “License”); -# you may not use this software except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -# -# -# Unless otherwise specified, all documentation contained herein is licensed -# under the Creative Commons License, Attribution 4.0 Intl. (the “License”); -# you may not use this documentation except in compliance with the License. -# You may obtain a copy of the License at -# -# https://creativecommons.org/licenses/by/4.0/ -# -# Unless required by applicable law or agreed to in writing, documentation -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -# ============LICENSE_END============================================ -# -# ECOMP is a trademark and service mark of AT&T Intellectual Property. -from django.conf import settings -from requests.auth import HTTPBasicAuth -from services.constants import Constants -from services.helper import Helper -from services.session import session -import logging -import requests -import time - - -logger = logging.getLogger('ice-ci.logger') - - -class APIJenkins: - - @staticmethod - def get_jenkins_job(job_name): - r1 = None - counter = 0 - getURL = settings.JENKINS_URL + "job/" + job_name - logger.debug("Get APIJenkins job URL: " + getURL) - try: - r1 = requests.get(getURL, auth=HTTPBasicAuth( - settings.JENKINS_USERNAME, settings.JENKINS_PASSWORD)) - while r1.status_code != 200 and counter <= \ - Constants.GitLabConstants.RETRIES_NUMBER: - r1 = requests.get(getURL, auth=HTTPBasicAuth( - settings.JENKINS_USERNAME, settings.JENKINS_PASSWORD)) - time.sleep(session.wait_until_time_pause) - logger.debug( - "try to get jenkins job (try #%s)" % counter) - counter += 1 - Helper.internal_assert(r1.status_code, 200) - logger.debug("Job was created on APIJenkins!") - except BaseException: - msg = None - - if r1 is None: - msg = "APIJenkins didn't create job for %s" % job_name - else: - msg = "APIJenkins didn't create job for %s, " +\ - "see response >>> %s %s" % ( - job_name, r1.status_code, r1.reason) - - logger.error(msg) - raise Exception(msg) - - @staticmethod - def find_build_num_out_of_jenkins_log(log): - lines_array = log.splitlines() - for line in lines_array: - if Constants.Dashboard.Checklist.JenkinsLog.\ - Modal.Body.BUILD_IDENTIFIER in line: - parts = line.partition('jenkins') - return parts[2] diff --git a/services/api/api_rados.py b/services/api/api_rados.py deleted file mode 100644 index 9a130b1..0000000 --- a/services/api/api_rados.py +++ /dev/null @@ -1,140 +0,0 @@ - -# ============LICENSE_START========================================== -# org.onap.vvp/test-engine -# =================================================================== -# Copyright © 2017 AT&T Intellectual Property. All rights reserved. -# =================================================================== -# -# Unless otherwise specified, all software contained herein is licensed -# under the Apache License, Version 2.0 (the “License”); -# you may not use this software except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -# -# -# Unless otherwise specified, all documentation contained herein is licensed -# under the Creative Commons License, Attribution 4.0 Intl. (the “License”); -# you may not use this documentation except in compliance with the License. -# You may obtain a copy of the License at -# -# https://creativecommons.org/licenses/by/4.0/ -# -# Unless required by applicable law or agreed to in writing, documentation -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -# ============LICENSE_END============================================ -# -# ECOMP is a trademark and service mark of AT&T Intellectual Property. -import time - -from boto.s3.connection import S3Connection, OrdinaryCallingFormat -from django.conf import settings - -from rados.rgwa_client_factory import RGWAClientFactory -from services.constants import Constants -from services.logging_service import LoggingServiceFactory -from services.session import session - - -logger = LoggingServiceFactory.get_logger() - - -class APIRados: - - @staticmethod - def get_bucket(name): - """Return the Bucket.""" - boto_conn = RGWAClientFactory.standard() - try: - return boto_conn.lookup(name) - except Exception as e: - logger.error("Problem on get bucket", e) - raise e - - @staticmethod - def get_bucketfor_specific_user(name, access_key_id, secret_access_key): - """Return the Bucket.""" - boto_conn = APIRados.specific_client(access_key_id, secret_access_key) - try: - return boto_conn.lookup(name) - except Exception as e: - logger.error("Problem on get bucket for specific user", e) - raise e - - @staticmethod - def get_bucket_grants(bucket_name): - """Return the Grants.""" - for counter in range(Constants.RGWAConstants.BUCKET_RETRIES_NUMBER): - bucket = APIRados.get_bucket(bucket_name) - if bucket: - break - logger.error("Bucket not found. Retry #%s" % counter+1) - time.sleep(session.wait_until_time_pause_long) - else: - raise Exception("Max retries exceeded, failing test...") - grants = bucket.list_grants() - return grants - - @staticmethod - def is_bucket_ready(bucket_id): - for counter in range(Constants.RGWAConstants.BUCKET_RETRIES_NUMBER): - bucket = APIRados.get_bucket(bucket_id) - if bucket: - break - logger.debug( - "bucket are not ready yet, trying again (%s of %s)" % ( - counter+1, Constants.RGWAConstants.BUCKET_RETRIES_NUMBER)) - time.sleep(session.wait_until_time_pause_long) - else: - raise Exception("Max retries exceeded, failing test...") - logger.debug("bucket are ready to continue!") - return True - - @staticmethod - def users_of_bucket_ready_after_complete(bucket_id, user_name): - for counter in range(Constants.RGWAConstants.BUCKET_RETRIES_NUMBER): - grants = APIRados.get_bucket_grants(bucket_id) - if not any(user_name == g.id for g in grants): - break - time.sleep(session.wait_until_time_pause_long) - else: - raise Exception("Max retries exceeded, failing test...") - return False - logger.debug("users_of_bucket are ready to continue!") - return True - - @staticmethod - def users_of_bucket_ready_after_created(bucket_id, user_uuid): - for counter in range(Constants.RGWAConstants.BUCKET_RETRIES_NUMBER): - grants = APIRados.get_bucket_grants(bucket_id) - if any(user_uuid == g.id for g in grants): - break - time.sleep(session.wait_until_time_pause_long) - else: - raise Exception("Max retries exceeded, failing test...") - logger.debug("users_of_bucket are ready to continue!") - return True - - @staticmethod - def specific_client(access_key_id, secret_access_key): - boto_conn = S3Connection( - host=settings.AWS_S3_HOST, - port=settings.AWS_S3_PORT, - aws_access_key_id=access_key_id, - aws_secret_access_key=secret_access_key, - calling_format=OrdinaryCallingFormat(), - is_secure=True,) - - boto_conn.num_retries = 0 - return boto_conn diff --git a/services/api/api_user.py b/services/api/api_user.py deleted file mode 100644 index 963280e..0000000 --- a/services/api/api_user.py +++ /dev/null @@ -1,330 +0,0 @@ - -# ============LICENSE_START========================================== -# org.onap.vvp/test-engine -# =================================================================== -# Copyright © 2017 AT&T Intellectual Property. All rights reserved. -# =================================================================== -# -# Unless otherwise specified, all software contained herein is licensed -# under the Apache License, Version 2.0 (the “License”); -# you may not use this software except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -# -# -# Unless otherwise specified, all documentation contained herein is licensed -# under the Creative Commons License, Attribution 4.0 Intl. (the “License”); -# you may not use this documentation except in compliance with the License. -# You may obtain a copy of the License at -# -# https://creativecommons.org/licenses/by/4.0/ -# -# Unless required by applicable law or agreed to in writing, documentation -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -# ============LICENSE_END============================================ -# -# ECOMP is a trademark and service mark of AT&T Intellectual Property. -import re - -from django.conf import settings -import requests - -from services.api.api_bridge import APIBridge -from services.api.api_gitlab import APIGitLab -from services.constants import Constants, ServiceProvider -from services.helper import Helper -from services.logging_service import LoggingServiceFactory - - -logger = LoggingServiceFactory.get_logger() - - -class APIUser: - - @staticmethod - # Update account API - only adds new SSH key! - def update_account(user_content): - r1 = None - token = APIUser.login_user(user_content['email']) - user_content['session_token'] = 'token ' + token - sshKey = Helper.generate_sshpub_key() - putURL = settings.ICE_EM_URL + '/v1/engmgr/users/account' - logger.debug("Put user URL: " + putURL) - headers = dict() # Create header for put request. - headers['Content-type'] = 'application/json' - headers['Authorization'] = user_content['session_token'] - # headers['Authorization'] = user_content['activation_token'] - put_data = dict() # Create JSON data for put request. - user_content['vendor'] = user_content['company']['name'] - if user_content['vendor'] == "AT&T": - put_data['company'] = "AT&T" - else: - put_data['company'] = user_content['vendor'] - put_data['email'] = user_content['email'] - put_data['full_name'] = user_content['full_name'] - put_data['password'] = "" - put_data['phone_number'] = "+1201" + \ - Helper.rand_string("randomNumber", 6) - put_data['ssh_key'] = sshKey - try: - r1 = requests.put( - putURL, json=put_data, headers=headers, verify=False) - Helper.internal_assert(r1.status_code, 200) - logger.debug( - "SSH Key was added successfully to user " + - user_content['full_name']) - if not APIBridge.is_gitlab_ready(user_content): - raise - return sshKey - except BaseException: - if r1 is None: - logger.error("Failed to add public SSH key to user.") - else: - logger.error( - "PUT request failed to add SSH key to user, see " + - "response >>> %s %s \n %s" % - (r1.status_code, r1.reason, str( - r1.content, 'utf-8'))) - raise - - @staticmethod - # Update account API - only adds new SSH key! - def update_account_injec_script(user_content): - r1 = None - putURL = settings.ICE_EM_URL + '/v1/engmgr/users/account' - logger.debug("Put user URL: " + putURL) - headers = dict() # Create header for put request. - headers['Content-type'] = 'application/json' - headers['Authorization'] = user_content['session_token'] - put_data = dict() # Create JSON data for put request. - if user_content['vendor'] == "AT&T": - put_data['company'] = "AT&T" - else: - put_data['company'] = user_content['vendor'] - put_data['email'] = user_content['email'] - script = "" - put_data['full_name'] = script - put_data['password'] = "" - put_data['phone_number'] = "+1201" + \ - Helper.rand_string("randomNumber", 6) - try: - r1 = requests.put( - putURL, json=put_data, headers=headers, verify=False) - Helper.internal_assert(r1.status_code, 200) - msg = "Testing for Cross site scripting successfully : " + \ - user_content['full_name'] + \ - "stattus Code = " + str(r1.status_code) - logger.debug(msg) - if not APIBridge.is_gitlab_ready(user_content): - raise - return True - except BaseException: - if r1 is None: - logger.error("Failed to add public SSH key to user.") - else: - logger.error( - "PUT request failed to add SSH key to user, " + - "see response >>> %s %s \n %s" % - (r1.status_code, r1.reason, str( - r1.content, 'utf-8'))) - raise - - @staticmethod - def create_new_user(company=False, activate=False): - signupUrl = settings.EM_REST_URL + "signup/" - signupParams = APIUser.create_signup_param(company) - r1 = requests.post(signupUrl, json=signupParams, verify=False) - if (r1.status_code == 201 or r1.status_code == 200): - logger.debug("Moving to next test case. status=" + - str(r1.status_code)) - pass # Need to break here. - try: - user_data = r1.json() - except Exception as e: - logger.error("=========== json error ========") - logger.error("r1.content = " + str(r1.content) + - ", status=" + str(r1.status_code)) - logger.error("=========== json error end ========") - raise e - if activate: - APIUser.activate_user( - user_data['uuid'], user_data["user"]["activation_token"]) - return user_data - - @staticmethod - def login_user(email): - postUrl = settings.EM_REST_URL + "login" - user_data = dict() # Create JSON data for post request. - user_data['email'] = email - user_data['password'] = "iceusers" - try: - headers = {'Content-type': 'application/json'} - r = requests.post( - postUrl, json=user_data, headers=headers, verify=False) - logger.debug(str(r.status_code) + " " + r.reason) - decoded_response = r.json() - return decoded_response['token'] - except BaseException: - logger.debug("Failed to login.") - raise - - @staticmethod - def set_ssh(user_content, sshKey=None): # Set SSH key to user. - r1 = None - if sshKey is None: - logger.debug("About to generate an ssh key for the user: %s" % - user_content['email']) - sshKey = Helper.generate_sshpub_key() - postURL = settings.ICE_EM_URL + '/v1/engmgr/users/ssh' - logger.debug("Post user URL: " + postURL) - headers = dict() # Create header for post request. - headers['Content-type'] = 'application/json' - headers['Authorization'] = user_content['session_token'] - post_data = dict() # Create JSON data for post request. - post_data['ssh_key'] = sshKey - try: - r1 = requests.post( - postURL, json=post_data, headers=headers, verify=False) - Helper.internal_assert(r1.status_code, 200) - logger.debug( - "SSH Key was added successfully") - if not APIBridge.is_gitlab_ready(user_content): - raise - return sshKey - except BaseException: - if r1 is None: - logger.error("Failed to add public SSH key.") - else: - logger.error( - "POST request failed to add SSH key to user, " + - "see response >>> %s %s" % - (r1.status_code, r1.reason)) - raise - - @staticmethod - def create_signup_param(company=False): - try: # Click on element in UI, by xPath locator. - if not company: - company = ServiceProvider.MainServiceProvider - email_domain = ServiceProvider.email - else: - email_domain = company.lower() + ".com" # - data = { - "company": company, - "full_name": Helper.rand_string("randomString"), - "email": Helper.rand_string("randomString") + - "@" + email_domain, - "phone_number": Constants.Default.Phone.TEXT, - "password": Constants.Default.Password.TEXT, - "regular_email_updates": "True"} - - return data - # If failed - count the failure and add the error to list of errors. - except Exception as e: - errorMsg = "Could not create Sign Up parametrs" - raise Exception(errorMsg, e) - - @staticmethod - def activate_user(userUuid, activationToken): - postUrl = settings.ICE_EM_URL + "/v1/engmgr/users/activate/" + \ - userUuid + "/" + activationToken - logger.debug(postUrl) - r1 = requests.get(postUrl, verify=False) - if (r1.status_code == 200): - logger.debug("APIUser activated successfully!") - return True - else: - raise Exception( - "Failed to activate user >>> %s %s" % - (r1.status_code, r1.reason)) - - @staticmethod - def signup_invited_user( - company, - invited_email, - invite_token, - invite_url, - user_content, - is_contact_user="false", - activate=False, - wait_for_gitlab=True): - r1 = None - postURL = settings.ICE_EM_URL + '/v1/engmgr/signup' - logger.debug("Post signup URL: " + postURL) - if is_contact_user == "true": - fullName = re.sub("http.*full_name=", "", invite_url) - fullName = re.sub("&.*", "", fullName) - logger.debug( - "Invited contact full name is (according to url): " + fullName) - else: - fullName = Helper.rand_string('randomString') - - post_data = dict() # Create JSON data for post request. - post_data['company'] = company - post_data['email'] = invited_email - post_data['full_name'] = fullName - post_data['invitation'] = invite_token - post_data['is_contact_user'] = is_contact_user - post_data['password'] = "iceusers" - post_data['phone_number'] = "+1201" + \ - Helper.rand_string("randomNumber", 6) - post_data['regular_email_updates'] = "False" - post_data['terms'] = "True" - try: - requests.get(invite_url, verify=False) - r1 = requests.post( - postURL, json=post_data, verify=False) - Helper.internal_assert(r1.status_code, 200) - logger.debug("Invited user signed-up successfully!") - - user_data = r1.json() - if activate: - APIUser.activate_user( - user_data['uuid'], user_data["user"]["activation_token"]) - - if wait_for_gitlab: - if not APIBridge.is_gitlab_ready(user_content): - raise - return post_data - except BaseException: - if r1 is None: - logger.error("Failed to sign up the invited team member.") - else: - logger.error( - "POST request failed to sign up the invited " + - "team member, see response >>> %s %s \n %s" % - (r1.status_code, r1.reason, str( - r1.content, 'utf-8'))) - raise - - @staticmethod - def create_new_user_content(): - user_content = APIBridge.create_engagement() - APIGitLab.git_clone_push(user_content) - APIBridge.frontend_login( - user_content['email'], Constants.Default.Password.TEXT) - vfName = user_content['vfName'] - uuid = user_content['uuid'] - inviteEmail = Helper.rand_invite_email() - newObj = [vfName, uuid, inviteEmail] - return newObj, user_content - - @staticmethod - def create_new_user_content_login_with_api(): - user_content = APIBridge.create_engagement() - APIGitLab.git_clone_push(user_content) - token = "token " + APIBridge.login_user(user_content['el_email']) - user_content['session_token'] = token - return user_content diff --git a/services/api/api_virtual_function.py b/services/api/api_virtual_function.py deleted file mode 100644 index 193d6e2..0000000 --- a/services/api/api_virtual_function.py +++ /dev/null @@ -1,403 +0,0 @@ - -# ============LICENSE_START========================================== -# org.onap.vvp/test-engine -# =================================================================== -# Copyright © 2017 AT&T Intellectual Property. All rights reserved. -# =================================================================== -# -# Unless otherwise specified, all software contained herein is licensed -# under the Apache License, Version 2.0 (the “License”); -# you may not use this software except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -# -# -# Unless otherwise specified, all documentation contained herein is licensed -# under the Creative Commons License, Attribution 4.0 Intl. (the “License”); -# you may not use this documentation except in compliance with the License. -# You may obtain a copy of the License at -# -# https://creativecommons.org/licenses/by/4.0/ -# -# Unless required by applicable law or agreed to in writing, documentation -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -# ============LICENSE_END============================================ -# -# ECOMP is a trademark and service mark of AT&T Intellectual Property. -import datetime -import json -import time - -from django.conf import settings -import requests - -from services.api.api_user import APIUser -from services.constants import Constants, ServiceProvider -from services.database.db_general import DBGeneral -from services.helper import Helper -from services.logging_service import LoggingServiceFactory - - -logger = LoggingServiceFactory.get_logger() - - -class APIVirtualFunction: - - @staticmethod - def add_next_step(user_content, files=[]): - r1 = None - postURL = settings.ICE_EM_URL + '/v1/engmgr/engagements/' + \ - user_content['engagement_uuid'] + '/nextsteps' - logger.debug("Post add next step URL: " + postURL) - headers = dict() # Create header for post request. - headers['Content-type'] = 'application/json' - headers['Authorization'] = user_content['session_token'] - data = dict() # Create JSON data for post request. - files_list = list() - if isinstance(files, list): - for file in files: - files_list.append(file) - else: - files_list.append(files) - data['files'] = files_list - data['assigneesUuids'] = [user_content['uuid']] - data['duedate'] = str(datetime.date.today()) - data['description'] = "API test - add next step." - list_data = [] - list_data.append(data) - try: - r1 = requests.post( - postURL, json=list_data, headers=headers, verify=False) - Helper.internal_assert(r1.status_code, 200) - logger.debug("Next step was added to the engagement!") - ns_uuid = r1.json() - return ns_uuid[0]['uuid'] - except BaseException: - if r1 is None: - logger.error( - "Failed to add next step to VF " + user_content['vfName']) - else: - logger.error("Failed to add next step to VF " + - user_content['vfName'] + - ", see response >>> %s %s.\nContent: %s" % - (r1.status_code, r1.reason, str( - r1.content, 'utf-8'))) - raise - - @staticmethod - def create_vf(token): - r1 = None - postUrl = settings.EM_REST_URL + "vf/" - targetVersion = DBGeneral.select_from( - "uuid", "ice_deployment_target", 1) - ecompRelease = DBGeneral.select_from("uuid", "ice_ecomp_release", 1) - headers = dict() # Create header for post request. - headers['Content-type'] = 'application/json' - headers['Authorization'] = 'token ' + token - jdata = [ - { - "virtual_function": Helper.rand_string("randomString"), - "version": Helper.rand_string("randomString") + - Helper.rand_string("randomNumber"), - "target_lab_entry_date": time.strftime("%Y-%m-%d"), - "target_aic_uuid": targetVersion, - "ecomp_release": ecompRelease, - "is_service_provider_internal": False}] - try: - r1 = requests.post( - postUrl, json=jdata, headers=headers, verify=False) - Helper.internal_assert(r1.status_code, 200) - logger.debug("Virtual Function created successfully!") - content = r1.content[1:-1] - return content - except BaseException: - if r1 is None: - logger.debug("Failed to create VF >>> request failed!") - else: - logger.debug( - "Failed to create VF >>> %s %s \n %s" % - (r1.status_code, r1.reason, str( - r1.content, 'utf-8'))) - raise - - @staticmethod - def get_engagement(user_content): - r1 = None - postUrl = settings.EM_REST_URL + 'single-engagement/' + \ - str(user_content['engagement_uuid'],) - headers = dict() # Create header for post request. - headers['Content-type'] = 'application/json' - headers['Authorization'] = user_content['session_token'] - try: - r1 = requests.get( - postUrl, headers=headers, verify=False) - Helper.internal_assert(r1.status_code, 200) - logger.debug("Retrieved the Engagement successfully!") - content = r1.content - return json.loads(content) - except BaseException: - if r1 is None: - logger.debug( - "Failed to Retrieve the Engagement >>> request failed!") - else: - logger.debug( - "Failed to Retrieve the Engagement >>> %s %s \n %s" % - (r1.status_code, r1.reason, str( - r1.content, 'utf-8'))) - raise - - @staticmethod - def invite_team_member(user_content): - r1 = None - postURL = settings.ICE_EM_URL + '/v1/engmgr/invite-team-members/' - logger.debug("Post invite user URL: " + postURL) - headers = dict() # Create header for post request. - headers['Content-type'] = 'application/json' - headers['Authorization'] = user_content['session_token'] - data = dict() # Create JSON data for post request. - data['email'] = Helper.rand_string( - 'randomString') + "@" + ServiceProvider.email - data['eng_uuid'] = user_content['engagement_uuid'] - list_data = [] - list_data.append(data) - try: - r1 = requests.post( - postURL, json=list_data, headers=headers, verify=False) - Helper.internal_assert_boolean(r1.status_code, 200) - logger.debug("Invite sent successfully to email " + data['email']) - invite_token = DBGeneral.select_where_and( - "invitation_token", - "ice_invitation", - "email", - data['email'], - "engagement_uuid", - user_content['engagement_uuid'], - 1) - invite_url = settings.ICE_PORTAL_URL + "/#/signUp?invitation=" + \ - invite_token + "&email=" + data['email'] - logger.debug("Invitation URL is: " + invite_url) - return data['email'], invite_token, invite_url - except BaseException: - if r1 is None: - logger.error("Failed to invite team member.") - else: - logger.error( - "POST request failed to invite team member, " + - "see response >>> %s %s" % (r1.status_code, r1.reason)) - raise - - @staticmethod - def add_contact(user_content): - r1 = None - postURL = settings.ICE_EM_URL + '/v1/engmgr/add-contact/' - logger.debug("Post invite vendor contact URL: " + postURL) - headers = dict() # Create header for post request. - headers['Content-type'] = 'application/json' - headers['Authorization'] = user_content['session_token'] - data = dict() # Create JSON data for post request. - data['company'] = user_content['vendor_uuid'] - data['email'] = Helper.rand_string( - 'randomString') + "@" + ServiceProvider.email - data['eng_uuid'] = user_content['engagement_uuid'] - data['full_name'] = Helper.rand_string('randomString') - data['phone_number'] = "+1201" + Helper.rand_string("randomNumber", 6) - try: - r1 = requests.post( - postURL, json=data, headers=headers, verify=False) - Helper.internal_assert_boolean(r1.status_code, 200) - logger.debug("Invite sent successfully to email " + data['email']) - invite_token = DBGeneral.select_where_and( - "invitation_token", - "ice_invitation", - "email", - data['email'], - "engagement_uuid", - user_content['engagement_uuid'], - 1) - invite_url = settings.ICE_PORTAL_URL + "/#/signUp?invitation=" +\ - invite_token + "&email=" + data['email'] +\ - "&full_name=" + data['full_name'] + \ - "&phone_number=" + data['phone_number'] + "&company=" + \ - data['company'] + "&is_contact_user=true" - logger.debug("Invitation URL is: " + invite_url) - return data['email'], invite_token, invite_url - except BaseException: - if r1 is None: - logger.error("Failed to invite vendor contact.") - else: - logger.error( - "POST request failed to invite vendor contact, " + - "see response >>> %s %s \n %s" % - (r1.status_code, r1.reason, str( - r1.content, 'utf-8'))) - raise - - @staticmethod - def edit_next_step(user_content, ns_uuid): - r1 = None - postURL = settings.ICE_EM_URL + '/v1/engmgr/nextsteps/' + ns_uuid + \ - '/engagement/' + user_content['engagement_uuid'] + '/modify/' - logger.debug("Put next step URL: " + postURL) - headers = dict() # Create header for post request. - headers['Content-type'] = 'application/json' - headers['Authorization'] = user_content['session_token'] - data = dict() # Create JSON data for post request. - data['files'] = [] - data['assigneesUuids'] = [user_content['uuid']] - data['duedate'] = str(datetime.date.today()) - data['description'] = "API edit next step test " + \ - Helper.rand_string('randomString') - try: - r1 = requests.put( - postURL, json=data, headers=headers, verify=False) - Helper.internal_assert_boolean(r1.status_code, 202) - logger.debug("Next step was edited successfully!") - except BaseException: - if r1 is None: - logger.error("Failed to edit next step uuid: " + ns_uuid) - else: - logger.error( - "Failed to edit next step uuid: " + - ns_uuid + - ", see response >>> %s %s" % - (r1.status_code, - r1.reason)) - raise - - @staticmethod - def get_export_dasboard_excel(token, keywords=""): - postUrl = settings.EM_REST_URL + \ - "engagement/export/?stage=All&keyword=" + keywords - headers = {"Authorization": token} - r1 = requests.get(postUrl, headers=headers, verify=False) - Helper.internal_assert(r1.status_code, 200) - if (r1.status_code == 200): - logger.debug("APIUser activated successfully!") - return r1.content - else: - raise Exception( - "Failed to activate user >>> %s %s" % - (r1.status_code, r1.reason)) - return False - - @staticmethod - def create_engagement(wait_for_gitlab=True): - user_content = APIUser.create_new_user() - APIUser.activate_user( - user_content['uuid'], user_content['user']['activation_token']) - token = APIUser.login_user(user_content['email']) - vf_content = json.loads(APIVirtualFunction.create_vf(token)) - user_content['vfName'] = vf_content['name'] - user_content['vf_uuid'] = vf_content['uuid'] - user_content['target_aic'] = vf_content['deployment_target']['version'] - # <-- ECOMP RELEASE - user_content['ecomp_release'] = vf_content[ - 'ecomp_release']['name'] - user_content['vnf_version'] = vf_content['version'] - if(vf_content['vendor']['name'] == "AT&T"): - user_content['vendor'] = "AT&T" - else: - user_content['vendor'] = vf_content['vendor']['name'] - user_content['vendor_uuid'] = vf_content['vendor']['uuid'] - user_content['engagement_manual_id'] = vf_content[ - 'engagement']['engagement_manual_id'] - user_content['target_lab_entry_date'] = vf_content[ - 'target_lab_entry_date'] - user_content['el_email'] = vf_content[ - 'engagement']['reviewer']['email'] - user_content['el_name'] = vf_content[ - 'engagement']['reviewer']['full_name'] - user_content['pr_email'] = vf_content[ - 'engagement']['peer_reviewer']['email'] - user_content['pr_name'] = vf_content[ - 'engagement']['peer_reviewer']['full_name'] - user_content['engagement_uuid'] = vf_content['engagement']['uuid'] - user_content['session_token'] = 'token ' + token - user_content['engagement'] = vf_content['engagement'] - user_content['vfStage'] = vf_content['engagement']['engagement_stage'] - - return user_content - - @staticmethod - def set_eng_stage(user_content, requested_stage): - token = APIUser.login_user(user_content['el_email']) - r1 = None - putUrl = Constants.Default.URL.Engagement.SingleEngagement.TEXT + \ - user_content['engagement_uuid'] + "/stage/" + str(requested_stage) - headers = dict() # Create header for post request. - headers['Content-type'] = 'application/json' - headers['Authorization'] = 'token ' + token - try: - r1 = requests.put( - putUrl, headers=headers, verify=False) - Helper.internal_assert(r1.status_code, 202) - logger.debug( - "Engagement stage was successfully changed to " + - str(requested_stage) + - "!") - content = r1.content[1:-1] - return content - except BaseException: - if r1 is None: - logger.debug("Failed to set eng stage >>> request failed!") - else: - logger.debug( - "Failed to set eng stage >>> %s %s \n %s" % - (r1.status_code, r1.reason, str( - r1.content, 'utf-8'))) - raise - - @staticmethod - def update_aic_version(eng_uuid, aic_version_uuid, session_token): - r1 = None - putURL = Constants.Default.URL.Engagement.EngagementOperations.TEXT + \ - eng_uuid + '/deployment-targets/' + aic_version_uuid - logger.debug("Put next step URL: " + putURL) - headers = dict() # Create header for post request. - headers['Content-type'] = 'application/json' - headers['Authorization'] = session_token - try: - r1 = requests.put(putURL, headers=headers, verify=False) - Helper.internal_assert_boolean(r1.status_code, 200) - logger.debug("AIC version has changed!") - except BaseException: - if r1 is None: - msg = "Failed to edit AIC version" - else: - msg = "Failed to edit AIC version, see response >>> %s %s" % ( - r1.status_code, r1.reason) - raise msg - - @staticmethod - def update_ecomp_release(eng_uuid, ecomp_release_uuid, session_token): - r1 = None - putURL = Constants.Default.URL.Engagement.EngagementOperations.TEXT + \ - eng_uuid + '/ecomp-releases/' + ecomp_release_uuid - logger.debug("Put next step URL: " + putURL) - headers = dict() # Create header for post request. - headers['Content-type'] = 'application/json' - headers['Authorization'] = session_token - try: - r1 = requests.put(putURL, headers=headers, verify=False) - Helper.internal_assert_boolean(r1.status_code, 200) - logger.debug("AIC version has changed!") - except BaseException: - if r1 is None: - msg = "Failed to update ECOMP release" - else: - msg = "Failed to update ECOMP release," +\ - " see response >>> %s %s" % ( - r1.status_code, r1.reason) - raise msg -- cgit 1.2.3-korg