summaryrefslogtreecommitdiffstats
path: root/cps-ncmp-service/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'cps-ncmp-service/src/main')
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java17
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java35
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventMapper.java35
3 files changed, 24 insertions, 63 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java
index 7b28b4cd5f..e61e7729be 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java
@@ -51,6 +51,19 @@ public class EventsPublisher<T> {
private final KafkaTemplate<String, CloudEvent> cloudEventKafkaTemplate;
/**
+ * Generic CloudEvent publisher.
+ *
+ * @param topicName valid topic name
+ * @param eventKey message key
+ * @param event message payload
+ */
+ public void publishCloudEvent(final String topicName, final String eventKey, final CloudEvent event) {
+ final ListenableFuture<SendResult<String, CloudEvent>> eventFuture
+ = cloudEventKafkaTemplate.send(topicName, eventKey, event);
+ eventFuture.addCallback(handleCallback(topicName));
+ }
+
+ /**
* Generic Event publisher.
*
* @param topicName valid topic name
@@ -95,7 +108,7 @@ public class EventsPublisher<T> {
publishEvent(topicName, eventKey, convertToKafkaHeaders(eventHeaders), event);
}
- private ListenableFutureCallback<SendResult<String, T>> handleCallback(final String topicName) {
+ private ListenableFutureCallback<SendResult<String, ?>> handleCallback(final String topicName) {
return new ListenableFutureCallback<>() {
@Override
public void onFailure(final Throwable throwable) {
@@ -103,7 +116,7 @@ public class EventsPublisher<T> {
}
@Override
- public void onSuccess(final SendResult<String, T> sendResult) {
+ public void onSuccess(final SendResult<String, ?> sendResult) {
log.debug("Successfully published event to topic : {} , Event : {}",
sendResult.getRecordMetadata().topic(), sendResult.getProducerRecord().value());
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java
index f37497abe6..b5ca176d1d 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java
@@ -20,19 +20,17 @@
package org.onap.cps.ncmp.api.impl.events.avc;
+import io.cloudevents.CloudEvent;
+import io.cloudevents.core.builder.CloudEventBuilder;
import java.util.UUID;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.header.Headers;
-import org.apache.kafka.common.header.internals.RecordHeader;
import org.onap.cps.ncmp.api.impl.events.EventsPublisher;
-import org.onap.cps.ncmp.events.avc.v1.AvcEvent;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
-import org.springframework.util.SerializationUtils;
/**
* Listener for AVC events.
@@ -47,34 +45,19 @@ public class AvcEventConsumer {
@Value("${app.ncmp.avc.cm-events-topic}")
private String cmEventsTopicName;
- private final EventsPublisher<AvcEvent> eventsPublisher;
- private final AvcEventMapper avcEventMapper;
-
+ private final EventsPublisher<CloudEvent> eventsPublisher;
/**
* Incoming AvcEvent in the form of Consumer Record.
*
* @param avcEventConsumerRecord Incoming raw consumer record
*/
- @KafkaListener(topics = "${app.dmi.cm-events.topic}",
- properties = {"spring.json.value.default.type=org.onap.cps.ncmp.events.avc.v1.AvcEvent"})
- public void consumeAndForward(final ConsumerRecord<String, AvcEvent> avcEventConsumerRecord) {
+ @KafkaListener(topics = "${app.dmi.cm-events.topic}")
+ public void consumeAndForward(final ConsumerRecord<String, CloudEvent> avcEventConsumerRecord) {
log.debug("Consuming AVC event {} ...", avcEventConsumerRecord.value());
- final String mutatedEventId = UUID.randomUUID().toString();
- mutateEventHeaderWithEventId(avcEventConsumerRecord.headers(), mutatedEventId);
- final AvcEvent outgoingAvcEvent = avcEventMapper.toOutgoingAvcEvent(avcEventConsumerRecord.value());
- eventsPublisher.publishEvent(cmEventsTopicName, mutatedEventId, avcEventConsumerRecord.headers(),
- outgoingAvcEvent);
- }
-
- private void mutateEventHeaderWithEventId(final Headers eventHeaders, final String mutatedEventId) {
- final String eventId = "eventId";
- final String existingEventId =
- (String) SerializationUtils.deserialize(eventHeaders.lastHeader(eventId).value());
- eventHeaders.remove(eventId);
- log.info("Removing existing eventId from header : {} and updating with id : {}", existingEventId,
- mutatedEventId);
- eventHeaders.add(new RecordHeader(eventId, SerializationUtils.serialize(mutatedEventId)));
-
+ final String newEventId = UUID.randomUUID().toString();
+ final CloudEvent outgoingAvcEvent =
+ CloudEventBuilder.from(avcEventConsumerRecord.value()).withId(newEventId).build();
+ eventsPublisher.publishCloudEvent(cmEventsTopicName, newEventId, outgoingAvcEvent);
}
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventMapper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventMapper.java
deleted file mode 100644
index 8246ed4802..0000000000
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventMapper.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * Copyright (C) 2023 Nordix Foundation
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.ncmp.api.impl.events.avc;
-
-import org.mapstruct.Mapper;
-import org.onap.cps.ncmp.events.avc.v1.AvcEvent;
-
-
-/**
- * Mapper for converting incoming {@link AvcEvent} to outgoing {@link AvcEvent}.
- */
-@Mapper(componentModel = "spring")
-public interface AvcEventMapper {
-
- AvcEvent toOutgoingAvcEvent(AvcEvent incomingAvcEvent);
-
-}