From 53c9fab327d1d9a079154b01242cf0930c106989 Mon Sep 17 00:00:00 2001 From: Rajamohan Raj Date: Fri, 1 Nov 2019 00:30:30 +0000 Subject: First working draft of kafka for inference app Created a python based inference app which can query a given metrics for a given duration from kafka topic. Consumer runs on separate thread and doesnt interfere with the main app. Issue-ID: ONAPARC-528 Signed-off-by: Rajamohan Raj Change-Id: Ic84ea137b134385246bf11dee2ed6d34b593b956 --- .../python-kafkaConsumer-inference-app/README.md | 1 + .../kafka.inference.deploy.yaml | 31 ++++++ .../skaffold.yaml | 19 ++++ .../src/Dockerfile | 35 ++++++ .../src/__init__.py | 0 .../src/consumer/CustomKafkaConsumer.py | 120 +++++++++++++++++++++ .../src/consumer/__init__.py | 0 .../python-kafkaConsumer-inference-app/src/main.py | 47 ++++++++ .../src/producer/CustomKafkaProducer.py | 38 +++++++ .../src/producer/__init__.py | 0 .../src/requirements.txt | 2 + .../src/utils/utils.py | 8 ++ 12 files changed, 301 insertions(+) create mode 100644 vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/README.md create mode 100644 vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/kubernetes-manifests/kafka.inference.deploy.yaml create mode 100644 vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/skaffold.yaml create mode 100644 vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/Dockerfile create mode 100644 vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/__init__.py create mode 100644 vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/consumer/CustomKafkaConsumer.py create mode 100644 vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/consumer/__init__.py create mode 100755 vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/main.py create mode 100644 vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/producer/CustomKafkaProducer.py create mode 100644 vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/producer/__init__.py create mode 100644 vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/requirements.txt create mode 100644 vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/utils/utils.py diff --git a/vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/README.md b/vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/README.md new file mode 100644 index 00000000..e1e30f31 --- /dev/null +++ b/vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/README.md @@ -0,0 +1 @@ +### Kakfka based Inference APP \ No newline at end of file diff --git a/vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/kubernetes-manifests/kafka.inference.deploy.yaml b/vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/kubernetes-manifests/kafka.inference.deploy.yaml new file mode 100644 index 00000000..bbd3d55b --- /dev/null +++ b/vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/kubernetes-manifests/kafka.inference.deploy.yaml @@ -0,0 +1,31 @@ +apiVersion: extensions/v1beta1 +kind: Deployment +metadata: + name: python-kafkaconsumer-inference-app + labels: + app: python-kafkaconsumer-inference-app + tier: app +spec: + replicas: 1 + selector: + matchLabels: + app: python-kafkaconsumer-inference-app + tier: app + template: + metadata: + labels: + app: python-kafkaconsumer-inference-app + tier: app + spec: + containers: + - name: python-kafkaconsumer-inference-app + image: python-kafkaconsumer-inference-app + ports: + - containerPort: 8080 + resources: + requests: + memory: "640Mi" + cpu: "2500m" + limits: + memory: "1280Mi" + cpu: "5000m" diff --git a/vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/skaffold.yaml b/vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/skaffold.yaml new file mode 100644 index 00000000..e8891c6b --- /dev/null +++ b/vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/skaffold.yaml @@ -0,0 +1,19 @@ +apiVersion: skaffold/v1beta11 +kind: Config +build: + tagPolicy: + sha256: {} + # defines where to find the code at build time and where to push the resulting image + artifacts: + - context: src + image: python-kafkaconsumer-inference-app +# defines the Kubernetes manifests to deploy on each run +deploy: + kubectl: + manifests: + - kubernetes-manifests/**.yaml +# use the cloudbuild profile to build images using Google Cloud Build +profiles: +- name: cloudbuild + build: + googleCloudBuild: {} diff --git a/vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/Dockerfile b/vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/Dockerfile new file mode 100644 index 00000000..8c5d822d --- /dev/null +++ b/vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/Dockerfile @@ -0,0 +1,35 @@ +# Python image to use. +FROM python:3.8 + +# Set the working directory to /src/hdfs-writer +WORKDIR /src/inferenceApp + +# Install librdkafka +RUN mkdir /librdkafka-dir && cd /librdkafka-dir +RUN git clone https://github.com/edenhill/librdkafka.git && \ +cd librdkafka && \ +./configure --prefix /usr && \ +make && \ +make install + +#RUN export PYTHONPATH="/usr/bin/python3:/src/python-kafkaconsumer-inference-app/" + +# copy the requirements file used for dependencies +COPY requirements.txt . + +# Install any needed packages specified in requirements.txt +RUN pip install --trusted-host pypi.python.org -r requirements.txt + +RUN pip install confluent-kafka +RUN pip install python-dateutil + +# Install ptvsd for debugging +RUN pip install ptvsd + + + +# Copy the rest of the working directory contents into the container at /app +COPY . ./ + +# Start the server when the container launches +CMD ["python3", "-m", "ptvsd", "--host", "localhost", "--port", "5000", "--wait", "/src/inferenceApp/main.py"] \ No newline at end of file diff --git a/vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/__init__.py b/vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/consumer/CustomKafkaConsumer.py b/vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/consumer/CustomKafkaConsumer.py new file mode 100644 index 00000000..1e311bf1 --- /dev/null +++ b/vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/consumer/CustomKafkaConsumer.py @@ -0,0 +1,120 @@ +import logging +from confluent_kafka import Consumer +import json + +logging.basicConfig(format='%(asctime)s::%(process)d::%(levelname)s::%(message)s', level=logging.INFO, datefmt='%d-%b-%y %H:%M:%S') + + +class CustomKafkaConsumer: + def __init__(self): + self.output_map = dict() + self.topic_name = "metrics3" + #self.topic_name = "adatopic1" + self.consumer = Consumer({ + 'bootstrap.servers': 'kafka-cluster-kafka-bootstrap:9092', + #'bootstrap.servers': '172.25.103.6:31610', + 'group.id': 'grp1', + 'auto.offset.reset': 'earliest' + }) + self.duration = 31536000 #50 + self.time_format = 'timestamp' #or 'iso' + # duration may be equal to no_of_recs_wanted, say we gurantee 50 secs generate 50 recs + self.no_of_recs_wanted = 3 + + + def processMessage(self, msg_key, msg_val): + python_obj = {} + try: + python_obj = json.loads(msg_key) + except ValueError: + pass + try: + python_obj = json.loads(msg_val) + except ValueError: + pass + #print(python_obj["labels"]["__name__"]) + metric_name = python_obj["labels"]["__name__"] + ip = python_obj["labels"]["instance"] + if self.time_format == 'iso': + logging.info("Time_format is ISO-FORMAT") + iso_time = python_obj["timestamp"] + logging.info("iso_time:: {}".format(iso_time)) + import dateutil.parser as dp + parsed_datetime_obj = dp.parse(iso_time) + from datetime import datetime + now_datetime_obj = datetime.now() + st_datetime_obj = now_datetime_obj - datetime.timedelta(seconds= self.duration) + en_datetime_obj = now_datetime_obj + if st_datetime_obj <= parsed_datetime_obj and parsed_datetime_obj <= en_datetime_obj: + logging.info("Parsed a relevant record") + if metric_name in self.output_map: + if ip in self.output_map[metric_name]: + self.output_map[metric_name][ip].append(python_obj) + logging.info("::Appended a record to existing time series data::") + else: + self.output_map[metric_name][ip] = list() + self.output_map[metric_name][ip].append(python_obj) + logging.info("::Appended a recorded to existing time series data with a new ip::") + else: + self.output_map[metric_name] = dict() + self.output_map[metric_name][ip] = list() + self.output_map[metric_name][ip].append(python_obj) + logging.info("::Inserted the first record to a new time series::") + else: + logging.info("Time_format is timestamp") + parsed_timestamp = python_obj["timestamp"] + logging.info("parsed_timestamp:: {}".format(parsed_timestamp)) + from datetime import datetime, timedelta + now_datetime_obj = datetime.now() + st_datetime_obj = now_datetime_obj - timedelta(seconds=self.duration) + en_datetime_obj = now_datetime_obj + st_timestamp = int(st_datetime_obj.timestamp()*1000) + en_timestamp = int(en_datetime_obj.timestamp()*1000) + + logging.info("st_timestamp:: {}".format(st_timestamp)) + logging.info("en_timestamp:: {}".format(en_timestamp)) + if st_timestamp <= parsed_timestamp and en_timestamp>=parsed_timestamp: + if metric_name in self.output_map: + if ip in self.output_map[metric_name]: + self.output_map[metric_name][ip].append(python_obj) + logging.info("::Appended a record to existing time series data::") + else: + self.output_map[metric_name][ip] = list() + self.output_map[metric_name][ip].append(python_obj) + logging.info("::Appended a recorded to existing time series data with a new ip::") + else: + self.output_map[metric_name] = dict() + self.output_map[metric_name][ip] = list() + self.output_map[metric_name][ip].append(python_obj) + logging.info("::Inserted the first record to a new time series::") + + logging.info("The size of the o/p map :: {}".format(len(self.output_map[metric_name][ip]))) + if len(self.output_map[metric_name][ip]) == self.no_of_recs_wanted: + logging.info("Size of the q {}-{} exceeded ".format(metric_name, ip)) + logging.info("Poping out the record: {}".format(self.output_map[metric_name][ip].pop(0))) + + + def executeQuery(self, metric_name, ip): + if metric_name in self.output_map: + if ip in self.output_map[metric_name]: + return self.output_map[metric_name][ip] + + + def consume(self): + self.consumer.subscribe([self.topic_name]) + while True: + msg = self.consumer.poll(1.0) + if msg is None: + logging.info('Looking for message on topic:: {}'.format(self.topic_name)) + continue + if msg.error(): + print("Consumer error: {}".format(msg.error())) + continue + # print("msg type:: {} and msg:: {}".format(type(msg), msg)) + # print('Received message key from producer: {}'.format(msg.key().decode('utf-8'))) + # print('Received message val from producer: {}'.format(msg.value().decode('utf-8'))) + # print("mes-key-type:: {}".format(type(msg.key().decode('utf-8')))) + # print("msg-value-type:: {}".format(type(msg.value().decode('utf-8')))) + + self.processMessage(msg.key(), msg.value()) + self.consumer.close() \ No newline at end of file diff --git a/vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/consumer/__init__.py b/vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/consumer/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/main.py b/vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/main.py new file mode 100755 index 00000000..bf62f50f --- /dev/null +++ b/vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/main.py @@ -0,0 +1,47 @@ +#!/usr/bin/env python3 + +# from .consumer.CustomKafkaConsumer import CustomKafkaConsumer +# from .producer.CustomKafkaProducer import CustomKafkaProducer + +import sys +import os, threading +import traceback +import json +import concurrent.futures +import logging + +from consumer import CustomKafkaConsumer +from producer import CustomKafkaProducer + +logging.basicConfig(format='%(asctime)s::%(process)d::%(levelname)s::%(message)s', level=logging.INFO, datefmt='%d-%b-%y %H:%M:%S') + +def main(): + #Begin: Sample producer based on file + customKafkaProducer = CustomKafkaProducer.CustomKafkaProducer() + with open("./multithreading-metrics.json") as input_file: + for each_line in input_file: + python_obj = json.loads(each_line) + # print(python_obj["labels"]["__name__"]) + customKafkaProducer.produce(each_line, python_obj["labels"]["__name__"]) + #END: Sample producer based on file + + customKafkaConsumer = CustomKafkaConsumer.CustomKafkaConsumer() + + #Form a data structure for query formation + queries = [] + queries.append({"metric_name" : "go_gc_duration_seconds_count", "ip": "10.42.1.93:8686"}) + queries.append({"metric_name" : 'go_gc_duration_seconds_count', "ip": "10.42.1.92:8686"}) + + executor = concurrent.futures.ThreadPoolExecutor(max_workers=1) + executor.submit(customKafkaConsumer.consume) + + while(True): + for each_record in queries: + list_of_records = customKafkaConsumer.executeQuery(each_record["metric_name"], each_record["ip"]) + logging.info("The records collected :: {}".format(list_of_records)) + logging.info("The length of records collected: {}".format(len(list_of_records))) + print("The records :: {}".format(list_of_records)) + + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/producer/CustomKafkaProducer.py b/vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/producer/CustomKafkaProducer.py new file mode 100644 index 00000000..8f726bd9 --- /dev/null +++ b/vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/producer/CustomKafkaProducer.py @@ -0,0 +1,38 @@ +import logging +from confluent_kafka import Producer +import traceback + +logging.basicConfig(format='%(asctime)s::%(process)d::%(levelname)s::%(message)s', level=logging.INFO, datefmt='%d-%b-%y %H:%M:%S') + + +class CustomKafkaProducer: + def __init__(self): + self.topic_name = "metrics3" + #self.topic_name = "adatopic1" + conf = {'bootstrap.servers': 'kafka-cluster-kafka-bootstrap:9092' + } + self.producer = Producer(**conf) + + + def produce(self, kafka_msg, kafka_key): + try: + self.producer.produce(topic=self.topic_name, + value=kafka_msg, + key=kafka_key, + callback=lambda err, msg: self.on_delivery(err, msg) + ) + self.producer.flush() + + except Exception as e: + #print("Error during producing to kafka topic. Stacktrace is %s",e) + logging.error("Error during producing to kafka topic.") + traceback.print_exc() + + + def on_delivery(self, err, msg): + if err: + print("Message failed delivery, error: %s", err) + logging.error('%s raised an error', err) + else: + logging.info("Message delivered to %s on partition %s", + msg.topic(), msg.partition()) \ No newline at end of file diff --git a/vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/producer/__init__.py b/vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/producer/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/requirements.txt b/vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/requirements.txt new file mode 100644 index 00000000..78cdc973 --- /dev/null +++ b/vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/requirements.txt @@ -0,0 +1,2 @@ +confluent-kafka +python-dateutil \ No newline at end of file diff --git a/vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/utils/utils.py b/vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/utils/utils.py new file mode 100644 index 00000000..4ed3b47b --- /dev/null +++ b/vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/utils/utils.py @@ -0,0 +1,8 @@ +class utils: + + def __init__(self): + pass + + def readFile(self, fileName): + pass + -- cgit 1.2.3-korg