# ============LICENSE_START======================================================= # Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # ============LICENSE_END========================================================= # # ECOMP is a trademark and service mark of AT&T Intellectual Property. """test of the package for policy-handler of DCAE-Controller""" import base64 import copy import json import re import time import uuid import cherrypy from cherrypy.test.helper import CPWebCase import pytest from policyhandler.config import Config from policyhandler.deploy_handler import DeployHandler from policyhandler.discovery import DiscoveryClient from policyhandler.onap.audit import (REQUEST_X_ECOMP_REQUESTID, Audit, AuditHttpCode) from policyhandler.policy_consts import (LATEST_POLICIES, POLICY_BODY, POLICY_CONFIG, POLICY_ID, POLICY_NAME, POLICY_VERSION, POLICY_VERSIONS) from policyhandler.policy_receiver import (LOADED_POLICIES, POLICY_VER, REMOVED_POLICIES, PolicyReceiver) from policyhandler.policy_rest import PolicyRest from policyhandler.policy_utils import PolicyUtils, Utils from policyhandler.web_server import _PolicyWeb from .mock_settings import MonkeyedResponse, Settings Settings.init() class MonkeyPolicyBody(object): """policy body that policy-engine returns""" @staticmethod def create_policy_body(policy_id, policy_version=1): """returns a fake policy-body""" prev_ver = str(policy_version - 1) this_ver = str(policy_version) config = { "policy_updated_from_ver": prev_ver, "policy_updated_to_ver": this_ver, "policy_hello": "world!", "policy_updated_ts": Settings.RUN_TS, "updated_policy_id": policy_id } return { "policyConfigMessage": "Config Retrieved! ", "policyConfigStatus": "CONFIG_RETRIEVED", "type": "JSON", POLICY_NAME: "{0}.{1}.xml".format(policy_id, this_ver), POLICY_VERSION: this_ver, POLICY_CONFIG: json.dumps(config), "matchingConditions": { "ONAPName": "DCAE", "ConfigName": "alex_config_name" }, "responseAttributes": {}, "property": None } class MockPolicyEngine(object): """pretend this is the policy-engine""" scope_prefix = "test_scope_prefix.Config_" LOREM_IPSUM = """Lorem ipsum dolor sit amet consectetur ametist""".split() LONG_TEXT = "0123456789" * 100 _policies = [] @staticmethod def init(): """init static vars""" MockPolicyEngine._policies = [ MonkeyPolicyBody.create_policy_body( MockPolicyEngine.scope_prefix + policy_id, policy_index + 1) for policy_id in MockPolicyEngine.LOREM_IPSUM for policy_index in range(1 + MockPolicyEngine.LOREM_IPSUM.index(policy_id))] Settings.logger.info("MockPolicyEngine._policies: %s", json.dumps(MockPolicyEngine._policies)) @staticmethod def get_config(policy_name): """find policy the way the policy-engine finds""" if not policy_name: return [] return [copy.deepcopy(policy) for policy in MockPolicyEngine._policies if re.match(policy_name, policy[POLICY_NAME])] @staticmethod def get_configs_all(): """get all policies the way the policy-engine finds""" policies = [copy.deepcopy(policy) for policy in MockPolicyEngine._policies] for policy in policies: policy["config"] = MockPolicyEngine.LONG_TEXT return policies @staticmethod def get_policy_id(policy_index): """get the policy_id by index""" return (MockPolicyEngine.scope_prefix + MockPolicyEngine.LOREM_IPSUM[ policy_index % len(MockPolicyEngine.LOREM_IPSUM)]) @staticmethod def gen_policy_latest(policy_index, version_offset=0): """generate the policy response by policy_index = version - 1""" policy_id = MockPolicyEngine.get_policy_id(policy_index) policy = { POLICY_ID: policy_id, POLICY_BODY: MonkeyPolicyBody.create_policy_body( policy_id, policy_index + 1 - version_offset) } return policy_id, PolicyUtils.parse_policy_config(policy) @staticmethod def gen_all_policies_latest(version_offset=0): """generate all latest policies""" return dict(MockPolicyEngine.gen_policy_latest(policy_index, version_offset=version_offset) for policy_index in range(len(MockPolicyEngine.LOREM_IPSUM))) @staticmethod def gen_policies_latest(match_to_policy_name): """generate all latest policies""" return dict((k, v) for k, v in MockPolicyEngine.gen_all_policies_latest().items() if re.match(match_to_policy_name, k)) MockPolicyEngine.init() class MockDeploymentHandler(object): """pretend this is the deployment-handler""" @staticmethod def default_response(): """generate the deployed policies message""" return {"server_instance_uuid": Settings.deploy_handler_instance_uuid} @staticmethod def get_deployed_policies(): """generate the deployed policies message""" response = MockDeploymentHandler.default_response() policies = dict( (policy_id, { POLICY_ID: policy_id, POLICY_VERSIONS: {policy.get(POLICY_BODY, {}).get(POLICY_VERSION, "999"): True}, "pending_update": False}) for policy_id, policy in (MockPolicyEngine.gen_all_policies_latest(version_offset=1) .items())) response["policies"] = policies return response @pytest.fixture() def fix_pdp_post(monkeypatch): """monkeyed request /getConfig to PDP""" def monkeyed_policy_rest_post(full_path, json=None, headers=None, **custom_kwargs): """monkeypatch for the POST to policy-engine""" res_json = MockPolicyEngine.get_config(json.get(POLICY_NAME)) return MonkeyedResponse(full_path, res_json, json, headers) Settings.logger.info("setup fix_pdp_post") PolicyRest._lazy_init() monkeypatch.setattr('policyhandler.policy_rest.PolicyRest._requests_session.post', monkeyed_policy_rest_post) yield fix_pdp_post # provide the fixture value Settings.logger.info("teardown fix_pdp_post") @pytest.fixture() def fix_pdp_post_big(monkeypatch): """monkeyed request /getConfig to PDP""" def monkeyed_policy_rest_post(full_path, json=None, headers=None, **custom_kwargs): """monkeypatch for the POST to policy-engine""" res_json = MockPolicyEngine.get_configs_all() return MonkeyedResponse(full_path, res_json, json, headers) Settings.logger.info("setup fix_pdp_post_big") PolicyRest._lazy_init() monkeypatch.setattr('policyhandler.policy_rest.PolicyRest._requests_session.post', monkeyed_policy_rest_post) yield fix_pdp_post_big # provide the fixture value Settings.logger.info("teardown fix_pdp_post_big") class MockException(Exception): """mock exception""" pass @pytest.fixture() def fix_pdp_post_boom(monkeypatch): """monkeyed request /getConfig to PDP - exception""" def monkeyed_policy_rest_post_boom(full_path, json=None, headers=None, **custom_kwargs): """monkeypatch for the POST to policy-engine""" raise MockException("fix_pdp_post_boom") Settings.logger.info("setup fix_pdp_post_boom") PolicyRest._lazy_init() monkeypatch.setattr('policyhandler.policy_rest.PolicyRest._requests_session.post', monkeyed_policy_rest_post_boom) yield fix_pdp_post_boom Settings.logger.info("teardown fix_pdp_post_boom") @staticmethod def monkeyed_boom(*args, **kwargs): """monkeypatch for the select_latest_policies""" raise MockException("monkeyed_boom") @pytest.fixture() def fix_select_latest_policies_boom(monkeypatch): """monkeyed exception""" Settings.logger.info("setup fix_select_latest_policies_boom") monkeypatch.setattr('policyhandler.policy_utils.PolicyUtils.select_latest_policies', monkeyed_boom) monkeypatch.setattr('policyhandler.policy_utils.PolicyUtils.select_latest_policy', monkeyed_boom) monkeypatch.setattr('policyhandler.policy_utils.PolicyUtils.extract_policy_id', monkeyed_boom) yield fix_select_latest_policies_boom Settings.logger.info("teardown fix_select_latest_policies_boom") @pytest.fixture() def fix_discovery(monkeypatch): """monkeyed discovery request.get""" def monkeyed_discovery(full_path): """monkeypatch for get from consul""" res_json = {} if full_path == DiscoveryClient.CONSUL_SERVICE_MASK.format( Config.discovered_config.get_by_key(Config.DEPLOY_HANDLER)): res_json = [{ "ServiceAddress": "1.1.1.1", "ServicePort": "123" }] elif full_path == DiscoveryClient.CONSUL_KV_MASK.format(Config.system_name): res_json = [{"Value": base64.b64encode( json.dumps(Settings.mock_config).encode()).decode("utf-8")}] return MonkeyedResponse(full_path, res_json) Settings.logger.info("setup fix_discovery") monkeypatch.setattr('policyhandler.discovery.requests.get', monkeyed_discovery) yield fix_discovery # provide the fixture value Settings.logger.info("teardown fix_discovery") @pytest.fixture() def fix_deploy_handler(monkeypatch): """monkeyed requests to deployment-handler""" def monkeyed_deploy_handler_put(full_path, json=None, headers=None, params=None, **custom_kwargs): """monkeypatch for policy-update request.put to deploy_handler""" return MonkeyedResponse(full_path, MockDeploymentHandler.default_response(), json, headers) def monkeyed_deploy_handler_get(full_path, headers=None, params=None, **custom_kwargs): """monkeypatch policy-update request.get to deploy_handler""" return MonkeyedResponse(full_path, MockDeploymentHandler.get_deployed_policies(), None, headers) Settings.logger.info("setup fix_deploy_handler") audit = Audit(req_message="fix_deploy_handler") DeployHandler._lazy_init(audit) monkeypatch.setattr('policyhandler.deploy_handler.DeployHandler._requests_session.put', monkeyed_deploy_handler_put) monkeypatch.setattr('policyhandler.deploy_handler.DeployHandler._requests_session.get', monkeyed_deploy_handler_get) yield fix_deploy_handler # provide the fixture value audit.audit_done("teardown") Settings.logger.info("teardown fix_deploy_handler") @pytest.fixture() def fix_deploy_handler_fail(monkeypatch): """monkeyed failed discovery request.get""" def monkeyed_deploy_handler_put(full_path, json=None, headers=None, params=None, **custom_kwargs): """monkeypatch for deploy_handler""" res = MonkeyedResponse( full_path, {"server_instance_uuid": Settings.deploy_handler_instance_uuid}, json, headers ) res.status_code = 413 return res def monkeyed_deploy_handler_get(full_path, headers=None, params=None, **custom_kwargs): """monkeypatch policy-update request.get to deploy_handler""" return MonkeyedResponse(full_path, MockDeploymentHandler.default_response(), None, headers) @staticmethod def monkeyed_deploy_handler_init(audit_ignore): """monkeypatch for deploy_handler init""" DeployHandler._url = None Settings.logger.info("setup fix_deploy_handler_fail") config_catch_up = Config.discovered_config.get_by_key("catch_up") Config.discovered_config.update("catch_up", {"interval": 1}) audit = Audit(req_message="fix_deploy_handler_fail") DeployHandler._lazy_init(audit) monkeypatch.setattr('policyhandler.deploy_handler.DeployHandler._lazy_init', monkeyed_deploy_handler_init) monkeypatch.setattr('policyhandler.deploy_handler.DeployHandler._requests_session.put', monkeyed_deploy_handler_put) monkeypatch.setattr('policyhandler.deploy_handler.DeployHandler._requests_session.get', monkeyed_deploy_handler_get) yield fix_deploy_handler_fail audit.audit_done("teardown") Settings.logger.info("teardown fix_deploy_handler_fail") Config.discovered_config.update("catch_up", config_catch_up) @pytest.fixture() def fix_cherrypy_engine_exit(monkeypatch): """monkeyed cherrypy.engine.exit()""" Settings.logger.info("setup fix_cherrypy_engine_exit") def monkeyed_cherrypy_engine_exit(): """monkeypatch for deploy_handler""" Settings.logger.info("cherrypy_engine_exit()") monkeypatch.setattr('policyhandler.web_server.cherrypy.engine.exit', monkeyed_cherrypy_engine_exit) yield fix_cherrypy_engine_exit # provide the fixture value Settings.logger.info("teardown fix_cherrypy_engine_exit") class MonkeyedWebSocket(object): """Monkey websocket""" on_message = None @staticmethod def send_notification(updated_indexes): """fake notification through the web-socket""" if not MonkeyedWebSocket.on_message: return message = { LOADED_POLICIES: [ {POLICY_NAME: "{0}.{1}.xml".format( MockPolicyEngine.get_policy_id(policy_index), policy_index + 1), POLICY_VER: str(policy_index + 1)} for policy_index in updated_indexes or [] ], REMOVED_POLICIES : [] } message = json.dumps(message) Settings.logger.info("send_notification: %s", message) MonkeyedWebSocket.on_message(None, message) @staticmethod def enableTrace(yes_no): """ignore""" pass class MonkeyedSocket(object): """Monkey websocket""" def __init__(self): self.connected = True class WebSocketApp(object): """Monkeyed WebSocketApp""" def __init__(self, web_socket_url, on_message=None, on_close=None, on_error=None): self.web_socket_url = web_socket_url self.on_message = MonkeyedWebSocket.on_message = on_message self.on_close = on_close self.on_error = on_error self.sock = MonkeyedWebSocket.MonkeyedSocket() Settings.logger.info("MonkeyedWebSocket for: %s", self.web_socket_url) def run_forever(self): """forever in the loop""" counter = 0 while self.sock.connected: counter += 1 Settings.logger.info("MonkeyedWebSocket sleep %s...", counter) time.sleep(5) Settings.logger.info("MonkeyedWebSocket exit %s", counter) def close(self): """close socket""" self.sock.connected = False @pytest.fixture() def fix_policy_receiver_websocket(monkeypatch): """monkeyed websocket for policy_receiver""" Settings.logger.info("setup fix_policy_receiver_websocket") monkeypatch.setattr('policyhandler.policy_receiver.websocket', MonkeyedWebSocket) yield fix_policy_receiver_websocket # provide the fixture value Settings.logger.info("teardown fix_policy_receiver_websocket") def test_get_policy_latest(fix_pdp_post, fix_discovery): """test /policy_latest/""" policy_id, expected_policy = MockPolicyEngine.gen_policy_latest(3) audit = Audit(job_name="test_get_policy_latest", req_message="get /policy_latest/{0}".format(policy_id or "")) policy_latest = PolicyRest.get_latest_policy((audit, policy_id, None, None)) or {} audit.audit_done(result=json.dumps(policy_latest)) Settings.logger.info("expected_policy: %s", json.dumps(expected_policy)) Settings.logger.info("policy_latest: %s", json.dumps(policy_latest)) assert Utils.are_the_same(policy_latest, expected_policy) @pytest.mark.usefixtures("fix_pdp_post", "fix_discovery") class WebServerTest(CPWebCase): """testing the web-server - runs tests in alphabetical order of method names""" def setup_server(): """setup the web-server""" cherrypy.tree.mount(_PolicyWeb(), '/') setup_server = staticmethod(setup_server) def test_web_healthcheck(self): """test /healthcheck""" result = self.getPage("/healthcheck") Settings.logger.info("healthcheck result: %s", result) Settings.logger.info("got healthcheck: %s", self.body) self.assertStatus('200 OK') def test_web_policy_latest(self): """test /policy_latest/""" policy_id, expected_policy = MockPolicyEngine.gen_policy_latest(3) self.getPage("/policy_latest/{0}".format(policy_id or "")) self.assertStatus('200 OK') policy_latest = json.loads(self.body) Settings.logger.info("policy_latest: %s", self.body) Settings.logger.info("expected_policy: %s", json.dumps(expected_policy)) assert Utils.are_the_same(policy_latest, expected_policy) result = self.getPage("/healthcheck") Settings.logger.info("healthcheck result: %s", result) @pytest.mark.usefixtures("fix_deploy_handler") def test_web_all_policies_latest(self): """test GET /policies_latest""" expected_policies = MockPolicyEngine.gen_all_policies_latest() result = self.getPage("/policies_latest") Settings.logger.info("result: %s", result) Settings.logger.info("body: %s", self.body) self.assertStatus('200 OK') policies_latest = json.loads(self.body) self.assertIn(LATEST_POLICIES, policies_latest) policies_latest = policies_latest[LATEST_POLICIES] Settings.logger.info("policies_latest: %s", json.dumps(policies_latest)) Settings.logger.info("expected_policies: %s", json.dumps(expected_policies)) assert Utils.are_the_same(policies_latest, expected_policies) result = self.getPage("/healthcheck") Settings.logger.info("healthcheck result: %s", result) def test_web_policies_latest(self): """test POST /policies_latest with policyName""" match_to_policy_name = MockPolicyEngine.scope_prefix + "amet.*" expected_policies = MockPolicyEngine.gen_policies_latest(match_to_policy_name) body = json.dumps({POLICY_NAME: match_to_policy_name}) result = self.getPage("/policies_latest", method='POST', body=body, headers=[ (REQUEST_X_ECOMP_REQUESTID, str(uuid.uuid4())), ("Content-Type", "application/json"), ('Content-Length', str(len(body))) ]) Settings.logger.info("result: %s", result) Settings.logger.info("body: %s", self.body) self.assertStatus('200 OK') policies_latest = json.loads(self.body)[LATEST_POLICIES] Settings.logger.info("policies_latest: %s", json.dumps(policies_latest)) Settings.logger.info("expected_policies: %s", json.dumps(expected_policies)) assert Utils.are_the_same(policies_latest, expected_policies) result = self.getPage("/healthcheck") Settings.logger.info("healthcheck result: %s", result) @pytest.mark.usefixtures("fix_deploy_handler", "fix_policy_receiver_websocket") def test_zzz_policy_updates_and_catch_ups(self): """test run policy handler with policy updates and catchups""" Settings.logger.info("start policy_updates_and_catch_ups") audit = Audit(job_name="test_zzz_policy_updates_and_catch_ups", req_message="start policy_updates_and_catch_ups") PolicyReceiver.run(audit) Settings.logger.info("sleep before send_notification...") time.sleep(2) MonkeyedWebSocket.send_notification([1, 3, 5]) Settings.logger.info("sleep after send_notification...") time.sleep(3) Settings.logger.info("sleep 30 before shutdown...") time.sleep(30) result = self.getPage("/healthcheck") Settings.logger.info("healthcheck result: %s", result) PolicyReceiver.shutdown(audit) time.sleep(1) @pytest.mark.usefixtures("fix_deploy_handler", "fix_policy_receiver_websocket") def test_zzz_catch_up_on_deploy_handler_changed(self): """test run policy handler with deployment-handler changed underneath""" Settings.logger.info("start zzz_catch_up_on_deploy_handler_changed") audit = Audit(job_name="test_zzz_catch_up_on_deploy_handler_changed", req_message="start zzz_catch_up_on_deploy_handler_changed") PolicyReceiver.run(audit) Settings.logger.info("sleep before send_notification...") time.sleep(2) MonkeyedWebSocket.send_notification([1]) Settings.logger.info("sleep after send_notification...") time.sleep(3) Settings.deploy_handler_instance_uuid = str(uuid.uuid4()) Settings.logger.info("new deploy-handler uuid=%s", Settings.deploy_handler_instance_uuid) MonkeyedWebSocket.send_notification([2, 4]) Settings.logger.info("sleep after send_notification...") time.sleep(3) Settings.logger.info("sleep 5 before shutdown...") time.sleep(5) result = self.getPage("/healthcheck") Settings.logger.info("healthcheck result: %s", result) PolicyReceiver.shutdown(audit) time.sleep(1) @pytest.mark.usefixtures("fix_deploy_handler", "fix_policy_receiver_websocket") def test_zzz_get_catch_up(self): """test /catch_up""" Settings.logger.info("start /catch_up") audit = Audit(job_name="test_zzz_get_catch_up", req_message="start /catch_up") PolicyReceiver.run(audit) time.sleep(5) result = self.getPage("/catch_up") Settings.logger.info("catch_up result: %s", result) self.assertStatus('200 OK') Settings.logger.info("got catch_up: %s", self.body) Settings.logger.info("sleep 5 before shutdown...") time.sleep(5) result = self.getPage("/healthcheck") Settings.logger.info("healthcheck result: %s", result) PolicyReceiver.shutdown(audit) time.sleep(1) @pytest.mark.usefixtures( "fix_deploy_handler", "fix_policy_receiver_websocket", "fix_cherrypy_engine_exit") def test_zzzzz_shutdown(self): """test shutdown""" Settings.logger.info("start shutdown") audit = Audit(job_name="test_zzzzz_shutdown", req_message="start shutdown") PolicyReceiver.run(audit) Settings.logger.info("sleep before send_notification...") time.sleep(2) MonkeyedWebSocket.send_notification([1, 3, 5]) Settings.logger.info("sleep after send_notification...") time.sleep(3) result = self.getPage("/healthcheck") Settings.logger.info("healthcheck result: %s", result) WebServerTest.do_gc_test = False Settings.logger.info("shutdown...") audit.audit_done("shutdown") result = self.getPage("/shutdown") Settings.logger.info("shutdown result: %s", result) self.assertStatus('200 OK') Settings.logger.info("got shutdown: %s", self.body) time.sleep(1) @pytest.mark.usefixtures("fix_pdp_post_boom", "fix_discovery") class WebServerPDPBoomTest(CPWebCase): """testing the web-server - runs tests in alphabetical order of method names""" def setup_server(): """setup the web-server""" cherrypy.tree.mount(_PolicyWeb(), '/') setup_server = staticmethod(setup_server) def test_web_healthcheck(self): """test /healthcheck""" result = self.getPage("/healthcheck") Settings.logger.info("healthcheck result: %s", result) Settings.logger.info("got healthcheck: %s", self.body) self.assertStatus('200 OK') def test_web_policy_latest(self): """test /policy_latest/""" policy_id, _ = MockPolicyEngine.gen_policy_latest(3) self.getPage("/policy_latest/{0}".format(policy_id or "")) self.assertStatus(AuditHttpCode.SERVER_INTERNAL_ERROR.value) result = self.getPage("/healthcheck") Settings.logger.info("healthcheck result: %s", result) @pytest.mark.usefixtures("fix_deploy_handler") def test_web_all_policies_latest(self): """test GET /policies_latest""" result = self.getPage("/policies_latest") Settings.logger.info("result: %s", result) Settings.logger.info("body: %s", self.body) self.assertStatus(AuditHttpCode.SERVER_INTERNAL_ERROR.value) result = self.getPage("/healthcheck") Settings.logger.info("healthcheck result: %s", result) def test_web_policies_latest(self): """test POST /policies_latest with policyName""" match_to_policy_name = MockPolicyEngine.scope_prefix + "amet.*" body = json.dumps({POLICY_NAME: match_to_policy_name}) result = self.getPage("/policies_latest", method='POST', body=body, headers=[ (REQUEST_X_ECOMP_REQUESTID, str(uuid.uuid4())), ("Content-Type", "application/json"), ('Content-Length', str(len(body))) ]) Settings.logger.info("result: %s", result) Settings.logger.info("body: %s", self.body) self.assertStatus(AuditHttpCode.SERVER_INTERNAL_ERROR.value) result = self.getPage("/healthcheck") Settings.logger.info("healthcheck result: %s", result) @pytest.mark.usefixtures("fix_deploy_handler", "fix_policy_receiver_websocket") def test_zzz_policy_updates_and_catch_ups(self): """test run policy handler with policy updates and catchups""" Settings.logger.info("start policy_updates_and_catch_ups") audit = Audit(job_name="test_zzz_policy_updates_and_catch_ups", req_message="start policy_updates_and_catch_ups") PolicyReceiver.run(audit) Settings.logger.info("sleep before send_notification...") time.sleep(2) MonkeyedWebSocket.send_notification([1, 3, 5]) Settings.logger.info("sleep after send_notification...") time.sleep(3) Settings.logger.info("sleep 30 before shutdown...") time.sleep(30) result = self.getPage("/healthcheck") Settings.logger.info("healthcheck result: %s", result) PolicyReceiver.shutdown(audit) time.sleep(1) @pytest.mark.usefixtures("fix_deploy_handler", "fix_policy_receiver_websocket") def test_zzz_catch_up_on_deploy_handler_changed(self): """test run policy handler with deployment-handler changed underneath""" Settings.logger.info("start zzz_catch_up_on_deploy_handler_changed") audit = Audit(job_name="test_zzz_catch_up_on_deploy_handler_changed", req_message="start zzz_catch_up_on_deploy_handler_changed") PolicyReceiver.run(audit) Settings.logger.info("sleep before send_notification...") time.sleep(2) MonkeyedWebSocket.send_notification([1]) Settings.logger.info("sleep after send_notification...") time.sleep(3) Settings.deploy_handler_instance_uuid = str(uuid.uuid4()) Settings.logger.info("new deploy-handler uuid=%s", Settings.deploy_handler_instance_uuid) MonkeyedWebSocket.send_notification([2, 4]) Settings.logger.info("sleep after send_notification...") time.sleep(3) Settings.logger.info("sleep 5 before shutdown...") time.sleep(5) result = self.getPage("/healthcheck") Settings.logger.info("healthcheck result: %s", result) PolicyReceiver.shutdown(audit) time.sleep(1) @pytest.mark.usefixtures("fix_deploy_handler", "fix_policy_receiver_websocket") def test_zzz_get_catch_up(self): """test /catch_up""" Settings.logger.info("start /catch_up") audit = Audit(job_name="test_zzz_get_catch_up", req_message="start /catch_up") PolicyReceiver.run(audit) time.sleep(5) result = self.getPage("/catch_up") Settings.logger.info("catch_up result: %s", result) self.assertStatus('200 OK') Settings.logger.info("got catch_up: %s", self.body) Settings.logger.info("sleep 5 before shutdown...") time.sleep(5) result = self.getPage("/healthcheck") Settings.logger.info("healthcheck result: %s", result) PolicyReceiver.shutdown(audit) time.sleep(1) @pytest.mark.usefixtures( "fix_deploy_handler", "fix_policy_receiver_websocket", "fix_cherrypy_engine_exit") def test_zzzzz_shutdown(self): """test shutdown""" Settings.logger.info("start shutdown") audit = Audit(job_name="test_zzzzz_shutdown", req_message="start shutdown") PolicyReceiver.run(audit) Settings.logger.info("sleep before send_notification...") time.sleep(2) MonkeyedWebSocket.send_notification([1, 3, 5]) Settings.logger.info("sleep after send_notification...") time.sleep(3) result = self.getPage("/healthcheck") Settings.logger.info("healthcheck result: %s", result) WebServerPDPBoomTest.do_gc_test = False Settings.logger.info("shutdown...") audit.audit_done("shutdown") result = self.getPage("/shutdown") Settings.logger.info("shutdown result: %s", result) self.assertStatus('200 OK') Settings.logger.info("got shutdown: %s", self.body) time.sleep(1) @pytest.mark.usefixtures("fix_pdp_post", "fix_select_latest_policies_boom", "fix_discovery") class WebServerInternalBoomTest(CPWebCase): """testing the web-server - runs tests in alphabetical order of method names""" def setup_server(): """setup the web-server""" cherrypy.tree.mount(_PolicyWeb(), '/') setup_server = staticmethod(setup_server) def test_web_healthcheck(self): """test /healthcheck""" result = self.getPage("/healthcheck") Settings.logger.info("healthcheck result: %s", result) Settings.logger.info("got healthcheck: %s", self.body) self.assertStatus('200 OK') def test_web_policy_latest(self): """test /policy_latest/""" policy_id, _ = MockPolicyEngine.gen_policy_latest(3) self.getPage("/policy_latest/{0}".format(policy_id or "")) self.assertStatus(AuditHttpCode.SERVER_INTERNAL_ERROR.value) result = self.getPage("/healthcheck") Settings.logger.info("healthcheck result: %s", result) @pytest.mark.usefixtures("fix_deploy_handler") def test_web_all_policies_latest(self): """test GET /policies_latest""" result = self.getPage("/policies_latest") Settings.logger.info("result: %s", result) Settings.logger.info("body: %s", self.body) self.assertStatus(AuditHttpCode.SERVER_INTERNAL_ERROR.value) result = self.getPage("/healthcheck") Settings.logger.info("healthcheck result: %s", result) def test_web_policies_latest(self): """test POST /policies_latest with policyName""" match_to_policy_name = MockPolicyEngine.scope_prefix + "amet.*" body = json.dumps({POLICY_NAME: match_to_policy_name}) result = self.getPage("/policies_latest", method='POST', body=body, headers=[ (REQUEST_X_ECOMP_REQUESTID, str(uuid.uuid4())), ("Content-Type", "application/json"), ('Content-Length', str(len(body))) ]) Settings.logger.info("result: %s", result) Settings.logger.info("body: %s", self.body) self.assertStatus(AuditHttpCode.SERVER_INTERNAL_ERROR.value) result = self.getPage("/healthcheck") Settings.logger.info("healthcheck result: %s", result) @pytest.mark.usefixtures("fix_deploy_handler", "fix_policy_receiver_websocket") def test_zzz_policy_updates_and_catch_ups(self): """test run policy handler with policy updates and catchups""" Settings.logger.info("start policy_updates_and_catch_ups") audit = Audit(job_name="test_zzz_policy_updates_and_catch_ups", req_message="start policy_updates_and_catch_ups") PolicyReceiver.run(audit) Settings.logger.info("sleep before send_notification...") time.sleep(2) MonkeyedWebSocket.send_notification([1, 3, 5]) Settings.logger.info("sleep after send_notification...") time.sleep(3) Settings.logger.info("sleep 30 before shutdown...") time.sleep(30) result = self.getPage("/healthcheck") Settings.logger.info("healthcheck result: %s", result) PolicyReceiver.shutdown(audit) time.sleep(1) @pytest.mark.usefixtures("fix_deploy_handler", "fix_policy_receiver_websocket") def test_zzz_catch_up_on_deploy_handler_changed(self): """test run policy handler with deployment-handler changed underneath""" Settings.logger.info("start zzz_catch_up_on_deploy_handler_changed") audit = Audit(job_name="test_zzz_catch_up_on_deploy_handler_changed", req_message="start zzz_catch_up_on_deploy_handler_changed") PolicyReceiver.run(audit) Settings.logger.info("sleep before send_notification...") time.sleep(2) MonkeyedWebSocket.send_notification([1]) Settings.logger.info("sleep after send_notification...") time.sleep(3) Settings.deploy_handler_instance_uuid = str(uuid.uuid4()) Settings.logger.info("new deploy-handler uuid=%s", Settings.deploy_handler_instance_uuid) MonkeyedWebSocket.send_notification([2, 4]) Settings.logger.info("sleep after send_notification...") time.sleep(3) Settings.logger.info("sleep 5 before shutdown...") time.sleep(5) result = self.getPage("/healthcheck") Settings.logger.info("healthcheck result: %s", result) PolicyReceiver.shutdown(audit) time.sleep(1) @pytest.mark.usefixtures("fix_deploy_handler", "fix_policy_receiver_websocket") def test_zzz_get_catch_up(self): """test /catch_up""" Settings.logger.info("start /catch_up") audit = Audit(job_name="test_zzz_get_catch_up", req_message="start /catch_up") PolicyReceiver.run(audit) time.sleep(5) result = self.getPage("/catch_up") Settings.logger.info("catch_up result: %s", result) self.assertStatus('200 OK') Settings.logger.info("got catch_up: %s", self.body) Settings.logger.info("sleep 5 before shutdown...") time.sleep(5) result = self.getPage("/healthcheck") Settings.logger.info("healthcheck result: %s", result) PolicyReceiver.shutdown(audit) time.sleep(1) @pytest.mark.usefixtures( "fix_deploy_handler", "fix_policy_receiver_websocket", "fix_cherrypy_engine_exit") def test_zzzzz_shutdown(self): """test shutdown""" Settings.logger.info("start shutdown") audit = Audit(job_name="test_zzzzz_shutdown", req_message="start shutdown") PolicyReceiver.run(audit) Settings.logger.info("sleep before send_notification...") time.sleep(2) MonkeyedWebSocket.send_notification([1, 3, 5]) Settings.logger.info("sleep after send_notification...") time.sleep(3) result = self.getPage("/healthcheck") Settings.logger.info("healthcheck result: %s", result) WebServerInternalBoomTest.do_gc_test = False Settings.logger.info("shutdown...") audit.audit_done("shutdown") result = self.getPage("/shutdown") Settings.logger.info("shutdown result: %s", result) self.assertStatus('200 OK') Settings.logger.info("got shutdown: %s", self.body) time.sleep(1) @pytest.mark.usefixtures( "fix_pdp_post_big", "fix_deploy_handler_fail", "fix_policy_receiver_websocket", "fix_discovery" ) def test_catch_ups_failed_dh(): """test run policy handler with catchups and failed deployment-handler""" Settings.logger.info("start test_catch_ups_failed_dh") audit = Audit(job_name="test_catch_ups_failed_dh", req_message="start test_catch_ups_failed_dh") PolicyReceiver.run(audit) Settings.logger.info("sleep 50 before shutdown...") time.sleep(50) health = audit.health(full=True) audit.audit_done(result=json.dumps(health)) Settings.logger.info("healthcheck: %s", json.dumps(health)) assert bool(health) PolicyReceiver.shutdown(audit) time.sleep(1) health = audit.health(full=True) Settings.logger.info("healthcheck: %s", json.dumps(health))