aboutsummaryrefslogtreecommitdiffstats
path: root/test/csit/tests/dcaegen2/testcases/resources/DcaeLibrary.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/csit/tests/dcaegen2/testcases/resources/DcaeLibrary.py')
-rw-r--r--test/csit/tests/dcaegen2/testcases/resources/DcaeLibrary.py86
1 files changed, 50 insertions, 36 deletions
diff --git a/test/csit/tests/dcaegen2/testcases/resources/DcaeLibrary.py b/test/csit/tests/dcaegen2/testcases/resources/DcaeLibrary.py
index e581f1b2c..b43ee29e2 100644
--- a/test/csit/tests/dcaegen2/testcases/resources/DcaeLibrary.py
+++ b/test/csit/tests/dcaegen2/testcases/resources/DcaeLibrary.py
@@ -5,24 +5,34 @@ Created on Aug 18, 2017
'''
from robot.api import logger
from Queue import Queue
-import uuid, time, datetime,json, threading,os, platform, subprocess,paramiko
+import uuid
+import time
+import datetime
+import json
+import threading
+import os
+import platform
+import subprocess
+import paramiko
import DcaeVariables
import DMaaP
+
class DcaeLibrary(object):
def __init__(self):
pass
- def setup_dmaap_server(self, portNum=3904):
- if DcaeVariables.HttpServerThread != None:
- DMaaP.cleanUpEvent()
+ @staticmethod
+ def setup_dmaap_server(port_num=3904):
+ if DcaeVariables.HttpServerThread is not None:
+ DMaaP.clean_up_event()
logger.console("Clean up event from event queue before test")
logger.info("DMaaP Server already started")
return "true"
DcaeVariables.IsRobotRun = True
- DMaaP.test(port=portNum)
+ DMaaP.test(port=port_num)
try:
DcaeVariables.VESEventQ = Queue()
DcaeVariables.HttpServerThread = threading.Thread(name='DMAAP_HTTPServer', target=DMaaP.DMaaPHttpd.serve_forever)
@@ -34,8 +44,9 @@ class DcaeLibrary(object):
print (str(e))
return "false"
- def shutdown_dmaap(self):
- if DcaeVariables.HTTPD != None:
+ @staticmethod
+ def shutdown_dmaap():
+ if DcaeVariables.HTTPD is not None:
DcaeVariables.HTTPD.shutdown()
logger.console("DMaaP Server shut down")
time.sleep(3)
@@ -43,20 +54,23 @@ class DcaeLibrary(object):
else:
return "false"
- def cleanup_ves_events(self):
- if DcaeVariables.HttpServerThread != None:
- DMaaP.cleanUpEvent()
+ @staticmethod
+ def cleanup_ves_events():
+ if DcaeVariables.HttpServerThread is not None:
+ DMaaP.clean_up_event()
logger.console("DMaaP event queue is cleaned up")
return "true"
logger.console("DMaaP server not started yet")
return "false"
- def enable_vesc_https_auth(self):
+ @staticmethod
+ def enable_vesc_https_auth():
+ global client
if 'Windows' in platform.system():
try:
client = paramiko.SSHClient()
client.load_system_host_keys()
- #client.set_missing_host_key_policy(paramiko.WarningPolicy)
+ # client.set_missing_host_key_policy(paramiko.WarningPolicy)
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect(os.environ['CSIT_IP'], port=22, username=os.environ['CSIT_USER'], password=os.environ['CSIT_PD'])
@@ -73,42 +87,42 @@ class DcaeLibrary(object):
time.sleep(5)
return
- def dmaap_message_receive(self, evtobj, action='contain'):
+ @staticmethod
+ def dmaap_message_receive(evtobj, action='contain'):
- evtStr = DMaaP.dequeEvent()
- while evtStr != None:
- logger.console("DMaaP receive VES Event:\n" + evtStr)
+ evt_str = DMaaP.deque_event()
+ while evt_str != None:
+ logger.console("DMaaP receive VES Event:\n" + evt_str)
if action == 'contain':
- if evtobj in evtStr:
- logger.info("DMaaP Receive Expected Publish Event:\n" + evtStr)
+ if evtobj in evt_str:
+ logger.info("DMaaP Receive Expected Publish Event:\n" + evt_str)
return 'true'
if action == 'sizematch':
- if len(evtobj) == len(evtStr):
+ if len(evtobj) == len(evt_str):
return 'true'
if action == 'dictmatch':
- evtDict = json.loads(evtStr)
- if cmp(evtobj, evtDict) == 0:
+ evt_dict = json.loads(evt_str)
+ if cmp(evtobj, evt_dict) == 0:
return 'true'
- evtStr = DMaaP.dequeEvent()
+ evt_str = DMaaP.deque_event()
return 'false'
-
- def create_header_from_string(self, dictStr):
- logger.info("Enter create_header_from_string: dictStr")
- return dict(u.split("=") for u in dictStr.split(","))
-
- def is_json_empty(self, resp):
+
+ @staticmethod
+ def is_json_empty(resp):
logger.info("Enter is_json_empty: resp.text: " + resp.text)
- if resp.text == None or len(resp.text) < 2:
+ if resp.text is None or len(resp.text) < 2:
return 'True'
return 'False'
- def Generate_UUID(self):
+ @staticmethod
+ def generate_uuid():
"""generate a uuid"""
return uuid.uuid4()
- def get_json_value_list(self, jsonstr, keyval):
+ @staticmethod
+ def get_json_value_list(jsonstr, keyval):
logger.info("Enter Get_Json_Key_Value_List")
- if jsonstr == None or len(jsonstr) < 2:
+ if jsonstr is None or len(jsonstr) < 2:
logger.info("No Json data found")
return []
try:
@@ -122,12 +136,14 @@ class DcaeLibrary(object):
print str(e)
return []
- def generate_MilliTimestamp_UUID(self):
+ @staticmethod
+ def generate_millitimestamp_uuid():
"""generate a millisecond timestamp uuid"""
then = datetime.datetime.now()
return int(time.mktime(then.timetuple())*1e3 + then.microsecond/1e3)
- def test (self):
+ @staticmethod
+ def test():
import json
from pprint import pprint
@@ -138,7 +154,6 @@ class DcaeLibrary(object):
pprint(data)
-
if __name__ == '__main__':
'''
dictStr = "action=getTable,Accept=application/json,Content-Type=application/json,X-FromAppId=1234908903284"
@@ -156,4 +171,3 @@ if __name__ == '__main__':
ret = lib.setup_dmaap_server()
print ret
time.sleep(100000)
-