summaryrefslogtreecommitdiffstats
path: root/components/pm-subscription-handler/pmsh_service
diff options
context:
space:
mode:
authorJoseph O'Leary <joseph.o.leary@est.tech>2020-06-17 16:04:53 +0000
committerGerrit Code Review <gerrit@onap.org>2020-06-17 16:04:53 +0000
commit20637908b156aeff53d7607f88d655d0becc1f11 (patch)
tree32977480c45d99c5be4668c176ddb2069e438fe7 /components/pm-subscription-handler/pmsh_service
parenta878f684dc8f906e0800dc1598e5214f8da2ba33 (diff)
parent80ff14860e3b8a7a2c29272c2c10c1e830c2141d (diff)
Merge "[PMSH] Improve CBS data handling"
Diffstat (limited to 'components/pm-subscription-handler/pmsh_service')
-rw-r--r--components/pm-subscription-handler/pmsh_service/mod/__init__.py4
-rwxr-xr-xcomponents/pm-subscription-handler/pmsh_service/mod/aai_client.py29
-rwxr-xr-xcomponents/pm-subscription-handler/pmsh_service/mod/aai_event_handler.py21
-rwxr-xr-xcomponents/pm-subscription-handler/pmsh_service/mod/network_function.py4
-rwxr-xr-xcomponents/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py65
-rw-r--r--components/pm-subscription-handler/pmsh_service/mod/policy_response_handler.py15
-rwxr-xr-xcomponents/pm-subscription-handler/pmsh_service/mod/subscription.py5
-rw-r--r--components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py7
-rwxr-xr-xcomponents/pm-subscription-handler/pmsh_service/pmsh_service_main.py19
9 files changed, 98 insertions, 71 deletions
diff --git a/components/pm-subscription-handler/pmsh_service/mod/__init__.py b/components/pm-subscription-handler/pmsh_service/mod/__init__.py
index 316687c0..efc61aae 100644
--- a/components/pm-subscription-handler/pmsh_service/mod/__init__.py
+++ b/components/pm-subscription-handler/pmsh_service/mod/__init__.py
@@ -59,7 +59,7 @@ def create_app():
def create_logger():
config_file_path = os.getenv('LOGGER_CONFIG')
- update_config(config_file_path)
+ update_logging_config(config_file_path)
monkey.patch_loggingYaml()
logging.config.yamlConfig(filepath=config_file_path,
watchDog=os.getenv('DYNAMIC_LOGGER_CONFIG', True))
@@ -73,7 +73,7 @@ def create_logger():
logging.setLogRecordFactory(augment_record)
-def update_config(config_file_path):
+def update_logging_config(config_file_path):
config_yaml = YAML()
config_file = pathlib.Path(config_file_path)
data = config_yaml.load(config_file)
diff --git a/components/pm-subscription-handler/pmsh_service/mod/aai_client.py b/components/pm-subscription-handler/pmsh_service/mod/aai_client.py
index 371fdb06..2b92df41 100755
--- a/components/pm-subscription-handler/pmsh_service/mod/aai_client.py
+++ b/components/pm-subscription-handler/pmsh_service/mod/aai_client.py
@@ -22,39 +22,39 @@ import requests
from requests.auth import HTTPBasicAuth
from mod import logger
+from mod.network_function import NetworkFunction
from mod.pmsh_utils import mdc_handler
-from mod.network_function import NetworkFunction, NetworkFunctionFilter
-from mod.subscription import Subscription
-def get_pmsh_subscription_data(cbs_data):
+def get_pmsh_nfs_from_aai(app_conf):
"""
- Returns the PMSH subscription data
+ Returns the Network Functions from AAI related to the Subscription.
Args:
- cbs_data: json app config from the Config Binding Service.
+ app_conf (AppConfig): the AppConfig object.
Returns:
- Subscription, set(NetworkFunctions): `Subscription` <Subscription> object,
- set of NetworkFunctions to be added.
+ set(NetworkFunctions): set of NetworkFunctions.
Raises:
- RuntimeError: if AAI data cannot be retrieved.
+ RuntimeError: if AAI Network Function data cannot be retrieved.
"""
- aai_nf_data = _get_all_aai_nf_data()
+ aai_nf_data = _get_all_aai_nf_data(app_conf)
if aai_nf_data:
- sub = Subscription(**cbs_data['policy']['subscription'])
- nfs = _filter_nf_data(aai_nf_data, NetworkFunctionFilter(**sub.nfFilter))
+ nfs = _filter_nf_data(aai_nf_data, app_conf.nf_filter)
else:
raise RuntimeError('Failed to get data from AAI')
- return sub, nfs
+ return nfs
@mdc_handler
-def _get_all_aai_nf_data(**kwargs):
+def _get_all_aai_nf_data(app_conf, **kwargs):
"""
Return queried nf data from the AAI service.
+ Args:
+ app_conf (AppConfig): the AppConfig object.
+
Returns:
dict: the json response from AAI query, else None.
"""
@@ -77,7 +77,8 @@ def _get_all_aai_nf_data(**kwargs):
}"""
params = {'format': 'simple', 'nodesOnly': 'true'}
response = session.put(aai_endpoint, headers=headers,
- auth=HTTPBasicAuth('AAI', 'AAI'),
+ auth=HTTPBasicAuth(app_conf.aaf_creds.get('aaf_id'),
+ app_conf.aaf_creds.get('aaf_pass')),
data=json_data, params=params, verify=False)
response.raise_for_status()
if response.ok:
diff --git a/components/pm-subscription-handler/pmsh_service/mod/aai_event_handler.py b/components/pm-subscription-handler/pmsh_service/mod/aai_event_handler.py
index 5aebb926..96f51431 100755
--- a/components/pm-subscription-handler/pmsh_service/mod/aai_event_handler.py
+++ b/components/pm-subscription-handler/pmsh_service/mod/aai_event_handler.py
@@ -20,8 +20,7 @@ import json
from enum import Enum
from mod import logger
-from mod.network_function import NetworkFunction, NetworkFunctionFilter
-from mod.subscription import AdministrativeState
+from mod.network_function import NetworkFunction
class XNFType(Enum):
@@ -34,13 +33,12 @@ class AAIEvent(Enum):
UPDATE = 'UPDATE'
-def process_aai_events(mr_sub, subscription, mr_pub, app, app_conf):
+def process_aai_events(mr_sub, mr_pub, app, app_conf):
"""
Processes AAI UPDATE events for each filtered xNFs where orchestration status is set to Active.
Args:
mr_sub (_MrSub): MR subscriber
- subscription (Subscription): The current subscription object
mr_pub (_MrPub): MR publisher
app (db): DB application
app_conf (AppConfig): the application configuration.
@@ -48,7 +46,7 @@ def process_aai_events(mr_sub, subscription, mr_pub, app, app_conf):
app.app_context().push()
aai_events = mr_sub.get_from_topic('AAI-EVENT')
- if _aai_event_exists(aai_events):
+ if aai_events is not None and len(aai_events) != 0:
for entry in aai_events:
logger.debug(f'AAI-EVENT entry: {entry}')
entry = json.loads(entry)
@@ -60,19 +58,18 @@ def process_aai_events(mr_sub, subscription, mr_pub, app, app_conf):
'vnf-name']
new_status = aai_xnf['orchestration-status']
- if NetworkFunctionFilter(**subscription.nfFilter).is_nf_in_filter(xnf_name, new_status):
- _process_event(action, new_status, xnf_name, subscription, mr_pub, app_conf)
+ if app_conf.nf_filter.is_nf_in_filter(xnf_name, new_status):
+ _process_event(action, new_status, xnf_name, mr_pub, app_conf)
-def _process_event(action, new_status, xnf_name, subscription, mr_pub, app_conf):
+def _process_event(action, new_status, xnf_name, mr_pub, app_conf):
if action == AAIEvent.UPDATE.value:
logger.info(f'Update event found for network function {xnf_name}')
local_xnf = NetworkFunction.get(xnf_name)
if local_xnf is None:
logger.info(f'Activating subscription for network function {xnf_name}')
- subscription.administrativeState = AdministrativeState.UNLOCKED.value
- subscription.process_subscription([NetworkFunction(
+ app_conf.subscription.process_subscription([NetworkFunction(
nf_name=xnf_name, orchestration_status=new_status)], mr_pub, app_conf)
else:
logger.debug(f"Update Event for network function {xnf_name} will not be processed "
@@ -81,7 +78,3 @@ def _process_event(action, new_status, xnf_name, subscription, mr_pub, app_conf)
logger.info(f'Delete event found for network function {xnf_name}')
NetworkFunction.delete(nf_name=xnf_name)
logger.info(f'{xnf_name} successfully deleted.')
-
-
-def _aai_event_exists(aai_events):
- return aai_events is not None and len(aai_events) != 0
diff --git a/components/pm-subscription-handler/pmsh_service/mod/network_function.py b/components/pm-subscription-handler/pmsh_service/mod/network_function.py
index aa39bf2c..979cc775 100755
--- a/components/pm-subscription-handler/pmsh_service/mod/network_function.py
+++ b/components/pm-subscription-handler/pmsh_service/mod/network_function.py
@@ -102,8 +102,8 @@ class NetworkFunctionFilter:
"""Match the nf name against regex values in Subscription.nfFilter.nfNames
Args:
- nf_name: the AAI nf name.
- orchestration_status: orchestration status of the nf
+ nf_name (str): the AAI nf name.
+ orchestration_status (str): orchestration status of the nf
Returns:
bool: True if matched, else False.
diff --git a/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py b/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py
index 01661ad0..fb6a5194 100755
--- a/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py
+++ b/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py
@@ -15,6 +15,7 @@
#
# SPDX-License-Identifier: Apache-2.0
# ============LICENSE_END=====================================================
+import threading
import uuid
from os import getenv
from threading import Timer
@@ -26,6 +27,8 @@ from requests.auth import HTTPBasicAuth
from tenacity import wait_fixed, stop_after_attempt, retry, retry_if_exception_type
from mod import logger
+from mod.network_function import NetworkFunctionFilter
+from mod.subscription import Subscription
def mdc_handler(function):
@@ -42,12 +45,40 @@ def mdc_handler(function):
return decorator
-class ConfigHandler:
- """ Handles retrieval of PMSH's configuration from Configbinding service."""
- @staticmethod
+class ThreadSafeSingleton(type):
+ _instances = {}
+ _singleton_lock = threading.Lock()
+
+ def __call__(cls, *args, **kwargs):
+ # double-checked locking pattern (https://en.wikipedia.org/wiki/Double-checked_locking)
+ if cls not in cls._instances:
+ with cls._singleton_lock:
+ if cls not in cls._instances:
+ cls._instances[cls] = super(ThreadSafeSingleton, cls).__call__(*args, **kwargs)
+ return cls._instances[cls]
+
+
+class AppConfig(metaclass=ThreadSafeSingleton):
+
+ def __init__(self):
+ try:
+ conf = self._get_pmsh_config()
+ except Exception:
+ raise
+ self.aaf_creds = {'aaf_id': conf['config'].get('aaf_identity'),
+ 'aaf_pass': conf['config'].get('aaf_password')}
+ self.cert_path = conf['config'].get('cert_path')
+ self.key_path = conf['config'].get('key_path')
+ self.streams_subscribes = conf['config'].get('streams_subscribes')
+ self.streams_publishes = conf['config'].get('streams_publishes')
+ self.operational_policy_name = conf['config'].get('operational_policy_name')
+ self.control_loop_name = conf['config'].get('control_loop_name')
+ self.subscription = Subscription(**conf['policy']['subscription'])
+ self.nf_filter = NetworkFunctionFilter(**self.subscription.nfFilter)
+
@mdc_handler
@retry(wait=wait_fixed(2), stop=stop_after_attempt(5), retry=retry_if_exception_type(Exception))
- def get_pmsh_config(**kwargs):
+ def _get_pmsh_config(self, **kwargs):
""" Retrieves PMSH's configuration from Config binding service. If a non-2xx response
is received, it retries after 2 seconds for 5 times before raising an exception.
@@ -66,17 +97,23 @@ class ConfigHandler:
logger.error(f'Failed to get config from CBS: {err}')
raise Exception
+ def refresh_config(self):
+ """
+ Update the relevant attributes of the AppConfig object.
-class AppConfig:
- def __init__(self, **kwargs):
- self.aaf_creds = {'aaf_id': kwargs.get('aaf_identity'),
- 'aaf_pass': kwargs.get('aaf_password')}
- self.cert_path = kwargs.get('cert_path')
- self.key_path = kwargs.get('key_path')
- self.streams_subscribes = kwargs.get('streams_subscribes')
- self.streams_publishes = kwargs.get('streams_publishes')
- self.operational_policy_name = kwargs.get('operational_policy_name')
- self.control_loop_name = kwargs.get('control_loop_name')
+ Raises:
+ Exception: if cbs request fails.
+ """
+ try:
+ app_conf = self._get_pmsh_config()
+ except Exception:
+ logger.debug("Failed to refresh AppConfig data")
+ raise
+ self.subscription.administrativeState = \
+ app_conf['policy']['subscription']['administrativeState']
+ self.nf_filter.nf_names = app_conf['policy']['subscription']['nfFilter']['nfNames']
+ self.nf_filter.nf_sw_version = app_conf['policy']['subscription']['nfFilter']['swVersions']
+ logger.info("AppConfig data has been refreshed")
def get_mr_sub(self, sub_name):
"""
diff --git a/components/pm-subscription-handler/pmsh_service/mod/policy_response_handler.py b/components/pm-subscription-handler/pmsh_service/mod/policy_response_handler.py
index 5ce03691..2b917cec 100644
--- a/components/pm-subscription-handler/pmsh_service/mod/policy_response_handler.py
+++ b/components/pm-subscription-handler/pmsh_service/mod/policy_response_handler.py
@@ -37,9 +37,9 @@ policy_response_handle_functions = {
class PolicyResponseHandler:
- def __init__(self, mr_sub, subscription_name, app):
+ def __init__(self, mr_sub, app_conf, app):
self.mr_sub = mr_sub
- self.subscription_name = subscription_name
+ self.app_conf = app_conf
self.app = app
@retry(wait=wait_fixed(5), retry=retry_if_exception_type(Exception))
@@ -48,18 +48,19 @@ class PolicyResponseHandler:
This method polls MR for response from policy. It checks whether the message is for the
relevant subscription and then handles the response
"""
- logger.info('Polling MR started for XNF activation/deactivation policy response events.')
self.app.app_context().push()
- administrative_state = Subscription.get(self.subscription_name).status
+ administrative_state = self.app_conf.subscription.administrativeState
+ logger.info('Polling MR started for XNF activation/deactivation policy response events.')
try:
response_data = self.mr_sub.get_from_topic('policy_response_consumer')
for data in response_data:
data = json.loads(data)
- if data['status']['subscriptionName'] == self.subscription_name:
+ if data['status']['subscriptionName'] \
+ == self.app_conf.subscription.subscriptionName:
nf_name = data['status']['nfName']
response_message = data['status']['message']
- self._handle_response(self.subscription_name, administrative_state,
- nf_name, response_message)
+ self._handle_response(self.app_conf.subscription.subscriptionName,
+ administrative_state, nf_name, response_message)
except Exception as err:
raise Exception(f'Error trying to poll policy response topic on MR: {err}')
diff --git a/components/pm-subscription-handler/pmsh_service/mod/subscription.py b/components/pm-subscription-handler/pmsh_service/mod/subscription.py
index be217b11..d6b17cd9 100755
--- a/components/pm-subscription-handler/pmsh_service/mod/subscription.py
+++ b/components/pm-subscription-handler/pmsh_service/mod/subscription.py
@@ -125,7 +125,7 @@ class Subscription:
logger.debug(f'Failed to add nf {nf.nf_name} to subscription '
f'{current_sub.subscription_name}: {e}')
logger.debug(f'Subscription {current_sub.subscription_name} now contains these XNFs:'
- f'{Subscription.get_nfs_per_subscription(current_sub.subscription_name)}')
+ f'{Subscription.get_nf_names_per_sub(current_sub.subscription_name)}')
@staticmethod
def get(subscription_name):
@@ -206,9 +206,9 @@ class Subscription:
self.update_subscription_status()
if self.administrativeState == AdministrativeState.UNLOCKED.value:
- logger.info(f'{action} subscription initiated for {self.subscriptionName}.')
action = 'Activate'
sub_nf_state = SubNfState.PENDING_CREATE.value
+ logger.info(f'{action} subscription initiated for {self.subscriptionName}.')
try:
for nf in nfs:
@@ -228,7 +228,6 @@ class Subscription:
list: NetworkFunctions per Subscription list else empty
"""
nf_per_subscriptions = NfSubRelationalModel.query.all()
-
return nf_per_subscriptions
@staticmethod
diff --git a/components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py b/components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py
index be67cae6..add8be42 100644
--- a/components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py
+++ b/components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py
@@ -17,7 +17,6 @@
# ============LICENSE_END=====================================================
import mod.aai_client as aai
-from mod.pmsh_utils import ConfigHandler
from mod import logger
from mod.subscription import AdministrativeState
@@ -38,15 +37,15 @@ class SubscriptionHandler:
the Subscription if a change has occurred
"""
self.app.app_context().push()
- config = ConfigHandler.get_pmsh_config()
- new_administrative_state = config['policy']['subscription']['administrativeState']
+ new_administrative_state = self.app_conf.subscription.administrativeState
try:
if self.administrative_state == new_administrative_state:
logger.info('Administrative State did not change in the Config')
else:
logger.info(f'Administrative State has changed from {self.administrative_state} '
f'to {new_administrative_state}.')
- self.current_sub, self.current_nfs = aai.get_pmsh_subscription_data(config)
+ self.current_nfs = aai.get_pmsh_nfs_from_aai(self.app_conf)
+ self.current_sub = self.app_conf.subscription
self.administrative_state = new_administrative_state
self.current_sub.process_subscription(self.current_nfs, self.mr_pub, self.app_conf)
diff --git a/components/pm-subscription-handler/pmsh_service/pmsh_service_main.py b/components/pm-subscription-handler/pmsh_service/pmsh_service_main.py
index f1fb1e06..143b5c8c 100755
--- a/components/pm-subscription-handler/pmsh_service/pmsh_service_main.py
+++ b/components/pm-subscription-handler/pmsh_service/pmsh_service_main.py
@@ -18,11 +18,10 @@
import sys
from signal import signal, SIGTERM
-import mod.aai_client as aai
from mod import db, create_app, launch_api_server, logger
from mod.aai_event_handler import process_aai_events
from mod.exit_handler import ExitHandler
-from mod.pmsh_utils import AppConfig, PeriodicTask, ConfigHandler
+from mod.pmsh_utils import AppConfig, PeriodicTask
from mod.policy_response_handler import PolicyResponseHandler
from mod.subscription import Subscription, AdministrativeState
from mod.subscription_handler import SubscriptionHandler
@@ -33,29 +32,27 @@ def main():
app = create_app()
app.app_context().push()
db.create_all(app=app)
- config = ConfigHandler.get_pmsh_config()
- app_conf = AppConfig(**config['config'])
-
- sub, nfs = aai.get_pmsh_subscription_data(config)
+ app_conf = AppConfig()
policy_mr_pub = app_conf.get_mr_pub('policy_pm_publisher')
policy_mr_sub = app_conf.get_mr_sub('policy_pm_subscriber')
mr_aai_event_sub = app_conf.get_mr_sub('aai_subscriber')
- subscription_in_db = Subscription.get(sub.subscriptionName)
+ subscription_in_db = Subscription.get(app_conf.subscription.subscriptionName)
administrative_state = subscription_in_db.status if subscription_in_db \
else AdministrativeState.LOCKED.value
+ app_conf_thread = PeriodicTask(10, app_conf.refresh_config)
+ app_conf_thread.start()
aai_event_thread = PeriodicTask(10, process_aai_events,
- args=(mr_aai_event_sub,
- sub, policy_mr_pub, app, app_conf))
+ args=(mr_aai_event_sub, policy_mr_pub, app, app_conf))
subscription_handler = SubscriptionHandler(administrative_state,
policy_mr_pub, app, app_conf, aai_event_thread)
- policy_response_handler = PolicyResponseHandler(policy_mr_sub, sub.subscriptionName, app)
+ policy_response_handler = PolicyResponseHandler(policy_mr_sub, app_conf, app)
subscription_handler_thread = PeriodicTask(30, subscription_handler.execute)
policy_response_handler_thread = PeriodicTask(5, policy_response_handler.poll_policy_topic)
subscription_handler_thread.start()
policy_response_handler_thread.start()
- periodic_tasks = [aai_event_thread, subscription_handler_thread,
+ periodic_tasks = [app_conf_thread, aai_event_thread, subscription_handler_thread,
policy_response_handler_thread]
signal(SIGTERM, ExitHandler(periodic_tasks=periodic_tasks,