From 47e5ebb708f7e7a6fd550503ce103210f1935cec Mon Sep 17 00:00:00 2001 From: pswang Date: Mon, 25 Sep 2017 17:05:15 -0500 Subject: Rename dcae directory to dcaegen2 Change-Id: Icd930f1fe31e3db8cdf7715eca52495273e9b2b5 IssueId: DCAEGEN2-111 Signed-off-by: pswang --- .../tests/dcae/testcases/resources/DcaeLibrary.py | 135 --------------------- 1 file changed, 135 deletions(-) delete mode 100644 test/csit/tests/dcae/testcases/resources/DcaeLibrary.py (limited to 'test/csit/tests/dcae/testcases/resources/DcaeLibrary.py') diff --git a/test/csit/tests/dcae/testcases/resources/DcaeLibrary.py b/test/csit/tests/dcae/testcases/resources/DcaeLibrary.py deleted file mode 100644 index 650f8fef8..000000000 --- a/test/csit/tests/dcae/testcases/resources/DcaeLibrary.py +++ /dev/null @@ -1,135 +0,0 @@ -''' -Created on Aug 18, 2017 - -@author: sw6830 -''' -from robot.api import logger -from Queue import Queue -import uuid, time, datetime,json, threading -import DcaeVariables -import DMaaP - -class DcaeLibrary(object): - - def __init__(self): - pass - - def setup_dmaap_server(self, portNum=3904): - if DcaeVariables.HttpServerThread != None: - DMaaP.cleanUpEvent() - logger.console("Clean up event from event queue before test") - logger.info("DMaaP Server already started") - return "true" - - DcaeVariables.IsRobotRun = True - DMaaP.test(port=portNum) - try: - DcaeVariables.VESEventQ = Queue() - DcaeVariables.HttpServerThread = threading.Thread(name='DMAAP_HTTPServer', target=DMaaP.DMaaPHttpd.serve_forever) - DcaeVariables.HttpServerThread.start() - logger.console("DMaaP Mockup Sever started") - time.sleep(2) - return "true" - except Exception as e: - print (str(e)) - return "false" - - def shutdown_dmaap(self): - if DcaeVariables.HTTPD != None: - DcaeVariables.HTTPD.shutdown() - logger.console("DMaaP Server shut down") - time.sleep(3) - return "true" - else: - return "false" - - def cleanup_ves_events(self): - if DcaeVariables.HttpServerThread != None: - DMaaP.cleanUpEvent() - logger.console("DMaaP event queue is cleaned up") - return "true" - logger.console("DMaaP server not started yet") - return "false" - - def dmaap_message_receive(self, evtobj, action='contain'): - - evtStr = DMaaP.dequeEvent() - while evtStr != None: - logger.console("DMaaP receive VES Event:\n" + evtStr) - if action == 'contain': - if evtobj in evtStr: - logger.info("DMaaP Receive Expected Publish Event:\n" + evtStr) - return 'true' - if action == 'sizematch': - if len(evtobj) == len(evtStr): - return 'true' - if action == 'dictmatch': - evtDict = json.loads(evtStr) - if cmp(evtobj, evtDict) == 0: - return 'true' - evtStr = DMaaP.dequeEvent() - return 'false' - - def create_header_from_string(self, dictStr): - logger.info("Enter create_header_from_string: dictStr") - return dict(u.split("=") for u in dictStr.split(",")) - - def is_json_empty(self, resp): - logger.info("Enter is_json_empty: resp.text: " + resp.text) - if resp.text == None or len(resp.text) < 2: - return 'True' - return 'False' - - def Generate_UUID(self): - """generate a uuid""" - return uuid.uuid4() - - def get_json_value_list(self, jsonstr, keyval): - logger.info("Enter Get_Json_Key_Value_List") - if jsonstr == None or len(jsonstr) < 2: - logger.info("No Json data found") - return [] - try: - data = json.loads(jsonstr) - nodelist = [] - for item in data: - nodelist.append(item[keyval]) - return nodelist - except Exception as e: - logger.info("Json data parsing fails") - print str(e) - return [] - - def generate_MilliTimestamp_UUID(self): - """generate a millisecond timestamp uuid""" - then = datetime.datetime.now() - return int(time.mktime(then.timetuple())*1e3 + then.microsecond/1e3) - - def test (self): - import json - from pprint import pprint - - with open('robot/assets/dcae/ves_volte_single_fault_event.json') as data_file: - data = json.load(data_file) - - data['event']['commonEventHeader']['version'] = '5.0' - pprint(data) - - - -if __name__ == '__main__': - ''' - dictStr = "action=getTable,Accept=application/json,Content-Type=application/json,X-FromAppId=1234908903284" - cls = DcaeLibrary() - #dict = cls.create_header_from_string(dictStr) - #print str(dict) - jsonStr = "[{'Node': 'onapfcnsl00', 'CheckID': 'serfHealth', 'Name': 'Serf Health Status', 'ServiceName': '', 'Notes': '', 'ModifyIndex': 6, 'Status': 'passing', 'ServiceID': '', 'ServiceTags': [], 'Output': 'Agent alive and reachable', 'CreateIndex': 6}]" - lsObj = cls.get_json_value_list(jsonStr, 'Status') - print lsObj - ''' - - lib = DcaeLibrary() - ret = lib.setup_dmaap_server() - print ret - time.sleep(10000000000) - \ No newline at end of file -- cgit 1.2.3-korg