diff options
author | emartin <ephraim.martin@est.tech> | 2020-02-27 13:56:52 +0000 |
---|---|---|
committer | emartin <ephraim.martin@est.tech> | 2020-02-28 11:24:53 +0000 |
commit | c19a0a85bbbc8dcf0633a32d26f4128f6c8c4544 (patch) | |
tree | 234c14ca8729b852f0ea229483807890b8f96362 /components/pm-subscription-handler/pmsh_service | |
parent | 82a39f7da3177a9b9b700c7291ed5ea47c90e478 (diff) |
Update PM subscription event for PMSH
* Add control loop name
* Remove invariant id reference
Issue-ID: DCAEGEN2-2100
Signed-off-by: emartin <ephraim.martin@est.tech>
Change-Id: I6bbb757e07f3d930ecd28bd3106df307a264ff65
Diffstat (limited to 'components/pm-subscription-handler/pmsh_service')
4 files changed, 25 insertions, 18 deletions
diff --git a/components/pm-subscription-handler/pmsh_service/mod/aai_event_handler.py b/components/pm-subscription-handler/pmsh_service/mod/aai_event_handler.py index f8254e52..9d69e760 100755 --- a/components/pm-subscription-handler/pmsh_service/mod/aai_event_handler.py +++ b/components/pm-subscription-handler/pmsh_service/mod/aai_event_handler.py @@ -39,7 +39,7 @@ class OrchestrationStatus(Enum): INVENTORIED = 'Inventoried' -def process_aai_events(mr_sub, subscription, mr_pub, app): +def process_aai_events(mr_sub, subscription, mr_pub, app, app_conf): """ Processes AAI UPDATE events for each filtered xNFs where orchestration status is set to Active. @@ -48,6 +48,7 @@ def process_aai_events(mr_sub, subscription, mr_pub, app): subscription (Subscription): The current subscription object mr_pub (_MrPub): MR publisher app (db): DB application + app_conf (AppConfig): the application configuration. """ app.app_context().push() aai_events = mr_sub.get_from_topic('AAI-EVENT') @@ -65,10 +66,10 @@ def process_aai_events(mr_sub, subscription, mr_pub, app): new_status = aai_xnf['orchestration-status'] if NetworkFunctionFilter(**subscription.nfFilter).is_nf_in_filter(xnf_name): - _process_event(action, new_status, xnf_name, subscription, mr_pub) + _process_event(action, new_status, xnf_name, subscription, mr_pub, app_conf) -def _process_event(action, new_status, xnf_name, subscription, mr_pub): +def _process_event(action, new_status, xnf_name, subscription, mr_pub, app_conf): if action == AAIEvent.UPDATE.value: logger.debug(f'Update event found for network function {xnf_name}') local_xnf = NetworkFunction.get(xnf_name) @@ -76,7 +77,7 @@ def _process_event(action, new_status, xnf_name, subscription, mr_pub): if local_xnf is None: logger.debug(f'Activating subscription for network function {xnf_name}') subscription.process_subscription([NetworkFunction( - nf_name=xnf_name, orchestration_status=new_status)], mr_pub) + nf_name=xnf_name, orchestration_status=new_status)], mr_pub, app_conf) else: logger.debug(f"Update Event for network function {xnf_name} will not be processed " f" as it's state is set to {local_xnf.orchestration_status}.") diff --git a/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py b/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py index 9ff0c653..c8b3bc77 100755 --- a/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py +++ b/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py @@ -37,6 +37,8 @@ class AppConfig: self.key_path = kwargs.get('key_path') self.streams_subscribes = kwargs.get('streams_subscribes') self.streams_publishes = kwargs.get('streams_publishes') + self.operational_policy_name = kwargs.get('operational_policy_name') + self.control_loop_name = kwargs.get('control_loop_name') def get_mr_sub(self, sub_name): """ @@ -127,16 +129,17 @@ class _MrPub(_DmaapMrClient): logger.debug(e) raise - def publish_subscription_event_data(self, subscription, xnf_name): + def publish_subscription_event_data(self, subscription, xnf_name, app_conf): """ Update the Subscription dict with xnf and policy name then publish to DMaaP MR topic. Args: subscription: the `Subscription` <Subscription> object. xnf_name: the xnf to include in the event. + app_conf (AppConfig): the application configuration. """ try: - subscription_event = subscription.prepare_subscription_event(xnf_name) + subscription_event = subscription.prepare_subscription_event(xnf_name, app_conf) self.publish_to_topic(subscription_event) except Exception as e: logger.debug(f'pmsh_utils.publish_subscription_event_data : {e}') @@ -249,6 +252,7 @@ class PeriodicTask(Timer): """ See :class:`Timer`. """ + def run(self): while not self.finished.wait(self.interval): self.function(*self.args, **self.kwargs) diff --git a/components/pm-subscription-handler/pmsh_service/mod/subscription.py b/components/pm-subscription-handler/pmsh_service/mod/subscription.py index 7a0b88c1..5449f420 100755 --- a/components/pm-subscription-handler/pmsh_service/mod/subscription.py +++ b/components/pm-subscription-handler/pmsh_service/mod/subscription.py @@ -43,25 +43,26 @@ class Subscription: self.administrativeState = kwargs.get('administrativeState') self.fileBasedGP = kwargs.get('fileBasedGP') self.fileLocation = kwargs.get('fileLocation') - self.nfTypeModelInvariantId = kwargs.get('nfTypeModelInvariantId') self.nfFilter = kwargs.get('nfFilter') self.measurementGroups = kwargs.get('measurementGroups') - def prepare_subscription_event(self, xnf_name): + def prepare_subscription_event(self, xnf_name, app_conf): """Prepare the sub event for publishing Args: xnf_name: the AAI xnf name. + app_conf (AppConfig): the application configuration. Returns: dict: the Subscription event to be published. """ clean_sub = {k: v for k, v in self.__dict__.items() if k != 'nfFilter'} - clean_sub.update({'nfName': xnf_name, 'policyName': f'OP-{self.subscriptionName}', - 'changeType': 'DELETE' - if self.administrativeState == AdministrativeState.LOCKED.value - else 'CREATE'}) - return clean_sub + sub_event = {'nfName': xnf_name, 'policyName': app_conf.operational_policy_name, + 'changeType': 'DELETE' + if self.administrativeState == AdministrativeState.LOCKED.value + else 'CREATE', 'closedLoopControlName': app_conf.control_loop_name, + 'subscription': clean_sub} + return sub_event def create(self): """ Creates a subscription database entry @@ -159,7 +160,7 @@ class Subscription: @retry(wait=wait_exponential(multiplier=1, min=30, max=120), stop=stop_after_attempt(3), retry=retry_if_exception_type(Exception)) - def process_subscription(self, nfs, mr_pub): + def process_subscription(self, nfs, mr_pub, app_conf): action = 'Deactivate' sub_nf_state = SubNfState.PENDING_DELETE.value self.update_subscription_status() @@ -170,7 +171,7 @@ class Subscription: try: for nf in nfs: - mr_pub.publish_subscription_event_data(self, nf.nf_name) + mr_pub.publish_subscription_event_data(self, nf.nf_name, app_conf) logger.debug(f'Publishing Event to {action} ' f'Sub: {self.subscriptionName} for the nf: {nf.nf_name}') self.add_network_functions_to_subscription(nfs) diff --git a/components/pm-subscription-handler/pmsh_service/pmsh_service_main.py b/components/pm-subscription-handler/pmsh_service/pmsh_service_main.py index 31d1d079..8245466b 100755 --- a/components/pm-subscription-handler/pmsh_service/pmsh_service_main.py +++ b/components/pm-subscription-handler/pmsh_service/pmsh_service_main.py @@ -42,6 +42,7 @@ def subscription_processor(config_handler, administrative_state, mr_pub, app, """ app.app_context().push() config = config_handler.get_config() + app_conf = AppConfig(**config['config']) new_administrative_state = config['policy']['subscription']['administrativeState'] polling_period = 30.0 @@ -52,9 +53,9 @@ def subscription_processor(config_handler, administrative_state, mr_pub, app, logger.debug(f'Administrative State changed from "{administrative_state}" "to ' f'"{new_administrative_state}".') sub, nfs = aai.get_pmsh_subscription_data(config) - sub.process_subscription(nfs, mr_pub) - aai_event_thread = PeriodicTask(10, process_aai_events, args=(mr_aai_event_subscriber, - sub, mr_pub, app)) + sub.process_subscription(nfs, mr_pub, app_conf) + aai_event_thread = PeriodicTask(10, process_aai_events, args=( + mr_aai_event_subscriber, sub, mr_pub, app, app_conf)) if new_administrative_state == AdministrativeState.UNLOCKED.value: logger.debug('Listening to AAI-EVENT topic in MR.') |