From ba9a312ec831588c579ef08d8bc1783a0569df75 Mon Sep 17 00:00:00 2001 From: "Ladue, David (dl3158)" Date: Tue, 23 Jan 2018 10:25:08 -0500 Subject: new controller Change-Id: Ib53332f702d2f5aa19ea6044f9ce02c9167f5c61 Issue-ID: DCAEGEN2-271 Signed-off-by: Ladue, David (dl3158) --- LICENSE.txt | 70 ++-- README.md | 4 +- etc/trap.conf | 3 + etc/trapd.yaml | 34 ++ etc/trapd_logging.yaml | 40 ++ logs/.blank | 0 setup.py | 68 +++ src/dcae_snmptrapd.py | 933 +++++++++++++++++++++--------------------- src/dcae_snmptrapd.sh | 67 ++- src/mod/trapd_dcae_logger.py | 68 +++ src/mod/trapd_dmaap_config.py | 104 +++++ src/mod/trapd_exit.py | 63 +++ src/mod/trapd_http_session.py | 62 +++ src/mod/trapd_perm_status.py | 61 +++ src/mod/trapd_runtime_pid.py | 91 ++++ src/mod/trapd_trap_config.py | 98 +++++ src/mod/trapd_yaml_config.py | 193 +++++++++ 17 files changed, 1427 insertions(+), 532 deletions(-) create mode 100644 etc/trap.conf create mode 100644 etc/trapd.yaml create mode 100644 etc/trapd_logging.yaml create mode 100644 logs/.blank create mode 100644 setup.py create mode 100644 src/mod/trapd_dcae_logger.py create mode 100644 src/mod/trapd_dmaap_config.py create mode 100644 src/mod/trapd_exit.py create mode 100644 src/mod/trapd_http_session.py create mode 100644 src/mod/trapd_perm_status.py create mode 100644 src/mod/trapd_runtime_pid.py create mode 100644 src/mod/trapd_trap_config.py create mode 100644 src/mod/trapd_yaml_config.py diff --git a/LICENSE.txt b/LICENSE.txt index 69d5fc1..22d3915 100644 --- a/LICENSE.txt +++ b/LICENSE.txt @@ -1,40 +1,30 @@ -/* -* ============LICENSE_START========================================== -* =================================================================== -* Copyright © 2017 AT&T Intellectual Property. All rights reserved. -* =================================================================== -* -* Unless otherwise specified, all software contained herein is licensed -* under the Apache License, Version 2.0 (the “License”); -* you may not use this software except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -* -* -* -* Unless otherwise specified, all documentation contained herein is licensed -* under the Creative Commons License, Attribution 4.0 Intl. (the “License”); -* you may not use this documentation except in compliance with the License. -* You may obtain a copy of the License at -* -* https://creativecommons.org/licenses/by/4.0/ -* -* Unless required by applicable law or agreed to in writing, documentation -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -* -* ============LICENSE_END============================================ -* -* ECOMP is a trademark and service mark of AT&T Intellectual Property. -* -*/ - +============LICENSE_START======================================================= +org.onap.dcae +================================================================================ +Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. +================================================================================ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +=================================================================== +=================================================================== +Licensed under the Creative Commons License, Attribution 4.0 Intl. (the "License"); +you may not use this documentation except in compliance with the License. +You may obtain a copy of the License at + https://creativecommons.org/licenses/by/4.0/ + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +============LICENSE_END========================================================= +ECOMP is a trademark and service mark of AT&T Intellectual Property. diff --git a/README.md b/README.md index f85232d..bb79a0f 100644 --- a/README.md +++ b/README.md @@ -16,5 +16,7 @@ mvn clean install ### to be completed -The application is bundled into a docker image installed by the DCAE Controller. Following is the process to creating the image +The application is bundled into a docker image installed by the DCAE Controller. Following is the process to creating the image: + + diff --git a/etc/trap.conf b/etc/trap.conf new file mode 100644 index 0000000..56c6c3a --- /dev/null +++ b/etc/trap.conf @@ -0,0 +1,3 @@ +# +.1.3.6.1.4.1.74.2.46.12.1.1 DCAE-SNMP-TRAPS +* DCAE-SNMP-TRAPS diff --git a/etc/trapd.yaml b/etc/trapd.yaml new file mode 100644 index 0000000..06e1323 --- /dev/null +++ b/etc/trapd.yaml @@ -0,0 +1,34 @@ +snmptrap: '2.0' +info: + version: 2.1 + title: ONAP SNMP Trap Receiver + +protocol: + transport: udp + ipv4_interface: 0.0.0.0 + ipv4_port: 6164 + ipv6_interface: ::1 + ipv6_port: 6164 + dns_cache_ttl_seconds: 60 + +files: + runtime_base_dir: /opt/app/snmptrap + log_dir: /opt/app/snmptrap/logs + data_dir: /opt/app/snmptrap/data + pid_dir: /var/tmp + snmptrapd_diag: /opt/app/snmptrap/logs/dcae_snmptrap.log + trap_conf: /opt/app/snmptrap/etc/trap.conf + raw_traps_log: /opt/app/snmptrap/logs/trapd.log + published_traps_dir: /opt/app/snmptrap/logs + trap_stats_log: /opt/app/snmptrap/logs/trapd_stats.csv + perm_status_file: /opt/app/snmptrap/logs/trapd.perm_status.log + +dmaap: + dmaap_conf: /etc/dcae/dmaap.conf + http_timeout: 5 + http_retries: 3 + http_secs_between_retries: .75 + primary_publisher: true + peer_publisher: null + max_traps_between_publish: 50 + max_milliseconds_between_publish: 3500 diff --git a/etc/trapd_logging.yaml b/etc/trapd_logging.yaml new file mode 100644 index 0000000..586e3cb --- /dev/null +++ b/etc/trapd_logging.yaml @@ -0,0 +1,40 @@ +version: 1 +disable_existing_loggers: False +formatters: + simple: + format: "%(levelname)s|%(asctime)s|%(name)s|%(process)d|%(funcName)s|'%(message)s" + +handlers: + console: + class: logging.StreamHandler + level: DEBUG + formatter: simple + stream: ext://sys.stdout + + info_file_handler: + class: logging.handlers.RotatingFileHandler + level: INFO + formatter: simple + filename: info.log + maxBytes: 10480000 # 10MB + backupCount: 10 + encoding: utf8 + + error_file_handler: + class: logging.handlers.RotatingFileHandler + level: ERROR + formatter: simple + filename: errors.log + maxBytes: 6144000 # 6MB + backupCount: 10 + encoding: utf8 + +loggers: + my_module: + level: ERROR + handlers: [console] + propagate: no + +root: + level: INFO + handlers: [console, info_file_handler, error_file_handler] diff --git a/logs/.blank b/logs/.blank new file mode 100644 index 0000000..e69de29 diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..6b40be5 --- /dev/null +++ b/setup.py @@ -0,0 +1,68 @@ +# org.onap.dcae +# ================================================================================ +# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. + +import argparse +import array +import asyncio +import collections +import datetime +import errno +from pysnmp.carrier.asyncio.dgram import udp, udp6 +from pysnmp.entity import engine, config +from pysnmp.entity.rfc3413 import ntfrcv +from pysnmp.proto.api import v2c +import json +import logging +import logging.handlers +import os +import pprint +import re +import requests +import signal +import socket +import string +import sys +import time +import traceback +from trapd_dmaap_config import read_dmaap_config +from trapd_exit import cleanup_and_exit +from trapd_http_session import init_session_obj +from trapd_perm_status import log_to_perm_status +from trapd_runtime_pid import save_pid, rm_pid +from trapd_trap_config import read_trap_config +from trapd_yaml_config import read_yaml_config +import unicodedata +import uuid as uuid_mod +import yaml + +install_reqs = parse_requirements("requirements.txt", session=PipSession()) +reqs = [str(ir.req) for ir in install_reqs] + +setup( + name = "dcaegen2_collectors_snmptrap", + description = "snmp trap receiver for a DCAE docker image", + version = "1.0", + packages=find_packages(), + author = "Dave LaDue", + author_email = "dl3158@att.com", + license='Apache 2', + keywords = "", + url = "", + install_requires=reqs +) diff --git a/src/dcae_snmptrapd.py b/src/dcae_snmptrapd.py index 8821d49..ee69fec 100644 --- a/src/dcae_snmptrapd.py +++ b/src/dcae_snmptrapd.py @@ -1,5 +1,4 @@ -# -# ============LICENSE_START======================================================= +# ============LICENSE_START=======================================================) # org.onap.dcae # ================================================================================ # Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. @@ -7,9 +6,9 @@ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -19,24 +18,37 @@ # # ECOMP is a trademark and service mark of AT&T Intellectual Property. # +""" +dcae_snmptrapd is responsible for SNMP trap receipt and publishing activities. +It's behavior is controlled by several configs, the primary being: + + ../etc/trapd.yaml + +As traps arrive they are decomposed and transformed into a JSON message which +is published to a dmaap instance that has been defined by controller. + +:Parameters: + usage: dcae_snmptrapd.py -c [-v] +:Keywords: + onap dcae snmp trap publish dmaap +""" +__docformat__ = 'restructuredtext' + +# basics import argparse -from array import * -import asyncio +import array +import asyncio from collections import Counter import datetime -import json +import errno +import json import logging import logging.handlers -from optparse import OptionParser import os import pprint -from pysnmp.entity import engine, config -from pysnmp.carrier.asyncio.dgram import udp -# from pysnmp.carrier.asyncore.dgram import udp -from pysnmp.entity.rfc3413 import ntfrcv -from pysnmp.proto.api import v2c import requests +import re import sys import signal import string @@ -47,95 +59,123 @@ import unicodedata import uuid as uuid_mod import yaml -prog_name=os.path.basename(__file__) +# pysnmp +from pysnmp.entity import engine, config +from pysnmp.carrier.asyncio.dgram import udp, udp6 +# from pysnmp.carrier.asyncore.dgram import udp +from pysnmp.entity.rfc3413 import ntfrcv +from pysnmp.proto.api import v2c + +# gen2 controller +from onap_dcae_cbs_docker_client.client import get_config + +# dcae_snmptrap +from trapd_runtime_pid import save_pid, rm_pid +from trapd_yaml_config import read_yaml_config +from trapd_trap_config import read_trap_config +from trapd_dmaap_config import read_dmaap_config +from trapd_exit import cleanup_and_exit +from trapd_http_session import init_session_obj +from trapd_perm_status import log_to_perm_status + +prog_name = os.path.basename(__file__) traps_in_second = 0 last_epoch_second = 0 -ueb_partition = "" - -# -# yc_ -> "yaml config_" -if you see this prefix, it came from conf file -# protocol -yc_transport = "" -yc_interface = "" -yc_port = 162 -yc_dns_cache_ttl_seconds = 0 - -# files -yc_runtime_base_dir = "" -yc_log_dir = "" -yc_data_dir = "" -yc_pid_dir = "" -yc_dcae_snmptrapd_diag = "" -yc_raw_traps_log = "" -yc_published_traps_dir = "" -yc_trap_stats_log = "" -yc_perm_status_file = "" - -# ueb -yc_dmaap_conf = "" -yc_http_timeout = 5.0 -yc_primary_publisher = "" -yc_peer_publisher = "" -yc_max_traps_between_publish = 0 # max number of traps to batch before publishing -yc_max_milliseconds_between_publish = 0 # max number of seconds between publishing -# - # dmaap_url = "" dmaap_user_name = "" dmaap_p_var = "" dmaap_stream_id = "" +dmaap_host = "" # -# Requests session object (ueb and dmaap). -dmaap_request_session = "" +# Requests session object +dmaap_requests_session = None http_headers = {"Content-type": "application/json"} -# FIXME: temp resource for UEB publishes -ueb_url = "" - # # # dns_cache_ip_to_name # key [ip address] -> fqdn -# dns_cache_ip_expires -# key [ip address] -> epoch this entry expires at +# dns_cache_ip_expires +# key [ip address] -> epoch time this entry expires and must be reloaded dns_cache_ip_to_name = {} dns_cache_ip_expires = {} # +# +num_trap_conf_entries = 0 +trap_conf_dict = {} +# + +pid_file_name = "" + # logging dcae_logger = logging.getLogger('dcae_logger') handler = "" +dcae_logger_max_bytes = 60000000 +dcae_logger_num_archives = 10 + +undefined = "undefined" +rc = 0 +usage_requested = False + +json_fd = None +last_hour = -1 + +verbose = False + +trap_dict = {} + +# # # # # # # # # # # +# fx: usage_err +# # # # # # # # # # # +def usage_err(): + """ + Notify of incorrect (argument) usage + :Parameters: + none + :Exceptions: + none + :Keywords: + usage args + """ + + print('Correct usage:') + print(' %s -c [-v]' % prog_name) + cleanup_and_exit(1, "undefined") + # # # # # # # # # # # # # # # # # # # # fx: setup dcae_logger custom logger # # # # # # # # # # ## # # # # # # # -def setup_dcae_logger(): +def setup_dcae_logger(_yc_trapd_diag): """ - Setup custom logger for dcae_snmptrapd that incorporates - a rotating file handler with 10 backups of diagnostic - log file. + Setup custom logger for dcae_snmptrapd that incorporates + a rotating file handler with 10 backups of diagnostic messages :Parameters: - none + _yc_trapd_diag - the full path output filename :Exceptions: none :Keywords: - logging + logging rotation """ - global dcae_logger, verbose + global dcae_logger global handler date_fmt = '%m/%d/%Y %H:%M:%S' - yc_dcae_snmptrapd_diag_bak = "%s.bak" % (yc_dcae_snmptrapd_diag) - if os.path.isfile(yc_dcae_snmptrapd_diag): - os.rename(yc_dcae_snmptrapd_diag, yc_dcae_snmptrapd_diag_bak) + _yc_trapd_diag_bak = "%s.bak" % (_yc_trapd_diag) + if os.path.isfile(_yc_trapd_diag): + os.rename(_yc_trapd_diag, _yc_trapd_diag_bak) - handler = logging.handlers.RotatingFileHandler(yc_dcae_snmptrapd_diag, maxBytes=60000000, backupCount=10) + # handler = logging.handlers.RotatingFileHandler(yc_trapd_diag, maxBytes=60000000, backupCount=10) + handler = logging.handlers.RotatingFileHandler(_yc_trapd_diag, + maxBytes=dcae_logger_max_bytes, + backupCount=dcae_logger_num_archives) # set logLevel - valid values NOTSET, DEBUG, INFO, WARNING, ERROR, CRITICAL handler.setLevel(logging.DEBUG) @@ -147,429 +187,295 @@ def setup_dcae_logger(): handler.setFormatter(formatter) dcae_logger.addHandler(handler) - if os.path.isfile(yc_dcae_snmptrapd_diag): - os.chmod(yc_dcae_snmptrapd_diag, 0o640) - - if os.path.isfile(yc_dcae_snmptrapd_diag_bak): - os.chmod(yc_dcae_snmptrapd_diag_bak, 0o640) + if os.path.isfile(_yc_trapd_diag): + os.chmod(_yc_trapd_diag, 0o640) - -# # # # # # # # # # # # # -# fx: save_pid - save PID of running process -# # # # # # # # # # # # # -def save_pid(loc_pid_file_name): - """ - Save the current process ID in a file for external - access. - :Parameters: - loc_pid_file_name - filename including full path to write current process ID to - :Exceptions: - file open - this function will throw an exception if unable to open loc_pid_file_name - :Keywords: - pid /var/run - """ + if os.path.isfile(_yc_trapd_diag_bak): + os.chmod(_yc_trapd_diag_bak, 0o640) - try: - pid_fd = open(loc_pid_file_name, 'w') - pid_fd.write('%d' % os.getpid()) - pid_fd.close() - except: - print("Error saving PID file %s :" % loc_pid_file_name) - else: - print("PID file %s" % loc_pid_file_name) - -# # # # # # # # # # # # # -# fx: rm_pid - remove PID of running process -# # # # # # # # # # # # # -def rm_pid(loc_pid_file_name): - """ - Remove the current process ID file before exiting. - :Parameters: - loc_pid_file_name - filename that contains current process ID to be removed - :Exceptions: - file open - this function will throw an exception if unable to find or remove - loc_pid_file_name - :Keywords: - pid /var/run +# # # # # # # # # # # # # # # # # # # +# fx: load_all_configs +# FIXME: currently on hold for load and signal handling convergence +# # # # # # # # # # ## # # # # # # # +def load_all_configs(_signum, _frame): """ + Calls individual functions to read various config files required. This + function is called directly (e.g. at startup) and is also registered + with signal handling (e.g. kill -sigusr1 ) - try: - if os.path.isfile(loc_pid_file_name): - os.remove(loc_pid_file_name) - except: - print("Error removing PID file %s" % loc_pid_file_name) - -# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # -# function: get_yaml_cfg -# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # -def get_yaml_cfg(loc_yaml_conf_file): - """ - Load all sorts of goodies from yaml config file. :Parameters: - loc_yaml_conf_file - filename including full path to yaml config file + signum and frame (only present when called via signal to running process) :Exceptions: - file open - this function will throw an exception if unable to open - loc_yaml_conf_file (fatal error) or any of the required - values are not found in the loc_yaml_conf_file (fatal error) + none :Keywords: - yaml config runtime protocol files ueb + config files :Variables: - yc_transport - protocol transport for snmp traps (udp|tcp) - yc_interface - what interface to listen for traps on - yc_port - what port to listen for traps on - yc_dns_cache_ttl_seconds - how many seconds an entry remains in DNS cache prior to refresh - yc_runtime_base_dir - base directory of dcae_snmptrapd application - yc_log_dir - log directory of dcae_snmptrapd application - yc_data_dir - data directory of dcae_snmptrapd application - yc_pid_dir - directory where running PID file will be written (filename /.pid) - yc_dcae_snmptrapd_diag - program diagnostic log, auto rotated and archived via python handler - yc_raw_traps_log - file to write raw trap data to - yc_published_traps_dir - file to write json formatted trap data for successful publishes (only!) - yc_trap_stats_log - file to write trap stats (traps per second, by OID, by agent) - yc_perm_status_file - file to write trap stats (traps per second, by OID, by agent) - yc_dmaap_conf - file (full path) of yaml config entries referenced at runtime, passed as - runtime command argument "-c - yc_http_timeout - http timeout in seconds for dmaap publish attempt - yc_primary_publisher - boolean defining whether local instance is primary (future use) - yc_peer_publisher - identity of peer publisher in case this one fails (future use) - yc_max_traps_between_publish - if batching publishes, max number of traps to queue before http post - yc_max_milliseconds_between_publish - if batching publishes, max number of milliseconds between http post - Note: using the batch feature creates an opportunity for trap loss if - traps stop arriving and the process exits (traps in queue will remain - there until another trap arrives and kicks of the evaluation of max_traps - or max_milliseconds above). + yaml_conf_file + dmaap_requests_session """ - global yc_transport, yc_port, yc_interface, yc_dns_cache_ttl_seconds, yc_runtime_base_dir, yc_log_dir, yc_data_dir, yc_pid_dir, yc_dcae_snmptrapd_diag, yc_raw_traps_log, yc_published_traps_dir, yc_trap_stats_log, yc_perm_status_file, yc_dmaap_conf, yc_http_timeout, yc_primary_publisher, yc_peer_publisher, yc_max_traps_between_publish, yc_max_milliseconds_between_publish + if int(_signum) != 0: + dcae_logger.info("%s Received signal %s at frame %s; re-reading config file" + % (prog_name, _signum, _frame)) + else: + dcae_logger("Reading config files") - with open(loc_yaml_conf_file, 'r') as yaml_fd: - cfg_data = yaml.load(yaml_fd) + # FIXME: should be re-reading all configs here - # ONAP FIXME: split try into per-section except loops below - try: - # protocol - yc_transport = (cfg_data['protocol']['transport']) - yc_interface = (cfg_data['protocol']['interface']) - yc_port = int(cfg_data['protocol']['port']) - yc_dns_cache_ttl_seconds = int(cfg_data['protocol']['dns_cache_ttl_seconds']) - - # files and directories - yc_runtime_base_dir = (cfg_data['files']['runtime_base_dir']) - yc_log_dir = (cfg_data['files']['log_dir']) - yc_data_dir = (cfg_data['files']['data_dir']) - yc_pid_dir = (cfg_data['files']['pid_dir']) - yc_dcae_snmptrapd_diag = (cfg_data['files']['dcae_snmptrapd_diag']) - yc_raw_traps_log =(cfg_data['files']['raw_traps_log']) - yc_published_traps_dir =(cfg_data['files']['published_traps_dir']) - yc_trap_stats_log =(cfg_data['files']['trap_stats_log']) - yc_perm_status_file = (cfg_data['files']['perm_status_file']) - - # ueb - yc_dmaap_conf = (cfg_data['ueb']['dmaap_conf']) - yc_http_timeout = (cfg_data['ueb']['http_timeout']) - yc_primary_publisher = (cfg_data['ueb']['primary_publisher']) - yc_peer_publisher = (cfg_data['ueb']['peer_publisher']) - yc_max_traps_between_publish = (cfg_data['ueb']['max_traps_between_publish']) - yc_max_milliseconds_between_publish = (cfg_data['ueb']['max_milliseconds_between_publish']) + # Initialize dmaap requests session object. Close existing session + # if applicable. + if dmaap_requests_session != None: + dmaap_requests_session.close() + dmaap_requests_session = init_session_obj(dcae_logger) - except: - print("ERROR reading config %s" % loc_yaml_conf_file) - raise - cleanup_and_exit(1) - - # print back for confirmation - print("Read config: %s" % loc_yaml_conf_file) - print(" protocol section:") - print(" transport: %s" % yc_transport) - print(" interface: %s" % yc_interface) - print(" port: %s" % yc_port) - print(" dns_cache_ttl_seconds: %s" % yc_dns_cache_ttl_seconds) - print(" files section:") - print(" runtime_base_dir: %s" % yc_runtime_base_dir) - print(" log_dir: %s" % yc_log_dir) - print(" data_dir: %s" % yc_data_dir) - print(" pid_dir: %s" % yc_pid_dir) - print(" dcae_snmptrapd_diag: %s" % yc_dcae_snmptrapd_diag) - print(" raw_traps_log: %s" % yc_raw_traps_log) - print(" published_traps_dir: %s" % yc_published_traps_dir) - print(" trap_stats_log: %s" % yc_trap_stats_log) - print(" perm_status_file: %s" % yc_perm_status_file) - print(" ueb section:") - print(" dmaap_config_file: %s" % yc_dmaap_conf) - print(" http_timeout: %s" % yc_http_timeout) - print(" primary_publisher: %s" % yc_primary_publisher) - print(" peer_publisher: %s" % yc_peer_publisher) - print(" max_traps_between_publish: %s" % yc_max_traps_between_publish) - print(" max_milliseconds_between_publish: %s" % yc_max_milliseconds_between_publish) + return _yaml_config_values -# # # # # # # # # # # -# fx: get_dmaap_cfg -# # # # # # # # # # # -def get_dmaap_cfg(): + +# # # # # # # # # # # # # +# fx: rename_json_log +# # # # # # # # # # # # # +def rename_json_log(_outputFname): """ - Load dmaap config /etc/dcae/dmaap.conf file (legacy controller) + Renames JSON output file to include ISO-formatted date suffix :Parameters: - none + signum and frame (only present when called via signal to running process) :Exceptions: - file open - this function will throw an exception if unable to open - yc_dmaap_conf(fatal error) + none :Keywords: - legacy controller dmaap.conf + json log :Variables: - yc_dmaap_conf - full path filename of dmaap_conf file provided by previous - generation controller + json log filename """ - global dmaap_url, dmaap_user_name, dmaap_p_var, dmaap_stream_id + global json_fd - if os.path.isfile(yc_dmaap_conf): - dcae_logger.debug ('Reading DMaaP config file %s ' % - yc_dmaap_conf) - else: - dcae_logger.error ('DMaaP config file %s does NOT exist - exiting' - % (yc_dmaap_conf)) - cleanup_and_exit(1) + # check if outputfile exists; if it does, move to timestamped version + _outputFnameBak = "%s.%s" % (_outputFname, + datetime.datetime.fromtimestamp(time.time()). + fromtimestamp(time.time()). + strftime('%Y-%m-%dT%H:%M:%S')) - with open(yc_dmaap_conf) as dmaap_config_fd: - dmaapCfgData = json.load(dmaap_config_fd) + # close existing file + close_json_log() + if os.path.isfile(_outputFname): + dcae_logger.debug('Renaming %s to %s' % + (_outputFname, _outputFnameBak)) + os.rename(_outputFname, _outputFnameBak) + else: + dcae_logger.error("Unable to move %s to %s - source file does not exist" % + (_outputFname, _outputFnameBak)) + # open new (empty) log file try: - dmaap_url = dmaapCfgData [0]["dmaapUrl"] - dmaap_user_name = dmaapCfgData [0]["dmaapUserName"] - dmaap_p_var = dmaapCfgData [0]["dmaapPassword"] - dmaap_stream_id = dmaapCfgData [0]["dmaapStreamId"] + json_fd = open_json_log() except: - dcae_logger.error ('DMaaP config file %s has missing data - exiting' - % (yc_dmaap_conf)) - cleanup_and_exit(1) + dcae_logger.exception( + "Error opening new json log file %s - exiting " % _outputFname) + sys.exit(1) - dcae_logger.debug('dmaap_url: %s' % (dmaap_url)) - dcae_logger.debug('dmaap_user_name: %s' % (dmaap_user_name)) - dcae_logger.debug('dmaap_p_var: -') - dcae_logger.debug('dmaap_stream_id: %s' % (dmaap_stream_id)) +# # # # # # # # # # # # # +# fx: open_json_log +# # # # # # # # # # # # # +def open_json_log(): + + try: + # open append mode just in case so nothing is lost, but should be + # non-existent file + _json_fd = open(json_log_filename, 'a') + dcae_logger.exception("Opened %s append mode: " % json_log_filename) + return _json_fd + except: + dcae_logger.exception( + "Error opening %s append mode: " % json_log_filename) + sys.exit(1) - dmaap_config_fd.close() # # # # # # # # # # # # # -# fx: init_session_obj +# fx: close_json_log # # # # # # # # # # # # # -def init_session_obj(): - """ - Initializes and returns a http request session object for later use - :Parameters: - none - :Exceptions: - session object creation - this function will throw an exception if unable to create - a new session object - :Keywords: - http request session - :Variables: - none - """ +def close_json_log(): + + global json_fd try: - s = requests.Session() - dcae_logger.debug("New requests session has been initialized") + json_fd.close() except: - dcae_logger.error("Failed to create new requests session") + dcae_logger.error("ERROR closing json audit file %s - results " + "indeterminate" % (json_log_filename)) - return s +# # # # # # # # # # # # # +# fx: log_published_messages +# # # # # # # # # # # # # -# # # # # # # # # # # # # # # # # # # -# fx: load_cfg -# # # # # # # # # # ## # # # # # # # -def load_cfg(_signum, _frame): - """ - Calls individual functions to read various config files required. This - function is called directly (e.g. at startup) and is also registered - with signal handling (e.g. kill -sigusr1 ) - - :Parameters: - signum and frame (only present when called via signal to running process) - :Exceptions: - none - :Keywords: - config files - :Variables: - yaml_conf_file - dmaap_request_session - """ +def log_published_messages(loc_post_data_enclosed): - global dmaap_request_session + # FIXME: should keep data dictionary of Fd's open, and reference those vs. + # repeatedly opening append-mode + # open json audit log file - if int(_signum) != 0: - print("%s Received signal %s at frame %s; re-reading config file" - % (prog_name, _signum, _frame)) - else: - print("Reading config files") + global json_fd, last_hour - # always get yaml config values - get_yaml_cfg(yaml_conf_file) + # close output file, backup current and move new one into place on day change + dcae_logger.info('%.4f adding %s to json log' % + (time.time(), trap_dict["uuid"])) + curr_hour = datetime.datetime.now().hour + if curr_hour < last_hour: + rename_json_log(json_log_filename) + json_fd = open_json_log(json_log_filename) - # Initialize dmaap requests session object. Close existing session - # if applicable. - get_dmaap_cfg() - if dmaap_request_session: - dmaap_request_session.close() - dmaap_request_session = init_session_obj() - # dcae_logger.debug("dmaap_request_session: %s" % dmaap_request_session) + try: + m = loc_post_data_enclosed + '\n' + json_fd.write('%s' % str(m)) + except Exception as e: + dcae_logger.error("ERROR writing json audit file %s - message NOT LOGGED: %s" + % (json_log_filename, str(e))) + last_hour = curr_hour + dcae_logger.info('%.4f logged %s' % (time.time(), trap_dict["uuid"])) -# # # # # # # # # # # # # # # # # # # -# fx: post_ueb -# temporarily publish to UEB to validate json format -# # # # # # # # # # # # # # # # # # # -def post_ueb(loc_json_msg): +# # # # # # # # # # # # # +# fx: post_dmaap +# # # # # # # # # # # # # + + +def post_dmaap(dmaap_url, dmaap_user_name, dmaap_p_var, dmaap_stream_id, dmaap_host, uuid, traps_json_string): """ - This function is only present for lab testing, to allow easier unit tests - vs. depend on (a) legacy controller or (b) next gen controller existence + Publish trap daata in json format to dmaap :Parameters: - loc_json_msg - json string of trap attributes to publish + dmaap_url + base url for http post + dmaap_user_name + username for http post + dmaap_p_var + access credential for http post + dmaap_stream_id + appended to dmaap_url, equiv to "topic" + dmaap_host + target dmaap server to submit http post + uuid + unique ID associated with this trap + traps_json_string + json format string to include in http post :Exceptions: none :Keywords: - UEB non-AAF legacy http post + http post dmaap json message :Variables: """ - global dmaap_request_session + global http_resp, dmaap_requests_session, last_pub_time - post_data_enclosed = '[' + loc_json_msg + ']' + if dmaap_requests_session == None: + dmaap_requests_session = init_session_obj(dcae_logger) - try: - http_resp = dmaap_request_session.post(ueb_url, headers=http_headers, data=post_data_enclosed, - timeout=7) - dcae_logger.debug("Response from %s: %s dmaap_request_sesson: %s" % (ueb_url, http_resp.status_code, dmaap_request_session)) - if http_resp.status_code == requests.codes.ok : - dcae_logger.debug("trap published successfully") - else: - dcae_logger.debug("DMAAP returned non-normal response - ERROR") - except: - dcae_logger.debug("Response from %s on topic %s: %s dmaap_request_session: %s") - -# # # # # # # # # # # # # -# fx: post_dmaap -# # # # # # # # # # # # # -def post_dmaap(topic, post_topics_idx, loc_num_traps_to_publish_in_topic): - - global http_resp, dmaap_url, dmaap_user_name, post_data_by_topics, drs, \ - last_pub_time - - post_data_enclosed = '[' + post_data_by_topics[post_topics_idx] + ']' - - # This is for logging purposes only. - dmaap_host = dmaap_url.split('/')[2][:-5] + post_data_enclosed = '[' + traps_json_string + ']' k = 0 dmaap_pub_success = False - while not dmaap_pub_success and k < num_pub_attempts: + if verbose: + print('%.4f starting publish of %s' % (time.time(), trap_dict["uuid"])) + dcae_logger.info('%.4f starting publish of %s' % + (time.time(), trap_dict["uuid"])) + while not dmaap_pub_success and k < yaml_config_values.yc_http_retries: try: - dcae_logger.debug("Attempt %d to %s, %d traps in msg, dmaap_url: " - "%s dmaap_user_name: %s post_data: %s" - % (k, dmaap_host, - loc_num_traps_to_publish_in_topic, dmaap_url, - dmaap_user_name, - post_data_enclosed)) + dcae_logger.debug("Attempt %d to %s dmaap_url: " + "%s dmaap_user_name: %s post_data: %s" + % (k, dmaap_host, + dmaap_url, + dmaap_user_name, + post_data_enclosed)) # below disable_warnings required until python updated: # https://github.com/shazow/urllib3/issues/497 - requests.packages.urllib3.disable_warnings() - - http_resp = drs.post(dmaap_url, post_data_enclosed, - auth=(dmaap_user_name, dmaap_p_var), - headers=http_headers, - timeout=ueb_http_timeout) - dcae_logger.debug("Response from %s on topic %s: %s drs: %s" - % (dmaap_host, topic, http_resp.status_code, drs)) + # requests.packages.urllib3.disable_warnings() + http_resp = dmaap_requests_session.post(dmaap_url, post_data_enclosed, + auth=(dmaap_user_name, + dmaap_p_var), + headers=http_headers, + timeout=yaml_config_values.yc_http_timeout) + dcae_logger.debug("Response from %s on stream %s: %s dmaap_requests_session: %s" + % (dmaap_host, dmaap_stream_id, http_resp.status_code, dmaap_requests_session)) + if verbose: + print('%.4f published %s successfully' % + (time.time(), trap_dict["uuid"])) + dcae_logger.info('%.4f published %s successfully' % + (time.time(), trap_dict["uuid"])) if http_resp.status_code == requests.codes.ok: - dcae_logger.debug("%d traps published" - % loc_num_traps_to_publish_in_topic) - log_published_messages("DMAAP", topic, post_data_enclosed) + dcae_logger.debug("Response from %s: %s dmaap_request_sesson: %s" % ( + dmaap_url, http_resp.status_code, dmaap_requests_session)) + log_published_messages(post_data_enclosed) last_pub_time = time.time() dmaap_pub_success = True + break else: - dcae_logger.debug("Response (non-200) detail from %s on topic " - "%s: %s" % (dmaap_host, topic, http_resp.text)) + dcae_logger.debug("Response (non-200) detail from %s on stream " + "%s: %s" % (dmaap_host, dmaap_stream_id, http_resp.text)) + except OSError as e: + dcae_logger.debug("Exception while posting message to host: %s, stream: %s, dmaap_requests_session: %s, exception: %s %s" + % (dmaap_host, dmaap_stream_id, dmaap_requests_session, e.errno, e.strerror)) except requests.exceptions.RequestException as e: dcae_logger.error("Exception while posting to %s topic %s: -->%s<--" - % (dmaap_host, topic, e)) + % (dmaap_host, dmaap_stream_id, e)) k += 1 - # No point in waiting just to log "ALL publish attempts failed" msg - if k < num_pub_attempts: - time.sleep(sleep_between_retries) + if k < yaml_config_values.yc_http_retries: + dcae_logger.error("sleeping %s and retrying" % + yaml_config_values.yc_http_secs_between_retries) + time.sleep(yaml_config_values.yc_http_secs_between_retries) else: + dcae_logger.error("exhausted all attempts - giving up") break + if verbose: + print('%.4f exiting post_dmaap for %s' % + (time.time(), trap_dict["uuid"])) + dcae_logger.info('%.4f exiting post_dmaap for %s' % + (time.time(), trap_dict["uuid"])) if not dmaap_pub_success: - uuid = uuid_mod.uuid1() - dcae_logger.error("ALL publish attempts failed to DMAAP server: %s, " - "topic: %s, %d trap(s) not published, message: %s" - % (dmaap_host, topic, loc_num_traps_to_publish_in_topic, - post_data_by_topics[post_topics_idx])) - - # Set epoch_serno range for topic - ret_list = set_topic_serno_range(topic) - fes = ret_list[0] - les = ret_list[1] - - perm_msg = "CRITICAL: [%s] ALL publish attempts failed to DMAPP server: "\ - "%s, topic: %s, %d trap(s) not published in epoch_serno "\ - "range: %d - %d\n" \ - % (uuid, dmaap_host, topic, loc_num_traps_to_publish_in_topic, - fes, les) - + # uuid = uuid_mod.uuid1() + perm_msg = "CRITICAL: publish failure to DMAAP server: "\ + "%s, stream: %s trap: %s" % ( + dmaap_host, dmaap_stream_id, uuid) + dcae_logger.error(perm_msg) dcae_logger.error("SEND-TO-PERM-STATUS: %s" % perm_msg) - log_to_perm_status(perm_msg) + log_to_perm_status( + yaml_config_values.yc_perm_status_file, perm_msg, dcae_logger) + dcae_logger.info("%.4f %s" % (time.time(), perm_msg)) + if verbose: + print("%.4f %s" % (time.time(), perm_msg)) # # # # # # # # # # # # # # # # # # # -# fx: trap_observer +# fx: request_observer for community string rewrite +# # # # # # # # # # # # # # # # # # # +def comm_string_rewrite_observer(snmpEngine, execpoint, variables, cbCtx): + + # match ALL community strings + if re.match('.*', str(variables['communityName'])): + dcae_logger.debug('Rewriting communityName \'%s\' from %s into \'public\'' % (variables['communityName'], ':'.join([str(x) for x in + variables['transportInformation'][1]]))) + variables['communityName'] = variables['communityName'].clone('public') + +# # # # # # # # # # # # # # # # # # # +# fx: snmp_engine_observer_cb # callback for when trap is received # # # # # # # # # # # # # # # # # # # -def trap_observer(snmp_engine, execpoint, variables, cbCtx): + + +def snmp_engine_observer_cb(snmp_engine, execpoint, variables, cbCtx): """ - Decompose trap attributes and load in dictionary which is later used to - create json string for publishing to dmaap. + Decompose trap attributes and load in dictionary which is later used to + create json string for publishing to dmaap. :Parameters: snmp_engine snmp engine created to listen for arriving traps execpoint - point in code that trap_observer was invoked + point in code that snmp_engine_observer_cb was invoked variables trap attributes cbCtx @@ -583,52 +489,67 @@ def trap_observer(snmp_engine, execpoint, variables, cbCtx): global trap_dict, last_epoch_second, traps_in_epoch - # empty dictionary on new trap + # init dictionary on new trap trap_dict = {} # assign uuid to trap trap_dict["uuid"] = str(uuid_mod.uuid1()) + if verbose: + print('%.4f snmp trap arrived from %s, assigned uuid: %s' % + (time.time(), variables['transportAddress'][0], trap_dict["uuid"])) + dcae_logger.info('%.4f snmp trap arrived from %s, assigned uuid: %s' % ( + time.time(), variables['transportAddress'][0], trap_dict["uuid"])) + + # if re.match('.*', str(variables['communityName'])): + # print('Rewriting communityName \'%s\' from %s into \'public\'' % (variables['communityName'], ':'.join([str(x) for x in variables['transportInformation'][1]]))) + # variables['communityName'] = variables['communityName'].clone('public') + # ip and hostname ip_addr_str = str(variables['transportAddress'][0]) trap_dict["agent address"] = ip_addr_str try: - if int(dns_cache_ip_expires[ip_addr_str] < int(time.time())): - dcae_logger.debug ('dns cache expired for %s' % ip_addr_str) - raise Exception('cache expired for %s at %d - updating value' % (ip_addr_str, (dns_cache_ip_expires[ip_addr_str]))) - else: - trap_dict["agent name"] = dns_cache_ip_to_name[ip_addr_str] + if int(dns_cache_ip_expires[ip_addr_str] < int(time.time())): + dcae_logger.debug('dns cache expired for %s' % ip_addr_str) + raise Exception('cache expired for %s at %d - updating value' % + (ip_addr_str, (dns_cache_ip_expires[ip_addr_str]))) + else: + trap_dict["agent name"] = dns_cache_ip_to_name[ip_addr_str] except: - dcae_logger.debug ('dns cache expired or missing for %s - reloading' % ip_addr_str) + if verbose: + print('%.4f dns cache expired for %s' % (time.time(), ip_addr_str)) + dcae_logger.debug( + 'dns cache expired or missing for %s - reloading' % ip_addr_str) host_addr_info = socket.gethostbyaddr(ip_addr_str) agent_fqdn = str(host_addr_info[0]) trap_dict["agent name"] = agent_fqdn dns_cache_ip_to_name[ip_addr_str] = agent_fqdn - dns_cache_ip_expires[ip_addr_str] = (time.time() + yc_dns_cache_ttl_seconds) - dcae_logger.debug ('cache for %s (%s) updated - set to expire at %d' % (agent_fqdn, ip_addr_str, dns_cache_ip_expires[ip_addr_str])) + dns_cache_ip_expires[ip_addr_str] = ( + time.time() + yaml_config_values.yc_dns_cache_ttl_seconds) + dcae_logger.debug('cache for %s (%s) updated - set to expire at %d' % + (agent_fqdn, ip_addr_str, dns_cache_ip_expires[ip_addr_str])) dns_cache_ip_to_name[str(trap_dict["agent address"])] trap_dict["cambria.partition"] = str(trap_dict["agent name"]) trap_dict["community"] = "" # do not include cleartext community in pub - trap_dict["community len"] = 0 # do not include cleartext community in pub + # do not include cleartext community in pub + trap_dict["community len"] = 0 # FIXME.CHECK_WITH_DOWNSTREAM_CONSUMERS: get rid of round for millisecond val # epoch_second = int(round(time.time())) epoch_msecond = time.time() epoch_second = int(round(epoch_msecond)) if epoch_second == last_epoch_second: - traps_in_epoch +=1 + traps_in_epoch += 1 else: traps_in_epoch = 0 last_epoch_second = epoch_second traps_in_epoch_04d = format(traps_in_epoch, '04d') - # FIXME: get rid of exponential formatted output - trap_dict['epoch_serno'] = (epoch_second * 10000) + traps_in_epoch - # FIXME.PERFORMANCE: faster to use strings? - # trap_dict['epoch_serno'] = (str(epoch_second) + str(traps_in_epoch_04d)) - + trap_dict['epoch_serno'] = int( + (str(epoch_second) + str(traps_in_epoch_04d))) + snmp_version = variables['securityModel'] if snmp_version == 1: trap_dict["protocol version"] = "v1" @@ -642,18 +563,21 @@ def trap_observer(snmp_engine, execpoint, variables, cbCtx): trap_dict["protocol version"] = "unknown" if snmp_version == 3: - trap_dict["protocol version"] = "v3" - trap_dict["security level"] = str(variables['securityLevel']) - trap_dict["context name"] = str(variables['contextName'].prettyPrint()) - trap_dict["security name"] = str(variables['securityName']) - trap_dict["security engine"] = str(variables['contextEngineId'].prettyPrint()) + trap_dict["protocol version"] = "v3" + trap_dict["security level"] = str(variables['securityLevel']) + trap_dict["context name"] = str(variables['contextName'].prettyPrint()) + trap_dict["security name"] = str(variables['securityName']) + trap_dict["security engine"] = str( + variables['contextEngineId'].prettyPrint()) trap_dict['time received'] = epoch_msecond - trap_dict['trap category'] = "DCAE-COLLECTOR-UCSNMP" # get this from dmaap_url when ready + # get this from dmaap_url when ready + trap_dict['trap category'] = "DCAE-COLLECTOR-UCSNMP" + # Callback function for receiving notifications # noinspection PyUnusedLocal,PyUnusedLocal,PyUnusedLocal -def cbFun(snmp_engine, stateReference, contextEngineId, contextName, - varBinds, cbCtx): +def notif_receiver_cb(snmp_engine, stateReference, contextEngineId, contextName, + varBinds, cbCtx): """ Callback executed when trap arrives :Parameters: @@ -673,26 +597,29 @@ def cbFun(snmp_engine, stateReference, contextEngineId, contextName, :Variables: """ - global trap_dict - print('CB for notification from ContextEngineId "%s", ContextName "%s"' % (contextEngineId.prettyPrint(), - contextName.prettyPrint())) + if verbose: + print('%.4f processing varbinds for %s' % + (time.time(), trap_dict["uuid"])) + dcae_logger.info('%.4f processing varbinds for %s' % + (time.time(), trap_dict["uuid"])) + # FIXME: add conversion from v1 to v2 prior to below? or special handling for v1? - # print('entering cbFun, trap_dict is: %s' % (json.dumps(trap_dict))) + # FIXME update reset location when batching publishes vb_dict = {} - vb_idx=0; - k1="" - k2="" + vb_idx = 0 + k1 = "" + k2 = "" # FIXME: Note that the vb type is present, just need to extract it efficiently somehow # print('\nvarBinds ==> %s' % (varBinds)) # - # varBinds ==> [(ObjectName('1.3.6.1.2.1.1.3.0'), TimeTicks(1243175676)), - # (ObjectName('1.3.6.1.6.3.1.1.4.1.0'), ObjectIdentifier('1.3.6.1.4.1.74.2.46.12.1.1')), - # (ObjectName('1.3.6.1.4.1.74.2.46.12.1.1.1'), OctetString(b'ucsnmp heartbeat - ignore')), + # varBinds ==> [(ObjectName('1.3.6.1.2.1.1.3.0'), TimeTicks(1243175676)), + # (ObjectName('1.3.6.1.6.3.1.1.4.1.0'), ObjectIdentifier('1.3.6.1.4.1.74.2.46.12.1.1')), + # (ObjectName('1.3.6.1.4.1.74.2.46.12.1.1.1'), OctetString(b'ucsnmp heartbeat - ignore')), # (ObjectName('1.3.6.1.4.1.74.2.46.12.1.1.2'), OctetString(b'Fri Aug 11 17:46:01 EDT 2017'))] # # This does NOT work: @@ -701,77 +628,88 @@ def cbFun(snmp_engine, stateReference, contextEngineId, contextName, # print('typ = %s' % (typ)) # print('val = %s\n' % (val)) + vb_all_string = "" for name, val in varBinds: + vb_dict = {} if vb_idx == 0: vb_sys_uptime_oid = name vb_sys_uptime = val + trap_dict["sysUptime"] = str(val) # print('vb_sys_uptime = %s' % (vb_sys_uptime)) else: if vb_idx == 1: trap_dict["notify OID"] = str(val) - trap_dict["notify OID len"] = (trap_dict["notify OID"].count('.') + 1) + trap_dict["notify OID len"] = ( + trap_dict["notify OID"].count('.') + 1) # print('vb_notify_oid = %s' % (vb_notify_oid)) # else: # vb_idx_02d = format((vb_idx - 2), '02d') vb_idx_02d = format((vb_idx), '02d') - k1="varbind" + str(vb_idx_02d) + "_oid" - k2="varbind" + str(vb_idx_02d) + "_val" - vb_dict[k1] = name.prettyPrint() - vb_dict[k2] = val.prettyPrint() - + k1 = "varbind_oid_" + str(vb_idx_02d) + k2 = "varbind_value_" + str(vb_idx_02d) + # vb_dict[k1] = name.prettyPrint() + # vb_dict[k2] = val.prettyPrint() + vb_dict["varbind_type"] = "tbd" + vb_dict["varbind_oid"] = name.prettyPrint() + vb_dict["varbind_value"] = val.prettyPrint() + vb_json = json.dumps(vb_dict) + vb_all_string += vb_json + vb_idx += 1 - # print('SNMP trap arrived: %s' % (pprint.pprint(json.dumps(trap_dict)))) trap_dict["num varbinds"] = vb_idx - # FIXME: now add varbind dict to trap dict - trap_dict["varbinds"] = vb_dict + # add varbind dict to trap dict + # trap_dict["varbinds"] = vb_dict + trap_dict["varbinds"] = vb_all_string + dcae_logger.debug("vb_dict json-ized: %s" % (json.dumps(vb_dict))) trap_json_msg = json.dumps(trap_dict) - print('SNMP trap arrived: %s' % trap_json_msg) - # FIXME: temporary pub to UEB for validating JSON - post_ueb(trap_json_msg) + # publish to dmaap after last varbind is processed + post_dmaap(dmaap_config_values.dmaap_url, dmaap_config_values.dmaap_user_name, dmaap_config_values.dmaap_p_var, + dmaap_config_values.dmaap_stream_id, dmaap_config_values.dmaap_host, trap_dict["uuid"], trap_json_msg) # # # # # # # # # # # # # # Main MAIN Main MAIN # # # # # # # # # # # # # # parse command line args -parser = argparse.ArgumentParser(description='Post SNMP traps ' \ +parser = argparse.ArgumentParser(description='Post SNMP traps ' 'to DCAE DMaap MR') parser.add_argument('-c', action="store", dest="yaml_conf_file", type=str, help="yaml config file name") -parser.add_argument('-u', action="store", dest="ueb_url", type=str, - help="ueb url for testing purposes ONLY") parser.add_argument('-v', action="store_true", dest="verbose", help="verbose logging") +parser.add_argument('-?', action="store_true", dest="usage_requested", + help="show command line use") # set vars from args -parser.set_defaults(yaml_conf_file = "") +parser.set_defaults(yaml_conf_file="") # parse args args = parser.parse_args() # set vars from args yaml_conf_file = args.yaml_conf_file -ueb_url = args.ueb_url verbose = args.verbose +usage_requested = args.usage_requested + +# if usage, just display and exit +if usage_requested: + usage_err() # Get non-ENV settings from config file; spoof 2 params # so same fx can be used for signal handling if yaml_conf_file == "": - usage_err -else: - load_cfg('0', '0') + usage_err() -# save current PID for future/external reference -pid_file_name = '%s/%s.pid' % (yc_pid_dir, prog_name) -save_pid(pid_file_name) +# always get yaml config values +yaml_config_values = read_yaml_config(yaml_conf_file) # setup custom logger -setup_dcae_logger() +setup_dcae_logger(yaml_config_values.yc_trapd_diag) # bump up logging level if overridden at command line if verbose: @@ -782,11 +720,50 @@ if verbose: dcae_logger.info("log will include info level messages") dcae_logger.error("log will include error level messages") dcae_logger.debug("log will include debug level messages") +dcae_logger.info("Runtime PID file: %s" % pid_file_name) + +# setup signal handling for config file reload +# FIXME: need to have signal handler return all tuples for configs +# signal.signal(signal.SIGUSR1, load_all_configs) + +# save current PID for future/external reference +pid_file_name = '%s/%s.pid' % (yaml_config_values.yc_pid_dir, prog_name) +rc = save_pid(pid_file_name) + +# always get trap configs +trap_config_values = read_trap_config( + yaml_config_values.yc_trap_conf, dcae_logger) + +# Set initial startup hour for rolling logfile +last_hour = datetime.datetime.now().hour + +#make sure my env is set properly +try: + c = get_config() + if c == {}: + msg = "Unable to fetch configuration or it is erroneously empty - fatal ONAP controller error, trying OpenDCAE config" + dcae_logger.error(msg) + print('%s' % msg) + +#if new controller not present, try dmaap.conf +except: + msg = "ONAP controller not present, attempting OpenDCAE dmaap.conf config" + dcae_logger.error(msg) + dmaap_config_values = read_dmaap_config( + yaml_config_values.yc_dmaap_conf, dcae_logger) + + # get the topic from the url + dmaap_topic = dmaap_config_values.dmaap_url.split('.')[-1] + dcae_logger.info("Topic: %s" % dmaap_topic) + json_log_filename = yaml_config_values.yc_published_traps_dir + '/' + 'DMAAP' + '_' \ + + dmaap_topic + '.json' + json_fd = open_json_log() + msg = "Using OpenDCAE dmaap.conf config" # Get the event loop for this thread loop = asyncio.get_event_loop() -# Create SNMP engine with autogenernated engineID and pre-bound +# Create SNMP engine with autogenernated engineID pre-bound # to socket transport dispatcher snmp_engine = engine.SnmpEngine() @@ -794,39 +771,57 @@ snmp_engine = engine.SnmpEngine() # Transport setup # # # # # # # # # # # # -# UDP over IPv4, first listening interface/port +# UDP over IPv4 +# FIXME: add check for presense of ipv4_interface prior to attempting add OR just put entire thing in try/except clause config.addTransport( snmp_engine, udp.domainName + (1,), - udp.UdpTransport().openServerMode(('127.0.0.1', 6163)) + udp.UdpTransport().openServerMode( + (yaml_config_values.yc_ipv4_interface, yaml_config_values.yc_ipv4_port)) ) -# UDP over IPv4, second listening interface/port +# UDP over IPv6 +# FIXME: add check for presense of ipv6_interface prior to attempting add OR just put entire thing in try/except clause config.addTransport( snmp_engine, - udp.domainName + (2,), - udp.UdpTransport().openServerMode(('127.0.0.1', 2162)) + udp6.domainName, + udp6.Udp6Transport().openServerMode( + (yaml_config_values.yc_ipv6_interface, yaml_config_values.yc_ipv6_port)) ) +# UDP over IPv4, second listening interface/port +# config.addTransport( +# snmp_engine, +# udp.domainName + (2,), +# udp.UdpTransport().openServerMode(('127.0.0.1', 2162)) +# ) + # # # # # # # # # # # # # SNMPv1/2c setup # # # # # # # # # # # # # SecurityName <-> CommunityName mapping +# to restrict trap reception to only those with specific community +# strings config.addV1System(snmp_engine, 'my-area', 'public') -# register trap_observer for message arrival +# register comm_string_rewrite_observer for message arrival +snmp_engine.observer.registerObserver( + comm_string_rewrite_observer, + 'rfc2576.processIncomingMsg:writable' +) + +# register snmp_engine_observer_cb for message arrival snmp_engine.observer.registerObserver( - trap_observer, + snmp_engine_observer_cb, 'rfc3412.receiveMessage:request', - 'rfc3412.returnResponsePdu' - # 'rfc2576.processIncomingMsg:writable' + 'rfc3412.returnResponsePdu', ) # Register SNMP Application at the SNMP engine -ntfrcv.NotificationReceiver(snmp_engine, cbFun) +ntfrcv.NotificationReceiver(snmp_engine, notif_receiver_cb) -snmp_engine.transportDispatcher.jobStarted(1) # this job would never finish +snmp_engine.transportDispatcher.jobStarted(1) # loop forever # Run I/O dispatcher which would receive queries and send confirmations try: @@ -834,4 +829,4 @@ try: except: snmp_engine.observer.unregisterObserver() snmp_engine.transportDispatcher.closeDispatcher() - rm_pid(pid_file_name) + cleanup_and_exit(1, pid_file_name) diff --git a/src/dcae_snmptrapd.sh b/src/dcae_snmptrapd.sh index 4e00efc..c829866 100755 --- a/src/dcae_snmptrapd.sh +++ b/src/dcae_snmptrapd.sh @@ -1,24 +1,47 @@ #!/usr/bin/env bash -/* -* ============LICENSE_START======================================================= -* org.onap.dcae -* ================================================================================ -* Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. -* ================================================================================ -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -* ============LICENSE_END========================================================= -* -* ECOMP is a trademark and service mark of AT&T Intellectual Property. -*/ +# +# ============LICENSE_START======================================================= +# org.onap.dcae +# ================================================================================ +# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. +# -python dcae_snmptrapd.py -c ../etc/dcae_snmptrapd.yaml + +# get to where we are supposed to be for startup +cd /opt/app/snmptrap/src + +# include path to 3.6+ version of python that has required dependencies included +export PATH=/opt/app/python-3.6.1/bin:$PATH + +# expand search for python modules to include ./mod in runtime dir +export PYTHONPATH=./mod:./:$PYTHONPATH + +# set location of SSL certificates +export REQUESTS_CA_BUNDLE=/etc/ssl/certs/ca-bundle.crt + +# PYTHONUNBUFFERED: +# set PYTHONUNBUFFERED to True to avoid output buffering; comment out for +# better performance! +# export PYTHONUNBUFFERED='True' + +# less verbose at startup? Use this: +# python dcae_snmptrapd.py -c ../etc/trapd.yaml +# want tracing? Use this: +# python -m trace --trackcalls dcae_snmptrapd.py -c ../etc/trapd.yaml +# standard startup? Use this: +python dcae_snmptrapd.py -v -c ../etc/trapd.yaml diff --git a/src/mod/trapd_dcae_logger.py b/src/mod/trapd_dcae_logger.py new file mode 100644 index 0000000..c47d8cf --- /dev/null +++ b/src/mod/trapd_dcae_logger.py @@ -0,0 +1,68 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. +# +""" +NOTE: This is a placeholder for now - logger has not been externalized +from the main source. + +Setup custom logger for dcae_snmptrapd that incorporates +a rotating file _handler with 10 backups of diagnostic messages +:Parameters: + +:Exceptions: + +:Keywords: + +""" + +__docformat__ = 'restructuredtext' + +import logging + + +# # # # # # # # # # # # # # # # # # # +# fx: setup _dcae_logger custom logger +# # # # # # # # # # ## # # # # # # # +def setup_dcae_logger(_yc_snmptrapd_diag, _dcae_logger_max_bytes, _dcae_logger_num_archives): + """ + """ + + _date_fmt = '%m/%d/%Y %H:%M:%S' + + _yc_snmptrapd_diag_bak = "%s.bak" % (_yc_snmptrapd_diag) + if os.path.isfile(_yc_snmptrapd_diag): + os.rename(_yc_snmptrapd_diag, _yc_snmptrapd_diag_bak) + + _handler = logging._handlers.RotatingFileHandler(_yc_snmptrapd_diag, + maxBytes=_dcae_logger_max_bytes, + backupCount=_dcae_logger_num_archives) + + # set logLevel - valid values NOTSET, DEBUG, INFO, WARNING, ERROR, CRITICAL + _handler.setLevel(logging.DEBUG) + _dcae_logger.setLevel(logging.DEBUG) + + log_fmt = '%(levelname)s|%(asctime)s|%(name)s|%(process)d|%(funcName)s|'\ + '%(message)s' + _formatter = logging.Formatter(log_fmt) + _handler.setFormatter(formatter) + _dcae_logger.addHandler(_handler) + + if os.path.isfile(_yc_snmptrapd_diag): + os.chmod(_yc_snmptrapd_diag, 0o640) + + if os.path.isfile(_yc_snmptrapd_diag_bak): + os.chmod(_yc_snmptrapd_diag_bak, 0o640) + + return _dcae_logger diff --git a/src/mod/trapd_dmaap_config.py b/src/mod/trapd_dmaap_config.py new file mode 100644 index 0000000..c764e52 --- /dev/null +++ b/src/mod/trapd_dmaap_config.py @@ -0,0 +1,104 @@ +# ============LICENSE_START======================================================= +# org.onap.dcae +# ================================================================================ +# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. +# +""" +NOTE: This module is for temporary use. It will be removed when dcae_snmptrapd +is migrated to the new controller infrastructure. + +trapd_dmaap_config is responsible for reading/parsing the previous generation +'dmaap.conf' file, which includes stream, server and authentication details for +publishing activities. +""" + +__docformat__ = 'restructuredtext' + +import os +import sys +import string +import time +import traceback +import collections +import json + +from trapd_exit import cleanup_and_exit + +prog_name = os.path.basename(__file__) + + +# # # # # # # # # # # +# fx: read_dmaap_config +# # # # # # # # # # # +def read_dmaap_config(_yc_dmaap_conf, _dcae_logger): + # FIXME NOTE: This is for testing purposes only, and utilizes the + # previous generation of the controller; dispose of when ready + """ + Load dmaap config /etc/dcae/dmaap.conf file (legacy controller) + :Parameters: + name of dmaap config file + :Exceptions: + file open + this function will throw an exception if unable to open + yc_dmaap_conf(fatal error) + :Keywords: + legacy controller dmaap.conf + :Variables: + yc_dmaap_conf + full path filename of dmaap_conf file provided by previous + generation controller + :Returns: + named tuple of config values + """ + + _dmaap_cfg_values_nt = collections.namedtuple('dmaap_config_values', [ + 'dmaap_url', 'dmaap_user_name', 'dmaap_p_var', 'dmaap_stream_id', 'dmaap_host']) + if os.path.isfile(_yc_dmaap_conf): + _dcae_logger.debug('Reading DMaaP config file %s ' % + _yc_dmaap_conf) + else: + _dcae_logger.error('DMaaP config file %s does NOT exist - exiting' + % (_yc_dmaap_conf)) + cleanup_and_exit(1, undefined) + + with open(_yc_dmaap_conf) as _dmaap_config_fd: + _dmaapCfgData = json.load(_dmaap_config_fd) + + try: + dmaap_url = _dmaapCfgData[0]["dmaapUrl"] + _dcae_logger.debug('dmaap_url: %s' % (dmaap_url)) + dmaap_user_name = _dmaapCfgData[0]["dmaapUserName"] + _dcae_logger.debug('dmaap_user_name: %s' % (dmaap_user_name)) + dmaap_p_var = _dmaapCfgData[0]["dmaapPassword"] + _dcae_logger.debug('dmaap_p_var: -') + dmaap_stream_id = _dmaapCfgData[0]["dmaapStreamId"] + _dcae_logger.debug('dmaap_stream_id: %s' % (dmaap_stream_id)) + except: + _dcae_logger.error('DMaaP config file %s has missing data - exiting' + % (_yc_dmaap_conf)) + cleanup_and_exit(1, "undefined") + + # This is for logging purposes only. + dmaap_host = dmaap_url.split('/')[2][:-5] + _dcae_logger.debug('dmaap_host: %s' % (dmaap_host)) + + _dmaap_config_fd.close() + + _dmaap_cfg_values = _dmaap_cfg_values_nt(dmaap_url=dmaap_url, dmaap_user_name=dmaap_user_name, + dmaap_p_var=dmaap_p_var, dmaap_stream_id=dmaap_stream_id, dmaap_host=dmaap_host) + return _dmaap_cfg_values diff --git a/src/mod/trapd_exit.py b/src/mod/trapd_exit.py new file mode 100644 index 0000000..d14ea81 --- /dev/null +++ b/src/mod/trapd_exit.py @@ -0,0 +1,63 @@ +# ============LICENSE_START======================================================= +# org.onap.dcae +# ================================================================================ +# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. +# +""" +trapc_exit_snmptrapd is responsible for removing any existing runtime PID +file, and exiting with the provided (param 1) exit code +""" + +__docformat__ = 'restructuredtext' + +import sys +import os +import string +from trapd_runtime_pid import save_pid, rm_pid + +prog_name = os.path.basename(__file__) + + +# # # # # # # # # # # # # +# fx: cleanup_and_exit +# - remove pid file +# - exit with supplied return code +# # # # # # # # # # # # # +def cleanup_and_exit(_loc_exit_code, _pid_file_name): + """ + Remove existing PID file, and exit with provided exit code + :Parameters: + _loc_exit_code + value to return to calling shell upon exit + _pid_file_name + name of file that contains current process ID (for + removal) + :Exceptions: + none + :Keywords: + runtime PID exit + :Variables: + _num_params + number of parameters passed to module + """ + + _num_params = len(locals()) + + if _num_params == 2: + rc = rm_pid(_pid_file_name) + sys.exit(_loc_exit_code) diff --git a/src/mod/trapd_http_session.py b/src/mod/trapd_http_session.py new file mode 100644 index 0000000..82c74b2 --- /dev/null +++ b/src/mod/trapd_http_session.py @@ -0,0 +1,62 @@ +# ============LICENSE_START======================================================= +# org.onap.dcae +# ================================================================================ +# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. +# +""" +trapd_http_session establishes an http session for future use in publishing +messages to the dmaap cluster. +""" + +__docformat__ = 'restructuredtext' + +import logging +import os +import requests +import string +import time +import traceback + +prog_name = os.path.basename(__file__) + + +# # # # # # # # # # # # # +# fx: init_session_obj +# # # # # # # # # # # # # +def init_session_obj(_dcae_logger): + """ + Initializes and returns a http request session object for later use + :Parameters: + dcae logger for diagnostics + :Exceptions: + session object creation + this function will throw an exception if unable to create + a new session object + :Keywords: + http request session + :Variables: + none + """ + + try: + s = requests.Session() + _dcae_logger.debug("New requests session has been initialized: %s" % s) + except: + _dcae_logger.error("Failed to create new requests session") + + return s diff --git a/src/mod/trapd_perm_status.py b/src/mod/trapd_perm_status.py new file mode 100644 index 0000000..5bf2180 --- /dev/null +++ b/src/mod/trapd_perm_status.py @@ -0,0 +1,61 @@ +# ============LICENSE_START======================================================= +# org.onap.dcae +# ================================================================================ +# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. +# +""" +trapd_perm_status maintains a 'permanent' status file +important messages for audit/diagnostics/etc +""" + +__docformat__ = 'restructuredtext' + +import logging +import os +import string +import time +import traceback + +prog_name = os.path.basename(__file__) + + +# # # # # # # # # # # # # +# fx: log_to_perm_status +# # # # # # # # # # # # # +def log_to_perm_status(_loc_perm_file, _loc_perm_msg, _dcae_logger): + """ + Log select errors too permanent logfile + access. + :Parameters: + log message, logger + :Exceptions: + file open + this function will catch exception of unable to + open the log file + :Keywords: + permstatus + """ + + perm_fmt_date = time.strftime("%a %b %d %H:%M:%S %Z %Y") + + try: + f = open(_loc_perm_file, 'a') + f.write("%s %s\n" % (perm_fmt_date, _loc_perm_msg)) + f.close() + except IOError: + _dcae_logger.exception("File I/O Exception on %s" % perm_status_fd) diff --git a/src/mod/trapd_runtime_pid.py b/src/mod/trapd_runtime_pid.py new file mode 100644 index 0000000..bddc63c --- /dev/null +++ b/src/mod/trapd_runtime_pid.py @@ -0,0 +1,91 @@ +# ============LICENSE_START======================================================= +# org.onap.dcae +# ================================================================================ +# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. +# +""" +trapd_runtime_pid maintains a 'PID file' (file that contains the +PID of currently running trap receiver) +""" + +__docformat__ = 'restructuredtext' + +import logging +import os +import string +import time +import traceback + +prog_name = os.path.basename(__file__) + + +# # # # # # # # # # # # # +# fx: save_pid - save PID of running process +# # # # # # # # # # # # # +def save_pid(_pid_file_name): + """ + Save the current process ID in a file for external + access. + :Parameters: + none + :Exceptions: + file open + this function will catch exception of unable to + open/create _pid_file_name + :Keywords: + pid /var/run + """ + + try: + pid_fd = open(_pid_file_name, 'w') + pid_fd.write('%d' % os.getpid()) + pid_fd.close() + except IOError: + print("IOError saving PID file %s :" % _pid_file_name) + return False + # except: + # print("Error saving PID file %s :" % _pid_file_name) + # return False + else: + print("Runtime PID file: %s" % _pid_file_name) + return True + + +# # # # # # # # # # # # # +# fx: rm_pid - remove PID of running process +# # # # # # # # # # # # # +def rm_pid(_pid_file_name): + """ + Remove the current process ID file before exiting. + :Parameters: + none + :Exceptions: + file open + this function will catch exception of unable to find or remove + _pid_file_name + :Keywords: + pid /var/run + """ + + try: + if os.path.isfile(_pid_file_name): + os.remove(_pid_file_name) + return True + except IOError: + print("Error removing Runtime PID file: %s" % _pid_file_name) + return False diff --git a/src/mod/trapd_trap_config.py b/src/mod/trapd_trap_config.py new file mode 100644 index 0000000..bfcbb41 --- /dev/null +++ b/src/mod/trapd_trap_config.py @@ -0,0 +1,98 @@ +# ============LICENSE_START======================================================= +# org.onap.dcae +# ================================================================================ +# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. +# +""" +trapd_trap_conf reads config file of traps and stores/returns them +in a data dictionary that is used to compare arriving SNMP OID's +to the list contained in this file for a keep(/publish) or ignore +decision. +""" + +__docformat__ = 'restructuredtext' + +import os +import sys +import string +import time +import traceback +from trapd_exit import cleanup_and_exit + + +prog_name = os.path.basename(__file__) + + +# # # # # # # # # # # +# fx: read_trap_config +# # # # # # # # # # # + +def read_trap_config(_yc_trap_conf, _dcae_logger): + """ + Load trap config file specified in yaml conf. This config (1) specifies + which traps should be published(inclusion) and which traps should be discarded + (not present in config) and (2) maps SNMP Notify OID to DMAAP/MR topics + :Parameters: + none + :Exceptions: + file open + this function will throw an exception if unable to open + _yc_trap_conf + :Keywords: + NotifyOID trap config topic + :Variables: + """ + + _trap_conf_dict = {} + + if os.path.isfile(_yc_trap_conf): + _dcae_logger.debug('Reading trap config file %s ' % _yc_trap_conf) + else: + _dcae_logger.error('ERROR: trap config file %s does NOT exist - exiting' + % (_yc_trap_conf)) + cleanup_and_exit(1, "undefined") + + # reset dictionaries in case we've been here before + _num_trap_conf_entries = 0 + + field_separator = " " + + _dcae_logger.debug('processing trap config settings from %s' + % (_yc_trap_conf)) + for line in open(_yc_trap_conf): + # format: + # + # oid_including_regex + # + if line[0] != '#': + columns = line.rstrip().split(field_separator) + # process trap config entries + if len(columns) == 2: + _trap_conf_oid = columns[0] + _trap_conf_dict[_trap_conf_oid] = columns[1] + _dcae_logger.debug('%d oid: %s topic: %s' % + (_num_trap_conf_entries, _trap_conf_oid, _trap_conf_dict[_trap_conf_oid])) + _num_trap_conf_entries += 1 + else: + _dcae_logger.debug('ERROR: Invalid trap config entry - ' + 'skipping: %s' % (line.rstrip())) + + _dcae_logger.debug('%d trap config entries found in %s' % (_num_trap_conf_entries, + _yc_trap_conf)) + + return _trap_conf_dict diff --git a/src/mod/trapd_yaml_config.py b/src/mod/trapd_yaml_config.py new file mode 100644 index 0000000..0041232 --- /dev/null +++ b/src/mod/trapd_yaml_config.py @@ -0,0 +1,193 @@ +# ============LICENSE_START======================================================= +# org.onap.dcae +# ================================================================================ +# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. +# +""" +Read the SNMP trap receiver YAML config file, which contains the vast +majority of configurable parameters for the process, including +location of other config files, http timeouts, dns cache times, +etc. +""" + +__docformat__ = 'restructuredtext' + +import os +import sys +import string +import time +import traceback +import collections +import yaml +from trapd_exit import cleanup_and_exit + + +prog_name = os.path.basename(__file__) + + +# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # +# function: get_yaml_cfg +# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # + + +def read_yaml_config(loc_yaml_conf_file): + """ + Load all sorts of goodies from yaml config file. + :Parameters: + loc_yaml_conf_file + filename including full path to yaml config file + :Exceptions: + file open + this function will throw an exception if unable to open + loc_yaml_conf_file (fatal error) or any of the required + values are not found in the loc_yaml_conf_file (fatal error) + :Keywords: + yaml config runtime protocol files dmaap + :Variables: + yc_transport + protocol transport for snmp traps (udp|tcp) + yc_ipv4_interface + what ipv4 interface to listen for traps on + yc_ipv4_port + what port to listen for traps on + yc_dns_cache_ttl_seconds + how many seconds an entry remains in DNS cache prior to refresh + yc_runtime_base_dir + base directory of dcae_snmptrapd application + yc_log_dir + log directory of dcae_snmptrapd application + yc_data_dir + data directory of dcae_snmptrapd application + yc_pid_dir + directory where running PID file will be written (filename /.pid) + yc_trapd_diag + program diagnostic log, auto rotated and archived via python handler + yc_raw_traps_log + file to write raw trap data to + yc_published_traps_dir + file to write json formatted trap data for successful publishes (only!) + yc_trap_stats_log + file to write trap stats (traps per second, by OID, by agent) + yc_perm_status_file + file to write trap stats (traps per second, by OID, by agent) + yc_dmaap_conf + file (full path) of yaml config entries referenced at runtime, passed as + runtime command argument "-c + yc_http_timeout + http timeout in seconds for dmaap publish attempt + yc_http_retries + num of http retries to attempt in response to failed post + yc_primary_publisher + boolean defining whether local instance is primary (future use) + yc_peer_publisher + identity of peer publisher in case this one fails (future use) + yc_max_traps_between_publish + if batching publishes, max number of traps to queue before http post + yc_max_milliseconds_between_publish + if batching publishes, max number of milliseconds between http post + Note: using the batch feature creates an opportunity for trap loss if + traps stop arriving and the process exits (traps in queue will remain + there until another trap arrives and kicks of the evaluation of max_traps + or max_milliseconds above). + """ + + # named tuple for values in yaml config file + _yaml_config_values_nt = collections.namedtuple('yaml_config_values', ['yc_transport', 'yc_ipv4_port', 'yc_ipv4_interface', 'yc_ipv6_port', 'yc_ipv6_interface', 'yc_dns_cache_ttl_seconds', 'yc_runtime_base_dir', 'yc_log_dir', 'yc_data_dir', 'yc_pid_dir', 'yc_trap_conf', 'yc_trapd_diag', + 'yc_raw_traps_log', 'yc_published_traps_dir', 'yc_trap_stats_log', 'yc_perm_status_file', 'yc_dmaap_conf', 'yc_http_timeout', 'yc_http_retries', 'yc_http_secs_between_retries', 'yc_primary_publisher', 'yc_peer_publisher', 'yc_max_traps_between_publish', 'yc_max_milliseconds_between_publish']) + + with open(loc_yaml_conf_file, 'r') as yaml_fd: + cfg_data = yaml.load(yaml_fd) + + # ONAP FIXME: split try into per-section except loops below + try: + # protocol + yc_transport = (cfg_data['protocol']['transport']) + yc_ipv4_interface = (cfg_data['protocol']['ipv4_interface']) + yc_ipv4_port = int(cfg_data['protocol']['ipv4_port']) + yc_ipv6_interface = (cfg_data['protocol']['ipv6_interface']) + yc_ipv6_port = int(cfg_data['protocol']['ipv6_port']) + yc_dns_cache_ttl_seconds = int( + cfg_data['protocol']['dns_cache_ttl_seconds']) + + # files and directories + yc_runtime_base_dir = (cfg_data['files']['runtime_base_dir']) + yc_log_dir = (cfg_data['files']['log_dir']) + yc_data_dir = (cfg_data['files']['data_dir']) + yc_pid_dir = (cfg_data['files']['pid_dir']) + yc_trap_conf = (cfg_data['files']['trap_conf']) + yc_trapd_diag = (cfg_data['files']['snmptrapd_diag']) + yc_raw_traps_log = (cfg_data['files']['raw_traps_log']) + yc_published_traps_dir = (cfg_data['files']['published_traps_dir']) + yc_trap_stats_log = (cfg_data['files']['trap_stats_log']) + yc_perm_status_file = (cfg_data['files']['perm_status_file']) + + # dmaap + yc_dmaap_conf = (cfg_data['dmaap']['dmaap_conf']) + yc_http_timeout = (cfg_data['dmaap']['http_timeout']) + yc_http_retries = (cfg_data['dmaap']['http_retries']) + yc_http_secs_between_retries = ( + cfg_data['dmaap']['http_secs_between_retries']) + yc_primary_publisher = (cfg_data['dmaap']['primary_publisher']) + yc_peer_publisher = (cfg_data['dmaap']['peer_publisher']) + yc_max_traps_between_publish = ( + cfg_data['dmaap']['max_traps_between_publish']) + yc_max_milliseconds_between_publish = ( + cfg_data['dmaap']['max_milliseconds_between_publish']) + + except: + print("ERROR reading config: %s" % loc_yaml_conf_file) + raise + cleanup_and_exit(1, "undefined") + + # print back for confirmation + print("Configs read from: %s" % loc_yaml_conf_file) + print(" protocol section:") + print(" transport: %s" % yc_transport) + print(" ipv4_port: %s" % yc_ipv4_port) + print(" ipv4_interface: %s" % yc_ipv4_interface) + print(" ipv6_port: %s" % yc_ipv6_port) + print(" ipv6_interface: %s" % yc_ipv6_interface) + print(" dns_cache_ttl_seconds: %s" % yc_dns_cache_ttl_seconds) + print(" files section:") + print(" runtime_base_dir: %s" % yc_runtime_base_dir) + print(" log_dir: %s" % yc_log_dir) + print(" data_dir: %s" % yc_data_dir) + print(" pid_dir: %s" % yc_pid_dir) + print(" trap_conf: %s" % yc_trap_conf) + print(" snmptrapd_diag: %s" % yc_trapd_diag) + print(" raw_traps_log: %s" % yc_raw_traps_log) + print(" published_traps_dir: %s" % yc_published_traps_dir) + print(" trap_stats_log: %s" % yc_trap_stats_log) + print(" perm_status_file: %s" % yc_perm_status_file) + print(" dmaap section:") + print(" dmaap_config_file: %s" % yc_dmaap_conf) + print(" http_timeout: %s" % yc_http_timeout) + print(" http_retries: %s" % yc_http_retries) + print(" http_secs_between_retries: %s" % + yc_http_secs_between_retries) + print(" primary_publisher: %s" % yc_primary_publisher) + print(" peer_publisher: %s" % yc_peer_publisher) + print(" max_traps_between_publish: %s" % + yc_max_traps_between_publish) + print(" max_milliseconds_between_publish: %s" % + yc_max_milliseconds_between_publish) + + _yaml_config_values = _yaml_config_values_nt(yc_transport=yc_transport, yc_ipv4_port=yc_ipv4_port, yc_ipv4_interface=yc_ipv4_interface, yc_ipv6_port=yc_ipv6_port, yc_ipv6_interface=yc_ipv6_interface, yc_dns_cache_ttl_seconds=yc_dns_cache_ttl_seconds, yc_runtime_base_dir=yc_runtime_base_dir, yc_log_dir=yc_log_dir, yc_data_dir=yc_data_dir, yc_pid_dir=yc_pid_dir, yc_trap_conf=yc_trap_conf, yc_trapd_diag=yc_trapd_diag, yc_raw_traps_log=yc_raw_traps_log, yc_published_traps_dir=yc_published_traps_dir, + yc_trap_stats_log=yc_trap_stats_log, yc_perm_status_file=yc_perm_status_file, yc_dmaap_conf=yc_dmaap_conf, yc_http_timeout=yc_http_timeout, yc_http_retries=yc_http_retries, yc_http_secs_between_retries=yc_http_secs_between_retries, yc_primary_publisher=yc_primary_publisher, yc_peer_publisher=yc_peer_publisher, yc_max_traps_between_publish=yc_max_traps_between_publish, yc_max_milliseconds_between_publish=yc_max_milliseconds_between_publish) + + return _yaml_config_values -- cgit 1.2.3-korg