summaryrefslogtreecommitdiffstats
path: root/cps-ncmp-service/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'cps-ncmp-service/src/main')
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java23
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventHeaderMapper.java36
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsCmHandleStateHandlerImpl.java11
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsCreator.java34
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsCreatorHelper.java2
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsService.java22
6 files changed, 113 insertions, 15 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java
index 4c84629304..b0b091a2f6 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java
@@ -20,13 +20,16 @@
package org.onap.cps.ncmp.api.impl.events;
+import java.util.Map;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
+import org.springframework.util.SerializationUtils;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
@@ -70,6 +73,20 @@ public class EventsPublisher<T> {
eventFuture.addCallback(handleCallback(topicName));
}
+ /**
+ * Generic Event Publisher with headers.
+ *
+ * @param topicName valid topic name
+ * @param eventKey message key
+ * @param eventHeaders map of event headers
+ * @param event message payload
+ */
+ public void publishEvent(final String topicName, final String eventKey, final Map<String, Object> eventHeaders,
+ final T event) {
+
+ publishEvent(topicName, eventKey, convertToKafkaHeaders(eventHeaders), event);
+ }
+
private ListenableFutureCallback<SendResult<String, T>> handleCallback(final String topicName) {
return new ListenableFutureCallback<>() {
@Override
@@ -85,4 +102,10 @@ public class EventsPublisher<T> {
};
}
+ private Headers convertToKafkaHeaders(final Map<String, Object> eventMessageHeaders) {
+ final Headers eventHeaders = new RecordHeaders();
+ eventMessageHeaders.forEach((key, value) -> eventHeaders.add(key, SerializationUtils.serialize(value)));
+ return eventHeaders;
+ }
+
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventHeaderMapper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventHeaderMapper.java
new file mode 100644
index 0000000000..f7707d9f76
--- /dev/null
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventHeaderMapper.java
@@ -0,0 +1,36 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2023 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.impl.events.lcm;
+
+import org.mapstruct.Mapper;
+import org.onap.cps.ncmp.events.lcm.v1.LcmEvent;
+import org.onap.cps.ncmp.events.lcm.v1.LcmEventHeader;
+
+@Mapper(componentModel = "spring")
+public interface LcmEventHeaderMapper {
+
+ /**
+ * Mapper for converting incoming {@link LcmEvent} to outgoing {@link LcmEventHeader}.
+ */
+
+ LcmEventHeader toLcmEventHeader(LcmEvent lcmEvent);
+
+}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsCmHandleStateHandlerImpl.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsCmHandleStateHandlerImpl.java
index 9d518432ad..f42cd39d4d 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsCmHandleStateHandlerImpl.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsCmHandleStateHandlerImpl.java
@@ -43,7 +43,8 @@ import org.onap.cps.ncmp.api.inventory.CompositeState;
import org.onap.cps.ncmp.api.inventory.CompositeStateUtils;
import org.onap.cps.ncmp.api.inventory.InventoryPersistence;
import org.onap.cps.ncmp.api.models.NcmpServiceCmHandle;
-import org.onap.ncmp.cmhandle.event.lcm.LcmEvent;
+import org.onap.cps.ncmp.events.lcm.v1.LcmEvent;
+import org.onap.cps.ncmp.events.lcm.v1.LcmEventHeader;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
@@ -76,7 +77,7 @@ public class LcmEventsCmHandleStateHandlerImpl implements LcmEventsCmHandleState
@Override
@Timed(value = "cps.ncmp.cmhandle.state.update.batch",
- description = "Time taken to update a batch of cm handle states")
+ description = "Time taken to update a batch of cm handle states")
public void updateCmHandleStateBatch(final Map<YangModelCmHandle, CmHandleState> cmHandleStatePerCmHandle) {
final Collection<CmHandleTransitionPair> cmHandleTransitionPairs =
prepareCmHandleTransitionBatch(cmHandleStatePerCmHandle);
@@ -106,9 +107,12 @@ public class LcmEventsCmHandleStateHandlerImpl implements LcmEventsCmHandleState
private void publishLcmEvent(final NcmpServiceCmHandle targetNcmpServiceCmHandle,
final NcmpServiceCmHandle existingNcmpServiceCmHandle) {
final String cmHandleId = targetNcmpServiceCmHandle.getCmHandleId();
+ final LcmEventHeader lcmEventHeader =
+ lcmEventsCreator.populateLcmEventHeader(cmHandleId, targetNcmpServiceCmHandle,
+ existingNcmpServiceCmHandle);
final LcmEvent lcmEvent =
lcmEventsCreator.populateLcmEvent(cmHandleId, targetNcmpServiceCmHandle, existingNcmpServiceCmHandle);
- lcmEventsService.publishLcmEvent(cmHandleId, lcmEvent);
+ lcmEventsService.publishLcmEvent(cmHandleId, lcmEvent, lcmEventHeader);
}
private Collection<CmHandleTransitionPair> prepareCmHandleTransitionBatch(
@@ -221,6 +225,7 @@ public class LcmEventsCmHandleStateHandlerImpl implements LcmEventsCmHandleState
@Setter
@NoArgsConstructor
static class CmHandleTransitionPair {
+
private YangModelCmHandle currentYangModelCmHandle;
private YangModelCmHandle targetYangModelCmHandle;
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsCreator.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsCreator.java
index a72e664dcf..3c7c92b129 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsCreator.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsCreator.java
@@ -23,13 +23,15 @@ package org.onap.cps.ncmp.api.impl.events.lcm;
import java.util.UUID;
import lombok.Getter;
import lombok.NoArgsConstructor;
+import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.onap.cps.ncmp.api.impl.utils.EventDateTimeFormatter;
import org.onap.cps.ncmp.api.models.NcmpServiceCmHandle;
-import org.onap.ncmp.cmhandle.event.lcm.Event;
-import org.onap.ncmp.cmhandle.event.lcm.LcmEvent;
-import org.onap.ncmp.cmhandle.event.lcm.Values;
+import org.onap.cps.ncmp.events.lcm.v1.Event;
+import org.onap.cps.ncmp.events.lcm.v1.LcmEvent;
+import org.onap.cps.ncmp.events.lcm.v1.LcmEventHeader;
+import org.onap.cps.ncmp.events.lcm.v1.Values;
import org.springframework.stereotype.Component;
@@ -38,8 +40,11 @@ import org.springframework.stereotype.Component;
*/
@Slf4j
@Component
+@RequiredArgsConstructor
public class LcmEventsCreator {
+ private final LcmEventHeaderMapper lcmEventHeaderMapper;
+
/**
* Populate Lifecycle Management Event.
*
@@ -53,6 +58,20 @@ public class LcmEventsCreator {
return createLcmEvent(cmHandleId, targetNcmpServiceCmHandle, existingNcmpServiceCmHandle);
}
+ /**
+ * Populate Lifecycle Management Event Header.
+ *
+ * @param cmHandleId cm handle identifier
+ * @param targetNcmpServiceCmHandle target ncmp service cmhandle
+ * @param existingNcmpServiceCmHandle existing ncmp service cmhandle
+ * @return Populated LcmEventHeader
+ */
+ public LcmEventHeader populateLcmEventHeader(final String cmHandleId,
+ final NcmpServiceCmHandle targetNcmpServiceCmHandle,
+ final NcmpServiceCmHandle existingNcmpServiceCmHandle) {
+ return createLcmEventHeader(cmHandleId, targetNcmpServiceCmHandle, existingNcmpServiceCmHandle);
+ }
+
private LcmEvent createLcmEvent(final String cmHandleId, final NcmpServiceCmHandle targetNcmpServiceCmHandle,
final NcmpServiceCmHandle existingNcmpServiceCmHandle) {
final LcmEventType lcmEventType =
@@ -63,6 +82,15 @@ public class LcmEventsCreator {
return lcmEvent;
}
+ private LcmEventHeader createLcmEventHeader(final String cmHandleId,
+ final NcmpServiceCmHandle targetNcmpServiceCmHandle,
+ final NcmpServiceCmHandle existingNcmpServiceCmHandle) {
+ final LcmEventType lcmEventType =
+ LcmEventsCreatorHelper.determineEventType(targetNcmpServiceCmHandle, existingNcmpServiceCmHandle);
+ final LcmEvent lcmEventWithHeaderInformation = lcmEventHeader(cmHandleId, lcmEventType);
+ return lcmEventHeaderMapper.toLcmEventHeader(lcmEventWithHeaderInformation);
+ }
+
private Event lcmEventPayload(final String eventCorrelationId, final NcmpServiceCmHandle targetNcmpServiceCmHandle,
final NcmpServiceCmHandle existingNcmpServiceCmHandle, final LcmEventType lcmEventType) {
final Event event = new Event();
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsCreatorHelper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsCreatorHelper.java
index 1322b7277f..d3b45d4a63 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsCreatorHelper.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsCreatorHelper.java
@@ -34,7 +34,7 @@ import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.onap.cps.ncmp.api.models.NcmpServiceCmHandle;
-import org.onap.ncmp.cmhandle.event.lcm.Values;
+import org.onap.cps.ncmp.events.lcm.v1.Values;
/**
* LcmEventsCreatorHelper has helper methods to create LcmEvent.
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsService.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsService.java
index f258b45976..2e1b914b1d 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsService.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsService.java
@@ -21,10 +21,13 @@
package org.onap.cps.ncmp.api.impl.events.lcm;
import io.micrometer.core.annotation.Timed;
+import java.util.Map;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.onap.cps.ncmp.api.impl.events.EventsPublisher;
-import org.onap.ncmp.cmhandle.event.lcm.LcmEvent;
+import org.onap.cps.ncmp.events.lcm.v1.LcmEvent;
+import org.onap.cps.ncmp.events.lcm.v1.LcmEventHeader;
+import org.onap.cps.utils.JsonObjectMapper;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.KafkaException;
import org.springframework.stereotype.Service;
@@ -39,6 +42,7 @@ import org.springframework.stereotype.Service;
public class LcmEventsService {
private final EventsPublisher<LcmEvent> eventsPublisher;
+ private final JsonObjectMapper jsonObjectMapper;
@Value("${app.lcm.events.topic:ncmp-events}")
private String topicName;
@@ -47,17 +51,19 @@ public class LcmEventsService {
private boolean notificationsEnabled;
/**
- * Publish the LcmEvent to the public topic.
+ * Publish the LcmEvent with header to the public topic.
*
- * @param cmHandleId Cm Handle Id
- * @param lcmEvent Lcm Event
+ * @param cmHandleId Cm Handle Id
+ * @param lcmEvent Lcm Event
+ * @param lcmEventHeader Lcm Event Header
*/
- @Timed(value = "cps.ncmp.lcm.events.publish",
- description = "Time taken to publish a LCM event")
- public void publishLcmEvent(final String cmHandleId, final LcmEvent lcmEvent) {
+ @Timed(value = "cps.ncmp.lcm.events.publish", description = "Time taken to publish a LCM event")
+ public void publishLcmEvent(final String cmHandleId, final LcmEvent lcmEvent, final LcmEventHeader lcmEventHeader) {
if (notificationsEnabled) {
try {
- eventsPublisher.publishEvent(topicName, cmHandleId, lcmEvent);
+ final Map<String, Object> lcmEventHeadersMap =
+ jsonObjectMapper.convertToValueType(lcmEventHeader, Map.class);
+ eventsPublisher.publishEvent(topicName, cmHandleId, lcmEventHeadersMap, lcmEvent);
} catch (final KafkaException e) {
log.error("Unable to publish message to topic : {} and cause : {}", topicName, e.getMessage());
}