diff options
author | FrancescoFioraEst <francesco.fiora@est.tech> | 2021-08-18 15:25:59 +0100 |
---|---|---|
committer | FrancescoFioraEst <francesco.fiora@est.tech> | 2021-08-26 13:44:57 +0100 |
commit | 6d02de6b9ea3f4e6fc588813fd2177c732a2af92 (patch) | |
tree | 71d74f431b35e950767be889a2b6d7ed1de7af45 /runtime-controlloop | |
parent | 281a36c50d68f29e0e47dfec10ee8be38f5e5761 (diff) |
Fix issue in event handling in participants
Fix issue in event handling in participants
and refactor Participant Publisher and Listener
Issue-ID: POLICY-3544
Change-Id: Ic92ffa79d303adfb1c3319fbfefb1faef911a9d4
Signed-off-by: FrancescoFioraEst <francesco.fiora@est.tech>
Diffstat (limited to 'runtime-controlloop')
5 files changed, 38 insertions, 25 deletions
diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/config/messaging/MessageDispatcherActivator.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/config/messaging/MessageDispatcherActivator.java index 891dab9ae..d196dd193 100644 --- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/config/messaging/MessageDispatcherActivator.java +++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/config/messaging/MessageDispatcherActivator.java @@ -23,8 +23,6 @@ package org.onap.policy.clamp.controlloop.runtime.config.messaging; import java.io.Closeable; import java.io.IOException; import java.util.List; -import java.util.stream.Stream; -import javax.ws.rs.core.Response.Status; import lombok.Getter; import org.onap.policy.clamp.controlloop.common.exception.ControlLoopRuntimeException; import org.onap.policy.clamp.controlloop.runtime.main.parameters.ClRuntimeParameterGroup; @@ -33,6 +31,7 @@ import org.onap.policy.common.endpoints.event.comm.TopicSink; import org.onap.policy.common.endpoints.event.comm.TopicSource; import org.onap.policy.common.endpoints.listeners.MessageTypeDispatcher; import org.onap.policy.common.utils.services.ServiceManagerContainer; +import org.springframework.context.event.ContextClosedEvent; import org.springframework.context.event.ContextRefreshedEvent; import org.springframework.context.event.EventListener; import org.springframework.stereotype.Component; @@ -53,36 +52,31 @@ public class MessageDispatcherActivator extends ServiceManagerContainer implemen * Constructor. * * @param clRuntimeParameterGroup the parameters for the control loop runtime service - * @param publishers array of Publishers - * @param listeners array of Listeners + * @param publishers list of Publishers + * @param listeners list of Listeners * @throws ControlLoopRuntimeException if the activator does not start */ - public MessageDispatcherActivator(final ClRuntimeParameterGroup clRuntimeParameterGroup, Publisher[] publishers, - Listener[] listeners) { + public MessageDispatcherActivator(final ClRuntimeParameterGroup clRuntimeParameterGroup, List<Publisher> publishers, + List<Listener> listeners) { topicSinks = TopicEndpointManager.getManager() .addTopicSinks(clRuntimeParameterGroup.getTopicParameterGroup().getTopicSinks()); topicSources = TopicEndpointManager.getManager() .addTopicSources(clRuntimeParameterGroup.getTopicParameterGroup().getTopicSources()); - try { - msgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES); - } catch (final RuntimeException e) { - throw new ControlLoopRuntimeException(Status.INTERNAL_SERVER_ERROR, - "topic message dispatcher failed to start", e); - } + msgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES); // @formatter:off addAction("Topic endpoint management", () -> TopicEndpointManager.getManager().start(), () -> TopicEndpointManager.getManager().shutdown()); - Stream.of(publishers).forEach(publisher -> + publishers.forEach(publisher -> addAction("Publisher " + publisher.getClass().getSimpleName(), () -> publisher.active(topicSinks), - () -> publisher.stop())); + publisher::stop)); - Stream.of(listeners).forEach(listener -> + listeners.forEach(listener -> addAction("Listener " + listener.getClass().getSimpleName(), () -> msgDispatcher.register(listener.getType(), listener.getScoListener()), () -> msgDispatcher.unregister(listener.getType()))); @@ -121,10 +115,22 @@ public class MessageDispatcherActivator extends ServiceManagerContainer implemen } } + /** + * Handle ContextClosedEvent. + * + * @param ctxClosedEvent ContextClosedEvent + */ + @EventListener + public void handleContextClosedEvent(ContextClosedEvent ctxClosedEvent) { + if (isAlive()) { + stop(); + } + } + @Override public void close() throws IOException { if (isAlive()) { - stop(); + super.shutdown(); } } } diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionHandler.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionHandler.java index 2cc0f94e2..b39573461 100644 --- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionHandler.java +++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionHandler.java @@ -139,7 +139,8 @@ public class SupervisionHandler { public void handleParticipantMessage(ParticipantRegister participantRegisterMessage) { LOGGER.debug("Participant Register received {}", participantRegisterMessage); - participantRegisterAckPublisher.send(participantRegisterMessage.getMessageId()); + participantRegisterAckPublisher.send(participantRegisterMessage.getMessageId(), + participantRegisterMessage.getParticipantId(), participantRegisterMessage.getParticipantType()); participantUpdatePublisher.send(participantRegisterMessage.getParticipantId(), participantRegisterMessage.getParticipantType(), true); @@ -358,15 +359,15 @@ public class SupervisionHandler { throws PfModelException, ControlLoopException { if (participantStatusMessage.getControlLoopInfoList() != null) { for (ControlLoopInfo clEntry : participantStatusMessage.getControlLoopInfoList()) { - var dbControlLoop = controlLoopProvider.getControlLoop( - new ToscaConceptIdentifier(clEntry.getControlLoopId())); + var dbControlLoop = + controlLoopProvider.getControlLoop(new ToscaConceptIdentifier(clEntry.getControlLoopId())); if (dbControlLoop == null) { exceptionOccured(Response.Status.NOT_FOUND, "PARTICIPANT_STATUS control loop not found in database: " + clEntry.getControlLoopId()); } dbControlLoop.setState(clEntry.getState()); - monitoringProvider.createClElementStatistics(clEntry.getControlLoopStatistics() - .getClElementStatisticsList().getClElementStatistics()); + monitoringProvider.createClElementStatistics( + clEntry.getControlLoopStatistics().getClElementStatisticsList().getClElementStatistics()); } } } diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionScanner.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionScanner.java index 151b04cbf..a05337991 100644 --- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionScanner.java +++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionScanner.java @@ -147,7 +147,7 @@ public class SupervisionScanner { if (participantUpdateCounter.count(id)) { LOGGER.debug("retry message ParticipantUpdate"); - participantUpdatePublisher.send(id.getLeft(), id.getRight()); + participantUpdatePublisher.send(id.getLeft(), id.getRight(), true); } else { LOGGER.debug("report Participant Update fault"); participantUpdateCounter.setFault(id); diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantRegisterAckPublisher.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantRegisterAckPublisher.java index 73860b5c2..8cbaec8b1 100644 --- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantRegisterAckPublisher.java +++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantRegisterAckPublisher.java @@ -22,6 +22,7 @@ package org.onap.policy.clamp.controlloop.runtime.supervision.comm; import java.util.UUID; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantRegisterAck; +import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier; import org.springframework.stereotype.Component; /** @@ -34,9 +35,13 @@ public class ParticipantRegisterAckPublisher extends AbstractParticipantAckPubli * Send ParticipantRegisterAck to Participant. * * @param responseTo the original request id in the request. + * @param participantId the participant Id + * @param participantType the participant Type */ - public void send(UUID responseTo) { + public void send(UUID responseTo, ToscaConceptIdentifier participantId, ToscaConceptIdentifier participantType) { var message = new ParticipantRegisterAck(); + message.setParticipantId(participantId); + message.setParticipantType(participantType); message.setResponseTo(responseTo); message.setMessage("Participant Register Ack"); message.setResult(true); diff --git a/runtime-controlloop/src/test/java/org/onap/policy/clamp/controlloop/runtime/config/messaging/MessageDispatcherActivatorTest.java b/runtime-controlloop/src/test/java/org/onap/policy/clamp/controlloop/runtime/config/messaging/MessageDispatcherActivatorTest.java index 461c8b558..936bb1444 100644 --- a/runtime-controlloop/src/test/java/org/onap/policy/clamp/controlloop/runtime/config/messaging/MessageDispatcherActivatorTest.java +++ b/runtime-controlloop/src/test/java/org/onap/policy/clamp/controlloop/runtime/config/messaging/MessageDispatcherActivatorTest.java @@ -31,6 +31,7 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.util.List; import org.junit.jupiter.api.Test; import org.onap.policy.clamp.controlloop.runtime.main.parameters.ClRuntimeParameterGroup; import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ParticipantStatusListener; @@ -56,7 +57,7 @@ class MessageDispatcherActivatorTest { var publisherFirst = spy(mock(Publisher.class)); var publisherSecond = spy(mock(Publisher.class)); - var publishers = new Publisher[] {publisherFirst, publisherSecond}; + var publishers = List.of(publisherFirst, publisherSecond); var listenerFirst = spy(mock(ParticipantStatusListener.class)); when(listenerFirst.getType()).thenReturn(TOPIC_FIRST); @@ -66,7 +67,7 @@ class MessageDispatcherActivatorTest { when(listenerSecond.getType()).thenReturn(TOPIC_SECOND); when(listenerSecond.getScoListener()).thenReturn(listenerSecond); - var listeners = new Listener[] {listenerFirst, listenerSecond}; + List<Listener> listeners = List.of(listenerFirst, listenerSecond); try (var activator = new MessageDispatcherActivator(parameterGroup, publishers, listeners)) { |