aboutsummaryrefslogtreecommitdiffstats
path: root/runtime-controlloop
diff options
context:
space:
mode:
authorFrancescoFioraEst <francesco.fiora@est.tech>2021-08-18 15:25:59 +0100
committerFrancescoFioraEst <francesco.fiora@est.tech>2021-08-26 13:44:57 +0100
commit6d02de6b9ea3f4e6fc588813fd2177c732a2af92 (patch)
tree71d74f431b35e950767be889a2b6d7ed1de7af45 /runtime-controlloop
parent281a36c50d68f29e0e47dfec10ee8be38f5e5761 (diff)
Fix issue in event handling in participants
Fix issue in event handling in participants and refactor Participant Publisher and Listener Issue-ID: POLICY-3544 Change-Id: Ic92ffa79d303adfb1c3319fbfefb1faef911a9d4 Signed-off-by: FrancescoFioraEst <francesco.fiora@est.tech>
Diffstat (limited to 'runtime-controlloop')
-rw-r--r--runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/config/messaging/MessageDispatcherActivator.java38
-rw-r--r--runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionHandler.java11
-rw-r--r--runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionScanner.java2
-rw-r--r--runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantRegisterAckPublisher.java7
-rw-r--r--runtime-controlloop/src/test/java/org/onap/policy/clamp/controlloop/runtime/config/messaging/MessageDispatcherActivatorTest.java5
5 files changed, 38 insertions, 25 deletions
diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/config/messaging/MessageDispatcherActivator.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/config/messaging/MessageDispatcherActivator.java
index 891dab9ae..d196dd193 100644
--- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/config/messaging/MessageDispatcherActivator.java
+++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/config/messaging/MessageDispatcherActivator.java
@@ -23,8 +23,6 @@ package org.onap.policy.clamp.controlloop.runtime.config.messaging;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
-import java.util.stream.Stream;
-import javax.ws.rs.core.Response.Status;
import lombok.Getter;
import org.onap.policy.clamp.controlloop.common.exception.ControlLoopRuntimeException;
import org.onap.policy.clamp.controlloop.runtime.main.parameters.ClRuntimeParameterGroup;
@@ -33,6 +31,7 @@ import org.onap.policy.common.endpoints.event.comm.TopicSink;
import org.onap.policy.common.endpoints.event.comm.TopicSource;
import org.onap.policy.common.endpoints.listeners.MessageTypeDispatcher;
import org.onap.policy.common.utils.services.ServiceManagerContainer;
+import org.springframework.context.event.ContextClosedEvent;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
@@ -53,36 +52,31 @@ public class MessageDispatcherActivator extends ServiceManagerContainer implemen
* Constructor.
*
* @param clRuntimeParameterGroup the parameters for the control loop runtime service
- * @param publishers array of Publishers
- * @param listeners array of Listeners
+ * @param publishers list of Publishers
+ * @param listeners list of Listeners
* @throws ControlLoopRuntimeException if the activator does not start
*/
- public MessageDispatcherActivator(final ClRuntimeParameterGroup clRuntimeParameterGroup, Publisher[] publishers,
- Listener[] listeners) {
+ public MessageDispatcherActivator(final ClRuntimeParameterGroup clRuntimeParameterGroup, List<Publisher> publishers,
+ List<Listener> listeners) {
topicSinks = TopicEndpointManager.getManager()
.addTopicSinks(clRuntimeParameterGroup.getTopicParameterGroup().getTopicSinks());
topicSources = TopicEndpointManager.getManager()
.addTopicSources(clRuntimeParameterGroup.getTopicParameterGroup().getTopicSources());
- try {
- msgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES);
- } catch (final RuntimeException e) {
- throw new ControlLoopRuntimeException(Status.INTERNAL_SERVER_ERROR,
- "topic message dispatcher failed to start", e);
- }
+ msgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES);
// @formatter:off
addAction("Topic endpoint management",
() -> TopicEndpointManager.getManager().start(),
() -> TopicEndpointManager.getManager().shutdown());
- Stream.of(publishers).forEach(publisher ->
+ publishers.forEach(publisher ->
addAction("Publisher " + publisher.getClass().getSimpleName(),
() -> publisher.active(topicSinks),
- () -> publisher.stop()));
+ publisher::stop));
- Stream.of(listeners).forEach(listener ->
+ listeners.forEach(listener ->
addAction("Listener " + listener.getClass().getSimpleName(),
() -> msgDispatcher.register(listener.getType(), listener.getScoListener()),
() -> msgDispatcher.unregister(listener.getType())));
@@ -121,10 +115,22 @@ public class MessageDispatcherActivator extends ServiceManagerContainer implemen
}
}
+ /**
+ * Handle ContextClosedEvent.
+ *
+ * @param ctxClosedEvent ContextClosedEvent
+ */
+ @EventListener
+ public void handleContextClosedEvent(ContextClosedEvent ctxClosedEvent) {
+ if (isAlive()) {
+ stop();
+ }
+ }
+
@Override
public void close() throws IOException {
if (isAlive()) {
- stop();
+ super.shutdown();
}
}
}
diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionHandler.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionHandler.java
index 2cc0f94e2..b39573461 100644
--- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionHandler.java
+++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionHandler.java
@@ -139,7 +139,8 @@ public class SupervisionHandler {
public void handleParticipantMessage(ParticipantRegister participantRegisterMessage) {
LOGGER.debug("Participant Register received {}", participantRegisterMessage);
- participantRegisterAckPublisher.send(participantRegisterMessage.getMessageId());
+ participantRegisterAckPublisher.send(participantRegisterMessage.getMessageId(),
+ participantRegisterMessage.getParticipantId(), participantRegisterMessage.getParticipantType());
participantUpdatePublisher.send(participantRegisterMessage.getParticipantId(),
participantRegisterMessage.getParticipantType(), true);
@@ -358,15 +359,15 @@ public class SupervisionHandler {
throws PfModelException, ControlLoopException {
if (participantStatusMessage.getControlLoopInfoList() != null) {
for (ControlLoopInfo clEntry : participantStatusMessage.getControlLoopInfoList()) {
- var dbControlLoop = controlLoopProvider.getControlLoop(
- new ToscaConceptIdentifier(clEntry.getControlLoopId()));
+ var dbControlLoop =
+ controlLoopProvider.getControlLoop(new ToscaConceptIdentifier(clEntry.getControlLoopId()));
if (dbControlLoop == null) {
exceptionOccured(Response.Status.NOT_FOUND,
"PARTICIPANT_STATUS control loop not found in database: " + clEntry.getControlLoopId());
}
dbControlLoop.setState(clEntry.getState());
- monitoringProvider.createClElementStatistics(clEntry.getControlLoopStatistics()
- .getClElementStatisticsList().getClElementStatistics());
+ monitoringProvider.createClElementStatistics(
+ clEntry.getControlLoopStatistics().getClElementStatisticsList().getClElementStatistics());
}
}
}
diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionScanner.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionScanner.java
index 151b04cbf..a05337991 100644
--- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionScanner.java
+++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionScanner.java
@@ -147,7 +147,7 @@ public class SupervisionScanner {
if (participantUpdateCounter.count(id)) {
LOGGER.debug("retry message ParticipantUpdate");
- participantUpdatePublisher.send(id.getLeft(), id.getRight());
+ participantUpdatePublisher.send(id.getLeft(), id.getRight(), true);
} else {
LOGGER.debug("report Participant Update fault");
participantUpdateCounter.setFault(id);
diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantRegisterAckPublisher.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantRegisterAckPublisher.java
index 73860b5c2..8cbaec8b1 100644
--- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantRegisterAckPublisher.java
+++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantRegisterAckPublisher.java
@@ -22,6 +22,7 @@ package org.onap.policy.clamp.controlloop.runtime.supervision.comm;
import java.util.UUID;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantRegisterAck;
+import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
import org.springframework.stereotype.Component;
/**
@@ -34,9 +35,13 @@ public class ParticipantRegisterAckPublisher extends AbstractParticipantAckPubli
* Send ParticipantRegisterAck to Participant.
*
* @param responseTo the original request id in the request.
+ * @param participantId the participant Id
+ * @param participantType the participant Type
*/
- public void send(UUID responseTo) {
+ public void send(UUID responseTo, ToscaConceptIdentifier participantId, ToscaConceptIdentifier participantType) {
var message = new ParticipantRegisterAck();
+ message.setParticipantId(participantId);
+ message.setParticipantType(participantType);
message.setResponseTo(responseTo);
message.setMessage("Participant Register Ack");
message.setResult(true);
diff --git a/runtime-controlloop/src/test/java/org/onap/policy/clamp/controlloop/runtime/config/messaging/MessageDispatcherActivatorTest.java b/runtime-controlloop/src/test/java/org/onap/policy/clamp/controlloop/runtime/config/messaging/MessageDispatcherActivatorTest.java
index 461c8b558..936bb1444 100644
--- a/runtime-controlloop/src/test/java/org/onap/policy/clamp/controlloop/runtime/config/messaging/MessageDispatcherActivatorTest.java
+++ b/runtime-controlloop/src/test/java/org/onap/policy/clamp/controlloop/runtime/config/messaging/MessageDispatcherActivatorTest.java
@@ -31,6 +31,7 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import java.util.List;
import org.junit.jupiter.api.Test;
import org.onap.policy.clamp.controlloop.runtime.main.parameters.ClRuntimeParameterGroup;
import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ParticipantStatusListener;
@@ -56,7 +57,7 @@ class MessageDispatcherActivatorTest {
var publisherFirst = spy(mock(Publisher.class));
var publisherSecond = spy(mock(Publisher.class));
- var publishers = new Publisher[] {publisherFirst, publisherSecond};
+ var publishers = List.of(publisherFirst, publisherSecond);
var listenerFirst = spy(mock(ParticipantStatusListener.class));
when(listenerFirst.getType()).thenReturn(TOPIC_FIRST);
@@ -66,7 +67,7 @@ class MessageDispatcherActivatorTest {
when(listenerSecond.getType()).thenReturn(TOPIC_SECOND);
when(listenerSecond.getScoListener()).thenReturn(listenerSecond);
- var listeners = new Listener[] {listenerFirst, listenerSecond};
+ List<Listener> listeners = List.of(listenerFirst, listenerSecond);
try (var activator = new MessageDispatcherActivator(parameterGroup, publishers, listeners)) {