summaryrefslogtreecommitdiffstats
path: root/tests
diff options
context:
space:
mode:
Diffstat (limited to 'tests')
-rw-r--r--tests/__init__.py4
-rw-r--r--tests/makefile2
-rw-r--r--tests/monkey_psycopg2.py207
-rw-r--r--tests/test_binding.py5
-rw-r--r--tests/test_check_health.py65
-rw-r--r--tests/test_config_notif.py435
-rw-r--r--tests/test_get_logger.py35
-rw-r--r--tests/test_htbtworker.py45
-rw-r--r--tests/test_trapd_exit.py4
-rw-r--r--tests/test_trapd_get_cbs_config.py4
-rw-r--r--tests/test_trapd_http_session.py4
-rw-r--r--tests/test_trapd_runtime_pid.py4
-rw-r--r--tests/test_trapd_settings.py5
-rw-r--r--tests/test_trapd_vnf_table.py4
14 files changed, 797 insertions, 26 deletions
diff --git a/tests/__init__.py b/tests/__init__.py
index 1875bf6..8e53efc 100644
--- a/tests/__init__.py
+++ b/tests/__init__.py
@@ -1,5 +1,5 @@
# ================================================================================
-# Copyright (c) 2018 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2018-2020 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -13,8 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
# ============LICENSE_END=========================================================
-#
-# ECOMP is a trademark and service mark of AT&T Intellectual Property.
# empty __init__.py so that pytest can add correct path to coverage report, -- per pytest
diff --git a/tests/makefile b/tests/makefile
new file mode 100644
index 0000000..9901727
--- /dev/null
+++ b/tests/makefile
@@ -0,0 +1,2 @@
+runtox:
+ cd .. && $(MAKE) runtox
diff --git a/tests/monkey_psycopg2.py b/tests/monkey_psycopg2.py
new file mode 100644
index 0000000..05f44b4
--- /dev/null
+++ b/tests/monkey_psycopg2.py
@@ -0,0 +1,207 @@
+# ============LICENSE_START====================================================
+# =============================================================================
+# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved.
+# =============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END======================================================
+
+"""
+
+This is a mock psycopg2 module.
+
+"""
+
+import psycopg2
+
+# These FORCE variables are set by monkey_psycopg2.monkey_reset_forces().
+# If set, they will cause a connection failure, cursor failure, execute failure,
+# commit failure, and close failure, respectively.
+FORCE_CONNECT_FAILURE = False
+FORCE_CURSOR_FAILURE = False
+FORCE_EXECUTE_FAILURE = False
+FORCE_COMMIT_FAILURE = False
+FORCE_CLOSE_FAILURE = False
+
+# This variable is used by monkey_psycopg2.monkey_set_defaults(multiDbInfo)
+# to set up default values to be returned by cursors statements.
+DEFAULT_MULTI_DBINFO = { }
+
+class MockConn(object):
+ """
+ mock Connection interface returned by psycopg2.connect()
+ """
+
+ def __init__(self, dbInfo = None):
+ self.curCmd = None
+ self.curCursor = None
+ self.monkey_setDbInfo(dbInfo)
+
+ def monkey_setDbInfo(self, dbInfo):
+ """
+ Set up a set of defaults for the cursors on "select" statements.
+ The outer scope is a string that will be matched against the currently-active
+ select statement being executed.
+ If there is a match, the specified values are returned by the cursor.
+ dbconn.monkey_setDbInfo({
+ "hb_common": [
+ [ 1, "sn1", 31, "st1" ],
+ [ 2, "sn2", 32, "st2" ]
+ ]
+ })
+ """
+ self.dbInfo = dbInfo
+
+ def commit(self):
+ """
+ mock commit.
+ Do nothing unless FORCE_COMMIT_FAILURE is set.
+
+ Will raise an exception if value of 'FORCE_COMMIT_FAILURE' is true.
+ Used to force failure for certain code paths.
+ """
+ if FORCE_COMMIT_FAILURE:
+ raise psycopg2.DatabaseError(f"Unable to commit: force_commit_failure=<{FORCE_COMMIT_FAILURE}>")
+
+ def close(self):
+ """
+ mock close
+ Do nothing unless FORCE_CLOSE_FAILURE is set.
+
+ Will raise an exception if value of 'FORCE_CLOSE_FAILURE' is true.
+ Used to force failure for certain code paths.
+ """
+ if FORCE_CLOSE_FAILURE:
+ raise psycopg2.DatabaseError(f"Unable to close: force_close_failure=<{FORCE_CLOSE_FAILURE}>")
+
+ def __enter__(self):
+ """
+ method needed to implement a context manager
+ """
+ return self
+
+ # pylint: disable=redefined-outer-name,redefined-builtin
+ def __exit__(self, type, value, traceback):
+ """
+ method needed to implement a context manager
+ """
+ pass
+
+ def cursor(self):
+ """
+ mock cursor
+
+ Will raise an exception if value of 'FORCE_CURSOR_FAILURE' is true.
+ Used to force failure for certain code paths.
+ """
+ if FORCE_CURSOR_FAILURE:
+ raise psycopg2.DatabaseError(f"Unable to return cursor: force_cursor_failure=<{FORCE_CURSOR_FAILURE}>")
+
+ print(f"cursor()")
+ self.curCursor = None
+ return self
+
+ def execute(self, cmd, args = None):
+ """
+ mock execute
+
+ Will raise an exception if value of 'FORCE_EXECUTE_FAILURE' is true.
+ Used to force failure for certain code paths.
+
+ A side affect is that the cursor's values will be set based on a match
+ with the command (lower-cased) being executed.
+ """
+ # pylint: disable=global-statement,no-self-use
+
+ print(f"execute({cmd},{args})")
+ self.curCmd = cmd
+ if FORCE_EXECUTE_FAILURE:
+ raise Exception("postgres execute command throwing exception. cmd=<{}>".format(cmd))
+
+ if self.dbInfo:
+ curCmdLower = cmd.lower()
+ for cmd, val in self.dbInfo.items():
+ print(f"cmd={cmd}, val={val}")
+ if cmd in curCmdLower:
+ self.curCursor = val
+ break
+
+ def fetchone(self):
+ """
+ return a single row from the current cursor
+ """
+ if not self.curCursor:
+ print(f"fetchone() returning None")
+ return None
+ print(f"fetchone() returning {self.curCursor[0]}")
+ return self.curCursor[0]
+
+ def fetchall(self):
+ """
+ return all rows from the current cursor
+ """
+ if not self.curCursor:
+ print(f"fetchall() returning None")
+ return None
+ print(f"fetchall() returning {self.curCursor}")
+ return self.curCursor
+
+def monkey_reset_forces(connect=False, cursor=False, execute=False, commit=False, close=False):
+ print(f"monkey_reset_forces({connect}, {cursor}, {execute}, {commit}, {close})")
+ global FORCE_CONNECT_FAILURE
+ FORCE_CONNECT_FAILURE = connect
+ global FORCE_CURSOR_FAILURE
+ FORCE_CURSOR_FAILURE = cursor
+ global FORCE_EXECUTE_FAILURE
+ FORCE_EXECUTE_FAILURE = cursor
+ global FORCE_COMMIT_FAILURE
+ FORCE_COMMIT_FAILURE = commit
+ global FORCE_CLOSE_FAILURE
+ FORCE_CLOSE_FAILURE = close
+
+def monkey_set_defaults(multiDbInfo):
+ """
+ Set up a set of defaults for the cursors on "select" statements.
+ The outer scope gives a database name.
+ The next level is a string that will be matched against the currently-active
+ select statement being executed.
+ If both match, the specified values are returned by the cursor.
+ monkey_psycopg2.monkey_set_defaults({
+ "testdb1": {
+ "hb_common": [
+ [ 1, "sn1", 31, "st1" ],
+ [ 2, "sn2", 32, "st2" ]
+ ]
+ }
+ })
+ """
+ global DEFAULT_MULTI_DBINFO
+ DEFAULT_MULTI_DBINFO = multiDbInfo
+
+def monkey_connect(database=None, host=None, port=None, user=None, password=None):
+ """
+ Mock psycopg2 connection.
+ Returns a mock connection.
+
+ Will raise an exception if value of 'FORCE_CONNECT_FAILURE' is true.
+ (Used to force failure for certain code paths.)
+
+ Also set up any DbInfo values, based on the database name.
+ (See monkey_set_defaults(), which must have been called prior to this being invoked.)
+ """
+ if FORCE_CONNECT_FAILURE:
+ raise Exception("Unable to connect to the database. password=<{}> force_connect_failure=<{}>".format(password, FORCE_CONNECT_FAILURE))
+
+ if database in DEFAULT_MULTI_DBINFO:
+ return MockConn(DEFAULT_MULTI_DBINFO[database])
+ else:
+ return MockConn()
diff --git a/tests/test_binding.py b/tests/test_binding.py
index 0ef6c5d..170e8b1 100644
--- a/tests/test_binding.py
+++ b/tests/test_binding.py
@@ -1,5 +1,5 @@
# ============LICENSE_START=======================================================
-# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved.
# Copyright (c) 2019 Pantheon.tech. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -14,8 +14,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
# ============LICENSE_END=========================================================
-#
-# ECOMP is a trademark and service mark of AT&T Intellectual Property.
import os
import io
@@ -37,7 +35,6 @@ from miss_htbt_service import db_monitoring
from miss_htbt_service.mod.trapd_vnf_table import hb_properties
import unittest
-
MODULE_EXTENSIONS = ('.py', '.pyc', '.pyo')
def package_contents(package_name):
diff --git a/tests/test_check_health.py b/tests/test_check_health.py
new file mode 100644
index 0000000..a159212
--- /dev/null
+++ b/tests/test_check_health.py
@@ -0,0 +1,65 @@
+# ============LICENSE_START=======================================================
+# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2019 Pantheon.tech. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+
+# code loosely based on https://stackoverflow.com/questions/25369068/python-how-to-unit-test-a-custom-http-request-handler
+
+from miss_htbt_service import check_health
+
+import io
+
+class MockSocket(object):
+ def getsockname(self):
+ return ('sockname',)
+
+class MockRequest(object):
+ _sock = MockSocket()
+
+ def __init__(self, rqtype, path, body=None):
+ self._path = path
+ self._rqtype = rqtype if rqtype else "GET"
+ self._body = body
+
+ def makefile(self, *args, **kwargs):
+ if args[0] == 'rb':
+ if self._rqtype == 'GET':
+ return io.BytesIO(bytes("%s %s HTTP/1.0" % (self._rqtype, self._path), 'utf-8'))
+ else:
+ return io.BytesIO(bytes("%s %s HTTP/1.0\r\nContent-Length: %s\r\n\r\n%s" % (self._rqtype, self._path, len(self._body), self._body), 'utf-8'))
+ elif args[0] == 'wb':
+ return io.BytesIO(b'')
+ else:
+ raise ValueError("Unknown file type to make", args, kwargs)
+
+ def sendall(self, bstr):
+ pass
+
+class MockServer(object):
+ def __init__(self, rqtype, path, ip_port, Handler, body=None):
+ handler = Handler(MockRequest(rqtype, path, body), ip_port, self)
+
+def test_check_health_get():
+ """
+ test the check_health GET and POST handlers using a mock server
+ """
+ server = MockServer('GET', '/', ('0.0.0.0', 8888), check_health.GetHandler)
+
+def test_check_health_post():
+ """
+ test the check_health GET and POST handlers using a mock server
+ """
+ server = MockServer('POST', '/', ('0.0.0.0', 8888), check_health.GetHandler,
+ '{ "health": "" }')
diff --git a/tests/test_config_notif.py b/tests/test_config_notif.py
new file mode 100644
index 0000000..b59696e
--- /dev/null
+++ b/tests/test_config_notif.py
@@ -0,0 +1,435 @@
+# ============LICENSE_START=======================================================
+# Copyright (c) 2020 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+
+from miss_htbt_service import config_notif
+# from miss_htbt_service.mod.trapd_get_cbs_config import get_cbs_config
+import miss_htbt_service.mod.trapd_get_cbs_config
+import miss_htbt_service.mod.trapd_settings
+
+from . import monkey_psycopg2
+import psycopg2
+import tempfile, sys, json, os
+
+def assert_default_values(ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval):
+ """
+ used in the test_read_hb_properties*() tests
+ """
+ assert(str(port_num) == "5432")
+ assert(str(user_name) == "postgres")
+ assert(str(db_name) == "postgres")
+ assert(str(password) == "postgres")
+
+def test_read_hb_properties_default():
+ """
+ run read_hb_properties_default()
+ """
+ ( ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval ) = config_notif.read_hb_properties_default()
+ assert_default_values(ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval)
+
+def test_read_hb_properties_success():
+ """
+ run read_hb_properties() to read properties from a file
+ """
+ tmp = tempfile.NamedTemporaryFile(mode="w+")
+ testdata = {
+ "pg_ipAddress": "10.0.0.99",
+ "pg_portNum": 65432,
+ "pg_dbName": "dbname",
+ "pg_userName": "pguser",
+ "pg_passwd": "pgpswd",
+ "CBS_polling_allowed": True,
+ "CBS_polling_interval": 30,
+ "SERVICE_NAME": "service_name"
+ }
+ json.dump(testdata, tmp)
+ tmp.flush()
+ ( ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval ) = config_notif.read_hb_properties(tmp.name)
+ assert(str(ip_address) == str(testdata["pg_ipAddress"]))
+ assert(str(port_num) == str(testdata["pg_portNum"]))
+ assert(str(user_name) == str(testdata["pg_userName"]))
+ assert(str(password) == str(testdata["pg_passwd"]))
+ assert(str(db_name) == str(testdata["pg_dbName"]))
+ assert(str(cbs_polling_required) == str(testdata["CBS_polling_allowed"]))
+ assert(str(cbs_polling_interval) == str(testdata["CBS_polling_interval"]))
+ assert(str(os.environ['SERVICE_NAME']) == str(testdata["SERVICE_NAME"]))
+
+def test_read_hb_properties_fail_bad_json():
+ """
+ failure case for read_hb_properties: bad json in the file
+ """
+ tmp = tempfile.NamedTemporaryFile(mode="w+")
+ print("bad json", file=tmp)
+ tmp.flush()
+ ( ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval ) = config_notif.read_hb_properties(tmp.name)
+ assert_default_values(ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval)
+
+def test_read_hb_properties_fail_missing_parameter():
+ """
+ failure case for read_hb_properties: CBS_polling_allowed is missing
+ """
+ tmp = tempfile.NamedTemporaryFile(mode="w+")
+ testdata = {
+ "pg_ipAddress": "10.0.0.99",
+ "pg_portNum": 65432,
+ "pg_dbName": "dbname",
+ "pg_userName": "pguser",
+ "pg_passwd": "pgpswd",
+ # "CBS_polling_allowed": True, # missing CBS_polling_allowed
+ "CBS_polling_interval": 30,
+ "SERVICE_NAME": "service_name"
+ }
+ json.dump(testdata, tmp)
+ tmp.flush()
+ ( ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval ) = config_notif.read_hb_properties(tmp.name)
+ assert_default_values(ip_address, port_num, user_name, password, db_name, cbs_polling_required, cbs_polling_interval)
+
+def test_postgres_db_open(monkeypatch):
+ """
+ test postgres_db_open()
+ """
+ monkeypatch.setattr(psycopg2, 'connect', monkey_psycopg2.monkey_connect)
+ monkey_psycopg2.monkey_reset_forces()
+ dbconn = config_notif.postgres_db_open("test", "testpswd", "testsite", 5432, "dbname")
+ assert(type(dbconn) is monkey_psycopg2.MockConn)
+
+def test_postgres_db_open_fail(monkeypatch):
+ """
+ failure ase for postgres_db_open()
+ """
+ monkeypatch.setattr(psycopg2, 'connect', monkey_psycopg2.monkey_connect)
+ monkey_psycopg2.monkey_reset_forces(connect=True)
+ dbconn = config_notif.postgres_db_open("test", "badpassword", "testsite", 5432, "dbname")
+ assert(type(dbconn) is not monkey_psycopg2.MockConn)
+
+def test_db_table_creation_check(monkeypatch):
+ """
+ test db_table_creation_check()
+ """
+ monkeypatch.setattr(psycopg2, 'connect', monkey_psycopg2.monkey_connect)
+ monkey_psycopg2.monkey_reset_forces()
+ dbconn = config_notif.postgres_db_open("test", "testpswd", "testsite", 5432, "dbname")
+ dbconn.monkey_setDbInfo({ "select * from information_schema.tables": [ [ "testtable" ] ] })
+ assert(type(dbconn) is monkey_psycopg2.MockConn)
+ ret = config_notif.db_table_creation_check(dbconn, "testtable")
+ assert(ret == True)
+ ret2 = config_notif.db_table_creation_check(dbconn, "missingtable")
+ monkey_psycopg2.monkey_reset_forces(cursor=True)
+ ret3 = config_notif.db_table_creation_check(dbconn, "testtable")
+ assert(ret3 is None)
+
+def test_commit_and_close_db(monkeypatch):
+ """
+ test commit_and_close_db()
+ """
+ monkeypatch.setattr(psycopg2, 'connect', monkey_psycopg2.monkey_connect)
+ monkey_psycopg2.monkey_reset_forces()
+ dbconn = config_notif.postgres_db_open("test", "testpswd", "testsite", 5432, "dbname")
+ assert(type(dbconn) is monkey_psycopg2.MockConn)
+ print("commit_and_close_db(): no forced failures")
+ ret = config_notif.commit_and_close_db(dbconn)
+ assert(ret == True)
+
+def test_commit_and_close_db_fail1(monkeypatch):
+ """
+ failure case for commit_and_close_db(): dbconn.close() fails
+ """
+ monkeypatch.setattr(psycopg2, 'connect', monkey_psycopg2.monkey_connect)
+ monkey_psycopg2.monkey_reset_forces()
+ dbconn = config_notif.postgres_db_open("test", "testpswd", "testsite", 5432, "dbname")
+ assert(type(dbconn) is monkey_psycopg2.MockConn)
+ print("commit_and_close_db() - close failure")
+ monkey_psycopg2.monkey_reset_forces(close=True)
+ ret = config_notif.commit_and_close_db(dbconn)
+ assert(ret == False)
+
+def test_commit_and_close_db_fail2(monkeypatch):
+ """
+ failure case for commit_and_close_db(): dbconn.commit() fails
+ """
+ monkeypatch.setattr(psycopg2, 'connect', monkey_psycopg2.monkey_connect)
+ monkey_psycopg2.monkey_reset_forces()
+ dbconn = config_notif.postgres_db_open("test", "testpswd", "testsite", 5432, "dbname")
+ assert(type(dbconn) is monkey_psycopg2.MockConn)
+ print("commit_and_close_db() - commit failure")
+ monkey_psycopg2.monkey_reset_forces(commit=True)
+ ret = config_notif.commit_and_close_db(dbconn)
+ assert(ret == False)
+
+def test_read_hb_properties_default(monkeypatch):
+ """
+ test read_hb_properties_default()
+ """
+ monkeypatch.setattr(psycopg2, 'connect', monkey_psycopg2.monkey_connect)
+ monkey_psycopg2.monkey_reset_forces()
+ monkey_psycopg2.monkey_set_defaults({
+ "testdb1": {
+ "hb_common": [
+ [ 1, "sn1", 31, "st1" ],
+ [ 2, "sn2", 32, "st2" ]
+ ]
+ }
+ })
+
+ output = config_notif.read_hb_common("test", "testpswd", "testsite", 5432, "testdb1")
+ assert(output[0] == 1)
+ assert(output[1] == "st1")
+ assert(output[2] == "sn1")
+ assert(output[3] == 31)
+
+def test_update_hb_common(monkeypatch):
+ """
+ test update_hb_common()
+ """
+ monkeypatch.setattr(psycopg2, 'connect', monkey_psycopg2.monkey_connect)
+ monkey_psycopg2.monkey_reset_forces()
+ output = config_notif.update_hb_common(None, 1234, "st1234", "test", "testpswd", "testsite", 5432, "testdb1")
+ assert(output == True)
+
+def monkeypatch_get_cbs_config_False():
+ """
+ monkeypatch for get_cbs_config() to force it to return False
+ Required side affect: c_config is set to a json value
+ """
+ print("monkeypatch_get_cbs_config_False()")
+ miss_htbt_service.mod.trapd_settings.c_config = { "patch": "false" }
+ return False
+
+def monkeypatch_get_cbs_config_True():
+ """
+ monkeypatch for get_cbs_config() to force it to return False
+ Required side affect: c_config is set to a json value
+ """
+ print("monkeypatch_get_cbs_config_True()")
+ miss_htbt_service.mod.trapd_settings.c_config = { "patch": "true" }
+ return True
+
+def test_fetch_json_file_get_cbs_config_is_true(monkeypatch):
+ """
+ test fetch_json_file() with get_cbs_config() returning True
+ """
+ monkeypatch.setattr(miss_htbt_service.mod.trapd_get_cbs_config, 'get_cbs_config', monkeypatch_get_cbs_config_True)
+ tmp1 = tempfile.NamedTemporaryFile(mode="w+")
+ tmp2 = tempfile.NamedTemporaryFile(mode="w+")
+ output = config_notif.fetch_json_file(download_json = tmp1.name, config_json = tmp2.name)
+ assert(output == tmp1.name)
+ with open(tmp1.name, "r") as fp:
+ j1 = json.load(fp)
+ print(f"j1={j1}")
+ assert("patch" in j1 and j1["patch"] == "true")
+
+def test_fetch_json_file_get_cbs_config_is_false(monkeypatch):
+ """
+ test fetch_json_file() with get_cbs_config() returning False
+ """
+ monkeypatch.setattr(miss_htbt_service.mod.trapd_get_cbs_config, 'get_cbs_config', monkeypatch_get_cbs_config_False)
+ tmp1 = tempfile.NamedTemporaryFile(mode="w+")
+ tmp2 = tempfile.NamedTemporaryFile(mode="w+")
+ output = config_notif.fetch_json_file(download_json = tmp1.name, config_json = tmp2.name)
+ assert(output == tmp2.name)
+
+FETCH_JSON_FILE = None
+
+def monkeypatch_fetch_json_file():
+ """
+ Monkeypatch for fetch_json_file() to test config_notif_run()
+ """
+ print("monkeypatch_fetch_json_file()")
+ return FETCH_JSON_FILE
+
+def monkeypatch_return_False(*args, **kwargs):
+ """
+ Monkeypatch that can be used to force a function to return False
+ """
+ print("monkeypatch_return_False()")
+ return False
+
+
+def test_config_notif_run_good(monkeypatch):
+ """
+ test config_notif_run()
+ everything good: "dbname" found (from below JSON info), "hb_common" listed in tables
+ and hb_common has data.
+ """
+ monkeypatch.setattr(miss_htbt_service.config_notif, 'fetch_json_file', monkeypatch_fetch_json_file)
+
+ tmp = tempfile.NamedTemporaryFile(mode="w+")
+ global FETCH_JSON_FILE
+ FETCH_JSON_FILE = tmp.name
+
+ monkeypatch.setattr(psycopg2, 'connect', monkey_psycopg2.monkey_connect)
+ monkey_psycopg2.monkey_reset_forces()
+
+ monkey_psycopg2.monkey_set_defaults({
+ "dbname": {
+ "from information_schema.tables": [
+ [ "hb_common" ]
+ ],
+ "from hb_common": [
+ [ 1, "sn1", 31, "st1" ],
+ [ 2, "sn2", 32, "st2" ]
+ ]
+ }
+ })
+
+ testdata = {
+ "pg_ipAddress": "10.0.0.99",
+ "pg_portNum": 65432,
+ "pg_dbName": "dbname",
+ "pg_userName": "pguser",
+ "pg_passwd": "pgpswd",
+ "CBS_polling_allowed": True,
+ "CBS_polling_interval": 30,
+ "SERVICE_NAME": "service_name"
+ }
+ json.dump(testdata, tmp)
+ tmp.flush()
+
+ output = config_notif.config_notif_run()
+ print(f"output={output}")
+ assert(output == True)
+
+def test_config_notif_run_fail1(monkeypatch):
+ """
+ test config_notif_run()
+ Failure case 1: "dbname" NOT found (from below JSON info), "hb_common" listed in tables
+ and hb_common has data.
+ """
+ monkeypatch.setattr(miss_htbt_service.config_notif, 'fetch_json_file', monkeypatch_fetch_json_file)
+
+ tmp = tempfile.NamedTemporaryFile(mode="w+")
+ global FETCH_JSON_FILE
+ FETCH_JSON_FILE = tmp.name
+
+ monkeypatch.setattr(psycopg2, 'connect', monkey_psycopg2.monkey_connect)
+ monkey_psycopg2.monkey_reset_forces()
+
+ monkey_psycopg2.monkey_set_defaults({
+ "dbnameNOTHERE": {
+ "from information_schema.tables": [
+ [ "hb_common" ]
+ ],
+ "from hb_common": [
+ [ 1, "sn1", 31, "st1" ],
+ [ 2, "sn2", 32, "st2" ]
+ ]
+ }
+ })
+
+ testdata = {
+ "pg_ipAddress": "10.0.0.99",
+ "pg_portNum": 65432,
+ "pg_dbName": "dbname",
+ "pg_userName": "pguser",
+ "pg_passwd": "pgpswd",
+ "CBS_polling_allowed": True,
+ "CBS_polling_interval": 30,
+ "SERVICE_NAME": "service_name"
+ }
+ json.dump(testdata, tmp)
+ tmp.flush()
+
+ output = config_notif.config_notif_run()
+ print(f"output={output}")
+ assert(output is None)
+
+def test_config_notif_run_fail2(monkeypatch):
+ """
+ test config_notif_run()
+ Failure case 2: "dbname" found (from below JSON info), "hb_common" NOT listed in tables
+ and hb_common has data.
+ """
+ monkeypatch.setattr(miss_htbt_service.config_notif, 'fetch_json_file', monkeypatch_fetch_json_file)
+
+ tmp = tempfile.NamedTemporaryFile(mode="w+")
+ global FETCH_JSON_FILE
+ FETCH_JSON_FILE = tmp.name
+
+ monkeypatch.setattr(psycopg2, 'connect', monkey_psycopg2.monkey_connect)
+ monkey_psycopg2.monkey_reset_forces()
+
+ monkey_psycopg2.monkey_set_defaults({
+ "dbname": {
+ "from information_schema.tables": [
+ [ "hb_commonNOTHERE" ]
+ ],
+ "from hb_common": [
+ [ 1, "sn1", 31, "st1" ],
+ [ 2, "sn2", 32, "st2" ]
+ ]
+ }
+ })
+
+ testdata = {
+ "pg_ipAddress": "10.0.0.99",
+ "pg_portNum": 65432,
+ "pg_dbName": "dbname",
+ "pg_userName": "pguser",
+ "pg_passwd": "pgpswd",
+ "CBS_polling_allowed": True,
+ "CBS_polling_interval": 30,
+ "SERVICE_NAME": "service_name"
+ }
+ json.dump(testdata, tmp)
+ tmp.flush()
+
+ output = config_notif.config_notif_run()
+ print(f"output={output}")
+ assert(output is None)
+
+def test_config_notif_run_fail3(monkeypatch):
+ """
+ test config_notif_run()
+ Failure case 3: "dbname" found (from below JSON info), "hb_common" listed in tables
+ and update_hb_common() fails
+ """
+ monkeypatch.setattr(miss_htbt_service.config_notif, 'fetch_json_file', monkeypatch_fetch_json_file)
+ monkeypatch.setattr(miss_htbt_service.config_notif, 'update_hb_common', monkeypatch_return_False)
+
+ tmp = tempfile.NamedTemporaryFile(mode="w+")
+ global FETCH_JSON_FILE
+ FETCH_JSON_FILE = tmp.name
+
+ monkeypatch.setattr(psycopg2, 'connect', monkey_psycopg2.monkey_connect)
+ monkey_psycopg2.monkey_reset_forces()
+
+ monkey_psycopg2.monkey_set_defaults({
+ "dbname": {
+ "from information_schema.tables": [
+ [ "hb_common" ]
+ ],
+ "from hb_common": [
+ [ 1, "sn1", 31, "st1" ],
+ [ 2, "sn2", 32, "st2" ]
+ ]
+ }
+ })
+
+ testdata = {
+ "pg_ipAddress": "10.0.0.99",
+ "pg_portNum": 65432,
+ "pg_dbName": "dbname",
+ "pg_userName": "pguser",
+ "pg_passwd": "pgpswd",
+ "CBS_polling_allowed": True,
+ "CBS_polling_interval": 30,
+ "SERVICE_NAME": "service_name"
+ }
+ json.dump(testdata, tmp)
+ tmp.flush()
+
+ output = config_notif.config_notif_run()
+ print(f"output={output}")
+ assert(output == False)
diff --git a/tests/test_get_logger.py b/tests/test_get_logger.py
new file mode 100644
index 0000000..a643e0f
--- /dev/null
+++ b/tests/test_get_logger.py
@@ -0,0 +1,35 @@
+# ============LICENSE_START=======================================================
+# Copyright (c) 2020 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+
+from miss_htbt_service import get_logger
+
+import os
+
+def test_get_logger():
+ try:
+ os.remove("hb_logs.txt")
+ except:
+ pass
+ log = get_logger.get_logger()
+ log.info("hi there")
+
+def test_get_logger_node():
+ try:
+ os.remove("hb_logs.txt")
+ except:
+ pass
+ log = get_logger.get_logger("node")
+ log.info("hi there node")
diff --git a/tests/test_htbtworker.py b/tests/test_htbtworker.py
new file mode 100644
index 0000000..97e61b4
--- /dev/null
+++ b/tests/test_htbtworker.py
@@ -0,0 +1,45 @@
+# ============LICENSE_START=======================================================
+# Copyright (c) 2020 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+
+from miss_htbt_service import htbtworker
+
+import os, tempfile, json
+
+def run_test(i):
+ """
+ read_json_file() opens the file CWD/prefix/test{j}.json and returns the json value found there
+ """
+ j = i + 1
+ tdir = tempfile.TemporaryDirectory()
+ prefix = "../../../../../../../../../../../../.."
+ pdir = f"{prefix}{tdir.name}"
+ fname = f"{tdir.name}/test{j}.json"
+ with open(fname, "w") as fp:
+ json.dump({ "test": i }, fp)
+ assert(os.path.isfile(f"{tdir.name}/test{j}.json"))
+ assert(os.path.isfile(f"{pdir}/test{j}.json"))
+ cfg = htbtworker.read_json_file(i, prefix=pdir)
+ assert(cfg["test"] == i)
+
+def test_read_json_file_0():
+ run_test(0)
+
+def test_read_json_file_1():
+ run_test(1)
+
+def test_read_json_file_2():
+ run_test(2)
+
diff --git a/tests/test_trapd_exit.py b/tests/test_trapd_exit.py
index 4fa8586..abb6bf4 100644
--- a/tests/test_trapd_exit.py
+++ b/tests/test_trapd_exit.py
@@ -1,5 +1,5 @@
# ============LICENSE_START=======================================================
-# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved.
# Copyright (c) 2019 Pantheon.tech. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -14,8 +14,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
# ============LICENSE_END=========================================================
-#
-# ECOMP is a trademark and service mark of AT&T Intellectual Property.
import pytest
import unittest
diff --git a/tests/test_trapd_get_cbs_config.py b/tests/test_trapd_get_cbs_config.py
index 92d0dde..8691a89 100644
--- a/tests/test_trapd_get_cbs_config.py
+++ b/tests/test_trapd_get_cbs_config.py
@@ -1,5 +1,5 @@
# ============LICENSE_START=======================================================
-# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved.
# Copyright (c) 2019 Pantheon.tech. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -14,8 +14,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
# ============LICENSE_END=========================================================
-#
-# ECOMP is a trademark and service mark of AT&T Intellectual Property.
import pytest
import unittest
diff --git a/tests/test_trapd_http_session.py b/tests/test_trapd_http_session.py
index 00ee867..948dd5e 100644
--- a/tests/test_trapd_http_session.py
+++ b/tests/test_trapd_http_session.py
@@ -1,5 +1,5 @@
# ============LICENSE_START=======================================================
-# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved.
# Copyright (c) 2019 Pantheon.tech. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -14,8 +14,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
# ============LICENSE_END=========================================================
-#
-# ECOMP is a trademark and service mark of AT&T Intellectual Property.
import pytest
import unittest
diff --git a/tests/test_trapd_runtime_pid.py b/tests/test_trapd_runtime_pid.py
index 222ce58..ecb063c 100644
--- a/tests/test_trapd_runtime_pid.py
+++ b/tests/test_trapd_runtime_pid.py
@@ -1,5 +1,5 @@
# ============LICENSE_START=======================================================
-# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved.
# Copyright (c) 2019 Pantheon.tech. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -14,8 +14,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
# ============LICENSE_END=========================================================
-#
-# ECOMP is a trademark and service mark of AT&T Intellectual Property.
import pytest
import unittest
diff --git a/tests/test_trapd_settings.py b/tests/test_trapd_settings.py
index 53b24c8..33fb0ab 100644
--- a/tests/test_trapd_settings.py
+++ b/tests/test_trapd_settings.py
@@ -1,5 +1,5 @@
# ============LICENSE_START=======================================================
-# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved.
# Copyright (c) 2019 Pantheon.tech. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -14,15 +14,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
# ============LICENSE_END=========================================================
-#
-# ECOMP is a trademark and service mark of AT&T Intellectual Property.
import pytest
import unittest
from miss_htbt_service.mod import trapd_settings as tds
-
pid_file="/tmp/test_pid_file"
pid_file_dne="/tmp/test_pid_file_NOT"
diff --git a/tests/test_trapd_vnf_table.py b/tests/test_trapd_vnf_table.py
index f38d4af..4f5cace 100644
--- a/tests/test_trapd_vnf_table.py
+++ b/tests/test_trapd_vnf_table.py
@@ -1,7 +1,7 @@
# ============LICENSE_START=======================================================
# org.onap.dcae
# ================================================================================
-# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved.
# Copyright (c) 2019 Pantheon.tech. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -17,8 +17,6 @@
# limitations under the License.
# ============LICENSE_END=========================================================
#
-# ECOMP is a trademark and service mark of AT&T Intellectual Property.
-#
## Author Prakask H (ph553f)
"""
test_trapd_vnf_table contains test cases related to DB Tables and cbs polling.