diff options
Diffstat (limited to 'dcae-services-policy-sync/policysync/inventory.py')
-rw-r--r-- | dcae-services-policy-sync/policysync/inventory.py | 169 |
1 files changed, 169 insertions, 0 deletions
diff --git a/dcae-services-policy-sync/policysync/inventory.py b/dcae-services-policy-sync/policysync/inventory.py new file mode 100644 index 0000000..0eb91b5 --- /dev/null +++ b/dcae-services-policy-sync/policysync/inventory.py @@ -0,0 +1,169 @@ +# ============LICENSE_START======================================================= +# Copyright (c) 2021 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +""" In memory data store for policies which are currently used by a mS """ +import asyncio +import json +import uuid +import os +import tempfile +import aiohttp +from datetime import datetime +from .util import get_module_logger + +logger = get_module_logger(__name__) + +ACTION_GATHERED = "gathered" +ACTION_UPDATED = "updated" +OUTFILE_INDENT = 4 + + +class Inventory: + """ In memory data store for policies which are currently used by a mS """ + def __init__(self, filters, ids, outfile, client): + self.policy_filters = filters + self.policy_ids = ids + self.hp_active_inventory = set() + self.get_lock = asyncio.Lock() + self.file = outfile + self.queue = asyncio.Queue() + self.client = client + + async def gather(self): + """ + Run at startup to gather an initial inventory of policies + """ + return await self._sync_inventory(ACTION_GATHERED) + + async def update(self): + """ + Run to update an inventory of policies on the fly + """ + return await self._sync_inventory(ACTION_UPDATED) + + async def check_and_update(self): + """ check and update the policy inventory """ + return await self.update() + + async def close(self): + """ close the policy inventory and its associated client """ + await self.client.close() + + def _atomic_dump(self, data): + """ atomically dump the policy content to a file by rename """ + try: + temp_file = tempfile.NamedTemporaryFile( + delete=False, + dir=os.path.dirname(self.file), + prefix=os.path.basename(self.file), + mode="w", + ) + try: + temp_file.write(data) + finally: + # fsync the file so its on disk + temp_file.flush() + os.fsync(temp_file.fileno()) + finally: + temp_file.close() + + os.rename(temp_file.name, os.path.abspath(self.file)) + + async def get_policy_content(self, action=ACTION_UPDATED): + """ + get the policy content off the PDP + :param action: what action to present + :returns: True/False depending on if update was successful + """ + logger.info("Starting policy update process...") + try: + policy_bodies = await self.client.get_config( + filters=self.policy_filters, ids=self.policy_ids + ) + except aiohttp.ClientError: + logger.exception('Conncection Error while connecting to PDP') + return False + + # match the format a bit of the Config Binding Service + out = { + "policies": {"items": policy_bodies}, + "event": { + "action": action, + "timestamp": (datetime.utcnow().isoformat()[:-3] + "Z"), + "update_id": str(uuid.uuid4()), + "policies_count": len(policy_bodies), + }, + } + + # Atomically dump the file to disk + tmp = { + x.get("policyName") for x in policy_bodies if "policyName" in x + } + + if tmp != self.hp_active_inventory: + data = json.dumps(out) + loop = asyncio.get_event_loop() + await loop.run_in_executor(None, self._atomic_dump, data) + logger.info( + "Update complete. Policies dumped to: %s", self.file + ) + self.hp_active_inventory = tmp + return True + else: + logger.info("No updates needed for now") + return False + + async def _sync_inventory(self, action): + """ + Pull an inventory of policies. Commit changes if there is a change. + return: boolean to represent whether changes were commited + """ + try: + pdp_inventory = await self.client.list_policies( + filters=self.policy_filters, ids=self.policy_ids + ) + except aiohttp.ClientError: + logger.exception("Inventory sync failed due to a connection error") + return False + + logger.debug("pdp_inventory -> %s", pdp_inventory) + + # Below needs to be under a lock because of + # the call to getConfig being awaited. + async with self.get_lock: + if self.hp_active_inventory != pdp_inventory or \ + pdp_inventory is None: + + # Log a delta of what has changed related to this policy update + if pdp_inventory is not None and \ + self.hp_active_inventory is not None: + msg = { + "removed": list( + self.hp_active_inventory - pdp_inventory + ), + "added": list( + pdp_inventory - self.hp_active_inventory + ), + } + logger.info( + "PDP indicates the following changes: %s ", msg + ) + + return await self.get_policy_content(action) + + logger.info( + "local matches pdp. no update required for now" + ) + return False |