diff options
Diffstat (limited to 'test/csit/tests/dcae/testcases/resources/DcaeLibrary.py')
-rw-r--r-- | test/csit/tests/dcae/testcases/resources/DcaeLibrary.py | 135 |
1 files changed, 0 insertions, 135 deletions
diff --git a/test/csit/tests/dcae/testcases/resources/DcaeLibrary.py b/test/csit/tests/dcae/testcases/resources/DcaeLibrary.py deleted file mode 100644 index 650f8fef8..000000000 --- a/test/csit/tests/dcae/testcases/resources/DcaeLibrary.py +++ /dev/null @@ -1,135 +0,0 @@ -'''
-Created on Aug 18, 2017
-
-@author: sw6830
-'''
-from robot.api import logger
-from Queue import Queue
-import uuid, time, datetime,json, threading
-import DcaeVariables
-import DMaaP
-
-class DcaeLibrary(object):
-
- def __init__(self):
- pass
-
- def setup_dmaap_server(self, portNum=3904):
- if DcaeVariables.HttpServerThread != None:
- DMaaP.cleanUpEvent()
- logger.console("Clean up event from event queue before test")
- logger.info("DMaaP Server already started")
- return "true"
-
- DcaeVariables.IsRobotRun = True
- DMaaP.test(port=portNum)
- try:
- DcaeVariables.VESEventQ = Queue()
- DcaeVariables.HttpServerThread = threading.Thread(name='DMAAP_HTTPServer', target=DMaaP.DMaaPHttpd.serve_forever)
- DcaeVariables.HttpServerThread.start()
- logger.console("DMaaP Mockup Sever started")
- time.sleep(2)
- return "true"
- except Exception as e:
- print (str(e))
- return "false"
-
- def shutdown_dmaap(self):
- if DcaeVariables.HTTPD != None:
- DcaeVariables.HTTPD.shutdown()
- logger.console("DMaaP Server shut down")
- time.sleep(3)
- return "true"
- else:
- return "false"
-
- def cleanup_ves_events(self):
- if DcaeVariables.HttpServerThread != None:
- DMaaP.cleanUpEvent()
- logger.console("DMaaP event queue is cleaned up")
- return "true"
- logger.console("DMaaP server not started yet")
- return "false"
-
- def dmaap_message_receive(self, evtobj, action='contain'):
-
- evtStr = DMaaP.dequeEvent()
- while evtStr != None:
- logger.console("DMaaP receive VES Event:\n" + evtStr)
- if action == 'contain':
- if evtobj in evtStr:
- logger.info("DMaaP Receive Expected Publish Event:\n" + evtStr)
- return 'true'
- if action == 'sizematch':
- if len(evtobj) == len(evtStr):
- return 'true'
- if action == 'dictmatch':
- evtDict = json.loads(evtStr)
- if cmp(evtobj, evtDict) == 0:
- return 'true'
- evtStr = DMaaP.dequeEvent()
- return 'false'
-
- def create_header_from_string(self, dictStr):
- logger.info("Enter create_header_from_string: dictStr")
- return dict(u.split("=") for u in dictStr.split(","))
-
- def is_json_empty(self, resp):
- logger.info("Enter is_json_empty: resp.text: " + resp.text)
- if resp.text == None or len(resp.text) < 2:
- return 'True'
- return 'False'
-
- def Generate_UUID(self):
- """generate a uuid"""
- return uuid.uuid4()
-
- def get_json_value_list(self, jsonstr, keyval):
- logger.info("Enter Get_Json_Key_Value_List")
- if jsonstr == None or len(jsonstr) < 2:
- logger.info("No Json data found")
- return []
- try:
- data = json.loads(jsonstr)
- nodelist = []
- for item in data:
- nodelist.append(item[keyval])
- return nodelist
- except Exception as e:
- logger.info("Json data parsing fails")
- print str(e)
- return []
-
- def generate_MilliTimestamp_UUID(self):
- """generate a millisecond timestamp uuid"""
- then = datetime.datetime.now()
- return int(time.mktime(then.timetuple())*1e3 + then.microsecond/1e3)
-
- def test (self):
- import json
- from pprint import pprint
-
- with open('robot/assets/dcae/ves_volte_single_fault_event.json') as data_file:
- data = json.load(data_file)
-
- data['event']['commonEventHeader']['version'] = '5.0'
- pprint(data)
-
-
-
-if __name__ == '__main__':
- '''
- dictStr = "action=getTable,Accept=application/json,Content-Type=application/json,X-FromAppId=1234908903284"
- cls = DcaeLibrary()
- #dict = cls.create_header_from_string(dictStr)
- #print str(dict)
- jsonStr = "[{'Node': 'onapfcnsl00', 'CheckID': 'serfHealth', 'Name': 'Serf Health Status', 'ServiceName': '', 'Notes': '', 'ModifyIndex': 6, 'Status': 'passing', 'ServiceID': '', 'ServiceTags': [], 'Output': 'Agent alive and reachable', 'CreateIndex': 6}]"
- lsObj = cls.get_json_value_list(jsonStr, 'Status')
- print lsObj
- '''
-
- lib = DcaeLibrary()
- ret = lib.setup_dmaap_server()
- print ret
- time.sleep(10000000000)
-
\ No newline at end of file |