aboutsummaryrefslogtreecommitdiffstats
path: root/tests/test_binding.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_binding.py')
-rw-r--r--tests/test_binding.py70
1 files changed, 65 insertions, 5 deletions
diff --git a/tests/test_binding.py b/tests/test_binding.py
index 60c0809..0e5d05a 100644
--- a/tests/test_binding.py
+++ b/tests/test_binding.py
@@ -1,6 +1,4 @@
# ============LICENSE_START=======================================================
-# org.onap.dcae
-# ================================================================================
# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -24,6 +22,10 @@ import json
from requests.exceptions import HTTPError, RequestException
from requests import Response
+#####
+# MONKEYPATCHES
+#####
+
def monkeyed_get_connection_info_from_consul(service_component_name):
#shared monkeypatch. probably somewhat lazy because the function htis patches can be broken up.
if service_component_name == "cdap":
@@ -46,7 +48,7 @@ class FakeResponse():
self.text = text
self.status_code = status_code
-def monkeyed_consul_get_key(k):
+def monkeyed_consul_get(k):
if k == "dti_exists_test:dti":
return {"foo" : "bar"}
elif k == "dti_NOTexists_test:dti":
@@ -55,9 +57,40 @@ def monkeyed_consul_get_key(k):
return {"foo2" : "bar2"}
elif k == "policies_NOTexists_test:policies":
raise HTTPError(response = FakeResponse(text= "", status_code = 404))
+ else:
+ raise Exception("BLOW UP IN FIRE")
+
+def monkeyed_resolve(k):
+ if k == "scn_NOTexists":
+ raise client.CantGetConfig(code = 404, response = "")
+ elif k == "scn_exists":
+ return {"foo3" : "bar3"}
+ elif k == "scn_exists:rel":
+ return ["foo"]
+ elif k == "scn_exists:dmaap":
+ return {"foo4" : "bar4"}
+
+ elif k == "scn_exists_nord":
+ return {"foo5" : "bar5"}
+ elif k == "scn_exists_nord:rel":
+ raise HTTPError(response = FakeResponse(text= "", status_code = 404))
+ elif k == "scn_exists_nord:dmaap":
+ raise HTTPError(response = FakeResponse(text= "", status_code = 404))
+
+ else:
+ raise Exception("BLOW UP IN FIRE")
-def test_dti_policies(monkeypatch):
- monkeypatch.setattr('config_binding_service.client._consul_get_key', monkeyed_consul_get_key)
+#######
+# TESTS
+#######
+
+def test_get_config_rels_dmaap(monkeypatch):
+ monkeypatch.setattr('config_binding_service.client._consul_get_key', monkeyed_resolve)
+ assert ({"foo3" : "bar3"}, ["foo"], {"foo4" : "bar4"}) == client._get_config_rels_dmaap("scn_exists")
+ assert ({"foo5" : "bar5"}, [], {}) == client._get_config_rels_dmaap("scn_exists_nord")
+
+def test_dti(monkeypatch):
+ monkeypatch.setattr('config_binding_service.client._consul_get_key', monkeyed_consul_get)
assert client.resolve_DTI("dti_exists_test") == {"foo" : "bar"}
with pytest.raises(client.CantGetConfig):
@@ -70,6 +103,13 @@ def test_dti_policies(monkeypatch):
R = controller.dtievents("dti_NOTexists_test")
assert(R.status_code == 404)
+ R = controller.dtievents("asdfasdf")
+ assert(R.status_code == 500)
+
+
+def test_policies(monkeypatch):
+ monkeypatch.setattr('config_binding_service.client._consul_get_key', monkeyed_consul_get)
+
assert client.resolve_policies("policies_exists_test") == {"foo2" : "bar2"}
with pytest.raises(client.CantGetConfig):
client.resolve_policies("policies_NOTexists_test")
@@ -81,6 +121,26 @@ def test_dti_policies(monkeypatch):
R = controller.policies("policies_NOTexists_test")
assert(R.status_code == 404)
+ R = controller.policies("asdfasdf")
+ assert(R.status_code == 500)
+
+def test_bind_config_for_scn(monkeypatch):
+ monkeypatch.setattr('config_binding_service.client.resolve', monkeyed_resolve)
+
+ assert(client.resolve("scn_exists") == {"foo3" : "bar3"})
+ with pytest.raises(client.CantGetConfig):
+ client.resolve("scn_NOTexists")
+
+ R = controller.bind_config_for_scn("scn_exists")
+ assert(json.loads(R.data) == {"foo3" : "bar3"})
+ assert(R.status_code == 200)
+
+ R = controller.bind_config_for_scn("scn_NOTexists")
+ assert(R.status_code == 404)
+
+ R = controller.bind_config_for_scn("asdfasdf")
+ assert(R.status_code == 500)
+
def test_bad_config_http():
test_config = {'yeahhhhh' : "{{}}"}
test_rels = ["testing_bravo.somedomain.com"]