summaryrefslogtreecommitdiffstats
path: root/kubernetes/clamp/charts/clamp-dash-logstash/resources/config/pipeline.conf
diff options
context:
space:
mode:
Diffstat (limited to 'kubernetes/clamp/charts/clamp-dash-logstash/resources/config/pipeline.conf')
-rw-r--r--kubernetes/clamp/charts/clamp-dash-logstash/resources/config/pipeline.conf56
1 files changed, 39 insertions, 17 deletions
diff --git a/kubernetes/clamp/charts/clamp-dash-logstash/resources/config/pipeline.conf b/kubernetes/clamp/charts/clamp-dash-logstash/resources/config/pipeline.conf
index 5d92de637b..f88e40da14 100644
--- a/kubernetes/clamp/charts/clamp-dash-logstash/resources/config/pipeline.conf
+++ b/kubernetes/clamp/charts/clamp-dash-logstash/resources/config/pipeline.conf
@@ -21,6 +21,7 @@ input {
Accept => "application/json"
}
add_field => { "topic" => "${event_topic}" }
+ type => "dmaap_event"
}
notification_queue => {
method => get
@@ -29,6 +30,7 @@ input {
Accept => "application/json"
}
add_field => { "topic" => "${notification_topic}" }
+ type => "dmaap_notification"
}
request_queue => {
method => get
@@ -37,6 +39,7 @@ input {
Accept => "application/json"
}
add_field => { "topic" => "${request_topic}" }
+ type => "dmaap_request"
}
}
socket_timeout => 30
@@ -47,26 +50,39 @@ input {
}
filter {
- # avoid noise if no entry in the list
- if [message] == "[]" {
- drop { }
- }
+ if [type] != "dmaap_log" {
+ # avoid noise if no entry in the list
+ if [message] == "[]" {
+ drop { }
+ }
- # parse json, split the list into multiple events, and parse each event
- json {
- source => "[message]"
- target => "message"
- }
- split {
- field => "message"
- }
- json {
- source => "message"
+ # parse json, split the list into multiple events, and parse each event
+ json {
+ source => "[message]"
+ target => "message"
+ }
+ split {
+ field => "message"
+ add_field => {
+ "type" => "%{type}"
+ "topic" => "%{topic}"
+ }
+ }
+ json {
+ source => "message"
+ }
+ mutate { remove_field => [ "message" ] }
}
- mutate { remove_field => [ "message" ] }
+
# express timestamps in milliseconds instead of microseconds
ruby {
- code => "event.set('closedLoopAlarmStart', Integer(event.get('closedLoopAlarmStart')))"
+ code => "
+ if event.get('closedLoopAlarmStart').to_s.to_i(10) > 9999999999999
+ event.set('closedLoopAlarmStart', event.get('closedLoopAlarmStart').to_s.to_i(10) / 1000)
+ else
+ event.set('closedLoopAlarmStart', event.get('closedLoopAlarmStart').to_s.to_i(10))
+ end
+ "
}
date {
match => [ "closedLoopAlarmStart", UNIX_MS ]
@@ -75,7 +91,13 @@ filter {
if [closedLoopAlarmEnd] {
ruby {
- code => "event.set('closedLoopAlarmEnd', Integer(event.get('closedLoopAlarmEnd')))"
+ code => "
+ if event.get('closedLoopAlarmEnd').to_s.to_i(10) > 9999999999999
+ event.set('closedLoopAlarmEnd', event.get('closedLoopAlarmEnd').to_s.to_i(10) / 1000)
+ else
+ event.set('closedLoopAlarmEnd', event.get('closedLoopAlarmEnd').to_s.to_i(10))
+ end
+ "
}
date {
match => [ "closedLoopAlarmEnd", UNIX_MS ]