# blob: 2a02dc8a45cb48f7ebc0afa941b58af1879781b8 (plain)
import osdf.adapters.dcae.message_router as MR
import unittest
from osdf.operation.exceptions import MessageBusConfigurationException
from unittest.mock import patch
class TestMessageRouter(unittest.TestCase):
    """Tests for constructing and calling ``MR.MessageRouterClient``."""

    def test_valid_MR(self):
        """Constructing a client from a single ``dmaap_url`` must not raise."""
        MR.MessageRouterClient(dmaap_url="https://MYHOST:3905")

    def test_valid_MR_with_base_urls(self):
        """Constructing a client from base URLs plus a topic must not raise."""
        base_urls = ["https://MYHOST1:3905/", "https://MYHOST2:3905/"]
        MR.MessageRouterClient(mr_host_base_urls=base_urls, topic="MY-TOPIC")

    def test_invalid_valid_MR_with_base_urls(self):
        """Topic missing"""
        base_urls = ["https://MYHOST1:3905/", "https://MYHOST2:3905/"]
        # assertRaises reports a missing exception as a test *failure* (with
        # the message below) rather than as an unrelated error.
        with self.assertRaises(
            MessageBusConfigurationException,
            msg="Allows invalid MR configuration",
        ):
            MR.MessageRouterClient(mr_host_base_urls=base_urls)

    @patch('osdf.adapters.dcae.message_router.MessageRouterClient.http_request', return_value={})
    def test_mr_http_request_mocked(self, http_request):
        """With ``http_request`` mocked to return ``{}``, get/post return ``{}``."""
        mr = MR.MessageRouterClient(dmaap_url="https://MYHOST:3905")
        # Also bind the mock on the instance, as the original test did.
        mr.http_request = http_request
        # unittest assertions are not stripped under ``python -O``.
        self.assertEqual(mr.get(), {})
        self.assertEqual(mr.post("Hello"), {})

    def test_mr_http_request_non_existent_host(self):
        """An unmocked ``get()`` against the placeholder host must raise."""
        mr = MR.MessageRouterClient(dmaap_url="https://MYHOST:3905")
        # NOTE(review): presumably relies on MYHOST being unreachable - confirm.
        # The concrete exception type is not visible from this file, so the
        # check stays broad; unlike a bare ``except:`` it no longer swallows
        # KeyboardInterrupt/SystemExit. Narrow it once the type is known.
        with self.assertRaises(Exception, msg="Allows invalid host"):
            mr.get()
# Allow running this test module directly as a script.
if __name__ == "__main__":
    unittest.main()
|