aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorraviteja.karumuri <raviteja.karumuri@est.tech>2023-06-02 10:45:13 +0100
committerraviteja.karumuri <raviteja.karumuri@est.tech>2023-06-06 14:01:08 +0100
commit9438a53cf6e15b0d49fa40cb85dc978dc9fb7f19 (patch)
tree722b76246ca5e3a3d9432a3edccf13817ef3458e
parentec3a1d19456b034593b2a365c044a3904c32e98d (diff)
NCMP : forward bulk response messages to client topic
# Fixing the avc subscription event is not consuming even there is a record published on to the topic. Issue-ID: CPS-1557 Signed-off-by: raviteja.karumuri <raviteja.karumuri@est.tech> Change-Id: If09fd1849f467785141cc56639839ddda9f2c0de
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/BatchRecordFilterStrategy.java14
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncBatchEventConsumerSpec.groovy2
2 files changed, 8 insertions, 8 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/BatchRecordFilterStrategy.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/BatchRecordFilterStrategy.java
index 2c7659949c..b343d70a7a 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/BatchRecordFilterStrategy.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/BatchRecordFilterStrategy.java
@@ -22,6 +22,7 @@ package org.onap.cps.ncmp.api.impl.async;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.kafka.common.header.Header;
+import org.onap.cps.ncmp.events.async.BatchDataResponseEventV1;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
@@ -40,16 +41,15 @@ public class BatchRecordFilterStrategy {
* @return boolean value.
*/
@Bean
- public RecordFilterStrategy<Object, Object> filterBatchDataResponseEvent() {
+ public RecordFilterStrategy<String, BatchDataResponseEventV1> filterBatchDataResponseEvent() {
return consumedRecord -> {
final Header eventTypeHeader = consumedRecord.headers().lastHeader("eventType");
- if (eventTypeHeader != null) {
- final String eventTypeHeaderValue = SerializationUtils.deserialize(eventTypeHeader.value());
- return !(eventTypeHeaderValue != null
- && eventTypeHeaderValue.startsWith("org.onap.cps.ncmp.events.async.BatchDataResponseEvent"));
- } else {
- return true;
+ if (eventTypeHeader == null) {
+ return false;
}
+ final String eventTypeHeaderValue = SerializationUtils.deserialize(eventTypeHeader.value());
+ return !(eventTypeHeaderValue != null
+ && eventTypeHeaderValue.startsWith("org.onap.cps.ncmp.events.async.BatchDataResponseEvent"));
};
}
}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncBatchEventConsumerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncBatchEventConsumerSpec.groovy
index 65c43a011d..28464bb91c 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncBatchEventConsumerSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncBatchEventConsumerSpec.groovy
@@ -55,7 +55,7 @@ class NcmpAsyncBatchEventConsumerSpec extends MessagingBaseSpec {
JsonObjectMapper jsonObjectMapper
@Autowired
- RecordFilterStrategy<Object, Object> recordFilterStrategy
+ RecordFilterStrategy<String, BatchDataResponseEventV1> recordFilterStrategy
def kafkaConsumer = new KafkaConsumer<>(consumerConfigProperties('test'))
def static clientTopic = 'client-topic'