From 131d4dda822b637e816ef225d8a4c9981ccf56a7 Mon Sep 17 00:00:00 2001 From: Bartosz Gardziejewski Date: Tue, 4 Aug 2020 14:29:10 +0200 Subject: Refactor DMaaP simulator and add tests. Issue-ID: DCAEGEN2-1771 Signed-off-by: Bartosz Gardziejewski Change-Id: I65772f9cdaf546171941253abdf3977b41a3e50e --- tests/dcaegen2/testcases/resources/DcaeLibrary.py | 184 ---------------------- 1 file changed, 184 deletions(-) delete mode 100644 tests/dcaegen2/testcases/resources/DcaeLibrary.py (limited to 'tests/dcaegen2/testcases/resources/DcaeLibrary.py') diff --git a/tests/dcaegen2/testcases/resources/DcaeLibrary.py b/tests/dcaegen2/testcases/resources/DcaeLibrary.py deleted file mode 100644 index a9d5def8..00000000 --- a/tests/dcaegen2/testcases/resources/DcaeLibrary.py +++ /dev/null @@ -1,184 +0,0 @@ -''' -Created on Aug 18, 2017 - -@author: sw6830 -''' -from robot.api import logger -from Queue import Queue -import uuid -import time -import datetime -import json -import threading -import os -import platform -import subprocess -import paramiko -import DcaeVariables -import DMaaP - - -class DcaeLibrary(object): - - def __init__(self): - pass - - @staticmethod - def setup_dmaap_server(port_num=3904): - if DcaeVariables.HttpServerThread is not None: - DMaaP.clean_up_event() - logger.console("Clean up event from event queue before test") - logger.info("DMaaP Server already started") - return "true" - - DcaeVariables.IsRobotRun = True - DMaaP.test(port=port_num) - try: - DcaeVariables.VESEventQ = Queue() - DcaeVariables.HttpServerThread = threading.Thread(name='DMAAP_HTTPServer', target=DMaaP.DMaaPHttpd.serve_forever) - DcaeVariables.HttpServerThread.start() - logger.console("DMaaP Mockup Sever started") - time.sleep(2) - return "true" - except Exception as e: - print (str(e)) - return "false" - - @staticmethod - def shutdown_dmaap(): - if DcaeVariables.HTTPD is not None: - DcaeVariables.HTTPD.shutdown() - logger.console("DMaaP Server shut down") - time.sleep(3) - return "true" - else: - return "false" - - @staticmethod - def cleanup_ves_events(): - if DcaeVariables.HttpServerThread is not None: - DMaaP.clean_up_event() - logger.console("DMaaP event queue is cleaned up") - return "true" - logger.console("DMaaP server not started yet") - return "false" - - @staticmethod - def enable_vesc_with_certBasicAuth(): - global client - if 'Windows' in platform.system(): - try: - client = paramiko.SSHClient() - client.load_system_host_keys() - # client.set_missing_host_key_policy(paramiko.WarningPolicy) - client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) - - client.connect(os.environ['CSIT_IP'], port=22, username=os.environ['CSIT_USER'], password=os.environ['CSIT_PD']) - stdin, stdout, stderr = client.exec_command('%{WORKSPACE}/tests/dcaegen2/testcases/resources/vesc_enable_https_auth.sh') - logger.console(stdout.read()) - finally: - client.close() - return - ws = os.environ['WORKSPACE'] - script2run = ws + "/tests/dcaegen2/testcases/resources/vesc_enable_https_auth.sh" - logger.info("Running script: " + script2run) - logger.console("Running script: " + script2run) - subprocess.call(script2run) - time.sleep(5) - return - - @staticmethod - def dmaap_message_receive_on_topic(evtobj, topic): - - evt_str = DMaaP.deque_event() - while evt_str != None: - if evtobj in evt_str and topic in evt_str: - logger.info("DMaaP Receive Expected Publish Event:\n" + evt_str) - logger.info("On Expected Topic:\n" + topic) - return 'true' - evt_str = DMaaP.deque_event() - return 'false' - - @staticmethod - def dmaap_message_receive(evtobj, action='contain'): - - evt_str = DMaaP.deque_event() - while evt_str != None: - if action == 'contain': - if evtobj in evt_str: - logger.info("DMaaP Receive Expected Publish Event:\n" + evt_str) - return 'true' - if action == 'sizematch': - if len(evtobj) == len(evt_str): - return 'true' - if action == 'dictmatch': - evt_dict = json.loads(evt_str) - if cmp(evtobj, evt_dict) == 0: - return 'true' - evt_str = DMaaP.deque_event() - return 'false' - - @staticmethod - def is_json_empty(resp): - logger.info("Enter is_json_empty: resp.text: " + resp.text) - if resp.text is None or len(resp.text) < 2: - return 'True' - return 'False' - - @staticmethod - def generate_uuid(): - """generate a uuid""" - return uuid.uuid4() - - @staticmethod - def get_json_value_list(jsonstr, keyval): - logger.info("Enter Get_Json_Key_Value_List") - if jsonstr is None or len(jsonstr) < 2: - logger.info("No Json data found") - return [] - try: - data = json.loads(jsonstr) - nodelist = [] - for item in data: - nodelist.append(item[keyval]) - return nodelist - except Exception as e: - logger.info("Json data parsing fails") - print str(e) - return [] - - @staticmethod - def generate_millitimestamp_uuid(): - """generate a millisecond timestamp uuid""" - then = datetime.datetime.now() - return int(time.mktime(then.timetuple())*1e3 + then.microsecond/1e3) - - @staticmethod - def test(): - import json - from pprint import pprint - - with open('robot/assets/dcae/ves_volte_single_fault_event.json') as data_file: - data = json.load(data_file) - - data['event']['commonEventHeader']['version'] = '5.0' - pprint(data) - - -if __name__ == '__main__': - ''' - dictStr = "action=getTable,Accept=application/json,Content-Type=application/json,X-FromAppId=1234908903284" - cls = DcaeLibrary() - #dict = cls.create_header_from_string(dictStr) - #print str(dict) - jsonStr = "[{'Node': 'onapfcnsl00', 'CheckID': 'serfHealth', 'Name': 'Serf Health Status', 'ServiceName': '', 'Notes': '', 'ModifyIndex': 6, 'Status': 'passing', 'ServiceID': '', 'ServiceTags': [], 'Output': 'Agent alive and reachable', 'CreateIndex': 6}]" - lsObj = cls.get_json_value_list(jsonStr, 'Status') - print lsObj - ''' - - lib = DcaeLibrary() - lib.enable_vesc_https_auth() - - ret = lib.setup_dmaap_server() - print ret - time.sleep(100000) -- cgit 1.2.3-korg