summaryrefslogtreecommitdiffstats
path: root/cps-ncmp-service/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'cps-ncmp-service/src/main')
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/DataOperationRecordFilterStrategy.java17
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumer.java20
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java2
3 files changed, 16 insertions, 23 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/DataOperationRecordFilterStrategy.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/DataOperationRecordFilterStrategy.java
index 9e2b66a2c1..ce666b1099 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/DataOperationRecordFilterStrategy.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/DataOperationRecordFilterStrategy.java
@@ -20,9 +20,8 @@
package org.onap.cps.ncmp.api.impl.async;
-import org.apache.commons.lang3.SerializationUtils;
-import org.apache.kafka.common.header.Header;
-import org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent;
+import io.cloudevents.CloudEvent;
+import io.cloudevents.kafka.impl.KafkaHeaders;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
@@ -41,15 +40,11 @@ public class DataOperationRecordFilterStrategy {
* @return boolean value.
*/
@Bean
- public RecordFilterStrategy<String, DataOperationEvent> includeDataOperationEventsOnly() {
+ public RecordFilterStrategy<String, CloudEvent> includeDataOperationEventsOnly() {
return consumedRecord -> {
- final Header eventTypeHeader = consumedRecord.headers().lastHeader("eventType");
- if (eventTypeHeader == null) {
- return false;
- }
- final String eventTypeHeaderValue = SerializationUtils.deserialize(eventTypeHeader.value());
- return !(eventTypeHeaderValue != null
- && eventTypeHeaderValue.contains("DataOperationEvent"));
+ final String eventTypeHeaderValue = KafkaHeaders.getParsedKafkaHeader(
+ consumedRecord.headers(), "ce_type");
+ return !(eventTypeHeaderValue.contains("DataOperationEvent"));
};
}
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumer.java
index 995a4d5a67..4a0ec5c493 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumer.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumer.java
@@ -20,12 +20,12 @@
package org.onap.cps.ncmp.api.impl.async;
+import io.cloudevents.CloudEvent;
+import io.cloudevents.kafka.impl.KafkaHeaders;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.SerializationUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.onap.cps.ncmp.api.impl.events.EventsPublisher;
-import org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@@ -39,7 +39,7 @@ import org.springframework.stereotype.Component;
@ConditionalOnProperty(name = "notification.enabled", havingValue = "true", matchIfMissing = true)
public class NcmpAsyncDataOperationEventConsumer {
- private final EventsPublisher<DataOperationEvent> eventsPublisher;
+ private final EventsPublisher<CloudEvent> eventsPublisher;
/**
* Consume the DataOperationResponseEvent published by producer to topic 'async-m2m.topic'
@@ -52,14 +52,12 @@ public class NcmpAsyncDataOperationEventConsumer {
filter = "includeDataOperationEventsOnly",
groupId = "ncmp-data-operation-event-group",
properties = {"spring.json.value.default.type=org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent"})
- public void consumeAndPublish(final ConsumerRecord<String, DataOperationEvent>
- dataOperationEventConsumerRecord) {
+ public void consumeAndPublish(final ConsumerRecord<String, CloudEvent> dataOperationEventConsumerRecord) {
log.info("Consuming event payload {} ...", dataOperationEventConsumerRecord.value());
- final String eventTarget = SerializationUtils
- .deserialize(dataOperationEventConsumerRecord.headers().lastHeader("eventTarget").value());
- final String eventId = SerializationUtils
- .deserialize(dataOperationEventConsumerRecord.headers().lastHeader("eventId").value());
- eventsPublisher.publishEvent(eventTarget, eventId, dataOperationEventConsumerRecord.headers(),
- dataOperationEventConsumerRecord.value());
+ final String eventTarget = KafkaHeaders.getParsedKafkaHeader(
+ dataOperationEventConsumerRecord.headers(), "ce_destination");
+ final String eventId = KafkaHeaders.getParsedKafkaHeader(
+ dataOperationEventConsumerRecord.headers(), "ce_id");
+ eventsPublisher.publishCloudEvent(eventTarget, eventId, dataOperationEventConsumerRecord.value());
}
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java
index e61e7729be..05c731d946 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java
@@ -71,7 +71,7 @@ public class EventsPublisher<T> {
* @param event message payload
* @deprecated This method is not needed anymore since the use of headers will be in place.
*/
- @Deprecated
+ @Deprecated(forRemoval = true)
public void publishEvent(final String topicName, final String eventKey, final T event) {
final ListenableFuture<SendResult<String, T>> eventFuture
= legacyKafkaEventTemplate.send(topicName, eventKey, event);