From 03e058dbd9e298eda2ce4fcb3a5babaf08434cf5 Mon Sep 17 00:00:00 2001 From: emartin <ephraim.martin@est.tech> Date: Wed, 25 Mar 2020 10:05:35 +0000 Subject: Handle graceful exit of PMSH Issue-ID: DCAEGEN2-1832 Change-Id: If0362e1927f7013d25f0cf23ade5ce9d2bdea8e3 Signed-off-by: emartin <ephraim.martin@est.tech> --- components/pm-subscription-handler/Changelog.md | 1 + .../pmsh_service/mod/exit_handler.py | 48 ++++++++++++++ .../pmsh_service/mod/pmsh_utils.py | 7 ++- .../pmsh_service/mod/subscription_handler.py | 12 ++-- .../pmsh_service/pmsh_service_main.py | 14 +++-- .../tests/test_exit_handler.py | 73 ++++++++++++++++++++++ .../tests/test_subscription_handler.py | 32 +++++----- 7 files changed, 162 insertions(+), 25 deletions(-) create mode 100755 components/pm-subscription-handler/pmsh_service/mod/exit_handler.py create mode 100755 components/pm-subscription-handler/tests/test_exit_handler.py diff --git a/components/pm-subscription-handler/Changelog.md b/components/pm-subscription-handler/Changelog.md index 0a849bab..0542d131 100644 --- a/components/pm-subscription-handler/Changelog.md +++ b/components/pm-subscription-handler/Changelog.md @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/). * Moved subscription processing from main into its own subscription_handler module * Removed policy response handling functions from pmsh_utils and introduced policy_response_handler * Network function filter now resides in network_function instead of subscription +* Added graceful handling upon receiving SIGTERM signal ## [1.0.1] ### Fixed diff --git a/components/pm-subscription-handler/pmsh_service/mod/exit_handler.py b/components/pm-subscription-handler/pmsh_service/mod/exit_handler.py new file mode 100755 index 00000000..adc4941c --- /dev/null +++ b/components/pm-subscription-handler/pmsh_service/mod/exit_handler.py @@ -0,0 +1,48 @@ +# ============LICENSE_START=================================================== +# Copyright (C) 2020 Nordix Foundation. +# ============================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# SPDX-License-Identifier: Apache-2.0 +# ============LICENSE_END===================================================== + +from mod.pmsh_utils import logger +from mod.subscription import AdministrativeState + + +class ExitHandler: + """ Handles PMSH graceful exit when a SIGTERM signal is received. + + Args: + periodic_tasks (List[PeriodicTask]): PeriodicTasks that needs to be cancelled. + subscription_handler (SubscriptionHandler): The subscription handler instance. + """ + + shutdown_signal_received = False + + def __init__(self, *, periodic_tasks, subscription_handler): + self.periodic_tasks = periodic_tasks + self.subscription_handler = subscription_handler + + def __call__(self, sig_num, frame): + logger.debug(f'ExitHandler was called with signal number: {sig_num}.') + current_sub = self.subscription_handler.current_sub + if current_sub and current_sub.administrativeState == AdministrativeState.UNLOCKED.value: + for thread in self.periodic_tasks: + logger.debug(f'Cancelling periodic task with thread name: {thread.name}.') + thread.cancel() + current_sub.administrativeState = AdministrativeState.LOCKED.value + current_sub.process_subscription(self.subscription_handler.current_nfs, + self.subscription_handler.mr_pub, + self.subscription_handler.app_conf) + ExitHandler.shutdown_signal_received = True diff --git a/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py b/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py index 1fc3a097..750b7211 100755 --- a/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py +++ b/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py @@ -17,10 +17,12 @@ # ============LICENSE_END===================================================== import uuid +from threading import Timer + import requests -import mod.pmsh_logging as logger from requests.auth import HTTPBasicAuth -from threading import Timer + +import mod.pmsh_logging as logger class AppConfig: @@ -180,5 +182,6 @@ class PeriodicTask(Timer): """ def run(self): + self.function(*self.args, **self.kwargs) while not self.finished.wait(self.interval): self.function(*self.args, **self.kwargs) diff --git a/components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py b/components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py index a615aa77..40b8c962 100644 --- a/components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py +++ b/components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py @@ -22,14 +22,16 @@ from mod.subscription import AdministrativeState class SubscriptionHandler: - def __init__(self, config_handler, administrative_state, mr_pub, - aai_event_thread, app, app_conf): + def __init__(self, config_handler, administrative_state, mr_pub, app, app_conf, + aai_event_thread): + self.current_nfs = None + self.current_sub = None self.config_handler = config_handler self.administrative_state = administrative_state self.mr_pub = mr_pub - self.aai_event_thread = aai_event_thread self.app = app self.app_conf = app_conf + self.aai_event_thread = aai_event_thread def execute(self): """ @@ -44,9 +46,9 @@ class SubscriptionHandler: if self.administrative_state == new_administrative_state: logger.debug('Administrative State did not change in the Config') else: - sub, network_functions = aai.get_pmsh_subscription_data(config) + self.current_sub, self.current_nfs = aai.get_pmsh_subscription_data(config) self.administrative_state = new_administrative_state - sub.process_subscription(network_functions, self.mr_pub, self.app_conf) + self.current_sub.process_subscription(self.current_nfs, self.mr_pub, self.app_conf) if new_administrative_state == AdministrativeState.UNLOCKED.value: logger.debug('Listening to AAI-EVENT topic in MR.') diff --git a/components/pm-subscription-handler/pmsh_service/pmsh_service_main.py b/components/pm-subscription-handler/pmsh_service/pmsh_service_main.py index af5aece2..60cf89c0 100755 --- a/components/pm-subscription-handler/pmsh_service/pmsh_service_main.py +++ b/components/pm-subscription-handler/pmsh_service/pmsh_service_main.py @@ -17,16 +17,18 @@ # ============LICENSE_END===================================================== import sys +from signal import signal, SIGTERM import mod.aai_client as aai import mod.pmsh_logging as logger from mod import db, create_app, launch_api_server +from mod.aai_event_handler import process_aai_events from mod.config_handler import ConfigHandler +from mod.exit_handler import ExitHandler from mod.pmsh_utils import AppConfig, PeriodicTask from mod.policy_response_handler import PolicyResponseHandler from mod.subscription import Subscription, AdministrativeState from mod.subscription_handler import SubscriptionHandler -from mod.aai_event_handler import process_aai_events def main(): @@ -46,17 +48,21 @@ def main(): else AdministrativeState.LOCKED.value aai_event_thread = PeriodicTask(10, process_aai_events, - args=(mr_aai_event_sub, sub, policy_mr_pub, app, app_conf)) + args=(mr_aai_event_sub, + sub, policy_mr_pub, app, app_conf)) subscription_handler = SubscriptionHandler(config_handler, administrative_state, - policy_mr_pub, aai_event_thread, app, app_conf) + policy_mr_pub, app, app_conf, aai_event_thread) policy_response_handler = PolicyResponseHandler(policy_mr_sub, sub.subscriptionName, app) subscription_handler_thread = PeriodicTask(30, subscription_handler.execute) policy_response_handler_thread = PeriodicTask(5, policy_response_handler.poll_policy_topic) - subscription_handler_thread.start() policy_response_handler_thread.start() + periodic_tasks = [aai_event_thread, subscription_handler_thread, + policy_response_handler_thread] + signal(SIGTERM, ExitHandler(periodic_tasks=periodic_tasks, + subscription_handler=subscription_handler)) launch_api_server(app_conf) except Exception as e: diff --git a/components/pm-subscription-handler/tests/test_exit_handler.py b/components/pm-subscription-handler/tests/test_exit_handler.py new file mode 100755 index 00000000..0cce1db9 --- /dev/null +++ b/components/pm-subscription-handler/tests/test_exit_handler.py @@ -0,0 +1,73 @@ +# ============LICENSE_START=================================================== +# Copyright (C) 2020 Nordix Foundation. +# ============================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# SPDX-License-Identifier: Apache-2.0 +# ============LICENSE_END===================================================== +import os +import signal +import threading +import time +from unittest import TestCase +from unittest.mock import patch, Mock, MagicMock + +import pmsh_service_main +from mod.exit_handler import ExitHandler +from mod.pmsh_utils import PeriodicTask +from mod.subscription import AdministrativeState + + +class ExitHandlerTests(TestCase): + + @patch('pmsh_service_main.ConfigHandler') + @patch('pmsh_service_main.create_app') + @patch('pmsh_service_main.db') + @patch('pmsh_service_main.aai.get_pmsh_subscription_data') + @patch('pmsh_service_main.AppConfig') + @patch('pmsh_service_main.Subscription') + @patch('pmsh_service_main.launch_api_server') + @patch('pmsh_service_main.SubscriptionHandler') + @patch.object(PeriodicTask, 'start') + @patch.object(PeriodicTask, 'cancel') + def test_terminate_signal_success(self, mock_task_cancel, mock_task_start, mock_sub_handler, + mock_launch_api_server, mock_sub, mock_app_conf, mock_aai, + mock_db, mock_app, mock_config_handler): + pid = os.getpid() + mock_aai.return_value = [Mock(), Mock()] + mock_db.get_app.return_value = Mock() + + mock_sub.administrativeState = AdministrativeState.UNLOCKED.value + mock_sub.process_subscription = Mock() + mock_sub_handler_instance = MagicMock(execute=Mock(), current_sub=mock_sub) + mock_sub_handler.side_effect = [mock_sub_handler_instance] + + def mock_api_server_run(param): + while mock_sub.administrativeState == AdministrativeState.UNLOCKED.value: + time.sleep(1) + + mock_launch_api_server.side_effect = mock_api_server_run + + def trigger_signal(): + time.sleep(1) + os.kill(pid, signal.SIGTERM) + + thread = threading.Thread(target=trigger_signal) + thread.start() + + pmsh_service_main.main() + + self.assertEqual(3, mock_task_cancel.call_count) + self.assertTrue(ExitHandler.shutdown_signal_received) + self.assertEqual(1, mock_sub.process_subscription.call_count) + self.assertEqual(mock_sub.administrativeState, AdministrativeState.LOCKED.value) diff --git a/components/pm-subscription-handler/tests/test_subscription_handler.py b/components/pm-subscription-handler/tests/test_subscription_handler.py index 0eed7c45..168d0366 100644 --- a/components/pm-subscription-handler/tests/test_subscription_handler.py +++ b/components/pm-subscription-handler/tests/test_subscription_handler.py @@ -15,14 +15,14 @@ # # SPDX-License-Identifier: Apache-2.0 # ============LICENSE_END===================================================== -import os import json +import os from unittest import TestCase from unittest.mock import patch -from mod.subscription_handler import SubscriptionHandler -from mod.subscription import AdministrativeState from mod.network_function import NetworkFunction +from mod.subscription import AdministrativeState +from mod.subscription_handler import SubscriptionHandler class SubscriptionHandlerTest(TestCase): @@ -30,19 +30,19 @@ class SubscriptionHandlerTest(TestCase): @patch('mod.create_app') @patch('mod.subscription.Subscription') @patch('mod.pmsh_utils._MrPub') - @patch('mod.pmsh_utils.PeriodicTask') @patch('mod.config_handler.ConfigHandler') @patch('mod.pmsh_utils.AppConfig') - def setUp(self, mock_app_conf, mock_config_handler, mock_aai_thread, mock_mr_pub, + @patch('mod.pmsh_utils.PeriodicTask') + def setUp(self, mock_aai_event_thread, mock_app_conf, mock_config_handler, mock_mr_pub, mock_sub, mock_app): with open(os.path.join(os.path.dirname(__file__), 'data/cbs_data_1.json'), 'r') as data: self.cbs_data_1 = json.load(data) self.mock_app = mock_app self.mock_sub = mock_sub self.mock_mr_pub = mock_mr_pub - self.mock_aai_thread = mock_aai_thread self.mock_config_handler = mock_config_handler self.mock_app_conf = mock_app_conf + self.mock_aai_event_thread = mock_aai_event_thread self.nf_1 = NetworkFunction(nf_name='pnf_1') self.nf_2 = NetworkFunction(nf_name='pnf_2') self.nfs = [self.nf_1, self.nf_2] @@ -54,7 +54,8 @@ class SubscriptionHandlerTest(TestCase): self.mock_config_handler.get_config.return_value = self.cbs_data_1 sub_handler = SubscriptionHandler(self.mock_config_handler, AdministrativeState.UNLOCKED.value, self.mock_mr_pub, - self.mock_aai_thread, self.mock_app, self.mock_app_conf) + self.mock_app, self.mock_app_conf, + self.mock_aai_event_thread) sub_handler.execute() mock_logger.assert_called_with('Administrative State did not change in the Config') @@ -62,34 +63,36 @@ class SubscriptionHandlerTest(TestCase): @patch('mod.aai_client.get_pmsh_subscription_data') def test_execute_change_of_state_unlocked(self, mock_get_aai): mock_get_aai.return_value = self.mock_sub, self.nfs - self.mock_aai_thread.return_value.start.return_value = 'start_method' + self.mock_aai_event_thread.return_value.start.return_value = 'start_method' self.mock_config_handler.get_config.return_value = self.cbs_data_1 sub_handler = SubscriptionHandler(self.mock_config_handler, AdministrativeState.LOCKED.value, self.mock_mr_pub, - self.mock_aai_thread, self.mock_app, self.mock_app_conf) + self.mock_app, self.mock_app_conf, + self.mock_aai_event_thread.return_value) sub_handler.execute() self.assertEqual(AdministrativeState.UNLOCKED.value, sub_handler.administrative_state) self.mock_sub.process_subscription.assert_called_with(self.nfs, self.mock_mr_pub, self.mock_app_conf) - self.mock_aai_thread.start.assert_called() + self.mock_aai_event_thread.return_value.start.assert_called() @patch('mod.aai_client.get_pmsh_subscription_data') def test_execute_change_of_state_locked(self, mock_get_aai): mock_get_aai.return_value = self.mock_sub, self.nfs - self.mock_aai_thread.return_value.cancel.return_value = 'cancel_method' + self.mock_aai_event_thread.return_value.cancel.return_value = 'cancel_method' self.cbs_data_1['policy']['subscription']['administrativeState'] = \ AdministrativeState.LOCKED.value self.mock_config_handler.get_config.return_value = self.cbs_data_1 sub_handler = SubscriptionHandler(self.mock_config_handler, AdministrativeState.UNLOCKED.value, self.mock_mr_pub, - self.mock_aai_thread, self.mock_app, self.mock_app_conf) + self.mock_app, self.mock_app_conf, + self.mock_aai_event_thread.return_value) sub_handler.execute() self.assertEqual(AdministrativeState.LOCKED.value, sub_handler.administrative_state) self.mock_sub.process_subscription.assert_called_with(self.nfs, self.mock_mr_pub, self.mock_app_conf) - self.mock_aai_thread.cancel.assert_called() + self.mock_aai_event_thread.return_value.cancel.assert_called() @patch('mod.pmsh_logging.debug') @patch('mod.aai_client.get_pmsh_subscription_data') @@ -99,7 +102,8 @@ class SubscriptionHandlerTest(TestCase): self.mock_sub.process_subscription.side_effect = Exception sub_handler = SubscriptionHandler(self.mock_config_handler, AdministrativeState.LOCKED.value, self.mock_mr_pub, - self.mock_aai_thread, self.mock_app, self.mock_app_conf) + self.mock_app, self.mock_app_conf, + self.mock_aai_event_thread) sub_handler.execute() mock_logger.assert_called_with('Error occurred during the activation/deactivation process ') -- cgit 1.2.3-korg