diff options
author | Luke Gleeson <luke.gleeson@est.tech> | 2023-05-22 08:48:28 +0000 |
---|---|---|
committer | Gerrit Code Review <gerrit@onap.org> | 2023-05-22 08:48:28 +0000 |
commit | 9b059e3ad4edc1ea8d2f62ec4bf82543a1e157c2 (patch) | |
tree | e66599abf3742dff43dac60dff7843b1815ac4be /cps-ncmp-service/src | |
parent | 9fde458a5c3efe083710eef0815e3964ba873f88 (diff) | |
parent | 850656b7a159c0fe3070551990311aadec9b6a7f (diff) |
Merge "LcmEvent to have header now"
Diffstat (limited to 'cps-ncmp-service/src')
11 files changed, 194 insertions, 44 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java index 4c84629304..b0b091a2f6 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java @@ -20,13 +20,16 @@ package org.onap.cps.ncmp.api.impl.events; +import java.util.Map; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeaders; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.stereotype.Service; +import org.springframework.util.SerializationUtils; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.ListenableFutureCallback; @@ -70,6 +73,20 @@ public class EventsPublisher<T> { eventFuture.addCallback(handleCallback(topicName)); } + /** + * Generic Event Publisher with headers. + * + * @param topicName valid topic name + * @param eventKey message key + * @param eventHeaders map of event headers + * @param event message payload + */ + public void publishEvent(final String topicName, final String eventKey, final Map<String, Object> eventHeaders, + final T event) { + + publishEvent(topicName, eventKey, convertToKafkaHeaders(eventHeaders), event); + } + private ListenableFutureCallback<SendResult<String, T>> handleCallback(final String topicName) { return new ListenableFutureCallback<>() { @Override @@ -85,4 +102,10 @@ public class EventsPublisher<T> { }; } + private Headers convertToKafkaHeaders(final Map<String, Object> eventMessageHeaders) { + final Headers eventHeaders = new RecordHeaders(); + eventMessageHeaders.forEach((key, value) -> eventHeaders.add(key, SerializationUtils.serialize(value))); + return eventHeaders; + } + } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventHeaderMapper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventHeaderMapper.java new file mode 100644 index 0000000000..f7707d9f76 --- /dev/null +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventHeaderMapper.java @@ -0,0 +1,36 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2023 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.api.impl.events.lcm; + +import org.mapstruct.Mapper; +import org.onap.cps.ncmp.events.lcm.v1.LcmEvent; +import org.onap.cps.ncmp.events.lcm.v1.LcmEventHeader; + +@Mapper(componentModel = "spring") +public interface LcmEventHeaderMapper { + + /** + * Mapper for converting incoming {@link LcmEvent} to outgoing {@link LcmEventHeader}. + */ + + LcmEventHeader toLcmEventHeader(LcmEvent lcmEvent); + +} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsCmHandleStateHandlerImpl.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsCmHandleStateHandlerImpl.java index 9d518432ad..f42cd39d4d 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsCmHandleStateHandlerImpl.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsCmHandleStateHandlerImpl.java @@ -43,7 +43,8 @@ import org.onap.cps.ncmp.api.inventory.CompositeState; import org.onap.cps.ncmp.api.inventory.CompositeStateUtils; import org.onap.cps.ncmp.api.inventory.InventoryPersistence; import org.onap.cps.ncmp.api.models.NcmpServiceCmHandle; -import org.onap.ncmp.cmhandle.event.lcm.LcmEvent; +import org.onap.cps.ncmp.events.lcm.v1.LcmEvent; +import org.onap.cps.ncmp.events.lcm.v1.LcmEventHeader; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; @@ -76,7 +77,7 @@ public class LcmEventsCmHandleStateHandlerImpl implements LcmEventsCmHandleState @Override @Timed(value = "cps.ncmp.cmhandle.state.update.batch", - description = "Time taken to update a batch of cm handle states") + description = "Time taken to update a batch of cm handle states") public void updateCmHandleStateBatch(final Map<YangModelCmHandle, CmHandleState> cmHandleStatePerCmHandle) { final Collection<CmHandleTransitionPair> cmHandleTransitionPairs = prepareCmHandleTransitionBatch(cmHandleStatePerCmHandle); @@ -106,9 +107,12 @@ public class LcmEventsCmHandleStateHandlerImpl implements LcmEventsCmHandleState private void publishLcmEvent(final NcmpServiceCmHandle targetNcmpServiceCmHandle, final NcmpServiceCmHandle existingNcmpServiceCmHandle) { final String cmHandleId = targetNcmpServiceCmHandle.getCmHandleId(); + final LcmEventHeader lcmEventHeader = + lcmEventsCreator.populateLcmEventHeader(cmHandleId, targetNcmpServiceCmHandle, + existingNcmpServiceCmHandle); final LcmEvent lcmEvent = lcmEventsCreator.populateLcmEvent(cmHandleId, targetNcmpServiceCmHandle, existingNcmpServiceCmHandle); - lcmEventsService.publishLcmEvent(cmHandleId, lcmEvent); + lcmEventsService.publishLcmEvent(cmHandleId, lcmEvent, lcmEventHeader); } private Collection<CmHandleTransitionPair> prepareCmHandleTransitionBatch( @@ -221,6 +225,7 @@ public class LcmEventsCmHandleStateHandlerImpl implements LcmEventsCmHandleState @Setter @NoArgsConstructor static class CmHandleTransitionPair { + private YangModelCmHandle currentYangModelCmHandle; private YangModelCmHandle targetYangModelCmHandle; } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsCreator.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsCreator.java index a72e664dcf..3c7c92b129 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsCreator.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsCreator.java @@ -23,13 +23,15 @@ package org.onap.cps.ncmp.api.impl.events.lcm; import java.util.UUID; import lombok.Getter; import lombok.NoArgsConstructor; +import lombok.RequiredArgsConstructor; import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.onap.cps.ncmp.api.impl.utils.EventDateTimeFormatter; import org.onap.cps.ncmp.api.models.NcmpServiceCmHandle; -import org.onap.ncmp.cmhandle.event.lcm.Event; -import org.onap.ncmp.cmhandle.event.lcm.LcmEvent; -import org.onap.ncmp.cmhandle.event.lcm.Values; +import org.onap.cps.ncmp.events.lcm.v1.Event; +import org.onap.cps.ncmp.events.lcm.v1.LcmEvent; +import org.onap.cps.ncmp.events.lcm.v1.LcmEventHeader; +import org.onap.cps.ncmp.events.lcm.v1.Values; import org.springframework.stereotype.Component; @@ -38,8 +40,11 @@ import org.springframework.stereotype.Component; */ @Slf4j @Component +@RequiredArgsConstructor public class LcmEventsCreator { + private final LcmEventHeaderMapper lcmEventHeaderMapper; + /** * Populate Lifecycle Management Event. * @@ -53,6 +58,20 @@ public class LcmEventsCreator { return createLcmEvent(cmHandleId, targetNcmpServiceCmHandle, existingNcmpServiceCmHandle); } + /** + * Populate Lifecycle Management Event Header. + * + * @param cmHandleId cm handle identifier + * @param targetNcmpServiceCmHandle target ncmp service cmhandle + * @param existingNcmpServiceCmHandle existing ncmp service cmhandle + * @return Populated LcmEventHeader + */ + public LcmEventHeader populateLcmEventHeader(final String cmHandleId, + final NcmpServiceCmHandle targetNcmpServiceCmHandle, + final NcmpServiceCmHandle existingNcmpServiceCmHandle) { + return createLcmEventHeader(cmHandleId, targetNcmpServiceCmHandle, existingNcmpServiceCmHandle); + } + private LcmEvent createLcmEvent(final String cmHandleId, final NcmpServiceCmHandle targetNcmpServiceCmHandle, final NcmpServiceCmHandle existingNcmpServiceCmHandle) { final LcmEventType lcmEventType = @@ -63,6 +82,15 @@ public class LcmEventsCreator { return lcmEvent; } + private LcmEventHeader createLcmEventHeader(final String cmHandleId, + final NcmpServiceCmHandle targetNcmpServiceCmHandle, + final NcmpServiceCmHandle existingNcmpServiceCmHandle) { + final LcmEventType lcmEventType = + LcmEventsCreatorHelper.determineEventType(targetNcmpServiceCmHandle, existingNcmpServiceCmHandle); + final LcmEvent lcmEventWithHeaderInformation = lcmEventHeader(cmHandleId, lcmEventType); + return lcmEventHeaderMapper.toLcmEventHeader(lcmEventWithHeaderInformation); + } + private Event lcmEventPayload(final String eventCorrelationId, final NcmpServiceCmHandle targetNcmpServiceCmHandle, final NcmpServiceCmHandle existingNcmpServiceCmHandle, final LcmEventType lcmEventType) { final Event event = new Event(); diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsCreatorHelper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsCreatorHelper.java index 1322b7277f..d3b45d4a63 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsCreatorHelper.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsCreatorHelper.java @@ -34,7 +34,7 @@ import lombok.AccessLevel; import lombok.NoArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.onap.cps.ncmp.api.models.NcmpServiceCmHandle; -import org.onap.ncmp.cmhandle.event.lcm.Values; +import org.onap.cps.ncmp.events.lcm.v1.Values; /** * LcmEventsCreatorHelper has helper methods to create LcmEvent. diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsService.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsService.java index f258b45976..2e1b914b1d 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsService.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsService.java @@ -21,10 +21,13 @@ package org.onap.cps.ncmp.api.impl.events.lcm; import io.micrometer.core.annotation.Timed; +import java.util.Map; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.onap.cps.ncmp.api.impl.events.EventsPublisher; -import org.onap.ncmp.cmhandle.event.lcm.LcmEvent; +import org.onap.cps.ncmp.events.lcm.v1.LcmEvent; +import org.onap.cps.ncmp.events.lcm.v1.LcmEventHeader; +import org.onap.cps.utils.JsonObjectMapper; import org.springframework.beans.factory.annotation.Value; import org.springframework.kafka.KafkaException; import org.springframework.stereotype.Service; @@ -39,6 +42,7 @@ import org.springframework.stereotype.Service; public class LcmEventsService { private final EventsPublisher<LcmEvent> eventsPublisher; + private final JsonObjectMapper jsonObjectMapper; @Value("${app.lcm.events.topic:ncmp-events}") private String topicName; @@ -47,17 +51,19 @@ public class LcmEventsService { private boolean notificationsEnabled; /** - * Publish the LcmEvent to the public topic. + * Publish the LcmEvent with header to the public topic. * - * @param cmHandleId Cm Handle Id - * @param lcmEvent Lcm Event + * @param cmHandleId Cm Handle Id + * @param lcmEvent Lcm Event + * @param lcmEventHeader Lcm Event Header */ - @Timed(value = "cps.ncmp.lcm.events.publish", - description = "Time taken to publish a LCM event") - public void publishLcmEvent(final String cmHandleId, final LcmEvent lcmEvent) { + @Timed(value = "cps.ncmp.lcm.events.publish", description = "Time taken to publish a LCM event") + public void publishLcmEvent(final String cmHandleId, final LcmEvent lcmEvent, final LcmEventHeader lcmEventHeader) { if (notificationsEnabled) { try { - eventsPublisher.publishEvent(topicName, cmHandleId, lcmEvent); + final Map<String, Object> lcmEventHeadersMap = + jsonObjectMapper.convertToValueType(lcmEventHeader, Map.class); + eventsPublisher.publishEvent(topicName, cmHandleId, lcmEventHeadersMap, lcmEvent); } catch (final KafkaException e) { log.error("Unable to publish message to topic : {} and cause : {}", topicName, e.getMessage()); } diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsCmHandleStateHandlerImplSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsCmHandleStateHandlerImplSpec.groovy index f660be7103..e449d65ac2 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsCmHandleStateHandlerImplSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsCmHandleStateHandlerImplSpec.groovy @@ -54,7 +54,7 @@ class LcmEventsCmHandleStateHandlerImplSpec extends Specification { then: 'state is saved using inventory persistence' expectedCallsToInventoryPersistence * mockInventoryPersistence.saveCmHandleState(cmHandleId, _) and: 'event service is called to publish event' - expectedCallsToEventService * mockLcmEventsService.publishLcmEvent(cmHandleId, _) + expectedCallsToEventService * mockLcmEventsService.publishLcmEvent(cmHandleId, _, _) where: 'state change parameters are provided' stateChange | fromCmHandleState | toCmHandleState || expectedCallsToInventoryPersistence | expectedCallsToEventService 'ADVISED to READY' | ADVISED | READY || 1 | 1 @@ -73,7 +73,7 @@ class LcmEventsCmHandleStateHandlerImplSpec extends Specification { then: 'state is saved using inventory persistence' 1 * mockInventoryPersistence.saveCmHandle(yangModelCmHandle) and: 'event service is called to publish event' - 1 * mockLcmEventsService.publishLcmEvent(cmHandleId, _) + 1 * mockLcmEventsService.publishLcmEvent(cmHandleId, _, _) } def 'Update and Publish Events on State Change from LOCKED to ADVISED'() { @@ -90,7 +90,7 @@ class LcmEventsCmHandleStateHandlerImplSpec extends Specification { } } and: 'event service is called to publish event' - 1 * mockLcmEventsService.publishLcmEvent(cmHandleId, _) + 1 * mockLcmEventsService.publishLcmEvent(cmHandleId, _, _) } def 'Update and Publish Events on State Change to READY'() { @@ -111,7 +111,7 @@ class LcmEventsCmHandleStateHandlerImplSpec extends Specification { } } and: 'event service is called to publish event' - 1 * mockLcmEventsService.publishLcmEvent(cmHandleId, _) + 1 * mockLcmEventsService.publishLcmEvent(cmHandleId, _, _) } def 'Update cmHandle state to "DELETING"' (){ @@ -125,7 +125,7 @@ class LcmEventsCmHandleStateHandlerImplSpec extends Specification { and: 'method to persist cm handle state is called once' 1 * mockInventoryPersistence.saveCmHandleState(yangModelCmHandle.getId(), yangModelCmHandle.getCompositeState()) and: 'the method to publish Lcm event is called once' - 1 * mockLcmEventsService.publishLcmEvent(cmHandleId, _) + 1 * mockLcmEventsService.publishLcmEvent(cmHandleId, _, _) } def 'Update cmHandle state to "DELETED"' (){ @@ -137,7 +137,7 @@ class LcmEventsCmHandleStateHandlerImplSpec extends Specification { then: 'the cm handle state is as expected' yangModelCmHandle.getCompositeState().getCmHandleState() == DELETED and: 'the method to publish Lcm event is called once' - 1 * mockLcmEventsService.publishLcmEvent(cmHandleId, _) + 1 * mockLcmEventsService.publishLcmEvent(cmHandleId, _, _) } def 'No state change and no event to be published'() { @@ -167,7 +167,7 @@ class LcmEventsCmHandleStateHandlerImplSpec extends Specification { } } and: 'event service is called to publish event' - 2 * mockLcmEventsService.publishLcmEvent(_, _) + 2 * mockLcmEventsService.publishLcmEvent(_, _, _) } @@ -183,7 +183,7 @@ class LcmEventsCmHandleStateHandlerImplSpec extends Specification { } } and: 'event service is called to publish event' - 2 * mockLcmEventsService.publishLcmEvent(_, _) + 2 * mockLcmEventsService.publishLcmEvent(_, _, _) } diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsCreatorSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsCreatorSpec.groovy index f4adfc587c..6d7d6250f1 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsCreatorSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsCreatorSpec.groovy @@ -20,10 +20,11 @@ package org.onap.cps.ncmp.api.impl.events.lcm +import org.mapstruct.factory.Mappers import org.onap.cps.ncmp.api.inventory.CmHandleState import org.onap.cps.ncmp.api.inventory.CompositeState import org.onap.cps.ncmp.api.models.NcmpServiceCmHandle -import org.onap.ncmp.cmhandle.event.lcm.Values +import org.onap.cps.ncmp.events.lcm.v1.Values import spock.lang.Specification import static org.onap.cps.ncmp.api.inventory.CmHandleState.ADVISED @@ -32,7 +33,9 @@ import static org.onap.cps.ncmp.api.inventory.CmHandleState.READY class LcmEventsCreatorSpec extends Specification { - def objectUnderTest = new LcmEventsCreator() + LcmEventHeaderMapper lcmEventsHeaderMapper = Mappers.getMapper(LcmEventHeaderMapper) + + def objectUnderTest = new LcmEventsCreator(lcmEventsHeaderMapper) def cmHandleId = 'test-cm-handle' def 'Map the LcmEvent for #operation'() { @@ -159,4 +162,15 @@ class LcmEventsCreatorSpec extends Specification { 'null to null' | null | null } + + def 'Map the LcmEventHeader'() { + given: 'NCMP cm handle details with current and old details' + def existingNcmpServiceCmHandle = new NcmpServiceCmHandle(cmHandleId: cmHandleId, compositeState: new CompositeState(cmHandleState: ADVISED)) + def targetNcmpServiceCmHandle = new NcmpServiceCmHandle(cmHandleId: cmHandleId, compositeState: new CompositeState(cmHandleState: READY)) + when: 'the event header is populated' + def result = objectUnderTest.populateLcmEventHeader(cmHandleId, targetNcmpServiceCmHandle, existingNcmpServiceCmHandle) + then: 'the header has fields populated' + assert result.eventCorrelationId == cmHandleId + assert result.eventId != null + } }
\ No newline at end of file diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsPublisherSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsPublisherSpec.groovy index 7c9464dccb..93741261f6 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsPublisherSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsPublisherSpec.groovy @@ -24,14 +24,15 @@ import com.fasterxml.jackson.databind.ObjectMapper import org.apache.kafka.clients.consumer.KafkaConsumer import org.onap.cps.ncmp.api.impl.events.EventsPublisher import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec +import org.onap.cps.ncmp.events.lcm.v1.Event +import org.onap.cps.ncmp.events.lcm.v1.LcmEvent import org.onap.cps.ncmp.utils.TestUtils import org.onap.cps.utils.JsonObjectMapper -import org.onap.ncmp.cmhandle.event.lcm.Event -import org.onap.ncmp.cmhandle.event.lcm.LcmEvent import org.spockframework.spring.SpringBean import org.springframework.beans.factory.annotation.Autowired import org.springframework.boot.test.context.SpringBootTest import org.springframework.test.annotation.DirtiesContext +import org.springframework.util.SerializationUtils import org.testcontainers.spock.Testcontainers import java.time.Duration @@ -55,19 +56,35 @@ class LcmEventsPublisherSpec extends MessagingBaseSpec { def 'Produce and Consume Lcm Event'() { given: 'event key and event data' def eventKey = 'lcm' + def eventId = 'test-uuid' + def eventCorrelationId = 'cmhandle-test' + def eventSource = 'org.onap.ncmp' + def eventTime = '2022-12-31T20:30:40.000+0000' + def eventType = 'org.onap.ncmp.cmhandle.lcm.event' + def eventSchema = 'org.onap.ncmp.cmhandle.lcm.event' + def eventSchemaVersion = 'v1' def eventData = new LcmEvent( - eventId: 'test-uuid', - eventCorrelationId: 'cmhandle-as-correlationid', - eventSource: 'org.onap.ncmp', - eventTime: '2022-12-31T20:30:40.000+0000', - eventType: 'org.onap.ncmp.cmhandle.lcm.event', - eventSchema: 'org.onap.ncmp.cmhandle.lcm.event', - eventSchemaVersion: 'v1', + eventId: eventId, + eventCorrelationId: eventCorrelationId, + eventSource: eventSource, + eventTime: eventTime, + eventType: eventType, + eventSchema: eventSchema, + eventSchemaVersion: eventSchemaVersion, event: new Event(cmHandleId: 'cmhandle-test')) + and: 'we have a event header' + def eventHeader = [ + eventId : eventId, + eventCorrelationId: eventCorrelationId, + eventSource : eventSource, + eventTime : eventTime, + eventType : eventType, + eventSchema : eventSchema, + eventSchemaVersion: eventSchemaVersion] and: 'consumer has a subscription' kafkaConsumer.subscribe([testTopic] as List<String>) when: 'an event is published' - lcmEventsPublisher.publishEvent(testTopic, eventKey, eventData) + lcmEventsPublisher.publishEvent(testTopic, eventKey, eventHeader, eventData) and: 'topic is polled' def records = kafkaConsumer.poll(Duration.ofMillis(1500)) then: 'poll returns one record' @@ -79,5 +96,8 @@ class LcmEventsPublisherSpec extends MessagingBaseSpec { def expectedJsonString = TestUtils.getResourceFileContent('expectedLcmEvent.json') def expectedLcmEvent = jsonObjectMapper.convertJsonString(expectedJsonString, LcmEvent.class) assert expectedLcmEvent == jsonObjectMapper.convertJsonString(record.value, LcmEvent.class) + and: 'record header matches the expected parameters' + assert SerializationUtils.deserialize(record.headers().lastHeader('eventId').value()) == eventId + assert SerializationUtils.deserialize(record.headers().lastHeader('eventCorrelationId').value()) == eventCorrelationId } } diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsServiceSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsServiceSpec.groovy index 65f4d50c68..2d3f8ac516 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsServiceSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsServiceSpec.groovy @@ -21,26 +21,42 @@ package org.onap.cps.ncmp.api.impl.events.lcm import org.onap.cps.ncmp.api.impl.events.EventsPublisher -import org.onap.ncmp.cmhandle.event.lcm.LcmEvent +import org.onap.cps.ncmp.events.lcm.v1.LcmEvent +import org.onap.cps.ncmp.events.lcm.v1.LcmEventHeader +import org.onap.cps.utils.JsonObjectMapper import org.springframework.kafka.KafkaException import spock.lang.Specification class LcmEventsServiceSpec extends Specification { def mockLcmEventsPublisher = Mock(EventsPublisher) + def mockJsonObjectMapper = Mock(JsonObjectMapper) - def objectUnderTest = new LcmEventsService(mockLcmEventsPublisher) + def objectUnderTest = new LcmEventsService(mockLcmEventsPublisher, mockJsonObjectMapper) def 'Create and Publish lcm event where events are #scenario'() { given: 'a cm handle id and Lcm Event' def cmHandleId = 'test-cm-handle-id' - def lcmEvent = new LcmEvent(eventId: UUID.randomUUID().toString(), eventCorrelationId: cmHandleId) + def eventId = UUID.randomUUID().toString() + def lcmEvent = new LcmEvent(eventId: eventId, eventCorrelationId: cmHandleId) + and: 'we also have a lcm event header' + def lcmEventHeader = new LcmEventHeader(eventId: eventId, eventCorrelationId: cmHandleId) and: 'notificationsEnabled is #notificationsEnabled and it will be true as default' objectUnderTest.notificationsEnabled = notificationsEnabled + and: 'lcm event header is transformed to headers map' + mockJsonObjectMapper.convertToValueType(lcmEventHeader, Map.class) >> ['eventId': eventId, 'eventCorrelationId': cmHandleId] when: 'service is called to publish lcm event' - objectUnderTest.publishLcmEvent('test-cm-handle-id', lcmEvent) + objectUnderTest.publishLcmEvent('test-cm-handle-id', lcmEvent, lcmEventHeader) then: 'publisher is called #expectedTimesMethodCalled times' - expectedTimesMethodCalled * mockLcmEventsPublisher.publishEvent(_, cmHandleId, lcmEvent) + expectedTimesMethodCalled * mockLcmEventsPublisher.publishEvent(_, cmHandleId, _, lcmEvent) >> { + args -> { + def eventHeaders = (args[2] as Map<String,Object>) + assert eventHeaders.containsKey('eventId') + assert eventHeaders.containsKey('eventCorrelationId') + assert eventHeaders.get('eventId') == eventId + assert eventHeaders.get('eventCorrelationId') == cmHandleId + } + } where: 'the following values are used' scenario | notificationsEnabled || expectedTimesMethodCalled 'enabled' | true || 1 @@ -50,12 +66,14 @@ class LcmEventsServiceSpec extends Specification { def 'Unable to send message'(){ given: 'a cm handle id and Lcm Event and notification enabled' def cmHandleId = 'test-cm-handle-id' - def lcmEvent = new LcmEvent(eventId: UUID.randomUUID().toString(), eventCorrelationId: cmHandleId) + def eventId = UUID.randomUUID().toString() + def lcmEvent = new LcmEvent(eventId: eventId, eventCorrelationId: cmHandleId) + def lcmEventHeader = new LcmEventHeader(eventId: eventId, eventCorrelationId: cmHandleId) objectUnderTest.notificationsEnabled = true when: 'publisher set to throw an exception' - mockLcmEventsPublisher.publishEvent(*_) >> { throw new KafkaException('publishing failed')} + mockLcmEventsPublisher.publishEvent(_, _, _, _) >> { throw new KafkaException('publishing failed')} and: 'an event is publised' - objectUnderTest.publishLcmEvent(cmHandleId, lcmEvent) + objectUnderTest.publishLcmEvent(cmHandleId, lcmEvent, lcmEventHeader) then: 'the exception is just logged and not bubbled up' noExceptionThrown() } diff --git a/cps-ncmp-service/src/test/resources/expectedLcmEvent.json b/cps-ncmp-service/src/test/resources/expectedLcmEvent.json index 1db16ee82a..20d557dc4f 100644 --- a/cps-ncmp-service/src/test/resources/expectedLcmEvent.json +++ b/cps-ncmp-service/src/test/resources/expectedLcmEvent.json @@ -1,6 +1,6 @@ { "eventId": "test-uuid", - "eventCorrelationId": "cmhandle-as-correlationid", + "eventCorrelationId": "cmhandle-test", "eventTime": "2022-12-31T20:30:40.000+0000", "eventSource": "org.onap.ncmp", "eventType": "org.onap.ncmp.cmhandle.lcm.event", |