summaryrefslogtreecommitdiffstats
path: root/components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py
blob: f50f5ab2c9e338f37ec6f15f86dda211ef0fcd6a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
# ============LICENSE_START===================================================
#  Copyright (C) 2020-2021 Nordix Foundation.
# ============================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# SPDX-License-Identifier: Apache-2.0
# ============LICENSE_END=====================================================

from mod import logger, aai_client
from mod.aai_event_handler import process_aai_events
from mod.pmsh_utils import PeriodicTask
from mod.subscription import AdministrativeState


class SubscriptionHandler:
    """
    Drives the Subscription through its administrative states.

    Each call to ``execute`` compares the locally stored administrative state with the
    one found in the refreshed app config and activates or deactivates the Subscription
    when they differ. While the local state is LOCKING, it instead follows up on NFs
    whose subscription deletion failed or is still pending.
    """

    # Deletion is retried while an NF's retry_count is below this limit; once the limit
    # is reached the NF is removed from the DB.
    MAX_DELETE_RETRIES = 3

    def __init__(self, mr_pub, aai_sub, app, app_conf):
        """
        Stores the collaborators used by the handler.

        Args:
            mr_pub: publisher passed on to the subscription create/delete calls and to
                the AAI event task.
            aai_sub: subscriber passed on to the AAI event task.
            app: application object providing ``app_context()``.
            app_conf: app config exposing ``subscription`` and ``refresh_config()``.
        """
        self.mr_pub = mr_pub
        self.aai_sub = aai_sub
        self.app = app
        self.app_conf = app_conf
        # Handle of the running AAI event polling task; None while no task is running.
        self.aai_event_thread = None

    def execute(self):
        """
        Checks for changes of administrative state in config and proceeds to process
        the Subscription if a change has occurred
        """
        # Enter the app context for the duration of this run only. Pushing a context
        # without ever popping it stacks one more context on every invocation.
        with self.app.app_context():
            try:
                local_admin_state = self.app_conf.subscription.get_local_sub_admin_state()
                if local_admin_state == AdministrativeState.LOCKING.value:
                    # A deactivation is still in progress: do not refresh the config,
                    # only follow up on the outstanding deletions.
                    self._check_for_failed_nfs()
                else:
                    self.app_conf.refresh_config()
                    new_administrative_state = self.app_conf.subscription.administrativeState
                    if local_admin_state == new_administrative_state:
                        logger.info(f'Administrative State did not change in the app config: '
                                    f'{new_administrative_state}')
                    else:
                        self._check_state_change(local_admin_state, new_administrative_state)
            except Exception as err:
                # Top-level boundary of the handler run: log with traceback rather than
                # letting the error propagate to the caller.
                logger.error(f'Error occurred during the activation/deactivation process {err}',
                             exc_info=True)

    def _check_state_change(self, local_admin_state, new_administrative_state):
        """
        Dispatches to activation or deactivation depending on the new state.

        Args:
            local_admin_state: the locally stored state, used for logging only.
            new_administrative_state: the state read from the app config.

        Raises:
            Exception: if the new state is neither UNLOCKED nor LOCKED.
        """
        if new_administrative_state == AdministrativeState.UNLOCKED.value:
            logger.info(f'Administrative State has changed from {local_admin_state} '
                        f'to {new_administrative_state}.')
            self._activate(new_administrative_state)
        elif new_administrative_state == AdministrativeState.LOCKED.value:
            logger.info(f'Administrative State has changed from {local_admin_state} '
                        f'to {new_administrative_state}.')
            self._deactivate()
        else:
            raise Exception(f'Invalid AdministrativeState: {new_administrative_state}')

    def _activate(self, new_administrative_state):
        """
        Starts AAI event polling, updates the subscription parameters, creates the
        subscription on the NFs retrieved from AAI and updates the subscription status.

        Args:
            new_administrative_state: the state passed on to ``update_sub_params``.
        """
        self._start_aai_event_thread()
        self.app_conf.subscription.update_sub_params(new_administrative_state,
                                                     self.app_conf.subscription.fileBasedGP,
                                                     self.app_conf.subscription.fileLocation,
                                                     self.app_conf.subscription.measurementGroups)
        nfs_in_aai = aai_client.get_pmsh_nfs_from_aai(self.app_conf)
        self.app_conf.subscription.create_subscription_on_nfs(nfs_in_aai, self.mr_pub,
                                                              self.app_conf)
        self.app_conf.subscription.update_subscription_status()

    def _deactivate(self):
        """
        Stops AAI event polling, moves the subscription to LOCKING and requests deletion
        of the subscription from all of its NFs.
        """
        nfs = self.app_conf.subscription.get_network_functions()
        # NOTE(review): when no NFs are returned nothing happens here - the polling task
        # keeps running and the state is not moved. Confirm this is intended.
        if nfs:
            self.stop_aai_event_thread()
            self.app_conf.subscription.administrativeState = AdministrativeState.LOCKING.value
            logger.info('Subscription is now LOCKING/DEACTIVATING.')
            self.app_conf.subscription.delete_subscription_from_nfs(nfs, self.mr_pub, self.app_conf)
            self.app_conf.subscription.update_subscription_status()

    def _start_aai_event_thread(self):
        """
        Starts the periodic task that processes AAI events.

        If a task from an earlier activation attempt is still referenced it is cancelled
        first, so that at most one polling task is running and none is left without a
        handle to cancel it.
        """
        if self.aai_event_thread is not None:
            self.stop_aai_event_thread()
        logger.info('Starting polling for NF info on AAI-EVENT topic on DMaaP MR.')
        # First argument is presumably the polling interval in seconds - TODO confirm
        # against PeriodicTask.
        self.aai_event_thread = PeriodicTask(20, process_aai_events, args=(self.aai_sub,
                                                                           self.mr_pub,
                                                                           self.app,
                                                                           self.app_conf))
        self.aai_event_thread.name = 'aai_event_thread'
        self.aai_event_thread.start()

    def stop_aai_event_thread(self):
        """
        Cancels the AAI event polling task if one is running; does nothing otherwise.
        """
        if self.aai_event_thread is not None:
            self.aai_event_thread.cancel()
            self.aai_event_thread = None
            logger.info('Stopping polling for NFs events on AAI-EVENT topic in MR.')

    def _check_for_failed_nfs(self):
        """
        Follows up on a LOCKING subscription.

        While NFs with a failed or pending deletion exist, the deletion is retried for
        each failed NF until its retry count reaches ``MAX_DELETE_RETRIES``, after which
        the NF is deleted. Once neither failed nor pending NFs remain, the subscription
        is moved to LOCKED.
        """
        logger.info('Checking for DELETE_FAILED NFs before LOCKING Subscription.')
        del_failed_nfs = self.app_conf.subscription.get_delete_failed_nfs()
        if del_failed_nfs or self.app_conf.subscription.get_delete_pending_nfs():
            # With only pending deletions outstanding this loop is empty and the handler
            # simply waits for a later run.
            for nf in del_failed_nfs:
                nf_model = nf.get(nf.nf_name)
                if nf_model.retry_count < self.MAX_DELETE_RETRIES:
                    logger.info(f'Retry deletion of subscription '
                                f'{self.app_conf.subscription.subscriptionName} '
                                f'from NF: {nf.nf_name}')
                    self.app_conf.subscription.delete_subscription_from_nfs([nf], self.mr_pub,
                                                                            self.app_conf)
                    nf.increment_retry_count()
                else:
                    logger.error(f'Failed to delete the subscription '
                                 f'{self.app_conf.subscription.subscriptionName} '
                                 f'from NF: {nf.nf_name} after {nf_model.retry_count} '
                                 f'attempts. Removing NF from DB')
                    nf.delete(nf_name=nf.nf_name)
        else:
            logger.info('Proceeding to LOCKED adminState.')
            self.app_conf.subscription.administrativeState = AdministrativeState.LOCKED.value
            self.app_conf.subscription.update_subscription_status()