summaryrefslogtreecommitdiffstats
path: root/cps-ncmp-service/src/main/java
diff options
context:
space:
mode:
Diffstat (limited to 'cps-ncmp-service/src/main/java')
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/DataOperationRecordFilterStrategy.java9
1 files changed, 7 insertions, 2 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/DataOperationRecordFilterStrategy.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/DataOperationRecordFilterStrategy.java
index ce666b1099..76cc0c4b7b 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/DataOperationRecordFilterStrategy.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/DataOperationRecordFilterStrategy.java
@@ -22,6 +22,7 @@ package org.onap.cps.ncmp.api.impl.async;
import io.cloudevents.CloudEvent;
import io.cloudevents.kafka.impl.KafkaHeaders;
+import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
@@ -31,6 +32,7 @@ import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
*
*/
@Configuration
+@Slf4j
public class DataOperationRecordFilterStrategy {
/**
@@ -42,8 +44,11 @@ public class DataOperationRecordFilterStrategy {
@Bean
public RecordFilterStrategy<String, CloudEvent> includeDataOperationEventsOnly() {
return consumedRecord -> {
- final String eventTypeHeaderValue = KafkaHeaders.getParsedKafkaHeader(
- consumedRecord.headers(), "ce_type");
+ final String eventTypeHeaderValue = KafkaHeaders.getParsedKafkaHeader(consumedRecord.headers(), "ce_type");
+ if (eventTypeHeaderValue == null) {
+ log.trace("No ce_type header found, possibly a legacy event (ignored)");
+ return true;
+ }
return !(eventTypeHeaderValue.contains("DataOperationEvent"));
};
}