diff options
Diffstat (limited to 'components/pm-subscription-handler/pmsh_service')
12 files changed, 120 insertions, 796 deletions
diff --git a/components/pm-subscription-handler/pmsh_service/mod/__init__.py b/components/pm-subscription-handler/pmsh_service/mod/__init__.py index 1024417e..548670c3 100644 --- a/components/pm-subscription-handler/pmsh_service/mod/__init__.py +++ b/components/pm-subscription-handler/pmsh_service/mod/__init__.py @@ -1,5 +1,5 @@ # ============LICENSE_START=================================================== -# Copyright (C) 2019-2021 Nordix Foundation. +# Copyright (C) 2019-2022 Nordix Foundation. # ============================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -15,7 +15,7 @@ # # SPDX-License-Identifier: Apache-2.0 # ============LICENSE_END===================================================== -import logging as logging +import logging import os import pathlib import ssl @@ -26,6 +26,9 @@ from flask_sqlalchemy import SQLAlchemy from onaplogging import monkey from onaplogging.mdcContext import MDC from ruamel.yaml import YAML +import uuid +from functools import wraps +from os import getenv db = SQLAlchemy() basedir = os.path.abspath(os.path.dirname(__file__)) @@ -33,6 +36,22 @@ _connexion_app = None logger = logging.getLogger('onap_logger') +def mdc_handler(func): + @wraps(func) + def wrapper(*args, **kwargs): + request_id = str(uuid.uuid1()) + invocation_id = str(uuid.uuid1()) + MDC.put('ServiceName', getenv('HOSTNAME')) + MDC.put('RequestID', request_id) + MDC.put('InvocationID', invocation_id) + + kwargs['request_id'] = request_id + kwargs['invocation_id'] = invocation_id + return func(*args, **kwargs) + + return wrapper + + def _get_app(): global _connexion_app if not _connexion_app: diff --git a/components/pm-subscription-handler/pmsh_service/mod/aai_client.py b/components/pm-subscription-handler/pmsh_service/mod/aai_client.py index d2aeb0f0..437a18f0 100755 --- a/components/pm-subscription-handler/pmsh_service/mod/aai_client.py +++ b/components/pm-subscription-handler/pmsh_service/mod/aai_client.py @@ -1,5 +1,5 @@ # ============LICENSE_START=================================================== -# Copyright (C) 2019-2021 Nordix Foundation. +# Copyright (C) 2019-2022 Nordix Foundation. # ============================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -20,8 +20,7 @@ from os import environ import requests from requests.auth import HTTPBasicAuth import mod.network_function -import mod.pmsh_utils -from mod import logger +from mod import logger, mdc_handler def get_pmsh_nfs_from_aai(app_conf, nf_filter): @@ -68,8 +67,8 @@ def _get_all_aai_nf_data(app_conf): }""" params = {'format': 'simple', 'nodesOnly': 'true'} response = session.put(aai_named_query_endpoint, headers=headers, - auth=HTTPBasicAuth(app_conf.aaf_creds.get('aaf_id'), - app_conf.aaf_creds.get('aaf_pass')), + auth=HTTPBasicAuth(app_conf.aaf_id, + app_conf.aaf_pass), data=data, params=params, verify=(app_conf.ca_cert_path if app_conf.enable_tls else False), cert=((app_conf.cert_path, @@ -102,7 +101,7 @@ def _get_aai_service_url(): raise -@mod.pmsh_utils.mdc_handler +@mdc_handler def _get_aai_request_headers(**kwargs): return {'accept': 'application/json', 'content-type': 'application/json', @@ -160,28 +159,22 @@ def get_aai_model_data(app_conf, model_invariant_id, model_version_id, nf_name): Returns: json (dict): the sdnc_model json object. - - Raises: - Exception: if AAI model data cannot be retrieved. """ - try: - session = requests.Session() - aai_model_ver_endpoint = \ - f'{_get_aai_service_url()}/service-design-and-creation/models/model/' \ - f'{model_invariant_id}/model-vers/model-ver/{model_version_id}' - - logger.info(f'Fetching sdnc-model info for xNF: {nf_name} from AAI.') - headers = _get_aai_request_headers() - response = session.get(aai_model_ver_endpoint, headers=headers, - auth=HTTPBasicAuth(app_conf.aaf_creds.get('aaf_id'), - app_conf.aaf_creds.get('aaf_pass')), - verify=(app_conf.ca_cert_path if app_conf.enable_tls else False), - cert=((app_conf.cert_path, - app_conf.key_path) if app_conf.enable_tls else None)) - response.raise_for_status() - if response.ok: - data = json.loads(response.text) - logger.debug(f'Successfully fetched sdnc-model info from AAI: {data}') - return data - except Exception: - raise + session = requests.Session() + aai_model_ver_endpoint = \ + f'{_get_aai_service_url()}/service-design-and-creation/models/model/' \ + f'{model_invariant_id}/model-vers/model-ver/{model_version_id}' + + logger.info(f'Fetching sdnc-model info for xNF: {nf_name} from AAI.') + headers = _get_aai_request_headers() + response = session.get(aai_model_ver_endpoint, headers=headers, + auth=HTTPBasicAuth(app_conf.aaf_id, + app_conf.aaf_pass), + verify=(app_conf.ca_cert_path if app_conf.enable_tls else False), + cert=((app_conf.cert_path, + app_conf.key_path) if app_conf.enable_tls else None)) + response.raise_for_status() + if response.ok: + data = json.loads(response.text) + logger.debug(f'Successfully fetched sdnc-model info from AAI: {data}') + return data diff --git a/components/pm-subscription-handler/pmsh_service/mod/api/controller.py b/components/pm-subscription-handler/pmsh_service/mod/api/controller.py index 57d3e021..2e811c28 100755 --- a/components/pm-subscription-handler/pmsh_service/mod/api/controller.py +++ b/components/pm-subscription-handler/pmsh_service/mod/api/controller.py @@ -22,7 +22,7 @@ from mod.api.services import subscription_service, measurement_group_service from connexion import NoContent from mod.api.custom_exception import InvalidDataException, DuplicateDataException, \ DataConflictException -from mod.subscription import AdministrativeState +from mod.api.services.measurement_group_service import AdministrativeState def status(): diff --git a/components/pm-subscription-handler/pmsh_service/mod/api/services/measurement_group_service.py b/components/pm-subscription-handler/pmsh_service/mod/api/services/measurement_group_service.py index 07d1b642..145a492c 100644 --- a/components/pm-subscription-handler/pmsh_service/mod/api/services/measurement_group_service.py +++ b/components/pm-subscription-handler/pmsh_service/mod/api/services/measurement_group_service.py @@ -23,8 +23,40 @@ from mod import db, logger from mod.api.services import nf_service, subscription_service from mod.network_function import NetworkFunction from mod.pmsh_config import MRTopic, AppConfig -from mod.subscription import AdministrativeState, SubNfState from sqlalchemy import or_ +from enum import Enum + + +class MgNfState(Enum): + PENDING_CREATE = 'PENDING_CREATE' + CREATE_FAILED = 'CREATE_FAILED' + CREATED = 'CREATED' + PENDING_DELETE = 'PENDING_DELETE' + DELETE_FAILED = 'DELETE_FAILED' + DELETED = 'DELETED' + + +class AdministrativeState(Enum): + UNLOCKED = 'UNLOCKED' + LOCKING = 'LOCKING' + LOCKED = 'LOCKED' + FILTERING = 'FILTERING' + + +mg_nf_states = { + AdministrativeState.LOCKED.value: { + 'success': MgNfState.DELETED, + 'failed': MgNfState.DELETE_FAILED + }, + AdministrativeState.UNLOCKED.value: { + 'success': MgNfState.CREATED, + 'failed': MgNfState.CREATE_FAILED + }, + AdministrativeState.LOCKING.value: { + 'success': MgNfState.DELETED, + 'failed': MgNfState.DELETE_FAILED + } +} def save_measurement_group(measurement_group, subscription_name): @@ -229,7 +261,7 @@ def deactivate_nfs(sub_model, measurement_group, nf_meas_relations): logger.info(f'Saving measurement group to nf name, measure_grp_name: {nf.nf_name},' f'{measurement_group.measurement_group_name} with DELETE request') update_measurement_group_nf_status(measurement_group.measurement_group_name, - SubNfState.PENDING_DELETE.value, nf.nf_name) + MgNfState.PENDING_DELETE.value, nf.nf_name) try: network_function = NetworkFunction(**nf.serialize_meas_group_nfs()) logger.info(f'Publishing event for nf name, measure_grp_name: {nf.nf_name},' @@ -267,7 +299,7 @@ def activate_nfs(sub_model, measurement_group): apply_nf_status_to_measurement_group(nf.nf_name, measurement_group.measurement_group_name, - SubNfState.PENDING_CREATE.value) + MgNfState.PENDING_CREATE.value) db.session.commit() try: network_function = NetworkFunction(**nf.serialize_nf()) @@ -331,12 +363,12 @@ def filter_nf_to_meas_grp(nf_name, measurement_group_name, status): status (string): status of the network function for measurement group """ try: - if status == SubNfState.DELETED.value: + if status == MgNfState.DELETED.value: delete_nf_to_measurement_group(nf_name, measurement_group_name, - SubNfState.DELETED.value) - elif status == SubNfState.CREATED.value: + MgNfState.DELETED.value) + elif status == MgNfState.CREATED.value: update_measurement_group_nf_status(measurement_group_name, - SubNfState.CREATED.value, nf_name) + MgNfState.CREATED.value, nf_name) nf_measurement_group_rels = NfMeasureGroupRelationalModel.query.filter( NfMeasureGroupRelationalModel.measurement_grp_name == measurement_group_name, or_(NfMeasureGroupRelationalModel.nf_measure_grp_status.like('PENDING_%'), diff --git a/components/pm-subscription-handler/pmsh_service/mod/api/services/subscription_service.py b/components/pm-subscription-handler/pmsh_service/mod/api/services/subscription_service.py index 032fc4a0..99b72dfb 100644 --- a/components/pm-subscription-handler/pmsh_service/mod/api/services/subscription_service.py +++ b/components/pm-subscription-handler/pmsh_service/mod/api/services/subscription_service.py @@ -23,7 +23,7 @@ from mod.api.db_models import SubscriptionModel, NfSubRelationalModel, \ from mod.api.services import measurement_group_service, nf_service from mod.api.custom_exception import InvalidDataException, DuplicateDataException, \ DataConflictException -from mod.subscription import AdministrativeState, SubNfState +from mod.api.services.measurement_group_service import MgNfState, AdministrativeState from sqlalchemy.exc import IntegrityError from sqlalchemy.orm import joinedload @@ -155,7 +155,7 @@ def apply_measurement_grp_to_nfs(filtered_nfs, unlocked_mgs): f'{measurement_group.measurement_group_name}') measurement_group_service.apply_nf_status_to_measurement_group( nf.nf_name, measurement_group.measurement_group_name, - SubNfState.PENDING_CREATE.value) + MgNfState.PENDING_CREATE.value) def check_missing_data(subscription): diff --git a/components/pm-subscription-handler/pmsh_service/mod/exit_handler.py b/components/pm-subscription-handler/pmsh_service/mod/exit_handler.py index 16223790..69f2408f 100755 --- a/components/pm-subscription-handler/pmsh_service/mod/exit_handler.py +++ b/components/pm-subscription-handler/pmsh_service/mod/exit_handler.py @@ -18,7 +18,7 @@ from mod import logger, db from mod.api.services import subscription_service, measurement_group_service -from mod.subscription import AdministrativeState +from mod.api.services.measurement_group_service import AdministrativeState class ExitHandler: diff --git a/components/pm-subscription-handler/pmsh_service/mod/pmsh_config.py b/components/pm-subscription-handler/pmsh_service/mod/pmsh_config.py index 9c282ab7..295fc379 100644 --- a/components/pm-subscription-handler/pmsh_service/mod/pmsh_config.py +++ b/components/pm-subscription-handler/pmsh_service/mod/pmsh_config.py @@ -1,5 +1,5 @@ # ============LICENSE_START=================================================== -# Copyright (C) 2021 Nordix Foundation. +# Copyright (C) 2021-2022 Nordix Foundation. # ============================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -28,8 +28,7 @@ from onap_dcae_cbs_docker_client.client import get_all from requests.auth import HTTPBasicAuth from tenacity import wait_fixed, stop_after_attempt, retry, retry_if_exception_type -from mod import logger -from mod.pmsh_utils import mdc_handler +from mod import logger, mdc_handler @unique @@ -67,8 +66,6 @@ class AppConfig(metaclass=MetaSingleton): self.aaf_pass = app_config['config'].get('aaf_password') self.streams_publishes = app_config['config'].get('streams_publishes') self.streams_subscribes = app_config['config'].get('streams_subscribes') - # TODO: aaf_creds variable should be removed on code cleanup - self.aaf_creds = {'aaf_id': self.aaf_id, 'aaf_pass': self.aaf_pass} @staticmethod def get_instance(): @@ -94,7 +91,7 @@ class AppConfig(metaclass=MetaSingleton): return config except Exception as e: logger.error(f'Failed to get config from CBS: {e}', exc_info=True) - raise ValueError(e) + raise ValueError(e) from e @mdc_handler def publish_to_topic(self, mr_topic, event_json, **kwargs): @@ -104,22 +101,16 @@ class AppConfig(metaclass=MetaSingleton): Args: mr_topic (enum) : Message Router topic to publish. event_json (dict): the json data to be published. - - Raises: - Exception: if post request fails. """ - try: - session = requests.Session() - topic_url = self.streams_publishes[mr_topic].get('dmaap_info').get('topic_url') - headers = {'content-type': 'application/json', 'x-transactionid': kwargs['request_id'], - 'InvocationID': kwargs['invocation_id'], 'RequestID': kwargs['request_id']} - logger.info(f'Publishing event to MR topic: {topic_url}') - response = session.post(topic_url, headers=headers, - auth=HTTPBasicAuth(self.aaf_id, self.aaf_pass), json=event_json, - verify=(self.ca_cert_path if self.enable_tls else False)) - response.raise_for_status() - except Exception as e: - raise e + session = requests.Session() + topic_url = self.streams_publishes[mr_topic].get('dmaap_info').get('topic_url') + headers = {'content-type': 'application/json', 'x-transactionid': kwargs['request_id'], + 'InvocationID': kwargs['invocation_id'], 'RequestID': kwargs['request_id']} + logger.info(f'Publishing event to MR topic: {topic_url}') + response = session.post(topic_url, headers=headers, + auth=HTTPBasicAuth(self.aaf_id, self.aaf_pass), json=event_json, + verify=(self.ca_cert_path if self.enable_tls else False)) + response.raise_for_status() @mdc_handler def get_from_topic(self, mr_topic, consumer_id, consumer_group='dcae_pmsh_cg', timeout=5000, @@ -149,7 +140,6 @@ class AppConfig(metaclass=MetaSingleton): verify=(self.ca_cert_path if self.enable_tls else False)) if response.status_code == 503: logger.error(f'MR Service is unavailable at present: {response.content}') - pass response.raise_for_status() if response.ok: return response.json() diff --git a/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py b/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py deleted file mode 100755 index d1790bbb..00000000 --- a/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py +++ /dev/null @@ -1,313 +0,0 @@ -# ============LICENSE_START=================================================== -# Copyright (C) 2019-2021 Nordix Foundation. -# ============================================================================ -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -# SPDX-License-Identifier: Apache-2.0 -# ============LICENSE_END===================================================== -import json -import os -import uuid -from functools import wraps -from json import JSONDecodeError -from os import getenv -from threading import Timer - -import requests -from jsonschema import validate, ValidationError -from onap_dcae_cbs_docker_client.client import get_all -from onaplogging.mdcContext import MDC -from requests.auth import HTTPBasicAuth -from tenacity import wait_fixed, stop_after_attempt, retry, retry_if_exception_type - -from mod import logger -from mod.subscription import Subscription - - -def mdc_handler(func): - @wraps(func) - def wrapper(*args, **kwargs): - request_id = str(uuid.uuid1()) - invocation_id = str(uuid.uuid1()) - MDC.put('ServiceName', getenv('HOSTNAME')) - MDC.put('RequestID', request_id) - MDC.put('InvocationID', invocation_id) - - kwargs['request_id'] = request_id - kwargs['invocation_id'] = invocation_id - return func(*args, **kwargs) - - return wrapper - - -def _load_sub_schema_from_file(): - try: - with open(os.path.join(os.path.dirname(__file__), 'sub_schema.json')) as sub: - return json.load(sub) - except OSError as err: - logger.error(f'Failed to read sub schema file: {err}', exc_info=True) - except JSONDecodeError as json_err: - logger.error(f'sub schema file is not a valid JSON file: {json_err}', exc_info=True) - - -class AppConfig: - INSTANCE = None - - def __init__(self): - conf = self._get_pmsh_config() - self.aaf_creds = {'aaf_id': conf['config'].get('aaf_identity'), - 'aaf_pass': conf['config'].get('aaf_password')} - self.enable_tls = conf['config'].get('enable_tls') - self.ca_cert_path = conf['config'].get('ca_cert_path') - self.cert_path = conf['config'].get('cert_path') - self.key_path = conf['config'].get('key_path') - self.streams_subscribes = conf['config'].get('streams_subscribes') - self.streams_publishes = conf['config'].get('streams_publishes') - self.operational_policy_name = conf['config'].get('operational_policy_name') - self.control_loop_name = conf['config'].get('control_loop_name') - self.sub_schema = _load_sub_schema_from_file() - self.subscription = Subscription(self.control_loop_name, - self.operational_policy_name, - **conf['config']['pmsh_policy']['subscription']) - self.nf_filter = None - - def __new__(cls, *args, **kwargs): - if AppConfig.INSTANCE is None: - AppConfig.INSTANCE = super().__new__(cls, *args, **kwargs) - return AppConfig.INSTANCE - - @mdc_handler - @retry(wait=wait_fixed(5), stop=stop_after_attempt(5), - retry=retry_if_exception_type(ValueError)) - def _get_pmsh_config(self, **kwargs): - """ Retrieves PMSH's configuration from Config binding service. If a non-2xx response - is received, it retries after 2 seconds for 5 times before raising an exception. - - Returns: - dict: Dictionary representation of the the service configuration - - Raises: - Exception: If any error occurred pulling configuration from Config binding service. - """ - try: - logger.info('Attempting to fetch PMSH Configuration from CBS.') - config = get_all() - logger.info(f'Successfully fetched PMSH config from CBS: {config}') - return config - except Exception as e: - logger.error(f'Failed to get config from CBS: {e}', exc_info=True) - raise ValueError(e) - - def validate_sub_schema(self): - """ - Validates schema of PMSH subscription - - Raises: - ValidationError: If the PMSH subscription schema is invalid - """ - sub_data = self.subscription.__dict__ - validate(instance=sub_data, schema=self.sub_schema) - nf_filter = sub_data["nfFilter"] - if not [filter_name for filter_name, val in nf_filter.items() if len(val) > 0]: - raise ValidationError("At least one filter within nfFilter must not be empty") - logger.debug("Subscription schema is valid.") - - def refresh_config(self): - """ - Update the relevant attributes of the AppConfig object. - - Raises: - Exception: if cbs request fails. - """ - try: - app_conf = self._get_pmsh_config() - self.operational_policy_name = app_conf['config'].get('operational_policy_name') - self.control_loop_name = app_conf['config'].get('control_loop_name') - self.subscription = Subscription(self.control_loop_name, - self.operational_policy_name, - **app_conf['config']['pmsh_policy']['subscription']) - logger.info("AppConfig data has been refreshed") - except Exception: - logger.error('Failed to refresh PMSH AppConfig') - raise - - def get_mr_sub(self, sub_name): - """ - Returns the MrSub object requested. - - Args: - sub_name: the key of the subscriber object. - - Returns: - MrSub: an Instance of an `MrSub` <MrSub> Object. - - Raises: - KeyError: if the sub_name is not found. - """ - try: - return _MrSub(sub_name, self.aaf_creds, self.ca_cert_path, - self.enable_tls, self.cert_params, **self.streams_subscribes[sub_name]) - except KeyError as e: - logger.error(f'Failed to get MrSub {sub_name}: {e}', exc_info=True) - raise - - def get_mr_pub(self, pub_name): - """ - Returns the MrPub object requested. - - Args: - pub_name: the key of the publisher object. - - Returns: - MrPub: an Instance of an `MrPub` <MrPub> Object. - - Raises: - KeyError: if the sub_name is not found. - """ - try: - return _MrPub(pub_name, self.aaf_creds, self.ca_cert_path, - self.enable_tls, self.cert_params, **self.streams_publishes[pub_name]) - except KeyError as e: - logger.error(f'Failed to get MrPub {pub_name}: {e}', exc_info=True) - raise - - @property - def cert_params(self): - """ - Returns the tls artifact paths. - - Returns: - cert_path, key_path (tuple): the path to tls cert and key. - """ - return self.cert_path, self.key_path - - -class _DmaapMrClient: - def __init__(self, aaf_creds, ca_cert_path, enable_tls, cert_params, **kwargs): - """ - A DMaaP Message Router utility class. - Sub classes should be invoked via the AppConfig.get_mr_{pub|sub} only. - Args: - aaf_creds (dict): a dict of aaf secure credentials. - ca_cert_path (str): path to the ca certificate. - enable_tls (bool): TLS if True, else False - cert_params (tuple): client side (cert, key) tuple. - **kwargs: a dict of streams_{subscribes|publishes} data. - """ - self.topic_url = kwargs.get('dmaap_info').get('topic_url') - self.aaf_id = aaf_creds.get('aaf_id') - self.aaf_pass = aaf_creds.get('aaf_pass') - self.ca_cert_path = ca_cert_path - self.enable_tls = enable_tls - self.cert_params = cert_params - - -class _MrPub(_DmaapMrClient): - def __init__(self, pub_name, aaf_creds, ca_cert_path, enable_tls, cert_params, **kwargs): - self.pub_name = pub_name - super().__init__(aaf_creds, ca_cert_path, enable_tls, cert_params, **kwargs) - - @mdc_handler - def publish_to_topic(self, event_json, **kwargs): - """ - Publish the event to the DMaaP Message Router topic. - - Args: - event_json (dict): the json data to be published. - - Raises: - Exception: if post request fails. - """ - try: - session = requests.Session() - headers = {'content-type': 'application/json', 'x-transactionid': kwargs['request_id'], - 'InvocationID': kwargs['invocation_id'], - 'RequestID': kwargs['request_id'] - } - logger.info(f'Publishing event to {self.topic_url}') - response = session.post(self.topic_url, headers=headers, - auth=HTTPBasicAuth(self.aaf_id, self.aaf_pass), json=event_json, - verify=(self.ca_cert_path if self.enable_tls else False)) - response.raise_for_status() - except Exception as e: - raise e - - def publish_subscription_event_data(self, subscription, nf): - """ - Update the Subscription dict with xnf and policy name then publish to DMaaP MR topic. - - Args: - subscription (Subscription): the `Subscription` <Subscription> object. - nf (NetworkFunction): the NetworkFunction to include in the event. - """ - try: - subscription_event = subscription.prepare_subscription_event(nf) - logger.debug(f'Subscription event: {subscription_event}') - self.publish_to_topic(subscription_event) - except Exception as e: - logger.error(f'Failed to publish to topic {self.topic_url}: {e}', exc_info=True) - raise e - - -class _MrSub(_DmaapMrClient): - def __init__(self, sub_name, aaf_creds, ca_cert_path, enable_tls, cert_params, **kwargs): - self.sub_name = sub_name - super().__init__(aaf_creds, ca_cert_path, enable_tls, cert_params, **kwargs) - - @mdc_handler - def get_from_topic(self, consumer_id, consumer_group='dcae_pmsh_cg', timeout=1000, **kwargs): - """ - Returns the json data from the MrTopic. - - Args: - consumer_id (str): Within your subscribers group, a name that uniquely - identifies your subscribers process. - consumer_group (str): A name that uniquely identifies your subscribers. - timeout (int): The request timeout value in mSec. - - Returns: - list[str]: the json response from DMaaP Message Router topic. - """ - try: - session = requests.Session() - headers = {'accept': 'application/json', 'content-type': 'application/json', - 'InvocationID': kwargs['invocation_id'], - 'RequestID': kwargs['request_id']} - logger.info(f'Fetching messages from MR topic: {self.topic_url}') - response = session.get(f'{self.topic_url}/{consumer_group}/{consumer_id}' - f'?timeout={timeout}', - auth=HTTPBasicAuth(self.aaf_id, self.aaf_pass), headers=headers, - verify=(self.ca_cert_path if self.enable_tls else False)) - if response.status_code == 503: - logger.error(f'MR Service is unavailable at present: {response.content}') - pass - response.raise_for_status() - if response.ok: - return response.json() - except Exception as e: - logger.error(f'Failed to fetch message from MR: {e}', exc_info=True) - raise - - -class PeriodicTask(Timer): - """ - See :class:`Timer`. - """ - - def run(self): - self.function(*self.args, **self.kwargs) - while not self.finished.wait(self.interval): - try: - self.function(*self.args, **self.kwargs) - except Exception as e: - logger.error(f'Exception in thread: {self.name}: {e}', exc_info=True) diff --git a/components/pm-subscription-handler/pmsh_service/mod/policy_response_handler.py b/components/pm-subscription-handler/pmsh_service/mod/policy_response_handler.py index 5065ce8a..cfcda091 100644 --- a/components/pm-subscription-handler/pmsh_service/mod/policy_response_handler.py +++ b/components/pm-subscription-handler/pmsh_service/mod/policy_response_handler.py @@ -16,9 +16,11 @@ # SPDX-License-Identifier: Apache-2.0 # ============LICENSE_END===================================================== import json + +from mod.api.services.measurement_group_service import mg_nf_states, \ + AdministrativeState, MgNfState from mod.pmsh_config import MRTopic, AppConfig from mod import logger -from mod.subscription import AdministrativeState, subscription_nf_states, SubNfState from mod.api.db_models import MeasurementGroupModel, NfMeasureGroupRelationalModel from mod.api.services import measurement_group_service @@ -96,12 +98,12 @@ class PolicyResponseHandler: NfMeasureGroupRelationalModel.measurement_grp_name == measurement_group_name, NfMeasureGroupRelationalModel.nf_name == nf_name ).one_or_none() - if nf_msg_rel.nf_measure_grp_status == SubNfState.PENDING_DELETE.value: + if nf_msg_rel.nf_measure_grp_status == MgNfState.PENDING_DELETE.value: administrative_state = AdministrativeState.LOCKING.value - elif nf_msg_rel.nf_measure_grp_status == SubNfState.PENDING_CREATE.value: + elif nf_msg_rel.nf_measure_grp_status == MgNfState.PENDING_CREATE.value: administrative_state = AdministrativeState.UNLOCKED.value - nf_measure_grp_status = (subscription_nf_states[administrative_state] + nf_measure_grp_status = (mg_nf_states[administrative_state] [response_message]).value policy_response_handle_functions[administrative_state][response_message]( measurement_group_name=measurement_group_name, status=nf_measure_grp_status, diff --git a/components/pm-subscription-handler/pmsh_service/mod/subscription.py b/components/pm-subscription-handler/pmsh_service/mod/subscription.py deleted file mode 100755 index ddb6e1f5..00000000 --- a/components/pm-subscription-handler/pmsh_service/mod/subscription.py +++ /dev/null @@ -1,295 +0,0 @@ -# ============LICENSE_START=================================================== -# Copyright (C) 2019-2022 Nordix Foundation. -# ============================================================================ -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -# SPDX-License-Identifier: Apache-2.0 -# ============LICENSE_END===================================================== -from enum import Enum - -from mod import db, logger -from mod.api.db_models import SubscriptionModel, NfSubRelationalModel, NetworkFunctionModel - - -class SubNfState(Enum): - PENDING_CREATE = 'PENDING_CREATE' - CREATE_FAILED = 'CREATE_FAILED' - CREATED = 'CREATED' - PENDING_DELETE = 'PENDING_DELETE' - DELETE_FAILED = 'DELETE_FAILED' - DELETED = 'DELETED' - - -class AdministrativeState(Enum): - UNLOCKED = 'UNLOCKED' - LOCKING = 'LOCKING' - LOCKED = 'LOCKED' - FILTERING = 'FILTERING' - - -subscription_nf_states = { - AdministrativeState.LOCKED.value: { - 'success': SubNfState.DELETED, - 'failed': SubNfState.DELETE_FAILED - }, - AdministrativeState.UNLOCKED.value: { - 'success': SubNfState.CREATED, - 'failed': SubNfState.CREATE_FAILED - }, - AdministrativeState.LOCKING.value: { - 'success': SubNfState.DELETED, - 'failed': SubNfState.DELETE_FAILED - } -} - - -def _get_nf_objects(nf_sub_relationships): - nfs = [] - for nf_sub_entry in nf_sub_relationships: - nf_model_object = NetworkFunctionModel.query.filter( - NetworkFunctionModel.nf_name == nf_sub_entry.nf_name).one_or_none() - nfs.append(nf_model_object.to_nf()) - return nfs - - -class Subscription: - def __init__(self, control_loop_name, operational_policy_name, **kwargs): - self.subscriptionName = kwargs.get('subscriptionName') - self.administrativeState = kwargs.get('administrativeState') - self.fileBasedGP = kwargs.get('fileBasedGP') - self.fileLocation = kwargs.get('fileLocation') - self.nfFilter = kwargs.get('nfFilter') - self.measurementGroups = kwargs.get('measurementGroups') - self.control_loop_name = control_loop_name - self.operational_policy_name = operational_policy_name - self.create() - - def update_sub_params(self, admin_state, file_based_gp, file_location, meas_groups): - self.administrativeState = admin_state - self.fileBasedGP = file_based_gp - self.fileLocation = file_location - self.measurementGroups = meas_groups - - def create(self): - """ Creates a subscription database entry - - Returns: - Subscription object - """ - try: - existing_subscription = (SubscriptionModel.query.filter( - SubscriptionModel.subscription_name == self.subscriptionName).one_or_none()) - if existing_subscription is None: - new_subscription = \ - SubscriptionModel(subscription_name=self.subscriptionName, - operational_policy_name=self.operational_policy_name, - control_loop_name=self.control_loop_name, - status=AdministrativeState.LOCKED.value) - - db.session.add(new_subscription) - db.session.commit() - return new_subscription - else: - logger.debug(f'Subscription {self.subscriptionName} already exists,' - f' returning this subscription..') - return existing_subscription - except Exception as e: - logger.error(f'Failed to create subscription {self.subscriptionName} in the DB: {e}', - exc_info=True) - finally: - db.session.remove() - - def update_subscription_status(self): - """ Updates the status of subscription in subscription table """ - try: - SubscriptionModel.query.filter( - SubscriptionModel.subscription_name == self.subscriptionName) \ - .update({SubscriptionModel.status: self.administrativeState}, - synchronize_session='evaluate') - - db.session.commit() - except Exception as e: - logger.error(f'Failed to update status of subscription: {self.subscriptionName}: {e}', - exc_info=True) - finally: - db.session.remove() - - def prepare_subscription_event(self, nf): - """Prepare the sub event for publishing - - Args: - nf (NetworkFunction): the AAI nf. - - Returns: - dict: the Subscription event to be published. - """ - try: - clean_sub = \ - {k: v for k, v in self.__dict__.items() if - (k != 'nfFilter' and k != 'control_loop_name' and k != 'operational_policy_name')} - if self.administrativeState == AdministrativeState.LOCKING.value: - change_type = 'DELETE' - else: - change_type = 'CREATE' - - sub_event = { - 'nfName': nf.nf_name, - 'ipAddress': nf.ipv4_address if nf.ipv6_address in (None, '') else nf.ipv6_address, - 'blueprintName': nf.sdnc_model_name, - 'blueprintVersion': nf.sdnc_model_version, - 'operationalPolicyName': self.operational_policy_name, - 'changeType': change_type, - 'controlLoopName': self.control_loop_name, - 'subscription': clean_sub} - return sub_event - except Exception as e: - logger.error(f'Failed to prep Sub event for xNF {nf.nf_name}: {e}', exc_info=True) - raise - - def add_network_function_to_subscription(self, nf, sub_model): - """ Associates a network function to a Subscription - - Args: - sub_model(SubscriptionModel): The SubscriptionModel from the DB. - nf(NetworkFunction): A NetworkFunction object. - """ - try: - current_nf = nf.create() - existing_entry = NfSubRelationalModel.query.filter( - NfSubRelationalModel.subscription_name == self.subscriptionName, - NfSubRelationalModel.nf_name == current_nf.nf_name).one_or_none() - if existing_entry is None: - new_nf_sub = NfSubRelationalModel(self.subscriptionName, - nf.nf_name, SubNfState.PENDING_CREATE.value) - sub_model.nfs.append(new_nf_sub) - db.session.add(sub_model) - db.session.commit() - logger.info(f'Network function {current_nf.nf_name} added to Subscription ' - f'{self.subscriptionName}') - except Exception as e: - logger.error(f'Failed to add nf {nf.nf_name} to subscription ' - f'{self.subscriptionName}: {e}', exc_info=True) - logger.debug(f'Subscription {self.subscriptionName} now contains these XNFs:' - f'{[nf.nf_name for nf.nf_name in self.get_network_functions()]}') - - def get(self): - """ Retrieves a SubscriptionModel object - - Returns: - SubscriptionModel object else None - """ - sub_model = SubscriptionModel.query.filter( - SubscriptionModel.subscription_name == self.subscriptionName).one_or_none() - return sub_model - - def get_local_sub_admin_state(self): - """ Retrieves the subscription admin state - - Returns: - str: The admin state of the SubscriptionModel - """ - sub_model = SubscriptionModel.query.filter( - SubscriptionModel.subscription_name == self.subscriptionName).one_or_none() - db.session.remove() - return sub_model.status - - def create_subscription_on_nfs(self, nfs, mr_pub): - """ Publishes an event to create a Subscription on an nf - - Args: - nfs(list[NetworkFunction]): A list of NetworkFunction Objects. - mr_pub (_MrPub): MR publisher - """ - try: - existing_nfs = self.get_network_functions() - sub_model = self.get() - for nf in [new_nf for new_nf in nfs if new_nf not in existing_nfs]: - logger.info(f'Publishing event to create ' - f'Sub: {self.subscriptionName} on nf: {nf.nf_name}') - mr_pub.publish_subscription_event_data(self, nf) - self.add_network_function_to_subscription(nf, sub_model) - self.update_sub_nf_status(self.subscriptionName, SubNfState.PENDING_CREATE.value, - nf.nf_name) - except Exception as err: - raise Exception(f'Error publishing create event to MR: {err}') - - def delete_subscription_from_nfs(self, nfs, mr_pub): - """ Publishes an event to delete a Subscription from an nf - - Args: - nfs(list[NetworkFunction]): A list of NetworkFunction Objects. - mr_pub (_MrPub): MR publisher - """ - try: - for nf in nfs: - logger.debug(f'Publishing Event to delete ' - f'Sub: {self.subscriptionName} from the nf: {nf.nf_name}') - mr_pub.publish_subscription_event_data(self, nf) - self.update_sub_nf_status(self.subscriptionName, - SubNfState.PENDING_DELETE.value, - nf.nf_name) - except Exception as err: - raise Exception(f'Error publishing delete event to MR: {err}') - - @staticmethod - def get_all_nfs_subscription_relations(): - """ Retrieves all network function to subscription relations - - Returns: - list(NfSubRelationalModel): NetworkFunctions per Subscription list else empty - """ - nf_per_subscriptions = NfSubRelationalModel.query.all() - db.session.remove() - return nf_per_subscriptions - - @staticmethod - def update_sub_nf_status(subscription_name, status, nf_name): - """ Updates the status of the subscription for a particular nf - - Args: - subscription_name (str): The subscription name - nf_name (str): The network function name - status (str): Status of the subscription - """ - try: - NfSubRelationalModel.query.filter( - NfSubRelationalModel.subscription_name == subscription_name, - NfSubRelationalModel.nf_name == nf_name). \ - update({NfSubRelationalModel.nf_sub_status: status}, synchronize_session='evaluate') - db.session.commit() - except Exception as e: - logger.error(f'Failed to update status of nf: {nf_name} for subscription: ' - f'{subscription_name}: {e}', exc_info=True) - - def get_network_functions(self): - nf_sub_relationships = NfSubRelationalModel.query.filter( - NfSubRelationalModel.subscription_name == self.subscriptionName) - nfs = _get_nf_objects(nf_sub_relationships) - db.session.remove() - return nfs - - def get_delete_failed_nfs(self): - nf_sub_relationships = NfSubRelationalModel.query.filter( - NfSubRelationalModel.subscription_name == self.subscriptionName, - NfSubRelationalModel.nf_sub_status == SubNfState.DELETE_FAILED.value) - nfs = _get_nf_objects(nf_sub_relationships) - db.session.remove() - return nfs - - def get_delete_pending_nfs(self): - nf_sub_relationships = NfSubRelationalModel.query.filter( - NfSubRelationalModel.subscription_name == self.subscriptionName, - NfSubRelationalModel.nf_sub_status == SubNfState.PENDING_DELETE.value) - nfs = _get_nf_objects(nf_sub_relationships) - db.session.remove() - return nfs diff --git a/components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py b/components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py deleted file mode 100644 index 5fbb9a6c..00000000 --- a/components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py +++ /dev/null @@ -1,118 +0,0 @@ -# ============LICENSE_START=================================================== -# Copyright (C) 2020-2021 Nordix Foundation. -# ============================================================================ -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -# SPDX-License-Identifier: Apache-2.0 -# ============LICENSE_END===================================================== -from jsonschema import ValidationError - -from mod import logger, aai_client -from mod.network_function import NetworkFunctionFilter -from mod.subscription import AdministrativeState - - -class SubscriptionHandler: - def __init__(self, mr_pub, aai_sub, app, app_conf): - self.mr_pub = mr_pub - self.aai_sub = aai_sub - self.app = app - self.app_conf = app_conf - self.aai_event_thread = None - - def execute(self): - """ - Checks for changes of administrative state in config and proceeds to process - the Subscription if a change has occurred - """ - self.app.app_context().push() - try: - local_admin_state = self.app_conf.subscription.get_local_sub_admin_state() - if local_admin_state == AdministrativeState.LOCKING.value: - self._check_for_failed_nfs() - else: - self.app_conf.refresh_config() - self.app_conf.validate_sub_schema() - new_administrative_state = self.app_conf.subscription.administrativeState - if local_admin_state == new_administrative_state: - logger.info(f'Administrative State did not change in the app config: ' - f'{new_administrative_state}') - else: - self._check_state_change(local_admin_state, new_administrative_state) - except (ValidationError, TypeError) as err: - logger.error(f'Error occurred during validation of subscription schema {err}', - exc_info=True) - except Exception as err: - logger.error(f'Error occurred during the activation/deactivation process {err}', - exc_info=True) - - def _check_state_change(self, local_admin_state, new_administrative_state): - if new_administrative_state == AdministrativeState.UNLOCKED.value: - logger.info(f'Administrative State has changed from {local_admin_state} ' - f'to {new_administrative_state}.') - self._activate(new_administrative_state) - elif new_administrative_state == AdministrativeState.LOCKED.value: - logger.info(f'Administrative State has changed from {local_admin_state} ' - f'to {new_administrative_state}.') - self._deactivate() - else: - raise Exception(f'Invalid AdministrativeState: {new_administrative_state}') - - def _activate(self, new_administrative_state): - if not self.app_conf.nf_filter: - self.app_conf.nf_filter = NetworkFunctionFilter(**self.app_conf.subscription.nfFilter) - self.app_conf.subscription.update_sub_params(new_administrative_state, - self.app_conf.subscription.fileBasedGP, - self.app_conf.subscription.fileLocation, - self.app_conf.subscription.measurementGroups) - nfs_in_aai = aai_client.get_pmsh_nfs_from_aai(self.app_conf, self.app_conf.nf_filter) - self.app_conf.subscription.create_subscription_on_nfs(nfs_in_aai, self.mr_pub) - self.app_conf.subscription.update_subscription_status() - - def _deactivate(self): - nfs = self.app_conf.subscription.get_network_functions() - if nfs: - self.stop_aai_event_thread() - self.app_conf.subscription.administrativeState = AdministrativeState.LOCKING.value - logger.info('Subscription is now LOCKING/DEACTIVATING.') - self.app_conf.subscription.delete_subscription_from_nfs(nfs, self.mr_pub) - self.app_conf.subscription.update_subscription_status() - - def stop_aai_event_thread(self): - if self.aai_event_thread is not None: - self.aai_event_thread.cancel() - self.aai_event_thread = None - logger.info('Stopping polling for NFs events on AAI-EVENT topic in MR.') - - def _check_for_failed_nfs(self): - logger.info('Checking for DELETE_FAILED NFs before LOCKING Subscription.') - del_failed_nfs = self.app_conf.subscription.get_delete_failed_nfs() - if del_failed_nfs or self.app_conf.subscription.get_delete_pending_nfs(): - for nf in del_failed_nfs: - nf_model = nf.get(nf.nf_name) - if nf_model.retry_count < 3: - logger.info(f'Retry deletion of subscription ' - f'{self.app_conf.subscription.subscriptionName} ' - f'from NF: {nf.nf_name}') - self.app_conf.subscription.delete_subscription_from_nfs([nf], self.mr_pub) - nf.increment_retry_count() - else: - logger.error(f'Failed to delete the subscription ' - f'{self.app_conf.subscription.subscriptionName} ' - f'from NF: {nf.nf_name} after {nf_model.retry_count} ' - f'attempts. Removing NF from DB') - nf.delete(nf_name=nf.nf_name) - else: - logger.info('Proceeding to LOCKED adminState.') - self.app_conf.subscription.administrativeState = AdministrativeState.LOCKED.value - self.app_conf.subscription.update_subscription_status() diff --git a/components/pm-subscription-handler/pmsh_service/pmsh_service_main.py b/components/pm-subscription-handler/pmsh_service/pmsh_service_main.py index 1d8b0b34..dbc58e51 100755 --- a/components/pm-subscription-handler/pmsh_service/pmsh_service_main.py +++ b/components/pm-subscription-handler/pmsh_service/pmsh_service_main.py @@ -21,8 +21,22 @@ from mod.aai_event_handler import AAIEventHandler from mod import db, create_app, launch_api_server, logger from mod.exit_handler import ExitHandler from mod.pmsh_config import AppConfig -from mod.pmsh_utils import PeriodicTask from mod.policy_response_handler import PolicyResponseHandler +from threading import Timer + + +class PeriodicTask(Timer): + """ + See :class:`Timer`. + """ + + def run(self): + self.function(*self.args, **self.kwargs) + while not self.finished.wait(self.interval): + try: + self.function(*self.args, **self.kwargs) + except Exception as e: + logger.error(f'Exception in thread: {self.name}: {e}', exc_info=True) def main(): |