diff options
Diffstat (limited to 'tests')
-rw-r--r-- | tests/__init__.py | 4 | ||||
-rw-r--r-- | tests/makefile | 2 | ||||
-rw-r--r-- | tests/monkey_psycopg2.py | 207 | ||||
-rw-r--r-- | tests/test_binding.py | 5 | ||||
-rw-r--r-- | tests/test_check_health.py | 65 | ||||
-rw-r--r-- | tests/test_config_notif.py | 435 | ||||
-rw-r--r-- | tests/test_get_logger.py | 35 | ||||
-rw-r--r-- | tests/test_htbtworker.py | 45 | ||||
-rw-r--r-- | tests/test_trapd_exit.py | 4 | ||||
-rw-r--r-- | tests/test_trapd_get_cbs_config.py | 4 | ||||
-rw-r--r-- | tests/test_trapd_http_session.py | 4 | ||||
-rw-r--r-- | tests/test_trapd_runtime_pid.py | 4 | ||||
-rw-r--r-- | tests/test_trapd_settings.py | 5 | ||||
-rw-r--r-- | tests/test_trapd_vnf_table.py | 4 |
14 files changed, 797 insertions, 26 deletions
diff --git a/tests/__init__.py b/tests/__init__.py index 1875bf6..8e53efc 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -1,5 +1,5 @@ # ================================================================================ -# Copyright (c) 2018 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2018-2020 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -13,8 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. # ============LICENSE_END========================================================= -# -# ECOMP is a trademark and service mark of AT&T Intellectual Property. # empty __init__.py so that pytest can add correct path to coverage report, -- per pytest diff --git a/tests/makefile b/tests/makefile new file mode 100644 index 0000000..9901727 --- /dev/null +++ b/tests/makefile @@ -0,0 +1,2 @@ +runtox: + cd .. && $(MAKE) runtox diff --git a/tests/monkey_psycopg2.py b/tests/monkey_psycopg2.py new file mode 100644 index 0000000..05f44b4 --- /dev/null +++ b/tests/monkey_psycopg2.py @@ -0,0 +1,207 @@ +# ============LICENSE_START==================================================== +# ============================================================================= +# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved. +# ============================================================================= +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END====================================================== + +""" + +This is a mock psycopg2 module. + +""" + +import psycopg2 + +# These FORCE variables are set by monkey_psycopg2.monkey_reset_forces(). +# If set, they will cause a connection failure, cursor failure, execute failure, +# commit failure, and close failure, respectively. +FORCE_CONNECT_FAILURE = False +FORCE_CURSOR_FAILURE = False +FORCE_EXECUTE_FAILURE = False +FORCE_COMMIT_FAILURE = False +FORCE_CLOSE_FAILURE = False + +# This variable is used by monkey_psycopg2.monkey_set_defaults(multiDbInfo) +# to set up default values to be returned by cursors statements. +DEFAULT_MULTI_DBINFO = { } + +class MockConn(object): + """ + mock Connection interface returned by psycopg2.connect() + """ + + def __init__(self, dbInfo = None): + self.curCmd = None + self.curCursor = None + self.monkey_setDbInfo(dbInfo) + + def monkey_setDbInfo(self, dbInfo): + """ + Set up a set of defaults for the cursors on "select" statements. + The outer scope is a string that will be matched against the currently-active + select statement being executed. + If there is a match, the specified values are returned by the cursor. + dbconn.monkey_setDbInfo({ + "hb_common": [ + [ 1, "sn1", 31, "st1" ], + [ 2, "sn2", 32, "st2" ] + ] + }) + """ + self.dbInfo = dbInfo + + def commit(self): + """ + mock commit. + Do nothing unless FORCE_COMMIT_FAILURE is set. + + Will raise an exception if value of 'FORCE_COMMIT_FAILURE' is true. + Used to force failure for certain code paths. + """ + if FORCE_COMMIT_FAILURE: + raise psycopg2.DatabaseError(f"Unable to commit: force_commit_failure=<{FORCE_COMMIT_FAILURE}>") + + def close(self): + """ + mock close + Do nothing unless FORCE_CLOSE_FAILURE is set. + + Will raise an exception if value of 'FORCE_CLOSE_FAILURE' is true. + Used to force failure for certain code paths. + """ + if FORCE_CLOSE_FAILURE: + raise psycopg2.DatabaseError(f"Unable to close: force_close_failure=<{FORCE_CLOSE_FAILURE}>") + + def __enter__(self): + """ + method needed to implement a context manager + """ + return self + + # pylint: disable=redefined-outer-name,redefined-builtin + def __exit__(self, type, value, traceback): + """ + method needed to implement a context manager + """ + pass + + def cursor(self): + """ + mock cursor + + Will raise an exception if value of 'FORCE_CURSOR_FAILURE' is true. + Used to force failure for certain code paths. + """ + if FORCE_CURSOR_FAILURE: + raise psycopg2.DatabaseError(f"Unable to return cursor: force_cursor_failure=<{FORCE_CURSOR_FAILURE}>") + + print(f"cursor()") + self.curCursor = None + return self + + def execute(self, cmd, args = None): + """ + mock execute + + Will raise an exception if value of 'FORCE_EXECUTE_FAILURE' is true. + Used to force failure for certain code paths. + + A side affect is that the cursor's values will be set based on a match + with the command (lower-cased) being executed. + """ + # pylint: disable=global-statement,no-self-use + + print(f"execute({cmd},{args})") + self.curCmd = cmd + if FORCE_EXECUTE_FAILURE: + raise Exception("postgres execute command throwing exception. cmd=<{}>".format(cmd)) + + if self.dbInfo: + curCmdLower = cmd.lower() + for cmd, val in self.dbInfo.items(): + print(f"cmd={cmd}, val={val}") + if cmd in curCmdLower: + self.curCursor = val + break + + def fetchone(self): + """ + return a single row from the current cursor + """ + if not self.curCursor: + print(f"fetchone() returning None") + return None + print(f"fetchone() returning {self.curCursor[0]}") + return self.curCursor[0] + + def fetchall(self): + """ + return all rows from the current cursor + """ + if not self.curCursor: + print(f"fetchall() returning None") + return None + print(f"fetchall() returning {self.curCursor}") + return self.curCursor + +def monkey_reset_forces(connect=False, cursor=False, execute=False, commit=False, close=False): + print(f"monkey_reset_forces({connect}, {cursor}, {execute}, {commit}, {close})") + global FORCE_CONNECT_FAILURE + FORCE_CONNECT_FAILURE = connect + global FORCE_CURSOR_FAILURE + FORCE_CURSOR_FAILURE = cursor + global FORCE_EXECUTE_FAILURE + FORCE_EXECUTE_FAILURE = cursor + global FORCE_COMMIT_FAILURE + FORCE_COMMIT_FAILURE = commit + global FORCE_CLOSE_FAILURE + FORCE_CLOSE_FAILURE = close + +def monkey_set_defaults(multiDbInfo): + """ + Set up a set of defaults for the cursors on "select" statements. + The outer scope gives a database name. + The next level is a string that will be matched against the currently-active + select statement being executed. + If both match, the specified values are returned by the cursor. + monkey_psycopg2.monkey_set_defaults({ + "testdb1": { + "hb_common": [ + [ 1, "sn1", 31, "st1" ], + [ 2, "sn2", 32, "st2" ] + ] + } + }) + """ + global DEFAULT_MULTI_DBINFO + DEFAULT_MULTI_DBINFO = multiDbInfo + +def monkey_connect(database=None, host=None, port=None, user=None, password=None): + """ + Mock psycopg2 connection. + Returns a mock connection. + + Will raise an exception if value of 'FORCE_CONNECT_FAILURE' is true. + (Used to force failure for certain code paths.) + + Also set up any DbInfo values, based on the database name. + (See monkey_set_defaults(), which must have been called prior to this being invoked.) + """ + if FORCE_CONNECT_FAILURE: + raise Exception("Unable to connect to the database. password=<{}> force_connect_failure=<{}>".format(password, FORCE_CONNECT_FAILURE)) + + if database in DEFAULT_MULTI_DBINFO: + return MockConn(DEFAULT_MULTI_DBINFO[database]) + else: + return MockConn() diff --git a/tests/test_binding.py b/tests/test_binding.py index 0ef6c5d..170e8b1 100644 --- a/tests/test_binding.py +++ b/tests/test_binding.py @@ -1,5 +1,5 @@ # ============LICENSE_START======================================================= -# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved. # Copyright (c) 2019 Pantheon.tech. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); @@ -14,8 +14,6 @@ # See the License for the specific language governing permissions and # limitations under the License. # ============LICENSE_END========================================================= -# -# ECOMP is a trademark and service mark of AT&T Intellectual Property. import os import io @@ -37,7 +35,6 @@ from miss_htbt_service import db_monitoring from miss_htbt_service.mod.trapd_vnf_table import hb_properties import unittest - MODULE_EXTENSIONS = ('.py', '.pyc', '.pyo') def package_contents(package_name): diff --git a/tests/test_check_health.py b/tests/test_check_health.py new file mode 100644 index 0000000..a159212 --- /dev/null +++ b/tests/test_check_health.py @@ -0,0 +1,65 @@ +# ============LICENSE_START======================================================= +# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2019 Pantheon.tech. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= + +# code loosely based on https://stackoverflow.com/questions/25369068/python-how-to-unit-test-a-custom-http-request-handler + +from miss_htbt_service import check_health + +import io + +class MockSocket(object): + def getsockname(self): + return ('sockname',) + +class MockRequest(object): + _sock = MockSocket() + + def __init__(self, rqtype, path, body=None): + self._path = path + self._rqtype = rqtype if rqtype else "GET" + self._body = body + + def makefile(self, *args, **kwargs): + if args[0] == 'rb': + if self._rqtype == 'GET': + return io.BytesIO(bytes("%s %s HTTP/1.0" % (self._rqtype, self._path), 'utf-8')) + else: + return io.BytesIO(bytes("%s %s HTTP/1.0\r\nContent-Length: %s\r\n\r\n%s" % (self._rqtype, self._path, len(self._body), self._body), 'utf-8')) + elif args[0] == 'wb': + return io.BytesIO(b'') + else: + raise ValueError("Unknown file type to make", args, kwargs) + + def sendall(self, bstr): + pass + +class MockServer(object): + def __init__(self, rqtype, path, ip_port, Handler, body=None): + handler = Handler(MockRequest(rqtype, path, body), ip_port, self) + +def test_check_health_get(): + """ + test the check_health GET and POST handlers using a mock server + """ + server = MockServer('GET', '/', ('0.0.0.0', 8888), check_health.GetHandler) + +def test_check_health_post(): + """ + test the check_health GET and POST handlers using a mock server + """ + server = MockServer('POST', '/', ('0.0.0.0', 8888), check_health.GetHandler, + '{ "health": "" }') diff --git a/tests/test_config_notif.py b/tests/test_config_notif.py new file mode 100644 index 0000000..b59696e --- /dev/null +++ b/tests/test_config_notif.py @@ -0,0 +1,435 @@ +# ============LICENSE_START======================================================= +# Copyright (c) 2020 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= + +from miss_htbt_service import config_notif +# from miss_htbt_service.mod.trapd_get_cbs_config import get_cbs_config +import miss_htbt_service.mod.trapd_get_cbs_config +import miss_htbt_service.mod.trapd_settings + +from . import monkey_psycopg2 +import psycopg2 +import tempfile, sys, json, os + +def assert_default_values(ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval): + """ + used in the test_read_hb_properties*() tests + """ + assert(str(port_num) == "5432") + assert(str(user_name) == "postgres") + assert(str(db_name) == "postgres") + assert(str(password) == "postgres") + +def test_read_hb_properties_default(): + """ + run read_hb_properties_default() + """ + ( ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval ) = config_notif.read_hb_properties_default() + assert_default_values(ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval) + +def test_read_hb_properties_success(): + """ + run read_hb_properties() to read properties from a file + """ + tmp = tempfile.NamedTemporaryFile(mode="w+") + testdata = { + "pg_ipAddress": "10.0.0.99", + "pg_portNum": 65432, + "pg_dbName": "dbname", + "pg_userName": "pguser", + "pg_passwd": "pgpswd", + "CBS_polling_allowed": True, + "CBS_polling_interval": 30, + "SERVICE_NAME": "service_name" + } + json.dump(testdata, tmp) + tmp.flush() + ( ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval ) = config_notif.read_hb_properties(tmp.name) + assert(str(ip_address) == str(testdata["pg_ipAddress"])) + assert(str(port_num) == str(testdata["pg_portNum"])) + assert(str(user_name) == str(testdata["pg_userName"])) + assert(str(password) == str(testdata["pg_passwd"])) + assert(str(db_name) == str(testdata["pg_dbName"])) + assert(str(cbs_polling_required) == str(testdata["CBS_polling_allowed"])) + assert(str(cbs_polling_interval) == str(testdata["CBS_polling_interval"])) + assert(str(os.environ['SERVICE_NAME']) == str(testdata["SERVICE_NAME"])) + +def test_read_hb_properties_fail_bad_json(): + """ + failure case for read_hb_properties: bad json in the file + """ + tmp = tempfile.NamedTemporaryFile(mode="w+") + print("bad json", file=tmp) + tmp.flush() + ( ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval ) = config_notif.read_hb_properties(tmp.name) + assert_default_values(ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval) + +def test_read_hb_properties_fail_missing_parameter(): + """ + failure case for read_hb_properties: CBS_polling_allowed is missing + """ + tmp = tempfile.NamedTemporaryFile(mode="w+") + testdata = { + "pg_ipAddress": "10.0.0.99", + "pg_portNum": 65432, + "pg_dbName": "dbname", + "pg_userName": "pguser", + "pg_passwd": "pgpswd", + # "CBS_polling_allowed": True, # missing CBS_polling_allowed + "CBS_polling_interval": 30, + "SERVICE_NAME": "service_name" + } + json.dump(testdata, tmp) + tmp.flush() + ( ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval ) = config_notif.read_hb_properties(tmp.name) + assert_default_values(ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval) + +def test_postgres_db_open(monkeypatch): + """ + test postgres_db_open() + """ + monkeypatch.setattr(psycopg2, 'connect', monkey_psycopg2.monkey_connect) + monkey_psycopg2.monkey_reset_forces() + dbconn = config_notif.postgres_db_open("test", "testpswd", "testsite", 5432, "dbname") + assert(type(dbconn) is monkey_psycopg2.MockConn) + +def test_postgres_db_open_fail(monkeypatch): + """ + failure ase for postgres_db_open() + """ + monkeypatch.setattr(psycopg2, 'connect', monkey_psycopg2.monkey_connect) + monkey_psycopg2.monkey_reset_forces(connect=True) + dbconn = config_notif.postgres_db_open("test", "badpassword", "testsite", 5432, "dbname") + assert(type(dbconn) is not monkey_psycopg2.MockConn) + +def test_db_table_creation_check(monkeypatch): + """ + test db_table_creation_check() + """ + monkeypatch.setattr(psycopg2, 'connect', monkey_psycopg2.monkey_connect) + monkey_psycopg2.monkey_reset_forces() + dbconn = config_notif.postgres_db_open("test", "testpswd", "testsite", 5432, "dbname") + dbconn.monkey_setDbInfo({ "select * from information_schema.tables": [ [ "testtable" ] ] }) + assert(type(dbconn) is monkey_psycopg2.MockConn) + ret = config_notif.db_table_creation_check(dbconn, "testtable") + assert(ret == True) + ret2 = config_notif.db_table_creation_check(dbconn, "missingtable") + monkey_psycopg2.monkey_reset_forces(cursor=True) + ret3 = config_notif.db_table_creation_check(dbconn, "testtable") + assert(ret3 is None) + +def test_commit_and_close_db(monkeypatch): + """ + test commit_and_close_db() + """ + monkeypatch.setattr(psycopg2, 'connect', monkey_psycopg2.monkey_connect) + monkey_psycopg2.monkey_reset_forces() + dbconn = config_notif.postgres_db_open("test", "testpswd", "testsite", 5432, "dbname") + assert(type(dbconn) is monkey_psycopg2.MockConn) + print("commit_and_close_db(): no forced failures") + ret = config_notif.commit_and_close_db(dbconn) + assert(ret == True) + +def test_commit_and_close_db_fail1(monkeypatch): + """ + failure case for commit_and_close_db(): dbconn.close() fails + """ + monkeypatch.setattr(psycopg2, 'connect', monkey_psycopg2.monkey_connect) + monkey_psycopg2.monkey_reset_forces() + dbconn = config_notif.postgres_db_open("test", "testpswd", "testsite", 5432, "dbname") + assert(type(dbconn) is monkey_psycopg2.MockConn) + print("commit_and_close_db() - close failure") + monkey_psycopg2.monkey_reset_forces(close=True) + ret = config_notif.commit_and_close_db(dbconn) + assert(ret == False) + +def test_commit_and_close_db_fail2(monkeypatch): + """ + failure case for commit_and_close_db(): dbconn.commit() fails + """ + monkeypatch.setattr(psycopg2, 'connect', monkey_psycopg2.monkey_connect) + monkey_psycopg2.monkey_reset_forces() + dbconn = config_notif.postgres_db_open("test", "testpswd", "testsite", 5432, "dbname") + assert(type(dbconn) is monkey_psycopg2.MockConn) + print("commit_and_close_db() - commit failure") + monkey_psycopg2.monkey_reset_forces(commit=True) + ret = config_notif.commit_and_close_db(dbconn) + assert(ret == False) + +def test_read_hb_properties_default(monkeypatch): + """ + test read_hb_properties_default() + """ + monkeypatch.setattr(psycopg2, 'connect', monkey_psycopg2.monkey_connect) + monkey_psycopg2.monkey_reset_forces() + monkey_psycopg2.monkey_set_defaults({ + "testdb1": { + "hb_common": [ + [ 1, "sn1", 31, "st1" ], + [ 2, "sn2", 32, "st2" ] + ] + } + }) + + output = config_notif.read_hb_common("test", "testpswd", "testsite", 5432, "testdb1") + assert(output[0] == 1) + assert(output[1] == "st1") + assert(output[2] == "sn1") + assert(output[3] == 31) + +def test_update_hb_common(monkeypatch): + """ + test update_hb_common() + """ + monkeypatch.setattr(psycopg2, 'connect', monkey_psycopg2.monkey_connect) + monkey_psycopg2.monkey_reset_forces() + output = config_notif.update_hb_common(None, 1234, "st1234", "test", "testpswd", "testsite", 5432, "testdb1") + assert(output == True) + +def monkeypatch_get_cbs_config_False(): + """ + monkeypatch for get_cbs_config() to force it to return False + Required side affect: c_config is set to a json value + """ + print("monkeypatch_get_cbs_config_False()") + miss_htbt_service.mod.trapd_settings.c_config = { "patch": "false" } + return False + +def monkeypatch_get_cbs_config_True(): + """ + monkeypatch for get_cbs_config() to force it to return False + Required side affect: c_config is set to a json value + """ + print("monkeypatch_get_cbs_config_True()") + miss_htbt_service.mod.trapd_settings.c_config = { "patch": "true" } + return True + +def test_fetch_json_file_get_cbs_config_is_true(monkeypatch): + """ + test fetch_json_file() with get_cbs_config() returning True + """ + monkeypatch.setattr(miss_htbt_service.mod.trapd_get_cbs_config, 'get_cbs_config', monkeypatch_get_cbs_config_True) + tmp1 = tempfile.NamedTemporaryFile(mode="w+") + tmp2 = tempfile.NamedTemporaryFile(mode="w+") + output = config_notif.fetch_json_file(download_json = tmp1.name, config_json = tmp2.name) + assert(output == tmp1.name) + with open(tmp1.name, "r") as fp: + j1 = json.load(fp) + print(f"j1={j1}") + assert("patch" in j1 and j1["patch"] == "true") + +def test_fetch_json_file_get_cbs_config_is_false(monkeypatch): + """ + test fetch_json_file() with get_cbs_config() returning False + """ + monkeypatch.setattr(miss_htbt_service.mod.trapd_get_cbs_config, 'get_cbs_config', monkeypatch_get_cbs_config_False) + tmp1 = tempfile.NamedTemporaryFile(mode="w+") + tmp2 = tempfile.NamedTemporaryFile(mode="w+") + output = config_notif.fetch_json_file(download_json = tmp1.name, config_json = tmp2.name) + assert(output == tmp2.name) + +FETCH_JSON_FILE = None + +def monkeypatch_fetch_json_file(): + """ + Monkeypatch for fetch_json_file() to test config_notif_run() + """ + print("monkeypatch_fetch_json_file()") + return FETCH_JSON_FILE + +def monkeypatch_return_False(*args, **kwargs): + """ + Monkeypatch that can be used to force a function to return False + """ + print("monkeypatch_return_False()") + return False + + +def test_config_notif_run_good(monkeypatch): + """ + test config_notif_run() + everything good: "dbname" found (from below JSON info), "hb_common" listed in tables + and hb_common has data. + """ + monkeypatch.setattr(miss_htbt_service.config_notif, 'fetch_json_file', monkeypatch_fetch_json_file) + + tmp = tempfile.NamedTemporaryFile(mode="w+") + global FETCH_JSON_FILE + FETCH_JSON_FILE = tmp.name + + monkeypatch.setattr(psycopg2, 'connect', monkey_psycopg2.monkey_connect) + monkey_psycopg2.monkey_reset_forces() + + monkey_psycopg2.monkey_set_defaults({ + "dbname": { + "from information_schema.tables": [ + [ "hb_common" ] + ], + "from hb_common": [ + [ 1, "sn1", 31, "st1" ], + [ 2, "sn2", 32, "st2" ] + ] + } + }) + + testdata = { + "pg_ipAddress": "10.0.0.99", + "pg_portNum": 65432, + "pg_dbName": "dbname", + "pg_userName": "pguser", + "pg_passwd": "pgpswd", + "CBS_polling_allowed": True, + "CBS_polling_interval": 30, + "SERVICE_NAME": "service_name" + } + json.dump(testdata, tmp) + tmp.flush() + + output = config_notif.config_notif_run() + print(f"output={output}") + assert(output == True) + +def test_config_notif_run_fail1(monkeypatch): + """ + test config_notif_run() + Failure case 1: "dbname" NOT found (from below JSON info), "hb_common" listed in tables + and hb_common has data. + """ + monkeypatch.setattr(miss_htbt_service.config_notif, 'fetch_json_file', monkeypatch_fetch_json_file) + + tmp = tempfile.NamedTemporaryFile(mode="w+") + global FETCH_JSON_FILE + FETCH_JSON_FILE = tmp.name + + monkeypatch.setattr(psycopg2, 'connect', monkey_psycopg2.monkey_connect) + monkey_psycopg2.monkey_reset_forces() + + monkey_psycopg2.monkey_set_defaults({ + "dbnameNOTHERE": { + "from information_schema.tables": [ + [ "hb_common" ] + ], + "from hb_common": [ + [ 1, "sn1", 31, "st1" ], + [ 2, "sn2", 32, "st2" ] + ] + } + }) + + testdata = { + "pg_ipAddress": "10.0.0.99", + "pg_portNum": 65432, + "pg_dbName": "dbname", + "pg_userName": "pguser", + "pg_passwd": "pgpswd", + "CBS_polling_allowed": True, + "CBS_polling_interval": 30, + "SERVICE_NAME": "service_name" + } + json.dump(testdata, tmp) + tmp.flush() + + output = config_notif.config_notif_run() + print(f"output={output}") + assert(output is None) + +def test_config_notif_run_fail2(monkeypatch): + """ + test config_notif_run() + Failure case 2: "dbname" found (from below JSON info), "hb_common" NOT listed in tables + and hb_common has data. + """ + monkeypatch.setattr(miss_htbt_service.config_notif, 'fetch_json_file', monkeypatch_fetch_json_file) + + tmp = tempfile.NamedTemporaryFile(mode="w+") + global FETCH_JSON_FILE + FETCH_JSON_FILE = tmp.name + + monkeypatch.setattr(psycopg2, 'connect', monkey_psycopg2.monkey_connect) + monkey_psycopg2.monkey_reset_forces() + + monkey_psycopg2.monkey_set_defaults({ + "dbname": { + "from information_schema.tables": [ + [ "hb_commonNOTHERE" ] + ], + "from hb_common": [ + [ 1, "sn1", 31, "st1" ], + [ 2, "sn2", 32, "st2" ] + ] + } + }) + + testdata = { + "pg_ipAddress": "10.0.0.99", + "pg_portNum": 65432, + "pg_dbName": "dbname", + "pg_userName": "pguser", + "pg_passwd": "pgpswd", + "CBS_polling_allowed": True, + "CBS_polling_interval": 30, + "SERVICE_NAME": "service_name" + } + json.dump(testdata, tmp) + tmp.flush() + + output = config_notif.config_notif_run() + print(f"output={output}") + assert(output is None) + +def test_config_notif_run_fail3(monkeypatch): + """ + test config_notif_run() + Failure case 3: "dbname" found (from below JSON info), "hb_common" listed in tables + and update_hb_common() fails + """ + monkeypatch.setattr(miss_htbt_service.config_notif, 'fetch_json_file', monkeypatch_fetch_json_file) + monkeypatch.setattr(miss_htbt_service.config_notif, 'update_hb_common', monkeypatch_return_False) + + tmp = tempfile.NamedTemporaryFile(mode="w+") + global FETCH_JSON_FILE + FETCH_JSON_FILE = tmp.name + + monkeypatch.setattr(psycopg2, 'connect', monkey_psycopg2.monkey_connect) + monkey_psycopg2.monkey_reset_forces() + + monkey_psycopg2.monkey_set_defaults({ + "dbname": { + "from information_schema.tables": [ + [ "hb_common" ] + ], + "from hb_common": [ + [ 1, "sn1", 31, "st1" ], + [ 2, "sn2", 32, "st2" ] + ] + } + }) + + testdata = { + "pg_ipAddress": "10.0.0.99", + "pg_portNum": 65432, + "pg_dbName": "dbname", + "pg_userName": "pguser", + "pg_passwd": "pgpswd", + "CBS_polling_allowed": True, + "CBS_polling_interval": 30, + "SERVICE_NAME": "service_name" + } + json.dump(testdata, tmp) + tmp.flush() + + output = config_notif.config_notif_run() + print(f"output={output}") + assert(output == False) diff --git a/tests/test_get_logger.py b/tests/test_get_logger.py new file mode 100644 index 0000000..a643e0f --- /dev/null +++ b/tests/test_get_logger.py @@ -0,0 +1,35 @@ +# ============LICENSE_START======================================================= +# Copyright (c) 2020 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= + +from miss_htbt_service import get_logger + +import os + +def test_get_logger(): + try: + os.remove("hb_logs.txt") + except: + pass + log = get_logger.get_logger() + log.info("hi there") + +def test_get_logger_node(): + try: + os.remove("hb_logs.txt") + except: + pass + log = get_logger.get_logger("node") + log.info("hi there node") diff --git a/tests/test_htbtworker.py b/tests/test_htbtworker.py new file mode 100644 index 0000000..97e61b4 --- /dev/null +++ b/tests/test_htbtworker.py @@ -0,0 +1,45 @@ +# ============LICENSE_START======================================================= +# Copyright (c) 2020 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= + +from miss_htbt_service import htbtworker + +import os, tempfile, json + +def run_test(i): + """ + read_json_file() opens the file CWD/prefix/test{j}.json and returns the json value found there + """ + j = i + 1 + tdir = tempfile.TemporaryDirectory() + prefix = "../../../../../../../../../../../../.." + pdir = f"{prefix}{tdir.name}" + fname = f"{tdir.name}/test{j}.json" + with open(fname, "w") as fp: + json.dump({ "test": i }, fp) + assert(os.path.isfile(f"{tdir.name}/test{j}.json")) + assert(os.path.isfile(f"{pdir}/test{j}.json")) + cfg = htbtworker.read_json_file(i, prefix=pdir) + assert(cfg["test"] == i) + +def test_read_json_file_0(): + run_test(0) + +def test_read_json_file_1(): + run_test(1) + +def test_read_json_file_2(): + run_test(2) + diff --git a/tests/test_trapd_exit.py b/tests/test_trapd_exit.py index 4fa8586..abb6bf4 100644 --- a/tests/test_trapd_exit.py +++ b/tests/test_trapd_exit.py @@ -1,5 +1,5 @@ # ============LICENSE_START======================================================= -# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved. # Copyright (c) 2019 Pantheon.tech. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); @@ -14,8 +14,6 @@ # See the License for the specific language governing permissions and # limitations under the License. # ============LICENSE_END========================================================= -# -# ECOMP is a trademark and service mark of AT&T Intellectual Property. import pytest import unittest diff --git a/tests/test_trapd_get_cbs_config.py b/tests/test_trapd_get_cbs_config.py index 92d0dde..8691a89 100644 --- a/tests/test_trapd_get_cbs_config.py +++ b/tests/test_trapd_get_cbs_config.py @@ -1,5 +1,5 @@ # ============LICENSE_START======================================================= -# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved. # Copyright (c) 2019 Pantheon.tech. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); @@ -14,8 +14,6 @@ # See the License for the specific language governing permissions and # limitations under the License. # ============LICENSE_END========================================================= -# -# ECOMP is a trademark and service mark of AT&T Intellectual Property. import pytest import unittest diff --git a/tests/test_trapd_http_session.py b/tests/test_trapd_http_session.py index 00ee867..948dd5e 100644 --- a/tests/test_trapd_http_session.py +++ b/tests/test_trapd_http_session.py @@ -1,5 +1,5 @@ # ============LICENSE_START======================================================= -# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved. # Copyright (c) 2019 Pantheon.tech. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); @@ -14,8 +14,6 @@ # See the License for the specific language governing permissions and # limitations under the License. # ============LICENSE_END========================================================= -# -# ECOMP is a trademark and service mark of AT&T Intellectual Property. import pytest import unittest diff --git a/tests/test_trapd_runtime_pid.py b/tests/test_trapd_runtime_pid.py index 222ce58..ecb063c 100644 --- a/tests/test_trapd_runtime_pid.py +++ b/tests/test_trapd_runtime_pid.py @@ -1,5 +1,5 @@ # ============LICENSE_START======================================================= -# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved. # Copyright (c) 2019 Pantheon.tech. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); @@ -14,8 +14,6 @@ # See the License for the specific language governing permissions and # limitations under the License. # ============LICENSE_END========================================================= -# -# ECOMP is a trademark and service mark of AT&T Intellectual Property. import pytest import unittest diff --git a/tests/test_trapd_settings.py b/tests/test_trapd_settings.py index 53b24c8..33fb0ab 100644 --- a/tests/test_trapd_settings.py +++ b/tests/test_trapd_settings.py @@ -1,5 +1,5 @@ # ============LICENSE_START======================================================= -# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved. # Copyright (c) 2019 Pantheon.tech. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); @@ -14,15 +14,12 @@ # See the License for the specific language governing permissions and # limitations under the License. # ============LICENSE_END========================================================= -# -# ECOMP is a trademark and service mark of AT&T Intellectual Property. import pytest import unittest from miss_htbt_service.mod import trapd_settings as tds - pid_file="/tmp/test_pid_file" pid_file_dne="/tmp/test_pid_file_NOT" diff --git a/tests/test_trapd_vnf_table.py b/tests/test_trapd_vnf_table.py index f38d4af..4f5cace 100644 --- a/tests/test_trapd_vnf_table.py +++ b/tests/test_trapd_vnf_table.py @@ -1,7 +1,7 @@ # ============LICENSE_START======================================================= # org.onap.dcae # ================================================================================ -# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved. # Copyright (c) 2019 Pantheon.tech. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); @@ -17,8 +17,6 @@ # limitations under the License. # ============LICENSE_END========================================================= # -# ECOMP is a trademark and service mark of AT&T Intellectual Property. -# ## Author Prakask H (ph553f) """ test_trapd_vnf_table contains test cases related to DB Tables and cbs polling. |