diff options
Diffstat (limited to 'kubernetes/clamp/charts/clamp-dash-logstash/resources/config/pipeline.conf')
-rw-r--r-- | kubernetes/clamp/charts/clamp-dash-logstash/resources/config/pipeline.conf | 108 |
1 files changed, 108 insertions, 0 deletions
diff --git a/kubernetes/clamp/charts/clamp-dash-logstash/resources/config/pipeline.conf b/kubernetes/clamp/charts/clamp-dash-logstash/resources/config/pipeline.conf new file mode 100644 index 0000000000..aa087e3445 --- /dev/null +++ b/kubernetes/clamp/charts/clamp-dash-logstash/resources/config/pipeline.conf @@ -0,0 +1,108 @@ +input { + http_poller { + urls => { + event_queue => { + method => get + url => "${dmaap_base_url}/events/${event_topic}/${dmaap_consumer_group}/${dmaap_consumer_id}?timeout=15000" + headers => { + Accept => "application/json" + } + add_field => { "topic" => "${event_topic}" } + } + notification_queue => { + method => get + url => "${dmaap_base_url}/events/${notification_topic}/${dmaap_consumer_group}/${dmaap_consumer_id}?timeout=15000" + headers => { + Accept => "application/json" + } + add_field => { "topic" => "${notification_topic}" } + } + request_queue => { + method => get + url => "${dmaap_base_url}/events/${request_topic}/${dmaap_consumer_group}/${dmaap_consumer_id}?timeout=15000" + headers => { + Accept => "application/json" + } + add_field => { "topic" => "${request_topic}" } + } + } + socket_timeout => 30 + request_timeout => 30 + interval => 60 + codec => "plain" + } +} + +filter { + # avoid noise if no entry in the list + if [message] == "[]" { + drop { } + } + + # parse json, split the list into multiple events, and parse each event + json { + source => "[message]" + target => "message" + } + split { + field => "message" + } + json { + source => "message" + } + mutate { remove_field => [ "message" ] } + # express timestamps in milliseconds instead of microseconds + ruby { + code => "event.set('closedLoopAlarmStart', Integer(event.get('closedLoopAlarmStart')))" + } + date { + match => [ "closedLoopAlarmStart", UNIX_MS ] + target => "closedLoopAlarmStart" + } + + if [closedLoopAlarmEnd] { + ruby { + code => "event.set('closedLoopAlarmEnd', Integer(event.get('closedLoopAlarmEnd')))" + } + date { + match => [ "closedLoopAlarmEnd", UNIX_MS ] + target => "closedLoopAlarmEnd" + } + + } + #"yyyy-MM-dd HH:mm:ss" + if [notificationTime] { + mutate { + gsub => [ + "notificationTime", " ", "T" + ] + } + date { + match => [ "notificationTime", ISO8601 ] + target => "notificationTime" + } + } +} +output { + stdout { + codec => rubydebug + } + + if [http_request_failure] { + elasticsearch { + codec => "json" + hosts => ["${elasticsearch_base_url}"] + index => "errors-%{+YYYY.MM.DD}" + doc_as_upsert => true + } + } else { + elasticsearch { + codec => "json" + hosts => ["${elasticsearch_base_url}"] + index => "logstash-%{+YYYY.MM.DD}" # creates daily indexes + doc_as_upsert => true + + } + } + +} |