1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
|
import json
import docker
import time
class PrhLibrary(object):
def __init__(self):
pass
@staticmethod
def check_for_log(search_for):
client = docker.from_env()
container = client.containers.get('prh')
print ("Check for log searches for pattern: ", search_for )
for line in container.logs(stream=True):
print ("Check for log analysis line: ", line )
if search_for in line.strip():
return True
else:
return False
@staticmethod
def create_invalid_notification(json_file):
json_to_python = json.loads(json_file)
correlation_id = PrhLibrary.extract_correlation_id_value(json_to_python, "correlationId")
ipv4 = PrhLibrary.extract_value_from_pnfRegistrationFields(json_to_python, "oamV4IpAddress", "oamV4IpAddress")
ipv6 = PrhLibrary.extract_value_from_pnfRegistrationFields(json_to_python, "oamV6IpAddress", "oamV6IpAddress")
serial_number = PrhLibrary.extract_value_from_pnfRegistrationFields(json_to_python, "serialNumber", "serialNumber")
vendor_name = PrhLibrary.extract_value_from_pnfRegistrationFields(json_to_python, "vendorName", "vendorName")
model_number = PrhLibrary.extract_value_from_pnfRegistrationFields(json_to_python, "modelNumber", "modelNumber")
unit_type = PrhLibrary.extract_value_from_pnfRegistrationFields(json_to_python, "unitType", "unitType")
additional_fields = PrhLibrary.extract_additional_fields(json_to_python)
str_json = '{' + correlation_id + ipv4 + ipv6 + serial_number + vendor_name + model_number + unit_type + '"nfNamingCode":""' + "," + '"softwareVersion":"",' + additional_fields
return json.dumps(str_json).replace("\\", "")[1:-1].replace("\":", "\": ").rstrip(',') + '\\n}'
@staticmethod
def create_pnf_ready_notification_as_pnf_ready(json_file):
json_to_python = json.loads(json_file)
correlation_id = PrhLibrary.extract_correlation_id_value(json_to_python, "correlationId")
additional_fields = PrhLibrary.extract_additional_fields_value(json_to_python)
str_json = '{' + correlation_id + additional_fields
return json.dumps(str_json.rstrip(',') + '}').replace("\\", "")[1:-1]
@staticmethod
def extract_additional_fields_value(content):
fields = PrhLibrary.get_additional_fields_as_key_value_pairs(content)
if len(fields) == 0:
return ""
return PrhLibrary.build_additional_fields_json(fields)
@staticmethod
def extract_additional_fields(content):
fields = PrhLibrary.get_additional_fields_as_key_value_pairs(content)
if fields == []:
return '"additionalFields":null'
return PrhLibrary.build_additional_fields_json(fields)
@staticmethod
def get_additional_fields_as_key_value_pairs(content):
return content.get("event").get("pnfRegistrationFields").get(
"additionalFields") if "additionalFields" in content["event"]["pnfRegistrationFields"] else []
@staticmethod
def build_additional_fields_json(fields):
res = '"additionalFields":{'
for f in fields:
res += '"' + f + '":"' + fields.get(f) + '",'
return res.rstrip(',') + '},'
@staticmethod
def extract_value_from_pnfRegistrationFields(content, name, key):
return '"' + name + '":"' + (content.get("event").get("pnfRegistrationFields").get(key) + '",' if key in content["event"]["pnfRegistrationFields"] else '",')
@staticmethod
def extract_correlation_id_value(content, name):
return '"' + name + '":"' + (content.get("event").get("commonEventHeader").get("sourceName") + '",' if "sourceName" in content["event"]["commonEventHeader"] else '",')
@staticmethod
def create_pnf_name(json_file):
json_to_python = json.loads(json_file)
correlation_id = json_to_python.get("event").get("commonEventHeader").get("sourceName") + '",' if "sourceName" in json_to_python["event"]["commonEventHeader"] else '",'
return correlation_id
@staticmethod
def ensure_container_is_running(name):
client = docker.from_env()
if not PrhLibrary.is_in_status(client, name, "running"):
print ("starting container", name)
container = client.containers.get(name)
container.start()
PrhLibrary.wait_for_status(client, name, "running")
PrhLibrary.print_status(client)
@staticmethod
def ensure_container_is_exited(name):
client = docker.from_env()
if not PrhLibrary.is_in_status(client, name, "exited"):
print ("stopping container", name)
container = client.containers.get(name)
container.stop()
PrhLibrary.wait_for_status(client, name, "exited")
PrhLibrary.print_status(client)
@staticmethod
def print_status(client):
print("containers status")
for c in client.containers.list(all=True):
print(c.name, " ", c.status)
@staticmethod
def wait_for_status(client, name, status):
while not PrhLibrary.is_in_status(client, name, status):
print ("waiting for container: ", name, "to be in status: ", status)
time.sleep(3)
@staticmethod
def is_in_status(client, name, status):
return len(client.containers.list(all=True, filters={"name": "^/"+name+"$", "status": status})) == 1
|