summaryrefslogtreecommitdiffstats
path: root/components/pm-subscription-handler/pmsh_service/mod
diff options
context:
space:
mode:
authorraviteja.karumuri <raviteja.karumuri@est.tech>2022-03-02 17:59:27 +0000
committerraviteja.karumuri <raviteja.karumuri@est.tech>2022-03-03 16:54:04 +0000
commit5a2c43f2add2c6d4af8331a40b174684eac11b34 (patch)
tree4e76bac8e33d80b1e0e9af1fdf2b57fe6a9a7bf2 /components/pm-subscription-handler/pmsh_service/mod
parent5f69c24ad78121a2840b5299583791e557f8b535 (diff)
[PMSH] Cleaning up old App Config, subscription handler and it's subsequent calls
Issue-ID: DCAEGEN2-3085 Signed-off-by: Raviteja, Karumuri <raviteja.karumuri@est.tech> Change-Id: I7b862648eebf59844aaa9d09697b7f2a693c9d94
Diffstat (limited to 'components/pm-subscription-handler/pmsh_service/mod')
-rw-r--r--components/pm-subscription-handler/pmsh_service/mod/__init__.py23
-rwxr-xr-xcomponents/pm-subscription-handler/pmsh_service/mod/aai_client.py53
-rwxr-xr-xcomponents/pm-subscription-handler/pmsh_service/mod/api/controller.py2
-rw-r--r--components/pm-subscription-handler/pmsh_service/mod/api/services/measurement_group_service.py46
-rw-r--r--components/pm-subscription-handler/pmsh_service/mod/api/services/subscription_service.py4
-rwxr-xr-xcomponents/pm-subscription-handler/pmsh_service/mod/exit_handler.py2
-rw-r--r--components/pm-subscription-handler/pmsh_service/mod/pmsh_config.py34
-rwxr-xr-xcomponents/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py313
-rw-r--r--components/pm-subscription-handler/pmsh_service/mod/policy_response_handler.py10
-rwxr-xr-xcomponents/pm-subscription-handler/pmsh_service/mod/subscription.py295
-rw-r--r--components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py118
11 files changed, 105 insertions, 795 deletions
diff --git a/components/pm-subscription-handler/pmsh_service/mod/__init__.py b/components/pm-subscription-handler/pmsh_service/mod/__init__.py
index 1024417e..548670c3 100644
--- a/components/pm-subscription-handler/pmsh_service/mod/__init__.py
+++ b/components/pm-subscription-handler/pmsh_service/mod/__init__.py
@@ -1,5 +1,5 @@
# ============LICENSE_START===================================================
-# Copyright (C) 2019-2021 Nordix Foundation.
+# Copyright (C) 2019-2022 Nordix Foundation.
# ============================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -15,7 +15,7 @@
#
# SPDX-License-Identifier: Apache-2.0
# ============LICENSE_END=====================================================
-import logging as logging
+import logging
import os
import pathlib
import ssl
@@ -26,6 +26,9 @@ from flask_sqlalchemy import SQLAlchemy
from onaplogging import monkey
from onaplogging.mdcContext import MDC
from ruamel.yaml import YAML
+import uuid
+from functools import wraps
+from os import getenv
db = SQLAlchemy()
basedir = os.path.abspath(os.path.dirname(__file__))
@@ -33,6 +36,22 @@ _connexion_app = None
logger = logging.getLogger('onap_logger')
+def mdc_handler(func):
+ @wraps(func)
+ def wrapper(*args, **kwargs):
+ request_id = str(uuid.uuid1())
+ invocation_id = str(uuid.uuid1())
+ MDC.put('ServiceName', getenv('HOSTNAME'))
+ MDC.put('RequestID', request_id)
+ MDC.put('InvocationID', invocation_id)
+
+ kwargs['request_id'] = request_id
+ kwargs['invocation_id'] = invocation_id
+ return func(*args, **kwargs)
+
+ return wrapper
+
+
def _get_app():
global _connexion_app
if not _connexion_app:
diff --git a/components/pm-subscription-handler/pmsh_service/mod/aai_client.py b/components/pm-subscription-handler/pmsh_service/mod/aai_client.py
index d2aeb0f0..437a18f0 100755
--- a/components/pm-subscription-handler/pmsh_service/mod/aai_client.py
+++ b/components/pm-subscription-handler/pmsh_service/mod/aai_client.py
@@ -1,5 +1,5 @@
# ============LICENSE_START===================================================
-# Copyright (C) 2019-2021 Nordix Foundation.
+# Copyright (C) 2019-2022 Nordix Foundation.
# ============================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -20,8 +20,7 @@ from os import environ
import requests
from requests.auth import HTTPBasicAuth
import mod.network_function
-import mod.pmsh_utils
-from mod import logger
+from mod import logger, mdc_handler
def get_pmsh_nfs_from_aai(app_conf, nf_filter):
@@ -68,8 +67,8 @@ def _get_all_aai_nf_data(app_conf):
}"""
params = {'format': 'simple', 'nodesOnly': 'true'}
response = session.put(aai_named_query_endpoint, headers=headers,
- auth=HTTPBasicAuth(app_conf.aaf_creds.get('aaf_id'),
- app_conf.aaf_creds.get('aaf_pass')),
+ auth=HTTPBasicAuth(app_conf.aaf_id,
+ app_conf.aaf_pass),
data=data, params=params,
verify=(app_conf.ca_cert_path if app_conf.enable_tls else False),
cert=((app_conf.cert_path,
@@ -102,7 +101,7 @@ def _get_aai_service_url():
raise
-@mod.pmsh_utils.mdc_handler
+@mdc_handler
def _get_aai_request_headers(**kwargs):
return {'accept': 'application/json',
'content-type': 'application/json',
@@ -160,28 +159,22 @@ def get_aai_model_data(app_conf, model_invariant_id, model_version_id, nf_name):
Returns:
json (dict): the sdnc_model json object.
-
- Raises:
- Exception: if AAI model data cannot be retrieved.
"""
- try:
- session = requests.Session()
- aai_model_ver_endpoint = \
- f'{_get_aai_service_url()}/service-design-and-creation/models/model/' \
- f'{model_invariant_id}/model-vers/model-ver/{model_version_id}'
-
- logger.info(f'Fetching sdnc-model info for xNF: {nf_name} from AAI.')
- headers = _get_aai_request_headers()
- response = session.get(aai_model_ver_endpoint, headers=headers,
- auth=HTTPBasicAuth(app_conf.aaf_creds.get('aaf_id'),
- app_conf.aaf_creds.get('aaf_pass')),
- verify=(app_conf.ca_cert_path if app_conf.enable_tls else False),
- cert=((app_conf.cert_path,
- app_conf.key_path) if app_conf.enable_tls else None))
- response.raise_for_status()
- if response.ok:
- data = json.loads(response.text)
- logger.debug(f'Successfully fetched sdnc-model info from AAI: {data}')
- return data
- except Exception:
- raise
+ session = requests.Session()
+ aai_model_ver_endpoint = \
+ f'{_get_aai_service_url()}/service-design-and-creation/models/model/' \
+ f'{model_invariant_id}/model-vers/model-ver/{model_version_id}'
+
+ logger.info(f'Fetching sdnc-model info for xNF: {nf_name} from AAI.')
+ headers = _get_aai_request_headers()
+ response = session.get(aai_model_ver_endpoint, headers=headers,
+ auth=HTTPBasicAuth(app_conf.aaf_id,
+ app_conf.aaf_pass),
+ verify=(app_conf.ca_cert_path if app_conf.enable_tls else False),
+ cert=((app_conf.cert_path,
+ app_conf.key_path) if app_conf.enable_tls else None))
+ response.raise_for_status()
+ if response.ok:
+ data = json.loads(response.text)
+ logger.debug(f'Successfully fetched sdnc-model info from AAI: {data}')
+ return data
diff --git a/components/pm-subscription-handler/pmsh_service/mod/api/controller.py b/components/pm-subscription-handler/pmsh_service/mod/api/controller.py
index 57d3e021..2e811c28 100755
--- a/components/pm-subscription-handler/pmsh_service/mod/api/controller.py
+++ b/components/pm-subscription-handler/pmsh_service/mod/api/controller.py
@@ -22,7 +22,7 @@ from mod.api.services import subscription_service, measurement_group_service
from connexion import NoContent
from mod.api.custom_exception import InvalidDataException, DuplicateDataException, \
DataConflictException
-from mod.subscription import AdministrativeState
+from mod.api.services.measurement_group_service import AdministrativeState
def status():
diff --git a/components/pm-subscription-handler/pmsh_service/mod/api/services/measurement_group_service.py b/components/pm-subscription-handler/pmsh_service/mod/api/services/measurement_group_service.py
index 07d1b642..145a492c 100644
--- a/components/pm-subscription-handler/pmsh_service/mod/api/services/measurement_group_service.py
+++ b/components/pm-subscription-handler/pmsh_service/mod/api/services/measurement_group_service.py
@@ -23,8 +23,40 @@ from mod import db, logger
from mod.api.services import nf_service, subscription_service
from mod.network_function import NetworkFunction
from mod.pmsh_config import MRTopic, AppConfig
-from mod.subscription import AdministrativeState, SubNfState
from sqlalchemy import or_
+from enum import Enum
+
+
+class MgNfState(Enum):
+ PENDING_CREATE = 'PENDING_CREATE'
+ CREATE_FAILED = 'CREATE_FAILED'
+ CREATED = 'CREATED'
+ PENDING_DELETE = 'PENDING_DELETE'
+ DELETE_FAILED = 'DELETE_FAILED'
+ DELETED = 'DELETED'
+
+
+class AdministrativeState(Enum):
+ UNLOCKED = 'UNLOCKED'
+ LOCKING = 'LOCKING'
+ LOCKED = 'LOCKED'
+ FILTERING = 'FILTERING'
+
+
+mg_nf_states = {
+ AdministrativeState.LOCKED.value: {
+ 'success': MgNfState.DELETED,
+ 'failed': MgNfState.DELETE_FAILED
+ },
+ AdministrativeState.UNLOCKED.value: {
+ 'success': MgNfState.CREATED,
+ 'failed': MgNfState.CREATE_FAILED
+ },
+ AdministrativeState.LOCKING.value: {
+ 'success': MgNfState.DELETED,
+ 'failed': MgNfState.DELETE_FAILED
+ }
+}
def save_measurement_group(measurement_group, subscription_name):
@@ -229,7 +261,7 @@ def deactivate_nfs(sub_model, measurement_group, nf_meas_relations):
logger.info(f'Saving measurement group to nf name, measure_grp_name: {nf.nf_name},'
f'{measurement_group.measurement_group_name} with DELETE request')
update_measurement_group_nf_status(measurement_group.measurement_group_name,
- SubNfState.PENDING_DELETE.value, nf.nf_name)
+ MgNfState.PENDING_DELETE.value, nf.nf_name)
try:
network_function = NetworkFunction(**nf.serialize_meas_group_nfs())
logger.info(f'Publishing event for nf name, measure_grp_name: {nf.nf_name},'
@@ -267,7 +299,7 @@ def activate_nfs(sub_model, measurement_group):
apply_nf_status_to_measurement_group(nf.nf_name,
measurement_group.measurement_group_name,
- SubNfState.PENDING_CREATE.value)
+ MgNfState.PENDING_CREATE.value)
db.session.commit()
try:
network_function = NetworkFunction(**nf.serialize_nf())
@@ -331,12 +363,12 @@ def filter_nf_to_meas_grp(nf_name, measurement_group_name, status):
status (string): status of the network function for measurement group
"""
try:
- if status == SubNfState.DELETED.value:
+ if status == MgNfState.DELETED.value:
delete_nf_to_measurement_group(nf_name, measurement_group_name,
- SubNfState.DELETED.value)
- elif status == SubNfState.CREATED.value:
+ MgNfState.DELETED.value)
+ elif status == MgNfState.CREATED.value:
update_measurement_group_nf_status(measurement_group_name,
- SubNfState.CREATED.value, nf_name)
+ MgNfState.CREATED.value, nf_name)
nf_measurement_group_rels = NfMeasureGroupRelationalModel.query.filter(
NfMeasureGroupRelationalModel.measurement_grp_name == measurement_group_name,
or_(NfMeasureGroupRelationalModel.nf_measure_grp_status.like('PENDING_%'),
diff --git a/components/pm-subscription-handler/pmsh_service/mod/api/services/subscription_service.py b/components/pm-subscription-handler/pmsh_service/mod/api/services/subscription_service.py
index 032fc4a0..99b72dfb 100644
--- a/components/pm-subscription-handler/pmsh_service/mod/api/services/subscription_service.py
+++ b/components/pm-subscription-handler/pmsh_service/mod/api/services/subscription_service.py
@@ -23,7 +23,7 @@ from mod.api.db_models import SubscriptionModel, NfSubRelationalModel, \
from mod.api.services import measurement_group_service, nf_service
from mod.api.custom_exception import InvalidDataException, DuplicateDataException, \
DataConflictException
-from mod.subscription import AdministrativeState, SubNfState
+from mod.api.services.measurement_group_service import MgNfState, AdministrativeState
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import joinedload
@@ -155,7 +155,7 @@ def apply_measurement_grp_to_nfs(filtered_nfs, unlocked_mgs):
f'{measurement_group.measurement_group_name}')
measurement_group_service.apply_nf_status_to_measurement_group(
nf.nf_name, measurement_group.measurement_group_name,
- SubNfState.PENDING_CREATE.value)
+ MgNfState.PENDING_CREATE.value)
def check_missing_data(subscription):
diff --git a/components/pm-subscription-handler/pmsh_service/mod/exit_handler.py b/components/pm-subscription-handler/pmsh_service/mod/exit_handler.py
index 16223790..69f2408f 100755
--- a/components/pm-subscription-handler/pmsh_service/mod/exit_handler.py
+++ b/components/pm-subscription-handler/pmsh_service/mod/exit_handler.py
@@ -18,7 +18,7 @@
from mod import logger, db
from mod.api.services import subscription_service, measurement_group_service
-from mod.subscription import AdministrativeState
+from mod.api.services.measurement_group_service import AdministrativeState
class ExitHandler:
diff --git a/components/pm-subscription-handler/pmsh_service/mod/pmsh_config.py b/components/pm-subscription-handler/pmsh_service/mod/pmsh_config.py
index 9c282ab7..295fc379 100644
--- a/components/pm-subscription-handler/pmsh_service/mod/pmsh_config.py
+++ b/components/pm-subscription-handler/pmsh_service/mod/pmsh_config.py
@@ -1,5 +1,5 @@
# ============LICENSE_START===================================================
-# Copyright (C) 2021 Nordix Foundation.
+# Copyright (C) 2021-2022 Nordix Foundation.
# ============================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -28,8 +28,7 @@ from onap_dcae_cbs_docker_client.client import get_all
from requests.auth import HTTPBasicAuth
from tenacity import wait_fixed, stop_after_attempt, retry, retry_if_exception_type
-from mod import logger
-from mod.pmsh_utils import mdc_handler
+from mod import logger, mdc_handler
@unique
@@ -67,8 +66,6 @@ class AppConfig(metaclass=MetaSingleton):
self.aaf_pass = app_config['config'].get('aaf_password')
self.streams_publishes = app_config['config'].get('streams_publishes')
self.streams_subscribes = app_config['config'].get('streams_subscribes')
- # TODO: aaf_creds variable should be removed on code cleanup
- self.aaf_creds = {'aaf_id': self.aaf_id, 'aaf_pass': self.aaf_pass}
@staticmethod
def get_instance():
@@ -94,7 +91,7 @@ class AppConfig(metaclass=MetaSingleton):
return config
except Exception as e:
logger.error(f'Failed to get config from CBS: {e}', exc_info=True)
- raise ValueError(e)
+ raise ValueError(e) from e
@mdc_handler
def publish_to_topic(self, mr_topic, event_json, **kwargs):
@@ -104,22 +101,16 @@ class AppConfig(metaclass=MetaSingleton):
Args:
mr_topic (enum) : Message Router topic to publish.
event_json (dict): the json data to be published.
-
- Raises:
- Exception: if post request fails.
"""
- try:
- session = requests.Session()
- topic_url = self.streams_publishes[mr_topic].get('dmaap_info').get('topic_url')
- headers = {'content-type': 'application/json', 'x-transactionid': kwargs['request_id'],
- 'InvocationID': kwargs['invocation_id'], 'RequestID': kwargs['request_id']}
- logger.info(f'Publishing event to MR topic: {topic_url}')
- response = session.post(topic_url, headers=headers,
- auth=HTTPBasicAuth(self.aaf_id, self.aaf_pass), json=event_json,
- verify=(self.ca_cert_path if self.enable_tls else False))
- response.raise_for_status()
- except Exception as e:
- raise e
+ session = requests.Session()
+ topic_url = self.streams_publishes[mr_topic].get('dmaap_info').get('topic_url')
+ headers = {'content-type': 'application/json', 'x-transactionid': kwargs['request_id'],
+ 'InvocationID': kwargs['invocation_id'], 'RequestID': kwargs['request_id']}
+ logger.info(f'Publishing event to MR topic: {topic_url}')
+ response = session.post(topic_url, headers=headers,
+ auth=HTTPBasicAuth(self.aaf_id, self.aaf_pass), json=event_json,
+ verify=(self.ca_cert_path if self.enable_tls else False))
+ response.raise_for_status()
@mdc_handler
def get_from_topic(self, mr_topic, consumer_id, consumer_group='dcae_pmsh_cg', timeout=5000,
@@ -149,7 +140,6 @@ class AppConfig(metaclass=MetaSingleton):
verify=(self.ca_cert_path if self.enable_tls else False))
if response.status_code == 503:
logger.error(f'MR Service is unavailable at present: {response.content}')
- pass
response.raise_for_status()
if response.ok:
return response.json()
diff --git a/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py b/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py
deleted file mode 100755
index d1790bbb..00000000
--- a/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py
+++ /dev/null
@@ -1,313 +0,0 @@
-# ============LICENSE_START===================================================
-# Copyright (C) 2019-2021 Nordix Foundation.
-# ============================================================================
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-# SPDX-License-Identifier: Apache-2.0
-# ============LICENSE_END=====================================================
-import json
-import os
-import uuid
-from functools import wraps
-from json import JSONDecodeError
-from os import getenv
-from threading import Timer
-
-import requests
-from jsonschema import validate, ValidationError
-from onap_dcae_cbs_docker_client.client import get_all
-from onaplogging.mdcContext import MDC
-from requests.auth import HTTPBasicAuth
-from tenacity import wait_fixed, stop_after_attempt, retry, retry_if_exception_type
-
-from mod import logger
-from mod.subscription import Subscription
-
-
-def mdc_handler(func):
- @wraps(func)
- def wrapper(*args, **kwargs):
- request_id = str(uuid.uuid1())
- invocation_id = str(uuid.uuid1())
- MDC.put('ServiceName', getenv('HOSTNAME'))
- MDC.put('RequestID', request_id)
- MDC.put('InvocationID', invocation_id)
-
- kwargs['request_id'] = request_id
- kwargs['invocation_id'] = invocation_id
- return func(*args, **kwargs)
-
- return wrapper
-
-
-def _load_sub_schema_from_file():
- try:
- with open(os.path.join(os.path.dirname(__file__), 'sub_schema.json')) as sub:
- return json.load(sub)
- except OSError as err:
- logger.error(f'Failed to read sub schema file: {err}', exc_info=True)
- except JSONDecodeError as json_err:
- logger.error(f'sub schema file is not a valid JSON file: {json_err}', exc_info=True)
-
-
-class AppConfig:
- INSTANCE = None
-
- def __init__(self):
- conf = self._get_pmsh_config()
- self.aaf_creds = {'aaf_id': conf['config'].get('aaf_identity'),
- 'aaf_pass': conf['config'].get('aaf_password')}
- self.enable_tls = conf['config'].get('enable_tls')
- self.ca_cert_path = conf['config'].get('ca_cert_path')
- self.cert_path = conf['config'].get('cert_path')
- self.key_path = conf['config'].get('key_path')
- self.streams_subscribes = conf['config'].get('streams_subscribes')
- self.streams_publishes = conf['config'].get('streams_publishes')
- self.operational_policy_name = conf['config'].get('operational_policy_name')
- self.control_loop_name = conf['config'].get('control_loop_name')
- self.sub_schema = _load_sub_schema_from_file()
- self.subscription = Subscription(self.control_loop_name,
- self.operational_policy_name,
- **conf['config']['pmsh_policy']['subscription'])
- self.nf_filter = None
-
- def __new__(cls, *args, **kwargs):
- if AppConfig.INSTANCE is None:
- AppConfig.INSTANCE = super().__new__(cls, *args, **kwargs)
- return AppConfig.INSTANCE
-
- @mdc_handler
- @retry(wait=wait_fixed(5), stop=stop_after_attempt(5),
- retry=retry_if_exception_type(ValueError))
- def _get_pmsh_config(self, **kwargs):
- """ Retrieves PMSH's configuration from Config binding service. If a non-2xx response
- is received, it retries after 2 seconds for 5 times before raising an exception.
-
- Returns:
- dict: Dictionary representation of the the service configuration
-
- Raises:
- Exception: If any error occurred pulling configuration from Config binding service.
- """
- try:
- logger.info('Attempting to fetch PMSH Configuration from CBS.')
- config = get_all()
- logger.info(f'Successfully fetched PMSH config from CBS: {config}')
- return config
- except Exception as e:
- logger.error(f'Failed to get config from CBS: {e}', exc_info=True)
- raise ValueError(e)
-
- def validate_sub_schema(self):
- """
- Validates schema of PMSH subscription
-
- Raises:
- ValidationError: If the PMSH subscription schema is invalid
- """
- sub_data = self.subscription.__dict__
- validate(instance=sub_data, schema=self.sub_schema)
- nf_filter = sub_data["nfFilter"]
- if not [filter_name for filter_name, val in nf_filter.items() if len(val) > 0]:
- raise ValidationError("At least one filter within nfFilter must not be empty")
- logger.debug("Subscription schema is valid.")
-
- def refresh_config(self):
- """
- Update the relevant attributes of the AppConfig object.
-
- Raises:
- Exception: if cbs request fails.
- """
- try:
- app_conf = self._get_pmsh_config()
- self.operational_policy_name = app_conf['config'].get('operational_policy_name')
- self.control_loop_name = app_conf['config'].get('control_loop_name')
- self.subscription = Subscription(self.control_loop_name,
- self.operational_policy_name,
- **app_conf['config']['pmsh_policy']['subscription'])
- logger.info("AppConfig data has been refreshed")
- except Exception:
- logger.error('Failed to refresh PMSH AppConfig')
- raise
-
- def get_mr_sub(self, sub_name):
- """
- Returns the MrSub object requested.
-
- Args:
- sub_name: the key of the subscriber object.
-
- Returns:
- MrSub: an Instance of an `MrSub` <MrSub> Object.
-
- Raises:
- KeyError: if the sub_name is not found.
- """
- try:
- return _MrSub(sub_name, self.aaf_creds, self.ca_cert_path,
- self.enable_tls, self.cert_params, **self.streams_subscribes[sub_name])
- except KeyError as e:
- logger.error(f'Failed to get MrSub {sub_name}: {e}', exc_info=True)
- raise
-
- def get_mr_pub(self, pub_name):
- """
- Returns the MrPub object requested.
-
- Args:
- pub_name: the key of the publisher object.
-
- Returns:
- MrPub: an Instance of an `MrPub` <MrPub> Object.
-
- Raises:
- KeyError: if the sub_name is not found.
- """
- try:
- return _MrPub(pub_name, self.aaf_creds, self.ca_cert_path,
- self.enable_tls, self.cert_params, **self.streams_publishes[pub_name])
- except KeyError as e:
- logger.error(f'Failed to get MrPub {pub_name}: {e}', exc_info=True)
- raise
-
- @property
- def cert_params(self):
- """
- Returns the tls artifact paths.
-
- Returns:
- cert_path, key_path (tuple): the path to tls cert and key.
- """
- return self.cert_path, self.key_path
-
-
-class _DmaapMrClient:
- def __init__(self, aaf_creds, ca_cert_path, enable_tls, cert_params, **kwargs):
- """
- A DMaaP Message Router utility class.
- Sub classes should be invoked via the AppConfig.get_mr_{pub|sub} only.
- Args:
- aaf_creds (dict): a dict of aaf secure credentials.
- ca_cert_path (str): path to the ca certificate.
- enable_tls (bool): TLS if True, else False
- cert_params (tuple): client side (cert, key) tuple.
- **kwargs: a dict of streams_{subscribes|publishes} data.
- """
- self.topic_url = kwargs.get('dmaap_info').get('topic_url')
- self.aaf_id = aaf_creds.get('aaf_id')
- self.aaf_pass = aaf_creds.get('aaf_pass')
- self.ca_cert_path = ca_cert_path
- self.enable_tls = enable_tls
- self.cert_params = cert_params
-
-
-class _MrPub(_DmaapMrClient):
- def __init__(self, pub_name, aaf_creds, ca_cert_path, enable_tls, cert_params, **kwargs):
- self.pub_name = pub_name
- super().__init__(aaf_creds, ca_cert_path, enable_tls, cert_params, **kwargs)
-
- @mdc_handler
- def publish_to_topic(self, event_json, **kwargs):
- """
- Publish the event to the DMaaP Message Router topic.
-
- Args:
- event_json (dict): the json data to be published.
-
- Raises:
- Exception: if post request fails.
- """
- try:
- session = requests.Session()
- headers = {'content-type': 'application/json', 'x-transactionid': kwargs['request_id'],
- 'InvocationID': kwargs['invocation_id'],
- 'RequestID': kwargs['request_id']
- }
- logger.info(f'Publishing event to {self.topic_url}')
- response = session.post(self.topic_url, headers=headers,
- auth=HTTPBasicAuth(self.aaf_id, self.aaf_pass), json=event_json,
- verify=(self.ca_cert_path if self.enable_tls else False))
- response.raise_for_status()
- except Exception as e:
- raise e
-
- def publish_subscription_event_data(self, subscription, nf):
- """
- Update the Subscription dict with xnf and policy name then publish to DMaaP MR topic.
-
- Args:
- subscription (Subscription): the `Subscription` <Subscription> object.
- nf (NetworkFunction): the NetworkFunction to include in the event.
- """
- try:
- subscription_event = subscription.prepare_subscription_event(nf)
- logger.debug(f'Subscription event: {subscription_event}')
- self.publish_to_topic(subscription_event)
- except Exception as e:
- logger.error(f'Failed to publish to topic {self.topic_url}: {e}', exc_info=True)
- raise e
-
-
-class _MrSub(_DmaapMrClient):
- def __init__(self, sub_name, aaf_creds, ca_cert_path, enable_tls, cert_params, **kwargs):
- self.sub_name = sub_name
- super().__init__(aaf_creds, ca_cert_path, enable_tls, cert_params, **kwargs)
-
- @mdc_handler
- def get_from_topic(self, consumer_id, consumer_group='dcae_pmsh_cg', timeout=1000, **kwargs):
- """
- Returns the json data from the MrTopic.
-
- Args:
- consumer_id (str): Within your subscribers group, a name that uniquely
- identifies your subscribers process.
- consumer_group (str): A name that uniquely identifies your subscribers.
- timeout (int): The request timeout value in mSec.
-
- Returns:
- list[str]: the json response from DMaaP Message Router topic.
- """
- try:
- session = requests.Session()
- headers = {'accept': 'application/json', 'content-type': 'application/json',
- 'InvocationID': kwargs['invocation_id'],
- 'RequestID': kwargs['request_id']}
- logger.info(f'Fetching messages from MR topic: {self.topic_url}')
- response = session.get(f'{self.topic_url}/{consumer_group}/{consumer_id}'
- f'?timeout={timeout}',
- auth=HTTPBasicAuth(self.aaf_id, self.aaf_pass), headers=headers,
- verify=(self.ca_cert_path if self.enable_tls else False))
- if response.status_code == 503:
- logger.error(f'MR Service is unavailable at present: {response.content}')
- pass
- response.raise_for_status()
- if response.ok:
- return response.json()
- except Exception as e:
- logger.error(f'Failed to fetch message from MR: {e}', exc_info=True)
- raise
-
-
-class PeriodicTask(Timer):
- """
- See :class:`Timer`.
- """
-
- def run(self):
- self.function(*self.args, **self.kwargs)
- while not self.finished.wait(self.interval):
- try:
- self.function(*self.args, **self.kwargs)
- except Exception as e:
- logger.error(f'Exception in thread: {self.name}: {e}', exc_info=True)
diff --git a/components/pm-subscription-handler/pmsh_service/mod/policy_response_handler.py b/components/pm-subscription-handler/pmsh_service/mod/policy_response_handler.py
index 5065ce8a..cfcda091 100644
--- a/components/pm-subscription-handler/pmsh_service/mod/policy_response_handler.py
+++ b/components/pm-subscription-handler/pmsh_service/mod/policy_response_handler.py
@@ -16,9 +16,11 @@
# SPDX-License-Identifier: Apache-2.0
# ============LICENSE_END=====================================================
import json
+
+from mod.api.services.measurement_group_service import mg_nf_states, \
+ AdministrativeState, MgNfState
from mod.pmsh_config import MRTopic, AppConfig
from mod import logger
-from mod.subscription import AdministrativeState, subscription_nf_states, SubNfState
from mod.api.db_models import MeasurementGroupModel, NfMeasureGroupRelationalModel
from mod.api.services import measurement_group_service
@@ -96,12 +98,12 @@ class PolicyResponseHandler:
NfMeasureGroupRelationalModel.measurement_grp_name == measurement_group_name,
NfMeasureGroupRelationalModel.nf_name == nf_name
).one_or_none()
- if nf_msg_rel.nf_measure_grp_status == SubNfState.PENDING_DELETE.value:
+ if nf_msg_rel.nf_measure_grp_status == MgNfState.PENDING_DELETE.value:
administrative_state = AdministrativeState.LOCKING.value
- elif nf_msg_rel.nf_measure_grp_status == SubNfState.PENDING_CREATE.value:
+ elif nf_msg_rel.nf_measure_grp_status == MgNfState.PENDING_CREATE.value:
administrative_state = AdministrativeState.UNLOCKED.value
- nf_measure_grp_status = (subscription_nf_states[administrative_state]
+ nf_measure_grp_status = (mg_nf_states[administrative_state]
[response_message]).value
policy_response_handle_functions[administrative_state][response_message](
measurement_group_name=measurement_group_name, status=nf_measure_grp_status,
diff --git a/components/pm-subscription-handler/pmsh_service/mod/subscription.py b/components/pm-subscription-handler/pmsh_service/mod/subscription.py
deleted file mode 100755
index ddb6e1f5..00000000
--- a/components/pm-subscription-handler/pmsh_service/mod/subscription.py
+++ /dev/null
@@ -1,295 +0,0 @@
-# ============LICENSE_START===================================================
-# Copyright (C) 2019-2022 Nordix Foundation.
-# ============================================================================
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-# SPDX-License-Identifier: Apache-2.0
-# ============LICENSE_END=====================================================
-from enum import Enum
-
-from mod import db, logger
-from mod.api.db_models import SubscriptionModel, NfSubRelationalModel, NetworkFunctionModel
-
-
-class SubNfState(Enum):
- PENDING_CREATE = 'PENDING_CREATE'
- CREATE_FAILED = 'CREATE_FAILED'
- CREATED = 'CREATED'
- PENDING_DELETE = 'PENDING_DELETE'
- DELETE_FAILED = 'DELETE_FAILED'
- DELETED = 'DELETED'
-
-
-class AdministrativeState(Enum):
- UNLOCKED = 'UNLOCKED'
- LOCKING = 'LOCKING'
- LOCKED = 'LOCKED'
- FILTERING = 'FILTERING'
-
-
-subscription_nf_states = {
- AdministrativeState.LOCKED.value: {
- 'success': SubNfState.DELETED,
- 'failed': SubNfState.DELETE_FAILED
- },
- AdministrativeState.UNLOCKED.value: {
- 'success': SubNfState.CREATED,
- 'failed': SubNfState.CREATE_FAILED
- },
- AdministrativeState.LOCKING.value: {
- 'success': SubNfState.DELETED,
- 'failed': SubNfState.DELETE_FAILED
- }
-}
-
-
-def _get_nf_objects(nf_sub_relationships):
- nfs = []
- for nf_sub_entry in nf_sub_relationships:
- nf_model_object = NetworkFunctionModel.query.filter(
- NetworkFunctionModel.nf_name == nf_sub_entry.nf_name).one_or_none()
- nfs.append(nf_model_object.to_nf())
- return nfs
-
-
-class Subscription:
- def __init__(self, control_loop_name, operational_policy_name, **kwargs):
- self.subscriptionName = kwargs.get('subscriptionName')
- self.administrativeState = kwargs.get('administrativeState')
- self.fileBasedGP = kwargs.get('fileBasedGP')
- self.fileLocation = kwargs.get('fileLocation')
- self.nfFilter = kwargs.get('nfFilter')
- self.measurementGroups = kwargs.get('measurementGroups')
- self.control_loop_name = control_loop_name
- self.operational_policy_name = operational_policy_name
- self.create()
-
- def update_sub_params(self, admin_state, file_based_gp, file_location, meas_groups):
- self.administrativeState = admin_state
- self.fileBasedGP = file_based_gp
- self.fileLocation = file_location
- self.measurementGroups = meas_groups
-
- def create(self):
- """ Creates a subscription database entry
-
- Returns:
- Subscription object
- """
- try:
- existing_subscription = (SubscriptionModel.query.filter(
- SubscriptionModel.subscription_name == self.subscriptionName).one_or_none())
- if existing_subscription is None:
- new_subscription = \
- SubscriptionModel(subscription_name=self.subscriptionName,
- operational_policy_name=self.operational_policy_name,
- control_loop_name=self.control_loop_name,
- status=AdministrativeState.LOCKED.value)
-
- db.session.add(new_subscription)
- db.session.commit()
- return new_subscription
- else:
- logger.debug(f'Subscription {self.subscriptionName} already exists,'
- f' returning this subscription..')
- return existing_subscription
- except Exception as e:
- logger.error(f'Failed to create subscription {self.subscriptionName} in the DB: {e}',
- exc_info=True)
- finally:
- db.session.remove()
-
- def update_subscription_status(self):
- """ Updates the status of subscription in subscription table """
- try:
- SubscriptionModel.query.filter(
- SubscriptionModel.subscription_name == self.subscriptionName) \
- .update({SubscriptionModel.status: self.administrativeState},
- synchronize_session='evaluate')
-
- db.session.commit()
- except Exception as e:
- logger.error(f'Failed to update status of subscription: {self.subscriptionName}: {e}',
- exc_info=True)
- finally:
- db.session.remove()
-
- def prepare_subscription_event(self, nf):
- """Prepare the sub event for publishing
-
- Args:
- nf (NetworkFunction): the AAI nf.
-
- Returns:
- dict: the Subscription event to be published.
- """
- try:
- clean_sub = \
- {k: v for k, v in self.__dict__.items() if
- (k != 'nfFilter' and k != 'control_loop_name' and k != 'operational_policy_name')}
- if self.administrativeState == AdministrativeState.LOCKING.value:
- change_type = 'DELETE'
- else:
- change_type = 'CREATE'
-
- sub_event = {
- 'nfName': nf.nf_name,
- 'ipAddress': nf.ipv4_address if nf.ipv6_address in (None, '') else nf.ipv6_address,
- 'blueprintName': nf.sdnc_model_name,
- 'blueprintVersion': nf.sdnc_model_version,
- 'operationalPolicyName': self.operational_policy_name,
- 'changeType': change_type,
- 'controlLoopName': self.control_loop_name,
- 'subscription': clean_sub}
- return sub_event
- except Exception as e:
- logger.error(f'Failed to prep Sub event for xNF {nf.nf_name}: {e}', exc_info=True)
- raise
-
- def add_network_function_to_subscription(self, nf, sub_model):
- """ Associates a network function to a Subscription
-
- Args:
- sub_model(SubscriptionModel): The SubscriptionModel from the DB.
- nf(NetworkFunction): A NetworkFunction object.
- """
- try:
- current_nf = nf.create()
- existing_entry = NfSubRelationalModel.query.filter(
- NfSubRelationalModel.subscription_name == self.subscriptionName,
- NfSubRelationalModel.nf_name == current_nf.nf_name).one_or_none()
- if existing_entry is None:
- new_nf_sub = NfSubRelationalModel(self.subscriptionName,
- nf.nf_name, SubNfState.PENDING_CREATE.value)
- sub_model.nfs.append(new_nf_sub)
- db.session.add(sub_model)
- db.session.commit()
- logger.info(f'Network function {current_nf.nf_name} added to Subscription '
- f'{self.subscriptionName}')
- except Exception as e:
- logger.error(f'Failed to add nf {nf.nf_name} to subscription '
- f'{self.subscriptionName}: {e}', exc_info=True)
- logger.debug(f'Subscription {self.subscriptionName} now contains these XNFs:'
- f'{[nf.nf_name for nf.nf_name in self.get_network_functions()]}')
-
- def get(self):
- """ Retrieves a SubscriptionModel object
-
- Returns:
- SubscriptionModel object else None
- """
- sub_model = SubscriptionModel.query.filter(
- SubscriptionModel.subscription_name == self.subscriptionName).one_or_none()
- return sub_model
-
- def get_local_sub_admin_state(self):
- """ Retrieves the subscription admin state
-
- Returns:
- str: The admin state of the SubscriptionModel
- """
- sub_model = SubscriptionModel.query.filter(
- SubscriptionModel.subscription_name == self.subscriptionName).one_or_none()
- db.session.remove()
- return sub_model.status
-
- def create_subscription_on_nfs(self, nfs, mr_pub):
- """ Publishes an event to create a Subscription on an nf
-
- Args:
- nfs(list[NetworkFunction]): A list of NetworkFunction Objects.
- mr_pub (_MrPub): MR publisher
- """
- try:
- existing_nfs = self.get_network_functions()
- sub_model = self.get()
- for nf in [new_nf for new_nf in nfs if new_nf not in existing_nfs]:
- logger.info(f'Publishing event to create '
- f'Sub: {self.subscriptionName} on nf: {nf.nf_name}')
- mr_pub.publish_subscription_event_data(self, nf)
- self.add_network_function_to_subscription(nf, sub_model)
- self.update_sub_nf_status(self.subscriptionName, SubNfState.PENDING_CREATE.value,
- nf.nf_name)
- except Exception as err:
- raise Exception(f'Error publishing create event to MR: {err}')
-
- def delete_subscription_from_nfs(self, nfs, mr_pub):
- """ Publishes an event to delete a Subscription from an nf
-
- Args:
- nfs(list[NetworkFunction]): A list of NetworkFunction Objects.
- mr_pub (_MrPub): MR publisher
- """
- try:
- for nf in nfs:
- logger.debug(f'Publishing Event to delete '
- f'Sub: {self.subscriptionName} from the nf: {nf.nf_name}')
- mr_pub.publish_subscription_event_data(self, nf)
- self.update_sub_nf_status(self.subscriptionName,
- SubNfState.PENDING_DELETE.value,
- nf.nf_name)
- except Exception as err:
- raise Exception(f'Error publishing delete event to MR: {err}')
-
- @staticmethod
- def get_all_nfs_subscription_relations():
- """ Retrieves all network function to subscription relations
-
- Returns:
- list(NfSubRelationalModel): NetworkFunctions per Subscription list else empty
- """
- nf_per_subscriptions = NfSubRelationalModel.query.all()
- db.session.remove()
- return nf_per_subscriptions
-
- @staticmethod
- def update_sub_nf_status(subscription_name, status, nf_name):
- """ Updates the status of the subscription for a particular nf
-
- Args:
- subscription_name (str): The subscription name
- nf_name (str): The network function name
- status (str): Status of the subscription
- """
- try:
- NfSubRelationalModel.query.filter(
- NfSubRelationalModel.subscription_name == subscription_name,
- NfSubRelationalModel.nf_name == nf_name). \
- update({NfSubRelationalModel.nf_sub_status: status}, synchronize_session='evaluate')
- db.session.commit()
- except Exception as e:
- logger.error(f'Failed to update status of nf: {nf_name} for subscription: '
- f'{subscription_name}: {e}', exc_info=True)
-
- def get_network_functions(self):
- nf_sub_relationships = NfSubRelationalModel.query.filter(
- NfSubRelationalModel.subscription_name == self.subscriptionName)
- nfs = _get_nf_objects(nf_sub_relationships)
- db.session.remove()
- return nfs
-
- def get_delete_failed_nfs(self):
- nf_sub_relationships = NfSubRelationalModel.query.filter(
- NfSubRelationalModel.subscription_name == self.subscriptionName,
- NfSubRelationalModel.nf_sub_status == SubNfState.DELETE_FAILED.value)
- nfs = _get_nf_objects(nf_sub_relationships)
- db.session.remove()
- return nfs
-
- def get_delete_pending_nfs(self):
- nf_sub_relationships = NfSubRelationalModel.query.filter(
- NfSubRelationalModel.subscription_name == self.subscriptionName,
- NfSubRelationalModel.nf_sub_status == SubNfState.PENDING_DELETE.value)
- nfs = _get_nf_objects(nf_sub_relationships)
- db.session.remove()
- return nfs
diff --git a/components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py b/components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py
deleted file mode 100644
index 5fbb9a6c..00000000
--- a/components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py
+++ /dev/null
@@ -1,118 +0,0 @@
-# ============LICENSE_START===================================================
-# Copyright (C) 2020-2021 Nordix Foundation.
-# ============================================================================
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-# SPDX-License-Identifier: Apache-2.0
-# ============LICENSE_END=====================================================
-from jsonschema import ValidationError
-
-from mod import logger, aai_client
-from mod.network_function import NetworkFunctionFilter
-from mod.subscription import AdministrativeState
-
-
-class SubscriptionHandler:
- def __init__(self, mr_pub, aai_sub, app, app_conf):
- self.mr_pub = mr_pub
- self.aai_sub = aai_sub
- self.app = app
- self.app_conf = app_conf
- self.aai_event_thread = None
-
- def execute(self):
- """
- Checks for changes of administrative state in config and proceeds to process
- the Subscription if a change has occurred
- """
- self.app.app_context().push()
- try:
- local_admin_state = self.app_conf.subscription.get_local_sub_admin_state()
- if local_admin_state == AdministrativeState.LOCKING.value:
- self._check_for_failed_nfs()
- else:
- self.app_conf.refresh_config()
- self.app_conf.validate_sub_schema()
- new_administrative_state = self.app_conf.subscription.administrativeState
- if local_admin_state == new_administrative_state:
- logger.info(f'Administrative State did not change in the app config: '
- f'{new_administrative_state}')
- else:
- self._check_state_change(local_admin_state, new_administrative_state)
- except (ValidationError, TypeError) as err:
- logger.error(f'Error occurred during validation of subscription schema {err}',
- exc_info=True)
- except Exception as err:
- logger.error(f'Error occurred during the activation/deactivation process {err}',
- exc_info=True)
-
- def _check_state_change(self, local_admin_state, new_administrative_state):
- if new_administrative_state == AdministrativeState.UNLOCKED.value:
- logger.info(f'Administrative State has changed from {local_admin_state} '
- f'to {new_administrative_state}.')
- self._activate(new_administrative_state)
- elif new_administrative_state == AdministrativeState.LOCKED.value:
- logger.info(f'Administrative State has changed from {local_admin_state} '
- f'to {new_administrative_state}.')
- self._deactivate()
- else:
- raise Exception(f'Invalid AdministrativeState: {new_administrative_state}')
-
- def _activate(self, new_administrative_state):
- if not self.app_conf.nf_filter:
- self.app_conf.nf_filter = NetworkFunctionFilter(**self.app_conf.subscription.nfFilter)
- self.app_conf.subscription.update_sub_params(new_administrative_state,
- self.app_conf.subscription.fileBasedGP,
- self.app_conf.subscription.fileLocation,
- self.app_conf.subscription.measurementGroups)
- nfs_in_aai = aai_client.get_pmsh_nfs_from_aai(self.app_conf, self.app_conf.nf_filter)
- self.app_conf.subscription.create_subscription_on_nfs(nfs_in_aai, self.mr_pub)
- self.app_conf.subscription.update_subscription_status()
-
- def _deactivate(self):
- nfs = self.app_conf.subscription.get_network_functions()
- if nfs:
- self.stop_aai_event_thread()
- self.app_conf.subscription.administrativeState = AdministrativeState.LOCKING.value
- logger.info('Subscription is now LOCKING/DEACTIVATING.')
- self.app_conf.subscription.delete_subscription_from_nfs(nfs, self.mr_pub)
- self.app_conf.subscription.update_subscription_status()
-
- def stop_aai_event_thread(self):
- if self.aai_event_thread is not None:
- self.aai_event_thread.cancel()
- self.aai_event_thread = None
- logger.info('Stopping polling for NFs events on AAI-EVENT topic in MR.')
-
- def _check_for_failed_nfs(self):
- logger.info('Checking for DELETE_FAILED NFs before LOCKING Subscription.')
- del_failed_nfs = self.app_conf.subscription.get_delete_failed_nfs()
- if del_failed_nfs or self.app_conf.subscription.get_delete_pending_nfs():
- for nf in del_failed_nfs:
- nf_model = nf.get(nf.nf_name)
- if nf_model.retry_count < 3:
- logger.info(f'Retry deletion of subscription '
- f'{self.app_conf.subscription.subscriptionName} '
- f'from NF: {nf.nf_name}')
- self.app_conf.subscription.delete_subscription_from_nfs([nf], self.mr_pub)
- nf.increment_retry_count()
- else:
- logger.error(f'Failed to delete the subscription '
- f'{self.app_conf.subscription.subscriptionName} '
- f'from NF: {nf.nf_name} after {nf_model.retry_count} '
- f'attempts. Removing NF from DB')
- nf.delete(nf_name=nf.nf_name)
- else:
- logger.info('Proceeding to LOCKED adminState.')
- self.app_conf.subscription.administrativeState = AdministrativeState.LOCKED.value
- self.app_conf.subscription.update_subscription_status()