From 4b6b34642374103b8ccf938eb9e970232bfb63ae Mon Sep 17 00:00:00 2001 From: "Hansen, Tony (th1395)" Date: Wed, 17 Aug 2022 21:36:24 +0000 Subject: CodeCoverage improvement (60% to 90%) Change-Id: I579e9d3fedbd6cc2589d189b121fa7dadfb6f7f1 Signed-off-by: Hansen, Tony (th1395) Issue-ID: DCAEGEN2-3158 Signed-off-by: Hansen, Tony (th1395) --- snmptrap/snmptrapd.py | 40 +++++++++++++++++----------------------- 1 file changed, 17 insertions(+), 23 deletions(-) (limited to 'snmptrap/snmptrapd.py') diff --git a/snmptrap/snmptrapd.py b/snmptrap/snmptrapd.py index ddb6b32..d8d86e5 100644 --- a/snmptrap/snmptrapd.py +++ b/snmptrap/snmptrapd.py @@ -1,5 +1,5 @@ # ============LICENSE_START======================================================= -# Copyright (c) 2017-2021 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2017-2022 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -167,6 +167,7 @@ def load_all_configs(_signum, _frame): def resolve_ip(_loc_ip_addr_str): try: + if int(tds.dns_cache_ip_expires[_loc_ip_addr_str] < int(time.time())): raise Exception( "cache expired for %s at %d - updating value" @@ -515,20 +516,21 @@ def snmp_engine_observer_cb(snmp_engine, execpoint, variables, cbCtx): # tds.trap_dict["notify OID"] = "." + str(enterprise) + ".0." + str(specific_trap) # tds.trap_dict["notify OID len"] = tds.trap_dict["notify OID"].count('.') # tds.trap_dict["sysUptime"] = variables['pdu']['time-stamp'].prettyPrint() + + elif snmp_version == 2: + tds.trap_dict["protocol version"] = "v2c" + + elif snmp_version == 3: + tds.trap_dict["protocol version"] = "v3" + # tds.trap_dict["security level"] = str(variables['securityLevel']) + # tds.trap_dict["context name"] = str( + # variables['contextName'].prettyPrint()) + # tds.trap_dict["security name"] = str(variables['securityName']) + # tds.trap_dict["security engine"] = str( + # variables['contextEngineId'].prettyPrint()) + else: - if snmp_version == 2: - tds.trap_dict["protocol version"] = "v2c" - else: - if snmp_version == 3: - tds.trap_dict["protocol version"] = "v3" - # tds.trap_dict["security level"] = str(variables['securityLevel']) - # tds.trap_dict["context name"] = str( - # variables['contextName'].prettyPrint()) - # tds.trap_dict["security name"] = str(variables['securityName']) - # tds.trap_dict["security engine"] = str( - # variables['contextEngineId'].prettyPrint()) - else: - tds.trap_dict["protocol version"] = "unknown" + tds.trap_dict["protocol version"] = "unknown" # tds.trap_dict['time received'] = epoch_msecond tds.trap_dict["time received"] = epoch_second @@ -672,15 +674,6 @@ def notif_receiver_cb(snmp_engine, stateReference, contextEngineId, contextName, msg = "processing varbinds for %s" % (tds.trap_dict["uuid"]) ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, tds.CODE_GENERAL, msg) - # help(snmp_engine) - # print(snmp_engine) - # help(varBinds) - # print(varBinds) - # help(cbCtx) - # print(cbCtx) - # for key, val in cbCtx: - # print(key, val) - # FMDL update reset location when batching publishes pdu_varbind_count = 0 payload_varbinds = 0 @@ -735,6 +728,7 @@ def notif_receiver_cb(snmp_engine, stateReference, contextEngineId, contextName, if tds.first_trap: tds.all_traps_json_str = curr_trap_json_str tds.trap_uuids_in_buffer = tds.trap_dict["uuid"] + tds.first_trap = False else: tds.trap_uuids_in_buffer = tds.trap_uuids_in_buffer + ", " + tds.trap_dict["uuid"] -- cgit 1.2.3-korg