blob: d5dd972161838bab7dbd14ce98a82523d9df2b7f (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
|
'''
Created on Aug 18, 2017
@author: sw6830
'''
from robot.api import logger
import uuid
import time
import datetime
import json
import os
import platform
import subprocess
import paramiko
class DcaeLibrary(object):
    """Robot Framework keyword library with helpers for the DCAE test suites.

    Every keyword is a static method; the class keeps no instance state.
    The only shared state is the module-level ``client`` (a paramiko SSH
    client) used by the Windows code path.
    """

    def __init__(self):
        pass

    @staticmethod
    def override_collector_properties(properties_path):
        """Run ``override_collector_properties.sh`` with ``properties_path``.

        When ``platform.system()`` reports Windows the script is executed
        over SSH (see ``change_properties_for_windows_platform_system``) and
        the SSH client is always closed afterwards.  On any other platform
        the script is run as a local subprocess.
        """
        global client
        if 'Windows' in platform.system():
            # Reset the shared handle first: if creating the SSH client
            # fails, ``finally`` must not hit an undefined name (masking the
            # real error) or close a stale client from an earlier call.
            client = None
            try:
                DcaeLibrary.change_properties_for_windows_platform_system(properties_path)
            finally:
                if client is not None:
                    client.close()
            return
        DcaeLibrary.change_properties_for_non_windows_platform_system(properties_path)
        return

    @staticmethod
    def change_properties_for_non_windows_platform_system(properties_path):
        """Run the override script from ``$WORKSPACE`` as a local subprocess.

        Raises ``KeyError`` if the ``WORKSPACE`` environment variable is not
        set.  A non-zero exit status of the script is logged as a warning;
        it does not raise.  The method then sleeps for five seconds.
        """
        ws = os.environ['WORKSPACE']
        script2run = ws + '/tests/dcaegen2/testcases/resources/override_collector_properties.sh'
        logger.info("Running script: " + script2run)
        logger.console("Running script: " + script2run)
        exit_code = subprocess.call([script2run, properties_path])
        if exit_code != 0:
            # Best effort: surface the failure without aborting the keyword.
            logger.warn("Script " + script2run + " exited with status " + str(exit_code))
        time.sleep(5)

    @staticmethod
    def change_properties_for_windows_platform_system(properties_path):
        """Run the override script on the CSIT host over SSH.

        Connection details are read from the ``CSIT_IP``, ``CSIT_USER`` and
        ``CSIT_PD`` environment variables (``KeyError`` if one is missing).
        The connected client is stored in the module-level ``client``; the
        caller is responsible for closing it.  The remote command's standard
        output is written to the console.
        """
        # Function-scope import keeps the module importable on both
        # Python 2 (pipes.quote) and Python 3 (shlex.quote).
        try:
            from shlex import quote
        except ImportError:
            from pipes import quote

        global client
        client = paramiko.SSHClient()
        client.load_system_host_keys()
        # NOTE(review): AutoAddPolicy accepts unknown host keys without
        # verification; acceptable only on a trusted test network.
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        client.connect(os.environ['CSIT_IP'], port=22, username=os.environ['CSIT_USER'], password=os.environ['CSIT_PD'])
        # NOTE(review): '%{WORKSPACE}' is Robot Framework variable syntax and
        # is sent to the remote shell literally -- confirm the intended path.
        script2run = '%{WORKSPACE}' + '/tests/dcaegen2/testcases/resources/override_collector_properties.sh'
        # exec_command takes one command string; its second positional
        # parameter is ``bufsize``, so the path must be part of the command.
        command = script2run + ' ' + quote(properties_path)
        stdin, stdout, stderr = client.exec_command(command)
        logger.console(stdout.read())

    @staticmethod
    def is_json_empty(resp):
        """Return the string 'True' if ``resp.text`` is None or shorter than
        two characters, otherwise the string 'False'.
        """
        text = resp.text
        if text is None:
            # Check before concatenating: str + None raises TypeError.
            logger.info("Enter is_json_empty: resp.text: None")
            return 'True'
        logger.info("Enter is_json_empty: resp.text: " + text)
        if len(text) < 2:
            return 'True'
        return 'False'

    @staticmethod
    def generate_uuid():
        """Generate a random (version 4) UUID and return the ``uuid.UUID`` object."""
        return uuid.uuid4()

    @staticmethod
    def get_json_value_list(jsonstr, keyval):
        """Return the ``keyval`` entry of every item in the JSON array ``jsonstr``.

        Returns an empty list when ``jsonstr`` is None, shorter than two
        characters, or cannot be parsed/indexed; the failure is logged
        instead of raised.
        """
        logger.info("Enter Get_Json_Key_Value_List")
        if jsonstr is None or len(jsonstr) < 2:
            logger.info("No Json data found")
            return []
        try:
            return DcaeLibrary.extract_list_of_items_from_json_string(jsonstr, keyval)
        except Exception as e:
            # Deliberately broad: this keyword is best effort and must not
            # fail the calling test on malformed data.
            logger.info("Json data parsing fails")
            logger.info(str(e))
            return []

    @staticmethod
    def extract_list_of_items_from_json_string(jsonstr, keyval):
        """Parse ``jsonstr`` and return ``item[keyval]`` for each item, in order.

        Raises whatever ``json.loads`` or the item lookup raises (for
        example ``ValueError`` for invalid JSON, ``KeyError`` for a missing
        key).
        """
        return [item[keyval] for item in json.loads(jsonstr)]

    @staticmethod
    def generate_millitimestamp_uuid():
        """Return the current local time as integer milliseconds since the epoch."""
        then = datetime.datetime.now()
        # mktime() has whole-second resolution; add the sub-second part
        # from the datetime's microsecond field.
        return int(time.mktime(then.timetuple())*1e3 + then.microsecond/1e3)

    @staticmethod
    def test():
        """Load the sample VES fault event, set its header version to '5.0'
        and pretty-print the result.

        The JSON path is relative to the current working directory.
        """
        from pprint import pprint
        with open('robot/assets/dcae/ves_volte_single_fault_event.json') as data_file:
            data = json.load(data_file)
        data['event']['commonEventHeader']['version'] = '5.0'
        pprint(data)
if __name__ == '__main__':
    # Manual entry point: enable HTTPS auth on the VES collector, start the
    # DMaaP server, print its result and then keep the process alive.
    lib = DcaeLibrary()
    # NOTE(review): DcaeLibrary in this file defines neither of these
    # methods, so the original code always died with an AttributeError.
    # Report that clearly instead; confirm where the methods now live.
    required = ('enable_vesc_https_auth', 'setup_dmaap_server')
    missing = [name for name in required if not hasattr(lib, name)]
    if missing:
        raise SystemExit("DcaeLibrary has no method(s): " + ", ".join(missing))
    lib.enable_vesc_https_auth()
    ret = lib.setup_dmaap_server()
    # Call-form print with a single argument works on Python 2 and 3.
    print(ret)
    time.sleep(100000)
|