input { beats { ## Add a id to plugin configuration. Can be anything unique. id => 'beats_plugin' ######## Connection configurations ######## ## The port to listen on. port => 5044 ## Close Idle clients after the specified time in seconds. Default is 60 seconds #client_inactivity_timeout => 60 ######## Security configurations ######## ## Enable encryption. Default false. #ssl => $filebeat_ssl ## ssl certificate path. #ssl_certificate => $filebeat_ssl_certificate ## SSL key to use. #ssl_key => $filebeat_ssl_key ##SSL key passphrase to use. #ssl_key_passphrase => $filebeat_ssl_key_passphrase ## Value can be any of: none, peer, force_peer. #ssl_verify_mode => $filebeat_ssl_verify_mode ## Time in milliseconds for an incomplete ssl handshake to timeout. Default is 10000 ms. #ssl_handshake_timeout => 10000 include_codec_tag => false } } filter { # Filter for log4j xml events if "" in [message] { #Filter to parse xml event and retrieve data xml { source => "message" store_xml => false remove_namespaces => true target => "xml_content" xpath => [ "/event/message/text()", "logmsg" , "/event/@logger", "Logger", "/event/@timestamp", "Timestamp", "/event/@level", "loglevel", "/event/@thread", "Thread", "/event/throwable/text()", "Exceptionthrowable", "/event/NDC/text()", "NDCs", "/event/properties/data/@name","mdcname", "/event/properties/data/@value","mdcvalue"] } #Ruby filter to iterate and separate MDCs into documents ruby { code => ' $i = 0 $num = 0 if event.get("[mdcname]") $num = event.get("[mdcname]").length end if $num != 0 until $i > $num do if event.get("[mdcname]").at($i) and event.get("[mdcvalue]").at($i) event.set(event.get("[mdcname]").at($i), event.get("[mdcvalue]").at($i)) end $i=$i+1 end end ' } #Validations if [Exceptionthrowable] { mutate { replace => { "exceptionmessage" => "%{[Exceptionthrowable]}" } } } if [NDCs] { mutate { replace => { "NDC" => "%{[NDCs]}" } } } mutate { replace => { "Logger" =>"%{[Logger]}" "logmsg" =>"%{[logmsg]}" "Timestamp" =>"%{[Timestamp]}" "loglevel" =>"%{[loglevel]}" "message" => "%{logmsg}" "Thread" => "%{[Thread]}" } remove_field => ["mdcname", "mdcvalue", "logmsg","Exceptionthrowable","NDCs"] } if [Timestamp] { date { match => ["Timestamp", "UNIX_MS"] target => "Timestamp" } } } # Filter for logback events else { # mutate { add_field => { "orgmsg" => "%{message}" } } # Copy of orginal msg for debug mutate { gsub => [ 'message', ' = ', '=', 'message', '= ', '=null', 'message', '=\t', '=null ', #This null is followed by a tab 'message', '\t$', '\t' ] } grok { break_on_match => false match => { "message" => ["%{TIMESTAMP_ISO8601:Timestamp}\t%{GREEDYDATA:Thread}\t%{SPACE}%{LOGLEVEL:loglevel}%{SPACE}\t%{JAVACLASS:Logger}\t(?:[^\t]+\t)*%{GREEDYDATA:message}", "(?.*\t)" ] "source" => ["/var/log/onap/(?[^/]+)/", "/var/log/onap/%{GREEDYDATA:componentLogFile}" ] } overwrite => ["message"] } kv { source => "MDCs" field_split => "\t" trim_key => "\s" trim_value => "\s" remove_field => [ "MDCs" ] } date { match => [ "Timestamp", "ISO8601", "yyyy-MM-dd HH:mm:ss,SSS" ] target => "Timestamp" } if [source] == "/var/log/onap/aai/aai-ml/metrics.log" { csv { source => "message" separator => "|" quote_char => "`" columns => ["Begin TS", "End TS", "DuplicateRequestID", "Unknown1", "threadID", "phys/virt server name", "service name", "Partner Name", "Unknown2", "Unknown3", "Unknown4", "Unknown5", "Unknown6", "Unknown7", "Log level", "Unknown8", "Unknown9", "Status code", "Server", "Unknown10", "Unknown11", "Unknown12", "Unknown13", "Unknown14", "Unknown15", "Unknown16", "Unknown17", "Unknown18", "message"] } } else if [source] == "/var/log/onap/aai/aai-ml/audit.log" { csv { source => "message" separator => "|" quote_char => "`" columns => ["Begin TS", "End TS", "DuplicateRequestID", "Unknown1", "threadID", "phys/virt server name", "service name", "Partner Name", "Unknown2", "Unknown3", "Unknown4", "Unknown5", "Log level", "Unknown6", "Unknown7", "Status code", "Server", "Unknown10", "Unknown11", "Unknown12", "Unknown13", "Unknown14", "Unknown15", "Unknown16", "Unknown17", "message"] } } mutate { remove_field => ["DuplicateRequestID", "Unknown1", "Unknown2", "Unknown3", "Unknown4", "Unknown5", "Unknown6", "Unknown7", "Unknown8", "Unknown9", "Unknown10", "Unknown11", "Unknown12", "Unknown13", "Unknown14", "Unknown15", "Unknown16", "Unknown17", "Unknown18"] } if ([source] == "/var/log/onap/sdc/sdc-be/audit.log") { #Parse kvps in message kv { field_split => "\s" trim_key => "\s" trim_value => "\s" } #If Request Id is missing and DID is present use as RequestId if (![RequestId] and [DID] =~ /.+/) { mutate { add_field => { "RequestId" => "%{DID}" } } } } } #Close else statement for logback events } #Close filter output { elasticsearch { id => 'onap_es' ######### Security configurations ######### user => "elastic" password => "changeme" ## The .cer or .pem file to validate the server's certificate #cacert => $es_cacert ## The keystore used to present a certificate to the server. It can be either .jks or .p12 #keystore => $es_keystore #keystore_password => $es_keystore_password ## Enable SSL/TLS secured communication to Elasticsearch cluster. ## Default is not set which in that case depends on the protocol specidfied in hosts list #ssl => $es_ssl ## Option to validate the server's certificate. Default is true #ssl_certificate_verification => $es_ssl_certificate_verification ## The JKS truststore to validate the server's certificate. #truststore => $es_truststore #truststore_password => $es_truststore_password ######### Elasticsearchcluster and host configurations ######### #can specify one or a list of hosts. If sniffing is set, one is enough and others will be auto-discovered ##Also protocol can be specified like ["http://10.247.186.12:9200"] hosts => ["http://elasticsearch.onap-log:9200"] ## This setting asks Elasticsearch for the list of all cluster nodes and adds them to the hosts list. Default is false. sniffing => true ## How long to wait, in seconds, between sniffing attempts. Default is 5 seconds. #sniffing_delay => 5 ## Set the address of a forward HTTP proxy. #proxy => $es_proxy ##Use this if you must run Elasticsearch behind a proxy that remaps the root path for the Elasticsearch HTTP API lives #path => $es_path ######### Elasticsearch request configurations ######### ## This setting defines the maximum sized bulk request Logstash will make. #flush_size => ? ######### Document configurations ######### index => "onaplogs-%{+YYYY.MM.dd}" document_type => "logs" ## This can be used to associate child documents with a parent using the parent ID. #parent => "abcd' } }