blob: a467431f2253eb26ec7d8afb07e16272e6140ba7 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
|
'''
Created on Aug 18, 2017
@author: sw6830
'''
from robot.api import logger
import uuid
import time
import datetime
import json
import os
import platform
import subprocess
import paramiko
class DcaeLibrary(object):
def __init__(self):
pass
@staticmethod
def enable_vesc_with_cert_basic_auth():
global client
if 'Windows' in platform.system():
try:
DcaeLibrary.enable_https_auth_for_windows_platform_system()
finally:
client.close()
return
DcaeLibrary.enable_https_auth_for_non_windows_platform_system()
return
@staticmethod
def enable_https_auth_for_non_windows_platform_system():
ws = os.environ['WORKSPACE']
script2run = ws + "/tests/dcaegen2/testcases/resources/vesc_enable_https_auth.sh"
logger.info("Running script: " + script2run)
logger.console("Running script: " + script2run)
subprocess.call(script2run)
time.sleep(5)
@staticmethod
def enable_https_auth_for_windows_platform_system():
global client
client = paramiko.SSHClient()
client.load_system_host_keys()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect(os.environ['CSIT_IP'], port=22, username=os.environ['CSIT_USER'], password=os.environ['CSIT_PD'])
stdin, stdout, stderr = client.exec_command(
'%{WORKSPACE}/tests/dcaegen2/testcases/resources/vesc_enable_https_auth.sh')
logger.console(stdout.read())
@staticmethod
def is_json_empty(resp):
logger.info("Enter is_json_empty: resp.text: " + resp.text)
if resp.text is None or len(resp.text) < 2:
return 'True'
return 'False'
@staticmethod
def generate_uuid():
"""generate a uuid"""
return uuid.uuid4()
@staticmethod
def get_json_value_list(jsonstr, keyval):
logger.info("Enter Get_Json_Key_Value_List")
if jsonstr is None or len(jsonstr) < 2:
logger.info("No Json data found")
return []
try:
return DcaeLibrary.extract_list_of_items_from_json_string(jsonstr, keyval)
except Exception as e:
logger.info("Json data parsing fails")
print str(e)
return []
@staticmethod
def extract_list_of_items_from_json_string(jsonstr, keyval):
data = json.loads(jsonstr)
nodelist = []
for item in data:
nodelist.append(item[keyval])
return nodelist
@staticmethod
def generate_millitimestamp_uuid():
"""generate a millisecond timestamp uuid"""
then = datetime.datetime.now()
return int(time.mktime(then.timetuple())*1e3 + then.microsecond/1e3)
@staticmethod
def test():
import json
from pprint import pprint
with open('robot/assets/dcae/ves_volte_single_fault_event.json') as data_file:
data = json.load(data_file)
data['event']['commonEventHeader']['version'] = '5.0'
pprint(data)
if __name__ == '__main__':
lib = DcaeLibrary()
lib.enable_vesc_https_auth()
ret = lib.setup_dmaap_server()
print ret
time.sleep(100000)
|