diff options
author | emartin <ephraim.martin@est.tech> | 2020-02-27 13:56:52 +0000 |
---|---|---|
committer | emartin <ephraim.martin@est.tech> | 2020-02-28 11:24:53 +0000 |
commit | c19a0a85bbbc8dcf0633a32d26f4128f6c8c4544 (patch) | |
tree | 234c14ca8729b852f0ea229483807890b8f96362 | |
parent | 82a39f7da3177a9b9b700c7291ed5ea47c90e478 (diff) |
Update PM subscription event for PMSH
* Add control loop name
* Remove invariant id reference
Issue-ID: DCAEGEN2-2100
Signed-off-by: emartin <ephraim.martin@est.tech>
Change-Id: I6bbb757e07f3d930ecd28bd3106df307a264ff65
11 files changed, 108 insertions, 30 deletions
diff --git a/components/pm-subscription-handler/pmsh_service/mod/aai_event_handler.py b/components/pm-subscription-handler/pmsh_service/mod/aai_event_handler.py index f8254e52..9d69e760 100755 --- a/components/pm-subscription-handler/pmsh_service/mod/aai_event_handler.py +++ b/components/pm-subscription-handler/pmsh_service/mod/aai_event_handler.py @@ -39,7 +39,7 @@ class OrchestrationStatus(Enum): INVENTORIED = 'Inventoried' -def process_aai_events(mr_sub, subscription, mr_pub, app): +def process_aai_events(mr_sub, subscription, mr_pub, app, app_conf): """ Processes AAI UPDATE events for each filtered xNFs where orchestration status is set to Active. @@ -48,6 +48,7 @@ def process_aai_events(mr_sub, subscription, mr_pub, app): subscription (Subscription): The current subscription object mr_pub (_MrPub): MR publisher app (db): DB application + app_conf (AppConfig): the application configuration. """ app.app_context().push() aai_events = mr_sub.get_from_topic('AAI-EVENT') @@ -65,10 +66,10 @@ def process_aai_events(mr_sub, subscription, mr_pub, app): new_status = aai_xnf['orchestration-status'] if NetworkFunctionFilter(**subscription.nfFilter).is_nf_in_filter(xnf_name): - _process_event(action, new_status, xnf_name, subscription, mr_pub) + _process_event(action, new_status, xnf_name, subscription, mr_pub, app_conf) -def _process_event(action, new_status, xnf_name, subscription, mr_pub): +def _process_event(action, new_status, xnf_name, subscription, mr_pub, app_conf): if action == AAIEvent.UPDATE.value: logger.debug(f'Update event found for network function {xnf_name}') local_xnf = NetworkFunction.get(xnf_name) @@ -76,7 +77,7 @@ def _process_event(action, new_status, xnf_name, subscription, mr_pub): if local_xnf is None: logger.debug(f'Activating subscription for network function {xnf_name}') subscription.process_subscription([NetworkFunction( - nf_name=xnf_name, orchestration_status=new_status)], mr_pub) + nf_name=xnf_name, orchestration_status=new_status)], mr_pub, app_conf) else: logger.debug(f"Update Event for network function {xnf_name} will not be processed " f" as it's state is set to {local_xnf.orchestration_status}.") diff --git a/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py b/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py index 9ff0c653..c8b3bc77 100755 --- a/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py +++ b/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py @@ -37,6 +37,8 @@ class AppConfig: self.key_path = kwargs.get('key_path') self.streams_subscribes = kwargs.get('streams_subscribes') self.streams_publishes = kwargs.get('streams_publishes') + self.operational_policy_name = kwargs.get('operational_policy_name') + self.control_loop_name = kwargs.get('control_loop_name') def get_mr_sub(self, sub_name): """ @@ -127,16 +129,17 @@ class _MrPub(_DmaapMrClient): logger.debug(e) raise - def publish_subscription_event_data(self, subscription, xnf_name): + def publish_subscription_event_data(self, subscription, xnf_name, app_conf): """ Update the Subscription dict with xnf and policy name then publish to DMaaP MR topic. Args: subscription: the `Subscription` <Subscription> object. xnf_name: the xnf to include in the event. + app_conf (AppConfig): the application configuration. """ try: - subscription_event = subscription.prepare_subscription_event(xnf_name) + subscription_event = subscription.prepare_subscription_event(xnf_name, app_conf) self.publish_to_topic(subscription_event) except Exception as e: logger.debug(f'pmsh_utils.publish_subscription_event_data : {e}') @@ -249,6 +252,7 @@ class PeriodicTask(Timer): """ See :class:`Timer`. """ + def run(self): while not self.finished.wait(self.interval): self.function(*self.args, **self.kwargs) diff --git a/components/pm-subscription-handler/pmsh_service/mod/subscription.py b/components/pm-subscription-handler/pmsh_service/mod/subscription.py index 7a0b88c1..5449f420 100755 --- a/components/pm-subscription-handler/pmsh_service/mod/subscription.py +++ b/components/pm-subscription-handler/pmsh_service/mod/subscription.py @@ -43,25 +43,26 @@ class Subscription: self.administrativeState = kwargs.get('administrativeState') self.fileBasedGP = kwargs.get('fileBasedGP') self.fileLocation = kwargs.get('fileLocation') - self.nfTypeModelInvariantId = kwargs.get('nfTypeModelInvariantId') self.nfFilter = kwargs.get('nfFilter') self.measurementGroups = kwargs.get('measurementGroups') - def prepare_subscription_event(self, xnf_name): + def prepare_subscription_event(self, xnf_name, app_conf): """Prepare the sub event for publishing Args: xnf_name: the AAI xnf name. + app_conf (AppConfig): the application configuration. Returns: dict: the Subscription event to be published. """ clean_sub = {k: v for k, v in self.__dict__.items() if k != 'nfFilter'} - clean_sub.update({'nfName': xnf_name, 'policyName': f'OP-{self.subscriptionName}', - 'changeType': 'DELETE' - if self.administrativeState == AdministrativeState.LOCKED.value - else 'CREATE'}) - return clean_sub + sub_event = {'nfName': xnf_name, 'policyName': app_conf.operational_policy_name, + 'changeType': 'DELETE' + if self.administrativeState == AdministrativeState.LOCKED.value + else 'CREATE', 'closedLoopControlName': app_conf.control_loop_name, + 'subscription': clean_sub} + return sub_event def create(self): """ Creates a subscription database entry @@ -159,7 +160,7 @@ class Subscription: @retry(wait=wait_exponential(multiplier=1, min=30, max=120), stop=stop_after_attempt(3), retry=retry_if_exception_type(Exception)) - def process_subscription(self, nfs, mr_pub): + def process_subscription(self, nfs, mr_pub, app_conf): action = 'Deactivate' sub_nf_state = SubNfState.PENDING_DELETE.value self.update_subscription_status() @@ -170,7 +171,7 @@ class Subscription: try: for nf in nfs: - mr_pub.publish_subscription_event_data(self, nf.nf_name) + mr_pub.publish_subscription_event_data(self, nf.nf_name, app_conf) logger.debug(f'Publishing Event to {action} ' f'Sub: {self.subscriptionName} for the nf: {nf.nf_name}') self.add_network_functions_to_subscription(nfs) diff --git a/components/pm-subscription-handler/pmsh_service/pmsh_service_main.py b/components/pm-subscription-handler/pmsh_service/pmsh_service_main.py index 31d1d079..8245466b 100755 --- a/components/pm-subscription-handler/pmsh_service/pmsh_service_main.py +++ b/components/pm-subscription-handler/pmsh_service/pmsh_service_main.py @@ -42,6 +42,7 @@ def subscription_processor(config_handler, administrative_state, mr_pub, app, """ app.app_context().push() config = config_handler.get_config() + app_conf = AppConfig(**config['config']) new_administrative_state = config['policy']['subscription']['administrativeState'] polling_period = 30.0 @@ -52,9 +53,9 @@ def subscription_processor(config_handler, administrative_state, mr_pub, app, logger.debug(f'Administrative State changed from "{administrative_state}" "to ' f'"{new_administrative_state}".') sub, nfs = aai.get_pmsh_subscription_data(config) - sub.process_subscription(nfs, mr_pub) - aai_event_thread = PeriodicTask(10, process_aai_events, args=(mr_aai_event_subscriber, - sub, mr_pub, app)) + sub.process_subscription(nfs, mr_pub, app_conf) + aai_event_thread = PeriodicTask(10, process_aai_events, args=( + mr_aai_event_subscriber, sub, mr_pub, app, app_conf)) if new_administrative_state == AdministrativeState.UNLOCKED.value: logger.debug('Listening to AAI-EVENT topic in MR.') diff --git a/components/pm-subscription-handler/tests/data/cbs_data_1.json b/components/pm-subscription-handler/tests/data/cbs_data_1.json index ccc0626d..8dc225dc 100644 --- a/components/pm-subscription-handler/tests/data/cbs_data_1.json +++ b/components/pm-subscription-handler/tests/data/cbs_data_1.json @@ -5,7 +5,6 @@ "administrativeState":"UNLOCKED", "fileBasedGP":15, "fileLocation":"\/pm\/pm.xml", - "nfTypeModelInvariantId":"2829292", "nfFilter":{ "swVersions":[ "1.0.0", @@ -61,6 +60,8 @@ } }, "config":{ + "control_loop_name": "pmsh-control-loop", + "operational_policy_name": "pmsh-operational-policy", "aaf_password":"demo123456!", "aaf_identity":"dcae@dcae.onap.org", "cert_path":"/opt/app/pm-mapper/etc/certs/cert.pem", diff --git a/components/pm-subscription-handler/tests/data/cbs_data_2.json b/components/pm-subscription-handler/tests/data/cbs_data_2.json index 43f67e88..c223ddea 100755 --- a/components/pm-subscription-handler/tests/data/cbs_data_2.json +++ b/components/pm-subscription-handler/tests/data/cbs_data_2.json @@ -6,7 +6,6 @@ "administrativeState": "UNLOCKED", "fileBasedGP": 15, "fileLocation": "c:\/\/PM", - "nfTypeModelInvariantId": "2829292", "nfFilter": { "swVersions": [ "A21", diff --git a/components/pm-subscription-handler/tests/data/pm_subscription_event.json b/components/pm-subscription-handler/tests/data/pm_subscription_event.json new file mode 100755 index 00000000..e190aa26 --- /dev/null +++ b/components/pm-subscription-handler/tests/data/pm_subscription_event.json @@ -0,0 +1,54 @@ +{
+ "nfName":"pnf_1",
+ "policyName":"pmsh-operational-policy",
+ "changeType":"CREATE",
+ "closedLoopControlName":"pmsh-control-loop",
+ "subscription":{
+ "subscriptionName":"ExtraPM-All-gNB-R2B",
+ "administrativeState":"UNLOCKED",
+ "fileBasedGP":15,
+ "fileLocation":"/pm/pm.xml",
+ "measurementGroups":[
+ {
+ "measurementGroup":{
+ "measurementTypes":[
+ {
+ "measurementType":"countera"
+ },
+ {
+ "measurementType":"counterb"
+ }
+ ],
+ "managedObjectDNsBasic":[
+ {
+ "DN":"dna"
+ },
+ {
+ "DN":"dnb"
+ }
+ ]
+ }
+ },
+ {
+ "measurementGroup":{
+ "measurementTypes":[
+ {
+ "measurementType":"counterc"
+ },
+ {
+ "measurementType":"counterd"
+ }
+ ],
+ "managedObjectDNsBasic":[
+ {
+ "DN":"dnc"
+ },
+ {
+ "DN":"dnd"
+ }
+ ]
+ }
+ }
+ ]
+ }
+}
\ No newline at end of file diff --git a/components/pm-subscription-handler/tests/test_aai_event_handler.py b/components/pm-subscription-handler/tests/test_aai_event_handler.py index add7b3f5..0fd9e77e 100755 --- a/components/pm-subscription-handler/tests/test_aai_event_handler.py +++ b/components/pm-subscription-handler/tests/test_aai_event_handler.py @@ -39,15 +39,17 @@ class AAIEventHandlerTest(TestCase): @patch('mod.aai_event_handler.NetworkFunction.delete') @patch('mod.aai_event_handler.NetworkFunction.get') - def test_process_aai_update_and_delete_events(self, mock_nf_get, mock_nf_delete): + @patch('pmsh_service_main.AppConfig') + def test_process_aai_update_and_delete_events(self, mock_app_conf, mock_nf_get, mock_nf_delete): pnf_already_active = NetworkFunction(nf_name='pnf_already_active', orchestration_status=OrchestrationStatus.ACTIVE.value) mock_nf_get.side_effect = [None, pnf_already_active] expected_nf_for_processing = NetworkFunction( nf_name='pnf_newly_discovered', orchestration_status=OrchestrationStatus.ACTIVE.value) - process_aai_events(self.mock_mr_sub, self.mock_sub, self.mock_mr_pub, self.mock_app) + process_aai_events(self.mock_mr_sub, self.mock_sub, + self.mock_mr_pub, self.mock_app, mock_app_conf) self.mock_sub.process_subscription.assert_called_once_with([expected_nf_for_processing], - self.mock_mr_pub) + self.mock_mr_pub, mock_app_conf) mock_nf_delete.assert_called_once_with(nf_name='pnf_to_be_deleted') diff --git a/components/pm-subscription-handler/tests/test_pmsh_service.py b/components/pm-subscription-handler/tests/test_pmsh_service.py index b7225677..cd28a5d9 100644 --- a/components/pm-subscription-handler/tests/test_pmsh_service.py +++ b/components/pm-subscription-handler/tests/test_pmsh_service.py @@ -46,7 +46,9 @@ class PMSHServiceTest(TestCase): @patch('threading.Timer') @patch('mod.aai_client.get_pmsh_subscription_data') @patch('pmsh_service_main.PeriodicTask') - def test_subscription_processor_changed_state(self, periodic_task, mock_get_aai, mock_thread): + @patch('pmsh_service_main.AppConfig') + def test_subscription_processor_changed_state(self, mock_app_conf, periodic_task, mock_get_aai, + mock_thread): self.mock_config_handler.get_config.return_value = self.cbs_data_1 mock_get_aai.return_value = self.mock_sub, self.nfs mock_thread.start.return_value = 1 @@ -55,7 +57,8 @@ class PMSHServiceTest(TestCase): pmsh_service.subscription_processor(self.mock_config_handler, 'LOCKED', self.mock_mr_pub, self.mock_app, self.mock_aai_sub) - self.mock_sub.process_subscription.assert_called_with(self.nfs, self.mock_mr_pub) + self.mock_sub.process_subscription.assert_called_with(self.nfs, self.mock_mr_pub, + mock_app_conf.return_value) @patch('threading.Timer') @patch('mod.pmsh_logging.debug') diff --git a/components/pm-subscription-handler/tests/test_pmsh_utils.py b/components/pm-subscription-handler/tests/test_pmsh_utils.py index 03e8c691..ea657f49 100644 --- a/components/pm-subscription-handler/tests/test_pmsh_utils.py +++ b/components/pm-subscription-handler/tests/test_pmsh_utils.py @@ -91,7 +91,7 @@ class PmshUtilsTestCase(TestCase): def test_mr_pub_publish_sub_event_data_success(self): mr_policy_pub = self.app_conf.get_mr_pub('policy_pm_publisher') with patch('mod.pmsh_utils._MrPub.publish_to_topic') as pub_to_topic_call: - mr_policy_pub.publish_subscription_event_data(self.sub, 'pnf201') + mr_policy_pub.publish_subscription_event_data(self.sub, 'pnf201', self.app_conf) pub_to_topic_call.assert_called_once() @responses.activate diff --git a/components/pm-subscription-handler/tests/test_subscription.py b/components/pm-subscription-handler/tests/test_subscription.py index c357ad79..bd39f28a 100755 --- a/components/pm-subscription-handler/tests/test_subscription.py +++ b/components/pm-subscription-handler/tests/test_subscription.py @@ -27,6 +27,7 @@ from tenacity import stop_after_attempt import mod.aai_client as aai_client from mod import db, create_app from mod.network_function import NetworkFunction +from mod.pmsh_utils import AppConfig from mod.subscription import Subscription, NetworkFunctionFilter @@ -35,7 +36,8 @@ class SubscriptionTest(TestCase): @patch('mod.pmsh_utils._MrSub') @patch('mod.get_db_connection_url') @patch.object(Session, 'put') - def setUp(self, mock_session, mock_get_db_url, mock_mr_sub, mock_mr_pub): + @patch('pmsh_service_main.AppConfig') + def setUp(self, mock_app_config, mock_session, mock_get_db_url, mock_mr_sub, mock_mr_pub): mock_get_db_url.return_value = 'sqlite://' with open(os.path.join(os.path.dirname(__file__), 'data/aai_xnfs.json'), 'r') as data: self.aai_response_data = data.read() @@ -61,6 +63,7 @@ class SubscriptionTest(TestCase): self.app = create_app() self.app_context = self.app.app_context() self.app_context.push() + self.mock_app_config = mock_app_config db.create_all() def tearDown(self): @@ -163,7 +166,7 @@ class SubscriptionTest(TestCase): def test_process_activate_subscription(self, mock_update_sub_status, mock_update_sub_nf, mock_add_nfs): self.sub_1.process_subscription.retry.stop = stop_after_attempt(1) - self.sub_1.process_subscription([self.nf_1], self.mock_mr_pub) + self.sub_1.process_subscription([self.nf_1], self.mock_mr_pub, self.mock_app_config) mock_update_sub_status.assert_called() mock_add_nfs.assert_called() @@ -177,7 +180,7 @@ class SubscriptionTest(TestCase): mock_update_sub_nf): self.sub_1.administrativeState = 'LOCKED' self.sub_1.process_subscription.retry.stop = stop_after_attempt(1) - self.sub_1.process_subscription([self.nf_1], self.mock_mr_pub) + self.sub_1.process_subscription([self.nf_1], self.mock_mr_pub, self.mock_app_config) self.assertTrue(self.mock_mr_pub.publish_subscription_event_data.called) mock_update_sub_nf.assert_called_with(self.sub_1.subscriptionName, @@ -187,4 +190,13 @@ class SubscriptionTest(TestCase): def test_process_subscription_exception(self): self.sub_1.process_subscription.retry.stop = stop_after_attempt(1) self.assertRaises(Exception, self.sub_1.process_subscription, - [self.nf_1], 'not_mr_pub') + [self.nf_1], 'not_mr_pub', 'app_config') + + def test_prepare_subscription_event(self): + with open(os.path.join(os.path.dirname(__file__), + 'data/pm_subscription_event.json'), 'r') as data: + expected_sub_event = json.load(data) + app_conf = AppConfig(**self.cbs_data_1['config']) + actual_sub_event = self.sub_1.prepare_subscription_event(self.nf_1.nf_name, app_conf) + print(actual_sub_event) + self.assertEqual(expected_sub_event, actual_sub_event) |