summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTony Hansen <tony@att.com>2021-02-18 01:15:28 +0000
committerGerrit Code Review <gerrit@onap.org>2021-02-18 01:15:28 +0000
commit5f2aa5fa415ad402c6c3acac1d5138515daf68a3 (patch)
tree11e43aadd64cb29ffb14b63c737cb0e88c87fd1c
parent6118a2bb89e3a57d429616f7f8e182046d0008a5 (diff)
parentfffe41c078fa427f2a62035ee2d6cc5cd407238c (diff)
Merge "Seed policysync container code"
-rw-r--r--dcae-services-policy-sync/Dockerfile46
-rw-r--r--dcae-services-policy-sync/README.md150
-rw-r--r--dcae-services-policy-sync/policysync/__init__.py16
-rw-r--r--dcae-services-policy-sync/policysync/clients.py512
-rw-r--r--dcae-services-policy-sync/policysync/cmd.py234
-rw-r--r--dcae-services-policy-sync/policysync/coroutines.py182
-rw-r--r--dcae-services-policy-sync/policysync/inventory.py169
-rw-r--r--dcae-services-policy-sync/policysync/metrics.py38
-rw-r--r--dcae-services-policy-sync/policysync/util.py10
-rw-r--r--dcae-services-policy-sync/pom.xml172
-rw-r--r--dcae-services-policy-sync/setup.py30
-rw-r--r--dcae-services-policy-sync/tests/mocks.py191
-rw-r--r--dcae-services-policy-sync/tests/test_client_v0.py191
-rw-r--r--dcae-services-policy-sync/tests/test_client_v1.py216
-rw-r--r--dcae-services-policy-sync/tests/test_cmd.py79
-rw-r--r--dcae-services-policy-sync/tests/test_coroutines.py142
-rw-r--r--dcae-services-policy-sync/tests/test_inventory.py153
-rw-r--r--dcae-services-policy-sync/tox.ini42
-rwxr-xr-xmvn-phase-script.sh21
-rw-r--r--pom.xml1
20 files changed, 2595 insertions, 0 deletions
diff --git a/dcae-services-policy-sync/Dockerfile b/dcae-services-policy-sync/Dockerfile
new file mode 100644
index 0000000..f317a85
--- /dev/null
+++ b/dcae-services-policy-sync/Dockerfile
@@ -0,0 +1,46 @@
+# ============LICENSE_START=======================================================
+# Copyright (c) 2021 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+
+FROM nexus3.onap.org:10001/onap/integration-python:7.0.1 as build
+
+
+USER root
+
+RUN python3 -m venv /policysync
+# Need GCC, musl and associated dependencies to compile dependencies against musl
+RUN apk add --no-cache --virtual .build-deps gcc musl-dev
+
+WORKDIR /app
+
+
+# Install dependencies first to speed up builds
+ADD setup.py setup.py
+RUN /policysync/bin/pip install -e .
+
+# Add the code now
+ADD policysync policysync
+RUN /policysync/bin/pip install .
+
+FROM nexus3.onap.org:10001/onap/integration-python:7.0.1 as runtime
+
+COPY --from=build /policysync /policysync
+
+USER onap
+ENTRYPOINT [ "/policysync/bin/policysync" ]
+
+
+
+
diff --git a/dcae-services-policy-sync/README.md b/dcae-services-policy-sync/README.md
new file mode 100644
index 0000000..519aea6
--- /dev/null
+++ b/dcae-services-policy-sync/README.md
@@ -0,0 +1,150 @@
+# Policy Sync
+This page serves as an implementation for the Policy sync container described in the [wiki](https://wiki.onap.org/display/DW/Policy+function+as+Sidecar)
+
+
+Policy Sync utility is a python based utility that interfaces with the ONAP/ECOMP policy websocket and REST APIs. It is designed to keep a local listing of policies in sync with an environment's policy distribution point (PDP). It functions well as a Kubernetes sidecar container which can pull down the latest policies for consumption by an application container.
+
+The sync utility primarily utilizes the PDP's websocket notification API to receive policy update notifications. It also includes a periodic check of the PDP for resilliency purposes in the event of websocket API issues.
+
+
+## Build and Run
+Easiest way to use is via docker by building the provided docker file
+
+```bash
+docker build . -t policy-puller
+```
+
+If you want to run it in a non containerized environment, an easy way is to use python virtual environments.
+```bash
+# Create a virtual environment in venv folder and activate it
+python3 -m venv venv
+source venv/bin/activate
+
+# install the utility
+pip install .
+
+# Utility is now installed and usable in your virtual environment. Test it with:
+policysync -h
+```
+
+## Configuration
+
+Configuration is currently done via either env variables or by flag. Flags take precedence env variables, env variables take precedence over default
+
+### General configuration
+General configuration that is used regardless of which PDP API you are using.
+
+| ENV Variable | Flag | Description | Default |
+| --------------------------| -------------------|----------------------------------------------|-----------------------------------|
+| POLICY_SYNC_PDP_URL | --pdp-url | PDP URL to query | None (must be set in env or flag) |
+| POLICY_SYNC_FILTER | --filters | yaml list of regex of policies to match | [] |
+| POLICY_SYNC_ID | --ids | yaml list of ids of policies to match | [] |
+| POLICY_SYNC_DURATION | --duration | duration in seconds for periodic checks | 2600 |
+| POLICY_SYNC_OUTFILE | --outfile | File to output policies to | ./policies.json |
+| POLICY_SYNC_PDP_USER | --pdp-user | Set user if you need basic auth for PDP | None |
+| POLICY_SYNC_PDP_PASS | --pdp-password | Set pass if you need basic auth for PDP | None |
+| POLICY_SYNC_HTTP_METRICS | --http-metrics | Whether to expose prometheus metrics | True |
+| POLICY_SYNC_HTTP_BIND | --http-bind | host:port for exporting prometheus metrics | localhost:8000 |
+| POLICY_SYNC_LOGGING_CONFIG| --logging-config | Path to a python formatted logging file | None (logs will write to stderr) |
+| POLICY_SYNC_V0_ENABLE | --use-v0 | Set to true to enable usage of legacy v0 API | False |
+
+### V1 Specific Configuration (Used as of the Dublin release)
+Configurable variables used for the V1 API used in the ONAP Dublin Release.
+
+Note: Policy filters are not currently supported in the current policy release but will be eventually.
+
+| ENV Variable | Flag | Description | Default |
+| ---------------------------------|------------------------|----------------------------------------|------------------------------|
+| POLICY_SYNC_V1_DECISION_ENDPOINT | --v1-decision-endpoint | Endpoint to query for PDP decisions | policy/pdpx/v1/decision |
+| POLICY_SYNC_V1_DMAAP_URL | --v1-dmaap-topic | Dmaap url with topic for notifications | None |
+| POLICY_SYNC_V1_DMAAP_USER | --v1-dmaap-user | User to use for DMaaP notifications | None |
+| POLICY_SYNC_V1_DMAAP_PASS | --v1-dmaap-pass | Password to use for DMaaP notifications| None |
+
+
+
+### V0 Specific Configuration (Legacy Policy API)
+Configurable variables used for the legacy V0 API Prior to the ONAP release. Only valid when --use-v0 is set to True
+
+
+| ENV Variable | Flag | Description | Default |
+| ---------------------------------|------------------------|----------------------------------------|------------------------------|
+| POLICY_SYNC_V0_NOTIFIY_ENDPOINT | --v0-notifiy-endpoint | websock endpoint for pdp notifications | pdp/notifications |
+| POLICY_SYNC_V0_DECISION_ENDPOINT | --v0-decision-endpoint | rest endpoint for pdp decisions | pdp/api |
+
+## Usage
+
+You can run in a pure docker setup:
+```bash
+# Run the container
+docker run
+ --env POLICY_SYNC_PDP_USER=<username> \
+ --env POLICY_SYNC_PDP_PASS=<password> \
+ --env POLICY_SYNC_PDP_URL=<path_to_pdp> \
+ --env POLICY_SYNC_V1_DMAAP_URL='https://<dmaap_host>:3905/events/<dmaap_topic>' \
+ --env POLICY_SYNC_V1_DMAAP_PASS='<user>' \
+ --env POLICY_SYNC_V1_DMAAP_USER='<pass>' \
+ --env POLICY_SYNC_ID=['DCAE.Config_MS_AGING_UVERSE_PROD'] \
+ -v $(pwd)/policy-volume:/etc/policy \
+ nexus3.onap.org:10001/onap/org.onap.dcaegen2.deployments.policy-sync:1.0.0
+```
+
+Or on Kubernetes:
+```yaml
+# policy-config-map
+apiVersion: v1
+kind: policy-config-map
+metadata:
+ name: special-config
+ namespace: default
+data:
+ POLICY_SYNC_PDP_USER: myusername
+ POLICY_SYNC_PDP_PASS: mypassword
+ POLICY_SYNC_PDP_URL: <path_to_pdp>
+ POLICY_SYNC_V1_DMAAP_URL: 'https://<dmaap_host>:3905/events/<dmaap_topic>' \
+ POLICY_SYNC_V1_DMAAP_PASS: '<user>' \
+ POLICY_SYNC_V1_DMAAP_USER: '<pass>' \
+ POLICY_SYNC_FILTER: '["DCAE.Config_MS_AGING_UVERSE_PROD"]'
+
+
+---
+
+apiVersion: v1
+kind: Pod
+metadata:
+ name: Sidecar sample app
+spec:
+ restartPolicy: Never
+
+
+ # The shared volume that the two containers use to communicate...empty dir for simplicity
+ volumes:
+ - name: policy-shared
+ emptyDir: {}
+
+ containers:
+
+ # Sample app that uses inotifyd (part of busybox/alpine). For demonstration purposes only...
+ - name: main
+ image: nexus3.onap.org:10001/onap/org.onap.dcaegen2.deployments.policy-sync:1.0.0
+ volumeMounts:
+ - name: policy-shared
+ mountPath: /etc/policies.json
+ subPath: policies.json
+ # For details on what this does see: https://wiki.alpinelinux.org/wiki/Inotifyd
+ # you can replace '-' arg below with a shell script to do more interesting
+ cmd: [ "inotifyd", "-", "/etc/policies.json:c" ]
+
+
+ # The sidecar app which keeps the policies in sync
+ - name: policy-sync
+ image: nexus3.onap.org:10001/onap/org.onap.dcaegen2.deployments.policy-sync:1.0.0
+ envFrom:
+ - configMapRef:
+ name: special-config
+
+ volumeMounts:
+ - name: policy-shared
+ mountPath: /etc/policies
+```
+
+
diff --git a/dcae-services-policy-sync/policysync/__init__.py b/dcae-services-policy-sync/policysync/__init__.py
new file mode 100644
index 0000000..7c04dfd
--- /dev/null
+++ b/dcae-services-policy-sync/policysync/__init__.py
@@ -0,0 +1,16 @@
+# ============LICENSE_START=======================================================
+# Copyright (c) 2021 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+""" nothing here """
diff --git a/dcae-services-policy-sync/policysync/clients.py b/dcae-services-policy-sync/policysync/clients.py
new file mode 100644
index 0000000..698bc86
--- /dev/null
+++ b/dcae-services-policy-sync/policysync/clients.py
@@ -0,0 +1,512 @@
+# ============LICENSE_START=======================================================
+# Copyright (c) 2021 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+"""Clients for communicating with both the post dublin and pre dublin APIs"""
+import json
+import re
+import base64
+import uuid
+import asyncio
+import aiohttp
+import policysync.metrics as metrics
+from .util import get_module_logger
+
+logger = get_module_logger(__name__)
+
+# Websocket config
+WS_HEARTBEAT = 60
+WS_NOTIFICATIONS_ENDPOINT = "pdp/notifications"
+# REST config
+V1_DECISION_ENDPOINT = "policy/pdpx/v1/decision"
+V0_DECISION_ENDPOINT = "pdp/api"
+
+APPLICATION_JSON = "application/json"
+
+
+def get_single_regex(filters, ids):
+ """given a list of filters and ids returns a single regex for matching"""
+ filters = [] if filters is None else filters
+ ids = [] if ids is None else ["{}[.][0-9]+[.]xml".format(x) for x in ids]
+ return "|".join(filters + ids) if filters is not None else ""
+
+
+class BasePolicyClient:
+ """ Base policy client that is pluggable into inventory """
+ def __init__(self, pdp_url, headers=None):
+ self.headers = {} if headers is None else headers
+ self.session = None
+ self.pdp_url = pdp_url
+
+ def _init_rest_session(self):
+ """
+ initialize an aiohttp rest session
+ :returns: an aiohttp rest session
+ """
+ if self.session is None:
+ self.session = aiohttp.ClientSession(
+ headers=self.headers, raise_for_status=True
+ )
+
+ return self.session
+
+ async def _run_request(self, endpoint, request_data):
+ """
+ execute a particular REST request
+ :param endpoint: str rest endpoint to query
+ :param request_data: dictionary request data
+ :returns: dictionary response data
+ """
+ session = self._init_rest_session()
+ async with session.post(
+ "{0}/{1}".format(self.pdp_url, endpoint), json=request_data
+ ) as resp:
+ data = await resp.read()
+ return json.loads(data)
+
+ def supports_notifications(self):
+ """
+ does this particular client support real time notifictions
+ :returns: True
+ """
+ # in derived classes we may use self
+ # pylint: disable=no-self-use
+ return True
+
+ async def list_policies(self, filters=None, ids=None):
+ """
+ used to get a list of policies matching a particular ID
+ :param filters: list of regex filter strings for matching
+ :param ids: list of id strings for matching
+ :returns: List of policies matching filters or ids
+ """
+ raise NotImplementedError
+
+ async def get_config(self, filters=None, ids=None):
+ """
+ used to get a list of policies matching a particular ID
+ :returns: List of policies matching filters or ids
+ """
+ raise NotImplementedError
+
+ async def notificationhandler(self, callback, ids=None, filters=None):
+ """
+ Clients should implement this to support real time notifications
+ :param callback: func to execute when a matching notification is found
+ :param ids: list of id strings for matching
+ """
+ raise NotImplementedError
+
+ async def close(self):
+ """ close the policy client """
+ logger.info("closing websocket clients...")
+ if self.session:
+ await self.session.close()
+
+
+class PolicyClientV0(BasePolicyClient):
+ """
+ Supports the legacy v0 policy API use prior to ONAP Dublin
+ """
+ async def close(self):
+ """ close the policy client """
+ await super().close()
+ if self.ws_session is not None:
+ await self.ws_session.close()
+
+ def __init__(
+ self,
+ headers,
+ pdp_url,
+ decision_endpoint=V0_DECISION_ENDPOINT,
+ ws_endpoint=WS_NOTIFICATIONS_ENDPOINT
+ ):
+ """
+ Initialize a v0 policy client
+ :param headers: Headers to use for policy rest api
+ :param pdp_url: URL of the PDP
+ :param decision_endpoint: root for the decison API
+ :param websocket_endpoint: root of the websocket endpoint
+ """
+ super().__init__(pdp_url, headers=headers)
+ self.ws_session = None
+ self.session = None
+ self.decision_endpoint = decision_endpoint
+ self.ws_endpoint = ws_endpoint
+ self._ws = None
+
+ def _init_ws_session(self):
+ """initialize a websocket session for notifications"""
+ if self.ws_session is None:
+ self.ws_session = aiohttp.ClientSession()
+
+ return self.ws_session
+
+ @metrics.list_policy_exceptions.count_exceptions()
+ async def list_policies(self, filters=None, ids=None):
+ """
+ used to get a list of policies matching a particular ID
+ :param filters: list of regex filter strings for matching
+ :param ids: list of id strings for matching
+ :returns: List of policies matching filters or ids
+ """
+ request_data = self._prepare_request(filters, ids)
+ policies = await self._run_request(
+ f"{self.decision_endpoint}/listPolicy", request_data
+ )
+ return set(policies)
+
+ @classmethod
+ def _prepare_request(cls, filters, ids):
+ """prepare the request body for the v0 api"""
+ regex = get_single_regex(filters, ids)
+ return {"policyName": regex}
+
+ @metrics.get_config_exceptions.count_exceptions()
+ async def get_config(self, filters=None, ids=None):
+ """
+ Used to get the actual policy configuration from PDP
+ :return: the policy objects that are currently active
+ for the given set of filters
+ """
+ request_data = self._prepare_request(filters, ids)
+ policies = await self._run_request(
+ f"{self.decision_endpoint}/getConfig", request_data)
+
+ for policy in policies:
+ try:
+ policy["config"] = json.loads(policy["config"])
+ except json.JSONDecodeError:
+ pass
+
+ return policies
+
+ @classmethod
+ def _needs_update(cls, update, ids=None, filters=None):
+ """
+ Expect something like this
+ {
+ "removedPolicies": [{
+ "policyName": "xyz.45.xml",
+ "versionNo": "45"
+ }],
+ "loadedPolicies": [{
+ "policyName": "xyz.46.xml",
+ "versionNo": "46",
+ "matches": {
+ "ONAPName": "DCAE",
+ "ConfigName": "DCAE_HighlandPark_AgingConfig",
+ "service": "DCAE_HighlandPark_AgingConfig",
+ "guard": "false",
+ "location": " Edge",
+ "TTLDate": "NA",
+ "uuid": "TestUUID",
+ "RiskLevel": "5",
+ "RiskType": "default"
+ },
+ "updateType": "UPDATE"
+ }],
+ "notificationType": "BOTH"
+ }
+ """
+ for policy in update.get("removedPolicies", []) + update.get(
+ "loadedPolicies", []
+ ):
+ if (
+ re.match(get_single_regex(filters, ids), policy["policyName"])
+ is not None
+ ):
+ return True
+
+ return False
+
+ async def notificationhandler(self, callback, ids=None, filters=None):
+ """
+ websocket based notification handler for
+ :param callback: function to execute when
+ a matching notification is found
+ :param ids: list of id strings for matching
+ """
+
+ url = self.pdp_url.replace("https", "wss")
+
+ # The websocket we start here will periodically
+ # send heartbeat (ping frames) to policy
+ # this ensures that we are never left hanging
+ # with our communication with policy.
+ session = self._init_ws_session()
+ try:
+ websocket = await session.ws_connect(
+ "{0}/{1}".format(url, self.ws_endpoint), heartbeat=WS_HEARTBEAT
+ )
+ logger.info("websock with policy established")
+ async for msg in websocket:
+ # check for websocket errors
+ # break out of this async for loop. to attempt reconnection
+ if msg.type in (aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.ERROR):
+ break
+
+ if msg.type is (aiohttp.WSMsgType.TEXT):
+ if self._needs_update(
+ json.loads(msg.data),
+ ids=ids,
+ filters=filters
+ ):
+ logger.debug(
+ "notification received from pdp websocket -> %s", msg
+ )
+ await callback()
+ else:
+ logger.warning(
+ "unexpected websocket message type received %s", msg.type
+ )
+ except aiohttp.ClientError:
+ logger.exception("Received connection error with websocket")
+
+
+class PolicyClientV1(BasePolicyClient):
+ """
+ Supports the v1 policy API introduced in ONAP's dublin release
+ """
+
+ async def close(self):
+ """ close the policy client """
+ await super().close()
+ if self.dmaap_session is not None:
+ await self.dmaap_session.close()
+
+ def _init_dmaap_session(self):
+ """ initialize a dmaap session for notifications """
+ if self.dmaap_session is None:
+ self.dmaap_session = aiohttp.ClientSession(
+ headers=self.dmaap_headers,
+ raise_for_status=True
+ )
+
+ return self.dmaap_session
+
+ def __init__(
+ self,
+ headers,
+ pdp_url,
+ **kwargs,
+ ):
+ super().__init__(pdp_url, headers=headers)
+ self._ws = None
+ self.audit_uuid = str(uuid.uuid4())
+ self.dmaap_url = kwargs.get('dmaap_url')
+ self.dmaap_timeout = 15000
+ self.dmaap_session = None
+ self.dmaap_headers = kwargs.get('dmaap_headers', {})
+ self.decision = kwargs.get('v1_decision', V1_DECISION_ENDPOINT)
+
+ async def list_policies(self, filters=None, ids=None):
+ """
+ ONAP has no real equivalent to this.
+ :returns: None
+ """
+ # in derived classes we may use self
+ # pylint: disable=no-self-use
+ return None
+
+ @classmethod
+ def convert_to_policy(cls, policy_body):
+ """
+ Convert raw policy to format expected by microservices
+ :param policy_body: raw dictionary output from pdp
+ :returns: data in proper formatting
+ """
+ pdp_metadata = policy_body.get("metadata", {})
+ policy_id = pdp_metadata.get("policy-id")
+ policy_version = policy_body.get("version")
+ if not policy_id or policy_version is None:
+ logger.warning("Malformed policy is missing policy-id and version")
+ return None
+
+ policy_body["policyName"] = "{}.{}.xml".format(
+ policy_id, str(policy_version.replace(".", "-"))
+ )
+ policy_body["policyVersion"] = str(policy_version)
+ if "properties" in policy_body:
+ policy_body["config"] = policy_body["properties"]
+ del policy_body["properties"]
+
+ return policy_body
+
+ @metrics.get_config_exceptions.count_exceptions()
+ async def get_config(self, filters=None, ids=None):
+ """
+ Used to get the actual policy configuration from PDP
+ :returns: the policy objects that are currently active
+ for the given set of filters
+ """
+ if ids is None:
+ ids = []
+
+ request_data = {
+ "ONAPName": "DCAE",
+ "ONAPComponent": "policy-sync",
+ "ONAPInstance": self.audit_uuid,
+ "action": "configure",
+ "resource": {"policy-id": ids}
+ }
+
+ data = await self._run_request(self.decision, request_data)
+ out = []
+ for policy_body in data["policies"].values():
+ policy = self.convert_to_policy(policy_body)
+ if policy is not None:
+ out.append(policy)
+
+ return out
+
+ def supports_notifications(self):
+ """
+ Does this policy client support real time notifications
+ :returns: True if the dmaap url is set else return false
+ """
+ return self.dmaap_url is not None
+
+ @classmethod
+ def _needs_update(cls, update, ids):
+ """
+ expect something like this
+ {
+ "deployed-policies": [
+ {
+ "policy-type": "onap.policies.monitoring.tcagen2",
+ "policy-type-version": "1.0.0",
+ "policy-id": "onap.scaleout.tca",
+ "policy-version": "2.0.0",
+ "success-count": 3,
+ "failure-count": 0
+ }
+ ],
+ "undeployed-policies": [
+ {
+ "policy-type": "onap.policies.monitoring.tcagen2",
+ "policy-type-version": "1.0.0",
+ "policy-id": "onap.firewall.tca",
+ "policy-version": "6.0.0",
+ "success-count": 3,
+ "failure-count": 0
+ }
+ ]
+ }
+ """
+ for policy in update.get("deployed-policies", []) + update.get(
+ "undeployed-policies", []
+ ):
+ if policy.get("policy-id") in ids:
+ return True
+
+ return False
+
+ async def poll_dmaap(self, callback, ids=None):
+ """
+ one GET request to dmaap
+ :param callback: function to execute when a
+ matching notification is found
+ :param ids: list of id strings for matching
+ """
+ query = f"?timeout={self.dmaap_timeout}"
+ url = f"{self.dmaap_url}/{self.audit_uuid}/0{query}"
+ logger.info("polling topic: %s", url)
+ session = self._init_dmaap_session()
+ try:
+ async with session.get(url) as response:
+ messages = await response.read()
+
+ for msg in json.loads(messages):
+ if self._needs_update(json.loads(msg), ids):
+ logger.info(
+ "notification received from dmaap -> %s", msg
+ )
+ await callback()
+ except aiohttp.ClientError:
+ logger.exception('received connection error from dmaap topic')
+ # wait some time
+ await asyncio.sleep(30)
+
+ async def notificationhandler(self, callback, ids=None, filters=None):
+ """
+ dmaap based notification handler for
+ :param callback: function to execute when a
+ matching notification is found
+ :param ids: list of id strings for matching
+ """
+ if filters is not None:
+ logger.warning("filters are not supported with pdp v1..ignoring")
+ while True:
+ await self.poll_dmaap(callback, ids=ids)
+
+
+def get_client(
+ pdp_url,
+ use_v0=False,
+ **kwargs
+):
+ """
+ get a particular policy client
+ :param use_v0: whether this should be a v0 client or
+ :return: A policy client
+ """
+ if pdp_url is None:
+ raise ValueError("POLICY_SYNC_PDP_URL set or --pdp flag not set")
+
+ pdp_headers = {
+ "Accept": APPLICATION_JSON,
+ "Content-Type": APPLICATION_JSON
+ }
+
+ if 'pdp_user' in kwargs and 'pdp_password' in kwargs:
+ auth = base64.b64encode(
+ "{}:{}".format(
+ kwargs.get('pdp_user'),
+ kwargs.get('pdp_password')
+ ).encode("utf-8")
+ )
+ pdp_headers["Authorization"] = "Basic {}".format(auth.decode("utf-8"))
+
+ dmaap_headers = {
+ "Accept": APPLICATION_JSON,
+ "Content-Type": APPLICATION_JSON
+ }
+
+ logger.info(kwargs.get('dmaap_password'))
+ if 'dmaap_user' in kwargs and 'dmaap_password' in kwargs:
+ auth = base64.b64encode(
+ "{}:{}".format(
+ kwargs.get('dmaap_user'),
+ kwargs.get('dmaap_password')
+ ).encode("utf-8")
+ ).decode("utf-8")
+ dmaap_headers["Authorization"] = f"Basic {auth}"
+
+ # Create client (either v0 or v1) based on arguments)
+ if use_v0:
+ return PolicyClientV0(
+ pdp_headers,
+ pdp_url,
+ decision_endpoint=kwargs.get('v0_decision'),
+ ws_endpoint=kwargs.get('v0_notifications')
+ )
+
+ return PolicyClientV1(
+ pdp_headers,
+ pdp_url,
+ v1_decision=kwargs.get('v1_decision'),
+ dmaap_url=kwargs.get('dmaap_url'),
+ dmaap_headers=dmaap_headers
+ )
diff --git a/dcae-services-policy-sync/policysync/cmd.py b/dcae-services-policy-sync/policysync/cmd.py
new file mode 100644
index 0000000..9055674
--- /dev/null
+++ b/dcae-services-policy-sync/policysync/cmd.py
@@ -0,0 +1,234 @@
+# ============LICENSE_START=======================================================
+# Copyright (c) 2021 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+"""
+CLI parsing for the sync utility.
+convert flags/env variables to configuration
+"""
+import argparse
+import collections
+import os
+import sys
+import logging
+import logging.config
+from urllib.parse import urlsplit
+import yaml
+import policysync.clients as clients
+import policysync.coroutines
+from .util import get_module_logger
+
+
+logger = get_module_logger(__name__)
+
+APPLICATION_JSON = "application/json"
+
+
+Config = collections.namedtuple(
+ 'Config', ['out_file', 'check_period', 'filters', 'ids', 'client', 'bind'])
+
+
+def parsecmd(args):
+ """
+ Parse the command into a config object
+ :param args: arguments list for parsing
+ :returns: Config for the policy sync
+ """
+ parser = argparse.ArgumentParser(
+ description="Keeps a file updated with policies matching a filter.",
+ formatter_class=argparse.ArgumentDefaultsHelpFormatter,
+ )
+
+ parser.add_argument(
+ "--out",
+ type=str,
+ default=os.environ.get("POLICY_SYNC_OUTFILE", "policies.json"),
+ help="Output file to dump to",
+ )
+
+ parser.add_argument(
+ "--duration",
+ type=int,
+ default=os.environ.get("POLICY_SYNC_DURATION", 1200),
+ help="frequency (in seconds) to conduct periodic check",
+ )
+
+ parser.add_argument(
+ "--filters",
+ type=str,
+ default=os.environ.get("POLICY_SYNC_FILTER", "[]"),
+ help="Regex of policies that you are interested in.",
+ )
+ parser.add_argument(
+ "--ids",
+ type=str,
+ default=os.environ.get("POLICY_SYNC_ID", "[]"),
+ help="Specific names of policies you are interested in.",
+ )
+
+ parser.add_argument(
+ "--pdp-user",
+ type=str,
+ default=os.environ.get("POLICY_SYNC_PDP_USER", None),
+ help="PDP basic auth username",
+ )
+ parser.add_argument(
+ "--pdp-pass",
+ type=str,
+ default=os.environ.get("POLICY_SYNC_PDP_PASS", None),
+ help="PDP basic auth password",
+ )
+
+ parser.add_argument(
+ "--pdp-url",
+ type=str,
+ default=os.environ.get("POLICY_SYNC_PDP_URL", None),
+ help="PDP to connect to",
+ )
+
+ parser.add_argument(
+ "--http-bind",
+ type=str,
+ default=os.environ.get("POLICY_SYNC_HTTP_BIND", "localhost:8000"),
+ help="The bind address for container metrics",
+ )
+
+ parser.add_argument(
+ "--http-metrics",
+ type=bool,
+ default=os.environ.get("POLICY_SYNC_HTTP_METRICS", True),
+ help="turn on or off the prometheus metrics",
+ )
+
+ parser.add_argument(
+ "--use-v0",
+ type=bool,
+ default=os.environ.get("POLICY_SYNC_V0_ENABLE", False),
+ help="Turn on usage of the legacy v0 policy API",
+ )
+
+ parser.add_argument(
+ "--logging-config",
+ type=str,
+ default=os.environ.get("POLICY_SYNC_LOGGING_CONFIG", None),
+ help="Python formatted logging configuration file",
+ )
+
+ # V0 API specific configuration
+ parser.add_argument(
+ "--v0-notify-endpoint",
+ type=str,
+ default=os.environ.get(
+ "POLICY_SYNC_V0_NOTIFIY_ENDPOINT", "pdp/notifications"
+ ),
+ help="Path of the v0 websocket notification",
+ )
+
+ parser.add_argument(
+ "--v0-decision-endpoint",
+ type=str,
+ default=os.environ.get("POLICY_SYNC_V0_DECISION_ENDPOINT", "pdp/api"),
+ help="path of the v0 decision endpoint",
+ )
+
+ # V1 API specific configuration
+ parser.add_argument(
+ "--v1-dmaap-topic",
+ type=str,
+ default=os.environ.get("POLICY_SYNC_V1_DMAAP_URL", None),
+ help="URL of the dmaap topic used in v1 api for notifications",
+ )
+
+ parser.add_argument(
+ "--v1-dmaap-user",
+ type=str,
+ default=os.environ.get("POLICY_SYNC_V1_DMAAP_USER", None),
+ help="User to use with with the dmaap topic"
+ )
+
+ parser.add_argument(
+ "--v1-dmaap-pass",
+ type=str,
+ default=os.environ.get("POLICY_SYNC_V1_DMAAP_PASS", None),
+ help="Password to use with the dmaap topic"
+ )
+
+ parser.add_argument(
+ "--v1-decision-endpoint",
+ type=str,
+ default=os.environ.get(
+ "POLICY_SYNC_V1_PDP_DECISION_ENDPOINT",
+ "policy/pdpx/v1/decision"
+ ),
+ help="Decision endpoint used in the v1 api for notifications",
+ )
+
+ args = parser.parse_args(args)
+
+ if args.logging_config:
+ logging.config.fileConfig(
+ args.logging_config,
+ disable_existing_loggers=False
+ )
+ else:
+ handler = logging.StreamHandler()
+ formatter = logging.Formatter(
+ "[%(asctime)s][%(levelname)-5s]%(message)s"
+ )
+ root = logging.getLogger()
+ handler.setFormatter(formatter)
+ root.addHandler(handler)
+ root.setLevel(logging.INFO)
+
+ bind = args.http_bind if args.http_metrics else None
+
+ client = clients.get_client(
+ args.pdp_url,
+ pdp_user=args.pdp_user,
+ pdp_password=args.pdp_pass,
+ use_v0=args.use_v0,
+ v0_decision=args.v0_decision_endpoint,
+ v0_notifications=args.v0_notify_endpoint,
+ v1_decision=args.v1_decision_endpoint,
+ dmaap_url=args.v1_dmaap_topic,
+ dmaap_user=args.v1_dmaap_user,
+ dmaap_password=args.v1_dmaap_pass
+ )
+
+ if bind is not None:
+ bind = urlsplit("//" + bind)
+
+ return Config(
+ out_file=args.out,
+ check_period=args.duration,
+ filters=yaml.safe_load(args.filters),
+ ids=yaml.safe_load(args.ids),
+ client=client,
+ bind=bind,
+ )
+
+
+def main():
+ """
+ Parse the arguments passed in via the command line and start the app
+ """
+ try:
+ config = parsecmd(sys.argv[1:])
+ except ValueError:
+ logger.error(
+ "There was no POLICY_SYNC_PDP_URL set or --pdp flag set"
+ )
+ return -1
+ policysync.coroutines.start_event_loop(config)
+ return 0
diff --git a/dcae-services-policy-sync/policysync/coroutines.py b/dcae-services-policy-sync/policysync/coroutines.py
new file mode 100644
index 0000000..236d6f2
--- /dev/null
+++ b/dcae-services-policy-sync/policysync/coroutines.py
@@ -0,0 +1,182 @@
+# ============LICENSE_START=======================================================
+# Copyright (c) 2021 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+"""
+Asyncio coroutine setup for both periodic and real time notification tasks """
+import signal
+import asyncio
+from prometheus_client import start_http_server
+from .inventory import Inventory
+from .util import get_module_logger
+
+SLEEP_ON_ERROR = 10
+logger = get_module_logger(__name__)
+
+
+async def notify_task(inventory, sleep):
+ """
+ start the notification task
+ :param inventory: Inventory
+ :param sleep: how long to wait on error in seconds
+ """
+
+ logger.info("opening notificationhandler for policy...")
+ await inventory.client.notificationhandler(
+ inventory.check_and_update,
+ ids=inventory.policy_ids,
+ filters=inventory.policy_filters,
+ )
+ logger.warning("websocket closed or errored...will attempt reconnection")
+ await asyncio.sleep(sleep)
+
+
+async def periodic_task(inventory, sleep):
+ """
+ start the periodic task
+ :param inventory: Inventory
+ :param sleep: how long to wait between periodic checks
+ """
+ await asyncio.sleep(sleep)
+ logger.info("Executing periodic check of PDP policies")
+ await inventory.update()
+
+
+async def task_runner(inventory, sleep, task, should_run):
+ """
+ Runs a task in an event loop
+ :param inventory: Inventory
+ :param sleep: how long to wait between loop iterations
+ :param task: coroutine to run
+ :param should_run: function for should this task continue to run
+ """
+ # pylint: disable=broad-except
+ while should_run():
+ try:
+ await task(inventory, sleep)
+ except asyncio.CancelledError:
+ break
+ except Exception:
+ logger.exception("Received exception")
+
+
+async def shutdown(loop, tasks, inventory):
+ """
+ shutdown the event loop and cancel all tasks
+ :param loop: Asyncio eventloop
+ :param tasks: list of asyncio tasks
+ :param inventory: the inventory object
+ """
+
+ logger.info("caught signal")
+ # Stop the websocket routines
+ for task in tasks:
+ task.cancel()
+ await task
+
+ # Close the client
+ await inventory.close()
+ loop.stop()
+
+
+def _setup_coroutines(
+ loop,
+ inventory,
+ shutdown_handler,
+ task_r,
+ **kwargs
+):
+ """ sets up the application coroutines"""
+ # Task runner takes a function for stop condition
+ # (for testing purposes) but should always run in practice
+ # pylint: disable=broad-except
+ def infinite_condition():
+ return True
+
+ logger.info("Starting gather of all policies...")
+ try:
+ loop.run_until_complete(inventory.gather())
+ except Exception:
+ logger.exception('received exception on initial gather')
+
+ # websocket and the periodic check of policies
+ tasks = [
+ loop.create_task(
+ task_r(
+ inventory,
+ kwargs.get('check_period', 2400),
+ periodic_task,
+ infinite_condition
+ )
+ )
+ ]
+
+ if inventory.client.supports_notifications():
+ tasks.append(
+ loop.create_task(
+ task_r(
+ inventory,
+ SLEEP_ON_ERROR,
+ notify_task,
+ infinite_condition
+ )
+ )
+ )
+ else:
+ logger.warning(
+ "Defaulting to polling... Provide a dmaap url to receive faster updates"
+ )
+
+ # Add shutdown handlers for sigint and sigterm
+ for signame in ("SIGINT", "SIGTERM"):
+ sig = getattr(signal, signame)
+ loop.add_signal_handler(
+ sig,
+ lambda: asyncio.ensure_future(
+ shutdown_handler(loop, tasks, inventory)
+ ),
+ )
+
+ # Start prometheus server daemonthread for metrics/healthchecking
+ if 'bind' in kwargs:
+ metrics_server = kwargs.get('metrics_server', start_http_server)
+ metrics_server(kwargs['bind'].port, addr=kwargs['bind'].hostname)
+
+
+def start_event_loop(config):
+ """
+ start the event loop that runs the application
+ :param config: Config object for the application
+ """
+ loop = asyncio.get_event_loop()
+ inventory = Inventory(
+ config.filters,
+ config.ids,
+ config.out_file,
+ config.client
+ )
+
+ _setup_coroutines(
+ loop,
+ inventory,
+ shutdown,
+ task_runner,
+ metrics_server=start_http_server,
+ bind=config.bind,
+ check_period=config.check_period
+ )
+
+ loop.run_forever()
+ loop.close()
+ logger.info("shutdown complete")
diff --git a/dcae-services-policy-sync/policysync/inventory.py b/dcae-services-policy-sync/policysync/inventory.py
new file mode 100644
index 0000000..0eb91b5
--- /dev/null
+++ b/dcae-services-policy-sync/policysync/inventory.py
@@ -0,0 +1,169 @@
+# ============LICENSE_START=======================================================
+# Copyright (c) 2021 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+""" In memory data store for policies which are currently used by a mS """
+import asyncio
+import json
+import uuid
+import os
+import tempfile
+import aiohttp
+from datetime import datetime
+from .util import get_module_logger
+
+logger = get_module_logger(__name__)
+
+ACTION_GATHERED = "gathered"
+ACTION_UPDATED = "updated"
+OUTFILE_INDENT = 4
+
+
+class Inventory:
+ """ In memory data store for policies which are currently used by a mS """
+ def __init__(self, filters, ids, outfile, client):
+ self.policy_filters = filters
+ self.policy_ids = ids
+ self.hp_active_inventory = set()
+ self.get_lock = asyncio.Lock()
+ self.file = outfile
+ self.queue = asyncio.Queue()
+ self.client = client
+
+ async def gather(self):
+ """
+ Run at startup to gather an initial inventory of policies
+ """
+ return await self._sync_inventory(ACTION_GATHERED)
+
+ async def update(self):
+ """
+ Run to update an inventory of policies on the fly
+ """
+ return await self._sync_inventory(ACTION_UPDATED)
+
+ async def check_and_update(self):
+ """ check and update the policy inventory """
+ return await self.update()
+
+ async def close(self):
+ """ close the policy inventory and its associated client """
+ await self.client.close()
+
+ def _atomic_dump(self, data):
+ """ atomically dump the policy content to a file by rename """
+ try:
+ temp_file = tempfile.NamedTemporaryFile(
+ delete=False,
+ dir=os.path.dirname(self.file),
+ prefix=os.path.basename(self.file),
+ mode="w",
+ )
+ try:
+ temp_file.write(data)
+ finally:
+ # fsync the file so its on disk
+ temp_file.flush()
+ os.fsync(temp_file.fileno())
+ finally:
+ temp_file.close()
+
+ os.rename(temp_file.name, os.path.abspath(self.file))
+
+ async def get_policy_content(self, action=ACTION_UPDATED):
+ """
+ get the policy content off the PDP
+ :param action: what action to present
+ :returns: True/False depending on if update was successful
+ """
+ logger.info("Starting policy update process...")
+ try:
+ policy_bodies = await self.client.get_config(
+ filters=self.policy_filters, ids=self.policy_ids
+ )
+ except aiohttp.ClientError:
+ logger.exception('Conncection Error while connecting to PDP')
+ return False
+
+ # match the format a bit of the Config Binding Service
+ out = {
+ "policies": {"items": policy_bodies},
+ "event": {
+ "action": action,
+ "timestamp": (datetime.utcnow().isoformat()[:-3] + "Z"),
+ "update_id": str(uuid.uuid4()),
+ "policies_count": len(policy_bodies),
+ },
+ }
+
+ # Atomically dump the file to disk
+ tmp = {
+ x.get("policyName") for x in policy_bodies if "policyName" in x
+ }
+
+ if tmp != self.hp_active_inventory:
+ data = json.dumps(out)
+ loop = asyncio.get_event_loop()
+ await loop.run_in_executor(None, self._atomic_dump, data)
+ logger.info(
+ "Update complete. Policies dumped to: %s", self.file
+ )
+ self.hp_active_inventory = tmp
+ return True
+ else:
+ logger.info("No updates needed for now")
+ return False
+
+ async def _sync_inventory(self, action):
+ """
+ Pull an inventory of policies. Commit changes if there is a change.
+ return: boolean to represent whether changes were commited
+ """
+ try:
+ pdp_inventory = await self.client.list_policies(
+ filters=self.policy_filters, ids=self.policy_ids
+ )
+ except aiohttp.ClientError:
+ logger.exception("Inventory sync failed due to a connection error")
+ return False
+
+ logger.debug("pdp_inventory -> %s", pdp_inventory)
+
+ # Below needs to be under a lock because of
+ # the call to getConfig being awaited.
+ async with self.get_lock:
+ if self.hp_active_inventory != pdp_inventory or \
+ pdp_inventory is None:
+
+ # Log a delta of what has changed related to this policy update
+ if pdp_inventory is not None and \
+ self.hp_active_inventory is not None:
+ msg = {
+ "removed": list(
+ self.hp_active_inventory - pdp_inventory
+ ),
+ "added": list(
+ pdp_inventory - self.hp_active_inventory
+ ),
+ }
+ logger.info(
+ "PDP indicates the following changes: %s ", msg
+ )
+
+ return await self.get_policy_content(action)
+
+ logger.info(
+ "local matches pdp. no update required for now"
+ )
+ return False
diff --git a/dcae-services-policy-sync/policysync/metrics.py b/dcae-services-policy-sync/policysync/metrics.py
new file mode 100644
index 0000000..7f825fc
--- /dev/null
+++ b/dcae-services-policy-sync/policysync/metrics.py
@@ -0,0 +1,38 @@
+# ============LICENSE_START=======================================================
+# Copyright (c) 2021 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+""" counters and gaugages for various metrics """
+from prometheus_client import Counter, Gauge
+
+
+policy_updates_counter = Counter(
+ "policy_updates", "Number of total policy updates commited"
+)
+websock_closures = Counter(
+ "websocket_errors_and_closures", "Number of websocket closures or errors"
+)
+list_policy_exceptions = Counter(
+ "list_policy_exception",
+ "Exceptions that have occured as a result of calling listPolicy",
+)
+get_config_exceptions = Counter(
+ "get_config_exception",
+ "Exceptions that have occured as a result of calling getConfig",
+)
+
+active_policies_gauge = Gauge(
+ "active_policies",
+ "Number of policies that have been retrieved off the PDP"
+)
diff --git a/dcae-services-policy-sync/policysync/util.py b/dcae-services-policy-sync/policysync/util.py
new file mode 100644
index 0000000..1bbac5a
--- /dev/null
+++ b/dcae-services-policy-sync/policysync/util.py
@@ -0,0 +1,10 @@
+""" utility functions (currenlty just for logging) """
+import logging
+
+
+def get_module_logger(mod_name):
+ """
+ To use this, do logger = get_module_logger(__name__)
+ """
+ logger = logging.getLogger(mod_name)
+ return logger
diff --git a/dcae-services-policy-sync/pom.xml b/dcae-services-policy-sync/pom.xml
new file mode 100644
index 0000000..f5d38d8
--- /dev/null
+++ b/dcae-services-policy-sync/pom.xml
@@ -0,0 +1,172 @@
+<?xml version="1.0"?>
+<!--
+================================================================================
+Copyright (c) 2021 AT&T Intellectual Property. All rights reserved.
+================================================================================
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+============LICENSE_END=========================================================
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.onap.dcaegen2.deployments</groupId>
+ <artifactId>deployments</artifactId>
+ <version>1.2.0-SNAPSHOT</version>
+ </parent>
+ <groupId>org.onap.dcaegen2.deployments</groupId>
+ <artifactId>dcae-services-policy-sync</artifactId>
+ <name>dcaegen2-deployments-dcae-services-policy-sync</name>
+ <version>1.0.0</version>
+ <url>http://maven.apache.org</url>
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <sonar.skip>true</sonar.skip>
+ <sonar.sources>.</sonar.sources>
+ <!-- customize the SONARQUBE URL -->
+ <!-- sonar.host.url>http://localhost:9000</sonar.host.url -->
+ <!-- below are language dependent -->
+ <!-- for Python -->
+ <sonar.language>py</sonar.language>
+ <sonar.pluginName>Python</sonar.pluginName>
+ <sonar.inclusions>**/*.py</sonar.inclusions>
+ <!-- for JavaScaript -->
+ <!--
+ <sonar.language>js</sonar.language>
+ <sonar.pluginName>JS</sonar.pluginName>
+ <sonar.inclusions>**/*.js</sonar.inclusions>
+ -->
+ </properties>
+ <build>
+ <finalName>${project.artifactId}-${project.version}</finalName>
+ <plugins>
+ <!-- plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>2.4.1</version>
+ <configuration>
+ <descriptors>
+ <descriptor>assembly/dep.xml</descriptor>
+ </descriptors>
+ </configuration>
+ <executions>
+ <execution>
+ <id>make-assembly</id>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin -->
+ <!-- now we configure custom action (calling a script) at various lifecycle phases -->
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>exec-maven-plugin</artifactId>
+ <version>1.2.1</version>
+ <executions>
+ <execution>
+ <id>clean phase script</id>
+ <phase>clean</phase>
+ <goals>
+ <goal>exec</goal>
+ </goals>
+ <configuration>
+ <arguments>
+ <argument>${project.artifactId}</argument>
+ <argument>clean</argument>
+ </arguments>
+ </configuration>
+ </execution>
+ <execution>
+ <id>generate-sources script</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>exec</goal>
+ </goals>
+ <configuration>
+ <arguments>
+ <argument>${project.artifactId}</argument>
+ <argument>generate-sources</argument>
+ </arguments>
+ </configuration>
+ </execution>
+ <execution>
+ <id>compile script</id>
+ <phase>compile</phase>
+ <goals>
+ <goal>exec</goal>
+ </goals>
+ <configuration>
+ <arguments>
+ <argument>${project.artifactId}</argument>
+ <argument>compile</argument>
+ </arguments>
+ </configuration>
+ </execution>
+ <execution>
+ <id>package script</id>
+ <phase>package</phase>
+ <goals>
+ <goal>exec</goal>
+ </goals>
+ <configuration>
+ <arguments>
+ <argument>${project.artifactId}</argument>
+ <argument>package</argument>
+ </arguments>
+ </configuration>
+ </execution>
+ <execution>
+ <id>test script</id>
+ <phase>test</phase>
+ <goals>
+ <goal>exec</goal>
+ </goals>
+ <configuration>
+ <arguments>
+ <argument>${project.artifactId}</argument>
+ <argument>test</argument>
+ </arguments>
+ </configuration>
+ </execution>
+ <execution>
+ <id>install script</id>
+ <phase>install</phase>
+ <goals>
+ <goal>exec</goal>
+ </goals>
+ <configuration>
+ <arguments>
+ <argument>${project.artifactId}</argument>
+ <argument>install</argument>
+ </arguments>
+ </configuration>
+ </execution>
+ <execution>
+ <id>deploy script</id>
+ <phase>deploy</phase>
+ <goals>
+ <goal>exec</goal>
+ </goals>
+ <configuration>
+ <arguments>
+ <argument>${project.artifactId}</argument>
+ <argument>deploy</argument>
+ </arguments>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project> \ No newline at end of file
diff --git a/dcae-services-policy-sync/setup.py b/dcae-services-policy-sync/setup.py
new file mode 100644
index 0000000..f5de9a2
--- /dev/null
+++ b/dcae-services-policy-sync/setup.py
@@ -0,0 +1,30 @@
+# ============LICENSE_START=======================================================
+# Copyright (c) 2021 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+
+from setuptools import setup, find_packages
+
+setup(
+ name="policysync",
+ author="Chris Luckenbaugh",
+ version="1.0.0",
+ packages=find_packages(),
+ include_package_data=True,
+ install_requires=["aiohttp>=2.3", "PyYAML", "prometheus_client"],
+ entry_points="""
+ [console_scripts]
+ policysync=policysync.cmd:main
+ """,
+)
diff --git a/dcae-services-policy-sync/tests/mocks.py b/dcae-services-policy-sync/tests/mocks.py
new file mode 100644
index 0000000..9a3d6cd
--- /dev/null
+++ b/dcae-services-policy-sync/tests/mocks.py
@@ -0,0 +1,191 @@
+# ============LICENSE_START=======================================================
+# Copyright (c) 2021 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+
+from urllib.parse import urlsplit
+import asyncio, aiohttp
+
+
+class MockConfig:
+ def __init__(self):
+ self.check_period = 60
+ self.quiet_period = 0
+ self.bind = urlsplit("//localhost:8080")
+
+
+class MockFileDumper:
+ def __init__(self):
+ self.closed = False
+
+ async def close(self):
+ self.closed = True
+
+
+class MockInventory:
+ def __init__(self, queue=None):
+ self.was_updated = False
+ self.was_gathered = False
+ self.client = MockClient()
+ self.queue = queue
+ self.quiet = 0
+ self.updates = []
+ self.policy_filters = []
+ self.policy_ids = []
+
+ async def update(self):
+ self.was_updated = True
+ return True
+
+ async def gather(self):
+ self.was_gathered = True
+ print("got here GATHERED")
+ return True
+
+ async def close(self):
+ self.client.closed = True
+
+ async def check_and_update(self):
+ await self.update()
+
+ async def get_policy_content(self, action="UPDATED"):
+ self.updates.append(action)
+
+
+class MockClient:
+ def __init__(self, raise_on_listpolicies=False, raise_on_getconfig=False):
+ self.closed = False
+ self.opened = False
+ self.raise_on_listpolicies = raise_on_listpolicies
+ self.raise_on_getconfig = raise_on_getconfig
+
+ async def close(self):
+ self.closed = True
+
+ async def notificationhandler(self, callback, ids=[], filters=[]):
+ await callback()
+
+ def supports_notifications(self):
+ return True
+
+ async def list_policies(self, filters=[], ids=[]):
+ if self.raise_on_listpolicies:
+ raise aiohttp.ClientError
+
+ return set(
+ [
+ "DCAE.Config_MS_AGING_UVERSE_PROD_Tosca_HP_AGING_Model_cl55973_IT64_testAging.78.xml"
+ ]
+ )
+
+ async def get_config(self, filters=[], ids=[]):
+ if self.raise_on_getconfig:
+ raise aiohttp.ClientError
+
+ return [
+ {
+ "policyConfigMessage": "Config Retrieved!",
+ "policyConfigStatus": "CONFIG_RETRIEVED",
+ "type": "JSON",
+ "config": {
+ "service": "DCAE_HighlandPark_AgingConfig",
+ "location": " Edge",
+ "uuid": "TestUUID",
+ "policyName": "DCAE.AGING_UVERS_PROD_Tosca_HP_GOC_Model_cl55973_IT64_testAging",
+ "configName": "DCAE_HighlandPark_AgingConfig",
+ "templateVersion": "1607",
+ "priority": "4",
+ "version": 11.0,
+ "policyScope": "resource=Test1,service=vSCP,type=configuration,closedLoopControlName=vSCP_F5_Firewall_d925ed73_7831_4d02_9545_db4e101f88f8",
+ "riskType": "test",
+ "riskLevel": "2",
+ "guard": "False",
+ "content": {
+ "signature": {
+ "filter_clause": "event.faultFields.alarmCondition LIKE('%chrisluckenbaugh%')"
+ },
+ "name": "testAging",
+ "context": ["PROD"],
+ "priority": 1,
+ "prePublishAging": 40,
+ "preCorrelationAging": 20,
+ },
+ "policyNameWithPrefix": "DCAE.AGING_UVERSE_PSL_Tosca_HP_GOC_Model_cl55973_IT64_testAging",
+ },
+ "policyName": "DCAE.Config_MS_AGING_UVERSE_PROD_Tosca_HP_AGING_Model_cl55973_IT64_testAging.78.xml",
+ "policyType": "MicroService",
+ "policyVersion": "78",
+ "matchingConditions": {
+ "ECOMPName": "DCAE",
+ "ONAPName": "DCAE",
+ "ConfigName": "DCAE_HighlandPark_AgingConfig",
+ "service": "DCAE_HighlandPark_AgingConfig",
+ "uuid": "TestUUID",
+ "Location": " Edge",
+ },
+ "responseAttributes": {},
+ "property": None,
+ },
+ {
+ "policyConfigMessage": "Config Retrieved! ",
+ "policyConfigStatus": "CONFIG_RETRIEVED",
+ "type": "JSON",
+ "config": "adlskjfadslkjf",
+ "policyName": "DCAE.Config_MS_AGING_UVERSE_PROD_Tosca_HP_AGING_Model_cl55973_IT64_testAging.78.xml",
+ "policyType": "MicroService",
+ "policyVersion": "78",
+ "matchingConditions": {
+ "ECOMPName": "DCAE",
+ "ONAPName": "DCAE",
+ "ConfigName": "DCAE_HighlandPark_AgingConfig",
+ "service": "DCAE_HighlandPark_AgingConfig",
+ "uuid": "TestUUID",
+ "Location": " Edge",
+ },
+ "responseAttributes": {},
+ "property": None,
+ },
+ ]
+
+
+class MockLoop:
+ def __init__(self):
+ self.stopped = False
+ self.handlers = []
+ self.tasks = []
+
+ def stop(self):
+ self.stopped = True
+
+ def add_signal_handler(self, signal, handler):
+ self.handlers.append(signal)
+
+ def create_task(self, task):
+ self.tasks.append(task)
+
+ def run_until_complete(self, task):
+ loop = asyncio.new_event_loop()
+ asyncio.set_event_loop(loop)
+ loop.run_until_complete(task)
+
+
+class MockTask:
+ def __init__(self):
+ self.canceled = False
+
+ def cancel(self):
+ self.canceled = True
+
+ def __await__(self):
+ return iter([])
diff --git a/dcae-services-policy-sync/tests/test_client_v0.py b/dcae-services-policy-sync/tests/test_client_v0.py
new file mode 100644
index 0000000..6ca590e
--- /dev/null
+++ b/dcae-services-policy-sync/tests/test_client_v0.py
@@ -0,0 +1,191 @@
+# ============LICENSE_START=======================================================
+# Copyright (c) 2021 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+
+from aiohttp import web, WSMsgType
+import json, pytest, re
+from policysync.clients import (
+ PolicyClientV0 as PolicyClient,
+ WS_HEARTBEAT
+)
+
+
+async def listpolicy(request):
+ return web.json_response(["hello"])
+
+
+async def getconfig(request):
+ j = [
+ {
+ "policyConfigMessage": "Config Retrieved!",
+ "policyConfigStatus": "CONFIG_RETRIEVED",
+ "type": "JSON",
+ "config": '{"service":"DCAE_HighlandPark_AgingConfig","location":" Edge","uuid":"TestUUID","policyName":"DCAE.AGING_UVERS_PROD_Tosca_HP_GOC_Model_cl55973_IT64_testAging","configName":"DCAE_HighlandPark_AgingConfig","templateVersion":"1607","priority":"4","version":11.0,"policyScope":"resource=Test1,service=vSCP,type=configuration,closedLoopControlName=vSCP_F5_Firewall_d925ed73_7831_4d02_9545_db4e101f88f8","riskType":"test","riskLevel":"2","guard":"False","content":{"signature":{"filter_clause":"event.faultFields.alarmCondition LIKE(\'%chrisluckenbaugh%\')"},"name":"testAging","context":["PROD"],"priority":1,"prePublishAging":40,"preCorrelationAging":20},"policyNameWithPrefix":"DCAE.AGING_UVERSE_PSL_Tosca_HP_GOC_Model_cl55973_IT64_testAging"}',
+ "policyName": "DCAE.Config_MS_AGING_UVERSE_PROD_Tosca_HP_AGING_Model_cl55973_IT64_testAging.78.xml",
+ "policyType": "MicroService",
+ "policyVersion": "78",
+ "matchingConditions": {
+ "ECOMPName": "DCAE",
+ "ONAPName": "DCAE",
+ "ConfigName": "DCAE_HighlandPark_AgingConfig",
+ "service": "DCAE_HighlandPark_AgingConfig",
+ "uuid": "TestUUID",
+ "Location": " Edge",
+ },
+ "responseAttributes": {},
+ "property": None,
+ },
+ {
+ "policyConfigMessage": "Config Retrieved! ",
+ "policyConfigStatus": "CONFIG_RETRIEVED",
+ "type": "JSON",
+ "config": "adlskjfadslkjf",
+ "policyName": "DCAE.Config_MS_AGING_UVERSE_PROD_Tosca_HP_AGING_Model_cl55973_IT64_testAging.78.xml",
+ "policyType": "MicroService",
+ "policyVersion": "78",
+ "matchingConditions": {
+ "ECOMPName": "DCAE",
+ "ONAPName": "DCAE",
+ "ConfigName": "DCAE_HighlandPark_AgingConfig",
+ "service": "DCAE_HighlandPark_AgingConfig",
+ "uuid": "TestUUID",
+ "Location": " Edge",
+ },
+ "responseAttributes": {},
+ "property": None,
+ },
+ ]
+
+ return web.json_response(j)
+
+
+async def wshandler(request):
+ resp = web.WebSocketResponse()
+ available = resp.can_prepare(request)
+ await resp.prepare(request)
+ await resp.send_str('{ "loadedPolicies": [{ "policyName": "bar"}] }')
+ await resp.send_bytes(b"bar!!!")
+ await resp.close("closed")
+
+
+@pytest.fixture
+def policyclient(aiohttp_client, loop):
+ app = web.Application()
+ app.router.add_route("POST", "/pdp/api/listPolicy", listpolicy)
+ app.router.add_route("POST", "/pdp/api/getConfig", getconfig)
+ app.router.add_get("/pdp/notifications", wshandler)
+ fake_client = loop.run_until_complete(aiohttp_client(app))
+ server = "{}://{}:{}".format("http", fake_client.host, fake_client.port)
+ return PolicyClient({}, server)
+
+
+async def test_listpolicies(policyclient):
+ j = await policyclient.list_policies(filters=["bar"])
+ assert j == set(["hello"])
+ await policyclient.close()
+ assert policyclient.session.closed
+
+
+async def test_getconfig(policyclient):
+ j = await policyclient.get_config(filters=["bar"])
+
+ assert j == [
+ {
+ "policyConfigMessage": "Config Retrieved!",
+ "policyConfigStatus": "CONFIG_RETRIEVED",
+ "type": "JSON",
+ "config": {
+ "service": "DCAE_HighlandPark_AgingConfig",
+ "location": " Edge",
+ "uuid": "TestUUID",
+ "policyName": "DCAE.AGING_UVERS_PROD_Tosca_HP_GOC_Model_cl55973_IT64_testAging",
+ "configName": "DCAE_HighlandPark_AgingConfig",
+ "templateVersion": "1607",
+ "priority": "4",
+ "version": 11.0,
+ "policyScope": "resource=Test1,service=vSCP,type=configuration,closedLoopControlName=vSCP_F5_Firewall_d925ed73_7831_4d02_9545_db4e101f88f8",
+ "riskType": "test",
+ "riskLevel": "2",
+ "guard": "False",
+ "content": {
+ "signature": {
+ "filter_clause": "event.faultFields.alarmCondition LIKE('%chrisluckenbaugh%')"
+ },
+ "name": "testAging",
+ "context": ["PROD"],
+ "priority": 1,
+ "prePublishAging": 40,
+ "preCorrelationAging": 20,
+ },
+ "policyNameWithPrefix": "DCAE.AGING_UVERSE_PSL_Tosca_HP_GOC_Model_cl55973_IT64_testAging",
+ },
+ "policyName": "DCAE.Config_MS_AGING_UVERSE_PROD_Tosca_HP_AGING_Model_cl55973_IT64_testAging.78.xml",
+ "policyType": "MicroService",
+ "policyVersion": "78",
+ "matchingConditions": {
+ "ECOMPName": "DCAE",
+ "ONAPName": "DCAE",
+ "ConfigName": "DCAE_HighlandPark_AgingConfig",
+ "service": "DCAE_HighlandPark_AgingConfig",
+ "uuid": "TestUUID",
+ "Location": " Edge",
+ },
+ "responseAttributes": {},
+ "property": None,
+ },
+ {
+ "policyConfigMessage": "Config Retrieved! ",
+ "policyConfigStatus": "CONFIG_RETRIEVED",
+ "type": "JSON",
+ "config": "adlskjfadslkjf",
+ "policyName": "DCAE.Config_MS_AGING_UVERSE_PROD_Tosca_HP_AGING_Model_cl55973_IT64_testAging.78.xml",
+ "policyType": "MicroService",
+ "policyVersion": "78",
+ "matchingConditions": {
+ "ECOMPName": "DCAE",
+ "ONAPName": "DCAE",
+ "ConfigName": "DCAE_HighlandPark_AgingConfig",
+ "service": "DCAE_HighlandPark_AgingConfig",
+ "uuid": "TestUUID",
+ "Location": " Edge",
+ },
+ "responseAttributes": {},
+ "property": None,
+ },
+ ]
+ await policyclient.close()
+
+
+async def test_supports_notifications(policyclient):
+ assert policyclient.supports_notifications()
+
+
+async def test_needs_update(policyclient):
+ assert policyclient._needs_update(
+ {"loadedPolicies": [{"policyName": "bar"}]}, [], ["bar"]
+ )
+ assert not policyclient._needs_update(
+ {"loadedPolicies": [{"policyName": "bar"}]}, [], ["foo"]
+ )
+
+
+async def test_ws(policyclient):
+ async def ws_callback():
+ assert True
+
+ await policyclient.notificationhandler(ws_callback, filters=["bar"])
+ await policyclient.close()
+
+ assert policyclient.ws_session.closed
diff --git a/dcae-services-policy-sync/tests/test_client_v1.py b/dcae-services-policy-sync/tests/test_client_v1.py
new file mode 100644
index 0000000..6994a6f
--- /dev/null
+++ b/dcae-services-policy-sync/tests/test_client_v1.py
@@ -0,0 +1,216 @@
+# ============LICENSE_START=======================================================
+# Copyright (c) 2021 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+
+from aiohttp import web, WSMsgType
+import json, pytest, re
+from policysync.clients import PolicyClientV1 as PolicyClient
+
+DECISION_ENDPOINT = 'policy/pdpx/v1/decision'
+async def get_decision(request):
+ req_data = await request.json()
+ assert req_data['ONAPName'] == 'DCAE'
+ assert req_data['ONAPComponent'] == 'policy-sync'
+ assert req_data['action'] == 'configure'
+ assert req_data['resource'] == {
+ 'policy-id': [
+ 'onap.scaleout.tca',
+ 'onap.restart.tca'
+ ]
+ }
+
+
+ j = {
+ "policies": {
+ "onap.scaleout.tca": {
+ "type": "onap.policies.monitoring.cdap.tca.hi.lo.app",
+ "version": "1.0.0",
+ "metadata": {"policy-id": "onap.scaleout.tca"},
+ "properties": {
+ "tca_policy": {
+ "domain": "measurementsForVfScaling",
+ "metricsPerEventName": [
+ {
+ "eventName": "vLoadBalancer",
+ "controlLoopSchemaType": "VNF",
+ "policyScope": "type=configuration",
+ "policyName": "onap.scaleout.tca",
+ "policyVersion": "v0.0.1",
+ "thresholds": [
+ {
+ "closedLoopControlName": "ControlLoop-vDNS-6f37f56d-a87d-4b85-b6a9-cc953cf779b3",
+ "closedLoopEventStatus": "ONSET",
+ "version": "1.0.2",
+ "fieldPath": "$.event.measurementsForVfScalingFields.vNicPerformanceArray[*].receivedBroadcastPacketsAccumulated",
+ "thresholdValue": 500,
+ "direction": "LESS_OR_EQUAL",
+ "severity": "MAJOR",
+ },
+ {
+ "closedLoopControlName": "ControlLoop-vDNS-6f37f56d-a87d-4b85-b6a9-cc953cf779b3",
+ "closedLoopEventStatus": "ONSET",
+ "version": "1.0.2",
+ "fieldPath": "$.event.measurementsForVfScalingFields.vNicPerformanceArray[*].receivedBroadcastPacketsAccumulated",
+ "thresholdValue": 5000,
+ "direction": "GREATER_OR_EQUAL",
+ "severity": "CRITICAL",
+ },
+ ],
+ }
+ ],
+ }
+ },
+ },
+ "onap.restart.tca": {
+ "type": "onap.policies.monitoring.cdap.tca.hi.lo.app",
+ "version": "1.0.0",
+ "metadata": {"policy-id": "onap.restart.tca", "policy-version": 1},
+ "properties": {
+ "tca_policy": {
+ "domain": "measurementsForVfScaling",
+ "metricsPerEventName": [
+ {
+ "eventName": "Measurement_vGMUX",
+ "controlLoopSchemaType": "VNF",
+ "policyScope": "DCAE",
+ "policyName": "DCAE.Config_tca-hi-lo",
+ "policyVersion": "v0.0.1",
+ "thresholds": [
+ {
+ "closedLoopControlName": "ControlLoop-vCPE-48f0c2c3-a172-4192-9ae3-052274181b6e",
+ "version": "1.0.2",
+ "fieldPath": "$.event.measurementsForVfScalingFields.additionalMeasurements[*].arrayOfFields[0].value",
+ "thresholdValue": 0,
+ "direction": "EQUAL",
+ "severity": "MAJOR",
+ "closedLoopEventStatus": "ABATED",
+ },
+ {
+ "closedLoopControlName": "ControlLoop-vCPE-48f0c2c3-a172-4192-9ae3-052274181b6e",
+ "version": "1.0.2",
+ "fieldPath": "$.event.measurementsForVfScalingFields.additionalMeasurements[*].arrayOfFields[0].value",
+ "thresholdValue": 0,
+ "direction": "GREATER",
+ "severity": "CRITICAL",
+ "closedLoopEventStatus": "ONSET",
+ },
+ ],
+ }
+ ],
+ }
+ },
+ },
+ }
+ }
+
+ return web.json_response(j)
+
+
+@pytest.fixture
+def policyclient(aiohttp_client, loop):
+ app = web.Application()
+ app.router.add_route("POST", "/" + DECISION_ENDPOINT, get_decision)
+ fake_client = loop.run_until_complete(aiohttp_client(app))
+ server = "{}://{}:{}".format("http", fake_client.host, fake_client.port)
+ return PolicyClient({}, server)
+
+
+async def test_getconfig(policyclient):
+ j = await policyclient.get_config(ids=['onap.scaleout.tca', 'onap.restart.tca' ])
+ assert j == [{
+ "type": "onap.policies.monitoring.cdap.tca.hi.lo.app",
+ "version": "1.0.0",
+ "metadata": {
+ "policy-id": "onap.scaleout.tca"
+ },
+ "policyName": "onap.scaleout.tca.1-0-0.xml",
+ "policyVersion": "1.0.0",
+ "config": {
+ "tca_policy": {
+ "domain": "measurementsForVfScaling",
+ "metricsPerEventName": [{
+ "eventName": "vLoadBalancer",
+ "controlLoopSchemaType": "VNF",
+ "policyScope": "type=configuration",
+ "policyName": "onap.scaleout.tca",
+ "policyVersion": "v0.0.1",
+ "thresholds": [{
+ "closedLoopControlName": "ControlLoop-vDNS-6f37f56d-a87d-4b85-b6a9-cc953cf779b3",
+ "closedLoopEventStatus": "ONSET",
+ "version": "1.0.2",
+ "fieldPath": "$.event.measurementsForVfScalingFields.vNicPerformanceArray[*].receivedBroadcastPacketsAccumulated",
+ "thresholdValue": 500,
+ "direction": "LESS_OR_EQUAL",
+ "severity": "MAJOR"
+ },
+ {
+ "closedLoopControlName": "ControlLoop-vDNS-6f37f56d-a87d-4b85-b6a9-cc953cf779b3",
+ "closedLoopEventStatus": "ONSET",
+ "version": "1.0.2",
+ "fieldPath": "$.event.measurementsForVfScalingFields.vNicPerformanceArray[*].receivedBroadcastPacketsAccumulated",
+ "thresholdValue": 5000,
+ "direction": "GREATER_OR_EQUAL",
+ "severity": "CRITICAL"
+ }
+ ]
+ }]
+ }
+ }
+ }, {
+ "type": "onap.policies.monitoring.cdap.tca.hi.lo.app",
+ "version": "1.0.0",
+ "metadata": {
+ "policy-id": "onap.restart.tca",
+ "policy-version": 1
+ },
+ "policyName": "onap.restart.tca.1-0-0.xml",
+ "policyVersion": "1.0.0",
+ "config": {
+ "tca_policy": {
+ "domain": "measurementsForVfScaling",
+ "metricsPerEventName": [{
+ "eventName": "Measurement_vGMUX",
+ "controlLoopSchemaType": "VNF",
+ "policyScope": "DCAE",
+ "policyName": "DCAE.Config_tca-hi-lo",
+ "policyVersion": "v0.0.1",
+ "thresholds": [{
+ "closedLoopControlName": "ControlLoop-vCPE-48f0c2c3-a172-4192-9ae3-052274181b6e",
+ "version": "1.0.2",
+ "fieldPath": "$.event.measurementsForVfScalingFields.additionalMeasurements[*].arrayOfFields[0].value",
+ "thresholdValue": 0,
+ "direction": "EQUAL",
+ "severity": "MAJOR",
+ "closedLoopEventStatus": "ABATED"
+ },
+ {
+ "closedLoopControlName": "ControlLoop-vCPE-48f0c2c3-a172-4192-9ae3-052274181b6e",
+ "version": "1.0.2",
+ "fieldPath": "$.event.measurementsForVfScalingFields.additionalMeasurements[*].arrayOfFields[0].value",
+ "thresholdValue": 0,
+ "direction": "GREATER",
+ "severity": "CRITICAL",
+ "closedLoopEventStatus": "ONSET"
+ }
+ ]
+ }]
+ }
+ }
+ }]
+ await policyclient.close()
+
+
+async def test_supports_notifications(policyclient):
+ assert not policyclient.supports_notifications()
diff --git a/dcae-services-policy-sync/tests/test_cmd.py b/dcae-services-policy-sync/tests/test_cmd.py
new file mode 100644
index 0000000..3c061c0
--- /dev/null
+++ b/dcae-services-policy-sync/tests/test_cmd.py
@@ -0,0 +1,79 @@
+# ============LICENSE_START=======================================================
+# Copyright (c) 2021 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+
+import pytest, json, sys, logging, logging.config
+from policysync.cmd import Config, main, parsecmd
+import policysync.coroutines
+
+
+class TestConfig:
+ def test_parse_args(self):
+ args = [
+ "--out",
+ "out",
+ "--pdp-user",
+ "chris",
+ "--pdp-pass",
+ "notapassword",
+ "--pdp-url",
+ "blah",
+ "--duration",
+ "60",
+ "--filters",
+ "[blah]",
+ ]
+
+ c = parsecmd(args)
+
+ assert c.filters == ["blah"]
+ assert c.check_period == 60
+ assert c.out_file == "out"
+
+ def test_parse_args_no_auth(self):
+ c = parsecmd(
+ ["--out", "out", "--pdp-url", "blah", "--duration", "60", "--filters", "[blah]"]
+ )
+
+ assert c.client.pdp_url == "blah"
+ assert c.filters == ["blah"]
+ assert c.check_period == 60
+ assert c.out_file == "out"
+
+ def test_parse_args_no_pdp(self):
+ args = []
+ with pytest.raises(ValueError):
+ parsecmd(args)
+
+ def test_parse_bad_bind(self):
+ args = [
+ "--out",
+ "out",
+ "--pdp-user",
+ "chris",
+ "--pdp-pass",
+ "notapassword",
+ "--pdp-url",
+ "blah",
+ "--duration",
+ "60",
+ "--filters",
+ "[blah]",
+ "--http-bind",
+ "l[ocalhost:100",
+ ]
+
+ with pytest.raises(ValueError):
+ parsecmd(args)
diff --git a/dcae-services-policy-sync/tests/test_coroutines.py b/dcae-services-policy-sync/tests/test_coroutines.py
new file mode 100644
index 0000000..4c90ae8
--- /dev/null
+++ b/dcae-services-policy-sync/tests/test_coroutines.py
@@ -0,0 +1,142 @@
+# ============LICENSE_START=======================================================
+# Copyright (c) 2021 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+
+import pytest, json, sys, asyncio, signal
+from tests.mocks import (
+ MockClient,
+ MockTask,
+ MockLoop,
+ MockInventory,
+ MockConfig,
+ MockFileDumper,
+)
+from policysync.coroutines import (
+ shutdown,
+ periodic_task,
+ notify_task,
+ task_runner,
+ _setup_coroutines,
+ SLEEP_ON_ERROR,
+)
+import policysync.coroutines as coroutines
+
+
+async def test_shutdownhandler():
+ client = MockClient()
+ tasks = [MockTask()]
+ loop = MockLoop()
+ inventory = MockInventory()
+
+ await shutdown( loop, tasks, inventory)
+
+ # Assert that a shutdown results in all tasks in the loop being canceled
+ for x in tasks:
+ assert x.canceled
+
+ # ... And the the PDP client is closed
+ assert inventory.client.closed
+
+ # ... And that the event loop is stopped
+ assert loop.stopped
+
+
+async def test_periodic():
+ inventory = MockInventory()
+ await periodic_task(inventory, 1)
+ assert inventory.was_updated
+
+
+async def test_ws():
+ inventory = MockInventory()
+ await notify_task(inventory, 1)
+ assert inventory.was_updated
+
+
+async def test_task_runner():
+ def should_run():
+ if should_run.counter == 0:
+ should_run.counter += 1
+ return True
+ else:
+ return False
+
+ should_run.counter = 0
+
+ def mocktask(inventory):
+ assert True
+
+ await task_runner(MockInventory(), 1, mocktask, should_run)
+
+
+async def test_task_runner_cancel():
+ def should_run():
+ if should_run.counter == 0:
+ should_run.counter += 1
+ return True
+ elif should_run.counter == 1:
+ # If we get here then fail the test
+ assert False, "Task runner should have broken out of loop before this"
+ return False
+
+ should_run.counter = 0
+
+ # We create a mock task that raises a cancellation error (sent when a asyncio task is canceled)
+ def mocktask(inventory, sleep):
+ raise asyncio.CancelledError
+
+ await task_runner(MockInventory(), 1, mocktask, should_run)
+
+
+def test_setup_coroutines():
+ loop = MockLoop()
+
+ def fake_task_runner(inventory, sleep, task, should_run):
+ return (sleep, task)
+
+ def fake_shutdown(sig, loop, tasks, client):
+ return sig
+
+ def fake_metrics_server(port, addr=None):
+ fake_metrics_server.started = True
+
+ fake_metrics_server.started = False
+
+ inventory = MockInventory()
+ client = MockClient()
+ config = MockConfig()
+
+ _setup_coroutines(
+ loop,
+ inventory,
+ fake_shutdown,
+ fake_task_runner,
+ metrics_server=fake_metrics_server,
+ check_period=config.check_period,
+ bind=config.bind,
+ )
+
+ # By the end of setup coroutines we should have...
+
+ # Gathered initial set of policies
+ assert inventory.was_gathered
+
+ # started the websocket and periodic task running
+ assert (SLEEP_ON_ERROR, notify_task) in loop.tasks
+ assert (config.check_period, periodic_task) in loop.tasks
+
+ # Signal handlers for SIGINT and SIGTERM
+ assert signal.SIGINT in loop.handlers
+ assert signal.SIGTERM in loop.handlers
diff --git a/dcae-services-policy-sync/tests/test_inventory.py b/dcae-services-policy-sync/tests/test_inventory.py
new file mode 100644
index 0000000..5b6f21b
--- /dev/null
+++ b/dcae-services-policy-sync/tests/test_inventory.py
@@ -0,0 +1,153 @@
+# ============LICENSE_START=======================================================
+# Copyright (c) 2021 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+
+import pytest, json, aiohttp, asyncio
+from policysync.inventory import (
+ Inventory,
+ ACTION_GATHERED,
+ ACTION_UPDATED,
+)
+from tests.mocks import MockClient
+
+
+class MockMessage:
+ def __init__(self, type, data):
+ self.type = type
+ self.data = data
+
+
+@pytest.fixture()
+def inventory(request, tmpdir):
+ f1 = tmpdir.mkdir("sub").join("myfile")
+ print(f1)
+ return Inventory(["DCAE.Config_MS_AGING_UVERSE_PROD_.*"], [], f1, MockClient())
+
+
+class TestInventory:
+ @pytest.mark.asyncio
+ async def test_close(self, inventory):
+ await inventory.close()
+ assert inventory.client.closed
+
+ @pytest.mark.asyncio
+ async def test_get_policy_content(self, inventory):
+ await inventory.get_policy_content()
+ with open(inventory.file) as f:
+ data = json.load(f)
+
+ assert data["policies"] == {
+ "items": [
+ {
+ "policyConfigMessage": "Config Retrieved!",
+ "policyConfigStatus": "CONFIG_RETRIEVED",
+ "type": "JSON",
+ "config": {
+ "service": "DCAE_HighlandPark_AgingConfig",
+ "location": " Edge",
+ "uuid": "TestUUID",
+ "policyName": "DCAE.AGING_UVERS_PROD_Tosca_HP_GOC_Model_cl55973_IT64_testAging",
+ "configName": "DCAE_HighlandPark_AgingConfig",
+ "templateVersion": "1607",
+ "priority": "4",
+ "version": 11.0,
+ "policyScope": "resource=Test1,service=vSCP,type=configuration,closedLoopControlName=vSCP_F5_Firewall_d925ed73_7831_4d02_9545_db4e101f88f8",
+ "riskType": "test",
+ "riskLevel": "2",
+ "guard": "False",
+ "content": {
+ "signature": {
+ "filter_clause": "event.faultFields.alarmCondition LIKE('%chrisluckenbaugh%')"
+ },
+ "name": "testAging",
+ "context": ["PROD"],
+ "priority": 1,
+ "prePublishAging": 40,
+ "preCorrelationAging": 20,
+ },
+ "policyNameWithPrefix": "DCAE.AGING_UVERSE_PSL_Tosca_HP_GOC_Model_cl55973_IT64_testAging",
+ },
+ "policyName": "DCAE.Config_MS_AGING_UVERSE_PROD_Tosca_HP_AGING_Model_cl55973_IT64_testAging.78.xml",
+ "policyType": "MicroService",
+ "policyVersion": "78",
+ "matchingConditions": {
+ "ECOMPName": "DCAE",
+ "ONAPName": "DCAE",
+ "ConfigName": "DCAE_HighlandPark_AgingConfig",
+ "service": "DCAE_HighlandPark_AgingConfig",
+ "uuid": "TestUUID",
+ "Location": " Edge",
+ },
+ "responseAttributes": {},
+ "property": None,
+ },
+ {
+ "policyConfigMessage": "Config Retrieved! ",
+ "policyConfigStatus": "CONFIG_RETRIEVED",
+ "type": "JSON",
+ "config": "adlskjfadslkjf",
+ "policyName": "DCAE.Config_MS_AGING_UVERSE_PROD_Tosca_HP_AGING_Model_cl55973_IT64_testAging.78.xml",
+ "policyType": "MicroService",
+ "policyVersion": "78",
+ "matchingConditions": {
+ "ECOMPName": "DCAE",
+ "ONAPName": "DCAE",
+ "ConfigName": "DCAE_HighlandPark_AgingConfig",
+ "service": "DCAE_HighlandPark_AgingConfig",
+ "uuid": "TestUUID",
+ "Location": " Edge",
+ },
+ "responseAttributes": {},
+ "property": None,
+ },
+ ]
+ }
+
+ assert data["event"]["action"] == ACTION_UPDATED
+
+ @pytest.mark.asyncio
+ async def test_update(self, inventory):
+ await inventory.update()
+ assert len(inventory.hp_active_inventory) == 1
+
+ assert not await inventory.update()
+
+ @pytest.mark.asyncio
+ async def test_update_listpolicies_exception(self, inventory):
+ inventory.client.raise_on_listpolicies = True
+ assert not await inventory.update()
+
+ @pytest.mark.asyncio
+ async def test_update_getconfig_exception(self, inventory):
+ inventory.client.raise_on_getconfig = True
+ await inventory.get_policy_content()
+
+ @pytest.mark.asyncio
+ async def test_gather(self, inventory):
+ await inventory.gather()
+
+ # We should gather one policy
+ assert len(inventory.hp_active_inventory) == 1
+
+ # type in event should be gather
+ with open(inventory.file) as f:
+ data = json.load(f)
+
+ assert data["event"]["action"] == ACTION_GATHERED
+
+ @pytest.mark.asyncio
+ async def test_ws_text(self, inventory):
+ result = await inventory.check_and_update()
+ assert result == True
diff --git a/dcae-services-policy-sync/tox.ini b/dcae-services-policy-sync/tox.ini
new file mode 100644
index 0000000..17235c7
--- /dev/null
+++ b/dcae-services-policy-sync/tox.ini
@@ -0,0 +1,42 @@
+# ============LICENSE_START=======================================================
+# Copyright (c) 2021 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+
+# content of: tox.ini , put in same dir as setup.py
+[tox]
+envlist = py36, py38
+
+[testenv]
+deps=
+ pytest
+ coverage
+ pytest-cov
+ pytest-asyncio
+ pytest-aiohttp
+setenv =
+ PYTHONPATH={toxinidir}
+ REQUESTS_CA_BUNDLE=/etc/ssl/certs/ca-certificates.crt
+recreate = True
+commands=
+ python --version
+ pytest -s --cov policysync --cov-report=xml --cov-report=term tests --verbose
+
+[testenv:lint]
+deps =
+ flake8
+ pylint
+commands =
+ flake8 policysync
+ pylint policysync \ No newline at end of file
diff --git a/mvn-phase-script.sh b/mvn-phase-script.sh
index 0e9c8f4..71df21a 100755
--- a/mvn-phase-script.sh
+++ b/mvn-phase-script.sh
@@ -62,6 +62,27 @@ compile)
;;
test)
echo "==> test phase script"
+ case $MVN_PROJECT_MODULEID in
+ dcae-services-policy-sync)
+ set -e -x
+ CURDIR=$(pwd)
+ TOXINIS=$(find . -name "tox.ini")
+ for TOXINI in "${TOXINIS[@]}"; do
+ DIR=$(echo "$TOXINI" | rev | cut -f2- -d'/' | rev)
+ cd "${CURDIR}/${DIR}"
+ rm -rf ./venv-tox ./.tox
+ virtualenv ./venv-tox
+ source ./venv-tox/bin/activate
+ pip install pip==20.3.4
+ pip install --upgrade argparse
+ pip install tox
+ pip freeze
+ tox
+ deactivate
+ rm -rf ./venv-tox ./.tox
+ done
+ ;;
+ esac
;;
package)
echo "==> package phase script"
diff --git a/pom.xml b/pom.xml
index f27ea85..5130161 100644
--- a/pom.xml
+++ b/pom.xml
@@ -40,6 +40,7 @@ limitations under the License.
<module>healthcheck-container</module>
<module>tls-init-container</module>
<module>consul-loader-container</module>
+ <module>dcae-services-policy-sync</module>
<!--<module>multisite-init-container</module> -->
<module>dcae-k8s-cleanup-container</module>
</modules>