summaryrefslogtreecommitdiffstats
path: root/cps-ncmp-service/src
diff options
context:
space:
mode:
authorLuke Gleeson <luke.gleeson@est.tech>2023-05-22 08:48:28 +0000
committerGerrit Code Review <gerrit@onap.org>2023-05-22 08:48:28 +0000
commit9b059e3ad4edc1ea8d2f62ec4bf82543a1e157c2 (patch)
treee66599abf3742dff43dac60dff7843b1815ac4be /cps-ncmp-service/src
parent9fde458a5c3efe083710eef0815e3964ba873f88 (diff)
parent850656b7a159c0fe3070551990311aadec9b6a7f (diff)
Merge "LcmEvent to have header now"
Diffstat (limited to 'cps-ncmp-service/src')
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java23
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventHeaderMapper.java36
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsCmHandleStateHandlerImpl.java11
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsCreator.java34
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsCreatorHelper.java2
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsService.java22
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsCmHandleStateHandlerImplSpec.groovy16
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsCreatorSpec.groovy18
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsPublisherSpec.groovy40
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsServiceSpec.groovy34
-rw-r--r--cps-ncmp-service/src/test/resources/expectedLcmEvent.json2
11 files changed, 194 insertions, 44 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java
index 4c84629304..b0b091a2f6 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java
@@ -20,13 +20,16 @@
package org.onap.cps.ncmp.api.impl.events;
+import java.util.Map;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
+import org.springframework.util.SerializationUtils;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
@@ -70,6 +73,20 @@ public class EventsPublisher<T> {
eventFuture.addCallback(handleCallback(topicName));
}
+ /**
+ * Generic Event Publisher with headers.
+ *
+ * @param topicName valid topic name
+ * @param eventKey message key
+ * @param eventHeaders map of event headers
+ * @param event message payload
+ */
+ public void publishEvent(final String topicName, final String eventKey, final Map<String, Object> eventHeaders,
+ final T event) {
+
+ publishEvent(topicName, eventKey, convertToKafkaHeaders(eventHeaders), event);
+ }
+
private ListenableFutureCallback<SendResult<String, T>> handleCallback(final String topicName) {
return new ListenableFutureCallback<>() {
@Override
@@ -85,4 +102,10 @@ public class EventsPublisher<T> {
};
}
+ private Headers convertToKafkaHeaders(final Map<String, Object> eventMessageHeaders) {
+ final Headers eventHeaders = new RecordHeaders();
+ eventMessageHeaders.forEach((key, value) -> eventHeaders.add(key, SerializationUtils.serialize(value)));
+ return eventHeaders;
+ }
+
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventHeaderMapper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventHeaderMapper.java
new file mode 100644
index 0000000000..f7707d9f76
--- /dev/null
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventHeaderMapper.java
@@ -0,0 +1,36 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2023 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.impl.events.lcm;
+
+import org.mapstruct.Mapper;
+import org.onap.cps.ncmp.events.lcm.v1.LcmEvent;
+import org.onap.cps.ncmp.events.lcm.v1.LcmEventHeader;
+
+@Mapper(componentModel = "spring")
+public interface LcmEventHeaderMapper {
+
+ /**
+ * Mapper for converting incoming {@link LcmEvent} to outgoing {@link LcmEventHeader}.
+ */
+
+ LcmEventHeader toLcmEventHeader(LcmEvent lcmEvent);
+
+}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsCmHandleStateHandlerImpl.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsCmHandleStateHandlerImpl.java
index 9d518432ad..f42cd39d4d 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsCmHandleStateHandlerImpl.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsCmHandleStateHandlerImpl.java
@@ -43,7 +43,8 @@ import org.onap.cps.ncmp.api.inventory.CompositeState;
import org.onap.cps.ncmp.api.inventory.CompositeStateUtils;
import org.onap.cps.ncmp.api.inventory.InventoryPersistence;
import org.onap.cps.ncmp.api.models.NcmpServiceCmHandle;
-import org.onap.ncmp.cmhandle.event.lcm.LcmEvent;
+import org.onap.cps.ncmp.events.lcm.v1.LcmEvent;
+import org.onap.cps.ncmp.events.lcm.v1.LcmEventHeader;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
@@ -76,7 +77,7 @@ public class LcmEventsCmHandleStateHandlerImpl implements LcmEventsCmHandleState
@Override
@Timed(value = "cps.ncmp.cmhandle.state.update.batch",
- description = "Time taken to update a batch of cm handle states")
+ description = "Time taken to update a batch of cm handle states")
public void updateCmHandleStateBatch(final Map<YangModelCmHandle, CmHandleState> cmHandleStatePerCmHandle) {
final Collection<CmHandleTransitionPair> cmHandleTransitionPairs =
prepareCmHandleTransitionBatch(cmHandleStatePerCmHandle);
@@ -106,9 +107,12 @@ public class LcmEventsCmHandleStateHandlerImpl implements LcmEventsCmHandleState
private void publishLcmEvent(final NcmpServiceCmHandle targetNcmpServiceCmHandle,
final NcmpServiceCmHandle existingNcmpServiceCmHandle) {
final String cmHandleId = targetNcmpServiceCmHandle.getCmHandleId();
+ final LcmEventHeader lcmEventHeader =
+ lcmEventsCreator.populateLcmEventHeader(cmHandleId, targetNcmpServiceCmHandle,
+ existingNcmpServiceCmHandle);
final LcmEvent lcmEvent =
lcmEventsCreator.populateLcmEvent(cmHandleId, targetNcmpServiceCmHandle, existingNcmpServiceCmHandle);
- lcmEventsService.publishLcmEvent(cmHandleId, lcmEvent);
+ lcmEventsService.publishLcmEvent(cmHandleId, lcmEvent, lcmEventHeader);
}
private Collection<CmHandleTransitionPair> prepareCmHandleTransitionBatch(
@@ -221,6 +225,7 @@ public class LcmEventsCmHandleStateHandlerImpl implements LcmEventsCmHandleState
@Setter
@NoArgsConstructor
static class CmHandleTransitionPair {
+
private YangModelCmHandle currentYangModelCmHandle;
private YangModelCmHandle targetYangModelCmHandle;
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsCreator.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsCreator.java
index a72e664dcf..3c7c92b129 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsCreator.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsCreator.java
@@ -23,13 +23,15 @@ package org.onap.cps.ncmp.api.impl.events.lcm;
import java.util.UUID;
import lombok.Getter;
import lombok.NoArgsConstructor;
+import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.onap.cps.ncmp.api.impl.utils.EventDateTimeFormatter;
import org.onap.cps.ncmp.api.models.NcmpServiceCmHandle;
-import org.onap.ncmp.cmhandle.event.lcm.Event;
-import org.onap.ncmp.cmhandle.event.lcm.LcmEvent;
-import org.onap.ncmp.cmhandle.event.lcm.Values;
+import org.onap.cps.ncmp.events.lcm.v1.Event;
+import org.onap.cps.ncmp.events.lcm.v1.LcmEvent;
+import org.onap.cps.ncmp.events.lcm.v1.LcmEventHeader;
+import org.onap.cps.ncmp.events.lcm.v1.Values;
import org.springframework.stereotype.Component;
@@ -38,8 +40,11 @@ import org.springframework.stereotype.Component;
*/
@Slf4j
@Component
+@RequiredArgsConstructor
public class LcmEventsCreator {
+ private final LcmEventHeaderMapper lcmEventHeaderMapper;
+
/**
* Populate Lifecycle Management Event.
*
@@ -53,6 +58,20 @@ public class LcmEventsCreator {
return createLcmEvent(cmHandleId, targetNcmpServiceCmHandle, existingNcmpServiceCmHandle);
}
+ /**
+ * Populate Lifecycle Management Event Header.
+ *
+ * @param cmHandleId cm handle identifier
+ * @param targetNcmpServiceCmHandle target ncmp service cmhandle
+ * @param existingNcmpServiceCmHandle existing ncmp service cmhandle
+ * @return Populated LcmEventHeader
+ */
+ public LcmEventHeader populateLcmEventHeader(final String cmHandleId,
+ final NcmpServiceCmHandle targetNcmpServiceCmHandle,
+ final NcmpServiceCmHandle existingNcmpServiceCmHandle) {
+ return createLcmEventHeader(cmHandleId, targetNcmpServiceCmHandle, existingNcmpServiceCmHandle);
+ }
+
private LcmEvent createLcmEvent(final String cmHandleId, final NcmpServiceCmHandle targetNcmpServiceCmHandle,
final NcmpServiceCmHandle existingNcmpServiceCmHandle) {
final LcmEventType lcmEventType =
@@ -63,6 +82,15 @@ public class LcmEventsCreator {
return lcmEvent;
}
+ private LcmEventHeader createLcmEventHeader(final String cmHandleId,
+ final NcmpServiceCmHandle targetNcmpServiceCmHandle,
+ final NcmpServiceCmHandle existingNcmpServiceCmHandle) {
+ final LcmEventType lcmEventType =
+ LcmEventsCreatorHelper.determineEventType(targetNcmpServiceCmHandle, existingNcmpServiceCmHandle);
+ final LcmEvent lcmEventWithHeaderInformation = lcmEventHeader(cmHandleId, lcmEventType);
+ return lcmEventHeaderMapper.toLcmEventHeader(lcmEventWithHeaderInformation);
+ }
+
private Event lcmEventPayload(final String eventCorrelationId, final NcmpServiceCmHandle targetNcmpServiceCmHandle,
final NcmpServiceCmHandle existingNcmpServiceCmHandle, final LcmEventType lcmEventType) {
final Event event = new Event();
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsCreatorHelper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsCreatorHelper.java
index 1322b7277f..d3b45d4a63 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsCreatorHelper.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsCreatorHelper.java
@@ -34,7 +34,7 @@ import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.onap.cps.ncmp.api.models.NcmpServiceCmHandle;
-import org.onap.ncmp.cmhandle.event.lcm.Values;
+import org.onap.cps.ncmp.events.lcm.v1.Values;
/**
* LcmEventsCreatorHelper has helper methods to create LcmEvent.
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsService.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsService.java
index f258b45976..2e1b914b1d 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsService.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsService.java
@@ -21,10 +21,13 @@
package org.onap.cps.ncmp.api.impl.events.lcm;
import io.micrometer.core.annotation.Timed;
+import java.util.Map;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.onap.cps.ncmp.api.impl.events.EventsPublisher;
-import org.onap.ncmp.cmhandle.event.lcm.LcmEvent;
+import org.onap.cps.ncmp.events.lcm.v1.LcmEvent;
+import org.onap.cps.ncmp.events.lcm.v1.LcmEventHeader;
+import org.onap.cps.utils.JsonObjectMapper;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.KafkaException;
import org.springframework.stereotype.Service;
@@ -39,6 +42,7 @@ import org.springframework.stereotype.Service;
public class LcmEventsService {
private final EventsPublisher<LcmEvent> eventsPublisher;
+ private final JsonObjectMapper jsonObjectMapper;
@Value("${app.lcm.events.topic:ncmp-events}")
private String topicName;
@@ -47,17 +51,19 @@ public class LcmEventsService {
private boolean notificationsEnabled;
/**
- * Publish the LcmEvent to the public topic.
+ * Publish the LcmEvent with header to the public topic.
*
- * @param cmHandleId Cm Handle Id
- * @param lcmEvent Lcm Event
+ * @param cmHandleId Cm Handle Id
+ * @param lcmEvent Lcm Event
+ * @param lcmEventHeader Lcm Event Header
*/
- @Timed(value = "cps.ncmp.lcm.events.publish",
- description = "Time taken to publish a LCM event")
- public void publishLcmEvent(final String cmHandleId, final LcmEvent lcmEvent) {
+ @Timed(value = "cps.ncmp.lcm.events.publish", description = "Time taken to publish a LCM event")
+ public void publishLcmEvent(final String cmHandleId, final LcmEvent lcmEvent, final LcmEventHeader lcmEventHeader) {
if (notificationsEnabled) {
try {
- eventsPublisher.publishEvent(topicName, cmHandleId, lcmEvent);
+ final Map<String, Object> lcmEventHeadersMap =
+ jsonObjectMapper.convertToValueType(lcmEventHeader, Map.class);
+ eventsPublisher.publishEvent(topicName, cmHandleId, lcmEventHeadersMap, lcmEvent);
} catch (final KafkaException e) {
log.error("Unable to publish message to topic : {} and cause : {}", topicName, e.getMessage());
}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsCmHandleStateHandlerImplSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsCmHandleStateHandlerImplSpec.groovy
index f660be7103..e449d65ac2 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsCmHandleStateHandlerImplSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsCmHandleStateHandlerImplSpec.groovy
@@ -54,7 +54,7 @@ class LcmEventsCmHandleStateHandlerImplSpec extends Specification {
then: 'state is saved using inventory persistence'
expectedCallsToInventoryPersistence * mockInventoryPersistence.saveCmHandleState(cmHandleId, _)
and: 'event service is called to publish event'
- expectedCallsToEventService * mockLcmEventsService.publishLcmEvent(cmHandleId, _)
+ expectedCallsToEventService * mockLcmEventsService.publishLcmEvent(cmHandleId, _, _)
where: 'state change parameters are provided'
stateChange | fromCmHandleState | toCmHandleState || expectedCallsToInventoryPersistence | expectedCallsToEventService
'ADVISED to READY' | ADVISED | READY || 1 | 1
@@ -73,7 +73,7 @@ class LcmEventsCmHandleStateHandlerImplSpec extends Specification {
then: 'state is saved using inventory persistence'
1 * mockInventoryPersistence.saveCmHandle(yangModelCmHandle)
and: 'event service is called to publish event'
- 1 * mockLcmEventsService.publishLcmEvent(cmHandleId, _)
+ 1 * mockLcmEventsService.publishLcmEvent(cmHandleId, _, _)
}
def 'Update and Publish Events on State Change from LOCKED to ADVISED'() {
@@ -90,7 +90,7 @@ class LcmEventsCmHandleStateHandlerImplSpec extends Specification {
}
}
and: 'event service is called to publish event'
- 1 * mockLcmEventsService.publishLcmEvent(cmHandleId, _)
+ 1 * mockLcmEventsService.publishLcmEvent(cmHandleId, _, _)
}
def 'Update and Publish Events on State Change to READY'() {
@@ -111,7 +111,7 @@ class LcmEventsCmHandleStateHandlerImplSpec extends Specification {
}
}
and: 'event service is called to publish event'
- 1 * mockLcmEventsService.publishLcmEvent(cmHandleId, _)
+ 1 * mockLcmEventsService.publishLcmEvent(cmHandleId, _, _)
}
def 'Update cmHandle state to "DELETING"' (){
@@ -125,7 +125,7 @@ class LcmEventsCmHandleStateHandlerImplSpec extends Specification {
and: 'method to persist cm handle state is called once'
1 * mockInventoryPersistence.saveCmHandleState(yangModelCmHandle.getId(), yangModelCmHandle.getCompositeState())
and: 'the method to publish Lcm event is called once'
- 1 * mockLcmEventsService.publishLcmEvent(cmHandleId, _)
+ 1 * mockLcmEventsService.publishLcmEvent(cmHandleId, _, _)
}
def 'Update cmHandle state to "DELETED"' (){
@@ -137,7 +137,7 @@ class LcmEventsCmHandleStateHandlerImplSpec extends Specification {
then: 'the cm handle state is as expected'
yangModelCmHandle.getCompositeState().getCmHandleState() == DELETED
and: 'the method to publish Lcm event is called once'
- 1 * mockLcmEventsService.publishLcmEvent(cmHandleId, _)
+ 1 * mockLcmEventsService.publishLcmEvent(cmHandleId, _, _)
}
def 'No state change and no event to be published'() {
@@ -167,7 +167,7 @@ class LcmEventsCmHandleStateHandlerImplSpec extends Specification {
}
}
and: 'event service is called to publish event'
- 2 * mockLcmEventsService.publishLcmEvent(_, _)
+ 2 * mockLcmEventsService.publishLcmEvent(_, _, _)
}
@@ -183,7 +183,7 @@ class LcmEventsCmHandleStateHandlerImplSpec extends Specification {
}
}
and: 'event service is called to publish event'
- 2 * mockLcmEventsService.publishLcmEvent(_, _)
+ 2 * mockLcmEventsService.publishLcmEvent(_, _, _)
}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsCreatorSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsCreatorSpec.groovy
index f4adfc587c..6d7d6250f1 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsCreatorSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsCreatorSpec.groovy
@@ -20,10 +20,11 @@
package org.onap.cps.ncmp.api.impl.events.lcm
+import org.mapstruct.factory.Mappers
import org.onap.cps.ncmp.api.inventory.CmHandleState
import org.onap.cps.ncmp.api.inventory.CompositeState
import org.onap.cps.ncmp.api.models.NcmpServiceCmHandle
-import org.onap.ncmp.cmhandle.event.lcm.Values
+import org.onap.cps.ncmp.events.lcm.v1.Values
import spock.lang.Specification
import static org.onap.cps.ncmp.api.inventory.CmHandleState.ADVISED
@@ -32,7 +33,9 @@ import static org.onap.cps.ncmp.api.inventory.CmHandleState.READY
class LcmEventsCreatorSpec extends Specification {
- def objectUnderTest = new LcmEventsCreator()
+ LcmEventHeaderMapper lcmEventsHeaderMapper = Mappers.getMapper(LcmEventHeaderMapper)
+
+ def objectUnderTest = new LcmEventsCreator(lcmEventsHeaderMapper)
def cmHandleId = 'test-cm-handle'
def 'Map the LcmEvent for #operation'() {
@@ -159,4 +162,15 @@ class LcmEventsCreatorSpec extends Specification {
'null to null' | null | null
}
+
+ def 'Map the LcmEventHeader'() {
+ given: 'NCMP cm handle details with current and old details'
+ def existingNcmpServiceCmHandle = new NcmpServiceCmHandle(cmHandleId: cmHandleId, compositeState: new CompositeState(cmHandleState: ADVISED))
+ def targetNcmpServiceCmHandle = new NcmpServiceCmHandle(cmHandleId: cmHandleId, compositeState: new CompositeState(cmHandleState: READY))
+ when: 'the event header is populated'
+ def result = objectUnderTest.populateLcmEventHeader(cmHandleId, targetNcmpServiceCmHandle, existingNcmpServiceCmHandle)
+ then: 'the header has fields populated'
+ assert result.eventCorrelationId == cmHandleId
+ assert result.eventId != null
+ }
} \ No newline at end of file
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsPublisherSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsPublisherSpec.groovy
index 7c9464dccb..93741261f6 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsPublisherSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsPublisherSpec.groovy
@@ -24,14 +24,15 @@ import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.onap.cps.ncmp.api.impl.events.EventsPublisher
import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec
+import org.onap.cps.ncmp.events.lcm.v1.Event
+import org.onap.cps.ncmp.events.lcm.v1.LcmEvent
import org.onap.cps.ncmp.utils.TestUtils
import org.onap.cps.utils.JsonObjectMapper
-import org.onap.ncmp.cmhandle.event.lcm.Event
-import org.onap.ncmp.cmhandle.event.lcm.LcmEvent
import org.spockframework.spring.SpringBean
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.test.annotation.DirtiesContext
+import org.springframework.util.SerializationUtils
import org.testcontainers.spock.Testcontainers
import java.time.Duration
@@ -55,19 +56,35 @@ class LcmEventsPublisherSpec extends MessagingBaseSpec {
def 'Produce and Consume Lcm Event'() {
given: 'event key and event data'
def eventKey = 'lcm'
+ def eventId = 'test-uuid'
+ def eventCorrelationId = 'cmhandle-test'
+ def eventSource = 'org.onap.ncmp'
+ def eventTime = '2022-12-31T20:30:40.000+0000'
+ def eventType = 'org.onap.ncmp.cmhandle.lcm.event'
+ def eventSchema = 'org.onap.ncmp.cmhandle.lcm.event'
+ def eventSchemaVersion = 'v1'
def eventData = new LcmEvent(
- eventId: 'test-uuid',
- eventCorrelationId: 'cmhandle-as-correlationid',
- eventSource: 'org.onap.ncmp',
- eventTime: '2022-12-31T20:30:40.000+0000',
- eventType: 'org.onap.ncmp.cmhandle.lcm.event',
- eventSchema: 'org.onap.ncmp.cmhandle.lcm.event',
- eventSchemaVersion: 'v1',
+ eventId: eventId,
+ eventCorrelationId: eventCorrelationId,
+ eventSource: eventSource,
+ eventTime: eventTime,
+ eventType: eventType,
+ eventSchema: eventSchema,
+ eventSchemaVersion: eventSchemaVersion,
event: new Event(cmHandleId: 'cmhandle-test'))
+ and: 'we have a event header'
+ def eventHeader = [
+ eventId : eventId,
+ eventCorrelationId: eventCorrelationId,
+ eventSource : eventSource,
+ eventTime : eventTime,
+ eventType : eventType,
+ eventSchema : eventSchema,
+ eventSchemaVersion: eventSchemaVersion]
and: 'consumer has a subscription'
kafkaConsumer.subscribe([testTopic] as List<String>)
when: 'an event is published'
- lcmEventsPublisher.publishEvent(testTopic, eventKey, eventData)
+ lcmEventsPublisher.publishEvent(testTopic, eventKey, eventHeader, eventData)
and: 'topic is polled'
def records = kafkaConsumer.poll(Duration.ofMillis(1500))
then: 'poll returns one record'
@@ -79,5 +96,8 @@ class LcmEventsPublisherSpec extends MessagingBaseSpec {
def expectedJsonString = TestUtils.getResourceFileContent('expectedLcmEvent.json')
def expectedLcmEvent = jsonObjectMapper.convertJsonString(expectedJsonString, LcmEvent.class)
assert expectedLcmEvent == jsonObjectMapper.convertJsonString(record.value, LcmEvent.class)
+ and: 'record header matches the expected parameters'
+ assert SerializationUtils.deserialize(record.headers().lastHeader('eventId').value()) == eventId
+ assert SerializationUtils.deserialize(record.headers().lastHeader('eventCorrelationId').value()) == eventCorrelationId
}
}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsServiceSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsServiceSpec.groovy
index 65f4d50c68..2d3f8ac516 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsServiceSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsServiceSpec.groovy
@@ -21,26 +21,42 @@
package org.onap.cps.ncmp.api.impl.events.lcm
import org.onap.cps.ncmp.api.impl.events.EventsPublisher
-import org.onap.ncmp.cmhandle.event.lcm.LcmEvent
+import org.onap.cps.ncmp.events.lcm.v1.LcmEvent
+import org.onap.cps.ncmp.events.lcm.v1.LcmEventHeader
+import org.onap.cps.utils.JsonObjectMapper
import org.springframework.kafka.KafkaException
import spock.lang.Specification
class LcmEventsServiceSpec extends Specification {
def mockLcmEventsPublisher = Mock(EventsPublisher)
+ def mockJsonObjectMapper = Mock(JsonObjectMapper)
- def objectUnderTest = new LcmEventsService(mockLcmEventsPublisher)
+ def objectUnderTest = new LcmEventsService(mockLcmEventsPublisher, mockJsonObjectMapper)
def 'Create and Publish lcm event where events are #scenario'() {
given: 'a cm handle id and Lcm Event'
def cmHandleId = 'test-cm-handle-id'
- def lcmEvent = new LcmEvent(eventId: UUID.randomUUID().toString(), eventCorrelationId: cmHandleId)
+ def eventId = UUID.randomUUID().toString()
+ def lcmEvent = new LcmEvent(eventId: eventId, eventCorrelationId: cmHandleId)
+ and: 'we also have a lcm event header'
+ def lcmEventHeader = new LcmEventHeader(eventId: eventId, eventCorrelationId: cmHandleId)
and: 'notificationsEnabled is #notificationsEnabled and it will be true as default'
objectUnderTest.notificationsEnabled = notificationsEnabled
+ and: 'lcm event header is transformed to headers map'
+ mockJsonObjectMapper.convertToValueType(lcmEventHeader, Map.class) >> ['eventId': eventId, 'eventCorrelationId': cmHandleId]
when: 'service is called to publish lcm event'
- objectUnderTest.publishLcmEvent('test-cm-handle-id', lcmEvent)
+ objectUnderTest.publishLcmEvent('test-cm-handle-id', lcmEvent, lcmEventHeader)
then: 'publisher is called #expectedTimesMethodCalled times'
- expectedTimesMethodCalled * mockLcmEventsPublisher.publishEvent(_, cmHandleId, lcmEvent)
+ expectedTimesMethodCalled * mockLcmEventsPublisher.publishEvent(_, cmHandleId, _, lcmEvent) >> {
+ args -> {
+ def eventHeaders = (args[2] as Map<String,Object>)
+ assert eventHeaders.containsKey('eventId')
+ assert eventHeaders.containsKey('eventCorrelationId')
+ assert eventHeaders.get('eventId') == eventId
+ assert eventHeaders.get('eventCorrelationId') == cmHandleId
+ }
+ }
where: 'the following values are used'
scenario | notificationsEnabled || expectedTimesMethodCalled
'enabled' | true || 1
@@ -50,12 +66,14 @@ class LcmEventsServiceSpec extends Specification {
def 'Unable to send message'(){
given: 'a cm handle id and Lcm Event and notification enabled'
def cmHandleId = 'test-cm-handle-id'
- def lcmEvent = new LcmEvent(eventId: UUID.randomUUID().toString(), eventCorrelationId: cmHandleId)
+ def eventId = UUID.randomUUID().toString()
+ def lcmEvent = new LcmEvent(eventId: eventId, eventCorrelationId: cmHandleId)
+ def lcmEventHeader = new LcmEventHeader(eventId: eventId, eventCorrelationId: cmHandleId)
objectUnderTest.notificationsEnabled = true
when: 'publisher set to throw an exception'
- mockLcmEventsPublisher.publishEvent(*_) >> { throw new KafkaException('publishing failed')}
+ mockLcmEventsPublisher.publishEvent(_, _, _, _) >> { throw new KafkaException('publishing failed')}
and: 'an event is publised'
- objectUnderTest.publishLcmEvent(cmHandleId, lcmEvent)
+ objectUnderTest.publishLcmEvent(cmHandleId, lcmEvent, lcmEventHeader)
then: 'the exception is just logged and not bubbled up'
noExceptionThrown()
}
diff --git a/cps-ncmp-service/src/test/resources/expectedLcmEvent.json b/cps-ncmp-service/src/test/resources/expectedLcmEvent.json
index 1db16ee82a..20d557dc4f 100644
--- a/cps-ncmp-service/src/test/resources/expectedLcmEvent.json
+++ b/cps-ncmp-service/src/test/resources/expectedLcmEvent.json
@@ -1,6 +1,6 @@
{
"eventId": "test-uuid",
- "eventCorrelationId": "cmhandle-as-correlationid",
+ "eventCorrelationId": "cmhandle-test",
"eventTime": "2022-12-31T20:30:40.000+0000",
"eventSource": "org.onap.ncmp",
"eventType": "org.onap.ncmp.cmhandle.lcm.event",