aboutsummaryrefslogtreecommitdiffstats
path: root/cps-ncmp-service/src/main
diff options
context:
space:
mode:
authorToine Siebelink <toine.siebelink@est.tech>2023-06-20 08:36:39 +0000
committerGerrit Code Review <gerrit@onap.org>2023-06-20 08:36:39 +0000
commit7eae3fd589942c856f365600820aed18d104a98c (patch)
tree245d011e231be8aa1a30c6764652178989a1b94b /cps-ncmp-service/src/main
parentcbf4044e840ea9473cd42f0a47c53dcafee8ba94 (diff)
parentf4c3f0fcebec726ea74b44f9bca3b68e66176671 (diff)
Merge "Patch # 1: Data operation response event (NCMP → Client App) to comply with CloudEvents"
Diffstat (limited to 'cps-ncmp-service/src/main')
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/DataOperationRecordFilterStrategy.java (renamed from cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/BatchRecordFilterStrategy.java)10
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumer.java (renamed from cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncBatchEventConsumer.java)31
2 files changed, 21 insertions, 20 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/BatchRecordFilterStrategy.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/DataOperationRecordFilterStrategy.java
index b343d70a7..9e2b66a2c 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/BatchRecordFilterStrategy.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/DataOperationRecordFilterStrategy.java
@@ -22,17 +22,17 @@ package org.onap.cps.ncmp.api.impl.async;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.kafka.common.header.Header;
-import org.onap.cps.ncmp.events.async.BatchDataResponseEventV1;
+import org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
/**
- * Batch Record filter strategy, which helps to filter the consumer records.
+ * Data operation record filter strategy, which helps to filter the consumer records.
*
*/
@Configuration
-public class BatchRecordFilterStrategy {
+public class DataOperationRecordFilterStrategy {
/**
* Filtering the consumer records based on the eventType header, It
@@ -41,7 +41,7 @@ public class BatchRecordFilterStrategy {
* @return boolean value.
*/
@Bean
- public RecordFilterStrategy<String, BatchDataResponseEventV1> filterBatchDataResponseEvent() {
+ public RecordFilterStrategy<String, DataOperationEvent> includeDataOperationEventsOnly() {
return consumedRecord -> {
final Header eventTypeHeader = consumedRecord.headers().lastHeader("eventType");
if (eventTypeHeader == null) {
@@ -49,7 +49,7 @@ public class BatchRecordFilterStrategy {
}
final String eventTypeHeaderValue = SerializationUtils.deserialize(eventTypeHeader.value());
return !(eventTypeHeaderValue != null
- && eventTypeHeaderValue.startsWith("org.onap.cps.ncmp.events.async.BatchDataResponseEvent"));
+ && eventTypeHeaderValue.contains("DataOperationEvent"));
};
}
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncBatchEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumer.java
index 2a332d003..995a4d5a6 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncBatchEventConsumer.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumer.java
@@ -25,40 +25,41 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.onap.cps.ncmp.api.impl.events.EventsPublisher;
-import org.onap.cps.ncmp.events.async.BatchDataResponseEventV1;
+import org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
/**
- * Listener for cps-ncmp async batch events.
+ * Listener for cps-ncmp async data operation events.
*/
@Component
@Slf4j
@RequiredArgsConstructor
@ConditionalOnProperty(name = "notification.enabled", havingValue = "true", matchIfMissing = true)
-public class NcmpAsyncBatchEventConsumer {
+public class NcmpAsyncDataOperationEventConsumer {
- private final EventsPublisher<BatchDataResponseEventV1> eventsPublisher;
+ private final EventsPublisher<DataOperationEvent> eventsPublisher;
/**
- * Consume the BatchDataResponseEvent published by producer to topic 'async-m2m.topic'
+ * Consume the DataOperationResponseEvent published by producer to topic 'async-m2m.topic'
* and publish the same to the client specified topic.
*
- * @param batchEventConsumerRecord consuming event as a ConsumerRecord.
+ * @param dataOperationEventConsumerRecord consuming event as a ConsumerRecord.
*/
@KafkaListener(
topics = "${app.ncmp.async-m2m.topic}",
- filter = "filterBatchDataResponseEvent",
- groupId = "ncmp-batch-event-group",
- properties = {"spring.json.value.default.type=org.onap.cps.ncmp.events.async.BatchDataResponseEventV1"})
- public void consumeAndPublish(final ConsumerRecord<String, BatchDataResponseEventV1> batchEventConsumerRecord) {
- log.info("Consuming event payload {} ...", batchEventConsumerRecord.value());
+ filter = "includeDataOperationEventsOnly",
+ groupId = "ncmp-data-operation-event-group",
+ properties = {"spring.json.value.default.type=org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent"})
+ public void consumeAndPublish(final ConsumerRecord<String, DataOperationEvent>
+ dataOperationEventConsumerRecord) {
+ log.info("Consuming event payload {} ...", dataOperationEventConsumerRecord.value());
final String eventTarget = SerializationUtils
- .deserialize(batchEventConsumerRecord.headers().lastHeader("eventTarget").value());
+ .deserialize(dataOperationEventConsumerRecord.headers().lastHeader("eventTarget").value());
final String eventId = SerializationUtils
- .deserialize(batchEventConsumerRecord.headers().lastHeader("eventId").value());
- eventsPublisher.publishEvent(eventTarget, eventId, batchEventConsumerRecord.headers(),
- batchEventConsumerRecord.value());
+ .deserialize(dataOperationEventConsumerRecord.headers().lastHeader("eventId").value());
+ eventsPublisher.publishEvent(eventTarget, eventId, dataOperationEventConsumerRecord.headers(),
+ dataOperationEventConsumerRecord.value());
}
}