summaryrefslogtreecommitdiffstats
path: root/newton/newton/requests
diff options
context:
space:
mode:
authorVictor Morales <victor.morales@intel.com>2018-01-09 14:52:07 -0800
committerVictor Morales <victor.morales@intel.com>2018-01-09 14:52:07 -0800
commitca4927c2d5521ac6cd4f739b5e47fd4259e3c375 (patch)
treea6868e6b002c8d46df4a0b75cbed0b1c18a5779b /newton/newton/requests
parent7330272e07e7bc3b72bf17cc4eb69ea9584697de (diff)
Add UTs for VimDriverUtils class
The VimDriverUtils class contains utility methods for managing sessions and vim information, as well as others for replacing keys of a dictionary. This change add Unit Tests that validates their correct functionality. Change-Id: Ie569cc995c3e0ebc27f33e4c58652bb0ed34c0a7 Signed-off-by: Victor Morales <victor.morales@intel.com> Issue-ID: MULTICLOUD-83
Diffstat (limited to 'newton/newton/requests')
-rw-r--r--newton/newton/requests/tests/test_util.py159
-rw-r--r--newton/newton/requests/views/util.py176
2 files changed, 233 insertions, 102 deletions
diff --git a/newton/newton/requests/tests/test_util.py b/newton/newton/requests/tests/test_util.py
new file mode 100644
index 00000000..4db754fe
--- /dev/null
+++ b/newton/newton/requests/tests/test_util.py
@@ -0,0 +1,159 @@
+# Copyright (c) 2018 Intel Corporation, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from django.core.cache import cache
+import mock
+import unittest
+
+from newton.requests.views import util
+from newton.requests.tests import mock_info
+
+
+class TestUtil(unittest.TestCase):
+
+ def test_get_query(self):
+ query_string = "name=ferret&color=purple"
+ mock_request = mock.Mock()
+ mock_request.get_full_path.side_effect = [
+ "path/to/page?" + query_string,
+ "path/to/page"
+ ]
+
+ self.assertEqual(
+ query_string, util.VimDriverUtils.get_query_part(
+ mock_request))
+ self.assertEqual(
+ "", util.VimDriverUtils.get_query_part( mock_request))
+
+ def test_get_new_openstack_v2_session_with_tenant_id(self):
+ vim_info = mock_info.MOCK_VIM_INFO.copy()
+ vim_info["url"] = "http://128.224.180.14:5000/v2"
+ tenant_it = "1a62b3971d774404a504c5d9a3e506e3"
+
+ os_session = util.VimDriverUtils.get_session(
+ vim_info, tenant_it)
+
+ self.assertIsNotNone(os_session)
+ self.assertIsNotNone(os_session.auth)
+ self.assertEqual(vim_info["url"], os_session.auth.auth_url)
+ self.assertEqual(vim_info["userName"],
+ os_session.auth.username)
+ self.assertEqual(vim_info["password"],
+ os_session.auth.password)
+
+ def test_get_new_openstack_v3_session_with_project_id(self):
+ projectid = "1a62b3971d774404a504c5d9a3e506e3"
+ os_session = util.VimDriverUtils.get_session(
+ mock_info.MOCK_VIM_INFO, projectid)
+
+ self.assertIsNotNone(os_session)
+ self.assertIsNotNone(os_session.auth)
+ self.assertEqual(mock_info.MOCK_VIM_INFO["url"],
+ os_session.auth.auth_url)
+ self.assertEqual(mock_info.MOCK_VIM_INFO["domain"],
+ os_session.auth.project_domain_name)
+ self.assertEqual(projectid,
+ os_session.auth.project_id)
+
+ def test_get_new_openstack_session_with_project_id(self):
+ vim_info = mock_info.MOCK_VIM_INFO.copy()
+ vim_info["url"] = "http://128.224.180.14:5000"
+ project_id = "1a62b3971d774404a504c5d9a3e506e3"
+
+ os_session = util.VimDriverUtils.get_session(
+ vim_info, project_id)
+
+ self.assertIsNotNone(os_session)
+ self.assertIsNotNone(os_session.auth)
+ self.assertEqual(vim_info["url"] + "/v3",
+ os_session.auth.auth_url[0])
+
+ def test_get_new_openstack_v3_session_with_project_name(self):
+ project_name = "demo"
+ os_session = util.VimDriverUtils.get_session(
+ mock_info.MOCK_VIM_INFO, tenant_name=project_name)
+
+ self.assertIsNotNone(os_session)
+ self.assertIsNotNone(os_session.auth)
+ self.assertEqual(project_name,
+ os_session.auth.project_name)
+
+ def test_get_auth_state_from_valid_session(self):
+ test_result = "auth_state"
+
+ mock_auth = mock.Mock()
+ mock_auth.get_auth_state.return_value = test_result
+ mock_session = mock.Mock()
+ mock_session._auth_required.return_value = mock_auth
+
+ auth_state = util.VimDriverUtils.get_auth_state(mock_session)
+
+ self.assertIsNotNone(auth_state)
+ self.assertEqual(test_result, auth_state)
+
+ def test_get_auth_state_from_invalid_session(self):
+ mock_session = mock.Mock()
+ mock_session._auth_required.return_value = None
+
+ self.assertIsNone(util.VimDriverUtils.get_auth_state(
+ mock_session))
+
+ @mock.patch.object(cache, 'get')
+ def test_get_valid_tokens_from_cache(self, mock_cache_get):
+ mock_cache_get.return_value = "valid_token"
+
+ token, meta_token = util.VimDriverUtils.get_token_cache(
+ "token")
+ self.assertIsNotNone(token)
+ self.assertIsNotNone(meta_token)
+
+ @mock.patch.object(cache, 'get')
+ def test_update_cache_expired_info(self, mock_cache_get):
+ mock_cache_get.return_value = None
+
+ util.VimDriverUtils.update_token_cache(
+ "token", "auth_state", "metadata")
+
+ @mock.patch.object(cache, 'get')
+ def test_update_cache_info(self, mock_cache_get):
+ mock_cache_get.return_value = "existing"
+
+ util.VimDriverUtils.update_token_cache(
+ "token", "auth_state", "metadata")
+
+ def test_replace_keys_of_dict(self):
+ dict_obj = {
+ "project_id": "demo",
+ "ram": "16G"
+ }
+ new_keys = ["tenantId", "memory"]
+ mapping = [(o, n) for o, n in zip(dict_obj.keys(), new_keys)]
+ util.VimDriverUtils.replace_key_by_mapping(
+ dict_obj, mapping)
+
+ self.assertEqual(len(new_keys), len(dict_obj.keys()))
+ self.assertEqual(sorted(new_keys), sorted(dict_obj.keys()))
+
+ def test_replace_keys_reverse_order(self):
+ dict_obj = {
+ "project_id": "demo",
+ "ram": "16G"
+ }
+ new_keys = ["tenantId", "memory"]
+ mapping = [(n, o) for o, n in zip(dict_obj.keys(), new_keys)]
+ util.VimDriverUtils.replace_key_by_mapping(
+ dict_obj, mapping, reverse=True)
+
+ self.assertEqual(len(new_keys), len(dict_obj.keys()))
+ self.assertEqual(sorted(new_keys), sorted(dict_obj.keys())) \ No newline at end of file
diff --git a/newton/newton/requests/views/util.py b/newton/newton/requests/views/util.py
index ba383475..d2363252 100644
--- a/newton/newton/requests/views/util.py
+++ b/newton/newton/requests/views/util.py
@@ -14,6 +14,7 @@
import logging
+from django.conf import settings
from django.core.cache import cache
from keystoneauth1.identity import v2 as keystone_v2
from keystoneauth1.identity import v3 as keystone_v3
@@ -27,10 +28,14 @@ logger = logging.getLogger(__name__)
class VimDriverUtils(object):
@staticmethod
def get_vim_info(vimid):
- # get vim info from local cache firstly
- # if cache miss, get it from ESR service
- vim = extsys.get_vim_by_id(vimid)
- return vim
+ """
+ Retrieve VIM information.
+
+ :param vimid: VIM Identifier
+ :return: VIM information
+ """
+ # TODO: get vim info from local cache firstly later from ESR
+ return extsys.get_vim_by_id(vimid)
@staticmethod
def delete_vim_info(vimid):
@@ -41,65 +46,45 @@ class VimDriverUtils(object):
query = ""
full_path = request.get_full_path()
if '?' in full_path:
- _, query = request.get_full_path().split('?')
+ _, query = full_path.split('?')
return query
@staticmethod
- def get_session(vim, tenantid=None, tenantname=None, auth_state=None):
+ def get_session(
+ vim, tenant_id=None, tenant_name=None, auth_state=None):
"""
get session object and optionally preload auth_state
"""
auth = None
- #tenantid takes precedence over tenantname
- if not tenantid:
- #input tenant name takes precedence over the default one from AAI data store
- tenant_name = tenantname if tenantname else vim['tenant']
-
- if tenantid:
- if '/v2' in vim["url"]:
- auth = keystone_v2.Password(auth_url=vim["url"],
- username=vim["userName"],
- password=vim["password"],
- tenant_id=tenantid)
- elif '/v3' in vim["url"]:
- auth = keystone_v3.Password(auth_url=vim["url"],
- username=vim["userName"],
- password=vim["password"],
- user_domain_name=vim["domain"],
- project_id=tenantid)
- #elif '/identity' in vim["url"]:
- else:
- auth = keystone_v3.Password(auth_url=vim["url"]+"/v3",
- username=vim["userName"],
- password=vim["password"],
- user_domain_name=vim["domain"],
- project_id=tenantid)
- elif tenant_name:
- if '/v2' in vim["url"]:
- auth = keystone_v2.Password(auth_url=vim["url"],
- username=vim["userName"],
- password=vim["password"],
- tenant_name=tenant_name)
- elif '/v3' in vim["url"]:
- auth = keystone_v3.Password(auth_url=vim["url"],
- username=vim["userName"],
- password=vim["password"],
- project_name=tenant_name,
- user_domain_name=vim["domain"],
- project_domain_name=vim["domain"])
- #elif '/identity' in vim["url"]:
- else:
- auth = keystone_v3.Password(auth_url=vim["url"]+"/v3",
- username=vim["userName"],
- password=vim["password"],
- project_name=tenant_name,
- user_domain_name=vim["domain"],
- project_domain_name=vim["domain"])
+ params = {
+ "auth_url": vim["url"],
+ "username": vim["userName"],
+ "password": vim["password"],
+ }
+ # tenantid takes precedence over tenantname
+ if tenant_id:
+ params["tenant_id"] = tenant_id
else:
- #something wrong
- return None
+ # input tenant name takes precedence over the default one
+ # from AAI data store
+ params["tenant_name"] = (tenant_name if tenant_name
+ else vim['tenant'])
+
+ if '/v2' in params["auth_url"]:
+ auth = keystone_v2.Password(**params)
+ else:
+ params["user_domain_name"] = vim["domain"]
+ params["project_domain_name"] = vim["domain"]
+
+ if 'tenant_id' in params:
+ params["project_id"] = params.pop("tenant_id")
+ if 'tenant_name' in params:
+ params["project_name"] = params.pop("tenant_name")
+ if '/v3' not in params["auth_url"]:
+ params["auth_url"] = params["auth_url"] + "/v3",
+ auth = keystone_v3.Password(**params)
#preload auth_state which was acquired in last requests
if auth_state:
@@ -107,71 +92,58 @@ class VimDriverUtils(object):
return session.Session(auth=auth)
-
@staticmethod
- def get_auth_state(vim, session):
- auth = session._auth_required(None, 'fetch a token')
- if not auth:
- return None
-
- #trigger the authenticate request
- session.get_auth_headers(auth)
-
-# norm_expires = utils.normalize_time(auth.expires)
+ def get_auth_state(session_obj):
+ """
+ Retrieve the authorization state
+ :param session: OpenStack Session object
+ :return: return a string dump of json object with token and
+ resp_data of authentication request
+ """
+ auth = session_obj._auth_required(None, 'fetch a token')
+ if auth:
+ #trigger the authenticate request
+ session_obj.get_auth_headers(auth)
- #return a string dump of json object with token and resp_data of authentication request
- return auth.get_auth_state()
-# return auth.get_auth_ref(session)
+ # norm_expires = utils.normalize_time(auth.expires)
+ return auth.get_auth_state()
@staticmethod
def get_token_cache(token):
- '''
+ """
get auth_state and metadata fromm cache
:param token:
:return:
- '''
+ """
return cache.get(token), cache.get("meta_%s" % token)
-
@staticmethod
- def update_token_cache(vim, session, token, auth_state, metadata=None):
- '''
- cache the auth_state as well as metadata_catalog
- :param vim:
- :param session:
- :param token:
- :param auth_state:
- :param matadata:
- :return:
- '''
-
- if metadata == None: #do not update token any more
- return token
-
- metadata_key = "meta_%s" % token
-
- if not cache.get(token):
- # store the auth_state, memcached
- # set expiring in 1 hour
- cache.set(token, auth_state, 3600)
- cache.set(metadata_key, metadata, 3600)
-
- return token
+ def update_token_cache(token, auth_state, metadata):
+ """
+ Stores into the cache the auth_state and metadata_catalog
+ information.
+ :param token: Base token to be used as an identifier
+ :param auth_state: Authorization information
+ :param metadata: Metadata Catalog information
+ """
+ if metadata and not cache.get(token):
+ cache.set(
+ token, auth_state, settings.CACHE_EXPIRATION_TIME)
+ cache.set(
+ "meta_%s" % token, metadata,
+ settings.CACHE_EXPIRATION_TIME)
@staticmethod
- def replace_a_key(dict_obj, keypair, reverse=False):
- old_key, new_key = None, None
- if reverse:
- old_key, new_key = keypair[1], keypair[0]
- else:
- old_key, new_key = keypair[0], keypair[1]
+ def _replace_a_key(dict_obj, key_pair, reverse):
+ old_key = key_pair[1] if reverse else key_pair[0]
+ new_key = key_pair[0] if reverse else key_pair[1]
- v = dict_obj.pop(old_key, None)
- if v:
- dict_obj[new_key] = v
+ old_value = dict_obj.pop(old_key, None)
+ if old_value:
+ dict_obj[new_key] = old_value
@staticmethod
def replace_key_by_mapping(dict_obj, mapping, reverse=False):
for k in mapping:
- VimDriverUtils.replace_a_key(dict_obj, k, reverse)
+ VimDriverUtils._replace_a_key(dict_obj, k, reverse)