diff options
Diffstat (limited to 'components/pm-subscription-handler/pmsh_service/mod')
4 files changed, 379 insertions, 6 deletions
diff --git a/components/pm-subscription-handler/pmsh_service/mod/aai_client.py b/components/pm-subscription-handler/pmsh_service/mod/aai_client.py new file mode 100755 index 00000000..8b51a712 --- /dev/null +++ b/components/pm-subscription-handler/pmsh_service/mod/aai_client.py @@ -0,0 +1,142 @@ +# ============LICENSE_START=================================================== +# Copyright (C) 2019-2020 Nordix Foundation. +# ============================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# SPDX-License-Identifier: Apache-2.0 +# ============LICENSE_END===================================================== +import json +import os +import uuid + +import requests +from requests.auth import HTTPBasicAuth + +import mod.pmsh_logging as logger +from mod.subscription import Subscription, XnfFilter + + +def get_pmsh_subscription_data(cbs_data): + """ + Returns the PMSH subscription data + + Args: + cbs_data: json app config from the Config Binding Service. + + Returns: + Subscription, set(Xnf): `Subscription` <Subscription> object, set of XNFs to be added. + + Raises: + RuntimeError: if AAI data cannot be retrieved. + """ + aai_xnf_data = _get_all_aai_xnf_data() + if aai_xnf_data: + sub = Subscription(**cbs_data['policy']['subscription']) + xnfs = _filter_xnf_data(aai_xnf_data, XnfFilter(**sub.nfFilter)) + else: + raise RuntimeError('Failed to get data from AAI') + return sub, xnfs + + +def _get_all_aai_xnf_data(): + """ + Return queried xnf data from the AAI service. + + Returns: + json: the json response from AAI query, else None. + """ + xnf_data = None + try: + session = requests.Session() + aai_endpoint = f'{_get_aai_service_url()}{"/aai/v16/query"}' + headers = {'accept': 'application/json', + 'content-type': 'application/json', + 'x-fromappid': 'dcae-pmsh', + 'x-transactionid': str(uuid.uuid1())} + json_data = """ + {'start': + ['network/pnfs', + 'network/generic-vnfs'] + }""" + params = {'format': 'simple', 'nodesOnly': 'true'} + response = session.put(aai_endpoint, headers=headers, + auth=HTTPBasicAuth('AAI', 'AAI'), + data=json_data, params=params, verify=False) + response.raise_for_status() + if response.ok: + xnf_data = json.loads(response.text) + except Exception as e: + logger.debug(e) + return xnf_data + + +def _get_aai_service_url(): + """ + Returns the URL of the AAI kubernetes service. + + Returns: + str: the AAI k8s service URL. + + Raises: + KeyError: if AAI env vars not found. + """ + try: + aai_service = os.environ['AAI_SERVICE_HOST'] + aai_ssl_port = os.environ['AAI_SERVICE_PORT_AAI_SSL'] + return f'https://{aai_service}:{aai_ssl_port}' + except KeyError as e: + logger.debug(f'Failed to get AAI env vars: {e}') + raise + + +def _filter_xnf_data(xnf_data, xnf_filter): + """ + Returns a list of filtered xnfs using the xnf_filter . + + Args: + xnf_data: the xnf json data from AAI. + xnf_filter: the `XnfFilter <XnfFilter>` to be applied. + + Returns: + set: a set of filtered xnfs. + + Raises: + KeyError: if AAI data cannot be parsed. + """ + xnf_set = set() + try: + for xnf in xnf_data['results']: + name_identifier = 'pnf-name' if xnf['node-type'] == 'pnf' else 'vnf-name' + if xnf_filter.is_xnf_in_filter(xnf['properties'].get(name_identifier)): + xnf_set.add(Xnf(xnf_name=xnf['properties'].get('name_identifier'), + orchestration_status=xnf['properties'].get('orchestration-status'))) + except KeyError as e: + logger.debug(f'Failed to parse AAI data: {e}') + raise + return xnf_set + + +class Xnf: + def __init__(self, **kwargs): + """ + Object representation of the XNF. + """ + self.xnf_name = kwargs.get('xnf_name') + self.orchestration_status = kwargs.get('orchestration_status') + + @classmethod + def xnf_def(cls): + return cls(xnf_name=None, orchestration_status=None) + + def __str__(self): + return f'xnf-name: {self.xnf_name}, orchestration-status: {self.orchestration_status}' diff --git a/components/pm-subscription-handler/pmsh_service/mod/pmsh_logging.py b/components/pm-subscription-handler/pmsh_service/mod/pmsh_logging.py index 30c8db8e..f88ea137 100644 --- a/components/pm-subscription-handler/pmsh_service/mod/pmsh_logging.py +++ b/components/pm-subscription-handler/pmsh_service/mod/pmsh_logging.py @@ -64,34 +64,34 @@ def get_module_logger(mod_name): return logger -def create_loggers(): +def create_loggers(logs_path="/var/log/ONAP/pmsh/logs"): """ Public method to set the global logger, launched from Run This is *not* launched during unit testing, so unit tests do not create/write log files """ - makedirs("/var/log/ONAP/pmsh/logs", exist_ok=True) + makedirs(logs_path, exist_ok=True) # create the audit log - aud_file = "/var/log/ONAP/pmsh/logs/audit.log" + aud_file = logs_path + "/audit.log" open(aud_file, "a").close() # this is like "touch" global _AUDIT_LOGGER _AUDIT_LOGGER = _create_logger("pmsh_service_audit", aud_file) # create the error log - err_file = "/var/log/ONAP/pmsh/logs/error.log" + err_file = logs_path + "/error.log" open(err_file, "a").close() # this is like "touch" global _ERROR_LOGGER _ERROR_LOGGER = _create_logger("pmsh_service_error", err_file) # create the metrics log - met_file = "/var/log/ONAP/pmsh/logs/metrics.log" + met_file = logs_path + "/metrics.log" open(met_file, "a").close() # this is like "touch" global _METRICS_LOGGER _METRICS_LOGGER = _create_logger("pmsh_service_metrics", met_file) # create the debug log - debug_file = "/var/log/ONAP/pmsh/logs/debug.log" + debug_file = logs_path + "/debug.log" open(debug_file, "a").close() # this is like "touch" global _DEBUG_LOGGER _DEBUG_LOGGER = _create_logger("pmsh_service_debug", debug_file) diff --git a/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py b/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py new file mode 100755 index 00000000..b665691d --- /dev/null +++ b/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py @@ -0,0 +1,170 @@ +# ============LICENSE_START=================================================== +# Copyright (C) 2019-2020 Nordix Foundation. +# ============================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# SPDX-License-Identifier: Apache-2.0 +# ============LICENSE_END===================================================== +import uuid + +import requests +from requests.auth import HTTPBasicAuth + +import mod.pmsh_logging as logger + + +class AppConfig: + def __init__(self, **kwargs): + self.aaf_creds = {'aaf_id': kwargs.get('aaf_identity'), + 'aaf_pass': kwargs.get('aaf_password')} + self.cert_path = kwargs.get('cert_path') + self.key_path = kwargs.get('key_path') + self.streams_subscribes = kwargs.get('streams_subscribes') + self.streams_publishes = kwargs.get('streams_publishes') + + def get_mr_sub(self, sub_name): + """ + Returns the MrSub object requested. + + Args: + sub_name: the key of the subscriber object. + + Returns: + MrSub: an Instance of an `MrSub` <MrSub> Object. + + Raises: + KeyError: if the sub_name is not found. + """ + try: + return _MrSub(sub_name, self.aaf_creds, **self.streams_subscribes[sub_name]) + except KeyError as e: + logger.debug(e) + raise + + def get_mr_pub(self, pub_name): + """ + Returns the MrPub object requested. + + Args: + pub_name: the key of the publisher object. + + Returns: + MrPub: an Instance of an `MrPub` <MrPub> Object. + + Raises: + KeyError: if the sub_name is not found. + """ + try: + return _MrPub(pub_name, self.aaf_creds, **self.streams_publishes[pub_name]) + except KeyError as e: + logger.debug(e) + raise + + @property + def cert_params(self): + """ + Returns the tls artifact paths. + + Returns: + cert_path, key_path: the path to tls cert and key. + """ + return self.cert_path, self.key_path + + +class _DmaapMrClient: + def __init__(self, aaf_creds, **kwargs): + """ + A DMaaP Message Router utility class. + Sub classes should be invoked via the AppConfig.get_mr_{pub|sub} only. + Args: + aaf_creds: a dict of aaf secure credentials. + **kwargs: a dict of streams_{subscribes|publishes} data. + """ + self.topic_url = kwargs.get('dmaap_info').get('topic_url') + self.aaf_id = aaf_creds.get('aaf_id') + self.aaf_pass = aaf_creds.get('aaf_pass') + + +class _MrPub(_DmaapMrClient): + def __init__(self, pub_name, aaf_creds, **kwargs): + self.pub_name = pub_name + super().__init__(aaf_creds, **kwargs) + + def publish_to_topic(self, event_json): + """ + Publish the event to the DMaaP Message Router topic. + + Args: + event_json: the json data to be published. + + Raises: + Exception: if post request fails. + """ + try: + session = requests.Session() + headers = {'content-type': 'application/json', 'x-transactionId': str(uuid.uuid1())} + response = session.post(self.topic_url, headers=headers, + auth=HTTPBasicAuth(self.aaf_id, self.aaf_pass), json=event_json, + verify=False) + response.raise_for_status() + except Exception as e: + logger.debug(e) + raise + + def publish_subscription_event_data(self, subscription, xnf_name): + """ + Update the Subscription dict with xnf and policy name then publish to DMaaP MR topic. + + Args: + subscription: the `Subscription` <Subscription> object. + xnf_name: the xnf to include in the event. + """ + try: + subscription_event = subscription.prepare_subscription_event(xnf_name) + self.publish_to_topic(subscription_event) + except Exception as e: + logger.debug(f'pmsh_utils.publish_subscription_event_data : {e}') + + +class _MrSub(_DmaapMrClient): + def __init__(self, sub_name, aaf_creds, **kwargs): + self.sub_name = sub_name + super().__init__(aaf_creds, **kwargs) + + def get_from_topic(self, consumer_id, consumer_group='dcae_pmsh_cg', timeout=1000): + """ + Returns the json data from the MrTopic. + + Args: + consumer_id: Within your subscribers group, a name that uniquely + identifies your subscribers process. + consumer_group: A name that uniquely identifies your subscribers. + timeout: The request timeout value in mSec. + + Returns: + list[str]: the json response from DMaaP Message Router topic, else None. + """ + topic_data = None + try: + session = requests.Session() + headers = {'accept': 'application/json', 'content-type': 'application/json'} + response = session.get(f'{self.topic_url}/{consumer_group}/{consumer_id}' + f'?timeout={timeout}', + auth=HTTPBasicAuth(self.aaf_id, self.aaf_pass), headers=headers, + verify=False) + response.raise_for_status() + if response.ok: + topic_data = response.json() + except Exception as e: + logger.debug(e) + return topic_data diff --git a/components/pm-subscription-handler/pmsh_service/mod/subscription.py b/components/pm-subscription-handler/pmsh_service/mod/subscription.py new file mode 100755 index 00000000..aa3318ac --- /dev/null +++ b/components/pm-subscription-handler/pmsh_service/mod/subscription.py @@ -0,0 +1,61 @@ +# ============LICENSE_START=================================================== +# Copyright (C) 2019 Nordix Foundation. +# ============================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# SPDX-License-Identifier: Apache-2.0 +# ============LICENSE_END===================================================== +import re + + +class Subscription: + def __init__(self, **kwargs): + self.subscriptionName = kwargs.get('subscriptionName') + self.administrativeState = kwargs.get('administrativeState') + self.fileBasedGP = kwargs.get('fileBasedGP') + self.fileLocation = kwargs.get('fileLocation') + self.nfTypeModelInvariantId = kwargs.get('nfTypeModelInvariantId') + self.nfFilter = kwargs.get('nfFilter') + self.measurementGroups = kwargs.get('measurementGroups') + + def prepare_subscription_event(self, xnf_name): + """Prepare the sub event for publishing + + Args: + xnf_name: the AAI xnf name. + + Returns: + dict: the Subscription event to be published. + """ + clean_sub = {k: v for k, v in self.__dict__.items() if k != 'nfFilter'} + clean_sub.update({'nfName': xnf_name, 'policyName': f'OP-{self.subscriptionName}'}) + return clean_sub + + +class XnfFilter: + def __init__(self, **kwargs): + self.nf_sw_version = kwargs.get('swVersions') + self.nf_names = kwargs.get('nfNames') + self.regex_matcher = re.compile('|'.join(raw_regex for raw_regex in self.nf_names)) + + def is_xnf_in_filter(self, xnf_name): + """Match the xnf name against regex values in Subscription.nfFilter.nfNames + + Args: + xnf_name: the AAI xnf name. + + Returns: + bool: True if matched, else False. + """ + + return self.regex_matcher.search(xnf_name) |