diff options
author | raviteja.karumuri <raviteja.karumuri@est.tech> | 2023-06-02 10:45:13 +0100 |
---|---|---|
committer | raviteja.karumuri <raviteja.karumuri@est.tech> | 2023-06-06 14:01:08 +0100 |
commit | 9438a53cf6e15b0d49fa40cb85dc978dc9fb7f19 (patch) | |
tree | 722b76246ca5e3a3d9432a3edccf13817ef3458e | |
parent | ec3a1d19456b034593b2a365c044a3904c32e98d (diff) |
NCMP : forward bulk response messages to client topic
# Fixing the avc subscription event is not consuming even there is a record published on to the topic.
Issue-ID: CPS-1557
Signed-off-by: raviteja.karumuri <raviteja.karumuri@est.tech>
Change-Id: If09fd1849f467785141cc56639839ddda9f2c0de
2 files changed, 8 insertions, 8 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/BatchRecordFilterStrategy.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/BatchRecordFilterStrategy.java index 2c7659949c..b343d70a7a 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/BatchRecordFilterStrategy.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/BatchRecordFilterStrategy.java @@ -22,6 +22,7 @@ package org.onap.cps.ncmp.api.impl.async; import org.apache.commons.lang3.SerializationUtils; import org.apache.kafka.common.header.Header; +import org.onap.cps.ncmp.events.async.BatchDataResponseEventV1; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.listener.adapter.RecordFilterStrategy; @@ -40,16 +41,15 @@ public class BatchRecordFilterStrategy { * @return boolean value. */ @Bean - public RecordFilterStrategy<Object, Object> filterBatchDataResponseEvent() { + public RecordFilterStrategy<String, BatchDataResponseEventV1> filterBatchDataResponseEvent() { return consumedRecord -> { final Header eventTypeHeader = consumedRecord.headers().lastHeader("eventType"); - if (eventTypeHeader != null) { - final String eventTypeHeaderValue = SerializationUtils.deserialize(eventTypeHeader.value()); - return !(eventTypeHeaderValue != null - && eventTypeHeaderValue.startsWith("org.onap.cps.ncmp.events.async.BatchDataResponseEvent")); - } else { - return true; + if (eventTypeHeader == null) { + return false; } + final String eventTypeHeaderValue = SerializationUtils.deserialize(eventTypeHeader.value()); + return !(eventTypeHeaderValue != null + && eventTypeHeaderValue.startsWith("org.onap.cps.ncmp.events.async.BatchDataResponseEvent")); }; } } diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncBatchEventConsumerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncBatchEventConsumerSpec.groovy index 65c43a011d..28464bb91c 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncBatchEventConsumerSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncBatchEventConsumerSpec.groovy @@ -55,7 +55,7 @@ class NcmpAsyncBatchEventConsumerSpec extends MessagingBaseSpec { JsonObjectMapper jsonObjectMapper @Autowired - RecordFilterStrategy<Object, Object> recordFilterStrategy + RecordFilterStrategy<String, BatchDataResponseEventV1> recordFilterStrategy def kafkaConsumer = new KafkaConsumer<>(consumerConfigProperties('test')) def static clientTopic = 'client-topic' |