summaryrefslogtreecommitdiffstats
path: root/dcae-services-policy-sync/policysync/clients.py
diff options
context:
space:
mode:
Diffstat (limited to 'dcae-services-policy-sync/policysync/clients.py')
-rw-r--r--dcae-services-policy-sync/policysync/clients.py512
1 files changed, 512 insertions, 0 deletions
diff --git a/dcae-services-policy-sync/policysync/clients.py b/dcae-services-policy-sync/policysync/clients.py
new file mode 100644
index 0000000..698bc86
--- /dev/null
+++ b/dcae-services-policy-sync/policysync/clients.py
@@ -0,0 +1,512 @@
+# ============LICENSE_START=======================================================
+# Copyright (c) 2021 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+"""Clients for communicating with both the post dublin and pre dublin APIs"""
+import json
+import re
+import base64
+import uuid
+import asyncio
+import aiohttp
+import policysync.metrics as metrics
+from .util import get_module_logger
+
+logger = get_module_logger(__name__)
+
+# Websocket config
+WS_HEARTBEAT = 60
+WS_NOTIFICATIONS_ENDPOINT = "pdp/notifications"
+# REST config
+V1_DECISION_ENDPOINT = "policy/pdpx/v1/decision"
+V0_DECISION_ENDPOINT = "pdp/api"
+
+APPLICATION_JSON = "application/json"
+
+
+def get_single_regex(filters, ids):
+ """given a list of filters and ids returns a single regex for matching"""
+ filters = [] if filters is None else filters
+ ids = [] if ids is None else ["{}[.][0-9]+[.]xml".format(x) for x in ids]
+ return "|".join(filters + ids) if filters is not None else ""
+
+
+class BasePolicyClient:
+ """ Base policy client that is pluggable into inventory """
+ def __init__(self, pdp_url, headers=None):
+ self.headers = {} if headers is None else headers
+ self.session = None
+ self.pdp_url = pdp_url
+
+ def _init_rest_session(self):
+ """
+ initialize an aiohttp rest session
+ :returns: an aiohttp rest session
+ """
+ if self.session is None:
+ self.session = aiohttp.ClientSession(
+ headers=self.headers, raise_for_status=True
+ )
+
+ return self.session
+
+ async def _run_request(self, endpoint, request_data):
+ """
+ execute a particular REST request
+ :param endpoint: str rest endpoint to query
+ :param request_data: dictionary request data
+ :returns: dictionary response data
+ """
+ session = self._init_rest_session()
+ async with session.post(
+ "{0}/{1}".format(self.pdp_url, endpoint), json=request_data
+ ) as resp:
+ data = await resp.read()
+ return json.loads(data)
+
+ def supports_notifications(self):
+ """
+ does this particular client support real time notifictions
+ :returns: True
+ """
+ # in derived classes we may use self
+ # pylint: disable=no-self-use
+ return True
+
+ async def list_policies(self, filters=None, ids=None):
+ """
+ used to get a list of policies matching a particular ID
+ :param filters: list of regex filter strings for matching
+ :param ids: list of id strings for matching
+ :returns: List of policies matching filters or ids
+ """
+ raise NotImplementedError
+
+ async def get_config(self, filters=None, ids=None):
+ """
+ used to get a list of policies matching a particular ID
+ :returns: List of policies matching filters or ids
+ """
+ raise NotImplementedError
+
+ async def notificationhandler(self, callback, ids=None, filters=None):
+ """
+ Clients should implement this to support real time notifications
+ :param callback: func to execute when a matching notification is found
+ :param ids: list of id strings for matching
+ """
+ raise NotImplementedError
+
+ async def close(self):
+ """ close the policy client """
+ logger.info("closing websocket clients...")
+ if self.session:
+ await self.session.close()
+
+
+class PolicyClientV0(BasePolicyClient):
+ """
+ Supports the legacy v0 policy API use prior to ONAP Dublin
+ """
+ async def close(self):
+ """ close the policy client """
+ await super().close()
+ if self.ws_session is not None:
+ await self.ws_session.close()
+
+ def __init__(
+ self,
+ headers,
+ pdp_url,
+ decision_endpoint=V0_DECISION_ENDPOINT,
+ ws_endpoint=WS_NOTIFICATIONS_ENDPOINT
+ ):
+ """
+ Initialize a v0 policy client
+ :param headers: Headers to use for policy rest api
+ :param pdp_url: URL of the PDP
+ :param decision_endpoint: root for the decison API
+ :param websocket_endpoint: root of the websocket endpoint
+ """
+ super().__init__(pdp_url, headers=headers)
+ self.ws_session = None
+ self.session = None
+ self.decision_endpoint = decision_endpoint
+ self.ws_endpoint = ws_endpoint
+ self._ws = None
+
+ def _init_ws_session(self):
+ """initialize a websocket session for notifications"""
+ if self.ws_session is None:
+ self.ws_session = aiohttp.ClientSession()
+
+ return self.ws_session
+
+ @metrics.list_policy_exceptions.count_exceptions()
+ async def list_policies(self, filters=None, ids=None):
+ """
+ used to get a list of policies matching a particular ID
+ :param filters: list of regex filter strings for matching
+ :param ids: list of id strings for matching
+ :returns: List of policies matching filters or ids
+ """
+ request_data = self._prepare_request(filters, ids)
+ policies = await self._run_request(
+ f"{self.decision_endpoint}/listPolicy", request_data
+ )
+ return set(policies)
+
+ @classmethod
+ def _prepare_request(cls, filters, ids):
+ """prepare the request body for the v0 api"""
+ regex = get_single_regex(filters, ids)
+ return {"policyName": regex}
+
+ @metrics.get_config_exceptions.count_exceptions()
+ async def get_config(self, filters=None, ids=None):
+ """
+ Used to get the actual policy configuration from PDP
+ :return: the policy objects that are currently active
+ for the given set of filters
+ """
+ request_data = self._prepare_request(filters, ids)
+ policies = await self._run_request(
+ f"{self.decision_endpoint}/getConfig", request_data)
+
+ for policy in policies:
+ try:
+ policy["config"] = json.loads(policy["config"])
+ except json.JSONDecodeError:
+ pass
+
+ return policies
+
+ @classmethod
+ def _needs_update(cls, update, ids=None, filters=None):
+ """
+ Expect something like this
+ {
+ "removedPolicies": [{
+ "policyName": "xyz.45.xml",
+ "versionNo": "45"
+ }],
+ "loadedPolicies": [{
+ "policyName": "xyz.46.xml",
+ "versionNo": "46",
+ "matches": {
+ "ONAPName": "DCAE",
+ "ConfigName": "DCAE_HighlandPark_AgingConfig",
+ "service": "DCAE_HighlandPark_AgingConfig",
+ "guard": "false",
+ "location": " Edge",
+ "TTLDate": "NA",
+ "uuid": "TestUUID",
+ "RiskLevel": "5",
+ "RiskType": "default"
+ },
+ "updateType": "UPDATE"
+ }],
+ "notificationType": "BOTH"
+ }
+ """
+ for policy in update.get("removedPolicies", []) + update.get(
+ "loadedPolicies", []
+ ):
+ if (
+ re.match(get_single_regex(filters, ids), policy["policyName"])
+ is not None
+ ):
+ return True
+
+ return False
+
+ async def notificationhandler(self, callback, ids=None, filters=None):
+ """
+ websocket based notification handler for
+ :param callback: function to execute when
+ a matching notification is found
+ :param ids: list of id strings for matching
+ """
+
+ url = self.pdp_url.replace("https", "wss")
+
+ # The websocket we start here will periodically
+ # send heartbeat (ping frames) to policy
+ # this ensures that we are never left hanging
+ # with our communication with policy.
+ session = self._init_ws_session()
+ try:
+ websocket = await session.ws_connect(
+ "{0}/{1}".format(url, self.ws_endpoint), heartbeat=WS_HEARTBEAT
+ )
+ logger.info("websock with policy established")
+ async for msg in websocket:
+ # check for websocket errors
+ # break out of this async for loop. to attempt reconnection
+ if msg.type in (aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.ERROR):
+ break
+
+ if msg.type is (aiohttp.WSMsgType.TEXT):
+ if self._needs_update(
+ json.loads(msg.data),
+ ids=ids,
+ filters=filters
+ ):
+ logger.debug(
+ "notification received from pdp websocket -> %s", msg
+ )
+ await callback()
+ else:
+ logger.warning(
+ "unexpected websocket message type received %s", msg.type
+ )
+ except aiohttp.ClientError:
+ logger.exception("Received connection error with websocket")
+
+
+class PolicyClientV1(BasePolicyClient):
+ """
+ Supports the v1 policy API introduced in ONAP's dublin release
+ """
+
+ async def close(self):
+ """ close the policy client """
+ await super().close()
+ if self.dmaap_session is not None:
+ await self.dmaap_session.close()
+
+ def _init_dmaap_session(self):
+ """ initialize a dmaap session for notifications """
+ if self.dmaap_session is None:
+ self.dmaap_session = aiohttp.ClientSession(
+ headers=self.dmaap_headers,
+ raise_for_status=True
+ )
+
+ return self.dmaap_session
+
+ def __init__(
+ self,
+ headers,
+ pdp_url,
+ **kwargs,
+ ):
+ super().__init__(pdp_url, headers=headers)
+ self._ws = None
+ self.audit_uuid = str(uuid.uuid4())
+ self.dmaap_url = kwargs.get('dmaap_url')
+ self.dmaap_timeout = 15000
+ self.dmaap_session = None
+ self.dmaap_headers = kwargs.get('dmaap_headers', {})
+ self.decision = kwargs.get('v1_decision', V1_DECISION_ENDPOINT)
+
+ async def list_policies(self, filters=None, ids=None):
+ """
+ ONAP has no real equivalent to this.
+ :returns: None
+ """
+ # in derived classes we may use self
+ # pylint: disable=no-self-use
+ return None
+
+ @classmethod
+ def convert_to_policy(cls, policy_body):
+ """
+ Convert raw policy to format expected by microservices
+ :param policy_body: raw dictionary output from pdp
+ :returns: data in proper formatting
+ """
+ pdp_metadata = policy_body.get("metadata", {})
+ policy_id = pdp_metadata.get("policy-id")
+ policy_version = policy_body.get("version")
+ if not policy_id or policy_version is None:
+ logger.warning("Malformed policy is missing policy-id and version")
+ return None
+
+ policy_body["policyName"] = "{}.{}.xml".format(
+ policy_id, str(policy_version.replace(".", "-"))
+ )
+ policy_body["policyVersion"] = str(policy_version)
+ if "properties" in policy_body:
+ policy_body["config"] = policy_body["properties"]
+ del policy_body["properties"]
+
+ return policy_body
+
+ @metrics.get_config_exceptions.count_exceptions()
+ async def get_config(self, filters=None, ids=None):
+ """
+ Used to get the actual policy configuration from PDP
+ :returns: the policy objects that are currently active
+ for the given set of filters
+ """
+ if ids is None:
+ ids = []
+
+ request_data = {
+ "ONAPName": "DCAE",
+ "ONAPComponent": "policy-sync",
+ "ONAPInstance": self.audit_uuid,
+ "action": "configure",
+ "resource": {"policy-id": ids}
+ }
+
+ data = await self._run_request(self.decision, request_data)
+ out = []
+ for policy_body in data["policies"].values():
+ policy = self.convert_to_policy(policy_body)
+ if policy is not None:
+ out.append(policy)
+
+ return out
+
+ def supports_notifications(self):
+ """
+ Does this policy client support real time notifications
+ :returns: True if the dmaap url is set else return false
+ """
+ return self.dmaap_url is not None
+
+ @classmethod
+ def _needs_update(cls, update, ids):
+ """
+ expect something like this
+ {
+ "deployed-policies": [
+ {
+ "policy-type": "onap.policies.monitoring.tcagen2",
+ "policy-type-version": "1.0.0",
+ "policy-id": "onap.scaleout.tca",
+ "policy-version": "2.0.0",
+ "success-count": 3,
+ "failure-count": 0
+ }
+ ],
+ "undeployed-policies": [
+ {
+ "policy-type": "onap.policies.monitoring.tcagen2",
+ "policy-type-version": "1.0.0",
+ "policy-id": "onap.firewall.tca",
+ "policy-version": "6.0.0",
+ "success-count": 3,
+ "failure-count": 0
+ }
+ ]
+ }
+ """
+ for policy in update.get("deployed-policies", []) + update.get(
+ "undeployed-policies", []
+ ):
+ if policy.get("policy-id") in ids:
+ return True
+
+ return False
+
+ async def poll_dmaap(self, callback, ids=None):
+ """
+ one GET request to dmaap
+ :param callback: function to execute when a
+ matching notification is found
+ :param ids: list of id strings for matching
+ """
+ query = f"?timeout={self.dmaap_timeout}"
+ url = f"{self.dmaap_url}/{self.audit_uuid}/0{query}"
+ logger.info("polling topic: %s", url)
+ session = self._init_dmaap_session()
+ try:
+ async with session.get(url) as response:
+ messages = await response.read()
+
+ for msg in json.loads(messages):
+ if self._needs_update(json.loads(msg), ids):
+ logger.info(
+ "notification received from dmaap -> %s", msg
+ )
+ await callback()
+ except aiohttp.ClientError:
+ logger.exception('received connection error from dmaap topic')
+ # wait some time
+ await asyncio.sleep(30)
+
+ async def notificationhandler(self, callback, ids=None, filters=None):
+ """
+ dmaap based notification handler for
+ :param callback: function to execute when a
+ matching notification is found
+ :param ids: list of id strings for matching
+ """
+ if filters is not None:
+ logger.warning("filters are not supported with pdp v1..ignoring")
+ while True:
+ await self.poll_dmaap(callback, ids=ids)
+
+
+def get_client(
+ pdp_url,
+ use_v0=False,
+ **kwargs
+):
+ """
+ get a particular policy client
+ :param use_v0: whether this should be a v0 client or
+ :return: A policy client
+ """
+ if pdp_url is None:
+ raise ValueError("POLICY_SYNC_PDP_URL set or --pdp flag not set")
+
+ pdp_headers = {
+ "Accept": APPLICATION_JSON,
+ "Content-Type": APPLICATION_JSON
+ }
+
+ if 'pdp_user' in kwargs and 'pdp_password' in kwargs:
+ auth = base64.b64encode(
+ "{}:{}".format(
+ kwargs.get('pdp_user'),
+ kwargs.get('pdp_password')
+ ).encode("utf-8")
+ )
+ pdp_headers["Authorization"] = "Basic {}".format(auth.decode("utf-8"))
+
+ dmaap_headers = {
+ "Accept": APPLICATION_JSON,
+ "Content-Type": APPLICATION_JSON
+ }
+
+ logger.info(kwargs.get('dmaap_password'))
+ if 'dmaap_user' in kwargs and 'dmaap_password' in kwargs:
+ auth = base64.b64encode(
+ "{}:{}".format(
+ kwargs.get('dmaap_user'),
+ kwargs.get('dmaap_password')
+ ).encode("utf-8")
+ ).decode("utf-8")
+ dmaap_headers["Authorization"] = f"Basic {auth}"
+
+ # Create client (either v0 or v1) based on arguments)
+ if use_v0:
+ return PolicyClientV0(
+ pdp_headers,
+ pdp_url,
+ decision_endpoint=kwargs.get('v0_decision'),
+ ws_endpoint=kwargs.get('v0_notifications')
+ )
+
+ return PolicyClientV1(
+ pdp_headers,
+ pdp_url,
+ v1_decision=kwargs.get('v1_decision'),
+ dmaap_url=kwargs.get('dmaap_url'),
+ dmaap_headers=dmaap_headers
+ )