aboutsummaryrefslogtreecommitdiffstats
path: root/tests/dcaegen2-services-bbs-event-processor/bbs-testcases/resources/BbsLibrary.py
blob: 8dbdc5a3d47577b170d746a030e40abc8b337fc2 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
import json

import docker
import time
from docker.utils.json_stream import json_stream
from collections import OrderedDict


class BbsLibrary(object):
    """
    Helper keywords for the BBS event processor test cases.

    The methods fall in two groups: helpers that extract fields from JSON
    messages (given as strings) and helpers that inspect or control docker
    containers through the docker SDK client.
    Methods whose name starts with an underscore are internal helpers.
    """

    def __init__(self):
        pass

    @staticmethod
    def _dig(document, *keys):
        """
        Walk nested dictionaries following keys and return the value found.
        Returns None as soon as a key is absent or an intermediate value is
        not a dictionary, so a message with missing sections yields None
        instead of raising AttributeError on a None intermediate.
        """
        node = document
        for key in keys:
            if not isinstance(node, dict):
                return None
            node = node.get(key)
        return node

    @staticmethod
    def _to_compact_json(fields):
        """
        Serialize fields to a JSON string with no space after the element
        separator (',') and one space after the key separator (': ').
        The separators are given to json.dumps directly rather than stripped
        from the result afterwards, so a ', ' inside a value is left intact.
        """
        return json.dumps(fields, separators=(',', ': '))

    @staticmethod
    def check_for_log(search_for, container_name='bbs', tail=1000):
        """
        Return True when search_for occurs in the last tail log lines of the
        container container_name, False otherwise.
        The defaults ('bbs', 1000) are the values this check always used.
        """
        client = docker.from_env()
        container = client.containers.get(container_name)

        alog = container.logs(stream=False, tail=tail)
        try:
            # Undecodable bytes are replaced instead of failing the keyword.
            alog = alog.decode("utf-8", "replace")
        except AttributeError:
            # Object without decode(): presumably already text, use as is.
            pass

        return search_for in alog

    @staticmethod
    def create_pnf_name_from_auth(json_file):
        """
        Return event.commonEventHeader.sourceName of the JSON message string,
        or None when that field (or one of its parents) is missing.
        """
        return BbsLibrary._dig(json.loads(json_file),
                               "event", "commonEventHeader", "sourceName")

    @staticmethod
    def get_invalid_auth_elements(json_file):
        """
        Get the correlationId, oldState, newState, stateInterface, macAddress, swVersion elements
        from the invalid message and place the elements into a JSON object (string) as fields for comparison.

        correlationId is read from event.commonEventHeader.sourceName, the
        state fields from event.stateChangeFields and macAddress/swVersion
        from event.stateChangeFields.additionalFields.
        A missing swVersion becomes an empty string; any other missing field
        is serialized as null.
        """
        event = BbsLibrary._dig(json.loads(json_file), "event")
        state_fields = BbsLibrary._dig(event, "stateChangeFields")
        additional_fields = BbsLibrary._dig(state_fields, "additionalFields")

        sw_version = BbsLibrary._dig(additional_fields, "swVersion")
        if sw_version is None:
            sw_version = ""

        # OrderedDict keeps the field order stable in the serialized string.
        inv_fields = OrderedDict()
        inv_fields['correlationId'] = BbsLibrary._dig(event, "commonEventHeader", "sourceName")
        inv_fields['oldState'] = BbsLibrary._dig(state_fields, "oldState")
        inv_fields['newState'] = BbsLibrary._dig(state_fields, "newState")
        inv_fields['stateInterface'] = BbsLibrary._dig(state_fields, "stateInterface")
        inv_fields['macAddress'] = BbsLibrary._dig(additional_fields, "macAddress")
        inv_fields['swVersion'] = sw_version

        return BbsLibrary._to_compact_json(inv_fields)

    @staticmethod
    def get_invalid_update_elements(json_file):
        """
        Get the correlationId, attachment-point, remote-id, cvlan, svlan elements
        from the invalid message and place the elements into a JSON object (string) as fields for comparison.

        correlationId is read from the top level of the message, the other
        elements from additionalFields. Missing fields are serialized as null.
        """
        message = json.loads(json_file)
        additional_fields = BbsLibrary._dig(message, "additionalFields")

        # OrderedDict keeps the field order stable in the serialized string.
        inv_fields = OrderedDict()
        inv_fields['correlationId'] = BbsLibrary._dig(message, "correlationId")
        inv_fields['attachment-point'] = BbsLibrary._dig(additional_fields, "attachment-point")
        inv_fields['remote-id'] = BbsLibrary._dig(additional_fields, "remote-id")
        inv_fields['cvlan'] = BbsLibrary._dig(additional_fields, "cvlan")
        inv_fields['svlan'] = BbsLibrary._dig(additional_fields, "svlan")

        return BbsLibrary._to_compact_json(inv_fields)

    @staticmethod
    def compare_policy(dmaap_policy, json_policy):
        """
        Compare the policyName of dmaap_policy (a JSON object string) with the
        policyName of the last element of json_policy (a JSON array string).

        Returns the string "True" when both names are equal, otherwise the
        string "False" (strings, not booleans).
        An unusable json_policy always gives "False"; an unusable dmaap_policy
        is treated as having the policy name "".
        """
        # What json.loads, pop() and get() raise for malformed text, None,
        # an empty list or a value of the wrong JSON type.
        unusable = (ValueError, TypeError, AttributeError, IndexError)

        try:
            j_policy = json.loads(json_policy).pop().get("policyName")
        except unusable:
            return "False"

        try:
            d_policy = json.loads(dmaap_policy).get("policyName")
        except unusable:
            d_policy = ""

        return "True" if d_policy == j_policy else "False"

    @staticmethod
    def create_pnf_name_from_update(json_file):
        """
        Return the top-level correlationId of the JSON message string,
        or None when it is missing.
        """
        return BbsLibrary._dig(json.loads(json_file), "correlationId")

    @staticmethod
    def ensure_container_is_running(name, timeout=None):
        """
        Start the container name unless it is already in status "running",
        wait until it reports that status, then print all container statuses.
        timeout is handed to wait_for_status (None waits without limit).
        """
        client = docker.from_env()

        if not BbsLibrary.is_in_status(client, name, "running"):
            print("starting container", name)
            container = client.containers.get(name)
            container.start()
            BbsLibrary.wait_for_status(client, name, "running", timeout)

        BbsLibrary.print_status(client)

    @staticmethod
    def ensure_container_is_exited(name, timeout=None):
        """
        Stop the container name unless it is already in status "exited",
        wait until it reports that status, then print all container statuses.
        timeout is handed to wait_for_status (None waits without limit).
        """
        client = docker.from_env()

        if not BbsLibrary.is_in_status(client, name, "exited"):
            print("stopping container", name)
            container = client.containers.get(name)
            container.stop()
            BbsLibrary.wait_for_status(client, name, "exited", timeout)

        BbsLibrary.print_status(client)

    @staticmethod
    def print_status(client):
        """
        Print the name and status of every container known to client,
        including containers that are not running.
        """
        print("containers status")
        for c in client.containers.list(all=True):
            print(c.name, "   ", c.status)

    @staticmethod
    def wait_for_status(client, name, status, timeout=None, poll_interval=3):
        """
        Poll every poll_interval seconds until the container name is in status.

        timeout is the maximum number of seconds to wait; None (the default)
        waits without limit. Raises RuntimeError when timeout elapses before
        the container reaches the status.
        """
        # float() so that numbers passed as text are accepted as well.
        deadline = None if timeout is None else time.time() + float(timeout)

        while not BbsLibrary.is_in_status(client, name, status):
            if deadline is not None and time.time() >= deadline:
                raise RuntimeError(
                    "container %s did not reach status %s within %s seconds"
                    % (name, status, timeout))
            print("waiting for container: ", name, "to be in status: ", status)
            time.sleep(float(poll_interval))

    @staticmethod
    def is_in_status(client, name, status):
        """
        Return True when exactly one container matches both the name filter
        "^/<name>$" and the given status, looking at all containers
        (stopped ones included).
        """
        matching = client.containers.list(
            all=True, filters={"name": "^/"+name+"$", "status": status})
        return len(matching) == 1