1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
|
'''
Created on Aug 18, 2017
@author: sw6830
'''
from robot.api import logger
from Queue import Queue
import uuid
import time
import datetime
import json
import threading
import os
import platform
import subprocess
import paramiko
import DcaeVariables
import DMaaP
class DcaeLibrary(object):
def __init__(self):
pass
@staticmethod
def setup_dmaap_server(port_num=3904):
if DcaeVariables.HttpServerThread is not None:
DMaaP.clean_up_event()
logger.console("Clean up event from event queue before test")
logger.info("DMaaP Server already started")
return "true"
DcaeVariables.IsRobotRun = True
DMaaP.test(port=port_num)
try:
DcaeVariables.VESEventQ = Queue()
DcaeVariables.HttpServerThread = threading.Thread(name='DMAAP_HTTPServer', target=DMaaP.DMaaPHttpd.serve_forever)
DcaeVariables.HttpServerThread.start()
logger.console("DMaaP Mockup Sever started")
time.sleep(2)
return "true"
except Exception as e:
print (str(e))
return "false"
@staticmethod
def shutdown_dmaap():
if DcaeVariables.HTTPD is not None:
DcaeVariables.HTTPD.shutdown()
logger.console("DMaaP Server shut down")
time.sleep(3)
return "true"
else:
return "false"
@staticmethod
def cleanup_ves_events():
if DcaeVariables.HttpServerThread is not None:
DMaaP.clean_up_event()
logger.console("DMaaP event queue is cleaned up")
return "true"
logger.console("DMaaP server not started yet")
return "false"
@staticmethod
def enable_vesc_https_auth():
global client
if 'Windows' in platform.system():
try:
client = paramiko.SSHClient()
client.load_system_host_keys()
# client.set_missing_host_key_policy(paramiko.WarningPolicy)
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect(os.environ['CSIT_IP'], port=22, username=os.environ['CSIT_USER'], password=os.environ['CSIT_PD'])
stdin, stdout, stderr = client.exec_command('%{WORKSPACE}/test/csit/tests/dcaegen2/testcases/resources/vesc_enable_https_auth.sh')
logger.console(stdout.read())
finally:
client.close()
return
ws = os.environ['WORKSPACE']
script2run = ws + "/test/csit/tests/dcaegen2/testcases/resources/vesc_enable_https_auth.sh"
logger.info("Running script: " + script2run)
logger.console("Running script: " + script2run)
subprocess.call(script2run)
time.sleep(5)
return
@staticmethod
def dmaap_message_receive(evtobj, action='contain'):
evt_str = DMaaP.deque_event()
while evt_str != None:
logger.console("DMaaP receive VES Event:\n" + evt_str)
if action == 'contain':
if evtobj in evt_str:
logger.info("DMaaP Receive Expected Publish Event:\n" + evt_str)
return 'true'
if action == 'sizematch':
if len(evtobj) == len(evt_str):
return 'true'
if action == 'dictmatch':
evt_dict = json.loads(evt_str)
if cmp(evtobj, evt_dict) == 0:
return 'true'
evt_str = DMaaP.deque_event()
return 'false'
@staticmethod
def is_json_empty(resp):
logger.info("Enter is_json_empty: resp.text: " + resp.text)
if resp.text is None or len(resp.text) < 2:
return 'True'
return 'False'
@staticmethod
def generate_uuid():
"""generate a uuid"""
return uuid.uuid4()
@staticmethod
def get_json_value_list(jsonstr, keyval):
logger.info("Enter Get_Json_Key_Value_List")
if jsonstr is None or len(jsonstr) < 2:
logger.info("No Json data found")
return []
try:
data = json.loads(jsonstr)
nodelist = []
for item in data:
nodelist.append(item[keyval])
return nodelist
except Exception as e:
logger.info("Json data parsing fails")
print str(e)
return []
@staticmethod
def generate_millitimestamp_uuid():
"""generate a millisecond timestamp uuid"""
then = datetime.datetime.now()
return int(time.mktime(then.timetuple())*1e3 + then.microsecond/1e3)
@staticmethod
def test():
import json
from pprint import pprint
with open('robot/assets/dcae/ves_volte_single_fault_event.json') as data_file:
data = json.load(data_file)
data['event']['commonEventHeader']['version'] = '5.0'
pprint(data)
if __name__ == '__main__':
'''
dictStr = "action=getTable,Accept=application/json,Content-Type=application/json,X-FromAppId=1234908903284"
cls = DcaeLibrary()
#dict = cls.create_header_from_string(dictStr)
#print str(dict)
jsonStr = "[{'Node': 'onapfcnsl00', 'CheckID': 'serfHealth', 'Name': 'Serf Health Status', 'ServiceName': '', 'Notes': '', 'ModifyIndex': 6, 'Status': 'passing', 'ServiceID': '', 'ServiceTags': [], 'Output': 'Agent alive and reachable', 'CreateIndex': 6}]"
lsObj = cls.get_json_value_list(jsonStr, 'Status')
print lsObj
'''
lib = DcaeLibrary()
lib.enable_vesc_https_auth()
ret = lib.setup_dmaap_server()
print ret
time.sleep(100000)
|