aboutsummaryrefslogtreecommitdiffstats
path: root/cps-ncmp-service/src
diff options
context:
space:
mode:
Diffstat (limited to 'cps-ncmp-service/src')
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/cmavc/CmAvcEventConsumer.java8
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/WriteRequestExaminer.java22
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/utils/http/WebClientConfiguration.java2
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/cmavc/CmAvcEventConsumerSpec.groovy11
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/WriteRequestExaminerSpec.groovy16
5 files changed, 41 insertions, 18 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/cmavc/CmAvcEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/cmavc/CmAvcEventConsumer.java
index 9e90eabbc4..2d1f64802b 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/cmavc/CmAvcEventConsumer.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/cmavc/CmAvcEventConsumer.java
@@ -46,7 +46,8 @@ public class CmAvcEventConsumer {
private final EventsPublisher<CloudEvent> eventsPublisher;
/**
- * Incoming AvcEvent in the form of Consumer Record.
+ * Incoming Cm AvcEvent in the form of Consumer Record, it will be forwarded as is to a target topic.
+ * The key from incoming record will be used as key for the target topic as well to preserve the message ordering.
*
* @param cmAvcEventAsConsumerRecord Incoming raw consumer record
*/
@@ -55,7 +56,8 @@ public class CmAvcEventConsumer {
public void consumeAndForward(
final ConsumerRecord<String, CloudEvent> cmAvcEventAsConsumerRecord) {
final CloudEvent outgoingAvcEvent = cmAvcEventAsConsumerRecord.value();
- log.debug("Consuming AVC event {} ...", outgoingAvcEvent);
- eventsPublisher.publishCloudEvent(cmEventsTopicName, outgoingAvcEvent.getId(), outgoingAvcEvent);
+ final String outgoingAvcEventKey = cmAvcEventAsConsumerRecord.key();
+ log.debug("Consuming AVC event with key : {} and value : {}", outgoingAvcEventKey, outgoingAvcEvent);
+ eventsPublisher.publishCloudEvent(cmEventsTopicName, outgoingAvcEventKey, outgoingAvcEvent);
}
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/WriteRequestExaminer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/WriteRequestExaminer.java
index 70d08dccdc..e13d3c2328 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/WriteRequestExaminer.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/WriteRequestExaminer.java
@@ -32,6 +32,7 @@ import org.onap.cps.ncmp.api.datajobs.models.DmiWriteOperation;
import org.onap.cps.ncmp.api.datajobs.models.ProducerKey;
import org.onap.cps.ncmp.api.datajobs.models.WriteOperation;
import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle;
+import org.onap.cps.ncmp.impl.models.RequiredDmiService;
import org.onap.cps.ncmp.impl.utils.AlternateIdMatcher;
import org.onap.cps.ncmp.impl.utils.YangDataConverter;
import org.onap.cps.spi.model.DataNode;
@@ -69,9 +70,11 @@ public class WriteRequestExaminer {
final DataNode dataNode = alternateIdMatcher
.getCmHandleDataNodeByLongestMatchingAlternateId(writeOperation.path(), PATH_SEPARATOR);
- final DmiWriteOperation dmiWriteOperation = createDmiWriteOperation(writeOperation, dataNode);
+ final YangModelCmHandle yangModelCmHandle = YangDataConverter.toYangModelCmHandle(dataNode);
+
+ final DmiWriteOperation dmiWriteOperation = createDmiWriteOperation(writeOperation, yangModelCmHandle);
- final ProducerKey producerKey = createProducerKey(dataNode);
+ final ProducerKey producerKey = createProducerKey(yangModelCmHandle);
final List<DmiWriteOperation> dmiWriteOperations;
if (dmiWriteOperationsPerProducerKey.containsKey(producerKey)) {
dmiWriteOperations = dmiWriteOperationsPerProducerKey.get(producerKey);
@@ -82,24 +85,23 @@ public class WriteRequestExaminer {
dmiWriteOperations.add(dmiWriteOperation);
}
- private ProducerKey createProducerKey(final DataNode dataNode) {
- return new ProducerKey((String) dataNode.getLeaves().get("dmi-service-name"),
- (String) dataNode.getLeaves().get("data-producer-identifier"));
+ private ProducerKey createProducerKey(final YangModelCmHandle yangModelCmHandle) {
+ return new ProducerKey(yangModelCmHandle.resolveDmiServiceName(RequiredDmiService.DATA),
+ yangModelCmHandle.getDataProducerIdentifier());
}
private DmiWriteOperation createDmiWriteOperation(final WriteOperation writeOperation,
- final DataNode dataNode) {
+ final YangModelCmHandle yangModelCmHandle) {
return new DmiWriteOperation(
writeOperation.path(),
writeOperation.op(),
- (String) dataNode.getLeaves().get("module-set-tag"),
+ yangModelCmHandle.getModuleSetTag(),
writeOperation.value(),
writeOperation.operationId(),
- getPrivatePropertiesFromDataNode(dataNode));
+ getPrivatePropertiesFromDataNode(yangModelCmHandle));
}
- private Map<String, String> getPrivatePropertiesFromDataNode(final DataNode dataNode) {
- final YangModelCmHandle yangModelCmHandle = YangDataConverter.toYangModelCmHandle(dataNode);
+ private Map<String, String> getPrivatePropertiesFromDataNode(final YangModelCmHandle yangModelCmHandle) {
final Map<String, String> cmHandleDmiProperties = new LinkedHashMap<>();
yangModelCmHandle.getDmiProperties()
.forEach(dmiProperty -> cmHandleDmiProperties.put(dmiProperty.getName(), dmiProperty.getValue()));
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/utils/http/WebClientConfiguration.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/utils/http/WebClientConfiguration.java
index 8ae942eb7b..eefabd1079 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/utils/http/WebClientConfiguration.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/utils/http/WebClientConfiguration.java
@@ -57,7 +57,7 @@ public class WebClientConfiguration {
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, serviceConfig.getConnectionTimeoutInSeconds() * 1000)
.doOnConnected(connection -> connection.addHandlerLast(new ReadTimeoutHandler(
serviceConfig.getReadTimeoutInSeconds(), TimeUnit.SECONDS)).addHandlerLast(
- new WriteTimeoutHandler(serviceConfig.getWriteTimeoutInSeconds(), TimeUnit.SECONDS)))
+ new WriteTimeoutHandler(serviceConfig.getWriteTimeoutInSeconds(), TimeUnit.SECONDS)))
.resolver(DefaultAddressResolverGroup.INSTANCE)
.compress(true);
}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/cmavc/CmAvcEventConsumerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/cmavc/CmAvcEventConsumerSpec.groovy
index 06651be913..ad5f42ed94 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/cmavc/CmAvcEventConsumerSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/cmavc/CmAvcEventConsumerSpec.groovy
@@ -64,6 +64,7 @@ class CmAvcEventConsumerSpec extends MessagingBaseSpec {
cloudEventKafkaConsumer.subscribe([cmEventsTopicName] as List<String>)
and: 'an event is sent'
def jsonData = TestUtils.getResourceFileContent('sampleAvcInputEvent.json')
+ def testEventKey = 'sample-eventid-key'
def testEventSent = jsonObjectMapper.convertJsonString(jsonData, AvcEvent.class)
def testCloudEventSent = CloudEventBuilder.v1()
.withData(jsonObjectMapper.asJsonBytes(testEventSent))
@@ -72,17 +73,19 @@ class CmAvcEventConsumerSpec extends MessagingBaseSpec {
.withSource(URI.create('sample-test-source'))
.withExtension('correlationid', 'test-cmhandle1').build()
and: 'event has header information'
- def consumerRecord = new ConsumerRecord<String, CloudEvent>(cmEventsTopicName, 0, 0, 'sample-eventid', testCloudEventSent)
- when: 'the event is consumed'
+ def consumerRecord = new ConsumerRecord<String, CloudEvent>(cmEventsTopicName, 0, 0, testEventKey, testCloudEventSent)
+ when: 'the event is consumed and forwarded to target topic'
acvEventConsumer.consumeAndForward(consumerRecord)
- and: 'the topic is polled'
+ and: 'the target topic is polled'
def records = cloudEventKafkaConsumer.poll(Duration.ofMillis(1500))
then: 'poll returns one record'
assert records.size() == 1
- and: 'record can be converted to AVC event'
+ and: 'target record can be converted to AVC event'
def record = records.iterator().next()
def cloudEvent = record.value() as CloudEvent
def convertedAvcEvent = toTargetEvent(cloudEvent, AvcEvent.class)
+ and: 'the target event has the same key as the source event to maintain the ordering in a partition'
+ assert record.key() == consumerRecord.key()
and: 'we have correct headers forwarded where correlation id matches'
assert KafkaHeaders.getParsedKafkaHeader(record.headers(), 'ce_correlationid') == 'test-cmhandle1'
and: 'event id is same between consumed and forwarded'
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/WriteRequestExaminerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/WriteRequestExaminerSpec.groovy
index 84eb78b751..47b57669ca 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/WriteRequestExaminerSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/WriteRequestExaminerSpec.groovy
@@ -3,6 +3,8 @@ package org.onap.cps.ncmp.impl.datajobs
import org.onap.cps.ncmp.api.datajobs.models.DataJobWriteRequest
import org.onap.cps.ncmp.api.datajobs.models.WriteOperation
+import org.onap.cps.ncmp.api.inventory.models.NcmpServiceCmHandle
+import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle
import org.onap.cps.ncmp.impl.utils.AlternateIdMatcher
import org.onap.cps.spi.model.DataNode
import spock.lang.Specification
@@ -60,4 +62,18 @@ class WriteRequestExaminerSpec extends Specification {
then: 'we get the operation ids in the expected order.'
assert dmiWriteOperations.operationId == ['1', '2', '3']
}
+
+ def 'Validate the creation of a ProducerKey with correct dmiservicename.'() {
+ given: 'yangModelCmHandles with service name: "#dmiServiceName" and data service name: "#dataServiceName"'
+ def yangModelCmHandle = YangModelCmHandle.toYangModelCmHandle(dmiServiceName, dataServiceName, '', new NcmpServiceCmHandle(cmHandleId: 'cm-handle-id-1'), '', '', 'dpi1')
+ when: 'the ProducerKey is created'
+ def result = objectUnderTest.createProducerKey(yangModelCmHandle).toString()
+ then: 'we get the ProducerKey with the correct service name'
+ assert result == expectedProducerKey
+ where: 'the following services are registered'
+ dmiServiceName | dataServiceName || expectedProducerKey
+ 'dmi-service-name' | '' || 'dmi-service-name#dpi1'
+ '' | 'dmi-data-service-name' || 'dmi-data-service-name#dpi1'
+ 'dmi-service-name' | 'dmi-data-service-name' || 'dmi-service-name#dpi1'
+ }
}