1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
|
# ============LICENSE_START===================================================
# Copyright (C) 2020 Nordix Foundation.
# ============================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# SPDX-License-Identifier: Apache-2.0
# ============LICENSE_END=====================================================
import json
from tenacity import retry, wait_fixed, retry_if_exception_type
import mod.pmsh_logging as logger
from mod.network_function import NetworkFunction
from mod.subscription import Subscription, AdministrativeState, subscription_nf_states
policy_response_handle_functions = {
AdministrativeState.LOCKED.value: {
'success': NetworkFunction.delete,
'failed': Subscription.update_sub_nf_status
},
AdministrativeState.UNLOCKED.value: {
'success': Subscription.update_sub_nf_status,
'failed': Subscription.update_sub_nf_status
}
}
class PolicyResponseHandler:
def __init__(self, mr_sub, subscription_name, app):
self.mr_sub = mr_sub
self.subscription_name = subscription_name
self.app = app
@retry(wait=wait_fixed(5), retry=retry_if_exception_type(Exception))
def poll_policy_topic(self):
"""
This method polls MR for response from policy. It checks whether the message is for the
relevant subscription and then handles the response
"""
self.app.app_context().push()
administrative_state = Subscription.get(self.subscription_name).status
try:
response_data = self.mr_sub.get_from_topic('policy_response_consumer')
for data in response_data:
data = json.loads(data)
if data['status']['subscriptionName'] == self.subscription_name:
nf_name = data['status']['nfName']
response_message = data['status']['message']
self._handle_response(self.subscription_name, administrative_state,
nf_name, response_message)
except Exception as err:
raise Exception(f'Error trying to poll policy response topic on MR: {err}')
@staticmethod
def _handle_response(subscription_name, administrative_state, nf_name, response_message):
"""
Handles the response from Policy, updating the DB
Args:
subscription_name (str): The subscription name
administrative_state (str): The administrative state of the subscription
nf_name (str): The network function name
response_message (str): The message in the response regarding the state (success|failed)
"""
logger.debug(f'Response from MR: Sub: {subscription_name} for '
f'NF: {nf_name} received, updating the DB')
try:
sub_nf_status = subscription_nf_states[administrative_state][response_message].value
policy_response_handle_functions[administrative_state][response_message](
subscription_name=subscription_name, status=sub_nf_status, nf_name=nf_name)
except Exception as err:
raise Exception(f'Error changing nf_sub status in the DB: {err}')
|