summaryrefslogtreecommitdiffstats
path: root/tests/utils
diff options
context:
space:
mode:
Diffstat (limited to 'tests/utils')
-rw-r--r--tests/utils/__init__.py20
-rw-r--r--tests/utils/test_step_timer.py199
-rw-r--r--tests/utils/test_utils.py181
-rw-r--r--tests/utils/test_zzz_memory.py115
4 files changed, 515 insertions, 0 deletions
diff --git a/tests/utils/__init__.py b/tests/utils/__init__.py
new file mode 100644
index 0000000..5e2fe60
--- /dev/null
+++ b/tests/utils/__init__.py
@@ -0,0 +1,20 @@
+# ================================================================================
+# Copyright (c) 2018-2019 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+
+
+# empty __init__.py so that pytest can add correct path to coverage report, -- per pytest
+# best practice guideline
diff --git a/tests/utils/test_step_timer.py b/tests/utils/test_step_timer.py
new file mode 100644
index 0000000..152836d
--- /dev/null
+++ b/tests/utils/test_step_timer.py
@@ -0,0 +1,199 @@
+# ============LICENSE_START=======================================================
+# Copyright (c) 2017-2019 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+
+"""test of the step_timer"""
+
+import json
+import time
+from datetime import datetime
+
+from policyhandler.step_timer import StepTimer
+from policyhandler.utils import Utils
+
+_LOGGER = Utils.get_logger(__file__)
+
+class MockTimerController(object):
+ """testing step_timer"""
+ INIT = "init"
+ NEXT = "next"
+ STARTED = "started"
+ PAUSED = "paused"
+ STOPPING = "stopping"
+ STOPPED = "stopped"
+
+ def __init__(self, name, interval):
+ """step_timer test settings"""
+ self.name = name or "step_timer"
+ self.interval = interval or 5
+ self.step_timer = None
+ self.status = None
+ self.run_counter = 0
+ self.status_ts = datetime.utcnow()
+ self.exe_ts = None
+ self.exe_interval = None
+ self.set_status(MockTimerController.INIT)
+
+ def __enter__(self):
+ """constructor"""
+ return self
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ """destructor"""
+ self.stop_timer()
+
+ def on_time(self, *args, **kwargs):
+ """timer event"""
+ self.exe_ts = datetime.utcnow()
+ self.exe_interval = (self.exe_ts - self.status_ts).total_seconds()
+ _LOGGER.info("run on_time[%s] (%s, %s) in %s for %s",
+ self.run_counter, json.dumps(args), json.dumps(kwargs),
+ self.exe_interval, self.get_status())
+ time.sleep(3)
+ _LOGGER.info("done on_time[%s] (%s, %s) in %s for %s",
+ self.run_counter, json.dumps(args), json.dumps(kwargs),
+ self.exe_interval, self.get_status())
+
+ def verify_last_event(self):
+ """assertions needs to be in the main thread"""
+ if self.exe_interval is None:
+ _LOGGER.info("not executed: %s", self.get_status())
+ return
+
+ _LOGGER.info("verify exe %s for %s", self.exe_interval, self.get_status())
+ assert self.exe_interval >= (self.interval - 0.01)
+ assert self.exe_interval < 2 * self.interval
+ _LOGGER.info("success %s", self.get_status())
+
+ def run_timer(self):
+ """create and start the step_timer"""
+ if self.step_timer:
+ self.step_timer.next()
+ self.set_status(MockTimerController.NEXT)
+ return
+
+ self.step_timer = StepTimer(self.name, self.interval, MockTimerController.on_time, self)
+ self.step_timer.start()
+ self.set_status(MockTimerController.STARTED)
+
+ def pause_timer(self):
+ """pause step_timer"""
+ if self.step_timer:
+ self.step_timer.pause()
+ self.set_status(MockTimerController.PAUSED)
+
+ def stop_timer(self):
+ """stop step_timer"""
+ if self.step_timer:
+ self.set_status(MockTimerController.STOPPING)
+ self.step_timer.stop()
+ self.step_timer.join()
+ self.step_timer = None
+ self.set_status(MockTimerController.STOPPED)
+
+ def set_status(self, status):
+ """set the status of the timer"""
+ if status in [MockTimerController.NEXT, MockTimerController.STARTED]:
+ self.run_counter += 1
+
+ self.status = status
+ utcnow = datetime.utcnow()
+ time_step = (utcnow - self.status_ts).total_seconds()
+ self.status_ts = utcnow
+ _LOGGER.info("[%s]: %s", time_step, self.get_status())
+
+ def get_status(self):
+ """string representation"""
+ status = "{0}[{1}] {2} in {3} from {4} last exe {5}".format(
+ self.status, self.run_counter, self.name, self.interval,
+ str(self.status_ts), str(self.exe_ts)
+ )
+ if self.step_timer:
+ return "{0}: {1}".format(status, self.step_timer.get_timer_status())
+ return status
+
+
+def test_step_timer():
+ """test step_timer"""
+ _LOGGER.info("============ test_step_timer =========")
+ with MockTimerController("step_timer", 5) as step_timer:
+ step_timer.run_timer()
+ time.sleep(1)
+ step_timer.verify_last_event()
+
+ time.sleep(1 + step_timer.interval)
+ step_timer.verify_last_event()
+
+ step_timer.pause_timer()
+ time.sleep(2)
+ step_timer.verify_last_event()
+
+ step_timer.pause_timer()
+ time.sleep(2)
+ step_timer.verify_last_event()
+
+ step_timer.run_timer()
+ time.sleep(3 * step_timer.interval)
+ step_timer.verify_last_event()
+
+ step_timer.run_timer()
+ time.sleep(3 * step_timer.interval)
+ step_timer.verify_last_event()
+
+
+def test_interrupt_step_timer():
+ """test step_timer"""
+ _LOGGER.info("============ test_interrupt_step_timer =========")
+ with MockTimerController("step_timer", 5) as step_timer:
+ step_timer.run_timer()
+ time.sleep(1)
+ step_timer.verify_last_event()
+
+ step_timer.pause_timer()
+ time.sleep(2 + step_timer.interval)
+ step_timer.verify_last_event()
+
+ step_timer.pause_timer()
+ time.sleep(2)
+ step_timer.verify_last_event()
+
+ step_timer.pause_timer()
+ time.sleep(2)
+ step_timer.verify_last_event()
+
+ step_timer.run_timer()
+ time.sleep(2)
+ step_timer.verify_last_event()
+
+ step_timer.run_timer()
+ time.sleep(2 + step_timer.interval)
+ step_timer.verify_last_event()
+
+ step_timer.run_timer()
+ time.sleep(2)
+ step_timer.verify_last_event()
+
+ step_timer.pause_timer()
+ time.sleep(2)
+ step_timer.verify_last_event()
+
+ step_timer.run_timer()
+ time.sleep(2)
+ step_timer.verify_last_event()
+
+ step_timer.run_timer()
+ time.sleep(3 * step_timer.interval)
+ step_timer.verify_last_event()
diff --git a/tests/utils/test_utils.py b/tests/utils/test_utils.py
new file mode 100644
index 0000000..18026ff
--- /dev/null
+++ b/tests/utils/test_utils.py
@@ -0,0 +1,181 @@
+# ============LICENSE_START=======================================================
+# Copyright (c) 2018-2019 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+
+"""test of the policy_utils"""
+
+import json
+import re
+
+from policyhandler.utils import RegexCoarser, Utils
+
+_LOGGER = Utils.get_logger(__file__)
+
+def check_coarse_regex(test_name, patterns, matching_strings=None, expected_subpatterns=None):
+ """generic test"""
+ regex_coarser = RegexCoarser(patterns)
+ coarse_patterns = regex_coarser.get_coarse_regex_patterns(max_length=20)
+ _LOGGER.info("check_coarse_regex %s (%s) for [%s]",
+ test_name, coarse_patterns, json.dumps(regex_coarser.patterns))
+ coarse_regexes = [re.compile(coarse_pattern) for coarse_pattern in coarse_patterns]
+ coarse_patterns_str = json.dumps(coarse_patterns)
+ if matching_strings:
+ for test_str in matching_strings:
+ _LOGGER.info(" match '%s' to %s (%s)", test_str, test_name, coarse_patterns_str)
+ assert bool(list(filter(None, [
+ coarse_regex.match(test_str) for coarse_regex in coarse_regexes
+ ])))
+
+ if expected_subpatterns:
+ for subpattern in expected_subpatterns:
+ _LOGGER.info(" subpattern '%s' in %s", subpattern, coarse_patterns_str)
+ assert subpattern in coarse_patterns_str
+
+def check_combined_regex(test_name, patterns, matching_strings=None, unmatching_strings=None):
+ """generic test"""
+ regex_coarser = RegexCoarser(patterns)
+ combined_pattern = regex_coarser.get_combined_regex_pattern()
+ _LOGGER.info("check_combined_regex %s (%s) for [%s]",
+ test_name, combined_pattern, json.dumps(regex_coarser.patterns))
+ coarse_regex = re.compile(combined_pattern)
+ if matching_strings:
+ for test_str in matching_strings:
+ _LOGGER.info(" match '%s' to %s (%s)", test_str, test_name, combined_pattern)
+ assert coarse_regex.match(test_str)
+
+ if unmatching_strings:
+ for test_str in unmatching_strings:
+ _LOGGER.info(" not match '%s' to %s (%s)", test_str, test_name, combined_pattern)
+ assert not coarse_regex.match(test_str)
+
+def test_regex_coarser():
+ """test variety of regex combinations"""
+
+ test_data = [
+ (
+ "simple",
+ [
+ "plain text", "plain pick",
+ "aaa (((a|b)|c)|d)",
+ "aaa (((a|b)|c)|d zzz",
+ "nested (expr[aeiou]ss(ions)?)",
+ "nested (expr[aeiou]ss(?:ions|ion)?)",
+ "^ (any expr|more|less|some|who cares)",
+ " (any expr|more|less|some|who cares)",
+ "(any expr|more|less|some|who cares)"
+ ],
+ [
+ 'plain text',
+ 'nested exprussions',
+ 'nested expross',
+ 'aaa c',
+ 'aaa d',
+ 'who cares',
+ ' who cares'
+ ],
+ None,
+ [
+ "nested .*",
+ "plain .*",
+ "aaa .*",
+ "(any expr|more|less|some|who cares)",
+ "^ (any expr|more|less|some|who cares)",
+ " (any expr|more|less|some|who cares)"]
+ ),
+ (
+ "combination",
+ [
+ "plain text",
+ "^with* modifiers?",
+ "cha[ra][ra]cter classes",
+ "^with (groups)",
+ "^with (groups|more groups)",
+ r"^with (mod+ifiers\s*|in groups{2,3}\s*)+",
+ "sub",
+ "substrings",
+ "su.*bstrings",
+ "char count{1,3}s",
+ "nested (expr[aeiou]ss(ions)?)",
+ r"escaped (\)) chars",
+ r"escaped ([\)\]]) chars",
+ r"escaped ([\)\]]){3} chars"
+ ],
+ [
+ 'plain text',
+ 'withhh modifier',
+ 'character classes',
+ 'with groups',
+ 'with more groups',
+ 'with modddifiers in groupss modifiers in groupsss',
+ 'sub',
+ 'substrings',
+ 'char counttts',
+ 'nested exprassions',
+ 'nested express',
+ 'escaped ) chars',
+ 'escaped ] chars',
+ 'escaped ]]] chars'
+ ],
+ [
+ 'plain',
+ 'text',
+ 'something with modifiers',
+ 'su',
+ 'char counttttts',
+ 'escaped ]] chars'
+ ],
+ [
+ "nested .*",
+ "escaped .*",
+ "^wit.*",
+ "plain text",
+ "cha.*",
+ "su.*"
+ ]
+ ),
+ (
+ "combined",
+ [
+ 'foo+',
+ 'food',
+ 'football',
+ "ba[rh]",
+ "bard"
+ ],
+ [
+ 'foo',
+ 'fooooo',
+ 'football',
+ 'food',
+ 'bar',
+ 'bah',
+ 'bard'
+ ],
+ [
+ 'fo',
+ 'bat'
+ ],
+ [
+ "fo.*",
+ "ba.*"
+ ]
+ )
+ ]
+
+ for (test_name, patterns,
+ matching_strings, unmatching_strings, expected_subpatterns) in test_data:
+ check_combined_regex(test_name, patterns, matching_strings, unmatching_strings)
+ check_coarse_regex(test_name, patterns, matching_strings, expected_subpatterns)
diff --git a/tests/utils/test_zzz_memory.py b/tests/utils/test_zzz_memory.py
new file mode 100644
index 0000000..4b934cb
--- /dev/null
+++ b/tests/utils/test_zzz_memory.py
@@ -0,0 +1,115 @@
+# ============LICENSE_START=======================================================
+# Copyright (c) 2017-2019 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+
+"""test of the package for policy-handler of DCAE-Controller"""
+
+import gc
+import json
+import time
+
+from policyhandler.onap.audit import Audit, AuditHttpCode, Metrics
+from policyhandler.utils import Utils
+
+_LOGGER = Utils.get_logger(__file__)
+
+class Node(object):
+ """making the cycled objects"""
+ def __init__(self, name):
+ self.name = name
+ self.next = None
+ def __repr__(self):
+ return '%s(%s)' % (self.__class__.__name__, self.name)
+
+
+def test_healthcheck():
+ """test /healthcheck"""
+ audit = Audit(job_name="test_healthcheck",
+ req_message="get /healthcheck")
+ metrics = Metrics(aud_parent=audit, targetEntity="test_healthcheck")
+ metrics.metrics_start("test /healthcheck")
+ time.sleep(0.1)
+
+ metrics.metrics("test /healthcheck")
+ health = audit.health(full=True)
+ audit.audit_done(result=json.dumps(health))
+
+ _LOGGER.info("healthcheck: %s", json.dumps(health))
+ assert bool(health)
+
+
+def test_healthcheck_with_error():
+ """test /healthcheck"""
+ audit = Audit(job_name="test_healthcheck_with_error",
+ req_message="get /healthcheck")
+ metrics = Metrics(aud_parent=audit, targetEntity="test_healthcheck_with_error")
+ metrics.metrics_start("test /healthcheck")
+ time.sleep(0.2)
+ audit.error("error from test_healthcheck_with_error")
+ audit.fatal("fatal from test_healthcheck_with_error")
+ audit.debug("debug from test_healthcheck_with_error")
+ audit.warn("debug from test_healthcheck_with_error")
+ audit.info_requested("debug from test_healthcheck_with_error")
+ if audit.is_success():
+ audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_OK.value)
+ audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value)
+ metrics.metrics("test /healthcheck")
+
+ health = audit.health(full=True)
+ audit.audit_done(result=json.dumps(health))
+
+ _LOGGER.info("healthcheck: %s", json.dumps(health))
+ assert bool(health)
+
+
+def test_healthcheck_with_garbage():
+ """test /healthcheck"""
+ gc_found = gc.collect()
+ gc.set_debug(gc.DEBUG_LEAK)
+
+ node1 = Node("one")
+ node2 = Node("two")
+ node3 = Node("three")
+ node1.next = node2
+ node2.next = node3
+ node3.next = node1
+ node1 = node2 = node3 = None
+ gc_found = gc.collect()
+
+ audit = Audit(job_name="test_healthcheck_with_garbage",
+ req_message="get /test_healthcheck_with_garbage")
+ health = audit.health(full=True)
+ audit.audit_done(result=json.dumps(health))
+
+ _LOGGER.info("test_healthcheck_with_garbage[%s]: %s", gc_found, json.dumps(health))
+ assert bool(health)
+ assert bool(health.get("runtime", {}).get("gc", {}).get("gc_garbage"))
+
+ _LOGGER.info("clearing up garbage...")
+ for obj in gc.garbage:
+ if isinstance(obj, Node):
+ _LOGGER.info("in garbage: %s 0x%x", obj, id(obj))
+ obj.next = None
+
+ gc_found = gc.collect()
+ _LOGGER.info("after clear test_healthcheck_with_garbage[%s]: %s",
+ gc_found, json.dumps(audit.health(full=True)))
+
+ gc.set_debug(False)
+
+ gc_found = gc.collect()
+ _LOGGER.info("after turned off gc debug test_healthcheck_with_garbage[%s]: %s",
+ gc_found, json.dumps(audit.health(full=True)))