diff options
author | Michal Jagiello <michal.jagiello@t-mobile.pl> | 2022-10-17 12:46:49 +0000 |
---|---|---|
committer | Michal Jagiello <michal.jagiello@t-mobile.pl> | 2022-10-17 14:05:09 +0000 |
commit | f2adf542e878c96895210f97ebf1ebb763b2f465 (patch) | |
tree | 91fc0faeb3436e723d07aed1f38ce59a6e7cc7c5 /tests/test_cps.py | |
parent | 49071a0d0425ef67fa552dbf14c81e5a11cc49e7 (diff) |
Release ONAP SDKv10.2
Issue-ID: INT-2150
Signed-off-by: Michal Jagiello <michal.jagiello@t-mobile.pl>
Change-Id: I650047c599a5aae6de7c6b42d38e34aea88578e2
Diffstat (limited to 'tests/test_cps.py')
-rw-r--r-- | tests/test_cps.py | 240 |
1 files changed, 240 insertions, 0 deletions
diff --git a/tests/test_cps.py b/tests/test_cps.py new file mode 100644 index 0000000..8f7bdc4 --- /dev/null +++ b/tests/test_cps.py @@ -0,0 +1,240 @@ +# Copyright 2022 Orange, Deutsche Telekom AG +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from unittest import mock +from typing import List + +from onapsdk.cps import Anchor, Dataspace, SchemaSet, SchemaSetModuleReference, anchor + +DATASPACE_ANCHOR = { + "name": "anchor1", + "schemaSetName": "schemaSet1" +} + +DATASPACE_ANCHORS = [ + DATASPACE_ANCHOR, + { + "name": "anchor2", + "schemaSetName": "schemaSet2" + } +] + +DATASPACE_SCHEMA_SET = { + "name": "schemaSet1", + "moduleReferences": [ + { + "name": "mr1", + "namespace": "mr1_namespace", + "revision": "mr1_revision", + }, + { + "name": "mr2", + "namespace": "mr2_namespace", + "revision": "mr2_revision", + } + ] +} + +# Dataspace tests +def test_dataspace(): + ds = Dataspace(name="test_ds") + assert ds.name == "test_ds" + assert f"cps/api/v1/dataspaces/{ds.name}" in ds.url + +@mock.patch("onapsdk.cps.Dataspace.send_message") +def test_dataspace_create_anchor(mock_send_message): + ds = Dataspace(name="test_ds") + anchor = ds.create_anchor(mock.MagicMock(), "test_anchor") + mock_send_message.assert_called_once() + assert anchor.name == "test_anchor" + +@mock.patch("onapsdk.cps.Dataspace.send_message_json") +def test_dataspace_get_anchors(mock_send_message_json): + mock_send_message_json.return_value = DATASPACE_ANCHORS + ds = Dataspace(name="test_ds") + anchors = list(ds.get_anchors()) + assert len(anchors) == 2 + anchor_1, anchor_2 = anchors + assert isinstance(anchor_1, Anchor) + assert isinstance(anchor_2, Anchor) + assert anchor_1.name == "anchor1" + assert isinstance(anchor_1.schema_set, SchemaSet) + assert anchor_1.schema_set.name == "schemaSet1" + assert anchor_1.schema_set.dataspace == ds + assert anchor_2.name == "anchor2" + assert isinstance(anchor_2.schema_set, SchemaSet) + assert anchor_2.schema_set.name == "schemaSet2" + assert anchor_2.schema_set.dataspace == ds + +@mock.patch("onapsdk.cps.Dataspace.send_message_json") +def test_dataspace_get_anchor(mock_send_message_json): + mock_send_message_json.return_value = DATASPACE_ANCHOR + ds = Dataspace(name="test_ds") + anchor = ds.get_anchor("anything") + assert anchor.name == "anchor1" + assert anchor.schema_set.name == "schemaSet1" + assert anchor.schema_set.dataspace == ds + +@mock.patch("onapsdk.cps.Dataspace.send_message_json") +def test_dataspace_get_schema_set(mock_send_message_json): + mock_send_message_json.return_value = DATASPACE_SCHEMA_SET + ds = Dataspace(name="test_ds") + schema_set = ds.get_schema_set("anything") + assert isinstance(schema_set, SchemaSet) + assert schema_set.dataspace == ds + assert schema_set.name == "schemaSet1" + assert len(schema_set.module_refences) == 2 + mr_1, mr_2 = schema_set.module_refences + assert mr_1.name == "mr1" + assert mr_1.namespace == "mr1_namespace" + assert mr_1.revision == "mr1_revision" + assert mr_2.name == "mr2" + assert mr_2.namespace == "mr2_namespace" + assert mr_2.revision == "mr2_revision" + +@mock.patch("onapsdk.cps.Dataspace.send_message") +@mock.patch("onapsdk.cps.Dataspace.get_schema_set") +def test_dataspace_create_schema_set(mock_get_chema_set, mock_send_message): + ds = Dataspace(name="test_ds") + _ = ds.create_schema_set("test_schema_set_name", b"fake_file") + mock_send_message.assert_called_once() + mock_get_chema_set.assert_called_once_with("test_schema_set_name") + +@mock.patch("onapsdk.cps.Dataspace.send_message") +def test_dataspace_delete(mock_send_message): + ds = Dataspace(name="test_ds") + ds.delete() + mock_send_message.assert_called_once() + +# Schemaset tests +def test_schema_set(): + schema_set = SchemaSet(name="test", dataspace=mock.MagicMock()) + assert schema_set.name == "test" + assert isinstance(schema_set.module_refences, List) + assert not len(schema_set.module_refences) + + schema_set = SchemaSet(name="test_with_mr", dataspace=mock.MagicMock(), + module_references=[SchemaSetModuleReference(name="mr1", namespace="mr1_n", revision="mr1_rev"), + SchemaSetModuleReference(name="mr2", namespace="mr2_n", revision="mr2_rev")]) + assert schema_set.name == "test_with_mr" + assert isinstance(schema_set.module_refences, List) + assert len(schema_set.module_refences) == 2 + mr_1, mr_2 = schema_set.module_refences + assert isinstance(mr_1, SchemaSetModuleReference) + assert isinstance(mr_2, SchemaSetModuleReference) + assert mr_1.name == "mr1" + assert mr_1.namespace == "mr1_n" + assert mr_1.revision == "mr1_rev" + assert mr_2.name == "mr2" + assert mr_2.namespace == "mr2_n" + assert mr_2.revision == "mr2_rev" + +@mock.patch("onapsdk.cps.SchemaSet.send_message") +def test_schemaset_delete(mock_send_message): + schema_set = SchemaSet(name="test", dataspace=mock.MagicMock()) + schema_set.delete() + mock_send_message.assert_called_once() + +# Anchor tests +def test_anchor(): + anchor = Anchor(name="test_anchor", schema_set=mock.MagicMock()) + assert anchor.name == "test_anchor" + assert "test_anchor" in anchor.url + +@mock.patch("onapsdk.cps.Anchor.send_message") +def test_anchor_delete(mock_send_message): + anchor = Anchor(name="test_anchor", schema_set=mock.MagicMock()) + anchor.delete() + mock_send_message.assert_called_once() + url = mock_send_message.call_args[0][2] + assert anchor.url in url + +@mock.patch("onapsdk.cps.Anchor.send_message") +def test_anchor_create_node(mock_send_message): + anchor = Anchor(name="test_anchor", schema_set=mock.MagicMock()) + anchor.create_node('{"test": "data"}') + mock_send_message.assert_called_once() + data = mock_send_message.call_args[1]["data"] + assert data == '{"test": "data"}' + +@mock.patch("onapsdk.cps.Anchor.send_message_json") +def test_anchor_get_node(mock_send_message_json): + anchor = Anchor(name="test_anchor", schema_set=mock.MagicMock()) + anchor.get_node("test-xpath") + mock_send_message_json.assert_called_once() + url = mock_send_message_json.call_args[0][2] + assert "xpath=test-xpath" in url + assert "include-descendants=False" in url + + mock_send_message_json.reset_mock() + anchor.get_node("test-xpath-2", include_descendants=True) + mock_send_message_json.assert_called_once() + url = mock_send_message_json.call_args[0][2] + assert "xpath=test-xpath-2" in url + assert "include-descendants=True" in url + + mock_send_message_json.reset_mock() + anchor.get_node("test-xpath-3", include_descendants=False) + mock_send_message_json.assert_called_once() + url = mock_send_message_json.call_args[0][2] + assert "xpath=test-xpath-3" in url + assert "include-descendants=False" in url + +@mock.patch("onapsdk.cps.Anchor.send_message") +def test_anchor_update_node(mock_send_message): + anchor = Anchor(name="test_anchor", schema_set=mock.MagicMock()) + anchor.update_node("test-xpath", '{"test": "data"}') + mock_send_message.assert_called_once() + url = mock_send_message.call_args[0][2] + assert "xpath=test-xpath" in url + +@mock.patch("onapsdk.cps.Anchor.send_message") +def test_anchor_replace_node(mock_send_message): + anchor = Anchor(name="test_anchor", schema_set=mock.MagicMock()) + anchor.replace_node("test-xpath", '{"test": "data"}') + mock_send_message.assert_called_once() + url = mock_send_message.call_args[0][2] + assert "xpath=test-xpath" in url + +@mock.patch("onapsdk.cps.Anchor.send_message") +def test_anchor_add_list_node(mock_send_message): + anchor = Anchor(name="test_anchor", schema_set=mock.MagicMock()) + anchor.add_list_node("test-xpath", '{"test": "data"}') + mock_send_message.assert_called_once() + url = mock_send_message.call_args[0][2] + assert "xpath=test-xpath" in url + +@mock.patch("onapsdk.cps.Anchor.send_message_json") +def test_anchor_query_node(mock_send_message_json): + anchor = Anchor(name="test_anchor", schema_set=mock.MagicMock()) + anchor.query_node("/test-query") + mock_send_message_json.assert_called_once() + url = mock_send_message_json.call_args[0][2] + assert "cps-path=/test-query" in url + assert "include-descendants=False" in url + + mock_send_message_json.reset_mock() + anchor.query_node("/test-query1", include_descendants=True) + mock_send_message_json.assert_called_once() + url = mock_send_message_json.call_args[0][2] + assert "cps-path=/test-query1" in url + assert "include-descendants=True" in url + +@mock.patch("onapsdk.cps.Anchor.send_message") +def test_anchor_delete_nodes(mock_send_message): + anchor = Anchor(name="test_anchor", schema_set=mock.MagicMock()) + anchor.delete_nodes("test-xpath") + mock_send_message.assert_called_once() + url = mock_send_message.call_args[0][2] + assert "xpath=test-xpath" in url |