From 131d4dda822b637e816ef225d8a4c9981ccf56a7 Mon Sep 17 00:00:00 2001 From: Bartosz Gardziejewski Date: Tue, 4 Aug 2020 14:29:10 +0200 Subject: Refactor DMaaP simulator and add tests. Issue-ID: DCAEGEN2-1771 Signed-off-by: Bartosz Gardziejewski Change-Id: I65772f9cdaf546171941253abdf3977b41a3e50e --- tests/dcaegen2/testcases/resources/DMaaP.py | 418 ---------------------------- 1 file changed, 418 deletions(-) delete mode 100644 tests/dcaegen2/testcases/resources/DMaaP.py (limited to 'tests/dcaegen2/testcases/resources/DMaaP.py') diff --git a/tests/dcaegen2/testcases/resources/DMaaP.py b/tests/dcaegen2/testcases/resources/DMaaP.py deleted file mode 100644 index 4c245614..00000000 --- a/tests/dcaegen2/testcases/resources/DMaaP.py +++ /dev/null @@ -1,418 +0,0 @@ -''' -Created on Aug 15, 2017 - -@author: sw6830 -''' -import os -import posixpath -import BaseHTTPServer -import urllib -import urlparse -import cgi -import sys -import shutil -import mimetypes -from jsonschema import validate -import jsonschema -import json -import DcaeVariables -import SimpleHTTPServer - -try: - from cStringIO import StringIO -except ImportError: - from StringIO import StringIO - -EvtSchema = None -DMaaPHttpd = None - - -def clean_up_event(): - sz = DcaeVariables.VESEventQ.qsize() - for i in range(sz): - try: - self.evtQueue.get_nowait() - except: - pass - - -def enque_event(evt): - if DcaeVariables.VESEventQ is not None: - try: - DcaeVariables.VESEventQ.put(evt) - return True - except Exception as e: - print (str(e)) - return False - return False - - -def deque_event(wait_sec=25): - if DcaeVariables.IsRobotRun: - pass - try: - evt = DcaeVariables.VESEventQ.get(True, wait_sec) - return evt - except Exception as e: - if DcaeVariables.IsRobotRun: - pass - - else: - print("DMaaP Event dequeue timeout") - return None - - -class DMaaPHandler(BaseHTTPServer.BaseHTTPRequestHandler): - - def do_PUT(self): - self.send_response(405) - return - - def do_POST(self): - resp_code = 0 - # Parse the form data posted - ''' - form = cgi.FieldStorage( - fp=self.rfile, - headers=self.headers, - environ={'REQUEST_METHOD':'POST', - 'CONTENT_TYPE':self.headers['Content-Type'], - }) - - - form = cgi.FieldStorage( - fp=self.rfile, - headers=self.headers, - environ={"REQUEST_METHOD": "POST"}) - - for item in form.list: - print "%s=%s" % (item.name, item.value) - - ''' - - if 'POST' not in self.requestline: - resp_code = 405 - - ''' - if resp_code == 0: - if '/eventlistener/v5' not in self.requestline and '/eventlistener/v5/eventBatch' not in self.requestline and \ - '/eventlistener/v5/clientThrottlingState' not in self.requestline: - resp_code = 404 - - - if resp_code == 0: - if 'Y29uc29sZTpaakprWWpsbE1qbGpNVEkyTTJJeg==' not in str(self.headers): - resp_code = 401 - ''' - - if resp_code == 0: - topic = self.extract_topic_from_path() - content_len = int(self.headers.getheader('content-length', 0)) - post_body = self.rfile.read(content_len) - - indx = post_body.index("{") - if indx != 0: - post_body = post_body[indx:] - - event = "\""+topic+"\":" + post_body - if not enque_event(event): - print "enque event fails" - - global EvtSchema - try: - if EvtSchema is None: - with open(DcaeVariables.CommonEventSchema) as opened_file: - EvtSchema = json.load(opened_file) - decoded_body = json.loads(post_body) - jsonschema.validate(decoded_body, EvtSchema) - except: - resp_code = 400 - - # Begin the response - if not DcaeVariables.IsRobotRun: - print ("Response Message:") - - ''' - { - "200" : { - "description" : "Success", - "schema" : { - "$ref" : "#/definitions/DR_Pub" - } - } - - rspStr = "{'responses' : {'200' : {'description' : 'Success'}}}" - rspStr1 = "{'count': 1, 'serverTimeMs': 3}" - - ''' - - if resp_code == 0: - if 'clientThrottlingState' in self.requestline: - self.send_response(204) - else: - self.send_response(200) - self.send_header('Content-Type', 'application/json') - self.end_headers() - self.wfile.write("{'count': 1, 'serverTimeMs': 3}") - self.wfile.close() - else: - self.send_response(resp_code) - - ''' - self.end_headers() - self.wfile.write('Client: %s\n' % str(self.client_address)) - self.wfile.write('User-agent: %s\n' % str(self.headers['user-agent'])) - self.wfile.write('Path: %s\n' % self.path) - self.wfile.write('Form data:\n') - self.wfile.close() - - # Echo back information about what was posted in the form - for field in form.keys(): - field_item = form[field] - if field_item.filename: - # The field contains an uploaded file - file_data = field_item.file.read() - file_len = len(file_data) - del file_data - self.wfile.write('\tUploaded %s as "%s" (%d bytes)\n' % \ - (field, field_item.filename, file_len)) - else: - # Regular form value - self.wfile.write('\t%s=%s\n' % (field, form[field].value)) - ''' - return - - def extract_topic_from_path(self): - return self.path["/events/".__len__():] - - def do_GET(self): - """Serve a GET request.""" - f = self.send_head() - if f: - try: - self.copyfile(f, self.wfile) - finally: - f.close() - - def do_HEAD(self): - """Serve a HEAD request.""" - f = self.send_head() - if f: - f.close() - - def send_head(self): - """Common code for GET and HEAD commands. - - This sends the response code and MIME headers. - - Return value is either a file object (which has to be copied - to the outputfile by the caller unless the command was HEAD, - and must be closed by the caller under all circumstances), or - None, in which case the caller has nothing further to do. - - """ - path = self.translate_path(self.path) - if os.path.isdir(path): - parts = urlparse.urlsplit(self.path) - if not parts.path.endswith('/'): - # redirect browser - doing basically what apache does - self.send_response(301) - new_parts = (parts[0], parts[1], parts[2] + '/', - parts[3], parts[4]) - new_url = urlparse.urlunsplit(new_parts) - self.send_header("Location", new_url) - self.end_headers() - return None - for index in "index.html", "index.htm": - index = os.path.join(path, index) - if os.path.exists(index): - path = index - break - else: - return self.list_directory(path) - ctype = self.guess_type(path) - try: - # Always read in binary mode. Opening files in text mode may cause - # newline translations, making the actual size of the content - # transmitted *less* than the content-length! - f = open(path, 'rb') - except IOError: - self.send_error(404, "File not found") - return None - try: - self.send_response(200) - self.send_header("Content-type", ctype) - fs = os.fstat(f.fileno()) - self.send_header("Content-Length", str(fs[6])) - self.send_header("Last-Modified", self.date_time_string(fs.st_mtime)) - self.end_headers() - return f - except: - f.close() - raise - - def list_directory(self, path): - """Helper to produce a directory listing (absent index.html). - - Return value is either a file object, or None (indicating an - error). In either case, the headers are sent, making the - interface the same as for send_head(). - - """ - try: - list_dir = os.listdir(path) - except os.error: - self.send_error(404, "No permission to list directory") - return None - list_dir.sort(key=lambda a: a.lower()) - f = StringIO() - displaypath = cgi.escape(urllib.unquote(self.path)) - f.write('') - f.write("\nDirectory listing for %s\n" % displaypath) - f.write("\n

Directory listing for %s

\n" % displaypath) - f.write("
\n\n
\n\n\n") - length = f.tell() - f.seek(0) - self.send_response(200) - encoding = sys.getfilesystemencoding() - self.send_header("Content-type", "text/html; charset=%s" % encoding) - self.send_header("Content-Length", str(length)) - self.end_headers() - return f - - @staticmethod - def translate_path(path): - """Translate a /-separated PATH to the local filename syntax. - - Components that mean special things to the local file system - (e.g. drive or directory names) are ignored. (XXX They should - probably be diagnosed.) - - """ - # abandon query parameters - path = path.split('?', 1)[0] - path = path.split('#', 1)[0] - # Don't forget explicit trailing slash when normalizing. Issue17324 - trailing_slash = path.rstrip().endswith('/') - path = posixpath.normpath(urllib.unquote(path)) - words = path.split('/') - words = filter(None, words) - path = os.getcwd() - for word in words: - if os.path.dirname(word) or word in (os.curdir, os.pardir): - # Ignore components that are not a simple file/directory name - continue - path = os.path.join(path, word) - if trailing_slash: - path += '/' - return path - - @staticmethod - def copyfile(source, outputfile): - """Copy all data between two file objects. - - The SOURCE argument is a file object open for reading - (or anything with a read() method) and the DESTINATION - argument is a file object open for writing (or - anything with a write() method). - - The only reason for overriding this would be to change - the block size or perhaps to replace newlines by CRLF - -- note however that this the default server uses this - to copy binary data as well. - - """ - shutil.copyfileobj(source, outputfile) - - def guess_type(self, path): - """Guess the type of a file. - - Argument is a PATH (a filename). - - Return value is a string of the form type/subtype, - usable for a MIME Content-type header. - - The default implementation looks the file's extension - up in the table self.extensions_map, using application/octet-stream - as a default; however it would be permissible (if - slow) to look inside the data to make a better guess. - - """ - - base, ext = posixpath.splitext(path) - if ext in self.extensions_map: - return self.extensions_map[ext] - ext = ext.lower() - if ext in self.extensions_map: - return self.extensions_map[ext] - else: - return self.extensions_map[''] - - if not mimetypes.inited: - mimetypes.init() # try to read system mime.types - extensions_map = mimetypes.types_map.copy() - extensions_map.update({ - '': 'application/octet-stream', # Default - '.py': 'text/plain', - '.c': 'text/plain', - '.h': 'text/plain', - }) - - -def test(handler_class=DMaaPHandler, server_class=BaseHTTPServer.HTTPServer, protocol="HTTP/1.0", port=3904): - print "Load event schema file: " + DcaeVariables.CommonEventSchema - with open(DcaeVariables.CommonEventSchema) as opened_file: - global EvtSchema - EvtSchema = json.load(opened_file) - - server_address = ('', port) - - handler_class.protocol_version = protocol - httpd = server_class(server_address, handler_class) - - global DMaaPHttpd - DMaaPHttpd = httpd - DcaeVariables.HTTPD = httpd - - sa = httpd.socket.getsockname() - print "Serving HTTP on", sa[0], "port", sa[1], "..." - # httpd.serve_forever() - - -def _main_(handler_class=DMaaPHandler, server_class=BaseHTTPServer.HTTPServer, protocol="HTTP/1.0"): - - if sys.argv[1:]: - port = int(sys.argv[1]) - else: - port = 3904 - - print "Load event schema file: " + DcaeVariables.CommonEventSchema - with open(DcaeVariables.CommonEventSchema) as opened_file: - global EvtSchema - EvtSchema = json.load(opened_file) - - server_address = ('', port) - - handler_class.protocol_version = protocol - httpd = server_class(server_address, handler_class) - - sa = httpd.socket.getsockname() - print "Serving HTTP on", sa[0], "port", sa[1], "..." - httpd.serve_forever() - - -if __name__ == '__main__': - _main_() -- cgit 1.2.3-korg