summaryrefslogtreecommitdiffstats
path: root/components/pm-subscription-handler/pmsh_service
diff options
context:
space:
mode:
authorSagarS <sagar.shetty@est.tech>2021-11-09 17:36:43 +0000
committerSagarS <sagar.shetty@est.tech>2021-12-02 14:49:11 +0000
commit24454771f523f65fce5e3dce91cfe8cd15fc8be7 (patch)
tree4911a6b473cb0c2e2f566f243b95a6dd44719120 /components/pm-subscription-handler/pmsh_service
parent7e3042157d736e1f81618b92afc3bab501755a31 (diff)
[DCAEGEN2] PMSH Response Event Handler Integration
Issue-ID: DCAEGEN2-2915 Change-Id: I95b34a7b5b011760ae30c1485925dc19fde5e6c8 Signed-off-by: SagarS <sagar.shetty@est.tech>
Diffstat (limited to 'components/pm-subscription-handler/pmsh_service')
-rw-r--r--components/pm-subscription-handler/pmsh_service/mod/api/services/measurement_group_service.py55
-rw-r--r--components/pm-subscription-handler/pmsh_service/mod/api/services/subscription_service.py7
-rw-r--r--components/pm-subscription-handler/pmsh_service/mod/policy_response_handler.py71
-rwxr-xr-xcomponents/pm-subscription-handler/pmsh_service/pmsh_service_main.py3
4 files changed, 97 insertions, 39 deletions
diff --git a/components/pm-subscription-handler/pmsh_service/mod/api/services/measurement_group_service.py b/components/pm-subscription-handler/pmsh_service/mod/api/services/measurement_group_service.py
index 733d803e..cc07cc03 100644
--- a/components/pm-subscription-handler/pmsh_service/mod/api/services/measurement_group_service.py
+++ b/components/pm-subscription-handler/pmsh_service/mod/api/services/measurement_group_service.py
@@ -18,8 +18,8 @@
from mod.api.db_models import MeasurementGroupModel, NfMeasureGroupRelationalModel
from mod import db, logger
-from mod.subscription import SubNfState
from mod.api.services import nf_service
+from mod.network_function import NetworkFunction
from mod.pmsh_config import MRTopic, AppConfig
@@ -47,18 +47,19 @@ def save_measurement_group(measurement_group, subscription_name):
return new_measurement_group
-def apply_nf_to_measgroup(nf_name, measurement_group_name):
+def apply_nf_status_to_measurement_group(nf_name, measurement_group_name, status):
"""
Associate and saves the measurement group with Network function
Args:
nf_name (string): Network function name.
measurement_group_name (string): Measurement group name
+ status (string): nf status to apply on measurement group
"""
new_nf_measure_grp_rel = NfMeasureGroupRelationalModel(
measurement_grp_name=measurement_group_name,
nf_name=nf_name,
- nf_measure_grp_status=SubNfState.PENDING_CREATE.value
+ nf_measure_grp_status=status
)
db.session.add(new_nf_measure_grp_rel)
@@ -86,3 +87,51 @@ def publish_measurement_group(sub_model, measurement_group, nf):
}
logger.debug(f'Event Body: {event_body}')
AppConfig.get_instance().publish_to_topic(MRTopic.POLICY_PM_PUBLISHER.value, event_body)
+
+
+def update_measurement_group_nf_status(measurement_group_name, status, nf_name):
+ """ Updates the status of a measurement grp for a particular nf
+
+ Args:
+ measurement_group_name (string): Measurement group name
+ nf_name (string): The network function name
+ status (string): status of the network function for measurement group
+ """
+ try:
+ logger.info(f'Performing update for measurement group name: {measurement_group_name},'
+ f' network function name: {nf_name} on status: {status}')
+ NfMeasureGroupRelationalModel.query.filter(
+ NfMeasureGroupRelationalModel.measurement_grp_name == measurement_group_name,
+ NfMeasureGroupRelationalModel.nf_name == nf_name). \
+ update({NfMeasureGroupRelationalModel.nf_measure_grp_status: status},
+ synchronize_session='evaluate')
+ db.session.commit()
+ except Exception as e:
+ logger.error(f'Failed to update nf: {nf_name} for measurement group: '
+ f'{measurement_group_name} due to: {e}')
+
+
+def delete_nf_to_measurement_group(nf_name, measurement_group_name, status):
+ """ Deletes a particular nf related to a measurement group name and
+ if no more relations of nf exist to measurement group then delete nf from PMSH
+
+ Args:
+ nf_name (string): The network function name
+ measurement_group_name (string): Measurement group name
+ status (string): status of the network function for measurement group
+ """
+ try:
+ logger.info(f'Performing delete for measurement group name: {measurement_group_name},'
+ f' network function name: {nf_name} on status: {status}')
+ nf_measurement_group_rel = NfMeasureGroupRelationalModel.query.filter(
+ NfMeasureGroupRelationalModel.measurement_grp_name == measurement_group_name,
+ NfMeasureGroupRelationalModel.nf_name == nf_name).one_or_none()
+ db.session.delete(nf_measurement_group_rel)
+ db.session.commit()
+ nf_relations = NfMeasureGroupRelationalModel.query.filter(
+ NfMeasureGroupRelationalModel.nf_name == nf_name).all()
+ if not nf_relations:
+ NetworkFunction.delete(nf_name=nf_name)
+ except Exception as e:
+ logger.error(f'Failed to delete nf: {nf_name} for measurement group: '
+ f'{measurement_group_name} due to: {e}')
diff --git a/components/pm-subscription-handler/pmsh_service/mod/api/services/subscription_service.py b/components/pm-subscription-handler/pmsh_service/mod/api/services/subscription_service.py
index 96d50c27..c33c82fb 100644
--- a/components/pm-subscription-handler/pmsh_service/mod/api/services/subscription_service.py
+++ b/components/pm-subscription-handler/pmsh_service/mod/api/services/subscription_service.py
@@ -21,7 +21,7 @@ from mod.api.db_models import SubscriptionModel, NfSubRelationalModel, \
NetworkFunctionFilterModel, NetworkFunctionModel
from mod.api.services import measurement_group_service, nf_service
from mod.api.custom_exception import InvalidDataException, DuplicateDataException
-from mod.subscription import AdministrativeState
+from mod.subscription import AdministrativeState, SubNfState
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import joinedload
@@ -143,8 +143,9 @@ def apply_measurement_grp_to_nfs(filtered_nfs, measurement_groups):
for nf in filtered_nfs:
logger.info(f'Saving measurement group to nf name, measure_grp_name: {nf.nf_name},'
f'{measurement_group.measurement_group_name}')
- measurement_group_service.apply_nf_to_measgroup(
- nf.nf_name, measurement_group.measurement_group_name)
+ measurement_group_service.apply_nf_status_to_measurement_group(
+ nf.nf_name, measurement_group.measurement_group_name,
+ SubNfState.PENDING_CREATE.value)
else:
logger.info(f'No nfs added as measure_grp_name: '
f'{measurement_group.measurement_group_name} is LOCKED')
diff --git a/components/pm-subscription-handler/pmsh_service/mod/policy_response_handler.py b/components/pm-subscription-handler/pmsh_service/mod/policy_response_handler.py
index 09c97047..1bc58081 100644
--- a/components/pm-subscription-handler/pmsh_service/mod/policy_response_handler.py
+++ b/components/pm-subscription-handler/pmsh_service/mod/policy_response_handler.py
@@ -1,5 +1,5 @@
# ============LICENSE_START===================================================
-# Copyright (C) 2020 Nordix Foundation.
+# Copyright (C) 2020-2021 Nordix Foundation.
# ============================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -15,33 +15,31 @@
#
# SPDX-License-Identifier: Apache-2.0
# ============LICENSE_END=====================================================
-
import json
-
+from mod.pmsh_config import MRTopic, AppConfig
from mod import logger
-from mod.network_function import NetworkFunction
-from mod.subscription import Subscription, AdministrativeState, subscription_nf_states
+from mod.subscription import AdministrativeState, subscription_nf_states
+from mod.api.db_models import MeasurementGroupModel
+from mod.api.services import measurement_group_service
policy_response_handle_functions = {
AdministrativeState.LOCKED.value: {
- 'success': NetworkFunction.delete,
- 'failed': Subscription.update_sub_nf_status
+ 'success': measurement_group_service.delete_nf_to_measurement_group,
+ 'failed': measurement_group_service.update_measurement_group_nf_status
},
AdministrativeState.UNLOCKED.value: {
- 'success': Subscription.update_sub_nf_status,
- 'failed': Subscription.update_sub_nf_status
+ 'success': measurement_group_service.update_measurement_group_nf_status,
+ 'failed': measurement_group_service.update_measurement_group_nf_status
},
AdministrativeState.LOCKING.value: {
- 'success': NetworkFunction.delete,
- 'failed': Subscription.update_sub_nf_status
+ 'success': measurement_group_service.delete_nf_to_measurement_group,
+ 'failed': measurement_group_service.update_measurement_group_nf_status
}
}
class PolicyResponseHandler:
- def __init__(self, mr_sub, app_conf, app):
- self.mr_sub = mr_sub
- self.app_conf = app_conf
+ def __init__(self, app):
self.app = app
def poll_policy_topic(self):
@@ -50,38 +48,49 @@ class PolicyResponseHandler:
relevant subscription and then handles the response
"""
self.app.app_context().push()
- administrative_state = self.app_conf.subscription.administrativeState
logger.info('Polling MR for XNF activation/deactivation policy response events.')
try:
- response_data = self.mr_sub.get_from_topic('dcae_pmsh_policy_cl_input')
+ response_data = AppConfig.get_instance(). \
+ get_from_topic(MRTopic.POLICY_PM_SUBSCRIBER.value, 'dcae_pmsh_policy_cl_input')
for data in response_data:
data = json.loads(data)
- if data['status']['subscriptionName'] \
- == self.app_conf.subscription.subscriptionName:
- nf_name = data['status']['nfName']
- response_message = data['status']['message']
- self._handle_response(self.app_conf.subscription.subscriptionName,
- administrative_state, nf_name, response_message)
+ measurement_group_name = data['status']['measurementGroupName']
+ subscription_name = data['status']['subscriptionName']
+ measurement_group = (MeasurementGroupModel.query.filter(
+ MeasurementGroupModel.measurement_group_name == measurement_group_name,
+ subscription_name == MeasurementGroupModel
+ .subscription_name).one_or_none())
+ nf_name = data['status']['nfName']
+ response_message = data['status']['message']
+ if measurement_group:
+ self._handle_response(measurement_group_name,
+ measurement_group.administrative_state,
+ nf_name, response_message)
+ else:
+ logger.info(f'Polled MR response provides missing measurement '
+ f'group name : {measurement_group_name}')
except Exception as err:
logger.error(f'Error trying to poll policy response topic on MR: {err}', exc_info=True)
@staticmethod
- def _handle_response(subscription_name, administrative_state, nf_name, response_message):
+ def _handle_response(measurement_group_name, administrative_state, nf_name, response_message):
"""
Handles the response from Policy, updating the DB
-
Args:
- subscription_name (str): The subscription name
- administrative_state (str): The administrative state of the subscription
- nf_name (str): The network function name
- response_message (str): The message in the response regarding the state (success|failed)
+ measurement_group_name (string): The measurement group name
+ administrative_state (string): The administrative state of the measurement group
+ nf_name (string): The network function name
+ response_message (string): The message in the response
+ regarding the state (success|failed)
"""
- logger.info(f'Response from MR: Sub: {subscription_name} for '
+ logger.info(f'Response from MR: measurement group name: {measurement_group_name} for '
f'NF: {nf_name} received, updating the DB')
try:
- sub_nf_status = subscription_nf_states[administrative_state][response_message].value
+ nf_measure_grp_status = subscription_nf_states[administrative_state][response_message]\
+ .value
policy_response_handle_functions[administrative_state][response_message](
- subscription_name=subscription_name, status=sub_nf_status, nf_name=nf_name)
+ measurement_group_name=measurement_group_name, status=nf_measure_grp_status,
+ nf_name=nf_name)
except Exception as err:
logger.error(f'Error changing nf_sub status in the DB: {err}')
raise
diff --git a/components/pm-subscription-handler/pmsh_service/pmsh_service_main.py b/components/pm-subscription-handler/pmsh_service/pmsh_service_main.py
index 0b6544b5..fe151c0d 100755
--- a/components/pm-subscription-handler/pmsh_service/pmsh_service_main.py
+++ b/components/pm-subscription-handler/pmsh_service/pmsh_service_main.py
@@ -35,13 +35,12 @@ def main():
app_conf = AppConfig()
pmsh_app_conf = NewAppConfig()
policy_mr_pub = app_conf.get_mr_pub('policy_pm_publisher')
- policy_mr_sub = app_conf.get_mr_sub('policy_pm_subscriber')
aai_event_mr_sub = app_conf.get_mr_sub('aai_subscriber')
except Exception as e:
logger.error(f'Failed to get config and create application: {e}', exc_info=True)
sys.exit(e)
- policy_response_handler = PolicyResponseHandler(policy_mr_sub, app_conf, app)
+ policy_response_handler = PolicyResponseHandler(app)
policy_response_handler_thread = PeriodicTask(25, policy_response_handler.poll_policy_topic)
policy_response_handler_thread.name = 'policy_event_thread'
logger.info('Start polling PMSH_CL_INPUT topic on DMaaP MR.')