1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
|
'''
Created on Aug 18, 2017
@author: sw6830
'''
from robot.api import logger
from Queue import Queue
import uuid, time, datetime,json, threading,os, platform, subprocess,paramiko
import DcaeVariables
import DMaaP
class DcaeLibrary(object):
    """Test helper keywords for the DCAE / VES collector test suite.

    The methods log through ``robot.api.logger`` and share state with the
    mock DMaaP server through the ``DcaeVariables`` and ``DMaaP`` modules.
    Most keywords report their outcome as the strings "true"/"false"
    (``is_json_empty`` uses 'True'/'False') instead of Python booleans.
    """

    def __init__(self):
        pass

    def setup_dmaap_server(self, portNum=3904):
        """Start the mock DMaaP HTTP server in a background thread.

        When a server thread is already registered in ``DcaeVariables`` the
        queued events are cleaned up and the existing server is reused.

        :param portNum: port handed to ``DMaaP.test``.
        :return: "true" if the server is running (new or reused), "false" if
                 creating or starting the server thread raised.
        """
        if DcaeVariables.HttpServerThread is not None:
            DMaaP.cleanUpEvent()
            logger.console("Clean up event from event queue before test")
            logger.info("DMaaP Server already started")
            return "true"

        DcaeVariables.IsRobotRun = True
        # NOTE(review): presumably DMaaP.test() creates DMaaP.DMaaPHttpd, which
        # is served below - confirm against the DMaaP module.
        DMaaP.test(port=portNum)
        try:
            DcaeVariables.VESEventQ = Queue()
            DcaeVariables.HttpServerThread = threading.Thread(
                name='DMAAP_HTTPServer',
                target=DMaaP.DMaaPHttpd.serve_forever)
            DcaeVariables.HttpServerThread.start()
            logger.console("DMaaP Mockup Sever started")
            # Give the server thread a moment to come up before tests use it.
            time.sleep(2)
            return "true"
        except Exception as e:
            print(str(e))
            return "false"

    def shutdown_dmaap(self):
        """Shut down the HTTP server stored in ``DcaeVariables.HTTPD``.

        :return: "true" if a server object was present and shut down,
                 "false" if there was nothing to shut down.
        """
        if DcaeVariables.HTTPD is None:
            return "false"
        DcaeVariables.HTTPD.shutdown()
        logger.console("DMaaP Server shut down")
        time.sleep(3)
        return "true"

    def cleanup_ves_events(self):
        """Clean up the events held by the mock DMaaP server.

        :return: "true" if a server thread is registered and the cleanup was
                 requested, "false" if no server thread is registered.
        """
        if DcaeVariables.HttpServerThread is not None:
            DMaaP.cleanUpEvent()
            logger.console("DMaaP event queue is cleaned up")
            return "true"
        logger.console("DMaaP server not started yet")
        return "false"

    def enable_vesc_https_auth(self):
        """Run the ``vesc_enable_https_auth.sh`` script.

        On Windows the script is executed over SSH on the host described by
        the ``CSIT_IP`` / ``CSIT_USER`` / ``CSIT_PD`` environment variables.
        Elsewhere it is run locally from ``$WORKSPACE``, followed by a
        5 second pause.

        :raises KeyError: if a required environment variable is missing.
        """
        if 'Windows' in platform.system():
            # Create the client before the try block so that the finally
            # clause never refers to an unbound name if construction fails.
            client = paramiko.SSHClient()
            try:
                client.load_system_host_keys()
                client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
                client.connect(os.environ['CSIT_IP'],
                               port=22,
                               username=os.environ['CSIT_USER'],
                               password=os.environ['CSIT_PD'])
                # NOTE(review): '%{WORKSPACE}' is Robot Framework variable
                # syntax and is sent to the remote host verbatim from this
                # Python string - confirm this is the intended command.
                _, stdout, _ = client.exec_command('%{WORKSPACE}/test/csit/tests/dcaegen2/testcases/resources/vesc_enable_https_auth.sh')
                logger.console(stdout.read())
            finally:
                client.close()
            return

        ws = os.environ['WORKSPACE']
        script2run = ws + "/test/csit/tests/dcaegen2/testcases/resources/vesc_enable_https_auth.sh"
        logger.info("Running script: " + script2run)
        logger.console("Running script: " + script2run)
        subprocess.call(script2run)
        time.sleep(5)
        return

    def dmaap_message_receive(self, evtobj, action='contain'):
        """Drain events from the mock DMaaP server until one matches.

        Events are taken with ``DMaaP.dequeEvent()`` until it returns None;
        events that do not match are consumed and not put back.

        :param evtobj: expected value; a substring for 'contain', any sized
                       object for 'sizematch', the decoded JSON value for
                       'dictmatch'.
        :param action: 'contain' (evtobj occurs in the event text),
                       'sizematch' (same length as the event text) or
                       'dictmatch' (equal to the JSON-decoded event).
        :return: 'true' on the first matching event, otherwise 'false'.
        :raises ValueError: in 'dictmatch' mode if an event is not valid JSON.
        """
        evtStr = DMaaP.dequeEvent()
        while evtStr is not None:
            logger.console("DMaaP receive VES Event:\n" + evtStr)
            if action == 'contain':
                if evtobj in evtStr:
                    logger.info("DMaaP Receive Expected Publish Event:\n" + evtStr)
                    return 'true'
            elif action == 'sizematch':
                if len(evtobj) == len(evtStr):
                    return 'true'
            elif action == 'dictmatch':
                # Plain equality replaces cmp(...) == 0, which no longer
                # exists on Python 3 and means the same thing on Python 2.
                if evtobj == json.loads(evtStr):
                    return 'true'
            evtStr = DMaaP.dequeEvent()
        return 'false'

    def create_header_from_string(self, dictStr):
        """Build a dict from a "key=value,key=value" string.

        Only the first '=' of each pair separates key from value, so values
        may themselves contain '='.

        :param dictStr: comma separated ``key=value`` pairs.
        :return: dict mapping each key to its value (both strings).
        :raises ValueError: if a comma separated item contains no '='.
        """
        logger.info("Enter create_header_from_string: dictStr")
        return dict(pair.split("=", 1) for pair in dictStr.split(","))

    def is_json_empty(self, resp):
        """Tell whether a response body is too short to hold JSON content.

        :param resp: object exposing the body as a ``text`` attribute.
        :return: 'True' if ``resp.text`` is None or shorter than two
                 characters, otherwise 'False'.
        """
        text = resp.text
        # Check for None before logging: concatenating None into the log
        # message would raise TypeError before the emptiness test runs.
        if text is None:
            logger.info("Enter is_json_empty: resp.text: None")
            return 'True'
        logger.info("Enter is_json_empty: resp.text: " + text)
        if len(text) < 2:
            return 'True'
        return 'False'

    def Generate_UUID(self):
        """generate a uuid"""
        return uuid.uuid4()

    def get_json_value_list(self, jsonstr, keyval):
        """Collect ``item[keyval]`` for every item of a JSON array.

        :param jsonstr: JSON text that decodes to an iterable of items.
        :param keyval: key (or index) looked up in every item.
        :return: list of the looked-up values; an empty list if ``jsonstr``
                 is None, shorter than two characters, cannot be decoded, or
                 any item lacks ``keyval``.
        """
        logger.info("Enter Get_Json_Key_Value_List")
        if jsonstr is None or len(jsonstr) < 2:
            logger.info("No Json data found")
            return []
        try:
            data = json.loads(jsonstr)
            return [item[keyval] for item in data]
        except Exception as e:
            # Deliberately best-effort: any decoding or lookup problem is
            # reported and turned into an empty result.
            logger.info("Json data parsing fails")
            print(str(e))
            return []

    def generate_MilliTimestamp_UUID(self):
        """generate a millisecond timestamp uuid"""
        then = datetime.datetime.now()
        # mktime() has whole-second resolution; add the milliseconds back
        # from the microsecond field.
        return int(time.mktime(then.timetuple()) * 1e3 + then.microsecond / 1e3)

    def test(self):
        """Load the sample VoLTE fault event, set its header version to '5.0'
        and pretty-print it.

        The asset path is relative to the current working directory.
        """
        from pprint import pprint
        with open('robot/assets/dcae/ves_volte_single_fault_event.json') as data_file:
            data = json.load(data_file)
        data['event']['commonEventHeader']['version'] = '5.0'
        pprint(data)
if __name__ == '__main__':
    # Manual run: execute the HTTPS-auth script, start the mock DMaaP
    # server and report whether start-up succeeded ("true"/"false").
    lib = DcaeLibrary()
    lib.enable_vesc_https_auth()
    ret = lib.setup_dmaap_server()
    # Function-call form of print works on both Python 2 and Python 3.
    print(ret)
    # Park the main thread for a long time while the server thread runs.
    time.sleep(100000)
|