summaryrefslogtreecommitdiffstats
path: root/test/csit/tests/dcaegen2/testcases/resources/DcaeLibrary.py
blob: b43ee29e2902fabc01a7af1a66644d46c90067a6 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
'''
Created on Aug 18, 2017

@author: sw6830
'''
from robot.api import logger
from Queue import Queue
import uuid
import time
import datetime
import json
import threading
import os
import platform
import subprocess
import paramiko
import DcaeVariables
import DMaaP


class DcaeLibrary(object):
    """Helper keywords for the DCAE CSIT tests.

    All helpers are static methods. Most report their outcome as a string
    ("true"/"false" or 'True'/'False') rather than a bool, and write their
    diagnostics through ``robot.api.logger``.
    """

    def __init__(self):
        pass

    @staticmethod
    def setup_dmaap_server(port_num=3904):
        """Start the DMaaP mock-up HTTP server in a background thread.

        If ``DcaeVariables.HttpServerThread`` is already set, only the
        pending events are cleaned up and "true" is returned.

        Args:
            port_num: Port handed to ``DMaaP.test``.

        Returns:
            "true" when the server thread is (or already was) started,
            "false" when starting the thread raised an exception.
        """
        if DcaeVariables.HttpServerThread is not None:
            DMaaP.clean_up_event()
            logger.console("Clean up event from event queue before test")
            logger.info("DMaaP Server already started")
            return "true"

        DcaeVariables.IsRobotRun = True
        # Presumably creates DMaaP.DMaaPHttpd, which the thread below serves
        # -- confirm against DMaaP.py.
        DMaaP.test(port=port_num)
        try:
            DcaeVariables.VESEventQ = Queue()
            DcaeVariables.HttpServerThread = threading.Thread(
                name='DMAAP_HTTPServer',
                target=DMaaP.DMaaPHttpd.serve_forever)
            DcaeVariables.HttpServerThread.start()
            logger.console("DMaaP Mockup Sever started")
            # Give the server thread a moment before the caller proceeds.
            time.sleep(2)
            return "true"
        except Exception as e:
            print(str(e))
            return "false"

    @staticmethod
    def shutdown_dmaap():
        """Shut down the HTTP server held in ``DcaeVariables.HTTPD``.

        Returns:
            "true" if a server object was present and ``shutdown()`` was
            called on it (followed by a 3 second pause), otherwise "false".
        """
        if DcaeVariables.HTTPD is None:
            return "false"
        DcaeVariables.HTTPD.shutdown()
        logger.console("DMaaP Server shut down")
        time.sleep(3)
        return "true"

    @staticmethod
    def cleanup_ves_events():
        """Clean up queued events if the server thread has been created.

        Returns:
            "true" after calling ``DMaaP.clean_up_event()``, or "false" when
            ``DcaeVariables.HttpServerThread`` is still None.
        """
        if DcaeVariables.HttpServerThread is not None:
            DMaaP.clean_up_event()
            logger.console("DMaaP event queue is cleaned up")
            return "true"
        logger.console("DMaaP server not started yet")
        return "false"

    @staticmethod
    def enable_vesc_https_auth():
        """Run the ``vesc_enable_https_auth.sh`` helper script.

        On Windows the script is executed over SSH on the host given by the
        CSIT_IP environment variable, logging in with CSIT_USER / CSIT_PD.
        On any other platform the script under $WORKSPACE is run locally,
        followed by a 5 second pause.

        Raises:
            KeyError: If a required environment variable is not set.
        """
        if 'Windows' in platform.system():
            # Create the client before entering ``try`` so the ``finally``
            # clause always closes the client created by this call; a local
            # is used instead of a module global for the same reason.
            client = paramiko.SSHClient()
            try:
                client.load_system_host_keys()
                client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
                client.connect(os.environ['CSIT_IP'], port=22,
                               username=os.environ['CSIT_USER'],
                               password=os.environ['CSIT_PD'])
                # NOTE(review): '%{WORKSPACE}' is passed to the remote side
                # verbatim; Python does not expand it -- confirm the remote
                # command is what was intended.
                _, stdout, _ = client.exec_command(
                    '%{WORKSPACE}/test/csit/tests/dcaegen2/testcases/resources/vesc_enable_https_auth.sh')
                logger.console(stdout.read())
            finally:
                client.close()
            return

        ws = os.environ['WORKSPACE']
        script2run = ws + "/test/csit/tests/dcaegen2/testcases/resources/vesc_enable_https_auth.sh"
        logger.info("Running script: " + script2run)
        logger.console("Running script: " + script2run)
        subprocess.call(script2run)
        time.sleep(5)
        return

    @staticmethod
    def dmaap_message_receive(evtobj, action='contain'):
        """Search the events returned by ``DMaaP.deque_event()`` for a match.

        Events are dequeued one by one until a match is found or
        ``deque_event()`` returns None; every event inspected is consumed.

        Args:
            evtobj: Expected value. For 'contain' it is tested with ``in``
                against the event string, for 'sizematch' its ``len`` is
                compared with the event string's, and for 'dictmatch' it is
                compared for equality with the JSON-decoded event.
            action: One of 'contain', 'sizematch' or 'dictmatch'. Any other
                value never matches.

        Returns:
            'true' on the first matching event, otherwise 'false'.

        Raises:
            ValueError: In 'dictmatch' mode, if an event is not valid JSON.
        """
        evt_str = DMaaP.deque_event()
        while evt_str is not None:
            logger.console("DMaaP receive VES Event:\n" + evt_str)
            if action == 'contain':
                if evtobj in evt_str:
                    logger.info("DMaaP Receive Expected Publish Event:\n" + evt_str)
                    return 'true'
            elif action == 'sizematch':
                if len(evtobj) == len(evt_str):
                    return 'true'
            elif action == 'dictmatch':
                # Plain equality replaces the Python 2-only cmp(...) == 0.
                if evtobj == json.loads(evt_str):
                    return 'true'
            evt_str = DMaaP.deque_event()
        return 'false'

    @staticmethod
    def is_json_empty(resp):
        """Tell whether a response body is missing or shorter than 2 chars.

        Args:
            resp: Object exposing a ``text`` attribute.

        Returns:
            'True' if ``resp.text`` is None or has fewer than two
            characters, otherwise 'False'.
        """
        text = resp.text
        if text is None:
            # Check for None before building the log line; concatenating
            # None to a string would raise TypeError.
            logger.info("Enter is_json_empty: resp.text: None")
            return 'True'
        logger.info("Enter is_json_empty: resp.text: " + text)
        if len(text) < 2:
            return 'True'
        return 'False'

    @staticmethod
    def generate_uuid():
        """generate a uuid"""
        return uuid.uuid4()

    @staticmethod
    def get_json_value_list(jsonstr, keyval):
        """Collect ``item[keyval]`` for every item of a JSON document.

        Args:
            jsonstr: JSON text; iterating the decoded value must yield items
                that can be indexed with ``keyval``.
            keyval: Key looked up in each item.

        Returns:
            The list of looked-up values, or an empty list when ``jsonstr``
            is None, shorter than two characters, cannot be parsed, or an
            item lookup fails.
        """
        logger.info("Enter Get_Json_Key_Value_List")
        if jsonstr is None or len(jsonstr) < 2:
            logger.info("No Json data found")
            return []
        try:
            return [item[keyval] for item in json.loads(jsonstr)]
        except Exception as e:
            logger.info("Json data parsing fails")
            # Parenthesised form is valid on both Python 2 and Python 3.
            print(str(e))
            return []

    @staticmethod
    def generate_millitimestamp_uuid():
        """generate a millisecond timestamp uuid"""
        now = datetime.datetime.now()
        whole_seconds_ms = time.mktime(now.timetuple()) * 1e3
        return int(whole_seconds_ms + now.microsecond / 1e3)

    @staticmethod
    def test():
        """Load a sample VES fault event, set its header version, print it.

        Reads 'robot/assets/dcae/ves_volte_single_fault_event.json' relative
        to the current working directory.
        """
        from pprint import pprint

        with open('robot/assets/dcae/ves_volte_single_fault_event.json') as data_file:
            data = json.load(data_file)

        data['event']['commonEventHeader']['version'] = '5.0'
        pprint(data)

if __name__ == '__main__':
    # Manual run: enable HTTPS auth on the VES collector, start the DMaaP
    # mock-up server and print the result. The string literal below is an
    # older, disabled experiment kept as-is.
    '''
    dictStr = "action=getTable,Accept=application/json,Content-Type=application/json,X-FromAppId=1234908903284"
    cls = DcaeLibrary()
    #dict = cls.create_header_from_string(dictStr)
    #print str(dict)
    jsonStr = "[{'Node': 'onapfcnsl00', 'CheckID': 'serfHealth', 'Name': 'Serf Health Status', 'ServiceName': '', 'Notes': '', 'ModifyIndex': 6, 'Status': 'passing', 'ServiceID': '', 'ServiceTags': [], 'Output': 'Agent alive and reachable', 'CreateIndex': 6}]"
    lsObj = cls.get_json_value_list(jsonStr, 'Status')
    print lsObj
    '''

    lib = DcaeLibrary()
    lib.enable_vesc_https_auth()

    ret = lib.setup_dmaap_server()
    # Parenthesised form is valid on both Python 2 and Python 3 and matches
    # the print style used inside DcaeLibrary.
    print(ret)
    # Park the main thread; presumably so the mock server stays available
    # for manual testing.
    time.sleep(100000)