aboutsummaryrefslogtreecommitdiffstats
path: root/services/api
diff options
context:
space:
mode:
Diffstat (limited to 'services/api')
-rw-r--r--services/api/__init__.py38
-rw-r--r--services/api/api_bridge.py79
-rw-r--r--services/api/api_checklist.py251
-rw-r--r--services/api/api_gitlab.py415
-rw-r--r--services/api/api_jenkins.py93
-rw-r--r--services/api/api_rados.py140
-rw-r--r--services/api/api_user.py330
-rw-r--r--services/api/api_virtual_function.py403
8 files changed, 0 insertions, 1749 deletions
diff --git a/services/api/__init__.py b/services/api/__init__.py
deleted file mode 100644
index 32b601a..0000000
--- a/services/api/__init__.py
+++ /dev/null
@@ -1,38 +0,0 @@
-
-# ============LICENSE_START==========================================
-# org.onap.vvp/test-engine
-# ===================================================================
-# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
-# ===================================================================
-#
-# Unless otherwise specified, all software contained herein is licensed
-# under the Apache License, Version 2.0 (the “License”);
-# you may not use this software except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-#
-#
-# Unless otherwise specified, all documentation contained herein is licensed
-# under the Creative Commons License, Attribution 4.0 Intl. (the “License”);
-# you may not use this documentation except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# https://creativecommons.org/licenses/by/4.0/
-#
-# Unless required by applicable law or agreed to in writing, documentation
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-# ============LICENSE_END============================================
-#
-# ECOMP is a trademark and service mark of AT&T Intellectual Property.
diff --git a/services/api/api_bridge.py b/services/api/api_bridge.py
deleted file mode 100644
index d8cb46c..0000000
--- a/services/api/api_bridge.py
+++ /dev/null
@@ -1,79 +0,0 @@
-
-# ============LICENSE_START==========================================
-# org.onap.vvp/test-engine
-# ===================================================================
-# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
-# ===================================================================
-#
-# Unless otherwise specified, all software contained herein is licensed
-# under the Apache License, Version 2.0 (the “License”);
-# you may not use this software except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-#
-#
-# Unless otherwise specified, all documentation contained herein is licensed
-# under the Creative Commons License, Attribution 4.0 Intl. (the “License”);
-# you may not use this documentation except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# https://creativecommons.org/licenses/by/4.0/
-#
-# Unless required by applicable law or agreed to in writing, documentation
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-# ============LICENSE_END============================================
-#
-# ECOMP is a trademark and service mark of AT&T Intellectual Property.
-
-
-class APIBridge:
-
- """
- This class helps to use functions inside
- classes with circular import (dependencies).
- Use this class only when there is circular
- import in one of the API services.
- """
-
- @staticmethod
- def is_gitlab_ready(user_content):
- """is_gitlab_ready: Originally can be found under APIGitLab class."""
- from services.api.api_gitlab import APIGitLab
- return APIGitLab.is_gitlab_ready(user_content)
-
- @staticmethod
- def login_user(email):
- """login_user: Originally can be found under APIUser class."""
- from services.api.api_user import APIUser
- return APIUser.login_user(email)
-
- @staticmethod
- def set_ssh(user_content, sshKey):
- """set_ssh: Originally can be found under APIUser class."""
- from services.api.api_user import APIUser
- return APIUser.set_ssh(user_content, sshKey)
-
- @staticmethod
- def create_engagement(wait_for_gitlab=True):
- """create_engagement: Originally can be found under
- APIVirtualFunction class."""
- from services.api.api_virtual_function import APIVirtualFunction
- return APIVirtualFunction.create_engagement(wait_for_gitlab)
-
- @staticmethod
- def frontend_login(email, password):
- """login: Originally can be found under FEUser class."""
- from services.frontend.fe_user import FEUser
- return FEUser.login(email, password)
diff --git a/services/api/api_checklist.py b/services/api/api_checklist.py
deleted file mode 100644
index 02ab311..0000000
--- a/services/api/api_checklist.py
+++ /dev/null
@@ -1,251 +0,0 @@
-
-# ============LICENSE_START==========================================
-# org.onap.vvp/test-engine
-# ===================================================================
-# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
-# ===================================================================
-#
-# Unless otherwise specified, all software contained herein is licensed
-# under the Apache License, Version 2.0 (the “License”);
-# you may not use this software except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-#
-#
-# Unless otherwise specified, all documentation contained herein is licensed
-# under the Creative Commons License, Attribution 4.0 Intl. (the “License”);
-# you may not use this documentation except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# https://creativecommons.org/licenses/by/4.0/
-#
-# Unless required by applicable law or agreed to in writing, documentation
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-# ============LICENSE_END============================================
-#
-# ECOMP is a trademark and service mark of AT&T Intellectual Property.
-import datetime
-
-import requests
-
-from services.api.api_gitlab import APIGitLab
-from services.api.api_user import APIUser
-from services.constants import Constants
-from services.database.db_general import DBGeneral
-from services.helper import Helper
-from services.logging_service import LoggingServiceFactory
-from services.database.db_checklist import DBChecklist
-
-
-logger = LoggingServiceFactory.get_logger()
-
-
-class APIChecklist:
-
- @staticmethod
- def create_checklist(
- user_content,
- files=[
- "file0",
- "file1"],
- return_negative_response=False):
- r1 = None
- postURL = Constants.Default.URL.Checklist.TEXT + \
- user_content['engagement_uuid'] + '/checklist/new/'
- logger.debug("Post create checklist URL: " + postURL)
- headers = dict()
- headers['Content-type'] = 'application/json'
- headers['Authorization'] = user_content['session_token']
- data = dict()
- data['checkListAssociatedFiles'] = files
- data['checkListName'] = "checklistAPI" + \
- Helper.rand_string('randomString')
- data['checkListTemplateUuid'] = DBGeneral.select_where(
- "uuid", "ice_checklist_template", "name",
- Constants.Template.Heat.TEXT, 1)
- try:
- if not APIGitLab.is_gitlab_ready(user_content):
- raise Exception(
- "Gitlab is not ready and because " +
- "of that the test is failed.")
-
- r1 = requests.post(postURL, json=data,
- headers=headers, verify=False)
-
- Helper.internal_assert_boolean(r1.status_code, 200)
- logger.debug("Checklist was created successfully!")
- cl_content = r1.json()
- return cl_content
- except BaseException:
- if return_negative_response:
- return r1
- if r1 is None:
- logger.error(
- "Failed to create checklist for VF " +
- user_content['vfName'])
- else:
- logger.error(
- "Failed to create checklist for VF " +
- user_content['vfName'] +
- ", see response >>> %s %s.\nContent: %s" %
- (r1.status_code, r1.reason, str(r1.content, 'utf-8')))
- raise
-
- @staticmethod
- def update_checklist(user_content, cl_uuid):
- r1 = None
- postURL = Constants.Default.URL.Checklist.Update.TEXT + '/' + cl_uuid
- logger.debug("Post create checklist URL: " + postURL + '/' + cl_uuid)
- headers = dict() # Create header for post request.
- headers['Content-type'] = 'application/json'
- headers['Authorization'] = user_content['session_token']
- data = dict() # Create JSON data for post request.
- data['checklistUuid'] = cl_uuid
- data['checkListAssociatedFiles'] = ["file1", "file2"]
- data['checkListName'] = "UpdateChecklistAPI" + \
- Helper.rand_string('randomString')
- data['checkListTemplateUuid'] = DBGeneral.select_where(
- "uuid", "ice_checklist_template", "name",
- Constants.Template.Heat.TEXT, 1)
- try:
- r1 = requests.put(
- postURL, json=data, headers=headers, verify=False)
- Helper.internal_assert(r1.status_code, 200)
- logger.debug("DBChecklist was created successfully!")
- cl_content = r1.json()
- return cl_content['uuid']
- except BaseException:
- if r1 is None:
- logger.error(
- "Failed to create checklist for VF " +
- user_content['vfName'])
- else:
- logger.error(
- "Failed to create checklist for VF " +
- user_content['vfName'] +
- ", see response >>> %s %s.\nContent: %s" %
- (r1.status_code, r1.reason, str(r1.content, 'utf-8')))
- raise
-
- @staticmethod
- def add_checklist_audit_log(user_content, cl_uuid):
- r1 = None
- postURL = Constants.Default.URL.Checklist.Update + \
- cl_uuid + '/auditlog/'
- logger.debug("Post checklist audit log URL: " + postURL)
- headers = dict() # Create header for post request.
- headers['Content-type'] = 'application/json'
- headers['Authorization'] = user_content['session_token']
- data = dict() # Create JSON data for post request.
- data['description'] = "API audit log test " + \
- Helper.rand_string('randomString')
- try:
- r1 = requests.post(
- postURL, json=data, headers=headers, verify=False)
- Helper.internal_assert_boolean(r1.status_code, 200)
- logger.debug("Audit log was added successfully!")
- except BaseException:
- if r1 is None:
- logger.error(
- "Failed to add audit log for checklist uuid: " + cl_uuid)
- else:
- logger.error(
- "Failed to add audit log for checklist uuid: " +
- cl_uuid +
- ", see response >>> %s %s.\nContent: %s" %
- (r1.status_code, r1.reason, str(r1.content, 'utf-8')))
- raise
-
- @staticmethod
- def add_checklist_next_step(user_content, cl_uuid):
- r1 = None
- postURL = Constants.Default.URL.Checklist.TEXT + \
- user_content['engagement_uuid'] + \
- '/checklist/' + cl_uuid + '/nextstep/'
- logger.debug("Post checklist next step URL: " + postURL)
- headers = dict() # Create header for post request.
- headers['Content-type'] = 'application/json'
- headers['Authorization'] = user_content['session_token']
- data = dict() # Create JSON data for post request.
- data['files'] = ["file0"]
- data['assigneesUuids'] = [user_content['uuid']]
- data['duedate'] = str(datetime.date.today())
- data['description'] = "API next step test " + \
- Helper.rand_string('randomString')
- list_data = []
- list_data.append(data)
- try:
- r1 = requests.post(
- postURL, json=list_data, headers=headers, verify=False)
- Helper.internal_assert(r1.status_code, 200)
- logger.debug("Next step was added successfully!")
- ns_uuid = r1.json()
- return ns_uuid[0]['uuid']
- except BaseException:
- if r1 is None:
- logger.error(
- "Failed to add next step for checklist uuid: " + cl_uuid)
- else:
- logger.error(
- "Failed to add next step for checklist uuid: " +
- cl_uuid +
- ", see response >>> %s %s.\nContent: %s" %
- (r1.status_code, r1.reason, str(r1.content, 'utf-8')))
- raise
-
- @staticmethod
- def jump_state(checklistUUID, engLeadEmail):
- token = APIUser.login_user(engLeadEmail)
- stateURL = Constants.Default.URL.Checklist.Rest.TEXT + \
- str(checklistUUID) + '/state/'
- APIChecklist.go_to_next_state(stateURL, token)
-
- @staticmethod
- def go_to_next_state(url, token):
- headers = {"Authorization": "token " + token}
- body = dict()
- body['description'] = ""
- body['decline'] = "False"
- r = requests.put(url, headers=headers, json=body, verify=False)
- if (r.status_code == 201 or r.status_code == 200):
- logger.debug("go_to_next_state put request result status: %s" %
- r.status_code)
- else:
- logger.error("PUT request failed to change checklist state >>> " +
- str(r.status_code) + " " + r.reason)
- raise Exception("PUT request failed to change checklist state")
-
- @staticmethod
- def move_cl_to_closed(cl_uuid, vf_staff_emails):
- api_checklist_obj = APIChecklist()
- states = [Constants.ChecklistStates.PeerReview.TEXT,
- Constants.ChecklistStates.Approval.TEXT,
- Constants.ChecklistStates.Handoff.TEXT,
- Constants.ChecklistStates.Closed.TEXT]
- for i in range(len(vf_staff_emails)):
- logger.debug(
- "Trying to jump state for %s [%s]" %
- (vf_staff_emails[i], i))
- DBChecklist.update_all_decisions_to_approve(cl_uuid)
- api_checklist_obj.jump_state(cl_uuid, vf_staff_emails[i])
- logger.debug("Checking state changed to %s" % states[i])
- DBChecklist.state_changed("uuid", cl_uuid, states[i])
-
- # Move CL to closed state.
- logger.debug("Trying to jump state 'closed' for %s" %
- vf_staff_emails[0])
- api_checklist_obj.jump_state(cl_uuid, vf_staff_emails[0])
- logger.debug("Checking state changed to %s" % states[-1])
- DBChecklist.state_changed("uuid", cl_uuid, states[-1])
diff --git a/services/api/api_gitlab.py b/services/api/api_gitlab.py
deleted file mode 100644
index 639f67e..0000000
--- a/services/api/api_gitlab.py
+++ /dev/null
@@ -1,415 +0,0 @@
-
-# ============LICENSE_START==========================================
-# org.onap.vvp/test-engine
-# ===================================================================
-# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
-# ===================================================================
-#
-# Unless otherwise specified, all software contained herein is licensed
-# under the Apache License, Version 2.0 (the “License”);
-# you may not use this software except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-#
-#
-# Unless otherwise specified, all documentation contained herein is licensed
-# under the Creative Commons License, Attribution 4.0 Intl. (the “License”);
-# you may not use this documentation except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# https://creativecommons.org/licenses/by/4.0/
-#
-# Unless required by applicable law or agreed to in writing, documentation
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-# ============LICENSE_END============================================
-#
-# ECOMP is a trademark and service mark of AT&T Intellectual Property.
-import os
-import subprocess
-import sys
-import time
-
-from django.conf import settings
-import git
-import requests
-
-from services.api.api_bridge import APIBridge
-from services.constants import Constants
-from services.database.db_virtual_function import DBVirtualFunction
-from services.helper import Helper
-from services.logging_service import LoggingServiceFactory
-from services.session import session
-
-
-logger = LoggingServiceFactory.get_logger()
-
-
-class APIGitLab:
-
- @staticmethod
- def display_output(p):
- while True:
- out = p.stderr.read(1)
- if out == b'' and p.poll() is not None:
- break
- if out != '':
- sys.stdout.write(str(out.decode()))
- sys.stdout.flush()
-
- @staticmethod
- def get_git_project(path_with_namespace):
- r1 = None
- getURL = Constants.Default.URL.GitLab.Projects.TEXT + \
- path_with_namespace
- logger.debug("Get project URL: " + getURL)
- headers = dict() # Create header for post request.
- headers['Content-type'] = 'application/json'
- headers['PRIVATE-TOKEN'] = settings.GITLAB_TOKEN
- try:
- r1 = requests.get(getURL, headers=headers, verify=False)
- counter = 0
- while r1.status_code == 404 or r1.content == b'[]' and \
- counter <= Constants.\
- GitLabConstants.RETRIES_NUMBER:
- time.sleep(session.wait_until_time_pause)
- r1 = requests.get(getURL, headers=headers, verify=False)
- logger.debug(
- "trying to get the git project, " +
- "yet to succeed (try #%s)" % counter)
- counter += 1
- Helper.internal_assert(r1.status_code, 200)
- if r1.content == b'[]':
- logger.error("Got an empty list as a response.")
- raise
- logger.debug("Project exists on APIGitLab!")
- content = r1.json() # Change it from list to dict.
- return content
- except BaseException:
- if r1 is None:
- logger.error("Failed to get project from APIGitLab.")
- else:
- logger.error(
- "Failed to get project from APIGitLab, " +
- "see response >>> %s %s \n %s" %
- (r1.status_code, r1.reason, str(
- r1.content, 'utf-8')))
- raise
-
- def are_all_list_users_registered_as_project_members(
- self, users_emails_list, project_path_with_namespace):
- for email in users_emails_list:
- if not self.validate_git_project_members(
- project_path_with_namespace, email):
- raise Exception(
- "Couldn't find the invited users: " +
- email +
- " in GitLab.")
- logger.debug(
- "Invited user: " + email + " found in GitLab.")
-
- @staticmethod
- def validate_git_project_members(path_with_namespace, user_email):
- if settings.DATABASE_TYPE != 'local':
- r1 = None
- headers = dict()
- git_user = APIGitLab.get_git_user(user_email)
- getURL = Constants.Default.URL.GitLab.Projects.TEXT + \
- path_with_namespace + "/members/" + str(git_user['id'])
- logger.debug("Get project members URL: " + getURL)
- headers['Content-type'] = 'application/json'
- headers['PRIVATE-TOKEN'] = settings.GITLAB_TOKEN
- counter = 0
- while (r1 is None or r1.content == b'[]' or r1.status_code !=
- 200) and counter <= Constants.GitLabConstants.\
- RETRIES_NUMBER:
- logger.debug(
- "try to get git project members (try #%s)" % counter)
- time.sleep(session.wait_until_time_pause)
- try:
- r1 = requests.get(getURL, headers=headers, verify=False)
- counter += 1
- except Exception as e:
- if counter >= Constants.GitLabConstants.RETRIES_NUMBER:
- logger.error(
- "Failed to get project's team members from " +
- "APIGitLab, see response >>> %s %s \n %s %s" %
- (r1.status_code, r1.reason, str(
- r1.content, 'utf-8'), e.message))
- return False
- if r1.content == b'[]':
- logger.error("Got an empty list as a response.")
- return False
- elif r1.status_code != 200:
- logger.error("Got %s %s." % (r1.status_code, r1.reason))
- return False
- logger.debug("Got %s %s, user found in project." %
- (r1.status_code, r1.reason))
- return True
-
- @staticmethod
- def negative_validate_git_project_member(
- path_with_namespace, user_email, git_user_id):
- if settings.DATABASE_TYPE != 'local':
- r1 = None
- headers = dict()
- getURL = Constants.Default.URL.GitLab.Projects.TEXT + \
- path_with_namespace + "/members/" + git_user_id
- logger.debug("Get project members URL: " + getURL)
- headers['Content-type'] = 'application/json'
- headers['PRIVATE-TOKEN'] = settings.GITLAB_TOKEN
- counter = 0
- while r1 is None or str.encode(
- user_email) not in r1.content and counter <= Constants.\
- GitLabConstants.RETRIES_NUMBER:
- logger.debug(
- "try to get git project members (try #%s)" % counter)
- time.sleep(session.wait_until_time_pause)
- try:
- r1 = requests.get(getURL, headers=headers, verify=False)
- counter += 1
- except Exception as e:
- if counter >= Constants.GitLabConstants.RETRIES_NUMBER:
- logger.error(
- "Failed to get project's team members from " +
- "APIGitLab, see response >>> %s %s \n %s %s" %
- (r1.status_code, r1.reason, str(
- r1.content, 'utf-8'), e.message))
- return False
-
- if r1.content == b'[]':
- logger.debug("Got %s %s, user not found in project." %
- (r1.status_code, r1.reason))
- return True
- else:
- logger.debug("Got %s %s, user found in project." %
- (r1.status_code, r1.reason))
- return False
-
- @staticmethod
- def get_git_user(user_email):
- if settings.DATABASE_TYPE != 'local':
- r1 = None
- user_email = user_email.replace("@", "_at_")
- getURL = settings.GITLAB_URL + \
- "api/v3/users?username=" + user_email
- logger.debug("Get user URL: " + getURL)
- headers = dict() # Create header for post request.
- headers['Content-type'] = 'application/json'
- headers['PRIVATE-TOKEN'] = settings.GITLAB_TOKEN
- try:
- r1 = requests.get(getURL, headers=headers, verify=False)
- Helper.internal_assert(r1.status_code, 200)
- counter = 0
- while r1.content == b'[]' and counter <= 60:
- logger.info(
- "Will try to get gitlab user until " +
- "will be response... #%s" %
- counter)
- time.sleep(session.wait_until_time_pause_long)
- r1 = requests.get(getURL, headers=headers, verify=False)
- Helper.internal_assert(r1.status_code, 200)
- counter += 1
-
- if r1.content == b'[]':
- logger.error("Got an empty user from gitlab.")
- raise Exception("Got an empty user from gitlab.")
-
- logger.debug("Got %s %s and received user data: %s." %
- (r1.status_code, r1.reason, r1.content))
- content = r1.json()
- return content[0]
- except BaseException:
- if r1 is None:
- logger.error("Failed to get user from APIGitLab.")
- else:
- logger.error(
- "Failed to get user from APIGitLab, see response " +
- ">>> %s %s \n %s" %
- (r1.status_code, r1.reason, str(
- r1.content, 'utf-8')))
- raise
-
- @staticmethod
- def get_git_user_ssh_key(git_user_id):
- r1 = None
- getURL = Constants.Default.URL.GitLab.Users.TEXT + \
- str(git_user_id) + "/keys"
- logger.debug("Get user URL: " + getURL)
- headers = dict() # Create header for post request.
- headers['Content-type'] = 'application/json'
- headers['PRIVATE-TOKEN'] = settings.GITLAB_TOKEN
- try:
- r1 = requests.get(getURL, headers=headers, verify=False)
- Helper.internal_assert(r1.status_code, 200)
- if r1.content == '[]':
- logger.error("Got an empty list as a response.")
- raise
- logger.debug("Got %s %s and received user's public key." %
- (r1.status_code, r1.reason))
- content = r1.json() # Change it from list to dict.
- gitPubKey = content[0]['key']
- return gitPubKey
- except BaseException:
- if r1 is None:
- logger.error("Failed to get user's public key " +
- "from APIGitLab.")
- else:
- logger.error(
- "Failed to get user's public key from APIGitLab, " +
- "see response >>> %s %s \n %s" %
- (r1.status_code, r1.reason, str(
- r1.content, 'utf-8')))
- raise
-
- @staticmethod
- def git_clone_push(user_content):
- if settings.DATABASE_TYPE != 'local':
- logger.debug(
- "About to push files into project's repository on the " +
- "local folder(not over origin).")
- try:
- user_content['session_token'] = "token " + \
- APIBridge.login_user(Constants.Users.Admin.EMAIL)
- used_email_for_actions = Constants.Users.Admin.EMAIL
- repo_dir = Constants.Paths.LocalGitFolder.PATH + \
- user_content['vfName']
- if not os.path.exists(repo_dir):
- os.makedirs(repo_dir)
- logger.debug("Created the following folder: %s" % repo_dir)
- # Create pair of keys for user.
- user_pub_key = Helper.get_or_create_rsa_key_for_admin()
- DBVirtualFunction.add_admin_to_eng_team(
- user_content['engagement_uuid'])
- # Set SSH Key for the user.
- APIBridge.set_ssh(user_content, user_pub_key)
- git_user = APIGitLab.get_git_user(used_email_for_actions)
-
- counter = 0
- git_user_pub_key = None
- while user_pub_key != git_user_pub_key and counter < \
- Constants.GitLabConstants.RETRIES_NUMBER:
- try:
- git_user_pub_key = APIGitLab.get_git_user_ssh_key(
- git_user['id'])
- except Exception as e:
- pass
-
- counter += 1
- time.sleep(session.wait_until_time_pause)
-
- # Check that the SSH key was added to user on APIGitLab.
- if user_pub_key != git_user_pub_key:
- raise Exception(
- "The SSH Key received does not equal to the"
- " one provided! The key from"
- "APIGitLab:\n %s ==<>== %s" %
- (git_user_pub_key, user_pub_key))
-
- gitRepoURL = "git@gitlab:%s/%s.git" % (
- user_content['engagement_manual_id'],
- user_content['vfName'])
- logger.debug("Clone repo from: " + gitRepoURL)
- APIGitLab.is_gitlab_ready(user_content)
- cmd = 'cd ' + repo_dir + \
- '; git config --global user.email \"' + Constants.\
- Users.Admin.EMAIL + \
- '\"; git config --global user.name \"' + \
- Constants.Users.Admin.FULLNAME + '\";'
- # Commit all changes.
- p = subprocess_popen = subprocess.Popen(
- cmd, shell=True, stderr=subprocess.PIPE)
- while subprocess_popen is None:
- logger.debug(
- "waiting to subprocess command to complete...")
- APIGitLab.display_output(p)
- # Clone project from APIGitLab.
- repo = git.Repo.clone_from(gitRepoURL, repo_dir)
- logger.debug("Successfully cloned repo to " + repo_dir)
- # Create three files (file0, file1, file2) and add them to git
- # index.
- for i in range(3):
- fileName = repo_dir + '/file' + str(i)
- with open(fileName, 'w') as content_file:
- os.chmod(fileName, 0o600)
- content_file.write("Test file " + fileName)
- repo.index.add([fileName])
- logger.debug(
- fileName + " was created and added to commit list.")
- cmd = 'cd ' + repo_dir + \
- '; git commit -a -m \"Create and add 3 files to git.\"'
- # Commit all changes.
- p = subprocess.Popen(cmd, shell=True, stderr=subprocess.PIPE)
- APIGitLab.display_output(p)
- logger.debug("All files added to commit list.")
- cmd = 'cd ' + repo_dir + '; git push'
- # Push commit to APIGitLab.
- p = subprocess.Popen(cmd, shell=True, stderr=subprocess.PIPE)
- APIGitLab.display_output(p)
- logger.debug("All files were pushed to APIGitLab.")
- except Exception as e:
- logger.error(
- "_-_-_-_-_- Unexpected error in git_clone_push: " + str(e))
- raise Exception(e)
-
- @staticmethod
- def git_push_commit(user_content):
- if settings.DATABASE_TYPE != 'local':
- logger.debug(
- "About to push files into project's repository on APIGitLab")
- try:
- git_work = '/tmp/git_work/'
- repo_dir = git_work + user_content['vfName']
- # Create three files (file0, file1, file2) and add them to git
- # index.
- for i in range(3):
- fileName = repo_dir + '/file' + str(i)
- with open(fileName, 'w') as content_file:
- os.chmod(fileName, 0o600)
- content_file.write("Edit test file " + fileName)
- logger.debug(fileName + " was edited.")
- cmd = 'cd ' + repo_dir + \
- '; git commit -a -m \"Create and add 3 files to git.\"'
- # Commit all changes.
- p = subprocess.Popen(cmd, shell=True, stderr=subprocess.PIPE)
- APIGitLab.display_output(p)
- logger.debug("All edited files were committed.")
- cmd = 'cd ' + repo_dir + '; git push'
- # Push commit to APIGitLab.
- p = subprocess.Popen(cmd, shell=True, stderr=subprocess.PIPE)
- APIGitLab.display_output(p)
- logger.debug(
- "All edited files were commited and pushed to APIGitLab.")
- except Exception as e:
- logger.error(
- "_-_-_-_-_- Unexpected error in git_push_commit : " +
- str(e))
- raise Exception(
- "Something went wrong on git_push_commit " +
- "function, please check logs.")
-
- @staticmethod
- def is_gitlab_ready(user_content):
- path_with_namespace = user_content[
- 'engagement_manual_id'] + "%2F" + user_content['vfName']
- # If admin user is in project repo, it means the project exists as
- # well.
- cont = APIGitLab.validate_git_project_members(
- path_with_namespace, Constants.Users.Admin.EMAIL)
- if cont:
- logger.debug("Gitlab is ready for action, git repo was found!")
- return True
- else:
- raise Exception(
- "Something went wrong while waiting for GitLab.")
diff --git a/services/api/api_jenkins.py b/services/api/api_jenkins.py
deleted file mode 100644
index 0b57b00..0000000
--- a/services/api/api_jenkins.py
+++ /dev/null
@@ -1,93 +0,0 @@
-
-# ============LICENSE_START==========================================
-# org.onap.vvp/test-engine
-# ===================================================================
-# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
-# ===================================================================
-#
-# Unless otherwise specified, all software contained herein is licensed
-# under the Apache License, Version 2.0 (the “License”);
-# you may not use this software except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-#
-#
-# Unless otherwise specified, all documentation contained herein is licensed
-# under the Creative Commons License, Attribution 4.0 Intl. (the “License”);
-# you may not use this documentation except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# https://creativecommons.org/licenses/by/4.0/
-#
-# Unless required by applicable law or agreed to in writing, documentation
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-# ============LICENSE_END============================================
-#
-# ECOMP is a trademark and service mark of AT&T Intellectual Property.
-from django.conf import settings
-from requests.auth import HTTPBasicAuth
-from services.constants import Constants
-from services.helper import Helper
-from services.session import session
-import logging
-import requests
-import time
-
-
-logger = logging.getLogger('ice-ci.logger')
-
-
-class APIJenkins:
-
- @staticmethod
- def get_jenkins_job(job_name):
- r1 = None
- counter = 0
- getURL = settings.JENKINS_URL + "job/" + job_name
- logger.debug("Get APIJenkins job URL: " + getURL)
- try:
- r1 = requests.get(getURL, auth=HTTPBasicAuth(
- settings.JENKINS_USERNAME, settings.JENKINS_PASSWORD))
- while r1.status_code != 200 and counter <= \
- Constants.GitLabConstants.RETRIES_NUMBER:
- r1 = requests.get(getURL, auth=HTTPBasicAuth(
- settings.JENKINS_USERNAME, settings.JENKINS_PASSWORD))
- time.sleep(session.wait_until_time_pause)
- logger.debug(
- "try to get jenkins job (try #%s)" % counter)
- counter += 1
- Helper.internal_assert(r1.status_code, 200)
- logger.debug("Job was created on APIJenkins!")
- except BaseException:
- msg = None
-
- if r1 is None:
- msg = "APIJenkins didn't create job for %s" % job_name
- else:
- msg = "APIJenkins didn't create job for %s, " +\
- "see response >>> %s %s" % (
- job_name, r1.status_code, r1.reason)
-
- logger.error(msg)
- raise Exception(msg)
-
- @staticmethod
- def find_build_num_out_of_jenkins_log(log):
- lines_array = log.splitlines()
- for line in lines_array:
- if Constants.Dashboard.Checklist.JenkinsLog.\
- Modal.Body.BUILD_IDENTIFIER in line:
- parts = line.partition('jenkins')
- return parts[2]
diff --git a/services/api/api_rados.py b/services/api/api_rados.py
deleted file mode 100644
index 9a130b1..0000000
--- a/services/api/api_rados.py
+++ /dev/null
@@ -1,140 +0,0 @@
-
-# ============LICENSE_START==========================================
-# org.onap.vvp/test-engine
-# ===================================================================
-# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
-# ===================================================================
-#
-# Unless otherwise specified, all software contained herein is licensed
-# under the Apache License, Version 2.0 (the “License”);
-# you may not use this software except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-#
-#
-# Unless otherwise specified, all documentation contained herein is licensed
-# under the Creative Commons License, Attribution 4.0 Intl. (the “License”);
-# you may not use this documentation except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# https://creativecommons.org/licenses/by/4.0/
-#
-# Unless required by applicable law or agreed to in writing, documentation
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-# ============LICENSE_END============================================
-#
-# ECOMP is a trademark and service mark of AT&T Intellectual Property.
-import time
-
-from boto.s3.connection import S3Connection, OrdinaryCallingFormat
-from django.conf import settings
-
-from rados.rgwa_client_factory import RGWAClientFactory
-from services.constants import Constants
-from services.logging_service import LoggingServiceFactory
-from services.session import session
-
-
-logger = LoggingServiceFactory.get_logger()
-
-
-class APIRados:
-
- @staticmethod
- def get_bucket(name):
- """Return the Bucket."""
- boto_conn = RGWAClientFactory.standard()
- try:
- return boto_conn.lookup(name)
- except Exception as e:
- logger.error("Problem on get bucket", e)
- raise e
-
- @staticmethod
- def get_bucketfor_specific_user(name, access_key_id, secret_access_key):
- """Return the Bucket."""
- boto_conn = APIRados.specific_client(access_key_id, secret_access_key)
- try:
- return boto_conn.lookup(name)
- except Exception as e:
- logger.error("Problem on get bucket for specific user", e)
- raise e
-
- @staticmethod
- def get_bucket_grants(bucket_name):
- """Return the Grants."""
- for counter in range(Constants.RGWAConstants.BUCKET_RETRIES_NUMBER):
- bucket = APIRados.get_bucket(bucket_name)
- if bucket:
- break
- logger.error("Bucket not found. Retry #%s" % counter+1)
- time.sleep(session.wait_until_time_pause_long)
- else:
- raise Exception("Max retries exceeded, failing test...")
- grants = bucket.list_grants()
- return grants
-
- @staticmethod
- def is_bucket_ready(bucket_id):
- for counter in range(Constants.RGWAConstants.BUCKET_RETRIES_NUMBER):
- bucket = APIRados.get_bucket(bucket_id)
- if bucket:
- break
- logger.debug(
- "bucket are not ready yet, trying again (%s of %s)" % (
- counter+1, Constants.RGWAConstants.BUCKET_RETRIES_NUMBER))
- time.sleep(session.wait_until_time_pause_long)
- else:
- raise Exception("Max retries exceeded, failing test...")
- logger.debug("bucket are ready to continue!")
- return True
-
- @staticmethod
- def users_of_bucket_ready_after_complete(bucket_id, user_name):
- for counter in range(Constants.RGWAConstants.BUCKET_RETRIES_NUMBER):
- grants = APIRados.get_bucket_grants(bucket_id)
- if not any(user_name == g.id for g in grants):
- break
- time.sleep(session.wait_until_time_pause_long)
- else:
- raise Exception("Max retries exceeded, failing test...")
- return False
- logger.debug("users_of_bucket are ready to continue!")
- return True
-
- @staticmethod
- def users_of_bucket_ready_after_created(bucket_id, user_uuid):
- for counter in range(Constants.RGWAConstants.BUCKET_RETRIES_NUMBER):
- grants = APIRados.get_bucket_grants(bucket_id)
- if any(user_uuid == g.id for g in grants):
- break
- time.sleep(session.wait_until_time_pause_long)
- else:
- raise Exception("Max retries exceeded, failing test...")
- logger.debug("users_of_bucket are ready to continue!")
- return True
-
- @staticmethod
- def specific_client(access_key_id, secret_access_key):
- boto_conn = S3Connection(
- host=settings.AWS_S3_HOST,
- port=settings.AWS_S3_PORT,
- aws_access_key_id=access_key_id,
- aws_secret_access_key=secret_access_key,
- calling_format=OrdinaryCallingFormat(),
- is_secure=True,)
-
- boto_conn.num_retries = 0
- return boto_conn
diff --git a/services/api/api_user.py b/services/api/api_user.py
deleted file mode 100644
index 963280e..0000000
--- a/services/api/api_user.py
+++ /dev/null
@@ -1,330 +0,0 @@
-
-# ============LICENSE_START==========================================
-# org.onap.vvp/test-engine
-# ===================================================================
-# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
-# ===================================================================
-#
-# Unless otherwise specified, all software contained herein is licensed
-# under the Apache License, Version 2.0 (the “License”);
-# you may not use this software except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-#
-#
-# Unless otherwise specified, all documentation contained herein is licensed
-# under the Creative Commons License, Attribution 4.0 Intl. (the “License”);
-# you may not use this documentation except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# https://creativecommons.org/licenses/by/4.0/
-#
-# Unless required by applicable law or agreed to in writing, documentation
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-# ============LICENSE_END============================================
-#
-# ECOMP is a trademark and service mark of AT&T Intellectual Property.
-import re
-
-from django.conf import settings
-import requests
-
-from services.api.api_bridge import APIBridge
-from services.api.api_gitlab import APIGitLab
-from services.constants import Constants, ServiceProvider
-from services.helper import Helper
-from services.logging_service import LoggingServiceFactory
-
-
-logger = LoggingServiceFactory.get_logger()
-
-
-class APIUser:
-
- @staticmethod
- # Update account API - only adds new SSH key!
- def update_account(user_content):
- r1 = None
- token = APIUser.login_user(user_content['email'])
- user_content['session_token'] = 'token ' + token
- sshKey = Helper.generate_sshpub_key()
- putURL = settings.ICE_EM_URL + '/v1/engmgr/users/account'
- logger.debug("Put user URL: " + putURL)
- headers = dict() # Create header for put request.
- headers['Content-type'] = 'application/json'
- headers['Authorization'] = user_content['session_token']
- # headers['Authorization'] = user_content['activation_token']
- put_data = dict() # Create JSON data for put request.
- user_content['vendor'] = user_content['company']['name']
- if user_content['vendor'] == "AT&amp;T":
- put_data['company'] = "AT&T"
- else:
- put_data['company'] = user_content['vendor']
- put_data['email'] = user_content['email']
- put_data['full_name'] = user_content['full_name']
- put_data['password'] = ""
- put_data['phone_number'] = "+1201" + \
- Helper.rand_string("randomNumber", 6)
- put_data['ssh_key'] = sshKey
- try:
- r1 = requests.put(
- putURL, json=put_data, headers=headers, verify=False)
- Helper.internal_assert(r1.status_code, 200)
- logger.debug(
- "SSH Key was added successfully to user " +
- user_content['full_name'])
- if not APIBridge.is_gitlab_ready(user_content):
- raise
- return sshKey
- except BaseException:
- if r1 is None:
- logger.error("Failed to add public SSH key to user.")
- else:
- logger.error(
- "PUT request failed to add SSH key to user, see " +
- "response >>> %s %s \n %s" %
- (r1.status_code, r1.reason, str(
- r1.content, 'utf-8')))
- raise
-
- @staticmethod
- # Update account API - only adds new SSH key!
- def update_account_injec_script(user_content):
- r1 = None
- putURL = settings.ICE_EM_URL + '/v1/engmgr/users/account'
- logger.debug("Put user URL: " + putURL)
- headers = dict() # Create header for put request.
- headers['Content-type'] = 'application/json'
- headers['Authorization'] = user_content['session_token']
- put_data = dict() # Create JSON data for put request.
- if user_content['vendor'] == "AT&amp;T":
- put_data['company'] = "AT&T"
- else:
- put_data['company'] = user_content['vendor']
- put_data['email'] = user_content['email']
- script = "<script>;</script>"
- put_data['full_name'] = script
- put_data['password'] = ""
- put_data['phone_number'] = "+1201" + \
- Helper.rand_string("randomNumber", 6)
- try:
- r1 = requests.put(
- putURL, json=put_data, headers=headers, verify=False)
- Helper.internal_assert(r1.status_code, 200)
- msg = "Testing for Cross site scripting successfully : " + \
- user_content['full_name'] + \
- "stattus Code = " + str(r1.status_code)
- logger.debug(msg)
- if not APIBridge.is_gitlab_ready(user_content):
- raise
- return True
- except BaseException:
- if r1 is None:
- logger.error("Failed to add public SSH key to user.")
- else:
- logger.error(
- "PUT request failed to add SSH key to user, " +
- "see response >>> %s %s \n %s" %
- (r1.status_code, r1.reason, str(
- r1.content, 'utf-8')))
- raise
-
- @staticmethod
- def create_new_user(company=False, activate=False):
- signupUrl = settings.EM_REST_URL + "signup/"
- signupParams = APIUser.create_signup_param(company)
- r1 = requests.post(signupUrl, json=signupParams, verify=False)
- if (r1.status_code == 201 or r1.status_code == 200):
- logger.debug("Moving to next test case. status=" +
- str(r1.status_code))
- pass # Need to break here.
- try:
- user_data = r1.json()
- except Exception as e:
- logger.error("=========== json error ========")
- logger.error("r1.content = " + str(r1.content) +
- ", status=" + str(r1.status_code))
- logger.error("=========== json error end ========")
- raise e
- if activate:
- APIUser.activate_user(
- user_data['uuid'], user_data["user"]["activation_token"])
- return user_data
-
- @staticmethod
- def login_user(email):
- postUrl = settings.EM_REST_URL + "login"
- user_data = dict() # Create JSON data for post request.
- user_data['email'] = email
- user_data['password'] = "iceusers"
- try:
- headers = {'Content-type': 'application/json'}
- r = requests.post(
- postUrl, json=user_data, headers=headers, verify=False)
- logger.debug(str(r.status_code) + " " + r.reason)
- decoded_response = r.json()
- return decoded_response['token']
- except BaseException:
- logger.debug("Failed to login.")
- raise
-
- @staticmethod
- def set_ssh(user_content, sshKey=None): # Set SSH key to user.
- r1 = None
- if sshKey is None:
- logger.debug("About to generate an ssh key for the user: %s" %
- user_content['email'])
- sshKey = Helper.generate_sshpub_key()
- postURL = settings.ICE_EM_URL + '/v1/engmgr/users/ssh'
- logger.debug("Post user URL: " + postURL)
- headers = dict() # Create header for post request.
- headers['Content-type'] = 'application/json'
- headers['Authorization'] = user_content['session_token']
- post_data = dict() # Create JSON data for post request.
- post_data['ssh_key'] = sshKey
- try:
- r1 = requests.post(
- postURL, json=post_data, headers=headers, verify=False)
- Helper.internal_assert(r1.status_code, 200)
- logger.debug(
- "SSH Key was added successfully")
- if not APIBridge.is_gitlab_ready(user_content):
- raise
- return sshKey
- except BaseException:
- if r1 is None:
- logger.error("Failed to add public SSH key.")
- else:
- logger.error(
- "POST request failed to add SSH key to user, " +
- "see response >>> %s %s" %
- (r1.status_code, r1.reason))
- raise
-
- @staticmethod
- def create_signup_param(company=False):
- try: # Click on element in UI, by xPath locator.
- if not company:
- company = ServiceProvider.MainServiceProvider
- email_domain = ServiceProvider.email
- else:
- email_domain = company.lower() + ".com" #
- data = {
- "company": company,
- "full_name": Helper.rand_string("randomString"),
- "email": Helper.rand_string("randomString") +
- "@" + email_domain,
- "phone_number": Constants.Default.Phone.TEXT,
- "password": Constants.Default.Password.TEXT,
- "regular_email_updates": "True"}
-
- return data
- # If failed - count the failure and add the error to list of errors.
- except Exception as e:
- errorMsg = "Could not create Sign Up parametrs"
- raise Exception(errorMsg, e)
-
- @staticmethod
- def activate_user(userUuid, activationToken):
- postUrl = settings.ICE_EM_URL + "/v1/engmgr/users/activate/" + \
- userUuid + "/" + activationToken
- logger.debug(postUrl)
- r1 = requests.get(postUrl, verify=False)
- if (r1.status_code == 200):
- logger.debug("APIUser activated successfully!")
- return True
- else:
- raise Exception(
- "Failed to activate user >>> %s %s" %
- (r1.status_code, r1.reason))
-
- @staticmethod
- def signup_invited_user(
- company,
- invited_email,
- invite_token,
- invite_url,
- user_content,
- is_contact_user="false",
- activate=False,
- wait_for_gitlab=True):
- r1 = None
- postURL = settings.ICE_EM_URL + '/v1/engmgr/signup'
- logger.debug("Post signup URL: " + postURL)
- if is_contact_user == "true":
- fullName = re.sub("http.*full_name=", "", invite_url)
- fullName = re.sub("&.*", "", fullName)
- logger.debug(
- "Invited contact full name is (according to url): " + fullName)
- else:
- fullName = Helper.rand_string('randomString')
-
- post_data = dict() # Create JSON data for post request.
- post_data['company'] = company
- post_data['email'] = invited_email
- post_data['full_name'] = fullName
- post_data['invitation'] = invite_token
- post_data['is_contact_user'] = is_contact_user
- post_data['password'] = "iceusers"
- post_data['phone_number'] = "+1201" + \
- Helper.rand_string("randomNumber", 6)
- post_data['regular_email_updates'] = "False"
- post_data['terms'] = "True"
- try:
- requests.get(invite_url, verify=False)
- r1 = requests.post(
- postURL, json=post_data, verify=False)
- Helper.internal_assert(r1.status_code, 200)
- logger.debug("Invited user signed-up successfully!")
-
- user_data = r1.json()
- if activate:
- APIUser.activate_user(
- user_data['uuid'], user_data["user"]["activation_token"])
-
- if wait_for_gitlab:
- if not APIBridge.is_gitlab_ready(user_content):
- raise
- return post_data
- except BaseException:
- if r1 is None:
- logger.error("Failed to sign up the invited team member.")
- else:
- logger.error(
- "POST request failed to sign up the invited " +
- "team member, see response >>> %s %s \n %s" %
- (r1.status_code, r1.reason, str(
- r1.content, 'utf-8')))
- raise
-
- @staticmethod
- def create_new_user_content():
- user_content = APIBridge.create_engagement()
- APIGitLab.git_clone_push(user_content)
- APIBridge.frontend_login(
- user_content['email'], Constants.Default.Password.TEXT)
- vfName = user_content['vfName']
- uuid = user_content['uuid']
- inviteEmail = Helper.rand_invite_email()
- newObj = [vfName, uuid, inviteEmail]
- return newObj, user_content
-
- @staticmethod
- def create_new_user_content_login_with_api():
- user_content = APIBridge.create_engagement()
- APIGitLab.git_clone_push(user_content)
- token = "token " + APIBridge.login_user(user_content['el_email'])
- user_content['session_token'] = token
- return user_content
diff --git a/services/api/api_virtual_function.py b/services/api/api_virtual_function.py
deleted file mode 100644
index 193d6e2..0000000
--- a/services/api/api_virtual_function.py
+++ /dev/null
@@ -1,403 +0,0 @@
-
-# ============LICENSE_START==========================================
-# org.onap.vvp/test-engine
-# ===================================================================
-# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
-# ===================================================================
-#
-# Unless otherwise specified, all software contained herein is licensed
-# under the Apache License, Version 2.0 (the “License”);
-# you may not use this software except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-#
-#
-# Unless otherwise specified, all documentation contained herein is licensed
-# under the Creative Commons License, Attribution 4.0 Intl. (the “License”);
-# you may not use this documentation except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# https://creativecommons.org/licenses/by/4.0/
-#
-# Unless required by applicable law or agreed to in writing, documentation
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-# ============LICENSE_END============================================
-#
-# ECOMP is a trademark and service mark of AT&T Intellectual Property.
-import datetime
-import json
-import time
-
-from django.conf import settings
-import requests
-
-from services.api.api_user import APIUser
-from services.constants import Constants, ServiceProvider
-from services.database.db_general import DBGeneral
-from services.helper import Helper
-from services.logging_service import LoggingServiceFactory
-
-
-logger = LoggingServiceFactory.get_logger()
-
-
-class APIVirtualFunction:
-
- @staticmethod
- def add_next_step(user_content, files=[]):
- r1 = None
- postURL = settings.ICE_EM_URL + '/v1/engmgr/engagements/' + \
- user_content['engagement_uuid'] + '/nextsteps'
- logger.debug("Post add next step URL: " + postURL)
- headers = dict() # Create header for post request.
- headers['Content-type'] = 'application/json'
- headers['Authorization'] = user_content['session_token']
- data = dict() # Create JSON data for post request.
- files_list = list()
- if isinstance(files, list):
- for file in files:
- files_list.append(file)
- else:
- files_list.append(files)
- data['files'] = files_list
- data['assigneesUuids'] = [user_content['uuid']]
- data['duedate'] = str(datetime.date.today())
- data['description'] = "API test - add next step."
- list_data = []
- list_data.append(data)
- try:
- r1 = requests.post(
- postURL, json=list_data, headers=headers, verify=False)
- Helper.internal_assert(r1.status_code, 200)
- logger.debug("Next step was added to the engagement!")
- ns_uuid = r1.json()
- return ns_uuid[0]['uuid']
- except BaseException:
- if r1 is None:
- logger.error(
- "Failed to add next step to VF " + user_content['vfName'])
- else:
- logger.error("Failed to add next step to VF " +
- user_content['vfName'] +
- ", see response >>> %s %s.\nContent: %s" %
- (r1.status_code, r1.reason, str(
- r1.content, 'utf-8')))
- raise
-
- @staticmethod
- def create_vf(token):
- r1 = None
- postUrl = settings.EM_REST_URL + "vf/"
- targetVersion = DBGeneral.select_from(
- "uuid", "ice_deployment_target", 1)
- ecompRelease = DBGeneral.select_from("uuid", "ice_ecomp_release", 1)
- headers = dict() # Create header for post request.
- headers['Content-type'] = 'application/json'
- headers['Authorization'] = 'token ' + token
- jdata = [
- {
- "virtual_function": Helper.rand_string("randomString"),
- "version": Helper.rand_string("randomString") +
- Helper.rand_string("randomNumber"),
- "target_lab_entry_date": time.strftime("%Y-%m-%d"),
- "target_aic_uuid": targetVersion,
- "ecomp_release": ecompRelease,
- "is_service_provider_internal": False}]
- try:
- r1 = requests.post(
- postUrl, json=jdata, headers=headers, verify=False)
- Helper.internal_assert(r1.status_code, 200)
- logger.debug("Virtual Function created successfully!")
- content = r1.content[1:-1]
- return content
- except BaseException:
- if r1 is None:
- logger.debug("Failed to create VF >>> request failed!")
- else:
- logger.debug(
- "Failed to create VF >>> %s %s \n %s" %
- (r1.status_code, r1.reason, str(
- r1.content, 'utf-8')))
- raise
-
- @staticmethod
- def get_engagement(user_content):
- r1 = None
- postUrl = settings.EM_REST_URL + 'single-engagement/' + \
- str(user_content['engagement_uuid'],)
- headers = dict() # Create header for post request.
- headers['Content-type'] = 'application/json'
- headers['Authorization'] = user_content['session_token']
- try:
- r1 = requests.get(
- postUrl, headers=headers, verify=False)
- Helper.internal_assert(r1.status_code, 200)
- logger.debug("Retrieved the Engagement successfully!")
- content = r1.content
- return json.loads(content)
- except BaseException:
- if r1 is None:
- logger.debug(
- "Failed to Retrieve the Engagement >>> request failed!")
- else:
- logger.debug(
- "Failed to Retrieve the Engagement >>> %s %s \n %s" %
- (r1.status_code, r1.reason, str(
- r1.content, 'utf-8')))
- raise
-
- @staticmethod
- def invite_team_member(user_content):
- r1 = None
- postURL = settings.ICE_EM_URL + '/v1/engmgr/invite-team-members/'
- logger.debug("Post invite user URL: " + postURL)
- headers = dict() # Create header for post request.
- headers['Content-type'] = 'application/json'
- headers['Authorization'] = user_content['session_token']
- data = dict() # Create JSON data for post request.
- data['email'] = Helper.rand_string(
- 'randomString') + "@" + ServiceProvider.email
- data['eng_uuid'] = user_content['engagement_uuid']
- list_data = []
- list_data.append(data)
- try:
- r1 = requests.post(
- postURL, json=list_data, headers=headers, verify=False)
- Helper.internal_assert_boolean(r1.status_code, 200)
- logger.debug("Invite sent successfully to email " + data['email'])
- invite_token = DBGeneral.select_where_and(
- "invitation_token",
- "ice_invitation",
- "email",
- data['email'],
- "engagement_uuid",
- user_content['engagement_uuid'],
- 1)
- invite_url = settings.ICE_PORTAL_URL + "/#/signUp?invitation=" + \
- invite_token + "&email=" + data['email']
- logger.debug("Invitation URL is: " + invite_url)
- return data['email'], invite_token, invite_url
- except BaseException:
- if r1 is None:
- logger.error("Failed to invite team member.")
- else:
- logger.error(
- "POST request failed to invite team member, " +
- "see response >>> %s %s" % (r1.status_code, r1.reason))
- raise
-
- @staticmethod
- def add_contact(user_content):
- r1 = None
- postURL = settings.ICE_EM_URL + '/v1/engmgr/add-contact/'
- logger.debug("Post invite vendor contact URL: " + postURL)
- headers = dict() # Create header for post request.
- headers['Content-type'] = 'application/json'
- headers['Authorization'] = user_content['session_token']
- data = dict() # Create JSON data for post request.
- data['company'] = user_content['vendor_uuid']
- data['email'] = Helper.rand_string(
- 'randomString') + "@" + ServiceProvider.email
- data['eng_uuid'] = user_content['engagement_uuid']
- data['full_name'] = Helper.rand_string('randomString')
- data['phone_number'] = "+1201" + Helper.rand_string("randomNumber", 6)
- try:
- r1 = requests.post(
- postURL, json=data, headers=headers, verify=False)
- Helper.internal_assert_boolean(r1.status_code, 200)
- logger.debug("Invite sent successfully to email " + data['email'])
- invite_token = DBGeneral.select_where_and(
- "invitation_token",
- "ice_invitation",
- "email",
- data['email'],
- "engagement_uuid",
- user_content['engagement_uuid'],
- 1)
- invite_url = settings.ICE_PORTAL_URL + "/#/signUp?invitation=" +\
- invite_token + "&email=" + data['email'] +\
- "&full_name=" + data['full_name'] + \
- "&phone_number=" + data['phone_number'] + "&company=" + \
- data['company'] + "&is_contact_user=true"
- logger.debug("Invitation URL is: " + invite_url)
- return data['email'], invite_token, invite_url
- except BaseException:
- if r1 is None:
- logger.error("Failed to invite vendor contact.")
- else:
- logger.error(
- "POST request failed to invite vendor contact, " +
- "see response >>> %s %s \n %s" %
- (r1.status_code, r1.reason, str(
- r1.content, 'utf-8')))
- raise
-
- @staticmethod
- def edit_next_step(user_content, ns_uuid):
- r1 = None
- postURL = settings.ICE_EM_URL + '/v1/engmgr/nextsteps/' + ns_uuid + \
- '/engagement/' + user_content['engagement_uuid'] + '/modify/'
- logger.debug("Put next step URL: " + postURL)
- headers = dict() # Create header for post request.
- headers['Content-type'] = 'application/json'
- headers['Authorization'] = user_content['session_token']
- data = dict() # Create JSON data for post request.
- data['files'] = []
- data['assigneesUuids'] = [user_content['uuid']]
- data['duedate'] = str(datetime.date.today())
- data['description'] = "API edit next step test " + \
- Helper.rand_string('randomString')
- try:
- r1 = requests.put(
- postURL, json=data, headers=headers, verify=False)
- Helper.internal_assert_boolean(r1.status_code, 202)
- logger.debug("Next step was edited successfully!")
- except BaseException:
- if r1 is None:
- logger.error("Failed to edit next step uuid: " + ns_uuid)
- else:
- logger.error(
- "Failed to edit next step uuid: " +
- ns_uuid +
- ", see response >>> %s %s" %
- (r1.status_code,
- r1.reason))
- raise
-
- @staticmethod
- def get_export_dasboard_excel(token, keywords=""):
- postUrl = settings.EM_REST_URL + \
- "engagement/export/?stage=All&keyword=" + keywords
- headers = {"Authorization": token}
- r1 = requests.get(postUrl, headers=headers, verify=False)
- Helper.internal_assert(r1.status_code, 200)
- if (r1.status_code == 200):
- logger.debug("APIUser activated successfully!")
- return r1.content
- else:
- raise Exception(
- "Failed to activate user >>> %s %s" %
- (r1.status_code, r1.reason))
- return False
-
- @staticmethod
- def create_engagement(wait_for_gitlab=True):
- user_content = APIUser.create_new_user()
- APIUser.activate_user(
- user_content['uuid'], user_content['user']['activation_token'])
- token = APIUser.login_user(user_content['email'])
- vf_content = json.loads(APIVirtualFunction.create_vf(token))
- user_content['vfName'] = vf_content['name']
- user_content['vf_uuid'] = vf_content['uuid']
- user_content['target_aic'] = vf_content['deployment_target']['version']
- # <-- ECOMP RELEASE
- user_content['ecomp_release'] = vf_content[
- 'ecomp_release']['name']
- user_content['vnf_version'] = vf_content['version']
- if(vf_content['vendor']['name'] == "AT&amp;T"):
- user_content['vendor'] = "AT&T"
- else:
- user_content['vendor'] = vf_content['vendor']['name']
- user_content['vendor_uuid'] = vf_content['vendor']['uuid']
- user_content['engagement_manual_id'] = vf_content[
- 'engagement']['engagement_manual_id']
- user_content['target_lab_entry_date'] = vf_content[
- 'target_lab_entry_date']
- user_content['el_email'] = vf_content[
- 'engagement']['reviewer']['email']
- user_content['el_name'] = vf_content[
- 'engagement']['reviewer']['full_name']
- user_content['pr_email'] = vf_content[
- 'engagement']['peer_reviewer']['email']
- user_content['pr_name'] = vf_content[
- 'engagement']['peer_reviewer']['full_name']
- user_content['engagement_uuid'] = vf_content['engagement']['uuid']
- user_content['session_token'] = 'token ' + token
- user_content['engagement'] = vf_content['engagement']
- user_content['vfStage'] = vf_content['engagement']['engagement_stage']
-
- return user_content
-
- @staticmethod
- def set_eng_stage(user_content, requested_stage):
- token = APIUser.login_user(user_content['el_email'])
- r1 = None
- putUrl = Constants.Default.URL.Engagement.SingleEngagement.TEXT + \
- user_content['engagement_uuid'] + "/stage/" + str(requested_stage)
- headers = dict() # Create header for post request.
- headers['Content-type'] = 'application/json'
- headers['Authorization'] = 'token ' + token
- try:
- r1 = requests.put(
- putUrl, headers=headers, verify=False)
- Helper.internal_assert(r1.status_code, 202)
- logger.debug(
- "Engagement stage was successfully changed to " +
- str(requested_stage) +
- "!")
- content = r1.content[1:-1]
- return content
- except BaseException:
- if r1 is None:
- logger.debug("Failed to set eng stage >>> request failed!")
- else:
- logger.debug(
- "Failed to set eng stage >>> %s %s \n %s" %
- (r1.status_code, r1.reason, str(
- r1.content, 'utf-8')))
- raise
-
- @staticmethod
- def update_aic_version(eng_uuid, aic_version_uuid, session_token):
- r1 = None
- putURL = Constants.Default.URL.Engagement.EngagementOperations.TEXT + \
- eng_uuid + '/deployment-targets/' + aic_version_uuid
- logger.debug("Put next step URL: " + putURL)
- headers = dict() # Create header for post request.
- headers['Content-type'] = 'application/json'
- headers['Authorization'] = session_token
- try:
- r1 = requests.put(putURL, headers=headers, verify=False)
- Helper.internal_assert_boolean(r1.status_code, 200)
- logger.debug("AIC version has changed!")
- except BaseException:
- if r1 is None:
- msg = "Failed to edit AIC version"
- else:
- msg = "Failed to edit AIC version, see response >>> %s %s" % (
- r1.status_code, r1.reason)
- raise msg
-
- @staticmethod
- def update_ecomp_release(eng_uuid, ecomp_release_uuid, session_token):
- r1 = None
- putURL = Constants.Default.URL.Engagement.EngagementOperations.TEXT + \
- eng_uuid + '/ecomp-releases/' + ecomp_release_uuid
- logger.debug("Put next step URL: " + putURL)
- headers = dict() # Create header for post request.
- headers['Content-type'] = 'application/json'
- headers['Authorization'] = session_token
- try:
- r1 = requests.put(putURL, headers=headers, verify=False)
- Helper.internal_assert_boolean(r1.status_code, 200)
- logger.debug("AIC version has changed!")
- except BaseException:
- if r1 is None:
- msg = "Failed to update ECOMP release"
- else:
- msg = "Failed to update ECOMP release," +\
- " see response >>> %s %s" % (
- r1.status_code, r1.reason)
- raise msg