diff options
author | mpriyank <priyank.maheshwari@est.tech> | 2024-10-25 09:55:22 +0100 |
---|---|---|
committer | mpriyank <priyank.maheshwari@est.tech> | 2024-11-04 13:56:52 +0000 |
commit | 3db2da4634eb2f0b64042ce040210616327e6681 (patch) | |
tree | 1cac36960002c6883af99d859c2341b19aa1b563 | |
parent | 45e00010bef158d55aacb99756f09869777908bb (diff) |
Verify LCM events during registration
- verifying the LCM state transition to ADVISED and then to READY state
during cm handle registration
- enhanced the base kafka test container to be thread safe
- changed the auto offset reset policy for integration test to latest
(default) from earliest
- added retry mechanism to poll for the records
Issue-ID: CPS-2468
Change-Id: Iabe603e1f5dd985899f04f5ace5d082acef7567a
Signed-off-by: mpriyank <priyank.maheshwari@est.tech>
2 files changed, 65 insertions, 31 deletions
diff --git a/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/CmHandleCreateSpec.groovy b/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/CmHandleCreateSpec.groovy index 19b10a3c79..00ce38fa2d 100644 --- a/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/CmHandleCreateSpec.groovy +++ b/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/CmHandleCreateSpec.groovy @@ -20,7 +20,7 @@ package org.onap.cps.integration.functional.ncmp -import org.apache.kafka.common.TopicPartition +import org.apache.kafka.clients.consumer.KafkaConsumer import org.apache.kafka.common.serialization.StringDeserializer import org.onap.cps.integration.KafkaTestContainer import org.onap.cps.integration.base.CpsIntegrationSpecBase @@ -32,7 +32,6 @@ import org.onap.cps.ncmp.api.inventory.models.NcmpServiceCmHandle import org.onap.cps.ncmp.events.lcm.v1.LcmEvent import org.onap.cps.ncmp.impl.inventory.models.CmHandleState import org.onap.cps.ncmp.impl.inventory.models.LockReasonCategory -import spock.lang.Ignore import spock.util.concurrent.PollingConditions import java.time.Duration @@ -42,21 +41,23 @@ class CmHandleCreateSpec extends CpsIntegrationSpecBase { NetworkCmProxyInventoryFacade objectUnderTest def uniqueId = 'ch-unique-id-for-create-test' - def kafkaConsumer = KafkaTestContainer.getConsumer('test-group', StringDeserializer.class) + static KafkaConsumer kafkaConsumer def setup() { objectUnderTest = networkCmProxyInventoryFacade + subscribeAndClearPreviousMessages() } - @Ignore - def 'CM Handle registration is successful.'() { + def cleanupSpec() { + kafkaConsumer.unsubscribe() + kafkaConsumer.close() + } + + def 'CM Handle registration.'() { given: 'DMI will return modules when requested' dmiDispatcher1.moduleNamesPerCmHandleId['ch-1'] = ['M1', 'M2'] dmiDispatcher1.moduleNamesPerCmHandleId[uniqueId] = ['M1', 'M2'] - and: 'consumer subscribed to topic' - kafkaConsumer.subscribe(['ncmp-events']) - when: 'a CM-handle is registered for creation' def cmHandleToCreate = new NcmpServiceCmHandle(cmHandleId: uniqueId) def dmiPluginRegistration = new DmiPluginRegistration(dmiPlugin: DMI1_URL, createdCmHandles: [cmHandleToCreate]) @@ -68,32 +69,33 @@ class CmHandleCreateSpec extends CpsIntegrationSpecBase { and: 'CM-handle is initially in ADVISED state' assert CmHandleState.ADVISED == objectUnderTest.getCmHandleCompositeState(uniqueId).cmHandleState - and: 'the module sync watchdog is triggered' + then: 'the module sync watchdog is triggered' moduleSyncWatchdog.moduleSyncAdvisedCmHandles() - and: 'CM-handle goes to READY state after module sync' + then: 'CM-handle goes to READY state after module sync' new PollingConditions().within(MODULE_SYNC_WAIT_TIME_IN_SECONDS, () -> { assert CmHandleState.READY == objectUnderTest.getCmHandleCompositeState(uniqueId).cmHandleState }) - and: 'the messages is polled' - def message = kafkaConsumer.poll(Duration.ofMillis(10000)) - def records = message.records(new TopicPartition('ncmp-events', 0)) - - and: 'the newest lcm event notification is received with READY state' - def notificationMessage = jsonObjectMapper.convertJsonString(records.last().value().toString(), LcmEvent) - /*TODO (Toine) This test was failing intermittently (when running as part of suite). - I suspect that it often gave false positives as the message being assert here was any random message created by previous tests - By checking the cm-handle and using an unique cm-handle in this test this flaw became obvious. - I have now ignored this test as it is out of scope of this commit to fix it. - Created: https://lf-onap.atlassian.net/browse/CPS-2468 to fix this instead - */ - assert notificationMessage.event.cmHandleId == uniqueId - assert notificationMessage.event.newValues.cmHandleState.value() == 'READY' - and: 'the CM-handle has expected modules' assert ['M1', 'M2'] == objectUnderTest.getYangResourcesModuleReferences(uniqueId).moduleName.sort() + then: 'get the latest messages' + def consumerRecords = getLatestConsumerRecords() + + and: 'both converted messages are for the correct cm handle' + def notificationMessages = [] + for (def consumerRecord : consumerRecords) { + notificationMessages.add(jsonObjectMapper.convertJsonString(consumerRecord.value().toString(), LcmEvent)) + } + assert notificationMessages.event.cmHandleId == [ uniqueId, uniqueId ] + + and: 'the oldest event is about the update to ADVISED state' + notificationMessages[0].event.newValues.cmHandleState.value() == 'ADVISED' + + and: 'the next event is about update to READY state' + notificationMessages[1].event.newValues.cmHandleState.value() == 'READY' + cleanup: 'deregister CM handle' deregisterCmHandle(DMI1_URL, uniqueId) } @@ -224,4 +226,23 @@ class CmHandleCreateSpec extends CpsIntegrationSpecBase { cleanup: 'deregister CM handles' deregisterCmHandles(DMI1_URL, ['ch-1', 'ch-2']) } + + def subscribeAndClearPreviousMessages() { + kafkaConsumer = KafkaTestContainer.getConsumer('test-group', StringDeserializer.class) + kafkaConsumer.subscribe(['ncmp-events']) + kafkaConsumer.poll(Duration.ofMillis(500)) + } + + def getLatestConsumerRecords() { + def consumerRecords = [] + def retryAttempts = 10 + while (consumerRecords.size() < 2) { + retryAttempts-- + consumerRecords.addAll(kafkaConsumer.poll(Duration.ofMillis(100))) + if (retryAttempts == 0) + break + } + consumerRecords + } + } diff --git a/integration-test/src/test/java/org/onap/cps/integration/KafkaTestContainer.java b/integration-test/src/test/java/org/onap/cps/integration/KafkaTestContainer.java index d41f752912..ff4aec4175 100644 --- a/integration-test/src/test/java/org/onap/cps/integration/KafkaTestContainer.java +++ b/integration-test/src/test/java/org/onap/cps/integration/KafkaTestContainer.java @@ -21,6 +21,7 @@ package org.onap.cps.integration; import java.util.HashMap; import java.util.Map; +import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; @@ -33,11 +34,12 @@ import org.testcontainers.utility.DockerImageName; * This ensures only one instance of Kafka container across the integration tests. * Avoid unnecessary resource and time consumption. */ +@Slf4j public class KafkaTestContainer extends KafkaContainer { private static final String IMAGE_NAME_AND_VERSION = "registry.nordix.org/onaptest/confluentinc/cp-kafka:6.2.1"; - private static KafkaTestContainer kafkaTestContainer; + private static volatile KafkaTestContainer kafkaTestContainer; private KafkaTestContainer() { super(DockerImageName.parse(IMAGE_NAME_AND_VERSION).asCompatibleSubstituteFor("confluentinc/cp-kafka")); @@ -51,8 +53,15 @@ public class KafkaTestContainer extends KafkaContainer { */ public static KafkaTestContainer getInstance() { if (kafkaTestContainer == null) { - kafkaTestContainer = new KafkaTestContainer(); - Runtime.getRuntime().addShutdownHook(new Thread(kafkaTestContainer::close)); + synchronized (KafkaTestContainer.class) { + if (kafkaTestContainer == null) { + kafkaTestContainer = new KafkaTestContainer(); + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + log.info("Shutting down KafkaTestContainer..."); + kafkaTestContainer.stop(); + })); + } + } } return kafkaTestContainer; } @@ -63,8 +72,11 @@ public class KafkaTestContainer extends KafkaContainer { @Override public void start() { - super.start(); - System.setProperty("spring.kafka.properties.bootstrap.servers", kafkaTestContainer.getBootstrapServers()); + if (!isRunning()) { + super.start(); + System.setProperty("spring.kafka.properties.bootstrap.servers", getBootstrapServers()); + log.info("KafkaTestContainer started at {}", getBootstrapServers()); + } } @Override @@ -78,8 +90,9 @@ public class KafkaTestContainer extends KafkaContainer { configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaTestContainer.getBootstrapServers()); configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer); - configProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + configProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); configProps.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId); + configProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, Integer.MAX_VALUE); return configProps; } |