diff options
Diffstat (limited to 'tests')
-rw-r--r-- | tests/__init__.py | 21 | ||||
-rw-r--r-- | tests/mock_settings.py | 59 | ||||
-rw-r--r-- | tests/test_policyhandler.py | 962 | ||||
-rw-r--r-- | tests/test_step_timer.py | 209 | ||||
-rw-r--r-- | tests/test_zzz_memory.py | 117 |
5 files changed, 1275 insertions, 93 deletions
diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..1875bf6 --- /dev/null +++ b/tests/__init__.py @@ -0,0 +1,21 @@ +# ================================================================================ +# Copyright (c) 2018 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. + + +# empty __init__.py so that pytest can add correct path to coverage report, -- per pytest +# best practice guideline diff --git a/tests/mock_settings.py b/tests/mock_settings.py new file mode 100644 index 0000000..28bc6cc --- /dev/null +++ b/tests/mock_settings.py @@ -0,0 +1,59 @@ +# ============LICENSE_START======================================================= +# Copyright (c) 2018 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. +"""settings that are general to all tests""" + +import json +import logging +import sys +import uuid +from datetime import datetime + +from policyhandler import LogWriter +from policyhandler.config import Config +from policyhandler.onap.audit import Audit + + +class Settings(object): + """init all locals""" + logger = None + RUN_TS = datetime.utcnow().isoformat()[:-3] + 'Z' + dicovered_config = None + deploy_handler_instance_uuid = str(uuid.uuid4()) + + @staticmethod + def init(): + """init configs""" + Config.load_from_file() + + with open("etc_upload/config.json", 'r') as config_json: + Settings.dicovered_config = json.load(config_json) + + Config.load_from_file("etc_upload/config.json") + + Config.settings["catch_up"] = {"interval": 10, "max_skips": 2} + + Settings.logger = logging.getLogger("policy_handler.unit_test") + sys.stdout = LogWriter(Settings.logger.info) + sys.stderr = LogWriter(Settings.logger.error) + + print("print ========== run_policy_handler ==========") + Settings.logger.info("========== run_policy_handler ==========") + Audit.init(Config.get_system_name(), Config.LOGGER_CONFIG_FILE_PATH) + + Settings.logger.info("starting policy_handler with config:") + Settings.logger.info(Audit.log_json_dumps(Config.settings)) diff --git a/tests/test_policyhandler.py b/tests/test_policyhandler.py index 61ca00c..ec0b030 100644 --- a/tests/test_policyhandler.py +++ b/tests/test_policyhandler.py @@ -1,7 +1,5 @@ # ============LICENSE_START======================================================= -# org.onap.dcae -# ================================================================================ -# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -18,45 +16,84 @@ # # ECOMP is a trademark and service mark of AT&T Intellectual Property. -import sys +"""test of the package for policy-handler of DCAE-Controller""" + +import copy import json import re -import logging -from datetime import datetime +import time +import uuid -# import pytest +import pytest + +import cherrypy +from cherrypy.test.helper import CPWebCase from policyhandler.config import Config -from policyhandler.policy_handler import LogWriter -from policyhandler.onap.audit import Audit -from policyhandler.policy_rest import PolicyRest, PolicyUtils -from policyhandler.policy_consts import POLICY_ID, POLICY_VERSION, POLICY_NAME, \ - POLICY_BODY, POLICY_CONFIG +from policyhandler.deploy_handler import DeployHandler +from policyhandler.discovery import DiscoveryClient +from policyhandler.onap.audit import (REQUEST_X_ECOMP_REQUESTID, Audit, + AuditHttpCode) +from policyhandler.policy_consts import (ERRORED_POLICIES, ERRORED_SCOPES, + LATEST_POLICIES, POLICY_BODY, + POLICY_CONFIG, POLICY_ID, POLICY_NAME, + POLICY_VERSION, SCOPE_PREFIXES) +from policyhandler.policy_receiver import (LOADED_POLICIES, POLICY_VER, + REMOVED_POLICIES, PolicyReceiver) +from policyhandler.policy_rest import PolicyRest +from policyhandler.policy_utils import PolicyUtils, Utils +from policyhandler.web_server import _PolicyWeb -class Settings(object): - """init all locals""" - logger = None - RUN_TS = datetime.utcnow().isoformat()[:-3] + 'Z' +from .mock_settings import Settings - @staticmethod - def init(): - """init locals""" - Config.load_from_file() - Config.load_from_file("etc_upload/config.json") +Settings.init() - Settings.logger = logging.getLogger("policy_handler") - sys.stdout = LogWriter(Settings.logger.info) - sys.stderr = LogWriter(Settings.logger.error) +class MonkeyHttpResponse(object): + """Monkey http reposne""" + def __init__(self, headers): + self.headers = headers or {} - Settings.logger.info("========== run_policy_handler ==========") - Audit.init(Config.get_system_name(), Config.LOGGER_CONFIG_FILE_PATH) - Settings.logger.info("starting policy_handler with config:") - Settings.logger.info(Audit.log_json_dumps(Config.config)) +class MonkeyedResponse(object): + """Monkey response""" + def __init__(self, full_path, res_json, json_body=None, headers=None): + self.full_path = full_path + self.req_json = json_body or {} + self.status_code = 200 + self.request = MonkeyHttpResponse(headers) + self.res = res_json + self.text = json.dumps(self.res) - PolicyRest._lazy_init() + def json(self): + """returns json of response""" + return self.res + + def raise_for_status(self): + """ignoring""" + pass + + +def monkeyed_discovery(full_path): + """monkeypatch for get from consul""" + res_json = {} + if full_path == DiscoveryClient.CONSUL_SERVICE_MASK.format(Config.settings["deploy_handler"]): + res_json = [{ + "ServiceAddress": "1.1.1.1", + "ServicePort": "123" + }] + elif full_path == DiscoveryClient.CONSUL_KV_MASK.format(Config.get_system_name()): + res_json = copy.deepcopy(Settings.dicovered_config) + return MonkeyedResponse(full_path, res_json) + + +@pytest.fixture() +def fix_discovery(monkeypatch): + """monkeyed discovery request.get""" + Settings.logger.info("setup fix_discovery") + monkeypatch.setattr('policyhandler.discovery.requests.get', monkeyed_discovery) + yield fix_discovery # provide the fixture value + Settings.logger.info("teardown fix_discovery") -Settings.init() class MonkeyPolicyBody(object): """policy body that policy-engine returns""" @@ -80,34 +117,19 @@ class MonkeyPolicyBody(object): POLICY_VERSION: this_ver, POLICY_CONFIG: json.dumps(config), "matchingConditions": { - "ECOMPName": "DCAE", + "ONAPName": "DCAE", "ConfigName": "alex_config_name" }, "responseAttributes": {}, "property": None } - @staticmethod - def is_the_same_dict(policy_body_1, policy_body_2): - """check whether both policy_body objects are the same""" - if not isinstance(policy_body_1, dict) or not isinstance(policy_body_2, dict): - return False - for key in policy_body_1.keys(): - if key not in policy_body_2: - return False - if isinstance(policy_body_1[key], dict): - return MonkeyPolicyBody.is_the_same_dict( - policy_body_1[key], policy_body_2[key]) - if (policy_body_1[key] is None and policy_body_2[key] is not None) \ - or (policy_body_1[key] is not None and policy_body_2[key] is None) \ - or (policy_body_1[key] != policy_body_2[key]): - return False - return True class MonkeyPolicyEngine(object): """pretend this is the policy-engine""" - _scope_prefix = Config.config["scope_prefixes"][0] - LOREM_IPSUM = """Lorem ipsum dolor sit amet consectetur""".split() + _scope_prefix = Config.settings["scope_prefixes"][0] + LOREM_IPSUM = """Lorem ipsum dolor sit amet consectetur ametist""".split() + LONG_TEXT = "0123456789" * 100 _policies = [] @staticmethod @@ -115,69 +137,823 @@ class MonkeyPolicyEngine(object): """init static vars""" MonkeyPolicyEngine._policies = [ MonkeyPolicyBody.create_policy_body( - MonkeyPolicyEngine._scope_prefix + policy_id, policy_version) + MonkeyPolicyEngine._scope_prefix + policy_id, policy_index + 1) for policy_id in MonkeyPolicyEngine.LOREM_IPSUM - for policy_version in range(1, 1 + MonkeyPolicyEngine.LOREM_IPSUM.index(policy_id))] + for policy_index in range(1 + MonkeyPolicyEngine.LOREM_IPSUM.index(policy_id))] + Settings.logger.info("MonkeyPolicyEngine._policies: %s", + json.dumps(MonkeyPolicyEngine._policies)) @staticmethod def get_config(policy_name): """find policy the way the policy-engine finds""" if not policy_name: return [] - if policy_name[-2:] == ".*": - policy_name = policy_name[:-2] - return [policy for policy in MonkeyPolicyEngine._policies + return [copy.deepcopy(policy) + for policy in MonkeyPolicyEngine._policies if re.match(policy_name, policy[POLICY_NAME])] @staticmethod + def get_configs_all(): + """get all policies the way the policy-engine finds""" + policies = [copy.deepcopy(policy) + for policy in MonkeyPolicyEngine._policies] + for policy in policies: + policy["config"] = MonkeyPolicyEngine.LONG_TEXT + return policies + + @staticmethod def get_policy_id(policy_index): """get the policy_id by index""" - return MonkeyPolicyEngine._scope_prefix \ - + MonkeyPolicyEngine.LOREM_IPSUM[policy_index % len(MonkeyPolicyEngine.LOREM_IPSUM)] + return (MonkeyPolicyEngine._scope_prefix + + MonkeyPolicyEngine.LOREM_IPSUM[ + policy_index % len(MonkeyPolicyEngine.LOREM_IPSUM)]) + + @staticmethod + def gen_policy_latest(policy_index): + """generate the policy response by policy_index = version - 1""" + policy_id = MonkeyPolicyEngine.get_policy_id(policy_index) + expected_policy = { + POLICY_ID: policy_id, + POLICY_BODY: MonkeyPolicyBody.create_policy_body(policy_id, policy_index + 1) + } + return policy_id, PolicyUtils.parse_policy_config(expected_policy) + + @staticmethod + def gen_all_policies_latest(): + """generate all latest policies""" + return { + LATEST_POLICIES: dict(MonkeyPolicyEngine.gen_policy_latest(policy_index) + for policy_index in range(len(MonkeyPolicyEngine.LOREM_IPSUM))), + ERRORED_SCOPES: ["DCAE.Config_*"], + SCOPE_PREFIXES: ["DCAE.Config_*"], + ERRORED_POLICIES: {} + } + + @staticmethod + def gen_policies_latest(match_to_policy_name): + """generate all latest policies""" + return { + LATEST_POLICIES: + dict((k, v) + for k, v in MonkeyPolicyEngine.gen_all_policies_latest() + [LATEST_POLICIES].items() + if re.match(match_to_policy_name, k)), + ERRORED_SCOPES: [], + ERRORED_POLICIES: {} + } + MonkeyPolicyEngine.init() -class MonkeyHttpResponse(object): - """Monkey http reposne""" - def __init__(self, headers): - self.headers = headers or {} -class MonkeyedResponse(object): - """Monkey response""" - def __init__(self, full_path, json_body, headers): - self.full_path = full_path - self.req_json = json_body or {} - self.status_code = 200 - self.request = MonkeyHttpResponse(headers) - self.req_policy_name = self.req_json.get(POLICY_NAME) - self.res = MonkeyPolicyEngine.get_config(self.req_policy_name) - self.text = json.dumps(self.res) +@pytest.fixture() +def fix_pdp_post(monkeypatch): + """monkeyed request /getConfig to PDP""" + def monkeyed_policy_rest_post(full_path, json=None, headers=None): + """monkeypatch for the POST to policy-engine""" + res_json = MonkeyPolicyEngine.get_config(json.get(POLICY_NAME)) + return MonkeyedResponse(full_path, res_json, json, headers) + + Settings.logger.info("setup fix_pdp_post") + PolicyRest._lazy_init() + monkeypatch.setattr('policyhandler.policy_rest.PolicyRest._requests_session.post', + monkeyed_policy_rest_post) + yield fix_pdp_post # provide the fixture value + Settings.logger.info("teardown fix_pdp_post") + + +@pytest.fixture() +def fix_pdp_post_big(monkeypatch): + """monkeyed request /getConfig to PDP""" + def monkeyed_policy_rest_post(full_path, json=None, headers=None): + """monkeypatch for the POST to policy-engine""" + res_json = MonkeyPolicyEngine.get_configs_all() + return MonkeyedResponse(full_path, res_json, json, headers) + + Settings.logger.info("setup fix_pdp_post_big") + PolicyRest._lazy_init() + monkeypatch.setattr('policyhandler.policy_rest.PolicyRest._requests_session.post', + monkeyed_policy_rest_post) + yield fix_pdp_post_big # provide the fixture value + Settings.logger.info("teardown fix_pdp_post_big") + + +class MockException(Exception): + """mock exception""" + pass + + +@pytest.fixture() +def fix_pdp_post_boom(monkeypatch): + """monkeyed request /getConfig to PDP - exception""" + def monkeyed_policy_rest_post_boom(full_path, json=None, headers=None): + """monkeypatch for the POST to policy-engine""" + raise MockException("fix_pdp_post_boom") + + Settings.logger.info("setup fix_pdp_post_boom") + PolicyRest._lazy_init() + monkeypatch.setattr('policyhandler.policy_rest.PolicyRest._requests_session.post', + monkeyed_policy_rest_post_boom) + yield fix_pdp_post_boom + Settings.logger.info("teardown fix_pdp_post_boom") + +@staticmethod +def monkeyed_boom(*args, **kwargs): + """monkeypatch for the select_latest_policies""" + raise MockException("monkeyed_boom") + +@pytest.fixture() +def fix_select_latest_policies_boom(monkeypatch): + """monkeyed exception""" + + Settings.logger.info("setup fix_select_latest_policies_boom") + monkeypatch.setattr('policyhandler.policy_utils.PolicyUtils.select_latest_policies', + monkeyed_boom) + monkeypatch.setattr('policyhandler.policy_utils.PolicyUtils.select_latest_policy', + monkeyed_boom) + monkeypatch.setattr('policyhandler.policy_utils.PolicyUtils.extract_policy_id', + monkeyed_boom) + + yield fix_select_latest_policies_boom + Settings.logger.info("teardown fix_select_latest_policies_boom") + + +@pytest.fixture() +def fix_deploy_handler(monkeypatch, fix_discovery): + """monkeyed discovery request.get""" + def monkeyed_deploy_handler(full_path, json=None, headers=None): + """monkeypatch for deploy_handler""" + return MonkeyedResponse( + full_path, + {"server_instance_uuid": Settings.deploy_handler_instance_uuid}, + json, headers + ) + + Settings.logger.info("setup fix_deploy_handler") + audit = Audit(req_message="fix_deploy_handler") + DeployHandler._lazy_init(audit) + monkeypatch.setattr('policyhandler.deploy_handler.DeployHandler._requests_session.post', + monkeyed_deploy_handler) + yield fix_deploy_handler # provide the fixture value + Settings.logger.info("teardown fix_deploy_handler") + + +@pytest.fixture() +def fix_deploy_handler_fail(monkeypatch, fix_discovery): + """monkeyed failed discovery request.get""" + def monkeyed_deploy_handler(full_path, json=None, headers=None): + """monkeypatch for deploy_handler""" + res = MonkeyedResponse( + full_path, + {"server_instance_uuid": Settings.deploy_handler_instance_uuid}, + json, headers + ) + res.status_code = 413 + return res + + @staticmethod + def monkeyed_deploy_handler_init(audit_ignore, rediscover=False): + """monkeypatch for deploy_handler init""" + DeployHandler._url = None + + Settings.logger.info("setup fix_deploy_handler_fail") + config_catch_up = Config.settings["catch_up"] + Config.settings["catch_up"] = {"interval": 1, "max_skips": 0} + + audit = Audit(req_message="fix_deploy_handler_fail") + DeployHandler._lazy_init(audit, rediscover=True) + monkeypatch.setattr('policyhandler.deploy_handler.DeployHandler._requests_session.post', + monkeyed_deploy_handler) + monkeypatch.setattr('policyhandler.deploy_handler.DeployHandler._lazy_init', + monkeyed_deploy_handler_init) + yield fix_deploy_handler_fail + Settings.logger.info("teardown fix_deploy_handler_fail") + Config.settings["catch_up"] = config_catch_up + + +def monkeyed_cherrypy_engine_exit(): + """monkeypatch for deploy_handler""" + Settings.logger.info("cherrypy_engine_exit()") + + +@pytest.fixture() +def fix_cherrypy_engine_exit(monkeypatch): + """monkeyed cherrypy.engine.exit()""" + Settings.logger.info("setup fix_cherrypy_engine_exit") + monkeypatch.setattr('policyhandler.web_server.cherrypy.engine.exit', + monkeyed_cherrypy_engine_exit) + yield fix_cherrypy_engine_exit # provide the fixture value + Settings.logger.info("teardown fix_cherrypy_engine_exit") - def json(self): - """returns json of response""" - return self.res -def monkeyed_policy_rest_post(full_path, json={}, headers={}): - """monkeypatch for the POST to policy-engine""" - return MonkeyedResponse(full_path, json, headers) +class MonkeyedWebSocket(object): + """Monkey websocket""" + on_message = None -def test_get_policy_latest(monkeypatch): + @staticmethod + def send_notification(updated_indexes): + """fake notification through the web-socket""" + if not MonkeyedWebSocket.on_message: + return + message = { + LOADED_POLICIES: [ + {POLICY_NAME: "{0}.{1}.xml".format( + MonkeyPolicyEngine.get_policy_id(policy_index), policy_index + 1), + POLICY_VER: str(policy_index + 1)} + for policy_index in updated_indexes or [] + ], + REMOVED_POLICIES : [] + } + message = json.dumps(message) + Settings.logger.info("send_notification: %s", message) + MonkeyedWebSocket.on_message(None, message) + + @staticmethod + def enableTrace(yes_no): + """ignore""" + pass + + class MonkeyedSocket(object): + """Monkey websocket""" + def __init__(self): + self.connected = True + + class WebSocketApp(object): + """Monkeyed WebSocketApp""" + def __init__(self, web_socket_url, on_message=None, on_close=None, on_error=None): + self.web_socket_url = web_socket_url + self.on_message = MonkeyedWebSocket.on_message = on_message + self.on_close = on_close + self.on_error = on_error + self.sock = MonkeyedWebSocket.MonkeyedSocket() + Settings.logger.info("MonkeyedWebSocket for: %s", self.web_socket_url) + + def run_forever(self): + """forever in the loop""" + counter = 0 + while self.sock.connected: + counter += 1 + Settings.logger.info("MonkeyedWebSocket sleep %s...", counter) + time.sleep(5) + Settings.logger.info("MonkeyedWebSocket exit %s", counter) + + def close(self): + """close socket""" + self.sock.connected = False + + +@pytest.fixture() +def fix_policy_receiver_websocket(monkeypatch): + """monkeyed websocket for policy_receiver""" + Settings.logger.info("setup fix_policy_receiver_websocket") + monkeypatch.setattr('policyhandler.policy_receiver.websocket', MonkeyedWebSocket) + yield fix_policy_receiver_websocket # provide the fixture value + Settings.logger.info("teardown fix_policy_receiver_websocket") + + +def test_get_policy_latest(fix_pdp_post): """test /policy_latest/<policy-id>""" - monkeypatch.setattr('policyhandler.policy_rest.PolicyRest._requests_session.post', \ - monkeyed_policy_rest_post) - policy_index = 3 - policy_id = MonkeyPolicyEngine.get_policy_id(policy_index) - expected_policy = { - POLICY_ID : policy_id, - POLICY_BODY : MonkeyPolicyBody.create_policy_body(policy_id, policy_index) - } - expected_policy = PolicyUtils.parse_policy_config(expected_policy) - - audit = Audit(req_message="get /policy_latest/{0}".format(policy_id or "")) - policy_latest = PolicyRest.get_latest_policy((audit, policy_id)) or {} + policy_id, expected_policy = MonkeyPolicyEngine.gen_policy_latest(3) + + audit = Audit(job_name="test_get_policy_latest", + req_message="get /policy_latest/{0}".format(policy_id or "")) + policy_latest = PolicyRest.get_latest_policy((audit, policy_id, None, None)) or {} audit.audit_done(result=json.dumps(policy_latest)) - Settings.logger.info("expected_policy: {0}".format(json.dumps(expected_policy))) - Settings.logger.info("policy_latest: {0}".format(json.dumps(policy_latest))) - assert MonkeyPolicyBody.is_the_same_dict(policy_latest, expected_policy) - assert MonkeyPolicyBody.is_the_same_dict(expected_policy, policy_latest) + Settings.logger.info("expected_policy: %s", json.dumps(expected_policy)) + Settings.logger.info("policy_latest: %s", json.dumps(policy_latest)) + assert Utils.are_the_same(policy_latest, expected_policy) + + + +@pytest.mark.usefixtures("fix_pdp_post") +class WebServerTest(CPWebCase): + """testing the web-server - runs tests in alphabetical order of method names""" + def setup_server(): + """setup the web-server""" + cherrypy.tree.mount(_PolicyWeb(), '/') + + setup_server = staticmethod(setup_server) + + def test_web_healthcheck(self): + """test /healthcheck""" + result = self.getPage("/healthcheck") + Settings.logger.info("healthcheck result: %s", result) + Settings.logger.info("got healthcheck: %s", self.body) + self.assertStatus('200 OK') + + def test_web_policy_latest(self): + """test /policy_latest/<policy-id>""" + policy_id, expected_policy = MonkeyPolicyEngine.gen_policy_latest(3) + + self.getPage("/policy_latest/{0}".format(policy_id or "")) + self.assertStatus('200 OK') + + policy_latest = json.loads(self.body) + + Settings.logger.info("policy_latest: %s", self.body) + Settings.logger.info("expected_policy: %s", json.dumps(expected_policy)) + assert Utils.are_the_same(policy_latest, expected_policy) + + result = self.getPage("/healthcheck") + Settings.logger.info("healthcheck result: %s", result) + + def test_web_all_policies_latest(self): + """test GET /policies_latest""" + expected_policies = MonkeyPolicyEngine.gen_all_policies_latest() + expected_policies = expected_policies[LATEST_POLICIES] + + result = self.getPage("/policies_latest") + Settings.logger.info("result: %s", result) + Settings.logger.info("body: %s", self.body) + self.assertStatus('200 OK') + + policies_latest = json.loads(self.body) + self.assertIn(LATEST_POLICIES, policies_latest) + policies_latest = policies_latest[LATEST_POLICIES] + + Settings.logger.info("policies_latest: %s", json.dumps(policies_latest)) + Settings.logger.info("expected_policies: %s", json.dumps(expected_policies)) + assert Utils.are_the_same(policies_latest, expected_policies) + + result = self.getPage("/healthcheck") + Settings.logger.info("healthcheck result: %s", result) + + def test_web_policies_latest(self): + """test POST /policies_latest with policyName""" + match_to_policy_name = Config.settings["scope_prefixes"][0] + "amet.*" + expected_policies = MonkeyPolicyEngine.gen_policies_latest(match_to_policy_name) + expected_policies = expected_policies[LATEST_POLICIES] + + body = json.dumps({POLICY_NAME: match_to_policy_name}) + result = self.getPage("/policies_latest", method='POST', + body=body, + headers=[ + (REQUEST_X_ECOMP_REQUESTID, str(uuid.uuid4())), + ("Content-Type", "application/json"), + ('Content-Length', str(len(body))) + ]) + Settings.logger.info("result: %s", result) + Settings.logger.info("body: %s", self.body) + self.assertStatus('200 OK') + + policies_latest = json.loads(self.body)[LATEST_POLICIES] + + Settings.logger.info("policies_latest: %s", json.dumps(policies_latest)) + Settings.logger.info("expected_policies: %s", json.dumps(expected_policies)) + assert Utils.are_the_same(policies_latest, expected_policies) + + result = self.getPage("/healthcheck") + Settings.logger.info("healthcheck result: %s", result) + + @pytest.mark.usefixtures("fix_deploy_handler", "fix_policy_receiver_websocket") + def test_zzz_policy_updates_and_catch_ups(self): + """test run policy handler with policy updates and catchups""" + Settings.logger.info("start policy_updates_and_catch_ups") + audit = Audit(job_name="test_zzz_policy_updates_and_catch_ups", + req_message="start policy_updates_and_catch_ups") + PolicyReceiver.run(audit) + + Settings.logger.info("sleep before send_notification...") + time.sleep(2) + + MonkeyedWebSocket.send_notification([1, 3, 5]) + Settings.logger.info("sleep after send_notification...") + time.sleep(3) + + Settings.logger.info("sleep 30 before shutdown...") + time.sleep(30) + + result = self.getPage("/healthcheck") + Settings.logger.info("healthcheck result: %s", result) + + PolicyReceiver.shutdown(audit) + time.sleep(1) + + @pytest.mark.usefixtures("fix_deploy_handler", "fix_policy_receiver_websocket") + def test_zzz_catch_up_on_deploy_handler_changed(self): + """test run policy handler with deployment-handler changed underneath""" + Settings.logger.info("start zzz_catch_up_on_deploy_handler_changed") + audit = Audit(job_name="test_zzz_catch_up_on_deploy_handler_changed", + req_message="start zzz_catch_up_on_deploy_handler_changed") + PolicyReceiver.run(audit) + + Settings.logger.info("sleep before send_notification...") + time.sleep(2) + + MonkeyedWebSocket.send_notification([1]) + Settings.logger.info("sleep after send_notification...") + time.sleep(3) + + Settings.deploy_handler_instance_uuid = str(uuid.uuid4()) + Settings.logger.info("new deploy-handler uuid=%s", Settings.deploy_handler_instance_uuid) + + MonkeyedWebSocket.send_notification([2, 4]) + Settings.logger.info("sleep after send_notification...") + time.sleep(3) + + Settings.logger.info("sleep 5 before shutdown...") + time.sleep(5) + + result = self.getPage("/healthcheck") + Settings.logger.info("healthcheck result: %s", result) + + PolicyReceiver.shutdown(audit) + time.sleep(1) + + @pytest.mark.usefixtures("fix_deploy_handler", "fix_policy_receiver_websocket") + def test_zzz_get_catch_up(self): + """test /catch_up""" + Settings.logger.info("start /catch_up") + audit = Audit(job_name="test_zzz_get_catch_up", req_message="start /catch_up") + PolicyReceiver.run(audit) + time.sleep(5) + result = self.getPage("/catch_up") + Settings.logger.info("catch_up result: %s", result) + self.assertStatus('200 OK') + Settings.logger.info("got catch_up: %s", self.body) + + Settings.logger.info("sleep 5 before shutdown...") + time.sleep(5) + + result = self.getPage("/healthcheck") + Settings.logger.info("healthcheck result: %s", result) + + PolicyReceiver.shutdown(audit) + time.sleep(1) + + @pytest.mark.usefixtures( + "fix_deploy_handler", + "fix_policy_receiver_websocket", + "fix_cherrypy_engine_exit") + def test_zzzzz_shutdown(self): + """test shutdown""" + Settings.logger.info("start shutdown") + audit = Audit(job_name="test_zzzzz_shutdown", req_message="start shutdown") + PolicyReceiver.run(audit) + + Settings.logger.info("sleep before send_notification...") + time.sleep(2) + + MonkeyedWebSocket.send_notification([1, 3, 5]) + Settings.logger.info("sleep after send_notification...") + time.sleep(3) + + result = self.getPage("/healthcheck") + Settings.logger.info("healthcheck result: %s", result) + + WebServerTest.do_gc_test = False + Settings.logger.info("shutdown...") + result = self.getPage("/shutdown") + Settings.logger.info("shutdown result: %s", result) + self.assertStatus('200 OK') + Settings.logger.info("got shutdown: %s", self.body) + time.sleep(1) + + +@pytest.mark.usefixtures("fix_pdp_post_boom") +class WebServerPDPBoomTest(CPWebCase): + """testing the web-server - runs tests in alphabetical order of method names""" + def setup_server(): + """setup the web-server""" + cherrypy.tree.mount(_PolicyWeb(), '/') + + setup_server = staticmethod(setup_server) + + def test_web_healthcheck(self): + """test /healthcheck""" + result = self.getPage("/healthcheck") + Settings.logger.info("healthcheck result: %s", result) + Settings.logger.info("got healthcheck: %s", self.body) + self.assertStatus('200 OK') + + def test_web_policy_latest(self): + """test /policy_latest/<policy-id>""" + policy_id, _ = MonkeyPolicyEngine.gen_policy_latest(3) + + self.getPage("/policy_latest/{0}".format(policy_id or "")) + self.assertStatus(AuditHttpCode.SERVER_INTERNAL_ERROR.value) + + result = self.getPage("/healthcheck") + Settings.logger.info("healthcheck result: %s", result) + + def test_web_all_policies_latest(self): + """test GET /policies_latest""" + result = self.getPage("/policies_latest") + Settings.logger.info("result: %s", result) + Settings.logger.info("body: %s", self.body) + self.assertStatus(AuditHttpCode.SERVER_INTERNAL_ERROR.value) + + result = self.getPage("/healthcheck") + Settings.logger.info("healthcheck result: %s", result) + + def test_web_policies_latest(self): + """test POST /policies_latest with policyName""" + match_to_policy_name = Config.settings["scope_prefixes"][0] + "amet.*" + + body = json.dumps({POLICY_NAME: match_to_policy_name}) + result = self.getPage("/policies_latest", method='POST', + body=body, + headers=[ + (REQUEST_X_ECOMP_REQUESTID, str(uuid.uuid4())), + ("Content-Type", "application/json"), + ('Content-Length', str(len(body))) + ]) + Settings.logger.info("result: %s", result) + Settings.logger.info("body: %s", self.body) + self.assertStatus(AuditHttpCode.SERVER_INTERNAL_ERROR.value) + + result = self.getPage("/healthcheck") + Settings.logger.info("healthcheck result: %s", result) + + @pytest.mark.usefixtures("fix_deploy_handler", "fix_policy_receiver_websocket") + def test_zzz_policy_updates_and_catch_ups(self): + """test run policy handler with policy updates and catchups""" + Settings.logger.info("start policy_updates_and_catch_ups") + audit = Audit(job_name="test_zzz_policy_updates_and_catch_ups", + req_message="start policy_updates_and_catch_ups") + PolicyReceiver.run(audit) + + Settings.logger.info("sleep before send_notification...") + time.sleep(2) + + MonkeyedWebSocket.send_notification([1, 3, 5]) + Settings.logger.info("sleep after send_notification...") + time.sleep(3) + + Settings.logger.info("sleep 30 before shutdown...") + time.sleep(30) + + result = self.getPage("/healthcheck") + Settings.logger.info("healthcheck result: %s", result) + + PolicyReceiver.shutdown(audit) + time.sleep(1) + + @pytest.mark.usefixtures("fix_deploy_handler", "fix_policy_receiver_websocket") + def test_zzz_catch_up_on_deploy_handler_changed(self): + """test run policy handler with deployment-handler changed underneath""" + Settings.logger.info("start zzz_catch_up_on_deploy_handler_changed") + audit = Audit(job_name="test_zzz_catch_up_on_deploy_handler_changed", + req_message="start zzz_catch_up_on_deploy_handler_changed") + PolicyReceiver.run(audit) + + Settings.logger.info("sleep before send_notification...") + time.sleep(2) + + MonkeyedWebSocket.send_notification([1]) + Settings.logger.info("sleep after send_notification...") + time.sleep(3) + + Settings.deploy_handler_instance_uuid = str(uuid.uuid4()) + Settings.logger.info("new deploy-handler uuid=%s", Settings.deploy_handler_instance_uuid) + + MonkeyedWebSocket.send_notification([2, 4]) + Settings.logger.info("sleep after send_notification...") + time.sleep(3) + + Settings.logger.info("sleep 5 before shutdown...") + time.sleep(5) + + result = self.getPage("/healthcheck") + Settings.logger.info("healthcheck result: %s", result) + + PolicyReceiver.shutdown(audit) + time.sleep(1) + + @pytest.mark.usefixtures("fix_deploy_handler", "fix_policy_receiver_websocket") + def test_zzz_get_catch_up(self): + """test /catch_up""" + Settings.logger.info("start /catch_up") + audit = Audit(job_name="test_zzz_get_catch_up", req_message="start /catch_up") + PolicyReceiver.run(audit) + time.sleep(5) + result = self.getPage("/catch_up") + Settings.logger.info("catch_up result: %s", result) + self.assertStatus('200 OK') + Settings.logger.info("got catch_up: %s", self.body) + + Settings.logger.info("sleep 5 before shutdown...") + time.sleep(5) + + result = self.getPage("/healthcheck") + Settings.logger.info("healthcheck result: %s", result) + + PolicyReceiver.shutdown(audit) + time.sleep(1) + + @pytest.mark.usefixtures( + "fix_deploy_handler", + "fix_policy_receiver_websocket", + "fix_cherrypy_engine_exit") + def test_zzzzz_shutdown(self): + """test shutdown""" + Settings.logger.info("start shutdown") + audit = Audit(job_name="test_zzzzz_shutdown", req_message="start shutdown") + PolicyReceiver.run(audit) + + Settings.logger.info("sleep before send_notification...") + time.sleep(2) + + MonkeyedWebSocket.send_notification([1, 3, 5]) + Settings.logger.info("sleep after send_notification...") + time.sleep(3) + + result = self.getPage("/healthcheck") + Settings.logger.info("healthcheck result: %s", result) + + WebServerPDPBoomTest.do_gc_test = False + Settings.logger.info("shutdown...") + result = self.getPage("/shutdown") + Settings.logger.info("shutdown result: %s", result) + self.assertStatus('200 OK') + Settings.logger.info("got shutdown: %s", self.body) + time.sleep(1) + + +@pytest.mark.usefixtures("fix_pdp_post", "fix_select_latest_policies_boom") +class WebServerInternalBoomTest(CPWebCase): + """testing the web-server - runs tests in alphabetical order of method names""" + def setup_server(): + """setup the web-server""" + cherrypy.tree.mount(_PolicyWeb(), '/') + + setup_server = staticmethod(setup_server) + + def test_web_healthcheck(self): + """test /healthcheck""" + result = self.getPage("/healthcheck") + Settings.logger.info("healthcheck result: %s", result) + Settings.logger.info("got healthcheck: %s", self.body) + self.assertStatus('200 OK') + + def test_web_policy_latest(self): + """test /policy_latest/<policy-id>""" + policy_id, _ = MonkeyPolicyEngine.gen_policy_latest(3) + + self.getPage("/policy_latest/{0}".format(policy_id or "")) + self.assertStatus(AuditHttpCode.SERVER_INTERNAL_ERROR.value) + + result = self.getPage("/healthcheck") + Settings.logger.info("healthcheck result: %s", result) + + def test_web_all_policies_latest(self): + """test GET /policies_latest""" + result = self.getPage("/policies_latest") + Settings.logger.info("result: %s", result) + Settings.logger.info("body: %s", self.body) + self.assertStatus(AuditHttpCode.SERVER_INTERNAL_ERROR.value) + + result = self.getPage("/healthcheck") + Settings.logger.info("healthcheck result: %s", result) + + def test_web_policies_latest(self): + """test POST /policies_latest with policyName""" + match_to_policy_name = Config.settings["scope_prefixes"][0] + "amet.*" + + body = json.dumps({POLICY_NAME: match_to_policy_name}) + result = self.getPage("/policies_latest", method='POST', + body=body, + headers=[ + (REQUEST_X_ECOMP_REQUESTID, str(uuid.uuid4())), + ("Content-Type", "application/json"), + ('Content-Length', str(len(body))) + ]) + Settings.logger.info("result: %s", result) + Settings.logger.info("body: %s", self.body) + self.assertStatus(AuditHttpCode.SERVER_INTERNAL_ERROR.value) + + result = self.getPage("/healthcheck") + Settings.logger.info("healthcheck result: %s", result) + + @pytest.mark.usefixtures("fix_deploy_handler", "fix_policy_receiver_websocket") + def test_zzz_policy_updates_and_catch_ups(self): + """test run policy handler with policy updates and catchups""" + Settings.logger.info("start policy_updates_and_catch_ups") + audit = Audit(job_name="test_zzz_policy_updates_and_catch_ups", + req_message="start policy_updates_and_catch_ups") + PolicyReceiver.run(audit) + + Settings.logger.info("sleep before send_notification...") + time.sleep(2) + + MonkeyedWebSocket.send_notification([1, 3, 5]) + Settings.logger.info("sleep after send_notification...") + time.sleep(3) + + Settings.logger.info("sleep 30 before shutdown...") + time.sleep(30) + + result = self.getPage("/healthcheck") + Settings.logger.info("healthcheck result: %s", result) + + PolicyReceiver.shutdown(audit) + time.sleep(1) + + @pytest.mark.usefixtures("fix_deploy_handler", "fix_policy_receiver_websocket") + def test_zzz_catch_up_on_deploy_handler_changed(self): + """test run policy handler with deployment-handler changed underneath""" + Settings.logger.info("start zzz_catch_up_on_deploy_handler_changed") + audit = Audit(job_name="test_zzz_catch_up_on_deploy_handler_changed", + req_message="start zzz_catch_up_on_deploy_handler_changed") + PolicyReceiver.run(audit) + + Settings.logger.info("sleep before send_notification...") + time.sleep(2) + + MonkeyedWebSocket.send_notification([1]) + Settings.logger.info("sleep after send_notification...") + time.sleep(3) + + Settings.deploy_handler_instance_uuid = str(uuid.uuid4()) + Settings.logger.info("new deploy-handler uuid=%s", Settings.deploy_handler_instance_uuid) + + MonkeyedWebSocket.send_notification([2, 4]) + Settings.logger.info("sleep after send_notification...") + time.sleep(3) + + Settings.logger.info("sleep 5 before shutdown...") + time.sleep(5) + + result = self.getPage("/healthcheck") + Settings.logger.info("healthcheck result: %s", result) + + PolicyReceiver.shutdown(audit) + time.sleep(1) + + @pytest.mark.usefixtures("fix_deploy_handler", "fix_policy_receiver_websocket") + def test_zzz_get_catch_up(self): + """test /catch_up""" + Settings.logger.info("start /catch_up") + audit = Audit(job_name="test_zzz_get_catch_up", req_message="start /catch_up") + PolicyReceiver.run(audit) + time.sleep(5) + result = self.getPage("/catch_up") + Settings.logger.info("catch_up result: %s", result) + self.assertStatus('200 OK') + Settings.logger.info("got catch_up: %s", self.body) + + Settings.logger.info("sleep 5 before shutdown...") + time.sleep(5) + + result = self.getPage("/healthcheck") + Settings.logger.info("healthcheck result: %s", result) + + PolicyReceiver.shutdown(audit) + time.sleep(1) + + @pytest.mark.usefixtures( + "fix_deploy_handler", + "fix_policy_receiver_websocket", + "fix_cherrypy_engine_exit") + def test_zzzzz_shutdown(self): + """test shutdown""" + Settings.logger.info("start shutdown") + audit = Audit(job_name="test_zzzzz_shutdown", req_message="start shutdown") + PolicyReceiver.run(audit) + + Settings.logger.info("sleep before send_notification...") + time.sleep(2) + + MonkeyedWebSocket.send_notification([1, 3, 5]) + Settings.logger.info("sleep after send_notification...") + time.sleep(3) + + result = self.getPage("/healthcheck") + Settings.logger.info("healthcheck result: %s", result) + + WebServerInternalBoomTest.do_gc_test = False + Settings.logger.info("shutdown...") + result = self.getPage("/shutdown") + Settings.logger.info("shutdown result: %s", result) + self.assertStatus('200 OK') + Settings.logger.info("got shutdown: %s", self.body) + time.sleep(1) + + +@pytest.mark.usefixtures( + "fix_pdp_post_big", + "fix_deploy_handler_fail", + "fix_policy_receiver_websocket" +) +def test_catch_ups_failed_dh(): + """test run policy handler with catchups and failed deployment-handler""" + Settings.logger.info("start test_catch_ups_failed_dh") + audit = Audit(job_name="test_catch_ups_failed_dh", + req_message="start test_catch_ups_failed_dh") + PolicyReceiver.run(audit) + + Settings.logger.info("sleep 50 before shutdown...") + time.sleep(50) + + health = audit.health(full=True) + audit.audit_done(result=json.dumps(health)) + + Settings.logger.info("healthcheck: %s", json.dumps(health)) + assert bool(health) + + PolicyReceiver.shutdown(audit) + time.sleep(1) + + health = audit.health(full=True) + Settings.logger.info("healthcheck: %s", json.dumps(health)) diff --git a/tests/test_step_timer.py b/tests/test_step_timer.py new file mode 100644 index 0000000..ff89388 --- /dev/null +++ b/tests/test_step_timer.py @@ -0,0 +1,209 @@ +# ============LICENSE_START======================================================= +# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. + +"""test of the step_timer""" + +import json +import logging +import time +from datetime import datetime + +from policyhandler.config import Config +from policyhandler.step_timer import StepTimer + +Config.load_from_file() + + +class MockTimerController(object): + """testing step_timer""" + logger = logging.getLogger("policy_handler.unit_test.step_timer") + + INIT = "init" + NEXT = "next" + STARTED = "started" + PAUSED = "paused" + STOPPING = "stopping" + STOPPED = "stopped" + + def __init__(self, name, interval): + """step_timer test settings""" + self.name = name or "step_timer" + self.interval = interval or 5 + self.step_timer = None + self.status = None + self.run_counter = 0 + self.status_ts = datetime.utcnow() + self.exe_ts = None + self.exe_interval = None + self.set_status(MockTimerController.INIT) + + def __enter__(self): + """constructor""" + return self + + def __exit__(self, exc_type, exc_value, traceback): + """destructor""" + self.stop_timer() + + def on_time(self, *args, **kwargs): + """timer event""" + self.exe_ts = datetime.utcnow() + self.exe_interval = (self.exe_ts - self.status_ts).total_seconds() + MockTimerController.logger.info("run on_time[%s] (%s, %s) in %s for %s", + self.run_counter, json.dumps(args), json.dumps(kwargs), + self.exe_interval, self.get_status()) + time.sleep(3) + MockTimerController.logger.info("done on_time[%s] (%s, %s) in %s for %s", + self.run_counter, json.dumps(args), json.dumps(kwargs), + self.exe_interval, self.get_status()) + + def verify_last_event(self): + """assertions needs to be in the main thread""" + if self.exe_interval is None: + MockTimerController.logger.info("not executed: %s", self.get_status()) + return + + MockTimerController.logger.info("verify exe %s for %s", + self.exe_interval, self.get_status()) + assert self.exe_interval >= (self.interval - 0.01) + assert self.exe_interval < 2 * self.interval + MockTimerController.logger.info("success %s", self.get_status()) + + def run_timer(self): + """create and start the step_timer""" + if self.step_timer: + self.step_timer.next() + self.set_status(MockTimerController.NEXT) + return + + self.step_timer = StepTimer( + self.name, self.interval, MockTimerController.on_time, + MockTimerController.logger, + self + ) + self.step_timer.start() + self.set_status(MockTimerController.STARTED) + + def pause_timer(self): + """pause step_timer""" + if self.step_timer: + self.step_timer.pause() + self.set_status(MockTimerController.PAUSED) + + def stop_timer(self): + """stop step_timer""" + if self.step_timer: + self.set_status(MockTimerController.STOPPING) + self.step_timer.stop() + self.step_timer.join() + self.step_timer = None + self.set_status(MockTimerController.STOPPED) + + def set_status(self, status): + """set the status of the timer""" + if status in [MockTimerController.NEXT, MockTimerController.STARTED]: + self.run_counter += 1 + + self.status = status + utcnow = datetime.utcnow() + time_step = (utcnow - self.status_ts).total_seconds() + self.status_ts = utcnow + MockTimerController.logger.info("[%s]: %s", time_step, self.get_status()) + + def get_status(self): + """string representation""" + status = "{0}[{1}] {2} in {3} from {4} last exe {5}".format( + self.status, self.run_counter, self.name, self.interval, + str(self.status_ts), str(self.exe_ts) + ) + if self.step_timer: + return "{0}: {1}".format(status, self.step_timer.get_timer_status()) + return status + + +def test_step_timer(): + """test step_timer""" + MockTimerController.logger.info("============ test_step_timer =========") + with MockTimerController("step_timer", 5) as step_timer: + step_timer.run_timer() + time.sleep(1) + step_timer.verify_last_event() + + time.sleep(1 + step_timer.interval) + step_timer.verify_last_event() + + step_timer.pause_timer() + time.sleep(2) + step_timer.verify_last_event() + + step_timer.pause_timer() + time.sleep(2) + step_timer.verify_last_event() + + step_timer.run_timer() + time.sleep(3 * step_timer.interval) + step_timer.verify_last_event() + + step_timer.run_timer() + time.sleep(3 * step_timer.interval) + step_timer.verify_last_event() + + +def test_interrupt_step_timer(): + """test step_timer""" + MockTimerController.logger.info("============ test_interrupt_step_timer =========") + with MockTimerController("step_timer", 5) as step_timer: + step_timer.run_timer() + time.sleep(1) + step_timer.verify_last_event() + + step_timer.pause_timer() + time.sleep(2 + step_timer.interval) + step_timer.verify_last_event() + + step_timer.pause_timer() + time.sleep(2) + step_timer.verify_last_event() + + step_timer.pause_timer() + time.sleep(2) + step_timer.verify_last_event() + + step_timer.run_timer() + time.sleep(2) + step_timer.verify_last_event() + + step_timer.run_timer() + time.sleep(2 + step_timer.interval) + step_timer.verify_last_event() + + step_timer.run_timer() + time.sleep(2) + step_timer.verify_last_event() + + step_timer.pause_timer() + time.sleep(2) + step_timer.verify_last_event() + + step_timer.run_timer() + time.sleep(2) + step_timer.verify_last_event() + + step_timer.run_timer() + time.sleep(3 * step_timer.interval) + step_timer.verify_last_event() diff --git a/tests/test_zzz_memory.py b/tests/test_zzz_memory.py new file mode 100644 index 0000000..e70cf59 --- /dev/null +++ b/tests/test_zzz_memory.py @@ -0,0 +1,117 @@ +# ============LICENSE_START======================================================= +# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. + +"""test of the package for policy-handler of DCAE-Controller""" + +import gc +import json +import time + +from policyhandler.onap.audit import Audit, AuditHttpCode, Metrics + +from .mock_settings import Settings + +Settings.init() + +class Node(object): + """making the cycled objects""" + def __init__(self, name): + self.name = name + self.next = None + def __repr__(self): + return '%s(%s)' % (self.__class__.__name__, self.name) + + +def test_healthcheck(): + """test /healthcheck""" + audit = Audit(job_name="test_healthcheck", + req_message="get /healthcheck") + metrics = Metrics(aud_parent=audit, targetEntity="test_healthcheck") + metrics.metrics_start("test /healthcheck") + time.sleep(0.1) + + metrics.metrics("test /healthcheck") + health = audit.health(full=True) + audit.audit_done(result=json.dumps(health)) + + Settings.logger.info("healthcheck: %s", json.dumps(health)) + assert bool(health) + + +def test_healthcheck_with_error(): + """test /healthcheck""" + audit = Audit(job_name="test_healthcheck_with_error", + req_message="get /healthcheck") + metrics = Metrics(aud_parent=audit, targetEntity="test_healthcheck_with_error") + metrics.metrics_start("test /healthcheck") + time.sleep(0.2) + audit.error("error from test_healthcheck_with_error") + audit.fatal("fatal from test_healthcheck_with_error") + audit.debug("debug from test_healthcheck_with_error") + audit.warn("debug from test_healthcheck_with_error") + audit.info_requested("debug from test_healthcheck_with_error") + if audit.is_success(): + audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value) + audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value) + metrics.metrics("test /healthcheck") + + health = audit.health(full=True) + audit.audit_done(result=json.dumps(health)) + + Settings.logger.info("healthcheck: %s", json.dumps(health)) + assert bool(health) + + +def test_healthcheck_with_garbage(): + """test /healthcheck""" + gc_found = gc.collect() + gc.set_debug(gc.DEBUG_LEAK) + + node1 = Node("one") + node2 = Node("two") + node3 = Node("three") + node1.next = node2 + node2.next = node3 + node3.next = node1 + node1 = node2 = node3 = None + gc_found = gc.collect() + + audit = Audit(job_name="test_healthcheck_with_garbage", + req_message="get /test_healthcheck_with_garbage") + health = audit.health(full=True) + audit.audit_done(result=json.dumps(health)) + + Settings.logger.info("test_healthcheck_with_garbage[%s]: %s", gc_found, json.dumps(health)) + assert bool(health) + assert bool(health.get("runtime", {}).get("gc", {}).get("gc_garbage")) + + Settings.logger.info("clearing up garbage...") + for obj in gc.garbage: + if isinstance(obj, Node): + Settings.logger.info("in garbage: %s 0x%x", obj, id(obj)) + obj.next = None + + gc_found = gc.collect() + Settings.logger.info("after clear test_healthcheck_with_garbage[%s]: %s", + gc_found, json.dumps(audit.health(full=True))) + + gc.set_debug(False) + + gc_found = gc.collect() + Settings.logger.info("after turned off gc debug test_healthcheck_with_garbage[%s]: %s", + gc_found, json.dumps(audit.health(full=True))) |