# Copyright (c) 2018 AT&T Intellectual Property. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

#
# Pipeline overview
#   input  : polls three DMaaP topics over HTTP once a minute, and reads
#            JSON events from files under /log-input/.
#   filter : decodes DMaaP payloads, normalises timestamps, copies a few
#            nested AAI fields to the top level, adds aggregation flags and
#            clones every non-error event into a pruned "event-cl-aggs" copy.
#   output : prints every event to stdout and indexes it into one of three
#            daily Elasticsearch indices (errors-*, events-cl-*, events-raw-*).
#
# All ${...} placeholders are substituted by Logstash when the pipeline is
# loaded.
#

input {
    http_poller {
        # One entry per polled DMaaP topic; the three entries differ only by
        # topic placeholder and by the "type" they declare.
        urls => {
            event_queue => {
                method => get
                url => "${dmaap_base_url}/events/${event_topic}/${dmaap_consumer_group}/${dmaap_consumer_id}?timeout=15000"
                headers => {
                    Accept => "application/json"
                }
                add_field => { "topic" => "${event_topic}" }
                type => "dmaap_event"
            }
            notification_queue => {
                method => get
                url => "${dmaap_base_url}/events/${notification_topic}/${dmaap_consumer_group}/${dmaap_consumer_id}?timeout=15000"
                headers => {
                    Accept => "application/json"
                }
                add_field => { "topic" => "${notification_topic}" }
                type => "dmaap_notification"
            }
            request_queue => {
                method => get
                url => "${dmaap_base_url}/events/${request_topic}/${dmaap_consumer_group}/${dmaap_consumer_id}?timeout=15000"
                headers => {
                    Accept => "application/json"
                }
                add_field => { "topic" => "${request_topic}" }
                type => "dmaap_request"
            }
        }
        # The URLs ask for a 15000 ms long poll (?timeout=15000); the client
        # timeouts below are set higher than that.
        socket_timeout => 30
        request_timeout => 30
        # The body is kept as plain text here and decoded in the filter
        # section, because the response is a JSON list rather than one event.
        codec => "plain"
        schedule => { "every" => "1m" }
        cacert => "/certs.d/aafca.pem"
    }
}

input {
    # Events read from files are decoded as JSON by the codec and are marked
    # with type "dmaap_log" so the HTTP-only filter section can skip them.
    file {
        path => [
            "/log-input/*"
        ]
        type => "dmaap_log"
        codec => "json"
    }
}

filter {
    if [type] != "dmaap_log" {
        # Only execute this section for DMaaP events coming from the HTTP
        # poller; it does not apply to DMaaP events read from log files.

        # Avoid noise when the polled list contains no entry.
        if [message] == "[]" {
            drop { }
        }

        # http_poller stores the HTTP status in [@metadata][code] as a number,
        # so it must be compared with the number 200. Comparing with the
        # string "200" never matches and would tag every polled event as an
        # error.
        if [http_request_failure] or [@metadata][code] != 200 {
            mutate {
                add_tag => [ "error" ]
            }
        }

        # NOTE(review): nothing in this file adds the "dmaap_source" tag, so
        # unless it is added elsewhere this branch never runs for polled
        # events and their payload is left undecoded - confirm whether the
        # http_poller input is meant to declare tags => [ "dmaap_source" ].
        if "dmaap_source" in [tags] {
            #
            # DMaaP provides a JSON list whose items are strings, each one
            # being the escaped JSON of an event that was published to DMaaP.
            #
            # The body was read as plain text (the input cannot work with a
            # list of events), so we first parse it as JSON, then split the
            # resulting list into one event per item, and finally parse each
            # item as JSON.
            #
            json {
                source => "[message]"
                target => "message"
            }

            # NOTE(review): this snippet looks non-functional - it passes two
            # arguments to event.get and calls .set on the list items, which
            # the comment above describes as strings. Confirm against the
            # Logstash Event API and fix or remove.
            ruby {
                code => "
                for ev in event.get('message', [])
                    ev.set('@metadata', event.get('@metadata'))
                end
                "
            }

            split {
                field => "message"
            }

            json {
                source => "message"
            }

            mutate {
                remove_field => [ "message" ]
            }
        }
    }

    # From here on, processing is common to HTTP-polled and log-file events.

    #
    # Some timestamps are expressed in milliseconds, some in microseconds.
    # Any value above 9999999999999 (more than 13 digits) is treated as
    # microseconds and divided by 1000 so that the date filter always
    # receives milliseconds.
    #
    if [closedLoopAlarmStart] {
        ruby {
            code => "
            ts = event.get('closedLoopAlarmStart').to_s.to_i(10)
            ts /= 1000 if ts > 9999999999999
            event.set('closedLoopAlarmStart', ts)
            "
        }
        date {
            match => [ "closedLoopAlarmStart", UNIX_MS ]
            target => "closedLoopAlarmStart"
        }
    }

    if [closedLoopAlarmEnd] {
        ruby {
            code => "
            ts = event.get('closedLoopAlarmEnd').to_s.to_i(10)
            ts /= 1000 if ts > 9999999999999
            event.set('closedLoopAlarmEnd', ts)
            "
        }
        date {
            match => [ "closedLoopAlarmEnd", UNIX_MS ]
            target => "closedLoopAlarmEnd"
        }
    }

    #
    # Notification times are expressed in the form "yyyy-MM-dd HH:mm:ss",
    # which is close to ISO8601 but lacks the "T" separator
    # ("yyyy-MM-ddTHH:mm:ss"). Replace the space with "T" before parsing.
    #
    if [notificationTime] {
        mutate {
            gsub => [ "notificationTime", " ", "T" ]
        }
        date {
            match => [ "notificationTime", ISO8601 ]
            target => "notificationTime"
        }
    }

    #
    # Copy some nested AAI fields to short top-level names for readability.
    #
    if [AAI][generic-vnf.vnf-name] {
        mutate {
            add_field => { "vnfName" => "%{[AAI][generic-vnf.vnf-name]}" }
        }
    }
    if [AAI][generic-vnf.vnf-type] {
        mutate {
            add_field => { "vnfType" => "%{[AAI][generic-vnf.vnf-type]}" }
        }
    }
    if [AAI][vserver.vserver-name] {
        mutate {
            add_field => { "vmName" => "%{[AAI][vserver.vserver-name]}" }
        }
    }
    if [AAI][complex.city] {
        mutate {
            add_field => { "locationCity" => "%{[AAI][complex.city]}" }
        }
    }
    if [AAI][complex.state] {
        mutate {
            add_field => { "locationState" => "%{[AAI][complex.state]}" }
        }
    }

    #
    # Add some flags to ease aggregation.
    #
    # Case-insensitive match on "ABATED" anywhere in the status.
    if [closedLoopEventStatus] =~ /(?i)ABATED/ {
        mutate {
            add_field => { "flagAbated" => "1" }
        }
    }
    # "FINAL" followed later by "FAILURE", each delimited by a word boundary
    # or an underscore.
    if [notification] =~ /^.*?(?:\b|_)FINAL(?:\b|_).*?(?:\b|_)FAILURE(?:\b|_).*?$/ {
        mutate {
            add_field => { "flagFinalFailure" => "1" }
        }
    }

    if "error" not in [tags] {
        #
        # Create the data for a secondary index: every non-error event is
        # cloned, and the "event-cl-aggs" tag is what the next conditional
        # and the output section use to recognise the copy.
        #
        clone {
            clones => [ "event-cl-aggs" ]
            add_tag => [ "event-cl-aggs" ]
        }

        if "event-cl-aggs" in [tags] {
            #
            # Only a few fields are needed for aggregations: drop every field
            # of the clone whose name is not matched by the whitelist below.
            #
            prune {
                whitelist_names => ["^@.*$","^topic$","^type$","^tags$","^flagFinalFailure$","^flagAbated$","^locationState$","^locationCity$","^vmName$","^vnfName$","^vnfType$","^requestID$","^closedLoopAlarmStart$","^closedLoopControlName$","^closedLoopAlarmEnd$","^target$","^target_type$","^triggerSourceName$","^policyScope$","^policyName$","^policyVersion$"]
            }
        }
    }
}

output {
    # Every event is also printed to stdout, including its @metadata.
    stdout {
        codec => rubydebug { metadata => true }
    }

    # In the index names below, "dd" is the day of the month; "DD" would be
    # the day of the year and yield names such as 2018.03.074.
    #
    # NOTE(review): all three outputs configure a CA bundle but disable
    # certificate verification - confirm that this is intentional.
    # NOTE(review): doc_as_upsert is documented for update mode; the error and
    # raw outputs do not set action => "update" - confirm it is needed there.

    if "error" in [tags] {
        elasticsearch {
            ilm_enabled => false
            codec => "json"
            ssl => true
            cacert => "/clamp-cert/ca-certs.pem"
            ssl_certificate_verification => false
            hosts => ["${elasticsearch_base_url}"]
            user => "${LOGSTASH_USR}"
            password => "${LOGSTASH_PWD}"
            index => "errors-%{+YYYY.MM.dd}" # creates daily indexes for errors
            doc_as_upsert => true
        }
    } else if "event-cl-aggs" in [tags] {
        # Aggregation copies are upserted under their requestID, so events
        # sharing a requestID update the same document of the daily index.
        elasticsearch {
            ilm_enabled => false
            codec => "json"
            ssl => true
            cacert => "/clamp-cert/ca-certs.pem"
            ssl_certificate_verification => false
            hosts => ["${elasticsearch_base_url}"]
            user => "${LOGSTASH_USR}"
            password => "${LOGSTASH_PWD}"
            document_id => "%{requestID}"
            index => "events-cl-%{+YYYY.MM.dd}" # creates daily indexes for control loop
            doc_as_upsert => true
            action => "update"
        }
    } else {
        elasticsearch {
            ilm_enabled => false
            codec => "json"
            ssl => true
            cacert => "/clamp-cert/ca-certs.pem"
            ssl_certificate_verification => false
            hosts => ["${elasticsearch_base_url}"]
            user => "${LOGSTASH_USR}"
            password => "${LOGSTASH_PWD}"
            index => "events-raw-%{+YYYY.MM.dd}" # creates daily indexes
            doc_as_upsert => true
        }
    }
}