aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorraviteja.karumuri <raviteja.karumuri@est.tech>2023-06-01 10:05:44 +0100
committerraviteja.karumuri <raviteja.karumuri@est.tech>2023-06-01 10:28:02 +0100
commitec3a1d19456b034593b2a365c044a3904c32e98d (patch)
tree818ffcaf535117dd29c06f5514d806286aa2f48e
parent53ae115e95876f8b51a142574cbf308f42cccbc6 (diff)
NCMP : forward bulk response messages to client topic
# Fixing the NullPointer Exception if the 'eventType' header not availabe. # eventType header is a mandatory header but still we are doing null check for supporting old events (AsyncResponseEvent is not moved to separate kafka headers) Issue-ID: CPS-1557 Signed-off-by: raviteja.karumuri <raviteja.karumuri@est.tech> Change-Id: Ie7923d0e2674402fa36cb28cde966575b899cedb
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/BatchRecordFilterStrategy.java13
1 files changed, 9 insertions, 4 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/BatchRecordFilterStrategy.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/BatchRecordFilterStrategy.java
index 088e96564..2c7659949 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/BatchRecordFilterStrategy.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/BatchRecordFilterStrategy.java
@@ -21,6 +21,7 @@
package org.onap.cps.ncmp.api.impl.async;
import org.apache.commons.lang3.SerializationUtils;
+import org.apache.kafka.common.header.Header;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
@@ -41,10 +42,14 @@ public class BatchRecordFilterStrategy {
@Bean
public RecordFilterStrategy<Object, Object> filterBatchDataResponseEvent() {
return consumedRecord -> {
- final String headerValue = SerializationUtils
- .deserialize(consumedRecord.headers().lastHeader("eventType").value());
- return !(headerValue != null
- && headerValue.startsWith("org.onap.cps.ncmp.events.async.BatchDataResponseEvent"));
+ final Header eventTypeHeader = consumedRecord.headers().lastHeader("eventType");
+ if (eventTypeHeader != null) {
+ final String eventTypeHeaderValue = SerializationUtils.deserialize(eventTypeHeader.value());
+ return !(eventTypeHeaderValue != null
+ && eventTypeHeaderValue.startsWith("org.onap.cps.ncmp.events.async.BatchDataResponseEvent"));
+ } else {
+ return true;
+ }
};
}
}