diff options
Diffstat (limited to 'cps-ncmp-service/src/main')
-rw-r--r-- | cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/DataOperationRecordFilterStrategy.java (renamed from cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/BatchRecordFilterStrategy.java) | 10 | ||||
-rw-r--r-- | cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumer.java (renamed from cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncBatchEventConsumer.java) | 31 | ||||
-rw-r--r-- | cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java | 17 | ||||
-rw-r--r-- | cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java | 35 | ||||
-rw-r--r-- | cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventMapper.java | 35 |
5 files changed, 45 insertions, 83 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/BatchRecordFilterStrategy.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/DataOperationRecordFilterStrategy.java index b343d70a7a..9e2b66a2c1 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/BatchRecordFilterStrategy.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/DataOperationRecordFilterStrategy.java @@ -22,17 +22,17 @@ package org.onap.cps.ncmp.api.impl.async; import org.apache.commons.lang3.SerializationUtils; import org.apache.kafka.common.header.Header; -import org.onap.cps.ncmp.events.async.BatchDataResponseEventV1; +import org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.listener.adapter.RecordFilterStrategy; /** - * Batch Record filter strategy, which helps to filter the consumer records. + * Data operation record filter strategy, which helps to filter the consumer records. * */ @Configuration -public class BatchRecordFilterStrategy { +public class DataOperationRecordFilterStrategy { /** * Filtering the consumer records based on the eventType header, It @@ -41,7 +41,7 @@ public class BatchRecordFilterStrategy { * @return boolean value. */ @Bean - public RecordFilterStrategy<String, BatchDataResponseEventV1> filterBatchDataResponseEvent() { + public RecordFilterStrategy<String, DataOperationEvent> includeDataOperationEventsOnly() { return consumedRecord -> { final Header eventTypeHeader = consumedRecord.headers().lastHeader("eventType"); if (eventTypeHeader == null) { @@ -49,7 +49,7 @@ public class BatchRecordFilterStrategy { } final String eventTypeHeaderValue = SerializationUtils.deserialize(eventTypeHeader.value()); return !(eventTypeHeaderValue != null - && eventTypeHeaderValue.startsWith("org.onap.cps.ncmp.events.async.BatchDataResponseEvent")); + && eventTypeHeaderValue.contains("DataOperationEvent")); }; } } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncBatchEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumer.java index 2a332d0037..995a4d5a67 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncBatchEventConsumer.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumer.java @@ -25,40 +25,41 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.SerializationUtils; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.onap.cps.ncmp.api.impl.events.EventsPublisher; -import org.onap.cps.ncmp.events.async.BatchDataResponseEventV1; +import org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; /** - * Listener for cps-ncmp async batch events. + * Listener for cps-ncmp async data operation events. */ @Component @Slf4j @RequiredArgsConstructor @ConditionalOnProperty(name = "notification.enabled", havingValue = "true", matchIfMissing = true) -public class NcmpAsyncBatchEventConsumer { +public class NcmpAsyncDataOperationEventConsumer { - private final EventsPublisher<BatchDataResponseEventV1> eventsPublisher; + private final EventsPublisher<DataOperationEvent> eventsPublisher; /** - * Consume the BatchDataResponseEvent published by producer to topic 'async-m2m.topic' + * Consume the DataOperationResponseEvent published by producer to topic 'async-m2m.topic' * and publish the same to the client specified topic. * - * @param batchEventConsumerRecord consuming event as a ConsumerRecord. + * @param dataOperationEventConsumerRecord consuming event as a ConsumerRecord. */ @KafkaListener( topics = "${app.ncmp.async-m2m.topic}", - filter = "filterBatchDataResponseEvent", - groupId = "ncmp-batch-event-group", - properties = {"spring.json.value.default.type=org.onap.cps.ncmp.events.async.BatchDataResponseEventV1"}) - public void consumeAndPublish(final ConsumerRecord<String, BatchDataResponseEventV1> batchEventConsumerRecord) { - log.info("Consuming event payload {} ...", batchEventConsumerRecord.value()); + filter = "includeDataOperationEventsOnly", + groupId = "ncmp-data-operation-event-group", + properties = {"spring.json.value.default.type=org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent"}) + public void consumeAndPublish(final ConsumerRecord<String, DataOperationEvent> + dataOperationEventConsumerRecord) { + log.info("Consuming event payload {} ...", dataOperationEventConsumerRecord.value()); final String eventTarget = SerializationUtils - .deserialize(batchEventConsumerRecord.headers().lastHeader("eventTarget").value()); + .deserialize(dataOperationEventConsumerRecord.headers().lastHeader("eventTarget").value()); final String eventId = SerializationUtils - .deserialize(batchEventConsumerRecord.headers().lastHeader("eventId").value()); - eventsPublisher.publishEvent(eventTarget, eventId, batchEventConsumerRecord.headers(), - batchEventConsumerRecord.value()); + .deserialize(dataOperationEventConsumerRecord.headers().lastHeader("eventId").value()); + eventsPublisher.publishEvent(eventTarget, eventId, dataOperationEventConsumerRecord.headers(), + dataOperationEventConsumerRecord.value()); } } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java index 7b28b4cd5f..e61e7729be 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java @@ -51,6 +51,19 @@ public class EventsPublisher<T> { private final KafkaTemplate<String, CloudEvent> cloudEventKafkaTemplate; /** + * Generic CloudEvent publisher. + * + * @param topicName valid topic name + * @param eventKey message key + * @param event message payload + */ + public void publishCloudEvent(final String topicName, final String eventKey, final CloudEvent event) { + final ListenableFuture<SendResult<String, CloudEvent>> eventFuture + = cloudEventKafkaTemplate.send(topicName, eventKey, event); + eventFuture.addCallback(handleCallback(topicName)); + } + + /** * Generic Event publisher. * * @param topicName valid topic name @@ -95,7 +108,7 @@ public class EventsPublisher<T> { publishEvent(topicName, eventKey, convertToKafkaHeaders(eventHeaders), event); } - private ListenableFutureCallback<SendResult<String, T>> handleCallback(final String topicName) { + private ListenableFutureCallback<SendResult<String, ?>> handleCallback(final String topicName) { return new ListenableFutureCallback<>() { @Override public void onFailure(final Throwable throwable) { @@ -103,7 +116,7 @@ public class EventsPublisher<T> { } @Override - public void onSuccess(final SendResult<String, T> sendResult) { + public void onSuccess(final SendResult<String, ?> sendResult) { log.debug("Successfully published event to topic : {} , Event : {}", sendResult.getRecordMetadata().topic(), sendResult.getProducerRecord().value()); } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java index f37497abe6..b5ca176d1d 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java @@ -20,19 +20,17 @@ package org.onap.cps.ncmp.api.impl.events.avc; +import io.cloudevents.CloudEvent; +import io.cloudevents.core.builder.CloudEventBuilder; import java.util.UUID; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.common.header.Headers; -import org.apache.kafka.common.header.internals.RecordHeader; import org.onap.cps.ncmp.api.impl.events.EventsPublisher; -import org.onap.cps.ncmp.events.avc.v1.AvcEvent; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; -import org.springframework.util.SerializationUtils; /** * Listener for AVC events. @@ -47,34 +45,19 @@ public class AvcEventConsumer { @Value("${app.ncmp.avc.cm-events-topic}") private String cmEventsTopicName; - private final EventsPublisher<AvcEvent> eventsPublisher; - private final AvcEventMapper avcEventMapper; - + private final EventsPublisher<CloudEvent> eventsPublisher; /** * Incoming AvcEvent in the form of Consumer Record. * * @param avcEventConsumerRecord Incoming raw consumer record */ - @KafkaListener(topics = "${app.dmi.cm-events.topic}", - properties = {"spring.json.value.default.type=org.onap.cps.ncmp.events.avc.v1.AvcEvent"}) - public void consumeAndForward(final ConsumerRecord<String, AvcEvent> avcEventConsumerRecord) { + @KafkaListener(topics = "${app.dmi.cm-events.topic}") + public void consumeAndForward(final ConsumerRecord<String, CloudEvent> avcEventConsumerRecord) { log.debug("Consuming AVC event {} ...", avcEventConsumerRecord.value()); - final String mutatedEventId = UUID.randomUUID().toString(); - mutateEventHeaderWithEventId(avcEventConsumerRecord.headers(), mutatedEventId); - final AvcEvent outgoingAvcEvent = avcEventMapper.toOutgoingAvcEvent(avcEventConsumerRecord.value()); - eventsPublisher.publishEvent(cmEventsTopicName, mutatedEventId, avcEventConsumerRecord.headers(), - outgoingAvcEvent); - } - - private void mutateEventHeaderWithEventId(final Headers eventHeaders, final String mutatedEventId) { - final String eventId = "eventId"; - final String existingEventId = - (String) SerializationUtils.deserialize(eventHeaders.lastHeader(eventId).value()); - eventHeaders.remove(eventId); - log.info("Removing existing eventId from header : {} and updating with id : {}", existingEventId, - mutatedEventId); - eventHeaders.add(new RecordHeader(eventId, SerializationUtils.serialize(mutatedEventId))); - + final String newEventId = UUID.randomUUID().toString(); + final CloudEvent outgoingAvcEvent = + CloudEventBuilder.from(avcEventConsumerRecord.value()).withId(newEventId).build(); + eventsPublisher.publishCloudEvent(cmEventsTopicName, newEventId, outgoingAvcEvent); } } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventMapper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventMapper.java deleted file mode 100644 index 8246ed4802..0000000000 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventMapper.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * Copyright (C) 2023 Nordix Foundation - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.cps.ncmp.api.impl.events.avc; - -import org.mapstruct.Mapper; -import org.onap.cps.ncmp.events.avc.v1.AvcEvent; - - -/** - * Mapper for converting incoming {@link AvcEvent} to outgoing {@link AvcEvent}. - */ -@Mapper(componentModel = "spring") -public interface AvcEventMapper { - - AvcEvent toOutgoingAvcEvent(AvcEvent incomingAvcEvent); - -} |