aboutsummaryrefslogtreecommitdiffstats
path: root/tests/dcaegen2-collectors-hv-ves/testcases/libraries/KafkaLibrary.py
blob: 5c9ffb1ca7eac0cf9cec165132a175e4a2fa14ab (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# ============LICENSE_START====================================
# csit-dcaegen2-collectors-hv-ves
# =========================================================
# Copyright (C) 2019 Nokia. All rights reserved.
# =========================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============LICENSE_END=====================================

import docker
import os
from robot.api import logger

# Image name used to locate the Kafka container; None when the environment
# variable is not set.
KAFKA_IMAGE_FULL_NAME = os.environ.get("KAFKA_IMAGE_FULL_NAME")

# Addresses of the broker and of ZooKeeper that are passed to the Kafka
# command-line tools below.
KAFKA_ADDRESS = "kafka:9092"
ZOOKEEPER_ADDRESS = "zookeeper:2181"

# Fully resolved command: lists all topics known to ZooKeeper.
LIST_TOPICS_COMMAND = "kafka-topics.sh --list --zookeeper %s" % (ZOOKEEPER_ADDRESS,)

# Command templates: the remaining "%s" placeholder takes the topic name.
TOPIC_STATUS_COMMAND = (
    "kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "
    + KAFKA_ADDRESS
    + " --topic %s --time -1"
)
DELETE_TOPIC_COMMAND = (
    "kafka-topics.sh --zookeeper "
    + ZOOKEEPER_ADDRESS
    + " --delete --topic %s"
)


class KafkaLibrary:
    """Test keywords that report on the Kafka container's topics and reset them.

    Every command is executed inside the Kafka container through the Docker
    SDK (``exec_run``); results are written with ``robot.api.logger``.
    """

    def log_kafka_status(self):
        """Log the status of every topic and then delete the topic.

        The container is looked up by the image named in
        ``KAFKA_IMAGE_FULL_NAME`` (stopped containers included) and the first
        match is used.  The ``__consumer_offsets`` topic is skipped.

        Raises:
            IndexError: if no container matches the image.
        """
        docker_client = docker.from_env()
        try:
            containers = docker_client.containers.list(
                filters={"ancestor": KAFKA_IMAGE_FULL_NAME}, all=True)
            if not containers:
                # IndexError is what indexing the empty result would raise;
                # keep that type but say what is actually missing.
                raise IndexError(
                    "No Kafka container found for image %r" % (KAFKA_IMAGE_FULL_NAME,))
            kafka = containers[0]

            topics = self.get_topics(kafka)
            logger.info("Topics initialized in Kafka cluster: " + str(topics))
            for topic in topics:
                if topic == "__consumer_offsets":
                    # kafka-internal topic, ignore it
                    continue

                self.log_topic_status(kafka, topic)
                self.reset_topic(kafka, topic)
        finally:
            # Release the client even when a lookup or command above fails.
            docker_client.close()

    def get_topics(self, kafka):
        """Return the names of the topics reported by the list command.

        Args:
            kafka: container object providing ``exec_run``.

        Returns:
            A list of topic names as text.  The list is empty when the
            command reports a non-zero exit code, so that an error message
            is never mistaken for topic names.
        """
        exit_code, output = kafka.exec_run(LIST_TOPICS_COMMAND)
        # NOTE(review): a missing (None) exit code is tolerated and treated
        # as success - confirm against the Docker SDK version in use.
        if exit_code not in (0, None):
            logger.warn("Listing Kafka topics failed (exit code %s): %s"
                        % (exit_code, self._to_text(output)))
            return []
        return self._parse_topics(output)

    def log_topic_status(self, kafka, topic):
        """Log the output of the offset query for ``topic``.

        Args:
            kafka: container object providing ``exec_run``.
            topic: name of the topic to query.
        """
        _, topic_status = kafka.exec_run(TOPIC_STATUS_COMMAND % topic)
        logger.info("Messages on topic: " + self._to_text(topic_status))

    def reset_topic(self, kafka, topic):
        """Delete ``topic`` and log the output of the delete command.

        Args:
            kafka: container object providing ``exec_run``.
            topic: name of the topic to delete.
        """
        logger.info("Removing topic " + str(
            topic) + " (note that it will be recreated by dcae-app-simulator/hv-ves-collector, however the offset will be reseted)")
        _, output = kafka.exec_run(DELETE_TOPIC_COMMAND % topic)
        logger.info(self._to_text(output))

    @staticmethod
    def _to_text(output):
        """Return command output as a native string (``""`` for ``None``).

        Byte output is decoded as UTF-8; without this, comparing topic names
        with string literals or formatting them into commands misbehaves on
        Python 3.
        """
        if output is None:
            return ""
        if isinstance(output, str):
            return output
        return output.decode("utf-8", "replace")

    @staticmethod
    def _parse_topics(output):
        """Split raw list-command output into stripped, non-empty topic names."""
        lines = KafkaLibrary._to_text(output).splitlines()
        return [line.strip() for line in lines if line.strip()]