summaryrefslogtreecommitdiffstats
path: root/django/engagementmanager/tests/test_access_credentials.py
diff options
context:
space:
mode:
Diffstat (limited to 'django/engagementmanager/tests/test_access_credentials.py')
-rwxr-xr-xdjango/engagementmanager/tests/test_access_credentials.py175
1 files changed, 175 insertions, 0 deletions
diff --git a/django/engagementmanager/tests/test_access_credentials.py b/django/engagementmanager/tests/test_access_credentials.py
new file mode 100755
index 0000000..73d427c
--- /dev/null
+++ b/django/engagementmanager/tests/test_access_credentials.py
@@ -0,0 +1,175 @@
+#
+# ============LICENSE_START==========================================
+# org.onap.vvp/engagementmgr
+# ===================================================================
+# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+# ===================================================================
+#
+# Unless otherwise specified, all software contained herein is licensed
+# under the Apache License, Version 2.0 (the “License”);
+# you may not use this software except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
+#
+# Unless otherwise specified, all documentation contained herein is licensed
+# under the Creative Commons License, Attribution 4.0 Intl. (the “License”);
+# you may not use this documentation except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://creativecommons.org/licenses/by/4.0/
+#
+# Unless required by applicable law or agreed to in writing, documentation
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ============LICENSE_END============================================
+#
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+from engagementmanager.tests.test_base_entity import TestBaseEntity
+import json
+from boto.s3.connection import S3Connection, OrdinaryCallingFormat
+from django.conf import settings
+from wheel.signatures import assertTrue
+from django.utils import timezone
+from engagementmanager.utils.constants import Constants
+from engagementmanager.vm_integration import vm_client
+from validationmanager.rados.rgwa_client import RGWAClient
+from validationmanager.tests.test_rgwa_client_factory import TestRGWAClientFactory
+
+
+class ActivateTestCase(TestBaseEntity):
+
+ def childSetup(self): # Variables to use in this class.
+ self.s3_host = '10.252.0.21' # settings.AWS_S3_HOST
+ self.s3_port = 8080 # settings.AWS_S3_PORT
+
+ self.urlStr = self.urlPrefix + "signup/"
+ self.createDefaultRoles()
+ uuid, vendor = self.creator.createVendor(Constants.service_provider_company_name)
+ self.activation_token_time = timezone.now()
+ self.activation_token_time = self.activation_token_time.replace(
+ 2012, 1, 2, 13, 48, 25)
+ print("This is the time that is going to be added to expiredTokenUser: " +
+ str(self.activation_token_time))
+ self.user = self.creator.createUser(vendor, self.randomGenerator("email"), self.randomGenerator(
+ "randomNumber"), self.randomGenerator("randomString"), self.standard_user, True)
+ self.new_user = self.creator.createUser(vendor, self.randomGenerator("email"), self.randomGenerator(
+ "randomNumber"), self.randomGenerator("randomString"), self.standard_user, True)
+ print('-----------------------------------------------------')
+ print('Created User:')
+ print('UUID: ' + str(self.user.uuid))
+ print('Full Name: ' + self.user.full_name)
+ print('-----------------------------------------------------')
+ self.params = '{"company":"' + str(self.user.company) + '","full_name":"' + self.user.full_name + '","email":"' + self.user.email + '","phone_number":"' + self.user.phone_number + \
+ '","password":"' + self.user.user.password + '","regular_email_updates":"' + \
+ str(self.user.regular_email_updates) + \
+ '","is_service_provider_contact":"' + str(self.user.is_service_provider_contact) + '"}'
+ self.userToken = self.loginAndCreateSessionToken(self.user)
+ self.new_user_token = self.loginAndCreateSessionToken(self.new_user)
+
+ def test_validate_rgwauser_created_after_Activation(self):
+ if settings.IS_SIGNAL_ENABLED:
+ vm_client.fire_event_in_bg(
+ 'send_create_user_in_rgwa_event', self.user)
+ rgwa = TestRGWAClientFactory.admin()
+ rgwa_user = rgwa.get_user(self.user.full_name)
+ if rgwa_user is None:
+ print("Test Failed!")
+ else:
+ access_key = rgwa_user['access_key']
+ print("######access_key#################= ", access_key)
+ secret_key = rgwa_user['secret_key']
+ print("######secret_key#################= ", secret_key)
+ self.assertTrue(access_key and secret_key != None)
+ print("#################################")
+ print("Test PASS!")
+ self.printTestName("Test ended")
+
+
+# Whenever a new user is configured, we should create a RadosGW user for them.
+# If unspecified, the access keys are generated on the server and returned here
+# in the response.
+ def testCreateAndGetRgwaUser(self):
+ if settings.IS_SIGNAL_ENABLED:
+ base_url = 'http://{S3_HOST}:{S3_PORT}/admin'.format(
+ S3_HOST=self.s3_host, # settings.AWS_S3_HOST,
+ S3_PORT=self.s3_port, # settings.AWS_S3_PORT,
+ )
+ admin_conn = RGWAClient(base_url)
+ print("base_url=" + base_url)
+ print("S3_HOST = " + self.s3_host)
+ print("s3_port =" + str(self.s3_port))
+ print("admin_conn= ", admin_conn)
+ username = self.randomGenerator("randomString")
+ print("username", username)
+ new_user = admin_conn.create_user(
+ uid=username, display_name='User "%s"' % username)
+ print("new_user = " + str(new_user))
+ self.assertTrue(new_user['user_id'] != None)
+ get_user = admin_conn.get_user(new_user['user_id'])
+ self.assertTrue(new_user['user_id'] == get_user['user_id'])
+
+ def testCreateAndGetRgwaBucket(self):
+ if settings.IS_SIGNAL_ENABLED:
+ s3aws_access_key_id = settings.AWS_ACCESS_KEY_ID
+ s3aws_secret_access_key = settings.AWS_SECRET_ACCESS_KEY
+ print("s3aws_access_key_id=" + s3aws_access_key_id)
+ print("s3aws_secret_access_key=" + s3aws_secret_access_key)
+ print("S3_HOST = " + self.s3_host)
+ print("s3_port =" + str(self.s3_port))
+
+ boto_conn = S3Connection(host=self.s3_host,
+ port=self.s3_port,
+ aws_access_key_id=s3aws_access_key_id,
+ aws_secret_access_key=s3aws_secret_access_key,
+ calling_format=OrdinaryCallingFormat(),
+ is_secure=False,
+ )
+ boto_conn.num_retries = 0
+ bucketname = self.randomGenerator("randomString").lower()
+ new_bucket = boto_conn.create_bucket(bucketname)
+ assertTrue(new_bucket != None)
+ print("new_bucket = " + str(new_bucket))
+ bucket = boto_conn.get_bucket(bucketname)
+ print("bucket = " + str(bucket))
+ bucket_acl = bucket.get_acl()
+ print("bucket_acl = " + str(bucket_acl))
+
+ def testGetSecretKey(self):
+ vm_client.send_create_user_in_rgwa_event(self.user)
+ urlStr = self.urlPrefix + 'users/account/rgwa/'
+ print("urlStr of get secret key",urlStr)
+ self.printTestName("testGetSecretKey [Start]")
+ response = self.c.get(urlStr, data={}, content_type='application/json',
+ **{'HTTP_AUTHORIZATION': "token " + self.userToken})
+ print('Got response : ' + str(response.status_code))
+ dict_response = json.loads(response.content)
+ print("api response",dict_response)
+ self.assertTrue(dict_response["rgwa_secret_key"] is not None)
+ self.printTestName("testGetSecretKey [End]")
+
+ def testNegativeGetSecretKeyInvalidToken(self):
+ vm_client.send_create_user_in_rgwa_event(self.user)
+ urlStr = self.urlPrefix + 'users/account/rgwa/'
+ print("urlStr of get secret key",urlStr)
+ self.printTestName("testGetSecretKey [Start]")
+ response = self.c.get(urlStr, data={}, content_type='application/json',
+ **{'HTTP_AUTHORIZATION': 'token' + self.new_user_token})
+ print('Got response : ' + str(response.status_code))
+ dict_response = json.loads(response.content)
+ print("api response",dict_response)
+ self.assertTrue(dict_response[
+ "detail"] == 'You must authenticate in order to ' +
+ 'perform this action: Authentication credentials were not provided.')
+ self.printTestName("testGetSecretKey [End]")