summaryrefslogtreecommitdiffstats
path: root/cps-ncmp-service/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'cps-ncmp-service/src/main')
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/DataOperationRecordFilterStrategy.java (renamed from cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/BatchRecordFilterStrategy.java)10
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumer.java (renamed from cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncBatchEventConsumer.java)31
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java17
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java35
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventMapper.java35
5 files changed, 45 insertions, 83 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/BatchRecordFilterStrategy.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/DataOperationRecordFilterStrategy.java
index b343d70a7a..9e2b66a2c1 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/BatchRecordFilterStrategy.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/DataOperationRecordFilterStrategy.java
@@ -22,17 +22,17 @@ package org.onap.cps.ncmp.api.impl.async;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.kafka.common.header.Header;
-import org.onap.cps.ncmp.events.async.BatchDataResponseEventV1;
+import org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
/**
- * Batch Record filter strategy, which helps to filter the consumer records.
+ * Data operation record filter strategy, which helps to filter the consumer records.
*
*/
@Configuration
-public class BatchRecordFilterStrategy {
+public class DataOperationRecordFilterStrategy {
/**
* Filtering the consumer records based on the eventType header, It
@@ -41,7 +41,7 @@ public class BatchRecordFilterStrategy {
* @return boolean value.
*/
@Bean
- public RecordFilterStrategy<String, BatchDataResponseEventV1> filterBatchDataResponseEvent() {
+ public RecordFilterStrategy<String, DataOperationEvent> includeDataOperationEventsOnly() {
return consumedRecord -> {
final Header eventTypeHeader = consumedRecord.headers().lastHeader("eventType");
if (eventTypeHeader == null) {
@@ -49,7 +49,7 @@ public class BatchRecordFilterStrategy {
}
final String eventTypeHeaderValue = SerializationUtils.deserialize(eventTypeHeader.value());
return !(eventTypeHeaderValue != null
- && eventTypeHeaderValue.startsWith("org.onap.cps.ncmp.events.async.BatchDataResponseEvent"));
+ && eventTypeHeaderValue.contains("DataOperationEvent"));
};
}
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncBatchEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumer.java
index 2a332d0037..995a4d5a67 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncBatchEventConsumer.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumer.java
@@ -25,40 +25,41 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.onap.cps.ncmp.api.impl.events.EventsPublisher;
-import org.onap.cps.ncmp.events.async.BatchDataResponseEventV1;
+import org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
/**
- * Listener for cps-ncmp async batch events.
+ * Listener for cps-ncmp async data operation events.
*/
@Component
@Slf4j
@RequiredArgsConstructor
@ConditionalOnProperty(name = "notification.enabled", havingValue = "true", matchIfMissing = true)
-public class NcmpAsyncBatchEventConsumer {
+public class NcmpAsyncDataOperationEventConsumer {
- private final EventsPublisher<BatchDataResponseEventV1> eventsPublisher;
+ private final EventsPublisher<DataOperationEvent> eventsPublisher;
/**
- * Consume the BatchDataResponseEvent published by producer to topic 'async-m2m.topic'
+ * Consume the DataOperationResponseEvent published by producer to topic 'async-m2m.topic'
* and publish the same to the client specified topic.
*
- * @param batchEventConsumerRecord consuming event as a ConsumerRecord.
+ * @param dataOperationEventConsumerRecord consuming event as a ConsumerRecord.
*/
@KafkaListener(
topics = "${app.ncmp.async-m2m.topic}",
- filter = "filterBatchDataResponseEvent",
- groupId = "ncmp-batch-event-group",
- properties = {"spring.json.value.default.type=org.onap.cps.ncmp.events.async.BatchDataResponseEventV1"})
- public void consumeAndPublish(final ConsumerRecord<String, BatchDataResponseEventV1> batchEventConsumerRecord) {
- log.info("Consuming event payload {} ...", batchEventConsumerRecord.value());
+ filter = "includeDataOperationEventsOnly",
+ groupId = "ncmp-data-operation-event-group",
+ properties = {"spring.json.value.default.type=org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent"})
+ public void consumeAndPublish(final ConsumerRecord<String, DataOperationEvent>
+ dataOperationEventConsumerRecord) {
+ log.info("Consuming event payload {} ...", dataOperationEventConsumerRecord.value());
final String eventTarget = SerializationUtils
- .deserialize(batchEventConsumerRecord.headers().lastHeader("eventTarget").value());
+ .deserialize(dataOperationEventConsumerRecord.headers().lastHeader("eventTarget").value());
final String eventId = SerializationUtils
- .deserialize(batchEventConsumerRecord.headers().lastHeader("eventId").value());
- eventsPublisher.publishEvent(eventTarget, eventId, batchEventConsumerRecord.headers(),
- batchEventConsumerRecord.value());
+ .deserialize(dataOperationEventConsumerRecord.headers().lastHeader("eventId").value());
+ eventsPublisher.publishEvent(eventTarget, eventId, dataOperationEventConsumerRecord.headers(),
+ dataOperationEventConsumerRecord.value());
}
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java
index 7b28b4cd5f..e61e7729be 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java
@@ -51,6 +51,19 @@ public class EventsPublisher<T> {
private final KafkaTemplate<String, CloudEvent> cloudEventKafkaTemplate;
/**
+ * Generic CloudEvent publisher.
+ *
+ * @param topicName valid topic name
+ * @param eventKey message key
+ * @param event message payload
+ */
+ public void publishCloudEvent(final String topicName, final String eventKey, final CloudEvent event) {
+ final ListenableFuture<SendResult<String, CloudEvent>> eventFuture
+ = cloudEventKafkaTemplate.send(topicName, eventKey, event);
+ eventFuture.addCallback(handleCallback(topicName));
+ }
+
+ /**
* Generic Event publisher.
*
* @param topicName valid topic name
@@ -95,7 +108,7 @@ public class EventsPublisher<T> {
publishEvent(topicName, eventKey, convertToKafkaHeaders(eventHeaders), event);
}
- private ListenableFutureCallback<SendResult<String, T>> handleCallback(final String topicName) {
+ private ListenableFutureCallback<SendResult<String, ?>> handleCallback(final String topicName) {
return new ListenableFutureCallback<>() {
@Override
public void onFailure(final Throwable throwable) {
@@ -103,7 +116,7 @@ public class EventsPublisher<T> {
}
@Override
- public void onSuccess(final SendResult<String, T> sendResult) {
+ public void onSuccess(final SendResult<String, ?> sendResult) {
log.debug("Successfully published event to topic : {} , Event : {}",
sendResult.getRecordMetadata().topic(), sendResult.getProducerRecord().value());
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java
index f37497abe6..b5ca176d1d 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java
@@ -20,19 +20,17 @@
package org.onap.cps.ncmp.api.impl.events.avc;
+import io.cloudevents.CloudEvent;
+import io.cloudevents.core.builder.CloudEventBuilder;
import java.util.UUID;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.header.Headers;
-import org.apache.kafka.common.header.internals.RecordHeader;
import org.onap.cps.ncmp.api.impl.events.EventsPublisher;
-import org.onap.cps.ncmp.events.avc.v1.AvcEvent;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
-import org.springframework.util.SerializationUtils;
/**
* Listener for AVC events.
@@ -47,34 +45,19 @@ public class AvcEventConsumer {
@Value("${app.ncmp.avc.cm-events-topic}")
private String cmEventsTopicName;
- private final EventsPublisher<AvcEvent> eventsPublisher;
- private final AvcEventMapper avcEventMapper;
-
+ private final EventsPublisher<CloudEvent> eventsPublisher;
/**
* Incoming AvcEvent in the form of Consumer Record.
*
* @param avcEventConsumerRecord Incoming raw consumer record
*/
- @KafkaListener(topics = "${app.dmi.cm-events.topic}",
- properties = {"spring.json.value.default.type=org.onap.cps.ncmp.events.avc.v1.AvcEvent"})
- public void consumeAndForward(final ConsumerRecord<String, AvcEvent> avcEventConsumerRecord) {
+ @KafkaListener(topics = "${app.dmi.cm-events.topic}")
+ public void consumeAndForward(final ConsumerRecord<String, CloudEvent> avcEventConsumerRecord) {
log.debug("Consuming AVC event {} ...", avcEventConsumerRecord.value());
- final String mutatedEventId = UUID.randomUUID().toString();
- mutateEventHeaderWithEventId(avcEventConsumerRecord.headers(), mutatedEventId);
- final AvcEvent outgoingAvcEvent = avcEventMapper.toOutgoingAvcEvent(avcEventConsumerRecord.value());
- eventsPublisher.publishEvent(cmEventsTopicName, mutatedEventId, avcEventConsumerRecord.headers(),
- outgoingAvcEvent);
- }
-
- private void mutateEventHeaderWithEventId(final Headers eventHeaders, final String mutatedEventId) {
- final String eventId = "eventId";
- final String existingEventId =
- (String) SerializationUtils.deserialize(eventHeaders.lastHeader(eventId).value());
- eventHeaders.remove(eventId);
- log.info("Removing existing eventId from header : {} and updating with id : {}", existingEventId,
- mutatedEventId);
- eventHeaders.add(new RecordHeader(eventId, SerializationUtils.serialize(mutatedEventId)));
-
+ final String newEventId = UUID.randomUUID().toString();
+ final CloudEvent outgoingAvcEvent =
+ CloudEventBuilder.from(avcEventConsumerRecord.value()).withId(newEventId).build();
+ eventsPublisher.publishCloudEvent(cmEventsTopicName, newEventId, outgoingAvcEvent);
}
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventMapper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventMapper.java
deleted file mode 100644
index 8246ed4802..0000000000
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventMapper.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * Copyright (C) 2023 Nordix Foundation
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.ncmp.api.impl.events.avc;
-
-import org.mapstruct.Mapper;
-import org.onap.cps.ncmp.events.avc.v1.AvcEvent;
-
-
-/**
- * Mapper for converting incoming {@link AvcEvent} to outgoing {@link AvcEvent}.
- */
-@Mapper(componentModel = "spring")
-public interface AvcEventMapper {
-
- AvcEvent toOutgoingAvcEvent(AvcEvent incomingAvcEvent);
-
-}