From aeae5597c7dcfa4cf432009c4668dd77aa6b84a1 Mon Sep 17 00:00:00 2001 From: "Ladue, David (dl3158)" Date: Fri, 14 Sep 2018 12:36:11 -0400 Subject: v3 changes and netsnmp compat update Change-Id: Ia69f70c5e128e800623feb7c1473b6729e52ba29 Issue-ID: DCAEGEN2-630 Signed-off-by: Ladue, David (dl3158) --- snmptrap/mod/trapd_vb_types.py | 89 ++++++++++++++++++++++++++++++++++++++++++ snmptrap/snmptrapd.py | 3 +- tests/test_trapd_vb_types.py | 45 +++++++++++++++++++++ 3 files changed, 136 insertions(+), 1 deletion(-) create mode 100644 snmptrap/mod/trapd_vb_types.py create mode 100644 tests/test_trapd_vb_types.py diff --git a/snmptrap/mod/trapd_vb_types.py b/snmptrap/mod/trapd_vb_types.py new file mode 100644 index 0000000..2d01a30 --- /dev/null +++ b/snmptrap/mod/trapd_vb_types.py @@ -0,0 +1,89 @@ +# ============LICENSE_START======================================================= +# org.onap.dcae +# ================================================================================ +# Copyright (c) 2018 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. +# +""" +module for converting varbind types from Net-SNMP to PYSNMP + +- converts pysnmp vb type name to net-snmp for backward compatibility + +""" + +__docformat__ = 'restructuredtext' + +import json +import os +import sys +import string +import time +import traceback +import collections +import pprint + +import trapd_settings as tds +from trapd_exit import cleanup_and_exit +from trapd_io import stdout_logger, ecomp_logger + +prog_name = os.path.basename(__file__) + +# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # +# module: load_snmpv3_credentials +# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # + +_pysnmp_to_netsnmp_vb_type = { + 'Integer' : 'integer', + 'Gauge32' : 'unsigned', + 'Counter32' : 'counter32', + 'OctetString' : 'octet', + 'py_type_5' : 'hex', + 'py_type_6' : 'decimal', + 'Null' : 'null', + 'ObjectIdentifier' : 'oid', + 'TimeTicks' : 'timeticks', + 'IpAddress' : 'ipaddress', + 'Bits' : 'bits' + } + +default_vb_type = "octet" + +def pysnmp_to_netsnmp_varbind_convert (_pysnmp_vb_type): + """ + Convert pysnmp varbind types to Net-SNMP nomenclature + to maintain backward compatibilty with existing solutions + :Parameters: + _pysnmp_vb_type: varbind type as presented from pysnmp + API + :Exceptions: + if _pysnmp_vb_type isn't found in dictionary, return + type that was defined by pysnmp in original call + """ + + # lookup _pysnmp_vb_type in conversion dictionary + try: + msg = ("checking for netsnmp equiv of varbind type: %s" \ + % _pysnmp_vb_type) + ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, tds.CODE_GENERAL, msg) + _netsnmp_vb_type = _pysnmp_to_netsnmp_vb_type[_pysnmp_vb_type] + return _netsnmp_vb_type + except Exception as e: + # if not found, return original pysnmp type + msg = ("%s not configured as pysnmp varbind type" \ + % _pysnmp_vb_type) + ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, tds.CODE_GENERAL, msg) + return default_vb_type diff --git a/snmptrap/snmptrapd.py b/snmptrap/snmptrapd.py index 70dc668..325aa55 100644 --- a/snmptrap/snmptrapd.py +++ b/snmptrap/snmptrapd.py @@ -74,6 +74,7 @@ from trapd_get_cbs_config import get_cbs_config from trapd_exit import cleanup_and_exit from trapd_http_session import init_session_obj, close_session_obj, reset_session_obj from trapd_snmpv3 import load_snmpv3_credentials +from trapd_vb_types import pysnmp_to_netsnmp_varbind_convert from trapd_io import roll_all_logs, open_eelf_logs, roll_file, open_file, close_file, ecomp_logger, stdout_logger prog_name = os.path.basename(__file__) @@ -531,7 +532,7 @@ def add_varbind_to_json(vb_idx, vb_oid, vb_type, vb_val): _individual_vb_dict.clear() _individual_vb_dict['varbind_oid'] = vb_oid.prettyPrint() - _individual_vb_dict['varbind_type'] = vb_type + _individual_vb_dict['varbind_type'] = pysnmp_to_netsnmp_varbind_convert(vb_type) _individual_vb_dict['varbind_value'] = vb_val.prettyPrint() _individual_vb_json_str = json.dumps(_individual_vb_dict) diff --git a/tests/test_trapd_vb_types.py b/tests/test_trapd_vb_types.py new file mode 100644 index 0000000..4af9235 --- /dev/null +++ b/tests/test_trapd_vb_types.py @@ -0,0 +1,45 @@ +import pytest +import json +import unittest +import os + +from onap_dcae_cbs_docker_client.client import get_config +from trapd_exit import cleanup_and_exit +from trapd_io import stdout_logger, ecomp_logger +import trapd_settings as tds +import trapd_snmpv3 + +from pysnmp.entity import engine, config + +class test_trapd_vb_types(unittest.TestCase): + """ + Test snmpv3 module + """ + + def trapd_vb_type_conversions(self): + """ + Test that pysnmp varbind types convert to netsnmp + """ + # del os.environ['CBS_SIM_JSON'] + # result = trapd_get_cbs_config.get_cbs_config() + # print("result: %s" % result) + # compare = str(result).startswith("{'snmptrap': ") + # self.assertEqual(compare, False) + + tds.c_config = json.loads("{ \"snmptrapd\": { \"version\": \"1.4.0\", \"title\": \"ONAP SNMP Trap Receiver\" }, \"protocols\": { \"transport\": \"udp\", \"ipv4_interface\": \"0.0.0.0\", \"ipv4_port\": 6162, \"ipv6_interface\": \"::1\", \"ipv6_port\": 6162 }, \"cache\": { \"dns_cache_ttl_seconds\": 60 }, \"publisher\": { \"http_timeout_milliseconds\": 1500, \"http_retries\": 3, \"http_milliseconds_between_retries\": 750, \"http_primary_publisher\": \"true\", \"http_peer_publisher\": \"unavailable\", \"max_traps_between_publishes\": 10, \"max_milliseconds_between_publishes\": 10000 }, \"streams_publishes\": { \"sec_fault_unsecure\": { \"type\": \"message_router\", \"aaf_password\": null, \"dmaap_info\": { \"location\": \"mtl5\", \"client_id\": null, \"client_role\": null, \"topic_url\": \"http://localhost:3904/events/ONAP-COLLECTOR-SNMPTRAP\" }, \"aaf_username\": null } }, \"files\": { \"runtime_base_dir\": \"/tmp/opt/app/snmptrap\", \"log_dir\": \"logs\", \"data_dir\": \"data\", \"pid_dir\": \"tmp\", \"arriving_traps_log\": \"snmptrapd_arriving_traps.log\", \"snmptrapd_diag\": \"snmptrapd_prog_diag.log\", \"traps_stats_log\": \"snmptrapd_stats.csv\", \"perm_status_file\": \"snmptrapd_status.log\", \"eelf_base_dir\": \"/tmp/opt/app/snmptrap/logs\", \"eelf_error\": \"error.log\", \"eelf_debug\": \"debug.log\", \"eelf_audit\": \"audit.log\", \"eelf_metrics\": \"metrics.log\", \"roll_frequency\": \"day\", \"minimum_severity_to_log\": 2 }, \"trap_config\": { \"sw_interval_in_seconds\": 60, \"notify_oids\": { \".1.3.6.1.4.1.9.0.1\": { \"sw_high_water_in_interval\": 102, \"sw_low_water_in_interval\": 7, \"category\": \"logonly\" }, \".1.3.6.1.4.1.9.0.2\": { \"sw_high_water_in_interval\": 101, \"sw_low_water_in_interval\": 7, \"category\": \"logonly\" }, \".1.3.6.1.4.1.9.0.3\": { \"sw_high_water_in_interval\": 102, \"sw_low_water_in_interval\": 7, \"category\": \"logonly\" }, \".1.3.6.1.4.1.9.0.4\": { \"sw_high_water_in_interval\": 10, \"sw_low_water_in_interval\": 3, \"category\": \"logonly\" } } } }") + + # del os.environ['CBS_SIM_JSON'] + # result = trapd_get_cbs_config.get_cbs_config() + # print("result: %s" % result) + # compare = str(result).startswith("{'snmptrap': ") + # self.assertEqual(compare, False) + + good_varbind_types = ["Integer", "Unsigned32", "Counter32", "OctetString", "ObjectIdentifier", "TimeTicks", "IpAddress"] + result = pysnmp_to_netsnmp_varbind_convert("Integer") + + assert pytest_wrapped_sys_exit.type == SystemExit + self.assertEqual(result, "integer") + + +if __name__ == '__main__': + unittest.main() -- cgit 1.2.3-korg