From 9c9a86fe301431ca353f8c249164a854db9399e0 Mon Sep 17 00:00:00 2001 From: "Ladue, David (dl3158)" Date: Fri, 14 Feb 2020 12:02:30 -0500 Subject: copyright banner changes Change-Id: I64f089e36fb8b21ed1719696ceeded1590ba8f8c Signed-off-by: Ladue, David (dl3158) Issue-ID: DCAEGEN2-2068 Signed-off-by: Ladue, David (dl3158) --- snmptrap/snmptrapd.py | 405 +++++++++++++++++++++++++++++++------------------- 1 file changed, 253 insertions(+), 152 deletions(-) (limited to 'snmptrap/snmptrapd.py') diff --git a/snmptrap/snmptrapd.py b/snmptrap/snmptrapd.py index 8a824ef..5f46f34 100644 --- a/snmptrap/snmptrapd.py +++ b/snmptrap/snmptrapd.py @@ -1,7 +1,5 @@ -# ============LICENSE_START=======================================================) -# org.onap.dcae -# ================================================================================ -# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved. +# ============LICENSE_START======================================================= +# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -15,11 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. # ============LICENSE_END========================================================= -# -# ECOMP is a trademark and service mark of AT&T Intellectual Property. -# """ -dcae_snmptrapd is responsible for SNMP trap receipt and publishing activities. +snmptrapd is responsible for SNMP trap receipt and publishing activities. It's behavior is controlled by CBS (config binding service) using a JSON construct obtained via a "get_config" call or (for testing/standalone purposes) a file specified using the env variable "CBS_SIM_JSON". @@ -28,7 +23,7 @@ As traps arrive they are decomposed and transformed into a JSON message which is published to a dmaap instance that has been defined by controller. :Parameters: - usage: snmptrapd.py + usage: snmptrapd.py [-v] :Keywords: onap dcae snmp trap publish dmaap """ @@ -67,15 +62,23 @@ from pysnmp.entity.rfc3413 import ntfrcv from pysnmp.proto.api import v2c from pysnmp import debug -# dcae_snmptrap +# snmptrap import trapd_settings as tds + from trapd_runtime_pid import save_pid, rm_pid from trapd_get_cbs_config import get_cbs_config from trapd_exit import cleanup_and_exit -from trapd_http_session import init_session_obj, close_session_obj, reset_session_obj +from trapd_http_session import init_session_obj, close_session_obj,\ + reset_session_obj from trapd_snmpv3 import load_snmpv3_credentials from trapd_vb_types import pysnmp_to_netsnmp_varbind_convert -from trapd_io import roll_all_logs, open_eelf_logs, roll_file, open_file, close_file, ecomp_logger, stdout_logger +from trapd_io import roll_all_logs, open_eelf_logs, roll_file, open_file,\ + close_file, ecomp_logger, stdout_logger + +import trapd_stormwatch_settings as sws +import trapd_stormwatch as stormwatch + +import trapd_stats_settings as stats prog_name = os.path.basename(__file__) verbose = False @@ -97,7 +100,7 @@ def usage_err(): """ print('Incorrect usage invoked. Correct usage:') - print(' %s' % prog_name) + print(' %s [-v]' % prog_name) cleanup_and_exit(1, "undefined") @@ -126,38 +129,83 @@ def load_all_configs(_signum, _frame): if int(_signum) != 0: msg = ("received signal %s at frame %s; re-reading configs" % (_signum, _frame)) - ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_DETAILED, + ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, tds.CODE_GENERAL, msg) - # re-request config (from broker, or local json file - # if broker not present) + # re-request config from broker: if not get_cbs_config(): msg = "Error (re)loading CBS config - FATAL ERROR, exiting" stdout_logger(msg) cleanup_and_exit(1, tds.pid_file_name) else: - current_runtime_config_file_name = tds.c_config['files']['runtime_base_dir'] + \ - "/tmp/current_config.json" + current_runtime_config_file_name = ( + tds.c_config['files']['runtime_base_dir'] + + "/tmp/current_config.json") if int(_signum) != 0: - msg = "updated config logged to : %s" % current_runtime_config_file_name + msg = "updated config logged to : %s" % \ + current_runtime_config_file_name else: - msg = "current config logged to : %s" % current_runtime_config_file_name - ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, tds.CODE_GENERAL, msg) + msg = "current config logged to : %s" % \ + current_runtime_config_file_name + ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, tds.CODE_GENERAL, + msg) - with open(current_runtime_config_file_name, 'w') as outfile: - json.dump(tds.c_config, outfile) + with open(current_runtime_config_file_name, 'w') as outfile: + json.dump(tds.c_config, outfile) # reset http session based on latest config tds.http_requ_session = reset_session_obj(tds.http_requ_session) - # FMDL: add with stormWatch # reload sw participating entries, reset counter dictionary - # sw.interval_in_seconds, sw.participant_oid_dict = load_sw_participant_dict(tds.c_config['trap_config']) - # sw.counter_dict = init_counter_dict() + traps_configured = stormwatch.sw_load_trap_config(tds.c_config) + msg = "encountered %d trap configurations in CBS/json config" % \ + traps_configured + ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, tds.CODE_GENERAL, + msg) - # if here, config re-read successfully + tds.last_minute = datetime.datetime.now().minute + tds.last_hour = datetime.datetime.now().hour + tds.last_day = datetime.datetime.now().day + + # if here, configs re-read successfully return True + +# # # # # # # # # # # # # +# fx: resolve_ip +# # # # # # # # # # # # # +def resolve_ip(_loc_ip_addr_str): + + try: + if int(tds.dns_cache_ip_expires[_loc_ip_addr_str] < int(time.time())): + raise Exception('cache expired for %s at %d - updating value' % + (_loc_ip_addr_str, + (tds.dns_cache_ip_expires[_loc_ip_addr_str]))) + else: + agent_fqdn = tds.dns_cache_ip_to_name[_loc_ip_addr_str] + + except Exception as e: + msg = "dns cache expired or missing for %s - refreshing" % \ + _loc_ip_addr_str + ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, + tds.CODE_GENERAL, msg) + try: + agent_fqdn, alias, addresslist = socket.gethostbyaddr( + _loc_ip_addr_str) + except Exception as e: + agent_fqdn = _loc_ip_addr_str + + tds.dns_cache_ip_to_name[_loc_ip_addr_str] = agent_fqdn + tds.dns_cache_ip_expires[_loc_ip_addr_str] = ( + time.time() + int(tds.c_config['cache']['dns_cache_ttl_seconds'])) + msg = "cache for %s (%s) updated - set to expire at %d" % \ + (agent_fqdn, _loc_ip_addr_str, + tds.dns_cache_ip_expires[_loc_ip_addr_str]) + ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, + tds.CODE_GENERAL, msg) + + return agent_fqdn + # # # # # # # # # # # # # # fx: log_all_arriving_traps # # # # # # # # # # # # # @@ -185,29 +233,29 @@ def log_all_arriving_traps(): # always log arriving trap try: - # going for: - # 1520971776 Tue Mar 13 16:09:36 2018; 1520971776 2018-03-13 16:09:36 DCAE-COLLECTOR-UCSNMP 15209717760049 .1.3.6.1.4.1.2636.4.1.6 gfpmt5pcs10.oss.att.com 135.91.10.139 12.123.1.240 12.123.1.240 2 varbinds: [0] .1.3.6.1.2.1.1.3.0 {10} 1212058366 140 days, 6:49:43.66 [1] .1.3.6.1.6.3.1.1.4.1.0 {6} .1.3.6.1.4.1.2636.4.1.6 [2] .1.3.6.1.4.1.2636.3.1.15.1.1.2.4.0.0 {2} 2 [3] .1.3.6.1.4.1.2636.3.1.15.1.2.2.4.0.0 {2} 4 [4] .1.3.6.1.4.1.2636.3.1.15.1.3.2.4.0.0 {2} 0 [5] .1.3.6.1.4.1.2636.3.1.15.1.4.2.4.0.0 {2} 0 [6] .1.3.6.1.4.1.2636.3.1.15.1.5.2.4.0.0 {4} PEM 3 [7] .1.3.6.1.4.1.2636.3.1.15.1.6.2.4.0.0 {2} 7 [8] .1.3.6.1.4.1.2636.3.1.15.1.7.2.4.0.0 {2} 4 [9] .1.3.6.1.6.3.18.1.3.0 {7} 12.123.1.240 - - tds.arriving_traps_fd.write('%s %s; %s %s %s %s %s %s %s %s %s %s\n' % - (tds.trap_dict["time received"], - time.strftime( - "%a %b %d %H:%M:%S %Y", time.localtime(time.time())), - time.strftime("%a %b %d %H:%M:%S %Y", time.localtime( - tds.trap_dict["time received"])), - tds.trap_dict["trap category"], - tds.trap_dict["epoch_serno"], - tds.trap_dict["notify OID"], - tds.trap_dict["agent name"], - tds.trap_dict["agent address"], - tds.trap_dict["cambria.partition"], - tds.trap_dict["protocol version"], - tds.trap_dict["uuid"], - tds.all_vb_json_str)) + time_now=int(round(time.time(),0)) + arrived_epoch_time_int=int(tds.trap_dict["time received"]) + tds.arriving_traps_fd.write('%d %s; %s %s %s %s %s %s %s %s %s %s %s %s %s\n' % + (time_now, + datetime.datetime.fromtimestamp(time_now).strftime("%a %b %d %H:%M:%S %Y"), + tds.trap_dict["time received"], + datetime.datetime.fromtimestamp(arrived_epoch_time_int).strftime("%a %b %d %H:%M:%S %Y"), + tds.trap_dict["trap category"], + tds.trap_dict["epoch_serno"], + tds.trap_dict["notify OID"], + tds.trap_dict["agent name"], + tds.trap_dict["agent address"], + tds.trap_dict["pdu agent name"], + tds.trap_dict["pdu agent address"], + tds.trap_dict["cambria.partition"], + tds.trap_dict["protocol version"], + tds.trap_dict["uuid"], + tds.all_vb_str)) except Exception as e: msg = "Error writing to %s : %s - arriving trap %s NOT LOGGED" % ( tds.arriving_traps_filename, str(e), tds.trap_dict["uuid"]) - ecomp_logger(tds.LOG_TYPE_ERROR, tds.SEV_CRIT, tds.CODE_GENERAL, msg) + ecomp_logger(tds.LOG_TYPE_ERROR, tds.SEV_ERROR, tds.CODE_GENERAL, msg) # # # # # # # # # # # # # @@ -227,12 +275,12 @@ def log_published_messages(_post_data_enclosed): tds.json_traps_fd.write('%s\n' % _post_data_enclosed) msg = "successfully logged json for %s to %s" % ( tds.trap_dict["uuid"], tds.json_traps_filename) - ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_DETAILED, + ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, tds.CODE_GENERAL, msg) except Exception as e: msg = "Error writing to %s : %s - trap %s NOT LOGGED" % ( tds.json_traps_filename, str(e), tds.trap_dict["uuid"]) - ecomp_logger(tds.LOG_TYPE_ERROR, tds.SEV_CRIT, tds.CODE_GENERAL, msg) + ecomp_logger(tds.LOG_TYPE_ERROR, tds.SEV_ERROR, tds.CODE_GENERAL, msg) # # # # # # # # # # # # # @@ -255,19 +303,19 @@ def post_dmaap(): if tds.http_requ_session is None: msg = "tds.http_requ_session is None - getting new (%s)" % tds.http_requ_session - ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_DETAILED, + ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, tds.CODE_GENERAL, msg) tds.http_requ_session = init_session_obj() # if only 1 trap, ship as-is if tds.traps_since_last_publish == 1: - post_data_enclosed = tds.all_traps_str + post_data_enclosed = tds.all_traps_json_str else: # otherwise, add brackets around package - post_data_enclosed = '[' + tds.all_traps_str + ']' + post_data_enclosed = '[' + tds.all_traps_json_str + ']' msg = "post_data_enclosed: %s" % (post_data_enclosed) - ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_DETAILED, tds.CODE_GENERAL, msg) + ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, tds.CODE_GENERAL, msg) k = 0 dmaap_pub_success = False @@ -277,7 +325,7 @@ def post_dmaap(): if tds.c_config['streams_publishes']['sec_fault_unsecure']['aaf_username'] == "" or tds.c_config['streams_publishes']['sec_fault_unsecure']['aaf_username'] is None: msg = "%d trap(s) : %s - attempt %d (unsecure)" % ( tds.traps_since_last_publish, tds.trap_uuids_in_buffer, k) - ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_DETAILED, + ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, tds.CODE_GENERAL, msg) http_resp = tds.http_requ_session.post(tds.c_config['streams_publishes']['sec_fault_unsecure']['dmaap_info']['topic_url'], post_data_enclosed, headers=http_headers, @@ -285,7 +333,7 @@ def post_dmaap(): else: msg = "%d trap(s) : %s - attempt %d (secure)" % ( tds.traps_since_last_publish, tds.trap_uuids_in_buffer, k) - ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_DETAILED, + ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, tds.CODE_GENERAL, msg) http_resp = tds.http_requ_session.post(tds.c_config['streams_publishes']['sec_fault_unsecure']['dmaap_info']['topic_url'], post_data_enclosed, auth=(tds.c_config['streams_publishes']['sec_fault_unsecure']['aaf_username'], @@ -309,7 +357,7 @@ def post_dmaap(): tds.CODE_GENERAL, msg) except OSError as e: - msg = "OS exception while attempting to post %s attempt %s: (%s) %s %s" % ( + msg = "OS exception while attempting to post %s attempt %d: (%s) %s %s" % ( tds.trap_uuids_in_buffer, k, e.errno, e.strerror, str(e)) ecomp_logger(tds.LOG_TYPE_ERROR, tds.SEV_WARN, tds.CODE_GENERAL, msg) @@ -322,10 +370,10 @@ def post_dmaap(): k += 1 - if k < tds.c_config['publisher']['http_retries']: + if k < int(tds.c_config['publisher']['http_retries']): msg = "sleeping %.4f seconds and retrying" % ( tds.seconds_between_retries) - ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_DETAILED, + ecomp_logger(tds.LOG_TYPE_ERROR, tds.SEV_WARN, tds.CODE_GENERAL, msg) time.sleep(tds.seconds_between_retries) else: @@ -334,12 +382,12 @@ def post_dmaap(): if not dmaap_pub_success: msg = "ALL publish attempts failed for traps %s to URL %s "\ % (tds.trap_uuids_in_buffer, tds.c_config['streams_publishes']['sec_fault_unsecure']['dmaap_info']['topic_url']) - ecomp_logger(tds.LOG_TYPE_ERROR, tds.SEV_CRIT, tds.CODE_GENERAL, msg) + ecomp_logger(tds.LOG_TYPE_ERROR, tds.SEV_ERROR, tds.CODE_GENERAL, msg) # FMDL: This currently tries, then logs error and trashes buffer if all dmaap attempts fail. Better way? tds.traps_since_last_publish = 0 tds.trap_uuids_in_buffer = "" - tds.all_traps_str = "" + tds.all_traps_json_str = "" tds.first_trap = True # # # # # # # # # # # # # # # # # # # @@ -410,6 +458,7 @@ def snmp_engine_observer_cb(snmp_engine, execpoint, variables, cbCtx): tds.traps_in_epoch = 0 tds.last_epoch_second = epoch_second traps_in_epoch_04d = format(tds.traps_in_epoch, '04d') + tds.trap_dict['epoch_arrived'] = epoch_second tds.trap_dict['epoch_serno'] = int( (str(epoch_second) + str(traps_in_epoch_04d))) @@ -418,36 +467,22 @@ def snmp_engine_observer_cb(snmp_engine, execpoint, variables, cbCtx): # ip and hostname ip_addr_str = str(variables['transportAddress'][0]) + # set agent address and name to source of packet, OVERWRITE if + # .1.3.6.1.6.3.18.1.3.0 varbind encountered later in trap processing tds.trap_dict["agent address"] = ip_addr_str + tds.trap_dict["agent name"] = resolve_ip(ip_addr_str) + # set overridden/logical address and name to source of packet so we know + # original value if .1.3.6.1.6.3.18.1.3.0 shows up + # NOTE: This does NOT change ever, label may change to + # "overridden agent..." in the future for truth in nameing + tds.trap_dict["pdu agent address"] = tds.trap_dict["agent address"] + tds.trap_dict["pdu agent name"] = tds.trap_dict["agent name"] + + # log arrival now that we have agent addr + msg = 'trap from %s %s, assigned uuid: %s' % \ + (ip_addr_str, tds.trap_dict["agent name"], tds.trap_dict["uuid"]) + ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, tds.CODE_GENERAL, msg) - msg = 'snmp trap arrived from %s, assigned uuid: %s' % \ - (ip_addr_str, tds.trap_dict["uuid"]) - ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_DETAILED, tds.CODE_GENERAL, msg) - - try: - if int(tds.dns_cache_ip_expires[ip_addr_str] < int(time.time())): - raise Exception('cache expired for %s at %d - updating value' % - (ip_addr_str, (tds.dns_cache_ip_expires[ip_addr_str]))) - else: - tds.trap_dict["agent name"] = tds.dns_cache_ip_to_name[ip_addr_str] - except Exception as e: - msg = "dns cache expired or missing for %s - refreshing" % ip_addr_str - ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_DETAILED, - tds.CODE_GENERAL, msg) - try: - agent_fqdn, alias, addresslist = socket.gethostbyaddr(ip_addr_str) - except Exception as e: - agent_fqdn = ip_addr_str - - tds.trap_dict["agent name"] = agent_fqdn - - tds.dns_cache_ip_to_name[ip_addr_str] = agent_fqdn - tds.dns_cache_ip_expires[ip_addr_str] = ( - time.time() + tds.c_config['cache']['dns_cache_ttl_seconds']) - msg = "cache for %s (%s) updated - set to expire at %d" % \ - (agent_fqdn, ip_addr_str, tds.dns_cache_ip_expires[ip_addr_str]) - ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_DETAILED, - tds.CODE_GENERAL, msg) tds.trap_dict["cambria.partition"] = str(tds.trap_dict["agent name"]) # do not include cleartext community in pub @@ -491,6 +526,42 @@ def snmp_engine_observer_cb(snmp_engine, execpoint, variables, cbCtx): # # # # # # # # # # # # # # # # # # # # fx: request_observer for community string rewrite # # # # # # # # # # # # # # # # # # # + +def add_varbind_to_log_string(vb_idx, vb_oid, vb_type, vb_val): + """ + Called for each varbind, adds individual attributes of varbind instance to + all_vb_str for logging. + :Parameters: + vb_idx + index to specific varbind being processed + vb_oid + the varbind oid + vb_type + the varbind type + vb_val + the value of the varbind + :Exceptions: + none + :Keywords: + varbind extract log + :Variables: + """ + + if vb_idx == 0: + tds.all_vb_str = 'varbinds:' + + tds.all_vb_str = tds.all_vb_str + " [" + str(vb_idx) + "] " \ + + str(vb_oid) + " {" + vb_type + "} " + str(vb_val.prettyPrint()) + + # try: + # tds.all_vb_str = tds.all_vb_str + " [" + str(vb_idx) + "] " + vb_oid + " {" + vb_type + "} " + vb_val + # return 0 + # except Exception as e: + # msg = "unable to add varbind to log string: %s" % (str(e)) + # stdout_logger(msg) + # ecomp_logger(tds.LOG_TYPE_ERROR, tds.SEV_WARN, tds.CODE_GENERAL, msg) + # return 1 + def add_varbind_to_json(vb_idx, vb_oid, vb_type, vb_val): """ Called for each varbind, adds individual attributes of varbind instance to @@ -509,6 +580,7 @@ def add_varbind_to_json(vb_idx, vb_oid, vb_type, vb_val): :Variables: """ + agent_override_oid = ".1.3.6.1.6.3.18.1.3.0" _individual_vb_dict = {} # if first varbind (sysUptime), always return immediately as @@ -516,10 +588,21 @@ def add_varbind_to_json(vb_idx, vb_oid, vb_type, vb_val): if vb_idx == 0: return 0 + _vb_oid = "." + str(vb_oid.prettyPrint()) + _vb_value = vb_val.prettyPrint() + _vb_type = pysnmp_to_netsnmp_varbind_convert(vb_type) + # if second varbind, use as notifyOID for all snmp versions if vb_idx == 1: - tds.trap_dict["notify OID"] = "." + str(vb_val.prettyPrint()) - tds.trap_dict["notify OID len"] = tds.trap_dict["notify OID"].count('.') + tds.trap_dict["notify OID"] = "." + _vb_value + tds.trap_dict["notify OID len"] = tds.trap_dict["notify OID"].count( + '.') + return 0 + + # if override varbind OID, use value as agent address + if _vb_oid == agent_override_oid: + tds.trap_dict["agent address"] = _vb_value + tds.trap_dict["agent name"] = resolve_ip(_vb_value) return 0 # for SNMPv1 traps, skip varbinds 2, 3 and 4: @@ -529,7 +612,7 @@ def add_varbind_to_json(vb_idx, vb_oid, vb_type, vb_val): if tds.trap_dict["protocol version"] == "v1": if vb_idx < 5: return 0 - + if tds.first_varbind: tds.all_vb_json_str = ', \"varbinds\": [' tds.first_varbind = False @@ -537,9 +620,9 @@ def add_varbind_to_json(vb_idx, vb_oid, vb_type, vb_val): tds.all_vb_json_str = tds.all_vb_json_str + " ," _individual_vb_dict.clear() - _individual_vb_dict['varbind_oid'] = "." + vb_oid.prettyPrint() - _individual_vb_dict['varbind_type'] = pysnmp_to_netsnmp_varbind_convert(vb_type) - _individual_vb_dict['varbind_value'] = vb_val.prettyPrint() + _individual_vb_dict['varbind_oid'] = _vb_oid + _individual_vb_dict['varbind_type'] = _vb_type + _individual_vb_dict['varbind_value'] = _vb_value _individual_vb_json_str = json.dumps(_individual_vb_dict) @@ -570,7 +653,7 @@ def notif_receiver_cb(snmp_engine, stateReference, contextEngineId, contextName, :Variables: """ msg = "processing varbinds for %s" % (tds.trap_dict["uuid"]) - ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_DETAILED, tds.CODE_GENERAL, msg) + ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, tds.CODE_GENERAL, msg) # help(snmp_engine) # print(snmp_engine) @@ -582,16 +665,20 @@ def notif_receiver_cb(snmp_engine, stateReference, contextEngineId, contextName, # print(key, val) # FMDL update reset location when batching publishes - pdu_varbinds = 0 + pdu_varbind_count = 0 payload_varbinds = 0 tds.all_vb_json_str = "" + tds.all_vb_str = " varbinds:" tds.first_varbind = True # iterate over varbinds, add to json struct for vb_oid, vb_val in varBinds: - varbinds_added = add_varbind_to_json(pdu_varbinds, vb_oid, vb_val.__class__.__name__, vb_val) + log_ret = add_varbind_to_log_string( + pdu_varbind_count, vb_oid, vb_val.__class__.__name__, vb_val) + varbinds_added = add_varbind_to_json( + pdu_varbind_count, vb_oid, vb_val.__class__.__name__, vb_val) payload_varbinds += varbinds_added - pdu_varbinds += 1 + pdu_varbind_count += 1 curr_trap_json_str = json.dumps(tds.trap_dict) # now have everything except varbinds in "curr_trap_json_str" @@ -611,7 +698,7 @@ def notif_receiver_cb(snmp_engine, stateReference, contextEngineId, contextName, curr_trap_json_str = curr_trap_json_str + '}' msg = "trap %s : %s" % (tds.trap_dict["uuid"], curr_trap_json_str) - ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_DETAILED, tds.CODE_GENERAL, msg) + ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, tds.CODE_GENERAL, msg) # always log arriving traps log_all_arriving_traps() @@ -620,29 +707,41 @@ def notif_receiver_cb(snmp_engine, stateReference, contextEngineId, contextName, tds.traps_since_last_publish += 1 milliseconds_since_last_publish = (time.time() - tds.last_pub_time) * 1000 - msg = "adding %s to buffer" % (tds.trap_dict["uuid"]) - ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_DETAILED, tds.CODE_GENERAL, msg) - if tds.first_trap: - tds.all_traps_str = curr_trap_json_str - tds.trap_uuids_in_buffer = tds.trap_dict["uuid"] - tds.first_trap = False + # only add to publish buffer if stormwatch is NOT active + if stormwatch.sw_storm_active(tds.trap_dict["agent address"], tds.trap_dict["notify OID"]): + msg = "stormwatch active - deflecting notification %s from %s" % ( + tds.trap_dict["notify OID"], tds.trap_dict["agent address"]) + ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, + tds.CODE_GENERAL, msg) else: - tds.trap_uuids_in_buffer = tds.trap_uuids_in_buffer + \ - ', ' + tds.trap_dict["uuid"] - tds.all_traps_str = tds.all_traps_str + ', ' + curr_trap_json_str + msg = "adding %s to buffer" % (tds.trap_dict["uuid"]) + ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, + tds.CODE_GENERAL, msg) + if tds.first_trap: + tds.all_traps_json_str = curr_trap_json_str + tds.trap_uuids_in_buffer = tds.trap_dict["uuid"] + tds.first_trap = False + else: + tds.trap_uuids_in_buffer = tds.trap_uuids_in_buffer + \ + ', ' + tds.trap_dict["uuid"] + tds.all_traps_json_str = tds.all_traps_json_str + ', ' + curr_trap_json_str - # publish to dmaap after last varbind is processed - if tds.traps_since_last_publish >= tds.c_config['publisher']['max_traps_between_publishes']: + if tds.traps_since_last_publish >= int(tds.c_config['publisher']['max_traps_between_publishes']): msg = "num traps since last publish (%d) exceeds threshold (%d) - publish traps" % ( - tds.traps_since_last_publish, tds.c_config['publisher']['max_traps_between_publishes']) - ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_DETAILED, + tds.traps_since_last_publish, int(tds.c_config['publisher']['max_traps_between_publishes'])) + ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, tds.CODE_GENERAL, msg) post_dmaap() - elif milliseconds_since_last_publish >= tds.c_config['publisher']['max_milliseconds_between_publishes']: + elif milliseconds_since_last_publish >= int(tds.c_config['publisher']['max_milliseconds_between_publishes']): msg = "num milliseconds since last publish (%.0f) exceeds threshold - publish traps" % milliseconds_since_last_publish - ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_DETAILED, + ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, tds.CODE_GENERAL, msg) post_dmaap() + else: + msg = "neither milliseconds_since_last_publish (%.0f) or traps_since_last_publish (%d) exceed threshold - continue" % ( + milliseconds_since_last_publish, tds.traps_since_last_publish) + ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, + tds.CODE_GENERAL, msg) # # # # # # # # # # # # # @@ -658,37 +757,39 @@ if __name__ == "__main__": help="verbose logging") parser.add_argument('-?', action="store_true", dest="usage_requested", help="show command line use") - + # parse args args = parser.parse_args() - + # set vars from args verbose = args.verbose usage_requested = args.usage_requested - + # if usage, just display and exit if usage_requested: usage_err() - + # init vars tds.init() - + sws.init() + stats.init() + # FMDL: add with stormWatch # init sw vars - # sw.init() - + stormwatch.sw_init() + # Set initial startup hour for rolling logfile tds.last_hour = datetime.datetime.now().hour - + # get config binding service (CBS) values (either broker, or json file override) load_all_configs(0, 0) msg = "%s : %s version %s starting" % ( prog_name, tds.c_config['snmptrapd']['title'], tds.c_config['snmptrapd']['version']) stdout_logger(msg) - + # open various ecomp logs open_eelf_logs() - + # bump up logging level if overridden at command line if verbose: msg = "WARNING: '-v' argument present. All diagnostic messages will be logged. This can slow things down, use only when needed." @@ -698,7 +799,6 @@ if __name__ == "__main__": # debug.setLogger(debug.Debug('dsp', 'msgproc')) debug.setLogger(debug.Debug('all')) - # name and open arriving trap log tds.arriving_traps_filename = tds.c_config['files']['runtime_base_dir'] + "/" + \ tds.c_config['files']['log_dir'] + "/" + \ @@ -707,7 +807,7 @@ if __name__ == "__main__": msg = ("arriving traps logged to: %s" % tds.arriving_traps_filename) stdout_logger(msg) ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, tds.CODE_GENERAL, msg) - + # name and open json trap log tds.json_traps_filename = tds.c_config['files']['runtime_base_dir'] + "/" + tds.c_config['files']['log_dir'] + "/" + "DMAAP_" + ( tds.c_config['streams_publishes']['sec_fault_unsecure']['dmaap_info']['topic_url'].split('/')[-1]) + ".json" @@ -715,38 +815,38 @@ if __name__ == "__main__": msg = ("published traps logged to: %s" % tds.json_traps_filename) stdout_logger(msg) ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, tds.CODE_GENERAL, msg) - + # setup signal handling for config reload signal.signal(signal.SIGUSR1, load_all_configs) - + # save current PID for future/external reference tds.pid_file_name = tds.c_config['files']['runtime_base_dir'] + \ '/' + tds.c_config['files']['pid_dir'] + '/' + prog_name + ".pid" msg = "Runtime PID file: %s" % tds.pid_file_name ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, tds.CODE_GENERAL, msg) rc = save_pid(tds.pid_file_name) - + # Get the event loop for this thread loop = asyncio.get_event_loop() - + # Create SNMP engine with autogenerated engineID pre-bound # to socket transport dispatcher snmp_engine = engine.SnmpEngine() - + # # # # # # # # # # # # # Transport setup # # # # # # # # # # # # - + # UDP over IPv4 try: ipv4_interface = tds.c_config['protocols']['ipv4_interface'] - ipv4_port = tds.c_config['protocols']['ipv4_port'] - + ipv4_port = int(tds.c_config['protocols']['ipv4_port']) + try: # FIXME: this doesn't appear to throw an exception even if # the userID is unable (perms) to bind to port # - # We may need to open raw port using other + # We may need to open raw port using other # means to confirm proper privileges (then # close it and reopen w/ pysnmp api) config.addTransport( @@ -756,23 +856,23 @@ if __name__ == "__main__": (ipv4_interface, ipv4_port)) ) except Exception as e: - msg = "Unable to bind to %s:%s - %s" % ( + msg = "Unable to bind to %s:%d - %s" % ( ipv4_interface, ipv4_port, str(e)) - ecomp_logger(tds.LOG_TYPE_ERROR, tds.SEV_FATAL, tds.CODE_GENERAL, msg) + ecomp_logger(tds.LOG_TYPE_ERROR, tds.SEV_FATAL, + tds.CODE_GENERAL, msg) stdout_logger(msg) cleanup_and_exit(1, tds.pid_file_name) - + except Exception as e: msg = "IPv4 interface and/or port not specified in config - not listening for IPv4 traps" stdout_logger(msg) ecomp_logger(tds.LOG_TYPE_ERROR, tds.SEV_WARN, tds.CODE_GENERAL, msg) - - + # UDP over IPv6 try: ipv6_interface = tds.c_config['protocols']['ipv6_interface'] - ipv6_port = tds.c_config['protocols']['ipv6_port'] - + ipv6_port = int(tds.c_config['protocols']['ipv6_port']) + try: config.addTransport( snmp_engine, @@ -781,27 +881,27 @@ if __name__ == "__main__": (ipv6_interface, ipv6_port)) ) except Exception as e: - msg = "Unable to bind to %s:%s - %s" % ( + msg = "Unable to bind to %s:%d - %s" % ( ipv6_interface, ipv6_port, str(e)) stdout_logger(msg) - ecomp_logger(tds.LOG_TYPE_ERROR, tds.SEV_FATAL, tds.CODE_GENERAL, msg) + ecomp_logger(tds.LOG_TYPE_ERROR, tds.SEV_FATAL, + tds.CODE_GENERAL, msg) cleanup_and_exit(1, tds.pid_file_name) - + except Exception as e: msg = "IPv6 interface and/or port not specified in config - not listening for IPv6 traps" stdout_logger(msg) ecomp_logger(tds.LOG_TYPE_ERROR, tds.SEV_WARN, tds.CODE_GENERAL, msg) - - + # # # # # # # # # # # # # SNMPv1/2c setup # # # # # # # # # # # # - + # SecurityName <-> CommunityName mapping # to restrict trap reception to only those with specific community # strings config.addV1System(snmp_engine, 'my-area', 'public') - + # register comm_string_rewrite_observer for message arrival snmp_engine.observer.registerObserver( comm_string_rewrite_observer, @@ -811,7 +911,8 @@ if __name__ == "__main__": # # # # # # # # # # # # # SNMPv3 setup # # # # # # # # # # # # - config, snmp_engine=load_snmpv3_credentials(config, snmp_engine, tds.c_config) + config, snmp_engine = load_snmpv3_credentials( + config, snmp_engine, tds.c_config) # register snmp_engine_observer_cb for message arrival snmp_engine.observer.registerObserver( @@ -819,12 +920,12 @@ if __name__ == "__main__": 'rfc3412.receiveMessage:request', 'rfc3412.returnResponsePdu', ) - + # Register SNMP Application at the SNMP engine ntfrcv.NotificationReceiver(snmp_engine, notif_receiver_cb) - + snmp_engine.transportDispatcher.jobStarted(1) # loop forever - + # Run I/O dispatcher which will receive traps try: snmp_engine.transportDispatcher.runDispatcher() -- cgit 1.2.3-korg