diff options
author | Bartosz Gardziejewski <bartosz.gardziejewski@nokia.com> | 2020-08-04 14:29:10 +0200 |
---|---|---|
committer | Bartosz Gardziejewski <bartosz.gardziejewski@nokia.com> | 2020-08-12 10:59:08 +0200 |
commit | 131d4dda822b637e816ef225d8a4c9981ccf56a7 (patch) | |
tree | a4ea4f3b350276b1da45033618f7ff7b74481a00 | |
parent | bbd96bd93c8d3c773aee53a69c44b3ac9cc3696d (diff) |
Refactor DMaaP simulator and add tests.
Issue-ID: DCAEGEN2-1771
Signed-off-by: Bartosz Gardziejewski <bartosz.gardziejewski@nokia.com>
Change-Id: I65772f9cdaf546171941253abdf3977b41a3e50e
17 files changed, 490 insertions, 610 deletions
diff --git a/tests/dcaegen2/testcases/resources/DMaaP.py b/tests/dcaegen2/testcases/resources/DMaaP.py deleted file mode 100644 index 4c245614..00000000 --- a/tests/dcaegen2/testcases/resources/DMaaP.py +++ /dev/null @@ -1,418 +0,0 @@ -''' -Created on Aug 15, 2017 - -@author: sw6830 -''' -import os -import posixpath -import BaseHTTPServer -import urllib -import urlparse -import cgi -import sys -import shutil -import mimetypes -from jsonschema import validate -import jsonschema -import json -import DcaeVariables -import SimpleHTTPServer - -try: - from cStringIO import StringIO -except ImportError: - from StringIO import StringIO - -EvtSchema = None -DMaaPHttpd = None - - -def clean_up_event(): - sz = DcaeVariables.VESEventQ.qsize() - for i in range(sz): - try: - self.evtQueue.get_nowait() - except: - pass - - -def enque_event(evt): - if DcaeVariables.VESEventQ is not None: - try: - DcaeVariables.VESEventQ.put(evt) - return True - except Exception as e: - print (str(e)) - return False - return False - - -def deque_event(wait_sec=25): - if DcaeVariables.IsRobotRun: - pass - try: - evt = DcaeVariables.VESEventQ.get(True, wait_sec) - return evt - except Exception as e: - if DcaeVariables.IsRobotRun: - pass - - else: - print("DMaaP Event dequeue timeout") - return None - - -class DMaaPHandler(BaseHTTPServer.BaseHTTPRequestHandler): - - def do_PUT(self): - self.send_response(405) - return - - def do_POST(self): - resp_code = 0 - # Parse the form data posted - ''' - form = cgi.FieldStorage( - fp=self.rfile, - headers=self.headers, - environ={'REQUEST_METHOD':'POST', - 'CONTENT_TYPE':self.headers['Content-Type'], - }) - - - form = cgi.FieldStorage( - fp=self.rfile, - headers=self.headers, - environ={"REQUEST_METHOD": "POST"}) - - for item in form.list: - print "%s=%s" % (item.name, item.value) - - ''' - - if 'POST' not in self.requestline: - resp_code = 405 - - ''' - if resp_code == 0: - if '/eventlistener/v5' not in self.requestline and '/eventlistener/v5/eventBatch' not in self.requestline and \ - '/eventlistener/v5/clientThrottlingState' not in self.requestline: - resp_code = 404 - - - if resp_code == 0: - if 'Y29uc29sZTpaakprWWpsbE1qbGpNVEkyTTJJeg==' not in str(self.headers): - resp_code = 401 - ''' - - if resp_code == 0: - topic = self.extract_topic_from_path() - content_len = int(self.headers.getheader('content-length', 0)) - post_body = self.rfile.read(content_len) - - indx = post_body.index("{") - if indx != 0: - post_body = post_body[indx:] - - event = "\""+topic+"\":" + post_body - if not enque_event(event): - print "enque event fails" - - global EvtSchema - try: - if EvtSchema is None: - with open(DcaeVariables.CommonEventSchema) as opened_file: - EvtSchema = json.load(opened_file) - decoded_body = json.loads(post_body) - jsonschema.validate(decoded_body, EvtSchema) - except: - resp_code = 400 - - # Begin the response - if not DcaeVariables.IsRobotRun: - print ("Response Message:") - - ''' - { - "200" : { - "description" : "Success", - "schema" : { - "$ref" : "#/definitions/DR_Pub" - } - } - - rspStr = "{'responses' : {'200' : {'description' : 'Success'}}}" - rspStr1 = "{'count': 1, 'serverTimeMs': 3}" - - ''' - - if resp_code == 0: - if 'clientThrottlingState' in self.requestline: - self.send_response(204) - else: - self.send_response(200) - self.send_header('Content-Type', 'application/json') - self.end_headers() - self.wfile.write("{'count': 1, 'serverTimeMs': 3}") - self.wfile.close() - else: - self.send_response(resp_code) - - ''' - self.end_headers() - self.wfile.write('Client: %s\n' % str(self.client_address)) - self.wfile.write('User-agent: %s\n' % str(self.headers['user-agent'])) - self.wfile.write('Path: %s\n' % self.path) - self.wfile.write('Form data:\n') - self.wfile.close() - - # Echo back information about what was posted in the form - for field in form.keys(): - field_item = form[field] - if field_item.filename: - # The field contains an uploaded file - file_data = field_item.file.read() - file_len = len(file_data) - del file_data - self.wfile.write('\tUploaded %s as "%s" (%d bytes)\n' % \ - (field, field_item.filename, file_len)) - else: - # Regular form value - self.wfile.write('\t%s=%s\n' % (field, form[field].value)) - ''' - return - - def extract_topic_from_path(self): - return self.path["/events/".__len__():] - - def do_GET(self): - """Serve a GET request.""" - f = self.send_head() - if f: - try: - self.copyfile(f, self.wfile) - finally: - f.close() - - def do_HEAD(self): - """Serve a HEAD request.""" - f = self.send_head() - if f: - f.close() - - def send_head(self): - """Common code for GET and HEAD commands. - - This sends the response code and MIME headers. - - Return value is either a file object (which has to be copied - to the outputfile by the caller unless the command was HEAD, - and must be closed by the caller under all circumstances), or - None, in which case the caller has nothing further to do. - - """ - path = self.translate_path(self.path) - if os.path.isdir(path): - parts = urlparse.urlsplit(self.path) - if not parts.path.endswith('/'): - # redirect browser - doing basically what apache does - self.send_response(301) - new_parts = (parts[0], parts[1], parts[2] + '/', - parts[3], parts[4]) - new_url = urlparse.urlunsplit(new_parts) - self.send_header("Location", new_url) - self.end_headers() - return None - for index in "index.html", "index.htm": - index = os.path.join(path, index) - if os.path.exists(index): - path = index - break - else: - return self.list_directory(path) - ctype = self.guess_type(path) - try: - # Always read in binary mode. Opening files in text mode may cause - # newline translations, making the actual size of the content - # transmitted *less* than the content-length! - f = open(path, 'rb') - except IOError: - self.send_error(404, "File not found") - return None - try: - self.send_response(200) - self.send_header("Content-type", ctype) - fs = os.fstat(f.fileno()) - self.send_header("Content-Length", str(fs[6])) - self.send_header("Last-Modified", self.date_time_string(fs.st_mtime)) - self.end_headers() - return f - except: - f.close() - raise - - def list_directory(self, path): - """Helper to produce a directory listing (absent index.html). - - Return value is either a file object, or None (indicating an - error). In either case, the headers are sent, making the - interface the same as for send_head(). - - """ - try: - list_dir = os.listdir(path) - except os.error: - self.send_error(404, "No permission to list directory") - return None - list_dir.sort(key=lambda a: a.lower()) - f = StringIO() - displaypath = cgi.escape(urllib.unquote(self.path)) - f.write('<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">') - f.write("<html>\n<title>Directory listing for %s</title>\n" % displaypath) - f.write("<body>\n<h2>Directory listing for %s</h2>\n" % displaypath) - f.write("<hr>\n<ul>\n") - for name in list_dir: - fullname = os.path.join(path, name) - displayname = linkname = name - # Append / for directories or @ for symbolic links - if os.path.isdir(fullname): - displayname = name + "/" - linkname = name + "/" - if os.path.islink(fullname): - displayname = name + "@" - # Note: a link to a directory displays with @ and links with / - f.write('<li><a href="%s">%s</a>\n' - % (urllib.quote(linkname), cgi.escape(displayname))) - f.write("</ul>\n<hr>\n</body>\n</html>\n") - length = f.tell() - f.seek(0) - self.send_response(200) - encoding = sys.getfilesystemencoding() - self.send_header("Content-type", "text/html; charset=%s" % encoding) - self.send_header("Content-Length", str(length)) - self.end_headers() - return f - - @staticmethod - def translate_path(path): - """Translate a /-separated PATH to the local filename syntax. - - Components that mean special things to the local file system - (e.g. drive or directory names) are ignored. (XXX They should - probably be diagnosed.) - - """ - # abandon query parameters - path = path.split('?', 1)[0] - path = path.split('#', 1)[0] - # Don't forget explicit trailing slash when normalizing. Issue17324 - trailing_slash = path.rstrip().endswith('/') - path = posixpath.normpath(urllib.unquote(path)) - words = path.split('/') - words = filter(None, words) - path = os.getcwd() - for word in words: - if os.path.dirname(word) or word in (os.curdir, os.pardir): - # Ignore components that are not a simple file/directory name - continue - path = os.path.join(path, word) - if trailing_slash: - path += '/' - return path - - @staticmethod - def copyfile(source, outputfile): - """Copy all data between two file objects. - - The SOURCE argument is a file object open for reading - (or anything with a read() method) and the DESTINATION - argument is a file object open for writing (or - anything with a write() method). - - The only reason for overriding this would be to change - the block size or perhaps to replace newlines by CRLF - -- note however that this the default server uses this - to copy binary data as well. - - """ - shutil.copyfileobj(source, outputfile) - - def guess_type(self, path): - """Guess the type of a file. - - Argument is a PATH (a filename). - - Return value is a string of the form type/subtype, - usable for a MIME Content-type header. - - The default implementation looks the file's extension - up in the table self.extensions_map, using application/octet-stream - as a default; however it would be permissible (if - slow) to look inside the data to make a better guess. - - """ - - base, ext = posixpath.splitext(path) - if ext in self.extensions_map: - return self.extensions_map[ext] - ext = ext.lower() - if ext in self.extensions_map: - return self.extensions_map[ext] - else: - return self.extensions_map[''] - - if not mimetypes.inited: - mimetypes.init() # try to read system mime.types - extensions_map = mimetypes.types_map.copy() - extensions_map.update({ - '': 'application/octet-stream', # Default - '.py': 'text/plain', - '.c': 'text/plain', - '.h': 'text/plain', - }) - - -def test(handler_class=DMaaPHandler, server_class=BaseHTTPServer.HTTPServer, protocol="HTTP/1.0", port=3904): - print "Load event schema file: " + DcaeVariables.CommonEventSchema - with open(DcaeVariables.CommonEventSchema) as opened_file: - global EvtSchema - EvtSchema = json.load(opened_file) - - server_address = ('', port) - - handler_class.protocol_version = protocol - httpd = server_class(server_address, handler_class) - - global DMaaPHttpd - DMaaPHttpd = httpd - DcaeVariables.HTTPD = httpd - - sa = httpd.socket.getsockname() - print "Serving HTTP on", sa[0], "port", sa[1], "..." - # httpd.serve_forever() - - -def _main_(handler_class=DMaaPHandler, server_class=BaseHTTPServer.HTTPServer, protocol="HTTP/1.0"): - - if sys.argv[1:]: - port = int(sys.argv[1]) - else: - port = 3904 - - print "Load event schema file: " + DcaeVariables.CommonEventSchema - with open(DcaeVariables.CommonEventSchema) as opened_file: - global EvtSchema - EvtSchema = json.load(opened_file) - - server_address = ('', port) - - handler_class.protocol_version = protocol - httpd = server_class(server_address, handler_class) - - sa = httpd.socket.getsockname() - print "Serving HTTP on", sa[0], "port", sa[1], "..." - httpd.serve_forever() - - -if __name__ == '__main__': - _main_() diff --git a/tests/dcaegen2/testcases/resources/DcaeLibrary.py b/tests/dcaegen2/testcases/resources/DcaeLibrary.py deleted file mode 100644 index a9d5def8..00000000 --- a/tests/dcaegen2/testcases/resources/DcaeLibrary.py +++ /dev/null @@ -1,184 +0,0 @@ -''' -Created on Aug 18, 2017 - -@author: sw6830 -''' -from robot.api import logger -from Queue import Queue -import uuid -import time -import datetime -import json -import threading -import os -import platform -import subprocess -import paramiko -import DcaeVariables -import DMaaP - - -class DcaeLibrary(object): - - def __init__(self): - pass - - @staticmethod - def setup_dmaap_server(port_num=3904): - if DcaeVariables.HttpServerThread is not None: - DMaaP.clean_up_event() - logger.console("Clean up event from event queue before test") - logger.info("DMaaP Server already started") - return "true" - - DcaeVariables.IsRobotRun = True - DMaaP.test(port=port_num) - try: - DcaeVariables.VESEventQ = Queue() - DcaeVariables.HttpServerThread = threading.Thread(name='DMAAP_HTTPServer', target=DMaaP.DMaaPHttpd.serve_forever) - DcaeVariables.HttpServerThread.start() - logger.console("DMaaP Mockup Sever started") - time.sleep(2) - return "true" - except Exception as e: - print (str(e)) - return "false" - - @staticmethod - def shutdown_dmaap(): - if DcaeVariables.HTTPD is not None: - DcaeVariables.HTTPD.shutdown() - logger.console("DMaaP Server shut down") - time.sleep(3) - return "true" - else: - return "false" - - @staticmethod - def cleanup_ves_events(): - if DcaeVariables.HttpServerThread is not None: - DMaaP.clean_up_event() - logger.console("DMaaP event queue is cleaned up") - return "true" - logger.console("DMaaP server not started yet") - return "false" - - @staticmethod - def enable_vesc_with_certBasicAuth(): - global client - if 'Windows' in platform.system(): - try: - client = paramiko.SSHClient() - client.load_system_host_keys() - # client.set_missing_host_key_policy(paramiko.WarningPolicy) - client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) - - client.connect(os.environ['CSIT_IP'], port=22, username=os.environ['CSIT_USER'], password=os.environ['CSIT_PD']) - stdin, stdout, stderr = client.exec_command('%{WORKSPACE}/tests/dcaegen2/testcases/resources/vesc_enable_https_auth.sh') - logger.console(stdout.read()) - finally: - client.close() - return - ws = os.environ['WORKSPACE'] - script2run = ws + "/tests/dcaegen2/testcases/resources/vesc_enable_https_auth.sh" - logger.info("Running script: " + script2run) - logger.console("Running script: " + script2run) - subprocess.call(script2run) - time.sleep(5) - return - - @staticmethod - def dmaap_message_receive_on_topic(evtobj, topic): - - evt_str = DMaaP.deque_event() - while evt_str != None: - if evtobj in evt_str and topic in evt_str: - logger.info("DMaaP Receive Expected Publish Event:\n" + evt_str) - logger.info("On Expected Topic:\n" + topic) - return 'true' - evt_str = DMaaP.deque_event() - return 'false' - - @staticmethod - def dmaap_message_receive(evtobj, action='contain'): - - evt_str = DMaaP.deque_event() - while evt_str != None: - if action == 'contain': - if evtobj in evt_str: - logger.info("DMaaP Receive Expected Publish Event:\n" + evt_str) - return 'true' - if action == 'sizematch': - if len(evtobj) == len(evt_str): - return 'true' - if action == 'dictmatch': - evt_dict = json.loads(evt_str) - if cmp(evtobj, evt_dict) == 0: - return 'true' - evt_str = DMaaP.deque_event() - return 'false' - - @staticmethod - def is_json_empty(resp): - logger.info("Enter is_json_empty: resp.text: " + resp.text) - if resp.text is None or len(resp.text) < 2: - return 'True' - return 'False' - - @staticmethod - def generate_uuid(): - """generate a uuid""" - return uuid.uuid4() - - @staticmethod - def get_json_value_list(jsonstr, keyval): - logger.info("Enter Get_Json_Key_Value_List") - if jsonstr is None or len(jsonstr) < 2: - logger.info("No Json data found") - return [] - try: - data = json.loads(jsonstr) - nodelist = [] - for item in data: - nodelist.append(item[keyval]) - return nodelist - except Exception as e: - logger.info("Json data parsing fails") - print str(e) - return [] - - @staticmethod - def generate_millitimestamp_uuid(): - """generate a millisecond timestamp uuid""" - then = datetime.datetime.now() - return int(time.mktime(then.timetuple())*1e3 + then.microsecond/1e3) - - @staticmethod - def test(): - import json - from pprint import pprint - - with open('robot/assets/dcae/ves_volte_single_fault_event.json') as data_file: - data = json.load(data_file) - - data['event']['commonEventHeader']['version'] = '5.0' - pprint(data) - - -if __name__ == '__main__': - ''' - dictStr = "action=getTable,Accept=application/json,Content-Type=application/json,X-FromAppId=1234908903284" - cls = DcaeLibrary() - #dict = cls.create_header_from_string(dictStr) - #print str(dict) - jsonStr = "[{'Node': 'onapfcnsl00', 'CheckID': 'serfHealth', 'Name': 'Serf Health Status', 'ServiceName': '', 'Notes': '', 'ModifyIndex': 6, 'Status': 'passing', 'ServiceID': '', 'ServiceTags': [], 'Output': 'Agent alive and reachable', 'CreateIndex': 6}]" - lsObj = cls.get_json_value_list(jsonStr, 'Status') - print lsObj - ''' - - lib = DcaeLibrary() - lib.enable_vesc_https_auth() - - ret = lib.setup_dmaap_server() - print ret - time.sleep(100000) diff --git a/tests/dcaegen2/testcases/resources/dcae_keywords.robot b/tests/dcaegen2/testcases/resources/dcae_keywords.robot index 52424e63..fb5fc1d6 100644 --- a/tests/dcaegen2/testcases/resources/dcae_keywords.robot +++ b/tests/dcaegen2/testcases/resources/dcae_keywords.robot @@ -1,13 +1,14 @@ *** Settings *** Documentation The main interface for interacting with DCAE. It handles low level stuff like managing the http request library and DCAE required fields +Library robot_library.DcaeLibrary +Library robot_library.DmaapLibrary +Library robot_library.CertsLibrary Library RequestsLibrary -Library DcaeLibrary Library OperatingSystem Library Collections -Library CertsLibrary -Variables ../resources/DcaeVariables.py +Variables ./robot_library/DcaeVariables.py Resource ../../../common.robot -Resource ../resources/dcae_properties.robot +Resource ./dcae_properties.robot *** Keywords *** Create sessions diff --git a/tests/dcaegen2/testcases/resources/CertsLibrary.py b/tests/dcaegen2/testcases/resources/robot_library/CertsLibrary.py index b8189422..b8189422 100644 --- a/tests/dcaegen2/testcases/resources/CertsLibrary.py +++ b/tests/dcaegen2/testcases/resources/robot_library/CertsLibrary.py diff --git a/tests/dcaegen2/testcases/resources/robot_library/DcaeLibrary.py b/tests/dcaegen2/testcases/resources/robot_library/DcaeLibrary.py new file mode 100644 index 00000000..a467431f --- /dev/null +++ b/tests/dcaegen2/testcases/resources/robot_library/DcaeLibrary.py @@ -0,0 +1,112 @@ +''' +Created on Aug 18, 2017 + +@author: sw6830 +''' +from robot.api import logger +import uuid +import time +import datetime +import json +import os +import platform +import subprocess +import paramiko + + +class DcaeLibrary(object): + + def __init__(self): + pass + + @staticmethod + def enable_vesc_with_cert_basic_auth(): + global client + if 'Windows' in platform.system(): + try: + DcaeLibrary.enable_https_auth_for_windows_platform_system() + finally: + client.close() + return + DcaeLibrary.enable_https_auth_for_non_windows_platform_system() + return + + @staticmethod + def enable_https_auth_for_non_windows_platform_system(): + ws = os.environ['WORKSPACE'] + script2run = ws + "/tests/dcaegen2/testcases/resources/vesc_enable_https_auth.sh" + logger.info("Running script: " + script2run) + logger.console("Running script: " + script2run) + subprocess.call(script2run) + time.sleep(5) + + @staticmethod + def enable_https_auth_for_windows_platform_system(): + global client + client = paramiko.SSHClient() + client.load_system_host_keys() + client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + client.connect(os.environ['CSIT_IP'], port=22, username=os.environ['CSIT_USER'], password=os.environ['CSIT_PD']) + stdin, stdout, stderr = client.exec_command( + '%{WORKSPACE}/tests/dcaegen2/testcases/resources/vesc_enable_https_auth.sh') + logger.console(stdout.read()) + + @staticmethod + def is_json_empty(resp): + logger.info("Enter is_json_empty: resp.text: " + resp.text) + if resp.text is None or len(resp.text) < 2: + return 'True' + return 'False' + + @staticmethod + def generate_uuid(): + """generate a uuid""" + return uuid.uuid4() + + @staticmethod + def get_json_value_list(jsonstr, keyval): + logger.info("Enter Get_Json_Key_Value_List") + if jsonstr is None or len(jsonstr) < 2: + logger.info("No Json data found") + return [] + try: + return DcaeLibrary.extract_list_of_items_from_json_string(jsonstr, keyval) + except Exception as e: + logger.info("Json data parsing fails") + print str(e) + return [] + + @staticmethod + def extract_list_of_items_from_json_string(jsonstr, keyval): + data = json.loads(jsonstr) + nodelist = [] + for item in data: + nodelist.append(item[keyval]) + return nodelist + + @staticmethod + def generate_millitimestamp_uuid(): + """generate a millisecond timestamp uuid""" + then = datetime.datetime.now() + return int(time.mktime(then.timetuple())*1e3 + then.microsecond/1e3) + + @staticmethod + def test(): + import json + from pprint import pprint + + with open('robot/assets/dcae/ves_volte_single_fault_event.json') as data_file: + data = json.load(data_file) + + data['event']['commonEventHeader']['version'] = '5.0' + pprint(data) + + +if __name__ == '__main__': + + lib = DcaeLibrary() + lib.enable_vesc_https_auth() + + ret = lib.setup_dmaap_server() + print ret + time.sleep(100000) diff --git a/tests/dcaegen2/testcases/resources/DcaeVariables.py b/tests/dcaegen2/testcases/resources/robot_library/DcaeVariables.py index 1617714a..47d169f1 100644 --- a/tests/dcaegen2/testcases/resources/DcaeVariables.py +++ b/tests/dcaegen2/testcases/resources/robot_library/DcaeVariables.py @@ -10,8 +10,4 @@ DCAE_HEALTH_CHECK_URL1 = "http://135.205.228.170:8500" CommonEventSchema = get_environment_variable('WORKSPACE') + "/tests/dcaegen2/testcases/assets/json_events/CommonEventFormat_30.2_ONAP.json" -HttpServerThread = None -HTTPD = None -VESEventQ = None IsRobotRun = False - diff --git a/tests/dcaegen2/testcases/resources/robot_library/DmaapLibrary.py b/tests/dcaegen2/testcases/resources/robot_library/DmaapLibrary.py new file mode 100644 index 00000000..c9a0ff7b --- /dev/null +++ b/tests/dcaegen2/testcases/resources/robot_library/DmaapLibrary.py @@ -0,0 +1,79 @@ +from Queue import Queue + +import robot.api.logger as logger +import threading +import time + +import DcaeVariables +from robot_library.dmaap_simulator import DMaaPServer +from robot_library.dmaap_simulator.DMaaPQueue import DMaaPQueue + +class DmaapLibrary(object): + + dmaap_queue = None + dmaap_server = None + server_thread = None + + def __init__(self): + pass + + @staticmethod + def setup_dmaap_server(port_num=3904): + try: + DmaapLibrary.start_dmaap_server_on_new_thread(port_num) + return "true" + except Exception as e: + print (str(e)) + return "false" + + @staticmethod + def start_dmaap_server_on_new_thread(port_num): + DmaapLibrary.dmaap_queue = DMaaPQueue(Queue()) + DmaapLibrary.dmaap_server = DMaaPServer.create_dmaap_server(DmaapLibrary.dmaap_queue, port=port_num) + DmaapLibrary.server_thread = threading.Thread(name='DMAAP_HTTPServer', + target=DmaapLibrary.dmaap_server.serve_forever) + DmaapLibrary.server_thread.start() + logger.console("DMaaP Mockup Sever started") + DcaeVariables.IsRobotRun = True + time.sleep(2) + + @staticmethod + def shutdown_dmaap(): + if DmaapLibrary.dmaap_server is not None: + DmaapLibrary.dmaap_server.shutdown() + logger.console("DMaaP Server shut down") + time.sleep(3) + return "true" + else: + return "false" + + @staticmethod + def cleanup_ves_events(): + if DmaapLibrary.server_thread is not None: + DmaapLibrary.dmaap_queue.clean_up_event() + logger.console("DMaaP event queue is cleaned up") + return "true" + logger.console("DMaaP server not started yet") + return "false" + + @staticmethod + def dmaap_message_receive_on_topic(evtobj, topic): + + evt_str = DmaapLibrary.dmaap_queue.deque_event() + while evt_str != None: + if evtobj in evt_str and topic in evt_str: + logger.info("DMaaP Receive Expected Publish Event:\n" + evt_str) + logger.info("On Expected Topic:\n" + topic) + return 'true' + evt_str = DmaapLibrary.dmaap_queue.deque_event() + return 'false' + + @staticmethod + def dmaap_message_receive(evtobj): + evt_str = DmaapLibrary.dmaap_queue.deque_event() + while evt_str != None: + if evtobj in evt_str: + logger.info("DMaaP Receive Expected Publish Event:\n" + evt_str) + return 'true' + evt_str = DmaapLibrary.dmaap_queue.deque_event() + return 'false' diff --git a/tests/dcaegen2/testcases/resources/robot_library/README.md b/tests/dcaegen2/testcases/resources/robot_library/README.md new file mode 100644 index 00000000..ac432f79 --- /dev/null +++ b/tests/dcaegen2/testcases/resources/robot_library/README.md @@ -0,0 +1,14 @@ +# Robot Library +This catalog contains python files used in Robot tests for dcaegen2.ves. + +# DMaaP Simulator +Catalog dmaap_simulator contains python implementation of DMaaP simulator. It uses python BaseHTTPServer to expose endpoints. + +# DMaaP Tests +Catalog dmaap_test contains tests that are used to validate DMaaP simulator. Test are using "pytest" and "MagicMock". + +### In order to run tests: +1. create virtual environemnt with Python 2.7; +2. install requirements from file requirements.txt located in dmaap_test; +3. set environement variable WORKSPACE to point root csit catalog +4. run py.test command in catalog dmaap_test diff --git a/tests/dcaegen2/testcases/resources/robot_library/__init__.py b/tests/dcaegen2/testcases/resources/robot_library/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/tests/dcaegen2/testcases/resources/robot_library/__init__.py diff --git a/tests/dcaegen2/testcases/resources/robot_library/dmaap_simulator/DMaaPHandler.py b/tests/dcaegen2/testcases/resources/robot_library/dmaap_simulator/DMaaPHandler.py new file mode 100644 index 00000000..f1c46e19 --- /dev/null +++ b/tests/dcaegen2/testcases/resources/robot_library/dmaap_simulator/DMaaPHandler.py @@ -0,0 +1,73 @@ +''' +Created on Aug 15, 2017 + +@author: sw6830 +''' +import os +import posixpath +import BaseHTTPServer +import urllib +import urlparse +import cgi +import sys +import shutil +import mimetypes +from robot_library import DcaeVariables + +try: + from cStringIO import StringIO +except ImportError: + from StringIO import StringIO + + +class DMaaPHandler(BaseHTTPServer.BaseHTTPRequestHandler): + + def __init__(self, dmaap_simulator, *args): + self.dmaap_simulator = dmaap_simulator + BaseHTTPServer.BaseHTTPRequestHandler.__init__(self, *args) + + def do_POST(self): + if 'POST' not in self.requestline: + resp_code = 405 + else: + resp_code = self.parse_the_posted_data() + + if resp_code == 0: + self.send_successful_response() + else: + self.send_response(resp_code) + + def parse_the_posted_data(self): + topic = self.extract_topic_from_path() + content_len = self.get_content_length() + post_body = self.rfile.read(content_len) + post_body = self.get_json_part_of_post_body(post_body) + event = "{\"" + topic + "\":" + post_body + "}" + if self.dmaap_simulator.enque_event(event): + resp_code = 0 + else: + print "enque event fails" + resp_code = 500 + return resp_code + + def get_json_part_of_post_body(self, post_body): + indx = post_body.index("{") + if indx != 0: + post_body = post_body[indx:] + return post_body + + def extract_topic_from_path(self): + return self.path["/events/".__len__():] + + def get_content_length(self): + return int(self.headers.getheader('content-length', 0)) + + def send_successful_response(self): + if 'clientThrottlingState' in self.requestline: + self.send_response(204) + else: + self.send_response(200) + self.send_header('Content-Type', 'application/json') + self.end_headers() + self.wfile.write("{'count': 1, 'serverTimeMs': 3}") + self.wfile.close() diff --git a/tests/dcaegen2/testcases/resources/robot_library/dmaap_simulator/DMaaPQueue.py b/tests/dcaegen2/testcases/resources/robot_library/dmaap_simulator/DMaaPQueue.py new file mode 100644 index 00000000..3d3a81fa --- /dev/null +++ b/tests/dcaegen2/testcases/resources/robot_library/dmaap_simulator/DMaaPQueue.py @@ -0,0 +1,48 @@ +class DMaaPQueue(object): + + def __init__(self, event_queue, wait_timeout_sec=25): + self.event_queue = event_queue + self.wait_timeout_sec = wait_timeout_sec + + def set_deque_event_timeout(self, wait_timeout_sec): + self.wait_timeout_sec = wait_timeout_sec + + def clean_up_event(self): + if self.queue_is_valid(): + with self.event_queue.mutex: + try: + self.event_queue.queue.clear() + except: + pass + + def enque_event(self, event): + event_placed_on_queue = False + if self.queue_is_valid(): + event_placed_on_queue = self._enque_event(event, event_placed_on_queue) + return event_placed_on_queue + + def _enque_event(self, event, event_placed_on_queue): + try: + self.event_queue.put(event) + event_placed_on_queue = True + except Exception as e: + print (str(e)) + return event_placed_on_queue + + def deque_event(self, wait_sec=None): + if wait_sec is None: + wait_sec = self.wait_timeout_sec + event_from_queue = None + if self.queue_is_valid(): + event_from_queue = self._deque_event(event_from_queue, wait_sec) + return event_from_queue + + def _deque_event(self, event_from_queue, wait_sec): + try: + event_from_queue = self.event_queue.get(True, wait_sec) + except Exception as e: + print("DMaaP Event dequeue timeout") + return event_from_queue + + def queue_is_valid(self): + return self.event_queue is not None diff --git a/tests/dcaegen2/testcases/resources/robot_library/dmaap_simulator/DMaaPServer.py b/tests/dcaegen2/testcases/resources/robot_library/dmaap_simulator/DMaaPServer.py new file mode 100644 index 00000000..37499be8 --- /dev/null +++ b/tests/dcaegen2/testcases/resources/robot_library/dmaap_simulator/DMaaPServer.py @@ -0,0 +1,22 @@ +import BaseHTTPServer +import DMaaPHandler + + +class DMaaPServer(BaseHTTPServer.HTTPServer): + + def __init__(self, server_address, protocol, dmaap_simulator): + + def handler_class_constructor(*args): + DMaaPHandler.DMaaPHandler(dmaap_simulator, *args) + DMaaPHandler.protocol_version = protocol + BaseHTTPServer.HTTPServer.__init__(self, server_address, handler_class_constructor) + + serer_address = self.socket.getsockname() + print "Serving HTTP on", serer_address[0], "port", serer_address[1], "..." + + +def create_dmaap_server(dmaap_simulator, protocol="HTTP/1.0", port=3904): + server_address = ('', port) + httpd = DMaaPServer(server_address, protocol, dmaap_simulator) + + return httpd diff --git a/tests/dcaegen2/testcases/resources/robot_library/dmaap_simulator/__init__.py b/tests/dcaegen2/testcases/resources/robot_library/dmaap_simulator/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/tests/dcaegen2/testcases/resources/robot_library/dmaap_simulator/__init__.py diff --git a/tests/dcaegen2/testcases/resources/robot_library/dmaap_test/__init__.py b/tests/dcaegen2/testcases/resources/robot_library/dmaap_test/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/tests/dcaegen2/testcases/resources/robot_library/dmaap_test/__init__.py diff --git a/tests/dcaegen2/testcases/resources/robot_library/dmaap_test/requirements.txt b/tests/dcaegen2/testcases/resources/robot_library/dmaap_test/requirements.txt new file mode 100644 index 00000000..7a687204 --- /dev/null +++ b/tests/dcaegen2/testcases/resources/robot_library/dmaap_test/requirements.txt @@ -0,0 +1,23 @@ +atomicwrites==1.4.0 +attrs==19.3.0 +backports.functools-lru-cache==1.6.1 +configparser==4.0.2 +contextlib2==0.6.0.post1 +funcsigs==1.0.2 +httplib2==0.18.1 +importlib-metadata==1.7.0 +magicmock==0.3 +mock==3.0.5 +mocker==1.1.1 +more-itertools==5.0.0 +packaging==20.4 +pathlib2==2.3.5 +pluggy==0.13.1 +py==1.9.0 +pyparsing==2.4.7 +pytest==4.6.11 +pytest-mock==2.0.0 +scandir==1.10.0 +six==1.15.0 +wcwidth==0.2.5 +zipp==1.2.0 diff --git a/tests/dcaegen2/testcases/resources/robot_library/dmaap_test/test_DMaaPSQueue.py b/tests/dcaegen2/testcases/resources/robot_library/dmaap_test/test_DMaaPSQueue.py new file mode 100644 index 00000000..f278a391 --- /dev/null +++ b/tests/dcaegen2/testcases/resources/robot_library/dmaap_test/test_DMaaPSQueue.py @@ -0,0 +1,52 @@ +from Queue import Queue +import pytest +from robot_library.dmaap_simulator.DMaaPQueue import DMaaPQueue + +wait_sec_for_dequeing_event = 0.1 +test_event = "\"topic\":{\"test\":123}" + + +class TestDMaaPQueue: + + dmaap_simulator = None + + @pytest.fixture(autouse=True, scope="function") + def initiate_dmaap_simulator(self): + TestDMaaPQueue.dmaap_simulator = DMaaPQueue(Queue()) + TestDMaaPQueue.dmaap_simulator.set_deque_event_timeout(wait_sec_for_dequeing_event) + yield + + def test_when_queue_is_empty_then_deque_returns_none(self): + # when + event = TestDMaaPQueue.dmaap_simulator.deque_event() + + # then + assert event is None + + def test_when_enque_event_then_dequeue_return_same_event(self): + # when + TestDMaaPQueue.dmaap_simulator.enque_event(test_event) + event = TestDMaaPQueue.dmaap_simulator.deque_event() + + # then + assert event == test_event + + def test_when_enque_and_dequeue_event_then_deque_return_none(self): + # when + TestDMaaPQueue.dmaap_simulator.enque_event(test_event) + TestDMaaPQueue.dmaap_simulator.deque_event() + event = TestDMaaPQueue.dmaap_simulator.deque_event() + + # then + assert event is None + + def test_when_enque_few_events_and_clean_up_then_dequeu_return_none(self): + # when + TestDMaaPQueue.dmaap_simulator.enque_event(test_event) + TestDMaaPQueue.dmaap_simulator.enque_event(test_event) + TestDMaaPQueue.dmaap_simulator.enque_event(test_event) + TestDMaaPQueue.dmaap_simulator.clean_up_event() + event = TestDMaaPQueue.dmaap_simulator.deque_event() + + # then + assert event is None diff --git a/tests/dcaegen2/testcases/resources/robot_library/dmaap_test/test_DMaaPSimulator.py b/tests/dcaegen2/testcases/resources/robot_library/dmaap_test/test_DMaaPSimulator.py new file mode 100644 index 00000000..82f95ff8 --- /dev/null +++ b/tests/dcaegen2/testcases/resources/robot_library/dmaap_test/test_DMaaPSimulator.py @@ -0,0 +1,62 @@ +import sys +import pytest +from mock import MagicMock + +sys.modules['robot'] = MagicMock() +sys.modules['robot.api'] = MagicMock() +sys.modules['robot.api.logger'] = MagicMock() +from robot_library.DmaapLibrary import DmaapLibrary + +wait_sec_for_dequeing_event = 0.1 +test_event = "{\"test\":\"123\"}" +test_topic = "topic" +test_message = "\"" + test_topic + "\":" + test_event + + +class TestDMaaPSimulator: + + @pytest.fixture(autouse=True, scope="class") + def initiate_dmaap_simulator(self): + DmaapLibrary.setup_dmaap_server() + DmaapLibrary.dmaap_queue.set_deque_event_timeout(wait_sec_for_dequeing_event) + yield + assert DmaapLibrary.shutdown_dmaap() == "true" + + @pytest.fixture(autouse=True, scope="function") + def clear_dmaap_simulator(self): + yield + DmaapLibrary.cleanup_ves_events() + + def test_start_stop_dmaap_server(self): + # when / then + assert DmaapLibrary.dmaap_queue is not None + assert DmaapLibrary.dmaap_server is not None + assert DmaapLibrary.server_thread is not None + + def test_dmaap_server_returns_true_when_event_is_present_on_queue(self): + # when + DmaapLibrary.dmaap_queue.enque_event(test_message) + + # then + assert DmaapLibrary.dmaap_message_receive(test_event) == 'true' + + def test_dmaap_server_returns_true_when_event_is_present_on_given_topic_on_queue(self): + # when + DmaapLibrary.dmaap_queue.enque_event(test_message) + + # then + assert DmaapLibrary.dmaap_message_receive_on_topic(test_event, test_topic) == 'true' + + def test_dmaap_server_returns_timeout_when_event_is_not_present_on_queue(self): + # when / then + assert DmaapLibrary.dmaap_message_receive(test_event) == 'false' + + def test_dmaap_server_returns_false_when_queue_was_cleared(self): + # when + DmaapLibrary.dmaap_queue.enque_event(test_message) + DmaapLibrary.dmaap_queue.enque_event(test_message) + DmaapLibrary.dmaap_queue.enque_event(test_message) + DmaapLibrary.cleanup_ves_events() + + # then + assert DmaapLibrary.dmaap_message_receive_on_topic(test_event, test_topic) == 'false' |