aboutsummaryrefslogtreecommitdiffstats
path: root/tests/dcaegen2/testcases/resources/robot_library/dmaap_simulator
diff options
context:
space:
mode:
Diffstat (limited to 'tests/dcaegen2/testcases/resources/robot_library/dmaap_simulator')
-rw-r--r--tests/dcaegen2/testcases/resources/robot_library/dmaap_simulator/DMaaPHandler.py73
-rw-r--r--tests/dcaegen2/testcases/resources/robot_library/dmaap_simulator/DMaaPQueue.py48
-rw-r--r--tests/dcaegen2/testcases/resources/robot_library/dmaap_simulator/DMaaPServer.py22
-rw-r--r--tests/dcaegen2/testcases/resources/robot_library/dmaap_simulator/__init__.py0
4 files changed, 143 insertions, 0 deletions
diff --git a/tests/dcaegen2/testcases/resources/robot_library/dmaap_simulator/DMaaPHandler.py b/tests/dcaegen2/testcases/resources/robot_library/dmaap_simulator/DMaaPHandler.py
new file mode 100644
index 00000000..f1c46e19
--- /dev/null
+++ b/tests/dcaegen2/testcases/resources/robot_library/dmaap_simulator/DMaaPHandler.py
@@ -0,0 +1,73 @@
+'''
+Created on Aug 15, 2017
+
+@author: sw6830
+'''
+import os
+import posixpath
+import BaseHTTPServer
+import urllib
+import urlparse
+import cgi
+import sys
+import shutil
+import mimetypes
+from robot_library import DcaeVariables
+
+try:
+ from cStringIO import StringIO
+except ImportError:
+ from StringIO import StringIO
+
+
+class DMaaPHandler(BaseHTTPServer.BaseHTTPRequestHandler):
+
+ def __init__(self, dmaap_simulator, *args):
+ self.dmaap_simulator = dmaap_simulator
+ BaseHTTPServer.BaseHTTPRequestHandler.__init__(self, *args)
+
+ def do_POST(self):
+ if 'POST' not in self.requestline:
+ resp_code = 405
+ else:
+ resp_code = self.parse_the_posted_data()
+
+ if resp_code == 0:
+ self.send_successful_response()
+ else:
+ self.send_response(resp_code)
+
+ def parse_the_posted_data(self):
+ topic = self.extract_topic_from_path()
+ content_len = self.get_content_length()
+ post_body = self.rfile.read(content_len)
+ post_body = self.get_json_part_of_post_body(post_body)
+ event = "{\"" + topic + "\":" + post_body + "}"
+ if self.dmaap_simulator.enque_event(event):
+ resp_code = 0
+ else:
+ print "enque event fails"
+ resp_code = 500
+ return resp_code
+
+ def get_json_part_of_post_body(self, post_body):
+ indx = post_body.index("{")
+ if indx != 0:
+ post_body = post_body[indx:]
+ return post_body
+
+ def extract_topic_from_path(self):
+ return self.path["/events/".__len__():]
+
+ def get_content_length(self):
+ return int(self.headers.getheader('content-length', 0))
+
+ def send_successful_response(self):
+ if 'clientThrottlingState' in self.requestline:
+ self.send_response(204)
+ else:
+ self.send_response(200)
+ self.send_header('Content-Type', 'application/json')
+ self.end_headers()
+ self.wfile.write("{'count': 1, 'serverTimeMs': 3}")
+ self.wfile.close()
diff --git a/tests/dcaegen2/testcases/resources/robot_library/dmaap_simulator/DMaaPQueue.py b/tests/dcaegen2/testcases/resources/robot_library/dmaap_simulator/DMaaPQueue.py
new file mode 100644
index 00000000..3d3a81fa
--- /dev/null
+++ b/tests/dcaegen2/testcases/resources/robot_library/dmaap_simulator/DMaaPQueue.py
@@ -0,0 +1,48 @@
+class DMaaPQueue(object):
+
+ def __init__(self, event_queue, wait_timeout_sec=25):
+ self.event_queue = event_queue
+ self.wait_timeout_sec = wait_timeout_sec
+
+ def set_deque_event_timeout(self, wait_timeout_sec):
+ self.wait_timeout_sec = wait_timeout_sec
+
+ def clean_up_event(self):
+ if self.queue_is_valid():
+ with self.event_queue.mutex:
+ try:
+ self.event_queue.queue.clear()
+ except:
+ pass
+
+ def enque_event(self, event):
+ event_placed_on_queue = False
+ if self.queue_is_valid():
+ event_placed_on_queue = self._enque_event(event, event_placed_on_queue)
+ return event_placed_on_queue
+
+ def _enque_event(self, event, event_placed_on_queue):
+ try:
+ self.event_queue.put(event)
+ event_placed_on_queue = True
+ except Exception as e:
+ print (str(e))
+ return event_placed_on_queue
+
+ def deque_event(self, wait_sec=None):
+ if wait_sec is None:
+ wait_sec = self.wait_timeout_sec
+ event_from_queue = None
+ if self.queue_is_valid():
+ event_from_queue = self._deque_event(event_from_queue, wait_sec)
+ return event_from_queue
+
+ def _deque_event(self, event_from_queue, wait_sec):
+ try:
+ event_from_queue = self.event_queue.get(True, wait_sec)
+ except Exception as e:
+ print("DMaaP Event dequeue timeout")
+ return event_from_queue
+
+ def queue_is_valid(self):
+ return self.event_queue is not None
diff --git a/tests/dcaegen2/testcases/resources/robot_library/dmaap_simulator/DMaaPServer.py b/tests/dcaegen2/testcases/resources/robot_library/dmaap_simulator/DMaaPServer.py
new file mode 100644
index 00000000..37499be8
--- /dev/null
+++ b/tests/dcaegen2/testcases/resources/robot_library/dmaap_simulator/DMaaPServer.py
@@ -0,0 +1,22 @@
+import BaseHTTPServer
+import DMaaPHandler
+
+
+class DMaaPServer(BaseHTTPServer.HTTPServer):
+
+ def __init__(self, server_address, protocol, dmaap_simulator):
+
+ def handler_class_constructor(*args):
+ DMaaPHandler.DMaaPHandler(dmaap_simulator, *args)
+ DMaaPHandler.protocol_version = protocol
+ BaseHTTPServer.HTTPServer.__init__(self, server_address, handler_class_constructor)
+
+ serer_address = self.socket.getsockname()
+ print "Serving HTTP on", serer_address[0], "port", serer_address[1], "..."
+
+
+def create_dmaap_server(dmaap_simulator, protocol="HTTP/1.0", port=3904):
+ server_address = ('', port)
+ httpd = DMaaPServer(server_address, protocol, dmaap_simulator)
+
+ return httpd
diff --git a/tests/dcaegen2/testcases/resources/robot_library/dmaap_simulator/__init__.py b/tests/dcaegen2/testcases/resources/robot_library/dmaap_simulator/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/tests/dcaegen2/testcases/resources/robot_library/dmaap_simulator/__init__.py