diff options
Diffstat (limited to 'cps-ncmp-service/src/main')
6 files changed, 113 insertions, 15 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java index 4c84629304..b0b091a2f6 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java @@ -20,13 +20,16 @@ package org.onap.cps.ncmp.api.impl.events; +import java.util.Map; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeaders; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.stereotype.Service; +import org.springframework.util.SerializationUtils; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.ListenableFutureCallback; @@ -70,6 +73,20 @@ public class EventsPublisher<T> { eventFuture.addCallback(handleCallback(topicName)); } + /** + * Generic Event Publisher with headers. + * + * @param topicName valid topic name + * @param eventKey message key + * @param eventHeaders map of event headers + * @param event message payload + */ + public void publishEvent(final String topicName, final String eventKey, final Map<String, Object> eventHeaders, + final T event) { + + publishEvent(topicName, eventKey, convertToKafkaHeaders(eventHeaders), event); + } + private ListenableFutureCallback<SendResult<String, T>> handleCallback(final String topicName) { return new ListenableFutureCallback<>() { @Override @@ -85,4 +102,10 @@ public class EventsPublisher<T> { }; } + private Headers convertToKafkaHeaders(final Map<String, Object> eventMessageHeaders) { + final Headers eventHeaders = new RecordHeaders(); + eventMessageHeaders.forEach((key, value) -> eventHeaders.add(key, SerializationUtils.serialize(value))); + return eventHeaders; + } + } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventHeaderMapper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventHeaderMapper.java new file mode 100644 index 0000000000..f7707d9f76 --- /dev/null +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventHeaderMapper.java @@ -0,0 +1,36 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2023 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.api.impl.events.lcm; + +import org.mapstruct.Mapper; +import org.onap.cps.ncmp.events.lcm.v1.LcmEvent; +import org.onap.cps.ncmp.events.lcm.v1.LcmEventHeader; + +@Mapper(componentModel = "spring") +public interface LcmEventHeaderMapper { + + /** + * Mapper for converting incoming {@link LcmEvent} to outgoing {@link LcmEventHeader}. + */ + + LcmEventHeader toLcmEventHeader(LcmEvent lcmEvent); + +} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsCmHandleStateHandlerImpl.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsCmHandleStateHandlerImpl.java index 9d518432ad..f42cd39d4d 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsCmHandleStateHandlerImpl.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsCmHandleStateHandlerImpl.java @@ -43,7 +43,8 @@ import org.onap.cps.ncmp.api.inventory.CompositeState; import org.onap.cps.ncmp.api.inventory.CompositeStateUtils; import org.onap.cps.ncmp.api.inventory.InventoryPersistence; import org.onap.cps.ncmp.api.models.NcmpServiceCmHandle; -import org.onap.ncmp.cmhandle.event.lcm.LcmEvent; +import org.onap.cps.ncmp.events.lcm.v1.LcmEvent; +import org.onap.cps.ncmp.events.lcm.v1.LcmEventHeader; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; @@ -76,7 +77,7 @@ public class LcmEventsCmHandleStateHandlerImpl implements LcmEventsCmHandleState @Override @Timed(value = "cps.ncmp.cmhandle.state.update.batch", - description = "Time taken to update a batch of cm handle states") + description = "Time taken to update a batch of cm handle states") public void updateCmHandleStateBatch(final Map<YangModelCmHandle, CmHandleState> cmHandleStatePerCmHandle) { final Collection<CmHandleTransitionPair> cmHandleTransitionPairs = prepareCmHandleTransitionBatch(cmHandleStatePerCmHandle); @@ -106,9 +107,12 @@ public class LcmEventsCmHandleStateHandlerImpl implements LcmEventsCmHandleState private void publishLcmEvent(final NcmpServiceCmHandle targetNcmpServiceCmHandle, final NcmpServiceCmHandle existingNcmpServiceCmHandle) { final String cmHandleId = targetNcmpServiceCmHandle.getCmHandleId(); + final LcmEventHeader lcmEventHeader = + lcmEventsCreator.populateLcmEventHeader(cmHandleId, targetNcmpServiceCmHandle, + existingNcmpServiceCmHandle); final LcmEvent lcmEvent = lcmEventsCreator.populateLcmEvent(cmHandleId, targetNcmpServiceCmHandle, existingNcmpServiceCmHandle); - lcmEventsService.publishLcmEvent(cmHandleId, lcmEvent); + lcmEventsService.publishLcmEvent(cmHandleId, lcmEvent, lcmEventHeader); } private Collection<CmHandleTransitionPair> prepareCmHandleTransitionBatch( @@ -221,6 +225,7 @@ public class LcmEventsCmHandleStateHandlerImpl implements LcmEventsCmHandleState @Setter @NoArgsConstructor static class CmHandleTransitionPair { + private YangModelCmHandle currentYangModelCmHandle; private YangModelCmHandle targetYangModelCmHandle; } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsCreator.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsCreator.java index a72e664dcf..3c7c92b129 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsCreator.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsCreator.java @@ -23,13 +23,15 @@ package org.onap.cps.ncmp.api.impl.events.lcm; import java.util.UUID; import lombok.Getter; import lombok.NoArgsConstructor; +import lombok.RequiredArgsConstructor; import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.onap.cps.ncmp.api.impl.utils.EventDateTimeFormatter; import org.onap.cps.ncmp.api.models.NcmpServiceCmHandle; -import org.onap.ncmp.cmhandle.event.lcm.Event; -import org.onap.ncmp.cmhandle.event.lcm.LcmEvent; -import org.onap.ncmp.cmhandle.event.lcm.Values; +import org.onap.cps.ncmp.events.lcm.v1.Event; +import org.onap.cps.ncmp.events.lcm.v1.LcmEvent; +import org.onap.cps.ncmp.events.lcm.v1.LcmEventHeader; +import org.onap.cps.ncmp.events.lcm.v1.Values; import org.springframework.stereotype.Component; @@ -38,8 +40,11 @@ import org.springframework.stereotype.Component; */ @Slf4j @Component +@RequiredArgsConstructor public class LcmEventsCreator { + private final LcmEventHeaderMapper lcmEventHeaderMapper; + /** * Populate Lifecycle Management Event. * @@ -53,6 +58,20 @@ public class LcmEventsCreator { return createLcmEvent(cmHandleId, targetNcmpServiceCmHandle, existingNcmpServiceCmHandle); } + /** + * Populate Lifecycle Management Event Header. + * + * @param cmHandleId cm handle identifier + * @param targetNcmpServiceCmHandle target ncmp service cmhandle + * @param existingNcmpServiceCmHandle existing ncmp service cmhandle + * @return Populated LcmEventHeader + */ + public LcmEventHeader populateLcmEventHeader(final String cmHandleId, + final NcmpServiceCmHandle targetNcmpServiceCmHandle, + final NcmpServiceCmHandle existingNcmpServiceCmHandle) { + return createLcmEventHeader(cmHandleId, targetNcmpServiceCmHandle, existingNcmpServiceCmHandle); + } + private LcmEvent createLcmEvent(final String cmHandleId, final NcmpServiceCmHandle targetNcmpServiceCmHandle, final NcmpServiceCmHandle existingNcmpServiceCmHandle) { final LcmEventType lcmEventType = @@ -63,6 +82,15 @@ public class LcmEventsCreator { return lcmEvent; } + private LcmEventHeader createLcmEventHeader(final String cmHandleId, + final NcmpServiceCmHandle targetNcmpServiceCmHandle, + final NcmpServiceCmHandle existingNcmpServiceCmHandle) { + final LcmEventType lcmEventType = + LcmEventsCreatorHelper.determineEventType(targetNcmpServiceCmHandle, existingNcmpServiceCmHandle); + final LcmEvent lcmEventWithHeaderInformation = lcmEventHeader(cmHandleId, lcmEventType); + return lcmEventHeaderMapper.toLcmEventHeader(lcmEventWithHeaderInformation); + } + private Event lcmEventPayload(final String eventCorrelationId, final NcmpServiceCmHandle targetNcmpServiceCmHandle, final NcmpServiceCmHandle existingNcmpServiceCmHandle, final LcmEventType lcmEventType) { final Event event = new Event(); diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsCreatorHelper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsCreatorHelper.java index 1322b7277f..d3b45d4a63 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsCreatorHelper.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsCreatorHelper.java @@ -34,7 +34,7 @@ import lombok.AccessLevel; import lombok.NoArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.onap.cps.ncmp.api.models.NcmpServiceCmHandle; -import org.onap.ncmp.cmhandle.event.lcm.Values; +import org.onap.cps.ncmp.events.lcm.v1.Values; /** * LcmEventsCreatorHelper has helper methods to create LcmEvent. diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsService.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsService.java index f258b45976..2e1b914b1d 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsService.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsService.java @@ -21,10 +21,13 @@ package org.onap.cps.ncmp.api.impl.events.lcm; import io.micrometer.core.annotation.Timed; +import java.util.Map; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.onap.cps.ncmp.api.impl.events.EventsPublisher; -import org.onap.ncmp.cmhandle.event.lcm.LcmEvent; +import org.onap.cps.ncmp.events.lcm.v1.LcmEvent; +import org.onap.cps.ncmp.events.lcm.v1.LcmEventHeader; +import org.onap.cps.utils.JsonObjectMapper; import org.springframework.beans.factory.annotation.Value; import org.springframework.kafka.KafkaException; import org.springframework.stereotype.Service; @@ -39,6 +42,7 @@ import org.springframework.stereotype.Service; public class LcmEventsService { private final EventsPublisher<LcmEvent> eventsPublisher; + private final JsonObjectMapper jsonObjectMapper; @Value("${app.lcm.events.topic:ncmp-events}") private String topicName; @@ -47,17 +51,19 @@ public class LcmEventsService { private boolean notificationsEnabled; /** - * Publish the LcmEvent to the public topic. + * Publish the LcmEvent with header to the public topic. * - * @param cmHandleId Cm Handle Id - * @param lcmEvent Lcm Event + * @param cmHandleId Cm Handle Id + * @param lcmEvent Lcm Event + * @param lcmEventHeader Lcm Event Header */ - @Timed(value = "cps.ncmp.lcm.events.publish", - description = "Time taken to publish a LCM event") - public void publishLcmEvent(final String cmHandleId, final LcmEvent lcmEvent) { + @Timed(value = "cps.ncmp.lcm.events.publish", description = "Time taken to publish a LCM event") + public void publishLcmEvent(final String cmHandleId, final LcmEvent lcmEvent, final LcmEventHeader lcmEventHeader) { if (notificationsEnabled) { try { - eventsPublisher.publishEvent(topicName, cmHandleId, lcmEvent); + final Map<String, Object> lcmEventHeadersMap = + jsonObjectMapper.convertToValueType(lcmEventHeader, Map.class); + eventsPublisher.publishEvent(topicName, cmHandleId, lcmEventHeadersMap, lcmEvent); } catch (final KafkaException e) { log.error("Unable to publish message to topic : {} and cause : {}", topicName, e.getMessage()); } |