diff options
Diffstat (limited to 'tests/dcaegen2/testcases/resources/DcaeLibrary.py')
-rw-r--r-- | tests/dcaegen2/testcases/resources/DcaeLibrary.py | 173 |
1 files changed, 173 insertions, 0 deletions
diff --git a/tests/dcaegen2/testcases/resources/DcaeLibrary.py b/tests/dcaegen2/testcases/resources/DcaeLibrary.py new file mode 100644 index 00000000..b43ee29e --- /dev/null +++ b/tests/dcaegen2/testcases/resources/DcaeLibrary.py @@ -0,0 +1,173 @@ +''' +Created on Aug 18, 2017 + +@author: sw6830 +''' +from robot.api import logger +from Queue import Queue +import uuid +import time +import datetime +import json +import threading +import os +import platform +import subprocess +import paramiko +import DcaeVariables +import DMaaP + + +class DcaeLibrary(object): + + def __init__(self): + pass + + @staticmethod + def setup_dmaap_server(port_num=3904): + if DcaeVariables.HttpServerThread is not None: + DMaaP.clean_up_event() + logger.console("Clean up event from event queue before test") + logger.info("DMaaP Server already started") + return "true" + + DcaeVariables.IsRobotRun = True + DMaaP.test(port=port_num) + try: + DcaeVariables.VESEventQ = Queue() + DcaeVariables.HttpServerThread = threading.Thread(name='DMAAP_HTTPServer', target=DMaaP.DMaaPHttpd.serve_forever) + DcaeVariables.HttpServerThread.start() + logger.console("DMaaP Mockup Sever started") + time.sleep(2) + return "true" + except Exception as e: + print (str(e)) + return "false" + + @staticmethod + def shutdown_dmaap(): + if DcaeVariables.HTTPD is not None: + DcaeVariables.HTTPD.shutdown() + logger.console("DMaaP Server shut down") + time.sleep(3) + return "true" + else: + return "false" + + @staticmethod + def cleanup_ves_events(): + if DcaeVariables.HttpServerThread is not None: + DMaaP.clean_up_event() + logger.console("DMaaP event queue is cleaned up") + return "true" + logger.console("DMaaP server not started yet") + return "false" + + @staticmethod + def enable_vesc_https_auth(): + global client + if 'Windows' in platform.system(): + try: + client = paramiko.SSHClient() + client.load_system_host_keys() + # client.set_missing_host_key_policy(paramiko.WarningPolicy) + client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + + client.connect(os.environ['CSIT_IP'], port=22, username=os.environ['CSIT_USER'], password=os.environ['CSIT_PD']) + stdin, stdout, stderr = client.exec_command('%{WORKSPACE}/test/csit/tests/dcaegen2/testcases/resources/vesc_enable_https_auth.sh') + logger.console(stdout.read()) + finally: + client.close() + return + ws = os.environ['WORKSPACE'] + script2run = ws + "/test/csit/tests/dcaegen2/testcases/resources/vesc_enable_https_auth.sh" + logger.info("Running script: " + script2run) + logger.console("Running script: " + script2run) + subprocess.call(script2run) + time.sleep(5) + return + + @staticmethod + def dmaap_message_receive(evtobj, action='contain'): + + evt_str = DMaaP.deque_event() + while evt_str != None: + logger.console("DMaaP receive VES Event:\n" + evt_str) + if action == 'contain': + if evtobj in evt_str: + logger.info("DMaaP Receive Expected Publish Event:\n" + evt_str) + return 'true' + if action == 'sizematch': + if len(evtobj) == len(evt_str): + return 'true' + if action == 'dictmatch': + evt_dict = json.loads(evt_str) + if cmp(evtobj, evt_dict) == 0: + return 'true' + evt_str = DMaaP.deque_event() + return 'false' + + @staticmethod + def is_json_empty(resp): + logger.info("Enter is_json_empty: resp.text: " + resp.text) + if resp.text is None or len(resp.text) < 2: + return 'True' + return 'False' + + @staticmethod + def generate_uuid(): + """generate a uuid""" + return uuid.uuid4() + + @staticmethod + def get_json_value_list(jsonstr, keyval): + logger.info("Enter Get_Json_Key_Value_List") + if jsonstr is None or len(jsonstr) < 2: + logger.info("No Json data found") + return [] + try: + data = json.loads(jsonstr) + nodelist = [] + for item in data: + nodelist.append(item[keyval]) + return nodelist + except Exception as e: + logger.info("Json data parsing fails") + print str(e) + return [] + + @staticmethod + def generate_millitimestamp_uuid(): + """generate a millisecond timestamp uuid""" + then = datetime.datetime.now() + return int(time.mktime(then.timetuple())*1e3 + then.microsecond/1e3) + + @staticmethod + def test(): + import json + from pprint import pprint + + with open('robot/assets/dcae/ves_volte_single_fault_event.json') as data_file: + data = json.load(data_file) + + data['event']['commonEventHeader']['version'] = '5.0' + pprint(data) + + +if __name__ == '__main__': + ''' + dictStr = "action=getTable,Accept=application/json,Content-Type=application/json,X-FromAppId=1234908903284" + cls = DcaeLibrary() + #dict = cls.create_header_from_string(dictStr) + #print str(dict) + jsonStr = "[{'Node': 'onapfcnsl00', 'CheckID': 'serfHealth', 'Name': 'Serf Health Status', 'ServiceName': '', 'Notes': '', 'ModifyIndex': 6, 'Status': 'passing', 'ServiceID': '', 'ServiceTags': [], 'Output': 'Agent alive and reachable', 'CreateIndex': 6}]" + lsObj = cls.get_json_value_list(jsonStr, 'Status') + print lsObj + ''' + + lib = DcaeLibrary() + lib.enable_vesc_https_auth() + + ret = lib.setup_dmaap_server() + print ret + time.sleep(100000) |