diff options
Diffstat (limited to 'cps-ncmp-service/src')
2 files changed, 101 insertions, 9 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsService.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsService.java index 10aebfa45d..192667175e 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsService.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsService.java @@ -20,13 +20,18 @@ package org.onap.cps.ncmp.impl.inventory.sync.lcm; -import io.micrometer.core.annotation.Timed; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Tag; +import io.micrometer.core.instrument.Timer; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.onap.cps.events.EventsPublisher; import org.onap.cps.ncmp.events.lcm.v1.LcmEvent; import org.onap.cps.ncmp.events.lcm.v1.LcmEventHeader; +import org.onap.cps.ncmp.events.lcm.v1.Values; import org.onap.cps.utils.JsonObjectMapper; import org.springframework.beans.factory.annotation.Value; import org.springframework.kafka.KafkaException; @@ -41,8 +46,12 @@ import org.springframework.stereotype.Service; @RequiredArgsConstructor public class LcmEventsService { + private static final Tag TAG_METHOD = Tag.of("method", "publishLcmEvent"); + private static final Tag TAG_CLASS = Tag.of("class", LcmEventsService.class.getName()); + private static final String UNAVAILABLE_CM_HANDLE_STATE = "N/A"; private final EventsPublisher<LcmEvent> eventsPublisher; private final JsonObjectMapper jsonObjectMapper; + private final MeterRegistry meterRegistry; @Value("${app.lcm.events.topic:ncmp-events}") private String topicName; @@ -51,24 +60,58 @@ public class LcmEventsService { private boolean notificationsEnabled; /** - * Publish the LcmEvent with header to the public topic. + * Publishes an LCM event to the dedicated topic with optional notification headers. + * Capture and log KafkaException If an error occurs while publishing the event to Kafka * - * @param cmHandleId Cm Handle Id - * @param lcmEvent Lcm Event - * @param lcmEventHeader Lcm Event Header + * @param cmHandleId Cm Handle Id associated with the LCM event + * @param lcmEvent The LCM event object to be published + * @param lcmEventHeader Optional headers associated with the LCM event */ - @Timed(value = "cps.ncmp.lcm.events.publish", description = "Time taken to publish a LCM event") public void publishLcmEvent(final String cmHandleId, final LcmEvent lcmEvent, final LcmEventHeader lcmEventHeader) { + if (notificationsEnabled) { + final Timer.Sample timerSample = Timer.start(meterRegistry); try { final Map<String, Object> lcmEventHeadersMap = jsonObjectMapper.convertToValueType(lcmEventHeader, Map.class); eventsPublisher.publishEvent(topicName, cmHandleId, lcmEventHeadersMap, lcmEvent); } catch (final KafkaException e) { log.error("Unable to publish message to topic : {} and cause : {}", topicName, e.getMessage()); + } finally { + recordMetrics(lcmEvent, timerSample); } } else { log.debug("Notifications disabled."); } } + + private void recordMetrics(final LcmEvent lcmEvent, final Timer.Sample timerSample) { + final List<Tag> tags = new ArrayList<>(4); + tags.add(TAG_CLASS); + tags.add(TAG_METHOD); + + final String oldCmHandleState = extractCmHandleStateValue(lcmEvent.getEvent().getOldValues()); + tags.add(Tag.of("oldCmHandleState", oldCmHandleState)); + + final String newCmHandleState = extractCmHandleStateValue(lcmEvent.getEvent().getNewValues()); + tags.add(Tag.of("newCmHandleState", newCmHandleState)); + + timerSample.stop(Timer.builder("cps.ncmp.lcm.events.publish") + .description("Time taken to publish a LCM event") + .tags(tags) + .register(meterRegistry)); + } + + /** + * Extracts the CM handle state value from the given Values object. + * If the provided Values object or its CM handle state is null, returns a default value. + * + * @param values The Values object containing CM handle state information. + * @return The CM handle state value as a string, or a default value if null. + */ + private String extractCmHandleStateValue(final Values values) { + return (values != null && values.getCmHandleState() != null) + ? values.getCmHandleState().value() + : UNAVAILABLE_CM_HANDLE_STATE; + } } diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsServiceSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsServiceSpec.groovy index b745734f0b..73c66089a3 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsServiceSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsServiceSpec.groovy @@ -20,9 +20,16 @@ package org.onap.cps.ncmp.impl.inventory.sync.lcm +import static org.onap.cps.ncmp.events.lcm.v1.Values.CmHandleState.ADVISED +import static org.onap.cps.ncmp.events.lcm.v1.Values.CmHandleState.READY + +import io.micrometer.core.instrument.Tag +import io.micrometer.core.instrument.simple.SimpleMeterRegistry import org.onap.cps.events.EventsPublisher +import org.onap.cps.ncmp.events.lcm.v1.Event import org.onap.cps.ncmp.events.lcm.v1.LcmEvent import org.onap.cps.ncmp.events.lcm.v1.LcmEventHeader +import org.onap.cps.ncmp.events.lcm.v1.Values import org.onap.cps.utils.JsonObjectMapper import org.springframework.kafka.KafkaException import spock.lang.Specification @@ -31,14 +38,16 @@ class LcmEventsServiceSpec extends Specification { def mockLcmEventsPublisher = Mock(EventsPublisher) def mockJsonObjectMapper = Mock(JsonObjectMapper) + def meterRegistry = new SimpleMeterRegistry() - def objectUnderTest = new LcmEventsService(mockLcmEventsPublisher, mockJsonObjectMapper) + def objectUnderTest = new LcmEventsService(mockLcmEventsPublisher, mockJsonObjectMapper, meterRegistry) def 'Create and Publish lcm event where events are #scenario'() { given: 'a cm handle id, Lcm Event, and headers' def cmHandleId = 'test-cm-handle-id' def eventId = UUID.randomUUID().toString() - def lcmEvent = new LcmEvent(eventId: eventId, eventCorrelationId: cmHandleId) + def event = getEventWithCmHandleState(ADVISED, READY) + def lcmEvent = new LcmEvent(event: event, eventId: eventId, eventCorrelationId: cmHandleId) and: 'we also have a lcm event header' def lcmEventHeader = new LcmEventHeader(eventId: eventId, eventCorrelationId: cmHandleId) and: 'notificationsEnabled is #notificationsEnabled and it will be true as default' @@ -57,6 +66,16 @@ class LcmEventsServiceSpec extends Specification { assert eventHeaders.get('eventCorrelationId') == cmHandleId } } + and: 'metrics are recorded with correct tags' + def timer = meterRegistry.find('cps.ncmp.lcm.events.publish').timer() + if (notificationsEnabled) { + assert timer != null + assert timer.count() == expectedTimesMethodCalled + def tags = timer.getId().getTags() + assert tags.containsAll(Tag.of('oldCmHandleState', ADVISED.value()), Tag.of('newCmHandleState', READY.value())) + } else { + assert timer == null + } where: 'the following values are used' scenario | notificationsEnabled || expectedTimesMethodCalled 'enabled' | true || 1 @@ -67,7 +86,8 @@ class LcmEventsServiceSpec extends Specification { given: 'a cm handle id and Lcm Event and notification enabled' def cmHandleId = 'test-cm-handle-id' def eventId = UUID.randomUUID().toString() - def lcmEvent = new LcmEvent(eventId: eventId, eventCorrelationId: cmHandleId) + and: 'event #event' + def lcmEvent = new LcmEvent(event: event, eventId: eventId, eventCorrelationId: cmHandleId) def lcmEventHeader = new LcmEventHeader(eventId: eventId, eventCorrelationId: cmHandleId) objectUnderTest.notificationsEnabled = true when: 'publisher set to throw an exception' @@ -76,6 +96,35 @@ class LcmEventsServiceSpec extends Specification { objectUnderTest.publishLcmEvent(cmHandleId, lcmEvent, lcmEventHeader) then: 'the exception is just logged and not bubbled up' noExceptionThrown() + and: 'metrics are recorded with error tags' + def timer = meterRegistry.find('cps.ncmp.lcm.events.publish').timer() + assert timer != null + assert timer.count() == 1 + def expectedTags = [Tag.of('oldCmHandleState', 'N/A'), Tag.of('newCmHandleState', 'N/A')] + def tags = timer.getId().getTags() + assert tags.containsAll(expectedTags) + where: 'the following values are used' + scenario | event + 'without values' | new Event() + 'without cm handle state' | getEvent() } + def getEvent() { + def event = new Event() + def values = new Values() + event.setOldValues(values) + event.setNewValues(values) + event + } + + def getEventWithCmHandleState(oldCmHandleState, newCmHandleState) { + def event = new Event() + def advisedCmHandleStateValues = new Values() + advisedCmHandleStateValues.setCmHandleState(oldCmHandleState) + event.setOldValues(advisedCmHandleStateValues) + def readyCmHandleStateValues = new Values() + readyCmHandleStateValues.setCmHandleState(newCmHandleState) + event.setNewValues(readyCmHandleStateValues) + return event + } } |