aboutsummaryrefslogtreecommitdiffstats
path: root/cps-ncmp-service
diff options
context:
space:
mode:
authormpriyank <priyank.maheshwari@est.tech>2023-06-16 15:55:52 +0100
committermpriyank <priyank.maheshwari@est.tech>2023-06-19 11:54:30 +0100
commit3032261ec743bde6eb136cd2030774c3dc358fa9 (patch)
tree6008f44c0c0274d6f22e653ed31da50f12774356 /cps-ncmp-service
parent08898672e0de04238acdedea4266c58f17c2b7e0 (diff)
DMI Data AVC to cloud events
- DMI Data AVC to be consumed as CloudEvents now - Removed the schema header as it is taken care by cloudevent headers - Implemented naming and packaging comments on the schema - Test cases refactoring Issue-ID: CPS-1719 Change-Id: I9864f90446720fe903fb3c1502d86035d886751d Signed-off-by: mpriyank <priyank.maheshwari@est.tech>
Diffstat (limited to 'cps-ncmp-service')
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java17
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java35
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventMapper.java35
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumerSpec.groovy53
-rw-r--r--cps-ncmp-service/src/test/resources/sampleAvcInputEvent.json2
5 files changed, 51 insertions, 91 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java
index 7b28b4cd5f..e61e7729be 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java
@@ -51,6 +51,19 @@ public class EventsPublisher<T> {
private final KafkaTemplate<String, CloudEvent> cloudEventKafkaTemplate;
/**
+ * Generic CloudEvent publisher.
+ *
+ * @param topicName valid topic name
+ * @param eventKey message key
+ * @param event message payload
+ */
+ public void publishCloudEvent(final String topicName, final String eventKey, final CloudEvent event) {
+ final ListenableFuture<SendResult<String, CloudEvent>> eventFuture
+ = cloudEventKafkaTemplate.send(topicName, eventKey, event);
+ eventFuture.addCallback(handleCallback(topicName));
+ }
+
+ /**
* Generic Event publisher.
*
* @param topicName valid topic name
@@ -95,7 +108,7 @@ public class EventsPublisher<T> {
publishEvent(topicName, eventKey, convertToKafkaHeaders(eventHeaders), event);
}
- private ListenableFutureCallback<SendResult<String, T>> handleCallback(final String topicName) {
+ private ListenableFutureCallback<SendResult<String, ?>> handleCallback(final String topicName) {
return new ListenableFutureCallback<>() {
@Override
public void onFailure(final Throwable throwable) {
@@ -103,7 +116,7 @@ public class EventsPublisher<T> {
}
@Override
- public void onSuccess(final SendResult<String, T> sendResult) {
+ public void onSuccess(final SendResult<String, ?> sendResult) {
log.debug("Successfully published event to topic : {} , Event : {}",
sendResult.getRecordMetadata().topic(), sendResult.getProducerRecord().value());
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java
index f37497abe6..b5ca176d1d 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java
@@ -20,19 +20,17 @@
package org.onap.cps.ncmp.api.impl.events.avc;
+import io.cloudevents.CloudEvent;
+import io.cloudevents.core.builder.CloudEventBuilder;
import java.util.UUID;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.header.Headers;
-import org.apache.kafka.common.header.internals.RecordHeader;
import org.onap.cps.ncmp.api.impl.events.EventsPublisher;
-import org.onap.cps.ncmp.events.avc.v1.AvcEvent;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
-import org.springframework.util.SerializationUtils;
/**
* Listener for AVC events.
@@ -47,34 +45,19 @@ public class AvcEventConsumer {
@Value("${app.ncmp.avc.cm-events-topic}")
private String cmEventsTopicName;
- private final EventsPublisher<AvcEvent> eventsPublisher;
- private final AvcEventMapper avcEventMapper;
-
+ private final EventsPublisher<CloudEvent> eventsPublisher;
/**
* Incoming AvcEvent in the form of Consumer Record.
*
* @param avcEventConsumerRecord Incoming raw consumer record
*/
- @KafkaListener(topics = "${app.dmi.cm-events.topic}",
- properties = {"spring.json.value.default.type=org.onap.cps.ncmp.events.avc.v1.AvcEvent"})
- public void consumeAndForward(final ConsumerRecord<String, AvcEvent> avcEventConsumerRecord) {
+ @KafkaListener(topics = "${app.dmi.cm-events.topic}")
+ public void consumeAndForward(final ConsumerRecord<String, CloudEvent> avcEventConsumerRecord) {
log.debug("Consuming AVC event {} ...", avcEventConsumerRecord.value());
- final String mutatedEventId = UUID.randomUUID().toString();
- mutateEventHeaderWithEventId(avcEventConsumerRecord.headers(), mutatedEventId);
- final AvcEvent outgoingAvcEvent = avcEventMapper.toOutgoingAvcEvent(avcEventConsumerRecord.value());
- eventsPublisher.publishEvent(cmEventsTopicName, mutatedEventId, avcEventConsumerRecord.headers(),
- outgoingAvcEvent);
- }
-
- private void mutateEventHeaderWithEventId(final Headers eventHeaders, final String mutatedEventId) {
- final String eventId = "eventId";
- final String existingEventId =
- (String) SerializationUtils.deserialize(eventHeaders.lastHeader(eventId).value());
- eventHeaders.remove(eventId);
- log.info("Removing existing eventId from header : {} and updating with id : {}", existingEventId,
- mutatedEventId);
- eventHeaders.add(new RecordHeader(eventId, SerializationUtils.serialize(mutatedEventId)));
-
+ final String newEventId = UUID.randomUUID().toString();
+ final CloudEvent outgoingAvcEvent =
+ CloudEventBuilder.from(avcEventConsumerRecord.value()).withId(newEventId).build();
+ eventsPublisher.publishCloudEvent(cmEventsTopicName, newEventId, outgoingAvcEvent);
}
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventMapper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventMapper.java
deleted file mode 100644
index 8246ed4802..0000000000
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventMapper.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * Copyright (C) 2023 Nordix Foundation
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.ncmp.api.impl.events.avc;
-
-import org.mapstruct.Mapper;
-import org.onap.cps.ncmp.events.avc.v1.AvcEvent;
-
-
-/**
- * Mapper for converting incoming {@link AvcEvent} to outgoing {@link AvcEvent}.
- */
-@Mapper(componentModel = "spring")
-public interface AvcEventMapper {
-
- AvcEvent toOutgoingAvcEvent(AvcEvent incomingAvcEvent);
-
-}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumerSpec.groovy
index 3dffac714b..4a9e3ee811 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumerSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumerSpec.groovy
@@ -21,21 +21,23 @@
package org.onap.cps.ncmp.api.impl.events.avc
import com.fasterxml.jackson.databind.ObjectMapper
+import io.cloudevents.CloudEvent
+import io.cloudevents.core.CloudEventUtils
+import io.cloudevents.core.builder.CloudEventBuilder
+import io.cloudevents.jackson.PojoCloudEventDataMapper
+import io.cloudevents.kafka.CloudEventDeserializer
+import io.cloudevents.kafka.impl.KafkaHeaders
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.KafkaConsumer
-import org.apache.kafka.common.header.internals.RecordHeader
-import org.apache.kafka.common.serialization.StringDeserializer
-import org.mapstruct.factory.Mappers
import org.onap.cps.ncmp.api.impl.events.EventsPublisher
import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec
-import org.onap.cps.ncmp.events.avc.v1.AvcEvent
+import org.onap.cps.ncmp.events.avc1_0_0.AvcEvent
import org.onap.cps.ncmp.utils.TestUtils
import org.onap.cps.utils.JsonObjectMapper
import org.spockframework.spring.SpringBean
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.test.annotation.DirtiesContext
-import org.springframework.util.SerializationUtils
import org.testcontainers.spock.Testcontainers
import java.time.Duration
@@ -46,52 +48,49 @@ import java.time.Duration
class AvcEventConsumerSpec extends MessagingBaseSpec {
@SpringBean
- AvcEventMapper avcEventMapper = Mappers.getMapper(AvcEventMapper.class)
+ EventsPublisher eventsPublisher = new EventsPublisher<CloudEvent>(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
@SpringBean
- EventsPublisher eventsPublisher = new EventsPublisher<AvcEvent>(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
-
- @SpringBean
- AvcEventConsumer acvEventConsumer = new AvcEventConsumer(eventsPublisher, avcEventMapper)
+ AvcEventConsumer acvEventConsumer = new AvcEventConsumer(eventsPublisher)
@Autowired
JsonObjectMapper jsonObjectMapper
- def legacyEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('ncmp-group', StringDeserializer))
+ @Autowired
+ ObjectMapper objectMapper
+
+ def cloudEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('ncmp-group', CloudEventDeserializer))
def 'Consume and forward valid message'() {
given: 'consumer has a subscription on a topic'
def cmEventsTopicName = 'cm-events'
acvEventConsumer.cmEventsTopicName = cmEventsTopicName
- legacyEventKafkaConsumer.subscribe([cmEventsTopicName] as List<String>)
+ cloudEventKafkaConsumer.subscribe([cmEventsTopicName] as List<String>)
and: 'an event is sent'
def jsonData = TestUtils.getResourceFileContent('sampleAvcInputEvent.json')
def testEventSent = jsonObjectMapper.convertJsonString(jsonData, AvcEvent.class)
+ def testCloudEventSent = CloudEventBuilder.v1()
+ .withData(objectMapper.writeValueAsBytes(testEventSent))
+ .withId('sample-eventid')
+ .withType('sample-test-type')
+ .withSource(URI.create('sample-test-source'))
+ .withExtension('correlationid', 'test-cmhandle1').build()
and: 'event has header information'
- def consumerRecord = new ConsumerRecord<String,AvcEvent>(cmEventsTopicName,0, 0, 'sample-eventid', testEventSent)
- consumerRecord.headers().add(new RecordHeader('eventId', SerializationUtils.serialize('sample-eventid')))
- consumerRecord.headers().add(new RecordHeader('eventCorrelationId', SerializationUtils.serialize('cmhandle1')))
+ def consumerRecord = new ConsumerRecord<String, CloudEvent>(cmEventsTopicName, 0, 0, 'sample-eventid', testCloudEventSent)
when: 'the event is consumed'
acvEventConsumer.consumeAndForward(consumerRecord)
and: 'the topic is polled'
- def records = legacyEventKafkaConsumer.poll(Duration.ofMillis(1500))
+ def records = cloudEventKafkaConsumer.poll(Duration.ofMillis(1500))
then: 'poll returns one record'
assert records.size() == 1
and: 'record can be converted to AVC event'
def record = records.iterator().next()
- def convertedAvcEvent = jsonObjectMapper.convertJsonString(record.value(), AvcEvent.class)
+ def cloudevent = record.value() as CloudEvent
+ def convertedAvcEvent = CloudEventUtils.mapData(cloudevent, PojoCloudEventDataMapper.from(objectMapper, AvcEvent.class)).getValue()
and: 'we have correct headers forwarded where correlation id matches'
- record.headers().forEach(header -> {
- if (header.key().equals('eventCorrelationId')) {
- assert SerializationUtils.deserialize(header.value()) == 'cmhandle1'
- }
- })
+ assert KafkaHeaders.getParsedKafkaHeader(record.headers(), 'ce_correlationid') == 'test-cmhandle1'
and: 'event id differs(as per requirement) between consumed and forwarded'
- record.headers().forEach(header -> {
- if (header.key().equals('eventId')) {
- assert SerializationUtils.deserialize(header.value()) != 'sample-eventid'
- }
- })
+ assert KafkaHeaders.getParsedKafkaHeader(record.headers(), 'ce_id') != 'sample-eventid'
and: 'the event payload still matches'
assert testEventSent == convertedAvcEvent
}
diff --git a/cps-ncmp-service/src/test/resources/sampleAvcInputEvent.json b/cps-ncmp-service/src/test/resources/sampleAvcInputEvent.json
index 569343fed9..5b297c86c2 100644
--- a/cps-ncmp-service/src/test/resources/sampleAvcInputEvent.json
+++ b/cps-ncmp-service/src/test/resources/sampleAvcInputEvent.json
@@ -1,5 +1,5 @@
{
- "event":{
+ "data":{
"push-change-update":{
"datastore-changes":{
"ietf-yang-patch:yang-patch":{