aboutsummaryrefslogtreecommitdiffstats
path: root/tests/dcaegen2/bbs-testcases/resources/simulator/DMaaP.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/dcaegen2/bbs-testcases/resources/simulator/DMaaP.py')
-rw-r--r--tests/dcaegen2/bbs-testcases/resources/simulator/DMaaP.py92
1 files changed, 92 insertions, 0 deletions
diff --git a/tests/dcaegen2/bbs-testcases/resources/simulator/DMaaP.py b/tests/dcaegen2/bbs-testcases/resources/simulator/DMaaP.py
new file mode 100644
index 00000000..edeec8cf
--- /dev/null
+++ b/tests/dcaegen2/bbs-testcases/resources/simulator/DMaaP.py
@@ -0,0 +1,92 @@
+import re
+import time
+from http.server import BaseHTTPRequestHandler
+import httpServerLib
+
+
+posted_event_from_bbs = b'[]'
+received_event_to_get_method = b'[]'
+
+
+class DmaapSetup(BaseHTTPRequestHandler):
+
+ """
+ This Handler is used by the test harness to prepare the buffers:
+ test harness places VES events using PUT from the harness into the
+ "received_event buffer"
+ test harness will write policy topics from the BBS us to the posted event buffer
+ """
+ def do_PUT(self):
+ # Read the event from the test harness place it in the received event buffer
+ if re.search('/set_get_event', self.path):
+ content_length = int(self.headers['Content-Length'])
+ global received_event_to_get_method
+ received_event_to_get_method = self.rfile.read(content_length)
+ httpServerLib.header_200_and_json(self)
+
+ return
+
+ def do_GET(self):
+ # The test harness receives the policy triggers from the posted event
+ # by issuing a get and receiving the response.
+ if re.search('/events/dcaeClOutput', self.path):
+ global posted_event_from_bbs
+ httpServerLib.header_200_and_json(self)
+ self.wfile.write(posted_event_from_bbs)
+
+ return
+
+ def do_POST(self):
+ if re.search('/reset', self.path):
+ global posted_event_from_bbs
+ global received_event_to_get_method
+ posted_event_from_bbs = b'[]'
+ received_event_to_get_method = b'[]'
+ httpServerLib.header_200_and_json(self)
+
+ return
+
+
+class DMaaPHandler(BaseHTTPRequestHandler):
+ """
+ This Handler is what the BBS uS connects to - The test library has posted the
+ the VES events in the setup Handler which are then received by the BBS uS via
+ this handler's do_GET function.
+ Likewise the policy trigger posted by the BBS uS is received and placed in the
+ in the posted event buffer which the test harness retrieves using the setup handler.
+ """
+
+ def do_POST(self):
+ # Post of the policy triggers from the BBS uS
+ if re.search('/events/unauthenticated.DCAE_CL_OUTPUT', self.path):
+ global posted_event_from_bbs
+ content_length = int(self.headers['Content-Length'])
+ posted_event_from_bbs = self.rfile.read(content_length)
+ httpServerLib.header_200_and_json(self)
+
+ return
+
+ def do_GET(self):
+ # BBS uS issues a Get to receive VES and PNF UPdate event from DMAAP
+ global received_event_to_get_method
+ if re.search('/events/unauthenticated.PNF_UPDATE', self.path):
+ httpServerLib.header_200_and_json(self)
+ self.wfile.write(received_event_to_get_method)
+ elif re.search('/events/unauthenticated_CPE_AUTHENTICATION/OpenDcae-c12/c12', self.path):
+ httpServerLib.header_200_and_json(self)
+ self.wfile.write(received_event_to_get_method)
+
+ return
+
+
+def _main_(handler_class=DMaaPHandler, protocol="HTTP/1.0"):
+ handler_class.protocol_version = protocol
+ httpServerLib.start_http_endpoint(2222, DMaaPHandler)
+ httpServerLib.start_https_endpoint(2223, DMaaPHandler, keyfile="certs/org.onap.dmaap-bc.key", certfile="certs/dmaap_bc_topic_mgr_dmaap_bc.onap.org.cer", ca_certs="certs/ca_local_0.cer")
+ httpServerLib.start_http_endpoint(2224, DmaapSetup)
+ while 1:
+ time.sleep(10)
+
+
+if __name__ == '__main__':
+ _main_()