diff options
author | FrancescoFioraEst <francesco.fiora@est.tech> | 2021-08-18 15:25:59 +0100 |
---|---|---|
committer | FrancescoFioraEst <francesco.fiora@est.tech> | 2021-08-26 13:44:57 +0100 |
commit | 6d02de6b9ea3f4e6fc588813fd2177c732a2af92 (patch) | |
tree | 71d74f431b35e950767be889a2b6d7ed1de7af45 | |
parent | 281a36c50d68f29e0e47dfec10ee8be38f5e5761 (diff) |
Fix issue in event handling in participants
Fix issue in event handling in participants
and refactor Participant Publisher and Listener
Issue-ID: POLICY-3544
Change-Id: Ic92ffa79d303adfb1c3319fbfefb1faef911a9d4
Signed-off-by: FrancescoFioraEst <francesco.fiora@est.tech>
24 files changed, 337 insertions, 150 deletions
diff --git a/models/src/main/java/org/onap/policy/clamp/controlloop/models/messages/dmaap/participant/ParticipantAckMessage.java b/models/src/main/java/org/onap/policy/clamp/controlloop/models/messages/dmaap/participant/ParticipantAckMessage.java index 8b59a1801..c6f5c61b9 100644 --- a/models/src/main/java/org/onap/policy/clamp/controlloop/models/messages/dmaap/participant/ParticipantAckMessage.java +++ b/models/src/main/java/org/onap/policy/clamp/controlloop/models/messages/dmaap/participant/ParticipantAckMessage.java @@ -22,6 +22,7 @@ package org.onap.policy.clamp.controlloop.models.messages.dmaap.participant; import java.util.UUID; import lombok.Getter; +import lombok.NonNull; import lombok.Setter; import lombok.ToString; import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier; @@ -77,4 +78,31 @@ public class ParticipantAckMessage { this.participantType = source.participantType; this.participantId = source.participantId; } + + /** + * Determines if this message applies to this participant type. + * + * @param participantType type of the participant to match against + * @param participantId id of the participant to match against + * @return {@code true} if this message applies to this participant, {@code false} otherwise + */ + public boolean appliesTo(@NonNull final ToscaConceptIdentifier participantType, + @NonNull final ToscaConceptIdentifier participantId) { + // Broadcast message to all participants + if (this.participantType == null) { + return true; + } + + if (!participantType.equals(this.participantType)) { + return false; + } + + // Broadcast message to all control loop elements on this participant + if (this.participantId == null) { + return true; + } + + // Targeted message at this specific participant + return participantId.equals(this.participantId); + } } diff --git a/models/src/main/java/org/onap/policy/clamp/controlloop/models/messages/dmaap/participant/ParticipantMessage.java b/models/src/main/java/org/onap/policy/clamp/controlloop/models/messages/dmaap/participant/ParticipantMessage.java index 3ca4d3d34..f98a88c3b 100644 --- a/models/src/main/java/org/onap/policy/clamp/controlloop/models/messages/dmaap/participant/ParticipantMessage.java +++ b/models/src/main/java/org/onap/policy/clamp/controlloop/models/messages/dmaap/participant/ParticipantMessage.java @@ -97,17 +97,16 @@ public class ParticipantMessage { return true; } - // Broadcast message to all control loop elements on this participant - if (participantType.equals(this.participantType) && this.participantId == null) { - return true; + if (!participantType.equals(this.participantType)) { + return false; } - // Targeted message at this specific participant - if (participantType.equals(this.participantType) && participantId.equals(this.participantId)) { + // Broadcast message to all control loop elements on this participant + if (this.participantId == null) { return true; } - // Message is not for this participant - return false; + // Targeted message at this specific participant + return participantId.equals(this.participantId); } } diff --git a/models/src/test/java/org/onap/policy/clamp/controlloop/models/messages/dmaap/participant/ParticipantAckMessageTest.java b/models/src/test/java/org/onap/policy/clamp/controlloop/models/messages/dmaap/participant/ParticipantAckMessageTest.java index b9c1053f3..df82ab071 100644 --- a/models/src/test/java/org/onap/policy/clamp/controlloop/models/messages/dmaap/participant/ParticipantAckMessageTest.java +++ b/models/src/test/java/org/onap/policy/clamp/controlloop/models/messages/dmaap/participant/ParticipantAckMessageTest.java @@ -22,15 +22,23 @@ package org.onap.policy.clamp.controlloop.models.messages.dmaap.participant; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import static org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageUtils.assertSerializable; import java.util.UUID; import org.junit.jupiter.api.Test; import org.onap.policy.common.utils.coder.CoderException; +import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier; class ParticipantAckMessageTest { private ParticipantAckMessage message; + private static final ToscaConceptIdentifier PTYPE_456 = new ToscaConceptIdentifier("PType", "4.5.6"); + private static final ToscaConceptIdentifier PTYPE_457 = new ToscaConceptIdentifier("PType", "4.5.7"); + private static final ToscaConceptIdentifier ID_123 = new ToscaConceptIdentifier("id", "1.2.3"); + private static final ToscaConceptIdentifier ID_124 = new ToscaConceptIdentifier("id", "1.2.4"); + @Test void testCopyConstructor() throws CoderException { assertThatThrownBy(() -> new ParticipantAckMessage((ParticipantAckMessage) null)) @@ -51,9 +59,43 @@ class ParticipantAckMessageTest { assertSerializable(message, ParticipantAckMessage.class); } + @Test + void testAppliesTo_NullParticipantId() { + message = makeMessage(); + + assertThatThrownBy(() -> message.appliesTo(null, null)).isInstanceOf(NullPointerException.class); + assertThatThrownBy(() -> message.appliesTo(PTYPE_456, null)).isInstanceOf(NullPointerException.class); + assertThatThrownBy(() -> message.appliesTo(null, ID_123)).isInstanceOf(NullPointerException.class); + } + + @Test + void testAppliesTo_ParticipantIdMatches() { + message = makeMessage(); + + // ParticipantId matches + assertTrue(message.appliesTo(PTYPE_456, ID_123)); + assertFalse(message.appliesTo(PTYPE_456, ID_124)); + assertFalse(message.appliesTo(PTYPE_457, ID_123)); + } + + @Test + void testAppliesTo_ParticipantIdNoMatch() { + message = makeMessage(); + + // ParticipantId does not match + ToscaConceptIdentifier id = new ToscaConceptIdentifier(); + id.setName("id1111"); + id.setVersion("3.2.1"); + assertFalse(message.appliesTo(id, id)); + message.setParticipantType(null); + assertTrue(message.appliesTo(id, id)); + } + private ParticipantAckMessage makeMessage() { ParticipantAckMessage msg = new ParticipantAckMessage(ParticipantMessageType.PARTICIPANT_DEREGISTER_ACK); + msg.setParticipantType(PTYPE_456); + msg.setParticipantId(ID_123); msg.setMessage("Successfull Ack"); msg.setResult(true); msg.setResponseTo(UUID.randomUUID()); diff --git a/models/src/test/java/org/onap/policy/clamp/controlloop/models/messages/dmaap/participant/ParticipantMessageTest.java b/models/src/test/java/org/onap/policy/clamp/controlloop/models/messages/dmaap/participant/ParticipantMessageTest.java index 924ad8fa8..58d3afebc 100644 --- a/models/src/test/java/org/onap/policy/clamp/controlloop/models/messages/dmaap/participant/ParticipantMessageTest.java +++ b/models/src/test/java/org/onap/policy/clamp/controlloop/models/messages/dmaap/participant/ParticipantMessageTest.java @@ -35,6 +35,11 @@ import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier; class ParticipantMessageTest { private ParticipantMessage message; + private static final ToscaConceptIdentifier PTYPE_456 = new ToscaConceptIdentifier("PType", "4.5.6"); + private static final ToscaConceptIdentifier PTYPE_457 = new ToscaConceptIdentifier("PType", "4.5.7"); + private static final ToscaConceptIdentifier ID_123 = new ToscaConceptIdentifier("id", "1.2.3"); + private static final ToscaConceptIdentifier ID_124 = new ToscaConceptIdentifier("id", "1.2.4"); + @Test void testCopyConstructor() throws CoderException { assertThatThrownBy(() -> new ParticipantMessage((ParticipantMessage) null)) @@ -62,10 +67,8 @@ class ParticipantMessageTest { message = makeMessage(); assertThatThrownBy(() -> message.appliesTo(null, null)).isInstanceOf(NullPointerException.class); - assertThatThrownBy(() -> message.appliesTo(new ToscaConceptIdentifier("PType", "4.5.6"), null)) - .isInstanceOf(NullPointerException.class); - assertThatThrownBy(() -> message.appliesTo(null, new ToscaConceptIdentifier("id", "1.2.3"))) - .isInstanceOf(NullPointerException.class); + assertThatThrownBy(() -> message.appliesTo(PTYPE_456, null)).isInstanceOf(NullPointerException.class); + assertThatThrownBy(() -> message.appliesTo(null, ID_123)).isInstanceOf(NullPointerException.class); } @Test @@ -73,12 +76,9 @@ class ParticipantMessageTest { message = makeMessage(); // ParticipantId matches - assertTrue(message.appliesTo(new ToscaConceptIdentifier("PType", "4.5.6"), - new ToscaConceptIdentifier("id", "1.2.3"))); - assertFalse(message.appliesTo(new ToscaConceptIdentifier("PType", "4.5.6"), - new ToscaConceptIdentifier("id", "1.2.4"))); - assertFalse(message.appliesTo(new ToscaConceptIdentifier("PType", "4.5.7"), - new ToscaConceptIdentifier("id", "1.2.3"))); + assertTrue(message.appliesTo(PTYPE_456, ID_123)); + assertFalse(message.appliesTo(PTYPE_456, ID_124)); + assertFalse(message.appliesTo(PTYPE_457, ID_123)); } @Test @@ -97,8 +97,8 @@ class ParticipantMessageTest { private ParticipantMessage makeMessage() { ParticipantMessage msg = new ParticipantMessage(ParticipantMessageType.PARTICIPANT_STATE_CHANGE); - msg.setParticipantType(new ToscaConceptIdentifier("PType", "4.5.6")); - msg.setParticipantId(new ToscaConceptIdentifier("id", "1.2.3")); + msg.setParticipantType(PTYPE_456); + msg.setParticipantId(ID_123); msg.setMessageId(UUID.randomUUID()); msg.setTimestamp(Instant.ofEpochMilli(3000)); diff --git a/participant/participant-impl/participant-impl-policy/src/test/java/org/onap/policy/clamp/controlloop/participant/policy/endtoend/ParticipantMessagesTest.java b/participant/participant-impl/participant-impl-policy/src/test/java/org/onap/policy/clamp/controlloop/participant/policy/endtoend/ParticipantMessagesTest.java index 699df25e8..8187378ae 100644 --- a/participant/participant-impl/participant-impl-policy/src/test/java/org/onap/policy/clamp/controlloop/participant/policy/endtoend/ParticipantMessagesTest.java +++ b/participant/participant-impl/participant-impl-policy/src/test/java/org/onap/policy/clamp/controlloop/participant/policy/endtoend/ParticipantMessagesTest.java @@ -85,7 +85,8 @@ class ParticipantMessagesTest { synchronized (lockit) { ParticipantMessagePublisher participantMessagePublisher = - new ParticipantMessagePublisher(Collections.singletonList(Mockito.mock(TopicSink.class))); + new ParticipantMessagePublisher(); + participantMessagePublisher.active(Collections.singletonList(Mockito.mock(TopicSink.class))); participantMessagePublisher.sendParticipantRegister(participantRegisterMsg); } } @@ -113,7 +114,8 @@ class ParticipantMessagesTest { synchronized (lockit) { ParticipantMessagePublisher participantMessagePublisher = - new ParticipantMessagePublisher(Collections.singletonList(Mockito.mock(TopicSink.class))); + new ParticipantMessagePublisher(); + participantMessagePublisher.active(Collections.singletonList(Mockito.mock(TopicSink.class))); participantMessagePublisher.sendParticipantDeregister(participantDeregisterMsg); } } @@ -153,8 +155,8 @@ class ParticipantMessagesTest { participantUpdateAckMsg.setResult(true); synchronized (lockit) { - ParticipantMessagePublisher participantMessagePublisher = - new ParticipantMessagePublisher(Collections.singletonList(Mockito.mock(TopicSink.class))); + ParticipantMessagePublisher participantMessagePublisher = new ParticipantMessagePublisher(); + participantMessagePublisher.active(Collections.singletonList(Mockito.mock(TopicSink.class))); participantMessagePublisher.sendParticipantUpdateAck(participantUpdateAckMsg); } } @@ -163,8 +165,8 @@ class ParticipantMessagesTest { void testParticipantStatusHeartbeat() throws Exception { final ParticipantStatus heartbeat = participantHandler.makeHeartbeat(true); synchronized (lockit) { - ParticipantMessagePublisher publisher = - new ParticipantMessagePublisher(Collections.singletonList(Mockito.mock(TopicSink.class))); + ParticipantMessagePublisher publisher = new ParticipantMessagePublisher(); + publisher.active(Collections.singletonList(Mockito.mock(TopicSink.class))); assertThatCode(() -> publisher.sendHeartbeat(heartbeat)).doesNotThrowAnyException(); } } diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ControlLoopStateChangeListener.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ControlLoopStateChangeListener.java index 0b9110bd0..d24f32f2f 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ControlLoopStateChangeListener.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ControlLoopStateChangeListener.java @@ -21,6 +21,7 @@ package org.onap.policy.clamp.controlloop.participant.intermediary.comm; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ControlLoopStateChange; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageType; import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler; import org.springframework.stereotype.Component; @@ -39,4 +40,9 @@ public class ControlLoopStateChangeListener extends ParticipantListener<ControlL super(ControlLoopStateChange.class, participantHandler, participantHandler::handleControlLoopStateChange); } + + @Override + public String getType() { + return ParticipantMessageType.CONTROL_LOOP_STATE_CHANGE.name(); + } } diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ControlLoopUpdateListener.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ControlLoopUpdateListener.java index 56bc1fd9a..f9dec1863 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ControlLoopUpdateListener.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ControlLoopUpdateListener.java @@ -21,6 +21,7 @@ package org.onap.policy.clamp.controlloop.participant.intermediary.comm; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ControlLoopUpdate; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageType; import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler; import org.springframework.stereotype.Component; @@ -38,4 +39,9 @@ public class ControlLoopUpdateListener extends ParticipantListener<ControlLoopUp public ControlLoopUpdateListener(final ParticipantHandler participantHandler) { super(ControlLoopUpdate.class, participantHandler, participantHandler::handleControlLoopUpdate); } + + @Override + public String getType() { + return ParticipantMessageType.CONTROL_LOOP_UPDATE.name(); + } } diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantAckListener.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantAckListener.java index 4b7224938..113f75dd5 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantAckListener.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantAckListener.java @@ -24,6 +24,7 @@ package org.onap.policy.clamp.controlloop.participant.intermediary.comm; import java.util.function.Consumer; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantAckMessage; +import org.onap.policy.clamp.controlloop.participant.intermediary.handler.Listener; import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler; import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; import org.onap.policy.common.endpoints.listeners.ScoListener; @@ -32,7 +33,8 @@ import org.onap.policy.common.utils.coder.StandardCoderObject; /** * Abstract Listener for Participant Ack messages sent by runtime. */ -public abstract class ParticipantAckListener<T extends ParticipantAckMessage> extends ScoListener<T> { +public abstract class ParticipantAckListener<T extends ParticipantAckMessage> extends ScoListener<T> + implements Listener { private final ParticipantHandler participantHandler; private final Consumer<T> consumer; @@ -52,6 +54,13 @@ public abstract class ParticipantAckListener<T extends ParticipantAckMessage> ex @Override public void onTopicEvent(CommInfrastructure infra, String topic, StandardCoderObject sco, T message) { - consumer.accept(message); + if (participantHandler.appliesTo(message)) { + consumer.accept(message); + } + } + + @Override + public ScoListener<T> getScoListener() { + return this; } } diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantDeregisterAckListener.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantDeregisterAckListener.java index e20f481f8..5440e005b 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantDeregisterAckListener.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantDeregisterAckListener.java @@ -21,6 +21,7 @@ package org.onap.policy.clamp.controlloop.participant.intermediary.comm; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantDeregisterAck; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageType; import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler; import org.springframework.stereotype.Component; @@ -39,4 +40,9 @@ public class ParticipantDeregisterAckListener extends ParticipantAckListener<Par public ParticipantDeregisterAckListener(final ParticipantHandler participantHandler) { super(ParticipantDeregisterAck.class, participantHandler, participantHandler::handleParticipantDeregisterAck); } + + @Override + public String getType() { + return ParticipantMessageType.PARTICIPANT_DEREGISTER_ACK.name(); + } } diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantListener.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantListener.java index c6ad900b3..67af5c844 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantListener.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantListener.java @@ -22,6 +22,7 @@ package org.onap.policy.clamp.controlloop.participant.intermediary.comm; import java.util.function.Consumer; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessage; +import org.onap.policy.clamp.controlloop.participant.intermediary.handler.Listener; import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler; import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; import org.onap.policy.common.endpoints.listeners.ScoListener; @@ -30,7 +31,7 @@ import org.onap.policy.common.utils.coder.StandardCoderObject; /** * Abstract Listener for Participant messages sent by CLAMP. */ -public abstract class ParticipantListener<T extends ParticipantMessage> extends ScoListener<T> { +public abstract class ParticipantListener<T extends ParticipantMessage> extends ScoListener<T> implements Listener { private final ParticipantHandler participantHandler; private final Consumer<T> consumer; @@ -54,4 +55,9 @@ public abstract class ParticipantListener<T extends ParticipantMessage> extends consumer.accept(message); } } + + @Override + public ScoListener<T> getScoListener() { + return this; + } } diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantMessagePublisher.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantMessagePublisher.java index d8cc9eb6b..2941e9fd8 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantMessagePublisher.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantMessagePublisher.java @@ -21,35 +21,43 @@ package org.onap.policy.clamp.controlloop.participant.intermediary.comm; import java.util.List; +import javax.ws.rs.core.Response.Status; +import org.onap.policy.clamp.controlloop.common.exception.ControlLoopRuntimeException; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ControlLoopAck; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantDeregister; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantRegister; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantStatus; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantUpdateAck; +import org.onap.policy.clamp.controlloop.participant.intermediary.handler.Publisher; import org.onap.policy.common.endpoints.event.comm.TopicSink; import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; /** * This class is used to send Participant Status messages to clamp using TopicSinkClient. * */ -public class ParticipantMessagePublisher { +@Component +public class ParticipantMessagePublisher implements Publisher { private static final Logger LOGGER = LoggerFactory.getLogger(ParticipantMessagePublisher.class); - private final TopicSinkClient topicSinkClient; + private boolean active = false; + private TopicSinkClient topicSinkClient; /** * Constructor for instantiating ParticipantMessagePublisher. * * @param topicSinks the topic sinks */ - public ParticipantMessagePublisher(List<TopicSink> topicSinks) { + @Override + public void active(List<TopicSink> topicSinks) { if (topicSinks.size() != 1) { throw new IllegalArgumentException("Configuration unsupported, Topic sinks greater than 1"); } this.topicSinkClient = new TopicSinkClient(topicSinks.get(0)); + active = true; } /** @@ -58,6 +66,9 @@ public class ParticipantMessagePublisher { * @param participantStatus the Participant Status */ public void sendParticipantStatus(final ParticipantStatus participantStatus) { + if (!active) { + throw new ControlLoopRuntimeException(Status.NOT_ACCEPTABLE, "Not Active!"); + } topicSinkClient.send(participantStatus); LOGGER.debug("Sent Participant Status message to CLAMP - {}", participantStatus); } @@ -68,6 +79,9 @@ public class ParticipantMessagePublisher { * @param participantRegister the Participant Status */ public void sendParticipantRegister(final ParticipantRegister participantRegister) { + if (!active) { + throw new ControlLoopRuntimeException(Status.NOT_ACCEPTABLE, "Not Active!"); + } topicSinkClient.send(participantRegister); LOGGER.debug("Sent Participant Register message to CLAMP - {}", participantRegister); } @@ -78,6 +92,9 @@ public class ParticipantMessagePublisher { * @param participantDeregister the Participant Status */ public void sendParticipantDeregister(final ParticipantDeregister participantDeregister) { + if (!active) { + throw new ControlLoopRuntimeException(Status.NOT_ACCEPTABLE, "Not Active!"); + } topicSinkClient.send(participantDeregister); LOGGER.debug("Sent Participant Deregister message to CLAMP - {}", participantDeregister); } @@ -88,6 +105,9 @@ public class ParticipantMessagePublisher { * @param participantUpdateAck the Participant Update Ack */ public void sendParticipantUpdateAck(final ParticipantUpdateAck participantUpdateAck) { + if (!active) { + throw new ControlLoopRuntimeException(Status.NOT_ACCEPTABLE, "Not Active!"); + } topicSinkClient.send(participantUpdateAck); LOGGER.debug("Sent Participant Update Ack message to CLAMP - {}", participantUpdateAck); } @@ -98,6 +118,9 @@ public class ParticipantMessagePublisher { * @param controlLoopAck ControlLoop Update/StateChange Ack */ public void sendControlLoopAck(final ControlLoopAck controlLoopAck) { + if (!active) { + throw new ControlLoopRuntimeException(Status.NOT_ACCEPTABLE, "Not Active!"); + } topicSinkClient.send(controlLoopAck); LOGGER.debug("Sent ControlLoop Update/StateChange Ack to runtime - {}", controlLoopAck); } @@ -108,7 +131,15 @@ public class ParticipantMessagePublisher { * @param participantStatus the Participant Status */ public void sendHeartbeat(final ParticipantStatus participantStatus) { + if (!active) { + throw new ControlLoopRuntimeException(Status.NOT_ACCEPTABLE, "Not Active!"); + } topicSinkClient.send(participantStatus); LOGGER.debug("Sent Participant heartbeat to CLAMP - {}", participantStatus); } + + @Override + public void stop() { + active = false; + } } diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantRegisterAckListener.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantRegisterAckListener.java index a15a2a850..7be460815 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantRegisterAckListener.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantRegisterAckListener.java @@ -20,6 +20,7 @@ package org.onap.policy.clamp.controlloop.participant.intermediary.comm; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageType; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantRegisterAck; import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler; import org.springframework.stereotype.Component; @@ -39,4 +40,9 @@ public class ParticipantRegisterAckListener extends ParticipantAckListener<Parti public ParticipantRegisterAckListener(final ParticipantHandler participantHandler) { super(ParticipantRegisterAck.class, participantHandler, participantHandler::handleParticipantRegisterAck); } + + @Override + public String getType() { + return ParticipantMessageType.PARTICIPANT_REGISTER_ACK.name(); + } } diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantStatusReqListener.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantStatusReqListener.java index 0881edb19..9e978fe75 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantStatusReqListener.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantStatusReqListener.java @@ -20,6 +20,7 @@ package org.onap.policy.clamp.controlloop.participant.intermediary.comm; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageType; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantStatusReq; import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler; import org.springframework.stereotype.Component; @@ -38,4 +39,9 @@ public class ParticipantStatusReqListener extends ParticipantListener<Participan public ParticipantStatusReqListener(final ParticipantHandler participantHandler) { super(ParticipantStatusReq.class, participantHandler, participantHandler::handleParticipantStatusReq); } + + @Override + public String getType() { + return ParticipantMessageType.PARTICIPANT_STATUS_REQ.name(); + } } diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantUpdateListener.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantUpdateListener.java index 42bd52d9a..da45501e7 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantUpdateListener.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantUpdateListener.java @@ -20,6 +20,7 @@ package org.onap.policy.clamp.controlloop.participant.intermediary.comm; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageType; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantUpdate; import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler; import org.springframework.stereotype.Component; @@ -38,4 +39,9 @@ public class ParticipantUpdateListener extends ParticipantListener<ParticipantUp public ParticipantUpdateListener(final ParticipantHandler participantHandler) { super(ParticipantUpdate.class, participantHandler, participantHandler::handleParticipantUpdate); } + + @Override + public String getType() { + return ParticipantMessageType.PARTICIPANT_UPDATE.name(); + } } diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/config/BeanFactory.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/config/BeanFactory.java deleted file mode 100644 index e363504a5..000000000 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/config/BeanFactory.java +++ /dev/null @@ -1,55 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * Copyright (C) 2021 Nordix Foundation. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.clamp.controlloop.participant.intermediary.config; - -import java.util.List; -import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ParticipantMessagePublisher; -import org.onap.policy.clamp.controlloop.participant.intermediary.parameters.ParticipantParameters; -import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager; -import org.onap.policy.common.endpoints.event.comm.TopicSink; -import org.onap.policy.common.endpoints.listeners.MessageTypeDispatcher; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; - -@Configuration -public class BeanFactory { - - // Name of the message type for messages on topics - private static final String[] MSG_TYPE_NAMES = {"messageType"}; - - /** - * create ParticipantMessagePublisher. - * - * @param parameters the ParticipantParameters - * @return ParticipantMessagePublisher - */ - @Bean - public ParticipantMessagePublisher publisher(final ParticipantParameters parameters) { - List<TopicSink> topicSinks = TopicEndpointManager.getManager() - .addTopicSinks(parameters.getIntermediaryParameters().getClampControlLoopTopics().getTopicSinks()); - return new ParticipantMessagePublisher(topicSinks); - } - - @Bean - public MessageTypeDispatcher msgDispatcher() { - return new MessageTypeDispatcher(MSG_TYPE_NAMES); - } -} diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/IntermediaryActivator.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/IntermediaryActivator.java index 4fc0ae1b1..754bf2887 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/IntermediaryActivator.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/IntermediaryActivator.java @@ -23,20 +23,13 @@ package org.onap.policy.clamp.controlloop.participant.intermediary.handler; import java.io.Closeable; import java.io.IOException; import java.util.List; -import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageType; import org.onap.policy.clamp.controlloop.participant.intermediary.api.ParticipantIntermediaryApi; -import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ControlLoopStateChangeListener; -import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ControlLoopUpdateListener; -import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ParticipantDeregisterAckListener; -import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ParticipantRegisterAckListener; -import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ParticipantStatusReqListener; -import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ParticipantUpdateListener; import org.onap.policy.clamp.controlloop.participant.intermediary.parameters.ParticipantParameters; import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager; +import org.onap.policy.common.endpoints.event.comm.TopicSink; import org.onap.policy.common.endpoints.event.comm.TopicSource; import org.onap.policy.common.endpoints.listeners.MessageTypeDispatcher; import org.onap.policy.common.utils.services.ServiceManagerContainer; -import org.springframework.context.ApplicationContext; import org.springframework.context.event.ContextClosedEvent; import org.springframework.context.event.ContextRefreshedEvent; import org.springframework.context.event.EventListener; @@ -48,32 +41,50 @@ import org.springframework.stereotype.Component; @Component public class IntermediaryActivator extends ServiceManagerContainer implements Closeable { - private final ApplicationContext applicationContext; + private static final String[] MSG_TYPE_NAMES = {"messageType"}; // Topics from which the participant receives and to which the participant sends messages + private List<TopicSink> topicSinks; private List<TopicSource> topicSources; ParticipantIntermediaryApi participantIntermediaryApi; + private final MessageTypeDispatcher msgDispatcher; + /** * Instantiate the activator for participant. * - * @param applicationContext ApplicationContext * @param parameters the ParticipantParameters + * @param publishers list of Publishers + * @param listeners list of Listeners */ - public IntermediaryActivator(final ApplicationContext applicationContext, final ParticipantParameters parameters, - ParticipantIntermediaryApi participantIntermediaryApi) { - this.applicationContext = applicationContext; + public IntermediaryActivator(final ParticipantParameters parameters, + ParticipantIntermediaryApi participantIntermediaryApi, List<Publisher> publishers, + List<Listener> listeners) { this.participantIntermediaryApi = participantIntermediaryApi; + topicSinks = TopicEndpointManager.getManager() + .addTopicSinks(parameters.getIntermediaryParameters().getClampControlLoopTopics().getTopicSinks()); + topicSources = TopicEndpointManager.getManager() .addTopicSources(parameters.getIntermediaryParameters().getClampControlLoopTopics().getTopicSources()); - // @formatter:off + msgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES); + // @formatter:off addAction("Topic endpoint management", - () -> TopicEndpointManager.getManager().start(), - () -> TopicEndpointManager.getManager().shutdown()); + () -> TopicEndpointManager.getManager().start(), + () -> TopicEndpointManager.getManager().shutdown()); + + publishers.forEach(publisher -> + addAction("Publisher " + publisher.getClass().getSimpleName(), + () -> publisher.active(topicSinks), + publisher::stop)); + + listeners.forEach(listener -> + addAction("Listener " + listener.getClass().getSimpleName(), + () -> msgDispatcher.register(listener.getType(), listener.getScoListener()), + () -> msgDispatcher.unregister(listener.getType()))); addAction("Topic Message Dispatcher", this::registerMsgDispatcher, this::unregisterMsgDispatcher); // @formatter:on @@ -117,26 +128,6 @@ public class IntermediaryActivator extends ServiceManagerContainer implements Cl * Registers the dispatcher with the topic source(s). */ private void registerMsgDispatcher() { - MessageTypeDispatcher msgDispatcher = applicationContext.getBean(MessageTypeDispatcher.class); - - msgDispatcher.register(ParticipantMessageType.PARTICIPANT_STATUS_REQ.name(), - applicationContext.getBean(ParticipantStatusReqListener.class)); - - msgDispatcher.register(ParticipantMessageType.CONTROL_LOOP_STATE_CHANGE.name(), - applicationContext.getBean(ControlLoopStateChangeListener.class)); - - msgDispatcher.register(ParticipantMessageType.CONTROL_LOOP_UPDATE.name(), - applicationContext.getBean(ControlLoopUpdateListener.class)); - - msgDispatcher.register(ParticipantMessageType.PARTICIPANT_REGISTER_ACK.name(), - applicationContext.getBean(ParticipantRegisterAckListener.class)); - - msgDispatcher.register(ParticipantMessageType.PARTICIPANT_DEREGISTER_ACK.name(), - applicationContext.getBean(ParticipantDeregisterAckListener.class)); - - msgDispatcher.register(ParticipantMessageType.PARTICIPANT_UPDATE.name(), - applicationContext.getBean(ParticipantUpdateListener.class)); - for (final TopicSource source : topicSources) { source.register(msgDispatcher); } @@ -146,8 +137,6 @@ public class IntermediaryActivator extends ServiceManagerContainer implements Cl * Unregisters the dispatcher from the topic source(s). */ private void unregisterMsgDispatcher() { - MessageTypeDispatcher msgDispatcher = applicationContext.getBean(MessageTypeDispatcher.class); - for (final TopicSource source : topicSources) { source.unregister(msgDispatcher); } @@ -155,6 +144,8 @@ public class IntermediaryActivator extends ServiceManagerContainer implements Cl @Override public void close() throws IOException { - super.shutdown(); + if (isAlive()) { + super.shutdown(); + } } } diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/Listener.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/Listener.java new file mode 100644 index 000000000..bca71afda --- /dev/null +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/Listener.java @@ -0,0 +1,40 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2021 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.controlloop.participant.intermediary.handler; + +import org.onap.policy.common.endpoints.listeners.ScoListener; + +public interface Listener { + + /** + * Get the type of message of interest to the listener. + * + * @return type of message of interest to the listener + */ + String getType(); + + /** + * Get listener to register. + * + * @return listener to register + */ + <T> ScoListener<T> getScoListener(); +} diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ParticipantHandler.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ParticipantHandler.java index 1947fda1a..66e09e7f6 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ParticipantHandler.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ParticipantHandler.java @@ -43,6 +43,7 @@ import org.onap.policy.clamp.controlloop.models.controlloop.concepts.Participant import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ParticipantStatistics; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ControlLoopStateChange; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ControlLoopUpdate; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantAckMessage; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantDeregister; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantDeregisterAck; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessage; @@ -197,6 +198,16 @@ public class ParticipantHandler implements Closeable { } /** + * Check if a participant message applies to this participant handler. + * + * @param participantMsg the message to check + * @return true if it applies, false otherwise + */ + public boolean appliesTo(ParticipantAckMessage participantMsg) { + return participantMsg.appliesTo(participantType, participantId); + } + + /** * Method to send ParticipantRegister message to controlloop runtime. */ public void sendParticipantRegister() { diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/Publisher.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/Publisher.java new file mode 100644 index 000000000..287d7c055 --- /dev/null +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/Publisher.java @@ -0,0 +1,34 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2021 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.controlloop.participant.intermediary.handler; + +import java.util.List; +import org.onap.policy.common.endpoints.event.comm.TopicSink; + +/** + * Publisher. + */ +public interface Publisher { + + void active(List<TopicSink> topicSinks); + + void stop(); +} diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/config/messaging/MessageDispatcherActivator.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/config/messaging/MessageDispatcherActivator.java index 891dab9ae..d196dd193 100644 --- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/config/messaging/MessageDispatcherActivator.java +++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/config/messaging/MessageDispatcherActivator.java @@ -23,8 +23,6 @@ package org.onap.policy.clamp.controlloop.runtime.config.messaging; import java.io.Closeable; import java.io.IOException; import java.util.List; -import java.util.stream.Stream; -import javax.ws.rs.core.Response.Status; import lombok.Getter; import org.onap.policy.clamp.controlloop.common.exception.ControlLoopRuntimeException; import org.onap.policy.clamp.controlloop.runtime.main.parameters.ClRuntimeParameterGroup; @@ -33,6 +31,7 @@ import org.onap.policy.common.endpoints.event.comm.TopicSink; import org.onap.policy.common.endpoints.event.comm.TopicSource; import org.onap.policy.common.endpoints.listeners.MessageTypeDispatcher; import org.onap.policy.common.utils.services.ServiceManagerContainer; +import org.springframework.context.event.ContextClosedEvent; import org.springframework.context.event.ContextRefreshedEvent; import org.springframework.context.event.EventListener; import org.springframework.stereotype.Component; @@ -53,36 +52,31 @@ public class MessageDispatcherActivator extends ServiceManagerContainer implemen * Constructor. * * @param clRuntimeParameterGroup the parameters for the control loop runtime service - * @param publishers array of Publishers - * @param listeners array of Listeners + * @param publishers list of Publishers + * @param listeners list of Listeners * @throws ControlLoopRuntimeException if the activator does not start */ - public MessageDispatcherActivator(final ClRuntimeParameterGroup clRuntimeParameterGroup, Publisher[] publishers, - Listener[] listeners) { + public MessageDispatcherActivator(final ClRuntimeParameterGroup clRuntimeParameterGroup, List<Publisher> publishers, + List<Listener> listeners) { topicSinks = TopicEndpointManager.getManager() .addTopicSinks(clRuntimeParameterGroup.getTopicParameterGroup().getTopicSinks()); topicSources = TopicEndpointManager.getManager() .addTopicSources(clRuntimeParameterGroup.getTopicParameterGroup().getTopicSources()); - try { - msgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES); - } catch (final RuntimeException e) { - throw new ControlLoopRuntimeException(Status.INTERNAL_SERVER_ERROR, - "topic message dispatcher failed to start", e); - } + msgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES); // @formatter:off addAction("Topic endpoint management", () -> TopicEndpointManager.getManager().start(), () -> TopicEndpointManager.getManager().shutdown()); - Stream.of(publishers).forEach(publisher -> + publishers.forEach(publisher -> addAction("Publisher " + publisher.getClass().getSimpleName(), () -> publisher.active(topicSinks), - () -> publisher.stop())); + publisher::stop)); - Stream.of(listeners).forEach(listener -> + listeners.forEach(listener -> addAction("Listener " + listener.getClass().getSimpleName(), () -> msgDispatcher.register(listener.getType(), listener.getScoListener()), () -> msgDispatcher.unregister(listener.getType()))); @@ -121,10 +115,22 @@ public class MessageDispatcherActivator extends ServiceManagerContainer implemen } } + /** + * Handle ContextClosedEvent. + * + * @param ctxClosedEvent ContextClosedEvent + */ + @EventListener + public void handleContextClosedEvent(ContextClosedEvent ctxClosedEvent) { + if (isAlive()) { + stop(); + } + } + @Override public void close() throws IOException { if (isAlive()) { - stop(); + super.shutdown(); } } } diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionHandler.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionHandler.java index 2cc0f94e2..b39573461 100644 --- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionHandler.java +++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionHandler.java @@ -139,7 +139,8 @@ public class SupervisionHandler { public void handleParticipantMessage(ParticipantRegister participantRegisterMessage) { LOGGER.debug("Participant Register received {}", participantRegisterMessage); - participantRegisterAckPublisher.send(participantRegisterMessage.getMessageId()); + participantRegisterAckPublisher.send(participantRegisterMessage.getMessageId(), + participantRegisterMessage.getParticipantId(), participantRegisterMessage.getParticipantType()); participantUpdatePublisher.send(participantRegisterMessage.getParticipantId(), participantRegisterMessage.getParticipantType(), true); @@ -358,15 +359,15 @@ public class SupervisionHandler { throws PfModelException, ControlLoopException { if (participantStatusMessage.getControlLoopInfoList() != null) { for (ControlLoopInfo clEntry : participantStatusMessage.getControlLoopInfoList()) { - var dbControlLoop = controlLoopProvider.getControlLoop( - new ToscaConceptIdentifier(clEntry.getControlLoopId())); + var dbControlLoop = + controlLoopProvider.getControlLoop(new ToscaConceptIdentifier(clEntry.getControlLoopId())); if (dbControlLoop == null) { exceptionOccured(Response.Status.NOT_FOUND, "PARTICIPANT_STATUS control loop not found in database: " + clEntry.getControlLoopId()); } dbControlLoop.setState(clEntry.getState()); - monitoringProvider.createClElementStatistics(clEntry.getControlLoopStatistics() - .getClElementStatisticsList().getClElementStatistics()); + monitoringProvider.createClElementStatistics( + clEntry.getControlLoopStatistics().getClElementStatisticsList().getClElementStatistics()); } } } diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionScanner.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionScanner.java index 151b04cbf..a05337991 100644 --- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionScanner.java +++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionScanner.java @@ -147,7 +147,7 @@ public class SupervisionScanner { if (participantUpdateCounter.count(id)) { LOGGER.debug("retry message ParticipantUpdate"); - participantUpdatePublisher.send(id.getLeft(), id.getRight()); + participantUpdatePublisher.send(id.getLeft(), id.getRight(), true); } else { LOGGER.debug("report Participant Update fault"); participantUpdateCounter.setFault(id); diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantRegisterAckPublisher.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantRegisterAckPublisher.java index 73860b5c2..8cbaec8b1 100644 --- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantRegisterAckPublisher.java +++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantRegisterAckPublisher.java @@ -22,6 +22,7 @@ package org.onap.policy.clamp.controlloop.runtime.supervision.comm; import java.util.UUID; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantRegisterAck; +import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier; import org.springframework.stereotype.Component; /** @@ -34,9 +35,13 @@ public class ParticipantRegisterAckPublisher extends AbstractParticipantAckPubli * Send ParticipantRegisterAck to Participant. * * @param responseTo the original request id in the request. + * @param participantId the participant Id + * @param participantType the participant Type */ - public void send(UUID responseTo) { + public void send(UUID responseTo, ToscaConceptIdentifier participantId, ToscaConceptIdentifier participantType) { var message = new ParticipantRegisterAck(); + message.setParticipantId(participantId); + message.setParticipantType(participantType); message.setResponseTo(responseTo); message.setMessage("Participant Register Ack"); message.setResult(true); diff --git a/runtime-controlloop/src/test/java/org/onap/policy/clamp/controlloop/runtime/config/messaging/MessageDispatcherActivatorTest.java b/runtime-controlloop/src/test/java/org/onap/policy/clamp/controlloop/runtime/config/messaging/MessageDispatcherActivatorTest.java index 461c8b558..936bb1444 100644 --- a/runtime-controlloop/src/test/java/org/onap/policy/clamp/controlloop/runtime/config/messaging/MessageDispatcherActivatorTest.java +++ b/runtime-controlloop/src/test/java/org/onap/policy/clamp/controlloop/runtime/config/messaging/MessageDispatcherActivatorTest.java @@ -31,6 +31,7 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.util.List; import org.junit.jupiter.api.Test; import org.onap.policy.clamp.controlloop.runtime.main.parameters.ClRuntimeParameterGroup; import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ParticipantStatusListener; @@ -56,7 +57,7 @@ class MessageDispatcherActivatorTest { var publisherFirst = spy(mock(Publisher.class)); var publisherSecond = spy(mock(Publisher.class)); - var publishers = new Publisher[] {publisherFirst, publisherSecond}; + var publishers = List.of(publisherFirst, publisherSecond); var listenerFirst = spy(mock(ParticipantStatusListener.class)); when(listenerFirst.getType()).thenReturn(TOPIC_FIRST); @@ -66,7 +67,7 @@ class MessageDispatcherActivatorTest { when(listenerSecond.getType()).thenReturn(TOPIC_SECOND); when(listenerSecond.getScoListener()).thenReturn(listenerSecond); - var listeners = new Listener[] {listenerFirst, listenerSecond}; + List<Listener> listeners = List.of(listenerFirst, listenerSecond); try (var activator = new MessageDispatcherActivator(parameterGroup, publishers, listeners)) { |