diff options
Diffstat (limited to 'tests/dcaegen2/prh-testcases/resources')
-rw-r--r-- | tests/dcaegen2/prh-testcases/resources/PrhLibrary.py | 75 | ||||
-rw-r--r-- | tests/dcaegen2/prh-testcases/resources/prh_library.robot | 5 |
2 files changed, 64 insertions, 16 deletions
diff --git a/tests/dcaegen2/prh-testcases/resources/PrhLibrary.py b/tests/dcaegen2/prh-testcases/resources/PrhLibrary.py index d413be58..1a3875b9 100644 --- a/tests/dcaegen2/prh-testcases/resources/PrhLibrary.py +++ b/tests/dcaegen2/prh-testcases/resources/PrhLibrary.py @@ -13,29 +13,75 @@ class PrhLibrary(object): def check_for_log(search_for): client = docker.from_env() container = client.containers.get('prh') + #print ("Check for log searches for pattern: ", search_for ) for line in container.logs(stream=True): + #print ("Check for log analysis line: ", line ) if search_for in line.strip(): return True else: return False @staticmethod - def create_pnf_ready_notification(json_file): + def create_pnf_ready_notification_from_ves(json_file): json_to_python = json.loads(json_file) - ipv4 = json_to_python.get("event").get("pnfRegistrationFields").get("oamV4IpAddress") - ipv6 = json_to_python.get("event").get("pnfRegistrationFields").get("oamV6IpAddress") if "oamV6IpAddress" in json_to_python["event"]["pnfRegistrationFields"] else "" - serial_number = json_to_python.get("event").get("pnfRegistrationFields").get("serial-number") if "serial-number" in json_to_python["event"]["pnfRegistrationFields"] else "" - equip_vendor = json_to_python.get("event").get("pnfRegistrationFields").get("equip-vendor") if "equip-vendor" in json_to_python["event"]["pnfRegistrationFields"] else "" - equip_model = json_to_python.get("event").get("pnfRegistrationFields").get("equip-model") if "equip-model" in json_to_python["event"]["pnfRegistrationFields"] else "" - equip_type = json_to_python.get("event").get("pnfRegistrationFields").get("equip-type") if "equip-type" in json_to_python["event"]["pnfRegistrationFields"] else "" - nf_role = json_to_python.get("event").get("pnfRegistrationFields").get("nf-role") if "nf-role" in json_to_python["event"]["pnfRegistrationFields"] else "" - sw_version = json_to_python.get("event").get("pnfRegistrationFields").get("sw-version") if "sw-version" in json_to_python["event"]["pnfRegistrationFields"] else "" - correlation_id = json_to_python.get("event").get("commonEventHeader").get("sourceName") - str_json = '{"correlationId":"' + correlation_id + '","ipaddress-v4-oam":"' + ipv4 + '","ipaddress-v6-oam":"' + ipv6 + '","serial-number":"' + serial_number + '","equip-vendor":"' + equip_vendor + '","equip-model":"' + equip_model + '","equip-type":"' + equip_type + '","nf-role":"' + nf_role + '","sw-version":"' + sw_version + '"}' + ipv4 = PrhLibrary.extract_ip_v4(json_to_python) + ipv6 = PrhLibrary.extract_ip_v6(json_to_python) + correlation_id = PrhLibrary.extract_correlation_id(json_to_python) + serial_number = PrhLibrary.extract_serial_number(json_to_python) + vendor_name = PrhLibrary.extract_vendor_name(json_to_python) + model_number = PrhLibrary.extract_model_number(json_to_python) + unit_type = PrhLibrary.extract_unit_type(json_to_python) + + str_json = '{"correlationId":"' + correlation_id + '","ipaddress-v4-oam":"' + ipv4 + '","ipaddress-v6-oam":"' + ipv6 + '","serialNumber":"' + serial_number + '","vendorName":"' + vendor_name + '","modelNumber":"' + model_number + '","unitType":"' + unit_type + '"}' python_to_json = json.dumps(str_json) return python_to_json.replace("\\", "")[1:-1] @staticmethod + def create_pnf_ready_notification_as_pnf_ready(json_file): + json_to_python = json.loads(json_file) + ipv4 = PrhLibrary.extract_ip_v4(json_to_python) + ipv6 = PrhLibrary.extract_ip_v6(json_to_python) + correlation_id = PrhLibrary.extract_correlation_id(json_to_python) + serial_number = PrhLibrary.extract_serial_number(json_to_python) + vendor_name = PrhLibrary.extract_vendor_name(json_to_python) + model_number = PrhLibrary.extract_model_number(json_to_python) + unit_type = PrhLibrary.extract_unit_type(json_to_python) + + nf_role = json_to_python.get("event").get("commonEventHeader").get("nfNamingCode") if "nfNamingCode" in json_to_python["event"]["commonEventHeader"] else "" + + str_json = '{"correlationId":"' + correlation_id + '","ipaddress-v4-oam":"' + ipv4 + '","ipaddress-v6-oam":"' + ipv6 + '","serial-number":"' + serial_number + '","equip-vendor":"' + vendor_name + '","equip-model":"' + model_number + '","equip-type":"' + unit_type + '","nf-role":"' + nf_role + '","sw-version":""}' + python_to_json = json.dumps(str_json) + return python_to_json.replace("\\", "")[1:-1] + + @staticmethod + def extract_ip_v4(content): + return content.get("event").get("pnfRegistrationFields").get("oamV4IpAddress") if "oamV4IpAddress" in content["event"]["pnfRegistrationFields"] else "" + + @staticmethod + def extract_ip_v6(content): + return content.get("event").get("pnfRegistrationFields").get("oamV6IpAddress") if "oamV6IpAddress" in content["event"]["pnfRegistrationFields"] else "" + + @staticmethod + def extract_correlation_id(content): + return content.get("event").get("commonEventHeader").get("sourceName") if "sourceName" in content["event"]["commonEventHeader"] else "" + + @staticmethod + def extract_serial_number(content): + return content.get("event").get("pnfRegistrationFields").get("serialNumber") if "serialNumber" in content["event"]["pnfRegistrationFields"] else "" + + @staticmethod + def extract_vendor_name(content): + return content.get("event").get("pnfRegistrationFields").get("vendorName") if "vendorName" in content["event"]["pnfRegistrationFields"] else "" + + @staticmethod + def extract_model_number(content): + return content.get("event").get("pnfRegistrationFields").get("modelNumber") if "modelNumber" in content["event"]["pnfRegistrationFields"] else "" + + @staticmethod + def extract_unit_type(content): + return content.get("event").get("pnfRegistrationFields").get("unitType") if "unitType" in content["event"]["pnfRegistrationFields"] else "" + + @staticmethod def create_pnf_name(json_file): json_to_python = json.loads(json_file) correlation_id = json_to_python.get("sourceName") @@ -83,6 +129,7 @@ class PrhLibrary(object): def create_invalid_notification(self, json_file): - return self.create_pnf_ready_notification(json_file).replace("\":", "\": ")\ - .replace("ipaddress-v4-oam", "oamV4IpAddress").replace("ipaddress-v6-oam", "oamV6IpAddress")\ - .replace("}", "\\n}") + invalidate_input = self.create_pnf_ready_notification_from_ves(json_file).replace("\":", "\": ") \ + .replace("ipaddress-v4-oam", "oamV4IpAddress").replace("ipaddress-v6-oam","oamV6IpAddress").replace("}",'') + invalidate_input__and_add_additional_fields = invalidate_input + ',"nfNamingCode": ""' + "," + '"softwareVersion": ""' +"\\n" + return invalidate_input__and_add_additional_fields diff --git a/tests/dcaegen2/prh-testcases/resources/prh_library.robot b/tests/dcaegen2/prh-testcases/resources/prh_library.robot index 71753875..105365c3 100644 --- a/tests/dcaegen2/prh-testcases/resources/prh_library.robot +++ b/tests/dcaegen2/prh-testcases/resources/prh_library.robot @@ -32,11 +32,12 @@ Valid event processing [Arguments] ${input_valid_event_in_dmaap} [Timeout] 30s ${data}= Get Data From File ${input_valid_event_in_dmaap} - ${posted_event_to_dmaap}= Create PNF_Ready notification ${data} + ${posted_event_to_dmaap}= create pnf ready notification from ves ${data} ${pnf_name}= Create PNF name ${data} Set PNF name in AAI ${pnf_name} Set event in DMaaP ${data} - Wait Until Keyword Succeeds 100x 300ms Check PNF_READY notification ${posted_event_to_dmaap} + ${expected_event_pnf_ready_in_dpaap}= create pnf ready_notification as pnf ready ${data} + Wait Until Keyword Succeeds 100x 300ms Check PNF_READY notification ${expected_event_pnf_ready_in_dpaap} Check PRH log [Arguments] ${searched_log} |