diff options
Diffstat (limited to 'tests')
-rw-r--r-- | tests/mock_settings.py | 2 | ||||
-rw-r--r-- | tests/test_policy_utils.py | 186 | ||||
-rw-r--r-- | tests/test_policyhandler.py | 176 |
3 files changed, 289 insertions, 75 deletions
diff --git a/tests/mock_settings.py b/tests/mock_settings.py index 28bc6cc..017ad7e 100644 --- a/tests/mock_settings.py +++ b/tests/mock_settings.py @@ -45,7 +45,7 @@ class Settings(object): Config.load_from_file("etc_upload/config.json") - Config.settings["catch_up"] = {"interval": 10, "max_skips": 2} + Config.settings["catch_up"] = {"interval": 10} Settings.logger = logging.getLogger("policy_handler.unit_test") sys.stdout = LogWriter(Settings.logger.info) diff --git a/tests/test_policy_utils.py b/tests/test_policy_utils.py new file mode 100644 index 0000000..b88f1ea --- /dev/null +++ b/tests/test_policy_utils.py @@ -0,0 +1,186 @@ +# ============LICENSE_START======================================================= +# Copyright (c) 2018 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. + +"""test of the policy_utils""" + +import json +import logging +import re + +from policyhandler.config import Config +from policyhandler.policy_utils import RegexCoarser + +Config.load_from_file() +LOGGER = logging.getLogger("policy_handler.unit_test_policy_utils") + + +def check_coarse_regex(test_name, patterns, matching_strings=None, expected_subpatterns=None): + """generic test""" + regex_coarser = RegexCoarser(patterns) + coarse_patterns = regex_coarser.get_coarse_regex_patterns(max_length=20) + LOGGER.info("check_coarse_regex %s (%s) for [%s]", + test_name, coarse_patterns, json.dumps(regex_coarser.patterns)) + coarse_regexes = [re.compile(coarse_pattern) for coarse_pattern in coarse_patterns] + coarse_patterns_str = json.dumps(coarse_patterns) + if matching_strings: + for test_str in matching_strings: + LOGGER.info(" match '%s' to %s (%s)", test_str, test_name, coarse_patterns_str) + assert bool(list(filter(None, [ + coarse_regex.match(test_str) for coarse_regex in coarse_regexes + ]))) + + if expected_subpatterns: + for subpattern in expected_subpatterns: + LOGGER.info(" subpattern '%s' in %s", subpattern, coarse_patterns_str) + assert subpattern in coarse_patterns_str + +def check_combined_regex(test_name, patterns, matching_strings=None, unmatching_strings=None): + """generic test""" + regex_coarser = RegexCoarser(patterns) + combined_pattern = regex_coarser.get_combined_regex_pattern() + LOGGER.info("check_combined_regex %s (%s) for [%s]", + test_name, combined_pattern, json.dumps(regex_coarser.patterns)) + coarse_regex = re.compile(combined_pattern) + if matching_strings: + for test_str in matching_strings: + LOGGER.info(" match '%s' to %s (%s)", test_str, test_name, combined_pattern) + assert coarse_regex.match(test_str) + + if unmatching_strings: + for test_str in unmatching_strings: + LOGGER.info(" not match '%s' to %s (%s)", test_str, test_name, combined_pattern) + assert not coarse_regex.match(test_str) + +def test_regex_coarser(): + """test variety of regex combinations""" + + test_data = [ + ( + "simple", + [ + "plain text", "plain pick", + "aaa (((a|b)|c)|d)", + "aaa (((a|b)|c)|d zzz", + "nested (expr[aeiou]ss(ions)?)", + "nested (expr[aeiou]ss(?:ions|ion)?)", + "^ (any expr|more|less|some|who cares)", + " (any expr|more|less|some|who cares)", + "(any expr|more|less|some|who cares)" + ], + [ + 'plain text', + 'nested exprussions', + 'nested expross', + 'aaa c', + 'aaa d', + 'who cares', + ' who cares' + ], + None, + [ + "nested .*", + "plain .*", + "aaa .*", + "(any expr|more|less|some|who cares)", + "^ (any expr|more|less|some|who cares)", + " (any expr|more|less|some|who cares)"] + ), + ( + "combination", + [ + "plain text", + "^with* modifiers?", + "cha[ra][ra]cter classes", + "^with (groups)", + "^with (groups|more groups)", + r"^with (mod+ifiers\s*|in groups{2,3}\s*)+", + "sub", + "substrings", + "su.*bstrings", + "char count{1,3}s", + "nested (expr[aeiou]ss(ions)?)", + r"escaped (\)) chars", + r"escaped ([\)\]]) chars", + r"escaped ([\)\]]){3} chars" + ], + [ + 'plain text', + 'withhh modifier', + 'character classes', + 'with groups', + 'with more groups', + 'with modddifiers in groupss modifiers in groupsss', + 'sub', + 'substrings', + 'char counttts', + 'nested exprassions', + 'nested express', + 'escaped ) chars', + 'escaped ] chars', + 'escaped ]]] chars' + ], + [ + 'plain', + 'text', + 'something with modifiers', + 'su', + 'char counttttts', + 'escaped ]] chars' + ], + [ + "nested .*", + "escaped .*", + "^wit.*", + "plain text", + "cha.*", + "su.*" + ] + ), + ( + "combined", + [ + 'foo+', + 'food', + 'football', + "ba[rh]", + "bard" + ], + [ + 'foo', + 'fooooo', + 'football', + 'food', + 'bar', + 'bah', + 'bard' + ], + [ + 'fo', + 'bat' + ], + [ + "fo.*", + "ba.*" + ] + ) + ] + + for (test_name, patterns, + matching_strings, unmatching_strings, expected_subpatterns) in test_data: + check_combined_regex(test_name, patterns, matching_strings, unmatching_strings) + check_coarse_regex(test_name, patterns, matching_strings, expected_subpatterns) diff --git a/tests/test_policyhandler.py b/tests/test_policyhandler.py index ec0b030..eda1981 100644 --- a/tests/test_policyhandler.py +++ b/tests/test_policyhandler.py @@ -25,7 +25,6 @@ import time import uuid import pytest - import cherrypy from cherrypy.test.helper import CPWebCase @@ -34,10 +33,9 @@ from policyhandler.deploy_handler import DeployHandler from policyhandler.discovery import DiscoveryClient from policyhandler.onap.audit import (REQUEST_X_ECOMP_REQUESTID, Audit, AuditHttpCode) -from policyhandler.policy_consts import (ERRORED_POLICIES, ERRORED_SCOPES, - LATEST_POLICIES, POLICY_BODY, +from policyhandler.policy_consts import (LATEST_POLICIES, POLICY_BODY, POLICY_CONFIG, POLICY_ID, POLICY_NAME, - POLICY_VERSION, SCOPE_PREFIXES) + POLICY_VERSION, POLICY_VERSIONS) from policyhandler.policy_receiver import (LOADED_POLICIES, POLICY_VER, REMOVED_POLICIES, PolicyReceiver) from policyhandler.policy_rest import PolicyRest @@ -125,9 +123,9 @@ class MonkeyPolicyBody(object): } -class MonkeyPolicyEngine(object): +class MockPolicyEngine(object): """pretend this is the policy-engine""" - _scope_prefix = Config.settings["scope_prefixes"][0] + scope_prefix = "test_scope_prefix.Config_" LOREM_IPSUM = """Lorem ipsum dolor sit amet consectetur ametist""".split() LONG_TEXT = "0123456789" * 100 _policies = [] @@ -135,13 +133,13 @@ class MonkeyPolicyEngine(object): @staticmethod def init(): """init static vars""" - MonkeyPolicyEngine._policies = [ + MockPolicyEngine._policies = [ MonkeyPolicyBody.create_policy_body( - MonkeyPolicyEngine._scope_prefix + policy_id, policy_index + 1) - for policy_id in MonkeyPolicyEngine.LOREM_IPSUM - for policy_index in range(1 + MonkeyPolicyEngine.LOREM_IPSUM.index(policy_id))] - Settings.logger.info("MonkeyPolicyEngine._policies: %s", - json.dumps(MonkeyPolicyEngine._policies)) + MockPolicyEngine.scope_prefix + policy_id, policy_index + 1) + for policy_id in MockPolicyEngine.LOREM_IPSUM + for policy_index in range(1 + MockPolicyEngine.LOREM_IPSUM.index(policy_id))] + Settings.logger.info("MockPolicyEngine._policies: %s", + json.dumps(MockPolicyEngine._policies)) @staticmethod def get_config(policy_name): @@ -149,61 +147,75 @@ class MonkeyPolicyEngine(object): if not policy_name: return [] return [copy.deepcopy(policy) - for policy in MonkeyPolicyEngine._policies + for policy in MockPolicyEngine._policies if re.match(policy_name, policy[POLICY_NAME])] @staticmethod def get_configs_all(): """get all policies the way the policy-engine finds""" policies = [copy.deepcopy(policy) - for policy in MonkeyPolicyEngine._policies] + for policy in MockPolicyEngine._policies] for policy in policies: - policy["config"] = MonkeyPolicyEngine.LONG_TEXT + policy["config"] = MockPolicyEngine.LONG_TEXT return policies @staticmethod def get_policy_id(policy_index): """get the policy_id by index""" - return (MonkeyPolicyEngine._scope_prefix - + MonkeyPolicyEngine.LOREM_IPSUM[ - policy_index % len(MonkeyPolicyEngine.LOREM_IPSUM)]) + return (MockPolicyEngine.scope_prefix + + MockPolicyEngine.LOREM_IPSUM[ + policy_index % len(MockPolicyEngine.LOREM_IPSUM)]) @staticmethod - def gen_policy_latest(policy_index): + def gen_policy_latest(policy_index, version_offset=0): """generate the policy response by policy_index = version - 1""" - policy_id = MonkeyPolicyEngine.get_policy_id(policy_index) - expected_policy = { + policy_id = MockPolicyEngine.get_policy_id(policy_index) + policy = { POLICY_ID: policy_id, - POLICY_BODY: MonkeyPolicyBody.create_policy_body(policy_id, policy_index + 1) + POLICY_BODY: MonkeyPolicyBody.create_policy_body( + policy_id, policy_index + 1 - version_offset) } - return policy_id, PolicyUtils.parse_policy_config(expected_policy) + return policy_id, PolicyUtils.parse_policy_config(policy) @staticmethod - def gen_all_policies_latest(): + def gen_all_policies_latest(version_offset=0): """generate all latest policies""" - return { - LATEST_POLICIES: dict(MonkeyPolicyEngine.gen_policy_latest(policy_index) - for policy_index in range(len(MonkeyPolicyEngine.LOREM_IPSUM))), - ERRORED_SCOPES: ["DCAE.Config_*"], - SCOPE_PREFIXES: ["DCAE.Config_*"], - ERRORED_POLICIES: {} - } + return dict(MockPolicyEngine.gen_policy_latest(policy_index, version_offset=version_offset) + for policy_index in range(len(MockPolicyEngine.LOREM_IPSUM))) @staticmethod def gen_policies_latest(match_to_policy_name): """generate all latest policies""" - return { - LATEST_POLICIES: - dict((k, v) - for k, v in MonkeyPolicyEngine.gen_all_policies_latest() - [LATEST_POLICIES].items() - if re.match(match_to_policy_name, k)), - ERRORED_SCOPES: [], - ERRORED_POLICIES: {} - } + return dict((k, v) + for k, v in MockPolicyEngine.gen_all_policies_latest().items() + if re.match(match_to_policy_name, k)) + +MockPolicyEngine.init() -MonkeyPolicyEngine.init() + +class MockDeploymentHandler(object): + """pretend this is the deployment-handler""" + + @staticmethod + def default_response(): + """generate the deployed policies message""" + return {"server_instance_uuid": Settings.deploy_handler_instance_uuid} + + @staticmethod + def get_deployed_policies(): + """generate the deployed policies message""" + response = MockDeploymentHandler.default_response() + policies = dict( + (policy_id, { + POLICY_ID: policy_id, + POLICY_VERSIONS: {policy.get(POLICY_BODY, {}).get(POLICY_VERSION, "999"): True}, + "pending_update": False}) + for policy_id, policy in (MockPolicyEngine.gen_all_policies_latest(version_offset=1) + .items())) + response["policies"] = policies + + return response @pytest.fixture() @@ -211,7 +223,7 @@ def fix_pdp_post(monkeypatch): """monkeyed request /getConfig to PDP""" def monkeyed_policy_rest_post(full_path, json=None, headers=None): """monkeypatch for the POST to policy-engine""" - res_json = MonkeyPolicyEngine.get_config(json.get(POLICY_NAME)) + res_json = MockPolicyEngine.get_config(json.get(POLICY_NAME)) return MonkeyedResponse(full_path, res_json, json, headers) Settings.logger.info("setup fix_pdp_post") @@ -227,7 +239,7 @@ def fix_pdp_post_big(monkeypatch): """monkeyed request /getConfig to PDP""" def monkeyed_policy_rest_post(full_path, json=None, headers=None): """monkeypatch for the POST to policy-engine""" - res_json = MonkeyPolicyEngine.get_configs_all() + res_json = MockPolicyEngine.get_configs_all() return MonkeyedResponse(full_path, res_json, json, headers) Settings.logger.info("setup fix_pdp_post_big") @@ -280,20 +292,26 @@ def fix_select_latest_policies_boom(monkeypatch): @pytest.fixture() def fix_deploy_handler(monkeypatch, fix_discovery): - """monkeyed discovery request.get""" - def monkeyed_deploy_handler(full_path, json=None, headers=None): - """monkeypatch for deploy_handler""" - return MonkeyedResponse( - full_path, - {"server_instance_uuid": Settings.deploy_handler_instance_uuid}, - json, headers - ) + """monkeyed requests to deployment-handler""" + def monkeyed_deploy_handler_put(full_path, json=None, headers=None): + """monkeypatch for policy-update request.put to deploy_handler""" + return MonkeyedResponse(full_path, MockDeploymentHandler.default_response(), + json, headers) + + def monkeyed_deploy_handler_get(full_path, headers=None): + """monkeypatch policy-update request.get to deploy_handler""" + return MonkeyedResponse(full_path, MockDeploymentHandler.get_deployed_policies(), + None, headers) Settings.logger.info("setup fix_deploy_handler") audit = Audit(req_message="fix_deploy_handler") DeployHandler._lazy_init(audit) - monkeypatch.setattr('policyhandler.deploy_handler.DeployHandler._requests_session.post', - monkeyed_deploy_handler) + + monkeypatch.setattr('policyhandler.deploy_handler.DeployHandler._requests_session.put', + monkeyed_deploy_handler_put) + monkeypatch.setattr('policyhandler.deploy_handler.DeployHandler._requests_session.get', + monkeyed_deploy_handler_get) + yield fix_deploy_handler # provide the fixture value Settings.logger.info("teardown fix_deploy_handler") @@ -301,7 +319,7 @@ def fix_deploy_handler(monkeypatch, fix_discovery): @pytest.fixture() def fix_deploy_handler_fail(monkeypatch, fix_discovery): """monkeyed failed discovery request.get""" - def monkeyed_deploy_handler(full_path, json=None, headers=None): + def monkeyed_deploy_handler_put(full_path, json=None, headers=None): """monkeypatch for deploy_handler""" res = MonkeyedResponse( full_path, @@ -311,6 +329,11 @@ def fix_deploy_handler_fail(monkeypatch, fix_discovery): res.status_code = 413 return res + def monkeyed_deploy_handler_get(full_path, headers=None): + """monkeypatch policy-update request.get to deploy_handler""" + return MonkeyedResponse(full_path, MockDeploymentHandler.default_response(), + None, headers) + @staticmethod def monkeyed_deploy_handler_init(audit_ignore, rediscover=False): """monkeypatch for deploy_handler init""" @@ -318,28 +341,32 @@ def fix_deploy_handler_fail(monkeypatch, fix_discovery): Settings.logger.info("setup fix_deploy_handler_fail") config_catch_up = Config.settings["catch_up"] - Config.settings["catch_up"] = {"interval": 1, "max_skips": 0} + Config.settings["catch_up"] = {"interval": 1} audit = Audit(req_message="fix_deploy_handler_fail") DeployHandler._lazy_init(audit, rediscover=True) - monkeypatch.setattr('policyhandler.deploy_handler.DeployHandler._requests_session.post', - monkeyed_deploy_handler) + monkeypatch.setattr('policyhandler.deploy_handler.DeployHandler._lazy_init', monkeyed_deploy_handler_init) + monkeypatch.setattr('policyhandler.deploy_handler.DeployHandler._requests_session.put', + monkeyed_deploy_handler_put) + monkeypatch.setattr('policyhandler.deploy_handler.DeployHandler._requests_session.get', + monkeyed_deploy_handler_get) + yield fix_deploy_handler_fail Settings.logger.info("teardown fix_deploy_handler_fail") Config.settings["catch_up"] = config_catch_up -def monkeyed_cherrypy_engine_exit(): - """monkeypatch for deploy_handler""" - Settings.logger.info("cherrypy_engine_exit()") - - @pytest.fixture() def fix_cherrypy_engine_exit(monkeypatch): """monkeyed cherrypy.engine.exit()""" Settings.logger.info("setup fix_cherrypy_engine_exit") + + def monkeyed_cherrypy_engine_exit(): + """monkeypatch for deploy_handler""" + Settings.logger.info("cherrypy_engine_exit()") + monkeypatch.setattr('policyhandler.web_server.cherrypy.engine.exit', monkeyed_cherrypy_engine_exit) yield fix_cherrypy_engine_exit # provide the fixture value @@ -358,7 +385,7 @@ class MonkeyedWebSocket(object): message = { LOADED_POLICIES: [ {POLICY_NAME: "{0}.{1}.xml".format( - MonkeyPolicyEngine.get_policy_id(policy_index), policy_index + 1), + MockPolicyEngine.get_policy_id(policy_index), policy_index + 1), POLICY_VER: str(policy_index + 1)} for policy_index in updated_indexes or [] ], @@ -413,7 +440,7 @@ def fix_policy_receiver_websocket(monkeypatch): def test_get_policy_latest(fix_pdp_post): """test /policy_latest/<policy-id>""" - policy_id, expected_policy = MonkeyPolicyEngine.gen_policy_latest(3) + policy_id, expected_policy = MockPolicyEngine.gen_policy_latest(3) audit = Audit(job_name="test_get_policy_latest", req_message="get /policy_latest/{0}".format(policy_id or "")) @@ -444,7 +471,7 @@ class WebServerTest(CPWebCase): def test_web_policy_latest(self): """test /policy_latest/<policy-id>""" - policy_id, expected_policy = MonkeyPolicyEngine.gen_policy_latest(3) + policy_id, expected_policy = MockPolicyEngine.gen_policy_latest(3) self.getPage("/policy_latest/{0}".format(policy_id or "")) self.assertStatus('200 OK') @@ -458,10 +485,10 @@ class WebServerTest(CPWebCase): result = self.getPage("/healthcheck") Settings.logger.info("healthcheck result: %s", result) + @pytest.mark.usefixtures("fix_deploy_handler") def test_web_all_policies_latest(self): """test GET /policies_latest""" - expected_policies = MonkeyPolicyEngine.gen_all_policies_latest() - expected_policies = expected_policies[LATEST_POLICIES] + expected_policies = MockPolicyEngine.gen_all_policies_latest() result = self.getPage("/policies_latest") Settings.logger.info("result: %s", result) @@ -481,9 +508,8 @@ class WebServerTest(CPWebCase): def test_web_policies_latest(self): """test POST /policies_latest with policyName""" - match_to_policy_name = Config.settings["scope_prefixes"][0] + "amet.*" - expected_policies = MonkeyPolicyEngine.gen_policies_latest(match_to_policy_name) - expected_policies = expected_policies[LATEST_POLICIES] + match_to_policy_name = MockPolicyEngine.scope_prefix + "amet.*" + expected_policies = MockPolicyEngine.gen_policies_latest(match_to_policy_name) body = json.dumps({POLICY_NAME: match_to_policy_name}) result = self.getPage("/policies_latest", method='POST', @@ -629,7 +655,7 @@ class WebServerPDPBoomTest(CPWebCase): def test_web_policy_latest(self): """test /policy_latest/<policy-id>""" - policy_id, _ = MonkeyPolicyEngine.gen_policy_latest(3) + policy_id, _ = MockPolicyEngine.gen_policy_latest(3) self.getPage("/policy_latest/{0}".format(policy_id or "")) self.assertStatus(AuditHttpCode.SERVER_INTERNAL_ERROR.value) @@ -637,6 +663,7 @@ class WebServerPDPBoomTest(CPWebCase): result = self.getPage("/healthcheck") Settings.logger.info("healthcheck result: %s", result) + @pytest.mark.usefixtures("fix_deploy_handler") def test_web_all_policies_latest(self): """test GET /policies_latest""" result = self.getPage("/policies_latest") @@ -649,7 +676,7 @@ class WebServerPDPBoomTest(CPWebCase): def test_web_policies_latest(self): """test POST /policies_latest with policyName""" - match_to_policy_name = Config.settings["scope_prefixes"][0] + "amet.*" + match_to_policy_name = MockPolicyEngine.scope_prefix + "amet.*" body = json.dumps({POLICY_NAME: match_to_policy_name}) result = self.getPage("/policies_latest", method='POST', @@ -789,7 +816,7 @@ class WebServerInternalBoomTest(CPWebCase): def test_web_policy_latest(self): """test /policy_latest/<policy-id>""" - policy_id, _ = MonkeyPolicyEngine.gen_policy_latest(3) + policy_id, _ = MockPolicyEngine.gen_policy_latest(3) self.getPage("/policy_latest/{0}".format(policy_id or "")) self.assertStatus(AuditHttpCode.SERVER_INTERNAL_ERROR.value) @@ -797,6 +824,7 @@ class WebServerInternalBoomTest(CPWebCase): result = self.getPage("/healthcheck") Settings.logger.info("healthcheck result: %s", result) + @pytest.mark.usefixtures("fix_deploy_handler") def test_web_all_policies_latest(self): """test GET /policies_latest""" result = self.getPage("/policies_latest") @@ -809,7 +837,7 @@ class WebServerInternalBoomTest(CPWebCase): def test_web_policies_latest(self): """test POST /policies_latest with policyName""" - match_to_policy_name = Config.settings["scope_prefixes"][0] + "amet.*" + match_to_policy_name = MockPolicyEngine.scope_prefix + "amet.*" body = json.dumps({POLICY_NAME: match_to_policy_name}) result = self.getPage("/policies_latest", method='POST', |