diff options
Diffstat (limited to 'cps-ncmp-service/src')
5 files changed, 41 insertions, 18 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/cmavc/CmAvcEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/cmavc/CmAvcEventConsumer.java index 9e90eabbc4..2d1f64802b 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/cmavc/CmAvcEventConsumer.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/cmavc/CmAvcEventConsumer.java @@ -46,7 +46,8 @@ public class CmAvcEventConsumer { private final EventsPublisher<CloudEvent> eventsPublisher; /** - * Incoming AvcEvent in the form of Consumer Record. + * Incoming Cm AvcEvent in the form of Consumer Record, it will be forwarded as is to a target topic. + * The key from incoming record will be used as key for the target topic as well to preserve the message ordering. * * @param cmAvcEventAsConsumerRecord Incoming raw consumer record */ @@ -55,7 +56,8 @@ public class CmAvcEventConsumer { public void consumeAndForward( final ConsumerRecord<String, CloudEvent> cmAvcEventAsConsumerRecord) { final CloudEvent outgoingAvcEvent = cmAvcEventAsConsumerRecord.value(); - log.debug("Consuming AVC event {} ...", outgoingAvcEvent); - eventsPublisher.publishCloudEvent(cmEventsTopicName, outgoingAvcEvent.getId(), outgoingAvcEvent); + final String outgoingAvcEventKey = cmAvcEventAsConsumerRecord.key(); + log.debug("Consuming AVC event with key : {} and value : {}", outgoingAvcEventKey, outgoingAvcEvent); + eventsPublisher.publishCloudEvent(cmEventsTopicName, outgoingAvcEventKey, outgoingAvcEvent); } } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/WriteRequestExaminer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/WriteRequestExaminer.java index 70d08dccdc..e13d3c2328 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/WriteRequestExaminer.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/WriteRequestExaminer.java @@ -32,6 +32,7 @@ import org.onap.cps.ncmp.api.datajobs.models.DmiWriteOperation; import org.onap.cps.ncmp.api.datajobs.models.ProducerKey; import org.onap.cps.ncmp.api.datajobs.models.WriteOperation; import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle; +import org.onap.cps.ncmp.impl.models.RequiredDmiService; import org.onap.cps.ncmp.impl.utils.AlternateIdMatcher; import org.onap.cps.ncmp.impl.utils.YangDataConverter; import org.onap.cps.spi.model.DataNode; @@ -69,9 +70,11 @@ public class WriteRequestExaminer { final DataNode dataNode = alternateIdMatcher .getCmHandleDataNodeByLongestMatchingAlternateId(writeOperation.path(), PATH_SEPARATOR); - final DmiWriteOperation dmiWriteOperation = createDmiWriteOperation(writeOperation, dataNode); + final YangModelCmHandle yangModelCmHandle = YangDataConverter.toYangModelCmHandle(dataNode); + + final DmiWriteOperation dmiWriteOperation = createDmiWriteOperation(writeOperation, yangModelCmHandle); - final ProducerKey producerKey = createProducerKey(dataNode); + final ProducerKey producerKey = createProducerKey(yangModelCmHandle); final List<DmiWriteOperation> dmiWriteOperations; if (dmiWriteOperationsPerProducerKey.containsKey(producerKey)) { dmiWriteOperations = dmiWriteOperationsPerProducerKey.get(producerKey); @@ -82,24 +85,23 @@ public class WriteRequestExaminer { dmiWriteOperations.add(dmiWriteOperation); } - private ProducerKey createProducerKey(final DataNode dataNode) { - return new ProducerKey((String) dataNode.getLeaves().get("dmi-service-name"), - (String) dataNode.getLeaves().get("data-producer-identifier")); + private ProducerKey createProducerKey(final YangModelCmHandle yangModelCmHandle) { + return new ProducerKey(yangModelCmHandle.resolveDmiServiceName(RequiredDmiService.DATA), + yangModelCmHandle.getDataProducerIdentifier()); } private DmiWriteOperation createDmiWriteOperation(final WriteOperation writeOperation, - final DataNode dataNode) { + final YangModelCmHandle yangModelCmHandle) { return new DmiWriteOperation( writeOperation.path(), writeOperation.op(), - (String) dataNode.getLeaves().get("module-set-tag"), + yangModelCmHandle.getModuleSetTag(), writeOperation.value(), writeOperation.operationId(), - getPrivatePropertiesFromDataNode(dataNode)); + getPrivatePropertiesFromDataNode(yangModelCmHandle)); } - private Map<String, String> getPrivatePropertiesFromDataNode(final DataNode dataNode) { - final YangModelCmHandle yangModelCmHandle = YangDataConverter.toYangModelCmHandle(dataNode); + private Map<String, String> getPrivatePropertiesFromDataNode(final YangModelCmHandle yangModelCmHandle) { final Map<String, String> cmHandleDmiProperties = new LinkedHashMap<>(); yangModelCmHandle.getDmiProperties() .forEach(dmiProperty -> cmHandleDmiProperties.put(dmiProperty.getName(), dmiProperty.getValue())); diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/utils/http/WebClientConfiguration.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/utils/http/WebClientConfiguration.java index 8ae942eb7b..eefabd1079 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/utils/http/WebClientConfiguration.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/utils/http/WebClientConfiguration.java @@ -57,7 +57,7 @@ public class WebClientConfiguration { .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, serviceConfig.getConnectionTimeoutInSeconds() * 1000) .doOnConnected(connection -> connection.addHandlerLast(new ReadTimeoutHandler( serviceConfig.getReadTimeoutInSeconds(), TimeUnit.SECONDS)).addHandlerLast( - new WriteTimeoutHandler(serviceConfig.getWriteTimeoutInSeconds(), TimeUnit.SECONDS))) + new WriteTimeoutHandler(serviceConfig.getWriteTimeoutInSeconds(), TimeUnit.SECONDS))) .resolver(DefaultAddressResolverGroup.INSTANCE) .compress(true); } diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/cmavc/CmAvcEventConsumerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/cmavc/CmAvcEventConsumerSpec.groovy index 06651be913..ad5f42ed94 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/cmavc/CmAvcEventConsumerSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/cmavc/CmAvcEventConsumerSpec.groovy @@ -64,6 +64,7 @@ class CmAvcEventConsumerSpec extends MessagingBaseSpec { cloudEventKafkaConsumer.subscribe([cmEventsTopicName] as List<String>) and: 'an event is sent' def jsonData = TestUtils.getResourceFileContent('sampleAvcInputEvent.json') + def testEventKey = 'sample-eventid-key' def testEventSent = jsonObjectMapper.convertJsonString(jsonData, AvcEvent.class) def testCloudEventSent = CloudEventBuilder.v1() .withData(jsonObjectMapper.asJsonBytes(testEventSent)) @@ -72,17 +73,19 @@ class CmAvcEventConsumerSpec extends MessagingBaseSpec { .withSource(URI.create('sample-test-source')) .withExtension('correlationid', 'test-cmhandle1').build() and: 'event has header information' - def consumerRecord = new ConsumerRecord<String, CloudEvent>(cmEventsTopicName, 0, 0, 'sample-eventid', testCloudEventSent) - when: 'the event is consumed' + def consumerRecord = new ConsumerRecord<String, CloudEvent>(cmEventsTopicName, 0, 0, testEventKey, testCloudEventSent) + when: 'the event is consumed and forwarded to target topic' acvEventConsumer.consumeAndForward(consumerRecord) - and: 'the topic is polled' + and: 'the target topic is polled' def records = cloudEventKafkaConsumer.poll(Duration.ofMillis(1500)) then: 'poll returns one record' assert records.size() == 1 - and: 'record can be converted to AVC event' + and: 'target record can be converted to AVC event' def record = records.iterator().next() def cloudEvent = record.value() as CloudEvent def convertedAvcEvent = toTargetEvent(cloudEvent, AvcEvent.class) + and: 'the target event has the same key as the source event to maintain the ordering in a partition' + assert record.key() == consumerRecord.key() and: 'we have correct headers forwarded where correlation id matches' assert KafkaHeaders.getParsedKafkaHeader(record.headers(), 'ce_correlationid') == 'test-cmhandle1' and: 'event id is same between consumed and forwarded' diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/WriteRequestExaminerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/WriteRequestExaminerSpec.groovy index 84eb78b751..47b57669ca 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/WriteRequestExaminerSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/WriteRequestExaminerSpec.groovy @@ -3,6 +3,8 @@ package org.onap.cps.ncmp.impl.datajobs import org.onap.cps.ncmp.api.datajobs.models.DataJobWriteRequest import org.onap.cps.ncmp.api.datajobs.models.WriteOperation +import org.onap.cps.ncmp.api.inventory.models.NcmpServiceCmHandle +import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle import org.onap.cps.ncmp.impl.utils.AlternateIdMatcher import org.onap.cps.spi.model.DataNode import spock.lang.Specification @@ -60,4 +62,18 @@ class WriteRequestExaminerSpec extends Specification { then: 'we get the operation ids in the expected order.' assert dmiWriteOperations.operationId == ['1', '2', '3'] } + + def 'Validate the creation of a ProducerKey with correct dmiservicename.'() { + given: 'yangModelCmHandles with service name: "#dmiServiceName" and data service name: "#dataServiceName"' + def yangModelCmHandle = YangModelCmHandle.toYangModelCmHandle(dmiServiceName, dataServiceName, '', new NcmpServiceCmHandle(cmHandleId: 'cm-handle-id-1'), '', '', 'dpi1') + when: 'the ProducerKey is created' + def result = objectUnderTest.createProducerKey(yangModelCmHandle).toString() + then: 'we get the ProducerKey with the correct service name' + assert result == expectedProducerKey + where: 'the following services are registered' + dmiServiceName | dataServiceName || expectedProducerKey + 'dmi-service-name' | '' || 'dmi-service-name#dpi1' + '' | 'dmi-data-service-name' || 'dmi-data-service-name#dpi1' + 'dmi-service-name' | 'dmi-data-service-name' || 'dmi-service-name#dpi1' + } } |