aboutsummaryrefslogtreecommitdiffstats
path: root/tests/dcaegen2/testcases/resources/robot_library/dmaap_simulator/DMaaPQueue.py
blob: 3d3a81fa1b397ef960dc66d7455b59a266d9c2a6 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
class DMaaPQueue(object):
    """Fault-tolerant wrapper around an event queue.

    Every operation is a no-op (returning ``False``/``None``) when no queue
    was supplied, and queue errors are reported on stdout instead of being
    raised, so callers never have to guard their calls.

    The wrapped object must provide ``put(item)``, ``get(block, timeout)``
    and the ``mutex`` / ``queue`` attributes used by ``clean_up_event``
    (presumably a standard-library ``Queue`` -- confirm against the caller).
    """

    def __init__(self, event_queue, wait_timeout_sec=25):
        """Store the queue and the default dequeue timeout.

        :param event_queue: queue object to wrap, or ``None`` for a disabled
            wrapper.
        :param wait_timeout_sec: default number of seconds ``deque_event``
            waits for an event.
        """
        self.event_queue = event_queue
        self.wait_timeout_sec = wait_timeout_sec

    def set_deque_event_timeout(self, wait_timeout_sec):
        """Replace the default timeout used by ``deque_event``."""
        self.wait_timeout_sec = wait_timeout_sec

    def clean_up_event(self):
        """Discard every event currently held by the queue.

        Does nothing when no queue is configured. A failure while clearing is
        printed rather than raised, keeping the clean-up best-effort.
        """
        if not self.queue_is_valid():
            return
        # Hold the queue's own lock so the underlying container is not
        # modified while another thread is inside put()/get().
        with self.event_queue.mutex:
            try:
                self.event_queue.queue.clear()
            except Exception as e:
                # Only ordinary errors are swallowed; KeyboardInterrupt and
                # SystemExit must still propagate.
                print(str(e))

    def enque_event(self, event):
        """Put ``event`` on the queue.

        :returns: ``True`` if the event was queued, ``False`` if there is no
            queue or the put failed.
        """
        if not self.queue_is_valid():
            return False
        return self._enque_event(event, False)

    def _enque_event(self, event, event_placed_on_queue):
        """Put ``event`` on the queue, printing any error.

        :param event_placed_on_queue: value returned when the put fails.
        :returns: ``True`` on success, otherwise ``event_placed_on_queue``.
        """
        try:
            self.event_queue.put(event)
        except Exception as e:
            print(str(e))
            return event_placed_on_queue
        return True

    def deque_event(self, wait_sec=None):
        """Take the next event from the queue, waiting up to ``wait_sec``.

        :param wait_sec: seconds to wait; ``None`` selects the configured
            default timeout.
        :returns: the dequeued event, or ``None`` if there is no queue, the
            wait timed out, or the dequeue failed.
        """
        if wait_sec is None:
            wait_sec = self.wait_timeout_sec
        if not self.queue_is_valid():
            return None
        return self._deque_event(None, wait_sec)

    def _deque_event(self, event_from_queue, wait_sec):
        """Blocking ``get`` with a timeout; errors are printed, not raised.

        :param event_from_queue: value returned when nothing was dequeued.
        :param wait_sec: timeout in seconds passed to the queue's ``get``.
        :returns: the dequeued event, otherwise ``event_from_queue``.
        """
        # Function-scope import keeps the module free of new top-level
        # dependencies and works on both Python 3 and Python 2.
        try:
            from queue import Empty
        except ImportError:
            from Queue import Empty
        try:
            return self.event_queue.get(True, wait_sec)
        except Empty:
            print("DMaaP Event dequeue timeout")
        except Exception as e:
            # Anything else (e.g. an invalid timeout value) is not a timeout
            # and must not be reported as one.
            print("DMaaP Event dequeue failed: " + str(e))
        return event_from_queue

    def queue_is_valid(self):
        """Return ``True`` when a queue object has been configured."""
        return self.event_queue is not None