diff options
author | 2017-09-28 10:03:40 -0700 | |
---|---|---|
committer | 2017-09-28 10:14:55 -0700 | |
commit | bd886d918ef2adbabd16c61fdd2e47984e21dfd7 (patch) | |
tree | d41683dffa58fd698df450d148fab3cc2521b0c5 /django/engagementmanager/tests | |
parent | 474554adad912f3edb7ddc3ad14406abb369fb3c (diff) |
initial seed code commit VVP-5
Change-Id: I6560c87ef48a6d0d1fe8197c7c6439c7e6ad653f
Signed-off-by: Paul McGoldrick <paul.mcgoldrick@att.com>
Diffstat (limited to 'django/engagementmanager/tests')
41 files changed, 7115 insertions, 0 deletions
diff --git a/django/engagementmanager/tests/__init__.py b/django/engagementmanager/tests/__init__.py new file mode 100755 index 0000000..1726c13 --- /dev/null +++ b/django/engagementmanager/tests/__init__.py @@ -0,0 +1,38 @@ +# +# ============LICENSE_START========================================== +# org.onap.vvp/engagementmgr +# =================================================================== +# Copyright © 2017 AT&T Intellectual Property. All rights reserved. +# =================================================================== +# +# Unless otherwise specified, all software contained herein is licensed +# under the Apache License, Version 2.0 (the “License”); +# you may not use this software except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# +# +# Unless otherwise specified, all documentation contained herein is licensed +# under the Creative Commons License, Attribution 4.0 Intl. (the “License”); +# you may not use this documentation except in compliance with the License. +# You may obtain a copy of the License at +# +# https://creativecommons.org/licenses/by/4.0/ +# +# Unless required by applicable law or agreed to in writing, documentation +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ============LICENSE_END============================================ +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. diff --git a/django/engagementmanager/tests/test_access_credentials.py b/django/engagementmanager/tests/test_access_credentials.py new file mode 100755 index 0000000..73d427c --- /dev/null +++ b/django/engagementmanager/tests/test_access_credentials.py @@ -0,0 +1,175 @@ +# +# ============LICENSE_START========================================== +# org.onap.vvp/engagementmgr +# =================================================================== +# Copyright © 2017 AT&T Intellectual Property. All rights reserved. +# =================================================================== +# +# Unless otherwise specified, all software contained herein is licensed +# under the Apache License, Version 2.0 (the “License”); +# you may not use this software except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# +# +# Unless otherwise specified, all documentation contained herein is licensed +# under the Creative Commons License, Attribution 4.0 Intl. (the “License”); +# you may not use this documentation except in compliance with the License. +# You may obtain a copy of the License at +# +# https://creativecommons.org/licenses/by/4.0/ +# +# Unless required by applicable law or agreed to in writing, documentation +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ============LICENSE_END============================================ +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. +from engagementmanager.tests.test_base_entity import TestBaseEntity +import json +from boto.s3.connection import S3Connection, OrdinaryCallingFormat +from django.conf import settings +from wheel.signatures import assertTrue +from django.utils import timezone +from engagementmanager.utils.constants import Constants +from engagementmanager.vm_integration import vm_client +from validationmanager.rados.rgwa_client import RGWAClient +from validationmanager.tests.test_rgwa_client_factory import TestRGWAClientFactory + + +class ActivateTestCase(TestBaseEntity): + + def childSetup(self): # Variables to use in this class. + self.s3_host = '10.252.0.21' # settings.AWS_S3_HOST + self.s3_port = 8080 # settings.AWS_S3_PORT + + self.urlStr = self.urlPrefix + "signup/" + self.createDefaultRoles() + uuid, vendor = self.creator.createVendor(Constants.service_provider_company_name) + self.activation_token_time = timezone.now() + self.activation_token_time = self.activation_token_time.replace( + 2012, 1, 2, 13, 48, 25) + print("This is the time that is going to be added to expiredTokenUser: " + + str(self.activation_token_time)) + self.user = self.creator.createUser(vendor, self.randomGenerator("email"), self.randomGenerator( + "randomNumber"), self.randomGenerator("randomString"), self.standard_user, True) + self.new_user = self.creator.createUser(vendor, self.randomGenerator("email"), self.randomGenerator( + "randomNumber"), self.randomGenerator("randomString"), self.standard_user, True) + print('-----------------------------------------------------') + print('Created User:') + print('UUID: ' + str(self.user.uuid)) + print('Full Name: ' + self.user.full_name) + print('-----------------------------------------------------') + self.params = '{"company":"' + str(self.user.company) + '","full_name":"' + self.user.full_name + '","email":"' + self.user.email + '","phone_number":"' + self.user.phone_number + \ + '","password":"' + self.user.user.password + '","regular_email_updates":"' + \ + str(self.user.regular_email_updates) + \ + '","is_service_provider_contact":"' + str(self.user.is_service_provider_contact) + '"}' + self.userToken = self.loginAndCreateSessionToken(self.user) + self.new_user_token = self.loginAndCreateSessionToken(self.new_user) + + def test_validate_rgwauser_created_after_Activation(self): + if settings.IS_SIGNAL_ENABLED: + vm_client.fire_event_in_bg( + 'send_create_user_in_rgwa_event', self.user) + rgwa = TestRGWAClientFactory.admin() + rgwa_user = rgwa.get_user(self.user.full_name) + if rgwa_user is None: + print("Test Failed!") + else: + access_key = rgwa_user['access_key'] + print("######access_key#################= ", access_key) + secret_key = rgwa_user['secret_key'] + print("######secret_key#################= ", secret_key) + self.assertTrue(access_key and secret_key != None) + print("#################################") + print("Test PASS!") + self.printTestName("Test ended") + + +# Whenever a new user is configured, we should create a RadosGW user for them. +# If unspecified, the access keys are generated on the server and returned here +# in the response. + def testCreateAndGetRgwaUser(self): + if settings.IS_SIGNAL_ENABLED: + base_url = 'http://{S3_HOST}:{S3_PORT}/admin'.format( + S3_HOST=self.s3_host, # settings.AWS_S3_HOST, + S3_PORT=self.s3_port, # settings.AWS_S3_PORT, + ) + admin_conn = RGWAClient(base_url) + print("base_url=" + base_url) + print("S3_HOST = " + self.s3_host) + print("s3_port =" + str(self.s3_port)) + print("admin_conn= ", admin_conn) + username = self.randomGenerator("randomString") + print("username", username) + new_user = admin_conn.create_user( + uid=username, display_name='User "%s"' % username) + print("new_user = " + str(new_user)) + self.assertTrue(new_user['user_id'] != None) + get_user = admin_conn.get_user(new_user['user_id']) + self.assertTrue(new_user['user_id'] == get_user['user_id']) + + def testCreateAndGetRgwaBucket(self): + if settings.IS_SIGNAL_ENABLED: + s3aws_access_key_id = settings.AWS_ACCESS_KEY_ID + s3aws_secret_access_key = settings.AWS_SECRET_ACCESS_KEY + print("s3aws_access_key_id=" + s3aws_access_key_id) + print("s3aws_secret_access_key=" + s3aws_secret_access_key) + print("S3_HOST = " + self.s3_host) + print("s3_port =" + str(self.s3_port)) + + boto_conn = S3Connection(host=self.s3_host, + port=self.s3_port, + aws_access_key_id=s3aws_access_key_id, + aws_secret_access_key=s3aws_secret_access_key, + calling_format=OrdinaryCallingFormat(), + is_secure=False, + ) + boto_conn.num_retries = 0 + bucketname = self.randomGenerator("randomString").lower() + new_bucket = boto_conn.create_bucket(bucketname) + assertTrue(new_bucket != None) + print("new_bucket = " + str(new_bucket)) + bucket = boto_conn.get_bucket(bucketname) + print("bucket = " + str(bucket)) + bucket_acl = bucket.get_acl() + print("bucket_acl = " + str(bucket_acl)) + + def testGetSecretKey(self): + vm_client.send_create_user_in_rgwa_event(self.user) + urlStr = self.urlPrefix + 'users/account/rgwa/' + print("urlStr of get secret key",urlStr) + self.printTestName("testGetSecretKey [Start]") + response = self.c.get(urlStr, data={}, content_type='application/json', + **{'HTTP_AUTHORIZATION': "token " + self.userToken}) + print('Got response : ' + str(response.status_code)) + dict_response = json.loads(response.content) + print("api response",dict_response) + self.assertTrue(dict_response["rgwa_secret_key"] is not None) + self.printTestName("testGetSecretKey [End]") + + def testNegativeGetSecretKeyInvalidToken(self): + vm_client.send_create_user_in_rgwa_event(self.user) + urlStr = self.urlPrefix + 'users/account/rgwa/' + print("urlStr of get secret key",urlStr) + self.printTestName("testGetSecretKey [Start]") + response = self.c.get(urlStr, data={}, content_type='application/json', + **{'HTTP_AUTHORIZATION': 'token' + self.new_user_token}) + print('Got response : ' + str(response.status_code)) + dict_response = json.loads(response.content) + print("api response",dict_response) + self.assertTrue(dict_response[ + "detail"] == 'You must authenticate in order to ' + + 'perform this action: Authentication credentials were not provided.') + self.printTestName("testGetSecretKey [End]") diff --git a/django/engagementmanager/tests/test_activation.py b/django/engagementmanager/tests/test_activation.py new file mode 100755 index 0000000..a845373 --- /dev/null +++ b/django/engagementmanager/tests/test_activation.py @@ -0,0 +1,173 @@ +# +# ============LICENSE_START========================================== +# org.onap.vvp/engagementmgr +# =================================================================== +# Copyright © 2017 AT&T Intellectual Property. All rights reserved. +# =================================================================== +# +# Unless otherwise specified, all software contained herein is licensed +# under the Apache License, Version 2.0 (the “License”); +# you may not use this software except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# +# +# Unless otherwise specified, all documentation contained herein is licensed +# under the Creative Commons License, Attribution 4.0 Intl. (the “License”); +# you may not use this documentation except in compliance with the License. +# You may obtain a copy of the License at +# +# https://creativecommons.org/licenses/by/4.0/ +# +# Unless required by applicable law or agreed to in writing, documentation +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ============LICENSE_END============================================ +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. +from django.utils import timezone + +from engagementmanager.tests.test_base_entity import TestBaseEntity +from engagementmanager.utils.constants import Constants + + +class ActivateTestCase(TestBaseEntity): + + def childSetup(self): # Variables to use in this class. + self.urlStr = self.urlPrefix + "signup/" + self.createDefaultRoles() + uuid, vendor = self.creator.createVendor(Constants.service_provider_company_name) + self.activation_token_time = timezone.now() + self.activation_token_time = self.activation_token_time.replace(2012, 1, 2, 13, 48, 25) + print("This is the time that is going to be added to expiredTokenUser: " + str(self.activation_token_time)) + self.user = self.creator.createUser(vendor, self.randomGenerator("email"), self.randomGenerator( + "randomNumber"), self.randomGenerator("randomString"), self.standard_user, False) + self.expiredTokenUser = self.creator.createUser(vendor, self.randomGenerator("email"), self.randomGenerator( + "randomNumber"), self.randomGenerator("randomString"), self.standard_user, False, activation_token_create_time=self.activation_token_time) + print('-----------------------------------------------------') + print('Created User:') + print('UUID: ' + str(self.user.uuid)) + print('Full Name: ' + self.user.full_name) + print('-----------------------------------------------------') + self.params = '{"company":"' + str(self.user.company) + '","full_name":"' + self.user.full_name + '","email":"' + self.user.email + '","phone_number":"' + self.user.phone_number + \ + '","password":"' + self.user.user.password + '","regular_email_updates":"' + \ + str(self.user.regular_email_updates) + '","is_service_provider_contact":"' + str(self.user.is_service_provider_contact) + '"}' + + def testActivation(self): + print("\n\n$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$") + print(" Test started: user activation") + print("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$\n\n") + for a in range(2): + print("###############################################") + print(" Before activation, Current User's activation mode: " + str(self.user.user.is_active)) + for a in range(2): + print("###############################################") + user_uuid = self.user.uuid + print(self.urlPrefix + 'activate/' + str(user_uuid) + '/' + str(self.user.user.activation_token)) + print("Activating through the activate_user function: (simulating a GET request)") + response = self.c.get(self.urlPrefix + 'activate/' + str(user_uuid) + + '/' + str(self.user.user.activation_token)) + print("Response: " + str(response.status_code)) + self.user.refresh_from_db() + for a in range(5): + print("******") + for a in range(2): + print("###############################################") + print(" Current User's activation mode: " + str(self.user.user.is_active)) + for a in range(2): + print("###############################################") + print("Current User's activation mode: " + str(self.user.user.is_active)) + if (self.user.user.is_active != True): + for a in range(2): + print("###############################################") + print(" User's activation failed: is_active != True..") + for a in range(2): + print("###############################################") + else: + for a in range(2): + print("#################################") + print(" User's is activated ") + for a in range(2): + print("#################################") + self.printTestName("Test ended") + + def testExpiredTokenActivation(self): + print("\n\n$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$") + print("Negative test started: Expired token activation") + print("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$\n\n") + for a in range(2): + print("###############################################") + print("Current User's activation mode: " + str(self.user.user.is_active)) + for a in range(2): + print("###############################################") + + urlStrToGET = self.urlPrefix + "users/" + user_uuid = self.expiredTokenUser.uuid + print("\n\nA user with was pre-initiated with old token creation time : 2012-01-02 13:48:25.299000+00:00\n\n") + response = self.c.get(self.urlPrefix + 'activate/' + str(user_uuid) + '/' + + str(self.expiredTokenUser.user.activation_token)) + self.user.refresh_from_db() + for a in range(2): + print("###############################################") + print(" Current User's activation mode: " + str(self.expiredTokenUser.user.is_active)) + for a in range(2): + print("###############################################") + print("\n\n") + if (self.expiredTokenUser.user.activation_token != True): + for a in range(2): + print("###############################################") + print("Test Success!") + for a in range(2): + print("###############################################") + else: + for a in range(2): + print("###############################################") + print("Test Failed!") + for a in range(2): + print("###############################################") + + def testUnMatchingTokenActivation(self): + print("\n\n$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$") + print("Negative test started: Un-matching token activation test") + print("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$\n\n") + for a in range(2): + print("###############################################") + print("Current User's activation mode: " + str(self.user.user.is_active)) + for a in range(2): + print("###############################################") + + urlStrToGET = self.urlPrefix + "users/" + user_uuid = self.expiredTokenUser.uuid + print("\n\n Trying to activate a user with an unmatching token") + response = self.c.get(self.urlPrefix + 'activate/' + str(self.user.uuid) + + '/' + str(self.expiredTokenUser.user.activation_token)) + self.user.refresh_from_db() + for a in range(2): + print("###############################################") + print(" Current User's activation mode: " + str(self.expiredTokenUser.user.is_active)) + for a in range(2): + print("###############################################") + print("\n\n") + if (self.expiredTokenUser.user.activation_token != True): + for a in range(2): + print("###############################################") + print("Test Success!") + for a in range(2): + print("###############################################") + else: + for a in range(2): + print("###############################################") + print("Test Failed!") + for a in range(2): + print("###############################################") diff --git a/django/engagementmanager/tests/test_activities.py b/django/engagementmanager/tests/test_activities.py new file mode 100755 index 0000000..9e83e9f --- /dev/null +++ b/django/engagementmanager/tests/test_activities.py @@ -0,0 +1,125 @@ +# +# ============LICENSE_START========================================== +# org.onap.vvp/engagementmgr +# =================================================================== +# Copyright © 2017 AT&T Intellectual Property. All rights reserved. +# =================================================================== +# +# Unless otherwise specified, all software contained herein is licensed +# under the Apache License, Version 2.0 (the “License”); +# you may not use this software except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# +# +# Unless otherwise specified, all documentation contained herein is licensed +# under the Creative Commons License, Attribution 4.0 Intl. (the “License”); +# you may not use this documentation except in compliance with the License. +# You may obtain a copy of the License at +# +# https://creativecommons.org/licenses/by/4.0/ +# +# Unless required by applicable law or agreed to in writing, documentation +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ============LICENSE_END============================================ +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. +from engagementmanager.bus.messages.activity_event_message import ActivityEventMessage +from engagementmanager.models import Vendor +from engagementmanager.utils.activities_data import UserJoinedEngagementActivityData +from engagementmanager.tests.test_base_entity import TestBaseEntity +from engagementmanager.utils.constants import Constants +from engagementmanager.apps import bus_service +from engagementmanager.service.logging_service import LoggingServiceFactory + +logger = LoggingServiceFactory.get_logger() + + +class ActivityTestCase(TestBaseEntity): + + def childSetup(self): + self.createVendors([Constants.service_provider_company_name, 'Other']) + + self.createDefaultRoles() + # Create a user with role el + vendor = Vendor.objects.get(name=Constants.service_provider_company_name) + self.el_user = self.creator.createUser(vendor, self.randomGenerator( + "main-vendor-email"), self.randomGenerator("randomNumber"), self.randomGenerator("randomString"), self.el, True) + vendor = Vendor.objects.get(name='Other') + self.user = self.creator.createUser(vendor, self.randomGenerator("email"), self.randomGenerator( + "randomNumber"), self.randomGenerator("randomString"), self.standard_user, True) + self.pruser = self.creator.createUser(vendor, self.randomGenerator("email"), self.randomGenerator( + "randomNumber"), self.randomGenerator("randomString"), self.standard_user, True) + + # Create an Engagement with team + self.engagement = self.creator.createEngagement(self.randomGenerator( + "randomString"), self.randomGenerator("randomString"), None) + self.engagement.engagement_team.add(self.user, self.el_user) + self.engagement.reviewer = self.el_user + self.engagement.peer_reviewer = self.pruser + self.engagement.save() + + # Create a VF + self.deploymentTarget = self.creator.createDeploymentTarget( + self.randomGenerator("randomString"), self.randomGenerator("randomString")) + self.vf = self.creator.createVF(self.randomGenerator("randomString"), + self.engagement, self.deploymentTarget, False, vendor) + self.token = self.loginAndCreateSessionToken(self.user) + + def testCreateActivity(self): + urlStr = self.urlPrefix + 'engagement/${uuid}/activities/' + logger.debug("Starting Activity & notification creation test") + + logger.debug("Starting activity test: User joined") + vendor = Vendor.objects.get(name='Other') + randomUser = self.creator.createUser(vendor, self.randomGenerator("email"), self.randomGenerator( + "randomNumber"), self.randomGenerator("randomString"), self.standard_user, True) + self.engagement.engagement_team.add(randomUser) + self.engagement.save() + + logger.debug( + "created a new user & added them to the engagement team, going to create the activity and consider it as a notification") + usersList = [] + usersList.append(randomUser) + activity_data = UserJoinedEngagementActivityData(self.vf, usersList, self.engagement) + bus_service.send_message(ActivityEventMessage(activity_data)) + logger.debug( + "activity & notification created successfully, please manually verify that an email was sent / MX server tried to send") + logger.debug("Ended activity test: User joined ") + + logger.debug("Starting pullActivities test") + response = self.c.get(urlStr.replace('${uuid}', str(self.engagement.uuid)), + **{'HTTP_AUTHORIZATION': "token " + self.token}) + content = response.content + status = response.status_code + logger.debug("Got response : " + str(status)) + logger.debug("Got content : " + str(content)) + if (status != 200): + logger.error("Got response : " + str(status) + " , wrong http response returned ") + self.assertEqual(response.status_code, 200) + logger.debug("Ended pullActivities test ") + + logger.debug("Starting activity test: delete user") + logger.debug("Verify that the 'User Joined' activity is deleted from the recent activities") + response = self.c.get(urlStr.replace('${uuid}', str(self.engagement.uuid)), + **{'HTTP_AUTHORIZATION': "token " + self.token}) + content = response.content + status = response.status_code + logger.debug("Got response : " + str(status)) + logger.debug("Got content : " + str(content)) + if (status != 200): + logger.error("Got response : " + str(status) + " , wrong http response returned ") + self.assertEqual(response.status_code, 200) + logger.debug("Ended activity test: delete user ") diff --git a/django/engagementmanager/tests/test_add_contact.py b/django/engagementmanager/tests/test_add_contact.py new file mode 100755 index 0000000..c1af553 --- /dev/null +++ b/django/engagementmanager/tests/test_add_contact.py @@ -0,0 +1,141 @@ +# +# ============LICENSE_START========================================== +# org.onap.vvp/engagementmgr +# =================================================================== +# Copyright © 2017 AT&T Intellectual Property. All rights reserved. +# =================================================================== +# +# Unless otherwise specified, all software contained herein is licensed +# under the Apache License, Version 2.0 (the “License”); +# you may not use this software except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# +# +# Unless otherwise specified, all documentation contained herein is licensed +# under the Creative Commons License, Attribution 4.0 Intl. (the “License”); +# you may not use this documentation except in compliance with the License. +# You may obtain a copy of the License at +# +# https://creativecommons.org/licenses/by/4.0/ +# +# Unless required by applicable law or agreed to in writing, documentation +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ============LICENSE_END============================================ +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. +import json +import random +from engagementmanager.tests.test_base_entity import TestBaseEntity +from engagementmanager.models import Vendor +from engagementmanager.utils.constants import Constants + + +class TestAddContactTestCase(TestBaseEntity): + + def childSetup(self): + + self.createVendors([Constants.service_provider_company_name, 'Other']) + + self.createDefaultRoles() + + # Create a user with role el + self.el_user = self.creator.createUser(Vendor.objects.get( + name=Constants.service_provider_company_name), self.randomGenerator( + "main-vendor-email"), '55501000199', 'el user', self.el, True) + + self.inviter = self.creator.createUser(Vendor.objects.get( + name='Other'), self.randomGenerator( + "main-vendor-email"), '55501000199', 'inviter user', self.standard_user, True) + + self.reviewer = self.creator.createUser(Vendor.objects.get( + name='Other'), self.randomGenerator( + "main-vendor-email"), '55501000199', 'reviewer user', self.el, True) + self.peer_reviewer = self.creator.createUser(Vendor.objects.get( + name='Other'), self.randomGenerator( + "main-vendor-email"), '55501000199', 'peer-reviewer user', self.el, True) + # Create an Engagement with team + self.engagement = self.creator.createEngagement( + '123456789', 'Validation', None) + self.engagement.reviewer = self.reviewer + self.engagement.peer_reviewer = self.peer_reviewer + self.engagement.save() + self.engagement.engagement_team.add(self.inviter, self.el_user) + print('-----------------------------------------------------') + print('Created Engagement:') + print('UUID: ' + str(self.engagement.uuid)) + print('-----------------------------------------------------') + + # Create a VF + self.deploymentTarget = self.creator.createDeploymentTarget( + self.randomGenerator("randomString"), self.randomGenerator("randomString")) + self.vf = self.creator.createVF(self.randomGenerator("randomString"), self.engagement, + self.deploymentTarget, False, Vendor.objects.get(name='Other')) + + self.urlStr = self.urlPrefix + "add-contact/" + self.data = dict() + self.token = self.loginAndCreateSessionToken(self.el_user) + + def initBody(self): + self.data['company'] = Vendor.objects.get(name=Constants.service_provider_company_name).name + self.data['full_name'] = "full name" + self.data['email'] = self.randomGenerator("main-vendor-email") + self.data['phone_number'] = "12345" + + def addContact(self, expectedStatus=200): + self.contactData = json.dumps(self.data, ensure_ascii=False) + response = self.c.post(self.urlStr, self.contactData, content_type='application/json', + **{'HTTP_AUTHORIZATION': "token " + self.token}) + print('Got response : ' + str(response.status_code)) + self.assertEqual(response.status_code, expectedStatus) + return response + + def createContactUser(self): + self.contact = self.creator.createUser(Vendor.objects.get( + name=Constants.service_provider_company_name), self.data['email'], + self.data['phone_number'], self.data['full_name'], self.standard_user, True) + print('-----------------------------------------------------') + print('Created User:') + print('UUID: ' + str(self.contact.uuid)) + print('Full Name: ' + self.contact.full_name) + print('-----------------------------------------------------') + + ### TESTS ### + + def testAddContactForNonExistingContact(self): + self.initBody() + self.data['eng_uuid'] = str(self.engagement.uuid) + self.data['email'] = self.randomGenerator("main-vendor-email") + self.addContact() + + def testAddContactForExistingContactAndEngUuid(self): + self.initBody() + self.createContactUser() + self.data['eng_uuid'] = str(self.engagement.uuid) + self.addContact(200) + + def testNegativeAddContactForNonExistingContact(self): + self.initBody() + self.data['eng_uuid'] = str(self.engagement.uuid) + self.data['email'] = None + print("Negative test: removing mandatory field email --> Should fail on 400") + self.addContact(400) + + def testNegativeAddContactForExistingContactAndFakeEngUUID(self): + self.initBody() + self.createContactUser() + self.data['eng_uuid'] = "FakeUuid" + print("Negative test: Non existing engagement UUID --> Should fail on 500") + self.addContact(401) diff --git a/django/engagementmanager/tests/test_add_feedback.py b/django/engagementmanager/tests/test_add_feedback.py new file mode 100755 index 0000000..33382aa --- /dev/null +++ b/django/engagementmanager/tests/test_add_feedback.py @@ -0,0 +1,105 @@ +# +# ============LICENSE_START========================================== +# org.onap.vvp/engagementmgr +# =================================================================== +# Copyright © 2017 AT&T Intellectual Property. All rights reserved. +# =================================================================== +# +# Unless otherwise specified, all software contained herein is licensed +# under the Apache License, Version 2.0 (the “License”); +# you may not use this software except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# +# +# Unless otherwise specified, all documentation contained herein is licensed +# under the Creative Commons License, Attribution 4.0 Intl. (the “License”); +# you may not use this documentation except in compliance with the License. +# You may obtain a copy of the License at +# +# https://creativecommons.org/licenses/by/4.0/ +# +# Unless required by applicable law or agreed to in writing, documentation +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ============LICENSE_END============================================ +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. +import json +import random + +from rest_framework.status import HTTP_200_OK, HTTP_400_BAD_REQUEST +from engagementmanager.tests.test_base_entity import TestBaseEntity +from engagementmanager.models import Vendor +from engagementmanager.utils.constants import Constants +from engagementmanager.utils.request_data_mgr import request_data_mgr + + +class TestAddContactTestCase(TestBaseEntity): + + def childSetup(self): + + self.createVendors([Constants.service_provider_company_name, 'Other']) + self.createDefaultRoles() + self.reviewer = self.creator.createUser(Vendor.objects.get( + name='Other'), self.randomGenerator("main-vendor-email"), '55501000199', 'reviewer user', self.el, True) + self.peer_reviewer = self.creator.createUser(Vendor.objects.get( + name='Other'), self.randomGenerator("main-vendor-email"), + '55501000199', 'peer-reviewer user', self.el, True) + # Create an Engagement with team + self.engagement = self.creator.createEngagement( + '123456789', 'Validation', None) + self.engagement.reviewer = self.reviewer + self.engagement.peer_reviewer = self.peer_reviewer + self.engagement.save() + print('-----------------------------------------------------') + print('Created Engagement:') + print('UUID: ' + str(self.engagement.uuid)) + print('-----------------------------------------------------') + + # Create a VF + self.deploymentTarget = self.creator.createDeploymentTarget( + self.randomGenerator("randomString"), self.randomGenerator("randomString")) + self.vf = self.creator.createVF(self.randomGenerator("randomString"), self.engagement, + self.deploymentTarget, False, Vendor.objects.get(name='Other')) + + self.urlStr = self.urlPrefix + "add-feedback/" + self.data = dict() + self.token = self.loginAndCreateSessionToken(self.reviewer) + + def initBody(self): + self.data['description'] = Vendor.objects.get( + name=Constants.service_provider_company_name).name + "ruslan gafiulin" + + def addFeedback(self, expectedStatus=HTTP_200_OK): + self.feedbackData = json.dumps(self.data, ensure_ascii=False) + response = self.c.post(self.urlStr, self.feedbackData, content_type='application/json', + **{'HTTP_AUTHORIZATION': "token " + self.token}) + print('Got response : ' + str(response.status_code)) + self.assertEqual(response.status_code, expectedStatus) + return response + + ### TESTS ### + + def testAddFeedbackForNonExistingContact(self): + self.initBody() + self.data['description'] = str(self.engagement.uuid) + self.data['eng_uuid'] = str(self.engagement.uuid) + self.addFeedback() + + def testNegativeAddFeedbackWithoutDescription(self): + self.initBody() + self.data['description'] = "" + self.data['eng_uuid'] = str(self.engagement.uuid) + self.addFeedback(HTTP_400_BAD_REQUEST) diff --git a/django/engagementmanager/tests/test_add_next_step_to_checklist.py b/django/engagementmanager/tests/test_add_next_step_to_checklist.py new file mode 100755 index 0000000..e88cddd --- /dev/null +++ b/django/engagementmanager/tests/test_add_next_step_to_checklist.py @@ -0,0 +1,157 @@ +# +# ============LICENSE_START========================================== +# org.onap.vvp/engagementmgr +# =================================================================== +# Copyright © 2017 AT&T Intellectual Property. All rights reserved. +# =================================================================== +# +# Unless otherwise specified, all software contained herein is licensed +# under the Apache License, Version 2.0 (the “License”); +# you may not use this software except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# +# +# Unless otherwise specified, all documentation contained herein is licensed +# under the Creative Commons License, Attribution 4.0 Intl. (the “License”); +# you may not use this documentation except in compliance with the License. +# You may obtain a copy of the License at +# +# https://creativecommons.org/licenses/by/4.0/ +# +# Unless required by applicable law or agreed to in writing, documentation +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ============LICENSE_END============================================ +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. +import json +from uuid import uuid4 + +from rest_framework.status import HTTP_200_OK, HTTP_401_UNAUTHORIZED + +from engagementmanager.models import Vendor +from engagementmanager.tests.test_base_entity import TestBaseEntity +from engagementmanager.utils.constants import Constants + + +class TestAddNextStepToChecklistTestCase(TestBaseEntity): + + def childSetup(self): + self.createVendors([Constants.service_provider_company_name, 'Other']) + self.vendor = Vendor.objects.get(name=Constants.service_provider_company_name) + self.createDefaultRoles() + + # Create a user with role el + self.el_user = self.creator.createUser(Vendor.objects.get( + name=Constants.service_provider_company_name), self.randomGenerator("main-vendor-email"), + '55501000199', 'el user', self.el, True) + print('-----------------------------------------------------') + print('Created User:') + print('UUID: ' + str(self.el_user.uuid)) + print('Full Name: ' + self.el_user.full_name) + print('-----------------------------------------------------') + + # Create a user with role el + self.peer_reviewer_user = self.creator.createUser(Vendor.objects.get( + name=Constants.service_provider_company_name), self.randomGenerator("main-vendor-email"), + '55501000199', 'peer-reviewer user', self.el, True) + print('-----------------------------------------------------') + print('Created User:') + print('UUID: ' + str(self.el_user.uuid)) + print('Full Name: ' + self.el_user.full_name) + print('-----------------------------------------------------') + + self.user2 = self.creator.createUser(Vendor.objects.get( + name='Other'), self.randomGenerator("main-vendor-email"), '55501000199', 'user', self.standard_user, True) + print('-----------------------------------------------------') + print('Created User:') + print('UUID: ' + str(self.user2.uuid)) + print('Full Name: ' + self.user2.full_name) + print('-----------------------------------------------------') + + # Create an Engagement with team + self.engagement = self.creator.createEngagement('123456789', 'Validation', None) + self.engagement.reviewer = self.el_user + self.engagement.engagement_team.add(self.el_user) + self.engagement.peer_reviewer = self.peer_reviewer_user + self.engagement.save() + print('-----------------------------------------------------') + print('Created Engagement:') + print('UUID: ' + str(self.engagement.uuid)) + print('-----------------------------------------------------') + + self.deploymentTarget = self.creator.createDeploymentTarget( + self.randomGenerator("randomString"), self.randomGenerator("randomString")) + self.vf = self.creator.createVF(self.randomGenerator("randomString"), + self.engagement, self.deploymentTarget, False, self.vendor) + print('-----------------------------------------------------') + print('Created VF:') + print('UUID: ' + str(self.vf.uuid)) + print('-----------------------------------------------------') + + self.template = self.creator.createDefaultCheckListTemplate() + self.data = dict() + self.data['checkListName'] = "ice checklist for test" + self.data['checkListTemplateUuid'] = str(self.template.uuid) + self.data['checkListAssociatedFiles'] = [] + for i in range(0, 2): + self.data['checkListAssociatedFiles'].append("file" + str(i)) + self.token = self.loginAndCreateSessionToken(self.el_user) + datajson = json.dumps(self.data, ensure_ascii=False) + self.clUrlStr = self.urlPrefix + "engagement/@eng_uuid/checklist/new/" + self.checklist = self.c.post(self.clUrlStr.replace("@eng_uuid", str(self.engagement.uuid)), + datajson, content_type='application/json', **{'HTTP_AUTHORIZATION': "token " + self.token}) + + self.urlStr = self.urlPrefix + "engagement/@eng_uuid/checklist/@checklist_uuid/nextstep/" + self.nextStepList = [] + + def initBody(self): + assigneesData = [str(self.el_user.uuid), str(self.user2.uuid)] + files = ["f1", "f2"] + self.data['assigneesUuids'] = assigneesData + self.data['description'] = "Good Bye Norma Jin" + self.data['files'] = files + self.data['duedate'] = "2016-12-01" + self.nextStepList.append(self.data) + + assigneesData = [str(self.el_user.uuid)] + files = ["f5"] + self.data['assigneesUuids'] = assigneesData + self.data['description'] = "Don't cry for me Argentina" + self.data['files'] = files + self.data['duedate'] = "2017-06-28" + self.nextStepList.append(self.data) + + def testAddNextStepForCheckListPositive(self): + self.initBody() + self.nextStepList = json.dumps(self.nextStepList, ensure_ascii=False) + self.urlStr = self.urlStr.replace("@checklist_uuid", json.loads(self.checklist.content)[ + 'uuid']).replace("@eng_uuid", str(self.engagement.uuid)) + response = self.c.post(self.urlStr, self.nextStepList, content_type='application/json', + **{'HTTP_AUTHORIZATION': "token " + self.token}) + print('Got response : ' + str(response.status_code)) + self.assertEqual(response.status_code, HTTP_200_OK) + return response + + def testAddNextStepForCheckListNegative(self): + self.initBody() + self.nextStepList = json.dumps(self.nextStepList, ensure_ascii=False) + self.urlStr = self.urlStr.replace("@checklist_uuid", json.loads( + self.checklist.content)['uuid']).replace("@eng_uuid", str(uuid4())) + response = self.c.post(self.urlStr, self.nextStepList, content_type='application/json', + **{'HTTP_AUTHORIZATION': "token " + self.token}) + print('------------------------------> Got response : ' + str(response.status_code)) + self.assertEqual(response.status_code, HTTP_401_UNAUTHORIZED) + return response diff --git a/django/engagementmanager/tests/test_audit_log_and_decision_api.py b/django/engagementmanager/tests/test_audit_log_and_decision_api.py new file mode 100755 index 0000000..42395ae --- /dev/null +++ b/django/engagementmanager/tests/test_audit_log_and_decision_api.py @@ -0,0 +1,375 @@ +# +# ============LICENSE_START========================================== +# org.onap.vvp/engagementmgr +# =================================================================== +# Copyright © 2017 AT&T Intellectual Property. All rights reserved. +# =================================================================== +# +# Unless otherwise specified, all software contained herein is licensed +# under the Apache License, Version 2.0 (the “License”); +# you may not use this software except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# +# +# Unless otherwise specified, all documentation contained herein is licensed +# under the Creative Commons License, Attribution 4.0 Intl. (the “License”); +# you may not use this documentation except in compliance with the License. +# You may obtain a copy of the License at +# +# https://creativecommons.org/licenses/by/4.0/ +# +# Unless required by applicable law or agreed to in writing, documentation +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ============LICENSE_END============================================ +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. +import json +from uuid import uuid4 +from rest_framework.status import HTTP_200_OK, HTTP_400_BAD_REQUEST,\ + HTTP_401_UNAUTHORIZED, HTTP_500_INTERNAL_SERVER_ERROR,\ + HTTP_405_METHOD_NOT_ALLOWED +from engagementmanager.models import Vendor, Checklist, ChecklistAuditLog, ChecklistDecision, ChecklistLineItem, ChecklistSection +from engagementmanager.tests.test_base_entity import TestBaseEntity +from engagementmanager.utils.constants import CheckListLineType, CheckListDecisionValue, CheckListState, Constants +from engagementmanager.service.logging_service import LoggingServiceFactory + +logger = LoggingServiceFactory.get_logger() + + +class AuditLogAndDecisionAPITest(TestBaseEntity): + + def childSetup(self): + + self.createVendors([Constants.service_provider_company_name, 'Amdocs', 'Other']) + self.createDefaultRoles() + + # Create a user with role el + self.el_user = self.creator.createUser(Vendor.objects.get( + name=Constants.service_provider_company_name), self.randomGenerator("main-vendor-email"), '12323245435', 'el user', self.el, True) + self.peer_reviewer = self.creator.createUser(Vendor.objects.get( + name='Other'), self.randomGenerator("main-vendor-email"), + '55501000199', 'peer-reviewer user', self.el, True) + # For negative tests + self.user = self.creator.createUser( + Vendor.objects.get(name=Constants.service_provider_company_name), + self.randomGenerator("main-vendor-email"), '12323245435', 'user', self.standard_user, True) + + self.template = self.creator.createDefaultCheckListTemplate() + self.engagement = self.creator.createEngagement( + uuid4(), 'Validation', None) + self.engagement.reviewer = self.el_user + self.engagement.peer_reviewer = self.peer_reviewer + self.engagement.save() + self.engagement.engagement_team.add(self.el_user, self.user) + self.clbodydata = dict() + self.initCLBody() + self.auditdata = dict() + self.checklist = Checklist.objects.create(uuid=uuid4(), name=self.clbodydata['checkListName'], validation_cycle=1, associated_files=self.clbodydata[ + 'checkListAssociatedFiles'], engagement=self.engagement, template=self.template, creator=self.el_user, owner=self.el_user) + self.section = ChecklistSection.objects.create(uuid=uuid4(), name=self.randomGenerator("randomString"), weight=1.0, description=self.randomGenerator( + "randomString"), validation_instructions=self.randomGenerator("randomString"), template=self.template) + self.line_item = ChecklistLineItem.objects.create(uuid=uuid4(), name=self.randomGenerator("randomString"), weight=1.0, description=self.randomGenerator( + "randomString"), line_type=CheckListLineType.auto.name, validation_instructions=self.randomGenerator("randomString"), template=self.template, section=self.section) # @UndefinedVariable + self.decision = ChecklistDecision.objects.create( + uuid=uuid4(), checklist=self.checklist, template=self.template, lineitem=self.line_item) + self.section.save() + self.line_item.save() + self.decision.save() + self.checklist.save() + self.token = self.loginAndCreateSessionToken(self.user) + self.ELtoken = self.loginAndCreateSessionToken(self.el_user) + + def initCLBody(self): + self.clbodydata['checkListName'] = "ice-checklist-for-test" + self.clbodydata['checkListTemplateUuid'] = str(self.template.uuid) + self.clbodydata[ + 'checkListAssociatedFiles'] = "[\"file0/f69f4ce7-51d5-409c-9d0e-ec6b1e79df28\", \"file1/f69f4ce7-51d5-409c-9d0e-ec6b1e79df28\", \"file2/f69f4ce7-51d5-409c-9d0e-ec6b1e79df28\"]" + + def loggerTestFailedOrSucceded(self, bool): + if bool: + logger.debug("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$") + logger.debug(" Test Succeeded") + logger.debug("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$\n\n") + else: + logger.debug("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$") + logger.debug("Test failed") + logger.debug("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$\n\n") + + def testCreateAuditLogViaChecklistPositive(self): + logger.debug("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$") + logger.debug(" Test started: Create AuditLog Via Checklist") + logger.debug("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$\n\n") + + self.urlStr = self.urlPrefix + "checklist/@cl_uuid/auditlog/" + + logger.debug("Creating a checklist") + self.auditdata['description'] = "description text" + datajson = json.dumps(self.auditdata, ensure_ascii=False) + + response = self.c.post(self.urlStr.replace("@cl_uuid", str(self.checklist.uuid)), datajson, + content_type='application/json', **{'HTTP_AUTHORIZATION': "token " + self.ELtoken}) + logger.debug('Got response : ' + str(response.status_code) + + " Expecting " + str(HTTP_200_OK)) + + check = True + try: + ChecklistAuditLog.objects.get(checklist=self.checklist) + except ChecklistAuditLog.DoesNotExist: + check = False + + if (response.status_code == HTTP_200_OK and check): + self.loggerTestFailedOrSucceded(True) + self.assertEqual(response.status_code, HTTP_200_OK) + else: + self.loggerTestFailedOrSucceded(False) + self.assertEqual(response.status_code, HTTP_200_OK) + + def testCreateAuditLogViaChecklistNegativeEmptyDescription(self): + logger.debug("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$") + logger.debug( + " Negative Test started: Create AuditLog Via Checklist with empty description") + logger.debug("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$\n\n") + + self.urlStr = self.urlPrefix + "checklist/@cl_uuid/auditlog/" + + logger.debug("Creating a checklist") + + self.auditdata['description'] = "" + datajson = json.dumps(self.auditdata, ensure_ascii=False) + + response = self.c.post(self.urlStr.replace("@cl_uuid", str(self.checklist.uuid)), datajson, + content_type='application/json', **{'HTTP_AUTHORIZATION': "token " + self.ELtoken}) + logger.debug('Got response : ' + str(response.status_code) + + " Expecting " + str(HTTP_400_BAD_REQUEST)) + + if (response.status_code == HTTP_400_BAD_REQUEST): + self.loggerTestFailedOrSucceded(True) + self.assertEqual(response.status_code, HTTP_400_BAD_REQUEST) + else: + self.loggerTestFailedOrSucceded(False) + self.assertEqual(response.status_code, HTTP_400_BAD_REQUEST) + + def testCreateAuditLogViaChecklistNegativeBadCLUuid(self): + logger.debug("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$") + logger.debug( + " Negative Test started: Create AuditLog Via Checklist with bad CL uuid") + logger.debug("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$\n\n") + + self.urlStr = self.urlPrefix + "checklist/@cl_uuid/auditlog/" + + logger.debug("Creating a checklist") + + self.auditdata['description'] = "description text" + datajson = json.dumps(self.auditdata, ensure_ascii=False) + + response = self.c.post(self.urlStr.replace("@cl_uuid", str(uuid4())), + datajson, content_type='application/json', **{'HTTP_AUTHORIZATION': "token " + self.token}) + logger.debug('Got response : ' + str(response.status_code) + + " Expecting " + str(HTTP_500_INTERNAL_SERVER_ERROR)) + + if (response.status_code == HTTP_500_INTERNAL_SERVER_ERROR): + self.loggerTestFailedOrSucceded(True) + else: + self.loggerTestFailedOrSucceded(False) + + self.assertEqual(response.status_code, HTTP_500_INTERNAL_SERVER_ERROR) + + def testCreateAuditLogViaDecisionPositive(self): + logger.debug("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$") + logger.debug(" Test started: Create AuditLog Via Decision") + logger.debug("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$\n\n") + + self.urlStr = self.urlPrefix + "checklist/decision/@decision_uuid/auditlog/" + + logger.debug("Creating a checklist") + + self.auditdata['description'] = "description text" + datajson = json.dumps(self.auditdata, ensure_ascii=False) + + response = self.c.post(self.urlStr.replace("@decision_uuid", str(self.decision.uuid)), datajson, + content_type='application/json', **{'HTTP_AUTHORIZATION': "token " + self.ELtoken}) + logger.debug('Got response : ' + str(response.status_code) + + " Expecting " + str(HTTP_200_OK)) + + check = ChecklistAuditLog.objects.get(decision=self.decision) + if (response.status_code == HTTP_200_OK and check): + self.loggerTestFailedOrSucceded(True) + else: + self.loggerTestFailedOrSucceded(False) + self.assertEqual(response.status_code, HTTP_200_OK) + + def testCreateAuditLogViaDecisionNegativeEmptyDescription(self): + logger.debug("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$") + logger.debug( + " Negative Test started: Create AuditLog Via Decision with empty description") + logger.debug("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$\n\n") + + self.urlStr = self.urlPrefix + "checklist/decision/@decision_uuid/auditlog/" + + logger.debug("Creating a checklist") + + self.auditdata['description'] = "" + datajson = json.dumps(self.auditdata, ensure_ascii=False) + + response = self.c.post(self.urlStr.replace("@decision_uuid", str(self.decision.uuid)), datajson, + content_type='application/json', **{'HTTP_AUTHORIZATION': "token " + self.ELtoken}) + logger.debug('Got response : ' + str(response.status_code) + + " Expecting " + str(HTTP_400_BAD_REQUEST)) + + if (response.status_code == HTTP_400_BAD_REQUEST): + self.loggerTestFailedOrSucceded(True) + self.assertEqual(response.status_code, HTTP_400_BAD_REQUEST) + else: + self.loggerTestFailedOrSucceded(False) + self.assertEqual(response.status_code, HTTP_400_BAD_REQUEST) + + def testCreateAuditLogViaDecisionNegativeBadCLUuid(self): + logger.debug("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$") + logger.debug( + " Negative Test started: Create AuditLog Via Decision with bad Decision uuid") + logger.debug("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$\n\n") + + self.urlStr = self.urlPrefix + "checklist/decision/@decision_uuid/auditlog/" + + logger.debug("Creating a checklist") + + self.auditdata['description'] = "description text" + datajson = json.dumps(self.auditdata, ensure_ascii=False) + + response = self.c.post(self.urlStr.replace("@decision_uuid", str(uuid4())), datajson, + content_type='application/json', **{'HTTP_AUTHORIZATION': "token " + self.ELtoken}) + logger.debug('Got response : ' + str(response.status_code) + + " Expecting " + str(HTTP_400_BAD_REQUEST)) + + if (response.status_code == HTTP_400_BAD_REQUEST): + self.loggerTestFailedOrSucceded(True) + self.assertEqual(response.status_code, HTTP_400_BAD_REQUEST) + else: + self.loggerTestFailedOrSucceded(False) + self.assertEqual(response.status_code, HTTP_401_UNAUTHORIZED) + + def testSetDecisionPositive(self): + logger.debug("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$") + logger.debug(" Test started: Set decision's value to approved") + logger.debug("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$\n\n") + + self.urlStr = self.urlPrefix + "checklist/decision/@decision_uuid" + + logger.debug("Creating a checklist") + + self.checklist.state = CheckListState.review.name # @UndefinedVariable + self.checklist.owner = self.el_user + self.checklist.save() + # @UndefinedVariable + self.auditdata['value'] = CheckListDecisionValue.approved.name + datajson = json.dumps(self.auditdata, ensure_ascii=False) + + response = self.c.put(self.urlStr.replace("@decision_uuid", str(self.decision.uuid)), datajson, + content_type='application/json', **{'HTTP_AUTHORIZATION': "token " + self.ELtoken}) + logger.debug('Got response : ' + str(response.status_code) + + " Expecting " + str(HTTP_200_OK)) + + check = ChecklistDecision.objects.get(checklist=self.checklist) + if (response.status_code == HTTP_200_OK and check.review_value == 'approved'): + self.loggerTestFailedOrSucceded(True) + self.assertEqual(response.status_code, HTTP_200_OK) + else: + self.loggerTestFailedOrSucceded(False) + self.assertEqual(response.status_code, HTTP_200_OK) + + def testSetDecisionNegativeOwnerDifferentThanUser(self): + logger.debug("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$") + logger.debug(" Negative Test started: Owner Different Than User") + logger.debug("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$\n\n") + + self.urlStr = self.urlPrefix + "checklist/decision/@decision_uuid" + + logger.debug("Creating a checklist") + + self.checklist.state = CheckListState.review.name # @UndefinedVariable + self.checklist.owner = self.user + self.checklist.save() + # @UndefinedVariable + self.auditdata['value'] = CheckListDecisionValue.approved.name + datajson = json.dumps(self.auditdata, ensure_ascii=False) + + response = self.c.put(self.urlStr.replace("@decision_uuid", str(self.decision.uuid)), datajson, + content_type='application/json', **{'HTTP_AUTHORIZATION': "token " + self.ELtoken}) + logger.debug('Got response : ' + str(response.status_code) + + " Expecting " + str(HTTP_401_UNAUTHORIZED)) + + if (response.status_code == HTTP_401_UNAUTHORIZED): + self.loggerTestFailedOrSucceded(True) + self.assertEqual(response.status_code, HTTP_401_UNAUTHORIZED) + else: + self.loggerTestFailedOrSucceded(False) + self.assertEqual(response.status_code, HTTP_401_UNAUTHORIZED) + + def testSetDecisionNegativeInvalidDecisionValue(self): + logger.debug("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$") + logger.debug("Negative Test started: Invalid Decision Value") + logger.debug("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$\n\n") + + self.urlStr = self.urlPrefix + "checklist/decision/@decision_uuid" + + logger.debug("Creating a checklist") + + self.checklist.state = CheckListState.review.name # @UndefinedVariable + self.checklist.owner = self.el_user + self.checklist.save() + self.auditdata['value'] = self.randomGenerator("randomString") + datajson = json.dumps(self.auditdata, ensure_ascii=False) + + response = self.c.put(self.urlStr.replace("@decision_uuid", str(self.decision.uuid)), datajson, + content_type='application/json', **{'HTTP_AUTHORIZATION': "token " + self.ELtoken}) + logger.debug('Got response : ' + str(response.status_code) + + " Expecting " + str(HTTP_400_BAD_REQUEST)) + + if (response.status_code == HTTP_400_BAD_REQUEST): + self.loggerTestFailedOrSucceded(True) + self.assertEqual(response.status_code, HTTP_400_BAD_REQUEST) + else: + self.loggerTestFailedOrSucceded(False) + self.assertEqual(response.status_code, HTTP_400_BAD_REQUEST) + + def testSetDecisionNegativeInvalidChecklistState(self): + logger.debug("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$") + logger.debug("Negative Test started: Invalid checklist state") + logger.debug("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$\n\n") + + self.urlStr = self.urlPrefix + "checklist/decision/@decision_uuid" + + logger.debug("Creating a checklist") + + self.checklist.state = self.randomGenerator( + "randomString") # @UndefinedVariable + self.checklist.owner = self.el_user + self.checklist.save() + # @UndefinedVariable + self.auditdata['value'] = CheckListDecisionValue.approved.name + datajson = json.dumps(self.auditdata, ensure_ascii=False) + + response = self.c.put(self.urlStr.replace("@decision_uuid", str(self.decision.uuid)), datajson, + content_type='application/json', **{'HTTP_AUTHORIZATION': "token " + self.ELtoken}) + logger.debug('Got response : ' + str(response.status_code) + + " Expecting " + str(HTTP_400_BAD_REQUEST)) + + if (response.status_code == HTTP_400_BAD_REQUEST): + self.loggerTestFailedOrSucceded(True) + self.assertEqual(response.status_code, HTTP_400_BAD_REQUEST) + else: + self.loggerTestFailedOrSucceded(False) + self.assertEqual(response.status_code, HTTP_405_METHOD_NOT_ALLOWED) diff --git a/django/engagementmanager/tests/test_auth_service.py b/django/engagementmanager/tests/test_auth_service.py new file mode 100755 index 0000000..b61de8d --- /dev/null +++ b/django/engagementmanager/tests/test_auth_service.py @@ -0,0 +1,244 @@ +# +# ============LICENSE_START========================================== +# org.onap.vvp/engagementmgr +# =================================================================== +# Copyright © 2017 AT&T Intellectual Property. All rights reserved. +# =================================================================== +# +# Unless otherwise specified, all software contained herein is licensed +# under the Apache License, Version 2.0 (the “License”); +# you may not use this software except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# +# +# Unless otherwise specified, all documentation contained herein is licensed +# under the Creative Commons License, Attribution 4.0 Intl. (the “License”); +# you may not use this documentation except in compliance with the License. +# You may obtain a copy of the License at +# +# https://creativecommons.org/licenses/by/4.0/ +# +# Unless required by applicable law or agreed to in writing, documentation +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ============LICENSE_END============================================ +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. +from engagementmanager.models import Vendor +from engagementmanager.service.authorization_service import AuthorizationService, Permissions +from engagementmanager.tests.test_base_entity import TestBaseEntity +from engagementmanager.utils.constants import Constants +from engagementmanager.utils.request_data_mgr import request_data_mgr + + +class TestAuthService(TestBaseEntity): + + def childSetup(self): + self.createVendors([Constants.service_provider_company_name, 'Other']) + + self.createDefaultRoles() + + self.admin_user = self.creator.createUser(Vendor.objects.get( + name=Constants.service_provider_company_name), Constants.service_provider_admin_mail, + '55501000199', 'admin user', self.admin, True) + print('-----------------------------------------------------') + print('Created User:') + print('UUID: ' + str(self.admin_user.uuid)) + print('Full Name: ' + self.admin_user.full_name) + print('-----------------------------------------------------') + + # Create a user with role el + self.el_user = self.creator.createUser(Vendor.objects.get( + name=Constants.service_provider_company_name), self.randomGenerator("main-vendor-email"), + '55501000199', 'el user', self.el, True) + print('-----------------------------------------------------') + print('Created User:') + print('UUID: ' + str(self.el_user.uuid)) + print('Full Name: ' + self.el_user.full_name) + print('-----------------------------------------------------') + + self.peer_reviewer = self.creator.createUser(Vendor.objects.get( + name=Constants.service_provider_company_name), self.randomGenerator("main-vendor-email"), + '55501000199', 'peer-reviewer user', self.el, True) + print('-----------------------------------------------------') + print('Created User:') + print('UUID: ' + str(self.peer_reviewer.uuid)) + print('Full Name: ' + self.peer_reviewer.full_name) + print('-----------------------------------------------------') + + # Create another EL + self.another_el_user = self.creator.createUser(Vendor.objects.get( + name=Constants.service_provider_company_name), self.randomGenerator("main-vendor-email"), + '55501000199', 'el user2', self.el, True) + print('-----------------------------------------------------') + print('Created User:') + print('UUID: ' + str(self.another_el_user.uuid)) + print('Full Name: ' + self.another_el_user.full_name) + print('-----------------------------------------------------') + + # Create a user with role standard_user + self.user = self.creator.createUser(Vendor.objects.get( + name='Other'), self.randomGenerator("main-vendor-email"), + '55501000199', 'user', self.standard_user, True) + print('-----------------------------------------------------') + print('Created User:') + print('UUID: ' + str(self.user.uuid)) + print('Full Name: ' + self.user.full_name) + print('-----------------------------------------------------') + + # Create a user with role standard_user with SSH key + self.user_with_ssh = self.creator.createUser(Vendor.objects.get( + name='Other'), self.randomGenerator("main-vendor-email"), + '55501000199', 'ssh user', self.standard_user, True, 'just-a-fake-ssh-key') + print('-----------------------------------------------------') + print('Created User:') + print('UUID: ' + str(self.user_with_ssh.uuid)) + print('Full Name: ' + self.user_with_ssh.full_name) + print('-----------------------------------------------------') + + # Create an Engagement with team + self.engagement = self.creator.createEngagement('just-a-fake-uuid', 'Validation', None) + self.engagement.engagement_team.add(self.user, self.el_user) + self.engagement.reviewer = self.el_user + self.engagement.peer_reviewer = self.peer_reviewer + self.engagement.save() + print('-----------------------------------------------------') + print('Created Engagement:') + print('UUID: ' + str(self.engagement.uuid)) + print('-----------------------------------------------------') + + # Create another Engagement with team with SSH Key + self.engagement_ssh = self.creator.createEngagement('just-another-fake-uuid', 'Validation', None) + self.engagement_ssh.engagement_team.add(self.user_with_ssh, self.el_user) + print('-----------------------------------------------------') + print('Created Engagement:') + print('UUID: ' + str(self.engagement_ssh.uuid)) + print('-----------------------------------------------------') + + # Create another Engagement with Main Contact + self.engagement_with_contact = self.creator.createEngagement('yet-just-another-fake-uuid', 'Validation', None) + self.engagement_with_contact.engagement_team.add(self.user_with_ssh, self.el_user) + self.engagement_with_contact.contact_user = self.user_with_ssh + print('-----------------------------------------------------') + print('Created Engagement:') + print('UUID: ' + str(self.engagement_with_contact.uuid)) + print('-----------------------------------------------------') + + # Create another Engagement with Main Contact + self.engagement_4_createNS = self.creator.createEngagement('yet-just-another-fake-uuid2', 'Validation', None) + self.engagement_4_createNS.engagement_team.add(self.user_with_ssh, self.el_user) + self.engagement_4_createNS.contact_user = self.user_with_ssh + print('-----------------------------------------------------') + print('Created Engagement:') + print('UUID: ' + str(self.engagement_4_createNS.uuid)) + print('-----------------------------------------------------') + self.token = self.loginAndCreateSessionToken(self.user) + self.sshtoken = self.loginAndCreateSessionToken(self.user_with_ssh) + self.ELtoken = self.loginAndCreateSessionToken(self.el_user) + + self.checklist_template = self.creator.createDefaultCheckListTemplate() + print('-----------------------------------------------------') + print('Created Check List Template') + print('UUID: ' + str(self.checklist_template.uuid)) + print('-----------------------------------------------------') + + self.checklist = self.creator.createCheckList('some-checklist', 'Automation', 1, '{}', self.engagement, + self.checklist_template, self.el_user, self.peer_reviewer) + print('-----------------------------------------------------') + print('Created Check List') + print('UUID: ' + str(self.checklist.uuid)) + print('-----------------------------------------------------') + + def testAuthorization(self): + auth = AuthorizationService() + + ###################### + # TEST EL PERMISSIONS + ###################### + # Test Add VF for EL + auth_result, message = auth.is_user_able_to(self.el_user, Permissions.add_vf, str(self.engagement.uuid), '') + print('ADD_VF Got Result : ' + str(auth_result) + ' ' + message) + self.assertEquals(auth_result, True) + + auth_result, message = auth.is_user_able_to(self.el_user, Permissions.add_vendor, '', '') + print('ADD_VENDOR Got Result : ' + message) + self.assertEquals(auth_result, True) + + # Check that EL that belong to ENG can create next step + auth_result, message = auth.is_user_able_to( + self.el_user, Permissions.add_nextstep, str(self.engagement.uuid), '') + print('ADD_NEXTSTEP Got Result : ' + message) + self.assertEquals(auth_result, True) + + # Check that EL that does not belong to ENG cannot create next step + auth_result, message = auth.is_user_able_to( + self.another_el_user, Permissions.add_nextstep, str(self.engagement.uuid), '') + print('ADD_NEXTSTEP Got Result : ' + message) + self.assertEquals(auth_result, False) + + # Check that CL can be created only by EL that belongs to ENG + auth_result, message = auth.is_user_able_to( + self.el_user, Permissions.add_checklist, str(self.engagement.uuid), '') + print('ADD_CHECKLIST Got Result : ' + message) + self.assertEquals(auth_result, True) + + # Check that CL can be created only by EL that belongs to ENG (use another el) + auth_result, message = auth.is_user_able_to( + self.another_el_user, Permissions.add_checklist, str(self.engagement.uuid), '') + print('ADD_CHECKLIST Got Result : ' + message) + self.assertEquals(auth_result, False) + + auth_result, message = auth.is_user_able_to(self.user, Permissions.add_checklist, str(self.engagement.uuid), '') + print('ADD_CHECKLIST Got Result : ' + message) + self.assertEquals(auth_result, False) + + # Check that only peer reviewer can do peer review + request_data_mgr.set_cl_uuid(str(self.checklist.uuid)) + auth_result, message = auth.is_user_able_to( + self.peer_reviewer, Permissions.peer_review_checklist, str(self.engagement.uuid), str(self.checklist.uuid)) + print('PEER_REVIEW_CHECKLIST Got Result : ' + message) + self.assertEquals(auth_result, True) + + # Check that a user that is not defined as peer review cannot review + auth_result, message = auth.is_user_able_to( + self.el_user, Permissions.peer_review_checklist, str(self.engagement.uuid), str(self.checklist.uuid)) + print('PEER_REVIEW_CHECKLIST Got Result : ' + message) + self.assertEquals(auth_result, False) + + # Check that admin which is not owner cannot approve CL + # Test is greyed out due to the fact that admin can approve any CL +# auth_result, message = auth.is_user_able_to(self.admin_user, Permissions.admin_approve_checklist, str(self.engagement.uuid), str(self.checklist.uuid)) +# print('ADMIN_APPROVE_CHECKLIST Got Result : ' + message) +# self.assertEquals(auth_result, False) + + # Check that only admin which is the cl owner can approve CL + self.checklist.owner = self.admin_user # Make admin the owner + self.checklist.save() + auth_result, message = auth.is_user_able_to( + self.admin_user, Permissions.admin_approve_checklist, str(self.engagement.uuid), str(self.checklist.uuid)) + print('ADMIN_APPROVE_CHECKLIST Got Result : ' + message) + self.assertEquals(auth_result, True) + + # Check that only admin can approve CL (attempt with regular EL) + auth_result, message = auth.is_user_able_to( + self.el_user, Permissions.admin_approve_checklist, str(self.engagement.uuid), str(self.checklist.uuid)) + print('ADMIN_APPROVE_CHECKLIST Got Result : ' + message) + self.assertEquals(auth_result, False) + + # Check that only admin can approve CL (attempt with regular user) + auth_result, message = auth.is_user_able_to( + self.user, Permissions.admin_approve_checklist, str(self.engagement.uuid), str(self.checklist.uuid)) + print('ADMIN_APPROVE_CHECKLIST Got Result : ' + message) + self.assertEquals(auth_result, False) diff --git a/django/engagementmanager/tests/test_base_entity.py b/django/engagementmanager/tests/test_base_entity.py new file mode 100755 index 0000000..a90be1f --- /dev/null +++ b/django/engagementmanager/tests/test_base_entity.py @@ -0,0 +1,319 @@ +# +# ============LICENSE_START========================================== +# org.onap.vvp/engagementmgr +# =================================================================== +# Copyright © 2017 AT&T Intellectual Property. All rights reserved. +# =================================================================== +# +# Unless otherwise specified, all software contained herein is licensed +# under the Apache License, Version 2.0 (the “License”); +# you may not use this software except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# +# +# Unless otherwise specified, all documentation contained herein is licensed +# under the Creative Commons License, Attribution 4.0 Intl. (the “License”); +# you may not use this documentation except in compliance with the License. +# You may obtain a copy of the License at +# +# https://creativecommons.org/licenses/by/4.0/ +# +# Unless required by applicable law or agreed to in writing, documentation +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ============LICENSE_END============================================ +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. +from abc import ABCMeta, abstractmethod +import http.client +import inspect +import re +import django +from django.conf import settings +from django.test import TestCase +from django.test.client import Client +import psycopg2 +from rest_framework.parsers import JSONParser +from wheel.signatures import assertTrue +django.setup() +from engagementmanager.tests.vvpEntitiesCreator import VvpEntitiesCreator +from engagementmanager.service.logging_service import LoggingServiceFactory + +logger = LoggingServiceFactory.get_logger() + + +class TestBaseEntity(TestCase): + __metaclass__ = ABCMeta + + def setUp(self): + logger.debug("---------------------- TestCase " + self.__class__.__name__ + " ----------------------") + self.urlPrefix = "/ice/v1/engmgr/" + self.conn = http.client.HTTPConnection("127.0.0.1", 8000) # @UndefinedVariable + self.c = Client() + self.creator = VvpEntitiesCreator() + settings.IS_SIGNAL_ENABLED = False + self.childSetup() + + def tearDown(self): + settings.IS_SIGNAL_ENABLED = True + self.conn.close() + logger.debug("---------------------- TestCase " + self.__class__.__name__ + " ---------------------- ") + logger.debug("") + logger.debug("") + + def createVendors(self, vendorList): + for vendor in vendorList: + vendorUuid, vendor = self.creator.createVendor(vendor) + logger.debug(vendorUuid) + + def createDefaultRoles(self): + # Create Default Roles if does not exist + self.admin, self.el, self.standard_user = self.creator.createAndGetDefaultRoles() + + def printTestName(self, testNameTrigger): + if testNameTrigger == "START": + logger.debug("[TestName: " + inspect.stack()[1][3] + " - START]") + elif testNameTrigger == "END": + logger.debug("[TestName: " + inspect.stack()[1][3] + " - END]") + + @abstractmethod + def childSetup(self): + pass + + def nativeConnect2Db(self): + return psycopg2.connect("dbname='icedb' user='iceuser' host='localhost' password='Aa123456' port='5433'") + + def deleteRecord(self, urlStr, getRecord, isNegativeTest): + logger.debug("DELETE: " + urlStr + "/" + getRecord) + self.conn.request("DELETE", urlStr + "/" + getRecord) + r1 = self.conn.getresponse() + logger.debug("DELETE response status code: " + str(r1.status)) +# logger.debug("Number of records in the table: " + self.getNumOfRecordViaRest(urlStr)) + if (isNegativeTest == False): + assertTrue(r1.status == 204) + return r1 + elif (isNegativeTest == True): + return r1 + + def createEntityViaPost(self, urlStr, params, headers=None): + logger.debug("POST: " + urlStr + " Body: " + params + " Headers: " + str(headers)) + self.conn.request("POST", urlStr, params, headers) + r1 = self.conn.getresponse() + return r1 + + def editEntityViaPut(self, urlStr, getRecord, params, isNegativeTest): + logger.debug("PUT: " + urlStr + "/" + getRecord + " " + params) + self.conn.request("PUT", urlStr + "/" + getRecord, params) + r1 = self.conn.getresponse() + logger.debug("PUT response status code: " + str(r1.status)) + if (isNegativeTest == False): + assertTrue(r1.status == 200) + return r1 + elif (isNegativeTest == True): + return r1 + + ''' + If you wish to "SELECT *" and get the first record then send queryFilterName=1 and queryFilterValue=1 to the method + ''' + + def filterTableByColAndVal(self, queryColumnName, queryTableName, queryFilterName, queryFilterValue): + dbConn = self.nativeConnect2Db() + cur = dbConn.cursor() + queryStr = "SELECT %s FROM %s WHERE %s ='%s'" % ( + queryColumnName, queryTableName, queryFilterName, queryFilterValue) + logger.debug(queryStr) + cur.execute(queryStr) + result = str(cur.fetchone()) + if (bool(re.search('[^0-9]', result)) == True): + logger.debug("Looks like result (" + result + + ") from DB is in a multi column pattern: [col1, col2,...,coln], omitting it all colm beside " + queryColumnName) + if (result.find("',)") != -1): # formatting strings e.g uuid + result = result.partition('\'')[-1].rpartition('\'')[0] + elif (result.find(",)") != -1): # formatting ints e.g id + result = result.partition('(')[-1].rpartition(',')[0] + logger.debug("Filter result: " + result) + return result + + def selectLastValue(self, queryColumnName, queryTableName, orderBy="id"): + dbConn = self.nativeConnect2Db() + cur = dbConn.cursor() + queryStr = "select %s from %s ORDER BY %s DESC LIMIT 1;" % (queryColumnName, queryTableName, orderBy) + logger.debug(queryStr) + cur.execute(queryStr) + result = str(cur.fetchone()) + if (bool(re.search('[^0-9]', result)) == True): + logger.debug("Looks like result (" + result + + ") from DB is in a multi column pattern: [col1, col2,...,coln], omitting it all colm beside " + queryColumnName) + if (result.find("',)") != -1): # formatting strings e.g uuid + result = result.partition('\'')[-1].rpartition('\'')[0] + elif (result.find(",)") != -1): # formatting ints e.g id + result = result.partition('(')[-1].rpartition(',')[0] + logger.debug("Filter result: " + result) + return result + + def columnMaxLength(self, tableName, columnName): + dbConn = self.nativeConnect2Db() + cur = dbConn.cursor() + queryStr = "SELECT character_maximum_length from information_schema.columns WHERE table_name ='%s' and column_name = '%s'" % ( + tableName, columnName) + cur.execute(queryStr) + result = str(cur.fetchone()) + if (bool(re.search('[^0-9]', result)) == True): + if (result.find("',)") != -1): # formatting strings e.g uuid + result = result.partition('\'')[-1].rpartition('\'')[0] + elif (result.find(",)") != -1): # formatting ints e.g id + result = result.partition('(')[-1].rpartition(',')[0] + return int(result) + + def getNumOfRecordViaRest(self, urlStr): + logger.debug("GET: " + urlStr) + self.conn.request("GET", urlStr) + data_list = JSONParser().parse(self.conn.getresponse()) + cnt = str(len(data_list)) + return cnt + + def getMaximalNameValue(self, urlStr): + logger.debug("GET: " + urlStr) + self.conn.request("GET", urlStr) + data_list = JSONParser().parse(self.conn.getresponse()) + maxName = 0 + for asi in data_list: + logger.debug(asi) + for key, value in asi.items(): + if (key == "name"): + if (maxName < int(value)): + maxName = int(value) + logger.debug("MaximalNameValue=" + str(maxName)) + return maxName + + def getRecordAndDeserilizeIt(self, urlStr, getFilter, serializer, isCreateObj): + logger.debug("GET: " + urlStr + "/" + getFilter) + try: + self.conn.request("GET", urlStr + "/" + getFilter) + data = JSONParser().parse(self.conn.getresponse()) + logger.debug("DATA After JSON Parse:" + str(data)) + ser = serializer(data=data) +# logger.debug(" --> Serializer data: "+repr(ser)) + if (isCreateObj == True): + logger.debug("Creating ...") + obj = ser.create(data) + else: + logger.debug("Updating ...") + obj = ser.create(data) + ser.update(obj, data) + except Exception: + logger.debug(Exception) + raise Exception + return obj + + def newParameters(self, oldValue, newValue, uuidValue): + newParams = re.sub(oldValue, newValue, self.params) # Find and replace in string. + newParams = re.sub("}", ', "uuid":"' + uuidValue + '"}', newParams) # Add UUID to params + return newParams + + def negativeTestPost(self, getFilter, allowTwice=False): + logger.debug("POST negative tests are starting now!") + if (allowTwice == False): + logger.debug("Negative 01: Insert the same record twice via REST") + r1 = self.createEntityViaPost(self.urlStr, self.params) + logger.debug(r1.status, r1.reason) + assertTrue(r1.status == 400) + self.deleteRecord(self.urlStr, getFilter, False) + logger.debug("Negative 02: Insert record with empty value via REST") + paramsEmptyValue = re.sub(r':\".*?\"', ':\"\"', self.params) # Create params with empty values. + r1 = self.createEntityViaPost(self.urlStr, paramsEmptyValue) + logger.debug(r1.status, r1.reason) + assertTrue(r1.status == 400) + logger.debug("Negative 03: Insert record with missing value via REST") + paramsMissingValue = re.sub(r'{\".*?\":', '{\"\":', self.params) # Create params without records part 1. + # Create params without records part 2. + paramsMissingValue = re.sub(r', \".*?\":', ', \"\":', paramsMissingValue) + r1 = self.createEntityViaPost(self.urlStr, paramsMissingValue) + logger.debug(r1.status, r1.reason) + assertTrue(r1.status == 400) + logger.debug("Negative 04: Insert record with more than " + str(self.longValueLength) + " chars via REST") + paramsLongValue = re.sub( + r':\".*?\"', ':\"' + str(self.randomGenerator("randomNumber", self.longValueLength)) + '\"', self.params) + r1 = self.createEntityViaPost(self.urlStr, paramsLongValue) + logger.debug(r1.status, r1.reason) + assertTrue(r1.status == 400) + if (allowTwice == True): + logger.debug("Deleting record from database...") + self.deleteRecord(self.urlStr, getFilter, False) + + def negativeTestPut(self, getFilter, params): + logger.debug("PUT negative tests are starting now!") + match = re.search("\D", getFilter) # Check if getFilter contains non-digit characters. + if not match: + logger.debug("Negative 01: Edit non existing record via REST") + nonExistingValue = self.randomGenerator("randomNumber", 5) + r1 = self.editEntityViaPut(self.urlStr, nonExistingValue, params, True) + logger.debug(r1.status, r1.reason) + assertTrue(r1.status == 404) + logger.debug("Negative 02: Edit record with string in URL instead of integer via REST") + randStr = self.randomGenerator("randomString") + r1 = self.editEntityViaPut(self.urlStr, randStr, params, True) + logger.debug(r1.status, r1.reason) + assertTrue(r1.status == 400) + else: + logger.debug("Negative 01: Edit non existing record via REST") + nonExistingValue = self.randomGenerator("randomString") + r1 = self.editEntityViaPut(self.urlStr, nonExistingValue, params, True) + logger.debug(r1.status, r1.reason) + assertTrue(r1.status == 404) + logger.debug("Negative 02: Edit record without URL pointer via REST") + r1 = self.editEntityViaPut(self.urlStr, "", params, True) + logger.debug(r1.status, r1.reason) + assertTrue(r1.status == 405) + logger.debug("Negative 03: Edit record with empty value via REST") + paramsEmptyValue = re.sub(r':\".*?\"', ':\"\"', params) # Create params with empty values. + r1 = self.editEntityViaPut(self.urlStr, getFilter, paramsEmptyValue, True) + logger.debug(r1.status, r1.reason) + assertTrue(r1.status == 400) + logger.debug("Negative 04: Edit record with missing value via REST") + paramsMissingValue = re.sub(r'{\".*?\":', '{\"\":', params) # Create params without records part 1. + # Create params without records part 2. + paramsMissingValue = re.sub(r', \".*?\":', ', \"\":', paramsMissingValue) + r1 = self.editEntityViaPut(self.urlStr, getFilter, paramsMissingValue, True) + logger.debug(r1.status, r1.reason) + assertTrue(r1.status == 400) + logger.debug("Deleting record from database...") + self.deleteRecord(self.urlStr, getFilter, False) + + def negativeTestDelete(self): + logger.debug("DELETE negative tests are starting now!") + logger.debug("Negative 01: Delete non existing record via REST or wrong URL pointer") + nonExistingValue = self.randomGenerator("randomString") + r1 = self.deleteRecord(self.urlStr, nonExistingValue, True) + logger.debug(r1.status, r1.reason) + assertTrue(r1.status == 404 or r1.status == 500 or r1.status == 400) + logger.debug("Negative 02: Delete record without URL pointer via REST") + r1 = self.deleteRecord(self.urlStr, "", True) + logger.debug(r1.status, r1.reason) + assertTrue(r1.status == 405) + + def failTest(self, failureReason): + logger.debug(">>> Test failed!\n", failureReason) + self.assertTrue(False) + + def failRestTest(self, r1): + logger.debug("Previous HTTP POST request has failed with " + str(r1.status)) + self.assertTrue(False) + + def randomGenerator(self, typeOfValue, numberOfDigits=0): + return self.creator.randomGenerator(typeOfValue, numberOfDigits) + + def loginAndCreateSessionToken(self, user): + return self.creator.loginAndCreateSessionToken(user) diff --git a/django/engagementmanager/tests/test_base_transaction_entity.py b/django/engagementmanager/tests/test_base_transaction_entity.py new file mode 100755 index 0000000..c4a5159 --- /dev/null +++ b/django/engagementmanager/tests/test_base_transaction_entity.py @@ -0,0 +1,76 @@ +# +# ============LICENSE_START========================================== +# org.onap.vvp/engagementmgr +# =================================================================== +# Copyright © 2017 AT&T Intellectual Property. All rights reserved. +# =================================================================== +# +# Unless otherwise specified, all software contained herein is licensed +# under the Apache License, Version 2.0 (the “License”); +# you may not use this software except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# +# +# Unless otherwise specified, all documentation contained herein is licensed +# under the Creative Commons License, Attribution 4.0 Intl. (the “License”); +# you may not use this documentation except in compliance with the License. +# You may obtain a copy of the License at +# +# https://creativecommons.org/licenses/by/4.0/ +# +# Unless required by applicable law or agreed to in writing, documentation +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ============LICENSE_END============================================ +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. +from abc import ABCMeta, abstractmethod +import http.client +import django +from django.conf import settings +from django.test.client import Client +from django.test.testcases import TransactionTestCase +from engagementmanager.tests.vvpEntitiesCreator import VvpEntitiesCreator +from engagementmanager.service.logging_service import LoggingServiceFactory +django.setup() +logger = LoggingServiceFactory.get_logger() + + +class TestBaseTransactionEntity(TransactionTestCase): + __metaclass__ = ABCMeta + + def setUp(self): + logger.debug("---------------------- TransactionTestCase " + self.__class__.__name__ + " ----------------------") + self.urlPrefix = "/ice/v1/engmgr/" + self.conn = http.client.HTTPConnection("127.0.0.1", 8000) # @UndefinedVariable + self.c = Client() + self.creator = VvpEntitiesCreator() + settings.IS_SIGNAL_ENABLED = False + self.childSetup() + + def tearDown(self): + settings.IS_SIGNAL_ENABLED = True + self.conn.close() + logger.debug("---------------------- TransactionTestCase " + self.__class__.__name__ + " ---------------------- ") + + @abstractmethod + def childSetup(self): + pass + + def randomGenerator(self, typeOfValue, numberOfDigits=0): + return self.creator.randomGenerator(typeOfValue, numberOfDigits) + + def loginAndCreateSessionToken(self, user): + return self.creator.loginAndCreateSessionToken(user) diff --git a/django/engagementmanager/tests/test_checklist.py b/django/engagementmanager/tests/test_checklist.py new file mode 100755 index 0000000..a7fa800 --- /dev/null +++ b/django/engagementmanager/tests/test_checklist.py @@ -0,0 +1,177 @@ +# +# ============LICENSE_START========================================== +# org.onap.vvp/engagementmgr +# =================================================================== +# Copyright © 2017 AT&T Intellectual Property. All rights reserved. +# =================================================================== +# +# Unless otherwise specified, all software contained herein is licensed +# under the Apache License, Version 2.0 (the “License”); +# you may not use this software except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# +# +# Unless otherwise specified, all documentation contained herein is licensed +# under the Creative Commons License, Attribution 4.0 Intl. (the “License”); +# you may not use this documentation except in compliance with the License. +# You may obtain a copy of the License at +# +# https://creativecommons.org/licenses/by/4.0/ +# +# Unless required by applicable law or agreed to in writing, documentation +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ============LICENSE_END============================================ +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. +import json +from uuid import uuid4 + +from rest_framework.status import HTTP_200_OK, HTTP_401_UNAUTHORIZED, HTTP_400_BAD_REQUEST + +from engagementmanager.models import ChecklistTemplate, ChecklistLineItem +from engagementmanager.models import Vendor +from engagementmanager.tests.test_base_entity import TestBaseEntity +from engagementmanager.utils.constants import Constants + + +class TestChecklistTestCase(TestBaseEntity): + + def childSetup(self): + + self.createVendors([Constants.service_provider_company_name, 'Amdocs', 'Other']) + self.createDefaultRoles() + + # Create a user with role el + self.el_user = self.creator.createUser(Vendor.objects.get( + name=Constants.service_provider_company_name), self.randomGenerator("main-vendor-email"), + '55501000199', 'el user', self.el, True) + self.peer_reviewer = self.creator.createUser(Vendor.objects.get( + name='Other'), self.randomGenerator("main-vendor-email"), + '55501000199', 'peer-reviewer user', self.el, True) + # For negative tests + self.user = self.creator.createUser(Vendor.objects.get( + name=Constants.service_provider_company_name), self.randomGenerator("main-vendor-email"), + '55501000199', 'user', self.standard_user, True) + self.urlStr = self.urlPrefix + "engagement/@eng_uuid/checklist/new/" + self.data = dict() + self.template = self.creator.createDefaultCheckListTemplate() +# self.engagement = Engagement.objects.create(uuid='just-a-fake-uuid',engagement_stage='Validation') + self.engagement = self.creator.createEngagement( + uuid4(), 'Validation', None) + self.engagement.reviewer = self.el_user + self.engagement.peer_reviewer = self.peer_reviewer + self.engagement.engagement_team.add(self.user) + self.engagement.engagement_team.add(self.el_user) + self.engagement.save() + self.vendor = Vendor.objects.get(name='Other') + self.deploymentTarget = self.creator.createDeploymentTarget( + self.randomGenerator("randomString"), self.randomGenerator("randomString")) + self.vf = self.creator.createVF(self.randomGenerator("randomString"), + self.engagement, self.deploymentTarget, False, self.vendor) + + def initBody(self): + self.data['checkListName'] = "ice checklist for test" + self.data['checkListTemplateUuid'] = str(self.template.uuid) + self.data['checkListAssociatedFiles'] = list() + self.data['checkListAssociatedFiles'].append("file0") + self.data['checkListAssociatedFiles'].append("file1") + self.data['checkListAssociatedFiles'].append("file2") + + def getOrCreateChecklist(self, expectedStatus=HTTP_200_OK, httpMethod="GET"): + if (httpMethod == "GET"): + response = self.c.get(self.urlStr.replace("@eng_uuid", str(self.engagement.uuid)), + **{'HTTP_AUTHORIZATION': "token " + self.token}) + elif (httpMethod == "POST"): + datajson = json.dumps(self.data, ensure_ascii=False) + response = self.c.post(self.urlStr.replace("@eng_uuid", str(self.engagement.uuid)), datajson, + content_type='application/json', **{'HTTP_AUTHORIZATION': "token " + self.token}) + + print('Got response : ' + str(response.status_code) + + " Expecting " + str(expectedStatus)) + self.assertEqual(response.status_code, expectedStatus) + return response + + def testGetChecklistPositive(self): + self.token = self.loginAndCreateSessionToken(self.el_user) + print("testGetChecklistPositive") + self.getOrCreateChecklist(HTTP_200_OK) + + def testGetChecklistNegativeNoEng(self): + self.token = self.loginAndCreateSessionToken(self.el_user) + self.engagement.uuid = uuid4() + print("testGetChecklistPositive") + self.getOrCreateChecklist(HTTP_401_UNAUTHORIZED) +# + + def testPostChecklistPositive(self): + self.initBody() + self.token = self.loginAndCreateSessionToken(self.el_user) + print("testPostChecklistPositive") + self.getOrCreateChecklist(HTTP_200_OK, "POST") + + def testPostChecklistNegativeNotElUser(self): + self.initBody() + self.token = self.loginAndCreateSessionToken(self.user) + print("testPostChecklistNegativeNotElUser") + self.getOrCreateChecklist(HTTP_401_UNAUTHORIZED, "POST") + + def testPostChecklistNegativeNoEng(self): + self.initBody() + self.token = self.loginAndCreateSessionToken(self.el_user) + self.engagement.uuid = uuid4() + print("testPostChecklistNegativeNoEng") + self.getOrCreateChecklist(HTTP_401_UNAUTHORIZED, "POST") + + def testPostChecklistNegativeMissingParam(self): + self.token = self.loginAndCreateSessionToken(self.el_user) + print("testPostChecklistNegativeMissingParam") + self.getOrCreateChecklist(HTTP_400_BAD_REQUEST, "POST") + + def testPostTestEngineDecisions(self): + self.initBody() + self.token = self.loginAndCreateSessionToken(self.el_user) + print("testGetChecklistPositive") + response = self.getOrCreateChecklist(HTTP_200_OK, "POST") + + checklist = json.loads(response.content) + self.urlStr = self.urlPrefix + "checklist/@checklist_uuid/testengine/" + + # ChecklistLineItem.objects.get(uuid=decision['line_item_id']); + + print(checklist['template']['uuid']) + template = ChecklistTemplate.objects.get( + uuid=checklist['template']['uuid']) + print(template) + line_items = ChecklistLineItem.objects.filter(template=template) + for line_item in line_items: + print(line_item) + + return + self.data = dict() + self.data['decisions'] = "{123}" + datajson = json.dumps(self.data, ensure_ascii=False) + response = self.c.get(self.urlStr.replace("@checklist_uuid", str(checklist['uuid'])), + content_type='application/json', **{'HTTP_AUTHORIZATION': "token " + self.token}) + + print(json.loads(response.content)) + + return + + self.data = dict() + self.data['decisions'] = "{123}" + datajson = json.dumps(self.data, ensure_ascii=False) + response = self.c.post(self.urlStr.replace("@checklist_uuid", str(checklist['uuid'])), datajson, + content_type='application/json', **{'HTTP_AUTHORIZATION': "token " + self.token}) diff --git a/django/engagementmanager/tests/test_checklist_template.py b/django/engagementmanager/tests/test_checklist_template.py new file mode 100755 index 0000000..c3c63e4 --- /dev/null +++ b/django/engagementmanager/tests/test_checklist_template.py @@ -0,0 +1,168 @@ +# +# ============LICENSE_START========================================== +# org.onap.vvp/engagementmgr +# =================================================================== +# Copyright © 2017 AT&T Intellectual Property. All rights reserved. +# =================================================================== +# +# Unless otherwise specified, all software contained herein is licensed +# under the Apache License, Version 2.0 (the “License”); +# you may not use this software except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# +# +# Unless otherwise specified, all documentation contained herein is licensed +# under the Creative Commons License, Attribution 4.0 Intl. (the “License”); +# you may not use this documentation except in compliance with the License. +# You may obtain a copy of the License at +# +# https://creativecommons.org/licenses/by/4.0/ +# +# Unless required by applicable law or agreed to in writing, documentation +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ============LICENSE_END============================================ +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. +import json + +import mock +from rest_framework.status import HTTP_200_OK, HTTP_404_NOT_FOUND,\ + HTTP_500_INTERNAL_SERVER_ERROR + +from engagementmanager.models import Vendor +from engagementmanager.tests.test_base_entity import TestBaseEntity +from engagementmanager.utils.constants import Constants + + +def dummy_true(): + return True + + +@mock.patch('engagementmanager.service.checklist_service.CheckListSvc.decline_all_template_checklists', dummy_true) +class TestChecklistTestCase(TestBaseEntity): + + def childSetup(self): + + self.createVendors([Constants.service_provider_company_name, 'Amdocs', 'Other']) + self.createDefaultRoles() + # Create a user with role el + self.el_user = self.creator.createUser( + Vendor.objects.get(name=Constants.service_provider_company_name), self.randomGenerator("main-vendor-email"), + '55501000199', 'el user', self.el, True) + self.admin_user = self.creator.createUser(Vendor.objects.get(name=Constants.service_provider_company_name), + Constants.service_provider_admin_mail, '55501000199', + 'admin user', self.admin, True) + # For negative tests + self.user = self.creator.createUser( + Vendor.objects.get(name=Constants.service_provider_company_name), self.randomGenerator("main-vendor-email"), + '55501000199', 'user', self.standard_user, True) + self.urlStrForSave = self.urlPrefix + "checklist/template/" + self.urlStrForGetTmpl = self.urlPrefix + "checklist/template/@template_uuid" + self.urlStrForGetTmpls = self.urlPrefix + "checklist/templates/" + self.data = dict() + self.template = self.creator.createDefaultCheckListTemplate() + self.engagement = self.creator.createEngagement('just-a-fake-uuid', 'Validation', None) + self.engagement.reviewer = self.el_user + self.engagement.peer_reviewer = self.el_user + self.engagement.engagement_team.add(self.user) + self.engagement.engagement_team.add(self.el_user) + self.engagement.save() + self.vendor = Vendor.objects.get(name='Other') + self.deploymentTarget = self.creator.createDeploymentTarget( + self.randomGenerator("randomString"), self.randomGenerator("randomString")) + self.vf = self.creator.createVF( + self.randomGenerator("randomString"), self.engagement, self.deploymentTarget, False, self.vendor) + + def initBody(self): + self.data['uuid'] = str(self.template.uuid) + self.data['name'] = self.template.name + self.data['sections'] = list() + li1 = dict() + li1["uuid"] = "newEntity" + li1["name"] = "li1_name" + li1["description"] = "li1_description" + li1["validation_instructions"] = "li1_validation_instructions" + + sec1 = dict() + sec1["uuid"] = "newEntity" + sec1["name"] = "sec1_name" + sec1["description"] = "sec1_description" + sec1["validation_instructions"] = "sec1_validation_instructions" + + sec1["lineItems"] = list() + sec1["lineItems"].append(li1) + + self.data['sections'].append(sec1) + print(self.data) + + def getOrCreateChecklistTemplate(self, urlStr, expectedStatus=HTTP_200_OK, httpMethod="GET"): + if (httpMethod == "GET"): + if (urlStr == self.urlStrForGetTmpls): + response = self.c.get(urlStr, + **{'HTTP_AUTHORIZATION': "token " + self.token}) + else: + response = self.c.get(urlStr.replace("@template_uuid", str(self.template.uuid)), + **{'HTTP_AUTHORIZATION': "token " + self.token}) + elif (httpMethod == "PUT"): + datajson = json.dumps(self.data, ensure_ascii=False) + response = self.c.put( + urlStr, datajson, content_type='application/json', **{'HTTP_AUTHORIZATION': "token " + self.token}) + print('Got response : ' + str(response.status_code) + " Expecting " + str(expectedStatus)) + print('Got response : ' + str(response.status_code) + " Expecting " + str(expectedStatus)) + self.assertEqual(response.status_code, expectedStatus) + return response + + ### TESTS ### + def testSaveChecklistTemplate(self): + self.initBody() + self.token = self.loginAndCreateSessionToken(self.admin_user) + print("testSaveChecklistTemplate") + self.getOrCreateChecklistTemplate(self.urlStrForSave, httpMethod="PUT") + + def testSaveChecklistTemplateMissingTemplateUuid(self): + self.initBody() + self.token = self.loginAndCreateSessionToken(self.admin_user) + self.data['uuid'] = "" + print("testSaveChecklistTemplateMissingTemplateUuid") + self.getOrCreateChecklistTemplate(self.urlStrForSave, expectedStatus=HTTP_404_NOT_FOUND, httpMethod="PUT") + + def testSaveChecklistTemplateNotExistingTemplate(self): + self.initBody() + self.token = self.loginAndCreateSessionToken(self.admin_user) + print("testSaveChecklistTemplateNoExistanceTemplate") + self.data['uuid'] = 'fake_uuid' + self.getOrCreateChecklistTemplate(self.urlStrForSave, expectedStatus=HTTP_404_NOT_FOUND, httpMethod="PUT") + + def testSaveChecklistTemplateMissingKey(self): + self.initBody() + self.token = self.loginAndCreateSessionToken(self.admin_user) + print("testSaveChecklistTemplateNoExistanceTemplate") + # take the first line item (li1_name) which is newEntity and remove its name, expect 500 + self.data['sections'][0]["lineItems"][0]["name"] = None + self.getOrCreateChecklistTemplate( + self.urlStrForSave, expectedStatus=HTTP_500_INTERNAL_SERVER_ERROR, httpMethod="PUT") + + def testGetChecklistTemplates(self): + self.initBody() + self.token = self.loginAndCreateSessionToken(self.admin_user) + print("testSaveChecklistTemplate") + self.getOrCreateChecklistTemplate(self.urlStrForGetTmpls, httpMethod="GET") + + def testGetChecklistTemplate(self): + self.initBody() + self.token = self.loginAndCreateSessionToken(self.admin_user) + print("testSaveChecklistTemplate") + self.getOrCreateChecklistTemplate(self.urlStrForGetTmpl, httpMethod="GET") diff --git a/django/engagementmanager/tests/test_cms_documentation_search.py b/django/engagementmanager/tests/test_cms_documentation_search.py new file mode 100755 index 0000000..2d13f96 --- /dev/null +++ b/django/engagementmanager/tests/test_cms_documentation_search.py @@ -0,0 +1,103 @@ +# +# ============LICENSE_START========================================== +# org.onap.vvp/engagementmgr +# =================================================================== +# Copyright © 2017 AT&T Intellectual Property. All rights reserved. +# =================================================================== +# +# Unless otherwise specified, all software contained herein is licensed +# under the Apache License, Version 2.0 (the “License”); +# you may not use this software except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# +# +# Unless otherwise specified, all documentation contained herein is licensed +# under the Creative Commons License, Attribution 4.0 Intl. (the “License”); +# you may not use this documentation except in compliance with the License. +# You may obtain a copy of the License at +# +# https://creativecommons.org/licenses/by/4.0/ +# +# Unless required by applicable law or agreed to in writing, documentation +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ============LICENSE_END============================================ +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. +import json +import mock +from rest_framework.status import HTTP_200_OK +from engagementmanager.models import Vendor +from engagementmanager.tests.test_base_entity import TestBaseEntity +from engagementmanager.utils.constants import Constants +from engagementmanager.service.logging_service import LoggingServiceFactory + +logger = LoggingServiceFactory.get_logger() + + +def search_page_mock(keyword): + result = [{u'id': 1, u'title': 'exists_title', u'children': [], u'status': '', u'_order': 1}, ] + return result + + +def search_empty_page_mock(keyword): + return [] + + +class TestCMSDocumentationSearch(TestBaseEntity): + + def childSetup(self): + self.createVendors([Constants.service_provider_company_name, 'Other']) + self.createDefaultRoles() + self.admin, self.el, self.standard_user = self.creator.createAndGetDefaultRoles() + self.user = self.creator.createUser(Vendor.objects.get(name='Other'), + self.randomGenerator("main-vendor-email"), 'Aa123456', + 'user', self.standard_user, True) + self.token = self.loginAndCreateSessionToken(self.user) + + def testSearchEmptyString(self): + urlStr = self.urlPrefix + 'cms/pages/search/?keyword=' + self.printTestName("testSearchEmptyString [Start]") + logger.debug("action should success (200), and return empty array") + response = self.c.get(urlStr, **{'HTTP_AUTHORIZATION': "token " + self.token}) + print('Got response : ' + str(response.status_code)) + self.assertEqual(response.status_code, HTTP_200_OK) + content = json.loads(response.content) + self.assertEqual(content, []) + self.printTestName("testSearchEmptyString [End]") + + @mock.patch('engagementmanager.apps.cms_client.search_pages', search_empty_page_mock) + def testSearchNotExistsKeyword(self): + urlStr = self.urlPrefix + 'cms/pages/search/?keyword=somewordnotexists' + self.printTestName("testSearchNotExistsKeyword [Start]") + logger.debug("action should success (200), and return empty array") + response = self.c.get(urlStr, **{'HTTP_AUTHORIZATION': "token " + self.token}) + print('Got response : ' + str(response.status_code)) + self.assertEqual(response.status_code, HTTP_200_OK) + content = json.loads(response.content) + self.assertEqual(content, []) + self.printTestName("testSearchNotExistsKeyword [End]") + + @mock.patch('engagementmanager.apps.cms_client.search_pages', search_page_mock) + def testSearchExistsKeyword(self): + urlStr = self.urlPrefix + 'cms/pages/search/?keyword=exists_title' + self.printTestName("testSearchExistsKeyword [Start]") + logger.debug("action should success (200), and return array with one page (by mock)") + response = self.c.get(urlStr, **{'HTTP_AUTHORIZATION': "token " + self.token}) + print('Got response : ' + str(response.status_code)) + self.assertEqual(response.status_code, HTTP_200_OK) + content = json.loads(response.content) + self.assertEqual(content[0]['title'], 'exists_title') + self.printTestName("testSearchExistsKeyword [End]") diff --git a/django/engagementmanager/tests/test_cms_pages.py b/django/engagementmanager/tests/test_cms_pages.py new file mode 100755 index 0000000..4fd9473 --- /dev/null +++ b/django/engagementmanager/tests/test_cms_pages.py @@ -0,0 +1,135 @@ +# +# ============LICENSE_START========================================== +# org.onap.vvp/engagementmgr +# =================================================================== +# Copyright © 2017 AT&T Intellectual Property. All rights reserved. +# =================================================================== +# +# Unless otherwise specified, all software contained herein is licensed +# under the Apache License, Version 2.0 (the “License”); +# you may not use this software except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# +# +# Unless otherwise specified, all documentation contained herein is licensed +# under the Creative Commons License, Attribution 4.0 Intl. (the “License”); +# you may not use this documentation except in compliance with the License. +# You may obtain a copy of the License at +# +# https://creativecommons.org/licenses/by/4.0/ +# +# Unless required by applicable law or agreed to in writing, documentation +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ============LICENSE_END============================================ +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. +import json +import mock +from rest_framework.status import HTTP_200_OK +from engagementmanager.models import Vendor +from engagementmanager.tests.test_base_entity import TestBaseEntity +from engagementmanager.utils.constants import Constants +from engagementmanager.service.logging_service import LoggingServiceFactory + +logger = LoggingServiceFactory.get_logger() + + +def get_pages_mock(title=""): + result = [{u'meta_description': u'Content of page #1', u'parent': None, u'title': u'Page #1', + u'login_required': True, u'children': [], u'id': 1, + u'content': u'<p>Content of page #1</p>', u'content_model': u'richtextpage', + u'publish_date': u'2017-01-01T13:47:15Z', u'slug': u'documentation/page#1', u'tags': u''}, + {u'meta_description': u'Content of page #2', u'parent': None, u'title': u'Page #2', + u'login_required': True, u'children': [], u'id': 2, + u'content': u'<p>Content of page #2</p>', u'content_model': u'richtextpage', + u'publish_date': u'2017-01-01T13:47:15Z', u'slug': u'documentation/page#2', u'tags': u''}, + {u'meta_description': u'Content of page #3', u'parent': None, u'title': u'Page #3', + u'login_required': True, u'children': [], u'id': 3, + u'content': u'<p>Content of page #3</p>', u'content_model': u'richtextpage', + u'publish_date': u'2017-01-01T13:47:15Z', u'slug': u'documentation/page#3', u'tags': u''}, + {u'meta_description': u'Content of page #4', u'parent': None, u'title': u'Page #4', + u'login_required': True, u'children': [], u'id': 4, + u'content': u'<p>Content of page #4</p>', u'content_model': u'richtextpage', + u'publish_date': u'2017-01-01T13:47:15Z', u'slug': u'documentation/page#4', u'tags': u''}, + {u'meta_description': u'Content of page #5', u'parent': None, u'title': u'Page #5', + u'login_required': True, u'children': [], u'id': 5, + u'content': u'<p>Content of page #5</p>', u'content_model': u'richtextpage', + u'publish_date': u'2017-01-01T13:47:15Z', u'slug': u'documentation/page#5', u'tags': u''}, + {u'meta_description': u'Content of page #6', u'parent': None, u'title': u'Page #6', + u'login_required': True, u'children': [], u'id': 6, + u'content': u'<p>Content of page #6</p>', u'content_model': u'richtextpage', + u'publish_date': u'2017-01-01T13:47:15Z', u'slug': u'documentation/page#6', u'tags': u''}] + if title != "": + return [result[0]] + else: + return result + + +def get_page_mock(id): + result = {u'meta_description': u'Content of page #1', u'parent': None, u'title': u'Page #1', + u'login_required': True, u'children': [], u'id': 1, + u'content': u'<p>Content of page #1</p>', u'content_model': u'richtextpage', + u'publish_date': u'2017-01-01T13:47:15Z', u'slug': u'documentation/page#1', u'tags': u''} + + return result + + +@mock.patch('engagementmanager.apps.cms_client.get_pages', get_pages_mock) +@mock.patch('engagementmanager.apps.cms_client.get_page', get_page_mock) +class CMSGetPagesTestCase(TestBaseEntity): + + def childSetup(self): + self.createVendors([Constants.service_provider_company_name, 'Other']) + self.createDefaultRoles() + self.admin, self.el, self.standard_user = self.creator.createAndGetDefaultRoles() + self.user = self.creator.createUser(Vendor.objects.get(name='Other'), + self.randomGenerator("main-vendor-email"), 'Aa123456', + 'user', self.standard_user, True) + self.token = self.loginAndCreateSessionToken(self.user) + + def testGetPageById(self): + urlStr = self.urlPrefix + 'cms/pages/1/' + self.printTestName("testGetPageById [Start]") + logger.debug("action should success (200), and return page by id") + response = self.c.get(urlStr, **{'HTTP_AUTHORIZATION': "token " + self.token}) + print('Got response : ' + str(response.status_code)) + self.assertEqual(response.status_code, HTTP_200_OK) + content = json.loads(response.content) + self.assertEqual(content["title"], 'Page #1') + self.printTestName("testGetPageById [End]") + + def testGetPages(self): + urlStr = self.urlPrefix + 'cms/pages/' + self.printTestName("testGetPages [Start]") + logger.debug("action should success (200), and return all pages") + response = self.c.get(urlStr, **{'HTTP_AUTHORIZATION': "token " + self.token}) + print('Got response : ' + str(response.status_code)) + self.assertEqual(response.status_code, HTTP_200_OK) + self.assertEqual(len(json.loads(response.content)), 6) # Suppose to be 6 The amount of Pages in the mock. + self.printTestName("testGetPages [End]") + + def testGetPagesByTitle(self): + urlStr = self.urlPrefix + 'cms/pages/?title=Documentation' + print(urlStr) + self.printTestName("testGetPagesByTitle [Start]") + logger.debug("action should success (200), and return filtered pages") + response = self.c.get(urlStr, **{'HTTP_AUTHORIZATION': "token " + self.token}) + print('Got response : ' + str(response.status_code)) + self.assertEqual(response.status_code, HTTP_200_OK) + content = json.loads(response.content) + self.assertEqual(len(content), 1) # Suppose to be 1 The amount of Documentation titled pages in the mock. + self.assertEqual(content[0]["title"], 'Page #1') + self.printTestName("testGetPagesByTitle [End]") diff --git a/django/engagementmanager/tests/test_cms_posts.py b/django/engagementmanager/tests/test_cms_posts.py new file mode 100755 index 0000000..7132c2c --- /dev/null +++ b/django/engagementmanager/tests/test_cms_posts.py @@ -0,0 +1,137 @@ +# +# ============LICENSE_START========================================== +# org.onap.vvp/engagementmgr +# =================================================================== +# Copyright © 2017 AT&T Intellectual Property. All rights reserved. +# =================================================================== +# +# Unless otherwise specified, all software contained herein is licensed +# under the Apache License, Version 2.0 (the “License”); +# you may not use this software except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# +# +# Unless otherwise specified, all documentation contained herein is licensed +# under the Creative Commons License, Attribution 4.0 Intl. (the “License”); +# you may not use this documentation except in compliance with the License. +# You may obtain a copy of the License at +# +# https://creativecommons.org/licenses/by/4.0/ +# +# Unless required by applicable law or agreed to in writing, documentation +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ============LICENSE_END============================================ +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. +import json +import mock +from rest_framework.status import HTTP_200_OK +from engagementmanager.models import Vendor +from engagementmanager.tests.test_base_entity import TestBaseEntity +from engagementmanager.utils.constants import Constants +from engagementmanager.service.logging_service import LoggingServiceFactory + +logger = LoggingServiceFactory.get_logger() + + +def get_posts_mock(offset=0, limit=10, category="", date_min=""): + result = [{u'updated': u'2017-01-17T15:57:19.567778Z', u'title': u'announcement..new.1', u'url': u'http://127.0.0.1:8001/blog/announcementnew1/', u'short_url': u'/blog/announcementnew1/', u'tags': u'', u'excerpt': u'announcement..new.announcement..new.', u'allow_comments': True, u'comments': [], u'slug': u'announcementnew1', u'content': u'<p>announcement..new.announcement..new.</p>', u'publish_date': u'2017-01-14T19:53:52Z', u'user': {u'username': u'al942u', u'first_name': u'', u'last_name': u'', u'email': u'al942u@mail.com', u'is_staff': True, u'id': 1}, u'featured_image': u'', u'comments_count': 0, u'id': 1, u'categories': [{u'slug': u'announcement', u'id': 1, u'title': u'Announcement'}]}, + {u'updated': u'2017-01-16T15:57:19.567778Z', u'title': u'announcement..new.2', u'url': u'http://127.0.0.1:8001/blog/announcementnew2/', u'short_url': u'/blog/announcementnew2/', u'tags': u'', u'excerpt': u'announcement..new.announcement..new.', u'allow_comments': True, u'comments': [], u'slug': u'announcementnew2', + u'content': u'<p>announcement..new.announcement..new.</p>', u'publish_date': u'2017-01-15T19:53:52Z', u'user': {u'username': u'al942u', u'first_name': u'', u'last_name': u'', u'email': u'al942u@mail.com', u'is_staff': True, u'id': 1}, u'featured_image': u'', u'comments_count': 0, u'id': 2, u'categories': [{u'slug': u'announcement', u'id': 1, u'title': u'Announcement'}]}, + {u'updated': u'2017-01-15T15:57:19.567778Z', u'title': u'announcement..new.3', u'url': u'http://127.0.0.1:8001/blog/announcementnew3/', u'short_url': u'/blog/announcementnew3/', u'tags': u'', u'excerpt': u'announcement..new.announcement..new.', u'allow_comments': True, u'comments': [], u'slug': u'announcementnew3', + u'content': u'<p>announcement..new.announcement..new.</p>', u'publish_date': u'2017-01-16T19:53:52Z', u'user': {u'username': u'al942u', u'first_name': u'', u'last_name': u'', u'email': u'al942u@mail.com', u'is_staff': True, u'id': 1}, u'featured_image': u'', u'comments_count': 0, u'id': 3, u'categories': [{u'slug': u'announcement', u'id': 1, u'title': u'Announcement'}]}, + {u'updated': u'2017-01-14T15:57:19.567778Z', u'title': u'news..new.1', u'url': u'http://127.0.0.1:8001/blog/news1/', u'short_url': u'/blog/news1/', u'tags': u'', u'excerpt': u'news..new.news..new.', u'allow_comments': True, u'comments': [], u'slug': u'news1', u'content': u'<p>news..new.news..new.</p>', + u'publish_date': u'2017-01-14T19:53:52Z', u'user': {u'username': u'al942u', u'first_name': u'', u'last_name': u'', u'email': u'al942u@mail.com', u'is_staff': True, u'id': 1}, u'featured_image': u'', u'comments_count': 0, u'id': 4, u'categories': [{u'slug': u'news', u'id': 1, u'title': u'News'}]}, + {u'updated': u'2017-01-13T15:57:19.567778Z', u'title': u'news..new.2', u'url': u'http://127.0.0.1:8001/blog/news2/', u'short_url': u'/blog/news2/', u'tags': u'', u'excerpt': u'news..new.news..new.', u'allow_comments': True, u'comments': [], u'slug': u'news2', u'content': u'<p>news..new.news..new.</p>', + u'publish_date': u'2017-01-15T19:53:52Z', u'user': {u'username': u'al942u', u'first_name': u'', u'last_name': u'', u'email': u'al942u@mail.com', u'is_staff': True, u'id': 1}, u'featured_image': u'', u'comments_count': 0, u'id': 5, u'categories': [{u'slug': u'news', u'id': 1, u'title': u'News'}]}, + {u'updated': u'2017-01-12T15:57:19.567778Z', u'title': u'news..new.3', u'url': u'http://127.0.0.1:8001/blog/news3/', u'short_url': u'/blog/news3/', u'tags': u'', u'excerpt': u'news..new.news..new.', u'allow_comments': True, u'comments': [], u'slug': u'news3', u'content': u'<p>news..new.news..new.</p>', u'publish_date': u'2017-01-16T19:53:52Z', u'user': {u'username': u'al942u', u'first_name': u'', u'last_name': u'', u'email': u'al942u@mail.com', u'is_staff': True, u'id': 1}, u'featured_image': u'', u'comments_count': 0, u'id': 6, u'categories': [{u'slug': u'news', u'id': 1, u'title': u'News'}]}] + + if category == "News": + return [result[3], result[4], result[5]] + elif date_min != "": + return [result[0], result[1]] + elif category == "Announcement" and limit == 1: + return [result[0]] + elif limit != 10: + return result[:int(limit)] + else: + return result + + +@mock.patch('engagementmanager.apps.cms_client.get_posts', get_posts_mock) +class CMSGetPostsTestCase(TestBaseEntity): + + def childSetup(self): + self.createVendors([Constants.service_provider_company_name, 'Other']) + self.createDefaultRoles() + self.admin, self.el, self.standard_user = self.creator.createAndGetDefaultRoles() + self.user = self.creator.createUser(Vendor.objects.get(name='Other'), + self.randomGenerator("main-vendor-email"), 'Aa123456', + 'user', self.standard_user, True) + self.token = self.loginAndCreateSessionToken(self.user) + + def testGetPostsByCategory(self): + urlStr = self.urlPrefix + 'cms/posts/?category=News' + print(urlStr) + self.printTestName("GetPostsByCategoryTest [Start]") + logger.debug("action should success (200), and return filtered posts") + response = self.c.get(urlStr, **{'HTTP_AUTHORIZATION': "token " + self.token}) + print('Got response : ' + str(response.status_code)) + self.assertEqual(response.status_code, HTTP_200_OK) + self.assertEqual(len(json.loads(response.content)), 3) # Suppose to be 3 The amount of News in the mock. + self.printTestName("GetPostsByCategoryTest [End]") + + def testGetAllPosts(self): + urlStr = self.urlPrefix + 'cms/posts/?limit=10' + self.printTestName("GetAllPostsTest [Start]") + logger.debug("action should success (200), and return all posts") + response = self.c.get(urlStr, **{'HTTP_AUTHORIZATION': "token " + self.token}) + print('Got response : ' + str(response.status_code)) + self.assertEqual(response.status_code, HTTP_200_OK) + # Suppose to be 6 items like the amount of items of the mock. + self.assertEqual(len(json.loads(response.content)), 6) + self.printTestName("GetAllPostsTest [End]") + + def testGetLimitedPosts(self): + limit = 2 + urlStr = self.urlPrefix + 'cms/posts/?limit=' + str(limit) + self.printTestName("GetLimitedPostsTest [Start]") + logger.debug("action should success (200), and filtered with limit all posts") + response = self.c.get(urlStr, **{'HTTP_AUTHORIZATION': "token " + self.token}) + print('Got response : ' + str(response.status_code)) + self.assertEqual(response.status_code, HTTP_200_OK) + self.assertEqual(len(json.loads(response.content)), limit) # Suppose to be {{limit}} amount of items + self.printTestName("GetLimitedPostsTest [End]") + + def testGetPostsByDateMin(self): + urlStr = self.urlPrefix + 'cms/posts/?fromLastDays=2&limit=10' + self.printTestName("GetPostsByDateMinTest [Start]") + logger.debug("action should success (200), and return filtered posts") + response = self.c.get(urlStr, **{'HTTP_AUTHORIZATION': "token " + self.token}) + print('Got response : ' + str(response.status_code)) + self.assertEqual(response.status_code, HTTP_200_OK) + self.assertEqual(len(json.loads(response.content)), 2) # Suppose to be 2 items that are after this date + self.printTestName("GetPostsByDateMinTest [End]") + + def testGetOneAnnouncementPost(self): + urlStr = self.urlPrefix + 'cms/posts/?limit=1&category=Announcement' + self.printTestName("GetOneAnnouncementPostTest [Start]") + logger.debug("action should success (200), and return one announcement post") + response = self.c.get(urlStr, **{'HTTP_AUTHORIZATION': "token " + self.token}) + print('Got response : ' + str(response.status_code)) + self.assertEqual(response.status_code, HTTP_200_OK) + self.assertEqual(len(json.loads(response.content)), 1) # Suppose to be just 1 item like that. + self.printTestName("GetOneAnnouncementPostTest [End]") diff --git a/django/engagementmanager/tests/test_deployment_target_sites.py b/django/engagementmanager/tests/test_deployment_target_sites.py new file mode 100755 index 0000000..ddfb448 --- /dev/null +++ b/django/engagementmanager/tests/test_deployment_target_sites.py @@ -0,0 +1,163 @@ +# +# ============LICENSE_START========================================== +# org.onap.vvp/engagementmgr +# =================================================================== +# Copyright © 2017 AT&T Intellectual Property. All rights reserved. +# =================================================================== +# +# Unless otherwise specified, all software contained herein is licensed +# under the Apache License, Version 2.0 (the “License”); +# you may not use this software except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# +# +# Unless otherwise specified, all documentation contained herein is licensed +# under the Creative Commons License, Attribution 4.0 Intl. (the “License”); +# you may not use this documentation except in compliance with the License. +# You may obtain a copy of the License at +# +# https://creativecommons.org/licenses/by/4.0/ +# +# Unless required by applicable law or agreed to in writing, documentation +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ============LICENSE_END============================================ +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. +from rest_framework.status import HTTP_200_OK, HTTP_401_UNAUTHORIZED +from engagementmanager.models import Vendor +from engagementmanager.tests.test_base_entity import TestBaseEntity +from engagementmanager.utils.constants import Constants +from engagementmanager.service.logging_service import LoggingServiceFactory + +logger = LoggingServiceFactory.get_logger() + + +class DeploymentTargetSitesTestCase(TestBaseEntity): + + def childSetup(self): + self.createVendors([Constants.service_provider_company_name, 'Other']) + self.createDefaultRoles() + + # Create users: + self.admin, self.el, self.standard_user = self.creator.createAndGetDefaultRoles() + self.el_user = self.creator.createUser(Vendor.objects.get(name=Constants.service_provider_company_name), + self.randomGenerator("main-vendor-email"), 'Aa123456', + 'el user1', self.el, True) + self.peer_reviewer = self.creator.createUser(Vendor.objects.get( + name='Other'), self.randomGenerator("main-vendor-email"), + '55501000199', 'peer-reviewer user', self.el, True) + self.user = self.creator.createUser(Vendor.objects.get( + name=Constants.service_provider_company_name), self.randomGenerator("main-vendor-email"), 'Aa123456', + 'user', self.standard_user, True) + self.admin_user = self.creator.createUser(Vendor.objects.get(name=Constants.service_provider_company_name), + Constants.service_provider_admin_mail, 'Aa123456', + 'admin user', self.admin, True) + + # Create an Engagement with team + self.engagement = self.creator.createEngagement( + 'just-a-fake-uuid', 'Validation', None) + self.engagement.reviewer = self.el_user + self.engagement.peer_reviewer = self.peer_reviewer + self.engagement.engagement_team.add(self.user, self.el_user) + self.engagement.save() + self.deploymentTarget = self.creator.createDeploymentTarget(self.randomGenerator("randomString"), + self.randomGenerator("randomString")) + self.vendor = Vendor.objects.get(name=Constants.service_provider_company_name) + self.vf = self.creator.createVF(self.randomGenerator("randomString"), self.engagement, + self.deploymentTarget, False, self.vendor) + + # Login with users: + self.user_token = self.loginAndCreateSessionToken(self.user) + self.el_token = self.loginAndCreateSessionToken(self.el_user) + self.admin_token = self.loginAndCreateSessionToken(self.admin_user) + + def testPostDeploymentTargetSitesForStandardUser(self): + urlStr = self.urlPrefix + 'dtsites/' + + myjson = '{"name": "Middletown (ICENJ)", "vf_uuid": "' + \ + str(self.vf.uuid) + '"}' + print(myjson) + + response = self.c.post(urlStr, myjson, content_type='application/json', + **{'HTTP_AUTHORIZATION': "token " + self.user_token}) + print('Got response : ' + str(response.status_code)) + self.assertEqual(response.status_code, HTTP_401_UNAUTHORIZED) + + def testPostDeploymentTargetSitesForELUser(self): + urlStr = self.urlPrefix + 'dtsites/' + + myjson = '{"name": "Middletown (ICENJ)", "vf_uuid": "' + \ + str(self.vf.uuid) + '"}' + print(myjson) + + response = self.c.post(urlStr, myjson, content_type='application/json', + **{'HTTP_AUTHORIZATION': "token " + self.el_token}) + print('Got response : ' + str(response.status_code)) + self.assertEqual(response.status_code, HTTP_200_OK) + + def testPostDeploymentTargetSitesForAdminUser(self): + urlStr = self.urlPrefix + 'dtsites/' + + myjson = '{"name": "Middletown (ICENJ)", "vf_uuid": "' + \ + str(self.vf.uuid) + '"}' + print(myjson) + + response = self.c.post(urlStr, myjson, content_type='application/json', + **{'HTTP_AUTHORIZATION': "token " + self.admin_token}) + print('Got response : ' + str(response.status_code)) + self.assertEqual(response.status_code, HTTP_200_OK) + + def testGetDeploymentTargetSitesForStandardUser(self): + urlStr = self.urlPrefix + 'vf/' + str(self.vf.uuid) + '/dtsites/' + print(urlStr) + self.printTestName("testGetDeploymentTargetSites [Start]") + logger.debug("action should unauthorized (401)") + response = self.c.get( + urlStr, **{'HTTP_AUTHORIZATION': "token " + self.user_token}) + print('Got response : ' + str(response.status_code)) + self.assertEqual(response.status_code, HTTP_401_UNAUTHORIZED) + self.printTestName("testGetDeploymentTargetSites [End]") + + def testGetDeploymentTargetSitesForELUser(self): + urlStr = self.urlPrefix + 'vf/' + str(self.vf.uuid) + '/dtsites/' + print(urlStr) + self.printTestName("testGetDeploymentTargetSitesForELUser [Start]") + logger.debug("action should authorized (200)") + response = self.c.get( + urlStr, **{'HTTP_AUTHORIZATION': "token " + self.el_token}) + print('Got response : ' + str(response.status_code)) + self.assertEqual(response.status_code, HTTP_200_OK) + self.printTestName("testGetDeploymentTargetSitesForELUser [End]") + + def testGetDeploymentTargetSitesForAdminUser(self): + urlStr = self.urlPrefix + 'vf/' + str(self.vf.uuid) + '/dtsites/' + print(urlStr) + self.printTestName("testGetDeploymentTargetSitesForAdminUser [Start]") + logger.debug("action should authorized (200)") + response = self.c.get( + urlStr, **{'HTTP_AUTHORIZATION': "token " + self.admin_token}) + print('Got response : ' + str(response.status_code)) + self.assertEqual(response.status_code, HTTP_200_OK) + self.printTestName("testGetDeploymentTargetSitesForAdminUser [End]") + + def testDelDeploymentTargetSitesForStandardUser(self): + urlStr = self.urlPrefix + 'vf/' + str(self.vf.uuid) + '/dtsites/' + print(urlStr) + logger.debug("action should unauthorized (401)") + response = self.c.delete( + urlStr, **{'HTTP_AUTHORIZATION': "token " + self.user_token}) + print('Got response : ' + str(response.status_code)) + self.assertEqual(response.status_code, HTTP_401_UNAUTHORIZED) diff --git a/django/engagementmanager/tests/test_digest_email_notifications.py b/django/engagementmanager/tests/test_digest_email_notifications.py new file mode 100755 index 0000000..b3e75df --- /dev/null +++ b/django/engagementmanager/tests/test_digest_email_notifications.py @@ -0,0 +1,128 @@ +# +# ============LICENSE_START========================================== +# org.onap.vvp/engagementmgr +# =================================================================== +# Copyright © 2017 AT&T Intellectual Property. All rights reserved. +# =================================================================== +# +# Unless otherwise specified, all software contained herein is licensed +# under the Apache License, Version 2.0 (the “License”); +# you may not use this software except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# +# +# Unless otherwise specified, all documentation contained herein is licensed +# under the Creative Commons License, Attribution 4.0 Intl. (the “License”); +# you may not use this documentation except in compliance with the License. +# You may obtain a copy of the License at +# +# https://creativecommons.org/licenses/by/4.0/ +# +# Unless required by applicable law or agreed to in writing, documentation +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ============LICENSE_END============================================ +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. +import json +import mock +from rest_framework.status import HTTP_202_ACCEPTED +from engagementmanager.bus.messages.activity_event_message import ActivityEventMessage +from engagementmanager.bus.messages.daily_scheduled_message import DailyScheduledMessage +from engagementmanager.models import Vendor +from engagementmanager.utils.activities_data import UserJoinedEngagementActivityData +from engagementmanager.tests.test_base_entity import TestBaseEntity +from engagementmanager.utils.constants import Constants, EngagementStage +from engagementmanager.apps import bus_service +from engagementmanager.service.logging_service import LoggingServiceFactory + +logger = LoggingServiceFactory.get_logger() + + +def digest_mock(self, bus_message): + DigestEmailNotificationsTestCase.is_digested_mock_sent = True + DigestEmailNotificationsTestCase.message = bus_message + + +# engagementmanager/bus/handlers/digest_email_notification_handler.py +@mock.patch('engagementmanager.bus.handlers.digest_email_notification_handler.' + 'DigestEmailNotificationHandler.handle_message', digest_mock) +class DigestEmailNotificationsTestCase(TestBaseEntity): + is_digested_mock_sent = False + message = None + + def childSetup(self): + self.createVendors([Constants.service_provider_company_name, 'Other']) + + self.createDefaultRoles() + vendor = Vendor.objects.get(name=Constants.service_provider_company_name) + self.el_user = self.creator.createUser(vendor, self.randomGenerator( + "main-vendor-email"), self.randomGenerator("randomNumber"), self.randomGenerator("randomString"), self.el, True) + vendor = Vendor.objects.get(name='Other') + self.user = self.creator.createUser(vendor, self.randomGenerator("email"), self.randomGenerator( + "randomNumber"), self.randomGenerator("randomString"), self.standard_user, True) + self.pruser = self.creator.createUser(vendor, self.randomGenerator("email"), self.randomGenerator( + "randomNumber"), self.randomGenerator("randomString"), self.standard_user, True) + + self.engagement = self.creator.createEngagement(self.randomGenerator( + "randomString"), self.randomGenerator("randomString"), None) + self.engagement.engagement_team.add(self.user, self.el_user) + self.engagement.reviewer = self.el_user + self.engagement.peer_reviewer = self.pruser + self.engagement.save() + + self.deploymentTarget = self.creator.createDeploymentTarget( + self.randomGenerator("randomString"), self.randomGenerator("randomString")) + self.vf = self.creator.createVF(self.randomGenerator("randomString"), + self.engagement, self.deploymentTarget, False, vendor) + + def testDigestEmailForActivities(self): + """ + Will check if the service bus deliver the message of sending digested mails + No need to check if the mail is sent or if the python scheduling is working + """ + # Create the activities: + self.urlStr = self.urlPrefix + "single-engagement/" + str(self.engagement.uuid) + "/stage/@stage" + + random_user = self.creator.createUser(Vendor.objects.get(name='Other'), + self.randomGenerator("email"), + self.randomGenerator("randomNumber"), + self.randomGenerator("randomString"), + self.el, True) + + self.engagement.engagement_team.add(random_user) + self.engagement.save() + + users_list = [] + users_list.append(random_user) + activity_data = UserJoinedEngagementActivityData(self.vf, users_list, self.engagement) + bus_service.send_message(ActivityEventMessage(activity_data)) + + token = self.loginAndCreateSessionToken(random_user) + response = self.c.put(self.urlStr.replace("@stage", EngagementStage.Active.name), + json.dumps(dict(), ensure_ascii=False), content_type='application/json', + **{'HTTP_AUTHORIZATION': "token " + token}) + self.assertEqual(response.status_code, HTTP_202_ACCEPTED) + + urlStr = self.urlPrefix + 'engagement/${uuid}/activities/' + response = self.c.get(urlStr.replace('${uuid}', str(self.engagement.uuid)), + **{'HTTP_AUTHORIZATION': "token " + token}) + content = json.loads(response.content) + self.assertEqual(len(content), 2) + + message = DailyScheduledMessage() + bus_service.send_message(message) + self.assertTrue(self.is_digested_mock_sent) + self.assertEqual(self.message, message) diff --git a/django/engagementmanager/tests/test_eng_progress.py b/django/engagementmanager/tests/test_eng_progress.py new file mode 100755 index 0000000..563c3cd --- /dev/null +++ b/django/engagementmanager/tests/test_eng_progress.py @@ -0,0 +1,99 @@ +# +# ============LICENSE_START========================================== +# org.onap.vvp/engagementmgr +# =================================================================== +# Copyright © 2017 AT&T Intellectual Property. All rights reserved. +# =================================================================== +# +# Unless otherwise specified, all software contained herein is licensed +# under the Apache License, Version 2.0 (the “License”); +# you may not use this software except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# +# +# Unless otherwise specified, all documentation contained herein is licensed +# under the Creative Commons License, Attribution 4.0 Intl. (the “License”); +# you may not use this documentation except in compliance with the License. +# You may obtain a copy of the License at +# +# https://creativecommons.org/licenses/by/4.0/ +# +# Unless required by applicable law or agreed to in writing, documentation +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ============LICENSE_END============================================ +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. +from rest_framework.status import HTTP_401_UNAUTHORIZED, HTTP_202_ACCEPTED, HTTP_500_INTERNAL_SERVER_ERROR +from engagementmanager.models import Vendor +from engagementmanager.tests.test_base_entity import TestBaseEntity +from engagementmanager.utils.constants import Constants +from engagementmanager.service.logging_service import LoggingServiceFactory + +logger = LoggingServiceFactory.get_logger() + + +class EngProgressTestCase(TestBaseEntity): + + def childSetup(self): # Variables to use in this class. + self.createVendors([Constants.service_provider_company_name, 'Other']) + self.createDefaultRoles() + self.admin, self.el, self.standard_user = self.creator.createAndGetDefaultRoles() + + # Create a user with role el + self.el_user = self.creator.createUser(Vendor.objects.get( + name=Constants.service_provider_company_name), self.randomGenerator("main-vendor-email"), + '55501000199', 'el user', self.el, True) + self.peer_reviewer = self.creator.createUser(Vendor.objects.get( + name='Other'), self.randomGenerator("main-vendor-email"), + '55501000199', 'peer-reviewer user', self.el, True) + # Create a user with role standard_user + self.user = self.creator.createUser(Vendor.objects.get( + name='Other'), self.randomGenerator("main-vendor-email"), + '55501000199', 'user', self.standard_user, True) + # Create an Engagement with team + self.engagement = self.creator.createEngagement( + 'just-a-fake-uuid', 'Validation', None) + self.engagement.engagement_team.add(self.user, self.el_user) + self.engagement.reviewer = self.el_user + self.engagement.peer_reviewer = self.peer_reviewer + self.engagement.save() + self.token = self.loginAndCreateSessionToken(self.user) + self.ELtoken = self.loginAndCreateSessionToken(self.el_user) + + def testSetProgress(self): + urlStr = self.urlPrefix + 'engagements/' + \ + str(self.engagement.uuid) + '/progress' + + self.printTestName("START") + + logger.debug( + "action should fail (401), Only Engagement Lead can set Engagement Progress") + response = self.c.put(urlStr, '{ "progress" : 50 }', content_type='application/json', + **{'HTTP_AUTHORIZATION': "token " + self.token}) + print('Got response : ' + str(response.status_code)) + self.assertEqual(response.status_code, HTTP_401_UNAUTHORIZED) + + response = self.c.put(urlStr, '{ "progress" : 50 }', content_type='application/json', + **{'HTTP_AUTHORIZATION': "token " + self.ELtoken}) + print('Got response : ' + str(response.status_code)) + self.assertEqual(response.status_code, HTTP_202_ACCEPTED) + + response = self.c.put(urlStr, '{ "progress" : 101 }', content_type='application/json', + **{'HTTP_AUTHORIZATION': "token " + self.ELtoken}) + print('Got response : ' + str(response.status_code)) + self.assertEqual(response.status_code, HTTP_500_INTERNAL_SERVER_ERROR) + + self.printTestName("END") diff --git a/django/engagementmanager/tests/test_eng_status.py b/django/engagementmanager/tests/test_eng_status.py new file mode 100755 index 0000000..62f787d --- /dev/null +++ b/django/engagementmanager/tests/test_eng_status.py @@ -0,0 +1,161 @@ +# +# ============LICENSE_START========================================== +# org.onap.vvp/engagementmgr +# =================================================================== +# Copyright © 2017 AT&T Intellectual Property. All rights reserved. +# =================================================================== +# +# Unless otherwise specified, all software contained herein is licensed +# under the Apache License, Version 2.0 (the “License”); +# you may not use this software except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# +# +# Unless otherwise specified, all documentation contained herein is licensed +# under the Creative Commons License, Attribution 4.0 Intl. (the “License”); +# you may not use this documentation except in compliance with the License. +# You may obtain a copy of the License at +# +# https://creativecommons.org/licenses/by/4.0/ +# +# Unless required by applicable law or agreed to in writing, documentation +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ============LICENSE_END============================================ +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. +import json +from uuid import uuid4 +from rest_framework.status import HTTP_401_UNAUTHORIZED, HTTP_200_OK +from engagementmanager.models import Vendor +from engagementmanager.tests.test_base_entity import TestBaseEntity +from engagementmanager.utils.constants import Constants +from engagementmanager.service.logging_service import LoggingServiceFactory + +logger = LoggingServiceFactory.get_logger() + + +class EngagementStatusTestCase(TestBaseEntity): + + def childSetup(self): # Variables to use in this class. + self.createVendors([Constants.service_provider_company_name, 'Other']) + self.createDefaultRoles() + self.admin, self.el, self.standard_user = self.creator.createAndGetDefaultRoles() + + # Create a user with role el + self.el_user = self.creator.createUser(Vendor.objects.get( + name=Constants.service_provider_company_name), self.randomGenerator("main-vendor-email"), + '55501000199', 'el user', self.el, True) + self.peer_review_user = self.creator.createUser(Vendor.objects.get( + name=Constants.service_provider_company_name), self.randomGenerator("main-vendor-email"), + '55501000199', 'peer-reviewer user', self.el, True) + # Create a user with role standard_user + self.user = self.creator.createUser(Vendor.objects.get( + name='Other'), self.randomGenerator("main-vendor-email"), + '55501000199', 'user', self.standard_user, True) + self.user_not_team = self.creator.createUser(Vendor.objects.get( + name='Other'), self.randomGenerator("main-vendor-email"), + '55501000199', 'user not team', self.standard_user, True) + # Create an Engagement with team + self.engagement = self.creator.createEngagement(uuid4(), 'Validation', None) + self.engagement.engagement_team.add(self.user, self.el_user) + self.engagement.reviewer = self.el_user + self.engagement.peer_reviewer = self.peer_review_user + self.engagement.save() + self.token = self.loginAndCreateSessionToken(self.user) + self.ELtoken = self.loginAndCreateSessionToken(self.el_user) + self.user_not_team_token = self.loginAndCreateSessionToken(self.user_not_team) + + urlStr = self.urlPrefix + 'engagements/${uuid}/status' + myjson = json.dumps({"description": "blah blah"}, ensure_ascii=False) + response = self.c.post(urlStr.replace('${uuid}', str(self.engagement.uuid)), myjson, + content_type='application/json', **{'HTTP_AUTHORIZATION': "token " + self.ELtoken}) + self.created_status = json.loads(response.content) + + def testPutStatus(self): + urlStr = self.urlPrefix + 'engagements/${uuid}/status' + + self.printTestName("START - testPutStatus") + + logger.debug("action should fail (401), Only Engagement Lead can set Engagement Progress") + + myjson = json.dumps( + {"eng_status_uuid": self.created_status['uuid'], "description": "blah2 blah2"}, ensure_ascii=False) + response = self.c.put(urlStr.replace('${uuid}', str(self.engagement.uuid)), myjson, + content_type='application/json', **{'HTTP_AUTHORIZATION': "token " + self.token}) + print('Got response : ' + str(response.status_code)) + self.assertEqual(response.status_code, HTTP_401_UNAUTHORIZED) + + myjson = json.dumps( + {"eng_status_uuid": self.created_status['uuid'], "description": "blah2 blah2"}, ensure_ascii=False) + response = self.c.put(urlStr.replace('${uuid}', str(self.engagement.uuid)), myjson, + content_type='application/json', **{'HTTP_AUTHORIZATION': "token " + self.ELtoken}) + print('Got response : ' + str(response.status_code)) + self.assertEqual(response.status_code, HTTP_200_OK) + + self.printTestName("END - testPutStatus") + + def testPostStatus(self): + urlStr = self.urlPrefix + 'engagements/${uuid}/status' + + self.printTestName("START - testPostStatus") + + logger.debug("action should fail (401), Only Engagement Lead can set Engagement Progress") + + myjson = json.dumps({"description": "blah blah"}, ensure_ascii=False) + response = self.c.post(urlStr.replace('${uuid}', str(self.engagement.uuid)), myjson, + content_type='application/json', **{'HTTP_AUTHORIZATION': "token " + self.token}) + print('Got response : ' + str(response.status_code)) + self.assertEqual(response.status_code, HTTP_401_UNAUTHORIZED) + + logger.debug("action should fail (400), For fake engagement uuid") + myjson = json.dumps({"description": "blah blah"}, ensure_ascii=False) + response = self.c.post(urlStr.replace( + '${uuid}', str(uuid4())), myjson, content_type='application/json', **{'HTTP_AUTHORIZATION': "token " + self.ELtoken}) + print('Got response : ' + str(response.status_code)) + self.assertEqual(response.status_code, HTTP_401_UNAUTHORIZED) + + myjson = json.dumps({"description": "blah blah"}, ensure_ascii=False) + response = self.c.post(urlStr.replace('${uuid}', str(self.engagement.uuid)), myjson, + content_type='application/json', **{'HTTP_AUTHORIZATION': "token " + self.ELtoken}) + print('Got response : ' + str(response.status_code)) + self.assertEqual(response.status_code, HTTP_200_OK) + + self.printTestName("END - testPostStatus") + + def testGetStatus(self): + urlStr = self.urlPrefix + 'engagements/${uuid}/status' + + self.printTestName("START - testGetStatus") + + response = self.c.get(urlStr.replace('${uuid}', str(self.engagement.uuid)), + content_type='application/json', **{'HTTP_AUTHORIZATION': "token " + self.token}) + print('Got response : ' + str(response.status_code)) + self.assertEqual(response.status_code, HTTP_200_OK) + + logger.debug("action should fail (401), Only team members can get status") + + response = self.c.get(urlStr.replace('${uuid}', str( + self.engagement.uuid)), content_type='application/json', **{'HTTP_AUTHORIZATION': "token " + self.user_not_team_token}) + print('Got response : ' + str(response.status_code)) + self.assertEqual(response.status_code, HTTP_401_UNAUTHORIZED) + + logger.debug("action should fail (401), Only existing eng_uuid cab ne fetched") + response = self.c.get(urlStr.replace('${uuid}', str( + uuid4())), content_type='application/json', **{'HTTP_AUTHORIZATION': "token " + self.token}) + print('Got response : ' + str(response.status_code)) + self.assertEqual(response.status_code, HTTP_401_UNAUTHORIZED) + + self.printTestName("END - testGetStatus") diff --git a/django/engagementmanager/tests/test_engagement_admin_operations.py b/django/engagementmanager/tests/test_engagement_admin_operations.py new file mode 100755 index 0000000..a50e49f --- /dev/null +++ b/django/engagementmanager/tests/test_engagement_admin_operations.py @@ -0,0 +1,306 @@ +# +# ============LICENSE_START========================================== +# org.onap.vvp/engagementmgr +# =================================================================== +# Copyright © 2017 AT&T Intellectual Property. All rights reserved. +# =================================================================== +# +# Unless otherwise specified, all software contained herein is licensed +# under the Apache License, Version 2.0 (the “License”); +# you may not use this software except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# +# +# Unless otherwise specified, all documentation contained herein is licensed +# under the Creative Commons License, Attribution 4.0 Intl. (the “License”); +# you may not use this documentation except in compliance with the License. +# You may obtain a copy of the License at +# +# https://creativecommons.org/licenses/by/4.0/ +# +# Unless required by applicable law or agreed to in writing, documentation +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ============LICENSE_END============================================ +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. +from engagementmanager.models import Vendor, Engagement, IceUserProfile, \ + Checklist +from engagementmanager.tests.test_base_entity import TestBaseEntity +from engagementmanager.utils.constants import EngagementStage, Constants +from rest_framework.status import HTTP_200_OK, HTTP_401_UNAUTHORIZED +import json +from engagementmanager.service.logging_service import LoggingServiceFactory + +logger = LoggingServiceFactory.get_logger() + + +class EngagementAdminOperationsTestCase(TestBaseEntity): + + def childSetup(self): + self.createVendors([Constants.service_provider_company_name, 'Other']) + self.createDefaultRoles() + self.admin, self.el, self.standard_user = self.creator.createAndGetDefaultRoles() + self.el_user = self.creator.createUser(Vendor.objects.get(name=Constants.service_provider_company_name), + self.randomGenerator("main-vendor-email"), '55501000199', + 'el user', self.el, True) + self.second_el_user = self.creator.createUser(Vendor.objects.get( + name=Constants.service_provider_company_name), self.randomGenerator("main-vendor-email"), '55501000199', + 'el user2', self.el, True) + self.el_user_to_update = self.creator.createUser( + Vendor.objects.get(name=Constants.service_provider_company_name), + self.randomGenerator("main-vendor-email"), '55501000199', + 'el user 3', self.el, True) + self.user = self.creator.createUser(Vendor.objects.get(name='Other'), + self.randomGenerator("main-vendor-email"), '55501000199', + 'user', self.standard_user, True) + self.admin_user = self.creator.createUser(Vendor.objects.get( + name=Constants.service_provider_company_name), Constants.service_provider_admin_mail, '55501000199', + 'admin user', self.admin, True) + + self.engagement = self.creator.createEngagement( + 'just-a-fake-uuid', 'Validation', None) + self.engagement.engagement_team.add( + self.user, self.el_user, self.second_el_user) + self.engagement.reviewer = self.el_user + self.engagement.peer_reviewer = self.second_el_user + self.engagement.save() + # Create a VF + self.deploymentTarget = self.creator.createDeploymentTarget(self.randomGenerator("randomString"), + self.randomGenerator("randomString")) + self.vf = self.creator.createVF(self.randomGenerator("randomString"), self.engagement, self.deploymentTarget, + False, Vendor.objects.get(name='Other')) + + self.userToken = self.loginAndCreateSessionToken(self.user) + self.elToken = self.loginAndCreateSessionToken(self.el_user) + self.elUserToUpdateToken = self.loginAndCreateSessionToken( + self.el_user_to_update) + self.adminToken = self.loginAndCreateSessionToken(self.admin_user) + + def testGetAllEls(self): + num_of_els = IceUserProfile.objects.filter(role=self.el).count() + urlStr = self.urlPrefix + 'users/engagementleads/' + print(urlStr) + self.printTestName("testGetAllEls [Start]") + logger.debug( + "action should success (200), and return all els exists in the system") + response = self.c.get( + urlStr, **{'HTTP_AUTHORIZATION': "token " + self.adminToken}) + print('Got response : ' + str(response.status_code)) + self.assertEqual(response.status_code, HTTP_200_OK) + + data = json.loads(response.content) + self.assertEqual(len(data), num_of_els) + self.assertEqual(data[0]['full_name'] == 'el user 3' or data[ + 0]['full_name'] == 'el user', True) + self.assertEqual(data[1]['full_name'] == 'el user 3' or data[ + 1]['full_name'] == 'el user2', True) + self.printTestName("testGetAllEls [End]") + + def testArchiveEngagement(self): + urlStr = self.urlPrefix + 'engagements/' + \ + str(self.engagement.uuid) + '/archive' + print(urlStr) + self.printTestName("testArchiveEngagement [Start]") + logger.debug("action should success (202), and archive the engagement") + response = self.c.put(urlStr, json.dumps({'reason': 'test_reason'}), content_type='application/json', + **{'HTTP_AUTHORIZATION': "token " + self.adminToken}) + print('Got response : ' + str(response.status_code)) + self.assertEqual(response.status_code, HTTP_200_OK) + + eng_object = Engagement.objects.get(uuid=self.engagement.uuid) + # @UndefinedVariable + self.assertEqual( + eng_object.engagement_stage, EngagementStage.Archived.name) + self.assertEqual(eng_object.archive_reason, 'test_reason') + self.printTestName("testArchiveEngagement [End]") + + def testArchiveEngagementValidateDate(self): + urlStr = self.urlPrefix + 'engagements/' + \ + str(self.engagement.uuid) + '/archive' + print(urlStr) + self.printTestName("testArchiveEngagement [Start]") + logger.debug("action should success (202), and archive the engagement") + response = self.c.put(urlStr, json.dumps({'reason': 'test_reason'}), content_type='application/json', + **{'HTTP_AUTHORIZATION': "token " + self.adminToken}) + print('Got response : ' + str(response.status_code)) + self.assertEqual(response.status_code, HTTP_200_OK) + + eng_object = Engagement.objects.get(uuid=self.engagement.uuid) + self.assertEqual(eng_object.engagement_stage, + EngagementStage.Archived.name) # @UndefinedVariable + self.assertTrue(eng_object.archived_time) + print(eng_object.archived_time) + self.printTestName("testArchiveEngagement [End]") + + def testSetEngagementReviewer(self): + urlStr = self.urlPrefix + 'engagements/' + \ + str(self.engagement.uuid) + '/reviewer/' + print(urlStr) + self.printTestName("testSetEngagementReviewer [Start]") + logger.debug( + "action should success (200), and set the engagement reviewer") + response = self.c.put(urlStr, json.dumps({'reviewer': str(self.el_user_to_update.uuid)}), content_type='application/json', + **{'HTTP_AUTHORIZATION': "token " + self.adminToken}) + print('Got response : ' + str(response.status_code)) + self.assertEqual(response.status_code, HTTP_200_OK) + + eng_object = Engagement.objects.get(uuid=self.engagement.uuid) + self.assertEqual( + eng_object.reviewer.uuid, str(self.el_user_to_update.uuid)) + self.printTestName("testSetEngagementReviewer [End]") + + def testSetEngagementPeerReviewer(self): + urlStr = self.urlPrefix + 'engagements/' + \ + str(self.engagement.uuid) + '/peerreviewer/' + print(urlStr) + self.printTestName("testSetEngagementPeerReviewer [Start]") + logger.debug( + "action should success (200), and set the engagement peer reviewer") + response = self.c.put(urlStr, json.dumps({'peerreviewer': str(self.el_user_to_update.uuid)}), content_type='application/json', + **{'HTTP_AUTHORIZATION': "token " + self.adminToken}) + print('Got response : ' + str(response.status_code)) + self.assertEqual(response.status_code, HTTP_200_OK) + + eng_object = Engagement.objects.get(uuid=self.engagement.uuid) + self.assertEqual(eng_object.peer_reviewer.uuid, + str(self.el_user_to_update.uuid)) + self.printTestName("testSetEngagementPeerReviewer [End]") + + def testNegativeGetAllEls(self): + urlStr = self.urlPrefix + 'users/engagementleads/' + print(urlStr) + self.printTestName("testNegativeGetAllEls [Start]") + logger.debug("action should failed due to missing permissions (401)") + response = self.c.get( + urlStr, **{'HTTP_AUTHORIZATION': "token " + self.elToken}) + print('Got response : ' + str(response.status_code)) + self.assertEqual(response.status_code, HTTP_401_UNAUTHORIZED) + self.printTestName("testNegativeGetAllEls [End]") + + def testNegativeArchiveEngagement(self): + urlStr = self.urlPrefix + 'engagements/' + \ + str(self.engagement.uuid) + '/archive' + print(urlStr) + self.printTestName("testNegativeArchiveEngagement [Start]") + logger.debug("action should failed due to missing permissions (401)") + response = self.c.put(urlStr, json.dumps({'reason': 'test_reason'}), content_type='application/json', + **{'HTTP_AUTHORIZATION': "token " + self.elToken}) + print('Got response : ' + str(response.status_code)) + self.assertEqual(response.status_code, HTTP_401_UNAUTHORIZED) + self.printTestName("testNegativeArchiveEngagement [End]") + + def testNegativeSetEngagementReviewer(self): + urlStr = self.urlPrefix + 'engagements/' + \ + str(self.engagement.uuid) + '/reviewer/' + print(urlStr) + self.printTestName("testNegativeSetEngagementReviewer [Start]") + logger.debug("action should failed due to missing permissions (401)") + response = self.c.put(urlStr, json.dumps({'reviewer': str(self.el_user_to_update.uuid)}), content_type='application/json', + **{'HTTP_AUTHORIZATION': "token " + self.elToken}) + print('Got response : ' + str(response.status_code)) + self.assertEqual(response.status_code, HTTP_401_UNAUTHORIZED) + self.printTestName("testNegativeSetEngagementReviewer [End]") + + def testNegativeSetEngagementPeerReviewer(self): + urlStr = self.urlPrefix + 'engagements/' + \ + str(self.engagement.uuid) + '/peerreviewer/' + print(urlStr) + self.printTestName("testNegativeSetEngagementPeerReviewer [Start]") + logger.debug("action should failed due to missing permissions (401)") + response = self.c.put(urlStr, json.dumps({'peerreviewer': str(self.el_user_to_update.uuid)}), content_type='application/json', + **{'HTTP_AUTHORIZATION': "token " + self.elToken}) + print('Got response : ' + str(response.status_code)) + self.assertEqual(response.status_code, HTTP_401_UNAUTHORIZED) + self.printTestName("testNegativeSetEngagementPeerReviewer [End]") + + def testSwitchEngagementReviewers(self): + urlStr = self.urlPrefix + 'engagements/' + \ + str(self.engagement.uuid) + '/switch-reviewers/' + print(urlStr) + self.printTestName("testSetEngagementReviewer [Start]") + logger.debug( + "action should success (200), and switch between the engagement reviewer and peer reviewer") + logger.debug("Reviewer: %s, PeerReviewer: %s" % + (self.engagement.reviewer, self.engagement.peer_reviewer)) + response = self.c.put(urlStr, json.dumps({'reviewer': str(self.second_el_user.uuid), 'peerreviewer': str( + self.el_user.uuid)}), content_type='application/json', **{'HTTP_AUTHORIZATION': "token " + self.adminToken}) + print('Got response : ' + str(response.status_code)) + self.assertEqual(response.status_code, HTTP_200_OK) + + eng_object = Engagement.objects.get(uuid=self.engagement.uuid) + self.assertEqual( + eng_object.reviewer.uuid, str(self.second_el_user.uuid)) + self.assertEqual( + eng_object.peer_reviewer.uuid, str(self.el_user.uuid)) + logger.debug("Reviewer: %s, PeerReviewer: %s" % + (eng_object.reviewer, eng_object.peer_reviewer)) + self.printTestName("testSetEngagementReviewer [End]") + + def testChecklistOwnerAfterChangeReviewer(self): + urlStr = self.urlPrefix + 'engagements/' + \ + str(self.engagement.uuid) + '/reviewer/' + print(urlStr) + self.printTestName("testChecklistOwnerAfterChangeReviewer [Start]") + cl_template = self.creator.createDefaultCheckListTemplate() + checklist = self.creator.createCheckList( + "cl-name", "review", 1, None, self.engagement, cl_template, + self.admin_user, self.el_user) + logger.debug( + "action should success (200), set the engagement reviewer and change checklist owner") + response = self.c.put(urlStr, json.dumps({'reviewer': str(self.el_user_to_update.uuid)}), content_type='application/json', + **{'HTTP_AUTHORIZATION': "token " + self.adminToken}) + print('Got response : ' + str(response.status_code)) + checklist = Checklist.objects.get(uuid=checklist.uuid) + self.assertEqual(checklist.owner, self.el_user_to_update) + self.printTestName("testChecklistOwnerAfterChangeReviewer [End]") + + def testChecklistOwnerAfterChangePeerReviewer(self): + urlStr = self.urlPrefix + 'engagements/' + \ + str(self.engagement.uuid) + '/peerreviewer/' + print(urlStr) + self.printTestName("testChecklistOwnerAfterChangePeerReviewer [Start]") + cl_template = self.creator.createDefaultCheckListTemplate() + checklist = self.creator.createCheckList( + "cl-name", "peer_review", 1, None, self.engagement, cl_template, + self.admin_user, self.second_el_user) + logger.debug( + "action should success (200), set the engagement peer reviewer and change checklist owner") + response = self.c.put(urlStr, json.dumps({'peerreviewer': str(self.el_user_to_update.uuid)}), content_type='application/json', + **{'HTTP_AUTHORIZATION': "token " + self.adminToken}) + print('Got response : ' + str(response.status_code)) + checklist = Checklist.objects.get(uuid=checklist.uuid) + self.assertEqual(checklist.owner, self.el_user_to_update) + self.printTestName("testChecklistOwnerAfterChangePeerReviewer [End]") + + def testChecklistOwnerAfterSwitchReviewers(self): + urlStr = self.urlPrefix + 'engagements/' + \ + str(self.engagement.uuid) + '/switch-reviewers/' + print(urlStr) + self.printTestName("testChecklistOwnerAfterSwitchReviewers [Start]") + cl_template = self.creator.createDefaultCheckListTemplate() + checklist = self.creator.createCheckList( + "cl-name", "review", 1, None, self.engagement, cl_template, + self.admin_user, self.el_user) + logger.debug( + "action should success (200), switch between the engagement reviewer and peer reviewer and change checklist owner") + logger.debug("Reviewer: %s, PeerReviewer: %s" % + (self.engagement.reviewer, self.engagement.peer_reviewer)) + response = self.c.put(urlStr, json.dumps({'reviewer': str(self.second_el_user.uuid), 'peerreviewer': str( + self.el_user.uuid)}), content_type='application/json', **{'HTTP_AUTHORIZATION': "token " + self.adminToken}) + print('Got response : ' + str(response.status_code)) + checklist = Checklist.objects.get(uuid=checklist.uuid) + self.assertEqual(checklist.owner, self.second_el_user) + self.printTestName("testChecklistOwnerAfterSwitchReviewers [End]") diff --git a/django/engagementmanager/tests/test_engagement_export.py b/django/engagementmanager/tests/test_engagement_export.py new file mode 100755 index 0000000..9cbceb0 --- /dev/null +++ b/django/engagementmanager/tests/test_engagement_export.py @@ -0,0 +1,161 @@ +# +# ============LICENSE_START========================================== +# org.onap.vvp/engagementmgr +# =================================================================== +# Copyright © 2017 AT&T Intellectual Property. All rights reserved. +# =================================================================== +# +# Unless otherwise specified, all software contained herein is licensed +# under the Apache License, Version 2.0 (the “License”); +# you may not use this software except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# +# +# Unless otherwise specified, all documentation contained herein is licensed +# under the Creative Commons License, Attribution 4.0 Intl. (the “License”); +# you may not use this documentation except in compliance with the License. +# You may obtain a copy of the License at +# +# https://creativecommons.org/licenses/by/4.0/ +# +# Unless required by applicable law or agreed to in writing, documentation +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ============LICENSE_END============================================ +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. +from rest_framework.status import HTTP_200_OK,\ + HTTP_500_INTERNAL_SERVER_ERROR +from engagementmanager.models import Vendor, ECOMPRelease +from engagementmanager.service.engagement_service import get_expanded_engs_for_export +from engagementmanager.tests.test_base_entity import TestBaseEntity +from engagementmanager.utils.constants import EngagementStage, Constants +from engagementmanager.service.logging_service import LoggingServiceFactory + +logger = LoggingServiceFactory.get_logger() + + +class EngagementExportTestCase(TestBaseEntity): + + def childSetup(self): + self.createVendors([Constants.service_provider_company_name, 'Other']) + self.createDefaultRoles() + self.admin, self.el, self.standard_user = self.creator.createAndGetDefaultRoles() + self.peer_reviewer = self.creator.createUser(Vendor.objects.get( + name='Other'), self.randomGenerator("main-vendor-email"), + '55501000199', 'peer-reviewer user', self.el, True) + self.admin = self.creator.createUser(Vendor.objects.get( + name=Constants.service_provider_company_name), Constants.service_provider_admin_mail, + 'Aa123456', 'admin user', self.el, True) + self.user = self.creator.createUser(Vendor.objects.get( + name='Other'), self.randomGenerator("main-vendor-email"), + 'Aa123456', 'user', self.standard_user, True) + + # Create an VF with Engagement (Active) - #1 + self.engagement = self.creator.createEngagement( + 'just-a-fake-uuid', 'Validation', None) + self.engagement.engagement_team.add(self.user, self.admin) + self.engagement.reviewer = self.admin + self.engagement.peer_reviewer = self.peer_reviewer + self.engagement.engagement_stage = EngagementStage.Active.name + self.engagement.save() + self.deploymentTarget = self.creator.createDeploymentTarget( + self.randomGenerator("randomString"), self.randomGenerator("randomString")) + self.vf = self.creator.createVF(self.randomGenerator("randomString"), self.engagement, + self.deploymentTarget, False, Vendor.objects.get(name=Constants.service_provider_company_name)) + self.vf.ecomp_release = ECOMPRelease.objects.create( + uuid='uuid1', name='ActiveECOMPRelease') + self.vf.save() + + # Create an VF with Engagement (Intake) - #2 + self.engagement2 = self.creator.createEngagement( + 'just-a-fake-uuid2', 'Validation', None) + self.engagement2.engagement_team.add(self.user, self.admin) + self.engagement2.reviewer = self.admin + self.engagement2.peer_reviewer = self.peer_reviewer + self.engagement2.engagement_stage = EngagementStage.Intake.name + self.engagement2.save() + self.deploymentTarget = self.creator.createDeploymentTarget( + self.randomGenerator("randomString"), self.randomGenerator("randomString")) + self.vf2 = self.creator.createVF(self.randomGenerator("randomString"), self.engagement2, + self.deploymentTarget, False, Vendor.objects.get(name='Other')) + self.vf2.ecomp_release = ECOMPRelease.objects.create( + uuid='uuid2', name='IntakeECOMPRelease') + self.vf2.save() + + self.token = self.loginAndCreateSessionToken(self.user) + self.adminToken = self.loginAndCreateSessionToken(self.admin) + + def testFailExport(self): + urlStr = self.urlPrefix + 'engagement/export/' + self.printTestName("Failed export [start]") + logger.debug( + "action should fail (500), missing arguments - stage and keyword") + response = self.c.get( + urlStr, **{'HTTP_AUTHORIZATION': "token " + self.token}) + print('Got response : ' + str(response.status_code)) + self.assertEqual(response.status_code, HTTP_500_INTERNAL_SERVER_ERROR) + self.printTestName("Failed export [end]") + + def testSuccessExport(self): + self.printTestName("Success export [start]") + + urlStr = self.urlPrefix + 'engagement/export/?stage=Active&keyword' + logger.debug( + "action should success (200), and return one active engagement") + response = self.c.get( + urlStr, **{'HTTP_AUTHORIZATION': "token " + self.token}) + print('Got response : ' + str(response.status_code)) + self.assertEqual(response.status_code, HTTP_200_OK) + self.assertEqual(len(response.content) > 0, True) + vfs, deployment_targets = get_expanded_engs_for_export( + "Active", "", self.user) + self.assertEqual(len(vfs) == 1, True) + self.assertEqual('ecomp_release__name' in vfs[0] and vfs[0] + ['ecomp_release__name'] == "ActiveECOMPRelease", True) + + urlStr = self.urlPrefix + 'engagement/export/?stage=Intake&keyword' + logger.debug( + "action should success (200), and return one intake engagement") + response = self.c.get( + urlStr, **{'HTTP_AUTHORIZATION': "token " + self.token}) + print('Got response : ' + str(response.status_code)) + self.assertEqual(response.status_code, HTTP_200_OK) + self.assertEqual(len(response.content) > 0, True) + vfs, deployment_targets = get_expanded_engs_for_export( + "Intake", "", self.user) + self.assertEqual(len(vfs) == 1, True) + self.assertEqual('ecomp_release__name' in vfs[0] and vfs[0] + ['ecomp_release__name'] == "IntakeECOMPRelease", True) + + # Check keyword filtering: + urlStr = self.urlPrefix + 'engagement/export/?stage=All&keyword=Active' + logger.debug( + "action should success (200), and not return Intake engagement") + response = self.c.get( + urlStr, **{'HTTP_AUTHORIZATION': "token " + self.token}) + print('Got response : ' + str(response.status_code)) + self.assertEqual(response.status_code, HTTP_200_OK) + self.assertEqual(len(response.content) > 0, True) + vfs, deployment_targets = get_expanded_engs_for_export( + "All", "Active", self.user) + self.assertEqual(len(vfs) == 0, True) + + # Check overview sheet data procedure: + logger.debug("Check if the store procedure return data as expected (suppose to return 4 rows of overview " + "sheet) and by that assume that it exists") + self.assertEqual(len(deployment_targets), 4) + + self.printTestName("Success export [end]") diff --git a/django/engagementmanager/tests/test_expanded_eng.py b/django/engagementmanager/tests/test_expanded_eng.py new file mode 100755 index 0000000..ec89d0b --- /dev/null +++ b/django/engagementmanager/tests/test_expanded_eng.py @@ -0,0 +1,325 @@ +# +# ============LICENSE_START========================================== +# org.onap.vvp/engagementmgr +# =================================================================== +# Copyright © 2017 AT&T Intellectual Property. All rights reserved. +# =================================================================== +# +# Unless otherwise specified, all software contained herein is licensed +# under the Apache License, Version 2.0 (the “License”); +# you may not use this software except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# +# +# Unless otherwise specified, all documentation contained herein is licensed +# under the Creative Commons License, Attribution 4.0 Intl. (the “License”); +# you may not use this documentation except in compliance with the License. +# You may obtain a copy of the License at +# +# https://creativecommons.org/licenses/by/4.0/ +# +# Unless required by applicable law or agreed to in writing, documentation +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ============LICENSE_END============================================ +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. +import json +import random +from engagementmanager.tests.test_base_entity import TestBaseEntity +from engagementmanager.models import Vendor, VF +from engagementmanager.utils.constants import Constants, EngagementStage +from engagementmanager.service.logging_service import LoggingServiceFactory + +logger = LoggingServiceFactory.get_logger() + + +class testGetExpandedEngsAndSearch(TestBaseEntity): + + def childSetup(self): + self.createVendors([Constants.service_provider_company_name, 'Other']) + + self.createDefaultRoles() + # Create a user with role el + vendor = Vendor.objects.get(name=Constants.service_provider_company_name) + self.el_user = self.creator.createUser(vendor, self.randomGenerator( + "main-vendor-email"), self.randomGenerator("randomNumber"), self.randomGenerator("randomString"), self.el, True) + self.peer_reviewer = self.creator.createUser(Vendor.objects.get( + name='Other'), self.randomGenerator("main-vendor-email"), + '55501000199', 'peer-reviewer user', self.el, True) + print('-----------------------------------------------------') + print('Created User:') + print('UUID: ' + str(self.el_user.uuid)) + print('Full Name: ' + self.el_user.full_name) + print('-----------------------------------------------------') + + # Create a user with role standard_user + self.vendor = Vendor.objects.get(name='Other') + self.user = self.creator.createUser(self.vendor, "Johnny@d2ice.com", self.randomGenerator( + "randomNumber"), self.randomGenerator("randomString"), self.standard_user, True) + print('-----------------------------------------------------') + print('Created User:') + print('UUID: ' + str(self.user.uuid)) + print('Full Name: ' + self.user.full_name) + print('-----------------------------------------------------') + self.service_provider = Vendor.objects.get(name=Constants.service_provider_company_name) + self.peer_reviewer = self.creator.createUser(self.service_provider, self.randomGenerator( + "main-vendor-email"), self.randomGenerator("randomNumber"), self.randomGenerator("randomString"), self.el, True) + logger.debug('-----------------------------------------------------') + logger.debug('Created Peer Reviewer:') + logger.debug('UUID: ' + str(self.peer_reviewer.uuid)) + logger.debug('Full Name: ' + self.peer_reviewer.full_name) + logger.debug('-----------------------------------------------------') + + # Create an Engagement with team + engStageList = [EngagementStage.Intake.name, EngagementStage.Active.name, + EngagementStage.Validated.name, EngagementStage.Completed.name] # @UndefinedVariable + self.random_stage = engStageList[(random.randint(0, 3) * 2 + 1) % 4] + self.names_array = list() + for i in range(0, 14): + self.engagement = self.creator.createEngagement( + self.randomGenerator("randomString"), self.randomGenerator("randomString"), None) + self.engagement.reviewer = self.el_user + self.engagement.peer_reviewer = self.peer_reviewer + self.engagement.engagement_manual_id = self.randomGenerator( + "randomString") + self.engagement.engagement_team.add( + self.el_user, self.peer_reviewer) + self.engagement.engagement_stage = engStageList[( + random.randint(0, 3) * 2 + 1) % 4] + self.engagement.save() + print('-----------------------------------------------------') + print('Created Engagement:') + print('UUID: ' + str(self.engagement.uuid)) + print('-----------------------------------------------------') + self.names_array.append(self.engagement.engagement_manual_id) + # Create a VF + self.deploymentTarget = self.creator.createDeploymentTarget( + self.randomGenerator("randomString"), self.randomGenerator("randomString")) + # self.asInfrastructure = self.creator.createApplicationServiceInfrastructure(self.randomGenerator("randomString")) + self.vf = self.creator.createVF( + "vf_" + str(i), self.engagement, self.deploymentTarget, False, vendor) + self.vf.save() + self.names_array.append(self.vf.name) + print('-----------------------------------------------------') + print('Created VF:') + print('UUID: ' + str(self.vf.uuid)) + print('-----------------------------------------------------') + if (i % 2 == 0): + self.engagement.engagement_team.add(self.user) + vfc = self.creator.createVFC( + "vfc_" + str(i), self.randomGenerator("randomNumber"), self.vendor, self.vf, self.el_user) + self.names_array.append(vfc.name) + self.engagement.save() + + self.random_keyword = self.names_array[( + random.randint(0, len(self.names_array) - 1))] + self.token = self.loginAndCreateSessionToken(self.user) + + def loggerTestFailedOrSucceded(self, bool): + if bool: + logger.debug("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$") + logger.debug(" Test Succeeded") + logger.debug("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$\n\n") + else: + logger.debug("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$") + logger.debug("Test failed") + logger.debug("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$\n\n") + + def test_get_expanded_even_numbered_engs_by_standard_user_no_keyword(self): + urlStr = self.urlPrefix + 'engagement/expanded/' + logger.debug("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$") + logger.debug( + " Test started: test_get_expanded_even_numbered_engs_by_standard_user_no_keyword") + logger.debug("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$\n\n") + + postData = {'stage': 'All', 'keyword': '', 'offset': 0, 'limit': 15} + datajson = json.dumps(postData, ensure_ascii=False) + response = self.c.post( + urlStr, datajson, content_type='application/json', **{'HTTP_AUTHORIZATION': "token " + self.token}) + status = response.status_code + logger.debug("Got response : " + str(status)) + if (status != 200): + logger.error("Got response : " + str(status) + + " , wrong http response returned ") + self.assertEqual(response.status_code, 200) + logger.debug("Got content : " + str(response.content)) + data = json.loads(response.content) + if (data['num_of_objects'] != 7): + self.loggerTestFailedOrSucceded(False) + logger.error( + "num of objects returned from server != 7 (all even numbers)") + self.assertEqual(data['num_of_objects'], 7) + for x in data['array']: + shortened_name = x['vf__name'].split('_') + if (int(shortened_name[1]) % 2 != 0): + self.loggerTestFailedOrSucceded(False) + logger.error("Odd VF name returned") + self.assertEqual(int(shortened_name[1]) % 2, 0) + self.loggerTestFailedOrSucceded(True) + + def test_get_expanded_engs_with_filter_no_keyword(self): + urlStr = self.urlPrefix + 'engagement/expanded/' + logger.debug("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$") + logger.debug( + " Test started: test_get_expanded_engs_with_filter_no_keyword") + logger.debug("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$\n\n") + + postData = {'stage': self.random_stage, + 'keyword': '', 'offset': 0, 'limit': 15} + datajson = json.dumps(postData, ensure_ascii=False) + response = self.c.post( + urlStr, datajson, content_type='application/json', **{'HTTP_AUTHORIZATION': "token " + self.token}) + status = response.status_code + logger.debug("Got response : " + str(status)) + if (status != 200): + logger.error("Got response : " + str(status) + + " , wrong http response returned ") + self.assertEqual(response.status_code, 200) + logger.debug("Got content : " + str(response.content)) + data = json.loads(response.content) + print("random stage chosen: " + self.random_stage) + for x in data['array']: + print("engagement's stage: " + x['engagement__engagement_stage']) + if (x['engagement__engagement_stage'] != self.random_stage): + self.loggerTestFailedOrSucceded(False) + logger.error( + "VF With different stage than defined filter was retrieved") + self.assertEqual( + x['engagement__engagement_stage'], self.random_stage) + self.loggerTestFailedOrSucceded(True) + + def test_get_expanded_engs_with_keyword_nofilter(self): + urlStr = self.urlPrefix + 'engagement/expanded/' + logger.debug("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$") + logger.debug( + " Test started: test_get_expanded_engs_with_filter_no_keyword") + logger.debug("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$\n\n") + + postData = {'stage': 'All', 'keyword': self.random_keyword, + 'offset': 0, 'limit': 15} + datajson = json.dumps(postData, ensure_ascii=False) + response = self.c.post( + urlStr, datajson, content_type='application/json', **{'HTTP_AUTHORIZATION': "token " + self.token}) + status = response.status_code + logger.debug("Got response : " + str(status)) + self.assertEqual(response.status_code, 200) + logger.debug("Got content : " + str(response.content)) + data = json.loads(response.content) + print("random keyword chosen: " + self.random_keyword) + for x in data['array']: + bool = False + if (x['engagement__engagement_manual_id'] != self.random_keyword and x['vf__name'] != self.random_keyword): + vf = VF.objects.get(engagement__uuid=x['engagement__uuid']) + urlStr = self.urlPrefix + 'vf/' + str(vf.uuid) + '/vfcs/' + vfc_of_x_response = self.c.get( + urlStr, **{'HTTP_AUTHORIZATION': "token " + self.token}) + vfc_list = json.loads(vfc_of_x_response.content) + for vfc in vfc_list: + if vfc['name'] == self.random_keyword: + bool = True + break + else: + continue + else: + bool = True + if (not bool): + self.loggerTestFailedOrSucceded(False) + logger.error( + "VF With different stage than filter was retrieved") + self.assertEqual( + x['engagement__engagement_stage'], self.random_stage) + self.loggerTestFailedOrSucceded(True) + + def test_get_expanded_engs_with_keyword_and_filter(self): + urlStr = self.urlPrefix + 'engagement/expanded/' + logger.debug("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$") + logger.debug( + " Test started: test_get_expanded_engs_with_filter_no_keyword") + logger.debug("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$\n\n") + + postData = {'stage': self.random_stage, + 'keyword': self.random_keyword, 'offset': 0, 'limit': 15} + datajson = json.dumps(postData, ensure_ascii=False) + response = self.c.post( + urlStr, datajson, content_type='application/json', **{'HTTP_AUTHORIZATION': "token " + self.token}) + status = response.status_code + logger.debug("Got response : " + str(status)) + if (status != 200): + logger.error("Got response : " + str(status) + + " , wrong http response returned ") + self.assertEqual(response.status_code, 200) + logger.debug("Got content : " + str(response.content)) + data = json.loads(response.content) + print("random keyword chosen: " + self.random_keyword) + for x in data['array']: + bool = False + if (x['engagement__engagement_stage'] != self.random_stage): + self.loggerTestFailedOrSucceded(False) + logger.error( + "VF With different stage than defined filter was retrieved") + self.assertEqual( + x['engagement__engagement_stage'], self.random_stage) + if (x['engagement__engagement_manual_id'] != self.random_keyword and x['vf__name'] != self.random_keyword): + vf = VF.objects.get(engagement__uuid=x['engagement__uuid']) + urlStr = self.urlPrefix + 'vf' + str(vf.uuid) + '/vfcs/' + vfc_of_x_response = self.c.get( + urlStr, **{'HTTP_AUTHORIZATION': "token " + self.token}) + vfc_list = json.loads(vfc_of_x_response.content) + for vfc in vfc_list: + if (vfc['name'] == self.random_keyword): + bool = True + break + else: + continue + else: + bool = True + if (not bool): + self.loggerTestFailedOrSucceded(False) + logger.error( + "VF With different stage than filter was retrieved") + self.assertEqual( + x['engagement__engagement_stage'], self.random_stage) + self.loggerTestFailedOrSucceded(True) + + def test_get_expanded_engs_with_keyword_email(self): + urlStr = self.urlPrefix + 'engagement/expanded/' + logger.debug("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$") + logger.debug( + " Test started: test_get_expanded_engs_with_filter_no_keyword") + logger.debug("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$\n\n") + email_test = "Johnny@d2ice.com" + postData = {'stage': 'All', 'keyword': email_test, + 'offset': 0, 'limit': 15} + datajson = json.dumps(postData, ensure_ascii=False) + response = self.c.post( + urlStr, datajson, content_type='application/json', **{'HTTP_AUTHORIZATION': "token " + self.token}) + status = response.status_code + logger.debug("Got response : " + str(status)) + self.assertEqual(response.status_code, 200) + logger.debug("Got content : " + str(response.content)) + data = json.loads(response.content) + print("random keyword chosen: " + email_test) + if data['num_of_objects'] >= 1: + for vf_record in data['array']: + vf = VF.objects.get( + engagement__uuid=vf_record['engagement__uuid']) + if (not vf.engagement.engagement_team.filter(uuid=self.user.uuid).exists()): + self.loggerTestFailedOrSucceded(False) + logger.error( + "VF With different stage than filter was retrieved") + self.loggerTestFailedOrSucceded(True) + else: + self.loggerTestFailedOrSucceded(False) diff --git a/django/engagementmanager/tests/test_import_engagement_xls.py b/django/engagementmanager/tests/test_import_engagement_xls.py new file mode 100755 index 0000000..8513998 --- /dev/null +++ b/django/engagementmanager/tests/test_import_engagement_xls.py @@ -0,0 +1,72 @@ +# +# ============LICENSE_START========================================== +# org.onap.vvp/engagementmgr +# =================================================================== +# Copyright © 2017 AT&T Intellectual Property. All rights reserved. +# =================================================================== +# +# Unless otherwise specified, all software contained herein is licensed +# under the Apache License, Version 2.0 (the “License”); +# you may not use this software except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# +# +# Unless otherwise specified, all documentation contained herein is licensed +# under the Creative Commons License, Attribution 4.0 Intl. (the “License”); +# you may not use this documentation except in compliance with the License. +# You may obtain a copy of the License at +# +# https://creativecommons.org/licenses/by/4.0/ +# +# Unless required by applicable law or agreed to in writing, documentation +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ============LICENSE_END============================================ +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. +from io import StringIO +import os + +from django.core import management + +from engagementmanager.management.commands.initial_populate_db import execute_bootstrap_actions +from engagementmanager.tests.test_base_entity import TestBaseEntity + + +class TestImportEngagementXLSTestCase(TestBaseEntity): + + def childSetup(self): + execute_bootstrap_actions() + pass + + def initBody(self): + pass + + ### TESTS ### + def testImportEngagementXLSTestCase(self): + self.initBody() + xls_path = 'engagementmanager/tests/D2_ICE_Engagements_Import_Example.xlsx' + + if not os.path.exists(xls_path): + print("File doesnt exists") + return True + + response = StringIO() + + management.call_command('import_xls', xls_path, '2', stdout=response) + response = response.getvalue() + if response.find('True') != -1: + response = True + self.assertEqual(response, True) diff --git a/django/engagementmanager/tests/test_invite_members.py b/django/engagementmanager/tests/test_invite_members.py new file mode 100755 index 0000000..460eded --- /dev/null +++ b/django/engagementmanager/tests/test_invite_members.py @@ -0,0 +1,164 @@ +# +# ============LICENSE_START========================================== +# org.onap.vvp/engagementmgr +# =================================================================== +# Copyright © 2017 AT&T Intellectual Property. All rights reserved. +# =================================================================== +# +# Unless otherwise specified, all software contained herein is licensed +# under the Apache License, Version 2.0 (the “License”); +# you may not use this software except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# +# +# Unless otherwise specified, all documentation contained herein is licensed +# under the Creative Commons License, Attribution 4.0 Intl. (the “License”); +# you may not use this documentation except in compliance with the License. +# You may obtain a copy of the License at +# +# https://creativecommons.org/licenses/by/4.0/ +# +# Unless required by applicable law or agreed to in writing, documentation +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ============LICENSE_END============================================ +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. +from engagementmanager.tests.test_base_entity import TestBaseEntity +import json +from rest_framework.status import HTTP_200_OK, HTTP_400_BAD_REQUEST +from engagementmanager.models import Vendor, IceUserProfile +from engagementmanager.utils.constants import Constants + + +class TestInviteMembersTestCase(TestBaseEntity): + + def createEng(self, name): + # Create an Engagement with team + engagement = self.creator.createEngagement(name, 'Validation', None) + engagement.reviewer = self.reviewer + engagement.peer_reviewer = self.reviewer + engagement.save() + engagement.engagement_team.add(self.inviter, self.el_user) + self.engList.append(engagement) + print('-----------------------------------------------------') + print('Created Engagement:') + print('UUID: ' + str(engagement.uuid)) + print('-----------------------------------------------------') + + def childSetup(self): + + self.createVendors([Constants.service_provider_company_name, 'Other']) + + self.createDefaultRoles() + self.engList = [] + self.vfList = [] + + # Create a user with role el + self.el_user_email = self.randomGenerator("main-vendor-email") + self.el_user = self.creator.createUser(Vendor.objects.get( + name=Constants.service_provider_company_name), + self.el_user_email, '55501000199', 'el user', self.el, True) + self.inviter = self.creator.createUser(Vendor.objects.get( + name='Other'), self.randomGenerator("main-vendor-email"), + '55501000199', 'inviter user', self.standard_user, True) + self.reviewer = self.creator.createUser(Vendor.objects.get( + name='Other'), self.randomGenerator("main-vendor-email"), + '55501000199', 'reviewer user', self.standard_user, True) + + self.createEng('123456789') + self.createEng('abcdefghi') + + # Create a VF + self.deploymentTarget = self.creator.createDeploymentTarget( + self.randomGenerator("randomString"), self.randomGenerator("randomString")) + + for eng in self.engList: + vf = self.creator.createVF(self.randomGenerator("randomString"), eng, + self.deploymentTarget, False, Vendor.objects.get(name='Other')) + self.vfList.append(vf) + + self.urlStr = self.urlPrefix + "invite-team-members/" + self.data = dict() + self.data2 = dict() + self.token = self.loginAndCreateSessionToken(self.inviter) + + def initBody(self): + self.invitedData = [] + + self.data['email'] = self.randomGenerator("main-vendor-email") + self.data['eng_uuid'] = str(self.engList[0].uuid) + self.invitedData.append(self.data) + + self.data2['email'] = self.el_user_email + self.data2['eng_uuid'] = str(self.engList[0].uuid) + self.invitedData.append(self.data2) + + def inviteContact(self, expectedStatus=HTTP_200_OK): + self.invitedDataStr = json.dumps(self.invitedData, ensure_ascii=False) + response = self.c.post(self.urlStr, self.invitedDataStr, content_type='application/json', + **{'HTTP_AUTHORIZATION': "token " + self.token}) + print('Got response : ' + str(response.status_code), str(response.content)) + self.assertEqual(response.status_code, expectedStatus) + return response + + def createContactUser(self): + self.contact = self.creator.createUser(Vendor.objects.get( + name=Constants.service_provider_company_name), self.data['email'], self.data['phone_number'], self.data['full_name'], self.standard_user, True) + print('-----------------------------------------------------') + print('Created User:') + print('UUID: ' + str(self.contact.uuid)) + print('Full Name: ' + self.contact.full_name) + print('-----------------------------------------------------') + + ### TESTS ### + def testAddContactForNonExistingContact(self): + self.initBody() + self.inviteContact() + + def testThrotellingInviteSameEmailSameEng(self): + self.invitedData = [] + + self.data['email'] = self.el_user_email + self.data['eng_uuid'] = str(self.engList[0].uuid) + self.invitedData.append(self.data) + + self.data2['email'] = self.el_user_email + self.data2['eng_uuid'] = str(self.engList[0].uuid) + self.invitedData.append(self.data2) + + self.inviteContact(expectedStatus=HTTP_400_BAD_REQUEST) + + def testThrotellingInviteMoreThan5EmailsToSameEng(self): + self.invitedData = [] + + for i in range(0, 30): + data = dict() + data['email'] = self.randomGenerator("main-vendor-email") + data['eng_uuid'] = str(self.engList[0].uuid) + self.invitedData.append(data) + + self.inviteContact(expectedStatus=HTTP_400_BAD_REQUEST) + + def testMultipleInvitationsForSameUserDiffEng(self): + for i in range(0, 2): + data = dict() + data['email'] = self.el_user_email + data['eng_uuid'] = str(self.engList[i].uuid) + self.invitedData = [] + self.invitedData.append(data) + self.inviteContact(expectedStatus=HTTP_200_OK) + invitedUser = IceUserProfile.objects.get(email=data['email']) + self.assertTrue(invitedUser in self.engList[i].engagement_team.all()) diff --git a/django/engagementmanager/tests/test_next_steps.py b/django/engagementmanager/tests/test_next_steps.py new file mode 100755 index 0000000..95a8227 --- /dev/null +++ b/django/engagementmanager/tests/test_next_steps.py @@ -0,0 +1,80 @@ +# +# ============LICENSE_START========================================== +# org.onap.vvp/engagementmgr +# =================================================================== +# Copyright © 2017 AT&T Intellectual Property. All rights reserved. +# =================================================================== +# +# Unless otherwise specified, all software contained herein is licensed +# under the Apache License, Version 2.0 (the “License”); +# you may not use this software except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# +# +# Unless otherwise specified, all documentation contained herein is licensed +# under the Creative Commons License, Attribution 4.0 Intl. (the “License”); +# you may not use this documentation except in compliance with the License. +# You may obtain a copy of the License at +# +# https://creativecommons.org/licenses/by/4.0/ +# +# Unless required by applicable law or agreed to in writing, documentation +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ============LICENSE_END============================================ +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. +import unittest + +from mock import MagicMock + +from engagementmanager.service.nextstep_service import NextStepSvc +from engagementmanager.utils.constants import NextStepState + + +class NextStepTest(unittest.TestCase): + + def setUp(self): + pass + + def tearDown(self): + pass + + def testStandardUserActions(self): + print('---------------------------------- Testing standard_user Actions ------------------------------------') + user = MagicMock() + user.role.name = 'standard_user' + + NextStepSvc().validate_state_transition( + user, NextStepState.Incomplete, NextStepState.Completed) + + NextStepSvc().validate_state_transition( + user, NextStepState.Completed, NextStepState.Incomplete) + + def testELUserActions(self): + print('---------------------------------- Testing EL Actions ------------------------------------') + user = MagicMock() + user.role.name = 'el' + + print(user.role) + + update_type = NextStepSvc().validate_state_transition( + user, NextStepState.Completed, NextStepState.Incomplete) + assert update_type == 'Denied' + + +if __name__ == "__main__": + # import sys;sys.argv = ['', 'Test.testSetState'] + unittest.main() diff --git a/django/engagementmanager/tests/test_next_steps_api.py b/django/engagementmanager/tests/test_next_steps_api.py new file mode 100755 index 0000000..b0be468 --- /dev/null +++ b/django/engagementmanager/tests/test_next_steps_api.py @@ -0,0 +1,367 @@ +# +# ============LICENSE_START========================================== +# org.onap.vvp/engagementmgr +# =================================================================== +# Copyright © 2017 AT&T Intellectual Property. All rights reserved. +# =================================================================== +# +# Unless otherwise specified, all software contained herein is licensed +# under the Apache License, Version 2.0 (the “License”); +# you may not use this software except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# +# +# Unless otherwise specified, all documentation contained herein is licensed +# under the Creative Commons License, Attribution 4.0 Intl. (the “License”); +# you may not use this documentation except in compliance with the License. +# You may obtain a copy of the License at +# +# https://creativecommons.org/licenses/by/4.0/ +# +# Unless required by applicable law or agreed to in writing, documentation +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ============LICENSE_END============================================ +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. +import json +from uuid import uuid4 + +from rest_framework import status +from rest_framework.status import HTTP_403_FORBIDDEN, HTTP_200_OK, \ + HTTP_204_NO_CONTENT + +from engagementmanager.models import Vendor, NextStep +from engagementmanager.service.nextstep_service import NextStepSvc +from engagementmanager.tests.test_base_entity import TestBaseEntity +from engagementmanager.utils.constants import Constants + + +class TestNextStepsAPI(TestBaseEntity): + + def childSetup(self): + self.createVendors([Constants.service_provider_company_name, 'Other']) + + self.createDefaultRoles() + + # Create a user with role el + self.el_user = self.creator.createUser(Vendor.objects.get( + name=Constants.service_provider_company_name), self.randomGenerator("main-vendor-email"), + '55501000199', 'el user', self.el, True) + print('-----------------------------------------------------') + print('Created User:') + print('UUID: ' + str(self.el_user.uuid)) + print('Full Name: ' + self.el_user.full_name) + print('-----------------------------------------------------') + + # Create a user with role standard_user + self.user = self.creator.createUser(Vendor.objects.get( + name='Other'), self.randomGenerator("main-vendor-email"), + '55501000199', 'user', self.standard_user, True) + print('-----------------------------------------------------') + print('Created User:') + print('UUID: ' + str(self.user.uuid)) + print('Full Name: ' + self.user.full_name) + print('-----------------------------------------------------') + + # Create a user with role standard_user + self.pruser = self.creator.createUser(Vendor.objects.get( + name='Other'), self.randomGenerator("main-vendor-email"), + '55501000199', 'peer-reviewer user', self.el, True) + print('-----------------------------------------------------') + print('Created User:') + print('UUID: ' + str(self.pruser.uuid)) + print('Full Name: ' + self.pruser.full_name) + print('-----------------------------------------------------') + + # Create a user with role standard_user with SSH key + self.user_with_ssh = self.creator.createUser(Vendor.objects.get( + name='Other'), self.randomGenerator("main-vendor-email"), + '55501000199', 'ssh user', self.standard_user, True, 'just-a-fake-ssh-key') + print('-----------------------------------------------------') + print('Created User:') + print('UUID: ' + str(self.user_with_ssh.uuid)) + print('Full Name: ' + self.user_with_ssh.full_name) + print('-----------------------------------------------------') + + # Create an Engagement with team + self.engagement = self.creator.createEngagement('just-a-fake-uuid', 'Validation', None) + self.engagement.reviewer = self.el_user + self.engagement.peer_reviewer = self.el_user + self.engagement.engagement_team.add(self.user, self.el_user) + self.engagement.save() + print('-----------------------------------------------------') + print('Created Engagement:') + print('UUID: ' + str(self.engagement.uuid)) + print('-----------------------------------------------------') + + # Create another Engagement with team with SSH Key + self.engagement_ssh = self.creator.createEngagement('just-another-fake-uuid', 'Validation', None) + self.engagement_ssh.engagement_team.add(self.user_with_ssh, self.el_user) + self.engagement_ssh.peer_reviewer = self.pruser + self.engagement_ssh.save() + + print('-----------------------------------------------------') + print('Created Engagement:') + print('UUID: ' + str(self.engagement_ssh.uuid)) + print('-----------------------------------------------------') + + # Create another Engagement with Main Contact + self.engagement_with_contact = self.creator.createEngagement('yet-just-another-fake-uuid', 'Validation', None) + self.engagement_with_contact.engagement_team.add(self.user, self.el_user) + self.engagement_with_contact.contact_user = self.user + self.engagement_with_contact.peer_reviewer = self.pruser + self.engagement_with_contact.save() + + print('-----------------------------------------------------') + print('Created Engagement:') + print('UUID: ' + str(self.engagement_with_contact.uuid)) + print('-----------------------------------------------------') + + # Create another Engagement with Main Contact + self.engagement_4_createNS = self.creator.createEngagement('yet-just-another-fake-uuid2', 'Validation', None) + self.engagement_4_createNS.reviewer = self.el_user + self.engagement_4_createNS.peer_reviewer = self.el_user + self.engagement_4_createNS.engagement_team.add(self.user_with_ssh, self.el_user) + self.engagement_4_createNS.contact_user = self.user_with_ssh + self.engagement_4_createNS.save() + + # Create engagement for order + self.engagement_4_order = self.creator.createEngagement('yet-just-another-fake-uuid3', 'Validation', None) + self.engagement_4_order.reviewer = self.el_user + self.engagement_4_order.peer_reviewer = self.el_user + self.engagement_4_order.engagement_team.add(self.user_with_ssh, self.el_user) + self.engagement_4_order.contact_user = self.user_with_ssh + self.engagement_4_order.save() + + self.deploymentTarget = self.creator.createDeploymentTarget( + self.randomGenerator("randomString"), self.randomGenerator("randomString")) + self.vendor = Vendor.objects.get(name='Other') + self.vf = self.creator.createVF(self.randomGenerator("randomString"), + self.engagement_4_createNS, self.deploymentTarget, False, self.vendor) + + print('-----------------------------------------------------') + print('Created Engagement:') + print('UUID: ' + str(self.engagement_4_createNS.uuid)) + print('-----------------------------------------------------') + + self.token = self.loginAndCreateSessionToken(self.user) + self.sshtoken = self.loginAndCreateSessionToken(self.user_with_ssh) + self.ELtoken = self.loginAndCreateSessionToken(self.el_user) + + def testCreateDefaultNextStepsAndSetNextStepsState(self): + urlStr = self.urlPrefix + 'engagements/${uuid}/nextsteps/Intake' + NextStepSvc().create_default_next_steps(self.user, self.engagement, self.el_user) + NextStepSvc().create_default_next_steps_for_user(self.user, self.el_user) + num_of_steps = 3 + + response = self.c.get(urlStr.replace('${uuid}', str(self.engagement.uuid)), + **{'HTTP_AUTHORIZATION': "token " + self.token}) + print('Got response : ' + str(response.status_code)) + print("DATA After JSON Parse:" + str(response.content)) + +# content = response.content + + print('Test that GET nextsteps was successful') + self.assertEqual(response.status_code, HTTP_200_OK) + + print('Test that GET nextsteps in Intake state returns 3 items ') + self.assertEqual(len(response.json()), num_of_steps) + + print('Test First item state is Incomplete: ' + response.json()[0]['state']) + self.assertEqual(response.json()[0]['state'], 'Incomplete') + + step_uuid = response.json()[0]['uuid'] + urlStr = self.urlPrefix + 'nextsteps/' + step_uuid + '/state' + + print('attempt change state of next step Incomplete->COMPLETED by standard_user. This should succeed...') + response = self.c.put(urlStr, '{ "state" : "Completed" }', + content_type='application/json', **{'HTTP_AUTHORIZATION': "token " + self.ELtoken}) + print('Got response : ' + str(response.status_code)) + self.assertEqual(response.status_code, HTTP_200_OK) + + print('Negative: attempt change state of next step COMPLETED->Incomplete by EL. This should success') + response = self.c.put(urlStr, '{ "state" : "Incomplete" }', + content_type='application/json', **{'HTTP_AUTHORIZATION': "token " + self.ELtoken}) + print('Got response : ' + str(response.status_code)) + self.assertEqual(response.status_code, HTTP_200_OK) + + def testCreateDefaultNextStepsForUserWithSSHKey(self): + urlStr = self.urlPrefix + 'engagements/${uuid}/nextsteps/Intake' + NextStepSvc().create_default_next_steps(self.user_with_ssh, self.engagement_ssh, self.el_user) + # Would not create any next step, due to the reason that the user already has an SSH + NextStepSvc().create_default_next_steps_for_user(self.user_with_ssh, self.el_user) + num_of_steps = 2 + + response = self.c.get(urlStr.replace('${uuid}', str(self.engagement_ssh.uuid)), + **{'HTTP_AUTHORIZATION': "token " + self.token}) + print('Got response : ' + str(response.status_code)) + print("DATA After JSON Parse:" + str(response.content)) + + print('Test that GET nextsteps was successful') + self.assertEqual(response.status_code, HTTP_200_OK) + + print('Test that GET nextsteps in Intake state returns 3 items ') + self.assertEqual(len(response.json()), num_of_steps) + + def testOrderNextSteps(self): + + nsDict = {} + nsDict["position"] = 4 + nsDict["creator_uuid"] = str(self.el_user.uuid) + nsDict[ + "description"] = "Please submit the first version of the VF package. If you have any problems or questions please contact your Engagement Lead (EL)" + nsDict["state"] = "Incomplete" + nsDict["engagement_stage"] = "Active" + + myjson = json.dumps([nsDict], ensure_ascii=False) + + create_nextsteps_url = self.urlPrefix + "engagements/" + str(self.engagement_4_order.uuid) + "/nextsteps/" + response = self.c.post(create_nextsteps_url, myjson, content_type='application/json', + **{'HTTP_AUTHORIZATION': "token " + self.ELtoken}) + response = self.c.post(create_nextsteps_url, myjson, content_type='application/json', + **{'HTTP_AUTHORIZATION': "token " + self.ELtoken}) + response = self.c.post(create_nextsteps_url, myjson, content_type='application/json', + **{'HTTP_AUTHORIZATION': "token " + self.ELtoken}) + + get_nextsteps_url = self.urlPrefix + 'engagements/${uuid}/nextsteps/Intake' + response = self.c.get(get_nextsteps_url.replace('${uuid}', str( + self.engagement_4_order.uuid)), **{'HTTP_AUTHORIZATION': "token " + self.sshtoken}) + + decoded_ns = json.loads(response.content) + myjson = json.dumps(decoded_ns, ensure_ascii=False) + order_nextsteps_url = self.urlPrefix + 'engagements/' + \ + str(self.engagement_4_order.uuid) + '/nextsteps/order_next_steps/' + response = self.c.put(order_nextsteps_url, myjson, content_type='application/json', + **{'HTTP_AUTHORIZATION': "token " + self.ELtoken}) + + response = self.c.get(get_nextsteps_url.replace('${uuid}', str( + self.engagement_4_order.uuid)), **{'HTTP_AUTHORIZATION': "token " + self.sshtoken}) + + decoded_ns = json.loads(response.content) + counter = 0 + for next_step in decoded_ns: + self.assertEqual(next_step['position'], counter) + counter += 1 + + def testCreateDefaultNextStepsWhenENGContactExist(self): + urlStr = self.urlPrefix + 'engagements/${uuid}/nextsteps/Intake' + NextStepSvc().create_default_next_steps(self.user, self.engagement_with_contact, self.el_user) + NextStepSvc().create_default_next_steps_for_user(self.user, self.el_user) + num_of_steps = 2 + + response = self.c.get(urlStr.replace('${uuid}', str( + self.engagement_with_contact.uuid)), **{'HTTP_AUTHORIZATION': "token " + self.token}) + + print('Got response : ' + str(response.status_code)) + print("DATA After JSON Parse:" + str(response.content)) + + print('Test that GET nextsteps was successful') + self.assertEqual(response.status_code, HTTP_200_OK) + + print('Test that GET nextsteps in Intake state returns 3 items ') + self.assertEqual(len(response.json()), num_of_steps) + + def testAddNextStepToEng(self): + urlStr = self.urlPrefix + "engagements/" + str(self.engagement_4_createNS.uuid) + "/nextsteps/" + + NextStepSvc().create_default_next_steps(self.user_with_ssh, self.engagement_4_createNS, self.el_user) + # Would not create any next step, due to the reason that the user already has an SSH + NextStepSvc().create_default_next_steps_for_user(self.user_with_ssh, self.el_user) + + nsDict = {} + nsDict["position"] = "4" + nsDict["creator_uuid"] = str(self.el_user.uuid) + nsDict[ + "description"] = "Please submit the first version of the VF package. If you have any problems or questions please contact your Engagement Lead (EL)" + nsDict["state"] = "Incomplete" + nsDict["engagement_stage"] = "Active" + + myjson = json.dumps([nsDict], ensure_ascii=False) + print(myjson) + + response = self.c.post(urlStr, myjson, content_type='application/json', + **{'HTTP_AUTHORIZATION': "token " + self.ELtoken}) + print('Got response : ' + str(response.status_code)) + print("DATA After JSON Parse:" + str(response.content)) + + self.assertEqual(response.status_code, HTTP_200_OK) + + def testDelNextStepToEng(self): + nsObj = NextStep.objects.create(uuid=uuid4(), creator=self.el_user, position=2, description="testDelNextStepToEng", + state='Incomplete', engagement_stage="Intake", engagement=self.engagement) + urlStr = self.urlPrefix + "nextsteps/" + str(nsObj.uuid) + + response = self.c.delete(urlStr, **{'HTTP_AUTHORIZATION': "token " + self.ELtoken}) + print('Got response : ' + str(response.status_code)) + print("DATA After JSON Parse:" + str(response.content)) + + self.assertEqual(response.status_code, HTTP_204_NO_CONTENT) + + def testNegativeDelNextStepToEngNoHeader(self): + nsObj = NextStep.objects.create(uuid=uuid4(), creator=self.el_user, position=2, description="testDelNextStepToEng", + state='Incomplete', engagement_stage="Intake", engagement=self.engagement) + urlStr = self.urlPrefix + "nextsteps/" + str(nsObj.uuid) + + response = self.c.delete(urlStr) + print('Negative: Expecting response 403, got: ' + str(response.status_code)) + print("DATA After JSON Parse:" + str(response.content)) + + self.assertEqual(response.status_code, HTTP_403_FORBIDDEN) + + def testEditNextSteps(self): + print( + '---------------------------------- testEditNextSteps, Expecting 200 ------------------------------------') + print('---------------------------------- Creating a next step ------------------------------------') + step = NextStep.objects.create(position="4", creator=self.el_user, engagement=self.engagement, + description="Please submit the first version of the VF package. If you have any problems or questions please contact your Engagement Lead (EL)", + state="TODO", engagement_stage="Active", + uuid=uuid4()) + print('---------------------------------- Editing the next step ------------------------------------') + urlStr = self.urlPrefix + "nextsteps/" + str(step.uuid) + body = {} + files = [] + for i in range(4): + files.append(self.randomGenerator('randomString')) + body['files'] = files + body['duedate'] = "2012-01-03" + body['description'] = self.randomGenerator('randomString') + body['assigneesUuids'] = [str(self.user.uuid), str(self.el_user.uuid)] + myjson = json.dumps(body, ensure_ascii=False) + print(myjson) + response = self.c.put(urlStr, myjson, content_type='application/json', + **{'HTTP_AUTHORIZATION': "token " + self.ELtoken}) + print(urlStr) + print('Got response : ' + str(response.status_code)) + self.assertEqual(response.status_code, status.HTTP_200_OK) + + def testUserNextSteps(self): + NextStepSvc().create_default_next_steps(self.user_with_ssh, self.engagement_4_createNS, self.el_user) + # Needs to return 0 elements for regular user which is not the assignee: + urlStr = self.urlPrefix + 'engagements/user/nextsteps/' + response = self.c.get(urlStr, **{'HTTP_AUTHORIZATION': "token " + self.sshtoken}) + print('Got response : ' + str(response.status_code)) + self.assertEqual(response.status_code, HTTP_200_OK) + + data = json.loads(response.content) + self.assertEqual(data["data"], []) + self.assertEqual(data["count"], 0) + + # Needs to return 3 elements to el user which is the assignee: + response = self.c.get(urlStr, **{'HTTP_AUTHORIZATION': "token " + self.ELtoken}) + print('Got response : ' + str(response.status_code)) + self.assertEqual(response.status_code, HTTP_200_OK) + data = json.loads(response.content) + self.assertEqual(data["count"], 3) diff --git a/django/engagementmanager/tests/test_notify_inactive_engagements.py b/django/engagementmanager/tests/test_notify_inactive_engagements.py new file mode 100755 index 0000000..afeb835 --- /dev/null +++ b/django/engagementmanager/tests/test_notify_inactive_engagements.py @@ -0,0 +1,232 @@ +# +# ============LICENSE_START========================================== +# org.onap.vvp/engagementmgr +# =================================================================== +# Copyright © 2017 AT&T Intellectual Property. All rights reserved. +# =================================================================== +# +# Unless otherwise specified, all software contained herein is licensed +# under the Apache License, Version 2.0 (the “License”); +# you may not use this software except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# +# +# Unless otherwise specified, all documentation contained herein is licensed +# under the Creative Commons License, Attribution 4.0 Intl. (the “License”); +# you may not use this documentation except in compliance with the License. +# You may obtain a copy of the License at +# +# https://creativecommons.org/licenses/by/4.0/ +# +# Unless required by applicable law or agreed to in writing, documentation +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ============LICENSE_END============================================ +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. +import random +from uuid import uuid4 + +from django.utils.timezone import timedelta +import mock +from engagementmanager.tests.test_base_entity import TestBaseEntity + +from engagementmanager.apps import bus_service +from engagementmanager.bus.messages.daily_scheduled_message import DailyScheduledMessage +from engagementmanager.models import Vendor, Engagement, Activity, Notification +from engagementmanager.utils.constants import EngagementStage, ActivityType, Constants +from django.utils import timezone + + +def mocked_max_empty_date_negative(self, creation_time): + max_empty_time = creation_time + timedelta(days=-1) + return max_empty_time + + +def with_files(vf): + list_of_files = list() + for index in range(3): + current_file_dict = dict() + current_file_dict['id'] = ''.join(random.choice( + '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ') for i in range(16)) + current_file_dict['name'] = 'file%d' % index + current_file_dict['type'] = 'blob' + current_file_dict['mode'] = '100644' + list_of_files.append(current_file_dict) + return list_of_files + + +def get_days_delta_plus_1(self, start_time, end_time): + end_time = end_time + timedelta(days=1) + return (end_time - start_time).days + + +def get_days_delta_plus_7(self, start_time, end_time): + end_time = end_time + timedelta(days=7) + return (end_time - start_time).days + + +def get_days_delta_plus_23(self, start_time, end_time): + end_time = end_time + timedelta(days=23) + return (end_time - start_time).days + + +def get_days_delta_plus_29(self, start_time, end_time): + end_time = end_time + timedelta(days=29) + return (end_time - start_time).days + + +def no_files(vf): + return [] + + +class TestNotifyInactiveEngagements(TestBaseEntity): + + def childSetup(self): + + self.createVendors([Constants.service_provider_company_name, 'Amdocs', 'Other']) + self.createDefaultRoles() + + # Create a user with role el + self.el_user = self.creator.createUser(Vendor.objects.get( + name=Constants.service_provider_company_name), self.randomGenerator("main-vendor-email"), + '55501000199', 'el user', self.el, True) + # For negative tests + self.user = self.creator.createUser(Vendor.objects.get( + name=Constants.service_provider_company_name), self.randomGenerator("main-vendor-email"), + '55501000199', 'user', self.standard_user, True) + self.urlStr = self.urlPrefix + "engagement/@eng_uuid/checklist/new/" + self.data = dict() + self.template = self.creator.createDefaultCheckListTemplate() +# self.engagement = Engagement.objects.create(uuid='just-a-fake-uuid',engagement_stage='Validation') + self.engagement = self.creator.createEngagement(uuid4(), 'Validation', None) + self.engagement.reviewer = self.el_user + self.engagement.peer_reviewer = self.el_user + self.engagement.engagement_team.add(self.user) + self.engagement.engagement_manual_id = str(timezone.now().year) + "-1" + self.engagement.engagement_team.add(self.el_user) + self.engagement.engagement_stage = EngagementStage.Active.name + self.engagement.save() + self.vendor = Vendor.objects.get(name='Other') + self.deploymentTarget = self.creator.createDeploymentTarget( + self.randomGenerator("randomString"), self.randomGenerator("randomString")) + self.vf = self.creator.createVF(self.randomGenerator("randomString"), + self.engagement, self.deploymentTarget, False, self.vendor) + + @mock.patch('validationmanager.em_integration.vm_api.get_list_of_repo_files_callback', with_files) + def testOnlyNotActiveEngagementsAreNotEffected(self): + self.engagement.engagement_stage = EngagementStage.Archived.name + self.engagement.save() + + bus_service.send_message(DailyScheduledMessage()) + updated_engagement = Engagement.objects.get(uuid=self.engagement.uuid) + + self.assertEqual(updated_engagement.is_with_files, False) + + @mock.patch('engagementmanager.bus.handlers.daily_notify_inactive_engagements_handler.DailyNotifyInactiveEngagementsHandler.get_max_empty_date', mocked_max_empty_date_negative) + @mock.patch('validationmanager.em_integration.vm_api.get_list_of_repo_files_callback', no_files) + def testEngagementsWithouthFilesOlderThan30DaysAreArchive(self): + bus_service.send_message(DailyScheduledMessage()) + updated_engagement = Engagement.objects.get(uuid=self.engagement.uuid) + + self.assertEqual(updated_engagement.is_with_files, False) + self.assertEqual(updated_engagement.engagement_stage, EngagementStage.Archived.name) + + @mock.patch('validationmanager.em_integration.vm_api.get_list_of_repo_files_callback', no_files) + def testEngagementsWithouthFilesYoungerThan30DaysNotArchive(self): + bus_service.send_message(DailyScheduledMessage()) + updated_engagement = Engagement.objects.get(uuid=self.engagement.uuid) + + self.assertEqual(updated_engagement.is_with_files, False) + self.assertEqual(updated_engagement.engagement_stage, EngagementStage.Active.name) + + @mock.patch('validationmanager.em_integration.vm_api.get_list_of_repo_files_callback', no_files) + def testEngagementsWithouthFilesIsNotMarked(self): + bus_service.send_message(DailyScheduledMessage()) + updated_engagement = Engagement.objects.get(uuid=self.engagement.uuid) + + self.assertEqual(updated_engagement.is_with_files, False) + + @mock.patch('validationmanager.em_integration.vm_api.get_list_of_repo_files_callback', with_files) + def testEngagementsWithoFilesNotSentEmail(self): + bus_service.send_message(DailyScheduledMessage()) + new_activity = Activity.objects.filter( + engagement=self.engagement, + activity_type=ActivityType.notice_empty_engagement.name + ) + self.assertEqual(0, len(new_activity)) + + @mock.patch('validationmanager.em_integration.vm_api.get_list_of_repo_files_callback', no_files) + @mock.patch('engagementmanager.bus.handlers.daily_notify_inactive_engagements_handler.DailyNotifyInactiveEngagementsHandler.get_days_delta', get_days_delta_plus_1) + def testEngagementsWithoutFilesOneDayEmailNotSent(self): + bus_service.send_message(DailyScheduledMessage()) + new_activity = Activity.objects.filter( + engagement=self.engagement, + activity_type=ActivityType.notice_empty_engagement.name + ) + self.assertEqual(0, len(new_activity)) + + @mock.patch('validationmanager.em_integration.vm_api.get_list_of_repo_files_callback', no_files) + @mock.patch('engagementmanager.bus.handlers.daily_notify_inactive_engagements_handler.DailyNotifyInactiveEngagementsHandler.get_days_delta', get_days_delta_plus_7) + def testEngagementsWithoutFiles7DayEmailSent(self): + bus_service.send_message(DailyScheduledMessage()) + new_activity = Activity.objects.get( + engagement=self.engagement, + activity_type=ActivityType.notice_empty_engagement.name + ) + + new_notifications = Notification.objects.filter(activity=new_activity) + self.assertNotEqual(0, len(new_notifications)) + + for notifcation in new_notifications: + self.assertEqual(notifcation.is_sent, True) + + @mock.patch('validationmanager.em_integration.vm_api.get_list_of_repo_files_callback', no_files) + @mock.patch('engagementmanager.bus.handlers.daily_notify_inactive_engagements_handler.DailyNotifyInactiveEngagementsHandler.get_days_delta', get_days_delta_plus_23) + def testEngagementsWithoutFiles23DayEmailSent(self): + bus_service.send_message(DailyScheduledMessage()) + new_activity = Activity.objects.get( + engagement=self.engagement, + activity_type=ActivityType.notice_empty_engagement.name + ) + + new_notifications = Notification.objects.filter(activity=new_activity) + self.assertNotEqual(0, len(new_notifications)) + + for notifcation in new_notifications: + self.assertEqual(notifcation.is_sent, True) + + @mock.patch('validationmanager.em_integration.vm_api.get_list_of_repo_files_callback', no_files) + @mock.patch('engagementmanager.bus.handlers.daily_notify_inactive_engagements_handler.DailyNotifyInactiveEngagementsHandler.get_days_delta', get_days_delta_plus_29) + def testEngagementsWithoutFiles29DayEmailSent(self): + bus_service.send_message(DailyScheduledMessage()) + new_activity = Activity.objects.get( + engagement=self.engagement, + activity_type=ActivityType.notice_empty_engagement.name + ) + + new_notifications = Notification.objects.filter(activity=new_activity) + self.assertNotEqual(0, len(new_notifications)) + + for notifcation in new_notifications: + self.assertEqual(notifcation.is_sent, True) + + @mock.patch('validationmanager.em_integration.vm_api.get_list_of_repo_files_callback', with_files) + def testEngagementsWithFilesIsMarked(self): + + bus_service.send_message(DailyScheduledMessage()) + updated_engagement = Engagement.objects.get(uuid=self.engagement.uuid) + + self.assertEqual(updated_engagement.is_with_files, True) diff --git a/django/engagementmanager/tests/test_pull_notifications.py b/django/engagementmanager/tests/test_pull_notifications.py new file mode 100755 index 0000000..1c22437 --- /dev/null +++ b/django/engagementmanager/tests/test_pull_notifications.py @@ -0,0 +1,134 @@ +# +# ============LICENSE_START========================================== +# org.onap.vvp/engagementmgr +# =================================================================== +# Copyright © 2017 AT&T Intellectual Property. All rights reserved. +# =================================================================== +# +# Unless otherwise specified, all software contained herein is licensed +# under the Apache License, Version 2.0 (the “License”); +# you may not use this software except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# +# +# Unless otherwise specified, all documentation contained herein is licensed +# under the Creative Commons License, Attribution 4.0 Intl. (the “License”); +# you may not use this documentation except in compliance with the License. +# You may obtain a copy of the License at +# +# https://creativecommons.org/licenses/by/4.0/ +# +# Unless required by applicable law or agreed to in writing, documentation +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ============LICENSE_END============================================ +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. +from engagementmanager.tests.test_base_entity import TestBaseEntity +from engagementmanager.apps import bus_service +from engagementmanager.bus.messages.activity_event_message import ActivityEventMessage +from engagementmanager.models import Vendor +from engagementmanager.utils.constants import Constants, ActivityType +from engagementmanager.utils.activities_data import ActivityData, UserJoinedEngagementActivityData +from engagementmanager.service.logging_service import LoggingServiceFactory + +logger = LoggingServiceFactory.get_logger() + + +class NotificationsTestCase(TestBaseEntity): + + def childSetup(self): + self.createVendors([Constants.service_provider_company_name, 'Other']) + + self.createDefaultRoles() + # Create a user with role el + vendor = Vendor.objects.get(name=Constants.service_provider_company_name) + self.peer_reviewer = self.creator.createUser(Vendor.objects.get( + name=Constants.service_provider_company_name), self.randomGenerator("main-vendor-email"), + '55501000199', 'peer-reviewer user', self.el, True) + self.el_user = self.creator.createUser(vendor, + self.randomGenerator("main-vendor-email"), + self.randomGenerator("randomNumber"), + self.randomGenerator("randomString"), self.el, True) + print('-----------------------------------------------------') + print('Created User:') + print('UUID: ' + str(self.el_user.uuid)) + print('Full Name: ' + self.el_user.full_name) + print('-----------------------------------------------------') + + # Create a user with role standard_user + vendor = Vendor.objects.get(name='Other') + self.user = self.creator.createUser(vendor, self.randomGenerator("email"), self.randomGenerator( + "randomNumber"), self.randomGenerator("randomString"), self.standard_user, True) + print('-----------------------------------------------------') + print('Created User:') + print('UUID: ' + str(self.user.uuid)) + print('Full Name: ' + self.user.full_name) + print('-----------------------------------------------------') + + # Create an Engagement with team + self.engagement = self.creator.createEngagement(self.randomGenerator( + "randomString"), self.randomGenerator("randomString"), None) + self.engagement.engagement_team.add(self.user, self.el_user) + print('-----------------------------------------------------') + print('Created Engagement:') + print('UUID: ' + str(self.engagement.uuid)) + print('-----------------------------------------------------') + self.engagement.reviewer = self.el_user + self.engagement.peer_reviewer = self.peer_reviewer + # Create a VF + self.deploymentTarget = self.creator.createDeploymentTarget( + self.randomGenerator("randomString"), self.randomGenerator("randomString")) + self.vf = self.creator.createVF(self.randomGenerator("randomString"), + self.engagement, self.deploymentTarget, False, vendor) + + print('-----------------------------------------------------') + print('Created VF:') + print('UUID: ' + str(vendor.uuid)) + print('-----------------------------------------------------') + + self.token = self.loginAndCreateSessionToken(self.user) + + def testPullNotifications(self): + + urlStr = self.urlPrefix + 'notifications/' + + logger.debug("Starting pull notification test") + logger.debug("Creating a random new user") + vendor = Vendor.objects.get(name='Other') + randomUser = self.creator.createUser(vendor, self.randomGenerator("email"), self.randomGenerator( + "randomNumber"), self.randomGenerator("randomString"), self.standard_user, True) + self.engagement.engagement_team.add(randomUser) + self.engagement.save() + + logger.debug( + "created a new user & added them to the engagement team, going to create the activity and consider it as a notification") + usersList = [] + usersList.append(randomUser) + activity_data = UserJoinedEngagementActivityData( + self.vf, usersList, self.engagement) + bus_service.send_message(ActivityEventMessage(activity_data)) + + response = self.c.get(urlStr + str(randomUser.uuid) + + "/0/1", **{'HTTP_AUTHORIZATION': "token " + self.token}) + content = response.content + status = response.status_code + logger.debug("Got response : " + str(status)) + logger.debug("Got response : " + str(content)) + if (status != 200): + logger.error("Got response : " + str(status) + + " , wrong http response returned ") + self.assertEqual(response.status_code, 200) + logger.debug("Ended pullNotifications test ") diff --git a/django/engagementmanager/tests/test_rados_gateway.py b/django/engagementmanager/tests/test_rados_gateway.py new file mode 100755 index 0000000..0d81ecc --- /dev/null +++ b/django/engagementmanager/tests/test_rados_gateway.py @@ -0,0 +1,178 @@ +# +# ============LICENSE_START========================================== +# org.onap.vvp/engagementmgr +# =================================================================== +# Copyright © 2017 AT&T Intellectual Property. All rights reserved. +# =================================================================== +# +# Unless otherwise specified, all software contained herein is licensed +# under the Apache License, Version 2.0 (the “License”); +# you may not use this software except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# +# +# Unless otherwise specified, all documentation contained herein is licensed +# under the Creative Commons License, Attribution 4.0 Intl. (the “License”); +# you may not use this documentation except in compliance with the License. +# You may obtain a copy of the License at +# +# https://creativecommons.org/licenses/by/4.0/ +# +# Unless required by applicable law or agreed to in writing, documentation +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ============LICENSE_END============================================ +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. +import json +import time +from engagementmanager.tests.test_base_transaction_entity import TestBaseTransactionEntity +from django.test.testcases import TransactionTestCase +from django.conf import settings +import mock +from rest_framework.status import HTTP_202_ACCEPTED +from engagementmanager.models import Vendor, Engagement, VF +from engagementmanager.utils.constants import Constants, EngagementStage +from wheel.signatures import assertTrue + + +def get_or_create_bucket_mock(name): + bucket = {'bucket': name, + 'categories': [ + {'bytes_received': 0, + 'bytes_sent': 1388, + 'category': 'list_buckets', + 'ops': 4, + 'successful_ops': 4}], + 'epoch': 1499821200, + 'owner': 'staticfiles', + 'time': '2017-07-12 01:00:00.000000Z'} + + return bucket + + +def add_bucket_user_mock(user, bucket): + RadosGatewayTestCase.users_added_to_mock.append(user) + RadosGatewayTestCase.added_bucket = bucket + print("*****RadosGatewayTestCase.added_bucket*****",RadosGatewayTestCase.added_bucket) + + +def remove_bucket_user_grants_mock(bucket, user): + RadosGatewayTestCase.added_bucket = bucket + RadosGatewayTestCase.users_added_to_mock.remove(user) + + +def blank_mock(vf): + print("===--blank mock was activated--===") + pass + + +@mock.patch('validationmanager.em_integration.vm_api.get_or_create_bucket', get_or_create_bucket_mock) +@mock.patch('validationmanager.em_integration.vm_api.add_bucket_user', add_bucket_user_mock) +@mock.patch('validationmanager.em_integration.vm_api.remove_bucket_user_grants', remove_bucket_user_grants_mock) +@mock.patch('validationmanager.em_integration.vm_api.ensure_git_entities', blank_mock) +@mock.patch('validationmanager.em_integration.vm_api.ensure_jenkins_job', blank_mock) +@mock.patch('validationmanager.em_integration.vm_api.ensure_checklists', blank_mock) +class RadosGatewayTestCase(TestBaseTransactionEntity): + users_added_to_mock = [] + added_bucket = None + + def changeEngagementStage(self, stage): + self.urlStr = self.urlPrefix + "single-engagement/" + str(self.engagement.uuid) + "/stage/@stage" + datajson = json.dumps(self.data, ensure_ascii=False) + response = self.c.put(self.urlStr.replace("@stage", stage), datajson, + content_type='application/json', **{'HTTP_AUTHORIZATION': "token " + self.ELtoken}) + self.assertEqual(response.status_code, HTTP_202_ACCEPTED) + + def waitForBucket(self): + counter = 1 + while (RadosGatewayTestCase.added_bucket == None and counter <=20): + time.sleep(1) + time.sleep(1) + if RadosGatewayTestCase.added_bucket == None : + raise Exception("Max retries exceeded, failing test...") + return False + elif RadosGatewayTestCase.added_bucket != None: + return True + + def childSetup(self): + RadosGatewayTestCase.users_added_to_mock = [] + RadosGatewayTestCase.added_bucket = None + settings.IS_SIGNAL_ENABLED = True + self.s3_host = settings.AWS_S3_HOST + self.s3_port = settings.AWS_S3_PORT + + vendor_uuid, self.service_provider = self.creator.createVendor(Constants.service_provider_company_name) + self.urlStr = self.urlPrefix + "signup/" + self.admin, self.el, self.standard_user = self.creator.createAndGetDefaultRoles() + + self.data = dict() + uuid, vendor = self.creator.createVendor(Constants.service_provider_company_name) + self.user = self.creator.createUser(vendor, self.randomGenerator("email"), self.randomGenerator( + "randomNumber"), self.randomGenerator("randomString"), self.standard_user, True) + + self.params = '{"company":"' + str(self.user.company) + '","full_name":"' + self.user.full_name + '","email":"' \ + + self.user.email + '","phone_number":"' + self.user.phone_number + \ + '","password":"' + self.user.user.password + '","regular_email_updates":"' + \ + str(self.user.regular_email_updates) + \ + '","is_service_provider_contact":"' + str(self.user.is_service_provider_contact) + '"}' + self.el_user = self.creator.createUser(Vendor.objects.get( + name=Constants.service_provider_company_name), self.randomGenerator("main-vendor-email"), + '55501000199', 'el user', self.el, True) + self.peer_reviewer = self.creator.createUser(self.service_provider, self.randomGenerator( + "main-vendor-email"), self.randomGenerator("randomNumber"), self.randomGenerator("randomString"), self.el, True) + + self.engagement = self.creator.createEngagement('just-a-fake-uuid', 'Validation', None) + self.engagement.reviewer = self.el_user + self.engagement.peer_reviewer = self.peer_reviewer + self.engagement.engagement_team.add(self.el_user, self.user) + self.engagement.engagement_manual_id = self.randomGenerator("randomString") + self.engagement.save() + self.deploymentTarget = self.creator.createDeploymentTarget( + self.randomGenerator("randomString"), self.randomGenerator("randomString")) + self.vf = self.creator.createVF(self.randomGenerator("randomString"), + self.engagement, self.deploymentTarget, False, self.service_provider) + self.userToken = self.loginAndCreateSessionToken(self.user) + self.ELtoken = self.loginAndCreateSessionToken(self.el_user) + + def testCreateBucketWithUser(self): + self.assertTrue(self.added_bucket is None) + self.changeEngagementStage(EngagementStage.Active.name) + self.assertTrue(RadosGatewayTestCase.added_bucket is not None) + team_members_list = [entry for entry in self.engagement.engagement_team.all()] + for team_member in team_members_list: + assertTrue(any(team_member.full_name == entity.full_name for entity in RadosGatewayTestCase.users_added_to_mock)) + + + def testDeleteUsersFromBucket(self): + self.changeEngagementStage(EngagementStage.Active.name) + self.changeEngagementStage(EngagementStage.Validated.name) + self.waitForBucket() + self.assertTrue(RadosGatewayTestCase.added_bucket is not None) + self.assertTrue(RadosGatewayTestCase.users_added_to_mock == []) + + def testDeleteUsersFromBucketWhichNotCreated(self): + self.assertTrue(RadosGatewayTestCase.added_bucket is None) + self.changeEngagementStage(EngagementStage.Validated.name) + self.waitForBucket() + self.assertTrue(RadosGatewayTestCase.added_bucket is not None) + self.assertTrue(RadosGatewayTestCase.users_added_to_mock == []) + + def testDeleteUsersFromBucketWhwenStageArchive(self): + self.assertTrue(RadosGatewayTestCase.added_bucket is None) + self.changeEngagementStage(EngagementStage.Archived.name) + self.waitForBucket() + self.assertTrue(RadosGatewayTestCase.added_bucket is not None) + self.assertTrue(RadosGatewayTestCase.users_added_to_mock == []) diff --git a/django/engagementmanager/tests/test_remove_user_from_eng_team.py b/django/engagementmanager/tests/test_remove_user_from_eng_team.py new file mode 100755 index 0000000..15bcf2d --- /dev/null +++ b/django/engagementmanager/tests/test_remove_user_from_eng_team.py @@ -0,0 +1,210 @@ +# +# ============LICENSE_START========================================== +# org.onap.vvp/engagementmgr +# =================================================================== +# Copyright © 2017 AT&T Intellectual Property. All rights reserved. +# =================================================================== +# +# Unless otherwise specified, all software contained herein is licensed +# under the Apache License, Version 2.0 (the “License”); +# you may not use this software except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# +# +# Unless otherwise specified, all documentation contained herein is licensed +# under the Creative Commons License, Attribution 4.0 Intl. (the “License”); +# you may not use this documentation except in compliance with the License. +# You may obtain a copy of the License at +# +# https://creativecommons.org/licenses/by/4.0/ +# +# Unless required by applicable law or agreed to in writing, documentation +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ============LICENSE_END============================================ +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. +import json +from rest_framework.status import HTTP_204_NO_CONTENT +from engagementmanager.models import Vendor +from engagementmanager.tests.test_base_entity import TestBaseEntity +from engagementmanager.utils.constants import Constants +from engagementmanager.service.logging_service import LoggingServiceFactory + +logger = LoggingServiceFactory.get_logger() + + +class TestEngagementSetStage(TestBaseEntity): + + def childSetup(self): + + self.createVendors([Constants.service_provider_company_name, 'Amdocs']) + self.vendor = Vendor.objects.get(name='Amdocs') + self.service_provider = Vendor.objects.get(name=Constants.service_provider_company_name) + self.createDefaultRoles() + + # For negative tests + self.user = self.creator.createUser(Vendor.objects.get( + name=Constants.service_provider_company_name), self.randomGenerator("main-vendor-email"), + '55501000199', 'user', self.standard_user, True) + self.user2 = self.creator.createUser(Vendor.objects.get( + name=Constants.service_provider_company_name), self.randomGenerator("main-vendor-email"), + '55501000199', 'user2', self.standard_user, True) + # Create users with role el (el+peer reviwer) + self.el_user = self.creator.createUser(Vendor.objects.get( + name=Constants.service_provider_company_name), self.randomGenerator("main-vendor-email"), + '55501000199', 'el user', self.el, True) + self.peer_reviewer = self.creator.createUser(self.service_provider, self.randomGenerator( + "main-vendor-email"), self.randomGenerator("randomNumber"), self.randomGenerator("randomString"), self.el, True) + # Create a user with admin role + self.admin_user = self.creator.createUser(Vendor.objects.get( + name=Constants.service_provider_company_name), Constants.service_provider_admin_mail, + '55501000199', 'admin user', self.admin, True) + + self.engagement = self.creator.createEngagement('just-a-fake-uuid', 'Validation', None) + self.engagement.reviewer = self.el_user + self.engagement.peer_reviewer = self.peer_reviewer + self.engagement.engagement_team.add(self.el_user, self.user, self.user2) + self.engagement.save() + + self.deploymentTarget = self.creator.createDeploymentTarget( + self.randomGenerator("randomString"), self.randomGenerator("randomString")) +# self.asInfrastructure = self.creator.createApplicationServiceInfrastructure(self.randomGenerator("randomString")) + self.vf = self.creator.createVF(self.randomGenerator("randomString"), + self.engagement, self.deploymentTarget, False, self.vendor) +# self.vf.service_infrastructures.add(self.asInfrastructure) + + self.data = dict() + self.user_token = self.loginAndCreateSessionToken(self.user) + self.user2_token = self.loginAndCreateSessionToken(self.user2) + self.admin_token = self.loginAndCreateSessionToken(self.admin_user) + + def loggerTestFailedOrSucceded(self, bool): + if bool: + logger.debug("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$") + logger.debug(" Test Succeeded") + logger.debug("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$\n\n") + else: + logger.debug("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$") + logger.debug("Test failed") + logger.debug("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$\n\n") + + def test_remove_user_from_eng_team_by_admin(self): + logger.debug("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$") + logger.debug(" Test 2 started: Admin removes user from the eng team!") + logger.debug("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$\n\n") + + self.get_engagement_url = self.urlPrefix + "single-engagement/" + str(self.engagement.uuid) + self.urlStr = self.urlPrefix + "engagements/engagement-team/" + self.data['eng_uuid'] = str(self.engagement.uuid) + self.data['user_uuid'] = str(self.user.uuid) + + datajson = json.dumps(self.data, ensure_ascii=False) + + logger.debug("**********************************************************************") + logger.debug("----- sending put request with body -----") + logger.debug("**********************************************************************") + + response = self.c.put(self.urlStr, datajson, content_type='application/json', + **{'HTTP_AUTHORIZATION': "token " + self.admin_token}) + if (response.status_code != HTTP_204_NO_CONTENT): + print(response.status_code) + response2 = self.c.get(self.get_engagement_url, {}, content_type='application/json', + **{'HTTP_AUTHORIZATION': "token " + self.admin_token}) + + # Check if the user it still in the engagement team + received_eng = json.loads(response2.content) + found = False + for item in received_eng["engagement"]["engagement_team"]: + if (self.user.email == item["email"]): + found = True + break + if found: + self.loggerTestFailedOrSucceded(False) + self.assert_(False, "user is still in the eng_team") + else: + self.loggerTestFailedOrSucceded(True) + + def test_negative_remove_user_from_eng_team_by_another_user(self): + logger.debug("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$") + logger.debug(" Test 3 (Negative) started: User2 removes user1 from the eng team!") + logger.debug("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$\n\n") + + self.get_engagement_url = self.urlPrefix + "single-engagement/" + str(self.engagement.uuid) + self.urlStr = self.urlPrefix + "engagements/engagement-team/" + self.data['eng_uuid'] = str(self.engagement.uuid) + self.data['user_uuid'] = str(self.user.uuid) + + datajson = json.dumps(self.data, ensure_ascii=False) + + logger.debug("**********************************************************************") + logger.debug("----- sending put request with body -----") + logger.debug("**********************************************************************") + + response = self.c.put(self.urlStr, datajson, content_type='application/json', + **{'HTTP_AUTHORIZATION': "token " + self.user2_token}) + if (response.status_code != HTTP_204_NO_CONTENT): + print(response.status_code) + response2 = self.c.get(self.get_engagement_url, {}, content_type='application/json', + **{'HTTP_AUTHORIZATION': "token " + self.admin_token}) + + # Check if the user it still in the engagement team(it is supposed to remain there) + received_eng = json.loads(response2.content) + found = False + for item in received_eng["engagement"]["engagement_team"]: + if (self.user.email == item["email"]): + found = True + break + if not found: + self.loggerTestFailedOrSucceded(False) + self.assert_(False, "user is NOT in the eng_team") + else: + self.loggerTestFailedOrSucceded(True) + + def test_negative_remove_el_user_from_eng_team_by_admin(self): + logger.debug("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$") + logger.debug(" Test 4 (Negative) started: admin removes el_user from the eng team!") + logger.debug("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$\n\n") + + self.get_engagement_url = self.urlPrefix + "single-engagement/" + str(self.engagement.uuid) + self.urlStr = self.urlPrefix + "engagements/engagement-team/" + self.data['eng_uuid'] = str(self.engagement.uuid) + self.data['user_uuid'] = str(self.el_user.uuid) + + datajson = json.dumps(self.data, ensure_ascii=False) + + logger.debug("**********************************************************************") + logger.debug("----- sending put request with body -----") + logger.debug("**********************************************************************") + + response = self.c.put(self.urlStr, datajson, content_type='application/json', + **{'HTTP_AUTHORIZATION': "token " + self.admin_token}) + if (response.status_code != HTTP_204_NO_CONTENT): + print(response.status_code) + response2 = self.c.get(self.get_engagement_url, {}, content_type='application/json', + **{'HTTP_AUTHORIZATION': "token " + self.admin_token}) + + # Check if the user it still in the engagement team(it is supposed to remain there) + received_eng = json.loads(response2.content) + found = False + for item in received_eng["engagement"]["engagement_team"]: + if (self.el_user.email == item["email"]): + found = True + break + if not found: + self.loggerTestFailedOrSucceded(False) + self.assert_(False, "el user was NOT found in the eng_team") + else: + self.loggerTestFailedOrSucceded(True) diff --git a/django/engagementmanager/tests/test_request_data_manager.py b/django/engagementmanager/tests/test_request_data_manager.py new file mode 100755 index 0000000..580301e --- /dev/null +++ b/django/engagementmanager/tests/test_request_data_manager.py @@ -0,0 +1,205 @@ +# +# ============LICENSE_START========================================== +# org.onap.vvp/engagementmgr +# =================================================================== +# Copyright © 2017 AT&T Intellectual Property. All rights reserved. +# =================================================================== +# +# Unless otherwise specified, all software contained herein is licensed +# under the Apache License, Version 2.0 (the “License”); +# you may not use this software except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# +# +# Unless otherwise specified, all documentation contained herein is licensed +# under the Creative Commons License, Attribution 4.0 Intl. (the “License”); +# you may not use this documentation except in compliance with the License. +# You may obtain a copy of the License at +# +# https://creativecommons.org/licenses/by/4.0/ +# +# Unless required by applicable law or agreed to in writing, documentation +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ============LICENSE_END============================================ +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. +from concurrent.futures import ThreadPoolExecutor +import json +from random import randint +import threading +import time +from django.db import connections +from django.test.client import Client +from django.test.testcases import TransactionTestCase +from rest_framework.status import HTTP_401_UNAUTHORIZED, HTTP_200_OK +from engagementmanager.models import Vendor +from engagementmanager.tests.vvpEntitiesCreator import VvpEntitiesCreator +from engagementmanager.utils.authentication import JWTAuthentication +from engagementmanager.utils.constants import Constants +from engagementmanager.utils.request_data_mgr import request_data_mgr +from engagementmanager.service.logging_service import LoggingServiceFactory + +logger = LoggingServiceFactory.get_logger() + + +class TestRequestDataManager(TransactionTestCase): + + def childSetup(self): + logger.debug("---------------------- TestCase " + self.__class__.__name__ + " ----------------------") + self.urlPrefix = "/ice/v1/engmgr/" + self.c = Client() + self.creator = VvpEntitiesCreator() + + for vendor in [Constants.service_provider_company_name, 'Other']: + vendorUuid, vendor = self.creator.createVendor(vendor) + logger.debug(vendorUuid) + + self.admin, self.el, self.standard_user = self.creator.createAndGetDefaultRoles() + + # Create a user with role el + self.el_user = self.creator.createUser( + Vendor.objects.get(name=Constants.service_provider_company_name), self.creator.randomGenerator("main-vendor-email"), + '55501000199', 'el user', self.el, True) + self.peer_review_user = self.creator.createUser( + Vendor.objects.get(name=Constants.service_provider_company_name), self.creator.randomGenerator("main-vendor-email"), + '55501000199', 'peer-reviewer user', self.el, True) + # Create a user with role standard_user + self.user = self.creator.createUser( + Vendor.objects.get(name='Other'), self.creator.randomGenerator("main-vendor-email"), + '55501000199', 'user', self.standard_user, True) + self.user_not_team = self.creator.createUser( + Vendor.objects.get(name='Other'), self.creator.randomGenerator("main-vendor-email"), + '55501000199', 'user2', self.standard_user, True) +# # Create an Engagement with team + self.engagement = self.creator.createEngagement('just-a-fake-uuid', 'Validation', None) + self.engagement.engagement_team.add(self.user, self.el_user) + self.engagement.reviewer = self.el_user + self.engagement.peer_reviewer = self.peer_review_user + self.engagement.save() + + self.jwt_service = JWTAuthentication() + self.token = self.jwt_service.create_token(self.user.user) + self.ELtoken = self.jwt_service.create_token(self.el_user.user) + + urlStr = self.urlPrefix + 'engagements/${uuid}/status' + myjson = json.dumps({"description": "blah blah"}, ensure_ascii=False) + response = self.c.post(urlStr.replace('${uuid}', str( + self.engagement.uuid)), myjson, content_type='application/json', **{'HTTP_AUTHORIZATION': "token " + self.ELtoken}) + self.created_status = json.loads(response.content) + + def my_task(self, eng_id): + thread_local_id = threading.currentThread().ident + + # Inject an attribute into request data + request_data_mgr.set_eng_uuid(eng_id) + + # Inject thread id into requeat data + request_data_mgr.set_cl_uuid(thread_local_id) + + return self.my_inner_funcion(eng_id) + + def my_inner_funcion(self, eng_id): + thread_local_id = threading.currentThread().ident + + assert request_data_mgr.get_request_data() + + assert request_data_mgr.get_request_data()._eng_uuid == eng_id + assert request_data_mgr.get_eng_uuid() == eng_id + + # Checks that the allocated thread from testRequestDataManager is the same thread running in inner function + assert request_data_mgr.get_request_data()._cl_uuid == thread_local_id + assert request_data_mgr.get_cl_uuid() == thread_local_id + + print('thread: ' + str(thread_local_id) + '. request data : ' + str(request_data_mgr.get_request_data_vars())) + return "OK" + + def lauchTests(self): + executor = ThreadPoolExecutor(max_workers=10) + + for i in range(0, 100): + future1 = executor.submit(self.my_task, "eng#" + str(i)) + assert future1.result() == "OK" + + ########### TESTS ########### + + def testRequestDataManager(self): + executor = ThreadPoolExecutor(max_workers=2) + executor.submit(self.lauchTests) + executor.submit(self.lauchTests) + + def testMultipleRequestsInParallel(self): + self.childSetup() + number_of_concurrent_requests = 10 + executor = ThreadPoolExecutor(max_workers=number_of_concurrent_requests) + + def close_db_connections(func, *args, **kwargs): + """ + Decorator to explicitly close db connections during threaded execution + + Note this is necessary to work around: + https://code.djangoproject.com/ticket/22420 + """ + def _close_db_connections(*args, **kwargs): + ret = None + try: + ret = func(*args, **kwargs) + finally: + for conn in connections.all(): + logger.debug("Closing DB connection. connection=" + str(conn)) + conn.close() + return ret + return _close_db_connections + + @close_db_connections + def invokeRequest(metadata, token): + urlStr = self.urlPrefix + 'engagements/${uuid}/status' + + logger.debug("START - " + metadata) + + myjson = json.dumps( + {"eng_status_uuid": self.created_status['uuid'], "description": "blah2 blah2"}, ensure_ascii=False) + response = self.c.put(urlStr.replace('${uuid}', str( + self.engagement.uuid)), myjson, content_type='application/json', **{'HTTP_AUTHORIZATION': "token " + self.token}) + print('Got response : ' + str(response.status_code)) + self.assertEqual(response.status_code, HTTP_401_UNAUTHORIZED) + + time.sleep(randint(0, 1)) + + myjson = json.dumps( + {"eng_status_uuid": self.created_status['uuid'], "description": "blah2 blah2"}, ensure_ascii=False) + response = self.c.put(urlStr.replace('${uuid}', str( + self.engagement.uuid)), myjson, content_type='application/json', **{'HTTP_AUTHORIZATION': "token " + token}) + print('Got response : ' + str(response.status_code)) + self.assertEqual(response.status_code, HTTP_200_OK) + + logger.debug("END - " + metadata) + + return metadata + + for i in range(0, number_of_concurrent_requests): + eluser = self.creator.createUser( + Vendor.objects.get(name='Other'), self.creator.randomGenerator("main-vendor-email"), + '55501000199', 'user' + str(i), self.el, True) + token = self.jwt_service.create_token(eluser.user) + self.engagement = self.creator.createEngagement('just-a-fake-uuid', 'Validation', None) + self.engagement.engagement_team.add(eluser) + self.engagement.reviewer = eluser + self.engagement.peer_reviewer = self.peer_review_user + self.engagement.save() + + metadata = "test " + str(i) + future = executor.submit(invokeRequest, metadata, token) + assert future.result() == metadata diff --git a/django/engagementmanager/tests/test_resend_activation_email.py b/django/engagementmanager/tests/test_resend_activation_email.py new file mode 100755 index 0000000..b544001 --- /dev/null +++ b/django/engagementmanager/tests/test_resend_activation_email.py @@ -0,0 +1,62 @@ +# +# ============LICENSE_START========================================== +# org.onap.vvp/engagementmgr +# =================================================================== +# Copyright © 2017 AT&T Intellectual Property. All rights reserved. +# =================================================================== +# +# Unless otherwise specified, all software contained herein is licensed +# under the Apache License, Version 2.0 (the “License”); +# you may not use this software except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# +# +# Unless otherwise specified, all documentation contained herein is licensed +# under the Creative Commons License, Attribution 4.0 Intl. (the “License”); +# you may not use this documentation except in compliance with the License. +# You may obtain a copy of the License at +# +# https://creativecommons.org/licenses/by/4.0/ +# +# Unless required by applicable law or agreed to in writing, documentation +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ============LICENSE_END============================================ +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. +from engagementmanager.tests.test_base_entity import TestBaseEntity +from engagementmanager.utils.constants import Constants + + +class TestResendActivationEmail(TestBaseEntity): + + def childSetup(self): + self.createDefaultRoles() + uuid, vendor = self.creator.createVendor(Constants.service_provider_company_name) + self.user = self.creator.createUser(vendor, self.randomGenerator("email"), self.randomGenerator( + "randomNumber"), self.randomGenerator("randomString"), self.standard_user, False) + self.urlStr = self.urlPrefix + \ + "users/activation-mail/" + self.user.uuid + self.params = {"email": self.user.email} + print('-----------------------------------------------------') + print('Created User:') + print('UUID: ' + str(self.user.uuid)) + print('Full Name: ' + self.user.full_name) + print('-----------------------------------------------------') + + def test_resend_activation_email(self): + response = self.c.get( + self.urlStr, self.params, content_type='application/json') + self.assertTrue(response.status_code, "200") diff --git a/django/engagementmanager/tests/test_reset_password.py b/django/engagementmanager/tests/test_reset_password.py new file mode 100755 index 0000000..7cfdd54 --- /dev/null +++ b/django/engagementmanager/tests/test_reset_password.py @@ -0,0 +1,75 @@ +# +# ============LICENSE_START========================================== +# org.onap.vvp/engagementmgr +# =================================================================== +# Copyright © 2017 AT&T Intellectual Property. All rights reserved. +# =================================================================== +# +# Unless otherwise specified, all software contained herein is licensed +# under the Apache License, Version 2.0 (the “License”); +# you may not use this software except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# +# +# Unless otherwise specified, all documentation contained herein is licensed +# under the Creative Commons License, Attribution 4.0 Intl. (the “License”); +# you may not use this documentation except in compliance with the License. +# You may obtain a copy of the License at +# +# https://creativecommons.org/licenses/by/4.0/ +# +# Unless required by applicable law or agreed to in writing, documentation +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ============LICENSE_END============================================ +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. +import json + +from engagementmanager.tests.test_base_entity import TestBaseEntity +from engagementmanager.models import Vendor +from engagementmanager.utils.constants import Constants + + +class TestResetPasswordTestCase(TestBaseEntity): + + def childSetup(self): + + self.createVendors([Constants.service_provider_company_name, 'Amdocs']) + self.createDefaultRoles() + + # Create a user with role el + self.user = self.creator.createUser(Vendor.objects.get( + name=Constants.service_provider_company_name), self.randomGenerator("main-vendor-email"), + '55501000199', 'user', self.standard_user, True) + + self.urlStr = self.urlPrefix + "users/pwd/sendresetinstr/" + self.data = dict() + self.token = self.loginAndCreateSessionToken(self.user) + + def initBody(self): + self.data['email'] = self.user.email + + def resetPwd(self, expectedStatus=200, httpMethod="PUT"): + self.accountData = json.dumps(self.data, ensure_ascii=False) + if (httpMethod == "PUT"): + response = self.c.put(self.urlStr, self.accountData, content_type='application/json', + **{'HTTP_AUTHORIZATION': "token " + self.token}) + elif (httpMethod == "POST"): + response = self.c.post(self.urlStr, self.accountData, content_type='application/json', + **{'HTTP_AUTHORIZATION': "token " + self.token}) + print('Got response : ' + str(response.status_code) + " Expecting " + str(expectedStatus)) + self.assertEqual(response.status_code, expectedStatus) + return response diff --git a/django/engagementmanager/tests/test_rgwa_client.py b/django/engagementmanager/tests/test_rgwa_client.py new file mode 100755 index 0000000..a65f5b3 --- /dev/null +++ b/django/engagementmanager/tests/test_rgwa_client.py @@ -0,0 +1,214 @@ +# +# ============LICENSE_START========================================== +# org.onap.vvp/engagementmgr +# =================================================================== +# Copyright © 2017 AT&T Intellectual Property. All rights reserved. +# =================================================================== +# +# Unless otherwise specified, all software contained herein is licensed +# under the Apache License, Version 2.0 (the “License”); +# you may not use this software except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# +# +# Unless otherwise specified, all documentation contained herein is licensed +# under the Creative Commons License, Attribution 4.0 Intl. (the “License”); +# you may not use this documentation except in compliance with the License. +# You may obtain a copy of the License at +# +# https://creativecommons.org/licenses/by/4.0/ +# +# Unless required by applicable law or agreed to in writing, documentation +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ============LICENSE_END============================================ +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. +from engagementmanager.utils.constants import Constants +from validationmanager.rados.rgwa_client import _validate_args, RGWAClient + + +try: + from unittest.mock import patch, ANY +except ImportError: + from mock import patch, ANY + +try: + from unittest import assertRaises, expectedFailure +except ImportError: + from pytest import raises as assertRaises + from pytest import mark + expectedFailure = mark.xfail + + +class Test_ValidateArgs(object): + + def setup(self): + self.valid_args = { + 'foo': ['foo1', 'foo2'], + 'bar': [1, 2, 3], + } + + def test_unconstrained(self): + _validate_args(self.valid_args, baz="quux") + + def test_none_value(self): + _validate_args(self.valid_args, foo=None) + + def test_good_value(self): + _validate_args(self.valid_args, foo="foo1") + + def test_bad_value_raises(self): + with assertRaises(ValueError): + _validate_args(self.valid_args, foo="foo3") + + +@patch('ice_rgwa_client.request') +class TestRGWAClientMethods(object): + """Tests that all the methods invoke requests.request() with the + appropriate arguments. + + """ + + def setup(self): + self.access_key = 'ABCDEFGHIJKLMNOPQRST' + self.secret_key = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmn' + self.conn = RGWAClient( + access_key=self.access_key, + secret_key=self.secret_key, + base_url=Constants.rgwa_base_url + ) + + def test_get_usage(self, r): + self.conn.get_usage(uid='foo', show_entries=True) + r.assert_called_once_with( + auth=ANY, + method='get', + json={}, + params={'show-entries': True, 'show-summary': False, 'uid': 'foo'}, + url=Constants.rgwa_base_url+'/usage', + verify=True, + ) + + def test_trim_usage(self, r): + self.conn.trim_usage(uid='foo', remove_all=True) + r.assert_called_once_with( + auth=ANY, + method='delete', + json={}, + params={'remove-all': True, 'uid': 'foo'}, + url='http://localhost:8123/admin/usage', + verify=True, + ) + + def test_get_user(self, r): + self.conn.get_user(uid='foo') + r.assert_called_once_with( + auth=ANY, + method='get', + json={}, + params={'uid': 'foo'}, + url=Constants.rgwa_base_url+'/user', + verify=True, + ) + + # Marked FIXME because we experience diminishing returns here. All the + # methods in the library are basically one-liner calls to the common + # _request method, which has been sufficiently covered. There's no + # additional business logic that would be tested, only ensuring beyond api + # stability. Time would be better spent figuring out how to get tox to + # stand up testing instances of various versions of an actual radosgw + # server for integration testing. + + @expectedFailure + def test_create_user(self, r): + raise NotImplementedError # FIXME + @expectedFailure + def test_modify_user(self, r): + raise NotImplementedError # FIXME + @expectedFailure + def test_remove_user(self, r): + raise NotImplementedError # FIXME + @expectedFailure + def test_create_subuser(self, r): + raise NotImplementedError # FIXME + @expectedFailure + def test_modify_subuser(self, r): + raise NotImplementedError # FIXME + @expectedFailure + def test_remove_subuser(self, r): + raise NotImplementedError # FIXME + @expectedFailure + def test_create_key(self, r): + raise NotImplementedError # FIXME + @expectedFailure + def test_remove_key(self, r): + raise NotImplementedError # FIXME + @expectedFailure + def test_get_bucket(self, r): + raise NotImplementedError # FIXME + @expectedFailure + def test_check_bucket_index(self, r): + raise NotImplementedError # FIXME + @expectedFailure + def test_remove_bucket(self, r): + raise NotImplementedError # FIXME + @expectedFailure + def test_unlink_bucket(self, r): + raise NotImplementedError # FIXME + @expectedFailure + def test_link_bucket(self, r): + raise NotImplementedError # FIXME + @expectedFailure + def test_remove_object(self, r): + raise NotImplementedError # FIXME + @expectedFailure + def test_get_policy(self, r): + raise NotImplementedError # FIXME + @expectedFailure + def test_add_capability(self, r): + raise NotImplementedError # FIXME + @expectedFailure + def test_remove_capability(self, r): + raise NotImplementedError # FIXME + @expectedFailure + def test_get_quota(self, r): + raise NotImplementedError # FIXME + @expectedFailure + def test_set_quota(self, r): + raise NotImplementedError # FIXME + @expectedFailure + def test_get_user_quota(self, r): + raise NotImplementedError # FIXME + @expectedFailure + def test_set_user_quota(self, r): + raise NotImplementedError # FIXME + @expectedFailure + def test_get_user_bucket_quota(self, r): + raise NotImplementedError # FIXME + @expectedFailure + def test_set_user_bucket_quota(self, r): + raise NotImplementedError # FIXME + + @expectedFailure + def test_get_users(self, r): + raise NotImplementedError # FIXME + @expectedFailure + def test_get_buckets(self, r): + raise NotImplementedError # FIXME + + +# FIXME TODO Add integration tests against a local ceph radosgw instance, +# (disabled by default). Record result of test suite in repository. diff --git a/django/engagementmanager/tests/test_set_checklist_state.py b/django/engagementmanager/tests/test_set_checklist_state.py new file mode 100755 index 0000000..54d3c8e --- /dev/null +++ b/django/engagementmanager/tests/test_set_checklist_state.py @@ -0,0 +1,236 @@ +# +# ============LICENSE_START========================================== +# org.onap.vvp/engagementmgr +# =================================================================== +# Copyright © 2017 AT&T Intellectual Property. All rights reserved. +# =================================================================== +# +# Unless otherwise specified, all software contained herein is licensed +# under the Apache License, Version 2.0 (the “License”); +# you may not use this software except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# +# +# Unless otherwise specified, all documentation contained herein is licensed +# under the Creative Commons License, Attribution 4.0 Intl. (the “License”); +# you may not use this documentation except in compliance with the License. +# You may obtain a copy of the License at +# +# https://creativecommons.org/licenses/by/4.0/ +# +# Unless required by applicable law or agreed to in writing, documentation +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ============LICENSE_END============================================ +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. +from engagementmanager.models import Vendor, Checklist, ChecklistDecision, \ + ChecklistLineItem, ChecklistSection +from engagementmanager.tests.test_base_entity import TestBaseEntity +from engagementmanager.utils.constants import CheckListLineType, \ + CheckListDecisionValue, CheckListState, ChecklistDefaultNames, Constants +from rest_framework.status import HTTP_200_OK +from uuid import uuid4 +import json +from engagementmanager.service.logging_service import LoggingServiceFactory + +logger = LoggingServiceFactory.get_logger() + + +class TestChecklistSetState(TestBaseEntity): + + def childSetup(self): + self.createVendors([Constants.service_provider_company_name, 'Amdocs']) + self.vendor = Vendor.objects.get(name='Amdocs') + self.createDefaultRoles() + + # For negative tests + self.user = self.creator.createUser(Vendor.objects.get( + name=Constants.service_provider_company_name), self.randomGenerator("main-vendor-email"), + '55501000199', 'user', self.standard_user, True) + # Create users with role el (el+peer reviwer) + self.el_user = self.creator.createUser(Vendor.objects.get( + name=Constants.service_provider_company_name), self.randomGenerator("main-vendor-email"), + '55501000199', 'el user', self.el, True) + self.peer_review_user = self.creator.createUser(Vendor.objects.get( + name=Constants.service_provider_company_name), self.randomGenerator("main-vendor-email"), + '55501000199', 'peer-reviewer user', self.el, True) + # Create a user with admin role + self.admin_user = self.creator.createUser(Vendor.objects.get( + name=Constants.service_provider_company_name), Constants.service_provider_admin_mail, + '55501000199', 'admin user', self.admin, True) + + self.template = self.creator.createDefaultCheckListTemplate() + self.engagement = self.creator.createEngagement( + 'just-a-fake-uuid', 'Validation', None) + self.engagement.reviewer = self.el_user + self.engagement.peer_reviewer = self.peer_review_user + self.engagement.engagement_team.add(self.el_user, self.user) + self.engagement.save() + + self.deploymentTarget = self.creator.createDeploymentTarget( + self.randomGenerator("randomString"), self.randomGenerator("randomString")) +# self.asInfrastructure = self.creator.createApplicationServiceInfrastructure(self.randomGenerator("randomString")) + self.vf = self.creator.createVF(self.randomGenerator("randomString"), + self.engagement, self.deploymentTarget, False, self.vendor) +# self.vf.service_infrastructures.add(self.asInfrastructure) + + self.clbodydata = dict() + self.initCLBody() + self.checklist = Checklist.objects.create(uuid=uuid4(), name=self.clbodydata['checkListName'], validation_cycle=1, associated_files=self.clbodydata[ + 'checkListAssociatedFiles'], engagement=self.engagement, template=self.template, creator=self.el_user, owner=self.el_user) + self.checklist.save() + self.section = ChecklistSection.objects.create(uuid=uuid4(), name=self.randomGenerator("randomString"), weight=1.0, description=self.randomGenerator( + "randomString"), validation_instructions=self.randomGenerator("randomString"), template=self.template) + self.section.save() + self.line_item = ChecklistLineItem.objects.create(uuid=uuid4(), name=self.randomGenerator("randomString"), weight=1.0, description=self.randomGenerator( + "randomString"), line_type=CheckListLineType.auto.name, validation_instructions=self.randomGenerator("randomString"), template=self.template, section=self.section) # @UndefinedVariable + self.line_item2 = ChecklistLineItem.objects.create(uuid=uuid4(), name=self.randomGenerator("randomString"), weight=1.0, description=self.randomGenerator( + "randomString"), line_type=CheckListLineType.auto.name, validation_instructions=self.randomGenerator("randomString"), template=self.template, section=self.section) # @UndefinedVariable + self.line_item.save() + self.line_item2.save() + self.decision = ChecklistDecision.objects.create( + uuid=uuid4(), checklist=self.checklist, template=self.template, lineitem=self.line_item) + self.decision2 = ChecklistDecision.objects.create( + uuid=uuid4(), checklist=self.checklist, template=self.template, lineitem=self.line_item2) + self.decision.save() + self.decision2.save() + self.data = dict() + self.peer_review_token = self.loginAndCreateSessionToken( + self.peer_review_user) + self.ELtoken = self.loginAndCreateSessionToken(self.el_user) + self.admin_token = self.loginAndCreateSessionToken(self.admin_user) + + def initCLBody(self): + self.clbodydata['checkListName'] = ChecklistDefaultNames.HEAT_TEMPLATES + self.clbodydata['checkListTemplateUuid'] = str(self.template.uuid) + self.clbodydata[ + 'checkListAssociatedFiles'] = "[\"file0/f69f4ce7-51d5-409c-9d0e-ec6b1e79df28\", \"file1/f69f4ce7-51d5-409c-9d0e-ec6b1e79df28\", \"file2/f69f4ce7-51d5-409c-9d0e-ec6b1e79df28\"]" + + def loggerTestFailedOrSucceded(self, bool): + if bool: + logger.debug("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$") + logger.debug(" Test Succeeded") + logger.debug("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$\n\n") + else: + logger.debug("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$") + logger.debug("Test failed") + logger.debug("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$\n\n") + + def testSetStateFullWorkFlow(self): + logger.debug("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$") + logger.debug(" Test started: Full positive work flow") + logger.debug("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$\n\n") + + self.urlStr = self.urlPrefix + "checklist/@cl_uuid/state/" + self.get_checklist_url = self.urlPrefix + "checklist/@cl_uuid" + self.url_for_decision = self.urlPrefix + "checklist/decision/@decision_uuid" + + self.data['decline'] = "False" + self.data['description'] = "BLA BLA BLA" + datajson = json.dumps(self.data, ensure_ascii=False) + + logger.debug( + "**********************************************************************") + logger.debug("----- 1. Current state is " + + self.checklist.state + " -----") + logger.debug( + "**********************************************************************") + + logger.debug( + "----- 1.1 Wishing to move from pending state to automation state -----") + logger.debug( + "----- 1.2 Performing a request to move forward to the next state -----") + response = self.c.put(self.urlStr.replace("@cl_uuid", str(self.checklist.uuid)), datajson, + content_type='application/json', **{'HTTP_AUTHORIZATION': "token " + self.ELtoken}) + logger.debug('1.3 Please Notice, you got a ' + str(response.status_code) + + ' response, and was expecting a ' + str(HTTP_200_OK) + ' response') + self.assertEqual(response.status_code, HTTP_200_OK) + + logger.debug( + "----- 2 Test engine Mock retrieved tests results, hence checklist state moved to review state. -----") + logger.debug( + "----- 3.1 changing decisions' review value to APPROVED -----") + decisions = ChecklistDecision.objects.filter(checklist=self.checklist) + for dec in decisions: + self.data['value'] = "approved" + datajson = json.dumps(self.data, ensure_ascii=False) + response = self.c.put(self.url_for_decision.replace("@decision_uuid", str(dec.uuid)), datajson, + content_type='application/json', **{'HTTP_AUTHORIZATION': "token " + self.ELtoken}) + logger.debug( + "----- 3.2 Wishing to move from review state to peer_review state -----") + logger.debug( + "----- 3.3 Performing a request to move forward to the next state -----") + + response = self.c.put(self.urlStr.replace("@cl_uuid", str(self.checklist.uuid)), datajson, + content_type='application/json', **{'HTTP_AUTHORIZATION': "token " + self.ELtoken}) + logger.debug('3.4 you got a ' + str(response.status_code) + + ' response, and was expecting a ' + str(HTTP_200_OK) + ' response') + self.assertEqual(response.status_code, HTTP_200_OK) + logger.debug( + "----- 4.1 changing the cl peer_reviews' decisions values to NA -----") + for dec in decisions: + dec.peer_review_value = CheckListDecisionValue.not_relevant.name # @UndefinedVariable + dec.save() + + logger.debug( + "----- 4.2 Wishing to move from peer_review state to approval state -----") + logger.debug( + "----- 4.3 Performing a request to move forward to the next state -----") + response = self.c.put(self.urlStr.replace("@cl_uuid", str(self.checklist.uuid)), datajson, + content_type='application/json', **{'HTTP_AUTHORIZATION': "token " + self.peer_review_token}) + logger.debug('4.4 Please Notice, you got a ' + str(response.status_code) + + ' response, and was expecting a ' + str(HTTP_200_OK) + ' response') + self.assertEqual(response.status_code, HTTP_200_OK) + logger.debug( + "----- 5.1 Wishing to move from approval state to handoff state -----") + logger.debug( + "----- 5.2 Performing a request to move forward to the next state -----") + response = self.c.put(self.urlStr.replace("@cl_uuid", str(self.checklist.uuid)), datajson, + content_type='application/json', **{'HTTP_AUTHORIZATION': "token " + self.admin_token}) + logger.debug('Please Notice, you got a ' + str(response.status_code) + + ' response, and was expecting a ' + str(HTTP_200_OK) + ' response') + self.assertEqual(response.status_code, HTTP_200_OK) + logger.debug( + "----- 6.1 Wishing to move from handoff state to closed state -----") + logger.debug( + "----- 6.2 Performing a request to move forward to the last state -----") + response = self.c.put(self.urlStr.replace("@cl_uuid", str(self.checklist.uuid)), datajson, + content_type='application/json', **{'HTTP_AUTHORIZATION': "token " + self.ELtoken}) + logger.debug('Please Notice, you got a ' + str(response.status_code) + + ' response, and was expecting a ' + str(HTTP_200_OK) + ' response') + self.assertEqual(response.status_code, HTTP_200_OK) + self.loggerTestFailedOrSucceded(True) + + ''' + This test checks that the signal is sent and responds with 200OK. Also that the signal logic change the checklist state to automation + ''' + + def testClFromPendingToAutomationSignal(self): + + self.urlStr = self.urlPrefix + "checklist/@cl_uuid/state/" + self.get_checklist_url = self.urlPrefix + "checklist/@cl_uuid" + self.url_for_decision = self.urlPrefix + "checklist/decision/@decision_uuid" + + self.data['decline'] = "False" + self.data['description'] = "BLA BLA BLA" + datajson = json.dumps(self.data, ensure_ascii=False) + + cl = Checklist.objects.get(uuid=self.checklist.uuid) + cl.state = CheckListState.pending.name # @UndefinedVariable + cl.save() + res1 = self.c.put(self.urlStr.replace("@cl_uuid", str(self.checklist.uuid)), datajson, + content_type='application/json', **{'HTTP_AUTHORIZATION': "token " + self.ELtoken}) + self.assertEqual(res1.status_code, HTTP_200_OK) diff --git a/django/engagementmanager/tests/test_set_eng_stage.py b/django/engagementmanager/tests/test_set_eng_stage.py new file mode 100755 index 0000000..f26b70b --- /dev/null +++ b/django/engagementmanager/tests/test_set_eng_stage.py @@ -0,0 +1,207 @@ +# +# ============LICENSE_START========================================== +# org.onap.vvp/engagementmgr +# =================================================================== +# Copyright © 2017 AT&T Intellectual Property. All rights reserved. +# =================================================================== +# +# Unless otherwise specified, all software contained herein is licensed +# under the Apache License, Version 2.0 (the “License”); +# you may not use this software except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# +# +# Unless otherwise specified, all documentation contained herein is licensed +# under the Creative Commons License, Attribution 4.0 Intl. (the “License”); +# you may not use this documentation except in compliance with the License. +# You may obtain a copy of the License at +# +# https://creativecommons.org/licenses/by/4.0/ +# +# Unless required by applicable law or agreed to in writing, documentation +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ============LICENSE_END============================================ +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. +import json +from rest_framework.status import HTTP_202_ACCEPTED,\ + HTTP_401_UNAUTHORIZED +from engagementmanager.models import Vendor +from engagementmanager.tests.test_base_entity import TestBaseEntity +from engagementmanager.utils.constants import EngagementStage, Constants +from engagementmanager.service.logging_service import LoggingServiceFactory + +logger = LoggingServiceFactory.get_logger() + + +class TestEngagementSetStage(TestBaseEntity): + + def childSetup(self): + + self.createVendors([Constants.service_provider_company_name, 'Amdocs']) + self.vendor = Vendor.objects.get(name='Amdocs') + self.service_provider = Vendor.objects.get(name=Constants.service_provider_company_name) + self.createDefaultRoles() + + # For negative tests + self.user = self.creator.createUser(Vendor.objects.get( + name=Constants.service_provider_company_name), self.randomGenerator("main-vendor-email"), + '55501000199', 'user', self.standard_user, True) + # Create users with role el (el+peer reviwer) + self.el_user = self.creator.createUser(Vendor.objects.get( + name=Constants.service_provider_company_name), self.randomGenerator("main-vendor-email"), + '55501000199', 'el user', self.el, True) + self.peer_reviewer = self.creator.createUser(self.service_provider, self.randomGenerator( + "main-vendor-email"), self.randomGenerator("randomNumber"), self.randomGenerator("randomString"), self.el, True) + # Create a user with admin role + self.admin_user = self.creator.createUser(Vendor.objects.get( + name=Constants.service_provider_company_name), Constants.service_provider_admin_mail, + '55501000199', 'admin user', self.admin, True) + + self.engagement = self.creator.createEngagement('just-a-fake-uuid', 'Validation', None) + self.engagement.reviewer = self.el_user + self.engagement.peer_reviewer = self.peer_reviewer + self.engagement.engagement_team.add(self.el_user, self.user) + self.engagement.save() + + self.deploymentTarget = self.creator.createDeploymentTarget( + self.randomGenerator("randomString"), self.randomGenerator("randomString")) +# self.asInfrastructure = self.creator.createApplicationServiceInfrastructure(self.randomGenerator("randomString")) + self.vf = self.creator.createVF(self.randomGenerator("randomString"), + self.engagement, self.deploymentTarget, False, self.vendor) +# self.vf.service_infrastructures.add(self.asInfrastructure) + + self.data = dict() + self.user_token = self.loginAndCreateSessionToken(self.user) + self.ELtoken = self.loginAndCreateSessionToken(self.el_user) + self.admin_token = self.loginAndCreateSessionToken(self.admin_user) + + def loggerTestFailedOrSucceded(self, bool): + if bool: + logger.debug("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$") + logger.debug(" Test Succeeded") + logger.debug("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$\n\n") + else: + logger.debug("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$") + logger.debug("Test failed") + logger.debug("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$\n\n") + + def testSetEngagementStageFullWorkFlowELUser(self): + logger.debug("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$") + logger.debug(" Test started: change from Intake to Active, using --EL-- user!") + logger.debug("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$\n\n") + self.urlStr = self.urlPrefix + "single-engagement/" + str(self.engagement.uuid) + "/stage/@stage" + self.get_engagement_url = self.urlPrefix + "single-engagement/" + str(self.engagement.uuid) + datajson = json.dumps(self.data, ensure_ascii=False) + + logger.debug("**********************************************************************") + logger.debug("----- 1. Current stage is " + self.engagement.engagement_stage + " -----") + logger.debug("**********************************************************************") + + logger.debug("----- 1.1 Wishing to move from Intake stage to Active stage -----") + logger.debug("----- 1.2 Performing a request to move forward to the next stage -----") + response = self.c.put(self.urlStr.replace("@stage", EngagementStage.Active.name), datajson, + content_type='application/json', **{'HTTP_AUTHORIZATION': "token " + self.ELtoken}) + response2 = self.c.get(self.get_engagement_url, {}, content_type='application/json', + **{'HTTP_AUTHORIZATION': "token " + self.ELtoken}) + + # Check if the correct activity was created for stage change to Active + self.notifcation_url = self.urlPrefix + "engagement/" + str(self.engagement.uuid) + "/activities/" + response3 = self.c.get(self.notifcation_url, {}, content_type='application/json', + **{'HTTP_AUTHORIZATION': "token " + self.ELtoken}) + received_notification = json.loads(response3.content)[0]['description'] + should_be = "Engagement stage is now Active for the following VF: ##vf_name##" + self.assertEqual(should_be, received_notification) + self.assertEqual(self.vf.name in json.loads(response3.content)[0]['metadata'], True) + logger.debug('1.3 Please Notice, you got a ' + str(response.status_code) + + ' response, and was expecting a ' + str(HTTP_202_ACCEPTED) + ' response') + self.assertEqual(response.status_code, HTTP_202_ACCEPTED) + logger.debug("----- 2 Wishing to move from Active stage to Validated stage -----") + logger.debug("----- 2.1 Performing a request to move forward to the next stage -----") + response = self.c.put(self.urlStr.replace("@stage", EngagementStage.Validated.name), datajson, + content_type='application/json', **{'HTTP_AUTHORIZATION': "token " + self.ELtoken}) + logger.debug('2.3 Please Notice, you got a ' + str(response.status_code) + + ' response, and was expecting a ' + str(HTTP_202_ACCEPTED) + ' response') + self.assertEqual(response.status_code, HTTP_202_ACCEPTED) + logger.debug("----- 3.1 Wishing to move from Validated stage to Completed stage -----") + logger.debug("----- 3.2 Performing a request to move forward to the next stage -----") + response = self.c.put(self.urlStr.replace("@stage", EngagementStage.Completed.name), datajson, + content_type='application/json', **{'HTTP_AUTHORIZATION': "token " + self.ELtoken}) + logger.debug('2.4 you got a ' + str(response.status_code) + + ' response, and was expecting a ' + str(HTTP_202_ACCEPTED) + ' response') + self.assertEqual(response.status_code, HTTP_202_ACCEPTED) + self.loggerTestFailedOrSucceded(True) + + def testSetEngagementStageFullWorkFlowAdminUser(self): + + logger.debug("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$") + logger.debug(" Test started: change from Intake to Active, using --ADMIN-- user!") + logger.debug("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$\n\n") + + self.urlStr = self.urlPrefix + "single-engagement/" + str(self.engagement.uuid) + "/stage/@stage" + self.get_engagement_url = self.urlPrefix + "single-engagement/" + str(self.engagement.uuid) + datajson = json.dumps(self.data, ensure_ascii=False) + + logger.debug("**********************************************************************") + logger.debug("----- 1. Current stage is " + self.engagement.engagement_stage + " -----") + logger.debug("**********************************************************************") + + logger.debug("----- 1.1 Wishing to move from Intake stage to Active stage -----") + logger.debug("----- 1.2 Performing a request to move forward to the next stage -----") + response = self.c.put(self.urlStr.replace("@stage", EngagementStage.Active.name), datajson, + content_type='application/json', **{'HTTP_AUTHORIZATION': "token " + self.admin_token}) + logger.debug('1.3 Please Notice, you got a ' + str(response.status_code) + + ' response, and was expecting a ' + str(HTTP_202_ACCEPTED) + ' response') + self.assertEqual(response.status_code, HTTP_202_ACCEPTED) + self.loggerTestFailedOrSucceded(False) + logger.debug("----- 2 Wishing to move from Active stage to Validated stage -----") + logger.debug("----- 2.1 Performing a request to move forward to the next stage -----") + response = self.c.put(self.urlStr.replace("@stage", EngagementStage.Validated.name), datajson, + content_type='application/json', **{'HTTP_AUTHORIZATION': "token " + self.admin_token}) + logger.debug('2.3 Please Notice, you got a ' + str(response.status_code) + + ' response, and was expecting a ' + str(HTTP_202_ACCEPTED) + ' response') + self.assertEqual(response.status_code, HTTP_202_ACCEPTED) + logger.debug("----- 3 Wishing to move from Validated stage to Completed stage -----") + logger.debug("----- 3.1 Performing a request to move forward to the next stage -----") + response = self.c.put(self.urlStr.replace("@stage", EngagementStage.Completed.name), datajson, + content_type='application/json', **{'HTTP_AUTHORIZATION': "token " + self.admin_token}) + logger.debug('2.4 you got a ' + str(response.status_code) + + ' response, and was expecting a ' + str(HTTP_202_ACCEPTED) + ' response') + self.assertEqual(response.status_code, HTTP_202_ACCEPTED) + self.loggerTestFailedOrSucceded(True) + + def test_negative_set_eng_Stage(self): + + logger.debug("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$") + logger.debug(" Test started: Negative test to change stage with un authorized user") + logger.debug("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$\n\n") + + self.urlStr = self.urlPrefix + "single-engagement/" + str(self.engagement.uuid) + "/stage/@stage" + self.get_engagement_url = self.urlPrefix + "single-engagement/" + str(self.engagement.uuid) + datajson = json.dumps(self.data, ensure_ascii=False) + + logger.debug("**********************************************************************") + logger.debug("----- 1. Current stage is " + self.engagement.engagement_stage + " -----") + logger.debug("**********************************************************************") + + logger.debug("----- 1.1 Wishing to try move from Intake stage to Active stage AND FAIL! -----") + logger.debug("----- 1.2 Performing a request to move forward to the next stage -----") + response = self.c.put(self.urlStr.replace("@stage", EngagementStage.Active.name), datajson, + content_type='application/json', **{'HTTP_AUTHORIZATION': "token " + self.user_token}) + logger.debug('1.3 Please Notice, you got a ' + str(response.status_code) + + ' response, and was expecting a ' + str(HTTP_401_UNAUTHORIZED) + ' response') + self.assertEqual(response.status_code, HTTP_401_UNAUTHORIZED) + self.loggerTestFailedOrSucceded(True) diff --git a/django/engagementmanager/tests/test_update_password.py b/django/engagementmanager/tests/test_update_password.py new file mode 100755 index 0000000..08b25b2 --- /dev/null +++ b/django/engagementmanager/tests/test_update_password.py @@ -0,0 +1,99 @@ +# +# ============LICENSE_START========================================== +# org.onap.vvp/engagementmgr +# =================================================================== +# Copyright © 2017 AT&T Intellectual Property. All rights reserved. +# =================================================================== +# +# Unless otherwise specified, all software contained herein is licensed +# under the Apache License, Version 2.0 (the “License”); +# you may not use this software except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# +# +# Unless otherwise specified, all documentation contained herein is licensed +# under the Creative Commons License, Attribution 4.0 Intl. (the “License”); +# you may not use this documentation except in compliance with the License. +# You may obtain a copy of the License at +# +# https://creativecommons.org/licenses/by/4.0/ +# +# Unless required by applicable law or agreed to in writing, documentation +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ============LICENSE_END============================================ +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. +import json + +from engagementmanager.tests.test_base_entity import TestBaseEntity +from engagementmanager.models import Vendor +from engagementmanager.utils.constants import Constants + + +class TestUpdatePasswordTestCase(TestBaseEntity): + + def childSetup(self): + + self.createVendors([Constants.service_provider_company_name, 'Amdocs']) + self.createDefaultRoles() + + # Create a user with role el + self.user = self.creator.createUser(Vendor.objects.get( + name=Constants.service_provider_company_name), self.randomGenerator("main-vendor-email"), + '55501000199', 'user', self.standard_user, True) + + self.urlStr = self.urlPrefix + "users/pwd/" + self.data = dict() + self.token = self.loginAndCreateSessionToken(self.user) + + def initBody(self): + self.data['password'] = "Aa12345" + self.data['confirm_password'] = "Aa12345" + + def updatePwd(self, expectedStatus=200, httpMethod="PUT"): + self.accountData = json.dumps(self.data, ensure_ascii=False) + if (httpMethod == "PUT"): + response = self.c.put(self.urlStr, self.accountData, content_type='application/json', + **{'HTTP_AUTHORIZATION': "token " + self.token}) + elif (httpMethod == "POST"): + response = self.c.post(self.urlStr, self.accountData, content_type='application/json', + **{'HTTP_AUTHORIZATION': "token " + self.token}) + print('Got response : ' + str(response.status_code) + " Expecting " + str(expectedStatus)) + self.assertEqual(response.status_code, expectedStatus) + return response + + ### TESTS ### + def testUpdatePasswordPositive(self): + self.initBody() + print("testUpdatePasswordPositive") + self.updatePwd(200, "PUT") + + def testUpdatePasswordNegative(self): + self.initBody() + print("testUpdatePasswordNegative") + self.updatePwd(405, "POST") + + def testUpdatePasswordNegativePwdNotMatch(self): + self.initBody() + self.data['confirm_password'] = self.data['password'] + "extraletters" + print("testUpdatePasswordNegativePwdNotMatch") + self.updatePwd(400, "PUT") + + def testUpdatePasswordNegativeMissingPassword(self): + self.initBody() + self.data['confirm_password'] = None + print("testUpdatePasswordNegativePwdNotMatch") + self.updatePwd(400, "PUT") diff --git a/django/engagementmanager/tests/test_update_user_account.py b/django/engagementmanager/tests/test_update_user_account.py new file mode 100755 index 0000000..fd0becd --- /dev/null +++ b/django/engagementmanager/tests/test_update_user_account.py @@ -0,0 +1,119 @@ +# +# ============LICENSE_START========================================== +# org.onap.vvp/engagementmgr +# =================================================================== +# Copyright © 2017 AT&T Intellectual Property. All rights reserved. +# =================================================================== +# +# Unless otherwise specified, all software contained herein is licensed +# under the Apache License, Version 2.0 (the “License”); +# you may not use this software except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# +# +# Unless otherwise specified, all documentation contained herein is licensed +# under the Creative Commons License, Attribution 4.0 Intl. (the “License”); +# you may not use this documentation except in compliance with the License. +# You may obtain a copy of the License at +# +# https://creativecommons.org/licenses/by/4.0/ +# +# Unless required by applicable law or agreed to in writing, documentation +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ============LICENSE_END============================================ +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. +import json + +from engagementmanager.models import Vendor, IceUserProfile +from engagementmanager.tests.test_base_entity import TestBaseEntity +from engagementmanager.utils.constants import Constants + + +class TestUpdateUserAccountTestCase(TestBaseEntity): + + def childSetup(self): + + self.createVendors([Constants.service_provider_company_name, 'Amdocs']) + + self.createDefaultRoles() + + # Create a user with role el + self.user = self.creator.createUser(Vendor.objects.get( + name=Constants.service_provider_company_name), self.randomGenerator("main-vendor-email"), + '55501000199', 'user', self.standard_user, True) + print('-----------------------------------------------------') + print('Created User:') + print('UUID: ' + str(self.user.uuid)) + print('Full Name: ' + self.user.full_name) + print('-----------------------------------------------------') + + self.urlStr = self.urlPrefix + "users/account/" + self.data = dict() + self.token = self.loginAndCreateSessionToken(self.user) + + def initBody(self): + self.data['company'] = Vendor.objects.get(name='Amdocs').name + self.data['full_name'] = "user" + self.data['email'] = self.randomGenerator("main-vendor-email") + self.data['phone_number'] = "12345" + self.data['password'] = "Aa12345" + self.data['confirm_password'] = "Aa12345" + + def updateAccount(self, expectedStatus=200, httpMethod="PUT"): + self.accountData = json.dumps(self.data, ensure_ascii=False) + if (httpMethod == "PUT"): + response = self.c.put(self.urlStr, self.accountData, content_type='application/json', + **{'HTTP_AUTHORIZATION': "token " + self.token}) + elif (httpMethod == "POST"): + response = self.c.post(self.urlStr, self.accountData, content_type='application/json', + **{'HTTP_AUTHORIZATION': "token " + self.token}) + print('Got response : ' + str(response.status_code) + " Expecting 200") + self.assertEqual(response.status_code, expectedStatus) + return response + + ### TESTS ### + def testUpdateNegativeWrongHttpMethodAccount(self): + self.initBody() + print("Negative Test: Wrong HTTP Method --> Expecting status code 405") + self.updateAccount(405, "POST") + + def testUpdateNegativeDiffPasswords(self): + self.initBody() + print("Negative Test: password!=confirm_password --> Expecting status code 400") + self.data['confirm_password'] = "fakePassword" + self.updateAccount(400, "PUT") + + def testUpdatePositive(self): + self.initBody() + print("Positive Test: --> Expecting status code 200") + self.updateAccount(200, "PUT") + + def testUpdateUserProfileNotificationsSettings(self): + self.initBody() + + self.data['regular_email_updates'] = False + self.data['email_updates_daily_digest'] = True + self.data['email_updates_on_every_notification'] = False + + print("Positive Test: --> Expecting status code 200") + + self.updateAccount(200, "PUT") + + user = IceUserProfile.objects.get(uuid=self.user.uuid) + self.assertEqual(user.regular_email_updates, False) + self.assertEqual(user.email_updates_daily_digest, True) + self.assertEqual(user.email_updates_on_every_notification, False) diff --git a/django/engagementmanager/tests/test_vfc.py b/django/engagementmanager/tests/test_vfc.py new file mode 100755 index 0000000..e7f374a --- /dev/null +++ b/django/engagementmanager/tests/test_vfc.py @@ -0,0 +1,180 @@ +# +# ============LICENSE_START========================================== +# org.onap.vvp/engagementmgr +# =================================================================== +# Copyright © 2017 AT&T Intellectual Property. All rights reserved. +# =================================================================== +# +# Unless otherwise specified, all software contained herein is licensed +# under the Apache License, Version 2.0 (the “License”); +# you may not use this software except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# +# +# Unless otherwise specified, all documentation contained herein is licensed +# under the Creative Commons License, Attribution 4.0 Intl. (the “License”); +# you may not use this documentation except in compliance with the License. +# You may obtain a copy of the License at +# +# https://creativecommons.org/licenses/by/4.0/ +# +# Unless required by applicable law or agreed to in writing, documentation +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ============LICENSE_END============================================ +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. +from rest_framework.status import HTTP_204_NO_CONTENT, HTTP_200_OK +from engagementmanager.models import Vendor +from engagementmanager.tests.test_base_entity import TestBaseEntity +from engagementmanager.utils.constants import Constants +from engagementmanager.service.logging_service import LoggingServiceFactory + +logger = LoggingServiceFactory.get_logger() + + +class VFCAPITestCase(TestBaseEntity): + + def childSetup(self): + self.createVendors([Constants.service_provider_company_name, 'Other']) + self.createDefaultRoles() + # Create a user with role el + self.service_provider = Vendor.objects.get(name=Constants.service_provider_company_name) + self.el_user = self.creator.createUser(self.service_provider, self.randomGenerator( + "main-vendor-email"), self.randomGenerator("randomNumber"), self.randomGenerator("randomString"), self.el, True) + logger.debug('-----------------------------------------------------') + logger.debug('Created User:') + logger.debug('UUID: ' + str(self.el_user.uuid)) + logger.debug('Full Name: ' + self.el_user.full_name) + logger.debug('-----------------------------------------------------') + self.peer_reviewer = self.creator.createUser(self.service_provider, self.randomGenerator( + "main-vendor-email"), self.randomGenerator("randomNumber"), self.randomGenerator("randomString"), self.el, True) + logger.debug('-----------------------------------------------------') + logger.debug('Created Peer Reviewer:') + logger.debug('UUID: ' + str(self.peer_reviewer.uuid)) + logger.debug('Full Name: ' + self.peer_reviewer.full_name) + logger.debug('-----------------------------------------------------') + + # Create a user with role standard_user + self.vendor = Vendor.objects.get(name='Other') + self.user = self.creator.createUser(self.vendor, self.randomGenerator("email"), self.randomGenerator( + "randomNumber"), self.randomGenerator("randomString"), self.standard_user, True) + logger.debug('-----------------------------------------------------') + logger.debug('Created User:') + logger.debug('UUID: ' + str(self.user.uuid)) + logger.debug('Full Name: ' + self.user.full_name) + logger.debug('-----------------------------------------------------') + + # Create an Engagement with team + self.engagement = self.creator.createEngagement(self.randomGenerator( + "randomString"), self.randomGenerator("randomString"), None) + self.engagement.engagement_team.add(self.user, self.el_user, self.peer_reviewer) + self.engagement.peer_reviewer = self.peer_reviewer + self.engagement.reviewer = self.el_user + self.engagement.save() + logger.debug('-----------------------------------------------------') + logger.debug('Created Engagement:') + logger.debug('UUID: ' + str(self.engagement.uuid)) + logger.debug('-----------------------------------------------------') + + # Create a DT, DTSite, VF + self.deploymentTarget = self.creator.createDeploymentTarget( + self.randomGenerator("randomString"), self.randomGenerator("randomString")) + self.vf = self.creator.createVF(self.randomGenerator("randomString"), + self.engagement, self.deploymentTarget, False, self.vendor) + + logger.debug('-----------------------------------------------------') + logger.debug('Created VF:') + logger.debug('UUID: ' + str(self.vendor.uuid)) + logger.debug('-----------------------------------------------------') + + self.token = self.loginAndCreateSessionToken(self.user) + self.ELtoken = self.loginAndCreateSessionToken(self.el_user) + + def retrieveCurrentObjectsOfEngagementByFilter(self, method, detailOrObject, entity): + if method == 'filter': + list = entity.objects.filter(engagement=detailOrObject) + return list + elif method == 'get': + list = entity.objects.get(uuid=detailOrObject) + return list + + def loggerTestFailedOrSucceded(self, bool): + if bool: + logger.debug("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$") + logger.debug(" Test Succeeded") + logger.debug("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$\n\n") + else: + logger.debug("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$") + logger.debug("Test failed") + logger.debug("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$\n\n") + + def testCreateDTSiteAddToVF(self): + logger.debug("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$") + logger.debug(" Test started: Create DTSite and add to VF") + logger.debug("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$\n\n") + self.deploymentTargetSite = self.creator.createDeploymentTargetSite(self.randomGenerator("randomString")) + self.vf.deployment_target_sites.add(self.deploymentTargetSite) + sites = self.vf.deployment_target_sites.all() + num = 1 + for item in sites: + logger.debug(str(num) + ". " + item.name) + print("\n") + if (self.vf.deployment_target_sites.all().count() > 0): + self.loggerTestFailedOrSucceded(True) + else: + self.loggerTestFailedOrSucceded(False) + + def testDeleteVFC(self, expectedStatus=HTTP_204_NO_CONTENT): + logger.debug("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$") + logger.debug(" Test started: Delete VFC") + logger.debug("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$\n\n") + vfc = self.creator.createVFC(self.randomGenerator("randomString"), self.randomGenerator( + "randomNumber"), self.vendor, self.vf, self.el_user) + urlStr = self.urlPrefix + 'vf/' + str(vfc.uuid) + '/vfcs/' + str(vfc.uuid) + if (vfc == None): + logger.error("vfc wasn't created successfully before the deletion attempt") + self.loggerTestFailedOrSucceded(False) + self.assertEqual(500, expectedStatus) + response = self.c.delete(urlStr, **{'HTTP_AUTHORIZATION': "token " + self.token}) + logger.debug('Got response : ' + str(response.status_code)) + if (response.status_code == HTTP_204_NO_CONTENT): + self.loggerTestFailedOrSucceded(True) + else: + self.loggerTestFailedOrSucceded(False) + self.assertEqual(response.status_code, expectedStatus) + + def testGetVFC(self, expectedStatus=HTTP_200_OK): + logger.debug("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$") + logger.debug(" Test started: Get VFCs") + logger.debug("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$\n\n") + vfc = self.creator.createVFC(self.randomGenerator("randomString"), self.randomGenerator( + "randomNumber"), self.vendor, self.vf, self.el_user) + vfc2 = self.creator.createVFC(self.randomGenerator("randomString"), self.randomGenerator( + "randomNumber"), self.service_provider, self.vf, self.el_user) + urlStr = self.urlPrefix + 'vf/' + str(self.vf.uuid) + '/vfcs/' + if (vfc == None or vfc2 == None): + logger.error("The VFCs were not created successfully before the deletion attempt") + self.loggerTestFailedOrSucceded(False) + self.assertEqual(500, expectedStatus) + response = self.c.get(urlStr, **{'HTTP_AUTHORIZATION': "token " + self.token}) + logger.debug('Got response : ' + str(response.status_code)) + logger.debug("VFCs found in the VF(through the GET request):") + logger.debug(str(response.content)) + if (response.status_code == HTTP_200_OK): + self.loggerTestFailedOrSucceded(True) + else: + self.loggerTestFailedOrSucceded(False) + self.assertEqual(response.status_code, expectedStatus) diff --git a/django/engagementmanager/tests/vvpEntitiesCreator.py b/django/engagementmanager/tests/vvpEntitiesCreator.py new file mode 100755 index 0000000..0ba619e --- /dev/null +++ b/django/engagementmanager/tests/vvpEntitiesCreator.py @@ -0,0 +1,290 @@ +# +# ============LICENSE_START========================================== +# org.onap.vvp/engagementmgr +# =================================================================== +# Copyright © 2017 AT&T Intellectual Property. All rights reserved. +# =================================================================== +# +# Unless otherwise specified, all software contained herein is licensed +# under the Apache License, Version 2.0 (the “License”); +# you may not use this software except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# +# +# Unless otherwise specified, all documentation contained herein is licensed +# under the Creative Commons License, Attribution 4.0 Intl. (the “License”); +# you may not use this documentation except in compliance with the License. +# You may obtain a copy of the License at +# +# https://creativecommons.org/licenses/by/4.0/ +# +# Unless required by applicable law or agreed to in writing, documentation +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ============LICENSE_END============================================ +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. +from uuid import uuid4 +from django.utils import timezone +import string +import random +from engagementmanager.service.logging_service import LoggingServiceFactory +from engagementmanager.utils.authentication import JWTAuthentication +from engagementmanager.models import Vendor, IceUserProfile, Role, Engagement, DeploymentTarget, ApplicationServiceInfrastructure, VF,\ + NextStep, ChecklistTemplate, DeploymentTargetSite, VFC, Checklist, ChecklistSection, ChecklistLineItem,\ + CustomUser +from engagementmanager.utils.constants import Constants + +logger = LoggingServiceFactory.get_logger() + + +class VvpEntitiesCreator: + + def __init__(self): + pass + + def createUserTemplate(self, company, email, full_name, role, is_service_provider_contact, ssh_key=None, ): + return { + 'company': company, + 'email': email, + 'phone_number': '555019900', + 'full_name': full_name, + 'role': role, + 'create_time': timezone.now(), + 'is_service_provider_contact': is_service_provider_contact, + 'ssh_public_key': ssh_key, + } + + def createEngagementTemplate(self, engagement_type, engagement_team): + return { + # 'engagement_type' : engagement_type, + # 'members' : engagement_team, + # 'start_date' : timezone.now(), + 'progress': 0 + } + + def createVendor(self, vendorName): + ven = Vendor.objects.filter(name=vendorName) + if (not ven.exists()): + vendor = Vendor.objects.create(name=vendorName, public=True) + else: + vendor = Vendor.objects.get(name=vendorName) + return vendor.uuid, vendor + + def createDeploymentTarget(self, name, version): + deployment = DeploymentTarget.objects.create( + name=name, version=version) + return deployment + + def createDeploymentTargetSite(self, name): + dtsite = DeploymentTargetSite.objects.create(name=name) + return dtsite + + def createApplicationServiceInfrastructure(self, name): + asinfrastructure = ApplicationServiceInfrastructure.objects.create( + name=name) + return asinfrastructure + + def getOrCreateIfNotExist(self, entity, getCriteria, creationFields): + obj = None + try: + logger.debug("about to look for object: " + str(getCriteria)) + obj = entity.objects.get(**getCriteria) + logger.debug('found object') + except entity.DoesNotExist: + logger.error('not found. Trying to create...') + obj = entity.objects.create(**creationFields) + return obj + + def createAndGetDefaultRoles(self): + admin = self.getOrCreateIfNotExist( + Role, {'name': 'admin'}, {'name': 'admin'}) + el = self.getOrCreateIfNotExist(Role, {'name': 'el'}, {'name': 'el'}) + standard_user = self.getOrCreateIfNotExist(Role, {'name': 'standard_user'}, { + 'name': 'standard_user'}) + return admin, el, standard_user + + def createUser(self, company, email, phone, fullName, role, is_active=False, ssh_key=None, activation_token_create_time=timezone.now()): + try: + user, is_user_created = CustomUser.objects.get_or_create(username=email, defaults={'email': email, 'password': '12345678', 'activation_token': uuid4( + ), 'activation_token_create_time': timezone.now(), 'last_login': timezone.now(), 'is_active': is_active}) + user_profile, is_profile_created = IceUserProfile.objects.update_or_create( + email=email, defaults=self.createUserTemplate(company, email, fullName, role, False, ssh_key)) + except Exception as e: + logger.error("VvpEntitiesCreator - createUser - error:") + logger.error(e) + return None + return user_profile + + def createEngagement(self, uuid, engagement_type, engagement_team): + return self.getOrCreateIfNotExist(Engagement, {'uuid': uuid}, self.createEngagementTemplate(engagement_type, engagement_team)) + + def createVF(self, name, engagement, deployment, is_service_provider_internal, vendor, **kwargs): + vf = VF.objects.create(name=name, engagement=engagement, + deployment_target=deployment, + is_service_provider_internal=is_service_provider_internal, + vendor=vendor, + target_lab_entry_date=timezone.now(), **kwargs) + return vf + + def createNextStep(self, uuid, createFields): + return self.getOrCreateIfNotExist(NextStep, {'uuid': uuid}, createFields) + + def createDefaultCheckListTemplate(self): + checklist_templates = [ + { + 'name': 'Heat', + 'category': 'first category', + 'version': 1, + 'sections': [ + { + 'name': 'Parameter Specification', + 'weight': 1, + 'description': 'section description', + 'validation_instructions': 'valid instructions', + 'lineitems': [ + { + 'name': 'Parameters', + 'weight': 1, + 'description': 'Numeric parameters should include range and/or allowed values.', + 'validation_instructions': 'Here are some useful tips for how to validate this item in the most awesome way:<br><br><ul><li>Here is my awesome tip 1</li><li>Here is my awesome tip 2</li><li>Here is my awesome tip 3</li></ul>', + 'line_type': 'auto', + }, + { + 'name': 'String parameters', + 'weight': 1, + 'description': 'Numeric parameters should include range and/or allowed values.', + 'validation_instructions': 'Here are some useful tips for how to validate this item in the most awesome way:<br><br><ul><li>Here is my awesome tip 1</li><li>Here is my awesome tip 2</li><li>Here is my awesome tip 3</li></ul>', + 'line_type': 'auto', + }, + { + 'name': 'Numeric parameters', + 'weight': 1, + 'description': 'Numeric parameters should include range and/or allowed values.', + 'validation_instructions': 'Here are some useful tips for how to validate this item in the most awesome way:<br><br><ul><li>Here is my awesome tip 1</li><li>Here is my awesome tip 2</li><li>Here is my awesome tip 3</li></ul>', + 'line_type': 'auto', + } + ] + }, + { + 'name': 'External References', + 'weight': 1, + 'description': 'section descripyion', + 'validation_instructions': 'valid instructions', + 'lineitems': [ + { + 'name': 'Normal references', + 'weight': 1, + 'description': 'Numeric parameters should include range and/or allowed values.', + 'validation_instructions': 'Here are some useful tips for how to validate this item in the most awesome way:<br><br><ul><li>Here is my awesome tip 1</li><li>Here is my awesome tip 2</li><li>Here is my awesome tip 3</li></ul>', + 'line_type': 'manual', + }, + { + 'name': 'VF image', + 'weight': 1, + 'description': 'Numeric parameters should include range and/or allowed values.', + 'validation_instructions': 'Here are some useful tips for how to validate this item in the most awesome way:<br><br><ul><li>Here is my awesome tip 1</li><li>Here is my awesome tip 2</li><li>Here is my awesome tip 3</li></ul>', + 'line_type': 'auto', + } + ] + + } + ] + } + ] + + for template in checklist_templates: + created_template = ChecklistTemplate.objects.get_or_create(name=template['name'], defaults={ + 'category': template['category'], + 'version': template['version'], + 'create_time': timezone.now() + }) + created_template = ChecklistTemplate.objects.get( + name=template['name']) + for section in template['sections']: + created_section = ChecklistSection.objects.get_or_create( + name=section['name'], + template_id=created_template.uuid, defaults={ + 'weight': section['weight'], + 'description': section['description'], + 'validation_instructions': section['validation_instructions'] + + }) + created_section = ChecklistSection.objects.get( + name=section['name'], template_id=created_template.uuid) + for lineitem in section['lineitems']: + created_lineitem = ChecklistLineItem.objects.get_or_create( + name=lineitem['name'], + template_id=created_template.uuid, + defaults={ + 'weight': lineitem['weight'], + 'description': lineitem['description'], + 'validation_instructions': lineitem['validation_instructions'], + 'line_type': lineitem['line_type'], + 'section_id': created_section.uuid, + }) + + self.defaultCheklistTemplate = ChecklistTemplate.objects.get( + name="Heat") + + return self.defaultCheklistTemplate + + def createCheckList(self, name, state, validation_cycle, associated_files, engagement, template, creator, owner): + if (associated_files == None): + associated_files = '{}' + return self.getOrCreateIfNotExist(Checklist, {'name': name}, { + 'state': state, + 'name': name, + 'validation_cycle': 1, + 'associated_files': associated_files, + 'engagement': engagement, + 'template': template, + 'creator': creator, + 'owner': owner + }) + + def createVFC(self, name, ext_ref, company, vf, creator): + obj = None + try: + obj = VFC.objects.get(name=name) + except VFC.DoesNotExist: + return VFC.objects.create(name=name, external_ref_id=ext_ref, company=company, creator=creator, vf=vf) + return obj + + def randomGenerator(self, typeOfValue, numberOfDigits=0): + lettersAndNumbers = string.ascii_letters + string.digits + if typeOfValue == 'email': + myEmail = ''.join(random.choice(lettersAndNumbers) for _ in range(4)) + "@" + \ + ''.join(random.choice(string.ascii_uppercase) for _ in range(4)) + ".com" + return myEmail + elif typeOfValue == 'main-vendor-email': + myEmail = ''.join(random.choice(lettersAndNumbers) for _ in range(4)) + "@" + \ + Constants.service_provider_mail_domain[0] + return myEmail + elif typeOfValue == 'randomNumber': + randomNumber = ''.join("%s" % random.randint(0, 9) for _ in range(0, (numberOfDigits + 1))) + return randomNumber + elif typeOfValue == 'randomString': + randomString = "".join(random.sample(lettersAndNumbers, 5)) + return randomString + else: + logger.debug("Error on randonGenerator - given bad value.") + exit + + def loginAndCreateSessionToken(self, user): + jwt_service = JWTAuthentication() + token = jwt_service.create_token(user.user) + logger.debug("token " + token) + return token |