summaryrefslogtreecommitdiffstats
path: root/kubernetes/clamp/charts/clamp-dash-logstash/resources/config/pipeline.conf
diff options
context:
space:
mode:
Diffstat (limited to 'kubernetes/clamp/charts/clamp-dash-logstash/resources/config/pipeline.conf')
-rw-r--r--kubernetes/clamp/charts/clamp-dash-logstash/resources/config/pipeline.conf108
1 files changed, 108 insertions, 0 deletions
diff --git a/kubernetes/clamp/charts/clamp-dash-logstash/resources/config/pipeline.conf b/kubernetes/clamp/charts/clamp-dash-logstash/resources/config/pipeline.conf
new file mode 100644
index 0000000000..aa087e3445
--- /dev/null
+++ b/kubernetes/clamp/charts/clamp-dash-logstash/resources/config/pipeline.conf
@@ -0,0 +1,108 @@
+input {
+ http_poller {
+ urls => {
+ event_queue => {
+ method => get
+ url => "${dmaap_base_url}/events/${event_topic}/${dmaap_consumer_group}/${dmaap_consumer_id}?timeout=15000"
+ headers => {
+ Accept => "application/json"
+ }
+ add_field => { "topic" => "${event_topic}" }
+ }
+ notification_queue => {
+ method => get
+ url => "${dmaap_base_url}/events/${notification_topic}/${dmaap_consumer_group}/${dmaap_consumer_id}?timeout=15000"
+ headers => {
+ Accept => "application/json"
+ }
+ add_field => { "topic" => "${notification_topic}" }
+ }
+ request_queue => {
+ method => get
+ url => "${dmaap_base_url}/events/${request_topic}/${dmaap_consumer_group}/${dmaap_consumer_id}?timeout=15000"
+ headers => {
+ Accept => "application/json"
+ }
+ add_field => { "topic" => "${request_topic}" }
+ }
+ }
+ socket_timeout => 30
+ request_timeout => 30
+ interval => 60
+ codec => "plain"
+ }
+}
+
+filter {
+ # avoid noise if no entry in the list
+ if [message] == "[]" {
+ drop { }
+ }
+
+ # parse json, split the list into multiple events, and parse each event
+ json {
+ source => "[message]"
+ target => "message"
+ }
+ split {
+ field => "message"
+ }
+ json {
+ source => "message"
+ }
+ mutate { remove_field => [ "message" ] }
+ # express timestamps in milliseconds instead of microseconds
+ ruby {
+ code => "event.set('closedLoopAlarmStart', Integer(event.get('closedLoopAlarmStart')))"
+ }
+ date {
+ match => [ "closedLoopAlarmStart", UNIX_MS ]
+ target => "closedLoopAlarmStart"
+ }
+
+ if [closedLoopAlarmEnd] {
+ ruby {
+ code => "event.set('closedLoopAlarmEnd', Integer(event.get('closedLoopAlarmEnd')))"
+ }
+ date {
+ match => [ "closedLoopAlarmEnd", UNIX_MS ]
+ target => "closedLoopAlarmEnd"
+ }
+
+ }
+ #"yyyy-MM-dd HH:mm:ss"
+ if [notificationTime] {
+ mutate {
+ gsub => [
+ "notificationTime", " ", "T"
+ ]
+ }
+ date {
+ match => [ "notificationTime", ISO8601 ]
+ target => "notificationTime"
+ }
+ }
+}
+output {
+ stdout {
+ codec => rubydebug
+ }
+
+ if [http_request_failure] {
+ elasticsearch {
+ codec => "json"
+ hosts => ["${elasticsearch_base_url}"]
+ index => "errors-%{+YYYY.MM.DD}"
+ doc_as_upsert => true
+ }
+ } else {
+ elasticsearch {
+ codec => "json"
+ hosts => ["${elasticsearch_base_url}"]
+ index => "logstash-%{+YYYY.MM.DD}" # creates daily indexes
+ doc_as_upsert => true
+
+ }
+ }
+
+}