aboutsummaryrefslogtreecommitdiffstats
path: root/tests/test_htbtworker.py
blob: 4066a9f599186e5f3cc0cc166239e5a69701135d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
# ============LICENSE_START=======================================================
# Copyright (c) 2020-2023 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============LICENSE_END=========================================================


import htbtworker
import os
import tempfile
import json
import unittest
from unittest.mock import *
from _pytest.outcomes import skip


class Test_htbtworker(unittest.TestCase):
    def setUp(self):
        htbtworker.configjsonfile = (os.path.dirname(__file__)) + "/test-config.json"

    @patch("requests.get")
    @patch("htbtworker.check_process_reconfiguration", return_value=False)
    @patch("htbtworker.get_eventnamelist")
    @patch("htbtworker.sql_executor")
    def test_process_msg(self, mock1, mock2, mock3, sqlmock1):
        """
        Test to verify event processing using mock
        TBD - Negative test
        """

        status = True
        dmaap_data = [
            {
                "event": {
                    "commonEventHeader": {
                        "startEpochMicrosec": 1544608845841,
                        "sourceId": "VNFB_SRC5",
                        "eventId": "mvfs10",
                        "nfcNamingCode": "VNFB",
                        "timeZoneOffset": "UTC-05:30",
                        "reportingEntityId": "cc305d54-75b4-431b-adb2-eb6b9e541234",
                        "eventType": "platform",
                        "priority": "Normal",
                        "version": "4.0.2",
                        "reportingEntityName": "ibcx0001vm002oam001",
                        "sequence": 1000,
                        "domain": "heartbeat",
                        "lastEpochMicrosec": 1544608845841,
                        "eventName": "Heartbeat_vFW",
                        "vesEventListenerVersion": "7.0.2",
                        "sourceName": "SOURCE_NAME2",
                        "nfNamingCode": "VNFB",
                    },
                    "heartbeatFields": {"heartbeatInterval": 20, "heartbeatFieldsVersion": "3.0"},
                }
            }
        ]

        mock_resp = Mock()
        mock_resp.configure_mock(**{"text": json.dumps(dmaap_data)})
        mock3.return_value = [("Heartbeat_vDNS", "Heartbeat_vFW")]
        mock1.return_value = mock_resp

        filename = "test-config.json"
        htbtworker.process_msg(filename, number_of_iterations=1)
        self.assertEqual(status, True)

    def test_parse_event(self):
        """
        test_parse_event() opens the file test1.json and returns attributes
        """
        filename = (os.path.dirname(__file__)) + "/test1.json"
        with open(filename, "r") as fp:
            data = fp.read()
        srcname, lastepo, seqnum, event_name = htbtworker.parse_event(data)
        self.assertEqual(srcname, "SOURCE_NAME1")
        self.assertEqual(event_name, "Heartbeat_vDNS")

        filename = (os.path.dirname(__file__)) + "/test4.json"
        with open(filename, "r") as fp:
            data = fp.read()
        srcname, lastepo, seqnum, event_name = htbtworker.parse_event(data)
        self.assertEqual(srcname, "zalp1bmdns01cmd010")
        self.assertEqual(event_name, "Heartbeat_vDNS")

    @patch("htbtworker.sql_executor")
    def test_create_and_check_vnf2_table(self, mock_settings):
        """
        Test to verify existence of given table
        """
        mock_cursor = Mock()
        mock_cursor.configure_mock(**{"fetchone.return_value": [("vnf_table_2")]})
        mock_settings.return_value = mock_cursor
        status = htbtworker.check_and_create_vnf2_table()
        self.assertEqual(status, True)

        with patch("htbtworker.sql_executor", new=Mock(side_effect=htbtworker.psycopg2.DatabaseError())):
            status = htbtworker.check_and_create_vnf2_table()
            self.assertEqual(False, status)

    @patch("htbtworker.sql_executor")
    def test_new_vnf_entry(self, sql_mock):
        """
        Check to verify if new node entry is made for tracking HB
        """
        status = True
        htbtworker.new_vnf_entry("Heartbeat_vDNS", "TESTNODE", 1548313727714000, "TESTNODE", 1)
        self.assertEqual(status, True)

    @patch("htbtworker.sql_executor")
    def test_get_eventnamelist(self, sql_mock):
        """
        Test to verify eventname list is returned from vnf_table_1
        TBD - List comparison
        """
        eventname_list = [("Heartbeat_vDNS", "Heartbeat_vFW")]
        mock_cursor = Mock()
        mock_cursor.configure_mock(**{"fetchall.return_value": eventname_list})
        sql_mock.return_value = mock_cursor
        return_list = htbtworker.get_eventnamelist()
        self.assertIn("Heartbeat_vDNS", return_list)

    @patch("htbtworker.postgres_db_open")
    def test_sql_executor(self, db_mock):
        """
        Test sql executor wrapper method
        """
        htbtworker.sql_executor("SELECT * FROM information_schema.tables WHERE table_name = %s", "vnf_table_2")
        htbtworker.sql_executor("INSERT into information_schema.tables,")
        connection_db = db_mock
        with patch("htbtworker.postgres_db_open.commit", new=Mock(side_effect=htbtworker.psycopg2.DatabaseError())):
            flag = htbtworker.commit_and_close_db(connection_db)
            self.assertEqual(False, flag)

    @patch("psycopg2.connect")
    def test_postgres_db_open(self, mock):
        """
        Test wrapper for postgres db connection
        """
        conn = htbtworker.postgres_db_open()
        self.assertIsNotNone(conn)

    @patch("misshtbtd.read_hb_common")
    def test_check_process_reconfiguration(self, mock):
        """
        Test if DB is in reconfiguration state
        """
        mock.return_value = ("1234", "RUNNING", "XYZ", 1234)
        flag = htbtworker.check_process_reconfiguration("test", "test", "x.x.x.x", "1234", "test_db")
        self.assertEqual(False, flag)

    @patch("htbtworker.postgres_db_open")
    def test_commit_and_close_db(self, db_mock):
        """
        Test commit and close db
        """
        connection_db = db_mock
        flag = htbtworker.commit_and_close_db(connection_db)
        self.assertEqual(True, flag)
        with patch("htbtworker.postgres_db_open.commit", new=Mock(side_effect=htbtworker.psycopg2.DatabaseError())):
            flag = htbtworker.commit_and_close_db(connection_db)
            self.assertEqual(False, flag)


if __name__ == "__main__":  # pragma: no cover
    unittest.main()