# Poll three DMaaP topics (events, notifications, requests) over HTTP.
# Every ${...} placeholder is substituted by Logstash when the pipeline is loaded.
input {
  http_poller {
    # One named entry per topic; all three share the same consumer group and id.
    # NOTE(review): "timeout=15000" is presumably the DMaaP server-side poll
    # timeout in milliseconds -- confirm against the DMaaP API.
    # NOTE(review): add_field / type are given inside each URL spec rather than
    # at plugin level -- confirm http_poller actually applies them per URL.
    urls => {
      event_queue => {
        url => "${dmaap_base_url}/events/${event_topic}/${dmaap_consumer_group}/${dmaap_consumer_id}?timeout=15000"
        method => "get"
        headers => { Accept => "application/json" }
        type => "dmaap_event"
        add_field => { "topic" => "${event_topic}" }
      }
      notification_queue => {
        url => "${dmaap_base_url}/events/${notification_topic}/${dmaap_consumer_group}/${dmaap_consumer_id}?timeout=15000"
        method => "get"
        headers => { Accept => "application/json" }
        type => "dmaap_notification"
        add_field => { "topic" => "${notification_topic}" }
      }
      request_queue => {
        url => "${dmaap_base_url}/events/${request_topic}/${dmaap_consumer_group}/${dmaap_consumer_id}?timeout=15000"
        method => "get"
        headers => { Accept => "application/json" }
        type => "dmaap_request"
        add_field => { "topic" => "${request_topic}" }
      }
    }

    # Run the three requests once per minute.
    schedule => { "every" => "1m" }

    # Client-side timeouts for each request.
    request_timeout => 30
    socket_timeout => 30

    # Keep the response body as raw text in [message]; the filter section
    # parses the JSON itself.
    codec => "plain"
  }
}

# Read DMaaP events from a local log file; each line is decoded as JSON and
# tagged with type "dmaap_log" so the filter section can skip the list-splitting
# steps it applies to the other events.
input {
  file {
    type => "dmaap_log"
    codec => "json"
    path => [ "/log-input/dmaap_evt.log" ]
  }
}

filter {

    # Events that are not of type "dmaap_log" carry a JSON list in [message]:
    # decode the list, split it into one event per entry, then decode each entry.
    if [type] != "dmaap_log" {
        # An empty list carries no entries; drop it to avoid noise.
        if [message] == "[]" {
            drop { }
        }

        # Step 1: turn the raw text into a list, stored back into [message].
        json {
            source => "[message]"
            target => "message"
        }

        # Step 2: emit one event per list entry.
        # NOTE(review): add_field appends to a field that already exists, so
        # "type" and "topic" may end up as arrays here -- confirm this is wanted.
        split {
            field => "message"
            add_field => {
                "type" => "%{type}"
                "topic" => "%{topic}"
            }
        }

        # Step 3: each entry is itself a JSON string; expand it into top-level fields.
        json {
            source => "message"
        }

        # The raw payload is no longer needed once its fields are extracted.
        mutate {
            remove_field => [ "message" ]
        }
    }

    # Normalise closedLoopAlarmStart to epoch milliseconds: values with more than
    # 13 digits are treated as microseconds and divided by 1000. The result is
    # then converted into a proper timestamp in the same field.
    if [closedLoopAlarmStart] {
        ruby {
            code => "
                raw = event.get('closedLoopAlarmStart').to_s.to_i(10)
                event.set('closedLoopAlarmStart', raw > 9999999999999 ? raw / 1000 : raw)
            "
        }
        date {
            match => [ "closedLoopAlarmStart", "UNIX_MS" ]
            target => "closedLoopAlarmStart"
        }
    }

    # Same normalisation for closedLoopAlarmEnd.
    if [closedLoopAlarmEnd] {
        ruby {
            code => "
                raw = event.get('closedLoopAlarmEnd').to_s.to_i(10)
                event.set('closedLoopAlarmEnd', raw > 9999999999999 ? raw / 1000 : raw)
            "
        }
        date {
            match => [ "closedLoopAlarmEnd", "UNIX_MS" ]
            target => "closedLoopAlarmEnd"
        }
    }

    # notificationTime is expected as "yyyy-MM-dd HH:mm:ss"; replacing the space
    # with "T" lets the ISO8601 matcher parse it.
    if [notificationTime] {
        mutate {
            gsub => [ "notificationTime", " ", "T" ]
        }
        date {
            match => [ "notificationTime", "ISO8601" ]
            target => "notificationTime"
        }
    }
}
output {
    # Echo every event to the console in a human-readable form.
    stdout {
        codec => rubydebug
    }

    # Events carrying [http_request_failure] are routed to a separate "errors"
    # index; everything else goes to "events". Both are daily indexes.
    #
    # The date pattern uses Joda-Time syntax: "dd" is day-of-month. The previous
    # "DD" meant day-of-year and produced names such as "events-2024.02.32".
    #
    # NOTE(review): doc_as_upsert normally only matters with action => "update";
    # with the default action it is probably a no-op -- confirm intent.
    if [http_request_failure] {
        elasticsearch {
            codec => "json"
            hosts => ["${elasticsearch_hosts}"]
            index => "errors-%{+YYYY.MM.dd}"
            doc_as_upsert => true
        }
    } else {
        elasticsearch {
            codec => "json"
            hosts => ["${elasticsearch_hosts}"]
            index => "events-%{+YYYY.MM.dd}"
            doc_as_upsert => true
        }
    }

}