aboutsummaryrefslogtreecommitdiffstats
path: root/tests/dcaegen2/testcases/resources/robot_library/DcaeLibrary.py
blob: d5dd972161838bab7dbd14ce98a82523d9df2b7f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
'''
Created on Aug 18, 2017

@author: sw6830
'''
from robot.api import logger
import uuid
import time
import datetime
import json
import os
import platform
import subprocess
import paramiko


class DcaeLibrary(object):
    """Helper library used by the DCAE Robot test cases.

    Every helper is a static method; the class holds no instance state.
    """

    def __init__(self):
        pass

    @staticmethod
    def override_collector_properties(properties_path):
        """Run override_collector_properties.sh for ``properties_path``.

        When the local platform reports Windows the script is run over SSH,
        otherwise it is run as a local subprocess.

        :param properties_path: path handed to the script as its argument.
        """
        if 'Windows' in platform.system():
            DcaeLibrary.change_properties_for_windows_platform_system(properties_path)
        else:
            DcaeLibrary.change_properties_for_non_windows_platform_system(properties_path)

    @staticmethod
    def change_properties_for_non_windows_platform_system(properties_path):
        """Run the override script from the local ``WORKSPACE`` checkout.

        :param properties_path: path handed to the script as its argument.
        :raises KeyError: if the ``WORKSPACE`` environment variable is unset.
        """
        ws = os.environ['WORKSPACE']
        script2run = ws + '/tests/dcaegen2/testcases/resources/override_collector_properties.sh'
        logger.info("Running script: " + script2run)
        logger.console("Running script: " + script2run)
        exit_code = subprocess.call([script2run, properties_path])
        if exit_code != 0:
            # Best effort: record the failure instead of silently dropping it.
            logger.info("Script exited with code " + str(exit_code))
        # Fixed pause after the script has run, before the caller continues.
        time.sleep(5)

    @staticmethod
    def change_properties_for_windows_platform_system(properties_path):
        """Run the override script on a remote host over SSH.

        Connection details are read from the ``CSIT_IP``, ``CSIT_USER`` and
        ``CSIT_PD`` environment variables. The SSH connection is always closed
        before returning, including when connecting or executing fails.

        :param properties_path: appended as-is to the remote command line as
            the script's argument.
        :raises KeyError: if one of the ``CSIT_*`` variables is unset.
        """
        ssh = paramiko.SSHClient()
        try:
            ssh.load_system_host_keys()
            # Unknown host keys are accepted automatically.
            ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            ssh.connect(os.environ['CSIT_IP'], port=22, username=os.environ['CSIT_USER'],
                        password=os.environ['CSIT_PD'])
            # NOTE(review): '%{WORKSPACE}' looks like Robot Framework
            # environment-variable syntax; Python does not expand it, so the
            # remote shell receives it literally - confirm the intended path.
            script2run = '%{WORKSPACE}' + '/tests/dcaegen2/testcases/resources/override_collector_properties.sh'
            # The path must be part of the command string: the second
            # positional parameter of exec_command is the buffer size.
            _, stdout, _ = ssh.exec_command(script2run + ' ' + properties_path)
            logger.console(stdout.read())
        finally:
            ssh.close()

    @staticmethod
    def is_json_empty(resp):
        """Tell whether a response body is too short to hold JSON content.

        :param resp: object exposing a ``text`` attribute.
        :return: the string ``'True'`` when ``resp.text`` is ``None`` or
            shorter than two characters, otherwise the string ``'False'``.
        """
        text = resp.text
        # %-formatting keeps the log call safe when text is None.
        logger.info("Enter is_json_empty: resp.text: %s" % text)
        if text is None or len(text) < 2:
            return 'True'
        return 'False'

    @staticmethod
    def generate_uuid():
        """Return a new random ``uuid.UUID`` (version 4)."""
        return uuid.uuid4()

    @staticmethod
    def get_json_value_list(jsonstr, keyval):
        """Collect ``item[keyval]`` for every item of a JSON array string.

        :param jsonstr: JSON text expected to decode to a sequence of objects.
        :param keyval: key looked up in every item.
        :return: list of the values; an empty list when ``jsonstr`` is
            ``None``, shorter than two characters, or cannot be processed.
        """
        logger.info("Enter Get_Json_Key_Value_List")
        if jsonstr is None or len(jsonstr) < 2:
            logger.info("No Json data found")
            return []
        try:
            return DcaeLibrary.extract_list_of_items_from_json_string(jsonstr, keyval)
        except Exception as e:
            # Deliberately best effort: any failure yields an empty list.
            logger.info("Json data parsing fails")
            print(str(e))
            return []

    @staticmethod
    def extract_list_of_items_from_json_string(jsonstr, keyval):
        """Decode ``jsonstr`` and return ``item[keyval]`` for each item.

        :raises ValueError: if ``jsonstr`` is not valid JSON.
        :raises KeyError: if an item has no ``keyval`` entry.
        """
        return [item[keyval] for item in json.loads(jsonstr)]

    @staticmethod
    def generate_millitimestamp_uuid():
        """Return the current local time as integer milliseconds since the epoch.

        Despite the name, the result is a timestamp, not a UUID.
        """
        then = datetime.datetime.now()
        return int(time.mktime(then.timetuple())*1e3 + then.microsecond/1e3)

    @staticmethod
    def test():
        """Load a sample VES fault event, set its header version, and print it.

        Reads ``robot/assets/dcae/ves_volte_single_fault_event.json`` relative
        to the current working directory.
        """
        from pprint import pprint

        with open('robot/assets/dcae/ves_volte_single_fault_event.json') as data_file:
            data = json.load(data_file)

        data['event']['commonEventHeader']['version'] = '5.0'
        pprint(data)


if __name__ == '__main__':
    # Manual smoke check limited to helpers that need no SSH host, workspace
    # script or asset file. DcaeLibrary defines neither enable_vesc_https_auth()
    # nor setup_dmaap_server(), so calling them here raised AttributeError.
    print(DcaeLibrary.generate_uuid())
    print(DcaeLibrary.generate_millitimestamp_uuid())