diff options
Diffstat (limited to 'integration-test/src/test/java/org')
-rw-r--r-- | integration-test/src/test/java/org/onap/cps/integration/KafkaTestContainer.java | 25 |
1 files changed, 19 insertions, 6 deletions
diff --git a/integration-test/src/test/java/org/onap/cps/integration/KafkaTestContainer.java b/integration-test/src/test/java/org/onap/cps/integration/KafkaTestContainer.java index d41f752912..ff4aec4175 100644 --- a/integration-test/src/test/java/org/onap/cps/integration/KafkaTestContainer.java +++ b/integration-test/src/test/java/org/onap/cps/integration/KafkaTestContainer.java @@ -21,6 +21,7 @@ package org.onap.cps.integration; import java.util.HashMap; import java.util.Map; +import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; @@ -33,11 +34,12 @@ import org.testcontainers.utility.DockerImageName; * This ensures only one instance of Kafka container across the integration tests. * Avoid unnecessary resource and time consumption. */ +@Slf4j public class KafkaTestContainer extends KafkaContainer { private static final String IMAGE_NAME_AND_VERSION = "registry.nordix.org/onaptest/confluentinc/cp-kafka:6.2.1"; - private static KafkaTestContainer kafkaTestContainer; + private static volatile KafkaTestContainer kafkaTestContainer; private KafkaTestContainer() { super(DockerImageName.parse(IMAGE_NAME_AND_VERSION).asCompatibleSubstituteFor("confluentinc/cp-kafka")); @@ -51,8 +53,15 @@ public class KafkaTestContainer extends KafkaContainer { */ public static KafkaTestContainer getInstance() { if (kafkaTestContainer == null) { - kafkaTestContainer = new KafkaTestContainer(); - Runtime.getRuntime().addShutdownHook(new Thread(kafkaTestContainer::close)); + synchronized (KafkaTestContainer.class) { + if (kafkaTestContainer == null) { + kafkaTestContainer = new KafkaTestContainer(); + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + log.info("Shutting down KafkaTestContainer..."); + kafkaTestContainer.stop(); + })); + } + } } return kafkaTestContainer; } @@ -63,8 +72,11 @@ public class KafkaTestContainer extends KafkaContainer { @Override public void start() { - super.start(); - System.setProperty("spring.kafka.properties.bootstrap.servers", kafkaTestContainer.getBootstrapServers()); + if (!isRunning()) { + super.start(); + System.setProperty("spring.kafka.properties.bootstrap.servers", getBootstrapServers()); + log.info("KafkaTestContainer started at {}", getBootstrapServers()); + } } @Override @@ -78,8 +90,9 @@ public class KafkaTestContainer extends KafkaContainer { configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaTestContainer.getBootstrapServers()); configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer); - configProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + configProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); configProps.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId); + configProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, Integer.MAX_VALUE); return configProps; } |