1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
|
import logging
from confluent_kafka import Consumer
import json
# Configure the root logger: "<time>::<pid>::<level>::<message>" at INFO and above.
logging.basicConfig(
    level=logging.INFO,
    datefmt='%d-%b-%y %H:%M:%S',
    format='%(asctime)s::%(process)d::%(levelname)s::%(message)s',
)
class CustomKafkaConsumer:
    """Consume JSON metric records from a Kafka topic and buffer recent ones.

    Each message is decoded as JSON (the value first, falling back to the
    key). A record must carry ``labels.__name__``, ``labels.instance`` and
    ``timestamp`` (the field names look Prometheus-style -- confirm against
    the producer). Records whose timestamp lies within the last ``duration``
    seconds are appended to ``output_map[metric_name][instance]``; each such
    list is trimmed to the newest ``no_of_recs_wanted`` records.
    """

    def __init__(self, *, topic_name="metrics3",
                 bootstrap_servers="kafka-cluster-kafka-bootstrap:9092",
                 group_id="grp1", duration=31536000,
                 time_format="timestamp", no_of_recs_wanted=3):
        """Create the Kafka consumer; every setting defaults to the old hard-coded value.

        Args:
            topic_name: Topic subscribed to by :meth:`consume`.
            bootstrap_servers: Kafka ``bootstrap.servers`` setting.
            group_id: Kafka consumer ``group.id``.
            duration: Length in seconds of the acceptance window that ends
                "now" (default 31536000 s = 365 days).
            time_format: ``'iso'`` for ISO-8601 timestamp strings; any other
                value means numeric epoch timestamps in milliseconds.
            no_of_recs_wanted: Maximum records kept per (metric, instance).
        """
        # metric name -> instance -> list of records, oldest first.
        self.output_map = {}
        self.topic_name = topic_name
        self.consumer = Consumer({
            'bootstrap.servers': bootstrap_servers,
            'group.id': group_id,
            'auto.offset.reset': 'earliest',
        })
        self.duration = duration
        self.time_format = time_format
        self.no_of_recs_wanted = no_of_recs_wanted

    def processMessage(self, msg_key, msg_val):
        """Decode one Kafka message and store it if its timestamp is in the window.

        Args:
            msg_key: Raw message key (``bytes``/``str``) or ``None``.
            msg_val: Raw message value (``bytes``/``str``) or ``None``.

        Malformed records (undecodable, missing fields, bad timestamp) are
        logged and skipped instead of raising, so one bad message cannot stop
        the consume loop.
        """
        python_obj = self._decode(msg_key, msg_val)
        try:
            metric_name = python_obj["labels"]["__name__"]
            ip = python_obj["labels"]["instance"]
            raw_time = python_obj["timestamp"]
        except (KeyError, TypeError):
            logging.warning(
                "Skipping record without labels.__name__/labels.instance/timestamp: %r",
                python_obj)
            return

        if self.time_format == 'iso':
            in_window = self._iso_in_window(raw_time)
        else:
            in_window = self._timestamp_in_window(raw_time)
        if in_window:
            self._store(metric_name, ip, python_obj)

    @staticmethod
    def _decode(msg_key, msg_val):
        """Return the JSON-decoded value, else the decoded key, else ``{}``."""
        for raw in (msg_val, msg_key):
            if raw is None:  # Kafka messages may legitimately lack a key/value
                continue
            try:
                return json.loads(raw)
            except (TypeError, ValueError):  # ValueError covers JSON and UTF-8 decode errors
                continue
        return {}

    def _iso_in_window(self, iso_time):
        """Return True if ISO-8601 *iso_time* lies in [now - duration, now]."""
        import dateutil.parser as dp
        from datetime import datetime, timedelta, timezone

        logging.info("Time_format is ISO-FORMAT")
        logging.info("iso_time:: %s", iso_time)
        try:
            parsed = dp.parse(iso_time)
        except (TypeError, ValueError, OverflowError):
            logging.warning("Skipping record with unparseable ISO timestamp: %r", iso_time)
            return False
        # Comparing naive and aware datetimes raises TypeError, so use a
        # "now" of the same kind as the parsed value.
        if parsed.utcoffset() is None:
            now = datetime.now()
        else:
            now = datetime.now(timezone.utc)
        if now - timedelta(seconds=self.duration) <= parsed <= now:
            logging.info("Parsed a relevant record")
            return True
        return False

    def _timestamp_in_window(self, parsed_timestamp):
        """Return True if epoch-millisecond *parsed_timestamp* lies in [now - duration, now]."""
        from datetime import datetime, timedelta, timezone

        logging.info("Time_format is timestamp")
        logging.info("parsed_timestamp:: %s", parsed_timestamp)
        # An aware UTC "now" yields an unambiguous epoch value (no DST fold issues).
        now = datetime.now(timezone.utc)
        st_timestamp = int((now - timedelta(seconds=self.duration)).timestamp() * 1000)
        en_timestamp = int(now.timestamp() * 1000)
        logging.info("st_timestamp:: %s", st_timestamp)
        logging.info("en_timestamp:: %s", en_timestamp)
        try:
            return st_timestamp <= parsed_timestamp <= en_timestamp
        except TypeError:
            # e.g. the producer sent a string; comparing it with an int raises.
            logging.warning("Skipping record with non-numeric timestamp: %r", parsed_timestamp)
            return False

    def _store(self, metric_name, ip, record):
        """Append *record* to its (metric_name, ip) series and trim the series."""
        if metric_name not in self.output_map:
            self.output_map[metric_name] = {ip: [record]}
            logging.info("::Inserted the first record to a new time series::")
        elif ip not in self.output_map[metric_name]:
            self.output_map[metric_name][ip] = [record]
            logging.info("::Appended a recorded to existing time series data with a new ip::")
        else:
            self.output_map[metric_name][ip].append(record)
            logging.info("::Appended a record to existing time series data::")

        series = self.output_map[metric_name][ip]
        logging.info("The size of the o/p map :: %s", len(series))
        # Keep exactly the newest no_of_recs_wanted records (oldest are at index 0).
        while series and len(series) > self.no_of_recs_wanted:
            logging.info("Size of the q %s-%s exceeded ", metric_name, ip)
            logging.info("Poping out the record: %s", series.pop(0))

    def executeQuery(self, metric_name, ip):
        """Return the buffered record list for (metric_name, ip), or None if absent.

        The returned list is the live internal buffer, not a copy.
        """
        return self.output_map.get(metric_name, {}).get(ip)

    def consume(self):
        """Subscribe to ``topic_name`` and process messages until interrupted.

        The consumer is closed on any exit (exception or KeyboardInterrupt);
        per the confluent-kafka docs, ``close()`` leaves the consumer group.
        """
        self.consumer.subscribe([self.topic_name])
        try:
            while True:
                msg = self.consumer.poll(1.0)
                if msg is None:
                    logging.info('Looking for message on topic:: %s', self.topic_name)
                    continue
                if msg.error():
                    logging.error("Consumer error: %s", msg.error())
                    continue
                self.processMessage(msg.key(), msg.value())
        finally:
            self.consumer.close()
|