# Copyright 2022 Orange, Deutsche Telekom AG # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from collections import namedtuple from unittest import mock from onapsdk.aai.business import Customer from onapsdk.exceptions import RequestError from onapsdk.nbi import Nbi, Service, ServiceOrder, ServiceSpecification SERVICE_SPECIFICATION = { "id": "a80c901c-6593-491f-9465-877e5acffb46", "name": "testService1", "invariantUUID": "217deaa7-dfc3-41d8-aa53-bb009029c09f", "category": "Network Service", "distributionStatus": "DISTRIBUTED", "version": "1.0", "lifecycleStatus": "CERTIFIED", "relatedParty": { "id": "cs0008", "role": "lastUpdater" } } SERVICE_SPECIFICATIONS = [ { "id": "a80c901c-6593-491f-9465-877e5acffb46", "name": "testService1", "invariantUUID": "217deaa7-dfc3-41d8-aa53-bb009029c09f", "category": "Network Service", "distributionStatus": "DISTRIBUTED", "version": "1.0", "lifecycleStatus": "CERTIFIED", "relatedParty": { "id": "cs0008", "role": "lastUpdater" } }, { "id": "b1cda0ab-d968-41ef-9051-d26b33b120be", "name": "testService2", "invariantUUID": "906c3185-9656-4639-8f4d-d51d9ee0695d", "category": "Network Service", "distributionStatus": "DISTRIBUTED", "version": "1.0", "lifecycleStatus": "CERTIFIED", "relatedParty": { "id": "cs0008" , "role": "lastUpdater" } } ] SERVICES = [ { "id": "5c855390-7c39-4fe4-b164-2029b09de57c", "name": "test6", "serviceSpecification": { "name": "testService9", "id": "125727ad-8660-423e-b4a1-99cd4a749f45" }, "relatedParty": { "role": "ONAPcustomer", "id": "generic" }, "href": "service/5c855390-7c39-4fe4-b164-2029b09de57c" }, { "id": "f948be83-c3e8-4515-a27d-2983eba63911", "name": "test4", "serviceSpecification": { "name": "testService8", "id": "0960aedb-3ad8-49e1-ade5-a59414f6fda4" }, "relatedParty": { "role": "ONAPcustomer", "id": "generic" }, "href": "service/f948be83-c3e8-4515-a27d-2983eba63911" }, { "id": "5066eabd-846c-4ed9-886b-69892a12968d", "name": "test5", "serviceSpecification": { "name": "testService8", "id": "0960aedb-3ad8-49e1-ade5-a59414f6fda4" }, "relatedParty": { "role": "ONAPcustomer", "id": "generic" }, "href": "service/5066eabd-846c-4ed9-886b-69892a12968d" } ] SERVICES_CUSTOMER = [ { "id": "6a855390-7c39-4fe4-b164-2029b09de57d", "name": "test7", "serviceSpecification": { "name": "testService9", "id": "125727ad-8660-423e-b4a1-99cd4a749f45" }, "relatedParty": { "role": "ONAPcustomer", "id": "test_customer" }, "href": "service/6a855390-7c39-4fe4-b164-2029b09de57d" } ] SERVICE_ORDERS = [ { "id": "5e9d6d98ae76af6b04e4df9a", "href": "serviceOrder/5e9d6d98ae76af6b04e4df9a", "externalId": "", "priority": "1", "description": "testService order for generic customer via Python ONAP SDK", "category": "Consumer", "state": "rejected", "orderDate": "2020-04-20T09:38:32.286Z", "completionDateTime": "2020-04-20T09:38:47.866Z", "expectedCompletionDate": None, "requestedStartDate": "2020-04-20T09:47:49.919Z", "requestedCompletionDate": "2020-04-20T09:47:49.919Z", "startDate": None, "@baseType": None, "@type": None, "@schemaLocation": None, "relatedParty": [ { "id": "generic", "href": None, "role": "ONAPcustomer", "name": "generic", "@referredType": None } ], "orderRelationship": None, "orderItem": [ { "orderMessage": [], "id": "1", "action": "add", "state": "rejected", "percentProgress": "0", "@type": None, "@schemaLocation": None, "@baseType": None, "orderItemRelationship": [], "service": { "id": None, "serviceType": None, "href": None, "name": "08d960ae-c2e1-4d5c-baf0-6420659ea68a", "serviceState": "active", "@type": None, "@schemaLocation": None, "serviceCharacteristic": None, "serviceRelationship": None, "relatedParty": None, "serviceSpecification": { "id": "a80c901c-6593-491f-9465-877e5acffb46", "href": None, "name": None, "version": None, "targetServiceSchema": None, "@type": None, "@schemaLocation": None, "@baseType": None } }, "orderItemMessage": [] } ], "orderMessage": [ { "code": "501", "field": None, "messageInformation": "Problem with AAI API", "severity": "error", "correctionRequired": True }, { "code": "503", "field": None, "messageInformation": "tenantId not found in AAI", "severity": "error", "correctionRequired": True } ] } ] SERVICE_ORDERS_NO_RELATED_PARTY = [ { "id": "5e9d6d98ae76af6b04e4df9a", "href": "serviceOrder/5e9d6d98ae76af6b04e4df9a", "externalId": "", "priority": "1", "description": "testService order for generic customer via Python ONAP SDK", "category": "Consumer", "state": "rejected", "orderDate": "2020-04-20T09:38:32.286Z", "completionDateTime": "2020-04-20T09:38:47.866Z", "expectedCompletionDate": None, "requestedStartDate": "2020-04-20T09:47:49.919Z", "requestedCompletionDate": "2020-04-20T09:47:49.919Z", "startDate": None, "@baseType": None, "@type": None, "@schemaLocation": None, "relatedParty": None, "orderRelationship": None, "orderItem": [ { "orderMessage": [], "id": "1", "action": "add", "state": "rejected", "percentProgress": "0", "@type": None, "@schemaLocation": None, "@baseType": None, "orderItemRelationship": [], "service": { "id": None, "serviceType": None, "href": None, "name": "08d960ae-c2e1-4d5c-baf0-6420659ea68a", "serviceState": "active", "@type": None, "@schemaLocation": None, "serviceCharacteristic": None, "serviceRelationship": None, "relatedParty": None, "serviceSpecification": { "id": "a80c901c-6593-491f-9465-877e5acffb46", "href": None, "name": None, "version": None, "targetServiceSchema": None, "@type": None, "@schemaLocation": None, "@baseType": None } }, "orderItemMessage": [] } ], "orderMessage": [ { "code": "501", "field": None, "messageInformation": "Problem with AAI API", "severity": "error", "correctionRequired": True }, { "code": "503", "field": None, "messageInformation": "tenantId not found in AAI", "severity": "error", "correctionRequired": True } ] } ] SERVICE_ORDER_STATE_COMPLETED = { 'state': 'completed' } SERVICE_ORDER_STATE_FAILED = { 'state': 'failed' } SERVICE_ORDER_STATE_IN_PROGRESS = { 'state': 'inProgress' } SERVICE_ORDER_STATE_REJECTED = { 'state': 'rejected' } SERVICE_ORDER_STATE_UNKNOWN = { 'state': 'lalala' } @mock.patch.object(Nbi, "send_message") def test_nbi(mock_send_message): assert Nbi.base_url == "https://nbi.api.simpledemo.onap.org:30274" assert Nbi.api_version == "/nbi/api/v4" mock_send_message.side_effect = RequestError assert Nbi.is_status_ok() == False mock_send_message.side_effect = None assert Nbi.is_status_ok() == True @mock.patch.object(ServiceSpecification, "send_message_json") def test_service_specification_get_all(mock_service_specification_send_message): mock_service_specification_send_message.return_value = [] assert len(list(ServiceSpecification.get_all())) == 0 mock_service_specification_send_message.return_value = SERVICE_SPECIFICATIONS service_specifications = list(ServiceSpecification.get_all()) assert len(service_specifications) == 2 assert service_specifications[0].unique_id == "a80c901c-6593-491f-9465-877e5acffb46" assert service_specifications[0].name == "testService1" assert service_specifications[0].invariant_uuid == "217deaa7-dfc3-41d8-aa53-bb009029c09f" assert service_specifications[0].category == "Network Service" assert service_specifications[0].distribution_status == "DISTRIBUTED" assert service_specifications[0].version == "1.0" assert service_specifications[0].lifecycle_status == "CERTIFIED" assert service_specifications[1].unique_id == "b1cda0ab-d968-41ef-9051-d26b33b120be" assert service_specifications[1].name == "testService2" assert service_specifications[1].invariant_uuid == "906c3185-9656-4639-8f4d-d51d9ee0695d" assert service_specifications[1].category == "Network Service" assert service_specifications[1].distribution_status == "DISTRIBUTED" assert service_specifications[1].version == "1.0" assert service_specifications[1].lifecycle_status == "CERTIFIED" @mock.patch.object(ServiceSpecification, "send_message_json") def test_service_specification_get_by_id(mock_service_specification_send_message): mock_service_specification_send_message.return_value = SERVICE_SPECIFICATION service_specification = ServiceSpecification.get_by_id("test") assert service_specification.unique_id == "a80c901c-6593-491f-9465-877e5acffb46" assert service_specification.name == "testService1" assert service_specification.invariant_uuid == "217deaa7-dfc3-41d8-aa53-bb009029c09f" assert service_specification.category == "Network Service" assert service_specification.distribution_status == "DISTRIBUTED" assert service_specification.version == "1.0" assert service_specification.lifecycle_status == "CERTIFIED" @mock.patch.object(Service, "send_message_json") @mock.patch.object(Customer, "get_by_global_customer_id") @mock.patch.object(ServiceSpecification, "get_by_id") def test_service_get_all(mock_service_specification_get_by_id, mock_customer_get_by_id, mock_service_send_message): mock_service_send_message.return_value = [] assert len(list(Service.get_all())) == 0 mock_service_send_message.return_value = SERVICES services_list = list(Service.get_all()) assert len(services_list) == 3 service = services_list[0] assert service.name == "test6" assert service.service_id == "5c855390-7c39-4fe4-b164-2029b09de57c" assert service._service_specification_name == "testService9" assert service._service_specification_id == "125727ad-8660-423e-b4a1-99cd4a749f45" assert service._customer_id == "generic" assert service.customer_role == "ONAPcustomer" assert service.href == "service/5c855390-7c39-4fe4-b164-2029b09de57c" assert service.customer is not None mock_customer_get_by_id.assert_called_once_with(service._customer_id) service._customer_id = None assert service.customer is None assert service.service_specification is not None mock_service_specification_get_by_id.assert_called_once_with(service._service_specification_id) service._service_specification_id = None assert service.service_specification is None mock_service_send_message.return_value = SERVICES_CUSTOMER services_list = list(Service.get_all(customer_id="test_customer")) assert len(services_list) == 1 service = services_list[0] assert service.name == "test7" assert service.service_id == "6a855390-7c39-4fe4-b164-2029b09de57d" assert service._service_specification_name == "testService9" assert service._service_specification_id == "125727ad-8660-423e-b4a1-99cd4a749f45" assert service._customer_id == "test_customer" assert service.customer_role == "ONAPcustomer" assert service.href == "service/6a855390-7c39-4fe4-b164-2029b09de57d" @mock.patch.object(ServiceOrder, "send_message_json") def test_service_order(mock_service_order_send_message): mock_service_order_send_message.return_value = [] assert len(list(ServiceOrder.get_all())) == 0 mock_service_order_send_message.return_value = SERVICE_ORDERS service_orders = list(ServiceOrder.get_all()) assert len(service_orders) == 1 service_order = service_orders[0] assert service_order.unique_id == "5e9d6d98ae76af6b04e4df9a" assert service_order.href == "serviceOrder/5e9d6d98ae76af6b04e4df9a" assert service_order.priority == "1" assert service_order.category == "Consumer" assert service_order.description == "testService order for generic customer via Python ONAP SDK" assert service_order.external_id == "" assert service_order._customer == None assert service_order._customer_id == "generic" assert service_order._service_specification == None assert service_order._service_specification_id == "a80c901c-6593-491f-9465-877e5acffb46" assert service_order.service_instance_name == "08d960ae-c2e1-4d5c-baf0-6420659ea68a" assert service_order.state == "rejected" @mock.patch.object(ServiceOrder, "send_message_json") def test_service_order_status(mock_service_order_send_message): mock_service_order_send_message.return_value = SERVICE_ORDERS service_order = next(ServiceOrder.get_all()) mock_service_order_send_message.return_value = SERVICE_ORDER_STATE_COMPLETED assert service_order.status == service_order.StatusEnum.COMPLETED assert service_order.completed assert service_order.finished assert not service_order.rejected assert not service_order.failed mock_service_order_send_message.return_value = SERVICE_ORDER_STATE_FAILED assert service_order.status == service_order.StatusEnum.FAILED assert not service_order.completed assert service_order.finished assert not service_order.rejected assert service_order.failed mock_service_order_send_message.return_value = SERVICE_ORDER_STATE_IN_PROGRESS assert service_order.status == service_order.StatusEnum.IN_PROGRESS assert not service_order.completed assert not service_order.finished assert not service_order.rejected assert not service_order.failed mock_service_order_send_message.return_value = SERVICE_ORDER_STATE_REJECTED assert service_order.status == service_order.StatusEnum.REJECTED assert not service_order.completed assert service_order.finished assert service_order.rejected assert not service_order.failed mock_service_order_send_message.return_value = SERVICE_ORDER_STATE_UNKNOWN assert service_order.status == service_order.StatusEnum.UNKNOWN assert not service_order.completed assert service_order.finished assert not service_order.rejected assert not service_order.failed mock_service_order_send_message.return_value = {} assert service_order.status == service_order.StatusEnum.UNKNOWN assert not service_order.completed assert service_order.finished assert not service_order.rejected assert not service_order.failed @mock.patch.object(ServiceOrder, "send_message_json") def test_service_order_no_related_party(mock_service_order_send_message): mock_service_order_send_message.return_value = [] assert len(list(ServiceOrder.get_all())) == 0 mock_service_order_send_message.return_value = SERVICE_ORDERS_NO_RELATED_PARTY service_orders = list(ServiceOrder.get_all()) assert len(service_orders) == 1 service_order = service_orders[0] assert service_order.unique_id == "5e9d6d98ae76af6b04e4df9a" assert service_order.href == "serviceOrder/5e9d6d98ae76af6b04e4df9a" assert service_order.priority == "1" assert service_order.category == "Consumer" assert service_order.description == "testService order for generic customer via Python ONAP SDK" assert service_order.external_id == "" assert service_order._customer == None assert service_order._customer_id == None assert service_order._service_specification == None assert service_order._service_specification_id == "a80c901c-6593-491f-9465-877e5acffb46" assert service_order.service_instance_name == "08d960ae-c2e1-4d5c-baf0-6420659ea68a" assert service_order.state == "rejected" @mock.patch.object(Customer, "get_by_global_customer_id") def test_service_order_customer(mock_customer_get_by_id): service_order = ServiceOrder("test_unique_id", "test_href", "test_priority", "test_description", "test_category", "test_external_id", "test_service_instance_name") assert service_order.customer is None assert service_order._customer is None service_order._customer_id = "test_customer_id" assert service_order.customer is not None mock_customer_get_by_id.assert_called_once_with("test_customer_id") assert service_order._customer is not None @mock.patch.object(ServiceSpecification, "get_by_id") def test_service_order_service_specification(mock_service_spec_get_by_id): service_order = ServiceOrder("test_unique_id", "test_href", "test_priority", "test_description", "test_category", "test_external_id", "test_service_instance_name") assert service_order.service_specification is None assert service_order._service_specification_id is None service_order._service_specification_id = "test_service_spec_id" assert service_order.service_specification is not None mock_service_spec_get_by_id.assert_called_once_with("test_service_spec_id") assert service_order._service_specification is not None @mock.patch.object(ServiceOrder, "send_message_json") def test_service_order_create(mock_service_order_send_message): ServiceOrder.create(customer=mock.MagicMock(), service_specification=mock.MagicMock()) mock_service_order_send_message.assert_called_once() method, _, url = mock_service_order_send_message.call_args[0] assert method == "POST" assert url == f"{ServiceOrder.base_url}{ServiceOrder.api_version}/serviceOrder" def test_service_order_wait_for_finish(): with mock.patch.object(ServiceOrder, "finished", new_callable=mock.PropertyMock) as mock_finished: with mock.patch.object(ServiceOrder, "completed", new_callable=mock.PropertyMock) as mock_completed: service_order = ServiceOrder( unique_id="test", href="test", priority="test", description="test", category="test", external_id="test", service_instance_name="test", ) service_order.WAIT_FOR_SLEEP_TIME = 0 mock_finished.side_effect = [False, False, True] mock_completed.return_value = True rv = namedtuple("Value", ["return_value"]) service_order._wait_for_finish(rv) assert rv.return_value