path: root/kubernetes/clamp/charts/clamp-dash-logstash/resources/config/pipeline.conf
# Copyright © 2018  AT&T, Amdocs, Bell Canada Intellectual Property.  All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
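
# CLAMP dashboard Logstash pipeline: it polls the control-loop event, notification
# and request topics from the DMaaP message router, normalizes the timestamps of
# each message, and indexes the resulting documents into Elasticsearch. The
# ${...} placeholders (DMaaP/Elasticsearch base URLs, topic names, consumer
# group/id) are Logstash environment-variable substitutions and are expected to
# be supplied by the clamp-dash-logstash chart's deployment.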
input {
  http_poller {
        urls => {
            event_queue => {
                method => get
                url => "${dmaap_base_url}/events/${event_topic}/${dmaap_consumer_group}/${dmaap_consumer_id}?timeout=15000"
                headers => {
                    Accept => "application/json"
                }
                add_field => { "topic" => "${event_topic}" }
            }
            notification_queue => {
                method => get
                url => "${dmaap_base_url}/events/${notification_topic}/${dmaap_consumer_group}/${dmaap_consumer_id}?timeout=15000"
                headers => {
                    Accept => "application/json"
                }
                add_field => { "topic" => "${notification_topic}" }
            }
            request_queue => {
                method => get
                url => "${dmaap_base_url}/events/${request_topic}/${dmaap_consumer_group}/${dmaap_consumer_id}?timeout=15000"
                headers => {
                    Accept => "application/json"
                }
                add_field => { "topic" => "${request_topic}" }
            }
        }
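        # Each URL long-polls DMaaP for up to 15 s (timeout=15000 ms), so the 30 s
        # socket/request timeouts below leave headroom; the whole poll cycle repeats
        # every 60 s.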
        socket_timeout => 30
        request_timeout => 30
        interval => 60
        codec => "plain"
  }
}

filter {
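    # DMaaP returns the poll result as a JSON array whose entries are themselves
    # JSON-encoded strings, which is why the chain below parses the array, splits it
    # into one event per entry, and then parses each entry again. A purely
    # illustrative response (field names taken from the filters below, values made up):
    #   ["{\"closedLoopAlarmStart\":1518441295932}",
    #    "{\"closedLoopAlarmStart\":1518441295932,\"closedLoopAlarmEnd\":1518441356321}"]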
    # drop the event entirely when the poll returned an empty list (no new messages)
    if [message] == "[]" {
       drop { }
    }

    # parse the JSON array, split it into one event per entry, then parse each entry
    json {
         source => "[message]"
         target => "message"
    }
    split {
          field => "message"
    }
    json {
         source => "message"
    }
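    # the second json parse merges each entry's fields into the event root, so the
    # raw wrapper field can be dropped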
    mutate { remove_field => [ "message" ] }
    # coerce the epoch timestamp to an integer so the date filter can read it as UNIX_MS
    ruby {
        code => "event.set('closedLoopAlarmStart', Integer(event.get('closedLoopAlarmStart')))"
    }
    date {
        match => [ "closedLoopAlarmStart", "UNIX_MS" ]
        target => "closedLoopAlarmStart"
    }

    if [closedLoopAlarmEnd] {
        ruby {
            code => "event.set('closedLoopAlarmEnd', Integer(event.get('closedLoopAlarmEnd')))"
        }
        date {
            match => [ "closedLoopAlarmEnd", "UNIX_MS" ]
            target => "closedLoopAlarmEnd"
        }

    }
    #"yyyy-MM-dd HH:mm:ss"
    if [notificationTime] {
       mutate {
              gsub => [
                   "notificationTime", " ", "T"
              ]
       }
       date {
            match => [ "notificationTime", "ISO8601" ]
            target => "notificationTime"
       }
    }
}
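# Route documents by outcome: events carrying http_request_failure (set by
# http_poller when a request fails) go to a separate errors-* index, everything
# else to the daily logstash-* indices; the stdout/rubydebug output mirrors every
# document for debugging.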
output {
    stdout {
        codec => rubydebug
    }

    if [http_request_failure] {
        elasticsearch {
            codec => "json"
            hosts => ["${elasticsearch_base_url}"]
            index => "errors-%{+YYYY.MM.dd}"
            doc_as_upsert => true
        }
    } else {
        elasticsearch {
            codec => "json"
            hosts => ["${elasticsearch_base_url}"]
            index => "logstash-%{+YYYY.MM.dd}" # creates daily indices
            doc_as_upsert => true

        }
    }

}