summaryrefslogtreecommitdiffstats
path: root/tests
diff options
context:
space:
mode:
Diffstat (limited to 'tests')
-rw-r--r--tests/__init__.py21
-rw-r--r--tests/mock_settings.py59
-rw-r--r--tests/test_policyhandler.py962
-rw-r--r--tests/test_step_timer.py209
-rw-r--r--tests/test_zzz_memory.py117
5 files changed, 1275 insertions, 93 deletions
diff --git a/tests/__init__.py b/tests/__init__.py
new file mode 100644
index 0000000..1875bf6
--- /dev/null
+++ b/tests/__init__.py
@@ -0,0 +1,21 @@
+# ================================================================================
+# Copyright (c) 2018 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+
+
+# empty __init__.py so that pytest can add correct path to coverage report, -- per pytest
+# best practice guideline
diff --git a/tests/mock_settings.py b/tests/mock_settings.py
new file mode 100644
index 0000000..28bc6cc
--- /dev/null
+++ b/tests/mock_settings.py
@@ -0,0 +1,59 @@
+# ============LICENSE_START=======================================================
+# Copyright (c) 2018 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+"""settings that are general to all tests"""
+
+import json
+import logging
+import sys
+import uuid
+from datetime import datetime
+
+from policyhandler import LogWriter
+from policyhandler.config import Config
+from policyhandler.onap.audit import Audit
+
+
+class Settings(object):
+ """init all locals"""
+ logger = None
+ RUN_TS = datetime.utcnow().isoformat()[:-3] + 'Z'
+ dicovered_config = None
+ deploy_handler_instance_uuid = str(uuid.uuid4())
+
+ @staticmethod
+ def init():
+ """init configs"""
+ Config.load_from_file()
+
+ with open("etc_upload/config.json", 'r') as config_json:
+ Settings.dicovered_config = json.load(config_json)
+
+ Config.load_from_file("etc_upload/config.json")
+
+ Config.settings["catch_up"] = {"interval": 10, "max_skips": 2}
+
+ Settings.logger = logging.getLogger("policy_handler.unit_test")
+ sys.stdout = LogWriter(Settings.logger.info)
+ sys.stderr = LogWriter(Settings.logger.error)
+
+ print("print ========== run_policy_handler ==========")
+ Settings.logger.info("========== run_policy_handler ==========")
+ Audit.init(Config.get_system_name(), Config.LOGGER_CONFIG_FILE_PATH)
+
+ Settings.logger.info("starting policy_handler with config:")
+ Settings.logger.info(Audit.log_json_dumps(Config.settings))
diff --git a/tests/test_policyhandler.py b/tests/test_policyhandler.py
index 61ca00c..ec0b030 100644
--- a/tests/test_policyhandler.py
+++ b/tests/test_policyhandler.py
@@ -1,7 +1,5 @@
# ============LICENSE_START=======================================================
-# org.onap.dcae
-# ================================================================================
-# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -18,45 +16,84 @@
#
# ECOMP is a trademark and service mark of AT&T Intellectual Property.
-import sys
+"""test of the package for policy-handler of DCAE-Controller"""
+
+import copy
import json
import re
-import logging
-from datetime import datetime
+import time
+import uuid
-# import pytest
+import pytest
+
+import cherrypy
+from cherrypy.test.helper import CPWebCase
from policyhandler.config import Config
-from policyhandler.policy_handler import LogWriter
-from policyhandler.onap.audit import Audit
-from policyhandler.policy_rest import PolicyRest, PolicyUtils
-from policyhandler.policy_consts import POLICY_ID, POLICY_VERSION, POLICY_NAME, \
- POLICY_BODY, POLICY_CONFIG
+from policyhandler.deploy_handler import DeployHandler
+from policyhandler.discovery import DiscoveryClient
+from policyhandler.onap.audit import (REQUEST_X_ECOMP_REQUESTID, Audit,
+ AuditHttpCode)
+from policyhandler.policy_consts import (ERRORED_POLICIES, ERRORED_SCOPES,
+ LATEST_POLICIES, POLICY_BODY,
+ POLICY_CONFIG, POLICY_ID, POLICY_NAME,
+ POLICY_VERSION, SCOPE_PREFIXES)
+from policyhandler.policy_receiver import (LOADED_POLICIES, POLICY_VER,
+ REMOVED_POLICIES, PolicyReceiver)
+from policyhandler.policy_rest import PolicyRest
+from policyhandler.policy_utils import PolicyUtils, Utils
+from policyhandler.web_server import _PolicyWeb
-class Settings(object):
- """init all locals"""
- logger = None
- RUN_TS = datetime.utcnow().isoformat()[:-3] + 'Z'
+from .mock_settings import Settings
- @staticmethod
- def init():
- """init locals"""
- Config.load_from_file()
- Config.load_from_file("etc_upload/config.json")
+Settings.init()
- Settings.logger = logging.getLogger("policy_handler")
- sys.stdout = LogWriter(Settings.logger.info)
- sys.stderr = LogWriter(Settings.logger.error)
+class MonkeyHttpResponse(object):
+ """Monkey http reposne"""
+ def __init__(self, headers):
+ self.headers = headers or {}
- Settings.logger.info("========== run_policy_handler ==========")
- Audit.init(Config.get_system_name(), Config.LOGGER_CONFIG_FILE_PATH)
- Settings.logger.info("starting policy_handler with config:")
- Settings.logger.info(Audit.log_json_dumps(Config.config))
+class MonkeyedResponse(object):
+ """Monkey response"""
+ def __init__(self, full_path, res_json, json_body=None, headers=None):
+ self.full_path = full_path
+ self.req_json = json_body or {}
+ self.status_code = 200
+ self.request = MonkeyHttpResponse(headers)
+ self.res = res_json
+ self.text = json.dumps(self.res)
- PolicyRest._lazy_init()
+ def json(self):
+ """returns json of response"""
+ return self.res
+
+ def raise_for_status(self):
+ """ignoring"""
+ pass
+
+
+def monkeyed_discovery(full_path):
+ """monkeypatch for get from consul"""
+ res_json = {}
+ if full_path == DiscoveryClient.CONSUL_SERVICE_MASK.format(Config.settings["deploy_handler"]):
+ res_json = [{
+ "ServiceAddress": "1.1.1.1",
+ "ServicePort": "123"
+ }]
+ elif full_path == DiscoveryClient.CONSUL_KV_MASK.format(Config.get_system_name()):
+ res_json = copy.deepcopy(Settings.dicovered_config)
+ return MonkeyedResponse(full_path, res_json)
+
+
+@pytest.fixture()
+def fix_discovery(monkeypatch):
+ """monkeyed discovery request.get"""
+ Settings.logger.info("setup fix_discovery")
+ monkeypatch.setattr('policyhandler.discovery.requests.get', monkeyed_discovery)
+ yield fix_discovery # provide the fixture value
+ Settings.logger.info("teardown fix_discovery")
-Settings.init()
class MonkeyPolicyBody(object):
"""policy body that policy-engine returns"""
@@ -80,34 +117,19 @@ class MonkeyPolicyBody(object):
POLICY_VERSION: this_ver,
POLICY_CONFIG: json.dumps(config),
"matchingConditions": {
- "ECOMPName": "DCAE",
+ "ONAPName": "DCAE",
"ConfigName": "alex_config_name"
},
"responseAttributes": {},
"property": None
}
- @staticmethod
- def is_the_same_dict(policy_body_1, policy_body_2):
- """check whether both policy_body objects are the same"""
- if not isinstance(policy_body_1, dict) or not isinstance(policy_body_2, dict):
- return False
- for key in policy_body_1.keys():
- if key not in policy_body_2:
- return False
- if isinstance(policy_body_1[key], dict):
- return MonkeyPolicyBody.is_the_same_dict(
- policy_body_1[key], policy_body_2[key])
- if (policy_body_1[key] is None and policy_body_2[key] is not None) \
- or (policy_body_1[key] is not None and policy_body_2[key] is None) \
- or (policy_body_1[key] != policy_body_2[key]):
- return False
- return True
class MonkeyPolicyEngine(object):
"""pretend this is the policy-engine"""
- _scope_prefix = Config.config["scope_prefixes"][0]
- LOREM_IPSUM = """Lorem ipsum dolor sit amet consectetur""".split()
+ _scope_prefix = Config.settings["scope_prefixes"][0]
+ LOREM_IPSUM = """Lorem ipsum dolor sit amet consectetur ametist""".split()
+ LONG_TEXT = "0123456789" * 100
_policies = []
@staticmethod
@@ -115,69 +137,823 @@ class MonkeyPolicyEngine(object):
"""init static vars"""
MonkeyPolicyEngine._policies = [
MonkeyPolicyBody.create_policy_body(
- MonkeyPolicyEngine._scope_prefix + policy_id, policy_version)
+ MonkeyPolicyEngine._scope_prefix + policy_id, policy_index + 1)
for policy_id in MonkeyPolicyEngine.LOREM_IPSUM
- for policy_version in range(1, 1 + MonkeyPolicyEngine.LOREM_IPSUM.index(policy_id))]
+ for policy_index in range(1 + MonkeyPolicyEngine.LOREM_IPSUM.index(policy_id))]
+ Settings.logger.info("MonkeyPolicyEngine._policies: %s",
+ json.dumps(MonkeyPolicyEngine._policies))
@staticmethod
def get_config(policy_name):
"""find policy the way the policy-engine finds"""
if not policy_name:
return []
- if policy_name[-2:] == ".*":
- policy_name = policy_name[:-2]
- return [policy for policy in MonkeyPolicyEngine._policies
+ return [copy.deepcopy(policy)
+ for policy in MonkeyPolicyEngine._policies
if re.match(policy_name, policy[POLICY_NAME])]
@staticmethod
+ def get_configs_all():
+ """get all policies the way the policy-engine finds"""
+ policies = [copy.deepcopy(policy)
+ for policy in MonkeyPolicyEngine._policies]
+ for policy in policies:
+ policy["config"] = MonkeyPolicyEngine.LONG_TEXT
+ return policies
+
+ @staticmethod
def get_policy_id(policy_index):
"""get the policy_id by index"""
- return MonkeyPolicyEngine._scope_prefix \
- + MonkeyPolicyEngine.LOREM_IPSUM[policy_index % len(MonkeyPolicyEngine.LOREM_IPSUM)]
+ return (MonkeyPolicyEngine._scope_prefix
+ + MonkeyPolicyEngine.LOREM_IPSUM[
+ policy_index % len(MonkeyPolicyEngine.LOREM_IPSUM)])
+
+ @staticmethod
+ def gen_policy_latest(policy_index):
+ """generate the policy response by policy_index = version - 1"""
+ policy_id = MonkeyPolicyEngine.get_policy_id(policy_index)
+ expected_policy = {
+ POLICY_ID: policy_id,
+ POLICY_BODY: MonkeyPolicyBody.create_policy_body(policy_id, policy_index + 1)
+ }
+ return policy_id, PolicyUtils.parse_policy_config(expected_policy)
+
+ @staticmethod
+ def gen_all_policies_latest():
+ """generate all latest policies"""
+ return {
+ LATEST_POLICIES: dict(MonkeyPolicyEngine.gen_policy_latest(policy_index)
+ for policy_index in range(len(MonkeyPolicyEngine.LOREM_IPSUM))),
+ ERRORED_SCOPES: ["DCAE.Config_*"],
+ SCOPE_PREFIXES: ["DCAE.Config_*"],
+ ERRORED_POLICIES: {}
+ }
+
+ @staticmethod
+ def gen_policies_latest(match_to_policy_name):
+ """generate all latest policies"""
+ return {
+ LATEST_POLICIES:
+ dict((k, v)
+ for k, v in MonkeyPolicyEngine.gen_all_policies_latest()
+ [LATEST_POLICIES].items()
+ if re.match(match_to_policy_name, k)),
+ ERRORED_SCOPES: [],
+ ERRORED_POLICIES: {}
+ }
+
MonkeyPolicyEngine.init()
-class MonkeyHttpResponse(object):
- """Monkey http reposne"""
- def __init__(self, headers):
- self.headers = headers or {}
-class MonkeyedResponse(object):
- """Monkey response"""
- def __init__(self, full_path, json_body, headers):
- self.full_path = full_path
- self.req_json = json_body or {}
- self.status_code = 200
- self.request = MonkeyHttpResponse(headers)
- self.req_policy_name = self.req_json.get(POLICY_NAME)
- self.res = MonkeyPolicyEngine.get_config(self.req_policy_name)
- self.text = json.dumps(self.res)
+@pytest.fixture()
+def fix_pdp_post(monkeypatch):
+ """monkeyed request /getConfig to PDP"""
+ def monkeyed_policy_rest_post(full_path, json=None, headers=None):
+ """monkeypatch for the POST to policy-engine"""
+ res_json = MonkeyPolicyEngine.get_config(json.get(POLICY_NAME))
+ return MonkeyedResponse(full_path, res_json, json, headers)
+
+ Settings.logger.info("setup fix_pdp_post")
+ PolicyRest._lazy_init()
+ monkeypatch.setattr('policyhandler.policy_rest.PolicyRest._requests_session.post',
+ monkeyed_policy_rest_post)
+ yield fix_pdp_post # provide the fixture value
+ Settings.logger.info("teardown fix_pdp_post")
+
+
+@pytest.fixture()
+def fix_pdp_post_big(monkeypatch):
+ """monkeyed request /getConfig to PDP"""
+ def monkeyed_policy_rest_post(full_path, json=None, headers=None):
+ """monkeypatch for the POST to policy-engine"""
+ res_json = MonkeyPolicyEngine.get_configs_all()
+ return MonkeyedResponse(full_path, res_json, json, headers)
+
+ Settings.logger.info("setup fix_pdp_post_big")
+ PolicyRest._lazy_init()
+ monkeypatch.setattr('policyhandler.policy_rest.PolicyRest._requests_session.post',
+ monkeyed_policy_rest_post)
+ yield fix_pdp_post_big # provide the fixture value
+ Settings.logger.info("teardown fix_pdp_post_big")
+
+
+class MockException(Exception):
+ """mock exception"""
+ pass
+
+
+@pytest.fixture()
+def fix_pdp_post_boom(monkeypatch):
+ """monkeyed request /getConfig to PDP - exception"""
+ def monkeyed_policy_rest_post_boom(full_path, json=None, headers=None):
+ """monkeypatch for the POST to policy-engine"""
+ raise MockException("fix_pdp_post_boom")
+
+ Settings.logger.info("setup fix_pdp_post_boom")
+ PolicyRest._lazy_init()
+ monkeypatch.setattr('policyhandler.policy_rest.PolicyRest._requests_session.post',
+ monkeyed_policy_rest_post_boom)
+ yield fix_pdp_post_boom
+ Settings.logger.info("teardown fix_pdp_post_boom")
+
+@staticmethod
+def monkeyed_boom(*args, **kwargs):
+ """monkeypatch for the select_latest_policies"""
+ raise MockException("monkeyed_boom")
+
+@pytest.fixture()
+def fix_select_latest_policies_boom(monkeypatch):
+ """monkeyed exception"""
+
+ Settings.logger.info("setup fix_select_latest_policies_boom")
+ monkeypatch.setattr('policyhandler.policy_utils.PolicyUtils.select_latest_policies',
+ monkeyed_boom)
+ monkeypatch.setattr('policyhandler.policy_utils.PolicyUtils.select_latest_policy',
+ monkeyed_boom)
+ monkeypatch.setattr('policyhandler.policy_utils.PolicyUtils.extract_policy_id',
+ monkeyed_boom)
+
+ yield fix_select_latest_policies_boom
+ Settings.logger.info("teardown fix_select_latest_policies_boom")
+
+
+@pytest.fixture()
+def fix_deploy_handler(monkeypatch, fix_discovery):
+ """monkeyed discovery request.get"""
+ def monkeyed_deploy_handler(full_path, json=None, headers=None):
+ """monkeypatch for deploy_handler"""
+ return MonkeyedResponse(
+ full_path,
+ {"server_instance_uuid": Settings.deploy_handler_instance_uuid},
+ json, headers
+ )
+
+ Settings.logger.info("setup fix_deploy_handler")
+ audit = Audit(req_message="fix_deploy_handler")
+ DeployHandler._lazy_init(audit)
+ monkeypatch.setattr('policyhandler.deploy_handler.DeployHandler._requests_session.post',
+ monkeyed_deploy_handler)
+ yield fix_deploy_handler # provide the fixture value
+ Settings.logger.info("teardown fix_deploy_handler")
+
+
+@pytest.fixture()
+def fix_deploy_handler_fail(monkeypatch, fix_discovery):
+ """monkeyed failed discovery request.get"""
+ def monkeyed_deploy_handler(full_path, json=None, headers=None):
+ """monkeypatch for deploy_handler"""
+ res = MonkeyedResponse(
+ full_path,
+ {"server_instance_uuid": Settings.deploy_handler_instance_uuid},
+ json, headers
+ )
+ res.status_code = 413
+ return res
+
+ @staticmethod
+ def monkeyed_deploy_handler_init(audit_ignore, rediscover=False):
+ """monkeypatch for deploy_handler init"""
+ DeployHandler._url = None
+
+ Settings.logger.info("setup fix_deploy_handler_fail")
+ config_catch_up = Config.settings["catch_up"]
+ Config.settings["catch_up"] = {"interval": 1, "max_skips": 0}
+
+ audit = Audit(req_message="fix_deploy_handler_fail")
+ DeployHandler._lazy_init(audit, rediscover=True)
+ monkeypatch.setattr('policyhandler.deploy_handler.DeployHandler._requests_session.post',
+ monkeyed_deploy_handler)
+ monkeypatch.setattr('policyhandler.deploy_handler.DeployHandler._lazy_init',
+ monkeyed_deploy_handler_init)
+ yield fix_deploy_handler_fail
+ Settings.logger.info("teardown fix_deploy_handler_fail")
+ Config.settings["catch_up"] = config_catch_up
+
+
+def monkeyed_cherrypy_engine_exit():
+ """monkeypatch for deploy_handler"""
+ Settings.logger.info("cherrypy_engine_exit()")
+
+
+@pytest.fixture()
+def fix_cherrypy_engine_exit(monkeypatch):
+ """monkeyed cherrypy.engine.exit()"""
+ Settings.logger.info("setup fix_cherrypy_engine_exit")
+ monkeypatch.setattr('policyhandler.web_server.cherrypy.engine.exit',
+ monkeyed_cherrypy_engine_exit)
+ yield fix_cherrypy_engine_exit # provide the fixture value
+ Settings.logger.info("teardown fix_cherrypy_engine_exit")
- def json(self):
- """returns json of response"""
- return self.res
-def monkeyed_policy_rest_post(full_path, json={}, headers={}):
- """monkeypatch for the POST to policy-engine"""
- return MonkeyedResponse(full_path, json, headers)
+class MonkeyedWebSocket(object):
+ """Monkey websocket"""
+ on_message = None
-def test_get_policy_latest(monkeypatch):
+ @staticmethod
+ def send_notification(updated_indexes):
+ """fake notification through the web-socket"""
+ if not MonkeyedWebSocket.on_message:
+ return
+ message = {
+ LOADED_POLICIES: [
+ {POLICY_NAME: "{0}.{1}.xml".format(
+ MonkeyPolicyEngine.get_policy_id(policy_index), policy_index + 1),
+ POLICY_VER: str(policy_index + 1)}
+ for policy_index in updated_indexes or []
+ ],
+ REMOVED_POLICIES : []
+ }
+ message = json.dumps(message)
+ Settings.logger.info("send_notification: %s", message)
+ MonkeyedWebSocket.on_message(None, message)
+
+ @staticmethod
+ def enableTrace(yes_no):
+ """ignore"""
+ pass
+
+ class MonkeyedSocket(object):
+ """Monkey websocket"""
+ def __init__(self):
+ self.connected = True
+
+ class WebSocketApp(object):
+ """Monkeyed WebSocketApp"""
+ def __init__(self, web_socket_url, on_message=None, on_close=None, on_error=None):
+ self.web_socket_url = web_socket_url
+ self.on_message = MonkeyedWebSocket.on_message = on_message
+ self.on_close = on_close
+ self.on_error = on_error
+ self.sock = MonkeyedWebSocket.MonkeyedSocket()
+ Settings.logger.info("MonkeyedWebSocket for: %s", self.web_socket_url)
+
+ def run_forever(self):
+ """forever in the loop"""
+ counter = 0
+ while self.sock.connected:
+ counter += 1
+ Settings.logger.info("MonkeyedWebSocket sleep %s...", counter)
+ time.sleep(5)
+ Settings.logger.info("MonkeyedWebSocket exit %s", counter)
+
+ def close(self):
+ """close socket"""
+ self.sock.connected = False
+
+
+@pytest.fixture()
+def fix_policy_receiver_websocket(monkeypatch):
+ """monkeyed websocket for policy_receiver"""
+ Settings.logger.info("setup fix_policy_receiver_websocket")
+ monkeypatch.setattr('policyhandler.policy_receiver.websocket', MonkeyedWebSocket)
+ yield fix_policy_receiver_websocket # provide the fixture value
+ Settings.logger.info("teardown fix_policy_receiver_websocket")
+
+
+def test_get_policy_latest(fix_pdp_post):
"""test /policy_latest/<policy-id>"""
- monkeypatch.setattr('policyhandler.policy_rest.PolicyRest._requests_session.post', \
- monkeyed_policy_rest_post)
- policy_index = 3
- policy_id = MonkeyPolicyEngine.get_policy_id(policy_index)
- expected_policy = {
- POLICY_ID : policy_id,
- POLICY_BODY : MonkeyPolicyBody.create_policy_body(policy_id, policy_index)
- }
- expected_policy = PolicyUtils.parse_policy_config(expected_policy)
-
- audit = Audit(req_message="get /policy_latest/{0}".format(policy_id or ""))
- policy_latest = PolicyRest.get_latest_policy((audit, policy_id)) or {}
+ policy_id, expected_policy = MonkeyPolicyEngine.gen_policy_latest(3)
+
+ audit = Audit(job_name="test_get_policy_latest",
+ req_message="get /policy_latest/{0}".format(policy_id or ""))
+ policy_latest = PolicyRest.get_latest_policy((audit, policy_id, None, None)) or {}
audit.audit_done(result=json.dumps(policy_latest))
- Settings.logger.info("expected_policy: {0}".format(json.dumps(expected_policy)))
- Settings.logger.info("policy_latest: {0}".format(json.dumps(policy_latest)))
- assert MonkeyPolicyBody.is_the_same_dict(policy_latest, expected_policy)
- assert MonkeyPolicyBody.is_the_same_dict(expected_policy, policy_latest)
+ Settings.logger.info("expected_policy: %s", json.dumps(expected_policy))
+ Settings.logger.info("policy_latest: %s", json.dumps(policy_latest))
+ assert Utils.are_the_same(policy_latest, expected_policy)
+
+
+
+@pytest.mark.usefixtures("fix_pdp_post")
+class WebServerTest(CPWebCase):
+ """testing the web-server - runs tests in alphabetical order of method names"""
+ def setup_server():
+ """setup the web-server"""
+ cherrypy.tree.mount(_PolicyWeb(), '/')
+
+ setup_server = staticmethod(setup_server)
+
+ def test_web_healthcheck(self):
+ """test /healthcheck"""
+ result = self.getPage("/healthcheck")
+ Settings.logger.info("healthcheck result: %s", result)
+ Settings.logger.info("got healthcheck: %s", self.body)
+ self.assertStatus('200 OK')
+
+ def test_web_policy_latest(self):
+ """test /policy_latest/<policy-id>"""
+ policy_id, expected_policy = MonkeyPolicyEngine.gen_policy_latest(3)
+
+ self.getPage("/policy_latest/{0}".format(policy_id or ""))
+ self.assertStatus('200 OK')
+
+ policy_latest = json.loads(self.body)
+
+ Settings.logger.info("policy_latest: %s", self.body)
+ Settings.logger.info("expected_policy: %s", json.dumps(expected_policy))
+ assert Utils.are_the_same(policy_latest, expected_policy)
+
+ result = self.getPage("/healthcheck")
+ Settings.logger.info("healthcheck result: %s", result)
+
+ def test_web_all_policies_latest(self):
+ """test GET /policies_latest"""
+ expected_policies = MonkeyPolicyEngine.gen_all_policies_latest()
+ expected_policies = expected_policies[LATEST_POLICIES]
+
+ result = self.getPage("/policies_latest")
+ Settings.logger.info("result: %s", result)
+ Settings.logger.info("body: %s", self.body)
+ self.assertStatus('200 OK')
+
+ policies_latest = json.loads(self.body)
+ self.assertIn(LATEST_POLICIES, policies_latest)
+ policies_latest = policies_latest[LATEST_POLICIES]
+
+ Settings.logger.info("policies_latest: %s", json.dumps(policies_latest))
+ Settings.logger.info("expected_policies: %s", json.dumps(expected_policies))
+ assert Utils.are_the_same(policies_latest, expected_policies)
+
+ result = self.getPage("/healthcheck")
+ Settings.logger.info("healthcheck result: %s", result)
+
+ def test_web_policies_latest(self):
+ """test POST /policies_latest with policyName"""
+ match_to_policy_name = Config.settings["scope_prefixes"][0] + "amet.*"
+ expected_policies = MonkeyPolicyEngine.gen_policies_latest(match_to_policy_name)
+ expected_policies = expected_policies[LATEST_POLICIES]
+
+ body = json.dumps({POLICY_NAME: match_to_policy_name})
+ result = self.getPage("/policies_latest", method='POST',
+ body=body,
+ headers=[
+ (REQUEST_X_ECOMP_REQUESTID, str(uuid.uuid4())),
+ ("Content-Type", "application/json"),
+ ('Content-Length', str(len(body)))
+ ])
+ Settings.logger.info("result: %s", result)
+ Settings.logger.info("body: %s", self.body)
+ self.assertStatus('200 OK')
+
+ policies_latest = json.loads(self.body)[LATEST_POLICIES]
+
+ Settings.logger.info("policies_latest: %s", json.dumps(policies_latest))
+ Settings.logger.info("expected_policies: %s", json.dumps(expected_policies))
+ assert Utils.are_the_same(policies_latest, expected_policies)
+
+ result = self.getPage("/healthcheck")
+ Settings.logger.info("healthcheck result: %s", result)
+
+ @pytest.mark.usefixtures("fix_deploy_handler", "fix_policy_receiver_websocket")
+ def test_zzz_policy_updates_and_catch_ups(self):
+ """test run policy handler with policy updates and catchups"""
+ Settings.logger.info("start policy_updates_and_catch_ups")
+ audit = Audit(job_name="test_zzz_policy_updates_and_catch_ups",
+ req_message="start policy_updates_and_catch_ups")
+ PolicyReceiver.run(audit)
+
+ Settings.logger.info("sleep before send_notification...")
+ time.sleep(2)
+
+ MonkeyedWebSocket.send_notification([1, 3, 5])
+ Settings.logger.info("sleep after send_notification...")
+ time.sleep(3)
+
+ Settings.logger.info("sleep 30 before shutdown...")
+ time.sleep(30)
+
+ result = self.getPage("/healthcheck")
+ Settings.logger.info("healthcheck result: %s", result)
+
+ PolicyReceiver.shutdown(audit)
+ time.sleep(1)
+
+ @pytest.mark.usefixtures("fix_deploy_handler", "fix_policy_receiver_websocket")
+ def test_zzz_catch_up_on_deploy_handler_changed(self):
+ """test run policy handler with deployment-handler changed underneath"""
+ Settings.logger.info("start zzz_catch_up_on_deploy_handler_changed")
+ audit = Audit(job_name="test_zzz_catch_up_on_deploy_handler_changed",
+ req_message="start zzz_catch_up_on_deploy_handler_changed")
+ PolicyReceiver.run(audit)
+
+ Settings.logger.info("sleep before send_notification...")
+ time.sleep(2)
+
+ MonkeyedWebSocket.send_notification([1])
+ Settings.logger.info("sleep after send_notification...")
+ time.sleep(3)
+
+ Settings.deploy_handler_instance_uuid = str(uuid.uuid4())
+ Settings.logger.info("new deploy-handler uuid=%s", Settings.deploy_handler_instance_uuid)
+
+ MonkeyedWebSocket.send_notification([2, 4])
+ Settings.logger.info("sleep after send_notification...")
+ time.sleep(3)
+
+ Settings.logger.info("sleep 5 before shutdown...")
+ time.sleep(5)
+
+ result = self.getPage("/healthcheck")
+ Settings.logger.info("healthcheck result: %s", result)
+
+ PolicyReceiver.shutdown(audit)
+ time.sleep(1)
+
+ @pytest.mark.usefixtures("fix_deploy_handler", "fix_policy_receiver_websocket")
+ def test_zzz_get_catch_up(self):
+ """test /catch_up"""
+ Settings.logger.info("start /catch_up")
+ audit = Audit(job_name="test_zzz_get_catch_up", req_message="start /catch_up")
+ PolicyReceiver.run(audit)
+ time.sleep(5)
+ result = self.getPage("/catch_up")
+ Settings.logger.info("catch_up result: %s", result)
+ self.assertStatus('200 OK')
+ Settings.logger.info("got catch_up: %s", self.body)
+
+ Settings.logger.info("sleep 5 before shutdown...")
+ time.sleep(5)
+
+ result = self.getPage("/healthcheck")
+ Settings.logger.info("healthcheck result: %s", result)
+
+ PolicyReceiver.shutdown(audit)
+ time.sleep(1)
+
+ @pytest.mark.usefixtures(
+ "fix_deploy_handler",
+ "fix_policy_receiver_websocket",
+ "fix_cherrypy_engine_exit")
+ def test_zzzzz_shutdown(self):
+ """test shutdown"""
+ Settings.logger.info("start shutdown")
+ audit = Audit(job_name="test_zzzzz_shutdown", req_message="start shutdown")
+ PolicyReceiver.run(audit)
+
+ Settings.logger.info("sleep before send_notification...")
+ time.sleep(2)
+
+ MonkeyedWebSocket.send_notification([1, 3, 5])
+ Settings.logger.info("sleep after send_notification...")
+ time.sleep(3)
+
+ result = self.getPage("/healthcheck")
+ Settings.logger.info("healthcheck result: %s", result)
+
+ WebServerTest.do_gc_test = False
+ Settings.logger.info("shutdown...")
+ result = self.getPage("/shutdown")
+ Settings.logger.info("shutdown result: %s", result)
+ self.assertStatus('200 OK')
+ Settings.logger.info("got shutdown: %s", self.body)
+ time.sleep(1)
+
+
+@pytest.mark.usefixtures("fix_pdp_post_boom")
+class WebServerPDPBoomTest(CPWebCase):
+ """testing the web-server - runs tests in alphabetical order of method names"""
+ def setup_server():
+ """setup the web-server"""
+ cherrypy.tree.mount(_PolicyWeb(), '/')
+
+ setup_server = staticmethod(setup_server)
+
+ def test_web_healthcheck(self):
+ """test /healthcheck"""
+ result = self.getPage("/healthcheck")
+ Settings.logger.info("healthcheck result: %s", result)
+ Settings.logger.info("got healthcheck: %s", self.body)
+ self.assertStatus('200 OK')
+
+ def test_web_policy_latest(self):
+ """test /policy_latest/<policy-id>"""
+ policy_id, _ = MonkeyPolicyEngine.gen_policy_latest(3)
+
+ self.getPage("/policy_latest/{0}".format(policy_id or ""))
+ self.assertStatus(AuditHttpCode.SERVER_INTERNAL_ERROR.value)
+
+ result = self.getPage("/healthcheck")
+ Settings.logger.info("healthcheck result: %s", result)
+
+ def test_web_all_policies_latest(self):
+ """test GET /policies_latest"""
+ result = self.getPage("/policies_latest")
+ Settings.logger.info("result: %s", result)
+ Settings.logger.info("body: %s", self.body)
+ self.assertStatus(AuditHttpCode.SERVER_INTERNAL_ERROR.value)
+
+ result = self.getPage("/healthcheck")
+ Settings.logger.info("healthcheck result: %s", result)
+
+ def test_web_policies_latest(self):
+ """test POST /policies_latest with policyName"""
+ match_to_policy_name = Config.settings["scope_prefixes"][0] + "amet.*"
+
+ body = json.dumps({POLICY_NAME: match_to_policy_name})
+ result = self.getPage("/policies_latest", method='POST',
+ body=body,
+ headers=[
+ (REQUEST_X_ECOMP_REQUESTID, str(uuid.uuid4())),
+ ("Content-Type", "application/json"),
+ ('Content-Length', str(len(body)))
+ ])
+ Settings.logger.info("result: %s", result)
+ Settings.logger.info("body: %s", self.body)
+ self.assertStatus(AuditHttpCode.SERVER_INTERNAL_ERROR.value)
+
+ result = self.getPage("/healthcheck")
+ Settings.logger.info("healthcheck result: %s", result)
+
+ @pytest.mark.usefixtures("fix_deploy_handler", "fix_policy_receiver_websocket")
+ def test_zzz_policy_updates_and_catch_ups(self):
+ """test run policy handler with policy updates and catchups"""
+ Settings.logger.info("start policy_updates_and_catch_ups")
+ audit = Audit(job_name="test_zzz_policy_updates_and_catch_ups",
+ req_message="start policy_updates_and_catch_ups")
+ PolicyReceiver.run(audit)
+
+ Settings.logger.info("sleep before send_notification...")
+ time.sleep(2)
+
+ MonkeyedWebSocket.send_notification([1, 3, 5])
+ Settings.logger.info("sleep after send_notification...")
+ time.sleep(3)
+
+ Settings.logger.info("sleep 30 before shutdown...")
+ time.sleep(30)
+
+ result = self.getPage("/healthcheck")
+ Settings.logger.info("healthcheck result: %s", result)
+
+ PolicyReceiver.shutdown(audit)
+ time.sleep(1)
+
+ @pytest.mark.usefixtures("fix_deploy_handler", "fix_policy_receiver_websocket")
+ def test_zzz_catch_up_on_deploy_handler_changed(self):
+ """test run policy handler with deployment-handler changed underneath"""
+ Settings.logger.info("start zzz_catch_up_on_deploy_handler_changed")
+ audit = Audit(job_name="test_zzz_catch_up_on_deploy_handler_changed",
+ req_message="start zzz_catch_up_on_deploy_handler_changed")
+ PolicyReceiver.run(audit)
+
+ Settings.logger.info("sleep before send_notification...")
+ time.sleep(2)
+
+ MonkeyedWebSocket.send_notification([1])
+ Settings.logger.info("sleep after send_notification...")
+ time.sleep(3)
+
+ Settings.deploy_handler_instance_uuid = str(uuid.uuid4())
+ Settings.logger.info("new deploy-handler uuid=%s", Settings.deploy_handler_instance_uuid)
+
+ MonkeyedWebSocket.send_notification([2, 4])
+ Settings.logger.info("sleep after send_notification...")
+ time.sleep(3)
+
+ Settings.logger.info("sleep 5 before shutdown...")
+ time.sleep(5)
+
+ result = self.getPage("/healthcheck")
+ Settings.logger.info("healthcheck result: %s", result)
+
+ PolicyReceiver.shutdown(audit)
+ time.sleep(1)
+
+ @pytest.mark.usefixtures("fix_deploy_handler", "fix_policy_receiver_websocket")
+ def test_zzz_get_catch_up(self):
+ """test /catch_up"""
+ Settings.logger.info("start /catch_up")
+ audit = Audit(job_name="test_zzz_get_catch_up", req_message="start /catch_up")
+ PolicyReceiver.run(audit)
+ time.sleep(5)
+ result = self.getPage("/catch_up")
+ Settings.logger.info("catch_up result: %s", result)
+ self.assertStatus('200 OK')
+ Settings.logger.info("got catch_up: %s", self.body)
+
+ Settings.logger.info("sleep 5 before shutdown...")
+ time.sleep(5)
+
+ result = self.getPage("/healthcheck")
+ Settings.logger.info("healthcheck result: %s", result)
+
+ PolicyReceiver.shutdown(audit)
+ time.sleep(1)
+
+ @pytest.mark.usefixtures(
+ "fix_deploy_handler",
+ "fix_policy_receiver_websocket",
+ "fix_cherrypy_engine_exit")
+ def test_zzzzz_shutdown(self):
+ """test shutdown"""
+ Settings.logger.info("start shutdown")
+ audit = Audit(job_name="test_zzzzz_shutdown", req_message="start shutdown")
+ PolicyReceiver.run(audit)
+
+ Settings.logger.info("sleep before send_notification...")
+ time.sleep(2)
+
+ MonkeyedWebSocket.send_notification([1, 3, 5])
+ Settings.logger.info("sleep after send_notification...")
+ time.sleep(3)
+
+ result = self.getPage("/healthcheck")
+ Settings.logger.info("healthcheck result: %s", result)
+
+ WebServerPDPBoomTest.do_gc_test = False
+ Settings.logger.info("shutdown...")
+ result = self.getPage("/shutdown")
+ Settings.logger.info("shutdown result: %s", result)
+ self.assertStatus('200 OK')
+ Settings.logger.info("got shutdown: %s", self.body)
+ time.sleep(1)
+
+
+@pytest.mark.usefixtures("fix_pdp_post", "fix_select_latest_policies_boom")
+class WebServerInternalBoomTest(CPWebCase):
+ """testing the web-server - runs tests in alphabetical order of method names"""
+ def setup_server():
+ """setup the web-server"""
+ cherrypy.tree.mount(_PolicyWeb(), '/')
+
+ setup_server = staticmethod(setup_server)
+
+ def test_web_healthcheck(self):
+ """test /healthcheck"""
+ result = self.getPage("/healthcheck")
+ Settings.logger.info("healthcheck result: %s", result)
+ Settings.logger.info("got healthcheck: %s", self.body)
+ self.assertStatus('200 OK')
+
+ def test_web_policy_latest(self):
+ """test /policy_latest/<policy-id>"""
+ policy_id, _ = MonkeyPolicyEngine.gen_policy_latest(3)
+
+ self.getPage("/policy_latest/{0}".format(policy_id or ""))
+ self.assertStatus(AuditHttpCode.SERVER_INTERNAL_ERROR.value)
+
+ result = self.getPage("/healthcheck")
+ Settings.logger.info("healthcheck result: %s", result)
+
+ def test_web_all_policies_latest(self):
+ """test GET /policies_latest"""
+ result = self.getPage("/policies_latest")
+ Settings.logger.info("result: %s", result)
+ Settings.logger.info("body: %s", self.body)
+ self.assertStatus(AuditHttpCode.SERVER_INTERNAL_ERROR.value)
+
+ result = self.getPage("/healthcheck")
+ Settings.logger.info("healthcheck result: %s", result)
+
+ def test_web_policies_latest(self):
+ """test POST /policies_latest with policyName"""
+ match_to_policy_name = Config.settings["scope_prefixes"][0] + "amet.*"
+
+ body = json.dumps({POLICY_NAME: match_to_policy_name})
+ result = self.getPage("/policies_latest", method='POST',
+ body=body,
+ headers=[
+ (REQUEST_X_ECOMP_REQUESTID, str(uuid.uuid4())),
+ ("Content-Type", "application/json"),
+ ('Content-Length', str(len(body)))
+ ])
+ Settings.logger.info("result: %s", result)
+ Settings.logger.info("body: %s", self.body)
+ self.assertStatus(AuditHttpCode.SERVER_INTERNAL_ERROR.value)
+
+ result = self.getPage("/healthcheck")
+ Settings.logger.info("healthcheck result: %s", result)
+
+ @pytest.mark.usefixtures("fix_deploy_handler", "fix_policy_receiver_websocket")
+ def test_zzz_policy_updates_and_catch_ups(self):
+ """test run policy handler with policy updates and catchups"""
+ Settings.logger.info("start policy_updates_and_catch_ups")
+ audit = Audit(job_name="test_zzz_policy_updates_and_catch_ups",
+ req_message="start policy_updates_and_catch_ups")
+ PolicyReceiver.run(audit)
+
+ Settings.logger.info("sleep before send_notification...")
+ time.sleep(2)
+
+ MonkeyedWebSocket.send_notification([1, 3, 5])
+ Settings.logger.info("sleep after send_notification...")
+ time.sleep(3)
+
+ Settings.logger.info("sleep 30 before shutdown...")
+ time.sleep(30)
+
+ result = self.getPage("/healthcheck")
+ Settings.logger.info("healthcheck result: %s", result)
+
+ PolicyReceiver.shutdown(audit)
+ time.sleep(1)
+
+ @pytest.mark.usefixtures("fix_deploy_handler", "fix_policy_receiver_websocket")
+ def test_zzz_catch_up_on_deploy_handler_changed(self):
+ """test run policy handler with deployment-handler changed underneath"""
+ Settings.logger.info("start zzz_catch_up_on_deploy_handler_changed")
+ audit = Audit(job_name="test_zzz_catch_up_on_deploy_handler_changed",
+ req_message="start zzz_catch_up_on_deploy_handler_changed")
+ PolicyReceiver.run(audit)
+
+ Settings.logger.info("sleep before send_notification...")
+ time.sleep(2)
+
+ MonkeyedWebSocket.send_notification([1])
+ Settings.logger.info("sleep after send_notification...")
+ time.sleep(3)
+
+ Settings.deploy_handler_instance_uuid = str(uuid.uuid4())
+ Settings.logger.info("new deploy-handler uuid=%s", Settings.deploy_handler_instance_uuid)
+
+ MonkeyedWebSocket.send_notification([2, 4])
+ Settings.logger.info("sleep after send_notification...")
+ time.sleep(3)
+
+ Settings.logger.info("sleep 5 before shutdown...")
+ time.sleep(5)
+
+ result = self.getPage("/healthcheck")
+ Settings.logger.info("healthcheck result: %s", result)
+
+ PolicyReceiver.shutdown(audit)
+ time.sleep(1)
+
+ @pytest.mark.usefixtures("fix_deploy_handler", "fix_policy_receiver_websocket")
+ def test_zzz_get_catch_up(self):
+ """test /catch_up"""
+ Settings.logger.info("start /catch_up")
+ audit = Audit(job_name="test_zzz_get_catch_up", req_message="start /catch_up")
+ PolicyReceiver.run(audit)
+ time.sleep(5)
+ result = self.getPage("/catch_up")
+ Settings.logger.info("catch_up result: %s", result)
+ self.assertStatus('200 OK')
+ Settings.logger.info("got catch_up: %s", self.body)
+
+ Settings.logger.info("sleep 5 before shutdown...")
+ time.sleep(5)
+
+ result = self.getPage("/healthcheck")
+ Settings.logger.info("healthcheck result: %s", result)
+
+ PolicyReceiver.shutdown(audit)
+ time.sleep(1)
+
+ @pytest.mark.usefixtures(
+ "fix_deploy_handler",
+ "fix_policy_receiver_websocket",
+ "fix_cherrypy_engine_exit")
+ def test_zzzzz_shutdown(self):
+ """test shutdown"""
+ Settings.logger.info("start shutdown")
+ audit = Audit(job_name="test_zzzzz_shutdown", req_message="start shutdown")
+ PolicyReceiver.run(audit)
+
+ Settings.logger.info("sleep before send_notification...")
+ time.sleep(2)
+
+ MonkeyedWebSocket.send_notification([1, 3, 5])
+ Settings.logger.info("sleep after send_notification...")
+ time.sleep(3)
+
+ result = self.getPage("/healthcheck")
+ Settings.logger.info("healthcheck result: %s", result)
+
+ WebServerInternalBoomTest.do_gc_test = False
+ Settings.logger.info("shutdown...")
+ result = self.getPage("/shutdown")
+ Settings.logger.info("shutdown result: %s", result)
+ self.assertStatus('200 OK')
+ Settings.logger.info("got shutdown: %s", self.body)
+ time.sleep(1)
+
+
+@pytest.mark.usefixtures(
+ "fix_pdp_post_big",
+ "fix_deploy_handler_fail",
+ "fix_policy_receiver_websocket"
+)
+def test_catch_ups_failed_dh():
+ """test run policy handler with catchups and failed deployment-handler"""
+ Settings.logger.info("start test_catch_ups_failed_dh")
+ audit = Audit(job_name="test_catch_ups_failed_dh",
+ req_message="start test_catch_ups_failed_dh")
+ PolicyReceiver.run(audit)
+
+ Settings.logger.info("sleep 50 before shutdown...")
+ time.sleep(50)
+
+ health = audit.health(full=True)
+ audit.audit_done(result=json.dumps(health))
+
+ Settings.logger.info("healthcheck: %s", json.dumps(health))
+ assert bool(health)
+
+ PolicyReceiver.shutdown(audit)
+ time.sleep(1)
+
+ health = audit.health(full=True)
+ Settings.logger.info("healthcheck: %s", json.dumps(health))
diff --git a/tests/test_step_timer.py b/tests/test_step_timer.py
new file mode 100644
index 0000000..ff89388
--- /dev/null
+++ b/tests/test_step_timer.py
@@ -0,0 +1,209 @@
+# ============LICENSE_START=======================================================
+# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+
+"""test of the step_timer"""
+
+import json
+import logging
+import time
+from datetime import datetime
+
+from policyhandler.config import Config
+from policyhandler.step_timer import StepTimer
+
+Config.load_from_file()
+
+
+class MockTimerController(object):
+ """testing step_timer"""
+ logger = logging.getLogger("policy_handler.unit_test.step_timer")
+
+ INIT = "init"
+ NEXT = "next"
+ STARTED = "started"
+ PAUSED = "paused"
+ STOPPING = "stopping"
+ STOPPED = "stopped"
+
+ def __init__(self, name, interval):
+ """step_timer test settings"""
+ self.name = name or "step_timer"
+ self.interval = interval or 5
+ self.step_timer = None
+ self.status = None
+ self.run_counter = 0
+ self.status_ts = datetime.utcnow()
+ self.exe_ts = None
+ self.exe_interval = None
+ self.set_status(MockTimerController.INIT)
+
+ def __enter__(self):
+ """constructor"""
+ return self
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ """destructor"""
+ self.stop_timer()
+
+ def on_time(self, *args, **kwargs):
+ """timer event"""
+ self.exe_ts = datetime.utcnow()
+ self.exe_interval = (self.exe_ts - self.status_ts).total_seconds()
+ MockTimerController.logger.info("run on_time[%s] (%s, %s) in %s for %s",
+ self.run_counter, json.dumps(args), json.dumps(kwargs),
+ self.exe_interval, self.get_status())
+ time.sleep(3)
+ MockTimerController.logger.info("done on_time[%s] (%s, %s) in %s for %s",
+ self.run_counter, json.dumps(args), json.dumps(kwargs),
+ self.exe_interval, self.get_status())
+
+ def verify_last_event(self):
+ """assertions needs to be in the main thread"""
+ if self.exe_interval is None:
+ MockTimerController.logger.info("not executed: %s", self.get_status())
+ return
+
+ MockTimerController.logger.info("verify exe %s for %s",
+ self.exe_interval, self.get_status())
+ assert self.exe_interval >= (self.interval - 0.01)
+ assert self.exe_interval < 2 * self.interval
+ MockTimerController.logger.info("success %s", self.get_status())
+
+ def run_timer(self):
+ """create and start the step_timer"""
+ if self.step_timer:
+ self.step_timer.next()
+ self.set_status(MockTimerController.NEXT)
+ return
+
+ self.step_timer = StepTimer(
+ self.name, self.interval, MockTimerController.on_time,
+ MockTimerController.logger,
+ self
+ )
+ self.step_timer.start()
+ self.set_status(MockTimerController.STARTED)
+
+ def pause_timer(self):
+ """pause step_timer"""
+ if self.step_timer:
+ self.step_timer.pause()
+ self.set_status(MockTimerController.PAUSED)
+
+ def stop_timer(self):
+ """stop step_timer"""
+ if self.step_timer:
+ self.set_status(MockTimerController.STOPPING)
+ self.step_timer.stop()
+ self.step_timer.join()
+ self.step_timer = None
+ self.set_status(MockTimerController.STOPPED)
+
+ def set_status(self, status):
+ """set the status of the timer"""
+ if status in [MockTimerController.NEXT, MockTimerController.STARTED]:
+ self.run_counter += 1
+
+ self.status = status
+ utcnow = datetime.utcnow()
+ time_step = (utcnow - self.status_ts).total_seconds()
+ self.status_ts = utcnow
+ MockTimerController.logger.info("[%s]: %s", time_step, self.get_status())
+
+ def get_status(self):
+ """string representation"""
+ status = "{0}[{1}] {2} in {3} from {4} last exe {5}".format(
+ self.status, self.run_counter, self.name, self.interval,
+ str(self.status_ts), str(self.exe_ts)
+ )
+ if self.step_timer:
+ return "{0}: {1}".format(status, self.step_timer.get_timer_status())
+ return status
+
+
+def test_step_timer():
+ """test step_timer"""
+ MockTimerController.logger.info("============ test_step_timer =========")
+ with MockTimerController("step_timer", 5) as step_timer:
+ step_timer.run_timer()
+ time.sleep(1)
+ step_timer.verify_last_event()
+
+ time.sleep(1 + step_timer.interval)
+ step_timer.verify_last_event()
+
+ step_timer.pause_timer()
+ time.sleep(2)
+ step_timer.verify_last_event()
+
+ step_timer.pause_timer()
+ time.sleep(2)
+ step_timer.verify_last_event()
+
+ step_timer.run_timer()
+ time.sleep(3 * step_timer.interval)
+ step_timer.verify_last_event()
+
+ step_timer.run_timer()
+ time.sleep(3 * step_timer.interval)
+ step_timer.verify_last_event()
+
+
+def test_interrupt_step_timer():
+ """test step_timer"""
+ MockTimerController.logger.info("============ test_interrupt_step_timer =========")
+ with MockTimerController("step_timer", 5) as step_timer:
+ step_timer.run_timer()
+ time.sleep(1)
+ step_timer.verify_last_event()
+
+ step_timer.pause_timer()
+ time.sleep(2 + step_timer.interval)
+ step_timer.verify_last_event()
+
+ step_timer.pause_timer()
+ time.sleep(2)
+ step_timer.verify_last_event()
+
+ step_timer.pause_timer()
+ time.sleep(2)
+ step_timer.verify_last_event()
+
+ step_timer.run_timer()
+ time.sleep(2)
+ step_timer.verify_last_event()
+
+ step_timer.run_timer()
+ time.sleep(2 + step_timer.interval)
+ step_timer.verify_last_event()
+
+ step_timer.run_timer()
+ time.sleep(2)
+ step_timer.verify_last_event()
+
+ step_timer.pause_timer()
+ time.sleep(2)
+ step_timer.verify_last_event()
+
+ step_timer.run_timer()
+ time.sleep(2)
+ step_timer.verify_last_event()
+
+ step_timer.run_timer()
+ time.sleep(3 * step_timer.interval)
+ step_timer.verify_last_event()
diff --git a/tests/test_zzz_memory.py b/tests/test_zzz_memory.py
new file mode 100644
index 0000000..e70cf59
--- /dev/null
+++ b/tests/test_zzz_memory.py
@@ -0,0 +1,117 @@
+# ============LICENSE_START=======================================================
+# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+
+"""test of the package for policy-handler of DCAE-Controller"""
+
+import gc
+import json
+import time
+
+from policyhandler.onap.audit import Audit, AuditHttpCode, Metrics
+
+from .mock_settings import Settings
+
+Settings.init()
+
+class Node(object):
+ """making the cycled objects"""
+ def __init__(self, name):
+ self.name = name
+ self.next = None
+ def __repr__(self):
+ return '%s(%s)' % (self.__class__.__name__, self.name)
+
+
+def test_healthcheck():
+ """test /healthcheck"""
+ audit = Audit(job_name="test_healthcheck",
+ req_message="get /healthcheck")
+ metrics = Metrics(aud_parent=audit, targetEntity="test_healthcheck")
+ metrics.metrics_start("test /healthcheck")
+ time.sleep(0.1)
+
+ metrics.metrics("test /healthcheck")
+ health = audit.health(full=True)
+ audit.audit_done(result=json.dumps(health))
+
+ Settings.logger.info("healthcheck: %s", json.dumps(health))
+ assert bool(health)
+
+
+def test_healthcheck_with_error():
+ """test /healthcheck"""
+ audit = Audit(job_name="test_healthcheck_with_error",
+ req_message="get /healthcheck")
+ metrics = Metrics(aud_parent=audit, targetEntity="test_healthcheck_with_error")
+ metrics.metrics_start("test /healthcheck")
+ time.sleep(0.2)
+ audit.error("error from test_healthcheck_with_error")
+ audit.fatal("fatal from test_healthcheck_with_error")
+ audit.debug("debug from test_healthcheck_with_error")
+ audit.warn("debug from test_healthcheck_with_error")
+ audit.info_requested("debug from test_healthcheck_with_error")
+ if audit.is_success():
+ audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value)
+ audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value)
+ metrics.metrics("test /healthcheck")
+
+ health = audit.health(full=True)
+ audit.audit_done(result=json.dumps(health))
+
+ Settings.logger.info("healthcheck: %s", json.dumps(health))
+ assert bool(health)
+
+
+def test_healthcheck_with_garbage():
+ """test /healthcheck"""
+ gc_found = gc.collect()
+ gc.set_debug(gc.DEBUG_LEAK)
+
+ node1 = Node("one")
+ node2 = Node("two")
+ node3 = Node("three")
+ node1.next = node2
+ node2.next = node3
+ node3.next = node1
+ node1 = node2 = node3 = None
+ gc_found = gc.collect()
+
+ audit = Audit(job_name="test_healthcheck_with_garbage",
+ req_message="get /test_healthcheck_with_garbage")
+ health = audit.health(full=True)
+ audit.audit_done(result=json.dumps(health))
+
+ Settings.logger.info("test_healthcheck_with_garbage[%s]: %s", gc_found, json.dumps(health))
+ assert bool(health)
+ assert bool(health.get("runtime", {}).get("gc", {}).get("gc_garbage"))
+
+ Settings.logger.info("clearing up garbage...")
+ for obj in gc.garbage:
+ if isinstance(obj, Node):
+ Settings.logger.info("in garbage: %s 0x%x", obj, id(obj))
+ obj.next = None
+
+ gc_found = gc.collect()
+ Settings.logger.info("after clear test_healthcheck_with_garbage[%s]: %s",
+ gc_found, json.dumps(audit.health(full=True)))
+
+ gc.set_debug(False)
+
+ gc_found = gc.collect()
+ Settings.logger.info("after turned off gc debug test_healthcheck_with_garbage[%s]: %s",
+ gc_found, json.dumps(audit.health(full=True)))