summaryrefslogtreecommitdiffstats
path: root/components/pm-subscription-handler/pmsh_service
diff options
context:
space:
mode:
authorERIMROB <robertas.rimkus@est.tech>2020-02-12 11:35:20 +0000
committerERIMROB <robertas.rimkus@est.tech>2020-02-20 16:03:48 +0000
commit26b76c02052269ea850d8d4efd6deb536115a0af (patch)
treef7485d7ccd0e7d95c000b9c05bce2c371c34581a /components/pm-subscription-handler/pmsh_service
parentd42ac06c733c43e19a01b4203c1b987b4973ccfd (diff)
Add Support for Activation and Deactivation
* Add support for reconfiguration of the administrativeState field * Add support for policy feedback handling * Fix network function filter applying to non active network functions Signed-off-by: ERIMROB <robertas.rimkus@est.tech> Change-Id: Ic1cfc3207b2495c1d8d10acd0ed1c40114cf4643 Issue-ID: DCAEGEN2-1830
Diffstat (limited to 'components/pm-subscription-handler/pmsh_service')
-rwxr-xr-xcomponents/pm-subscription-handler/pmsh_service/mod/aai_client.py6
-rwxr-xr-xcomponents/pm-subscription-handler/pmsh_service/mod/config_handler.py25
-rwxr-xr-xcomponents/pm-subscription-handler/pmsh_service/mod/network_function.py18
-rw-r--r--components/pm-subscription-handler/pmsh_service/mod/pmsh_logging.py5
-rwxr-xr-xcomponents/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py73
-rwxr-xr-xcomponents/pm-subscription-handler/pmsh_service/mod/subscription.py77
-rwxr-xr-xcomponents/pm-subscription-handler/pmsh_service/pmsh_service.py49
-rwxr-xr-xcomponents/pm-subscription-handler/pmsh_service/pmsh_service_main.py94
8 files changed, 270 insertions, 77 deletions
diff --git a/components/pm-subscription-handler/pmsh_service/mod/aai_client.py b/components/pm-subscription-handler/pmsh_service/mod/aai_client.py
index 747846f1..f0f20566 100755
--- a/components/pm-subscription-handler/pmsh_service/mod/aai_client.py
+++ b/components/pm-subscription-handler/pmsh_service/mod/aai_client.py
@@ -119,10 +119,12 @@ def _filter_nf_data(nf_data, nf_filter):
try:
for nf in nf_data['results']:
name_identifier = 'pnf-name' if nf['node-type'] == 'pnf' else 'vnf-name'
- if nf_filter.is_nf_in_filter(nf['properties'].get(name_identifier)):
+ orchestration_status = nf['properties'].get('orchestration-status')
+ if nf_filter.is_nf_in_filter(nf['properties'].get(name_identifier)) \
+ and orchestration_status == 'Active':
nf_set.add(NetworkFunction(
nf_name=nf['properties'].get(name_identifier),
- orchestration_status=nf['properties'].get('orchestration-status')))
+ orchestration_status=orchestration_status))
except KeyError as e:
logger.debug(f'Failed to parse AAI data: {e}')
raise
diff --git a/components/pm-subscription-handler/pmsh_service/mod/config_handler.py b/components/pm-subscription-handler/pmsh_service/mod/config_handler.py
index 1ce4b701..acf5b76f 100755
--- a/components/pm-subscription-handler/pmsh_service/mod/config_handler.py
+++ b/components/pm-subscription-handler/pmsh_service/mod/config_handler.py
@@ -15,12 +15,10 @@
#
# SPDX-License-Identifier: Apache-2.0
# ============LICENSE_END=====================================================
-
-import json
from os import environ
import requests
-from tenacity import retry, wait_fixed, stop_after_attempt
+from tenacity import retry, wait_fixed, stop_after_attempt, retry_if_exception_type
import mod.pmsh_logging as logger
@@ -45,7 +43,7 @@ class ConfigHandler:
def hostname(self):
return _get_environment_variable('HOSTNAME')
- @retry(wait=wait_fixed(2), stop=stop_after_attempt(5))
+ @retry(wait=wait_fixed(2), stop=stop_after_attempt(5), retry=retry_if_exception_type(Exception))
def get_config(self):
""" Retrieves PMSH's configuration from Configbinding service. If a non-2xx response
is received, it retries after 2 seconds for 5 times before raising an exception.
@@ -56,18 +54,15 @@ class ConfigHandler:
Raises:
Exception: If any error occurred pulling configuration from Configbinding service.
"""
- if self._config is None:
- logger.debug('No configuration found, pulling from Configbinding Service.')
- try:
- response = requests.get(self.cbs_url)
- response.raise_for_status()
- self._config = response.json()
- logger.debug(f'PMSH Configuration from Configbinding Service: {self._config}')
- return json.loads(self._config)
- except Exception as err:
- raise Exception(f'Error retrieving configuration from CBS: {err}')
- else:
+
+ try:
+ response = requests.get(self.cbs_url)
+ response.raise_for_status()
+ self._config = response.json()
+ logger.debug(f'PMSH Configuration from Configbinding Service: {self._config}')
return self._config
+ except Exception as err:
+ raise Exception(f'Error retrieving configuration from CBS: {err}')
def _get_environment_variable(env_var_key):
diff --git a/components/pm-subscription-handler/pmsh_service/mod/network_function.py b/components/pm-subscription-handler/pmsh_service/mod/network_function.py
index 64f614af..9f21cc66 100755
--- a/components/pm-subscription-handler/pmsh_service/mod/network_function.py
+++ b/components/pm-subscription-handler/pmsh_service/mod/network_function.py
@@ -15,16 +15,13 @@
#
# SPDX-License-Identifier: Apache-2.0
# ============LICENSE_END=====================================================
-
from mod import pmsh_logging as logger, db
from mod.db_models import NetworkFunctionModel
class NetworkFunction:
def __init__(self, **kwargs):
- """
- Object representation of the NetworkFunction.
- """
+ """ Object representation of the NetworkFunction. """
self.nf_name = kwargs.get('nf_name')
self.orchestration_status = kwargs.get('orchestration_status')
@@ -36,8 +33,7 @@ class NetworkFunction:
return f'nf-name: {self.nf_name}, orchestration-status: {self.orchestration_status}'
def create(self):
- """ Creates a NetworkFunction database entry
- """
+ """ Creates a NetworkFunction database entry """
existing_nf = NetworkFunctionModel.query.filter(
NetworkFunctionModel.nf_name == self.nf_name).one_or_none()
@@ -71,3 +67,13 @@ class NetworkFunction:
list: NetworkFunctionModel objects else empty
"""
return NetworkFunctionModel.query.all()
+
+ @staticmethod
+ def delete(**kwargs):
+ """ Deletes a network function from the database """
+ nf_name = kwargs['nf_name']
+ NetworkFunctionModel.query.filter(
+ NetworkFunctionModel.nf_name == nf_name). \
+ delete(synchronize_session='evaluate')
+
+ db.session.commit()
diff --git a/components/pm-subscription-handler/pmsh_service/mod/pmsh_logging.py b/components/pm-subscription-handler/pmsh_service/mod/pmsh_logging.py
index f2d11d49..885644b4 100644
--- a/components/pm-subscription-handler/pmsh_service/mod/pmsh_logging.py
+++ b/components/pm-subscription-handler/pmsh_service/mod/pmsh_logging.py
@@ -1,5 +1,5 @@
# ============LICENSE_START===================================================
-# Copyright (C) 2019 Nordix Foundation.
+# Copyright (C) 2019-2020 Nordix Foundation.
# ============================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -15,11 +15,10 @@
#
# SPDX-License-Identifier: Apache-2.0
# ============LICENSE_END=====================================================
-
+import datetime
import logging as log
from logging.handlers import RotatingFileHandler
from os import makedirs
-import datetime
# These loggers will be overwritten with EELF logging when running in Docker
_AUDIT_LOGGER = log.getLogger("defaultlogger")
diff --git a/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py b/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py
index b665691d..4a77543b 100755
--- a/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py
+++ b/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py
@@ -15,12 +15,17 @@
#
# SPDX-License-Identifier: Apache-2.0
# ============LICENSE_END=====================================================
+import json
+import threading
import uuid
import requests
from requests.auth import HTTPBasicAuth
+from tenacity import retry, wait_fixed, retry_if_exception_type
import mod.pmsh_logging as logger
+from mod.subscription import Subscription, SubNfState, AdministrativeState
+from mod.network_function import NetworkFunction
class AppConfig:
@@ -168,3 +173,71 @@ class _MrSub(_DmaapMrClient):
except Exception as e:
logger.debug(e)
return topic_data
+
+ @staticmethod
+ def _handle_response(subscription_name, administrative_state, nf_name, response_message):
+ """
+ Handles the response from Policy, updating the DB
+
+ Args:
+ subscription_name (str): The subscription name
+ administrative_state (str): The administrative state of the subscription
+ nf_name (str): The network function name
+ response_message (str): The message in the response regarding the state (success|failed)
+ """
+ logger.debug(f'Response from MR: Sub: {subscription_name} for '
+ f'NF: {nf_name} received, updating the DB')
+ try:
+ sub_nf_status = subscription_nf_states[administrative_state][response_message].value
+ policy_response_handle_functions[administrative_state][response_message](
+ subscription_name=subscription_name, status=sub_nf_status, nf_name=nf_name)
+ except Exception as err:
+ raise Exception(f'Error changing nf_sub status in the DB: {err}')
+
+ @retry(wait=wait_fixed(5), retry=retry_if_exception_type(Exception))
+ def poll_policy_topic(self, subscription_name, app):
+ """
+ This method polls MR for response from policy. It checks whether the message is for the
+ relevant subscription and then handles the response
+
+ Args:
+ subscription_name (str): The subscription name
+ app (app): Needed to push context for the db
+ """
+ app.app_context().push()
+ administrative_state = Subscription.get(subscription_name).status
+ try:
+ response_data = self.get_from_topic('policy_response_consumer')
+ for data in response_data:
+ data = json.loads(data)
+ if data['status']['subscriptionName'] == subscription_name:
+ nf_name = data['status']['nfName']
+ response_message = data['status']['message']
+ self._handle_response(subscription_name, administrative_state,
+ nf_name, response_message)
+ threading.Timer(5, self.poll_policy_topic, [subscription_name, app]).start()
+ except Exception as err:
+ raise Exception(f'Error trying to poll MR: {err}')
+
+
+subscription_nf_states = {
+ AdministrativeState.LOCKED.value: {
+ 'success': SubNfState.CREATED,
+ 'failed': SubNfState.DELETE_FAILED
+ },
+ AdministrativeState.UNLOCKED.value: {
+ 'success': SubNfState.CREATED,
+ 'failed': SubNfState.CREATE_FAILED
+ }
+}
+
+policy_response_handle_functions = {
+ AdministrativeState.LOCKED.value: {
+ 'success': NetworkFunction.delete,
+ 'failed': Subscription.update_sub_nf_status
+ },
+ AdministrativeState.UNLOCKED.value: {
+ 'success': Subscription.update_sub_nf_status,
+ 'failed': Subscription.update_sub_nf_status
+ }
+}
diff --git a/components/pm-subscription-handler/pmsh_service/mod/subscription.py b/components/pm-subscription-handler/pmsh_service/mod/subscription.py
index 265d90b8..031609aa 100755
--- a/components/pm-subscription-handler/pmsh_service/mod/subscription.py
+++ b/components/pm-subscription-handler/pmsh_service/mod/subscription.py
@@ -16,10 +16,25 @@
# SPDX-License-Identifier: Apache-2.0
# ============LICENSE_END=====================================================
import re
+from enum import Enum
import mod.pmsh_logging as logger
from mod import db
from mod.db_models import SubscriptionModel, NfSubRelationalModel
+from tenacity import retry, retry_if_exception_type, wait_exponential, stop_after_attempt
+
+
+class SubNfState(Enum):
+ PENDING_CREATE = 'PENDING_CREATE'
+ CREATE_FAILED = 'CREATE_FAILED'
+ CREATED = 'CREATED'
+ PENDING_DELETE = 'PENDING_DELETE'
+ DELETE_FAILED = 'DELETE_FAILED'
+
+
+class AdministrativeState(Enum):
+ UNLOCKED = 'UNLOCKED'
+ LOCKED = 'LOCKED'
class Subscription:
@@ -42,7 +57,10 @@ class Subscription:
dict: the Subscription event to be published.
"""
clean_sub = {k: v for k, v in self.__dict__.items() if k != 'nfFilter'}
- clean_sub.update({'nfName': xnf_name, 'policyName': f'OP-{self.subscriptionName}'})
+ clean_sub.update({'nfName': xnf_name, 'policyName': f'OP-{self.subscriptionName}',
+ 'changeType': 'DELETE'
+ if self.administrativeState == AdministrativeState.LOCKED.value
+ else 'CREATE'})
return clean_sub
def create(self):
@@ -84,7 +102,8 @@ class Subscription:
NfSubRelationalModel.subscription_name == current_sub.subscription_name,
NfSubRelationalModel.nf_name == current_nf.nf_name).one_or_none()
if existing_entry is None:
- new_nf_sub = NfSubRelationalModel(current_sub.subscription_name, nf.nf_name)
+ new_nf_sub = NfSubRelationalModel(current_sub.subscription_name,
+ nf.nf_name, SubNfState.PENDING_CREATE.value)
new_nf_sub.nf = current_nf
logger.debug(current_nf)
current_sub.nfs.append(new_nf_sub)
@@ -114,6 +133,44 @@ class Subscription:
"""
return SubscriptionModel.query.all()
+ def update_subscription_status(self):
+ """ Updates the status of subscription in subscription table """
+ SubscriptionModel.query.filter(
+ SubscriptionModel.subscription_name == self.subscriptionName). \
+ update({SubscriptionModel.status: self.administrativeState},
+ synchronize_session='evaluate')
+
+ db.session.commit()
+
+ def delete_subscription(self):
+ """ Deletes a subscription from the database """
+ SubscriptionModel.query.filter(
+ SubscriptionModel.subscription_name == self.subscriptionName). \
+ delete(synchronize_session='evaluate')
+
+ db.session.commit()
+
+ @retry(wait=wait_exponential(multiplier=1, min=30, max=120), stop=stop_after_attempt(3),
+ retry=retry_if_exception_type(Exception))
+ def process_subscription(self, nfs, mr_pub):
+ action = 'Deactivate'
+ sub_nf_state = SubNfState.PENDING_DELETE.value
+ self.update_subscription_status()
+
+ if self.administrativeState == AdministrativeState.UNLOCKED.value:
+ action = 'Activate'
+ sub_nf_state = SubNfState.PENDING_CREATE.value
+
+ try:
+ for nf in nfs:
+ mr_pub.publish_subscription_event_data(self, nf.nf_name)
+ logger.debug(f'Publishing Event to {action} '
+ f'Sub: {self.subscriptionName} for the nf: {nf.nf_name}')
+ self.add_network_functions_to_subscription(nfs)
+ self.update_sub_nf_status(self.subscriptionName, sub_nf_state, nf.nf_name)
+ except Exception as err:
+ raise Exception(f'Error publishing activation event to MR: {err}')
+
@staticmethod
def get_all_nfs_subscription_relations():
""" Retrieves all network function to subscription relations
@@ -125,6 +182,22 @@ class Subscription:
return nf_per_subscriptions
+ @staticmethod
+ def update_sub_nf_status(subscription_name, status, nf_name):
+ """ Updates the status of the subscription for a particular nf
+
+ Args:
+ subscription_name (str): The subscription name
+ nf_name (str): The network function name
+ status (str): Status of the subscription
+ """
+ NfSubRelationalModel.query.filter(
+ NfSubRelationalModel.subscription_name == subscription_name,
+ NfSubRelationalModel.nf_name == nf_name). \
+ update({NfSubRelationalModel.nf_sub_status: status}, synchronize_session='evaluate')
+
+ db.session.commit()
+
class NetworkFunctionFilter:
def __init__(self, **kwargs):
diff --git a/components/pm-subscription-handler/pmsh_service/pmsh_service.py b/components/pm-subscription-handler/pmsh_service/pmsh_service.py
deleted file mode 100755
index c564a5e3..00000000
--- a/components/pm-subscription-handler/pmsh_service/pmsh_service.py
+++ /dev/null
@@ -1,49 +0,0 @@
-# ============LICENSE_START===================================================
-# Copyright (C) 2019-2020 Nordix Foundation.
-# ============================================================================
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-# SPDX-License-Identifier: Apache-2.0
-# ============LICENSE_END=====================================================
-import sys
-import time
-
-import mod.aai_client as aai_client
-import mod.pmsh_logging as logger
-from mod import db, create_app
-from mod.config_handler import ConfigHandler
-from mod.subscription import Subscription
-
-
-def main():
-
- try:
- app = create_app()
- app.app_context().push()
- db.create_all(app=app)
-
- config_handler = ConfigHandler()
- cbs_data = config_handler.get_config()
- subscription, xnfs = aai_client.get_pmsh_subscription_data(cbs_data)
- subscription.add_network_functions_to_subscription(xnfs)
- except Exception as e:
- logger.debug(f'Failed to Init PMSH: {e}')
- sys.exit(e)
-
- while True:
- logger.debug(Subscription.get_all_nfs_subscription_relations())
- time.sleep(5)
-
-
-if __name__ == '__main__':
- main()
diff --git a/components/pm-subscription-handler/pmsh_service/pmsh_service_main.py b/components/pm-subscription-handler/pmsh_service/pmsh_service_main.py
new file mode 100755
index 00000000..ab330320
--- /dev/null
+++ b/components/pm-subscription-handler/pmsh_service/pmsh_service_main.py
@@ -0,0 +1,94 @@
+# ============LICENSE_START===================================================
+# Copyright (C) 2019-2020 Nordix Foundation.
+# ============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# SPDX-License-Identifier: Apache-2.0
+# ============LICENSE_END=====================================================
+import sys
+import time
+import threading
+
+import mod.aai_client as aai
+import mod.pmsh_logging as logger
+from mod import db, create_app
+from mod.config_handler import ConfigHandler
+from mod.pmsh_utils import AppConfig
+from mod.subscription import Subscription, AdministrativeState
+
+
+def subscription_processor(config_handler, administrative_state, mr_pub, app):
+ """
+ Checks for changes of administrative state in config and proceeds to process
+ the Subscription if a change has occurred
+
+ Args:
+ config_handler (ConfigHandler): Configuration Handler used to get config
+ administrative_state (str): The administrative state
+ mr_pub (_MrPub): MR publisher
+ app (db): DB application
+ """
+ app.app_context().push()
+ config = config_handler.get_config()
+ sub, nfs = aai.get_pmsh_subscription_data(config)
+ new_administrative_state = config['policy']['subscription']['administrativeState']
+ polling_period = 30.0
+
+ try:
+ if administrative_state == new_administrative_state:
+ logger.debug('Administrative State did not change in the Config')
+ else:
+ sub.process_subscription(nfs, mr_pub)
+
+ except Exception as err:
+ logger.debug(f'Error occurred during the activation/deactivation process {err}')
+
+ threading.Timer(polling_period, subscription_processor,
+ [config_handler, new_administrative_state, mr_pub, app]).start()
+
+
+def main():
+
+ try:
+ config_handler = ConfigHandler()
+ config = config_handler.get_config()
+ app_conf = AppConfig(**config['config'])
+ app = create_app()
+ app.app_context().push()
+ db.create_all(app=app)
+ sub, nfs = aai.get_pmsh_subscription_data(config)
+ mr_pub = app_conf.get_mr_pub('policy_pm_publisher')
+ mr_sub = app_conf.get_mr_sub('policy_pm_subscriber')
+ initial_start_delay = 5.0
+
+ administrative_state = AdministrativeState.LOCKED.value
+ subscription_in_db = Subscription.get(sub.subscriptionName)
+ if subscription_in_db is not None:
+ administrative_state = subscription_in_db.status
+
+ threading.Timer(initial_start_delay, subscription_processor,
+ [config_handler, administrative_state, mr_pub, app]).start()
+
+ threading.Timer(20.0, mr_sub.poll_policy_topic, [sub.subscriptionName, app]).start()
+
+ except Exception as e:
+ logger.debug(f'Failed to Init PMSH: {e}')
+ sys.exit(e)
+
+ while True:
+ logger.debug(Subscription.get_all_nfs_subscription_relations())
+ time.sleep(5)
+
+
+if __name__ == '__main__':
+ main()