summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJoseph O'Leary <joseph.o.leary@est.tech>2020-08-18 08:15:56 +0000
committerGerrit Code Review <gerrit@onap.org>2020-08-18 08:15:56 +0000
commitd7f2b9e96e3c423a871fab757f3ae11372134125 (patch)
tree8bffddc6ff2e75161504a3302f37a856b2099030
parent4feffc20485b5bd304d0dc1edb9255c2cc55ed20 (diff)
parent38ccb471732faaad6a25defee0753c1c5ac60cf0 (diff)
Merge "Refactor and bug fixes"
-rwxr-xr-xcomponents/pm-subscription-handler/dpo/blueprints/k8s-pmsh.yaml184
-rwxr-xr-xcomponents/pm-subscription-handler/dpo/spec/pmsh-component-spec.json18
-rwxr-xr-xcomponents/pm-subscription-handler/log_config.yaml54
-rw-r--r--components/pm-subscription-handler/pmsh_service/mod/__init__.py2
-rwxr-xr-xcomponents/pm-subscription-handler/pmsh_service/mod/aai_client.py6
-rwxr-xr-xcomponents/pm-subscription-handler/pmsh_service/mod/aai_event_handler.py3
-rwxr-xr-xcomponents/pm-subscription-handler/pmsh_service/mod/exit_handler.py14
-rwxr-xr-xcomponents/pm-subscription-handler/pmsh_service/mod/network_function.py9
-rwxr-xr-xcomponents/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py32
-rwxr-xr-xcomponents/pm-subscription-handler/pmsh_service/mod/subscription.py205
-rw-r--r--components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py44
-rwxr-xr-xcomponents/pm-subscription-handler/pmsh_service/pmsh_service_main.py21
-rwxr-xr-xcomponents/pm-subscription-handler/tests/test_aai_event_handler.py33
-rwxr-xr-xcomponents/pm-subscription-handler/tests/test_controller.py9
-rwxr-xr-xcomponents/pm-subscription-handler/tests/test_exit_handler.py77
-rwxr-xr-xcomponents/pm-subscription-handler/tests/test_network_function.py11
-rwxr-xr-xcomponents/pm-subscription-handler/tests/test_subscription.py73
-rw-r--r--components/pm-subscription-handler/tests/test_subscription_handler.py95
18 files changed, 387 insertions, 503 deletions
diff --git a/components/pm-subscription-handler/dpo/blueprints/k8s-pmsh.yaml b/components/pm-subscription-handler/dpo/blueprints/k8s-pmsh.yaml
deleted file mode 100755
index e3f7987c..00000000
--- a/components/pm-subscription-handler/dpo/blueprints/k8s-pmsh.yaml
+++ /dev/null
@@ -1,184 +0,0 @@
-#
-# ============LICENSE_START=======================================================
-# Copyright (C) 2020 Nordix Foundation.
-# ================================================================================
-# Licensed under the Apache License, Version 2.0 (the 'License');
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an 'AS IS' BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-# SPDX-License-Identifier: Apache-2.0
-# ============LICENSE_END=========================================================
-#
-
-tosca_definitions_version: cloudify_dsl_1_3
-
-imports:
- - 'http://www.getcloudify.org/spec/cloudify/4.5.5/types.yaml'
- - 'https://nexus.onap.org/service/local/repositories/raw/content/org.onap.dcaegen2.platform.plugins/R6/k8splugin/1.7.2/k8splugin_types.yaml'
- - 'https://nexus.onap.org/service/local/repositories/raw/content/org.onap.ccsdk.platform.plugins/type_files/dmaap/dmaap.yaml'
- - 'https://nexus.onap.org/service/local/repositories/raw/content/org.onap.ccsdk.platform.plugins/type_files/pgaas/1.1.0/pgaas_types.yaml'
- - 'https://nexus.onap.org/service/local/repositories/raw/content/org.onap.dcaegen2.platform.plugins/R6/clamppolicyplugin/1.1.0/clamppolicyplugin_types.yaml'
-
-inputs:
- tag_version:
- type: string
- description: Docker image to be used
- default: 'nexus3.onap.org:10001/onap/org.onap.dcaegen2.services.pmsh:latest'
- replicas:
- type: integer
- description: Number of instances
- default: 1
- policy_model_id:
- type: 'string'
- default: 'onap.policies.monitoring.dcae-pm-subscription-handler'
- policy_id:
- type: 'string'
- default: 'onap.policies.monitoring.dcae-pm-subscription-handler'
- operational_policy_name:
- type: string
- default: 'pmsh-operational-policy'
- control_loop_name:
- type: string
- pmsh_publish_topic_name:
- type: string
- default: 'unauthenticated.DCAE_CL_OUTPUT'
- policy_feedback_topic_name:
- type: string
- default: 'PMSH_CL_INPUT'
- aai_notification_topic_name:
- type: string
- default: 'AAI-EVENT'
- publisher_client_role:
- type: string
- description: Client role to request secure access to topic
- default: 'org.onap.dcae.pmPublisher'
- subscriber_client_role:
- type: string
- description: Client role to request secure access to topic
- default: 'org.onap.dcae.pmSubscriber'
- client_id:
- type: string
- description: Client id for given AAF client
- default: 'dcae@dcae.onap.org'
- client_password:
- type: string
- description: Password for AAF client provided as client_id
- dcae_location:
- type: string
- description: DCAE location for the subscriber, used to set up routing
- default: 'san-francisco'
- cpu_limit:
- type: string
- default: '1000m'
- cpu_request:
- type: string
- default: '1000m'
- memory_limit:
- type: string
- default: '1024Mi'
- memory_request:
- type: string
- default: '1024Mi'
- pgaas_cluster_name:
- type: string
- default: 'dcae-pg-primary.onap'
-node_templates:
- pgaasvm:
- type: dcae.nodes.pgaas.database
- properties:
- writerfqdn: { get_input: pgaas_cluster_name }
- name: 'pmsh'
- pm_subscribe_topic:
- type: ccsdk.nodes.Topic
- properties:
- topic_name: { get_input: policy_feedback_topic_name }
- pmsh:
- type: dcae.nodes.ContainerizedServiceComponentUsingDmaap
- interfaces:
- cloudify.interfaces.lifecycle:
- create:
- inputs:
- ports:
- - '8443:0'
- envs:
- PMSH_PG_URL:
- { get_attribute: [ pgaasvm, admin, host ] }
- PMSH_PG_PASSWORD:
- { get_attribute: [ pgaasvm, admin, password ] }
- PMSH_PG_USERNAME:
- { get_attribute: [ pgaasvm, admin, user ] }
- PMSH_DB_NAME:
- { get_attribute: [ pgaasvm, admin, database ] }
-
- relationships:
- - type: ccsdk.relationships.subscribe_to_events
- target: pm_subscribe_topic
- - type: cloudify.relationships.depends_on
- target: pgaasvm
- - type: cloudify.relationships.depends_on
- target: pmsh-policy
-
- properties:
- service_component_type: 'dcae-pmsh'
- service_component_name_override: 'dcae-pmsh'
- application_config:
- aaf_identity: { get_input: client_id }
- aaf_password: { get_input: client_password }
- operational_policy_name: { get_input: operational_policy_name }
- control_loop_name: { get_input: control_loop_name }
- cert_path: '/opt/app/pmsh/etc/certs/cert.pem'
- key_path: '/opt/app/pmsh/etc/certs/key.pem'
- ca_cert_path: '/opt/app/pmsh/etc/certs/cacert.pem'
- streams_publishes:
- policy_pm_publisher:
- type: message_router
- dmaap_info:
- topic_url: {concat: ["https://message-router:3905/events/", { get_input: pmsh_publish_topic_name }]}
- streams_subscribes:
- policy_pm_subscriber:
- type: message_router
- dmaap_info: <<pm_subscribe_topic>>
- aai_subscriber:
- type: message_router
- dmaap_info:
- topic_url: {concat: ["https://message-router:3905/events/", { get_input: aai_notification_topic_name }]}
- resource_config:
- limits:
- cpu: { get_input: cpu_limit }
- memory: { get_input: memory_limit }
- requests:
- cpu: { get_input: cpu_request }
- memory: { get_input: memory_request }
- docker_config:
- healthcheck:
- endpoint: /healthcheck
- interval: 15s
- timeout: 1s
- type: https
- streams_subscribes:
- - name: pm_subscribe_topic
- location: { get_input: dcae_location }
- client_role: { get_input: subscriber_client_role }
- type: message-router
- image: { get_input: tag_version }
- replicas: { get_input: replicas }
- log_info:
- log_directory: '/var/log/ONAP/dcaegen2/services/pmsh'
- tls_info:
- cert_directory: '/opt/app/pmsh/etc/certs'
- use_tls: true
- pmsh-policy:
- type: clamp.nodes.policy
- properties:
- policy_model_id:
- get_input: policy_model_id
- policy_id:
- get_input: policy_id \ No newline at end of file
diff --git a/components/pm-subscription-handler/dpo/spec/pmsh-component-spec.json b/components/pm-subscription-handler/dpo/spec/pmsh-component-spec.json
index 43028857..69513bce 100755
--- a/components/pm-subscription-handler/dpo/spec/pmsh-component-spec.json
+++ b/components/pm-subscription-handler/dpo/spec/pmsh-component-spec.json
@@ -100,6 +100,24 @@
"designer_editable": false
},
{
+ "name": "enable_tls",
+ "value": true,
+ "description": "Boolean to (en|dis)able TLS",
+ "sourced_at_deployment": false,
+ "policy_editable": false,
+ "designer_editable": true,
+ "type": "boolean"
+ },
+ {
+ "name": "protocol",
+ "value": "https",
+ "type": "string",
+ "description": "Protocol PMSH api will use. If enable_tls is disabled, set protocol to http",
+ "sourced_at_deployment": false,
+ "policy_editable": false,
+ "designer_editable": true
+ },
+ {
"name": "policy_model_id",
"value": "onap.policies.monitoring.dcae-pm-initiation-handler",
"description": "PMSH monitoring policy model id",
diff --git a/components/pm-subscription-handler/log_config.yaml b/components/pm-subscription-handler/log_config.yaml
index 971c1882..b2d8f43c 100755
--- a/components/pm-subscription-handler/log_config.yaml
+++ b/components/pm-subscription-handler/log_config.yaml
@@ -1,27 +1,27 @@
-version: 1
-
-disable_existing_loggers: true
-
-loggers:
- onap_logger:
- level: INFO
- handlers: [onap_log_handler, stdout_handler]
- propagate: false
-handlers:
- onap_log_handler:
- class: logging.handlers.RotatingFileHandler
- filename: /var/log/ONAP/dcaegen2/services/pmsh/application.log
- mode: a
- maxBytes: 10000000
- backupCount: 10
- formatter: mdcFormatter
- stdout_handler:
- class: logging.StreamHandler
- formatter: mdcFormatter
-formatters:
- mdcFormatter:
- format: '%(asctime)s | %(threadName)s | %(thread)d | %(levelname)s | %(module)s
- | %(funcName)s | %(mdc)s | %(message)s'
- mdcfmt: '{ServiceName} | {RequestID} | {InvocationID}'
- datefmt: '%Y-%m-%dT%H:%M:%S%z'
- (): onaplogging.mdcformatter.MDCFormatter
+version: 1
+
+disable_existing_loggers: true
+
+loggers:
+ onap_logger:
+ level: INFO
+ handlers: [onap_log_handler, stdout_handler]
+ propagate: false
+handlers:
+ onap_log_handler:
+ class: logging.handlers.RotatingFileHandler
+ filename: /var/log/ONAP/dcaegen2/services/pmsh/application.log
+ mode: a
+ maxBytes: 10000000
+ backupCount: 10
+ formatter: mdcFormatter
+ stdout_handler:
+ class: logging.StreamHandler
+ formatter: mdcFormatter
+formatters:
+ mdcFormatter:
+ format: '%(asctime)s | %(threadName)s | %(thread)d | %(levelname)s | %(module)s
+ | %(funcName)s | %(mdc)s | %(message)s'
+ mdcfmt: '{ServiceName} | {RequestID} | {InvocationID}'
+ datefmt: '%Y-%m-%dT%H:%M:%S%z'
+ (): onaplogging.mdcformatter.MDCFormatter
diff --git a/components/pm-subscription-handler/pmsh_service/mod/__init__.py b/components/pm-subscription-handler/pmsh_service/mod/__init__.py
index 4c86ccda..58cd8b3c 100644
--- a/components/pm-subscription-handler/pmsh_service/mod/__init__.py
+++ b/components/pm-subscription-handler/pmsh_service/mod/__init__.py
@@ -43,9 +43,11 @@ def launch_api_server(app_config):
connex_app = _get_app()
connex_app.add_api('api/pmsh_swagger.yml')
if app_config.enable_tls:
+ logger.info('Launching secure http API server')
connex_app.run(port=os.environ.get('PMSH_API_PORT', '8443'),
ssl_context=app_config.cert_params)
else:
+ logger.info('Launching unsecure http API server')
connex_app.run(port=os.environ.get('PMSH_API_PORT', '8443'))
diff --git a/components/pm-subscription-handler/pmsh_service/mod/aai_client.py b/components/pm-subscription-handler/pmsh_service/mod/aai_client.py
index bd052741..5a3c543a 100755
--- a/components/pm-subscription-handler/pmsh_service/mod/aai_client.py
+++ b/components/pm-subscription-handler/pmsh_service/mod/aai_client.py
@@ -114,11 +114,11 @@ def _filter_nf_data(nf_data, nf_filter):
Returns a list of filtered NetworkFunctions using the nf_filter.
Args:
- nf_data : the nf json data from AAI.
- nf_filter: the `NetworkFunctionFilter <NetworkFunctionFilter>` to be applied.
+ nf_data(dict): the nf json data from AAI.
+ nf_filter(NetworkFunctionFilter): the NetworkFunctionFilter to be applied.
Returns:
- set: a set of filtered NetworkFunctions.
+ set(NetworkFunctions): a set of filtered NetworkFunctions.
Raises:
KeyError: if AAI data cannot be parsed.
diff --git a/components/pm-subscription-handler/pmsh_service/mod/aai_event_handler.py b/components/pm-subscription-handler/pmsh_service/mod/aai_event_handler.py
index f1e8cf27..60b69602 100755
--- a/components/pm-subscription-handler/pmsh_service/mod/aai_event_handler.py
+++ b/components/pm-subscription-handler/pmsh_service/mod/aai_event_handler.py
@@ -72,8 +72,7 @@ def _process_event(action, new_status, xnf_name, mr_pub, app_conf):
local_xnf = NetworkFunction.get(xnf_name)
if local_xnf is None:
- logger.info(f'Activating subscription for network function {xnf_name}')
- app_conf.subscription.process_subscription([NetworkFunction(
+ app_conf.subscription.activate_subscription([NetworkFunction(
nf_name=xnf_name, orchestration_status=new_status)], mr_pub, app_conf)
else:
logger.debug(f"Update Event for network function {xnf_name} will not be processed "
diff --git a/components/pm-subscription-handler/pmsh_service/mod/exit_handler.py b/components/pm-subscription-handler/pmsh_service/mod/exit_handler.py
index 3d02375d..12932966 100755
--- a/components/pm-subscription-handler/pmsh_service/mod/exit_handler.py
+++ b/components/pm-subscription-handler/pmsh_service/mod/exit_handler.py
@@ -25,28 +25,28 @@ class ExitHandler:
Args:
periodic_tasks (List[PeriodicTask]): PeriodicTasks that needs to be cancelled.
+ app_conf (AppConfig): The PMSH Application Configuration.
subscription_handler (SubscriptionHandler): The subscription handler instance.
"""
shutdown_signal_received = False
- def __init__(self, *, periodic_tasks, subscription_handler):
+ def __init__(self, *, periodic_tasks, app_conf, subscription_handler):
self.periodic_tasks = periodic_tasks
+ self.app_conf = app_conf
self.subscription_handler = subscription_handler
def __call__(self, sig_num, frame):
logger.info('Graceful shutdown of PMSH initiated.')
logger.debug(f'ExitHandler was called with signal number: {sig_num}.')
- current_sub = self.subscription_handler.current_sub
- if current_sub and current_sub.administrativeState == AdministrativeState.UNLOCKED.value:
+ current_sub = self.app_conf.subscription
+ if current_sub.administrativeState == AdministrativeState.UNLOCKED.value:
try:
+ current_sub.deactivate_subscription(self.subscription_handler.mr_pub, self.app_conf)
+ current_sub.update_subscription_status()
for thread in self.periodic_tasks:
logger.debug(f'Cancelling periodic task with thread name: {thread.name}.')
thread.cancel()
- current_sub.administrativeState = AdministrativeState.LOCKED.value
- current_sub.process_subscription(current_sub.get_network_functions(),
- self.subscription_handler.mr_pub,
- self.subscription_handler.app_conf)
except Exception as e:
logger.error(f'Failed to shut down PMSH application: {e}', exc_info=True)
ExitHandler.shutdown_signal_received = True
diff --git a/components/pm-subscription-handler/pmsh_service/mod/network_function.py b/components/pm-subscription-handler/pmsh_service/mod/network_function.py
index 979cc775..191e9512 100755
--- a/components/pm-subscription-handler/pmsh_service/mod/network_function.py
+++ b/components/pm-subscription-handler/pmsh_service/mod/network_function.py
@@ -17,7 +17,6 @@
# ============LICENSE_END=====================================================
import re
-from enum import Enum
from mod import logger, db
from mod.api.db_models import NetworkFunctionModel
@@ -55,7 +54,6 @@ class NetworkFunction:
db.session.commit()
logger.info(f'Network Function {new_nf.nf_name} successfully created.')
return new_nf
-
else:
logger.debug(f'Network function {existing_nf.nf_name} already exists,'
f' returning this network function..')
@@ -109,9 +107,4 @@ class NetworkFunctionFilter:
bool: True if matched, else False.
"""
return self.regex_matcher.search(nf_name) and \
- orchestration_status == OrchestrationStatus.ACTIVE.value
-
-
-class OrchestrationStatus(Enum):
- ACTIVE = 'Active'
- INVENTORIED = 'Inventoried'
+ orchestration_status == 'Active'
diff --git a/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py b/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py
index 50eb122b..872843d7 100755
--- a/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py
+++ b/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py
@@ -15,7 +15,6 @@
#
# SPDX-License-Identifier: Apache-2.0
# ============LICENSE_END=====================================================
-import threading
import uuid
from os import getenv
from threading import Timer
@@ -45,20 +44,22 @@ def mdc_handler(function):
return decorator
-class ThreadSafeSingleton(type):
- _instances = {}
- _singleton_lock = threading.Lock()
+class MySingleton(object):
+ instances = {}
- def __call__(cls, *args, **kwargs):
- # double-checked locking pattern (https://en.wikipedia.org/wiki/Double-checked_locking)
- if cls not in cls._instances:
- with cls._singleton_lock:
- if cls not in cls._instances:
- cls._instances[cls] = super(ThreadSafeSingleton, cls).__call__(*args, **kwargs)
- return cls._instances[cls]
+ def __new__(cls, clz=None):
+ if clz is None:
+ if cls.__name__ not in MySingleton.instances:
+ MySingleton.instances[cls.__name__] = \
+ object.__new__(cls)
+ return MySingleton.instances[cls.__name__]
+ MySingleton.instances[clz.__name__] = clz()
+ MySingleton.first = clz
+ return type(clz.__name__, (MySingleton,), dict(clz.__dict__))
-class AppConfig(metaclass=ThreadSafeSingleton):
+class AppConfig:
+ INSTANCE = None
def __init__(self):
try:
@@ -78,6 +79,11 @@ class AppConfig(metaclass=ThreadSafeSingleton):
self.subscription = Subscription(**conf['policy']['subscription'])
self.nf_filter = NetworkFunctionFilter(**self.subscription.nfFilter)
+ def __new__(cls, *args, **kwargs):
+ if AppConfig.INSTANCE is None:
+ AppConfig.INSTANCE = super().__new__(cls, *args, **kwargs)
+ return AppConfig.INSTANCE
+
@mdc_handler
@retry(wait=wait_fixed(5), stop=stop_after_attempt(5), retry=retry_if_exception_type(Exception))
def _get_pmsh_config(self, **kwargs):
@@ -272,7 +278,7 @@ class _MrSub(_DmaapMrClient):
if response.ok:
return response.json()
except Exception as e:
- logger.error(f'Failed to fetch message from MR: {e}')
+ logger.error(f'Failed to fetch message from MR: {e}', exc_info=True)
raise
diff --git a/components/pm-subscription-handler/pmsh_service/mod/subscription.py b/components/pm-subscription-handler/pmsh_service/mod/subscription.py
index dbcd7a5e..97bfc401 100755
--- a/components/pm-subscription-handler/pmsh_service/mod/subscription.py
+++ b/components/pm-subscription-handler/pmsh_service/mod/subscription.py
@@ -33,6 +33,7 @@ class SubNfState(Enum):
class AdministrativeState(Enum):
UNLOCKED = 'UNLOCKED'
LOCKED = 'LOCKED'
+ PENDING = 'PENDING'
subscription_nf_states = {
@@ -55,28 +56,7 @@ class Subscription:
self.fileLocation = kwargs.get('fileLocation')
self.nfFilter = kwargs.get('nfFilter')
self.measurementGroups = kwargs.get('measurementGroups')
-
- def prepare_subscription_event(self, xnf_name, app_conf):
- """Prepare the sub event for publishing
-
- Args:
- xnf_name: the AAI xnf name.
- app_conf (AppConfig): the application configuration.
-
- Returns:
- dict: the Subscription event to be published.
- """
- try:
- clean_sub = {k: v for k, v in self.__dict__.items() if k != 'nfFilter'}
- sub_event = {'nfName': xnf_name, 'policyName': app_conf.operational_policy_name,
- 'changeType': 'DELETE'
- if self.administrativeState == AdministrativeState.LOCKED.value
- else 'CREATE', 'closedLoopControlName': app_conf.control_loop_name,
- 'subscription': clean_sub}
- return sub_event
- except Exception as e:
- logger.error(f'Failed to prep Sub event for xNF {xnf_name}: {e}', exc_info=True)
- raise
+ self.create()
def create(self):
""" Creates a subscription database entry
@@ -89,7 +69,7 @@ class Subscription:
SubscriptionModel.subscription_name == self.subscriptionName).one_or_none())
if existing_subscription is None:
new_subscription = SubscriptionModel(subscription_name=self.subscriptionName,
- status=self.administrativeState)
+ status=AdministrativeState.PENDING.value)
db.session.add(new_subscription)
db.session.commit()
return new_subscription
@@ -101,54 +81,111 @@ class Subscription:
logger.error(f'Failed to create subscription {self.subscriptionName} in the DB: {e}',
exc_info=True)
- def add_network_function_to_subscription(self, nf):
+ def update_subscription_status(self):
+ """ Updates the status of subscription in subscription table """
+ try:
+ SubscriptionModel.query.filter(
+ SubscriptionModel.subscription_name == self.subscriptionName)\
+ .update({SubscriptionModel.status: self.administrativeState},
+ synchronize_session='evaluate')
+
+ db.session.commit()
+ except Exception as e:
+ logger.error(f'Failed to update status of subscription: {self.subscriptionName}: {e}',
+ exc_info=True)
+
+ def delete_subscription(self):
+ """ Deletes a subscription and all its association from the database. A network function
+ that is only associated with the subscription being removed will also be deleted."""
+ try:
+ subscription = SubscriptionModel.query.filter(
+ SubscriptionModel.subscription_name == self.subscriptionName).one_or_none()
+ if subscription:
+ for nf_relationship in subscription.nfs:
+ other_nf_relationship = NfSubRelationalModel.query.filter(
+ NfSubRelationalModel.subscription_name != self.subscriptionName,
+ NfSubRelationalModel.nf_name == nf_relationship.nf_name).one_or_none()
+ if not other_nf_relationship:
+ db.session.delete(nf_relationship.nf)
+ db.session.delete(subscription)
+ db.session.commit()
+ except Exception as e:
+ logger.error(f'Failed to delete subscription: {self.subscriptionName} '
+ f'and it\'s relations from the DB: {e}', exc_info=True)
+
+ def prepare_subscription_event(self, xnf_name, app_conf):
+ """Prepare the sub event for publishing
+
+ Args:
+ xnf_name: the AAI xnf name.
+ app_conf (AppConfig): the application configuration.
+
+ Returns:
+ dict: the Subscription event to be published.
+ """
+ try:
+ clean_sub = {k: v for k, v in self.__dict__.items() if k != 'nfFilter'}
+ sub_event = {'nfName': xnf_name, 'policyName': app_conf.operational_policy_name,
+ 'changeType': 'DELETE'
+ if self.administrativeState == AdministrativeState.LOCKED.value
+ else 'CREATE', 'closedLoopControlName': app_conf.control_loop_name,
+ 'subscription': clean_sub}
+ return sub_event
+ except Exception as e:
+ logger.error(f'Failed to prep Sub event for xNF {xnf_name}: {e}', exc_info=True)
+ raise
+
+ def add_network_function_to_subscription(self, nf, sub_model):
""" Associates a network function to a Subscription
Args:
- nf : A NetworkFunction object.
+ sub_model(SubscriptionModel): The SubscriptionModel from the DB.
+ nf(NetworkFunction): A NetworkFunction object.
"""
- current_sub = self.create()
try:
current_nf = nf.create()
- logger.debug(f'Adding network function {nf.nf_name} to Subscription '
- f'{current_sub.subscription_name}')
existing_entry = NfSubRelationalModel.query.filter(
- NfSubRelationalModel.subscription_name == current_sub.subscription_name,
+ NfSubRelationalModel.subscription_name == self.subscriptionName,
NfSubRelationalModel.nf_name == current_nf.nf_name).one_or_none()
if existing_entry is None:
- new_nf_sub = NfSubRelationalModel(current_sub.subscription_name,
+ new_nf_sub = NfSubRelationalModel(self.subscriptionName,
nf.nf_name, SubNfState.PENDING_CREATE.value)
- new_nf_sub.nf = current_nf
- current_sub.nfs.append(new_nf_sub)
- logger.debug(f'Network function {current_nf.nf_name} added to Subscription '
- f'{current_sub.subscription_name}')
- db.session.add(current_sub)
+ sub_model.nfs.append(new_nf_sub)
+ db.session.add(sub_model)
db.session.commit()
+ logger.info(f'Network function {current_nf.nf_name} added to Subscription '
+ f'{self.subscriptionName}')
except Exception as e:
logger.error(f'Failed to add nf {nf.nf_name} to subscription '
- f'{current_sub.subscription_name}: {e}', exc_info=True)
- logger.debug(f'Subscription {current_sub.subscription_name} now contains these XNFs:'
- f'{Subscription.get_nf_names_per_sub(current_sub.subscription_name)}')
+ f'{self.subscriptionName}: {e}', exc_info=True)
+ logger.debug(f'Subscription {self.subscriptionName} now contains these XNFs:'
+ f'{Subscription.get_nf_names_per_sub(self.subscriptionName)}')
- @staticmethod
- def get(subscription_name):
- """ Retrieves a subscription
-
- Args:
- subscription_name (str): The subscription name
+ def get(self):
+ """ Retrieves a SubscriptionModel object
Returns:
- Subscription object else None
+ SubscriptionModel object else None
"""
return SubscriptionModel.query.filter(
- SubscriptionModel.subscription_name == subscription_name).one_or_none()
+ SubscriptionModel.subscription_name == self.subscriptionName).one_or_none()
+
+ def get_local_sub_admin_state(self):
+ """ Retrieves the subscription admin state
+
+ Returns:
+ str: The admin state of the SubscriptionModel
+ """
+ sub_model = SubscriptionModel.query.filter(
+ SubscriptionModel.subscription_name == self.subscriptionName).one_or_none()
+ return sub_model.status
@staticmethod
def get_all():
""" Retrieves a list of subscriptions
Returns:
- list: Subscription list else empty
+ list(SubscriptionModel): Subscriptions list else empty
"""
return SubscriptionModel.query.all()
@@ -160,7 +197,7 @@ class Subscription:
subscription_name (str): The subscription name
Returns:
- list: List of network function names
+ list(str): List of network function names
"""
nf_sub_rel = NfSubRelationalModel.query.filter(
NfSubRelationalModel.subscription_name == subscription_name).all()
@@ -170,65 +207,41 @@ class Subscription:
return list_of_nfs
- def update_subscription_status(self):
- """ Updates the status of subscription in subscription table """
- try:
- SubscriptionModel.query.filter(
- SubscriptionModel.subscription_name == self.subscriptionName)\
- .update({SubscriptionModel.status: self.administrativeState},
- synchronize_session='evaluate')
-
- db.session.commit()
- except Exception as e:
- logger.error(f'Failed to update status of subscription: {self.subscriptionName}: {e}',
- exc_info=True)
-
- def delete_subscription(self):
- """ Deletes a subscription and all its association from the database. A network function
- that is only associated with the subscription being removed will also be deleted."""
- try:
- subscription = SubscriptionModel.query.filter(
- SubscriptionModel.subscription_name == self.subscriptionName).one_or_none()
- if subscription:
- for nf_relationship in subscription.nfs:
- other_nf_relationship = NfSubRelationalModel.query.filter(
- NfSubRelationalModel.subscription_name != self.subscriptionName,
- NfSubRelationalModel.nf_name == nf_relationship.nf_name).one_or_none()
- if not other_nf_relationship:
- db.session.delete(nf_relationship.nf)
- db.session.delete(subscription)
- db.session.commit()
- except Exception as e:
- logger.error(f'Failed to delete subscription: {self.subscriptionName} '
- f'and it\'s relations from the DB: {e}', exc_info=True)
-
- def process_subscription(self, nfs, mr_pub, app_conf):
- action = 'Deactivate'
- sub_nf_state = SubNfState.PENDING_DELETE.value
- self.update_subscription_status()
-
- if self.administrativeState == AdministrativeState.UNLOCKED.value:
- action = 'Activate'
- sub_nf_state = SubNfState.PENDING_CREATE.value
- logger.info(f'{action} subscription initiated for {self.subscriptionName}.')
-
+ def activate_subscription(self, nfs, mr_pub, app_conf):
+ logger.info(f'Activate subscription initiated for {self.subscriptionName}.')
try:
+ sub_model = self.get()
for nf in nfs:
mr_pub.publish_subscription_event_data(self, nf.nf_name, app_conf)
- logger.debug(f'Publishing Event to {action} '
- f'Sub: {self.subscriptionName} for the nf: {nf.nf_name}')
- if action == 'Activate':
- self.add_network_function_to_subscription(nf)
- self.update_sub_nf_status(self.subscriptionName, sub_nf_state, nf.nf_name)
+ logger.info(f'Publishing event to activate '
+ f'Sub: {self.subscriptionName} for the nf: {nf.nf_name}')
+ self.add_network_function_to_subscription(nf, sub_model)
+ self.update_sub_nf_status(self.subscriptionName, SubNfState.PENDING_CREATE.value,
+ nf.nf_name)
except Exception as err:
raise Exception(f'Error publishing activation event to MR: {err}')
+ def deactivate_subscription(self, mr_pub, app_conf):
+ nfs = self.get_network_functions()
+ try:
+ if nfs:
+ logger.info(f'Deactivate subscription initiated for {self.subscriptionName}.')
+ for nf in nfs:
+ mr_pub.publish_subscription_event_data(self, nf.nf_name, app_conf)
+ logger.debug(f'Publishing Event to deactivate '
+ f'Sub: {self.subscriptionName} for the nf: {nf.nf_name}')
+ self.update_sub_nf_status(self.subscriptionName,
+ SubNfState.PENDING_DELETE.value,
+ nf.nf_name)
+ except Exception as err:
+ raise Exception(f'Error publishing deactivation event to MR: {err}')
+
@staticmethod
def get_all_nfs_subscription_relations():
""" Retrieves all network function to subscription relations
Returns:
- list: NetworkFunctions per Subscription list else empty
+ list(NfSubRelationalModel): NetworkFunctions per Subscription list else empty
"""
nf_per_subscriptions = NfSubRelationalModel.query.all()
return nf_per_subscriptions
diff --git a/components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py b/components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py
index 74b6ac88..e74a1732 100644
--- a/components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py
+++ b/components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py
@@ -22,14 +22,12 @@ from mod.subscription import AdministrativeState
class SubscriptionHandler:
- def __init__(self, administrative_state, mr_pub, app, app_conf, aai_event_thread):
- self.current_nfs = None
- self.current_sub = None
- self.administrative_state = administrative_state
+ def __init__(self, mr_pub, app, app_conf, aai_event_thread, policy_event_thread):
self.mr_pub = mr_pub
self.app = app
self.app_conf = app_conf
self.aai_event_thread = aai_event_thread
+ self.policy_event_thread = policy_event_thread
def execute(self):
"""
@@ -37,25 +35,37 @@ class SubscriptionHandler:
the Subscription if a change has occurred
"""
self.app.app_context().push()
+ local_admin_state = self.app_conf.subscription.get_local_sub_admin_state()
new_administrative_state = self.app_conf.subscription.administrativeState
try:
- if self.administrative_state == new_administrative_state:
+ if local_admin_state == new_administrative_state:
logger.info('Administrative State did not change in the Config')
else:
- self.current_nfs = aai.get_pmsh_nfs_from_aai(self.app_conf)
- self.current_sub = self.app_conf.subscription
- logger.info(f'Administrative State has changed from {self.administrative_state} '
- f'to {new_administrative_state}.')
- self.administrative_state = new_administrative_state
- self.current_sub.process_subscription(self.current_nfs, self.mr_pub, self.app_conf)
-
if new_administrative_state == AdministrativeState.UNLOCKED.value:
- logger.info('Listening to AAI-EVENT topic in MR.')
- self.aai_event_thread.start()
+ self._activate(local_admin_state, new_administrative_state)
+ elif local_admin_state == AdministrativeState.PENDING.value:
+ logger.info('Administrative State is PENDING')
else:
- logger.info('Stop listening to AAI-EVENT topic in MR.')
- self.aai_event_thread.cancel()
-
+ self._deactivate(local_admin_state, new_administrative_state)
except Exception as err:
logger.error(f'Error occurred during the activation/deactivation process {err}',
exc_info=True)
+
+ def _activate(self, local_admin_state, new_administrative_state):
+ logger.info(f'Administrative State has changed from {local_admin_state} '
+ f'to {new_administrative_state}.')
+ existing_nfs_in_aai = aai.get_pmsh_nfs_from_aai(self.app_conf)
+ self.app_conf.subscription.activate_subscription(existing_nfs_in_aai, self.mr_pub,
+ self.app_conf)
+ self.app_conf.subscription.update_subscription_status()
+ logger.info('Start listening for new NFs on AAI-EVENT topic in MR.')
+ self.aai_event_thread.start()
+ self.policy_event_thread.start()
+
+ def _deactivate(self, local_admin_state, new_administrative_state):
+ logger.info(f'Administrative State has changed from {local_admin_state} '
+ f'to {new_administrative_state}.')
+ self.aai_event_thread.cancel()
+ logger.info('Stop listening for NFs on AAI-EVENT topic in MR.')
+ self.app_conf.subscription.deactivate_subscription(self.mr_pub, self.app_conf)
+ self.app_conf.subscription.update_subscription_status()
diff --git a/components/pm-subscription-handler/pmsh_service/pmsh_service_main.py b/components/pm-subscription-handler/pmsh_service/pmsh_service_main.py
index b3c906d0..6b6b9baa 100755
--- a/components/pm-subscription-handler/pmsh_service/pmsh_service_main.py
+++ b/components/pm-subscription-handler/pmsh_service/pmsh_service_main.py
@@ -23,7 +23,6 @@ from mod.aai_event_handler import process_aai_events
from mod.exit_handler import ExitHandler
from mod.pmsh_utils import AppConfig, PeriodicTask
from mod.policy_response_handler import PolicyResponseHandler
-from mod.subscription import Subscription, AdministrativeState
from mod.subscription_handler import SubscriptionHandler
@@ -40,27 +39,27 @@ def main():
except Exception as e:
logger.error(f'Failed to get config and create application: {e}', exc_info=True)
sys.exit(e)
- subscription_in_db = Subscription.get(app_conf.subscription.subscriptionName)
- administrative_state = subscription_in_db.status if subscription_in_db \
- else AdministrativeState.LOCKED.value
app_conf_thread = PeriodicTask(10, app_conf.refresh_config)
app_conf_thread.start()
- aai_event_thread = PeriodicTask(10, process_aai_events,
- args=(aai_event_mr_sub, policy_mr_pub, app, app_conf))
- subscription_handler = SubscriptionHandler(administrative_state,
- policy_mr_pub, app, app_conf, aai_event_thread)
+
policy_response_handler = PolicyResponseHandler(policy_mr_sub, app_conf, app)
+ policy_response_handler_thread = PeriodicTask(25, policy_response_handler.poll_policy_topic)
+
+ aai_event_thread = PeriodicTask(20, process_aai_events,
+ args=(aai_event_mr_sub, policy_mr_pub, app, app_conf))
+
+ subscription_handler = SubscriptionHandler(policy_mr_pub, app, app_conf, aai_event_thread,
+ policy_response_handler_thread)
subscription_handler_thread = PeriodicTask(30, subscription_handler.execute)
- policy_response_handler_thread = PeriodicTask(10, policy_response_handler.poll_policy_topic)
subscription_handler_thread.start()
- policy_response_handler_thread.start()
+
periodic_tasks = [app_conf_thread, aai_event_thread, subscription_handler_thread,
policy_response_handler_thread]
signal(SIGTERM, ExitHandler(periodic_tasks=periodic_tasks,
- subscription_handler=subscription_handler))
+ app_conf=app_conf, subscription_handler=subscription_handler))
launch_api_server(app_conf)
except Exception as e:
diff --git a/components/pm-subscription-handler/tests/test_aai_event_handler.py b/components/pm-subscription-handler/tests/test_aai_event_handler.py
index d366dac5..9ac76477 100755
--- a/components/pm-subscription-handler/tests/test_aai_event_handler.py
+++ b/components/pm-subscription-handler/tests/test_aai_event_handler.py
@@ -16,42 +16,59 @@
# SPDX-License-Identifier: Apache-2.0
# ============LICENSE_END=====================================================
import json
+import os
from os import path
+from test.support import EnvironmentVarGuard
from unittest import TestCase
from unittest.mock import patch, Mock
+from mod import create_app, db
from mod.aai_event_handler import process_aai_events
-from mod.network_function import NetworkFunction, OrchestrationStatus
+from mod.network_function import NetworkFunction
from mod.pmsh_utils import AppConfig
class AAIEventHandlerTest(TestCase):
+ @patch('mod.get_db_connection_url')
+ @patch('mod.update_logging_config')
@patch('mod.pmsh_utils.AppConfig._get_pmsh_config')
- def setUp(self, mock_get_pmsh_config):
+ def setUp(self, mock_get_pmsh_config, mock_update_config, mock_get_db_url):
+ mock_get_db_url.return_value = 'sqlite://'
with open(path.join(path.dirname(__file__), 'data/cbs_data_1.json'), 'r') as data:
self.cbs_data = json.load(data)
mock_get_pmsh_config.return_value = self.cbs_data
self.app_conf = AppConfig()
with open(path.join(path.dirname(__file__), 'data/mr_aai_events.json'), 'r') as data:
self.mr_aai_events = json.load(data)["mr_response"]
+ self.env = EnvironmentVarGuard()
+ self.env.set('LOGGER_CONFIG', os.path.join(os.path.dirname(__file__), 'log_config.yaml'))
self.mock_mr_sub = Mock(get_from_topic=Mock(return_value=self.mr_aai_events))
self.mock_mr_pub = Mock()
self.mock_app = Mock()
+ self.app = create_app()
+ self.app_context = self.app.app_context()
+ self.app_context.push()
+ db.create_all()
- @patch('mod.subscription.Subscription.process_subscription')
+ def tearDown(self):
+ db.session.remove()
+ db.drop_all()
+ self.app_context.pop()
+
+ @patch('mod.subscription.Subscription.activate_subscription')
@patch('mod.aai_event_handler.NetworkFunction.delete')
@patch('mod.aai_event_handler.NetworkFunction.get')
def test_process_aai_update_and_delete_events(self, mock_nf_get, mock_nf_delete,
- mock_process_sub):
+ mock_activate_sub):
pnf_already_active = NetworkFunction(nf_name='pnf_already_active',
- orchestration_status=OrchestrationStatus.ACTIVE.value)
+ orchestration_status='Active')
mock_nf_get.side_effect = [None, pnf_already_active]
expected_nf_for_processing = NetworkFunction(
- nf_name='pnf_newly_discovered', orchestration_status=OrchestrationStatus.ACTIVE.value)
+ nf_name='pnf_newly_discovered', orchestration_status='Active')
process_aai_events(self.mock_mr_sub, self.mock_mr_pub, self.mock_app, self.app_conf)
- mock_process_sub.assert_called_once_with([expected_nf_for_processing],
- self.mock_mr_pub, self.app_conf)
+ mock_activate_sub.assert_called_once_with([expected_nf_for_processing],
+ self.mock_mr_pub, self.app_conf)
mock_nf_delete.assert_called_once_with(nf_name='pnf_to_be_deleted')
diff --git a/components/pm-subscription-handler/tests/test_controller.py b/components/pm-subscription-handler/tests/test_controller.py
index d324a07d..4fcecc37 100755
--- a/components/pm-subscription-handler/tests/test_controller.py
+++ b/components/pm-subscription-handler/tests/test_controller.py
@@ -47,27 +47,26 @@ class ControllerTestCase(unittest.TestCase):
with open(os.path.join(os.path.dirname(__file__), 'data/cbs_data_1.json'), 'r') as data:
self.cbs_data = json.load(data)
mock_get_pmsh_config.return_value = self.cbs_data
- self.app_conf = AppConfig()
- self.xnfs = aai_client.get_pmsh_nfs_from_aai(self.app_conf)
self.nf_1 = NetworkFunction(nf_name='pnf_1', orchestration_status='Inventoried')
self.nf_2 = NetworkFunction(nf_name='pnf_2', orchestration_status='Active')
self.app = create_app()
self.app_context = self.app.app_context()
self.app_context.push()
db.create_all()
+ self.app_conf = AppConfig()
+ self.xnfs = aai_client.get_pmsh_nfs_from_aai(self.app_conf)
def tearDown(self):
db.session.remove()
db.drop_all()
- self.app_context.pop()
def test_status_response_healthy(self):
self.assertEqual(status()['status'], 'healthy')
def test_get_all_sub_to_nf_relations(self):
- self.app_conf.subscription.create()
+ sub_model = self.app_conf.subscription.get()
for nf in [self.nf_1, self.nf_2]:
- self.app_conf.subscription.add_network_function_to_subscription(nf)
+ self.app_conf.subscription.add_network_function_to_subscription(nf, sub_model)
all_subs = get_all_sub_to_nf_relations()
self.assertEqual(len(all_subs[0]['network_functions']), 2)
self.assertEqual(all_subs[0]['subscription_name'], 'ExtraPM-All-gNB-R2B')
diff --git a/components/pm-subscription-handler/tests/test_exit_handler.py b/components/pm-subscription-handler/tests/test_exit_handler.py
index ac1e15c6..d41bd03b 100755
--- a/components/pm-subscription-handler/tests/test_exit_handler.py
+++ b/components/pm-subscription-handler/tests/test_exit_handler.py
@@ -15,56 +15,43 @@
#
# SPDX-License-Identifier: Apache-2.0
# ============LICENSE_END=====================================================
+import json
import os
-import signal
-import threading
-import time
+from signal import SIGTERM, signal
+from test.support import EnvironmentVarGuard
from unittest import TestCase
-from unittest.mock import patch, Mock, MagicMock
+from unittest.mock import patch, Mock
-import pmsh_service_main
+from mod.api.db_models import NetworkFunctionModel
from mod.exit_handler import ExitHandler
-from mod.pmsh_utils import PeriodicTask
-from mod.subscription import AdministrativeState
+from mod.pmsh_utils import AppConfig
+from mod.subscription import Subscription
class ExitHandlerTests(TestCase):
-
- @patch('pmsh_service_main.create_app')
- @patch('pmsh_service_main.db')
- @patch('pmsh_service_main.AppConfig')
- @patch('pmsh_service_main.Subscription')
- @patch('pmsh_service_main.launch_api_server')
- @patch('pmsh_service_main.SubscriptionHandler')
- @patch.object(PeriodicTask, 'start')
- @patch.object(PeriodicTask, 'cancel')
- def test_terminate_signal_success(self, mock_task_cancel, mock_task_start, mock_sub_handler,
- mock_launch_api_server, mock_sub, mock_app_conf,
- mock_db, mock_app):
- pid = os.getpid()
- mock_db.get_app.return_value = Mock()
-
- mock_sub.administrativeState = AdministrativeState.UNLOCKED.value
- mock_sub.process_subscription = Mock()
- mock_sub_handler_instance = MagicMock(execute=Mock(), current_sub=mock_sub)
- mock_sub_handler.side_effect = [mock_sub_handler_instance]
-
- def mock_api_server_run(param):
- while mock_sub.administrativeState == AdministrativeState.UNLOCKED.value:
- time.sleep(1)
-
- mock_launch_api_server.side_effect = mock_api_server_run
-
- def trigger_signal():
- time.sleep(1)
- os.kill(pid, signal.SIGTERM)
-
- thread = threading.Thread(target=trigger_signal)
- thread.start()
-
- pmsh_service_main.main()
-
- self.assertEqual(4, mock_task_cancel.call_count)
+ @patch('mod.subscription.Subscription.create')
+ @patch('mod.pmsh_utils.AppConfig._get_pmsh_config')
+ @patch('mod.pmsh_utils.PeriodicTask')
+ def setUp(self, mock_periodic_task, mock_get_pmsh_config, mock_sub_create):
+ self.env = EnvironmentVarGuard()
+ self.env.set('LOGGER_CONFIG', os.path.join(os.path.dirname(__file__), 'log_config.yaml'))
+ with open(os.path.join(os.path.dirname(__file__), 'data/cbs_data_1.json'), 'r') as data:
+ self.cbs_data = json.load(data)
+ mock_get_pmsh_config.return_value = self.cbs_data
+ self.mock_aai_event_thread = mock_periodic_task
+ self.app_conf = AppConfig()
+ self.sub = self.app_conf.subscription
+
+ @patch('mod.logger.debug')
+ @patch.object(Subscription, 'update_sub_nf_status')
+ @patch.object(Subscription, 'update_subscription_status')
+ @patch.object(Subscription, '_get_nf_models',
+ return_value=[NetworkFunctionModel('pnf1', 'ACTIVE')])
+ def test_terminate_signal_successful(self, mock_sub_get_nf_models, mock_upd_sub_status,
+ mock_upd_subnf_status, mock_logger):
+ handler = ExitHandler(periodic_tasks=[self.mock_aai_event_thread],
+ app_conf=self.app_conf,
+ subscription_handler=Mock())
+ signal(SIGTERM, handler)
+ os.kill(os.getpid(), SIGTERM)
self.assertTrue(ExitHandler.shutdown_signal_received)
- self.assertEqual(1, mock_sub.process_subscription.call_count)
- self.assertEqual(mock_sub.administrativeState, AdministrativeState.LOCKED.value)
diff --git a/components/pm-subscription-handler/tests/test_network_function.py b/components/pm-subscription-handler/tests/test_network_function.py
index 86baef83..cdfb1eb5 100755
--- a/components/pm-subscription-handler/tests/test_network_function.py
+++ b/components/pm-subscription-handler/tests/test_network_function.py
@@ -15,6 +15,7 @@
#
# SPDX-License-Identifier: Apache-2.0
# ============LICENSE_END=====================================================
+import json
import os
from test.support import EnvironmentVarGuard
from unittest import TestCase
@@ -22,23 +23,29 @@ from unittest.mock import patch
from mod import db, create_app
from mod.network_function import NetworkFunction
+from mod.pmsh_utils import AppConfig
from mod.subscription import Subscription
class NetworkFunctionTests(TestCase):
+ @patch('mod.pmsh_utils.AppConfig._get_pmsh_config')
@patch('mod.update_logging_config')
@patch('mod.get_db_connection_url')
- def setUp(self, mock_get_db_url, mock_update_config):
+ def setUp(self, mock_get_db_url, mock_update_config, mock_get_pmsh_config):
mock_get_db_url.return_value = 'sqlite://'
self.nf_1 = NetworkFunction(nf_name='pnf_1', orchestration_status='Inventoried')
self.nf_2 = NetworkFunction(nf_name='pnf_2', orchestration_status='Active')
+ with open(os.path.join(os.path.dirname(__file__), 'data/cbs_data_1.json'), 'r') as data:
+ self.cbs_data = json.load(data)
+ mock_get_pmsh_config.return_value = self.cbs_data
self.env = EnvironmentVarGuard()
self.env.set('LOGGER_CONFIG', os.path.join(os.path.dirname(__file__), 'log_config.yaml'))
self.app = create_app()
self.app_context = self.app.app_context()
self.app_context.push()
db.create_all()
+ self.app_conf = AppConfig()
def tearDown(self):
db.session.remove()
@@ -74,7 +81,7 @@ class NetworkFunctionTests(TestCase):
self.nf_2.create()
sub = Subscription(**{"subscriptionName": "sub"})
for nf in [self.nf_1, self.nf_2]:
- sub.add_network_function_to_subscription(nf)
+ sub.add_network_function_to_subscription(nf, self.app_conf.subscription.get())
NetworkFunction.delete(nf_name=self.nf_1.nf_name)
diff --git a/components/pm-subscription-handler/tests/test_subscription.py b/components/pm-subscription-handler/tests/test_subscription.py
index 74593a42..27a189c2 100755
--- a/components/pm-subscription-handler/tests/test_subscription.py
+++ b/components/pm-subscription-handler/tests/test_subscription.py
@@ -26,7 +26,7 @@ from requests import Session
import mod.aai_client as aai_client
from mod import db, create_app
from mod.api.db_models import NetworkFunctionModel
-from mod.network_function import NetworkFunction, OrchestrationStatus
+from mod.network_function import NetworkFunction
from mod.pmsh_utils import AppConfig
from mod.subscription import Subscription
@@ -51,27 +51,28 @@ class SubscriptionTest(TestCase):
with open(os.path.join(os.path.dirname(__file__), 'data/cbs_data_1.json'), 'r') as data:
self.cbs_data = json.load(data)
mock_get_pmsh_config.return_value = self.cbs_data
- self.app_conf = AppConfig()
- self.xnfs = aai_client.get_pmsh_nfs_from_aai(self.app_conf)
self.mock_mr_sub = mock_mr_sub
self.mock_mr_pub = mock_mr_pub
self.app = create_app()
self.app_context = self.app.app_context()
self.app_context.push()
db.create_all()
+ self.app_conf = AppConfig()
+ self.xnfs = aai_client.get_pmsh_nfs_from_aai(self.app_conf)
+ self.sub_model = self.app_conf.subscription.get()
def tearDown(self):
- db.session.remove()
db.drop_all()
+ db.session.remove()
self.app_context.pop()
def test_xnf_filter_true(self):
self.assertTrue(self.app_conf.nf_filter.is_nf_in_filter('pnf1',
- OrchestrationStatus.ACTIVE.value))
+ 'Active'))
def test_xnf_filter_false(self):
self.assertFalse(self.app_conf.nf_filter.is_nf_in_filter('PNF-33',
- OrchestrationStatus.ACTIVE.value))
+ 'Active'))
def test_sub_measurement_group(self):
self.assertEqual(len(self.app_conf.subscription.measurementGroups), 2)
@@ -82,18 +83,15 @@ class SubscriptionTest(TestCase):
def test_get_subscription(self):
sub_name = 'ExtraPM-All-gNB-R2B'
self.app_conf.subscription.create()
- new_sub = Subscription.get(sub_name)
+ new_sub = self.app_conf.subscription.get()
self.assertEqual(sub_name, new_sub.subscription_name)
- def test_get_subscription_no_match(self):
- sub_name = 'sub2_does_not_exist'
- sub = Subscription.get(sub_name)
- self.assertEqual(sub, None)
-
def test_get_nf_names_per_sub(self):
self.app_conf.subscription.create()
- self.app_conf.subscription.add_network_function_to_subscription(list(self.xnfs)[0])
- self.app_conf.subscription.add_network_function_to_subscription(list(self.xnfs)[1])
+ self.app_conf.subscription.add_network_function_to_subscription(list(self.xnfs)[0],
+ self.sub_model)
+ self.app_conf.subscription.add_network_function_to_subscription(list(self.xnfs)[1],
+ self.sub_model)
nfs = Subscription.get_nf_names_per_sub(self.app_conf.subscription.subscriptionName)
self.assertEqual(2, len(nfs))
@@ -105,37 +103,38 @@ class SubscriptionTest(TestCase):
def test_add_network_functions_per_subscription(self):
for nf in self.xnfs:
- self.app_conf.subscription.add_network_function_to_subscription(nf)
+ self.app_conf.subscription.add_network_function_to_subscription(nf, self.sub_model)
nfs_for_sub_1 = Subscription.get_all_nfs_subscription_relations()
self.assertEqual(3, len(nfs_for_sub_1))
new_nf = NetworkFunction(nf_name='vnf_3', orchestration_status='Active')
- self.app_conf.subscription.add_network_function_to_subscription(new_nf)
+ self.app_conf.subscription.add_network_function_to_subscription(new_nf, self.sub_model)
nf_subs = Subscription.get_all_nfs_subscription_relations()
self.assertEqual(4, len(nf_subs))
def test_add_duplicate_network_functions_per_subscription(self):
- self.app_conf.subscription.add_network_function_to_subscription(list(self.xnfs)[0])
+ self.app_conf.subscription.add_network_function_to_subscription(list(self.xnfs)[0],
+ self.sub_model)
nf_subs = Subscription.get_all_nfs_subscription_relations()
self.assertEqual(1, len(nf_subs))
- self.app_conf.subscription.add_network_function_to_subscription(list(self.xnfs)[0])
+ self.app_conf.subscription.add_network_function_to_subscription(list(self.xnfs)[0],
+ self.sub_model)
nf_subs = Subscription.get_all_nfs_subscription_relations()
self.assertEqual(1, len(nf_subs))
def test_update_subscription_status(self):
- sub_name = 'ExtraPM-All-gNB-R2B'
self.app_conf.subscription.create()
self.app_conf.subscription.administrativeState = 'new_status'
self.app_conf.subscription.update_subscription_status()
- sub = Subscription.get(sub_name)
+ sub = self.app_conf.subscription.get()
self.assertEqual('new_status', sub.status)
def test_delete_subscription(self):
for nf in self.xnfs:
- self.app_conf.subscription.add_network_function_to_subscription(nf)
+ self.app_conf.subscription.add_network_function_to_subscription(nf, self.sub_model)
self.app_conf.subscription.delete_subscription()
self.assertEqual(0, len(Subscription.get_all()))
- self.assertEqual(None, Subscription.get(self.app_conf.subscription.subscriptionName))
+ self.assertEqual(None, self.app_conf.subscription.get())
self.assertEqual(0, len(Subscription.get_all_nfs_subscription_relations()))
self.assertEqual(0, len(NetworkFunction.get_all()))
self.assertEqual(None, NetworkFunction.get(nf_name=list(self.xnfs)[0].nf_name))
@@ -143,7 +142,7 @@ class SubscriptionTest(TestCase):
def test_update_sub_nf_status(self):
sub_name = 'ExtraPM-All-gNB-R2B'
for nf in self.xnfs:
- self.app_conf.subscription.add_network_function_to_subscription(nf)
+ self.app_conf.subscription.add_network_function_to_subscription(nf, self.sub_model)
sub_nfs = Subscription.get_all_nfs_subscription_relations()
self.assertEqual('PENDING_CREATE', sub_nfs[0].nf_sub_status)
@@ -154,33 +153,27 @@ class SubscriptionTest(TestCase):
@patch('mod.subscription.Subscription.add_network_function_to_subscription')
@patch('mod.subscription.Subscription.update_sub_nf_status')
- @patch('mod.subscription.Subscription.update_subscription_status')
- def test_process_activate_subscription(self, mock_update_sub_status,
- mock_update_sub_nf, mock_add_nfs):
- self.app_conf.subscription.process_subscription([list(self.xnfs)[0]], self.mock_mr_pub,
- self.app_conf)
+ def test_process_activate_subscription(self, mock_update_sub_nf, mock_add_nfs):
+ self.app_conf.subscription.activate_subscription([list(self.xnfs)[0]], self.mock_mr_pub,
+ self.app_conf)
- mock_update_sub_status.assert_called()
mock_add_nfs.assert_called()
self.assertTrue(self.mock_mr_pub.publish_subscription_event_data.called)
mock_update_sub_nf.assert_called_with(self.app_conf.subscription.subscriptionName,
'PENDING_CREATE', list(self.xnfs)[0].nf_name)
+ @patch('mod.subscription.Subscription.get_network_functions')
@patch('mod.subscription.Subscription.update_sub_nf_status')
- @patch('mod.subscription.Subscription.update_subscription_status')
- def test_process_deactivate_subscription(self, mock_update_sub_status,
- mock_update_sub_nf):
+ def test_process_deactivate_subscription(self, mock_update_sub_nf, mock_get_nfs):
self.app_conf.subscription.administrativeState = 'LOCKED'
- self.app_conf.subscription.process_subscription([list(self.xnfs)[0]], self.mock_mr_pub,
- self.app_conf)
-
+ mock_get_nfs.return_value = [list(self.xnfs)[0]]
+ self.app_conf.subscription.deactivate_subscription(self.mock_mr_pub, self.app_conf)
self.assertTrue(self.mock_mr_pub.publish_subscription_event_data.called)
mock_update_sub_nf.assert_called_with(self.app_conf.subscription.subscriptionName,
'PENDING_DELETE', list(self.xnfs)[0].nf_name)
- mock_update_sub_status.assert_called()
- def test_process_subscription_exception(self):
- self.assertRaises(Exception, self.app_conf.subscription.process_subscription,
+ def test_activate_subscription_exception(self):
+ self.assertRaises(Exception, self.app_conf.subscription.activate_subscription,
[list(self.xnfs)[0]], 'not_mr_pub', 'app_config')
def test_prepare_subscription_event(self):
@@ -193,7 +186,7 @@ class SubscriptionTest(TestCase):
def test_get_nf_models(self):
for nf in self.xnfs:
- self.app_conf.subscription.add_network_function_to_subscription(nf)
+ self.app_conf.subscription.add_network_function_to_subscription(nf, self.sub_model)
nf_models = self.app_conf.subscription._get_nf_models()
self.assertEqual(3, len(nf_models))
@@ -201,7 +194,7 @@ class SubscriptionTest(TestCase):
def test_get_network_functions(self):
for nf in self.xnfs:
- self.app_conf.subscription.add_network_function_to_subscription(nf)
+ self.app_conf.subscription.add_network_function_to_subscription(nf, self.sub_model)
nfs = self.app_conf.subscription.get_network_functions()
self.assertEqual(3, len(nfs))
diff --git a/components/pm-subscription-handler/tests/test_subscription_handler.py b/components/pm-subscription-handler/tests/test_subscription_handler.py
index 65a7c2c0..a6611666 100644
--- a/components/pm-subscription-handler/tests/test_subscription_handler.py
+++ b/components/pm-subscription-handler/tests/test_subscription_handler.py
@@ -17,9 +17,11 @@
# ============LICENSE_END=====================================================
import json
import os
+from test.support import EnvironmentVarGuard
from unittest import TestCase
from unittest.mock import patch
+from mod import create_app, db
from mod.network_function import NetworkFunction
from mod.pmsh_utils import AppConfig
from mod.subscription import AdministrativeState
@@ -28,72 +30,95 @@ from mod.subscription_handler import SubscriptionHandler
class SubscriptionHandlerTest(TestCase):
+ @classmethod
+ def setUpClass(cls):
+ cls.env = EnvironmentVarGuard()
+ cls.env.set('AAI_SERVICE_PORT', '8443')
+ cls.env.set('LOGGER_CONFIG', os.path.join(os.path.dirname(__file__), 'log_config.yaml'))
+ cls.nfs = [NetworkFunction(nf_name='pnf_1'), NetworkFunction(nf_name='pnf_2')]
+
+ @patch('mod.get_db_connection_url')
+ @patch('mod.update_logging_config')
@patch('mod.pmsh_utils.AppConfig._get_pmsh_config')
- @patch('mod.create_app')
@patch('mod.pmsh_utils._MrPub')
@patch('mod.pmsh_utils.PeriodicTask')
- def setUp(self, mock_aai_event_thread, mock_mr_pub, mock_app, mock_get_pmsh_config):
+ def setUp(self, mock_periodic_task, mock_mr_pub, mock_get_pmsh_config, mock_update_config,
+ mock_get_db_url):
+ mock_get_db_url.return_value = 'sqlite://'
with open(os.path.join(os.path.dirname(__file__), 'data/cbs_data_1.json'), 'r') as data:
self.cbs_data = json.load(data)
mock_get_pmsh_config.return_value = self.cbs_data
- self.app_conf = AppConfig()
- self.mock_app = mock_app
self.mock_mr_pub = mock_mr_pub
- self.mock_aai_event_thread = mock_aai_event_thread
- self.nf_1 = NetworkFunction(nf_name='pnf_1')
- self.nf_2 = NetworkFunction(nf_name='pnf_2')
- self.nfs = [self.nf_1, self.nf_2]
+ self.mock_aai_event_thread = mock_periodic_task
+ self.mock_policy_event_thread = mock_periodic_task
+ self.app = create_app()
+ self.app.app_context().push()
+ db.create_all()
+ self.app_conf = AppConfig()
def tearDown(self):
- pass
+ db.drop_all()
+ db.session.remove()
+ @patch('mod.subscription.Subscription.get_local_sub_admin_state')
@patch('mod.logger.info')
@patch('mod.aai_client.get_pmsh_nfs_from_aai')
- def test_execute_no_change_of_state(self, mock_get_aai, mock_logger):
+ def test_execute_no_change_of_state(self, mock_get_aai, mock_logger, mock_get_sub_status):
+ mock_get_sub_status.return_value = AdministrativeState.UNLOCKED.value
mock_get_aai.return_value = self.nfs
- sub_handler = SubscriptionHandler(AdministrativeState.UNLOCKED.value, self.mock_mr_pub,
- self.mock_app, self.app_conf,
- self.mock_aai_event_thread)
+ sub_handler = SubscriptionHandler(self.mock_mr_pub,
+ self.app, self.app_conf,
+ self.mock_aai_event_thread,
+ self.mock_policy_event_thread)
sub_handler.execute()
mock_logger.assert_called_with('Administrative State did not change in the Config')
- @patch('mod.subscription.Subscription.process_subscription')
+ @patch('mod.subscription.Subscription.get_local_sub_admin_state')
+ @patch('mod.subscription.Subscription.activate_subscription')
@patch('mod.aai_client.get_pmsh_nfs_from_aai')
- def test_execute_change_of_state_unlocked(self, mock_get_aai, mock_process_sub):
+ def test_execute_change_of_state_to_unlocked(self, mock_get_aai, mock_activate_sub,
+ mock_get_sub_status):
mock_get_aai.return_value = self.nfs
+ mock_get_sub_status.return_value = AdministrativeState.LOCKED.value
self.mock_aai_event_thread.return_value.start.return_value = 'start_method'
- sub_handler = SubscriptionHandler(AdministrativeState.LOCKED.value, self.mock_mr_pub,
- self.mock_app, self.app_conf,
- self.mock_aai_event_thread.return_value)
+ sub_handler = SubscriptionHandler(self.mock_mr_pub,
+ self.app, self.app_conf,
+ self.mock_aai_event_thread.return_value,
+ self.mock_policy_event_thread)
sub_handler.execute()
- self.assertEqual(AdministrativeState.UNLOCKED.value, sub_handler.administrative_state)
- mock_process_sub.assert_called_with(self.nfs, self.mock_mr_pub, self.app_conf)
+ self.assertEqual(AdministrativeState.UNLOCKED.value,
+ self.app_conf.subscription.administrativeState)
+ mock_activate_sub.assert_called_with(self.nfs, self.mock_mr_pub, self.app_conf)
self.mock_aai_event_thread.return_value.start.assert_called()
- @patch('mod.subscription.Subscription.process_subscription')
+ @patch('mod.subscription.Subscription.get_local_sub_admin_state')
+ @patch('mod.subscription.Subscription.deactivate_subscription')
@patch('mod.aai_client.get_pmsh_nfs_from_aai')
- def test_execute_change_of_state_locked(self, mock_get_aai, mock_process_sub):
+ def test_execute_change_of_state_to_locked(self, mock_get_aai, mock_deactivate_sub,
+ mock_get_sub_status):
+ mock_get_sub_status.return_value = AdministrativeState.UNLOCKED.value
+ self.app_conf.subscription.administrativeState = AdministrativeState.LOCKED.value
+ self.app_conf.subscription.update_subscription_status()
mock_get_aai.return_value = self.nfs
self.mock_aai_event_thread.return_value.cancel.return_value = 'cancel_method'
- self.app_conf.subscription.administrativeState = AdministrativeState.LOCKED.value
- sub_handler = SubscriptionHandler(AdministrativeState.UNLOCKED.value, self.mock_mr_pub,
- self.mock_app, self.app_conf,
- self.mock_aai_event_thread.return_value)
+ sub_handler = SubscriptionHandler(self.mock_mr_pub,
+ self.app, self.app_conf,
+ self.mock_aai_event_thread.return_value,
+ self.mock_policy_event_thread)
sub_handler.execute()
- self.assertEqual(AdministrativeState.LOCKED.value, sub_handler.administrative_state)
- mock_process_sub.assert_called_with(self.nfs, self.mock_mr_pub, self.app_conf)
+ mock_deactivate_sub.assert_called_with(self.mock_mr_pub, self.app_conf)
self.mock_aai_event_thread.return_value.cancel.assert_called()
- self.app_conf.subscription.administrativeState = AdministrativeState.UNLOCKED.value
- @patch('mod.subscription.Subscription.process_subscription')
+ @patch('mod.subscription.Subscription.activate_subscription')
@patch('mod.logger.error')
@patch('mod.aai_client.get_pmsh_nfs_from_aai')
- def test_execute_exception(self, mock_get_aai, mock_logger, mock_process_sub):
+ def test_execute_exception(self, mock_get_aai, mock_logger, mock_activate_sub):
mock_get_aai.return_value = self.nfs
- mock_process_sub.side_effect = Exception
- sub_handler = SubscriptionHandler(AdministrativeState.LOCKED.value, self.mock_mr_pub,
- self.mock_app, self.app_conf,
- self.mock_aai_event_thread)
+ mock_activate_sub.side_effect = Exception
+ sub_handler = SubscriptionHandler(self.mock_mr_pub,
+ self.app, self.app_conf,
+ self.mock_aai_event_thread,
+ self.mock_policy_event_thread)
sub_handler.execute()
mock_logger.assert_called_with('Error occurred during the activation/deactivation process ',
exc_info=True)