From 38ccb471732faaad6a25defee0753c1c5ac60cf0 Mon Sep 17 00:00:00 2001 From: efiacor Date: Wed, 5 Aug 2020 10:12:04 +0100 Subject: Refactor and bug fixes Signed-off-by: efiacor Change-Id: I8fe91bfdd2f1a2c8a6ca914e52d82dce04bffc0e Issue-ID: DCAEGEN2-2146 --- .../dpo/blueprints/k8s-pmsh.yaml | 184 ------------------ .../dpo/spec/pmsh-component-spec.json | 18 ++ components/pm-subscription-handler/log_config.yaml | 54 +++--- .../pmsh_service/mod/__init__.py | 2 + .../pmsh_service/mod/aai_client.py | 6 +- .../pmsh_service/mod/aai_event_handler.py | 3 +- .../pmsh_service/mod/exit_handler.py | 14 +- .../pmsh_service/mod/network_function.py | 9 +- .../pmsh_service/mod/pmsh_utils.py | 32 ++-- .../pmsh_service/mod/subscription.py | 205 +++++++++++---------- .../pmsh_service/mod/subscription_handler.py | 44 +++-- .../pmsh_service/pmsh_service_main.py | 21 +-- .../tests/test_aai_event_handler.py | 33 +++- .../tests/test_controller.py | 9 +- .../tests/test_exit_handler.py | 77 ++++---- .../tests/test_network_function.py | 11 +- .../tests/test_subscription.py | 73 ++++---- .../tests/test_subscription_handler.py | 95 ++++++---- 18 files changed, 387 insertions(+), 503 deletions(-) delete mode 100755 components/pm-subscription-handler/dpo/blueprints/k8s-pmsh.yaml diff --git a/components/pm-subscription-handler/dpo/blueprints/k8s-pmsh.yaml b/components/pm-subscription-handler/dpo/blueprints/k8s-pmsh.yaml deleted file mode 100755 index e3f7987c..00000000 --- a/components/pm-subscription-handler/dpo/blueprints/k8s-pmsh.yaml +++ /dev/null @@ -1,184 +0,0 @@ -# -# ============LICENSE_START======================================================= -# Copyright (C) 2020 Nordix Foundation. -# ================================================================================ -# Licensed under the Apache License, Version 2.0 (the 'License'); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an 'AS IS' BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -# SPDX-License-Identifier: Apache-2.0 -# ============LICENSE_END========================================================= -# - -tosca_definitions_version: cloudify_dsl_1_3 - -imports: - - 'http://www.getcloudify.org/spec/cloudify/4.5.5/types.yaml' - - 'https://nexus.onap.org/service/local/repositories/raw/content/org.onap.dcaegen2.platform.plugins/R6/k8splugin/1.7.2/k8splugin_types.yaml' - - 'https://nexus.onap.org/service/local/repositories/raw/content/org.onap.ccsdk.platform.plugins/type_files/dmaap/dmaap.yaml' - - 'https://nexus.onap.org/service/local/repositories/raw/content/org.onap.ccsdk.platform.plugins/type_files/pgaas/1.1.0/pgaas_types.yaml' - - 'https://nexus.onap.org/service/local/repositories/raw/content/org.onap.dcaegen2.platform.plugins/R6/clamppolicyplugin/1.1.0/clamppolicyplugin_types.yaml' - -inputs: - tag_version: - type: string - description: Docker image to be used - default: 'nexus3.onap.org:10001/onap/org.onap.dcaegen2.services.pmsh:latest' - replicas: - type: integer - description: Number of instances - default: 1 - policy_model_id: - type: 'string' - default: 'onap.policies.monitoring.dcae-pm-subscription-handler' - policy_id: - type: 'string' - default: 'onap.policies.monitoring.dcae-pm-subscription-handler' - operational_policy_name: - type: string - default: 'pmsh-operational-policy' - control_loop_name: - type: string - pmsh_publish_topic_name: - type: string - default: 'unauthenticated.DCAE_CL_OUTPUT' - policy_feedback_topic_name: - type: string - default: 'PMSH_CL_INPUT' - aai_notification_topic_name: - type: string - default: 'AAI-EVENT' - publisher_client_role: - type: string - description: Client role to request secure access to topic - default: 'org.onap.dcae.pmPublisher' - subscriber_client_role: - type: string - description: Client role to request secure access to topic - default: 'org.onap.dcae.pmSubscriber' - client_id: - type: string - description: Client id for given AAF client - default: 'dcae@dcae.onap.org' - client_password: - type: string - description: Password for AAF client provided as client_id - dcae_location: - type: string - description: DCAE location for the subscriber, used to set up routing - default: 'san-francisco' - cpu_limit: - type: string - default: '1000m' - cpu_request: - type: string - default: '1000m' - memory_limit: - type: string - default: '1024Mi' - memory_request: - type: string - default: '1024Mi' - pgaas_cluster_name: - type: string - default: 'dcae-pg-primary.onap' -node_templates: - pgaasvm: - type: dcae.nodes.pgaas.database - properties: - writerfqdn: { get_input: pgaas_cluster_name } - name: 'pmsh' - pm_subscribe_topic: - type: ccsdk.nodes.Topic - properties: - topic_name: { get_input: policy_feedback_topic_name } - pmsh: - type: dcae.nodes.ContainerizedServiceComponentUsingDmaap - interfaces: - cloudify.interfaces.lifecycle: - create: - inputs: - ports: - - '8443:0' - envs: - PMSH_PG_URL: - { get_attribute: [ pgaasvm, admin, host ] } - PMSH_PG_PASSWORD: - { get_attribute: [ pgaasvm, admin, password ] } - PMSH_PG_USERNAME: - { get_attribute: [ pgaasvm, admin, user ] } - PMSH_DB_NAME: - { get_attribute: [ pgaasvm, admin, database ] } - - relationships: - - type: ccsdk.relationships.subscribe_to_events - target: pm_subscribe_topic - - type: cloudify.relationships.depends_on - target: pgaasvm - - type: cloudify.relationships.depends_on - target: pmsh-policy - - properties: - service_component_type: 'dcae-pmsh' - service_component_name_override: 'dcae-pmsh' - application_config: - aaf_identity: { get_input: client_id } - aaf_password: { get_input: client_password } - operational_policy_name: { get_input: operational_policy_name } - control_loop_name: { get_input: control_loop_name } - cert_path: '/opt/app/pmsh/etc/certs/cert.pem' - key_path: '/opt/app/pmsh/etc/certs/key.pem' - ca_cert_path: '/opt/app/pmsh/etc/certs/cacert.pem' - streams_publishes: - policy_pm_publisher: - type: message_router - dmaap_info: - topic_url: {concat: ["https://message-router:3905/events/", { get_input: pmsh_publish_topic_name }]} - streams_subscribes: - policy_pm_subscriber: - type: message_router - dmaap_info: <> - aai_subscriber: - type: message_router - dmaap_info: - topic_url: {concat: ["https://message-router:3905/events/", { get_input: aai_notification_topic_name }]} - resource_config: - limits: - cpu: { get_input: cpu_limit } - memory: { get_input: memory_limit } - requests: - cpu: { get_input: cpu_request } - memory: { get_input: memory_request } - docker_config: - healthcheck: - endpoint: /healthcheck - interval: 15s - timeout: 1s - type: https - streams_subscribes: - - name: pm_subscribe_topic - location: { get_input: dcae_location } - client_role: { get_input: subscriber_client_role } - type: message-router - image: { get_input: tag_version } - replicas: { get_input: replicas } - log_info: - log_directory: '/var/log/ONAP/dcaegen2/services/pmsh' - tls_info: - cert_directory: '/opt/app/pmsh/etc/certs' - use_tls: true - pmsh-policy: - type: clamp.nodes.policy - properties: - policy_model_id: - get_input: policy_model_id - policy_id: - get_input: policy_id \ No newline at end of file diff --git a/components/pm-subscription-handler/dpo/spec/pmsh-component-spec.json b/components/pm-subscription-handler/dpo/spec/pmsh-component-spec.json index 43028857..69513bce 100755 --- a/components/pm-subscription-handler/dpo/spec/pmsh-component-spec.json +++ b/components/pm-subscription-handler/dpo/spec/pmsh-component-spec.json @@ -99,6 +99,24 @@ "policy_editable": false, "designer_editable": false }, + { + "name": "enable_tls", + "value": true, + "description": "Boolean to (en|dis)able TLS", + "sourced_at_deployment": false, + "policy_editable": false, + "designer_editable": true, + "type": "boolean" + }, + { + "name": "protocol", + "value": "https", + "type": "string", + "description": "Protocol PMSH api will use. If enable_tls is disabled, set protocol to http", + "sourced_at_deployment": false, + "policy_editable": false, + "designer_editable": true + }, { "name": "policy_model_id", "value": "onap.policies.monitoring.dcae-pm-initiation-handler", diff --git a/components/pm-subscription-handler/log_config.yaml b/components/pm-subscription-handler/log_config.yaml index 971c1882..b2d8f43c 100755 --- a/components/pm-subscription-handler/log_config.yaml +++ b/components/pm-subscription-handler/log_config.yaml @@ -1,27 +1,27 @@ -version: 1 - -disable_existing_loggers: true - -loggers: - onap_logger: - level: INFO - handlers: [onap_log_handler, stdout_handler] - propagate: false -handlers: - onap_log_handler: - class: logging.handlers.RotatingFileHandler - filename: /var/log/ONAP/dcaegen2/services/pmsh/application.log - mode: a - maxBytes: 10000000 - backupCount: 10 - formatter: mdcFormatter - stdout_handler: - class: logging.StreamHandler - formatter: mdcFormatter -formatters: - mdcFormatter: - format: '%(asctime)s | %(threadName)s | %(thread)d | %(levelname)s | %(module)s - | %(funcName)s | %(mdc)s | %(message)s' - mdcfmt: '{ServiceName} | {RequestID} | {InvocationID}' - datefmt: '%Y-%m-%dT%H:%M:%S%z' - (): onaplogging.mdcformatter.MDCFormatter +version: 1 + +disable_existing_loggers: true + +loggers: + onap_logger: + level: INFO + handlers: [onap_log_handler, stdout_handler] + propagate: false +handlers: + onap_log_handler: + class: logging.handlers.RotatingFileHandler + filename: /var/log/ONAP/dcaegen2/services/pmsh/application.log + mode: a + maxBytes: 10000000 + backupCount: 10 + formatter: mdcFormatter + stdout_handler: + class: logging.StreamHandler + formatter: mdcFormatter +formatters: + mdcFormatter: + format: '%(asctime)s | %(threadName)s | %(thread)d | %(levelname)s | %(module)s + | %(funcName)s | %(mdc)s | %(message)s' + mdcfmt: '{ServiceName} | {RequestID} | {InvocationID}' + datefmt: '%Y-%m-%dT%H:%M:%S%z' + (): onaplogging.mdcformatter.MDCFormatter diff --git a/components/pm-subscription-handler/pmsh_service/mod/__init__.py b/components/pm-subscription-handler/pmsh_service/mod/__init__.py index 4c86ccda..58cd8b3c 100644 --- a/components/pm-subscription-handler/pmsh_service/mod/__init__.py +++ b/components/pm-subscription-handler/pmsh_service/mod/__init__.py @@ -43,9 +43,11 @@ def launch_api_server(app_config): connex_app = _get_app() connex_app.add_api('api/pmsh_swagger.yml') if app_config.enable_tls: + logger.info('Launching secure http API server') connex_app.run(port=os.environ.get('PMSH_API_PORT', '8443'), ssl_context=app_config.cert_params) else: + logger.info('Launching unsecure http API server') connex_app.run(port=os.environ.get('PMSH_API_PORT', '8443')) diff --git a/components/pm-subscription-handler/pmsh_service/mod/aai_client.py b/components/pm-subscription-handler/pmsh_service/mod/aai_client.py index bd052741..5a3c543a 100755 --- a/components/pm-subscription-handler/pmsh_service/mod/aai_client.py +++ b/components/pm-subscription-handler/pmsh_service/mod/aai_client.py @@ -114,11 +114,11 @@ def _filter_nf_data(nf_data, nf_filter): Returns a list of filtered NetworkFunctions using the nf_filter. Args: - nf_data : the nf json data from AAI. - nf_filter: the `NetworkFunctionFilter ` to be applied. + nf_data(dict): the nf json data from AAI. + nf_filter(NetworkFunctionFilter): the NetworkFunctionFilter to be applied. Returns: - set: a set of filtered NetworkFunctions. + set(NetworkFunctions): a set of filtered NetworkFunctions. Raises: KeyError: if AAI data cannot be parsed. diff --git a/components/pm-subscription-handler/pmsh_service/mod/aai_event_handler.py b/components/pm-subscription-handler/pmsh_service/mod/aai_event_handler.py index f1e8cf27..60b69602 100755 --- a/components/pm-subscription-handler/pmsh_service/mod/aai_event_handler.py +++ b/components/pm-subscription-handler/pmsh_service/mod/aai_event_handler.py @@ -72,8 +72,7 @@ def _process_event(action, new_status, xnf_name, mr_pub, app_conf): local_xnf = NetworkFunction.get(xnf_name) if local_xnf is None: - logger.info(f'Activating subscription for network function {xnf_name}') - app_conf.subscription.process_subscription([NetworkFunction( + app_conf.subscription.activate_subscription([NetworkFunction( nf_name=xnf_name, orchestration_status=new_status)], mr_pub, app_conf) else: logger.debug(f"Update Event for network function {xnf_name} will not be processed " diff --git a/components/pm-subscription-handler/pmsh_service/mod/exit_handler.py b/components/pm-subscription-handler/pmsh_service/mod/exit_handler.py index 3d02375d..12932966 100755 --- a/components/pm-subscription-handler/pmsh_service/mod/exit_handler.py +++ b/components/pm-subscription-handler/pmsh_service/mod/exit_handler.py @@ -25,28 +25,28 @@ class ExitHandler: Args: periodic_tasks (List[PeriodicTask]): PeriodicTasks that needs to be cancelled. + app_conf (AppConfig): The PMSH Application Configuration. subscription_handler (SubscriptionHandler): The subscription handler instance. """ shutdown_signal_received = False - def __init__(self, *, periodic_tasks, subscription_handler): + def __init__(self, *, periodic_tasks, app_conf, subscription_handler): self.periodic_tasks = periodic_tasks + self.app_conf = app_conf self.subscription_handler = subscription_handler def __call__(self, sig_num, frame): logger.info('Graceful shutdown of PMSH initiated.') logger.debug(f'ExitHandler was called with signal number: {sig_num}.') - current_sub = self.subscription_handler.current_sub - if current_sub and current_sub.administrativeState == AdministrativeState.UNLOCKED.value: + current_sub = self.app_conf.subscription + if current_sub.administrativeState == AdministrativeState.UNLOCKED.value: try: + current_sub.deactivate_subscription(self.subscription_handler.mr_pub, self.app_conf) + current_sub.update_subscription_status() for thread in self.periodic_tasks: logger.debug(f'Cancelling periodic task with thread name: {thread.name}.') thread.cancel() - current_sub.administrativeState = AdministrativeState.LOCKED.value - current_sub.process_subscription(current_sub.get_network_functions(), - self.subscription_handler.mr_pub, - self.subscription_handler.app_conf) except Exception as e: logger.error(f'Failed to shut down PMSH application: {e}', exc_info=True) ExitHandler.shutdown_signal_received = True diff --git a/components/pm-subscription-handler/pmsh_service/mod/network_function.py b/components/pm-subscription-handler/pmsh_service/mod/network_function.py index 979cc775..191e9512 100755 --- a/components/pm-subscription-handler/pmsh_service/mod/network_function.py +++ b/components/pm-subscription-handler/pmsh_service/mod/network_function.py @@ -17,7 +17,6 @@ # ============LICENSE_END===================================================== import re -from enum import Enum from mod import logger, db from mod.api.db_models import NetworkFunctionModel @@ -55,7 +54,6 @@ class NetworkFunction: db.session.commit() logger.info(f'Network Function {new_nf.nf_name} successfully created.') return new_nf - else: logger.debug(f'Network function {existing_nf.nf_name} already exists,' f' returning this network function..') @@ -109,9 +107,4 @@ class NetworkFunctionFilter: bool: True if matched, else False. """ return self.regex_matcher.search(nf_name) and \ - orchestration_status == OrchestrationStatus.ACTIVE.value - - -class OrchestrationStatus(Enum): - ACTIVE = 'Active' - INVENTORIED = 'Inventoried' + orchestration_status == 'Active' diff --git a/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py b/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py index 50eb122b..872843d7 100755 --- a/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py +++ b/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py @@ -15,7 +15,6 @@ # # SPDX-License-Identifier: Apache-2.0 # ============LICENSE_END===================================================== -import threading import uuid from os import getenv from threading import Timer @@ -45,20 +44,22 @@ def mdc_handler(function): return decorator -class ThreadSafeSingleton(type): - _instances = {} - _singleton_lock = threading.Lock() +class MySingleton(object): + instances = {} - def __call__(cls, *args, **kwargs): - # double-checked locking pattern (https://en.wikipedia.org/wiki/Double-checked_locking) - if cls not in cls._instances: - with cls._singleton_lock: - if cls not in cls._instances: - cls._instances[cls] = super(ThreadSafeSingleton, cls).__call__(*args, **kwargs) - return cls._instances[cls] + def __new__(cls, clz=None): + if clz is None: + if cls.__name__ not in MySingleton.instances: + MySingleton.instances[cls.__name__] = \ + object.__new__(cls) + return MySingleton.instances[cls.__name__] + MySingleton.instances[clz.__name__] = clz() + MySingleton.first = clz + return type(clz.__name__, (MySingleton,), dict(clz.__dict__)) -class AppConfig(metaclass=ThreadSafeSingleton): +class AppConfig: + INSTANCE = None def __init__(self): try: @@ -78,6 +79,11 @@ class AppConfig(metaclass=ThreadSafeSingleton): self.subscription = Subscription(**conf['policy']['subscription']) self.nf_filter = NetworkFunctionFilter(**self.subscription.nfFilter) + def __new__(cls, *args, **kwargs): + if AppConfig.INSTANCE is None: + AppConfig.INSTANCE = super().__new__(cls, *args, **kwargs) + return AppConfig.INSTANCE + @mdc_handler @retry(wait=wait_fixed(5), stop=stop_after_attempt(5), retry=retry_if_exception_type(Exception)) def _get_pmsh_config(self, **kwargs): @@ -272,7 +278,7 @@ class _MrSub(_DmaapMrClient): if response.ok: return response.json() except Exception as e: - logger.error(f'Failed to fetch message from MR: {e}') + logger.error(f'Failed to fetch message from MR: {e}', exc_info=True) raise diff --git a/components/pm-subscription-handler/pmsh_service/mod/subscription.py b/components/pm-subscription-handler/pmsh_service/mod/subscription.py index dbcd7a5e..97bfc401 100755 --- a/components/pm-subscription-handler/pmsh_service/mod/subscription.py +++ b/components/pm-subscription-handler/pmsh_service/mod/subscription.py @@ -33,6 +33,7 @@ class SubNfState(Enum): class AdministrativeState(Enum): UNLOCKED = 'UNLOCKED' LOCKED = 'LOCKED' + PENDING = 'PENDING' subscription_nf_states = { @@ -55,28 +56,7 @@ class Subscription: self.fileLocation = kwargs.get('fileLocation') self.nfFilter = kwargs.get('nfFilter') self.measurementGroups = kwargs.get('measurementGroups') - - def prepare_subscription_event(self, xnf_name, app_conf): - """Prepare the sub event for publishing - - Args: - xnf_name: the AAI xnf name. - app_conf (AppConfig): the application configuration. - - Returns: - dict: the Subscription event to be published. - """ - try: - clean_sub = {k: v for k, v in self.__dict__.items() if k != 'nfFilter'} - sub_event = {'nfName': xnf_name, 'policyName': app_conf.operational_policy_name, - 'changeType': 'DELETE' - if self.administrativeState == AdministrativeState.LOCKED.value - else 'CREATE', 'closedLoopControlName': app_conf.control_loop_name, - 'subscription': clean_sub} - return sub_event - except Exception as e: - logger.error(f'Failed to prep Sub event for xNF {xnf_name}: {e}', exc_info=True) - raise + self.create() def create(self): """ Creates a subscription database entry @@ -89,7 +69,7 @@ class Subscription: SubscriptionModel.subscription_name == self.subscriptionName).one_or_none()) if existing_subscription is None: new_subscription = SubscriptionModel(subscription_name=self.subscriptionName, - status=self.administrativeState) + status=AdministrativeState.PENDING.value) db.session.add(new_subscription) db.session.commit() return new_subscription @@ -101,54 +81,111 @@ class Subscription: logger.error(f'Failed to create subscription {self.subscriptionName} in the DB: {e}', exc_info=True) - def add_network_function_to_subscription(self, nf): + def update_subscription_status(self): + """ Updates the status of subscription in subscription table """ + try: + SubscriptionModel.query.filter( + SubscriptionModel.subscription_name == self.subscriptionName)\ + .update({SubscriptionModel.status: self.administrativeState}, + synchronize_session='evaluate') + + db.session.commit() + except Exception as e: + logger.error(f'Failed to update status of subscription: {self.subscriptionName}: {e}', + exc_info=True) + + def delete_subscription(self): + """ Deletes a subscription and all its association from the database. A network function + that is only associated with the subscription being removed will also be deleted.""" + try: + subscription = SubscriptionModel.query.filter( + SubscriptionModel.subscription_name == self.subscriptionName).one_or_none() + if subscription: + for nf_relationship in subscription.nfs: + other_nf_relationship = NfSubRelationalModel.query.filter( + NfSubRelationalModel.subscription_name != self.subscriptionName, + NfSubRelationalModel.nf_name == nf_relationship.nf_name).one_or_none() + if not other_nf_relationship: + db.session.delete(nf_relationship.nf) + db.session.delete(subscription) + db.session.commit() + except Exception as e: + logger.error(f'Failed to delete subscription: {self.subscriptionName} ' + f'and it\'s relations from the DB: {e}', exc_info=True) + + def prepare_subscription_event(self, xnf_name, app_conf): + """Prepare the sub event for publishing + + Args: + xnf_name: the AAI xnf name. + app_conf (AppConfig): the application configuration. + + Returns: + dict: the Subscription event to be published. + """ + try: + clean_sub = {k: v for k, v in self.__dict__.items() if k != 'nfFilter'} + sub_event = {'nfName': xnf_name, 'policyName': app_conf.operational_policy_name, + 'changeType': 'DELETE' + if self.administrativeState == AdministrativeState.LOCKED.value + else 'CREATE', 'closedLoopControlName': app_conf.control_loop_name, + 'subscription': clean_sub} + return sub_event + except Exception as e: + logger.error(f'Failed to prep Sub event for xNF {xnf_name}: {e}', exc_info=True) + raise + + def add_network_function_to_subscription(self, nf, sub_model): """ Associates a network function to a Subscription Args: - nf : A NetworkFunction object. + sub_model(SubscriptionModel): The SubscriptionModel from the DB. + nf(NetworkFunction): A NetworkFunction object. """ - current_sub = self.create() try: current_nf = nf.create() - logger.debug(f'Adding network function {nf.nf_name} to Subscription ' - f'{current_sub.subscription_name}') existing_entry = NfSubRelationalModel.query.filter( - NfSubRelationalModel.subscription_name == current_sub.subscription_name, + NfSubRelationalModel.subscription_name == self.subscriptionName, NfSubRelationalModel.nf_name == current_nf.nf_name).one_or_none() if existing_entry is None: - new_nf_sub = NfSubRelationalModel(current_sub.subscription_name, + new_nf_sub = NfSubRelationalModel(self.subscriptionName, nf.nf_name, SubNfState.PENDING_CREATE.value) - new_nf_sub.nf = current_nf - current_sub.nfs.append(new_nf_sub) - logger.debug(f'Network function {current_nf.nf_name} added to Subscription ' - f'{current_sub.subscription_name}') - db.session.add(current_sub) + sub_model.nfs.append(new_nf_sub) + db.session.add(sub_model) db.session.commit() + logger.info(f'Network function {current_nf.nf_name} added to Subscription ' + f'{self.subscriptionName}') except Exception as e: logger.error(f'Failed to add nf {nf.nf_name} to subscription ' - f'{current_sub.subscription_name}: {e}', exc_info=True) - logger.debug(f'Subscription {current_sub.subscription_name} now contains these XNFs:' - f'{Subscription.get_nf_names_per_sub(current_sub.subscription_name)}') + f'{self.subscriptionName}: {e}', exc_info=True) + logger.debug(f'Subscription {self.subscriptionName} now contains these XNFs:' + f'{Subscription.get_nf_names_per_sub(self.subscriptionName)}') - @staticmethod - def get(subscription_name): - """ Retrieves a subscription - - Args: - subscription_name (str): The subscription name + def get(self): + """ Retrieves a SubscriptionModel object Returns: - Subscription object else None + SubscriptionModel object else None """ return SubscriptionModel.query.filter( - SubscriptionModel.subscription_name == subscription_name).one_or_none() + SubscriptionModel.subscription_name == self.subscriptionName).one_or_none() + + def get_local_sub_admin_state(self): + """ Retrieves the subscription admin state + + Returns: + str: The admin state of the SubscriptionModel + """ + sub_model = SubscriptionModel.query.filter( + SubscriptionModel.subscription_name == self.subscriptionName).one_or_none() + return sub_model.status @staticmethod def get_all(): """ Retrieves a list of subscriptions Returns: - list: Subscription list else empty + list(SubscriptionModel): Subscriptions list else empty """ return SubscriptionModel.query.all() @@ -160,7 +197,7 @@ class Subscription: subscription_name (str): The subscription name Returns: - list: List of network function names + list(str): List of network function names """ nf_sub_rel = NfSubRelationalModel.query.filter( NfSubRelationalModel.subscription_name == subscription_name).all() @@ -170,65 +207,41 @@ class Subscription: return list_of_nfs - def update_subscription_status(self): - """ Updates the status of subscription in subscription table """ - try: - SubscriptionModel.query.filter( - SubscriptionModel.subscription_name == self.subscriptionName)\ - .update({SubscriptionModel.status: self.administrativeState}, - synchronize_session='evaluate') - - db.session.commit() - except Exception as e: - logger.error(f'Failed to update status of subscription: {self.subscriptionName}: {e}', - exc_info=True) - - def delete_subscription(self): - """ Deletes a subscription and all its association from the database. A network function - that is only associated with the subscription being removed will also be deleted.""" - try: - subscription = SubscriptionModel.query.filter( - SubscriptionModel.subscription_name == self.subscriptionName).one_or_none() - if subscription: - for nf_relationship in subscription.nfs: - other_nf_relationship = NfSubRelationalModel.query.filter( - NfSubRelationalModel.subscription_name != self.subscriptionName, - NfSubRelationalModel.nf_name == nf_relationship.nf_name).one_or_none() - if not other_nf_relationship: - db.session.delete(nf_relationship.nf) - db.session.delete(subscription) - db.session.commit() - except Exception as e: - logger.error(f'Failed to delete subscription: {self.subscriptionName} ' - f'and it\'s relations from the DB: {e}', exc_info=True) - - def process_subscription(self, nfs, mr_pub, app_conf): - action = 'Deactivate' - sub_nf_state = SubNfState.PENDING_DELETE.value - self.update_subscription_status() - - if self.administrativeState == AdministrativeState.UNLOCKED.value: - action = 'Activate' - sub_nf_state = SubNfState.PENDING_CREATE.value - logger.info(f'{action} subscription initiated for {self.subscriptionName}.') - + def activate_subscription(self, nfs, mr_pub, app_conf): + logger.info(f'Activate subscription initiated for {self.subscriptionName}.') try: + sub_model = self.get() for nf in nfs: mr_pub.publish_subscription_event_data(self, nf.nf_name, app_conf) - logger.debug(f'Publishing Event to {action} ' - f'Sub: {self.subscriptionName} for the nf: {nf.nf_name}') - if action == 'Activate': - self.add_network_function_to_subscription(nf) - self.update_sub_nf_status(self.subscriptionName, sub_nf_state, nf.nf_name) + logger.info(f'Publishing event to activate ' + f'Sub: {self.subscriptionName} for the nf: {nf.nf_name}') + self.add_network_function_to_subscription(nf, sub_model) + self.update_sub_nf_status(self.subscriptionName, SubNfState.PENDING_CREATE.value, + nf.nf_name) except Exception as err: raise Exception(f'Error publishing activation event to MR: {err}') + def deactivate_subscription(self, mr_pub, app_conf): + nfs = self.get_network_functions() + try: + if nfs: + logger.info(f'Deactivate subscription initiated for {self.subscriptionName}.') + for nf in nfs: + mr_pub.publish_subscription_event_data(self, nf.nf_name, app_conf) + logger.debug(f'Publishing Event to deactivate ' + f'Sub: {self.subscriptionName} for the nf: {nf.nf_name}') + self.update_sub_nf_status(self.subscriptionName, + SubNfState.PENDING_DELETE.value, + nf.nf_name) + except Exception as err: + raise Exception(f'Error publishing deactivation event to MR: {err}') + @staticmethod def get_all_nfs_subscription_relations(): """ Retrieves all network function to subscription relations Returns: - list: NetworkFunctions per Subscription list else empty + list(NfSubRelationalModel): NetworkFunctions per Subscription list else empty """ nf_per_subscriptions = NfSubRelationalModel.query.all() return nf_per_subscriptions diff --git a/components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py b/components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py index 74b6ac88..e74a1732 100644 --- a/components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py +++ b/components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py @@ -22,14 +22,12 @@ from mod.subscription import AdministrativeState class SubscriptionHandler: - def __init__(self, administrative_state, mr_pub, app, app_conf, aai_event_thread): - self.current_nfs = None - self.current_sub = None - self.administrative_state = administrative_state + def __init__(self, mr_pub, app, app_conf, aai_event_thread, policy_event_thread): self.mr_pub = mr_pub self.app = app self.app_conf = app_conf self.aai_event_thread = aai_event_thread + self.policy_event_thread = policy_event_thread def execute(self): """ @@ -37,25 +35,37 @@ class SubscriptionHandler: the Subscription if a change has occurred """ self.app.app_context().push() + local_admin_state = self.app_conf.subscription.get_local_sub_admin_state() new_administrative_state = self.app_conf.subscription.administrativeState try: - if self.administrative_state == new_administrative_state: + if local_admin_state == new_administrative_state: logger.info('Administrative State did not change in the Config') else: - self.current_nfs = aai.get_pmsh_nfs_from_aai(self.app_conf) - self.current_sub = self.app_conf.subscription - logger.info(f'Administrative State has changed from {self.administrative_state} ' - f'to {new_administrative_state}.') - self.administrative_state = new_administrative_state - self.current_sub.process_subscription(self.current_nfs, self.mr_pub, self.app_conf) - if new_administrative_state == AdministrativeState.UNLOCKED.value: - logger.info('Listening to AAI-EVENT topic in MR.') - self.aai_event_thread.start() + self._activate(local_admin_state, new_administrative_state) + elif local_admin_state == AdministrativeState.PENDING.value: + logger.info('Administrative State is PENDING') else: - logger.info('Stop listening to AAI-EVENT topic in MR.') - self.aai_event_thread.cancel() - + self._deactivate(local_admin_state, new_administrative_state) except Exception as err: logger.error(f'Error occurred during the activation/deactivation process {err}', exc_info=True) + + def _activate(self, local_admin_state, new_administrative_state): + logger.info(f'Administrative State has changed from {local_admin_state} ' + f'to {new_administrative_state}.') + existing_nfs_in_aai = aai.get_pmsh_nfs_from_aai(self.app_conf) + self.app_conf.subscription.activate_subscription(existing_nfs_in_aai, self.mr_pub, + self.app_conf) + self.app_conf.subscription.update_subscription_status() + logger.info('Start listening for new NFs on AAI-EVENT topic in MR.') + self.aai_event_thread.start() + self.policy_event_thread.start() + + def _deactivate(self, local_admin_state, new_administrative_state): + logger.info(f'Administrative State has changed from {local_admin_state} ' + f'to {new_administrative_state}.') + self.aai_event_thread.cancel() + logger.info('Stop listening for NFs on AAI-EVENT topic in MR.') + self.app_conf.subscription.deactivate_subscription(self.mr_pub, self.app_conf) + self.app_conf.subscription.update_subscription_status() diff --git a/components/pm-subscription-handler/pmsh_service/pmsh_service_main.py b/components/pm-subscription-handler/pmsh_service/pmsh_service_main.py index b3c906d0..6b6b9baa 100755 --- a/components/pm-subscription-handler/pmsh_service/pmsh_service_main.py +++ b/components/pm-subscription-handler/pmsh_service/pmsh_service_main.py @@ -23,7 +23,6 @@ from mod.aai_event_handler import process_aai_events from mod.exit_handler import ExitHandler from mod.pmsh_utils import AppConfig, PeriodicTask from mod.policy_response_handler import PolicyResponseHandler -from mod.subscription import Subscription, AdministrativeState from mod.subscription_handler import SubscriptionHandler @@ -40,27 +39,27 @@ def main(): except Exception as e: logger.error(f'Failed to get config and create application: {e}', exc_info=True) sys.exit(e) - subscription_in_db = Subscription.get(app_conf.subscription.subscriptionName) - administrative_state = subscription_in_db.status if subscription_in_db \ - else AdministrativeState.LOCKED.value app_conf_thread = PeriodicTask(10, app_conf.refresh_config) app_conf_thread.start() - aai_event_thread = PeriodicTask(10, process_aai_events, - args=(aai_event_mr_sub, policy_mr_pub, app, app_conf)) - subscription_handler = SubscriptionHandler(administrative_state, - policy_mr_pub, app, app_conf, aai_event_thread) + policy_response_handler = PolicyResponseHandler(policy_mr_sub, app_conf, app) + policy_response_handler_thread = PeriodicTask(25, policy_response_handler.poll_policy_topic) + + aai_event_thread = PeriodicTask(20, process_aai_events, + args=(aai_event_mr_sub, policy_mr_pub, app, app_conf)) + + subscription_handler = SubscriptionHandler(policy_mr_pub, app, app_conf, aai_event_thread, + policy_response_handler_thread) subscription_handler_thread = PeriodicTask(30, subscription_handler.execute) - policy_response_handler_thread = PeriodicTask(10, policy_response_handler.poll_policy_topic) subscription_handler_thread.start() - policy_response_handler_thread.start() + periodic_tasks = [app_conf_thread, aai_event_thread, subscription_handler_thread, policy_response_handler_thread] signal(SIGTERM, ExitHandler(periodic_tasks=periodic_tasks, - subscription_handler=subscription_handler)) + app_conf=app_conf, subscription_handler=subscription_handler)) launch_api_server(app_conf) except Exception as e: diff --git a/components/pm-subscription-handler/tests/test_aai_event_handler.py b/components/pm-subscription-handler/tests/test_aai_event_handler.py index d366dac5..9ac76477 100755 --- a/components/pm-subscription-handler/tests/test_aai_event_handler.py +++ b/components/pm-subscription-handler/tests/test_aai_event_handler.py @@ -16,42 +16,59 @@ # SPDX-License-Identifier: Apache-2.0 # ============LICENSE_END===================================================== import json +import os from os import path +from test.support import EnvironmentVarGuard from unittest import TestCase from unittest.mock import patch, Mock +from mod import create_app, db from mod.aai_event_handler import process_aai_events -from mod.network_function import NetworkFunction, OrchestrationStatus +from mod.network_function import NetworkFunction from mod.pmsh_utils import AppConfig class AAIEventHandlerTest(TestCase): + @patch('mod.get_db_connection_url') + @patch('mod.update_logging_config') @patch('mod.pmsh_utils.AppConfig._get_pmsh_config') - def setUp(self, mock_get_pmsh_config): + def setUp(self, mock_get_pmsh_config, mock_update_config, mock_get_db_url): + mock_get_db_url.return_value = 'sqlite://' with open(path.join(path.dirname(__file__), 'data/cbs_data_1.json'), 'r') as data: self.cbs_data = json.load(data) mock_get_pmsh_config.return_value = self.cbs_data self.app_conf = AppConfig() with open(path.join(path.dirname(__file__), 'data/mr_aai_events.json'), 'r') as data: self.mr_aai_events = json.load(data)["mr_response"] + self.env = EnvironmentVarGuard() + self.env.set('LOGGER_CONFIG', os.path.join(os.path.dirname(__file__), 'log_config.yaml')) self.mock_mr_sub = Mock(get_from_topic=Mock(return_value=self.mr_aai_events)) self.mock_mr_pub = Mock() self.mock_app = Mock() + self.app = create_app() + self.app_context = self.app.app_context() + self.app_context.push() + db.create_all() - @patch('mod.subscription.Subscription.process_subscription') + def tearDown(self): + db.session.remove() + db.drop_all() + self.app_context.pop() + + @patch('mod.subscription.Subscription.activate_subscription') @patch('mod.aai_event_handler.NetworkFunction.delete') @patch('mod.aai_event_handler.NetworkFunction.get') def test_process_aai_update_and_delete_events(self, mock_nf_get, mock_nf_delete, - mock_process_sub): + mock_activate_sub): pnf_already_active = NetworkFunction(nf_name='pnf_already_active', - orchestration_status=OrchestrationStatus.ACTIVE.value) + orchestration_status='Active') mock_nf_get.side_effect = [None, pnf_already_active] expected_nf_for_processing = NetworkFunction( - nf_name='pnf_newly_discovered', orchestration_status=OrchestrationStatus.ACTIVE.value) + nf_name='pnf_newly_discovered', orchestration_status='Active') process_aai_events(self.mock_mr_sub, self.mock_mr_pub, self.mock_app, self.app_conf) - mock_process_sub.assert_called_once_with([expected_nf_for_processing], - self.mock_mr_pub, self.app_conf) + mock_activate_sub.assert_called_once_with([expected_nf_for_processing], + self.mock_mr_pub, self.app_conf) mock_nf_delete.assert_called_once_with(nf_name='pnf_to_be_deleted') diff --git a/components/pm-subscription-handler/tests/test_controller.py b/components/pm-subscription-handler/tests/test_controller.py index d324a07d..4fcecc37 100755 --- a/components/pm-subscription-handler/tests/test_controller.py +++ b/components/pm-subscription-handler/tests/test_controller.py @@ -47,27 +47,26 @@ class ControllerTestCase(unittest.TestCase): with open(os.path.join(os.path.dirname(__file__), 'data/cbs_data_1.json'), 'r') as data: self.cbs_data = json.load(data) mock_get_pmsh_config.return_value = self.cbs_data - self.app_conf = AppConfig() - self.xnfs = aai_client.get_pmsh_nfs_from_aai(self.app_conf) self.nf_1 = NetworkFunction(nf_name='pnf_1', orchestration_status='Inventoried') self.nf_2 = NetworkFunction(nf_name='pnf_2', orchestration_status='Active') self.app = create_app() self.app_context = self.app.app_context() self.app_context.push() db.create_all() + self.app_conf = AppConfig() + self.xnfs = aai_client.get_pmsh_nfs_from_aai(self.app_conf) def tearDown(self): db.session.remove() db.drop_all() - self.app_context.pop() def test_status_response_healthy(self): self.assertEqual(status()['status'], 'healthy') def test_get_all_sub_to_nf_relations(self): - self.app_conf.subscription.create() + sub_model = self.app_conf.subscription.get() for nf in [self.nf_1, self.nf_2]: - self.app_conf.subscription.add_network_function_to_subscription(nf) + self.app_conf.subscription.add_network_function_to_subscription(nf, sub_model) all_subs = get_all_sub_to_nf_relations() self.assertEqual(len(all_subs[0]['network_functions']), 2) self.assertEqual(all_subs[0]['subscription_name'], 'ExtraPM-All-gNB-R2B') diff --git a/components/pm-subscription-handler/tests/test_exit_handler.py b/components/pm-subscription-handler/tests/test_exit_handler.py index ac1e15c6..d41bd03b 100755 --- a/components/pm-subscription-handler/tests/test_exit_handler.py +++ b/components/pm-subscription-handler/tests/test_exit_handler.py @@ -15,56 +15,43 @@ # # SPDX-License-Identifier: Apache-2.0 # ============LICENSE_END===================================================== +import json import os -import signal -import threading -import time +from signal import SIGTERM, signal +from test.support import EnvironmentVarGuard from unittest import TestCase -from unittest.mock import patch, Mock, MagicMock +from unittest.mock import patch, Mock -import pmsh_service_main +from mod.api.db_models import NetworkFunctionModel from mod.exit_handler import ExitHandler -from mod.pmsh_utils import PeriodicTask -from mod.subscription import AdministrativeState +from mod.pmsh_utils import AppConfig +from mod.subscription import Subscription class ExitHandlerTests(TestCase): - - @patch('pmsh_service_main.create_app') - @patch('pmsh_service_main.db') - @patch('pmsh_service_main.AppConfig') - @patch('pmsh_service_main.Subscription') - @patch('pmsh_service_main.launch_api_server') - @patch('pmsh_service_main.SubscriptionHandler') - @patch.object(PeriodicTask, 'start') - @patch.object(PeriodicTask, 'cancel') - def test_terminate_signal_success(self, mock_task_cancel, mock_task_start, mock_sub_handler, - mock_launch_api_server, mock_sub, mock_app_conf, - mock_db, mock_app): - pid = os.getpid() - mock_db.get_app.return_value = Mock() - - mock_sub.administrativeState = AdministrativeState.UNLOCKED.value - mock_sub.process_subscription = Mock() - mock_sub_handler_instance = MagicMock(execute=Mock(), current_sub=mock_sub) - mock_sub_handler.side_effect = [mock_sub_handler_instance] - - def mock_api_server_run(param): - while mock_sub.administrativeState == AdministrativeState.UNLOCKED.value: - time.sleep(1) - - mock_launch_api_server.side_effect = mock_api_server_run - - def trigger_signal(): - time.sleep(1) - os.kill(pid, signal.SIGTERM) - - thread = threading.Thread(target=trigger_signal) - thread.start() - - pmsh_service_main.main() - - self.assertEqual(4, mock_task_cancel.call_count) + @patch('mod.subscription.Subscription.create') + @patch('mod.pmsh_utils.AppConfig._get_pmsh_config') + @patch('mod.pmsh_utils.PeriodicTask') + def setUp(self, mock_periodic_task, mock_get_pmsh_config, mock_sub_create): + self.env = EnvironmentVarGuard() + self.env.set('LOGGER_CONFIG', os.path.join(os.path.dirname(__file__), 'log_config.yaml')) + with open(os.path.join(os.path.dirname(__file__), 'data/cbs_data_1.json'), 'r') as data: + self.cbs_data = json.load(data) + mock_get_pmsh_config.return_value = self.cbs_data + self.mock_aai_event_thread = mock_periodic_task + self.app_conf = AppConfig() + self.sub = self.app_conf.subscription + + @patch('mod.logger.debug') + @patch.object(Subscription, 'update_sub_nf_status') + @patch.object(Subscription, 'update_subscription_status') + @patch.object(Subscription, '_get_nf_models', + return_value=[NetworkFunctionModel('pnf1', 'ACTIVE')]) + def test_terminate_signal_successful(self, mock_sub_get_nf_models, mock_upd_sub_status, + mock_upd_subnf_status, mock_logger): + handler = ExitHandler(periodic_tasks=[self.mock_aai_event_thread], + app_conf=self.app_conf, + subscription_handler=Mock()) + signal(SIGTERM, handler) + os.kill(os.getpid(), SIGTERM) self.assertTrue(ExitHandler.shutdown_signal_received) - self.assertEqual(1, mock_sub.process_subscription.call_count) - self.assertEqual(mock_sub.administrativeState, AdministrativeState.LOCKED.value) diff --git a/components/pm-subscription-handler/tests/test_network_function.py b/components/pm-subscription-handler/tests/test_network_function.py index 86baef83..cdfb1eb5 100755 --- a/components/pm-subscription-handler/tests/test_network_function.py +++ b/components/pm-subscription-handler/tests/test_network_function.py @@ -15,6 +15,7 @@ # # SPDX-License-Identifier: Apache-2.0 # ============LICENSE_END===================================================== +import json import os from test.support import EnvironmentVarGuard from unittest import TestCase @@ -22,23 +23,29 @@ from unittest.mock import patch from mod import db, create_app from mod.network_function import NetworkFunction +from mod.pmsh_utils import AppConfig from mod.subscription import Subscription class NetworkFunctionTests(TestCase): + @patch('mod.pmsh_utils.AppConfig._get_pmsh_config') @patch('mod.update_logging_config') @patch('mod.get_db_connection_url') - def setUp(self, mock_get_db_url, mock_update_config): + def setUp(self, mock_get_db_url, mock_update_config, mock_get_pmsh_config): mock_get_db_url.return_value = 'sqlite://' self.nf_1 = NetworkFunction(nf_name='pnf_1', orchestration_status='Inventoried') self.nf_2 = NetworkFunction(nf_name='pnf_2', orchestration_status='Active') + with open(os.path.join(os.path.dirname(__file__), 'data/cbs_data_1.json'), 'r') as data: + self.cbs_data = json.load(data) + mock_get_pmsh_config.return_value = self.cbs_data self.env = EnvironmentVarGuard() self.env.set('LOGGER_CONFIG', os.path.join(os.path.dirname(__file__), 'log_config.yaml')) self.app = create_app() self.app_context = self.app.app_context() self.app_context.push() db.create_all() + self.app_conf = AppConfig() def tearDown(self): db.session.remove() @@ -74,7 +81,7 @@ class NetworkFunctionTests(TestCase): self.nf_2.create() sub = Subscription(**{"subscriptionName": "sub"}) for nf in [self.nf_1, self.nf_2]: - sub.add_network_function_to_subscription(nf) + sub.add_network_function_to_subscription(nf, self.app_conf.subscription.get()) NetworkFunction.delete(nf_name=self.nf_1.nf_name) diff --git a/components/pm-subscription-handler/tests/test_subscription.py b/components/pm-subscription-handler/tests/test_subscription.py index 74593a42..27a189c2 100755 --- a/components/pm-subscription-handler/tests/test_subscription.py +++ b/components/pm-subscription-handler/tests/test_subscription.py @@ -26,7 +26,7 @@ from requests import Session import mod.aai_client as aai_client from mod import db, create_app from mod.api.db_models import NetworkFunctionModel -from mod.network_function import NetworkFunction, OrchestrationStatus +from mod.network_function import NetworkFunction from mod.pmsh_utils import AppConfig from mod.subscription import Subscription @@ -51,27 +51,28 @@ class SubscriptionTest(TestCase): with open(os.path.join(os.path.dirname(__file__), 'data/cbs_data_1.json'), 'r') as data: self.cbs_data = json.load(data) mock_get_pmsh_config.return_value = self.cbs_data - self.app_conf = AppConfig() - self.xnfs = aai_client.get_pmsh_nfs_from_aai(self.app_conf) self.mock_mr_sub = mock_mr_sub self.mock_mr_pub = mock_mr_pub self.app = create_app() self.app_context = self.app.app_context() self.app_context.push() db.create_all() + self.app_conf = AppConfig() + self.xnfs = aai_client.get_pmsh_nfs_from_aai(self.app_conf) + self.sub_model = self.app_conf.subscription.get() def tearDown(self): - db.session.remove() db.drop_all() + db.session.remove() self.app_context.pop() def test_xnf_filter_true(self): self.assertTrue(self.app_conf.nf_filter.is_nf_in_filter('pnf1', - OrchestrationStatus.ACTIVE.value)) + 'Active')) def test_xnf_filter_false(self): self.assertFalse(self.app_conf.nf_filter.is_nf_in_filter('PNF-33', - OrchestrationStatus.ACTIVE.value)) + 'Active')) def test_sub_measurement_group(self): self.assertEqual(len(self.app_conf.subscription.measurementGroups), 2) @@ -82,18 +83,15 @@ class SubscriptionTest(TestCase): def test_get_subscription(self): sub_name = 'ExtraPM-All-gNB-R2B' self.app_conf.subscription.create() - new_sub = Subscription.get(sub_name) + new_sub = self.app_conf.subscription.get() self.assertEqual(sub_name, new_sub.subscription_name) - def test_get_subscription_no_match(self): - sub_name = 'sub2_does_not_exist' - sub = Subscription.get(sub_name) - self.assertEqual(sub, None) - def test_get_nf_names_per_sub(self): self.app_conf.subscription.create() - self.app_conf.subscription.add_network_function_to_subscription(list(self.xnfs)[0]) - self.app_conf.subscription.add_network_function_to_subscription(list(self.xnfs)[1]) + self.app_conf.subscription.add_network_function_to_subscription(list(self.xnfs)[0], + self.sub_model) + self.app_conf.subscription.add_network_function_to_subscription(list(self.xnfs)[1], + self.sub_model) nfs = Subscription.get_nf_names_per_sub(self.app_conf.subscription.subscriptionName) self.assertEqual(2, len(nfs)) @@ -105,37 +103,38 @@ class SubscriptionTest(TestCase): def test_add_network_functions_per_subscription(self): for nf in self.xnfs: - self.app_conf.subscription.add_network_function_to_subscription(nf) + self.app_conf.subscription.add_network_function_to_subscription(nf, self.sub_model) nfs_for_sub_1 = Subscription.get_all_nfs_subscription_relations() self.assertEqual(3, len(nfs_for_sub_1)) new_nf = NetworkFunction(nf_name='vnf_3', orchestration_status='Active') - self.app_conf.subscription.add_network_function_to_subscription(new_nf) + self.app_conf.subscription.add_network_function_to_subscription(new_nf, self.sub_model) nf_subs = Subscription.get_all_nfs_subscription_relations() self.assertEqual(4, len(nf_subs)) def test_add_duplicate_network_functions_per_subscription(self): - self.app_conf.subscription.add_network_function_to_subscription(list(self.xnfs)[0]) + self.app_conf.subscription.add_network_function_to_subscription(list(self.xnfs)[0], + self.sub_model) nf_subs = Subscription.get_all_nfs_subscription_relations() self.assertEqual(1, len(nf_subs)) - self.app_conf.subscription.add_network_function_to_subscription(list(self.xnfs)[0]) + self.app_conf.subscription.add_network_function_to_subscription(list(self.xnfs)[0], + self.sub_model) nf_subs = Subscription.get_all_nfs_subscription_relations() self.assertEqual(1, len(nf_subs)) def test_update_subscription_status(self): - sub_name = 'ExtraPM-All-gNB-R2B' self.app_conf.subscription.create() self.app_conf.subscription.administrativeState = 'new_status' self.app_conf.subscription.update_subscription_status() - sub = Subscription.get(sub_name) + sub = self.app_conf.subscription.get() self.assertEqual('new_status', sub.status) def test_delete_subscription(self): for nf in self.xnfs: - self.app_conf.subscription.add_network_function_to_subscription(nf) + self.app_conf.subscription.add_network_function_to_subscription(nf, self.sub_model) self.app_conf.subscription.delete_subscription() self.assertEqual(0, len(Subscription.get_all())) - self.assertEqual(None, Subscription.get(self.app_conf.subscription.subscriptionName)) + self.assertEqual(None, self.app_conf.subscription.get()) self.assertEqual(0, len(Subscription.get_all_nfs_subscription_relations())) self.assertEqual(0, len(NetworkFunction.get_all())) self.assertEqual(None, NetworkFunction.get(nf_name=list(self.xnfs)[0].nf_name)) @@ -143,7 +142,7 @@ class SubscriptionTest(TestCase): def test_update_sub_nf_status(self): sub_name = 'ExtraPM-All-gNB-R2B' for nf in self.xnfs: - self.app_conf.subscription.add_network_function_to_subscription(nf) + self.app_conf.subscription.add_network_function_to_subscription(nf, self.sub_model) sub_nfs = Subscription.get_all_nfs_subscription_relations() self.assertEqual('PENDING_CREATE', sub_nfs[0].nf_sub_status) @@ -154,33 +153,27 @@ class SubscriptionTest(TestCase): @patch('mod.subscription.Subscription.add_network_function_to_subscription') @patch('mod.subscription.Subscription.update_sub_nf_status') - @patch('mod.subscription.Subscription.update_subscription_status') - def test_process_activate_subscription(self, mock_update_sub_status, - mock_update_sub_nf, mock_add_nfs): - self.app_conf.subscription.process_subscription([list(self.xnfs)[0]], self.mock_mr_pub, - self.app_conf) + def test_process_activate_subscription(self, mock_update_sub_nf, mock_add_nfs): + self.app_conf.subscription.activate_subscription([list(self.xnfs)[0]], self.mock_mr_pub, + self.app_conf) - mock_update_sub_status.assert_called() mock_add_nfs.assert_called() self.assertTrue(self.mock_mr_pub.publish_subscription_event_data.called) mock_update_sub_nf.assert_called_with(self.app_conf.subscription.subscriptionName, 'PENDING_CREATE', list(self.xnfs)[0].nf_name) + @patch('mod.subscription.Subscription.get_network_functions') @patch('mod.subscription.Subscription.update_sub_nf_status') - @patch('mod.subscription.Subscription.update_subscription_status') - def test_process_deactivate_subscription(self, mock_update_sub_status, - mock_update_sub_nf): + def test_process_deactivate_subscription(self, mock_update_sub_nf, mock_get_nfs): self.app_conf.subscription.administrativeState = 'LOCKED' - self.app_conf.subscription.process_subscription([list(self.xnfs)[0]], self.mock_mr_pub, - self.app_conf) - + mock_get_nfs.return_value = [list(self.xnfs)[0]] + self.app_conf.subscription.deactivate_subscription(self.mock_mr_pub, self.app_conf) self.assertTrue(self.mock_mr_pub.publish_subscription_event_data.called) mock_update_sub_nf.assert_called_with(self.app_conf.subscription.subscriptionName, 'PENDING_DELETE', list(self.xnfs)[0].nf_name) - mock_update_sub_status.assert_called() - def test_process_subscription_exception(self): - self.assertRaises(Exception, self.app_conf.subscription.process_subscription, + def test_activate_subscription_exception(self): + self.assertRaises(Exception, self.app_conf.subscription.activate_subscription, [list(self.xnfs)[0]], 'not_mr_pub', 'app_config') def test_prepare_subscription_event(self): @@ -193,7 +186,7 @@ class SubscriptionTest(TestCase): def test_get_nf_models(self): for nf in self.xnfs: - self.app_conf.subscription.add_network_function_to_subscription(nf) + self.app_conf.subscription.add_network_function_to_subscription(nf, self.sub_model) nf_models = self.app_conf.subscription._get_nf_models() self.assertEqual(3, len(nf_models)) @@ -201,7 +194,7 @@ class SubscriptionTest(TestCase): def test_get_network_functions(self): for nf in self.xnfs: - self.app_conf.subscription.add_network_function_to_subscription(nf) + self.app_conf.subscription.add_network_function_to_subscription(nf, self.sub_model) nfs = self.app_conf.subscription.get_network_functions() self.assertEqual(3, len(nfs)) diff --git a/components/pm-subscription-handler/tests/test_subscription_handler.py b/components/pm-subscription-handler/tests/test_subscription_handler.py index 65a7c2c0..a6611666 100644 --- a/components/pm-subscription-handler/tests/test_subscription_handler.py +++ b/components/pm-subscription-handler/tests/test_subscription_handler.py @@ -17,9 +17,11 @@ # ============LICENSE_END===================================================== import json import os +from test.support import EnvironmentVarGuard from unittest import TestCase from unittest.mock import patch +from mod import create_app, db from mod.network_function import NetworkFunction from mod.pmsh_utils import AppConfig from mod.subscription import AdministrativeState @@ -28,72 +30,95 @@ from mod.subscription_handler import SubscriptionHandler class SubscriptionHandlerTest(TestCase): + @classmethod + def setUpClass(cls): + cls.env = EnvironmentVarGuard() + cls.env.set('AAI_SERVICE_PORT', '8443') + cls.env.set('LOGGER_CONFIG', os.path.join(os.path.dirname(__file__), 'log_config.yaml')) + cls.nfs = [NetworkFunction(nf_name='pnf_1'), NetworkFunction(nf_name='pnf_2')] + + @patch('mod.get_db_connection_url') + @patch('mod.update_logging_config') @patch('mod.pmsh_utils.AppConfig._get_pmsh_config') - @patch('mod.create_app') @patch('mod.pmsh_utils._MrPub') @patch('mod.pmsh_utils.PeriodicTask') - def setUp(self, mock_aai_event_thread, mock_mr_pub, mock_app, mock_get_pmsh_config): + def setUp(self, mock_periodic_task, mock_mr_pub, mock_get_pmsh_config, mock_update_config, + mock_get_db_url): + mock_get_db_url.return_value = 'sqlite://' with open(os.path.join(os.path.dirname(__file__), 'data/cbs_data_1.json'), 'r') as data: self.cbs_data = json.load(data) mock_get_pmsh_config.return_value = self.cbs_data - self.app_conf = AppConfig() - self.mock_app = mock_app self.mock_mr_pub = mock_mr_pub - self.mock_aai_event_thread = mock_aai_event_thread - self.nf_1 = NetworkFunction(nf_name='pnf_1') - self.nf_2 = NetworkFunction(nf_name='pnf_2') - self.nfs = [self.nf_1, self.nf_2] + self.mock_aai_event_thread = mock_periodic_task + self.mock_policy_event_thread = mock_periodic_task + self.app = create_app() + self.app.app_context().push() + db.create_all() + self.app_conf = AppConfig() def tearDown(self): - pass + db.drop_all() + db.session.remove() + @patch('mod.subscription.Subscription.get_local_sub_admin_state') @patch('mod.logger.info') @patch('mod.aai_client.get_pmsh_nfs_from_aai') - def test_execute_no_change_of_state(self, mock_get_aai, mock_logger): + def test_execute_no_change_of_state(self, mock_get_aai, mock_logger, mock_get_sub_status): + mock_get_sub_status.return_value = AdministrativeState.UNLOCKED.value mock_get_aai.return_value = self.nfs - sub_handler = SubscriptionHandler(AdministrativeState.UNLOCKED.value, self.mock_mr_pub, - self.mock_app, self.app_conf, - self.mock_aai_event_thread) + sub_handler = SubscriptionHandler(self.mock_mr_pub, + self.app, self.app_conf, + self.mock_aai_event_thread, + self.mock_policy_event_thread) sub_handler.execute() mock_logger.assert_called_with('Administrative State did not change in the Config') - @patch('mod.subscription.Subscription.process_subscription') + @patch('mod.subscription.Subscription.get_local_sub_admin_state') + @patch('mod.subscription.Subscription.activate_subscription') @patch('mod.aai_client.get_pmsh_nfs_from_aai') - def test_execute_change_of_state_unlocked(self, mock_get_aai, mock_process_sub): + def test_execute_change_of_state_to_unlocked(self, mock_get_aai, mock_activate_sub, + mock_get_sub_status): mock_get_aai.return_value = self.nfs + mock_get_sub_status.return_value = AdministrativeState.LOCKED.value self.mock_aai_event_thread.return_value.start.return_value = 'start_method' - sub_handler = SubscriptionHandler(AdministrativeState.LOCKED.value, self.mock_mr_pub, - self.mock_app, self.app_conf, - self.mock_aai_event_thread.return_value) + sub_handler = SubscriptionHandler(self.mock_mr_pub, + self.app, self.app_conf, + self.mock_aai_event_thread.return_value, + self.mock_policy_event_thread) sub_handler.execute() - self.assertEqual(AdministrativeState.UNLOCKED.value, sub_handler.administrative_state) - mock_process_sub.assert_called_with(self.nfs, self.mock_mr_pub, self.app_conf) + self.assertEqual(AdministrativeState.UNLOCKED.value, + self.app_conf.subscription.administrativeState) + mock_activate_sub.assert_called_with(self.nfs, self.mock_mr_pub, self.app_conf) self.mock_aai_event_thread.return_value.start.assert_called() - @patch('mod.subscription.Subscription.process_subscription') + @patch('mod.subscription.Subscription.get_local_sub_admin_state') + @patch('mod.subscription.Subscription.deactivate_subscription') @patch('mod.aai_client.get_pmsh_nfs_from_aai') - def test_execute_change_of_state_locked(self, mock_get_aai, mock_process_sub): + def test_execute_change_of_state_to_locked(self, mock_get_aai, mock_deactivate_sub, + mock_get_sub_status): + mock_get_sub_status.return_value = AdministrativeState.UNLOCKED.value + self.app_conf.subscription.administrativeState = AdministrativeState.LOCKED.value + self.app_conf.subscription.update_subscription_status() mock_get_aai.return_value = self.nfs self.mock_aai_event_thread.return_value.cancel.return_value = 'cancel_method' - self.app_conf.subscription.administrativeState = AdministrativeState.LOCKED.value - sub_handler = SubscriptionHandler(AdministrativeState.UNLOCKED.value, self.mock_mr_pub, - self.mock_app, self.app_conf, - self.mock_aai_event_thread.return_value) + sub_handler = SubscriptionHandler(self.mock_mr_pub, + self.app, self.app_conf, + self.mock_aai_event_thread.return_value, + self.mock_policy_event_thread) sub_handler.execute() - self.assertEqual(AdministrativeState.LOCKED.value, sub_handler.administrative_state) - mock_process_sub.assert_called_with(self.nfs, self.mock_mr_pub, self.app_conf) + mock_deactivate_sub.assert_called_with(self.mock_mr_pub, self.app_conf) self.mock_aai_event_thread.return_value.cancel.assert_called() - self.app_conf.subscription.administrativeState = AdministrativeState.UNLOCKED.value - @patch('mod.subscription.Subscription.process_subscription') + @patch('mod.subscription.Subscription.activate_subscription') @patch('mod.logger.error') @patch('mod.aai_client.get_pmsh_nfs_from_aai') - def test_execute_exception(self, mock_get_aai, mock_logger, mock_process_sub): + def test_execute_exception(self, mock_get_aai, mock_logger, mock_activate_sub): mock_get_aai.return_value = self.nfs - mock_process_sub.side_effect = Exception - sub_handler = SubscriptionHandler(AdministrativeState.LOCKED.value, self.mock_mr_pub, - self.mock_app, self.app_conf, - self.mock_aai_event_thread) + mock_activate_sub.side_effect = Exception + sub_handler = SubscriptionHandler(self.mock_mr_pub, + self.app, self.app_conf, + self.mock_aai_event_thread, + self.mock_policy_event_thread) sub_handler.execute() mock_logger.assert_called_with('Error occurred during the activation/deactivation process ', exc_info=True) -- cgit 1.2.3-korg