import json import re import docker import time import datetime class PrhLibrary(object): ROBOT_LIBRARY_SCOPE = 'TEST SUITE' ROBOT_LISTENER_API_VERSION = 2 def __init__(self): self.ROBOT_LIBRARY_LISTENER = self def _start_test(self, name, attrs): # http://robotframework.org/robotframework/latest/RobotFrameworkUserGuide.html#test-libraries-as-listeners self.test_start_time = self.get_current_utc_datetime() @staticmethod def get_current_utc_datetime(): return datetime.datetime.utcnow() def get_docker_logs_since_test_start(self, container_id): return self.get_docker_logs(container_id, self.test_start_time) @staticmethod def get_docker_logs(container_id, since=None): container = PrhLibrary.__get_docker_container(container_id) return container.logs(stream=False, since=since) def wait_for_one_of_docker_log_entries(self, container_id, searched_entries): print("Looking for: %s" % searched_entries) container = PrhLibrary.__get_docker_container(container_id) print("Log lines:") for line in container.logs(stream=True, since=self.test_start_time): print(line) for searched_entry in searched_entries: if searched_entry in line.decode(encoding='utf-8').strip(): return True else: return False def wait_for_log_entry_with_json_message(self, prefix, json_message): print("Looking for:") print("Prefix: " + str(prefix)) print("Json: " + str(json_message)) try: decoded_message = json.loads(json_message) except json.JSONDecodeError: print("Could not decode given message") return False pattern = re.compile(prefix + "(.*)$") container = PrhLibrary.__get_docker_container('prh') print("Log lines:") for line in container.logs(stream=True, since=self.test_start_time): print(line) if PrhLibrary.__same_json_in_log(decoded_message, line, pattern): return True else: return False @staticmethod def create_invalid_notification(json_file): output = {} input = json.loads(json_file) output["correlationId"] = PrhLibrary.__extract_correlation_id_value(input) output["oamV4IpAddress"] = PrhLibrary.__extract_value_from_pnfRegistrationFields(input, "oamV4IpAddress") output["oamV6IpAddress"] = PrhLibrary.__extract_value_from_pnfRegistrationFields(input, "oamV6IpAddress") output["serialNumber"] = PrhLibrary.__extract_value_from_pnfRegistrationFields(input, "serialNumber") output["vendorName"] = PrhLibrary.__extract_value_from_pnfRegistrationFields(input, "vendorName") output["modelNumber"] = PrhLibrary.__extract_value_from_pnfRegistrationFields(input, "modelNumber") output["unitType"] = PrhLibrary.__extract_value_from_pnfRegistrationFields(input, "unitType") output['nfNamingCode'] = '' output['softwareVersion'] = '' output["additionalFields"] = PrhLibrary.__get_additional_fields_as_key_value_pairs(input) return json.dumps(output) @staticmethod def create_pnf_ready_notification_as_pnf_ready(json_file): output = {} input = json.loads(json_file)[0] output["correlationId"] = PrhLibrary.__extract_correlation_id_value(input) output["serialNumber"] = PrhLibrary.__extract_value_from_pnfRegistrationFields(input, "serialNumber") output["equip-vendor"] = PrhLibrary.__extract_value_from_pnfRegistrationFields(input, "vendorName") output["equip-model"] = PrhLibrary.__extract_value_from_pnfRegistrationFields(input, "modelNumber") output["equip-type"] = PrhLibrary.__extract_value_from_pnfRegistrationFields(input, "unitType") output["nf-role"] = PrhLibrary.__extract_nf_role(input) output["sw-version"] = "" output["additionalFields"] = PrhLibrary.__get_additional_fields_as_key_value_pairs(input) return json.dumps(output) @staticmethod def ensure_container_is_running(name): client = docker.from_env() if not PrhLibrary.is_in_status(client, name, "running"): print ("starting container", name) container = client.containers.get(name) container.start() PrhLibrary.wait_for_status(client, name, "running") PrhLibrary.print_status(client) @staticmethod def ensure_container_is_exited(name): client = docker.from_env() if not PrhLibrary.is_in_status(client, name, "exited"): print ("stopping container", name) container = client.containers.get(name) container.stop() PrhLibrary.wait_for_status(client, name, "exited") PrhLibrary.print_status(client) @staticmethod def print_status(client): print("containers status") for c in client.containers.list(all=True): print(c.name, " ", c.status) @staticmethod def wait_for_status(client, name, status): while not PrhLibrary.is_in_status(client, name, status): print ("waiting for container: ", name, "to be in status: ", status) time.sleep(3) @staticmethod def is_in_status(client, name, status): return len(client.containers.list(all=True, filters={"name": "^/"+name+"$", "status": status})) == 1 @staticmethod def create_pnf_name(json_file): json_to_python = json.loads(json_file) correlation_id = json_to_python.get("event").get("commonEventHeader").get("sourceName") + '",' if "sourceName" in json_to_python["event"]["commonEventHeader"] else '",' return correlation_id @staticmethod def __get_additional_fields_as_key_value_pairs(content): return content.get("event").get("pnfRegistrationFields").get( "additionalFields") if "additionalFields" in content["event"]["pnfRegistrationFields"] else {} @staticmethod def __extract_value_from_pnfRegistrationFields(content, key): return content["event"]["pnfRegistrationFields"][key] if key in content["event"]["pnfRegistrationFields"] else '' @staticmethod def __extract_correlation_id_value(content): return content["event"]["commonEventHeader"]["sourceName"] if "sourceName" in content["event"]["commonEventHeader"] else '' @staticmethod def __extract_nf_role(content): return content["event"]["commonEventHeader"]["nfNamingCode"] if "nfNamingCode" in content["event"]["commonEventHeader"] else '' @staticmethod def __same_json_in_log(decoded_message, line, pattern): extracted_json = PrhLibrary.__extract_json(line, pattern) if extracted_json is not None: print("Found json: " + extracted_json) try: if json.loads(extracted_json) == decoded_message: return True except json.JSONDecodeError: print("Could not decode") return False @staticmethod def __extract_json(line, pattern): full_message = PrhLibrary.__extract_full_message_from_line(line) if full_message is not None: match = pattern.match(full_message.decode(encoding='utf-8')) if match: return match.group(1).replace("\\n", "\n").replace("\\t", "\t") return None @staticmethod def __extract_full_message_from_line(line): split = line.split(bytes('|', 'utf-8')) if len(split) > 3: return split[3] return None @staticmethod def __get_docker_container(container_id): docker_client = docker.from_env() return docker_client.containers.get(container_id)