diff options
author | raviteja.karumuri <raviteja.karumuri@est.tech> | 2023-06-01 10:05:44 +0100 |
---|---|---|
committer | raviteja.karumuri <raviteja.karumuri@est.tech> | 2023-06-01 10:28:02 +0100 |
commit | ec3a1d19456b034593b2a365c044a3904c32e98d (patch) | |
tree | 818ffcaf535117dd29c06f5514d806286aa2f48e | |
parent | 53ae115e95876f8b51a142574cbf308f42cccbc6 (diff) |
NCMP : forward bulk response messages to client topic
# Fixing the NullPointer Exception if the 'eventType' header not availabe.
# eventType header is a mandatory header but still we are doing null check for supporting old events (AsyncResponseEvent is not moved to separate kafka headers)
Issue-ID: CPS-1557
Signed-off-by: raviteja.karumuri <raviteja.karumuri@est.tech>
Change-Id: Ie7923d0e2674402fa36cb28cde966575b899cedb
-rw-r--r-- | cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/BatchRecordFilterStrategy.java | 13 |
1 files changed, 9 insertions, 4 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/BatchRecordFilterStrategy.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/BatchRecordFilterStrategy.java index 088e96564c..2c7659949c 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/BatchRecordFilterStrategy.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/BatchRecordFilterStrategy.java @@ -21,6 +21,7 @@ package org.onap.cps.ncmp.api.impl.async; import org.apache.commons.lang3.SerializationUtils; +import org.apache.kafka.common.header.Header; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.listener.adapter.RecordFilterStrategy; @@ -41,10 +42,14 @@ public class BatchRecordFilterStrategy { @Bean public RecordFilterStrategy<Object, Object> filterBatchDataResponseEvent() { return consumedRecord -> { - final String headerValue = SerializationUtils - .deserialize(consumedRecord.headers().lastHeader("eventType").value()); - return !(headerValue != null - && headerValue.startsWith("org.onap.cps.ncmp.events.async.BatchDataResponseEvent")); + final Header eventTypeHeader = consumedRecord.headers().lastHeader("eventType"); + if (eventTypeHeader != null) { + final String eventTypeHeaderValue = SerializationUtils.deserialize(eventTypeHeader.value()); + return !(eventTypeHeaderValue != null + && eventTypeHeaderValue.startsWith("org.onap.cps.ncmp.events.async.BatchDataResponseEvent")); + } else { + return true; + } }; } } |