aboutsummaryrefslogtreecommitdiffstats
path: root/tests/dcaegen2/testcases/resources/robot_library/dmaap_test/test_DMaaPSimulator.py
blob: 82f95ff86840fb18cb66c979da4748d275f1ba22 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
import sys
import pytest
from mock import MagicMock

sys.modules['robot'] = MagicMock()
sys.modules['robot.api'] = MagicMock()
sys.modules['robot.api.logger'] = MagicMock()
from robot_library.DmaapLibrary import DmaapLibrary

wait_sec_for_dequeing_event = 0.1
test_event = "{\"test\":\"123\"}"
test_topic = "topic"
test_message = "\"" + test_topic + "\":" + test_event


class TestDMaaPSimulator:

    @pytest.fixture(autouse=True, scope="class")
    def initiate_dmaap_simulator(self):
        DmaapLibrary.setup_dmaap_server()
        DmaapLibrary.dmaap_queue.set_deque_event_timeout(wait_sec_for_dequeing_event)
        yield
        assert DmaapLibrary.shutdown_dmaap() == "true"

    @pytest.fixture(autouse=True, scope="function")
    def clear_dmaap_simulator(self):
        yield
        DmaapLibrary.cleanup_ves_events()

    def test_start_stop_dmaap_server(self):
        # when / then
        assert DmaapLibrary.dmaap_queue is not None
        assert DmaapLibrary.dmaap_server is not None
        assert DmaapLibrary.server_thread is not None

    def test_dmaap_server_returns_true_when_event_is_present_on_queue(self):
        # when
        DmaapLibrary.dmaap_queue.enque_event(test_message)

        # then
        assert DmaapLibrary.dmaap_message_receive(test_event) == 'true'

    def test_dmaap_server_returns_true_when_event_is_present_on_given_topic_on_queue(self):
        # when
        DmaapLibrary.dmaap_queue.enque_event(test_message)

        # then
        assert DmaapLibrary.dmaap_message_receive_on_topic(test_event, test_topic) == 'true'

    def test_dmaap_server_returns_timeout_when_event_is_not_present_on_queue(self):
        # when / then
        assert DmaapLibrary.dmaap_message_receive(test_event) == 'false'

    def test_dmaap_server_returns_false_when_queue_was_cleared(self):
        # when
        DmaapLibrary.dmaap_queue.enque_event(test_message)
        DmaapLibrary.dmaap_queue.enque_event(test_message)
        DmaapLibrary.dmaap_queue.enque_event(test_message)
        DmaapLibrary.cleanup_ves_events()

        # then
        assert DmaapLibrary.dmaap_message_receive_on_topic(test_event, test_topic) == 'false'