diff options
Diffstat (limited to 'test/csit/tests/dcaegen2/testcases/resources/DcaeLibrary.py')
-rw-r--r-- | test/csit/tests/dcaegen2/testcases/resources/DcaeLibrary.py | 86 |
1 files changed, 50 insertions, 36 deletions
diff --git a/test/csit/tests/dcaegen2/testcases/resources/DcaeLibrary.py b/test/csit/tests/dcaegen2/testcases/resources/DcaeLibrary.py index e581f1b2c..b43ee29e2 100644 --- a/test/csit/tests/dcaegen2/testcases/resources/DcaeLibrary.py +++ b/test/csit/tests/dcaegen2/testcases/resources/DcaeLibrary.py @@ -5,24 +5,34 @@ Created on Aug 18, 2017 ''' from robot.api import logger from Queue import Queue -import uuid, time, datetime,json, threading,os, platform, subprocess,paramiko +import uuid +import time +import datetime +import json +import threading +import os +import platform +import subprocess +import paramiko import DcaeVariables import DMaaP + class DcaeLibrary(object): def __init__(self): pass - def setup_dmaap_server(self, portNum=3904): - if DcaeVariables.HttpServerThread != None: - DMaaP.cleanUpEvent() + @staticmethod + def setup_dmaap_server(port_num=3904): + if DcaeVariables.HttpServerThread is not None: + DMaaP.clean_up_event() logger.console("Clean up event from event queue before test") logger.info("DMaaP Server already started") return "true" DcaeVariables.IsRobotRun = True - DMaaP.test(port=portNum) + DMaaP.test(port=port_num) try: DcaeVariables.VESEventQ = Queue() DcaeVariables.HttpServerThread = threading.Thread(name='DMAAP_HTTPServer', target=DMaaP.DMaaPHttpd.serve_forever) @@ -34,8 +44,9 @@ class DcaeLibrary(object): print (str(e)) return "false" - def shutdown_dmaap(self): - if DcaeVariables.HTTPD != None: + @staticmethod + def shutdown_dmaap(): + if DcaeVariables.HTTPD is not None: DcaeVariables.HTTPD.shutdown() logger.console("DMaaP Server shut down") time.sleep(3) @@ -43,20 +54,23 @@ class DcaeLibrary(object): else: return "false" - def cleanup_ves_events(self): - if DcaeVariables.HttpServerThread != None: - DMaaP.cleanUpEvent() + @staticmethod + def cleanup_ves_events(): + if DcaeVariables.HttpServerThread is not None: + DMaaP.clean_up_event() logger.console("DMaaP event queue is cleaned up") return "true" logger.console("DMaaP server not started yet") return "false" - def enable_vesc_https_auth(self): + @staticmethod + def enable_vesc_https_auth(): + global client if 'Windows' in platform.system(): try: client = paramiko.SSHClient() client.load_system_host_keys() - #client.set_missing_host_key_policy(paramiko.WarningPolicy) + # client.set_missing_host_key_policy(paramiko.WarningPolicy) client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) client.connect(os.environ['CSIT_IP'], port=22, username=os.environ['CSIT_USER'], password=os.environ['CSIT_PD']) @@ -73,42 +87,42 @@ class DcaeLibrary(object): time.sleep(5) return - def dmaap_message_receive(self, evtobj, action='contain'): + @staticmethod + def dmaap_message_receive(evtobj, action='contain'): - evtStr = DMaaP.dequeEvent() - while evtStr != None: - logger.console("DMaaP receive VES Event:\n" + evtStr) + evt_str = DMaaP.deque_event() + while evt_str != None: + logger.console("DMaaP receive VES Event:\n" + evt_str) if action == 'contain': - if evtobj in evtStr: - logger.info("DMaaP Receive Expected Publish Event:\n" + evtStr) + if evtobj in evt_str: + logger.info("DMaaP Receive Expected Publish Event:\n" + evt_str) return 'true' if action == 'sizematch': - if len(evtobj) == len(evtStr): + if len(evtobj) == len(evt_str): return 'true' if action == 'dictmatch': - evtDict = json.loads(evtStr) - if cmp(evtobj, evtDict) == 0: + evt_dict = json.loads(evt_str) + if cmp(evtobj, evt_dict) == 0: return 'true' - evtStr = DMaaP.dequeEvent() + evt_str = DMaaP.deque_event() return 'false' - - def create_header_from_string(self, dictStr): - logger.info("Enter create_header_from_string: dictStr") - return dict(u.split("=") for u in dictStr.split(",")) - - def is_json_empty(self, resp): + + @staticmethod + def is_json_empty(resp): logger.info("Enter is_json_empty: resp.text: " + resp.text) - if resp.text == None or len(resp.text) < 2: + if resp.text is None or len(resp.text) < 2: return 'True' return 'False' - def Generate_UUID(self): + @staticmethod + def generate_uuid(): """generate a uuid""" return uuid.uuid4() - def get_json_value_list(self, jsonstr, keyval): + @staticmethod + def get_json_value_list(jsonstr, keyval): logger.info("Enter Get_Json_Key_Value_List") - if jsonstr == None or len(jsonstr) < 2: + if jsonstr is None or len(jsonstr) < 2: logger.info("No Json data found") return [] try: @@ -122,12 +136,14 @@ class DcaeLibrary(object): print str(e) return [] - def generate_MilliTimestamp_UUID(self): + @staticmethod + def generate_millitimestamp_uuid(): """generate a millisecond timestamp uuid""" then = datetime.datetime.now() return int(time.mktime(then.timetuple())*1e3 + then.microsecond/1e3) - def test (self): + @staticmethod + def test(): import json from pprint import pprint @@ -138,7 +154,6 @@ class DcaeLibrary(object): pprint(data) - if __name__ == '__main__': ''' dictStr = "action=getTable,Accept=application/json,Content-Type=application/json,X-FromAppId=1234908903284" @@ -156,4 +171,3 @@ if __name__ == '__main__': ret = lib.setup_dmaap_server() print ret time.sleep(100000) - |