diff options
author | Edan Binshtok <eb578m@intl.att.com> | 2017-10-04 09:33:23 +0300 |
---|---|---|
committer | Edan Binshtok <eb578m@intl.att.com> | 2017-10-04 09:36:04 +0300 |
commit | f8907f0c4fc0ba4bb97a1d636a50c5b40c2642f2 (patch) | |
tree | 3d04d86910c93e42c055e5ed699ab1919482d5be /services/api | |
parent | 733e00df0a6fa19dd92ec7392966340345dd1885 (diff) |
Initial seed
Initial upload of django test framework
Change-Id: I643a7f4efc52cfafe4cc6d92e3178f36a0c1837c
Issue-Id: VVP-1
Signed-off-by: Edan Binshtok <eb578m@intl.att.com>
Diffstat (limited to 'services/api')
-rw-r--r-- | services/api/__init__.py | 38 | ||||
-rw-r--r-- | services/api/__pycache__/__init__.cpython-36.pyc | bin | 0 -> 140 bytes | |||
-rw-r--r-- | services/api/__pycache__/api_bridge.cpython-36.pyc | bin | 0 -> 1860 bytes | |||
-rw-r--r-- | services/api/__pycache__/api_checklist.cpython-36.pyc | bin | 0 -> 5491 bytes | |||
-rw-r--r-- | services/api/__pycache__/api_gitlab.cpython-36.pyc | bin | 0 -> 10952 bytes | |||
-rw-r--r-- | services/api/__pycache__/api_jenkins.cpython-36.pyc | bin | 0 -> 1191 bytes | |||
-rw-r--r-- | services/api/__pycache__/api_user.cpython-36.pyc | bin | 0 -> 7760 bytes | |||
-rw-r--r-- | services/api/__pycache__/api_virtual_function.cpython-36.pyc | bin | 0 -> 9579 bytes | |||
-rw-r--r-- | services/api/api_bridge.py | 74 | ||||
-rw-r--r-- | services/api/api_checklist.py | 219 | ||||
-rw-r--r-- | services/api/api_gitlab.py | 394 | ||||
-rw-r--r-- | services/api/api_jenkins.py | 81 | ||||
-rw-r--r-- | services/api/api_rados.py | 161 | ||||
-rw-r--r-- | services/api/api_user.py | 308 | ||||
-rw-r--r-- | services/api/api_virtual_function.py | 368 |
15 files changed, 1643 insertions, 0 deletions
diff --git a/services/api/__init__.py b/services/api/__init__.py new file mode 100644 index 0000000..30d7152 --- /dev/null +++ b/services/api/__init__.py @@ -0,0 +1,38 @@ + +# ============LICENSE_START========================================== +# org.onap.vvp/test-engine +# =================================================================== +# Copyright © 2017 AT&T Intellectual Property. All rights reserved. +# =================================================================== +# +# Unless otherwise specified, all software contained herein is licensed +# under the Apache License, Version 2.0 (the “License”); +# you may not use this software except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# +# +# Unless otherwise specified, all documentation contained herein is licensed +# under the Creative Commons License, Attribution 4.0 Intl. (the “License”); +# you may not use this documentation except in compliance with the License. +# You may obtain a copy of the License at +# +# https://creativecommons.org/licenses/by/4.0/ +# +# Unless required by applicable law or agreed to in writing, documentation +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ============LICENSE_END============================================ +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. diff --git a/services/api/__pycache__/__init__.cpython-36.pyc b/services/api/__pycache__/__init__.cpython-36.pyc Binary files differnew file mode 100644 index 0000000..ece5c7a --- /dev/null +++ b/services/api/__pycache__/__init__.cpython-36.pyc diff --git a/services/api/__pycache__/api_bridge.cpython-36.pyc b/services/api/__pycache__/api_bridge.cpython-36.pyc Binary files differnew file mode 100644 index 0000000..1da6617 --- /dev/null +++ b/services/api/__pycache__/api_bridge.cpython-36.pyc diff --git a/services/api/__pycache__/api_checklist.cpython-36.pyc b/services/api/__pycache__/api_checklist.cpython-36.pyc Binary files differnew file mode 100644 index 0000000..6ea208f --- /dev/null +++ b/services/api/__pycache__/api_checklist.cpython-36.pyc diff --git a/services/api/__pycache__/api_gitlab.cpython-36.pyc b/services/api/__pycache__/api_gitlab.cpython-36.pyc Binary files differnew file mode 100644 index 0000000..d4dedbc --- /dev/null +++ b/services/api/__pycache__/api_gitlab.cpython-36.pyc diff --git a/services/api/__pycache__/api_jenkins.cpython-36.pyc b/services/api/__pycache__/api_jenkins.cpython-36.pyc Binary files differnew file mode 100644 index 0000000..905295c --- /dev/null +++ b/services/api/__pycache__/api_jenkins.cpython-36.pyc diff --git a/services/api/__pycache__/api_user.cpython-36.pyc b/services/api/__pycache__/api_user.cpython-36.pyc Binary files differnew file mode 100644 index 0000000..1893ff4 --- /dev/null +++ b/services/api/__pycache__/api_user.cpython-36.pyc diff --git a/services/api/__pycache__/api_virtual_function.cpython-36.pyc b/services/api/__pycache__/api_virtual_function.cpython-36.pyc Binary files differnew file mode 100644 index 0000000..35349c5 --- /dev/null +++ b/services/api/__pycache__/api_virtual_function.cpython-36.pyc diff --git a/services/api/api_bridge.py b/services/api/api_bridge.py new file mode 100644 index 0000000..8926a1d --- /dev/null +++ b/services/api/api_bridge.py @@ -0,0 +1,74 @@ + +# ============LICENSE_START========================================== +# org.onap.vvp/test-engine +# =================================================================== +# Copyright © 2017 AT&T Intellectual Property. All rights reserved. +# =================================================================== +# +# Unless otherwise specified, all software contained herein is licensed +# under the Apache License, Version 2.0 (the “License”); +# you may not use this software except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# +# +# Unless otherwise specified, all documentation contained herein is licensed +# under the Creative Commons License, Attribution 4.0 Intl. (the “License”); +# you may not use this documentation except in compliance with the License. +# You may obtain a copy of the License at +# +# https://creativecommons.org/licenses/by/4.0/ +# +# Unless required by applicable law or agreed to in writing, documentation +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ============LICENSE_END============================================ +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. +class APIBridge: + + """ + This class helps to use functions inside classes with circular import (dependencies). + Use this class only when there is circular import in one of the API services. + """ + + @staticmethod + def is_gitlab_ready(user_content): + """is_gitlab_ready: Originally can be found under APIGitLab class.""" + from services.api.api_gitlab import APIGitLab + return APIGitLab.is_gitlab_ready(user_content) + + @staticmethod + def login_user(email): + """login_user: Originally can be found under APIUser class.""" + from services.api.api_user import APIUser + return APIUser.login_user(email) + + @staticmethod + def set_ssh(user_content, sshKey): + """set_ssh: Originally can be found under APIUser class.""" + from services.api.api_user import APIUser + return APIUser.set_ssh(user_content, sshKey) + + @staticmethod + def create_engagement(wait_for_gitlab=True): + """create_engagement: Originally can be found under APIVirtualFunction class.""" + from services.api.api_virtual_function import APIVirtualFunction + return APIVirtualFunction.create_engagement(wait_for_gitlab) + + @staticmethod + def frontend_login(email, password): + """login: Originally can be found under FEUser class.""" + from services.frontend.fe_user import FEUser + return FEUser.login(email, password) diff --git a/services/api/api_checklist.py b/services/api/api_checklist.py new file mode 100644 index 0000000..ef7b8a3 --- /dev/null +++ b/services/api/api_checklist.py @@ -0,0 +1,219 @@ + +# ============LICENSE_START========================================== +# org.onap.vvp/test-engine +# =================================================================== +# Copyright © 2017 AT&T Intellectual Property. All rights reserved. +# =================================================================== +# +# Unless otherwise specified, all software contained herein is licensed +# under the Apache License, Version 2.0 (the “License”); +# you may not use this software except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# +# +# Unless otherwise specified, all documentation contained herein is licensed +# under the Creative Commons License, Attribution 4.0 Intl. (the “License”); +# you may not use this documentation except in compliance with the License. +# You may obtain a copy of the License at +# +# https://creativecommons.org/licenses/by/4.0/ +# +# Unless required by applicable law or agreed to in writing, documentation +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ============LICENSE_END============================================ +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. +import datetime +import logging + +import requests + +from services.api.api_gitlab import APIGitLab +from services.api.api_user import APIUser +from services.constants import Constants +from services.database.db_general import DBGeneral +from services.helper import Helper +from services.logging_service import LoggingServiceFactory + + +logger = LoggingServiceFactory.get_logger() + + +class APIChecklist: + + @staticmethod + def create_checklist(user_content, files=["file0", "file1"], return_negative_response=False): + r1 = None + postURL = Constants.Default.URL.Checklist.TEXT + \ + user_content['engagement_uuid'] + '/checklist/new/' + logger.debug("Post create checklist URL: " + postURL) + headers = dict() + headers['Content-type'] = 'application/json' + headers['Authorization'] = user_content['session_token'] + data = dict() + data['checkListAssociatedFiles'] = files + data['checkListName'] = "checklistAPI" + \ + Helper.rand_string('randomString') + data['checkListTemplateUuid'] = DBGeneral.select_where( + "uuid", "ice_checklist_template", "name", Constants.Template.Heat.TEXT, 1) + try: + if not APIGitLab.is_gitlab_ready(user_content): + raise Exception( + "Gitlab is not ready and because of that the test is failed.") + + r1 = requests.post(postURL, json=data, + headers=headers, verify=False) + + Helper.internal_assert_boolean(r1.status_code, 200) + logger.debug("Checklist was created successfully!") + cl_content = r1.json() + return cl_content + except: + if return_negative_response: + return r1 + if r1 is None: + logger.error( + "Failed to create checklist for VF " + user_content['vfName']) + else: + logger.error("Failed to create checklist for VF " + user_content[ + 'vfName'] + ", see response >>> %s %s.\nContent: %s" % (r1.status_code, r1.reason, str(r1.content, 'utf-8'))) + raise + + @staticmethod + def update_checklist(user_content, cl_uuid): + r1 = None + postURL = Constants.Default.URL.Checklist.Update.TEXT + '/' + cl_uuid + logger.debug("Post create checklist URL: " + postURL + '/' + cl_uuid) + headers = dict() # Create header for post request. + headers['Content-type'] = 'application/json' + headers['Authorization'] = user_content['session_token'] + data = dict() # Create JSON data for post request. + data['checklistUuid'] = cl_uuid + data['checkListAssociatedFiles'] = ["file1", "file2"] + data['checkListName'] = "UpdateChecklistAPI" + \ + Helper.rand_string('randomString') + data['checkListTemplateUuid'] = DBGeneral.select_where( + "uuid", "ice_checklist_template", "name", Constants.Template.Heat.TEXT, 1) + try: + r1 = requests.put( + postURL, json=data, headers=headers, verify=False) + Helper.internal_assert(r1.status_code, 200) + logger.debug("DBChecklist was created successfully!") + cl_content = r1.json() + return cl_content['uuid'] + except: + if r1 is None: + logger.error( + "Failed to create checklist for VF " + user_content['vfName']) + else: + logger.error("Failed to create checklist for VF " + user_content[ + 'vfName'] + ", see response >>> %s %s.\nContent: %s" % (r1.status_code, r1.reason, str(r1.content, 'utf-8'))) + raise + + @staticmethod + def add_checklist_audit_log(user_content, cl_uuid): + r1 = None + postURL = Constants.Default.URL.Checklist.Update + \ + cl_uuid + '/auditlog/' + logger.debug("Post checklist audit log URL: " + postURL) + headers = dict() # Create header for post request. + headers['Content-type'] = 'application/json' + headers['Authorization'] = user_content['session_token'] + data = dict() # Create JSON data for post request. + data['description'] = "API audit log test " + \ + Helper.rand_string('randomString') + try: + r1 = requests.post( + postURL, json=data, headers=headers, verify=False) + Helper.internal_assert_boolean(r1.status_code, 200) + logger.debug("Audit log was added successfully!") + except: + if r1 is None: + logger.error( + "Failed to add audit log for checklist uuid: " + cl_uuid) + else: + logger.error("Failed to add audit log for checklist uuid: " + cl_uuid + + ", see response >>> %s %s.\nContent: %s" % (r1.status_code, r1.reason, str(r1.content, 'utf-8'))) + raise + + @staticmethod + def add_checklist_next_step(user_content, cl_uuid): + r1 = None + postURL = Constants.Default.URL.Checklist.TEXT + \ + user_content['engagement_uuid'] + \ + '/checklist/' + cl_uuid + '/nextstep/' + logger.debug("Post checklist next step URL: " + postURL) + headers = dict() # Create header for post request. + headers['Content-type'] = 'application/json' + headers['Authorization'] = user_content['session_token'] + data = dict() # Create JSON data for post request. + data['files'] = ["file0"] + data['assigneesUuids'] = [user_content['uuid']] + data['duedate'] = str(datetime.date.today()) + data['description'] = "API next step test " + \ + Helper.rand_string('randomString') + list_data = [] + list_data.append(data) + try: + r1 = requests.post( + postURL, json=list_data, headers=headers, verify=False) + Helper.internal_assert(r1.status_code, 200) + logger.debug("Next step was added successfully!") + ns_uuid = r1.json() + return ns_uuid[0]['uuid'] + except: + if r1 is None: + logger.error( + "Failed to add next step for checklist uuid: " + cl_uuid) + else: + logger.error("Failed to add next step for checklist uuid: " + cl_uuid + + ", see response >>> %s %s.\nContent: %s" % (r1.status_code, r1.reason, str(r1.content, 'utf-8'))) + raise + + @staticmethod + def jump_state(checklistUUID, engLeadEmail): + token = APIUser.login_user(engLeadEmail) + stateURL = Constants.Default.URL.Checklist.Rest.TEXT + \ + str(checklistUUID) + '/state/' + APIChecklist.go_to_next_state(stateURL, token) + + @staticmethod + def go_to_next_state(url, token): + headers = {"Authorization": "token " + token} + body = dict() + body['description'] = "" + body['decline'] = "False" + r = requests.put(url, headers=headers, json=body, verify=False) + if (r.status_code == 201 or r.status_code == 200): + logger.debug("go_to_next_state put request result status: %s" % + r.status_code) + else: + logger.error( + "PUT request failed to change checklist state >>> " + str(r.status_code) + " " + r.reason) + raise Exception("PUT request failed to change checklist state") + + @staticmethod + def move_cl_to_closed(cl_uuid, vf_staff_emails): + api_checklist_obj = APIChecklist() + + for i in range(len(vf_staff_emails)): + logger.debug("Trying to jump state for %s [%s]" % (vf_staff_emails[i], i)) + api_checklist_obj.jump_state(cl_uuid, vf_staff_emails[i]) + + # Move CL to closed state. + logger.debug("Trying to jump state 'closed' for %s" % vf_staff_emails[0]) + api_checklist_obj.jump_state(cl_uuid, vf_staff_emails[0]) + diff --git a/services/api/api_gitlab.py b/services/api/api_gitlab.py new file mode 100644 index 0000000..c7b25e0 --- /dev/null +++ b/services/api/api_gitlab.py @@ -0,0 +1,394 @@ + +# ============LICENSE_START========================================== +# org.onap.vvp/test-engine +# =================================================================== +# Copyright © 2017 AT&T Intellectual Property. All rights reserved. +# =================================================================== +# +# Unless otherwise specified, all software contained herein is licensed +# under the Apache License, Version 2.0 (the “License”); +# you may not use this software except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# +# +# Unless otherwise specified, all documentation contained herein is licensed +# under the Creative Commons License, Attribution 4.0 Intl. (the “License”); +# you may not use this documentation except in compliance with the License. +# You may obtain a copy of the License at +# +# https://creativecommons.org/licenses/by/4.0/ +# +# Unless required by applicable law or agreed to in writing, documentation +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ============LICENSE_END============================================ +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. +import logging +import os +import subprocess +import sys +import time + +from django.conf import settings +import git +import requests + +from services.api.api_bridge import APIBridge +from services.constants import Constants +from services.database.db_virtual_function import DBVirtualFunction +from services.helper import Helper +from services.logging_service import LoggingServiceFactory +from services.session import session + + +logger = LoggingServiceFactory.get_logger() + +class APIGitLab: + + @staticmethod + def display_output(p): + while True: + out = p.stderr.read(1) + if out == b'' and p.poll() != None: + break + if out != '': + sys.stdout.write(str(out.decode())) + sys.stdout.flush() + + @staticmethod + def get_git_project(path_with_namespace): + r1 = None + getURL = Constants.Default.URL.GitLab.Projects.TEXT + \ + path_with_namespace + logger.debug("Get project URL: " + getURL) + headers = dict() # Create header for post request. + headers['Content-type'] = 'application/json' + headers['PRIVATE-TOKEN'] = settings.GITLAB_TOKEN + try: + r1 = requests.get(getURL, headers=headers, verify=False) + Helper.internal_assert(r1.status_code, 200) + counter = 0 + while r1.content == b'[]' and counter <= Constants.GitLabConstants.RETRIES_NUMBER: + time.sleep(session.wait_until_time_pause) + r1 = requests.get(getURL, headers=headers, verify=False) + Helper.internal_assert(r1.status_code, 200) + + if r1.content == b'[]': + logger.error("Got an empty list as a response.") + raise + logger.debug("Project exists on APIGitLab!") + content = r1.json() # Change it from list to dict. + return content + except: + if r1 is None: + logger.error("Failed to get project from APIGitLab.") + else: + logger.error("Failed to get project from APIGitLab, see response >>> %s %s \n %s" + % (r1.status_code, r1.reason, str(r1.content, 'utf-8'))) + raise + + def are_all_list_users_registered_as_project_members(self, users_emails_list, project_path_with_namespace): + for email in users_emails_list: + if not self.validate_git_project_members(project_path_with_namespace, email): + raise Exception( + "Couldn't find the invited users: " + email + " in GitLab.") + logger.debug( + "Invited user: " + email + " found in GitLab.") + + @staticmethod + def validate_git_project_members(path_with_namespace, user_email): + if settings.DATABASE_TYPE != 'local': + r1 = None + headers = dict() + git_user = APIGitLab.get_git_user(user_email) + getURL = Constants.Default.URL.GitLab.Projects.TEXT + \ + path_with_namespace + "/members/" + str(git_user['id']) + logger.debug("Get project members URL: " + getURL) + headers['Content-type'] = 'application/json' + headers['PRIVATE-TOKEN'] = settings.GITLAB_TOKEN + counter = 0 + while (r1 is None or r1.content == b'[]' or r1.status_code != 200) and counter <= Constants.GitLabConstants.RETRIES_NUMBER: + logger.debug( + "try to get git project members (try #%s)" % counter) + time.sleep(session.wait_until_time_pause) + try: + r1 = requests.get(getURL, headers=headers, verify=False) + counter += 1 + except Exception as e: + if counter >= Constants.GitLabConstants.RETRIES_NUMBER: + logger.error("Failed to get project's team members from APIGitLab, see response >>> %s %s \n %s %s" + % (r1.status_code, r1.reason, str(r1.content, 'utf-8'), e.message)) + return False + if r1.content == b'[]': + logger.error("Got an empty list as a response.") + return False + elif r1.status_code != 200: + logger.error("Got %s %s." % (r1.status_code, r1.reason)) + return False + logger.debug("Got %s %s, user found in project." % + (r1.status_code, r1.reason)) + return True + + @staticmethod + def negative_validate_git_project_member(path_with_namespace, user_email, git_user_id): + if settings.DATABASE_TYPE != 'local': + r1 = None + headers = dict() + getURL = Constants.Default.URL.GitLab.Projects.TEXT + \ + path_with_namespace + "/members/" + git_user_id + logger.debug("Get project members URL: " + getURL) + headers['Content-type'] = 'application/json' + headers['PRIVATE-TOKEN'] = settings.GITLAB_TOKEN + counter = 0 + while r1 is None or str.encode(user_email) not in r1.content and counter <= Constants.GitLabConstants.RETRIES_NUMBER: + logger.debug( + "try to get git project members (try #%s)" % counter) + time.sleep(session.wait_until_time_pause) + try: + r1 = requests.get(getURL, headers=headers, verify=False) + counter += 1 + except Exception as e: + if counter >= Constants.GitLabConstants.RETRIES_NUMBER: + logger.error("Failed to get project's team members from APIGitLab, see response >>> %s %s \n %s %s" + % (r1.status_code, r1.reason, str(r1.content, 'utf-8'), e.message)) + return False + + if r1.content == b'[]': + logger.debug("Got %s %s, user not found in project." % + (r1.status_code, r1.reason)) + return True + else: + logger.debug("Got %s %s, user found in project." % + (r1.status_code, r1.reason)) + return False + + @staticmethod + def get_git_user(user_email): + if settings.DATABASE_TYPE != 'local': + r1 = None + user_email = user_email.replace("@", "_at_") + getURL = settings.GITLAB_URL + \ + "api/v3/users?username=" + user_email + logger.debug("Get user URL: " + getURL) + headers = dict() # Create header for post request. + headers['Content-type'] = 'application/json' + headers['PRIVATE-TOKEN'] = settings.GITLAB_TOKEN + try: + r1 = requests.get(getURL, headers=headers, verify=False) + Helper.internal_assert(r1.status_code, 200) + counter = 0 + while r1.content == b'[]' and counter <= 60: + logger.info( + "Will try to get gitlab user until will be response... #%s" % counter) + time.sleep(session.wait_until_time_pause_long) + r1 = requests.get(getURL, headers=headers, verify=False) + Helper.internal_assert(r1.status_code, 200) + counter += 1 + + if r1.content == b'[]': + logger.error("Got an empty user from gitlab.") + raise Exception("Got an empty user from gitlab.") + + logger.debug("Got %s %s and received user data: %s." % + (r1.status_code, r1.reason, r1.content)) + content = r1.json() + return content[0] + except: + if r1 is None: + logger.error("Failed to get user from APIGitLab.") + else: + logger.error("Failed to get user from APIGitLab, see response >>> %s %s \n %s" + % (r1.status_code, r1.reason, str(r1.content, 'utf-8'))) + raise + + @staticmethod + def get_git_user_ssh_key(git_user_id): + r1 = None + getURL = Constants.Default.URL.GitLab.Users.TEXT + \ + str(git_user_id) + "/keys" + logger.debug("Get user URL: " + getURL) + headers = dict() # Create header for post request. + headers['Content-type'] = 'application/json' + headers['PRIVATE-TOKEN'] = settings.GITLAB_TOKEN + try: + r1 = requests.get(getURL, headers=headers, verify=False) + Helper.internal_assert(r1.status_code, 200) + if r1.content == '[]': + logger.error("Got an empty list as a response.") + raise + logger.debug("Got %s %s and received user's public key." % + (r1.status_code, r1.reason)) + content = r1.json() # Change it from list to dict. + gitPubKey = content[0]['key'] + return gitPubKey + except: + if r1 is None: + logger.error("Failed to get user's public key from APIGitLab.") + else: + logger.error("Failed to get user's public key from APIGitLab, see response >>> %s %s \n %s" + % (r1.status_code, r1.reason, str(r1.content, 'utf-8'))) + raise + + @staticmethod + def git_clone_push(user_content): + if settings.DATABASE_TYPE != 'local': + logger.debug( + "About to push files into project's repository on the local folder(not over origin).") + try: + user_content['session_token'] = "token " + \ + APIBridge.login_user(Constants.Users.Admin.EMAIL) + used_email_for_actions = Constants.Users.Admin.EMAIL + repo_dir = Constants.Paths.LocalGitFolder.PATH + \ + user_content['vfName'] + if not os.path.exists(repo_dir): + os.makedirs(repo_dir) + logger.debug("Created the following folder: %s" % repo_dir) + # Create pair of keys for user. + user_pub_key = Helper.get_or_create_rsa_key_for_admin() + DBVirtualFunction.add_admin_to_eng_team( + user_content['engagement_uuid']) + # Set SSH Key for the user. + APIBridge.set_ssh(user_content, user_pub_key) + git_user = APIGitLab.get_git_user(used_email_for_actions) + + counter = 0 + git_user_pub_key = None + while user_pub_key != git_user_pub_key and counter < Constants.GitLabConstants.RETRIES_NUMBER: + try: + git_user_pub_key = APIGitLab.get_git_user_ssh_key( + git_user['id']) + except Exception as e: + pass + + counter += 1 + time.sleep(session.wait_until_time_pause) + + # Check that the SSH key was added to user on APIGitLab. + if user_pub_key != git_user_pub_key: + raise Exception("The SSH Key received does not equal to the" + " one provided! The key from" + "APIGitLab:\n %s ==<>== %s" + % (git_user_pub_key, user_pub_key)) + + gitRepoURL = "git@gitlab:%s/%s.git" % ( + user_content['engagement_manual_id'], user_content['vfName']) + logger.debug("Clone repo from: " + gitRepoURL) + APIGitLab.is_gitlab_ready(user_content) + cmd = 'cd ' + repo_dir + \ + '; git config --global user.email \"' + Constants.Users.Admin.EMAIL + \ + '\"; git config --global user.name \"' + \ + Constants.Users.Admin.FULLNAME + '\";' + # Commit all changes. + p = subprocess_popen = subprocess.Popen( + cmd, shell=True, stderr=subprocess.PIPE) + while subprocess_popen is None: + logger.debug( + "waiting to subprocess command to complete...") + APIGitLab.display_output(p) + # Clone project from APIGitLab. + repo = git.Repo.clone_from(gitRepoURL, repo_dir) + logger.debug("Successfully cloned repo to " + repo_dir) + # Create three files (file0, file1, file2) and add them to git + # index. + for i in range(3): + fileName = repo_dir + '/file' + str(i) + with open(fileName, 'w') as content_file: + os.chmod(fileName, 0o600) + content_file.write("Test file " + fileName) + repo.index.add([fileName]) + logger.debug( + fileName + " was created and added to commit list.") + cmd = 'cd ' + repo_dir + \ + '; git commit -a -m \"Create and add 3 files to git.\"' + # Commit all changes. + p = subprocess.Popen(cmd, shell=True, stderr=subprocess.PIPE) + APIGitLab.display_output(p) + logger.debug("All files added to commit list.") + cmd = 'cd ' + repo_dir + '; git push' + # Push commit to APIGitLab. + p = subprocess.Popen(cmd, shell=True, stderr=subprocess.PIPE) + APIGitLab.display_output(p) + logger.debug("All files were pushed to APIGitLab.") + except Exception as e: + logger.error( + "_-_-_-_-_- Unexpected error in git_clone_push: " + str(e)) + raise Exception(e) + + @staticmethod + def git_push_commit(user_content): + if settings.DATABASE_TYPE != 'local': + logger.debug( + "About to push files into project's repository on APIGitLab") + try: + git_work = '/tmp/git_work/' + repo_dir = git_work + user_content['vfName'] + # Create three files (file0, file1, file2) and add them to git + # index. + for i in range(3): + fileName = repo_dir + '/file' + str(i) + with open(fileName, 'w') as content_file: + os.chmod(fileName, 0o600) + content_file.write("Edit test file " + fileName) + logger.debug(fileName + " was edited.") + cmd = 'cd ' + repo_dir + \ + '; git commit -a -m \"Create and add 3 files to git.\"' + # Commit all changes. + p = subprocess.Popen(cmd, shell=True, stderr=subprocess.PIPE) + APIGitLab.display_output(p) + logger.debug("All edited files were committed.") + cmd = 'cd ' + repo_dir + '; git push' + # Push commit to APIGitLab. + p = subprocess.Popen(cmd, shell=True, stderr=subprocess.PIPE) + APIGitLab.display_output(p) + logger.debug( + "All edited files were commited and pushed to APIGitLab.") + except Exception as e: + logger.error( + "_-_-_-_-_- Unexpected error in git_push_commit : " + str(e)) + raise Exception( + "Something went wrong on git_push_commit function, please check logs.") + + @staticmethod + def is_gitlab_ready(user_content): + counter = 1 + gettURL = settings.ICE_EM_URL + '/v1/engmgr/engagement/' + \ + user_content['engagement_uuid'] + '/checklist/new/' + logger.debug( + "Get URL to check if GitLab and Jenkins are ready: " + gettURL) + # Validate with EL + token = "token " + APIBridge.login_user(user_content['el_email']) + headers = dict() # Create header for get request. + headers['Content-type'] = 'application/json' + headers['Authorization'] = token + r1 = requests.get(gettURL, headers=headers, verify=False) + while (r1.content == b'"Create New checklist is not ready yet"' and counter <= + Constants.GitLabConstants.RETRIES_NUMBER): + time.sleep(session.wait_until_time_pause_long) + logger.debug( + "GitLab and Jenkins are not ready yet, trying again (%s of %s)" % + (counter, Constants.GitLabConstants.RETRIES_NUMBER)) + r1 = requests.get(gettURL, headers=headers, verify=False) + counter += 1 + if r1.status_code != 200: + if r1.content == "Create New checklist is not ready yet": + raise Exception("Max retries exceeded, failing test...") + else: + raise Exception("Something went wrong while waiting for GitLab and Jenkins. %s %s" % ( + r1.status_code, r1.reason)) + return False + elif r1.status_code == 200: + logger.debug("Gitlab and Jenkins are ready to continue!") + return True diff --git a/services/api/api_jenkins.py b/services/api/api_jenkins.py new file mode 100644 index 0000000..e1e1f6e --- /dev/null +++ b/services/api/api_jenkins.py @@ -0,0 +1,81 @@ + +# ============LICENSE_START========================================== +# org.onap.vvp/test-engine +# =================================================================== +# Copyright © 2017 AT&T Intellectual Property. All rights reserved. +# =================================================================== +# +# Unless otherwise specified, all software contained herein is licensed +# under the Apache License, Version 2.0 (the “License”); +# you may not use this software except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# +# +# Unless otherwise specified, all documentation contained herein is licensed +# under the Creative Commons License, Attribution 4.0 Intl. (the “License”); +# you may not use this documentation except in compliance with the License. +# You may obtain a copy of the License at +# +# https://creativecommons.org/licenses/by/4.0/ +# +# Unless required by applicable law or agreed to in writing, documentation +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ============LICENSE_END============================================ +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. +from django.conf import settings +import requests +from requests.auth import HTTPBasicAuth + +from services.constants import Constants +from services.helper import Helper +from services.logging_service import LoggingServiceFactory + + +logger = LoggingServiceFactory.get_logger() + +class APIJenkins: + + @staticmethod + def get_jenkins_job(job_name): + r1 = None + getURL = settings.JENKINS_URL + "job/" + job_name + logger.debug("Get APIJenkins job URL: " + getURL) + try: + r1 = requests.get(getURL, auth=HTTPBasicAuth( + settings.JENKINS_USERNAME, settings.JENKINS_PASSWORD)) + Helper.internal_assert(r1.status_code, 200) + logger.debug("Job was created on APIJenkins!") + except: + msg = None + + if r1 is None: + msg = "APIJenkins didn't create job for %s" % job_name + else: + msg = "APIJenkins didn't create job for %s, see response >>> %s %s" % ( + job_name, r1.status_code, r1.reason) + + logger.error(msg) + raise Exception(msg) + + @staticmethod + def find_build_num_out_of_jenkins_log(log): + lines_array = log.splitlines() + for line in lines_array: + if Constants.Dashboard.Checklist.JenkinsLog.Modal.Body.BUILD_IDENTIFIER in line: + parts = line.partition('jenkins') + return parts[2] + diff --git a/services/api/api_rados.py b/services/api/api_rados.py new file mode 100644 index 0000000..61cfa5c --- /dev/null +++ b/services/api/api_rados.py @@ -0,0 +1,161 @@ + +# ============LICENSE_START========================================== +# org.onap.vvp/test-engine +# =================================================================== +# Copyright © 2017 AT&T Intellectual Property. All rights reserved. +# =================================================================== +# +# Unless otherwise specified, all software contained herein is licensed +# under the Apache License, Version 2.0 (the “License”); +# you may not use this software except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# +# +# Unless otherwise specified, all documentation contained herein is licensed +# under the Creative Commons License, Attribution 4.0 Intl. (the “License”); +# you may not use this documentation except in compliance with the License. +# You may obtain a copy of the License at +# +# https://creativecommons.org/licenses/by/4.0/ +# +# Unless required by applicable law or agreed to in writing, documentation +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ============LICENSE_END============================================ +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. +import logging +import time + +from boto.s3.connection import S3Connection, OrdinaryCallingFormat +from django.conf import settings + +from rados.rgwa_client_factory import RGWAClientFactory +from services.constants import Constants +from services.logging_service import LoggingServiceFactory +from services.session import session + + +logger = LoggingServiceFactory.get_logger() + +class APIRados: + + @staticmethod + def get_bucket(name): + """Return the Bucket.""" + boto_conn = RGWAClientFactory.standard() + try: + return boto_conn.lookup(name) + except Exception as e: + logger.error("Problem on get bucket", e) + raise e + + @staticmethod + def get_bucketfor_specific_user(name, access_key_id, secret_access_key): + """Return the Bucket.""" + boto_conn = APIRados.specific_client(access_key_id, secret_access_key) + try: + return boto_conn.lookup(name) + except Exception as e: + logger.error("Problem on get bucket for specific user", e) + raise e + + @staticmethod + def get_bucket_grants(bucket_name): + """Return the Grants.""" + counter = 1 + bucket = APIRados.get_bucket(bucket_name) + while not bucket and counter <= Constants.RGWAConstants.BUCKET_RETRIES_NUMBER: + logger.error("Bucket not found. Retry #%s" % counter) + time.sleep(session.wait_until_time_pause_long) + bucket = APIRados.get_bucket(bucket_name) + counter += 1 + if not bucket: + raise TimeoutError("Max retries exceeded, failing test...") + grants = bucket.list_grants() + print("***********grants=", grants) + return grants + + @staticmethod + def is_bucket_ready(bucket_id): + counter = 1 + bucket = APIRados.get_bucket(bucket_id) + while (bucket == None and counter <= + Constants.RGWAConstants.BUCKET_RETRIES_NUMBER): + time.sleep(session.wait_until_time_pause_long) + logger.debug( + "bucket are not ready yet, trying again (%s of 180)" % counter) + bucket = APIRados.get_bucket(bucket_id) + counter += 1 + print("****_+__+bucket= ", str(bucket)) + time.sleep(session.wait_until_time_pause_long) + if bucket == None: + raise TimeoutError("Max retries exceeded, failing test...") + elif bucket != None: + logger.debug("bucket are ready to continue!") + return True + + @staticmethod + def users_of_bucket_ready_after_complete(bucket_id, user_name): + grants = APIRados.get_bucket_grants(bucket_id) + count = 0 + counter = 1 + while (count != 0 and counter <= + Constants.RGWAConstants.BUCKET_RETRIES_NUMBER): + grants = APIRados.get_bucket_grants(bucket_id) + time.sleep(session.wait_until_time_pause_long) + for g in grants: + if g.id == user_name: + count = +1 + time.sleep(session.wait_until_time_pause_long) + if count != 0: + raise Exception("Max retries exceeded, failing test...") + return False + elif count == 0: + logger.debug("users_of_bucket are ready to continue!") + return True + + @staticmethod + def users_of_bucket_ready_after_created(bucket_id, user_name): + grants = APIRados.get_bucket_grants(bucket_id) + count = 0 + counter = 1 + while (count == 0 and counter <= + Constants.RGWAConstants.BUCKET_RETRIES_NUMBER): + grants = APIRados.get_bucket_grants(bucket_id) + time.sleep(session.wait_until_time_pause_long) + for g in grants: + if g.id == user_name: + count = +1 + time.sleep(session.wait_until_time_pause_long) + if count == 0: + raise Exception("Max retries exceeded, failing test...") + return False + elif count > 0: + logger.debug("users_of_bucket are ready to continue!") + return True + + @staticmethod + def specific_client(access_key_id, secret_access_key): + boto_conn = S3Connection( + host=settings.AWS_S3_HOST, + port=settings.AWS_S3_PORT, + aws_access_key_id=access_key_id, + aws_secret_access_key=secret_access_key, + calling_format=OrdinaryCallingFormat(), + is_secure=True,) + + boto_conn.num_retries = 0 + return boto_conn diff --git a/services/api/api_user.py b/services/api/api_user.py new file mode 100644 index 0000000..3e38fd2 --- /dev/null +++ b/services/api/api_user.py @@ -0,0 +1,308 @@ + +# ============LICENSE_START========================================== +# org.onap.vvp/test-engine +# =================================================================== +# Copyright © 2017 AT&T Intellectual Property. All rights reserved. +# =================================================================== +# +# Unless otherwise specified, all software contained herein is licensed +# under the Apache License, Version 2.0 (the “License”); +# you may not use this software except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# +# +# Unless otherwise specified, all documentation contained herein is licensed +# under the Creative Commons License, Attribution 4.0 Intl. (the “License”); +# you may not use this documentation except in compliance with the License. +# You may obtain a copy of the License at +# +# https://creativecommons.org/licenses/by/4.0/ +# +# Unless required by applicable law or agreed to in writing, documentation +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ============LICENSE_END============================================ +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. +import re + +from django.conf import settings +import requests + +from services.api.api_bridge import APIBridge +from services.api.api_gitlab import APIGitLab +from services.constants import Constants, ServiceProvider +from services.helper import Helper +from services.logging_service import LoggingServiceFactory + + +logger = LoggingServiceFactory.get_logger() + +class APIUser: + + @staticmethod + # Update account API - only adds new SSH key! + def update_account(user_content): + r1 = None + token = APIUser.login_user(user_content['email']) + user_content['session_token'] = 'token ' + token + sshKey = Helper.generate_sshpub_key() + putURL = settings.ICE_EM_URL + '/v1/engmgr/users/account' + logger.debug("Put user URL: " + putURL) + headers = dict() # Create header for put request. + headers['Content-type'] = 'application/json' + headers['Authorization'] = user_content['session_token'] + # headers['Authorization'] = user_content['activation_token'] + put_data = dict() # Create JSON data for put request. + user_content['vendor'] = user_content['company']['name'] + if user_content['vendor'] == "AT&T": + put_data['company'] = "AT&T" + else: + put_data['company'] = user_content['vendor'] + put_data['email'] = user_content['email'] + put_data['full_name'] = user_content['full_name'] + put_data['password'] = "" + put_data['phone_number'] = "+1201" + \ + Helper.rand_string("randomNumber", 6) + put_data['ssh_key'] = sshKey + try: + r1 = requests.put( + putURL, json=put_data, headers=headers, verify=False) + Helper.internal_assert(r1.status_code, 200) + logger.debug( + "SSH Key was added successfully to user " + user_content['full_name']) + if not APIBridge.is_gitlab_ready(user_content): + raise + return sshKey + except: + if r1 is None: + logger.error("Failed to add public SSH key to user.") + else: + logger.error("PUT request failed to add SSH key to user, see response >>> %s %s \n %s" % ( + r1.status_code, r1.reason, str(r1.content, 'utf-8'))) + raise + + @staticmethod + # Update account API - only adds new SSH key! + def update_account_injec_script(user_content): + r1 = None + putURL = settings.ICE_EM_URL + '/v1/engmgr/users/account' + logger.debug("Put user URL: " + putURL) + headers = dict() # Create header for put request. + headers['Content-type'] = 'application/json' + headers['Authorization'] = user_content['session_token'] + put_data = dict() # Create JSON data for put request. + if user_content['vendor'] == "AT&T": + put_data['company'] = "AT&T" + else: + put_data['company'] = user_content['vendor'] + put_data['email'] = user_content['email'] + script = "<script>;</script>" + put_data['full_name'] = script + put_data['password'] = "" + put_data['phone_number'] = "+1201" + \ + Helper.rand_string("randomNumber", 6) + try: + r1 = requests.put( + putURL, json=put_data, headers=headers, verify=False) + Helper.internal_assert(r1.status_code, 200) + msg = "Testing for Cross site scripting successfully : " + \ + user_content['full_name'] + \ + "stattus Code = " + str(r1.status_code) + logger.debug(msg) + if not APIBridge.is_gitlab_ready(user_content): + raise + return True + except: + if r1 is None: + logger.error("Failed to add public SSH key to user.") + else: + logger.error("PUT request failed to add SSH key to user, see response >>> %s %s \n %s" % ( + r1.status_code, r1.reason, str(r1.content, 'utf-8'))) + raise + + @staticmethod + def create_new_user(company=False, activate=False): + signupUrl = settings.EM_REST_URL + "signup/" + signupParams = APIUser.create_signup_param(company) + r1 = requests.post(signupUrl, json=signupParams, verify=False) + if (r1.status_code == 201 or r1.status_code == 200): + logger.debug("Moving to next test case. status=" + + str(r1.status_code)) + pass # Need to break here. + try: + user_data = r1.json() + except Exception as e: + logger.error("=========== json error ========") + logger.error("r1.content = " + str(r1.content) + + ", status=" + str(r1.status_code)) + logger.error("=========== json error end ========") + raise e + if activate: + APIUser.activate_user( + user_data['uuid'], user_data["user"]["activation_token"]) + return user_data + + @staticmethod + def login_user(email): + postUrl = settings.EM_REST_URL + "login" + user_data = dict() # Create JSON data for post request. + user_data['email'] = email + user_data['password'] = "iceusers" + try: + headers = {'Content-type': 'application/json'} + r = requests.post( + postUrl, json=user_data, headers=headers, verify=False) + logger.debug(str(r.status_code) + " " + r.reason) + decoded_response = r.json() + return decoded_response['token'] + except: + logger.debug("Failed to login.") + raise + + @staticmethod + def set_ssh(user_content, sshKey=None): # Set SSH key to user. + r1 = None + if sshKey is None: + logger.debug("About to generate an ssh key for the user: %s" % + user_content['email']) + sshKey = Helper.generate_sshpub_key() + postURL = settings.ICE_EM_URL + '/v1/engmgr/users/ssh' + logger.debug("Post user URL: " + postURL) + headers = dict() # Create header for post request. + headers['Content-type'] = 'application/json' + headers['Authorization'] = user_content['session_token'] + post_data = dict() # Create JSON data for post request. + post_data['ssh_key'] = sshKey + try: + r1 = requests.post( + postURL, json=post_data, headers=headers, verify=False) + Helper.internal_assert(r1.status_code, 200) + logger.debug( + "SSH Key was added successfully") + if not APIBridge.is_gitlab_ready(user_content): + raise + return sshKey + except: + if r1 is None: + logger.error("Failed to add public SSH key.") + else: + logger.error( + "POST request failed to add SSH key to user, see response >>> %s %s" % (r1.status_code, r1.reason)) + raise + + @staticmethod + def create_signup_param(company=False): + try: # Click on element in UI, by xPath locator. + if not company: + company = ServiceProvider.MainServiceProvider + email_domain = ServiceProvider.email + else: + email_domain = company.lower() + ".com" # + data = { + "company": company, + "full_name": Helper.rand_string("randomString"), + "email": Helper.rand_string("randomString") + "@" + email_domain, + "phone_number": Constants.Default.Phone.TEXT, + "password": Constants.Default.Password.TEXT, + "regular_email_updates": "True" + } + + return data + # If failed - count the failure and add the error to list of errors. + except Exception as e: + errorMsg = "Could not create Sign Up parametrs" + raise Exception(errorMsg, e) + + @staticmethod + def activate_user(userUuid, activationToken): + postUrl = settings.ICE_EM_URL + "/v1/engmgr/users/activate/" + \ + userUuid + "/" + activationToken + logger.debug(postUrl) + r1 = requests.get(postUrl, verify=False) + if (r1.status_code == 200): + logger.debug("APIUser activated successfully!") + return True + else: + raise Exception( + "Failed to activate user >>> %s %s" % (r1.status_code, r1.reason)) + + @staticmethod + def signup_invited_user(company, invited_email, invite_token, invite_url, user_content, is_contact_user="false", activate=False, wait_for_gitlab=True): + r1 = None + postURL = settings.ICE_EM_URL + '/v1/engmgr/signup' + logger.debug("Post signup URL: " + postURL) + if is_contact_user == "true": + fullName = re.sub("http.*full_name=", "", invite_url) + fullName = re.sub("&.*", "", fullName) + logger.debug( + "Invited contact full name is (according to url): " + fullName) + else: + fullName = Helper.rand_string('randomString') + + post_data = dict() # Create JSON data for post request. + post_data['company'] = company + post_data['email'] = invited_email + post_data['full_name'] = fullName + post_data['invitation'] = invite_token + post_data['is_contact_user'] = is_contact_user + post_data['password'] = "iceusers" + post_data['phone_number'] = "+1201" + \ + Helper.rand_string("randomNumber", 6) + post_data['regular_email_updates'] = "False" + post_data['terms'] = "True" + try: + requests.get(invite_url, verify=False) + r1 = requests.post( + postURL, json=post_data, verify=False) + Helper.internal_assert(r1.status_code, 200) + logger.debug("Invited user signed-up successfully!") + + user_data = r1.json() + if activate: + APIUser.activate_user( + user_data['uuid'], user_data["user"]["activation_token"]) + + if wait_for_gitlab: + if not APIBridge.is_gitlab_ready(user_content): + raise + return post_data + except: + if r1 is None: + logger.error("Failed to sign up the invited team member.") + else: + logger.error("POST request failed to sign up the invited team member, see response >>> %s %s \n %s" % ( + r1.status_code, r1.reason, str(r1.content, 'utf-8'))) + raise + + @staticmethod + def create_new_user_content(): + user_content = APIBridge.create_engagement() + APIGitLab.git_clone_push(user_content) + APIBridge.frontend_login( + user_content['email'], Constants.Default.Password.TEXT) + vfName = user_content['vfName'] + uuid = user_content['uuid'] + inviteEmail = Helper.rand_invite_email() + newObj = [vfName, uuid, inviteEmail] + return newObj, user_content + + @staticmethod + def create_new_user_content_login_with_api(): + user_content = APIBridge.create_engagement() + APIGitLab.git_clone_push(user_content) + token = "token " + APIBridge.login_user(user_content['el_email']) + user_content['session_token'] = token + return user_content diff --git a/services/api/api_virtual_function.py b/services/api/api_virtual_function.py new file mode 100644 index 0000000..46610e8 --- /dev/null +++ b/services/api/api_virtual_function.py @@ -0,0 +1,368 @@ + +# ============LICENSE_START========================================== +# org.onap.vvp/test-engine +# =================================================================== +# Copyright © 2017 AT&T Intellectual Property. All rights reserved. +# =================================================================== +# +# Unless otherwise specified, all software contained herein is licensed +# under the Apache License, Version 2.0 (the “License”); +# you may not use this software except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# +# +# Unless otherwise specified, all documentation contained herein is licensed +# under the Creative Commons License, Attribution 4.0 Intl. (the “License”); +# you may not use this documentation except in compliance with the License. +# You may obtain a copy of the License at +# +# https://creativecommons.org/licenses/by/4.0/ +# +# Unless required by applicable law or agreed to in writing, documentation +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ============LICENSE_END============================================ +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. +import datetime +import json +import time + +from django.conf import settings +import requests + +from services.api.api_gitlab import APIGitLab +from services.api.api_user import APIUser +from services.constants import Constants, ServiceProvider +from services.database.db_general import DBGeneral +from services.helper import Helper +from services.logging_service import LoggingServiceFactory + + +logger = LoggingServiceFactory.get_logger() + + +class APIVirtualFunction: + + @staticmethod + def add_next_step(user_content, files=[]): + r1 = None + postURL = settings.ICE_EM_URL + '/v1/engmgr/engagements/' + \ + user_content['engagement_uuid'] + '/nextsteps' + logger.debug("Post add next step URL: " + postURL) + headers = dict() # Create header for post request. + headers['Content-type'] = 'application/json' + headers['Authorization'] = user_content['session_token'] + data = dict() # Create JSON data for post request. + files_list = list() + if type(files) is list: + for file in files: + files_list.append(file) + else: + files_list.append(files) + data['files'] = files_list + data['assigneesUuids'] = [user_content['uuid']] + data['duedate'] = str(datetime.date.today()) + data['description'] = "API test - add next step." + list_data = [] + list_data.append(data) + try: + r1 = requests.post( + postURL, json=list_data, headers=headers, verify=False) + Helper.internal_assert(r1.status_code, 200) + logger.debug("Next step was added to the engagement!") + ns_uuid = r1.json() + return ns_uuid[0]['uuid'] + except: + if r1 is None: + logger.error( + "Failed to add next step to VF " + user_content['vfName']) + else: + logger.error("Failed to add next step to VF " + user_content[ + 'vfName'] + ", see response >>> %s %s.\nContent: %s" % (r1.status_code, r1.reason, str(r1.content, 'utf-8'))) + raise + + @staticmethod + def create_vf(token): + r1 = None + postUrl = settings.EM_REST_URL + "vf/" + targetVersion = DBGeneral.select_from( + "uuid", "ice_deployment_target", 1) + ecompRelease = DBGeneral.select_from("uuid", "ice_ecomp_release", 1) + headers = dict() # Create header for post request. + headers['Content-type'] = 'application/json' + headers['Authorization'] = 'token ' + token + jdata = [{"virtual_function": Helper.rand_string("randomString"), + "version": Helper.rand_string("randomString") + Helper.rand_string("randomNumber"), + "target_lab_entry_date": time.strftime("%Y-%m-%d"), + "target_aic_uuid": targetVersion, + "ecomp_release": ecompRelease, + "is_service_provider_internal": False}] + try: + r1 = requests.post( + postUrl, json=jdata, headers=headers, verify=False) + Helper.internal_assert(r1.status_code, 200) + logger.debug("Virtual Function created successfully!") + content = r1.content[1:-1] + return content + except: + if r1 is None: + logger.debug("Failed to create VF >>> request failed!") + else: + logger.debug( + "Failed to create VF >>> %s %s \n %s" % (r1.status_code, r1.reason, str(r1.content, 'utf-8'))) + raise + + @staticmethod + def get_engagement(user_content): + r1 = None + postUrl = settings.EM_REST_URL + 'single-engagement/' + \ + str(user_content['engagement_uuid'],) + headers = dict() # Create header for post request. + headers['Content-type'] = 'application/json' + headers['Authorization'] = user_content['session_token'] + try: + r1 = requests.get( + postUrl, headers=headers, verify=False) + Helper.internal_assert(r1.status_code, 200) + logger.debug("Retrieved the Engagement successfully!") + content = r1.content + return json.loads(content) + except: + if r1 is None: + logger.debug( + "Failed to Retrieve the Engagement >>> request failed!") + else: + logger.debug( + "Failed to Retrieve the Engagement >>> %s %s \n %s" % (r1.status_code, r1.reason, str(r1.content, 'utf-8'))) + raise + + @staticmethod + def invite_team_member(user_content): + r1 = None + postURL = settings.ICE_EM_URL + '/v1/engmgr/invite-team-members/' + logger.debug("Post invite user URL: " + postURL) + headers = dict() # Create header for post request. + headers['Content-type'] = 'application/json' + headers['Authorization'] = user_content['session_token'] + data = dict() # Create JSON data for post request. + data['email'] = Helper.rand_string( + 'randomString') + "@" + ServiceProvider.email + data['eng_uuid'] = user_content['engagement_uuid'] + list_data = [] + list_data.append(data) + try: + r1 = requests.post( + postURL, json=list_data, headers=headers, verify=False) + Helper.internal_assert_boolean(r1.status_code, 200) + logger.debug("Invite sent successfully to email " + data['email']) + invite_token = DBGeneral.select_where_and("invitation_token", "ice_invitation", "email", data[ + 'email'], "engagement_uuid", user_content['engagement_uuid'], 1) + invite_url = settings.ICE_PORTAL_URL + "/#/signUp?invitation=" + invite_token + \ + "&email=" + data['email'] + logger.debug("Invitation URL is: " + invite_url) + return data['email'], invite_token, invite_url + except: + if r1 is None: + logger.error("Failed to invite team member.") + else: + logger.error( + "POST request failed to invite team member, see response >>> %s %s" % (r1.status_code, r1.reason)) + raise + + @staticmethod + def add_contact(user_content): + r1 = None + postURL = settings.ICE_EM_URL + '/v1/engmgr/add-contact/' + logger.debug("Post invite vendor contact URL: " + postURL) + headers = dict() # Create header for post request. + headers['Content-type'] = 'application/json' + headers['Authorization'] = user_content['session_token'] + data = dict() # Create JSON data for post request. + data['company'] = user_content['vendor_uuid'] + data['email'] = Helper.rand_string( + 'randomString') + "@" + ServiceProvider.email + data['eng_uuid'] = user_content['engagement_uuid'] + data['full_name'] = Helper.rand_string('randomString') + data['phone_number'] = "+1201" + Helper.rand_string("randomNumber", 6) + try: + r1 = requests.post( + postURL, json=data, headers=headers, verify=False) + Helper.internal_assert_boolean(r1.status_code, 200) + logger.debug("Invite sent successfully to email " + data['email']) + invite_token = DBGeneral.select_where_and("invitation_token", "ice_invitation", "email", data[ + 'email'], "engagement_uuid", user_content['engagement_uuid'], 1) + invite_url = settings.ICE_PORTAL_URL + "/#/signUp?invitation=" + invite_token + \ + "&email=" + data['email'] + "&full_name=" + data['full_name'] + \ + "&phone_number=" + \ + data['phone_number'] + "&company=" + \ + data['company'] + "&is_contact_user=true" + logger.debug("Invitation URL is: " + invite_url) + return data['email'], invite_token, invite_url + except: + if r1 is None: + logger.error("Failed to invite vendor contact.") + else: + logger.error("POST request failed to invite vendor contact, see response >>> %s %s \n %s" % ( + r1.status_code, r1.reason, str(r1.content, 'utf-8'))) + raise + + @staticmethod + def edit_next_step(user_content, ns_uuid): + r1 = None + postURL = settings.ICE_EM_URL + '/v1/engmgr/nextsteps/' + ns_uuid + \ + '/engagement/' + user_content['engagement_uuid'] + '/modify/' + logger.debug("Put next step URL: " + postURL) + headers = dict() # Create header for post request. + headers['Content-type'] = 'application/json' + headers['Authorization'] = user_content['session_token'] + data = dict() # Create JSON data for post request. + data['files'] = [] + data['assigneesUuids'] = [user_content['uuid']] + data['duedate'] = str(datetime.date.today()) + data['description'] = "API edit next step test " + \ + Helper.rand_string('randomString') + try: + r1 = requests.put( + postURL, json=data, headers=headers, verify=False) + Helper.internal_assert_boolean(r1.status_code, 202) + logger.debug("Next step was edited successfully!") + except: + if r1 is None: + logger.error("Failed to edit next step uuid: " + ns_uuid) + else: + logger.error("Failed to edit next step uuid: " + ns_uuid + + ", see response >>> %s %s" % (r1.status_code, r1.reason)) + raise + + @staticmethod + def get_export_dasboard_excel(token, keywords=""): + postUrl = settings.EM_REST_URL + \ + "engagement/export/?stage=All&keyword=" + keywords + headers = {"Authorization": token} + r1 = requests.get(postUrl, headers=headers, verify=False) + Helper.internal_assert(r1.status_code, 200) + if (r1.status_code == 200): + logger.debug("APIUser activated successfully!") + return r1.content + else: + raise Exception( + "Failed to activate user >>> %s %s" % (r1.status_code, r1.reason)) + return False + + @staticmethod + def create_engagement(wait_for_gitlab=True): + user_content = APIUser.create_new_user() + APIUser.activate_user( + user_content['uuid'], user_content['user']['activation_token']) + token = APIUser.login_user(user_content['email']) + vf_content = json.loads(APIVirtualFunction.create_vf(token)) + user_content['vfName'] = vf_content['name'] + user_content['vf_uuid'] = vf_content['uuid'] + user_content['target_aic'] = vf_content['deployment_target']['version'] + # <-- ECOMP RELEASE + user_content['ecomp_release'] = vf_content[ + 'ecomp_release']['name'] + user_content['vnf_version'] = vf_content['version'] + if(vf_content['vendor']['name'] == "AT&T"): + user_content['vendor'] = "AT&T" + else: + user_content['vendor'] = vf_content['vendor']['name'] + user_content['vendor_uuid'] = vf_content['vendor']['uuid'] + user_content['engagement_manual_id'] = vf_content[ + 'engagement']['engagement_manual_id'] + user_content['target_lab_entry_date'] = vf_content[ + 'target_lab_entry_date'] + user_content['el_email'] = vf_content[ + 'engagement']['reviewer']['email'] + user_content['el_name'] = vf_content[ + 'engagement']['reviewer']['full_name'] + user_content['pr_email'] = vf_content[ + 'engagement']['peer_reviewer']['email'] + user_content['pr_name'] = vf_content[ + 'engagement']['peer_reviewer']['full_name'] + user_content['engagement_uuid'] = vf_content['engagement']['uuid'] + user_content['session_token'] = 'token ' + token + user_content['engagement'] = vf_content['engagement'] + user_content['vfStage'] = vf_content['engagement']['engagement_stage'] + + return user_content + + @staticmethod + def set_eng_stage(user_content, requested_stage): + token = APIUser.login_user(user_content['el_email']) + r1 = None + putUrl = Constants.Default.URL.Engagement.SingleEngagement.TEXT + \ + user_content['engagement_uuid'] + "/stage/" + str(requested_stage) + headers = dict() # Create header for post request. + headers['Content-type'] = 'application/json' + headers['Authorization'] = 'token ' + token + try: + r1 = requests.put( + putUrl, headers=headers, verify=False) + Helper.internal_assert(r1.status_code, 202) + logger.debug( + "Engagement stage was successfully changed to " + str(requested_stage) + "!") + content = r1.content[1:-1] + return content + except: + if r1 is None: + logger.debug("Failed to set eng stage >>> request failed!") + else: + logger.debug( + "Failed to set eng stage >>> %s %s \n %s" % (r1.status_code, r1.reason, str(r1.content, 'utf-8'))) + raise + + @staticmethod + def update_aic_version(eng_uuid, aic_version_uuid, session_token): + r1 = None + putURL = Constants.Default.URL.Engagement.EngagementOperations.TEXT + \ + eng_uuid + '/deployment-targets/' + aic_version_uuid + logger.debug("Put next step URL: " + putURL) + headers = dict() # Create header for post request. + headers['Content-type'] = 'application/json' + headers['Authorization'] = session_token + try: + r1 = requests.put(putURL, headers=headers, verify=False) + Helper.internal_assert_boolean(r1.status_code, 200) + logger.debug("AIC version has changed!") + except: + if r1 is None: + msg = "Failed to edit AIC version" + else: + msg = "Failed to edit AIC version, see response >>> %s %s" % ( + r1.status_code, r1.reason) + raise msg + + @staticmethod + def update_ecomp_release(eng_uuid, ecomp_release_uuid, session_token): + r1 = None + putURL = Constants.Default.URL.Engagement.EngagementOperations.TEXT + \ + eng_uuid + '/ecomp-releases/' + ecomp_release_uuid + logger.debug("Put next step URL: " + putURL) + headers = dict() # Create header for post request. + headers['Content-type'] = 'application/json' + headers['Authorization'] = session_token + try: + r1 = requests.put(putURL, headers=headers, verify=False) + Helper.internal_assert_boolean(r1.status_code, 200) + logger.debug("AIC version has changed!") + except: + if r1 is None: + msg = "Failed to update ECOMP release" + else: + msg = "Failed to update ECOMP release, see response >>> %s %s" % ( + r1.status_code, r1.reason) + raise msg |