aboutsummaryrefslogtreecommitdiffstats
path: root/tests/dcaegen2/testcases/resources/robot_library/DmaapLibrary.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/dcaegen2/testcases/resources/robot_library/DmaapLibrary.py')
-rw-r--r--tests/dcaegen2/testcases/resources/robot_library/DmaapLibrary.py79
1 files changed, 79 insertions, 0 deletions
diff --git a/tests/dcaegen2/testcases/resources/robot_library/DmaapLibrary.py b/tests/dcaegen2/testcases/resources/robot_library/DmaapLibrary.py
new file mode 100644
index 00000000..c9a0ff7b
--- /dev/null
+++ b/tests/dcaegen2/testcases/resources/robot_library/DmaapLibrary.py
@@ -0,0 +1,79 @@
+from Queue import Queue
+
+import robot.api.logger as logger
+import threading
+import time
+
+import DcaeVariables
+from robot_library.dmaap_simulator import DMaaPServer
+from robot_library.dmaap_simulator.DMaaPQueue import DMaaPQueue
+
+class DmaapLibrary(object):
+
+ dmaap_queue = None
+ dmaap_server = None
+ server_thread = None
+
+ def __init__(self):
+ pass
+
+ @staticmethod
+ def setup_dmaap_server(port_num=3904):
+ try:
+ DmaapLibrary.start_dmaap_server_on_new_thread(port_num)
+ return "true"
+ except Exception as e:
+ print (str(e))
+ return "false"
+
+ @staticmethod
+ def start_dmaap_server_on_new_thread(port_num):
+ DmaapLibrary.dmaap_queue = DMaaPQueue(Queue())
+ DmaapLibrary.dmaap_server = DMaaPServer.create_dmaap_server(DmaapLibrary.dmaap_queue, port=port_num)
+ DmaapLibrary.server_thread = threading.Thread(name='DMAAP_HTTPServer',
+ target=DmaapLibrary.dmaap_server.serve_forever)
+ DmaapLibrary.server_thread.start()
+ logger.console("DMaaP Mockup Sever started")
+ DcaeVariables.IsRobotRun = True
+ time.sleep(2)
+
+ @staticmethod
+ def shutdown_dmaap():
+ if DmaapLibrary.dmaap_server is not None:
+ DmaapLibrary.dmaap_server.shutdown()
+ logger.console("DMaaP Server shut down")
+ time.sleep(3)
+ return "true"
+ else:
+ return "false"
+
+ @staticmethod
+ def cleanup_ves_events():
+ if DmaapLibrary.server_thread is not None:
+ DmaapLibrary.dmaap_queue.clean_up_event()
+ logger.console("DMaaP event queue is cleaned up")
+ return "true"
+ logger.console("DMaaP server not started yet")
+ return "false"
+
+ @staticmethod
+ def dmaap_message_receive_on_topic(evtobj, topic):
+
+ evt_str = DmaapLibrary.dmaap_queue.deque_event()
+ while evt_str != None:
+ if evtobj in evt_str and topic in evt_str:
+ logger.info("DMaaP Receive Expected Publish Event:\n" + evt_str)
+ logger.info("On Expected Topic:\n" + topic)
+ return 'true'
+ evt_str = DmaapLibrary.dmaap_queue.deque_event()
+ return 'false'
+
+ @staticmethod
+ def dmaap_message_receive(evtobj):
+ evt_str = DmaapLibrary.dmaap_queue.deque_event()
+ while evt_str != None:
+ if evtobj in evt_str:
+ logger.info("DMaaP Receive Expected Publish Event:\n" + evt_str)
+ return 'true'
+ evt_str = DmaapLibrary.dmaap_queue.deque_event()
+ return 'false'