summaryrefslogtreecommitdiffstats
path: root/components/pm-subscription-handler/pmsh_service/mod
diff options
context:
space:
mode:
authoremartin <ephraim.martin@est.tech>2020-02-27 13:56:52 +0000
committeremartin <ephraim.martin@est.tech>2020-02-28 11:24:53 +0000
commitc19a0a85bbbc8dcf0633a32d26f4128f6c8c4544 (patch)
tree234c14ca8729b852f0ea229483807890b8f96362 /components/pm-subscription-handler/pmsh_service/mod
parent82a39f7da3177a9b9b700c7291ed5ea47c90e478 (diff)
Update PM subscription event for PMSH
* Add control loop name * Remove invariant id reference Issue-ID: DCAEGEN2-2100 Signed-off-by: emartin <ephraim.martin@est.tech> Change-Id: I6bbb757e07f3d930ecd28bd3106df307a264ff65
Diffstat (limited to 'components/pm-subscription-handler/pmsh_service/mod')
-rwxr-xr-xcomponents/pm-subscription-handler/pmsh_service/mod/aai_event_handler.py9
-rwxr-xr-xcomponents/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py8
-rwxr-xr-xcomponents/pm-subscription-handler/pmsh_service/mod/subscription.py19
3 files changed, 21 insertions, 15 deletions
diff --git a/components/pm-subscription-handler/pmsh_service/mod/aai_event_handler.py b/components/pm-subscription-handler/pmsh_service/mod/aai_event_handler.py
index f8254e52..9d69e760 100755
--- a/components/pm-subscription-handler/pmsh_service/mod/aai_event_handler.py
+++ b/components/pm-subscription-handler/pmsh_service/mod/aai_event_handler.py
@@ -39,7 +39,7 @@ class OrchestrationStatus(Enum):
INVENTORIED = 'Inventoried'
-def process_aai_events(mr_sub, subscription, mr_pub, app):
+def process_aai_events(mr_sub, subscription, mr_pub, app, app_conf):
"""
Processes AAI UPDATE events for each filtered xNFs where orchestration status is set to Active.
@@ -48,6 +48,7 @@ def process_aai_events(mr_sub, subscription, mr_pub, app):
subscription (Subscription): The current subscription object
mr_pub (_MrPub): MR publisher
app (db): DB application
+ app_conf (AppConfig): the application configuration.
"""
app.app_context().push()
aai_events = mr_sub.get_from_topic('AAI-EVENT')
@@ -65,10 +66,10 @@ def process_aai_events(mr_sub, subscription, mr_pub, app):
new_status = aai_xnf['orchestration-status']
if NetworkFunctionFilter(**subscription.nfFilter).is_nf_in_filter(xnf_name):
- _process_event(action, new_status, xnf_name, subscription, mr_pub)
+ _process_event(action, new_status, xnf_name, subscription, mr_pub, app_conf)
-def _process_event(action, new_status, xnf_name, subscription, mr_pub):
+def _process_event(action, new_status, xnf_name, subscription, mr_pub, app_conf):
if action == AAIEvent.UPDATE.value:
logger.debug(f'Update event found for network function {xnf_name}')
local_xnf = NetworkFunction.get(xnf_name)
@@ -76,7 +77,7 @@ def _process_event(action, new_status, xnf_name, subscription, mr_pub):
if local_xnf is None:
logger.debug(f'Activating subscription for network function {xnf_name}')
subscription.process_subscription([NetworkFunction(
- nf_name=xnf_name, orchestration_status=new_status)], mr_pub)
+ nf_name=xnf_name, orchestration_status=new_status)], mr_pub, app_conf)
else:
logger.debug(f"Update Event for network function {xnf_name} will not be processed "
f" as it's state is set to {local_xnf.orchestration_status}.")
diff --git a/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py b/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py
index 9ff0c653..c8b3bc77 100755
--- a/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py
+++ b/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py
@@ -37,6 +37,8 @@ class AppConfig:
self.key_path = kwargs.get('key_path')
self.streams_subscribes = kwargs.get('streams_subscribes')
self.streams_publishes = kwargs.get('streams_publishes')
+ self.operational_policy_name = kwargs.get('operational_policy_name')
+ self.control_loop_name = kwargs.get('control_loop_name')
def get_mr_sub(self, sub_name):
"""
@@ -127,16 +129,17 @@ class _MrPub(_DmaapMrClient):
logger.debug(e)
raise
- def publish_subscription_event_data(self, subscription, xnf_name):
+ def publish_subscription_event_data(self, subscription, xnf_name, app_conf):
"""
Update the Subscription dict with xnf and policy name then publish to DMaaP MR topic.
Args:
subscription: the `Subscription` <Subscription> object.
xnf_name: the xnf to include in the event.
+ app_conf (AppConfig): the application configuration.
"""
try:
- subscription_event = subscription.prepare_subscription_event(xnf_name)
+ subscription_event = subscription.prepare_subscription_event(xnf_name, app_conf)
self.publish_to_topic(subscription_event)
except Exception as e:
logger.debug(f'pmsh_utils.publish_subscription_event_data : {e}')
@@ -249,6 +252,7 @@ class PeriodicTask(Timer):
"""
See :class:`Timer`.
"""
+
def run(self):
while not self.finished.wait(self.interval):
self.function(*self.args, **self.kwargs)
diff --git a/components/pm-subscription-handler/pmsh_service/mod/subscription.py b/components/pm-subscription-handler/pmsh_service/mod/subscription.py
index 7a0b88c1..5449f420 100755
--- a/components/pm-subscription-handler/pmsh_service/mod/subscription.py
+++ b/components/pm-subscription-handler/pmsh_service/mod/subscription.py
@@ -43,25 +43,26 @@ class Subscription:
self.administrativeState = kwargs.get('administrativeState')
self.fileBasedGP = kwargs.get('fileBasedGP')
self.fileLocation = kwargs.get('fileLocation')
- self.nfTypeModelInvariantId = kwargs.get('nfTypeModelInvariantId')
self.nfFilter = kwargs.get('nfFilter')
self.measurementGroups = kwargs.get('measurementGroups')
- def prepare_subscription_event(self, xnf_name):
+ def prepare_subscription_event(self, xnf_name, app_conf):
"""Prepare the sub event for publishing
Args:
xnf_name: the AAI xnf name.
+ app_conf (AppConfig): the application configuration.
Returns:
dict: the Subscription event to be published.
"""
clean_sub = {k: v for k, v in self.__dict__.items() if k != 'nfFilter'}
- clean_sub.update({'nfName': xnf_name, 'policyName': f'OP-{self.subscriptionName}',
- 'changeType': 'DELETE'
- if self.administrativeState == AdministrativeState.LOCKED.value
- else 'CREATE'})
- return clean_sub
+ sub_event = {'nfName': xnf_name, 'policyName': app_conf.operational_policy_name,
+ 'changeType': 'DELETE'
+ if self.administrativeState == AdministrativeState.LOCKED.value
+ else 'CREATE', 'closedLoopControlName': app_conf.control_loop_name,
+ 'subscription': clean_sub}
+ return sub_event
def create(self):
""" Creates a subscription database entry
@@ -159,7 +160,7 @@ class Subscription:
@retry(wait=wait_exponential(multiplier=1, min=30, max=120), stop=stop_after_attempt(3),
retry=retry_if_exception_type(Exception))
- def process_subscription(self, nfs, mr_pub):
+ def process_subscription(self, nfs, mr_pub, app_conf):
action = 'Deactivate'
sub_nf_state = SubNfState.PENDING_DELETE.value
self.update_subscription_status()
@@ -170,7 +171,7 @@ class Subscription:
try:
for nf in nfs:
- mr_pub.publish_subscription_event_data(self, nf.nf_name)
+ mr_pub.publish_subscription_event_data(self, nf.nf_name, app_conf)
logger.debug(f'Publishing Event to {action} '
f'Sub: {self.subscriptionName} for the nf: {nf.nf_name}')
self.add_network_functions_to_subscription(nfs)