diff options
author | Tony Hansen <tony@att.com> | 2021-02-18 01:15:28 +0000 |
---|---|---|
committer | Gerrit Code Review <gerrit@onap.org> | 2021-02-18 01:15:28 +0000 |
commit | 5f2aa5fa415ad402c6c3acac1d5138515daf68a3 (patch) | |
tree | 11e43aadd64cb29ffb14b63c737cb0e88c87fd1c /dcae-services-policy-sync | |
parent | 6118a2bb89e3a57d429616f7f8e182046d0008a5 (diff) | |
parent | fffe41c078fa427f2a62035ee2d6cc5cd407238c (diff) |
Merge "Seed policysync container code"
Diffstat (limited to 'dcae-services-policy-sync')
18 files changed, 2573 insertions, 0 deletions
diff --git a/dcae-services-policy-sync/Dockerfile b/dcae-services-policy-sync/Dockerfile new file mode 100644 index 0000000..f317a85 --- /dev/null +++ b/dcae-services-policy-sync/Dockerfile @@ -0,0 +1,46 @@ +# ============LICENSE_START======================================================= +# Copyright (c) 2021 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= + +FROM nexus3.onap.org:10001/onap/integration-python:7.0.1 as build + + +USER root + +RUN python3 -m venv /policysync +# Need GCC, musl and associated dependencies to compile dependencies against musl +RUN apk add --no-cache --virtual .build-deps gcc musl-dev + +WORKDIR /app + + +# Install dependencies first to speed up builds +ADD setup.py setup.py +RUN /policysync/bin/pip install -e . + +# Add the code now +ADD policysync policysync +RUN /policysync/bin/pip install . + +FROM nexus3.onap.org:10001/onap/integration-python:7.0.1 as runtime + +COPY --from=build /policysync /policysync + +USER onap +ENTRYPOINT [ "/policysync/bin/policysync" ] + + + + diff --git a/dcae-services-policy-sync/README.md b/dcae-services-policy-sync/README.md new file mode 100644 index 0000000..519aea6 --- /dev/null +++ b/dcae-services-policy-sync/README.md @@ -0,0 +1,150 @@ +# Policy Sync
+This page serves as an implementation for the Policy sync container described in the [wiki](https://wiki.onap.org/display/DW/Policy+function+as+Sidecar)
+
+
+Policy Sync utility is a python based utility that interfaces with the ONAP/ECOMP policy websocket and REST APIs. It is designed to keep a local listing of policies in sync with an environment's policy distribution point (PDP). It functions well as a Kubernetes sidecar container which can pull down the latest policies for consumption by an application container.
+
+The sync utility primarily utilizes the PDP's websocket notification API to receive policy update notifications. It also includes a periodic check of the PDP for resilliency purposes in the event of websocket API issues.
+
+
+## Build and Run
+Easiest way to use is via docker by building the provided docker file
+
+```bash
+docker build . -t policy-puller
+```
+
+If you want to run it in a non containerized environment, an easy way is to use python virtual environments.
+```bash
+# Create a virtual environment in venv folder and activate it
+python3 -m venv venv
+source venv/bin/activate
+
+# install the utility
+pip install .
+
+# Utility is now installed and usable in your virtual environment. Test it with:
+policysync -h
+```
+
+## Configuration
+
+Configuration is currently done via either env variables or by flag. Flags take precedence env variables, env variables take precedence over default
+
+### General configuration
+General configuration that is used regardless of which PDP API you are using.
+
+| ENV Variable | Flag | Description | Default |
+| --------------------------| -------------------|----------------------------------------------|-----------------------------------|
+| POLICY_SYNC_PDP_URL | --pdp-url | PDP URL to query | None (must be set in env or flag) |
+| POLICY_SYNC_FILTER | --filters | yaml list of regex of policies to match | [] |
+| POLICY_SYNC_ID | --ids | yaml list of ids of policies to match | [] |
+| POLICY_SYNC_DURATION | --duration | duration in seconds for periodic checks | 2600 |
+| POLICY_SYNC_OUTFILE | --outfile | File to output policies to | ./policies.json |
+| POLICY_SYNC_PDP_USER | --pdp-user | Set user if you need basic auth for PDP | None |
+| POLICY_SYNC_PDP_PASS | --pdp-password | Set pass if you need basic auth for PDP | None |
+| POLICY_SYNC_HTTP_METRICS | --http-metrics | Whether to expose prometheus metrics | True |
+| POLICY_SYNC_HTTP_BIND | --http-bind | host:port for exporting prometheus metrics | localhost:8000 |
+| POLICY_SYNC_LOGGING_CONFIG| --logging-config | Path to a python formatted logging file | None (logs will write to stderr) |
+| POLICY_SYNC_V0_ENABLE | --use-v0 | Set to true to enable usage of legacy v0 API | False |
+
+### V1 Specific Configuration (Used as of the Dublin release)
+Configurable variables used for the V1 API used in the ONAP Dublin Release.
+
+Note: Policy filters are not currently supported in the current policy release but will be eventually.
+
+| ENV Variable | Flag | Description | Default |
+| ---------------------------------|------------------------|----------------------------------------|------------------------------|
+| POLICY_SYNC_V1_DECISION_ENDPOINT | --v1-decision-endpoint | Endpoint to query for PDP decisions | policy/pdpx/v1/decision |
+| POLICY_SYNC_V1_DMAAP_URL | --v1-dmaap-topic | Dmaap url with topic for notifications | None |
+| POLICY_SYNC_V1_DMAAP_USER | --v1-dmaap-user | User to use for DMaaP notifications | None |
+| POLICY_SYNC_V1_DMAAP_PASS | --v1-dmaap-pass | Password to use for DMaaP notifications| None |
+
+
+
+### V0 Specific Configuration (Legacy Policy API)
+Configurable variables used for the legacy V0 API Prior to the ONAP release. Only valid when --use-v0 is set to True
+
+
+| ENV Variable | Flag | Description | Default |
+| ---------------------------------|------------------------|----------------------------------------|------------------------------|
+| POLICY_SYNC_V0_NOTIFIY_ENDPOINT | --v0-notifiy-endpoint | websock endpoint for pdp notifications | pdp/notifications |
+| POLICY_SYNC_V0_DECISION_ENDPOINT | --v0-decision-endpoint | rest endpoint for pdp decisions | pdp/api |
+
+## Usage
+
+You can run in a pure docker setup:
+```bash
+# Run the container
+docker run
+ --env POLICY_SYNC_PDP_USER=<username> \
+ --env POLICY_SYNC_PDP_PASS=<password> \
+ --env POLICY_SYNC_PDP_URL=<path_to_pdp> \
+ --env POLICY_SYNC_V1_DMAAP_URL='https://<dmaap_host>:3905/events/<dmaap_topic>' \
+ --env POLICY_SYNC_V1_DMAAP_PASS='<user>' \
+ --env POLICY_SYNC_V1_DMAAP_USER='<pass>' \
+ --env POLICY_SYNC_ID=['DCAE.Config_MS_AGING_UVERSE_PROD'] \
+ -v $(pwd)/policy-volume:/etc/policy \
+ nexus3.onap.org:10001/onap/org.onap.dcaegen2.deployments.policy-sync:1.0.0
+```
+
+Or on Kubernetes:
+```yaml
+# policy-config-map
+apiVersion: v1
+kind: policy-config-map
+metadata:
+ name: special-config
+ namespace: default
+data:
+ POLICY_SYNC_PDP_USER: myusername
+ POLICY_SYNC_PDP_PASS: mypassword
+ POLICY_SYNC_PDP_URL: <path_to_pdp>
+ POLICY_SYNC_V1_DMAAP_URL: 'https://<dmaap_host>:3905/events/<dmaap_topic>' \
+ POLICY_SYNC_V1_DMAAP_PASS: '<user>' \
+ POLICY_SYNC_V1_DMAAP_USER: '<pass>' \
+ POLICY_SYNC_FILTER: '["DCAE.Config_MS_AGING_UVERSE_PROD"]'
+
+
+---
+
+apiVersion: v1
+kind: Pod
+metadata:
+ name: Sidecar sample app
+spec:
+ restartPolicy: Never
+
+
+ # The shared volume that the two containers use to communicate...empty dir for simplicity
+ volumes:
+ - name: policy-shared
+ emptyDir: {}
+
+ containers:
+
+ # Sample app that uses inotifyd (part of busybox/alpine). For demonstration purposes only...
+ - name: main
+ image: nexus3.onap.org:10001/onap/org.onap.dcaegen2.deployments.policy-sync:1.0.0
+ volumeMounts:
+ - name: policy-shared
+ mountPath: /etc/policies.json
+ subPath: policies.json
+ # For details on what this does see: https://wiki.alpinelinux.org/wiki/Inotifyd
+ # you can replace '-' arg below with a shell script to do more interesting
+ cmd: [ "inotifyd", "-", "/etc/policies.json:c" ]
+
+
+ # The sidecar app which keeps the policies in sync
+ - name: policy-sync
+ image: nexus3.onap.org:10001/onap/org.onap.dcaegen2.deployments.policy-sync:1.0.0
+ envFrom:
+ - configMapRef:
+ name: special-config
+
+ volumeMounts:
+ - name: policy-shared
+ mountPath: /etc/policies
+```
+
+
diff --git a/dcae-services-policy-sync/policysync/__init__.py b/dcae-services-policy-sync/policysync/__init__.py new file mode 100644 index 0000000..7c04dfd --- /dev/null +++ b/dcae-services-policy-sync/policysync/__init__.py @@ -0,0 +1,16 @@ +# ============LICENSE_START======================================================= +# Copyright (c) 2021 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +""" nothing here """ diff --git a/dcae-services-policy-sync/policysync/clients.py b/dcae-services-policy-sync/policysync/clients.py new file mode 100644 index 0000000..698bc86 --- /dev/null +++ b/dcae-services-policy-sync/policysync/clients.py @@ -0,0 +1,512 @@ +# ============LICENSE_START======================================================= +# Copyright (c) 2021 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +"""Clients for communicating with both the post dublin and pre dublin APIs""" +import json +import re +import base64 +import uuid +import asyncio +import aiohttp +import policysync.metrics as metrics +from .util import get_module_logger + +logger = get_module_logger(__name__) + +# Websocket config +WS_HEARTBEAT = 60 +WS_NOTIFICATIONS_ENDPOINT = "pdp/notifications" +# REST config +V1_DECISION_ENDPOINT = "policy/pdpx/v1/decision" +V0_DECISION_ENDPOINT = "pdp/api" + +APPLICATION_JSON = "application/json" + + +def get_single_regex(filters, ids): + """given a list of filters and ids returns a single regex for matching""" + filters = [] if filters is None else filters + ids = [] if ids is None else ["{}[.][0-9]+[.]xml".format(x) for x in ids] + return "|".join(filters + ids) if filters is not None else "" + + +class BasePolicyClient: + """ Base policy client that is pluggable into inventory """ + def __init__(self, pdp_url, headers=None): + self.headers = {} if headers is None else headers + self.session = None + self.pdp_url = pdp_url + + def _init_rest_session(self): + """ + initialize an aiohttp rest session + :returns: an aiohttp rest session + """ + if self.session is None: + self.session = aiohttp.ClientSession( + headers=self.headers, raise_for_status=True + ) + + return self.session + + async def _run_request(self, endpoint, request_data): + """ + execute a particular REST request + :param endpoint: str rest endpoint to query + :param request_data: dictionary request data + :returns: dictionary response data + """ + session = self._init_rest_session() + async with session.post( + "{0}/{1}".format(self.pdp_url, endpoint), json=request_data + ) as resp: + data = await resp.read() + return json.loads(data) + + def supports_notifications(self): + """ + does this particular client support real time notifictions + :returns: True + """ + # in derived classes we may use self + # pylint: disable=no-self-use + return True + + async def list_policies(self, filters=None, ids=None): + """ + used to get a list of policies matching a particular ID + :param filters: list of regex filter strings for matching + :param ids: list of id strings for matching + :returns: List of policies matching filters or ids + """ + raise NotImplementedError + + async def get_config(self, filters=None, ids=None): + """ + used to get a list of policies matching a particular ID + :returns: List of policies matching filters or ids + """ + raise NotImplementedError + + async def notificationhandler(self, callback, ids=None, filters=None): + """ + Clients should implement this to support real time notifications + :param callback: func to execute when a matching notification is found + :param ids: list of id strings for matching + """ + raise NotImplementedError + + async def close(self): + """ close the policy client """ + logger.info("closing websocket clients...") + if self.session: + await self.session.close() + + +class PolicyClientV0(BasePolicyClient): + """ + Supports the legacy v0 policy API use prior to ONAP Dublin + """ + async def close(self): + """ close the policy client """ + await super().close() + if self.ws_session is not None: + await self.ws_session.close() + + def __init__( + self, + headers, + pdp_url, + decision_endpoint=V0_DECISION_ENDPOINT, + ws_endpoint=WS_NOTIFICATIONS_ENDPOINT + ): + """ + Initialize a v0 policy client + :param headers: Headers to use for policy rest api + :param pdp_url: URL of the PDP + :param decision_endpoint: root for the decison API + :param websocket_endpoint: root of the websocket endpoint + """ + super().__init__(pdp_url, headers=headers) + self.ws_session = None + self.session = None + self.decision_endpoint = decision_endpoint + self.ws_endpoint = ws_endpoint + self._ws = None + + def _init_ws_session(self): + """initialize a websocket session for notifications""" + if self.ws_session is None: + self.ws_session = aiohttp.ClientSession() + + return self.ws_session + + @metrics.list_policy_exceptions.count_exceptions() + async def list_policies(self, filters=None, ids=None): + """ + used to get a list of policies matching a particular ID + :param filters: list of regex filter strings for matching + :param ids: list of id strings for matching + :returns: List of policies matching filters or ids + """ + request_data = self._prepare_request(filters, ids) + policies = await self._run_request( + f"{self.decision_endpoint}/listPolicy", request_data + ) + return set(policies) + + @classmethod + def _prepare_request(cls, filters, ids): + """prepare the request body for the v0 api""" + regex = get_single_regex(filters, ids) + return {"policyName": regex} + + @metrics.get_config_exceptions.count_exceptions() + async def get_config(self, filters=None, ids=None): + """ + Used to get the actual policy configuration from PDP + :return: the policy objects that are currently active + for the given set of filters + """ + request_data = self._prepare_request(filters, ids) + policies = await self._run_request( + f"{self.decision_endpoint}/getConfig", request_data) + + for policy in policies: + try: + policy["config"] = json.loads(policy["config"]) + except json.JSONDecodeError: + pass + + return policies + + @classmethod + def _needs_update(cls, update, ids=None, filters=None): + """ + Expect something like this + { + "removedPolicies": [{ + "policyName": "xyz.45.xml", + "versionNo": "45" + }], + "loadedPolicies": [{ + "policyName": "xyz.46.xml", + "versionNo": "46", + "matches": { + "ONAPName": "DCAE", + "ConfigName": "DCAE_HighlandPark_AgingConfig", + "service": "DCAE_HighlandPark_AgingConfig", + "guard": "false", + "location": " Edge", + "TTLDate": "NA", + "uuid": "TestUUID", + "RiskLevel": "5", + "RiskType": "default" + }, + "updateType": "UPDATE" + }], + "notificationType": "BOTH" + } + """ + for policy in update.get("removedPolicies", []) + update.get( + "loadedPolicies", [] + ): + if ( + re.match(get_single_regex(filters, ids), policy["policyName"]) + is not None + ): + return True + + return False + + async def notificationhandler(self, callback, ids=None, filters=None): + """ + websocket based notification handler for + :param callback: function to execute when + a matching notification is found + :param ids: list of id strings for matching + """ + + url = self.pdp_url.replace("https", "wss") + + # The websocket we start here will periodically + # send heartbeat (ping frames) to policy + # this ensures that we are never left hanging + # with our communication with policy. + session = self._init_ws_session() + try: + websocket = await session.ws_connect( + "{0}/{1}".format(url, self.ws_endpoint), heartbeat=WS_HEARTBEAT + ) + logger.info("websock with policy established") + async for msg in websocket: + # check for websocket errors + # break out of this async for loop. to attempt reconnection + if msg.type in (aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.ERROR): + break + + if msg.type is (aiohttp.WSMsgType.TEXT): + if self._needs_update( + json.loads(msg.data), + ids=ids, + filters=filters + ): + logger.debug( + "notification received from pdp websocket -> %s", msg + ) + await callback() + else: + logger.warning( + "unexpected websocket message type received %s", msg.type + ) + except aiohttp.ClientError: + logger.exception("Received connection error with websocket") + + +class PolicyClientV1(BasePolicyClient): + """ + Supports the v1 policy API introduced in ONAP's dublin release + """ + + async def close(self): + """ close the policy client """ + await super().close() + if self.dmaap_session is not None: + await self.dmaap_session.close() + + def _init_dmaap_session(self): + """ initialize a dmaap session for notifications """ + if self.dmaap_session is None: + self.dmaap_session = aiohttp.ClientSession( + headers=self.dmaap_headers, + raise_for_status=True + ) + + return self.dmaap_session + + def __init__( + self, + headers, + pdp_url, + **kwargs, + ): + super().__init__(pdp_url, headers=headers) + self._ws = None + self.audit_uuid = str(uuid.uuid4()) + self.dmaap_url = kwargs.get('dmaap_url') + self.dmaap_timeout = 15000 + self.dmaap_session = None + self.dmaap_headers = kwargs.get('dmaap_headers', {}) + self.decision = kwargs.get('v1_decision', V1_DECISION_ENDPOINT) + + async def list_policies(self, filters=None, ids=None): + """ + ONAP has no real equivalent to this. + :returns: None + """ + # in derived classes we may use self + # pylint: disable=no-self-use + return None + + @classmethod + def convert_to_policy(cls, policy_body): + """ + Convert raw policy to format expected by microservices + :param policy_body: raw dictionary output from pdp + :returns: data in proper formatting + """ + pdp_metadata = policy_body.get("metadata", {}) + policy_id = pdp_metadata.get("policy-id") + policy_version = policy_body.get("version") + if not policy_id or policy_version is None: + logger.warning("Malformed policy is missing policy-id and version") + return None + + policy_body["policyName"] = "{}.{}.xml".format( + policy_id, str(policy_version.replace(".", "-")) + ) + policy_body["policyVersion"] = str(policy_version) + if "properties" in policy_body: + policy_body["config"] = policy_body["properties"] + del policy_body["properties"] + + return policy_body + + @metrics.get_config_exceptions.count_exceptions() + async def get_config(self, filters=None, ids=None): + """ + Used to get the actual policy configuration from PDP + :returns: the policy objects that are currently active + for the given set of filters + """ + if ids is None: + ids = [] + + request_data = { + "ONAPName": "DCAE", + "ONAPComponent": "policy-sync", + "ONAPInstance": self.audit_uuid, + "action": "configure", + "resource": {"policy-id": ids} + } + + data = await self._run_request(self.decision, request_data) + out = [] + for policy_body in data["policies"].values(): + policy = self.convert_to_policy(policy_body) + if policy is not None: + out.append(policy) + + return out + + def supports_notifications(self): + """ + Does this policy client support real time notifications + :returns: True if the dmaap url is set else return false + """ + return self.dmaap_url is not None + + @classmethod + def _needs_update(cls, update, ids): + """ + expect something like this + { + "deployed-policies": [ + { + "policy-type": "onap.policies.monitoring.tcagen2", + "policy-type-version": "1.0.0", + "policy-id": "onap.scaleout.tca", + "policy-version": "2.0.0", + "success-count": 3, + "failure-count": 0 + } + ], + "undeployed-policies": [ + { + "policy-type": "onap.policies.monitoring.tcagen2", + "policy-type-version": "1.0.0", + "policy-id": "onap.firewall.tca", + "policy-version": "6.0.0", + "success-count": 3, + "failure-count": 0 + } + ] + } + """ + for policy in update.get("deployed-policies", []) + update.get( + "undeployed-policies", [] + ): + if policy.get("policy-id") in ids: + return True + + return False + + async def poll_dmaap(self, callback, ids=None): + """ + one GET request to dmaap + :param callback: function to execute when a + matching notification is found + :param ids: list of id strings for matching + """ + query = f"?timeout={self.dmaap_timeout}" + url = f"{self.dmaap_url}/{self.audit_uuid}/0{query}" + logger.info("polling topic: %s", url) + session = self._init_dmaap_session() + try: + async with session.get(url) as response: + messages = await response.read() + + for msg in json.loads(messages): + if self._needs_update(json.loads(msg), ids): + logger.info( + "notification received from dmaap -> %s", msg + ) + await callback() + except aiohttp.ClientError: + logger.exception('received connection error from dmaap topic') + # wait some time + await asyncio.sleep(30) + + async def notificationhandler(self, callback, ids=None, filters=None): + """ + dmaap based notification handler for + :param callback: function to execute when a + matching notification is found + :param ids: list of id strings for matching + """ + if filters is not None: + logger.warning("filters are not supported with pdp v1..ignoring") + while True: + await self.poll_dmaap(callback, ids=ids) + + +def get_client( + pdp_url, + use_v0=False, + **kwargs +): + """ + get a particular policy client + :param use_v0: whether this should be a v0 client or + :return: A policy client + """ + if pdp_url is None: + raise ValueError("POLICY_SYNC_PDP_URL set or --pdp flag not set") + + pdp_headers = { + "Accept": APPLICATION_JSON, + "Content-Type": APPLICATION_JSON + } + + if 'pdp_user' in kwargs and 'pdp_password' in kwargs: + auth = base64.b64encode( + "{}:{}".format( + kwargs.get('pdp_user'), + kwargs.get('pdp_password') + ).encode("utf-8") + ) + pdp_headers["Authorization"] = "Basic {}".format(auth.decode("utf-8")) + + dmaap_headers = { + "Accept": APPLICATION_JSON, + "Content-Type": APPLICATION_JSON + } + + logger.info(kwargs.get('dmaap_password')) + if 'dmaap_user' in kwargs and 'dmaap_password' in kwargs: + auth = base64.b64encode( + "{}:{}".format( + kwargs.get('dmaap_user'), + kwargs.get('dmaap_password') + ).encode("utf-8") + ).decode("utf-8") + dmaap_headers["Authorization"] = f"Basic {auth}" + + # Create client (either v0 or v1) based on arguments) + if use_v0: + return PolicyClientV0( + pdp_headers, + pdp_url, + decision_endpoint=kwargs.get('v0_decision'), + ws_endpoint=kwargs.get('v0_notifications') + ) + + return PolicyClientV1( + pdp_headers, + pdp_url, + v1_decision=kwargs.get('v1_decision'), + dmaap_url=kwargs.get('dmaap_url'), + dmaap_headers=dmaap_headers + ) diff --git a/dcae-services-policy-sync/policysync/cmd.py b/dcae-services-policy-sync/policysync/cmd.py new file mode 100644 index 0000000..9055674 --- /dev/null +++ b/dcae-services-policy-sync/policysync/cmd.py @@ -0,0 +1,234 @@ +# ============LICENSE_START======================================================= +# Copyright (c) 2021 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +""" +CLI parsing for the sync utility. +convert flags/env variables to configuration +""" +import argparse +import collections +import os +import sys +import logging +import logging.config +from urllib.parse import urlsplit +import yaml +import policysync.clients as clients +import policysync.coroutines +from .util import get_module_logger + + +logger = get_module_logger(__name__) + +APPLICATION_JSON = "application/json" + + +Config = collections.namedtuple( + 'Config', ['out_file', 'check_period', 'filters', 'ids', 'client', 'bind']) + + +def parsecmd(args): + """ + Parse the command into a config object + :param args: arguments list for parsing + :returns: Config for the policy sync + """ + parser = argparse.ArgumentParser( + description="Keeps a file updated with policies matching a filter.", + formatter_class=argparse.ArgumentDefaultsHelpFormatter, + ) + + parser.add_argument( + "--out", + type=str, + default=os.environ.get("POLICY_SYNC_OUTFILE", "policies.json"), + help="Output file to dump to", + ) + + parser.add_argument( + "--duration", + type=int, + default=os.environ.get("POLICY_SYNC_DURATION", 1200), + help="frequency (in seconds) to conduct periodic check", + ) + + parser.add_argument( + "--filters", + type=str, + default=os.environ.get("POLICY_SYNC_FILTER", "[]"), + help="Regex of policies that you are interested in.", + ) + parser.add_argument( + "--ids", + type=str, + default=os.environ.get("POLICY_SYNC_ID", "[]"), + help="Specific names of policies you are interested in.", + ) + + parser.add_argument( + "--pdp-user", + type=str, + default=os.environ.get("POLICY_SYNC_PDP_USER", None), + help="PDP basic auth username", + ) + parser.add_argument( + "--pdp-pass", + type=str, + default=os.environ.get("POLICY_SYNC_PDP_PASS", None), + help="PDP basic auth password", + ) + + parser.add_argument( + "--pdp-url", + type=str, + default=os.environ.get("POLICY_SYNC_PDP_URL", None), + help="PDP to connect to", + ) + + parser.add_argument( + "--http-bind", + type=str, + default=os.environ.get("POLICY_SYNC_HTTP_BIND", "localhost:8000"), + help="The bind address for container metrics", + ) + + parser.add_argument( + "--http-metrics", + type=bool, + default=os.environ.get("POLICY_SYNC_HTTP_METRICS", True), + help="turn on or off the prometheus metrics", + ) + + parser.add_argument( + "--use-v0", + type=bool, + default=os.environ.get("POLICY_SYNC_V0_ENABLE", False), + help="Turn on usage of the legacy v0 policy API", + ) + + parser.add_argument( + "--logging-config", + type=str, + default=os.environ.get("POLICY_SYNC_LOGGING_CONFIG", None), + help="Python formatted logging configuration file", + ) + + # V0 API specific configuration + parser.add_argument( + "--v0-notify-endpoint", + type=str, + default=os.environ.get( + "POLICY_SYNC_V0_NOTIFIY_ENDPOINT", "pdp/notifications" + ), + help="Path of the v0 websocket notification", + ) + + parser.add_argument( + "--v0-decision-endpoint", + type=str, + default=os.environ.get("POLICY_SYNC_V0_DECISION_ENDPOINT", "pdp/api"), + help="path of the v0 decision endpoint", + ) + + # V1 API specific configuration + parser.add_argument( + "--v1-dmaap-topic", + type=str, + default=os.environ.get("POLICY_SYNC_V1_DMAAP_URL", None), + help="URL of the dmaap topic used in v1 api for notifications", + ) + + parser.add_argument( + "--v1-dmaap-user", + type=str, + default=os.environ.get("POLICY_SYNC_V1_DMAAP_USER", None), + help="User to use with with the dmaap topic" + ) + + parser.add_argument( + "--v1-dmaap-pass", + type=str, + default=os.environ.get("POLICY_SYNC_V1_DMAAP_PASS", None), + help="Password to use with the dmaap topic" + ) + + parser.add_argument( + "--v1-decision-endpoint", + type=str, + default=os.environ.get( + "POLICY_SYNC_V1_PDP_DECISION_ENDPOINT", + "policy/pdpx/v1/decision" + ), + help="Decision endpoint used in the v1 api for notifications", + ) + + args = parser.parse_args(args) + + if args.logging_config: + logging.config.fileConfig( + args.logging_config, + disable_existing_loggers=False + ) + else: + handler = logging.StreamHandler() + formatter = logging.Formatter( + "[%(asctime)s][%(levelname)-5s]%(message)s" + ) + root = logging.getLogger() + handler.setFormatter(formatter) + root.addHandler(handler) + root.setLevel(logging.INFO) + + bind = args.http_bind if args.http_metrics else None + + client = clients.get_client( + args.pdp_url, + pdp_user=args.pdp_user, + pdp_password=args.pdp_pass, + use_v0=args.use_v0, + v0_decision=args.v0_decision_endpoint, + v0_notifications=args.v0_notify_endpoint, + v1_decision=args.v1_decision_endpoint, + dmaap_url=args.v1_dmaap_topic, + dmaap_user=args.v1_dmaap_user, + dmaap_password=args.v1_dmaap_pass + ) + + if bind is not None: + bind = urlsplit("//" + bind) + + return Config( + out_file=args.out, + check_period=args.duration, + filters=yaml.safe_load(args.filters), + ids=yaml.safe_load(args.ids), + client=client, + bind=bind, + ) + + +def main(): + """ + Parse the arguments passed in via the command line and start the app + """ + try: + config = parsecmd(sys.argv[1:]) + except ValueError: + logger.error( + "There was no POLICY_SYNC_PDP_URL set or --pdp flag set" + ) + return -1 + policysync.coroutines.start_event_loop(config) + return 0 diff --git a/dcae-services-policy-sync/policysync/coroutines.py b/dcae-services-policy-sync/policysync/coroutines.py new file mode 100644 index 0000000..236d6f2 --- /dev/null +++ b/dcae-services-policy-sync/policysync/coroutines.py @@ -0,0 +1,182 @@ +# ============LICENSE_START======================================================= +# Copyright (c) 2021 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +""" +Asyncio coroutine setup for both periodic and real time notification tasks """ +import signal +import asyncio +from prometheus_client import start_http_server +from .inventory import Inventory +from .util import get_module_logger + +SLEEP_ON_ERROR = 10 +logger = get_module_logger(__name__) + + +async def notify_task(inventory, sleep): + """ + start the notification task + :param inventory: Inventory + :param sleep: how long to wait on error in seconds + """ + + logger.info("opening notificationhandler for policy...") + await inventory.client.notificationhandler( + inventory.check_and_update, + ids=inventory.policy_ids, + filters=inventory.policy_filters, + ) + logger.warning("websocket closed or errored...will attempt reconnection") + await asyncio.sleep(sleep) + + +async def periodic_task(inventory, sleep): + """ + start the periodic task + :param inventory: Inventory + :param sleep: how long to wait between periodic checks + """ + await asyncio.sleep(sleep) + logger.info("Executing periodic check of PDP policies") + await inventory.update() + + +async def task_runner(inventory, sleep, task, should_run): + """ + Runs a task in an event loop + :param inventory: Inventory + :param sleep: how long to wait between loop iterations + :param task: coroutine to run + :param should_run: function for should this task continue to run + """ + # pylint: disable=broad-except + while should_run(): + try: + await task(inventory, sleep) + except asyncio.CancelledError: + break + except Exception: + logger.exception("Received exception") + + +async def shutdown(loop, tasks, inventory): + """ + shutdown the event loop and cancel all tasks + :param loop: Asyncio eventloop + :param tasks: list of asyncio tasks + :param inventory: the inventory object + """ + + logger.info("caught signal") + # Stop the websocket routines + for task in tasks: + task.cancel() + await task + + # Close the client + await inventory.close() + loop.stop() + + +def _setup_coroutines( + loop, + inventory, + shutdown_handler, + task_r, + **kwargs +): + """ sets up the application coroutines""" + # Task runner takes a function for stop condition + # (for testing purposes) but should always run in practice + # pylint: disable=broad-except + def infinite_condition(): + return True + + logger.info("Starting gather of all policies...") + try: + loop.run_until_complete(inventory.gather()) + except Exception: + logger.exception('received exception on initial gather') + + # websocket and the periodic check of policies + tasks = [ + loop.create_task( + task_r( + inventory, + kwargs.get('check_period', 2400), + periodic_task, + infinite_condition + ) + ) + ] + + if inventory.client.supports_notifications(): + tasks.append( + loop.create_task( + task_r( + inventory, + SLEEP_ON_ERROR, + notify_task, + infinite_condition + ) + ) + ) + else: + logger.warning( + "Defaulting to polling... Provide a dmaap url to receive faster updates" + ) + + # Add shutdown handlers for sigint and sigterm + for signame in ("SIGINT", "SIGTERM"): + sig = getattr(signal, signame) + loop.add_signal_handler( + sig, + lambda: asyncio.ensure_future( + shutdown_handler(loop, tasks, inventory) + ), + ) + + # Start prometheus server daemonthread for metrics/healthchecking + if 'bind' in kwargs: + metrics_server = kwargs.get('metrics_server', start_http_server) + metrics_server(kwargs['bind'].port, addr=kwargs['bind'].hostname) + + +def start_event_loop(config): + """ + start the event loop that runs the application + :param config: Config object for the application + """ + loop = asyncio.get_event_loop() + inventory = Inventory( + config.filters, + config.ids, + config.out_file, + config.client + ) + + _setup_coroutines( + loop, + inventory, + shutdown, + task_runner, + metrics_server=start_http_server, + bind=config.bind, + check_period=config.check_period + ) + + loop.run_forever() + loop.close() + logger.info("shutdown complete") diff --git a/dcae-services-policy-sync/policysync/inventory.py b/dcae-services-policy-sync/policysync/inventory.py new file mode 100644 index 0000000..0eb91b5 --- /dev/null +++ b/dcae-services-policy-sync/policysync/inventory.py @@ -0,0 +1,169 @@ +# ============LICENSE_START======================================================= +# Copyright (c) 2021 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +""" In memory data store for policies which are currently used by a mS """ +import asyncio +import json +import uuid +import os +import tempfile +import aiohttp +from datetime import datetime +from .util import get_module_logger + +logger = get_module_logger(__name__) + +ACTION_GATHERED = "gathered" +ACTION_UPDATED = "updated" +OUTFILE_INDENT = 4 + + +class Inventory: + """ In memory data store for policies which are currently used by a mS """ + def __init__(self, filters, ids, outfile, client): + self.policy_filters = filters + self.policy_ids = ids + self.hp_active_inventory = set() + self.get_lock = asyncio.Lock() + self.file = outfile + self.queue = asyncio.Queue() + self.client = client + + async def gather(self): + """ + Run at startup to gather an initial inventory of policies + """ + return await self._sync_inventory(ACTION_GATHERED) + + async def update(self): + """ + Run to update an inventory of policies on the fly + """ + return await self._sync_inventory(ACTION_UPDATED) + + async def check_and_update(self): + """ check and update the policy inventory """ + return await self.update() + + async def close(self): + """ close the policy inventory and its associated client """ + await self.client.close() + + def _atomic_dump(self, data): + """ atomically dump the policy content to a file by rename """ + try: + temp_file = tempfile.NamedTemporaryFile( + delete=False, + dir=os.path.dirname(self.file), + prefix=os.path.basename(self.file), + mode="w", + ) + try: + temp_file.write(data) + finally: + # fsync the file so its on disk + temp_file.flush() + os.fsync(temp_file.fileno()) + finally: + temp_file.close() + + os.rename(temp_file.name, os.path.abspath(self.file)) + + async def get_policy_content(self, action=ACTION_UPDATED): + """ + get the policy content off the PDP + :param action: what action to present + :returns: True/False depending on if update was successful + """ + logger.info("Starting policy update process...") + try: + policy_bodies = await self.client.get_config( + filters=self.policy_filters, ids=self.policy_ids + ) + except aiohttp.ClientError: + logger.exception('Conncection Error while connecting to PDP') + return False + + # match the format a bit of the Config Binding Service + out = { + "policies": {"items": policy_bodies}, + "event": { + "action": action, + "timestamp": (datetime.utcnow().isoformat()[:-3] + "Z"), + "update_id": str(uuid.uuid4()), + "policies_count": len(policy_bodies), + }, + } + + # Atomically dump the file to disk + tmp = { + x.get("policyName") for x in policy_bodies if "policyName" in x + } + + if tmp != self.hp_active_inventory: + data = json.dumps(out) + loop = asyncio.get_event_loop() + await loop.run_in_executor(None, self._atomic_dump, data) + logger.info( + "Update complete. Policies dumped to: %s", self.file + ) + self.hp_active_inventory = tmp + return True + else: + logger.info("No updates needed for now") + return False + + async def _sync_inventory(self, action): + """ + Pull an inventory of policies. Commit changes if there is a change. + return: boolean to represent whether changes were commited + """ + try: + pdp_inventory = await self.client.list_policies( + filters=self.policy_filters, ids=self.policy_ids + ) + except aiohttp.ClientError: + logger.exception("Inventory sync failed due to a connection error") + return False + + logger.debug("pdp_inventory -> %s", pdp_inventory) + + # Below needs to be under a lock because of + # the call to getConfig being awaited. + async with self.get_lock: + if self.hp_active_inventory != pdp_inventory or \ + pdp_inventory is None: + + # Log a delta of what has changed related to this policy update + if pdp_inventory is not None and \ + self.hp_active_inventory is not None: + msg = { + "removed": list( + self.hp_active_inventory - pdp_inventory + ), + "added": list( + pdp_inventory - self.hp_active_inventory + ), + } + logger.info( + "PDP indicates the following changes: %s ", msg + ) + + return await self.get_policy_content(action) + + logger.info( + "local matches pdp. no update required for now" + ) + return False diff --git a/dcae-services-policy-sync/policysync/metrics.py b/dcae-services-policy-sync/policysync/metrics.py new file mode 100644 index 0000000..7f825fc --- /dev/null +++ b/dcae-services-policy-sync/policysync/metrics.py @@ -0,0 +1,38 @@ +# ============LICENSE_START======================================================= +# Copyright (c) 2021 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +""" counters and gaugages for various metrics """ +from prometheus_client import Counter, Gauge + + +policy_updates_counter = Counter( + "policy_updates", "Number of total policy updates commited" +) +websock_closures = Counter( + "websocket_errors_and_closures", "Number of websocket closures or errors" +) +list_policy_exceptions = Counter( + "list_policy_exception", + "Exceptions that have occured as a result of calling listPolicy", +) +get_config_exceptions = Counter( + "get_config_exception", + "Exceptions that have occured as a result of calling getConfig", +) + +active_policies_gauge = Gauge( + "active_policies", + "Number of policies that have been retrieved off the PDP" +) diff --git a/dcae-services-policy-sync/policysync/util.py b/dcae-services-policy-sync/policysync/util.py new file mode 100644 index 0000000..1bbac5a --- /dev/null +++ b/dcae-services-policy-sync/policysync/util.py @@ -0,0 +1,10 @@ +""" utility functions (currenlty just for logging) """ +import logging + + +def get_module_logger(mod_name): + """ + To use this, do logger = get_module_logger(__name__) + """ + logger = logging.getLogger(mod_name) + return logger diff --git a/dcae-services-policy-sync/pom.xml b/dcae-services-policy-sync/pom.xml new file mode 100644 index 0000000..f5d38d8 --- /dev/null +++ b/dcae-services-policy-sync/pom.xml @@ -0,0 +1,172 @@ +<?xml version="1.0"?> +<!-- +================================================================================ +Copyright (c) 2021 AT&T Intellectual Property. All rights reserved. +================================================================================ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +============LICENSE_END========================================================= + +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.onap.dcaegen2.deployments</groupId> + <artifactId>deployments</artifactId> + <version>1.2.0-SNAPSHOT</version> + </parent> + <groupId>org.onap.dcaegen2.deployments</groupId> + <artifactId>dcae-services-policy-sync</artifactId> + <name>dcaegen2-deployments-dcae-services-policy-sync</name> + <version>1.0.0</version> + <url>http://maven.apache.org</url> + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + <sonar.skip>true</sonar.skip> + <sonar.sources>.</sonar.sources> + <!-- customize the SONARQUBE URL --> + <!-- sonar.host.url>http://localhost:9000</sonar.host.url --> + <!-- below are language dependent --> + <!-- for Python --> + <sonar.language>py</sonar.language> + <sonar.pluginName>Python</sonar.pluginName> + <sonar.inclusions>**/*.py</sonar.inclusions> + <!-- for JavaScaript --> + <!-- + <sonar.language>js</sonar.language> + <sonar.pluginName>JS</sonar.pluginName> + <sonar.inclusions>**/*.js</sonar.inclusions> + --> + </properties> + <build> + <finalName>${project.artifactId}-${project.version}</finalName> + <plugins> + <!-- plugin> + <artifactId>maven-assembly-plugin</artifactId> + <version>2.4.1</version> + <configuration> + <descriptors> + <descriptor>assembly/dep.xml</descriptor> + </descriptors> + </configuration> + <executions> + <execution> + <id>make-assembly</id> + <phase>package</phase> + <goals> + <goal>single</goal> + </goals> + </execution> + </executions> + </plugin --> + <!-- now we configure custom action (calling a script) at various lifecycle phases --> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>exec-maven-plugin</artifactId> + <version>1.2.1</version> + <executions> + <execution> + <id>clean phase script</id> + <phase>clean</phase> + <goals> + <goal>exec</goal> + </goals> + <configuration> + <arguments> + <argument>${project.artifactId}</argument> + <argument>clean</argument> + </arguments> + </configuration> + </execution> + <execution> + <id>generate-sources script</id> + <phase>generate-sources</phase> + <goals> + <goal>exec</goal> + </goals> + <configuration> + <arguments> + <argument>${project.artifactId}</argument> + <argument>generate-sources</argument> + </arguments> + </configuration> + </execution> + <execution> + <id>compile script</id> + <phase>compile</phase> + <goals> + <goal>exec</goal> + </goals> + <configuration> + <arguments> + <argument>${project.artifactId}</argument> + <argument>compile</argument> + </arguments> + </configuration> + </execution> + <execution> + <id>package script</id> + <phase>package</phase> + <goals> + <goal>exec</goal> + </goals> + <configuration> + <arguments> + <argument>${project.artifactId}</argument> + <argument>package</argument> + </arguments> + </configuration> + </execution> + <execution> + <id>test script</id> + <phase>test</phase> + <goals> + <goal>exec</goal> + </goals> + <configuration> + <arguments> + <argument>${project.artifactId}</argument> + <argument>test</argument> + </arguments> + </configuration> + </execution> + <execution> + <id>install script</id> + <phase>install</phase> + <goals> + <goal>exec</goal> + </goals> + <configuration> + <arguments> + <argument>${project.artifactId}</argument> + <argument>install</argument> + </arguments> + </configuration> + </execution> + <execution> + <id>deploy script</id> + <phase>deploy</phase> + <goals> + <goal>exec</goal> + </goals> + <configuration> + <arguments> + <argument>${project.artifactId}</argument> + <argument>deploy</argument> + </arguments> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project>
\ No newline at end of file diff --git a/dcae-services-policy-sync/setup.py b/dcae-services-policy-sync/setup.py new file mode 100644 index 0000000..f5de9a2 --- /dev/null +++ b/dcae-services-policy-sync/setup.py @@ -0,0 +1,30 @@ +# ============LICENSE_START======================================================= +# Copyright (c) 2021 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= + +from setuptools import setup, find_packages + +setup( + name="policysync", + author="Chris Luckenbaugh", + version="1.0.0", + packages=find_packages(), + include_package_data=True, + install_requires=["aiohttp>=2.3", "PyYAML", "prometheus_client"], + entry_points=""" + [console_scripts] + policysync=policysync.cmd:main + """, +) diff --git a/dcae-services-policy-sync/tests/mocks.py b/dcae-services-policy-sync/tests/mocks.py new file mode 100644 index 0000000..9a3d6cd --- /dev/null +++ b/dcae-services-policy-sync/tests/mocks.py @@ -0,0 +1,191 @@ +# ============LICENSE_START======================================================= +# Copyright (c) 2021 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= + +from urllib.parse import urlsplit +import asyncio, aiohttp + + +class MockConfig: + def __init__(self): + self.check_period = 60 + self.quiet_period = 0 + self.bind = urlsplit("//localhost:8080") + + +class MockFileDumper: + def __init__(self): + self.closed = False + + async def close(self): + self.closed = True + + +class MockInventory: + def __init__(self, queue=None): + self.was_updated = False + self.was_gathered = False + self.client = MockClient() + self.queue = queue + self.quiet = 0 + self.updates = [] + self.policy_filters = [] + self.policy_ids = [] + + async def update(self): + self.was_updated = True + return True + + async def gather(self): + self.was_gathered = True + print("got here GATHERED") + return True + + async def close(self): + self.client.closed = True + + async def check_and_update(self): + await self.update() + + async def get_policy_content(self, action="UPDATED"): + self.updates.append(action) + + +class MockClient: + def __init__(self, raise_on_listpolicies=False, raise_on_getconfig=False): + self.closed = False + self.opened = False + self.raise_on_listpolicies = raise_on_listpolicies + self.raise_on_getconfig = raise_on_getconfig + + async def close(self): + self.closed = True + + async def notificationhandler(self, callback, ids=[], filters=[]): + await callback() + + def supports_notifications(self): + return True + + async def list_policies(self, filters=[], ids=[]): + if self.raise_on_listpolicies: + raise aiohttp.ClientError + + return set( + [ + "DCAE.Config_MS_AGING_UVERSE_PROD_Tosca_HP_AGING_Model_cl55973_IT64_testAging.78.xml" + ] + ) + + async def get_config(self, filters=[], ids=[]): + if self.raise_on_getconfig: + raise aiohttp.ClientError + + return [ + { + "policyConfigMessage": "Config Retrieved!", + "policyConfigStatus": "CONFIG_RETRIEVED", + "type": "JSON", + "config": { + "service": "DCAE_HighlandPark_AgingConfig", + "location": " Edge", + "uuid": "TestUUID", + "policyName": "DCAE.AGING_UVERS_PROD_Tosca_HP_GOC_Model_cl55973_IT64_testAging", + "configName": "DCAE_HighlandPark_AgingConfig", + "templateVersion": "1607", + "priority": "4", + "version": 11.0, + "policyScope": "resource=Test1,service=vSCP,type=configuration,closedLoopControlName=vSCP_F5_Firewall_d925ed73_7831_4d02_9545_db4e101f88f8", + "riskType": "test", + "riskLevel": "2", + "guard": "False", + "content": { + "signature": { + "filter_clause": "event.faultFields.alarmCondition LIKE('%chrisluckenbaugh%')" + }, + "name": "testAging", + "context": ["PROD"], + "priority": 1, + "prePublishAging": 40, + "preCorrelationAging": 20, + }, + "policyNameWithPrefix": "DCAE.AGING_UVERSE_PSL_Tosca_HP_GOC_Model_cl55973_IT64_testAging", + }, + "policyName": "DCAE.Config_MS_AGING_UVERSE_PROD_Tosca_HP_AGING_Model_cl55973_IT64_testAging.78.xml", + "policyType": "MicroService", + "policyVersion": "78", + "matchingConditions": { + "ECOMPName": "DCAE", + "ONAPName": "DCAE", + "ConfigName": "DCAE_HighlandPark_AgingConfig", + "service": "DCAE_HighlandPark_AgingConfig", + "uuid": "TestUUID", + "Location": " Edge", + }, + "responseAttributes": {}, + "property": None, + }, + { + "policyConfigMessage": "Config Retrieved! ", + "policyConfigStatus": "CONFIG_RETRIEVED", + "type": "JSON", + "config": "adlskjfadslkjf", + "policyName": "DCAE.Config_MS_AGING_UVERSE_PROD_Tosca_HP_AGING_Model_cl55973_IT64_testAging.78.xml", + "policyType": "MicroService", + "policyVersion": "78", + "matchingConditions": { + "ECOMPName": "DCAE", + "ONAPName": "DCAE", + "ConfigName": "DCAE_HighlandPark_AgingConfig", + "service": "DCAE_HighlandPark_AgingConfig", + "uuid": "TestUUID", + "Location": " Edge", + }, + "responseAttributes": {}, + "property": None, + }, + ] + + +class MockLoop: + def __init__(self): + self.stopped = False + self.handlers = [] + self.tasks = [] + + def stop(self): + self.stopped = True + + def add_signal_handler(self, signal, handler): + self.handlers.append(signal) + + def create_task(self, task): + self.tasks.append(task) + + def run_until_complete(self, task): + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + loop.run_until_complete(task) + + +class MockTask: + def __init__(self): + self.canceled = False + + def cancel(self): + self.canceled = True + + def __await__(self): + return iter([]) diff --git a/dcae-services-policy-sync/tests/test_client_v0.py b/dcae-services-policy-sync/tests/test_client_v0.py new file mode 100644 index 0000000..6ca590e --- /dev/null +++ b/dcae-services-policy-sync/tests/test_client_v0.py @@ -0,0 +1,191 @@ +# ============LICENSE_START======================================================= +# Copyright (c) 2021 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= + +from aiohttp import web, WSMsgType +import json, pytest, re +from policysync.clients import ( + PolicyClientV0 as PolicyClient, + WS_HEARTBEAT +) + + +async def listpolicy(request): + return web.json_response(["hello"]) + + +async def getconfig(request): + j = [ + { + "policyConfigMessage": "Config Retrieved!", + "policyConfigStatus": "CONFIG_RETRIEVED", + "type": "JSON", + "config": '{"service":"DCAE_HighlandPark_AgingConfig","location":" Edge","uuid":"TestUUID","policyName":"DCAE.AGING_UVERS_PROD_Tosca_HP_GOC_Model_cl55973_IT64_testAging","configName":"DCAE_HighlandPark_AgingConfig","templateVersion":"1607","priority":"4","version":11.0,"policyScope":"resource=Test1,service=vSCP,type=configuration,closedLoopControlName=vSCP_F5_Firewall_d925ed73_7831_4d02_9545_db4e101f88f8","riskType":"test","riskLevel":"2","guard":"False","content":{"signature":{"filter_clause":"event.faultFields.alarmCondition LIKE(\'%chrisluckenbaugh%\')"},"name":"testAging","context":["PROD"],"priority":1,"prePublishAging":40,"preCorrelationAging":20},"policyNameWithPrefix":"DCAE.AGING_UVERSE_PSL_Tosca_HP_GOC_Model_cl55973_IT64_testAging"}', + "policyName": "DCAE.Config_MS_AGING_UVERSE_PROD_Tosca_HP_AGING_Model_cl55973_IT64_testAging.78.xml", + "policyType": "MicroService", + "policyVersion": "78", + "matchingConditions": { + "ECOMPName": "DCAE", + "ONAPName": "DCAE", + "ConfigName": "DCAE_HighlandPark_AgingConfig", + "service": "DCAE_HighlandPark_AgingConfig", + "uuid": "TestUUID", + "Location": " Edge", + }, + "responseAttributes": {}, + "property": None, + }, + { + "policyConfigMessage": "Config Retrieved! ", + "policyConfigStatus": "CONFIG_RETRIEVED", + "type": "JSON", + "config": "adlskjfadslkjf", + "policyName": "DCAE.Config_MS_AGING_UVERSE_PROD_Tosca_HP_AGING_Model_cl55973_IT64_testAging.78.xml", + "policyType": "MicroService", + "policyVersion": "78", + "matchingConditions": { + "ECOMPName": "DCAE", + "ONAPName": "DCAE", + "ConfigName": "DCAE_HighlandPark_AgingConfig", + "service": "DCAE_HighlandPark_AgingConfig", + "uuid": "TestUUID", + "Location": " Edge", + }, + "responseAttributes": {}, + "property": None, + }, + ] + + return web.json_response(j) + + +async def wshandler(request): + resp = web.WebSocketResponse() + available = resp.can_prepare(request) + await resp.prepare(request) + await resp.send_str('{ "loadedPolicies": [{ "policyName": "bar"}] }') + await resp.send_bytes(b"bar!!!") + await resp.close("closed") + + +@pytest.fixture +def policyclient(aiohttp_client, loop): + app = web.Application() + app.router.add_route("POST", "/pdp/api/listPolicy", listpolicy) + app.router.add_route("POST", "/pdp/api/getConfig", getconfig) + app.router.add_get("/pdp/notifications", wshandler) + fake_client = loop.run_until_complete(aiohttp_client(app)) + server = "{}://{}:{}".format("http", fake_client.host, fake_client.port) + return PolicyClient({}, server) + + +async def test_listpolicies(policyclient): + j = await policyclient.list_policies(filters=["bar"]) + assert j == set(["hello"]) + await policyclient.close() + assert policyclient.session.closed + + +async def test_getconfig(policyclient): + j = await policyclient.get_config(filters=["bar"]) + + assert j == [ + { + "policyConfigMessage": "Config Retrieved!", + "policyConfigStatus": "CONFIG_RETRIEVED", + "type": "JSON", + "config": { + "service": "DCAE_HighlandPark_AgingConfig", + "location": " Edge", + "uuid": "TestUUID", + "policyName": "DCAE.AGING_UVERS_PROD_Tosca_HP_GOC_Model_cl55973_IT64_testAging", + "configName": "DCAE_HighlandPark_AgingConfig", + "templateVersion": "1607", + "priority": "4", + "version": 11.0, + "policyScope": "resource=Test1,service=vSCP,type=configuration,closedLoopControlName=vSCP_F5_Firewall_d925ed73_7831_4d02_9545_db4e101f88f8", + "riskType": "test", + "riskLevel": "2", + "guard": "False", + "content": { + "signature": { + "filter_clause": "event.faultFields.alarmCondition LIKE('%chrisluckenbaugh%')" + }, + "name": "testAging", + "context": ["PROD"], + "priority": 1, + "prePublishAging": 40, + "preCorrelationAging": 20, + }, + "policyNameWithPrefix": "DCAE.AGING_UVERSE_PSL_Tosca_HP_GOC_Model_cl55973_IT64_testAging", + }, + "policyName": "DCAE.Config_MS_AGING_UVERSE_PROD_Tosca_HP_AGING_Model_cl55973_IT64_testAging.78.xml", + "policyType": "MicroService", + "policyVersion": "78", + "matchingConditions": { + "ECOMPName": "DCAE", + "ONAPName": "DCAE", + "ConfigName": "DCAE_HighlandPark_AgingConfig", + "service": "DCAE_HighlandPark_AgingConfig", + "uuid": "TestUUID", + "Location": " Edge", + }, + "responseAttributes": {}, + "property": None, + }, + { + "policyConfigMessage": "Config Retrieved! ", + "policyConfigStatus": "CONFIG_RETRIEVED", + "type": "JSON", + "config": "adlskjfadslkjf", + "policyName": "DCAE.Config_MS_AGING_UVERSE_PROD_Tosca_HP_AGING_Model_cl55973_IT64_testAging.78.xml", + "policyType": "MicroService", + "policyVersion": "78", + "matchingConditions": { + "ECOMPName": "DCAE", + "ONAPName": "DCAE", + "ConfigName": "DCAE_HighlandPark_AgingConfig", + "service": "DCAE_HighlandPark_AgingConfig", + "uuid": "TestUUID", + "Location": " Edge", + }, + "responseAttributes": {}, + "property": None, + }, + ] + await policyclient.close() + + +async def test_supports_notifications(policyclient): + assert policyclient.supports_notifications() + + +async def test_needs_update(policyclient): + assert policyclient._needs_update( + {"loadedPolicies": [{"policyName": "bar"}]}, [], ["bar"] + ) + assert not policyclient._needs_update( + {"loadedPolicies": [{"policyName": "bar"}]}, [], ["foo"] + ) + + +async def test_ws(policyclient): + async def ws_callback(): + assert True + + await policyclient.notificationhandler(ws_callback, filters=["bar"]) + await policyclient.close() + + assert policyclient.ws_session.closed diff --git a/dcae-services-policy-sync/tests/test_client_v1.py b/dcae-services-policy-sync/tests/test_client_v1.py new file mode 100644 index 0000000..6994a6f --- /dev/null +++ b/dcae-services-policy-sync/tests/test_client_v1.py @@ -0,0 +1,216 @@ +# ============LICENSE_START======================================================= +# Copyright (c) 2021 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= + +from aiohttp import web, WSMsgType +import json, pytest, re +from policysync.clients import PolicyClientV1 as PolicyClient + +DECISION_ENDPOINT = 'policy/pdpx/v1/decision' +async def get_decision(request): + req_data = await request.json() + assert req_data['ONAPName'] == 'DCAE' + assert req_data['ONAPComponent'] == 'policy-sync' + assert req_data['action'] == 'configure' + assert req_data['resource'] == { + 'policy-id': [ + 'onap.scaleout.tca', + 'onap.restart.tca' + ] + } + + + j = { + "policies": { + "onap.scaleout.tca": { + "type": "onap.policies.monitoring.cdap.tca.hi.lo.app", + "version": "1.0.0", + "metadata": {"policy-id": "onap.scaleout.tca"}, + "properties": { + "tca_policy": { + "domain": "measurementsForVfScaling", + "metricsPerEventName": [ + { + "eventName": "vLoadBalancer", + "controlLoopSchemaType": "VNF", + "policyScope": "type=configuration", + "policyName": "onap.scaleout.tca", + "policyVersion": "v0.0.1", + "thresholds": [ + { + "closedLoopControlName": "ControlLoop-vDNS-6f37f56d-a87d-4b85-b6a9-cc953cf779b3", + "closedLoopEventStatus": "ONSET", + "version": "1.0.2", + "fieldPath": "$.event.measurementsForVfScalingFields.vNicPerformanceArray[*].receivedBroadcastPacketsAccumulated", + "thresholdValue": 500, + "direction": "LESS_OR_EQUAL", + "severity": "MAJOR", + }, + { + "closedLoopControlName": "ControlLoop-vDNS-6f37f56d-a87d-4b85-b6a9-cc953cf779b3", + "closedLoopEventStatus": "ONSET", + "version": "1.0.2", + "fieldPath": "$.event.measurementsForVfScalingFields.vNicPerformanceArray[*].receivedBroadcastPacketsAccumulated", + "thresholdValue": 5000, + "direction": "GREATER_OR_EQUAL", + "severity": "CRITICAL", + }, + ], + } + ], + } + }, + }, + "onap.restart.tca": { + "type": "onap.policies.monitoring.cdap.tca.hi.lo.app", + "version": "1.0.0", + "metadata": {"policy-id": "onap.restart.tca", "policy-version": 1}, + "properties": { + "tca_policy": { + "domain": "measurementsForVfScaling", + "metricsPerEventName": [ + { + "eventName": "Measurement_vGMUX", + "controlLoopSchemaType": "VNF", + "policyScope": "DCAE", + "policyName": "DCAE.Config_tca-hi-lo", + "policyVersion": "v0.0.1", + "thresholds": [ + { + "closedLoopControlName": "ControlLoop-vCPE-48f0c2c3-a172-4192-9ae3-052274181b6e", + "version": "1.0.2", + "fieldPath": "$.event.measurementsForVfScalingFields.additionalMeasurements[*].arrayOfFields[0].value", + "thresholdValue": 0, + "direction": "EQUAL", + "severity": "MAJOR", + "closedLoopEventStatus": "ABATED", + }, + { + "closedLoopControlName": "ControlLoop-vCPE-48f0c2c3-a172-4192-9ae3-052274181b6e", + "version": "1.0.2", + "fieldPath": "$.event.measurementsForVfScalingFields.additionalMeasurements[*].arrayOfFields[0].value", + "thresholdValue": 0, + "direction": "GREATER", + "severity": "CRITICAL", + "closedLoopEventStatus": "ONSET", + }, + ], + } + ], + } + }, + }, + } + } + + return web.json_response(j) + + +@pytest.fixture +def policyclient(aiohttp_client, loop): + app = web.Application() + app.router.add_route("POST", "/" + DECISION_ENDPOINT, get_decision) + fake_client = loop.run_until_complete(aiohttp_client(app)) + server = "{}://{}:{}".format("http", fake_client.host, fake_client.port) + return PolicyClient({}, server) + + +async def test_getconfig(policyclient): + j = await policyclient.get_config(ids=['onap.scaleout.tca', 'onap.restart.tca' ]) + assert j == [{ + "type": "onap.policies.monitoring.cdap.tca.hi.lo.app", + "version": "1.0.0", + "metadata": { + "policy-id": "onap.scaleout.tca" + }, + "policyName": "onap.scaleout.tca.1-0-0.xml", + "policyVersion": "1.0.0", + "config": { + "tca_policy": { + "domain": "measurementsForVfScaling", + "metricsPerEventName": [{ + "eventName": "vLoadBalancer", + "controlLoopSchemaType": "VNF", + "policyScope": "type=configuration", + "policyName": "onap.scaleout.tca", + "policyVersion": "v0.0.1", + "thresholds": [{ + "closedLoopControlName": "ControlLoop-vDNS-6f37f56d-a87d-4b85-b6a9-cc953cf779b3", + "closedLoopEventStatus": "ONSET", + "version": "1.0.2", + "fieldPath": "$.event.measurementsForVfScalingFields.vNicPerformanceArray[*].receivedBroadcastPacketsAccumulated", + "thresholdValue": 500, + "direction": "LESS_OR_EQUAL", + "severity": "MAJOR" + }, + { + "closedLoopControlName": "ControlLoop-vDNS-6f37f56d-a87d-4b85-b6a9-cc953cf779b3", + "closedLoopEventStatus": "ONSET", + "version": "1.0.2", + "fieldPath": "$.event.measurementsForVfScalingFields.vNicPerformanceArray[*].receivedBroadcastPacketsAccumulated", + "thresholdValue": 5000, + "direction": "GREATER_OR_EQUAL", + "severity": "CRITICAL" + } + ] + }] + } + } + }, { + "type": "onap.policies.monitoring.cdap.tca.hi.lo.app", + "version": "1.0.0", + "metadata": { + "policy-id": "onap.restart.tca", + "policy-version": 1 + }, + "policyName": "onap.restart.tca.1-0-0.xml", + "policyVersion": "1.0.0", + "config": { + "tca_policy": { + "domain": "measurementsForVfScaling", + "metricsPerEventName": [{ + "eventName": "Measurement_vGMUX", + "controlLoopSchemaType": "VNF", + "policyScope": "DCAE", + "policyName": "DCAE.Config_tca-hi-lo", + "policyVersion": "v0.0.1", + "thresholds": [{ + "closedLoopControlName": "ControlLoop-vCPE-48f0c2c3-a172-4192-9ae3-052274181b6e", + "version": "1.0.2", + "fieldPath": "$.event.measurementsForVfScalingFields.additionalMeasurements[*].arrayOfFields[0].value", + "thresholdValue": 0, + "direction": "EQUAL", + "severity": "MAJOR", + "closedLoopEventStatus": "ABATED" + }, + { + "closedLoopControlName": "ControlLoop-vCPE-48f0c2c3-a172-4192-9ae3-052274181b6e", + "version": "1.0.2", + "fieldPath": "$.event.measurementsForVfScalingFields.additionalMeasurements[*].arrayOfFields[0].value", + "thresholdValue": 0, + "direction": "GREATER", + "severity": "CRITICAL", + "closedLoopEventStatus": "ONSET" + } + ] + }] + } + } + }] + await policyclient.close() + + +async def test_supports_notifications(policyclient): + assert not policyclient.supports_notifications() diff --git a/dcae-services-policy-sync/tests/test_cmd.py b/dcae-services-policy-sync/tests/test_cmd.py new file mode 100644 index 0000000..3c061c0 --- /dev/null +++ b/dcae-services-policy-sync/tests/test_cmd.py @@ -0,0 +1,79 @@ +# ============LICENSE_START======================================================= +# Copyright (c) 2021 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= + +import pytest, json, sys, logging, logging.config +from policysync.cmd import Config, main, parsecmd +import policysync.coroutines + + +class TestConfig: + def test_parse_args(self): + args = [ + "--out", + "out", + "--pdp-user", + "chris", + "--pdp-pass", + "notapassword", + "--pdp-url", + "blah", + "--duration", + "60", + "--filters", + "[blah]", + ] + + c = parsecmd(args) + + assert c.filters == ["blah"] + assert c.check_period == 60 + assert c.out_file == "out" + + def test_parse_args_no_auth(self): + c = parsecmd( + ["--out", "out", "--pdp-url", "blah", "--duration", "60", "--filters", "[blah]"] + ) + + assert c.client.pdp_url == "blah" + assert c.filters == ["blah"] + assert c.check_period == 60 + assert c.out_file == "out" + + def test_parse_args_no_pdp(self): + args = [] + with pytest.raises(ValueError): + parsecmd(args) + + def test_parse_bad_bind(self): + args = [ + "--out", + "out", + "--pdp-user", + "chris", + "--pdp-pass", + "notapassword", + "--pdp-url", + "blah", + "--duration", + "60", + "--filters", + "[blah]", + "--http-bind", + "l[ocalhost:100", + ] + + with pytest.raises(ValueError): + parsecmd(args) diff --git a/dcae-services-policy-sync/tests/test_coroutines.py b/dcae-services-policy-sync/tests/test_coroutines.py new file mode 100644 index 0000000..4c90ae8 --- /dev/null +++ b/dcae-services-policy-sync/tests/test_coroutines.py @@ -0,0 +1,142 @@ +# ============LICENSE_START======================================================= +# Copyright (c) 2021 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= + +import pytest, json, sys, asyncio, signal +from tests.mocks import ( + MockClient, + MockTask, + MockLoop, + MockInventory, + MockConfig, + MockFileDumper, +) +from policysync.coroutines import ( + shutdown, + periodic_task, + notify_task, + task_runner, + _setup_coroutines, + SLEEP_ON_ERROR, +) +import policysync.coroutines as coroutines + + +async def test_shutdownhandler(): + client = MockClient() + tasks = [MockTask()] + loop = MockLoop() + inventory = MockInventory() + + await shutdown( loop, tasks, inventory) + + # Assert that a shutdown results in all tasks in the loop being canceled + for x in tasks: + assert x.canceled + + # ... And the the PDP client is closed + assert inventory.client.closed + + # ... And that the event loop is stopped + assert loop.stopped + + +async def test_periodic(): + inventory = MockInventory() + await periodic_task(inventory, 1) + assert inventory.was_updated + + +async def test_ws(): + inventory = MockInventory() + await notify_task(inventory, 1) + assert inventory.was_updated + + +async def test_task_runner(): + def should_run(): + if should_run.counter == 0: + should_run.counter += 1 + return True + else: + return False + + should_run.counter = 0 + + def mocktask(inventory): + assert True + + await task_runner(MockInventory(), 1, mocktask, should_run) + + +async def test_task_runner_cancel(): + def should_run(): + if should_run.counter == 0: + should_run.counter += 1 + return True + elif should_run.counter == 1: + # If we get here then fail the test + assert False, "Task runner should have broken out of loop before this" + return False + + should_run.counter = 0 + + # We create a mock task that raises a cancellation error (sent when a asyncio task is canceled) + def mocktask(inventory, sleep): + raise asyncio.CancelledError + + await task_runner(MockInventory(), 1, mocktask, should_run) + + +def test_setup_coroutines(): + loop = MockLoop() + + def fake_task_runner(inventory, sleep, task, should_run): + return (sleep, task) + + def fake_shutdown(sig, loop, tasks, client): + return sig + + def fake_metrics_server(port, addr=None): + fake_metrics_server.started = True + + fake_metrics_server.started = False + + inventory = MockInventory() + client = MockClient() + config = MockConfig() + + _setup_coroutines( + loop, + inventory, + fake_shutdown, + fake_task_runner, + metrics_server=fake_metrics_server, + check_period=config.check_period, + bind=config.bind, + ) + + # By the end of setup coroutines we should have... + + # Gathered initial set of policies + assert inventory.was_gathered + + # started the websocket and periodic task running + assert (SLEEP_ON_ERROR, notify_task) in loop.tasks + assert (config.check_period, periodic_task) in loop.tasks + + # Signal handlers for SIGINT and SIGTERM + assert signal.SIGINT in loop.handlers + assert signal.SIGTERM in loop.handlers diff --git a/dcae-services-policy-sync/tests/test_inventory.py b/dcae-services-policy-sync/tests/test_inventory.py new file mode 100644 index 0000000..5b6f21b --- /dev/null +++ b/dcae-services-policy-sync/tests/test_inventory.py @@ -0,0 +1,153 @@ +# ============LICENSE_START======================================================= +# Copyright (c) 2021 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= + +import pytest, json, aiohttp, asyncio +from policysync.inventory import ( + Inventory, + ACTION_GATHERED, + ACTION_UPDATED, +) +from tests.mocks import MockClient + + +class MockMessage: + def __init__(self, type, data): + self.type = type + self.data = data + + +@pytest.fixture() +def inventory(request, tmpdir): + f1 = tmpdir.mkdir("sub").join("myfile") + print(f1) + return Inventory(["DCAE.Config_MS_AGING_UVERSE_PROD_.*"], [], f1, MockClient()) + + +class TestInventory: + @pytest.mark.asyncio + async def test_close(self, inventory): + await inventory.close() + assert inventory.client.closed + + @pytest.mark.asyncio + async def test_get_policy_content(self, inventory): + await inventory.get_policy_content() + with open(inventory.file) as f: + data = json.load(f) + + assert data["policies"] == { + "items": [ + { + "policyConfigMessage": "Config Retrieved!", + "policyConfigStatus": "CONFIG_RETRIEVED", + "type": "JSON", + "config": { + "service": "DCAE_HighlandPark_AgingConfig", + "location": " Edge", + "uuid": "TestUUID", + "policyName": "DCAE.AGING_UVERS_PROD_Tosca_HP_GOC_Model_cl55973_IT64_testAging", + "configName": "DCAE_HighlandPark_AgingConfig", + "templateVersion": "1607", + "priority": "4", + "version": 11.0, + "policyScope": "resource=Test1,service=vSCP,type=configuration,closedLoopControlName=vSCP_F5_Firewall_d925ed73_7831_4d02_9545_db4e101f88f8", + "riskType": "test", + "riskLevel": "2", + "guard": "False", + "content": { + "signature": { + "filter_clause": "event.faultFields.alarmCondition LIKE('%chrisluckenbaugh%')" + }, + "name": "testAging", + "context": ["PROD"], + "priority": 1, + "prePublishAging": 40, + "preCorrelationAging": 20, + }, + "policyNameWithPrefix": "DCAE.AGING_UVERSE_PSL_Tosca_HP_GOC_Model_cl55973_IT64_testAging", + }, + "policyName": "DCAE.Config_MS_AGING_UVERSE_PROD_Tosca_HP_AGING_Model_cl55973_IT64_testAging.78.xml", + "policyType": "MicroService", + "policyVersion": "78", + "matchingConditions": { + "ECOMPName": "DCAE", + "ONAPName": "DCAE", + "ConfigName": "DCAE_HighlandPark_AgingConfig", + "service": "DCAE_HighlandPark_AgingConfig", + "uuid": "TestUUID", + "Location": " Edge", + }, + "responseAttributes": {}, + "property": None, + }, + { + "policyConfigMessage": "Config Retrieved! ", + "policyConfigStatus": "CONFIG_RETRIEVED", + "type": "JSON", + "config": "adlskjfadslkjf", + "policyName": "DCAE.Config_MS_AGING_UVERSE_PROD_Tosca_HP_AGING_Model_cl55973_IT64_testAging.78.xml", + "policyType": "MicroService", + "policyVersion": "78", + "matchingConditions": { + "ECOMPName": "DCAE", + "ONAPName": "DCAE", + "ConfigName": "DCAE_HighlandPark_AgingConfig", + "service": "DCAE_HighlandPark_AgingConfig", + "uuid": "TestUUID", + "Location": " Edge", + }, + "responseAttributes": {}, + "property": None, + }, + ] + } + + assert data["event"]["action"] == ACTION_UPDATED + + @pytest.mark.asyncio + async def test_update(self, inventory): + await inventory.update() + assert len(inventory.hp_active_inventory) == 1 + + assert not await inventory.update() + + @pytest.mark.asyncio + async def test_update_listpolicies_exception(self, inventory): + inventory.client.raise_on_listpolicies = True + assert not await inventory.update() + + @pytest.mark.asyncio + async def test_update_getconfig_exception(self, inventory): + inventory.client.raise_on_getconfig = True + await inventory.get_policy_content() + + @pytest.mark.asyncio + async def test_gather(self, inventory): + await inventory.gather() + + # We should gather one policy + assert len(inventory.hp_active_inventory) == 1 + + # type in event should be gather + with open(inventory.file) as f: + data = json.load(f) + + assert data["event"]["action"] == ACTION_GATHERED + + @pytest.mark.asyncio + async def test_ws_text(self, inventory): + result = await inventory.check_and_update() + assert result == True diff --git a/dcae-services-policy-sync/tox.ini b/dcae-services-policy-sync/tox.ini new file mode 100644 index 0000000..17235c7 --- /dev/null +++ b/dcae-services-policy-sync/tox.ini @@ -0,0 +1,42 @@ +# ============LICENSE_START======================================================= +# Copyright (c) 2021 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= + +# content of: tox.ini , put in same dir as setup.py +[tox] +envlist = py36, py38 + +[testenv] +deps= + pytest + coverage + pytest-cov + pytest-asyncio + pytest-aiohttp +setenv = + PYTHONPATH={toxinidir} + REQUESTS_CA_BUNDLE=/etc/ssl/certs/ca-certificates.crt +recreate = True +commands= + python --version + pytest -s --cov policysync --cov-report=xml --cov-report=term tests --verbose + +[testenv:lint] +deps = + flake8 + pylint +commands = + flake8 policysync + pylint policysync
\ No newline at end of file |