diff options
Diffstat (limited to 'tests')
-rw-r--r-- | tests/test_binding.py | 70 |
1 files changed, 65 insertions, 5 deletions
diff --git a/tests/test_binding.py b/tests/test_binding.py index 60c0809..0e5d05a 100644 --- a/tests/test_binding.py +++ b/tests/test_binding.py @@ -1,6 +1,4 @@ # ============LICENSE_START======================================================= -# org.onap.dcae -# ================================================================================ # Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); @@ -24,6 +22,10 @@ import json from requests.exceptions import HTTPError, RequestException from requests import Response +##### +# MONKEYPATCHES +##### + def monkeyed_get_connection_info_from_consul(service_component_name): #shared monkeypatch. probably somewhat lazy because the function htis patches can be broken up. if service_component_name == "cdap": @@ -46,7 +48,7 @@ class FakeResponse(): self.text = text self.status_code = status_code -def monkeyed_consul_get_key(k): +def monkeyed_consul_get(k): if k == "dti_exists_test:dti": return {"foo" : "bar"} elif k == "dti_NOTexists_test:dti": @@ -55,9 +57,40 @@ def monkeyed_consul_get_key(k): return {"foo2" : "bar2"} elif k == "policies_NOTexists_test:policies": raise HTTPError(response = FakeResponse(text= "", status_code = 404)) + else: + raise Exception("BLOW UP IN FIRE") + +def monkeyed_resolve(k): + if k == "scn_NOTexists": + raise client.CantGetConfig(code = 404, response = "") + elif k == "scn_exists": + return {"foo3" : "bar3"} + elif k == "scn_exists:rel": + return ["foo"] + elif k == "scn_exists:dmaap": + return {"foo4" : "bar4"} + + elif k == "scn_exists_nord": + return {"foo5" : "bar5"} + elif k == "scn_exists_nord:rel": + raise HTTPError(response = FakeResponse(text= "", status_code = 404)) + elif k == "scn_exists_nord:dmaap": + raise HTTPError(response = FakeResponse(text= "", status_code = 404)) + + else: + raise Exception("BLOW UP IN FIRE") -def test_dti_policies(monkeypatch): - monkeypatch.setattr('config_binding_service.client._consul_get_key', monkeyed_consul_get_key) +####### +# TESTS +####### + +def test_get_config_rels_dmaap(monkeypatch): + monkeypatch.setattr('config_binding_service.client._consul_get_key', monkeyed_resolve) + assert ({"foo3" : "bar3"}, ["foo"], {"foo4" : "bar4"}) == client._get_config_rels_dmaap("scn_exists") + assert ({"foo5" : "bar5"}, [], {}) == client._get_config_rels_dmaap("scn_exists_nord") + +def test_dti(monkeypatch): + monkeypatch.setattr('config_binding_service.client._consul_get_key', monkeyed_consul_get) assert client.resolve_DTI("dti_exists_test") == {"foo" : "bar"} with pytest.raises(client.CantGetConfig): @@ -70,6 +103,13 @@ def test_dti_policies(monkeypatch): R = controller.dtievents("dti_NOTexists_test") assert(R.status_code == 404) + R = controller.dtievents("asdfasdf") + assert(R.status_code == 500) + + +def test_policies(monkeypatch): + monkeypatch.setattr('config_binding_service.client._consul_get_key', monkeyed_consul_get) + assert client.resolve_policies("policies_exists_test") == {"foo2" : "bar2"} with pytest.raises(client.CantGetConfig): client.resolve_policies("policies_NOTexists_test") @@ -81,6 +121,26 @@ def test_dti_policies(monkeypatch): R = controller.policies("policies_NOTexists_test") assert(R.status_code == 404) + R = controller.policies("asdfasdf") + assert(R.status_code == 500) + +def test_bind_config_for_scn(monkeypatch): + monkeypatch.setattr('config_binding_service.client.resolve', monkeyed_resolve) + + assert(client.resolve("scn_exists") == {"foo3" : "bar3"}) + with pytest.raises(client.CantGetConfig): + client.resolve("scn_NOTexists") + + R = controller.bind_config_for_scn("scn_exists") + assert(json.loads(R.data) == {"foo3" : "bar3"}) + assert(R.status_code == 200) + + R = controller.bind_config_for_scn("scn_NOTexists") + assert(R.status_code == 404) + + R = controller.bind_config_for_scn("asdfasdf") + assert(R.status_code == 500) + def test_bad_config_http(): test_config = {'yeahhhhh' : "{{}}"} test_rels = ["testing_bravo.somedomain.com"] |