author     biancunkang <bian.cunkang@zte.com.cn>  2018-08-30 19:39:56 +0800
committer  biancunkang <bian.cunkang@zte.com.cn>  2018-08-30 20:21:43 +0800
commit     3f27305089f36acfb42f6c7c2ede316da8308f3b
tree       8ad705534c228540283d3df1d8072c93c48ac858
parent     9b024778eded69fddd2d4afb6795a0404ca442c4
Add test for pnf descriptor
Add test for exception

Change-Id: Ie9e7e22aa4ac56a9e08e45903a0dc7a7eb0bee2f
Issue-ID: VFC-1038
Signed-off-by: biancunkang <bian.cunkang@zte.com.cn>
-rw-r--r--  catalog/packages/biz/pnf_descriptor.py          205
-rw-r--r--  catalog/packages/tests/test_pnf_descriptor.py    42
-rw-r--r--  catalog/packages/views/pnf_descriptor_views.py   15
3 files changed, 152 insertions(+), 110 deletions(-)
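
This commit moves the PNFD business logic from module-level functions into a PnfPackage class and adds tests for the exception path of each view. A minimal usage sketch of the refactored API, mirroring the calls the updated views make in the hunks below (the request payload here is illustrative, not taken from the commit):

    from catalog.packages.biz.pnf_descriptor import PnfPackage

    pnf_package = PnfPackage()

    # Create a PNFD resource; returns a dict with a generated 'id'.
    pnfd_info = pnf_package.create({'userDefinedData': {'key': 'value'}})

    # Read it back individually and as part of the full listing.
    single = pnf_package.query_single(pnfd_info['id'])
    all_pnfds = pnf_package.query_multiple()

    # Remove it again (a no-op if it has already been deleted).
    pnf_package.delete_single(pnfd_info['id'])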
diff --git a/catalog/packages/biz/pnf_descriptor.py b/catalog/packages/biz/pnf_descriptor.py
index 5cd20fd3..188bc708 100644
--- a/catalog/packages/biz/pnf_descriptor.py
+++ b/catalog/packages/biz/pnf_descriptor.py
@@ -28,61 +28,109 @@ from catalog.packages.const import PKG_STATUS
logger = logging.getLogger(__name__)
-def create(data):
- logger.info('Start to create a PNFD...')
- user_defined_data = ignore_case_get(data, 'userDefinedData')
- data = {
- 'id': str(uuid.uuid4()),
- 'pnfdOnboardingState': PKG_STATUS.CREATED,
- 'pnfdUsageState': PKG_STATUS.NOT_IN_USE,
- 'userDefinedData': user_defined_data,
- '_links': None # TODO
- }
- PnfPackageModel.objects.create(
- pnfPackageId=data['id'],
- onboardingState=data['pnfdOnboardingState'],
- usageState=data['pnfdUsageState'],
- userDefinedData=data['userDefinedData']
- )
- logger.info('A PNFD(%s) has been created.' % data['id'])
- return data
-
-
-def query_multiple():
- pnf_pkgs = PnfPackageModel.objects.all()
- response_data = []
- for pnf_pkg in pnf_pkgs:
- data = fill_response_data(pnf_pkg)
- response_data.append(data)
- return response_data
-
-
-def query_single(pnfd_info_id):
- pnf_pkgs = PnfPackageModel.objects.filter(pnfPackageId=pnfd_info_id)
- if not pnf_pkgs.exists():
- logger.error('PNFD(%s) does not exist.' % pnfd_info_id)
- raise ResourceNotFoundException('PNFD(%s) does not exist.' % pnfd_info_id)
- return fill_response_data(pnf_pkgs[0])
-
-
-def upload(remote_file, pnfd_info_id):
- logger.info('Start to upload PNFD(%s)...' % pnfd_info_id)
- pnf_pkgs = PnfPackageModel.objects.filter(pnfPackageId=pnfd_info_id)
- if not pnf_pkgs.exists():
- logger.info('PNFD(%s) does not exist.' % pnfd_info_id)
- raise CatalogException('PNFD (%s) does not exist.' % pnfd_info_id)
-
- pnf_pkgs.update(onboardingState=PKG_STATUS.UPLOADING)
- local_file_name = remote_file.name
- local_file_dir = os.path.join(CATALOG_ROOT_PATH, pnfd_info_id)
- local_file_name = os.path.join(local_file_dir, local_file_name)
- if not os.path.exists(local_file_dir):
- fileutil.make_dirs(local_file_dir)
- with open(local_file_name, 'wb') as local_file:
- for chunk in remote_file.chunks(chunk_size=1024 * 8):
- local_file.write(chunk)
- logger.info('PNFD(%s) content has been uploaded.' % pnfd_info_id)
- return local_file_name
+class PnfPackage(object):
+
+ def __init__(self):
+ pass
+
+ def create(self, data):
+ logger.info('Start to create a PNFD...')
+ user_defined_data = ignore_case_get(data, 'userDefinedData')
+ data = {
+ 'id': str(uuid.uuid4()),
+ 'pnfdOnboardingState': PKG_STATUS.CREATED,
+ 'pnfdUsageState': PKG_STATUS.NOT_IN_USE,
+ 'userDefinedData': user_defined_data,
+ '_links': None # TODO
+ }
+ PnfPackageModel.objects.create(
+ pnfPackageId=data['id'],
+ onboardingState=data['pnfdOnboardingState'],
+ usageState=data['pnfdUsageState'],
+ userDefinedData=data['userDefinedData']
+ )
+ logger.info('A PNFD(%s) has been created.' % data['id'])
+ return data
+
+ def query_multiple(self):
+ pnf_pkgs = PnfPackageModel.objects.all()
+ response_data = []
+ for pnf_pkg in pnf_pkgs:
+ data = fill_response_data(pnf_pkg)
+ response_data.append(data)
+ return response_data
+
+ def query_single(self, pnfd_info_id):
+ pnf_pkgs = PnfPackageModel.objects.filter(pnfPackageId=pnfd_info_id)
+ if not pnf_pkgs.exists():
+ logger.error('PNFD(%s) does not exist.' % pnfd_info_id)
+ raise ResourceNotFoundException('PNFD(%s) does not exist.' % pnfd_info_id)
+ return fill_response_data(pnf_pkgs[0])
+
+ def upload(self, remote_file, pnfd_info_id):
+ logger.info('Start to upload PNFD(%s)...' % pnfd_info_id)
+ pnf_pkgs = PnfPackageModel.objects.filter(pnfPackageId=pnfd_info_id)
+ if not pnf_pkgs.exists():
+ logger.info('PNFD(%s) does not exist.' % pnfd_info_id)
+ raise CatalogException('PNFD (%s) does not exist.' % pnfd_info_id)
+
+ pnf_pkgs.update(onboardingState=PKG_STATUS.UPLOADING)
+ local_file_name = remote_file.name
+ local_file_dir = os.path.join(CATALOG_ROOT_PATH, pnfd_info_id)
+ local_file_name = os.path.join(local_file_dir, local_file_name)
+ if not os.path.exists(local_file_dir):
+ fileutil.make_dirs(local_file_dir)
+ with open(local_file_name, 'wb') as local_file:
+ for chunk in remote_file.chunks(chunk_size=1024 * 8):
+ local_file.write(chunk)
+ logger.info('PNFD(%s) content has been uploaded.' % pnfd_info_id)
+ return local_file_name
+
+ def delete_single(self, pnfd_info_id):
+ logger.info('Start to delete PNFD(%s)...' % pnfd_info_id)
+ pnf_pkgs = PnfPackageModel.objects.filter(pnfPackageId=pnfd_info_id)
+ if not pnf_pkgs.exists():
+ logger.info('PNFD(%s) has been deleted.' % pnfd_info_id)
+ return
+ '''
+ if pnf_pkgs[0].usageState != PKG_STATUS.NOT_IN_USE:
+ logger.info('PNFD(%s) shall be NOT_IN_USE.' % pnfd_info_id)
+ raise CatalogException('PNFD(%s) shall be NOT_IN_USE.' % pnfd_info_id)
+ '''
+ ns_pkgs = NSPackageModel.objects.all()
+ for ns_pkg in ns_pkgs:
+            if not ns_pkg.nsdModel:
+                continue
+            nsd_model = json.JSONDecoder().decode(ns_pkg.nsdModel)
+ pnf_info_ids = []
+ for pnf in nsd_model['pnfs']:
+ pnfd_id = pnf["properties"]["id"]
+ pkgs = PnfPackageModel.objects.filter(pnfdId=pnfd_id)
+ for pkg in pkgs:
+ pnf_info_ids.append(pkg.pnfPackageId)
+ if pnfd_info_id in pnf_info_ids:
+ logger.info('PNFD(%s) is referenced.' % pnfd_info_id)
+ raise CatalogException('PNFD(%s) is referenced.' % pnfd_info_id)
+
+ pnf_pkgs.delete()
+ pnf_pkg_path = os.path.join(CATALOG_ROOT_PATH, pnfd_info_id)
+ fileutil.delete_dirs(pnf_pkg_path)
+ logger.debug('PNFD(%s) has been deleted.' % pnfd_info_id)
+
+ def download(self, pnfd_info_id):
+ logger.info('Start to download PNFD(%s)...' % pnfd_info_id)
+ pnf_pkgs = PnfPackageModel.objects.filter(pnfPackageId=pnfd_info_id)
+ if not pnf_pkgs.exists():
+ logger.error('PNFD(%s) does not exist.' % pnfd_info_id)
+ raise ResourceNotFoundException('PNFD(%s) does not exist.' % pnfd_info_id)
+ if pnf_pkgs[0].onboardingState != PKG_STATUS.ONBOARDED:
+ logger.error('PNFD(%s) is not ONBOARDED.' % pnfd_info_id)
+ raise CatalogException('PNFD(%s) is not ONBOARDED.' % pnfd_info_id)
+ local_file_path = pnf_pkgs[0].localFilePath
+ local_file_name = local_file_path.split('/')[-1]
+ local_file_name = local_file_name.split('\\')[-1]
+ logger.info('PNFD(%s) has been downloaded.' % pnfd_info_id)
+ return local_file_path, local_file_name, os.path.getsize(local_file_path)
def parse_pnfd_and_save(pnfd_info_id, local_file_name):
@@ -110,53 +158,6 @@ def parse_pnfd_and_save(pnfd_info_id, local_file_name):
    logger.info('PNFD(%s) has been processed.' % pnfd_info_id)
-def download(pnfd_info_id):
- logger.info('Start to download PNFD(%s)...' % pnfd_info_id)
- pnf_pkgs = PnfPackageModel.objects.filter(pnfPackageId=pnfd_info_id)
- if not pnf_pkgs.exists():
- logger.error('PNFD(%s) does not exist.' % pnfd_info_id)
- raise ResourceNotFoundException('PNFD(%s) does not exist.' % pnfd_info_id)
- if pnf_pkgs[0].onboardingState != PKG_STATUS.ONBOARDED:
- logger.error('PNFD(%s) is not ONBOARDED.' % pnfd_info_id)
- raise CatalogException('PNFD(%s) is not ONBOARDED.' % pnfd_info_id)
- local_file_path = pnf_pkgs[0].localFilePath
- local_file_name = local_file_path.split('/')[-1]
- local_file_name = local_file_name.split('\\')[-1]
- logger.info('PNFD(%s) has been downloaded.' % pnfd_info_id)
- return local_file_path, local_file_name, os.path.getsize(local_file_path)
-
-
-def delete_single(pnfd_info_id):
- logger.info('Start to delete PNFD(%s)...' % pnfd_info_id)
- pnf_pkgs = PnfPackageModel.objects.filter(pnfPackageId=pnfd_info_id)
- if not pnf_pkgs.exists():
- logger.info('PNFD(%s) has been deleted.' % pnfd_info_id)
- return
- '''
- if pnf_pkgs[0].usageState != PKG_STATUS.NOT_IN_USE:
- logger.info('PNFD(%s) shall be NOT_IN_USE.' % pnfd_info_id)
- raise CatalogException('PNFD(%s) shall be NOT_IN_USE.' % pnfd_info_id)
- '''
- ns_pkgs = NSPackageModel.objects.all()
- for ns_pkg in ns_pkgs:
- if ns_pkg.nsdModel:
- nsd_model = json.JSONDecoder().decode(ns_pkg.nsdModel)
- pnf_info_ids = []
- for pnf in nsd_model['pnfs']:
- pnfd_id = pnf["properties"]["id"]
- pkgs = PnfPackageModel.objects.filter(pnfdId=pnfd_id)
- for pkg in pkgs:
- pnf_info_ids.append(pkg.pnfPackageId)
- if pnfd_info_id in pnf_info_ids:
- logger.info('PNFD(%s) is referenced.' % pnfd_info_id)
- raise CatalogException('PNFD(%s) is referenced.' % pnfd_info_id)
-
- pnf_pkgs.delete()
- pnf_pkg_path = os.path.join(CATALOG_ROOT_PATH, pnfd_info_id)
- fileutil.delete_dirs(pnf_pkg_path)
- logger.debug('PNFD(%s) has been deleted.' % pnfd_info_id)
-
-
def fill_response_data(pnf_pkg):
    data = {
        'id': pnf_pkg.pnfPackageId,
diff --git a/catalog/packages/tests/test_pnf_descriptor.py b/catalog/packages/tests/test_pnf_descriptor.py
index 195433ee..0b8ce104 100644
--- a/catalog/packages/tests/test_pnf_descriptor.py
+++ b/catalog/packages/tests/test_pnf_descriptor.py
@@ -27,6 +27,7 @@ from catalog.pub.utils import toscaparser
from catalog.packages.const import PKG_STATUS
from catalog.packages.tests.const import pnfd_data
from catalog.pub.config.config import CATALOG_ROOT_PATH
+from catalog.packages.biz.pnf_descriptor import PnfPackage
class TestPnfDescriptor(TestCase):
@@ -205,3 +206,44 @@ class TestPnfDescriptor(TestCase):
        self.assertEqual(resp.status_code, status.HTTP_200_OK)
        self.assertEqual('test1test2', file_content)
        os.remove('pnfd_content.txt')
+
+ def test_pnfd_download_failed(self):
+ response = self.client.get("/api/nsd/v1/pnf_descriptors/22/pnfd_content")
+ self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
+
+ @mock.patch.object(PnfPackage, "create")
+ def test_pnfd_create_when_catch_exception(self, mock_create):
+ request_data = {'userDefinedData': self.user_defined_data}
+ mock_create.side_effect = TypeError('integer type')
+ response = self.client.post('/api/nsd/v1/pnf_descriptors', data=request_data, format='json')
+ self.assertEqual(response.status_code, status.HTTP_500_INTERNAL_SERVER_ERROR)
+
+ @mock.patch.object(PnfPackage, "delete_single")
+ def test_delete_single_when_catch_exception(self, mock_delete_single):
+ mock_delete_single.side_effect = TypeError("integer type")
+ response = self.client.delete("/api/nsd/v1/pnf_descriptors/22", format='json')
+ self.assertEqual(response.status_code, status.HTTP_500_INTERNAL_SERVER_ERROR)
+
+ @mock.patch.object(PnfPackage, "query_single")
+ def test_query_single_when_catch_exception(self, mock_query_single):
+ mock_query_single.side_effect = TypeError("integer type")
+ response = self.client.get('/api/nsd/v1/pnf_descriptors/22', format='json')
+ self.assertEqual(response.status_code, status.HTTP_500_INTERNAL_SERVER_ERROR)
+
+ @mock.patch.object(PnfPackage, "query_multiple")
+    def test_query_multiple_when_catch_exception(self, mock_query_multiple):
+        mock_query_multiple.side_effect = TypeError("integer type")
+ response = self.client.get('/api/nsd/v1/pnf_descriptors', format='json')
+ self.assertEqual(response.status_code, status.HTTP_500_INTERNAL_SERVER_ERROR)
+
+ @mock.patch.object(PnfPackage, "upload")
+ def test_upload_when_catch_exception(self, mock_upload):
+ mock_upload.side_effect = TypeError("integer type")
+ response = self.client.put("/api/nsd/v1/pnf_descriptors/22/pnfd_content")
+ self.assertEqual(response.status_code, status.HTTP_500_INTERNAL_SERVER_ERROR)
+
+ @mock.patch.object(PnfPackage, "download")
+ def test_download_when_catch_exception(self, mock_download):
+ mock_download.side_effect = TypeError("integer type")
+ response = self.client.get("/api/nsd/v1/pnf_descriptors/22/pnfd_content")
+ self.assertEqual(response.status_code, status.HTTP_500_INTERNAL_SERVER_ERROR)
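
Note that the new tests patch methods on the PnfPackage class itself. Because each view constructs a fresh PnfPackage() per request, patching a pre-built instance would not reach the object the view uses; patching the class attribute does, since every instance created while the patch is active resolves the method to the mock. A self-contained sketch of that behaviour (Greeter is a stand-in class, not project code):

    from unittest import mock

    class Greeter(object):  # stand-in for PnfPackage
        def greet(self):
            return 'hello'

    with mock.patch.object(Greeter, 'greet') as mock_greet:
        mock_greet.side_effect = TypeError('integer type')
        try:
            Greeter().greet()  # a fresh instance, like PnfPackage() in the views
        except TypeError as e:
            print('caught: %s' % e)  # caught: integer type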
diff --git a/catalog/packages/views/pnf_descriptor_views.py b/catalog/packages/views/pnf_descriptor_views.py
index b0d02a82..d6600923 100644
--- a/catalog/packages/views/pnf_descriptor_views.py
+++ b/catalog/packages/views/pnf_descriptor_views.py
@@ -17,8 +17,7 @@ import traceback
from django.http import FileResponse
-from catalog.packages.biz.pnf_descriptor import create, delete_single, download, query_multiple, query_single, upload, \
- parse_pnfd_and_save, handle_upload_failed
+from catalog.packages.biz.pnf_descriptor import PnfPackage, parse_pnfd_and_save, handle_upload_failed
from catalog.packages.serializers.create_pnfd_info_request import CreatePnfdInfoRequestSerializer
from catalog.packages.serializers.pnfd_info import PnfdInfoSerializer
from catalog.packages.serializers.pnfd_infos import PnfdInfosSerializer
@@ -55,7 +54,7 @@ def pnfd_info_rd(request, pnfdInfoId): # TODO
    if request.method == 'GET':
        logger.debug("Query an individual PNF descriptor> %s" % request.data)
        try:
- data = query_single(pnfdInfoId)
+ data = PnfPackage().query_single(pnfdInfoId)
            pnfd_info = validate_data(data, PnfdInfoSerializer)
            return Response(data=pnfd_info.data, status=status.HTTP_200_OK)
        except ResourceNotFoundException as e:
@@ -73,7 +72,7 @@ def pnfd_info_rd(request, pnfdInfoId): # TODO
    if request.method == 'DELETE':
        logger.debug("Delete an individual PNFD resource> %s" % request.data)
        try:
- delete_single(pnfdInfoId)
+ PnfPackage().delete_single(pnfdInfoId)
            return Response(data=None, status=status.HTTP_204_NO_CONTENT)
        except CatalogException as e:
            logger.error(e.message)
@@ -108,7 +107,7 @@ def pnf_descriptors_rc(request, *args, **kwargs):
    if request.method == 'POST':
        try:
            create_pnfd_info_request = validate_data(request.data, CreatePnfdInfoRequestSerializer)
- data = create(create_pnfd_info_request.data)
+ data = PnfPackage().create(create_pnfd_info_request.data)
            pnfd_info = validate_data(data, PnfdInfoSerializer)
            return Response(data=pnfd_info.data, status=status.HTTP_201_CREATED)
        except CatalogException as e:
@@ -122,7 +121,7 @@ def pnf_descriptors_rc(request, *args, **kwargs):
    if request.method == 'GET':
        try:
- data = query_multiple()
+ data = PnfPackage().query_multiple()
            pnfd_infos = validate_data(data, PnfdInfosSerializer)
            return Response(data=pnfd_infos.data, status=status.HTTP_200_OK)
        except CatalogException as e:
@@ -160,7 +159,7 @@ def pnfd_content_ru(request, *args, **kwargs):
    if request.method == 'PUT':
        files = request.FILES.getlist('file')
        try:
- local_file_name = upload(files[0], pnfd_info_id)
+ local_file_name = PnfPackage().upload(files[0], pnfd_info_id)
            parse_pnfd_and_save(pnfd_info_id, local_file_name)
            return Response(data=None, status=status.HTTP_204_NO_CONTENT)
        except CatalogException as e:
@@ -176,7 +175,7 @@ def pnfd_content_ru(request, *args, **kwargs):
    if request.method == 'GET':
        try:
- file_path, file_name, file_size = download(pnfd_info_id)
+ file_path, file_name, file_size = PnfPackage().download(pnfd_info_id)
            response = FileResponse(open(file_path, 'rb'), status=status.HTTP_200_OK)
            response['Content-Disposition'] = 'attachment; filename=%s' % file_name.encode('utf-8')
            response['Content-Length'] = file_size
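
The HTTP 500 assertions in the new tests depend on the views' generic error handling, which the hunks above are truncated before reaching. A hedged sketch of that pattern, assuming (as the tests imply) that each view catches any unexpected exception, logs the traceback, and returns a 500 response; the handler name and error payload below are illustrative, not the project's exact code:

    import logging
    import traceback

    from rest_framework import status
    from rest_framework.response import Response

    logger = logging.getLogger(__name__)


    def handle_unexpected_exception(e):
        # Illustrative only: mirrors the behaviour the tests assert
        # (TypeError raised by a mocked PnfPackage method -> HTTP 500).
        logger.error(e.args[0] if e.args else str(e))
        logger.error(traceback.format_exc())
        return Response(
            data={'error': 'unexpected exception'},
            status=status.HTTP_500_INTERNAL_SERVER_ERROR)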