blob: 2b5a24e0436e75187220d7dda2d716fec193284d (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
|
# Input stage: a single http_poller that issues GET requests against three
# topic endpoints under ${dmaap_base_url}. All ${...} placeholders are
# resolved by Logstash when the pipeline is loaded.
input {
    http_poller {
        # Plugin-level settings.
        # The body is read as plain text; JSON decoding is done in the filter stage.
        codec => "plain"
        # NOTE(review): newer releases of the http_poller plugin replace
        # `interval` with `schedule` - confirm against the deployed plugin version.
        interval => 15
        request_timeout => 30
        socket_timeout => 30

        # One entry per polled topic. Each URL carries `timeout=15000` in its
        # query string (presumably a server-side long-poll timeout in ms - verify).
        # NOTE(review): `add_field` is declared per URL; confirm the plugin
        # honours it at this nesting level.
        urls => {
            event_queue => {
                method => "get"
                url => "${dmaap_base_url}/events/${event_topic}/${dmaap_consumer_group}/${dmaap_consumer_id}?timeout=15000"
                headers => { "Accept" => "application/json" }
                add_field => { "topic" => "${event_topic}" }
            }
            notification_queue => {
                method => "get"
                url => "${dmaap_base_url}/events/${notification_topic}/${dmaap_consumer_group}/${dmaap_consumer_id}?timeout=15000"
                headers => { "Accept" => "application/json" }
                add_field => { "topic" => "${notification_topic}" }
            }
            request_queue => {
                method => "get"
                url => "${dmaap_base_url}/events/${request_topic}/${dmaap_consumer_group}/${dmaap_consumer_id}?timeout=15000"
                headers => { "Accept" => "application/json" }
                add_field => { "topic" => "${request_topic}" }
            }
        }
    }
}
# Filter stage: turn each polled response body into one event per list
# entry, then normalise the timestamp fields into real date values.
filter {
    # avoid noise if no entry in the list
    if [message] == "[]" {
        drop { }
    }

    # parse json, split the list into multiple events, and parse each event
    json {
        source => "[message]"
        target => "message"
    }
    split {
        field => "message"
    }
    json {
        source => "message"
    }
    # the decoded fields now live at the event root; the raw payload is redundant
    mutate { remove_field => [ "message" ] }

    # Coerce the alarm start to an integer and parse it as epoch milliseconds.
    # Guarded like closedLoopAlarmEnd below: without the guard, Integer(nil)
    # raises inside the ruby filter for events that lack the field and the
    # event is tagged _rubyexception.
    if [closedLoopAlarmStart] {
        ruby {
            code => "event.set('closedLoopAlarmStart', Integer(event.get('closedLoopAlarmStart')))"
        }
        date {
            match => [ "closedLoopAlarmStart", "UNIX_MS" ]
            target => "closedLoopAlarmStart"
        }
    }

    # Same conversion for the optional alarm end.
    if [closedLoopAlarmEnd] {
        ruby {
            code => "event.set('closedLoopAlarmEnd', Integer(event.get('closedLoopAlarmEnd')))"
        }
        date {
            match => [ "closedLoopAlarmEnd", "UNIX_MS" ]
            target => "closedLoopAlarmEnd"
        }
    }

    # notificationTime arrives as "yyyy-MM-dd HH:mm:ss"; replacing the space
    # with "T" makes it parseable by the ISO8601 matcher.
    if [notificationTime] {
        mutate {
            gsub => [
                "notificationTime", " ", "T"
            ]
        }
        date {
            match => [ "notificationTime", "ISO8601" ]
            target => "notificationTime"
        }
    }
}
# Output stage: echo every event to stdout for debugging and index it
# into Elasticsearch.
output {
    stdout {
        codec => rubydebug
    }
    elasticsearch {
        codec => "json"
        hosts => ["elasticsearch"]
        # Creates daily indexes. `dd` is day-of-month; the previous `DD` is
        # day-of-year in Joda-Time patterns and produced names such as
        # logstash-2018.03.074.
        index => "logstash-%{+YYYY.MM.dd}"
        # NOTE(review): doc_as_upsert only applies when action => "update"
        # (with a document_id); no action is set here - confirm intent.
        doc_as_upsert => true
    }
}
|