diff options
Diffstat (limited to 'dcae-services-policy-sync/tests/test_client_v0.py')
-rw-r--r-- | dcae-services-policy-sync/tests/test_client_v0.py | 191 |
1 files changed, 191 insertions, 0 deletions
diff --git a/dcae-services-policy-sync/tests/test_client_v0.py b/dcae-services-policy-sync/tests/test_client_v0.py new file mode 100644 index 0000000..6ca590e --- /dev/null +++ b/dcae-services-policy-sync/tests/test_client_v0.py @@ -0,0 +1,191 @@ +# ============LICENSE_START======================================================= +# Copyright (c) 2021 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= + +from aiohttp import web, WSMsgType +import json, pytest, re +from policysync.clients import ( + PolicyClientV0 as PolicyClient, + WS_HEARTBEAT +) + + +async def listpolicy(request): + return web.json_response(["hello"]) + + +async def getconfig(request): + j = [ + { + "policyConfigMessage": "Config Retrieved!", + "policyConfigStatus": "CONFIG_RETRIEVED", + "type": "JSON", + "config": '{"service":"DCAE_HighlandPark_AgingConfig","location":" Edge","uuid":"TestUUID","policyName":"DCAE.AGING_UVERS_PROD_Tosca_HP_GOC_Model_cl55973_IT64_testAging","configName":"DCAE_HighlandPark_AgingConfig","templateVersion":"1607","priority":"4","version":11.0,"policyScope":"resource=Test1,service=vSCP,type=configuration,closedLoopControlName=vSCP_F5_Firewall_d925ed73_7831_4d02_9545_db4e101f88f8","riskType":"test","riskLevel":"2","guard":"False","content":{"signature":{"filter_clause":"event.faultFields.alarmCondition LIKE(\'%chrisluckenbaugh%\')"},"name":"testAging","context":["PROD"],"priority":1,"prePublishAging":40,"preCorrelationAging":20},"policyNameWithPrefix":"DCAE.AGING_UVERSE_PSL_Tosca_HP_GOC_Model_cl55973_IT64_testAging"}', + "policyName": "DCAE.Config_MS_AGING_UVERSE_PROD_Tosca_HP_AGING_Model_cl55973_IT64_testAging.78.xml", + "policyType": "MicroService", + "policyVersion": "78", + "matchingConditions": { + "ECOMPName": "DCAE", + "ONAPName": "DCAE", + "ConfigName": "DCAE_HighlandPark_AgingConfig", + "service": "DCAE_HighlandPark_AgingConfig", + "uuid": "TestUUID", + "Location": " Edge", + }, + "responseAttributes": {}, + "property": None, + }, + { + "policyConfigMessage": "Config Retrieved! ", + "policyConfigStatus": "CONFIG_RETRIEVED", + "type": "JSON", + "config": "adlskjfadslkjf", + "policyName": "DCAE.Config_MS_AGING_UVERSE_PROD_Tosca_HP_AGING_Model_cl55973_IT64_testAging.78.xml", + "policyType": "MicroService", + "policyVersion": "78", + "matchingConditions": { + "ECOMPName": "DCAE", + "ONAPName": "DCAE", + "ConfigName": "DCAE_HighlandPark_AgingConfig", + "service": "DCAE_HighlandPark_AgingConfig", + "uuid": "TestUUID", + "Location": " Edge", + }, + "responseAttributes": {}, + "property": None, + }, + ] + + return web.json_response(j) + + +async def wshandler(request): + resp = web.WebSocketResponse() + available = resp.can_prepare(request) + await resp.prepare(request) + await resp.send_str('{ "loadedPolicies": [{ "policyName": "bar"}] }') + await resp.send_bytes(b"bar!!!") + await resp.close("closed") + + +@pytest.fixture +def policyclient(aiohttp_client, loop): + app = web.Application() + app.router.add_route("POST", "/pdp/api/listPolicy", listpolicy) + app.router.add_route("POST", "/pdp/api/getConfig", getconfig) + app.router.add_get("/pdp/notifications", wshandler) + fake_client = loop.run_until_complete(aiohttp_client(app)) + server = "{}://{}:{}".format("http", fake_client.host, fake_client.port) + return PolicyClient({}, server) + + +async def test_listpolicies(policyclient): + j = await policyclient.list_policies(filters=["bar"]) + assert j == set(["hello"]) + await policyclient.close() + assert policyclient.session.closed + + +async def test_getconfig(policyclient): + j = await policyclient.get_config(filters=["bar"]) + + assert j == [ + { + "policyConfigMessage": "Config Retrieved!", + "policyConfigStatus": "CONFIG_RETRIEVED", + "type": "JSON", + "config": { + "service": "DCAE_HighlandPark_AgingConfig", + "location": " Edge", + "uuid": "TestUUID", + "policyName": "DCAE.AGING_UVERS_PROD_Tosca_HP_GOC_Model_cl55973_IT64_testAging", + "configName": "DCAE_HighlandPark_AgingConfig", + "templateVersion": "1607", + "priority": "4", + "version": 11.0, + "policyScope": "resource=Test1,service=vSCP,type=configuration,closedLoopControlName=vSCP_F5_Firewall_d925ed73_7831_4d02_9545_db4e101f88f8", + "riskType": "test", + "riskLevel": "2", + "guard": "False", + "content": { + "signature": { + "filter_clause": "event.faultFields.alarmCondition LIKE('%chrisluckenbaugh%')" + }, + "name": "testAging", + "context": ["PROD"], + "priority": 1, + "prePublishAging": 40, + "preCorrelationAging": 20, + }, + "policyNameWithPrefix": "DCAE.AGING_UVERSE_PSL_Tosca_HP_GOC_Model_cl55973_IT64_testAging", + }, + "policyName": "DCAE.Config_MS_AGING_UVERSE_PROD_Tosca_HP_AGING_Model_cl55973_IT64_testAging.78.xml", + "policyType": "MicroService", + "policyVersion": "78", + "matchingConditions": { + "ECOMPName": "DCAE", + "ONAPName": "DCAE", + "ConfigName": "DCAE_HighlandPark_AgingConfig", + "service": "DCAE_HighlandPark_AgingConfig", + "uuid": "TestUUID", + "Location": " Edge", + }, + "responseAttributes": {}, + "property": None, + }, + { + "policyConfigMessage": "Config Retrieved! ", + "policyConfigStatus": "CONFIG_RETRIEVED", + "type": "JSON", + "config": "adlskjfadslkjf", + "policyName": "DCAE.Config_MS_AGING_UVERSE_PROD_Tosca_HP_AGING_Model_cl55973_IT64_testAging.78.xml", + "policyType": "MicroService", + "policyVersion": "78", + "matchingConditions": { + "ECOMPName": "DCAE", + "ONAPName": "DCAE", + "ConfigName": "DCAE_HighlandPark_AgingConfig", + "service": "DCAE_HighlandPark_AgingConfig", + "uuid": "TestUUID", + "Location": " Edge", + }, + "responseAttributes": {}, + "property": None, + }, + ] + await policyclient.close() + + +async def test_supports_notifications(policyclient): + assert policyclient.supports_notifications() + + +async def test_needs_update(policyclient): + assert policyclient._needs_update( + {"loadedPolicies": [{"policyName": "bar"}]}, [], ["bar"] + ) + assert not policyclient._needs_update( + {"loadedPolicies": [{"policyName": "bar"}]}, [], ["foo"] + ) + + +async def test_ws(policyclient): + async def ws_callback(): + assert True + + await policyclient.notificationhandler(ws_callback, filters=["bar"]) + await policyclient.close() + + assert policyclient.ws_session.closed |