input {
 beats {

 ## Add an id to the plugin configuration. Can be anything unique.
 id => 'beats_plugin'

 ######## Connection configurations ########

 ## The port to listen on.
 port => 5044

 ## Close idle clients after the specified time in seconds. Default is 60 seconds.
 #client_inactivity_timeout => 60

 ######## Security configurations ########

 ## Enable encryption. Default false.
 #ssl => $filebeat_ssl

 ## SSL certificate path.
 #ssl_certificate => $filebeat_ssl_certificate

 ## SSL key to use.
 #ssl_key => $filebeat_ssl_key

 ## SSL key passphrase to use.
 #ssl_key_passphrase => $filebeat_ssl_key_passphrase

 ## Value can be any of: none, peer, force_peer.
 #ssl_verify_mode => $filebeat_ssl_verify_mode

 ## Time in milliseconds for an incomplete SSL handshake to time out. Default is 10000 ms.
 #ssl_handshake_timeout => 10000
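
 ## When false, events are not tagged with the Beats codec that was applied (e.g. beats_input_codec_plain_applied).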
 include_codec_tag => false
 }
}


filter {
 # Filter for log4j xml events
 if "</log4j:event>" in [message] {
   # Filter to parse the XML event and extract its fields
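   # Illustrative (assumed) shape of such an event, matching the xpath expressions below:
   #   <log4j:event logger="org.onap.example.Worker" timestamp="1518616800000" level="INFO" thread="main">
   #     <log4j:message>request handled</log4j:message>
   #     <log4j:properties>
   #       <log4j:data name="RequestId" value="abc-123"/>
   #     </log4j:properties>
   #   </log4j:event>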
   xml {
     source => "message"
     store_xml => false
     remove_namespaces => true
     target => "xml_content"
     xpath => [ "/event/message/text()", "logmsg" ,
                "/event/@logger", "Logger",
                "/event/@timestamp", "Timestamp",
                "/event/@level", "loglevel",
                "/event/@thread", "Thread",
                "/event/throwable/text()", "Exceptionthrowable",
                "/event/NDC/text()", "NDCs",
                "/event/properties/data/@name","mdcname",
                "/event/properties/data/@value","mdcvalue"]

    }

   # Ruby filter to iterate over the MDC name/value pairs and set each one as a field on the event
   ruby {
     code => '
       names  = Array(event.get("[mdcname]"))
       values = Array(event.get("[mdcvalue]"))
       names.each_with_index do |name, idx|
         value = values.at(idx)
         event.set(name, value) if name && value
       end
     '
   }
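   # e.g. mdcname = ["RequestId"] and mdcvalue = ["abc-123"] (hypothetical values)
   # produce the event field RequestId => "abc-123".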

   # Validations
   if [Exceptionthrowable] {
     mutate {
       replace => {
         "exceptionmessage" => "%{[Exceptionthrowable]}"
       }
     }
   }

   if [NDCs] {
     mutate {
       replace => {
         "NDC" => "%{[NDCs]}"
       }
     }
   }
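
   # Flatten the single-element xpath result arrays into plain string fields and copy logmsg into message.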

   mutate {
     replace => {
       "Logger"    => "%{[Logger]}"
       "logmsg"    => "%{[logmsg]}"
       "Timestamp" => "%{[Timestamp]}"
       "loglevel"  => "%{[loglevel]}"
       "message"   => "%{logmsg}"
       "Thread"    => "%{[Thread]}"
     }
     remove_field => ["mdcname", "mdcvalue", "logmsg", "Exceptionthrowable", "NDCs"]
   }

   if [Timestamp] {
     date {
       match => ["Timestamp", "UNIX_MS"]
       target => "Timestamp"
     }
   }
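   # The log4j timestamp attribute is epoch milliseconds, hence the UNIX_MS match above.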
 }
 # Filter for logback events
 else {

#  mutate { add_field => { "orgmsg" => "%{message}" } }    # Copy of original msg for debug

  mutate {
    gsub => [
      'message', ' = ', '=',
      'message', '= ', '=null',
      'message', '=\t', '=null	', #This null is followed by a tab
      'message', '\t$', '\t'
    ]
  }
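  # The gsub above normalizes the tab-separated key=value section for the kv filter below:
  # a hypothetical fragment "RequestId = abc\tPartnerName= \t" becomes "RequestId=abc\tPartnerName=null\t".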
  grok {
    break_on_match => false
    match => {
      "message" => ["%{TIMESTAMP_ISO8601:Timestamp}\t%{GREEDYDATA:Thread}\t%{SPACE}%{LOGLEVEL:loglevel}%{SPACE}\t%{JAVACLASS:Logger}\t(?:[^\t]+\t)*%{GREEDYDATA:message}",
                    "(?<MDCs>.*\t)"
                   ]
      "source" => ["/var/log/onap/(?<componentName>[^/]+)/",
                   "/var/log/onap/%{GREEDYDATA:componentLogFile}"
                  ]
    }
    overwrite => ["message"]
  }
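  # Hypothetical logback-style line the first "message" pattern expects (<TAB> marks a tab character):
  #   2018-02-20 15:20:01,123<TAB>main<TAB> INFO <TAB>org.onap.example.Handler<TAB>RequestId=abc-123<TAB>Processing request
  # The second pattern captures everything up to the last tab into MDCs for the kv filter below.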
  kv {
    source => "MDCs"
    field_split => "\t"
    trim_key => "\s"
    trim_value => "\s"
    remove_field => [ "MDCs" ]
  }

  date {
    match => [ "Timestamp", "ISO8601", "yyyy-MM-dd HH:mm:ss,SSS" ]
    target => "Timestamp"
  }

  if [source] == "/var/log/onap/aai/aai-ml/metrics.log" {
    csv {
      source => "message"
      separator => "|"
      quote_char => "`"
      columns => ["Begin TS", "End TS", "DuplicateRequestID", "Unknown1", "threadID", "phys/virt server name", "service name", "Partner Name", "Unknown2", "Unknown3", "Unknown4", "Unknown5", "Unknown6", "Unknown7", "Log level", "Unknown8", "Unknown9", "Status code", "Server", "Unknown10", "Unknown11", "Unknown12", "Unknown13", "Unknown14", "Unknown15", "Unknown16", "Unknown17", "Unknown18", "message"]
    }
  }
  else if [source] == "/var/log/onap/aai/aai-ml/audit.log" {
    csv {
      source => "message"
      separator => "|"
      quote_char => "`"
      columns => ["Begin TS", "End TS", "DuplicateRequestID", "Unknown1", "threadID", "phys/virt server name", "service name", "Partner Name", "Unknown2", "Unknown3", "Unknown4", "Unknown5", "Log level", "Unknown6", "Unknown7", "Status code", "Server", "Unknown10", "Unknown11", "Unknown12", "Unknown13", "Unknown14", "Unknown15", "Unknown16", "Unknown17", "message"]
    }
  }
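
  # Drop the unused placeholder (Unknown*) columns produced by the csv filters above.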

  mutate {
    remove_field => ["DuplicateRequestID", "Unknown1", "Unknown2", "Unknown3", "Unknown4", "Unknown5", "Unknown6", "Unknown7", "Unknown8", "Unknown9", "Unknown10", "Unknown11", "Unknown12", "Unknown13", "Unknown14", "Unknown15", "Unknown16", "Unknown17", "Unknown18"]
  }

  if ([source] == "/var/log/onap/sdc/sdc-be/audit.log") {
    # Parse key=value pairs in the message
    kv {
      field_split => "\s"
      trim_key => "\s"
      trim_value => "\s"
    }
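    # e.g. a hypothetical audit message containing "DID=abc-123 STATUS=200" yields the fields DID and STATUS.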

    # If RequestId is missing and DID is present, use DID as the RequestId
    if (![RequestId] and [DID] =~ /.+/) {
      mutate { add_field => { "RequestId" => "%{DID}" } }
    }
  }

 } #Close else statement for logback events
} #Close filter


output {
 elasticsearch {
 id => 'onap_es'

 ######### Security configurations #########

 user => "elastic"
 password => "changeme"

 ## The .cer or .pem file to validate the server's certificate
 #cacert => $es_cacert

 ## The keystore used to present a certificate to the server. It can be either .jks or .p12
 #keystore => $es_keystore
 #keystore_password => $es_keystore_password

 ## Enable SSL/TLS secured communication to Elasticsearch cluster.
 ## Default is not set; in that case it depends on the protocol specified in the hosts list.
 #ssl => $es_ssl

 ## Option to validate the server's certificate. Default is true
 #ssl_certificate_verification => $es_ssl_certificate_verification

 ## The JKS truststore to validate the server's certificate.
 #truststore => $es_truststore
 #truststore_password => $es_truststore_password


 ######### Elasticsearch cluster and host configurations #########

 ## Can specify one host or a list of hosts. If sniffing is set, one is enough and the others will be auto-discovered.
 ## The protocol can also be specified, e.g. ["http://10.247.186.12:9200"].
 hosts => ["http://elasticsearch.{{.Values.nsPrefix}}-log:9200"]


 ## This setting asks Elasticsearch for the list of all cluster nodes and adds them to the hosts list. Default is false.
 sniffing => true

 ## How long to wait, in seconds, between sniffing attempts. Default is 5 seconds.
 #sniffing_delay => 5

 ## Set the address of a forward HTTP proxy.
 #proxy => $es_proxy

 ## Use this if you must run Elasticsearch behind a proxy that remaps the root path under which the Elasticsearch HTTP API lives.
 #path => $es_path

 ######### Elasticsearch request configurations #########

 ## This setting defines the maximum size of the bulk requests Logstash will make.
 #flush_size => ?

 ######### Document configurations #########

 index => "onaplogs-%{+YYYY.MM.dd}"
 document_type => "logs"
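 ## The index pattern above creates one Elasticsearch index per day, e.g. onaplogs-2018.02.20.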

 ## This can be used to associate child documents with a parent using the parent ID.
 #parent => "abcd"
 }
}