summaryrefslogtreecommitdiffstats
path: root/services/api
diff options
context:
space:
mode:
authorEdan Binshtok <eb578m@intl.att.com>2017-10-04 09:33:23 +0300
committerEdan Binshtok <eb578m@intl.att.com>2017-10-04 09:36:04 +0300
commitf8907f0c4fc0ba4bb97a1d636a50c5b40c2642f2 (patch)
tree3d04d86910c93e42c055e5ed699ab1919482d5be /services/api
parent733e00df0a6fa19dd92ec7392966340345dd1885 (diff)
Initial seed
Initial upload of django test framework Change-Id: I643a7f4efc52cfafe4cc6d92e3178f36a0c1837c Issue-Id: VVP-1 Signed-off-by: Edan Binshtok <eb578m@intl.att.com>
Diffstat (limited to 'services/api')
-rw-r--r--services/api/__init__.py38
-rw-r--r--services/api/__pycache__/__init__.cpython-36.pycbin0 -> 140 bytes
-rw-r--r--services/api/__pycache__/api_bridge.cpython-36.pycbin0 -> 1860 bytes
-rw-r--r--services/api/__pycache__/api_checklist.cpython-36.pycbin0 -> 5491 bytes
-rw-r--r--services/api/__pycache__/api_gitlab.cpython-36.pycbin0 -> 10952 bytes
-rw-r--r--services/api/__pycache__/api_jenkins.cpython-36.pycbin0 -> 1191 bytes
-rw-r--r--services/api/__pycache__/api_user.cpython-36.pycbin0 -> 7760 bytes
-rw-r--r--services/api/__pycache__/api_virtual_function.cpython-36.pycbin0 -> 9579 bytes
-rw-r--r--services/api/api_bridge.py74
-rw-r--r--services/api/api_checklist.py219
-rw-r--r--services/api/api_gitlab.py394
-rw-r--r--services/api/api_jenkins.py81
-rw-r--r--services/api/api_rados.py161
-rw-r--r--services/api/api_user.py308
-rw-r--r--services/api/api_virtual_function.py368
15 files changed, 1643 insertions, 0 deletions
diff --git a/services/api/__init__.py b/services/api/__init__.py
new file mode 100644
index 0000000..30d7152
--- /dev/null
+++ b/services/api/__init__.py
@@ -0,0 +1,38 @@
+
+# ============LICENSE_START==========================================
+# org.onap.vvp/test-engine
+# ===================================================================
+# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+# ===================================================================
+#
+# Unless otherwise specified, all software contained herein is licensed
+# under the Apache License, Version 2.0 (the “License”);
+# you may not use this software except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
+#
+# Unless otherwise specified, all documentation contained herein is licensed
+# under the Creative Commons License, Attribution 4.0 Intl. (the “License”);
+# you may not use this documentation except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://creativecommons.org/licenses/by/4.0/
+#
+# Unless required by applicable law or agreed to in writing, documentation
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ============LICENSE_END============================================
+#
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
diff --git a/services/api/__pycache__/__init__.cpython-36.pyc b/services/api/__pycache__/__init__.cpython-36.pyc
new file mode 100644
index 0000000..ece5c7a
--- /dev/null
+++ b/services/api/__pycache__/__init__.cpython-36.pyc
Binary files differ
diff --git a/services/api/__pycache__/api_bridge.cpython-36.pyc b/services/api/__pycache__/api_bridge.cpython-36.pyc
new file mode 100644
index 0000000..1da6617
--- /dev/null
+++ b/services/api/__pycache__/api_bridge.cpython-36.pyc
Binary files differ
diff --git a/services/api/__pycache__/api_checklist.cpython-36.pyc b/services/api/__pycache__/api_checklist.cpython-36.pyc
new file mode 100644
index 0000000..6ea208f
--- /dev/null
+++ b/services/api/__pycache__/api_checklist.cpython-36.pyc
Binary files differ
diff --git a/services/api/__pycache__/api_gitlab.cpython-36.pyc b/services/api/__pycache__/api_gitlab.cpython-36.pyc
new file mode 100644
index 0000000..d4dedbc
--- /dev/null
+++ b/services/api/__pycache__/api_gitlab.cpython-36.pyc
Binary files differ
diff --git a/services/api/__pycache__/api_jenkins.cpython-36.pyc b/services/api/__pycache__/api_jenkins.cpython-36.pyc
new file mode 100644
index 0000000..905295c
--- /dev/null
+++ b/services/api/__pycache__/api_jenkins.cpython-36.pyc
Binary files differ
diff --git a/services/api/__pycache__/api_user.cpython-36.pyc b/services/api/__pycache__/api_user.cpython-36.pyc
new file mode 100644
index 0000000..1893ff4
--- /dev/null
+++ b/services/api/__pycache__/api_user.cpython-36.pyc
Binary files differ
diff --git a/services/api/__pycache__/api_virtual_function.cpython-36.pyc b/services/api/__pycache__/api_virtual_function.cpython-36.pyc
new file mode 100644
index 0000000..35349c5
--- /dev/null
+++ b/services/api/__pycache__/api_virtual_function.cpython-36.pyc
Binary files differ
diff --git a/services/api/api_bridge.py b/services/api/api_bridge.py
new file mode 100644
index 0000000..8926a1d
--- /dev/null
+++ b/services/api/api_bridge.py
@@ -0,0 +1,74 @@
+
+# ============LICENSE_START==========================================
+# org.onap.vvp/test-engine
+# ===================================================================
+# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+# ===================================================================
+#
+# Unless otherwise specified, all software contained herein is licensed
+# under the Apache License, Version 2.0 (the “License”);
+# you may not use this software except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
+#
+# Unless otherwise specified, all documentation contained herein is licensed
+# under the Creative Commons License, Attribution 4.0 Intl. (the “License”);
+# you may not use this documentation except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://creativecommons.org/licenses/by/4.0/
+#
+# Unless required by applicable law or agreed to in writing, documentation
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ============LICENSE_END============================================
+#
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+class APIBridge:
+
+ """
+ This class helps to use functions inside classes with circular import (dependencies).
+ Use this class only when there is circular import in one of the API services.
+ """
+
+ @staticmethod
+ def is_gitlab_ready(user_content):
+ """is_gitlab_ready: Originally can be found under APIGitLab class."""
+ from services.api.api_gitlab import APIGitLab
+ return APIGitLab.is_gitlab_ready(user_content)
+
+ @staticmethod
+ def login_user(email):
+ """login_user: Originally can be found under APIUser class."""
+ from services.api.api_user import APIUser
+ return APIUser.login_user(email)
+
+ @staticmethod
+ def set_ssh(user_content, sshKey):
+ """set_ssh: Originally can be found under APIUser class."""
+ from services.api.api_user import APIUser
+ return APIUser.set_ssh(user_content, sshKey)
+
+ @staticmethod
+ def create_engagement(wait_for_gitlab=True):
+ """create_engagement: Originally can be found under APIVirtualFunction class."""
+ from services.api.api_virtual_function import APIVirtualFunction
+ return APIVirtualFunction.create_engagement(wait_for_gitlab)
+
+ @staticmethod
+ def frontend_login(email, password):
+ """login: Originally can be found under FEUser class."""
+ from services.frontend.fe_user import FEUser
+ return FEUser.login(email, password)
diff --git a/services/api/api_checklist.py b/services/api/api_checklist.py
new file mode 100644
index 0000000..ef7b8a3
--- /dev/null
+++ b/services/api/api_checklist.py
@@ -0,0 +1,219 @@
+
+# ============LICENSE_START==========================================
+# org.onap.vvp/test-engine
+# ===================================================================
+# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+# ===================================================================
+#
+# Unless otherwise specified, all software contained herein is licensed
+# under the Apache License, Version 2.0 (the “License”);
+# you may not use this software except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
+#
+# Unless otherwise specified, all documentation contained herein is licensed
+# under the Creative Commons License, Attribution 4.0 Intl. (the “License”);
+# you may not use this documentation except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://creativecommons.org/licenses/by/4.0/
+#
+# Unless required by applicable law or agreed to in writing, documentation
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ============LICENSE_END============================================
+#
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+import datetime
+import logging
+
+import requests
+
+from services.api.api_gitlab import APIGitLab
+from services.api.api_user import APIUser
+from services.constants import Constants
+from services.database.db_general import DBGeneral
+from services.helper import Helper
+from services.logging_service import LoggingServiceFactory
+
+
+logger = LoggingServiceFactory.get_logger()
+
+
+class APIChecklist:
+
+ @staticmethod
+ def create_checklist(user_content, files=["file0", "file1"], return_negative_response=False):
+ r1 = None
+ postURL = Constants.Default.URL.Checklist.TEXT + \
+ user_content['engagement_uuid'] + '/checklist/new/'
+ logger.debug("Post create checklist URL: " + postURL)
+ headers = dict()
+ headers['Content-type'] = 'application/json'
+ headers['Authorization'] = user_content['session_token']
+ data = dict()
+ data['checkListAssociatedFiles'] = files
+ data['checkListName'] = "checklistAPI" + \
+ Helper.rand_string('randomString')
+ data['checkListTemplateUuid'] = DBGeneral.select_where(
+ "uuid", "ice_checklist_template", "name", Constants.Template.Heat.TEXT, 1)
+ try:
+ if not APIGitLab.is_gitlab_ready(user_content):
+ raise Exception(
+ "Gitlab is not ready and because of that the test is failed.")
+
+ r1 = requests.post(postURL, json=data,
+ headers=headers, verify=False)
+
+ Helper.internal_assert_boolean(r1.status_code, 200)
+ logger.debug("Checklist was created successfully!")
+ cl_content = r1.json()
+ return cl_content
+ except:
+ if return_negative_response:
+ return r1
+ if r1 is None:
+ logger.error(
+ "Failed to create checklist for VF " + user_content['vfName'])
+ else:
+ logger.error("Failed to create checklist for VF " + user_content[
+ 'vfName'] + ", see response >>> %s %s.\nContent: %s" % (r1.status_code, r1.reason, str(r1.content, 'utf-8')))
+ raise
+
+ @staticmethod
+ def update_checklist(user_content, cl_uuid):
+ r1 = None
+ postURL = Constants.Default.URL.Checklist.Update.TEXT + '/' + cl_uuid
+ logger.debug("Post create checklist URL: " + postURL + '/' + cl_uuid)
+ headers = dict() # Create header for post request.
+ headers['Content-type'] = 'application/json'
+ headers['Authorization'] = user_content['session_token']
+ data = dict() # Create JSON data for post request.
+ data['checklistUuid'] = cl_uuid
+ data['checkListAssociatedFiles'] = ["file1", "file2"]
+ data['checkListName'] = "UpdateChecklistAPI" + \
+ Helper.rand_string('randomString')
+ data['checkListTemplateUuid'] = DBGeneral.select_where(
+ "uuid", "ice_checklist_template", "name", Constants.Template.Heat.TEXT, 1)
+ try:
+ r1 = requests.put(
+ postURL, json=data, headers=headers, verify=False)
+ Helper.internal_assert(r1.status_code, 200)
+ logger.debug("DBChecklist was created successfully!")
+ cl_content = r1.json()
+ return cl_content['uuid']
+ except:
+ if r1 is None:
+ logger.error(
+ "Failed to create checklist for VF " + user_content['vfName'])
+ else:
+ logger.error("Failed to create checklist for VF " + user_content[
+ 'vfName'] + ", see response >>> %s %s.\nContent: %s" % (r1.status_code, r1.reason, str(r1.content, 'utf-8')))
+ raise
+
+ @staticmethod
+ def add_checklist_audit_log(user_content, cl_uuid):
+ r1 = None
+ postURL = Constants.Default.URL.Checklist.Update + \
+ cl_uuid + '/auditlog/'
+ logger.debug("Post checklist audit log URL: " + postURL)
+ headers = dict() # Create header for post request.
+ headers['Content-type'] = 'application/json'
+ headers['Authorization'] = user_content['session_token']
+ data = dict() # Create JSON data for post request.
+ data['description'] = "API audit log test " + \
+ Helper.rand_string('randomString')
+ try:
+ r1 = requests.post(
+ postURL, json=data, headers=headers, verify=False)
+ Helper.internal_assert_boolean(r1.status_code, 200)
+ logger.debug("Audit log was added successfully!")
+ except:
+ if r1 is None:
+ logger.error(
+ "Failed to add audit log for checklist uuid: " + cl_uuid)
+ else:
+ logger.error("Failed to add audit log for checklist uuid: " + cl_uuid +
+ ", see response >>> %s %s.\nContent: %s" % (r1.status_code, r1.reason, str(r1.content, 'utf-8')))
+ raise
+
+ @staticmethod
+ def add_checklist_next_step(user_content, cl_uuid):
+ r1 = None
+ postURL = Constants.Default.URL.Checklist.TEXT + \
+ user_content['engagement_uuid'] + \
+ '/checklist/' + cl_uuid + '/nextstep/'
+ logger.debug("Post checklist next step URL: " + postURL)
+ headers = dict() # Create header for post request.
+ headers['Content-type'] = 'application/json'
+ headers['Authorization'] = user_content['session_token']
+ data = dict() # Create JSON data for post request.
+ data['files'] = ["file0"]
+ data['assigneesUuids'] = [user_content['uuid']]
+ data['duedate'] = str(datetime.date.today())
+ data['description'] = "API next step test " + \
+ Helper.rand_string('randomString')
+ list_data = []
+ list_data.append(data)
+ try:
+ r1 = requests.post(
+ postURL, json=list_data, headers=headers, verify=False)
+ Helper.internal_assert(r1.status_code, 200)
+ logger.debug("Next step was added successfully!")
+ ns_uuid = r1.json()
+ return ns_uuid[0]['uuid']
+ except:
+ if r1 is None:
+ logger.error(
+ "Failed to add next step for checklist uuid: " + cl_uuid)
+ else:
+ logger.error("Failed to add next step for checklist uuid: " + cl_uuid +
+ ", see response >>> %s %s.\nContent: %s" % (r1.status_code, r1.reason, str(r1.content, 'utf-8')))
+ raise
+
+ @staticmethod
+ def jump_state(checklistUUID, engLeadEmail):
+ token = APIUser.login_user(engLeadEmail)
+ stateURL = Constants.Default.URL.Checklist.Rest.TEXT + \
+ str(checklistUUID) + '/state/'
+ APIChecklist.go_to_next_state(stateURL, token)
+
+ @staticmethod
+ def go_to_next_state(url, token):
+ headers = {"Authorization": "token " + token}
+ body = dict()
+ body['description'] = ""
+ body['decline'] = "False"
+ r = requests.put(url, headers=headers, json=body, verify=False)
+ if (r.status_code == 201 or r.status_code == 200):
+ logger.debug("go_to_next_state put request result status: %s" %
+ r.status_code)
+ else:
+ logger.error(
+ "PUT request failed to change checklist state >>> " + str(r.status_code) + " " + r.reason)
+ raise Exception("PUT request failed to change checklist state")
+
+ @staticmethod
+ def move_cl_to_closed(cl_uuid, vf_staff_emails):
+ api_checklist_obj = APIChecklist()
+
+ for i in range(len(vf_staff_emails)):
+ logger.debug("Trying to jump state for %s [%s]" % (vf_staff_emails[i], i))
+ api_checklist_obj.jump_state(cl_uuid, vf_staff_emails[i])
+
+ # Move CL to closed state.
+ logger.debug("Trying to jump state 'closed' for %s" % vf_staff_emails[0])
+ api_checklist_obj.jump_state(cl_uuid, vf_staff_emails[0])
+
diff --git a/services/api/api_gitlab.py b/services/api/api_gitlab.py
new file mode 100644
index 0000000..c7b25e0
--- /dev/null
+++ b/services/api/api_gitlab.py
@@ -0,0 +1,394 @@
+
+# ============LICENSE_START==========================================
+# org.onap.vvp/test-engine
+# ===================================================================
+# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+# ===================================================================
+#
+# Unless otherwise specified, all software contained herein is licensed
+# under the Apache License, Version 2.0 (the “License”);
+# you may not use this software except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
+#
+# Unless otherwise specified, all documentation contained herein is licensed
+# under the Creative Commons License, Attribution 4.0 Intl. (the “License”);
+# you may not use this documentation except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://creativecommons.org/licenses/by/4.0/
+#
+# Unless required by applicable law or agreed to in writing, documentation
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ============LICENSE_END============================================
+#
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+import logging
+import os
+import subprocess
+import sys
+import time
+
+from django.conf import settings
+import git
+import requests
+
+from services.api.api_bridge import APIBridge
+from services.constants import Constants
+from services.database.db_virtual_function import DBVirtualFunction
+from services.helper import Helper
+from services.logging_service import LoggingServiceFactory
+from services.session import session
+
+
+logger = LoggingServiceFactory.get_logger()
+
+class APIGitLab:
+
+ @staticmethod
+ def display_output(p):
+ while True:
+ out = p.stderr.read(1)
+ if out == b'' and p.poll() != None:
+ break
+ if out != '':
+ sys.stdout.write(str(out.decode()))
+ sys.stdout.flush()
+
+ @staticmethod
+ def get_git_project(path_with_namespace):
+ r1 = None
+ getURL = Constants.Default.URL.GitLab.Projects.TEXT + \
+ path_with_namespace
+ logger.debug("Get project URL: " + getURL)
+ headers = dict() # Create header for post request.
+ headers['Content-type'] = 'application/json'
+ headers['PRIVATE-TOKEN'] = settings.GITLAB_TOKEN
+ try:
+ r1 = requests.get(getURL, headers=headers, verify=False)
+ Helper.internal_assert(r1.status_code, 200)
+ counter = 0
+ while r1.content == b'[]' and counter <= Constants.GitLabConstants.RETRIES_NUMBER:
+ time.sleep(session.wait_until_time_pause)
+ r1 = requests.get(getURL, headers=headers, verify=False)
+ Helper.internal_assert(r1.status_code, 200)
+
+ if r1.content == b'[]':
+ logger.error("Got an empty list as a response.")
+ raise
+ logger.debug("Project exists on APIGitLab!")
+ content = r1.json() # Change it from list to dict.
+ return content
+ except:
+ if r1 is None:
+ logger.error("Failed to get project from APIGitLab.")
+ else:
+ logger.error("Failed to get project from APIGitLab, see response >>> %s %s \n %s"
+ % (r1.status_code, r1.reason, str(r1.content, 'utf-8')))
+ raise
+
+ def are_all_list_users_registered_as_project_members(self, users_emails_list, project_path_with_namespace):
+ for email in users_emails_list:
+ if not self.validate_git_project_members(project_path_with_namespace, email):
+ raise Exception(
+ "Couldn't find the invited users: " + email + " in GitLab.")
+ logger.debug(
+ "Invited user: " + email + " found in GitLab.")
+
+ @staticmethod
+ def validate_git_project_members(path_with_namespace, user_email):
+ if settings.DATABASE_TYPE != 'local':
+ r1 = None
+ headers = dict()
+ git_user = APIGitLab.get_git_user(user_email)
+ getURL = Constants.Default.URL.GitLab.Projects.TEXT + \
+ path_with_namespace + "/members/" + str(git_user['id'])
+ logger.debug("Get project members URL: " + getURL)
+ headers['Content-type'] = 'application/json'
+ headers['PRIVATE-TOKEN'] = settings.GITLAB_TOKEN
+ counter = 0
+ while (r1 is None or r1.content == b'[]' or r1.status_code != 200) and counter <= Constants.GitLabConstants.RETRIES_NUMBER:
+ logger.debug(
+ "try to get git project members (try #%s)" % counter)
+ time.sleep(session.wait_until_time_pause)
+ try:
+ r1 = requests.get(getURL, headers=headers, verify=False)
+ counter += 1
+ except Exception as e:
+ if counter >= Constants.GitLabConstants.RETRIES_NUMBER:
+ logger.error("Failed to get project's team members from APIGitLab, see response >>> %s %s \n %s %s"
+ % (r1.status_code, r1.reason, str(r1.content, 'utf-8'), e.message))
+ return False
+ if r1.content == b'[]':
+ logger.error("Got an empty list as a response.")
+ return False
+ elif r1.status_code != 200:
+ logger.error("Got %s %s." % (r1.status_code, r1.reason))
+ return False
+ logger.debug("Got %s %s, user found in project." %
+ (r1.status_code, r1.reason))
+ return True
+
+ @staticmethod
+ def negative_validate_git_project_member(path_with_namespace, user_email, git_user_id):
+ if settings.DATABASE_TYPE != 'local':
+ r1 = None
+ headers = dict()
+ getURL = Constants.Default.URL.GitLab.Projects.TEXT + \
+ path_with_namespace + "/members/" + git_user_id
+ logger.debug("Get project members URL: " + getURL)
+ headers['Content-type'] = 'application/json'
+ headers['PRIVATE-TOKEN'] = settings.GITLAB_TOKEN
+ counter = 0
+ while r1 is None or str.encode(user_email) not in r1.content and counter <= Constants.GitLabConstants.RETRIES_NUMBER:
+ logger.debug(
+ "try to get git project members (try #%s)" % counter)
+ time.sleep(session.wait_until_time_pause)
+ try:
+ r1 = requests.get(getURL, headers=headers, verify=False)
+ counter += 1
+ except Exception as e:
+ if counter >= Constants.GitLabConstants.RETRIES_NUMBER:
+ logger.error("Failed to get project's team members from APIGitLab, see response >>> %s %s \n %s %s"
+ % (r1.status_code, r1.reason, str(r1.content, 'utf-8'), e.message))
+ return False
+
+ if r1.content == b'[]':
+ logger.debug("Got %s %s, user not found in project." %
+ (r1.status_code, r1.reason))
+ return True
+ else:
+ logger.debug("Got %s %s, user found in project." %
+ (r1.status_code, r1.reason))
+ return False
+
+ @staticmethod
+ def get_git_user(user_email):
+ if settings.DATABASE_TYPE != 'local':
+ r1 = None
+ user_email = user_email.replace("@", "_at_")
+ getURL = settings.GITLAB_URL + \
+ "api/v3/users?username=" + user_email
+ logger.debug("Get user URL: " + getURL)
+ headers = dict() # Create header for post request.
+ headers['Content-type'] = 'application/json'
+ headers['PRIVATE-TOKEN'] = settings.GITLAB_TOKEN
+ try:
+ r1 = requests.get(getURL, headers=headers, verify=False)
+ Helper.internal_assert(r1.status_code, 200)
+ counter = 0
+ while r1.content == b'[]' and counter <= 60:
+ logger.info(
+ "Will try to get gitlab user until will be response... #%s" % counter)
+ time.sleep(session.wait_until_time_pause_long)
+ r1 = requests.get(getURL, headers=headers, verify=False)
+ Helper.internal_assert(r1.status_code, 200)
+ counter += 1
+
+ if r1.content == b'[]':
+ logger.error("Got an empty user from gitlab.")
+ raise Exception("Got an empty user from gitlab.")
+
+ logger.debug("Got %s %s and received user data: %s." %
+ (r1.status_code, r1.reason, r1.content))
+ content = r1.json()
+ return content[0]
+ except:
+ if r1 is None:
+ logger.error("Failed to get user from APIGitLab.")
+ else:
+ logger.error("Failed to get user from APIGitLab, see response >>> %s %s \n %s"
+ % (r1.status_code, r1.reason, str(r1.content, 'utf-8')))
+ raise
+
+ @staticmethod
+ def get_git_user_ssh_key(git_user_id):
+ r1 = None
+ getURL = Constants.Default.URL.GitLab.Users.TEXT + \
+ str(git_user_id) + "/keys"
+ logger.debug("Get user URL: " + getURL)
+ headers = dict() # Create header for post request.
+ headers['Content-type'] = 'application/json'
+ headers['PRIVATE-TOKEN'] = settings.GITLAB_TOKEN
+ try:
+ r1 = requests.get(getURL, headers=headers, verify=False)
+ Helper.internal_assert(r1.status_code, 200)
+ if r1.content == '[]':
+ logger.error("Got an empty list as a response.")
+ raise
+ logger.debug("Got %s %s and received user's public key." %
+ (r1.status_code, r1.reason))
+ content = r1.json() # Change it from list to dict.
+ gitPubKey = content[0]['key']
+ return gitPubKey
+ except:
+ if r1 is None:
+ logger.error("Failed to get user's public key from APIGitLab.")
+ else:
+ logger.error("Failed to get user's public key from APIGitLab, see response >>> %s %s \n %s"
+ % (r1.status_code, r1.reason, str(r1.content, 'utf-8')))
+ raise
+
+ @staticmethod
+ def git_clone_push(user_content):
+ if settings.DATABASE_TYPE != 'local':
+ logger.debug(
+ "About to push files into project's repository on the local folder(not over origin).")
+ try:
+ user_content['session_token'] = "token " + \
+ APIBridge.login_user(Constants.Users.Admin.EMAIL)
+ used_email_for_actions = Constants.Users.Admin.EMAIL
+ repo_dir = Constants.Paths.LocalGitFolder.PATH + \
+ user_content['vfName']
+ if not os.path.exists(repo_dir):
+ os.makedirs(repo_dir)
+ logger.debug("Created the following folder: %s" % repo_dir)
+ # Create pair of keys for user.
+ user_pub_key = Helper.get_or_create_rsa_key_for_admin()
+ DBVirtualFunction.add_admin_to_eng_team(
+ user_content['engagement_uuid'])
+ # Set SSH Key for the user.
+ APIBridge.set_ssh(user_content, user_pub_key)
+ git_user = APIGitLab.get_git_user(used_email_for_actions)
+
+ counter = 0
+ git_user_pub_key = None
+ while user_pub_key != git_user_pub_key and counter < Constants.GitLabConstants.RETRIES_NUMBER:
+ try:
+ git_user_pub_key = APIGitLab.get_git_user_ssh_key(
+ git_user['id'])
+ except Exception as e:
+ pass
+
+ counter += 1
+ time.sleep(session.wait_until_time_pause)
+
+ # Check that the SSH key was added to user on APIGitLab.
+ if user_pub_key != git_user_pub_key:
+ raise Exception("The SSH Key received does not equal to the"
+ " one provided! The key from"
+ "APIGitLab:\n %s ==<>== %s"
+ % (git_user_pub_key, user_pub_key))
+
+ gitRepoURL = "git@gitlab:%s/%s.git" % (
+ user_content['engagement_manual_id'], user_content['vfName'])
+ logger.debug("Clone repo from: " + gitRepoURL)
+ APIGitLab.is_gitlab_ready(user_content)
+ cmd = 'cd ' + repo_dir + \
+ '; git config --global user.email \"' + Constants.Users.Admin.EMAIL + \
+ '\"; git config --global user.name \"' + \
+ Constants.Users.Admin.FULLNAME + '\";'
+ # Commit all changes.
+ p = subprocess_popen = subprocess.Popen(
+ cmd, shell=True, stderr=subprocess.PIPE)
+ while subprocess_popen is None:
+ logger.debug(
+ "waiting to subprocess command to complete...")
+ APIGitLab.display_output(p)
+ # Clone project from APIGitLab.
+ repo = git.Repo.clone_from(gitRepoURL, repo_dir)
+ logger.debug("Successfully cloned repo to " + repo_dir)
+ # Create three files (file0, file1, file2) and add them to git
+ # index.
+ for i in range(3):
+ fileName = repo_dir + '/file' + str(i)
+ with open(fileName, 'w') as content_file:
+ os.chmod(fileName, 0o600)
+ content_file.write("Test file " + fileName)
+ repo.index.add([fileName])
+ logger.debug(
+ fileName + " was created and added to commit list.")
+ cmd = 'cd ' + repo_dir + \
+ '; git commit -a -m \"Create and add 3 files to git.\"'
+ # Commit all changes.
+ p = subprocess.Popen(cmd, shell=True, stderr=subprocess.PIPE)
+ APIGitLab.display_output(p)
+ logger.debug("All files added to commit list.")
+ cmd = 'cd ' + repo_dir + '; git push'
+ # Push commit to APIGitLab.
+ p = subprocess.Popen(cmd, shell=True, stderr=subprocess.PIPE)
+ APIGitLab.display_output(p)
+ logger.debug("All files were pushed to APIGitLab.")
+ except Exception as e:
+ logger.error(
+ "_-_-_-_-_- Unexpected error in git_clone_push: " + str(e))
+ raise Exception(e)
+
+ @staticmethod
+ def git_push_commit(user_content):
+ if settings.DATABASE_TYPE != 'local':
+ logger.debug(
+ "About to push files into project's repository on APIGitLab")
+ try:
+ git_work = '/tmp/git_work/'
+ repo_dir = git_work + user_content['vfName']
+ # Create three files (file0, file1, file2) and add them to git
+ # index.
+ for i in range(3):
+ fileName = repo_dir + '/file' + str(i)
+ with open(fileName, 'w') as content_file:
+ os.chmod(fileName, 0o600)
+ content_file.write("Edit test file " + fileName)
+ logger.debug(fileName + " was edited.")
+ cmd = 'cd ' + repo_dir + \
+ '; git commit -a -m \"Create and add 3 files to git.\"'
+ # Commit all changes.
+ p = subprocess.Popen(cmd, shell=True, stderr=subprocess.PIPE)
+ APIGitLab.display_output(p)
+ logger.debug("All edited files were committed.")
+ cmd = 'cd ' + repo_dir + '; git push'
+ # Push commit to APIGitLab.
+ p = subprocess.Popen(cmd, shell=True, stderr=subprocess.PIPE)
+ APIGitLab.display_output(p)
+ logger.debug(
+ "All edited files were commited and pushed to APIGitLab.")
+ except Exception as e:
+ logger.error(
+ "_-_-_-_-_- Unexpected error in git_push_commit : " + str(e))
+ raise Exception(
+ "Something went wrong on git_push_commit function, please check logs.")
+
+ @staticmethod
+ def is_gitlab_ready(user_content):
+ counter = 1
+ gettURL = settings.ICE_EM_URL + '/v1/engmgr/engagement/' + \
+ user_content['engagement_uuid'] + '/checklist/new/'
+ logger.debug(
+ "Get URL to check if GitLab and Jenkins are ready: " + gettURL)
+ # Validate with EL
+ token = "token " + APIBridge.login_user(user_content['el_email'])
+ headers = dict() # Create header for get request.
+ headers['Content-type'] = 'application/json'
+ headers['Authorization'] = token
+ r1 = requests.get(gettURL, headers=headers, verify=False)
+ while (r1.content == b'"Create New checklist is not ready yet"' and counter <=
+ Constants.GitLabConstants.RETRIES_NUMBER):
+ time.sleep(session.wait_until_time_pause_long)
+ logger.debug(
+ "GitLab and Jenkins are not ready yet, trying again (%s of %s)" %
+ (counter, Constants.GitLabConstants.RETRIES_NUMBER))
+ r1 = requests.get(gettURL, headers=headers, verify=False)
+ counter += 1
+ if r1.status_code != 200:
+ if r1.content == "Create New checklist is not ready yet":
+ raise Exception("Max retries exceeded, failing test...")
+ else:
+ raise Exception("Something went wrong while waiting for GitLab and Jenkins. %s %s" % (
+ r1.status_code, r1.reason))
+ return False
+ elif r1.status_code == 200:
+ logger.debug("Gitlab and Jenkins are ready to continue!")
+ return True
diff --git a/services/api/api_jenkins.py b/services/api/api_jenkins.py
new file mode 100644
index 0000000..e1e1f6e
--- /dev/null
+++ b/services/api/api_jenkins.py
@@ -0,0 +1,81 @@
+
+# ============LICENSE_START==========================================
+# org.onap.vvp/test-engine
+# ===================================================================
+# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+# ===================================================================
+#
+# Unless otherwise specified, all software contained herein is licensed
+# under the Apache License, Version 2.0 (the “License”);
+# you may not use this software except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
+#
+# Unless otherwise specified, all documentation contained herein is licensed
+# under the Creative Commons License, Attribution 4.0 Intl. (the “License”);
+# you may not use this documentation except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://creativecommons.org/licenses/by/4.0/
+#
+# Unless required by applicable law or agreed to in writing, documentation
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ============LICENSE_END============================================
+#
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+from django.conf import settings
+import requests
+from requests.auth import HTTPBasicAuth
+
+from services.constants import Constants
+from services.helper import Helper
+from services.logging_service import LoggingServiceFactory
+
+
+logger = LoggingServiceFactory.get_logger()
+
+class APIJenkins:
+
+ @staticmethod
+ def get_jenkins_job(job_name):
+ r1 = None
+ getURL = settings.JENKINS_URL + "job/" + job_name
+ logger.debug("Get APIJenkins job URL: " + getURL)
+ try:
+ r1 = requests.get(getURL, auth=HTTPBasicAuth(
+ settings.JENKINS_USERNAME, settings.JENKINS_PASSWORD))
+ Helper.internal_assert(r1.status_code, 200)
+ logger.debug("Job was created on APIJenkins!")
+ except:
+ msg = None
+
+ if r1 is None:
+ msg = "APIJenkins didn't create job for %s" % job_name
+ else:
+ msg = "APIJenkins didn't create job for %s, see response >>> %s %s" % (
+ job_name, r1.status_code, r1.reason)
+
+ logger.error(msg)
+ raise Exception(msg)
+
+ @staticmethod
+ def find_build_num_out_of_jenkins_log(log):
+ lines_array = log.splitlines()
+ for line in lines_array:
+ if Constants.Dashboard.Checklist.JenkinsLog.Modal.Body.BUILD_IDENTIFIER in line:
+ parts = line.partition('jenkins')
+ return parts[2]
+
diff --git a/services/api/api_rados.py b/services/api/api_rados.py
new file mode 100644
index 0000000..61cfa5c
--- /dev/null
+++ b/services/api/api_rados.py
@@ -0,0 +1,161 @@
+
+# ============LICENSE_START==========================================
+# org.onap.vvp/test-engine
+# ===================================================================
+# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+# ===================================================================
+#
+# Unless otherwise specified, all software contained herein is licensed
+# under the Apache License, Version 2.0 (the “License”);
+# you may not use this software except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
+#
+# Unless otherwise specified, all documentation contained herein is licensed
+# under the Creative Commons License, Attribution 4.0 Intl. (the “License”);
+# you may not use this documentation except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://creativecommons.org/licenses/by/4.0/
+#
+# Unless required by applicable law or agreed to in writing, documentation
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ============LICENSE_END============================================
+#
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+import logging
+import time
+
+from boto.s3.connection import S3Connection, OrdinaryCallingFormat
+from django.conf import settings
+
+from rados.rgwa_client_factory import RGWAClientFactory
+from services.constants import Constants
+from services.logging_service import LoggingServiceFactory
+from services.session import session
+
+
+logger = LoggingServiceFactory.get_logger()
+
+class APIRados:
+
+ @staticmethod
+ def get_bucket(name):
+ """Return the Bucket."""
+ boto_conn = RGWAClientFactory.standard()
+ try:
+ return boto_conn.lookup(name)
+ except Exception as e:
+ logger.error("Problem on get bucket", e)
+ raise e
+
+ @staticmethod
+ def get_bucketfor_specific_user(name, access_key_id, secret_access_key):
+ """Return the Bucket."""
+ boto_conn = APIRados.specific_client(access_key_id, secret_access_key)
+ try:
+ return boto_conn.lookup(name)
+ except Exception as e:
+ logger.error("Problem on get bucket for specific user", e)
+ raise e
+
+ @staticmethod
+ def get_bucket_grants(bucket_name):
+ """Return the Grants."""
+ counter = 1
+ bucket = APIRados.get_bucket(bucket_name)
+ while not bucket and counter <= Constants.RGWAConstants.BUCKET_RETRIES_NUMBER:
+ logger.error("Bucket not found. Retry #%s" % counter)
+ time.sleep(session.wait_until_time_pause_long)
+ bucket = APIRados.get_bucket(bucket_name)
+ counter += 1
+ if not bucket:
+ raise TimeoutError("Max retries exceeded, failing test...")
+ grants = bucket.list_grants()
+ print("***********grants=", grants)
+ return grants
+
+ @staticmethod
+ def is_bucket_ready(bucket_id):
+ counter = 1
+ bucket = APIRados.get_bucket(bucket_id)
+ while (bucket == None and counter <=
+ Constants.RGWAConstants.BUCKET_RETRIES_NUMBER):
+ time.sleep(session.wait_until_time_pause_long)
+ logger.debug(
+ "bucket are not ready yet, trying again (%s of 180)" % counter)
+ bucket = APIRados.get_bucket(bucket_id)
+ counter += 1
+ print("****_+__+bucket= ", str(bucket))
+ time.sleep(session.wait_until_time_pause_long)
+ if bucket == None:
+ raise TimeoutError("Max retries exceeded, failing test...")
+ elif bucket != None:
+ logger.debug("bucket are ready to continue!")
+ return True
+
+ @staticmethod
+ def users_of_bucket_ready_after_complete(bucket_id, user_name):
+ grants = APIRados.get_bucket_grants(bucket_id)
+ count = 0
+ counter = 1
+ while (count != 0 and counter <=
+ Constants.RGWAConstants.BUCKET_RETRIES_NUMBER):
+ grants = APIRados.get_bucket_grants(bucket_id)
+ time.sleep(session.wait_until_time_pause_long)
+ for g in grants:
+ if g.id == user_name:
+ count = +1
+ time.sleep(session.wait_until_time_pause_long)
+ if count != 0:
+ raise Exception("Max retries exceeded, failing test...")
+ return False
+ elif count == 0:
+ logger.debug("users_of_bucket are ready to continue!")
+ return True
+
+ @staticmethod
+ def users_of_bucket_ready_after_created(bucket_id, user_name):
+ grants = APIRados.get_bucket_grants(bucket_id)
+ count = 0
+ counter = 1
+ while (count == 0 and counter <=
+ Constants.RGWAConstants.BUCKET_RETRIES_NUMBER):
+ grants = APIRados.get_bucket_grants(bucket_id)
+ time.sleep(session.wait_until_time_pause_long)
+ for g in grants:
+ if g.id == user_name:
+ count = +1
+ time.sleep(session.wait_until_time_pause_long)
+ if count == 0:
+ raise Exception("Max retries exceeded, failing test...")
+ return False
+ elif count > 0:
+ logger.debug("users_of_bucket are ready to continue!")
+ return True
+
+ @staticmethod
+ def specific_client(access_key_id, secret_access_key):
+ boto_conn = S3Connection(
+ host=settings.AWS_S3_HOST,
+ port=settings.AWS_S3_PORT,
+ aws_access_key_id=access_key_id,
+ aws_secret_access_key=secret_access_key,
+ calling_format=OrdinaryCallingFormat(),
+ is_secure=True,)
+
+ boto_conn.num_retries = 0
+ return boto_conn
diff --git a/services/api/api_user.py b/services/api/api_user.py
new file mode 100644
index 0000000..3e38fd2
--- /dev/null
+++ b/services/api/api_user.py
@@ -0,0 +1,308 @@
+
+# ============LICENSE_START==========================================
+# org.onap.vvp/test-engine
+# ===================================================================
+# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+# ===================================================================
+#
+# Unless otherwise specified, all software contained herein is licensed
+# under the Apache License, Version 2.0 (the “License”);
+# you may not use this software except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
+#
+# Unless otherwise specified, all documentation contained herein is licensed
+# under the Creative Commons License, Attribution 4.0 Intl. (the “License”);
+# you may not use this documentation except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://creativecommons.org/licenses/by/4.0/
+#
+# Unless required by applicable law or agreed to in writing, documentation
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ============LICENSE_END============================================
+#
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+import re
+
+from django.conf import settings
+import requests
+
+from services.api.api_bridge import APIBridge
+from services.api.api_gitlab import APIGitLab
+from services.constants import Constants, ServiceProvider
+from services.helper import Helper
+from services.logging_service import LoggingServiceFactory
+
+
+logger = LoggingServiceFactory.get_logger()
+
+class APIUser:
+
+ @staticmethod
+ # Update account API - only adds new SSH key!
+ def update_account(user_content):
+ r1 = None
+ token = APIUser.login_user(user_content['email'])
+ user_content['session_token'] = 'token ' + token
+ sshKey = Helper.generate_sshpub_key()
+ putURL = settings.ICE_EM_URL + '/v1/engmgr/users/account'
+ logger.debug("Put user URL: " + putURL)
+ headers = dict() # Create header for put request.
+ headers['Content-type'] = 'application/json'
+ headers['Authorization'] = user_content['session_token']
+ # headers['Authorization'] = user_content['activation_token']
+ put_data = dict() # Create JSON data for put request.
+ user_content['vendor'] = user_content['company']['name']
+ if user_content['vendor'] == "AT&amp;T":
+ put_data['company'] = "AT&T"
+ else:
+ put_data['company'] = user_content['vendor']
+ put_data['email'] = user_content['email']
+ put_data['full_name'] = user_content['full_name']
+ put_data['password'] = ""
+ put_data['phone_number'] = "+1201" + \
+ Helper.rand_string("randomNumber", 6)
+ put_data['ssh_key'] = sshKey
+ try:
+ r1 = requests.put(
+ putURL, json=put_data, headers=headers, verify=False)
+ Helper.internal_assert(r1.status_code, 200)
+ logger.debug(
+ "SSH Key was added successfully to user " + user_content['full_name'])
+ if not APIBridge.is_gitlab_ready(user_content):
+ raise
+ return sshKey
+ except:
+ if r1 is None:
+ logger.error("Failed to add public SSH key to user.")
+ else:
+ logger.error("PUT request failed to add SSH key to user, see response >>> %s %s \n %s" % (
+ r1.status_code, r1.reason, str(r1.content, 'utf-8')))
+ raise
+
+ @staticmethod
+ # Update account API - only adds new SSH key!
+ def update_account_injec_script(user_content):
+ r1 = None
+ putURL = settings.ICE_EM_URL + '/v1/engmgr/users/account'
+ logger.debug("Put user URL: " + putURL)
+ headers = dict() # Create header for put request.
+ headers['Content-type'] = 'application/json'
+ headers['Authorization'] = user_content['session_token']
+ put_data = dict() # Create JSON data for put request.
+ if user_content['vendor'] == "AT&amp;T":
+ put_data['company'] = "AT&T"
+ else:
+ put_data['company'] = user_content['vendor']
+ put_data['email'] = user_content['email']
+ script = "<script>;</script>"
+ put_data['full_name'] = script
+ put_data['password'] = ""
+ put_data['phone_number'] = "+1201" + \
+ Helper.rand_string("randomNumber", 6)
+ try:
+ r1 = requests.put(
+ putURL, json=put_data, headers=headers, verify=False)
+ Helper.internal_assert(r1.status_code, 200)
+ msg = "Testing for Cross site scripting successfully : " + \
+ user_content['full_name'] + \
+ "stattus Code = " + str(r1.status_code)
+ logger.debug(msg)
+ if not APIBridge.is_gitlab_ready(user_content):
+ raise
+ return True
+ except:
+ if r1 is None:
+ logger.error("Failed to add public SSH key to user.")
+ else:
+ logger.error("PUT request failed to add SSH key to user, see response >>> %s %s \n %s" % (
+ r1.status_code, r1.reason, str(r1.content, 'utf-8')))
+ raise
+
+ @staticmethod
+ def create_new_user(company=False, activate=False):
+ signupUrl = settings.EM_REST_URL + "signup/"
+ signupParams = APIUser.create_signup_param(company)
+ r1 = requests.post(signupUrl, json=signupParams, verify=False)
+ if (r1.status_code == 201 or r1.status_code == 200):
+ logger.debug("Moving to next test case. status=" +
+ str(r1.status_code))
+ pass # Need to break here.
+ try:
+ user_data = r1.json()
+ except Exception as e:
+ logger.error("=========== json error ========")
+ logger.error("r1.content = " + str(r1.content) +
+ ", status=" + str(r1.status_code))
+ logger.error("=========== json error end ========")
+ raise e
+ if activate:
+ APIUser.activate_user(
+ user_data['uuid'], user_data["user"]["activation_token"])
+ return user_data
+
+ @staticmethod
+ def login_user(email):
+ postUrl = settings.EM_REST_URL + "login"
+ user_data = dict() # Create JSON data for post request.
+ user_data['email'] = email
+ user_data['password'] = "iceusers"
+ try:
+ headers = {'Content-type': 'application/json'}
+ r = requests.post(
+ postUrl, json=user_data, headers=headers, verify=False)
+ logger.debug(str(r.status_code) + " " + r.reason)
+ decoded_response = r.json()
+ return decoded_response['token']
+ except:
+ logger.debug("Failed to login.")
+ raise
+
+ @staticmethod
+ def set_ssh(user_content, sshKey=None): # Set SSH key to user.
+ r1 = None
+ if sshKey is None:
+ logger.debug("About to generate an ssh key for the user: %s" %
+ user_content['email'])
+ sshKey = Helper.generate_sshpub_key()
+ postURL = settings.ICE_EM_URL + '/v1/engmgr/users/ssh'
+ logger.debug("Post user URL: " + postURL)
+ headers = dict() # Create header for post request.
+ headers['Content-type'] = 'application/json'
+ headers['Authorization'] = user_content['session_token']
+ post_data = dict() # Create JSON data for post request.
+ post_data['ssh_key'] = sshKey
+ try:
+ r1 = requests.post(
+ postURL, json=post_data, headers=headers, verify=False)
+ Helper.internal_assert(r1.status_code, 200)
+ logger.debug(
+ "SSH Key was added successfully")
+ if not APIBridge.is_gitlab_ready(user_content):
+ raise
+ return sshKey
+ except:
+ if r1 is None:
+ logger.error("Failed to add public SSH key.")
+ else:
+ logger.error(
+ "POST request failed to add SSH key to user, see response >>> %s %s" % (r1.status_code, r1.reason))
+ raise
+
+ @staticmethod
+ def create_signup_param(company=False):
+ try: # Click on element in UI, by xPath locator.
+ if not company:
+ company = ServiceProvider.MainServiceProvider
+ email_domain = ServiceProvider.email
+ else:
+ email_domain = company.lower() + ".com" #
+ data = {
+ "company": company,
+ "full_name": Helper.rand_string("randomString"),
+ "email": Helper.rand_string("randomString") + "@" + email_domain,
+ "phone_number": Constants.Default.Phone.TEXT,
+ "password": Constants.Default.Password.TEXT,
+ "regular_email_updates": "True"
+ }
+
+ return data
+ # If failed - count the failure and add the error to list of errors.
+ except Exception as e:
+ errorMsg = "Could not create Sign Up parametrs"
+ raise Exception(errorMsg, e)
+
+ @staticmethod
+ def activate_user(userUuid, activationToken):
+ postUrl = settings.ICE_EM_URL + "/v1/engmgr/users/activate/" + \
+ userUuid + "/" + activationToken
+ logger.debug(postUrl)
+ r1 = requests.get(postUrl, verify=False)
+ if (r1.status_code == 200):
+ logger.debug("APIUser activated successfully!")
+ return True
+ else:
+ raise Exception(
+ "Failed to activate user >>> %s %s" % (r1.status_code, r1.reason))
+
+ @staticmethod
+ def signup_invited_user(company, invited_email, invite_token, invite_url, user_content, is_contact_user="false", activate=False, wait_for_gitlab=True):
+ r1 = None
+ postURL = settings.ICE_EM_URL + '/v1/engmgr/signup'
+ logger.debug("Post signup URL: " + postURL)
+ if is_contact_user == "true":
+ fullName = re.sub("http.*full_name=", "", invite_url)
+ fullName = re.sub("&.*", "", fullName)
+ logger.debug(
+ "Invited contact full name is (according to url): " + fullName)
+ else:
+ fullName = Helper.rand_string('randomString')
+
+ post_data = dict() # Create JSON data for post request.
+ post_data['company'] = company
+ post_data['email'] = invited_email
+ post_data['full_name'] = fullName
+ post_data['invitation'] = invite_token
+ post_data['is_contact_user'] = is_contact_user
+ post_data['password'] = "iceusers"
+ post_data['phone_number'] = "+1201" + \
+ Helper.rand_string("randomNumber", 6)
+ post_data['regular_email_updates'] = "False"
+ post_data['terms'] = "True"
+ try:
+ requests.get(invite_url, verify=False)
+ r1 = requests.post(
+ postURL, json=post_data, verify=False)
+ Helper.internal_assert(r1.status_code, 200)
+ logger.debug("Invited user signed-up successfully!")
+
+ user_data = r1.json()
+ if activate:
+ APIUser.activate_user(
+ user_data['uuid'], user_data["user"]["activation_token"])
+
+ if wait_for_gitlab:
+ if not APIBridge.is_gitlab_ready(user_content):
+ raise
+ return post_data
+ except:
+ if r1 is None:
+ logger.error("Failed to sign up the invited team member.")
+ else:
+ logger.error("POST request failed to sign up the invited team member, see response >>> %s %s \n %s" % (
+ r1.status_code, r1.reason, str(r1.content, 'utf-8')))
+ raise
+
+ @staticmethod
+ def create_new_user_content():
+ user_content = APIBridge.create_engagement()
+ APIGitLab.git_clone_push(user_content)
+ APIBridge.frontend_login(
+ user_content['email'], Constants.Default.Password.TEXT)
+ vfName = user_content['vfName']
+ uuid = user_content['uuid']
+ inviteEmail = Helper.rand_invite_email()
+ newObj = [vfName, uuid, inviteEmail]
+ return newObj, user_content
+
+ @staticmethod
+ def create_new_user_content_login_with_api():
+ user_content = APIBridge.create_engagement()
+ APIGitLab.git_clone_push(user_content)
+ token = "token " + APIBridge.login_user(user_content['el_email'])
+ user_content['session_token'] = token
+ return user_content
diff --git a/services/api/api_virtual_function.py b/services/api/api_virtual_function.py
new file mode 100644
index 0000000..46610e8
--- /dev/null
+++ b/services/api/api_virtual_function.py
@@ -0,0 +1,368 @@
+
+# ============LICENSE_START==========================================
+# org.onap.vvp/test-engine
+# ===================================================================
+# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+# ===================================================================
+#
+# Unless otherwise specified, all software contained herein is licensed
+# under the Apache License, Version 2.0 (the “License”);
+# you may not use this software except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
+#
+# Unless otherwise specified, all documentation contained herein is licensed
+# under the Creative Commons License, Attribution 4.0 Intl. (the “License”);
+# you may not use this documentation except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://creativecommons.org/licenses/by/4.0/
+#
+# Unless required by applicable law or agreed to in writing, documentation
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ============LICENSE_END============================================
+#
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+import datetime
+import json
+import time
+
+from django.conf import settings
+import requests
+
+from services.api.api_gitlab import APIGitLab
+from services.api.api_user import APIUser
+from services.constants import Constants, ServiceProvider
+from services.database.db_general import DBGeneral
+from services.helper import Helper
+from services.logging_service import LoggingServiceFactory
+
+
+logger = LoggingServiceFactory.get_logger()
+
+
+class APIVirtualFunction:
+
+ @staticmethod
+ def add_next_step(user_content, files=[]):
+ r1 = None
+ postURL = settings.ICE_EM_URL + '/v1/engmgr/engagements/' + \
+ user_content['engagement_uuid'] + '/nextsteps'
+ logger.debug("Post add next step URL: " + postURL)
+ headers = dict() # Create header for post request.
+ headers['Content-type'] = 'application/json'
+ headers['Authorization'] = user_content['session_token']
+ data = dict() # Create JSON data for post request.
+ files_list = list()
+ if type(files) is list:
+ for file in files:
+ files_list.append(file)
+ else:
+ files_list.append(files)
+ data['files'] = files_list
+ data['assigneesUuids'] = [user_content['uuid']]
+ data['duedate'] = str(datetime.date.today())
+ data['description'] = "API test - add next step."
+ list_data = []
+ list_data.append(data)
+ try:
+ r1 = requests.post(
+ postURL, json=list_data, headers=headers, verify=False)
+ Helper.internal_assert(r1.status_code, 200)
+ logger.debug("Next step was added to the engagement!")
+ ns_uuid = r1.json()
+ return ns_uuid[0]['uuid']
+ except:
+ if r1 is None:
+ logger.error(
+ "Failed to add next step to VF " + user_content['vfName'])
+ else:
+ logger.error("Failed to add next step to VF " + user_content[
+ 'vfName'] + ", see response >>> %s %s.\nContent: %s" % (r1.status_code, r1.reason, str(r1.content, 'utf-8')))
+ raise
+
+ @staticmethod
+ def create_vf(token):
+ r1 = None
+ postUrl = settings.EM_REST_URL + "vf/"
+ targetVersion = DBGeneral.select_from(
+ "uuid", "ice_deployment_target", 1)
+ ecompRelease = DBGeneral.select_from("uuid", "ice_ecomp_release", 1)
+ headers = dict() # Create header for post request.
+ headers['Content-type'] = 'application/json'
+ headers['Authorization'] = 'token ' + token
+ jdata = [{"virtual_function": Helper.rand_string("randomString"),
+ "version": Helper.rand_string("randomString") + Helper.rand_string("randomNumber"),
+ "target_lab_entry_date": time.strftime("%Y-%m-%d"),
+ "target_aic_uuid": targetVersion,
+ "ecomp_release": ecompRelease,
+ "is_service_provider_internal": False}]
+ try:
+ r1 = requests.post(
+ postUrl, json=jdata, headers=headers, verify=False)
+ Helper.internal_assert(r1.status_code, 200)
+ logger.debug("Virtual Function created successfully!")
+ content = r1.content[1:-1]
+ return content
+ except:
+ if r1 is None:
+ logger.debug("Failed to create VF >>> request failed!")
+ else:
+ logger.debug(
+ "Failed to create VF >>> %s %s \n %s" % (r1.status_code, r1.reason, str(r1.content, 'utf-8')))
+ raise
+
+ @staticmethod
+ def get_engagement(user_content):
+ r1 = None
+ postUrl = settings.EM_REST_URL + 'single-engagement/' + \
+ str(user_content['engagement_uuid'],)
+ headers = dict() # Create header for post request.
+ headers['Content-type'] = 'application/json'
+ headers['Authorization'] = user_content['session_token']
+ try:
+ r1 = requests.get(
+ postUrl, headers=headers, verify=False)
+ Helper.internal_assert(r1.status_code, 200)
+ logger.debug("Retrieved the Engagement successfully!")
+ content = r1.content
+ return json.loads(content)
+ except:
+ if r1 is None:
+ logger.debug(
+ "Failed to Retrieve the Engagement >>> request failed!")
+ else:
+ logger.debug(
+ "Failed to Retrieve the Engagement >>> %s %s \n %s" % (r1.status_code, r1.reason, str(r1.content, 'utf-8')))
+ raise
+
+ @staticmethod
+ def invite_team_member(user_content):
+ r1 = None
+ postURL = settings.ICE_EM_URL + '/v1/engmgr/invite-team-members/'
+ logger.debug("Post invite user URL: " + postURL)
+ headers = dict() # Create header for post request.
+ headers['Content-type'] = 'application/json'
+ headers['Authorization'] = user_content['session_token']
+ data = dict() # Create JSON data for post request.
+ data['email'] = Helper.rand_string(
+ 'randomString') + "@" + ServiceProvider.email
+ data['eng_uuid'] = user_content['engagement_uuid']
+ list_data = []
+ list_data.append(data)
+ try:
+ r1 = requests.post(
+ postURL, json=list_data, headers=headers, verify=False)
+ Helper.internal_assert_boolean(r1.status_code, 200)
+ logger.debug("Invite sent successfully to email " + data['email'])
+ invite_token = DBGeneral.select_where_and("invitation_token", "ice_invitation", "email", data[
+ 'email'], "engagement_uuid", user_content['engagement_uuid'], 1)
+ invite_url = settings.ICE_PORTAL_URL + "/#/signUp?invitation=" + invite_token + \
+ "&email=" + data['email']
+ logger.debug("Invitation URL is: " + invite_url)
+ return data['email'], invite_token, invite_url
+ except:
+ if r1 is None:
+ logger.error("Failed to invite team member.")
+ else:
+ logger.error(
+ "POST request failed to invite team member, see response >>> %s %s" % (r1.status_code, r1.reason))
+ raise
+
+ @staticmethod
+ def add_contact(user_content):
+ r1 = None
+ postURL = settings.ICE_EM_URL + '/v1/engmgr/add-contact/'
+ logger.debug("Post invite vendor contact URL: " + postURL)
+ headers = dict() # Create header for post request.
+ headers['Content-type'] = 'application/json'
+ headers['Authorization'] = user_content['session_token']
+ data = dict() # Create JSON data for post request.
+ data['company'] = user_content['vendor_uuid']
+ data['email'] = Helper.rand_string(
+ 'randomString') + "@" + ServiceProvider.email
+ data['eng_uuid'] = user_content['engagement_uuid']
+ data['full_name'] = Helper.rand_string('randomString')
+ data['phone_number'] = "+1201" + Helper.rand_string("randomNumber", 6)
+ try:
+ r1 = requests.post(
+ postURL, json=data, headers=headers, verify=False)
+ Helper.internal_assert_boolean(r1.status_code, 200)
+ logger.debug("Invite sent successfully to email " + data['email'])
+ invite_token = DBGeneral.select_where_and("invitation_token", "ice_invitation", "email", data[
+ 'email'], "engagement_uuid", user_content['engagement_uuid'], 1)
+ invite_url = settings.ICE_PORTAL_URL + "/#/signUp?invitation=" + invite_token + \
+ "&email=" + data['email'] + "&full_name=" + data['full_name'] + \
+ "&phone_number=" + \
+ data['phone_number'] + "&company=" + \
+ data['company'] + "&is_contact_user=true"
+ logger.debug("Invitation URL is: " + invite_url)
+ return data['email'], invite_token, invite_url
+ except:
+ if r1 is None:
+ logger.error("Failed to invite vendor contact.")
+ else:
+ logger.error("POST request failed to invite vendor contact, see response >>> %s %s \n %s" % (
+ r1.status_code, r1.reason, str(r1.content, 'utf-8')))
+ raise
+
+ @staticmethod
+ def edit_next_step(user_content, ns_uuid):
+ r1 = None
+ postURL = settings.ICE_EM_URL + '/v1/engmgr/nextsteps/' + ns_uuid + \
+ '/engagement/' + user_content['engagement_uuid'] + '/modify/'
+ logger.debug("Put next step URL: " + postURL)
+ headers = dict() # Create header for post request.
+ headers['Content-type'] = 'application/json'
+ headers['Authorization'] = user_content['session_token']
+ data = dict() # Create JSON data for post request.
+ data['files'] = []
+ data['assigneesUuids'] = [user_content['uuid']]
+ data['duedate'] = str(datetime.date.today())
+ data['description'] = "API edit next step test " + \
+ Helper.rand_string('randomString')
+ try:
+ r1 = requests.put(
+ postURL, json=data, headers=headers, verify=False)
+ Helper.internal_assert_boolean(r1.status_code, 202)
+ logger.debug("Next step was edited successfully!")
+ except:
+ if r1 is None:
+ logger.error("Failed to edit next step uuid: " + ns_uuid)
+ else:
+ logger.error("Failed to edit next step uuid: " + ns_uuid +
+ ", see response >>> %s %s" % (r1.status_code, r1.reason))
+ raise
+
+ @staticmethod
+ def get_export_dasboard_excel(token, keywords=""):
+ postUrl = settings.EM_REST_URL + \
+ "engagement/export/?stage=All&keyword=" + keywords
+ headers = {"Authorization": token}
+ r1 = requests.get(postUrl, headers=headers, verify=False)
+ Helper.internal_assert(r1.status_code, 200)
+ if (r1.status_code == 200):
+ logger.debug("APIUser activated successfully!")
+ return r1.content
+ else:
+ raise Exception(
+ "Failed to activate user >>> %s %s" % (r1.status_code, r1.reason))
+ return False
+
+ @staticmethod
+ def create_engagement(wait_for_gitlab=True):
+ user_content = APIUser.create_new_user()
+ APIUser.activate_user(
+ user_content['uuid'], user_content['user']['activation_token'])
+ token = APIUser.login_user(user_content['email'])
+ vf_content = json.loads(APIVirtualFunction.create_vf(token))
+ user_content['vfName'] = vf_content['name']
+ user_content['vf_uuid'] = vf_content['uuid']
+ user_content['target_aic'] = vf_content['deployment_target']['version']
+ # <-- ECOMP RELEASE
+ user_content['ecomp_release'] = vf_content[
+ 'ecomp_release']['name']
+ user_content['vnf_version'] = vf_content['version']
+ if(vf_content['vendor']['name'] == "AT&amp;T"):
+ user_content['vendor'] = "AT&T"
+ else:
+ user_content['vendor'] = vf_content['vendor']['name']
+ user_content['vendor_uuid'] = vf_content['vendor']['uuid']
+ user_content['engagement_manual_id'] = vf_content[
+ 'engagement']['engagement_manual_id']
+ user_content['target_lab_entry_date'] = vf_content[
+ 'target_lab_entry_date']
+ user_content['el_email'] = vf_content[
+ 'engagement']['reviewer']['email']
+ user_content['el_name'] = vf_content[
+ 'engagement']['reviewer']['full_name']
+ user_content['pr_email'] = vf_content[
+ 'engagement']['peer_reviewer']['email']
+ user_content['pr_name'] = vf_content[
+ 'engagement']['peer_reviewer']['full_name']
+ user_content['engagement_uuid'] = vf_content['engagement']['uuid']
+ user_content['session_token'] = 'token ' + token
+ user_content['engagement'] = vf_content['engagement']
+ user_content['vfStage'] = vf_content['engagement']['engagement_stage']
+
+ return user_content
+
+ @staticmethod
+ def set_eng_stage(user_content, requested_stage):
+ token = APIUser.login_user(user_content['el_email'])
+ r1 = None
+ putUrl = Constants.Default.URL.Engagement.SingleEngagement.TEXT + \
+ user_content['engagement_uuid'] + "/stage/" + str(requested_stage)
+ headers = dict() # Create header for post request.
+ headers['Content-type'] = 'application/json'
+ headers['Authorization'] = 'token ' + token
+ try:
+ r1 = requests.put(
+ putUrl, headers=headers, verify=False)
+ Helper.internal_assert(r1.status_code, 202)
+ logger.debug(
+ "Engagement stage was successfully changed to " + str(requested_stage) + "!")
+ content = r1.content[1:-1]
+ return content
+ except:
+ if r1 is None:
+ logger.debug("Failed to set eng stage >>> request failed!")
+ else:
+ logger.debug(
+ "Failed to set eng stage >>> %s %s \n %s" % (r1.status_code, r1.reason, str(r1.content, 'utf-8')))
+ raise
+
+ @staticmethod
+ def update_aic_version(eng_uuid, aic_version_uuid, session_token):
+ r1 = None
+ putURL = Constants.Default.URL.Engagement.EngagementOperations.TEXT + \
+ eng_uuid + '/deployment-targets/' + aic_version_uuid
+ logger.debug("Put next step URL: " + putURL)
+ headers = dict() # Create header for post request.
+ headers['Content-type'] = 'application/json'
+ headers['Authorization'] = session_token
+ try:
+ r1 = requests.put(putURL, headers=headers, verify=False)
+ Helper.internal_assert_boolean(r1.status_code, 200)
+ logger.debug("AIC version has changed!")
+ except:
+ if r1 is None:
+ msg = "Failed to edit AIC version"
+ else:
+ msg = "Failed to edit AIC version, see response >>> %s %s" % (
+ r1.status_code, r1.reason)
+ raise msg
+
+ @staticmethod
+ def update_ecomp_release(eng_uuid, ecomp_release_uuid, session_token):
+ r1 = None
+ putURL = Constants.Default.URL.Engagement.EngagementOperations.TEXT + \
+ eng_uuid + '/ecomp-releases/' + ecomp_release_uuid
+ logger.debug("Put next step URL: " + putURL)
+ headers = dict() # Create header for post request.
+ headers['Content-type'] = 'application/json'
+ headers['Authorization'] = session_token
+ try:
+ r1 = requests.put(putURL, headers=headers, verify=False)
+ Helper.internal_assert_boolean(r1.status_code, 200)
+ logger.debug("AIC version has changed!")
+ except:
+ if r1 is None:
+ msg = "Failed to update ECOMP release"
+ else:
+ msg = "Failed to update ECOMP release, see response >>> %s %s" % (
+ r1.status_code, r1.reason)
+ raise msg