diff options
Diffstat (limited to 'tests/dcaegen2/bbs-testcases/resources/BbsLibrary.py')
-rw-r--r-- | tests/dcaegen2/bbs-testcases/resources/BbsLibrary.py | 173 |
1 files changed, 173 insertions, 0 deletions
diff --git a/tests/dcaegen2/bbs-testcases/resources/BbsLibrary.py b/tests/dcaegen2/bbs-testcases/resources/BbsLibrary.py new file mode 100644 index 00000000..8dbdc5a3 --- /dev/null +++ b/tests/dcaegen2/bbs-testcases/resources/BbsLibrary.py @@ -0,0 +1,173 @@ +import json + +import docker +import time +from docker.utils.json_stream import json_stream +from collections import OrderedDict + + +class BbsLibrary(object): + + def __init__(self): + pass + + @staticmethod + def check_for_log(search_for): + client = docker.from_env() + container = client.containers.get('bbs') + + alog = container.logs(stream=False, tail=1000) + try: + alog = alog.decode() + except AttributeError: + pass + + found = alog.find(search_for) + if found != -1: + return True + else: + return False + + @staticmethod + def create_pnf_name_from_auth(json_file): + json_to_python = json.loads(json_file) + correlation_id = json_to_python.get("event").get("commonEventHeader").get("sourceName") + return correlation_id + + @staticmethod + def get_invalid_auth_elements(json_file): + """ + Get the correlationId, oldState, newState, stateInterface, macAddress, swVersion elements + from the invalid message and place the elements into a JSON object (string) as fields for comparision + """ + json_to_python = json.loads(json_file) + correlation_id = json_to_python.get("event").get("commonEventHeader").get("sourceName") + oldState = json_to_python.get("event").get("stateChangeFields").get("oldState") + newState = json_to_python.get("event").get("stateChangeFields").get("newState") + stateInterface = json_to_python.get("event").get("stateChangeFields").get("stateInterface") + macAddress = json_to_python.get("event").get("stateChangeFields").get("additionalFields").get("macAddress") + swVersion = json_to_python.get("event").get("stateChangeFields").get("additionalFields").get("swVersion") + if swVersion is None: + swVersion = "" + + inv_fields = OrderedDict() + + #inv_fields = dict() + inv_fields['correlationId'] = correlation_id + inv_fields['oldState'] = oldState + inv_fields['newState'] = newState + inv_fields['stateInterface'] = stateInterface + inv_fields['macAddress'] = macAddress + inv_fields['swVersion'] = swVersion + + # Transform the dictionary to JSON string + json_str = json.dumps(inv_fields) + + # Need to remove spaces between elements + json_str = json_str.replace(', ', ',') + return json_str + + @staticmethod + def get_invalid_update_elements(json_file): + """ + Get the correlationId, attachment-point, remote-id, cvlan, svlan, elements + from the invalid message and place the elements into a JSON object (string) as fields for comparision + """ + json_to_python = json.loads(json_file) + correlation_id = json_to_python.get("correlationId") + attachmentPoint = json_to_python.get("additionalFields").get("attachment-point") + remoteId = json_to_python.get("additionalFields").get("remote-id") + cvlan = json_to_python.get("additionalFields").get("cvlan") + svlan = json_to_python.get("additionalFields").get("svlan") + + inv_fields = OrderedDict() + #inv_fields = dict() + inv_fields['correlationId'] = correlation_id + inv_fields['attachment-point'] = attachmentPoint + inv_fields['remote-id'] = remoteId + inv_fields['cvlan'] = cvlan + inv_fields['svlan'] = svlan + + # Transform the dictionary to JSON string + json_str = json.dumps(inv_fields) + + # Need to remove spaces between elements + json_str = json_str.replace(', ', ',') + return json_str + + @staticmethod + def compare_policy(dmaap_policy, json_policy): + resp = False + try: + python_policy = json.loads(json_policy).pop() + except: + python_policy = "" + + try: + python_dmaap_policy = json.loads(dmaap_policy) + except: + python_dmaap_policy = "" + + try: + d_policy = python_dmaap_policy.get("policyName") + except: + d_policy = "" + + try: + j_policy = python_policy.get("policyName") + except: + return "False" + + resp = "False" + if (d_policy == j_policy): + resp = "True" + return resp + + @staticmethod + def create_pnf_name_from_update(json_file): + json_to_python = json.loads(json_file) + correlation_id = json_to_python.get("correlationId") + return correlation_id + + @staticmethod + def ensure_container_is_running(name): + + client = docker.from_env() + + if not BbsLibrary.is_in_status(client, name, "running"): + print ("starting container", name) + container = client.containers.get(name) + container.start() + BbsLibrary.wait_for_status(client, name, "running") + + BbsLibrary.print_status(client) + + @staticmethod + def ensure_container_is_exited(name): + + client = docker.from_env() + + if not BbsLibrary.is_in_status(client, name, "exited"): + print ("stopping container", name) + container = client.containers.get(name) + container.stop() + BbsLibrary.wait_for_status(client, name, "exited") + + BbsLibrary.print_status(client) + + @staticmethod + def print_status(client): + print("containers status") + for c in client.containers.list(all=True): + print(c.name, " ", c.status) + + @staticmethod + def wait_for_status(client, name, status): + while not BbsLibrary.is_in_status(client, name, status): + print ("waiting for container: ", name, "to be in status: ", status) + time.sleep(3) + + @staticmethod + def is_in_status(client, name, status): + return len(client.containers.list(all=True, filters={"name": "^/"+name+"$", "status": status})) == 1 + |