summaryrefslogtreecommitdiffstats
path: root/components/pm-subscription-handler/pmsh_service/mod
diff options
context:
space:
mode:
authorefiacor <fiachra.corcoran@est.tech>2019-12-11 12:00:26 +0000
committerefiacor <fiachra.corcoran@est.tech>2020-01-20 12:54:46 +0000
commitbbe05d8a65ee0ac698d906b50282406bafe34f80 (patch)
treed2698f139427f9741eb8712682ffb73940a81d6e /components/pm-subscription-handler/pmsh_service/mod
parent85888c96619588108b7c97000ece0c13b27ac015 (diff)
Adding AAI client and apply filter
# Adding Subscription class # Adding MR and Utils helper classes Signed-off-by: efiacor <fiachra.corcoran@est.tech> Change-Id: I791b506d8cf166737d67cef22052852256f8a811 Issue-ID: DCAEGEN2-1930
Diffstat (limited to 'components/pm-subscription-handler/pmsh_service/mod')
-rwxr-xr-xcomponents/pm-subscription-handler/pmsh_service/mod/aai_client.py142
-rw-r--r--components/pm-subscription-handler/pmsh_service/mod/pmsh_logging.py12
-rwxr-xr-xcomponents/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py170
-rwxr-xr-xcomponents/pm-subscription-handler/pmsh_service/mod/subscription.py61
4 files changed, 379 insertions, 6 deletions
diff --git a/components/pm-subscription-handler/pmsh_service/mod/aai_client.py b/components/pm-subscription-handler/pmsh_service/mod/aai_client.py
new file mode 100755
index 00000000..8b51a712
--- /dev/null
+++ b/components/pm-subscription-handler/pmsh_service/mod/aai_client.py
@@ -0,0 +1,142 @@
+# ============LICENSE_START===================================================
+# Copyright (C) 2019-2020 Nordix Foundation.
+# ============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# SPDX-License-Identifier: Apache-2.0
+# ============LICENSE_END=====================================================
+import json
+import os
+import uuid
+
+import requests
+from requests.auth import HTTPBasicAuth
+
+import mod.pmsh_logging as logger
+from mod.subscription import Subscription, XnfFilter
+
+
+def get_pmsh_subscription_data(cbs_data):
+ """
+ Returns the PMSH subscription data
+
+ Args:
+ cbs_data: json app config from the Config Binding Service.
+
+ Returns:
+ Subscription, set(Xnf): `Subscription` <Subscription> object, set of XNFs to be added.
+
+ Raises:
+ RuntimeError: if AAI data cannot be retrieved.
+ """
+ aai_xnf_data = _get_all_aai_xnf_data()
+ if aai_xnf_data:
+ sub = Subscription(**cbs_data['policy']['subscription'])
+ xnfs = _filter_xnf_data(aai_xnf_data, XnfFilter(**sub.nfFilter))
+ else:
+ raise RuntimeError('Failed to get data from AAI')
+ return sub, xnfs
+
+
+def _get_all_aai_xnf_data():
+ """
+ Return queried xnf data from the AAI service.
+
+ Returns:
+ json: the json response from AAI query, else None.
+ """
+ xnf_data = None
+ try:
+ session = requests.Session()
+ aai_endpoint = f'{_get_aai_service_url()}{"/aai/v16/query"}'
+ headers = {'accept': 'application/json',
+ 'content-type': 'application/json',
+ 'x-fromappid': 'dcae-pmsh',
+ 'x-transactionid': str(uuid.uuid1())}
+ json_data = """
+ {'start':
+ ['network/pnfs',
+ 'network/generic-vnfs']
+ }"""
+ params = {'format': 'simple', 'nodesOnly': 'true'}
+ response = session.put(aai_endpoint, headers=headers,
+ auth=HTTPBasicAuth('AAI', 'AAI'),
+ data=json_data, params=params, verify=False)
+ response.raise_for_status()
+ if response.ok:
+ xnf_data = json.loads(response.text)
+ except Exception as e:
+ logger.debug(e)
+ return xnf_data
+
+
+def _get_aai_service_url():
+ """
+ Returns the URL of the AAI kubernetes service.
+
+ Returns:
+ str: the AAI k8s service URL.
+
+ Raises:
+ KeyError: if AAI env vars not found.
+ """
+ try:
+ aai_service = os.environ['AAI_SERVICE_HOST']
+ aai_ssl_port = os.environ['AAI_SERVICE_PORT_AAI_SSL']
+ return f'https://{aai_service}:{aai_ssl_port}'
+ except KeyError as e:
+ logger.debug(f'Failed to get AAI env vars: {e}')
+ raise
+
+
+def _filter_xnf_data(xnf_data, xnf_filter):
+ """
+ Returns a list of filtered xnfs using the xnf_filter .
+
+ Args:
+ xnf_data: the xnf json data from AAI.
+ xnf_filter: the `XnfFilter <XnfFilter>` to be applied.
+
+ Returns:
+ set: a set of filtered xnfs.
+
+ Raises:
+ KeyError: if AAI data cannot be parsed.
+ """
+ xnf_set = set()
+ try:
+ for xnf in xnf_data['results']:
+ name_identifier = 'pnf-name' if xnf['node-type'] == 'pnf' else 'vnf-name'
+ if xnf_filter.is_xnf_in_filter(xnf['properties'].get(name_identifier)):
+ xnf_set.add(Xnf(xnf_name=xnf['properties'].get('name_identifier'),
+ orchestration_status=xnf['properties'].get('orchestration-status')))
+ except KeyError as e:
+ logger.debug(f'Failed to parse AAI data: {e}')
+ raise
+ return xnf_set
+
+
+class Xnf:
+ def __init__(self, **kwargs):
+ """
+ Object representation of the XNF.
+ """
+ self.xnf_name = kwargs.get('xnf_name')
+ self.orchestration_status = kwargs.get('orchestration_status')
+
+ @classmethod
+ def xnf_def(cls):
+ return cls(xnf_name=None, orchestration_status=None)
+
+ def __str__(self):
+ return f'xnf-name: {self.xnf_name}, orchestration-status: {self.orchestration_status}'
diff --git a/components/pm-subscription-handler/pmsh_service/mod/pmsh_logging.py b/components/pm-subscription-handler/pmsh_service/mod/pmsh_logging.py
index 30c8db8e..f88ea137 100644
--- a/components/pm-subscription-handler/pmsh_service/mod/pmsh_logging.py
+++ b/components/pm-subscription-handler/pmsh_service/mod/pmsh_logging.py
@@ -64,34 +64,34 @@ def get_module_logger(mod_name):
return logger
-def create_loggers():
+def create_loggers(logs_path="/var/log/ONAP/pmsh/logs"):
"""
Public method to set the global logger, launched from Run
This is *not* launched during unit testing, so unit tests do not
create/write log files
"""
- makedirs("/var/log/ONAP/pmsh/logs", exist_ok=True)
+ makedirs(logs_path, exist_ok=True)
# create the audit log
- aud_file = "/var/log/ONAP/pmsh/logs/audit.log"
+ aud_file = logs_path + "/audit.log"
open(aud_file, "a").close() # this is like "touch"
global _AUDIT_LOGGER
_AUDIT_LOGGER = _create_logger("pmsh_service_audit", aud_file)
# create the error log
- err_file = "/var/log/ONAP/pmsh/logs/error.log"
+ err_file = logs_path + "/error.log"
open(err_file, "a").close() # this is like "touch"
global _ERROR_LOGGER
_ERROR_LOGGER = _create_logger("pmsh_service_error", err_file)
# create the metrics log
- met_file = "/var/log/ONAP/pmsh/logs/metrics.log"
+ met_file = logs_path + "/metrics.log"
open(met_file, "a").close() # this is like "touch"
global _METRICS_LOGGER
_METRICS_LOGGER = _create_logger("pmsh_service_metrics", met_file)
# create the debug log
- debug_file = "/var/log/ONAP/pmsh/logs/debug.log"
+ debug_file = logs_path + "/debug.log"
open(debug_file, "a").close() # this is like "touch"
global _DEBUG_LOGGER
_DEBUG_LOGGER = _create_logger("pmsh_service_debug", debug_file)
diff --git a/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py b/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py
new file mode 100755
index 00000000..b665691d
--- /dev/null
+++ b/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py
@@ -0,0 +1,170 @@
+# ============LICENSE_START===================================================
+# Copyright (C) 2019-2020 Nordix Foundation.
+# ============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# SPDX-License-Identifier: Apache-2.0
+# ============LICENSE_END=====================================================
+import uuid
+
+import requests
+from requests.auth import HTTPBasicAuth
+
+import mod.pmsh_logging as logger
+
+
+class AppConfig:
+ def __init__(self, **kwargs):
+ self.aaf_creds = {'aaf_id': kwargs.get('aaf_identity'),
+ 'aaf_pass': kwargs.get('aaf_password')}
+ self.cert_path = kwargs.get('cert_path')
+ self.key_path = kwargs.get('key_path')
+ self.streams_subscribes = kwargs.get('streams_subscribes')
+ self.streams_publishes = kwargs.get('streams_publishes')
+
+ def get_mr_sub(self, sub_name):
+ """
+ Returns the MrSub object requested.
+
+ Args:
+ sub_name: the key of the subscriber object.
+
+ Returns:
+ MrSub: an Instance of an `MrSub` <MrSub> Object.
+
+ Raises:
+ KeyError: if the sub_name is not found.
+ """
+ try:
+ return _MrSub(sub_name, self.aaf_creds, **self.streams_subscribes[sub_name])
+ except KeyError as e:
+ logger.debug(e)
+ raise
+
+ def get_mr_pub(self, pub_name):
+ """
+ Returns the MrPub object requested.
+
+ Args:
+ pub_name: the key of the publisher object.
+
+ Returns:
+ MrPub: an Instance of an `MrPub` <MrPub> Object.
+
+ Raises:
+ KeyError: if the sub_name is not found.
+ """
+ try:
+ return _MrPub(pub_name, self.aaf_creds, **self.streams_publishes[pub_name])
+ except KeyError as e:
+ logger.debug(e)
+ raise
+
+ @property
+ def cert_params(self):
+ """
+ Returns the tls artifact paths.
+
+ Returns:
+ cert_path, key_path: the path to tls cert and key.
+ """
+ return self.cert_path, self.key_path
+
+
+class _DmaapMrClient:
+ def __init__(self, aaf_creds, **kwargs):
+ """
+ A DMaaP Message Router utility class.
+ Sub classes should be invoked via the AppConfig.get_mr_{pub|sub} only.
+ Args:
+ aaf_creds: a dict of aaf secure credentials.
+ **kwargs: a dict of streams_{subscribes|publishes} data.
+ """
+ self.topic_url = kwargs.get('dmaap_info').get('topic_url')
+ self.aaf_id = aaf_creds.get('aaf_id')
+ self.aaf_pass = aaf_creds.get('aaf_pass')
+
+
+class _MrPub(_DmaapMrClient):
+ def __init__(self, pub_name, aaf_creds, **kwargs):
+ self.pub_name = pub_name
+ super().__init__(aaf_creds, **kwargs)
+
+ def publish_to_topic(self, event_json):
+ """
+ Publish the event to the DMaaP Message Router topic.
+
+ Args:
+ event_json: the json data to be published.
+
+ Raises:
+ Exception: if post request fails.
+ """
+ try:
+ session = requests.Session()
+ headers = {'content-type': 'application/json', 'x-transactionId': str(uuid.uuid1())}
+ response = session.post(self.topic_url, headers=headers,
+ auth=HTTPBasicAuth(self.aaf_id, self.aaf_pass), json=event_json,
+ verify=False)
+ response.raise_for_status()
+ except Exception as e:
+ logger.debug(e)
+ raise
+
+ def publish_subscription_event_data(self, subscription, xnf_name):
+ """
+ Update the Subscription dict with xnf and policy name then publish to DMaaP MR topic.
+
+ Args:
+ subscription: the `Subscription` <Subscription> object.
+ xnf_name: the xnf to include in the event.
+ """
+ try:
+ subscription_event = subscription.prepare_subscription_event(xnf_name)
+ self.publish_to_topic(subscription_event)
+ except Exception as e:
+ logger.debug(f'pmsh_utils.publish_subscription_event_data : {e}')
+
+
+class _MrSub(_DmaapMrClient):
+ def __init__(self, sub_name, aaf_creds, **kwargs):
+ self.sub_name = sub_name
+ super().__init__(aaf_creds, **kwargs)
+
+ def get_from_topic(self, consumer_id, consumer_group='dcae_pmsh_cg', timeout=1000):
+ """
+ Returns the json data from the MrTopic.
+
+ Args:
+ consumer_id: Within your subscribers group, a name that uniquely
+ identifies your subscribers process.
+ consumer_group: A name that uniquely identifies your subscribers.
+ timeout: The request timeout value in mSec.
+
+ Returns:
+ list[str]: the json response from DMaaP Message Router topic, else None.
+ """
+ topic_data = None
+ try:
+ session = requests.Session()
+ headers = {'accept': 'application/json', 'content-type': 'application/json'}
+ response = session.get(f'{self.topic_url}/{consumer_group}/{consumer_id}'
+ f'?timeout={timeout}',
+ auth=HTTPBasicAuth(self.aaf_id, self.aaf_pass), headers=headers,
+ verify=False)
+ response.raise_for_status()
+ if response.ok:
+ topic_data = response.json()
+ except Exception as e:
+ logger.debug(e)
+ return topic_data
diff --git a/components/pm-subscription-handler/pmsh_service/mod/subscription.py b/components/pm-subscription-handler/pmsh_service/mod/subscription.py
new file mode 100755
index 00000000..aa3318ac
--- /dev/null
+++ b/components/pm-subscription-handler/pmsh_service/mod/subscription.py
@@ -0,0 +1,61 @@
+# ============LICENSE_START===================================================
+# Copyright (C) 2019 Nordix Foundation.
+# ============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# SPDX-License-Identifier: Apache-2.0
+# ============LICENSE_END=====================================================
+import re
+
+
+class Subscription:
+ def __init__(self, **kwargs):
+ self.subscriptionName = kwargs.get('subscriptionName')
+ self.administrativeState = kwargs.get('administrativeState')
+ self.fileBasedGP = kwargs.get('fileBasedGP')
+ self.fileLocation = kwargs.get('fileLocation')
+ self.nfTypeModelInvariantId = kwargs.get('nfTypeModelInvariantId')
+ self.nfFilter = kwargs.get('nfFilter')
+ self.measurementGroups = kwargs.get('measurementGroups')
+
+ def prepare_subscription_event(self, xnf_name):
+ """Prepare the sub event for publishing
+
+ Args:
+ xnf_name: the AAI xnf name.
+
+ Returns:
+ dict: the Subscription event to be published.
+ """
+ clean_sub = {k: v for k, v in self.__dict__.items() if k != 'nfFilter'}
+ clean_sub.update({'nfName': xnf_name, 'policyName': f'OP-{self.subscriptionName}'})
+ return clean_sub
+
+
+class XnfFilter:
+ def __init__(self, **kwargs):
+ self.nf_sw_version = kwargs.get('swVersions')
+ self.nf_names = kwargs.get('nfNames')
+ self.regex_matcher = re.compile('|'.join(raw_regex for raw_regex in self.nf_names))
+
+ def is_xnf_in_filter(self, xnf_name):
+ """Match the xnf name against regex values in Subscription.nfFilter.nfNames
+
+ Args:
+ xnf_name: the AAI xnf name.
+
+ Returns:
+ bool: True if matched, else False.
+ """
+
+ return self.regex_matcher.search(xnf_name)