aboutsummaryrefslogtreecommitdiffstats
path: root/tests/dcaegen2/prh-testcases/resources/PrhLibrary.py
blob: 1a3875b9a14a5b0d07d892eb89ccf5fa367d5088 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
import json

import docker
import time


class PrhLibrary(object):

    def __init__(self):
        pass

    @staticmethod
    def check_for_log(search_for):
        client = docker.from_env()
        container = client.containers.get('prh')
        #print ("Check for log searches for pattern: ", search_for )
        for line in container.logs(stream=True):
            #print ("Check for log analysis line: ", line )
            if search_for in line.strip():
                return True
        else:
            return False

    @staticmethod
    def create_pnf_ready_notification_from_ves(json_file):
        json_to_python = json.loads(json_file)
        ipv4 =  PrhLibrary.extract_ip_v4(json_to_python)
        ipv6 = PrhLibrary.extract_ip_v6(json_to_python)
        correlation_id = PrhLibrary.extract_correlation_id(json_to_python)
        serial_number = PrhLibrary.extract_serial_number(json_to_python)
        vendor_name = PrhLibrary.extract_vendor_name(json_to_python)
        model_number = PrhLibrary.extract_model_number(json_to_python)
        unit_type = PrhLibrary.extract_unit_type(json_to_python)

        str_json = '{"correlationId":"' + correlation_id + '","ipaddress-v4-oam":"' + ipv4 + '","ipaddress-v6-oam":"' + ipv6 + '","serialNumber":"' + serial_number + '","vendorName":"' + vendor_name  + '","modelNumber":"' + model_number + '","unitType":"' + unit_type + '"}'
        python_to_json = json.dumps(str_json)
        return python_to_json.replace("\\", "")[1:-1]

    @staticmethod
    def create_pnf_ready_notification_as_pnf_ready(json_file):
        json_to_python = json.loads(json_file)
        ipv4 =  PrhLibrary.extract_ip_v4(json_to_python)
        ipv6 = PrhLibrary.extract_ip_v6(json_to_python)
        correlation_id = PrhLibrary.extract_correlation_id(json_to_python)
        serial_number = PrhLibrary.extract_serial_number(json_to_python)
        vendor_name = PrhLibrary.extract_vendor_name(json_to_python)
        model_number = PrhLibrary.extract_model_number(json_to_python)
        unit_type = PrhLibrary.extract_unit_type(json_to_python)

        nf_role  = json_to_python.get("event").get("commonEventHeader").get("nfNamingCode") if "nfNamingCode" in json_to_python["event"]["commonEventHeader"] else ""

        str_json = '{"correlationId":"' + correlation_id + '","ipaddress-v4-oam":"' + ipv4 + '","ipaddress-v6-oam":"' + ipv6 + '","serial-number":"' + serial_number + '","equip-vendor":"' + vendor_name  + '","equip-model":"' + model_number + '","equip-type":"' + unit_type + '","nf-role":"' + nf_role + '","sw-version":""}'
        python_to_json = json.dumps(str_json)
        return python_to_json.replace("\\", "")[1:-1]

    @staticmethod
    def extract_ip_v4(content):
        return content.get("event").get("pnfRegistrationFields").get("oamV4IpAddress") if "oamV4IpAddress" in content["event"]["pnfRegistrationFields"] else ""

    @staticmethod
    def extract_ip_v6(content):
        return content.get("event").get("pnfRegistrationFields").get("oamV6IpAddress") if "oamV6IpAddress" in content["event"]["pnfRegistrationFields"] else ""

    @staticmethod
    def extract_correlation_id(content):
        return content.get("event").get("commonEventHeader").get("sourceName") if "sourceName" in content["event"]["commonEventHeader"] else ""

    @staticmethod
    def extract_serial_number(content):
        return content.get("event").get("pnfRegistrationFields").get("serialNumber") if "serialNumber" in content["event"]["pnfRegistrationFields"] else ""

    @staticmethod
    def extract_vendor_name(content):
        return content.get("event").get("pnfRegistrationFields").get("vendorName") if "vendorName" in content["event"]["pnfRegistrationFields"] else ""

    @staticmethod
    def extract_model_number(content):
        return content.get("event").get("pnfRegistrationFields").get("modelNumber") if "modelNumber" in content["event"]["pnfRegistrationFields"] else ""

    @staticmethod
    def extract_unit_type(content):
        return content.get("event").get("pnfRegistrationFields").get("unitType") if "unitType" in content["event"]["pnfRegistrationFields"] else ""

    @staticmethod
    def create_pnf_name(json_file):
        json_to_python = json.loads(json_file)
        correlation_id = json_to_python.get("sourceName")
        return correlation_id

    @staticmethod
    def ensure_container_is_running(name):
        client = docker.from_env()

        if not PrhLibrary.is_in_status(client, name, "running"):
            print ("starting container", name)
            container = client.containers.get(name)
            container.start()
            PrhLibrary.wait_for_status(client, name, "running")

        PrhLibrary.print_status(client)

    @staticmethod
    def ensure_container_is_exited(name):
        client = docker.from_env()

        if not PrhLibrary.is_in_status(client, name, "exited"):
            print ("stopping container", name)
            container = client.containers.get(name)
            container.stop()
            PrhLibrary.wait_for_status(client, name, "exited")

        PrhLibrary.print_status(client)

    @staticmethod
    def print_status(client):
        print("containers status")
        for c in client.containers.list(all=True):
            print(c.name, "   ", c.status)

    @staticmethod
    def wait_for_status(client, name, status):
        while not PrhLibrary.is_in_status(client, name, status):
            print ("waiting for container: ", name, "to be in status: ", status)
            time.sleep(3)

    @staticmethod
    def is_in_status(client, name, status):
        return len(client.containers.list(all=True, filters={"name": "^/"+name+"$", "status": status})) == 1


    def create_invalid_notification(self, json_file):
        invalidate_input = self.create_pnf_ready_notification_from_ves(json_file).replace("\":", "\": ") \
            .replace("ipaddress-v4-oam", "oamV4IpAddress").replace("ipaddress-v6-oam","oamV6IpAddress").replace("}",'')
        invalidate_input__and_add_additional_fields = invalidate_input + ',"nfNamingCode": ""' + "," + '"softwareVersion": ""' +"\\n"
        return invalidate_input__and_add_additional_fields