From 3db2da4634eb2f0b64042ce040210616327e6681 Mon Sep 17 00:00:00 2001 From: mpriyank Date: Fri, 25 Oct 2024 09:55:22 +0100 Subject: Verify LCM events during registration - verifying the LCM state transition to ADVISED and then to READY state during cm handle registration - enhanced the base kafka test container to be thread safe - changed the auto offset reset policy for integration test to latest (default) from earliest - added retry mechanism to poll for the records Issue-ID: CPS-2468 Change-Id: Iabe603e1f5dd985899f04f5ace5d082acef7567a Signed-off-by: mpriyank --- .../onap/cps/integration/KafkaTestContainer.java | 25 ++++++++++++++++------ 1 file changed, 19 insertions(+), 6 deletions(-) (limited to 'integration-test/src/test/java/org') diff --git a/integration-test/src/test/java/org/onap/cps/integration/KafkaTestContainer.java b/integration-test/src/test/java/org/onap/cps/integration/KafkaTestContainer.java index d41f752912..ff4aec4175 100644 --- a/integration-test/src/test/java/org/onap/cps/integration/KafkaTestContainer.java +++ b/integration-test/src/test/java/org/onap/cps/integration/KafkaTestContainer.java @@ -21,6 +21,7 @@ package org.onap.cps.integration; import java.util.HashMap; import java.util.Map; +import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; @@ -33,11 +34,12 @@ import org.testcontainers.utility.DockerImageName; * This ensures only one instance of Kafka container across the integration tests. * Avoid unnecessary resource and time consumption. */ +@Slf4j public class KafkaTestContainer extends KafkaContainer { private static final String IMAGE_NAME_AND_VERSION = "registry.nordix.org/onaptest/confluentinc/cp-kafka:6.2.1"; - private static KafkaTestContainer kafkaTestContainer; + private static volatile KafkaTestContainer kafkaTestContainer; private KafkaTestContainer() { super(DockerImageName.parse(IMAGE_NAME_AND_VERSION).asCompatibleSubstituteFor("confluentinc/cp-kafka")); @@ -51,8 +53,15 @@ public class KafkaTestContainer extends KafkaContainer { */ public static KafkaTestContainer getInstance() { if (kafkaTestContainer == null) { - kafkaTestContainer = new KafkaTestContainer(); - Runtime.getRuntime().addShutdownHook(new Thread(kafkaTestContainer::close)); + synchronized (KafkaTestContainer.class) { + if (kafkaTestContainer == null) { + kafkaTestContainer = new KafkaTestContainer(); + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + log.info("Shutting down KafkaTestContainer..."); + kafkaTestContainer.stop(); + })); + } + } } return kafkaTestContainer; } @@ -63,8 +72,11 @@ public class KafkaTestContainer extends KafkaContainer { @Override public void start() { - super.start(); - System.setProperty("spring.kafka.properties.bootstrap.servers", kafkaTestContainer.getBootstrapServers()); + if (!isRunning()) { + super.start(); + System.setProperty("spring.kafka.properties.bootstrap.servers", getBootstrapServers()); + log.info("KafkaTestContainer started at {}", getBootstrapServers()); + } } @Override @@ -78,8 +90,9 @@ public class KafkaTestContainer extends KafkaContainer { configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaTestContainer.getBootstrapServers()); configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer); - configProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + configProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); configProps.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId); + configProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, Integer.MAX_VALUE); return configProps; } -- cgit