aboutsummaryrefslogtreecommitdiffstats
path: root/tests/dcaegen2/prh-testcases/resources
diff options
context:
space:
mode:
Diffstat (limited to 'tests/dcaegen2/prh-testcases/resources')
-rw-r--r--tests/dcaegen2/prh-testcases/resources/PrhLibrary.py75
-rw-r--r--tests/dcaegen2/prh-testcases/resources/prh_library.robot5
2 files changed, 64 insertions, 16 deletions
diff --git a/tests/dcaegen2/prh-testcases/resources/PrhLibrary.py b/tests/dcaegen2/prh-testcases/resources/PrhLibrary.py
index d413be58..1a3875b9 100644
--- a/tests/dcaegen2/prh-testcases/resources/PrhLibrary.py
+++ b/tests/dcaegen2/prh-testcases/resources/PrhLibrary.py
@@ -13,29 +13,75 @@ class PrhLibrary(object):
def check_for_log(search_for):
client = docker.from_env()
container = client.containers.get('prh')
+ #print ("Check for log searches for pattern: ", search_for )
for line in container.logs(stream=True):
+ #print ("Check for log analysis line: ", line )
if search_for in line.strip():
return True
else:
return False
@staticmethod
- def create_pnf_ready_notification(json_file):
+ def create_pnf_ready_notification_from_ves(json_file):
json_to_python = json.loads(json_file)
- ipv4 = json_to_python.get("event").get("pnfRegistrationFields").get("oamV4IpAddress")
- ipv6 = json_to_python.get("event").get("pnfRegistrationFields").get("oamV6IpAddress") if "oamV6IpAddress" in json_to_python["event"]["pnfRegistrationFields"] else ""
- serial_number = json_to_python.get("event").get("pnfRegistrationFields").get("serial-number") if "serial-number" in json_to_python["event"]["pnfRegistrationFields"] else ""
- equip_vendor = json_to_python.get("event").get("pnfRegistrationFields").get("equip-vendor") if "equip-vendor" in json_to_python["event"]["pnfRegistrationFields"] else ""
- equip_model = json_to_python.get("event").get("pnfRegistrationFields").get("equip-model") if "equip-model" in json_to_python["event"]["pnfRegistrationFields"] else ""
- equip_type = json_to_python.get("event").get("pnfRegistrationFields").get("equip-type") if "equip-type" in json_to_python["event"]["pnfRegistrationFields"] else ""
- nf_role = json_to_python.get("event").get("pnfRegistrationFields").get("nf-role") if "nf-role" in json_to_python["event"]["pnfRegistrationFields"] else ""
- sw_version = json_to_python.get("event").get("pnfRegistrationFields").get("sw-version") if "sw-version" in json_to_python["event"]["pnfRegistrationFields"] else ""
- correlation_id = json_to_python.get("event").get("commonEventHeader").get("sourceName")
- str_json = '{"correlationId":"' + correlation_id + '","ipaddress-v4-oam":"' + ipv4 + '","ipaddress-v6-oam":"' + ipv6 + '","serial-number":"' + serial_number + '","equip-vendor":"' + equip_vendor + '","equip-model":"' + equip_model + '","equip-type":"' + equip_type + '","nf-role":"' + nf_role + '","sw-version":"' + sw_version + '"}'
+ ipv4 = PrhLibrary.extract_ip_v4(json_to_python)
+ ipv6 = PrhLibrary.extract_ip_v6(json_to_python)
+ correlation_id = PrhLibrary.extract_correlation_id(json_to_python)
+ serial_number = PrhLibrary.extract_serial_number(json_to_python)
+ vendor_name = PrhLibrary.extract_vendor_name(json_to_python)
+ model_number = PrhLibrary.extract_model_number(json_to_python)
+ unit_type = PrhLibrary.extract_unit_type(json_to_python)
+
+ str_json = '{"correlationId":"' + correlation_id + '","ipaddress-v4-oam":"' + ipv4 + '","ipaddress-v6-oam":"' + ipv6 + '","serialNumber":"' + serial_number + '","vendorName":"' + vendor_name + '","modelNumber":"' + model_number + '","unitType":"' + unit_type + '"}'
python_to_json = json.dumps(str_json)
return python_to_json.replace("\\", "")[1:-1]
@staticmethod
+ def create_pnf_ready_notification_as_pnf_ready(json_file):
+ json_to_python = json.loads(json_file)
+ ipv4 = PrhLibrary.extract_ip_v4(json_to_python)
+ ipv6 = PrhLibrary.extract_ip_v6(json_to_python)
+ correlation_id = PrhLibrary.extract_correlation_id(json_to_python)
+ serial_number = PrhLibrary.extract_serial_number(json_to_python)
+ vendor_name = PrhLibrary.extract_vendor_name(json_to_python)
+ model_number = PrhLibrary.extract_model_number(json_to_python)
+ unit_type = PrhLibrary.extract_unit_type(json_to_python)
+
+ nf_role = json_to_python.get("event").get("commonEventHeader").get("nfNamingCode") if "nfNamingCode" in json_to_python["event"]["commonEventHeader"] else ""
+
+ str_json = '{"correlationId":"' + correlation_id + '","ipaddress-v4-oam":"' + ipv4 + '","ipaddress-v6-oam":"' + ipv6 + '","serial-number":"' + serial_number + '","equip-vendor":"' + vendor_name + '","equip-model":"' + model_number + '","equip-type":"' + unit_type + '","nf-role":"' + nf_role + '","sw-version":""}'
+ python_to_json = json.dumps(str_json)
+ return python_to_json.replace("\\", "")[1:-1]
+
+ @staticmethod
+ def extract_ip_v4(content):
+ return content.get("event").get("pnfRegistrationFields").get("oamV4IpAddress") if "oamV4IpAddress" in content["event"]["pnfRegistrationFields"] else ""
+
+ @staticmethod
+ def extract_ip_v6(content):
+ return content.get("event").get("pnfRegistrationFields").get("oamV6IpAddress") if "oamV6IpAddress" in content["event"]["pnfRegistrationFields"] else ""
+
+ @staticmethod
+ def extract_correlation_id(content):
+ return content.get("event").get("commonEventHeader").get("sourceName") if "sourceName" in content["event"]["commonEventHeader"] else ""
+
+ @staticmethod
+ def extract_serial_number(content):
+ return content.get("event").get("pnfRegistrationFields").get("serialNumber") if "serialNumber" in content["event"]["pnfRegistrationFields"] else ""
+
+ @staticmethod
+ def extract_vendor_name(content):
+ return content.get("event").get("pnfRegistrationFields").get("vendorName") if "vendorName" in content["event"]["pnfRegistrationFields"] else ""
+
+ @staticmethod
+ def extract_model_number(content):
+ return content.get("event").get("pnfRegistrationFields").get("modelNumber") if "modelNumber" in content["event"]["pnfRegistrationFields"] else ""
+
+ @staticmethod
+ def extract_unit_type(content):
+ return content.get("event").get("pnfRegistrationFields").get("unitType") if "unitType" in content["event"]["pnfRegistrationFields"] else ""
+
+ @staticmethod
def create_pnf_name(json_file):
json_to_python = json.loads(json_file)
correlation_id = json_to_python.get("sourceName")
@@ -83,6 +129,7 @@ class PrhLibrary(object):
def create_invalid_notification(self, json_file):
- return self.create_pnf_ready_notification(json_file).replace("\":", "\": ")\
- .replace("ipaddress-v4-oam", "oamV4IpAddress").replace("ipaddress-v6-oam", "oamV6IpAddress")\
- .replace("}", "\\n}")
+ invalidate_input = self.create_pnf_ready_notification_from_ves(json_file).replace("\":", "\": ") \
+ .replace("ipaddress-v4-oam", "oamV4IpAddress").replace("ipaddress-v6-oam","oamV6IpAddress").replace("}",'')
+ invalidate_input__and_add_additional_fields = invalidate_input + ',"nfNamingCode": ""' + "," + '"softwareVersion": ""' +"\\n"
+ return invalidate_input__and_add_additional_fields
diff --git a/tests/dcaegen2/prh-testcases/resources/prh_library.robot b/tests/dcaegen2/prh-testcases/resources/prh_library.robot
index 71753875..105365c3 100644
--- a/tests/dcaegen2/prh-testcases/resources/prh_library.robot
+++ b/tests/dcaegen2/prh-testcases/resources/prh_library.robot
@@ -32,11 +32,12 @@ Valid event processing
[Arguments] ${input_valid_event_in_dmaap}
[Timeout] 30s
${data}= Get Data From File ${input_valid_event_in_dmaap}
- ${posted_event_to_dmaap}= Create PNF_Ready notification ${data}
+ ${posted_event_to_dmaap}= create pnf ready notification from ves ${data}
${pnf_name}= Create PNF name ${data}
Set PNF name in AAI ${pnf_name}
Set event in DMaaP ${data}
- Wait Until Keyword Succeeds 100x 300ms Check PNF_READY notification ${posted_event_to_dmaap}
+ ${expected_event_pnf_ready_in_dpaap}= create pnf ready_notification as pnf ready ${data}
+ Wait Until Keyword Succeeds 100x 300ms Check PNF_READY notification ${expected_event_pnf_ready_in_dpaap}
Check PRH log
[Arguments] ${searched_log}