1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
|
import json
import docker
import time
class PrhLibrary(object):
def __init__(self):
pass
@staticmethod
def find_log_entry(search_for):
print (type(search_for))
client = docker.from_env()
container = client.containers.get('prh')
print ("Check for log searches for pattern: ", search_for )
for line in container.logs(stream=True):
print ("Check for log analysis line: ", line )
if search_for in line.strip():
return True
else:
return False
@staticmethod
def create_invalid_notification(json_file):
event = json.loads(json_file)[0]
correlation_id = PrhLibrary.extract_correlation_id_value(event, "correlationId")
ipv4 = PrhLibrary.extract_value_from_pnfRegistrationFields(event, "oamV4IpAddress", "oamV4IpAddress")
ipv6 = PrhLibrary.extract_value_from_pnfRegistrationFields(event, "oamV6IpAddress", "oamV6IpAddress")
serial_number = PrhLibrary.extract_value_from_pnfRegistrationFields(event, "serialNumber", "serialNumber")
vendor_name = PrhLibrary.extract_value_from_pnfRegistrationFields(event, "vendorName", "vendorName")
model_number = PrhLibrary.extract_value_from_pnfRegistrationFields(event, "modelNumber", "modelNumber")
unit_type = PrhLibrary.extract_value_from_pnfRegistrationFields(event, "unitType", "unitType")
additional_fields = PrhLibrary.extract_additional_fields(event)
str_json = '{' + correlation_id + ipv4 + ipv6 + serial_number + vendor_name + model_number + unit_type + '"nfNamingCode":""' + "," + '"softwareVersion":"",' + additional_fields
return json.dumps(str_json).replace("\\", "")[1:-1].replace("\":", "\": ").rstrip(',') + '\\n}'
@staticmethod
def create_pnf_ready_notification_as_pnf_ready(json_file):
json_to_python = json.loads(json_file)
correlation_id = PrhLibrary.extract_correlation_id_value(json_to_python, "correlationId")
additional_fields = PrhLibrary.extract_additional_fields_value(json_to_python)
str_json = '{' + correlation_id + additional_fields
return json.dumps(str_json.rstrip(',') + '}').replace("\\", "")[1:-1]
@staticmethod
def extract_additional_fields_value(content):
fields = PrhLibrary.get_additional_fields_as_key_value_pairs(content)
if len(fields) == 0:
return ""
return PrhLibrary.build_additional_fields_json(fields)
@staticmethod
def extract_additional_fields(content):
fields = PrhLibrary.get_additional_fields_as_key_value_pairs(content)
if fields == []:
return '"additionalFields":null'
return PrhLibrary.build_additional_fields_json(fields)
@staticmethod
def get_additional_fields_as_key_value_pairs(content):
return content.get("event").get("pnfRegistrationFields").get(
"additionalFields") if "additionalFields" in content["event"]["pnfRegistrationFields"] else []
@staticmethod
def build_additional_fields_json(fields):
res = '"additionalFields":{'
for f in fields:
res += '"' + f + '":"' + fields.get(f) + '",'
return res.rstrip(',') + '},'
@staticmethod
def extract_value_from_pnfRegistrationFields(content, name, key):
return '"' + name + '":"' + (content.get("event").get("pnfRegistrationFields").get(key) + '",' if key in content["event"]["pnfRegistrationFields"] else '",')
@staticmethod
def extract_correlation_id_value(content, name):
return '"' + name + '":"' + (content.get("event").get("commonEventHeader").get("sourceName") + '",' if "sourceName" in content["event"]["commonEventHeader"] else '",')
@staticmethod
def create_pnf_name(json_file):
json_to_python = json.loads(json_file)
correlation_id = json_to_python.get("event").get("commonEventHeader").get("sourceName") + '",' if "sourceName" in json_to_python["event"]["commonEventHeader"] else '",'
return correlation_id
@staticmethod
def ensure_container_is_running(name):
client = docker.from_env()
if not PrhLibrary.is_in_status(client, name, "running"):
print ("starting container", name)
container = client.containers.get(name)
container.start()
PrhLibrary.wait_for_status(client, name, "running")
PrhLibrary.print_status(client)
@staticmethod
def ensure_container_is_exited(name):
client = docker.from_env()
if not PrhLibrary.is_in_status(client, name, "exited"):
print ("stopping container", name)
container = client.containers.get(name)
container.stop()
PrhLibrary.wait_for_status(client, name, "exited")
PrhLibrary.print_status(client)
@staticmethod
def print_status(client):
print("containers status")
for c in client.containers.list(all=True):
print(c.name, " ", c.status)
@staticmethod
def wait_for_status(client, name, status):
while not PrhLibrary.is_in_status(client, name, status):
print ("waiting for container: ", name, "to be in status: ", status)
time.sleep(3)
@staticmethod
def is_in_status(client, name, status):
return len(client.containers.list(all=True, filters={"name": "^/"+name+"$", "status": status})) == 1
|