summaryrefslogtreecommitdiffstats
path: root/components/pm-subscription-handler/tests/test_pmsh_utils.py
diff options
context:
space:
mode:
Diffstat (limited to 'components/pm-subscription-handler/tests/test_pmsh_utils.py')
-rw-r--r--components/pm-subscription-handler/tests/test_pmsh_utils.py100
1 files changed, 100 insertions, 0 deletions
diff --git a/components/pm-subscription-handler/tests/test_pmsh_utils.py b/components/pm-subscription-handler/tests/test_pmsh_utils.py
new file mode 100644
index 00000000..ee79d523
--- /dev/null
+++ b/components/pm-subscription-handler/tests/test_pmsh_utils.py
@@ -0,0 +1,100 @@
+# ============LICENSE_START===================================================
+# Copyright (C) 2019-2020 Nordix Foundation.
+# ============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# SPDX-License-Identifier: Apache-2.0
+# ============LICENSE_END=====================================================
+import json
+import os
+import unittest
+from unittest import mock
+from unittest.mock import patch
+
+import responses
+from requests import Session
+
+from mod.pmsh_utils import AppConfig
+from mod.subscription import Subscription
+
+
+class PmshUtilsTestCase(unittest.TestCase):
+
+ def setUp(self):
+ with open(os.path.join(os.path.dirname(__file__), 'data/cbs_data.json'), 'r') as data:
+ self.cbs_data = json.load(data)
+ self.app_conf = AppConfig(**self.cbs_data['config'])
+ self.sub = Subscription(**self.cbs_data['policy']['subscription'])
+
+ def test_utils_get_mr_sub(self):
+ mr_policy_sub = self.app_conf.get_mr_sub('policy_pm_subscriber')
+ self.assertTrue(mr_policy_sub.aaf_id, 'dcae@dcae.onap.org')
+
+ def test_utils_get_mr_sub_fails_with_invalid_name(self):
+ with self.assertRaises(KeyError):
+ self.app_conf.get_mr_sub('invalid_sub')
+
+ def test_utils_get_mr_pub(self):
+ mr_policy_pub = self.app_conf.get_mr_pub('policy_pm_publisher')
+ self.assertTrue(mr_policy_pub.aaf_pass, 'demo123456!')
+
+ def test_utils_get_mr_pub_fails_with_invalid_name(self):
+ with self.assertRaises(KeyError):
+ self.app_conf.get_mr_pub('invalid_pub')
+
+ def test_utils_get_cert_data(self):
+ self.assertTrue(self.app_conf.cert_params, ('/opt/app/pm-mapper/etc/certs/cert.pem',
+ '/opt/app/pm-mapper/etc/certs/key.pem'))
+
+ @mock.patch.object(Session, 'post')
+ def test_mr_pub_publish_to_topic_success(self, mock_session):
+ mock_session.return_value.status_code = 200
+ mr_policy_pub = self.app_conf.get_mr_pub('policy_pm_publisher')
+ with patch('requests.Session.post') as session_post_call:
+ mr_policy_pub.publish_to_topic({"dummy_val": "43c4ee19-6b8d-4279-a80f-c507850aae47"})
+ session_post_call.assert_called_once()
+
+ @responses.activate
+ def test_mr_pub_publish_to_topic_fail(self):
+ responses.add(responses.POST,
+ 'https://node:30226/events/org.onap.dmaap.mr.PM_SUBSCRIPTIONS',
+ json={'error': 'Client Error'}, status=400)
+ mr_policy_pub = self.app_conf.get_mr_pub('policy_pm_publisher')
+ with self.assertRaises(Exception):
+ mr_policy_pub.publish_to_topic({"dummy_val": "43c4ee19-6b8d-4279-a80f-c507850aae47"})
+
+ def test_mr_pub_publish_sub_event_data_success(self):
+ mr_policy_pub = self.app_conf.get_mr_pub('policy_pm_publisher')
+ with patch('mod.pmsh_utils._MrPub.publish_to_topic') as pub_to_topic_call:
+ mr_policy_pub.publish_subscription_event_data(self.sub, 'pnf201')
+ pub_to_topic_call.assert_called_once()
+
+ @responses.activate
+ def test_mr_sub_get_from_topic_success(self):
+ responses.add(responses.GET,
+ 'https://node:30226/events/org.onap.dmaap.mr.PM_SUBSCRIPTIONS/'
+ 'dcae_pmsh_cg/1?timeout=1000',
+ json={"dummy_val": "43c4ee19-6b8d-4279-a80f-c507850aae47"}, status=200)
+ mr_policy_sub = self.app_conf.get_mr_sub('policy_pm_subscriber')
+ mr_topic_data = mr_policy_sub.get_from_topic(1)
+ self.assertIsNotNone(mr_topic_data)
+
+ @responses.activate
+ def test_mr_sub_get_from_topic_fail(self):
+ responses.add(responses.GET,
+ 'https://node:30226/events/org.onap.dmaap.mr.PM_SUBSCRIPTIONS/'
+ 'dcae_pmsh_cg/1?timeout=1000',
+ json={"dummy_val": "43c4ee19-6b8d-4279-a80f-c507850aae47"}, status=400)
+ mr_policy_sub = self.app_conf.get_mr_sub('policy_pm_subscriber')
+ mr_topic_data = mr_policy_sub.get_from_topic(1)
+ self.assertIsNone(mr_topic_data)