aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorLadue, David (dl3158) <dl3158@att.com>2018-09-14 12:36:11 -0400
committerVijay Venkatesh Kumar <vv770d@att.com>2018-09-14 22:45:39 +0000
commitaeae5597c7dcfa4cf432009c4668dd77aa6b84a1 (patch)
tree649ae039cf0f6c51539d538fd78ac881202a86cc
parent09b8dc81f96a1eecca0c3908a02705218a80354d (diff)
v3 changes and netsnmp compat update
Change-Id: Ia69f70c5e128e800623feb7c1473b6729e52ba29 Issue-ID: DCAEGEN2-630 Signed-off-by: Ladue, David (dl3158) <dl3158@att.com>
-rw-r--r--snmptrap/mod/trapd_vb_types.py89
-rw-r--r--snmptrap/snmptrapd.py3
-rw-r--r--tests/test_trapd_vb_types.py45
3 files changed, 136 insertions, 1 deletions
diff --git a/snmptrap/mod/trapd_vb_types.py b/snmptrap/mod/trapd_vb_types.py
new file mode 100644
index 0000000..2d01a30
--- /dev/null
+++ b/snmptrap/mod/trapd_vb_types.py
@@ -0,0 +1,89 @@
+# ============LICENSE_START=======================================================
+# org.onap.dcae
+# ================================================================================
+# Copyright (c) 2018 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+#
+"""
+module for converting varbind types from Net-SNMP to PYSNMP
+
+- converts pysnmp vb type name to net-snmp for backward compatibility
+
+"""
+
+__docformat__ = 'restructuredtext'
+
+import json
+import os
+import sys
+import string
+import time
+import traceback
+import collections
+import pprint
+
+import trapd_settings as tds
+from trapd_exit import cleanup_and_exit
+from trapd_io import stdout_logger, ecomp_logger
+
+prog_name = os.path.basename(__file__)
+
+# # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
+# module: load_snmpv3_credentials
+# # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
+
+_pysnmp_to_netsnmp_vb_type = {
+ 'Integer' : 'integer',
+ 'Gauge32' : 'unsigned',
+ 'Counter32' : 'counter32',
+ 'OctetString' : 'octet',
+ 'py_type_5' : 'hex',
+ 'py_type_6' : 'decimal',
+ 'Null' : 'null',
+ 'ObjectIdentifier' : 'oid',
+ 'TimeTicks' : 'timeticks',
+ 'IpAddress' : 'ipaddress',
+ 'Bits' : 'bits'
+ }
+
+default_vb_type = "octet"
+
+def pysnmp_to_netsnmp_varbind_convert (_pysnmp_vb_type):
+ """
+ Convert pysnmp varbind types to Net-SNMP nomenclature
+ to maintain backward compatibilty with existing solutions
+ :Parameters:
+ _pysnmp_vb_type: varbind type as presented from pysnmp
+ API
+ :Exceptions:
+ if _pysnmp_vb_type isn't found in dictionary, return
+ type that was defined by pysnmp in original call
+ """
+
+ # lookup _pysnmp_vb_type in conversion dictionary
+ try:
+ msg = ("checking for netsnmp equiv of varbind type: %s" \
+ % _pysnmp_vb_type)
+ ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, tds.CODE_GENERAL, msg)
+ _netsnmp_vb_type = _pysnmp_to_netsnmp_vb_type[_pysnmp_vb_type]
+ return _netsnmp_vb_type
+ except Exception as e:
+ # if not found, return original pysnmp type
+ msg = ("%s not configured as pysnmp varbind type" \
+ % _pysnmp_vb_type)
+ ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, tds.CODE_GENERAL, msg)
+ return default_vb_type
diff --git a/snmptrap/snmptrapd.py b/snmptrap/snmptrapd.py
index 70dc668..325aa55 100644
--- a/snmptrap/snmptrapd.py
+++ b/snmptrap/snmptrapd.py
@@ -74,6 +74,7 @@ from trapd_get_cbs_config import get_cbs_config
from trapd_exit import cleanup_and_exit
from trapd_http_session import init_session_obj, close_session_obj, reset_session_obj
from trapd_snmpv3 import load_snmpv3_credentials
+from trapd_vb_types import pysnmp_to_netsnmp_varbind_convert
from trapd_io import roll_all_logs, open_eelf_logs, roll_file, open_file, close_file, ecomp_logger, stdout_logger
prog_name = os.path.basename(__file__)
@@ -531,7 +532,7 @@ def add_varbind_to_json(vb_idx, vb_oid, vb_type, vb_val):
_individual_vb_dict.clear()
_individual_vb_dict['varbind_oid'] = vb_oid.prettyPrint()
- _individual_vb_dict['varbind_type'] = vb_type
+ _individual_vb_dict['varbind_type'] = pysnmp_to_netsnmp_varbind_convert(vb_type)
_individual_vb_dict['varbind_value'] = vb_val.prettyPrint()
_individual_vb_json_str = json.dumps(_individual_vb_dict)
diff --git a/tests/test_trapd_vb_types.py b/tests/test_trapd_vb_types.py
new file mode 100644
index 0000000..4af9235
--- /dev/null
+++ b/tests/test_trapd_vb_types.py
@@ -0,0 +1,45 @@
+import pytest
+import json
+import unittest
+import os
+
+from onap_dcae_cbs_docker_client.client import get_config
+from trapd_exit import cleanup_and_exit
+from trapd_io import stdout_logger, ecomp_logger
+import trapd_settings as tds
+import trapd_snmpv3
+
+from pysnmp.entity import engine, config
+
+class test_trapd_vb_types(unittest.TestCase):
+ """
+ Test snmpv3 module
+ """
+
+ def trapd_vb_type_conversions(self):
+ """
+ Test that pysnmp varbind types convert to netsnmp
+ """
+ # del os.environ['CBS_SIM_JSON']
+ # result = trapd_get_cbs_config.get_cbs_config()
+ # print("result: %s" % result)
+ # compare = str(result).startswith("{'snmptrap': ")
+ # self.assertEqual(compare, False)
+
+ tds.c_config = json.loads("{ \"snmptrapd\": { \"version\": \"1.4.0\", \"title\": \"ONAP SNMP Trap Receiver\" }, \"protocols\": { \"transport\": \"udp\", \"ipv4_interface\": \"0.0.0.0\", \"ipv4_port\": 6162, \"ipv6_interface\": \"::1\", \"ipv6_port\": 6162 }, \"cache\": { \"dns_cache_ttl_seconds\": 60 }, \"publisher\": { \"http_timeout_milliseconds\": 1500, \"http_retries\": 3, \"http_milliseconds_between_retries\": 750, \"http_primary_publisher\": \"true\", \"http_peer_publisher\": \"unavailable\", \"max_traps_between_publishes\": 10, \"max_milliseconds_between_publishes\": 10000 }, \"streams_publishes\": { \"sec_fault_unsecure\": { \"type\": \"message_router\", \"aaf_password\": null, \"dmaap_info\": { \"location\": \"mtl5\", \"client_id\": null, \"client_role\": null, \"topic_url\": \"http://localhost:3904/events/ONAP-COLLECTOR-SNMPTRAP\" }, \"aaf_username\": null } }, \"files\": { \"runtime_base_dir\": \"/tmp/opt/app/snmptrap\", \"log_dir\": \"logs\", \"data_dir\": \"data\", \"pid_dir\": \"tmp\", \"arriving_traps_log\": \"snmptrapd_arriving_traps.log\", \"snmptrapd_diag\": \"snmptrapd_prog_diag.log\", \"traps_stats_log\": \"snmptrapd_stats.csv\", \"perm_status_file\": \"snmptrapd_status.log\", \"eelf_base_dir\": \"/tmp/opt/app/snmptrap/logs\", \"eelf_error\": \"error.log\", \"eelf_debug\": \"debug.log\", \"eelf_audit\": \"audit.log\", \"eelf_metrics\": \"metrics.log\", \"roll_frequency\": \"day\", \"minimum_severity_to_log\": 2 }, \"trap_config\": { \"sw_interval_in_seconds\": 60, \"notify_oids\": { \".1.3.6.1.4.1.9.0.1\": { \"sw_high_water_in_interval\": 102, \"sw_low_water_in_interval\": 7, \"category\": \"logonly\" }, \".1.3.6.1.4.1.9.0.2\": { \"sw_high_water_in_interval\": 101, \"sw_low_water_in_interval\": 7, \"category\": \"logonly\" }, \".1.3.6.1.4.1.9.0.3\": { \"sw_high_water_in_interval\": 102, \"sw_low_water_in_interval\": 7, \"category\": \"logonly\" }, \".1.3.6.1.4.1.9.0.4\": { \"sw_high_water_in_interval\": 10, \"sw_low_water_in_interval\": 3, \"category\": \"logonly\" } } } }")
+
+ # del os.environ['CBS_SIM_JSON']
+ # result = trapd_get_cbs_config.get_cbs_config()
+ # print("result: %s" % result)
+ # compare = str(result).startswith("{'snmptrap': ")
+ # self.assertEqual(compare, False)
+
+ good_varbind_types = ["Integer", "Unsigned32", "Counter32", "OctetString", "ObjectIdentifier", "TimeTicks", "IpAddress"]
+ result = pysnmp_to_netsnmp_varbind_convert("Integer")
+
+ assert pytest_wrapped_sys_exit.type == SystemExit
+ self.assertEqual(result, "integer")
+
+
+if __name__ == '__main__':
+ unittest.main()