aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--models/src/main/java/org/onap/policy/clamp/controlloop/models/messages/dmaap/participant/ParticipantAckMessage.java28
-rw-r--r--models/src/main/java/org/onap/policy/clamp/controlloop/models/messages/dmaap/participant/ParticipantMessage.java13
-rw-r--r--models/src/test/java/org/onap/policy/clamp/controlloop/models/messages/dmaap/participant/ParticipantAckMessageTest.java42
-rw-r--r--models/src/test/java/org/onap/policy/clamp/controlloop/models/messages/dmaap/participant/ParticipantMessageTest.java24
-rw-r--r--participant/participant-impl/participant-impl-policy/src/test/java/org/onap/policy/clamp/controlloop/participant/policy/endtoend/ParticipantMessagesTest.java14
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ControlLoopStateChangeListener.java6
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ControlLoopUpdateListener.java6
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantAckListener.java13
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantDeregisterAckListener.java6
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantListener.java8
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantMessagePublisher.java37
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantRegisterAckListener.java6
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantStatusReqListener.java6
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantUpdateListener.java6
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/config/BeanFactory.java55
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/IntermediaryActivator.java69
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/Listener.java40
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ParticipantHandler.java11
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/Publisher.java34
-rw-r--r--runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/config/messaging/MessageDispatcherActivator.java38
-rw-r--r--runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionHandler.java11
-rw-r--r--runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionScanner.java2
-rw-r--r--runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantRegisterAckPublisher.java7
-rw-r--r--runtime-controlloop/src/test/java/org/onap/policy/clamp/controlloop/runtime/config/messaging/MessageDispatcherActivatorTest.java5
24 files changed, 337 insertions, 150 deletions
diff --git a/models/src/main/java/org/onap/policy/clamp/controlloop/models/messages/dmaap/participant/ParticipantAckMessage.java b/models/src/main/java/org/onap/policy/clamp/controlloop/models/messages/dmaap/participant/ParticipantAckMessage.java
index 8b59a1801..c6f5c61b9 100644
--- a/models/src/main/java/org/onap/policy/clamp/controlloop/models/messages/dmaap/participant/ParticipantAckMessage.java
+++ b/models/src/main/java/org/onap/policy/clamp/controlloop/models/messages/dmaap/participant/ParticipantAckMessage.java
@@ -22,6 +22,7 @@ package org.onap.policy.clamp.controlloop.models.messages.dmaap.participant;
import java.util.UUID;
import lombok.Getter;
+import lombok.NonNull;
import lombok.Setter;
import lombok.ToString;
import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
@@ -77,4 +78,31 @@ public class ParticipantAckMessage {
this.participantType = source.participantType;
this.participantId = source.participantId;
}
+
+ /**
+ * Determines if this message applies to this participant type.
+ *
+ * @param participantType type of the participant to match against
+ * @param participantId id of the participant to match against
+ * @return {@code true} if this message applies to this participant, {@code false} otherwise
+ */
+ public boolean appliesTo(@NonNull final ToscaConceptIdentifier participantType,
+ @NonNull final ToscaConceptIdentifier participantId) {
+ // Broadcast message to all participants
+ if (this.participantType == null) {
+ return true;
+ }
+
+ if (!participantType.equals(this.participantType)) {
+ return false;
+ }
+
+ // Broadcast message to all control loop elements on this participant
+ if (this.participantId == null) {
+ return true;
+ }
+
+ // Targeted message at this specific participant
+ return participantId.equals(this.participantId);
+ }
}
diff --git a/models/src/main/java/org/onap/policy/clamp/controlloop/models/messages/dmaap/participant/ParticipantMessage.java b/models/src/main/java/org/onap/policy/clamp/controlloop/models/messages/dmaap/participant/ParticipantMessage.java
index 3ca4d3d34..f98a88c3b 100644
--- a/models/src/main/java/org/onap/policy/clamp/controlloop/models/messages/dmaap/participant/ParticipantMessage.java
+++ b/models/src/main/java/org/onap/policy/clamp/controlloop/models/messages/dmaap/participant/ParticipantMessage.java
@@ -97,17 +97,16 @@ public class ParticipantMessage {
return true;
}
- // Broadcast message to all control loop elements on this participant
- if (participantType.equals(this.participantType) && this.participantId == null) {
- return true;
+ if (!participantType.equals(this.participantType)) {
+ return false;
}
- // Targeted message at this specific participant
- if (participantType.equals(this.participantType) && participantId.equals(this.participantId)) {
+ // Broadcast message to all control loop elements on this participant
+ if (this.participantId == null) {
return true;
}
- // Message is not for this participant
- return false;
+ // Targeted message at this specific participant
+ return participantId.equals(this.participantId);
}
}
diff --git a/models/src/test/java/org/onap/policy/clamp/controlloop/models/messages/dmaap/participant/ParticipantAckMessageTest.java b/models/src/test/java/org/onap/policy/clamp/controlloop/models/messages/dmaap/participant/ParticipantAckMessageTest.java
index b9c1053f3..df82ab071 100644
--- a/models/src/test/java/org/onap/policy/clamp/controlloop/models/messages/dmaap/participant/ParticipantAckMessageTest.java
+++ b/models/src/test/java/org/onap/policy/clamp/controlloop/models/messages/dmaap/participant/ParticipantAckMessageTest.java
@@ -22,15 +22,23 @@ package org.onap.policy.clamp.controlloop.models.messages.dmaap.participant;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
import static org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageUtils.assertSerializable;
import java.util.UUID;
import org.junit.jupiter.api.Test;
import org.onap.policy.common.utils.coder.CoderException;
+import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
class ParticipantAckMessageTest {
private ParticipantAckMessage message;
+ private static final ToscaConceptIdentifier PTYPE_456 = new ToscaConceptIdentifier("PType", "4.5.6");
+ private static final ToscaConceptIdentifier PTYPE_457 = new ToscaConceptIdentifier("PType", "4.5.7");
+ private static final ToscaConceptIdentifier ID_123 = new ToscaConceptIdentifier("id", "1.2.3");
+ private static final ToscaConceptIdentifier ID_124 = new ToscaConceptIdentifier("id", "1.2.4");
+
@Test
void testCopyConstructor() throws CoderException {
assertThatThrownBy(() -> new ParticipantAckMessage((ParticipantAckMessage) null))
@@ -51,9 +59,43 @@ class ParticipantAckMessageTest {
assertSerializable(message, ParticipantAckMessage.class);
}
+ @Test
+ void testAppliesTo_NullParticipantId() {
+ message = makeMessage();
+
+ assertThatThrownBy(() -> message.appliesTo(null, null)).isInstanceOf(NullPointerException.class);
+ assertThatThrownBy(() -> message.appliesTo(PTYPE_456, null)).isInstanceOf(NullPointerException.class);
+ assertThatThrownBy(() -> message.appliesTo(null, ID_123)).isInstanceOf(NullPointerException.class);
+ }
+
+ @Test
+ void testAppliesTo_ParticipantIdMatches() {
+ message = makeMessage();
+
+ // ParticipantId matches
+ assertTrue(message.appliesTo(PTYPE_456, ID_123));
+ assertFalse(message.appliesTo(PTYPE_456, ID_124));
+ assertFalse(message.appliesTo(PTYPE_457, ID_123));
+ }
+
+ @Test
+ void testAppliesTo_ParticipantIdNoMatch() {
+ message = makeMessage();
+
+ // ParticipantId does not match
+ ToscaConceptIdentifier id = new ToscaConceptIdentifier();
+ id.setName("id1111");
+ id.setVersion("3.2.1");
+ assertFalse(message.appliesTo(id, id));
+ message.setParticipantType(null);
+ assertTrue(message.appliesTo(id, id));
+ }
+
private ParticipantAckMessage makeMessage() {
ParticipantAckMessage msg = new ParticipantAckMessage(ParticipantMessageType.PARTICIPANT_DEREGISTER_ACK);
+ msg.setParticipantType(PTYPE_456);
+ msg.setParticipantId(ID_123);
msg.setMessage("Successfull Ack");
msg.setResult(true);
msg.setResponseTo(UUID.randomUUID());
diff --git a/models/src/test/java/org/onap/policy/clamp/controlloop/models/messages/dmaap/participant/ParticipantMessageTest.java b/models/src/test/java/org/onap/policy/clamp/controlloop/models/messages/dmaap/participant/ParticipantMessageTest.java
index 924ad8fa8..58d3afebc 100644
--- a/models/src/test/java/org/onap/policy/clamp/controlloop/models/messages/dmaap/participant/ParticipantMessageTest.java
+++ b/models/src/test/java/org/onap/policy/clamp/controlloop/models/messages/dmaap/participant/ParticipantMessageTest.java
@@ -35,6 +35,11 @@ import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
class ParticipantMessageTest {
private ParticipantMessage message;
+ private static final ToscaConceptIdentifier PTYPE_456 = new ToscaConceptIdentifier("PType", "4.5.6");
+ private static final ToscaConceptIdentifier PTYPE_457 = new ToscaConceptIdentifier("PType", "4.5.7");
+ private static final ToscaConceptIdentifier ID_123 = new ToscaConceptIdentifier("id", "1.2.3");
+ private static final ToscaConceptIdentifier ID_124 = new ToscaConceptIdentifier("id", "1.2.4");
+
@Test
void testCopyConstructor() throws CoderException {
assertThatThrownBy(() -> new ParticipantMessage((ParticipantMessage) null))
@@ -62,10 +67,8 @@ class ParticipantMessageTest {
message = makeMessage();
assertThatThrownBy(() -> message.appliesTo(null, null)).isInstanceOf(NullPointerException.class);
- assertThatThrownBy(() -> message.appliesTo(new ToscaConceptIdentifier("PType", "4.5.6"), null))
- .isInstanceOf(NullPointerException.class);
- assertThatThrownBy(() -> message.appliesTo(null, new ToscaConceptIdentifier("id", "1.2.3")))
- .isInstanceOf(NullPointerException.class);
+ assertThatThrownBy(() -> message.appliesTo(PTYPE_456, null)).isInstanceOf(NullPointerException.class);
+ assertThatThrownBy(() -> message.appliesTo(null, ID_123)).isInstanceOf(NullPointerException.class);
}
@Test
@@ -73,12 +76,9 @@ class ParticipantMessageTest {
message = makeMessage();
// ParticipantId matches
- assertTrue(message.appliesTo(new ToscaConceptIdentifier("PType", "4.5.6"),
- new ToscaConceptIdentifier("id", "1.2.3")));
- assertFalse(message.appliesTo(new ToscaConceptIdentifier("PType", "4.5.6"),
- new ToscaConceptIdentifier("id", "1.2.4")));
- assertFalse(message.appliesTo(new ToscaConceptIdentifier("PType", "4.5.7"),
- new ToscaConceptIdentifier("id", "1.2.3")));
+ assertTrue(message.appliesTo(PTYPE_456, ID_123));
+ assertFalse(message.appliesTo(PTYPE_456, ID_124));
+ assertFalse(message.appliesTo(PTYPE_457, ID_123));
}
@Test
@@ -97,8 +97,8 @@ class ParticipantMessageTest {
private ParticipantMessage makeMessage() {
ParticipantMessage msg = new ParticipantMessage(ParticipantMessageType.PARTICIPANT_STATE_CHANGE);
- msg.setParticipantType(new ToscaConceptIdentifier("PType", "4.5.6"));
- msg.setParticipantId(new ToscaConceptIdentifier("id", "1.2.3"));
+ msg.setParticipantType(PTYPE_456);
+ msg.setParticipantId(ID_123);
msg.setMessageId(UUID.randomUUID());
msg.setTimestamp(Instant.ofEpochMilli(3000));
diff --git a/participant/participant-impl/participant-impl-policy/src/test/java/org/onap/policy/clamp/controlloop/participant/policy/endtoend/ParticipantMessagesTest.java b/participant/participant-impl/participant-impl-policy/src/test/java/org/onap/policy/clamp/controlloop/participant/policy/endtoend/ParticipantMessagesTest.java
index 699df25e8..8187378ae 100644
--- a/participant/participant-impl/participant-impl-policy/src/test/java/org/onap/policy/clamp/controlloop/participant/policy/endtoend/ParticipantMessagesTest.java
+++ b/participant/participant-impl/participant-impl-policy/src/test/java/org/onap/policy/clamp/controlloop/participant/policy/endtoend/ParticipantMessagesTest.java
@@ -85,7 +85,8 @@ class ParticipantMessagesTest {
synchronized (lockit) {
ParticipantMessagePublisher participantMessagePublisher =
- new ParticipantMessagePublisher(Collections.singletonList(Mockito.mock(TopicSink.class)));
+ new ParticipantMessagePublisher();
+ participantMessagePublisher.active(Collections.singletonList(Mockito.mock(TopicSink.class)));
participantMessagePublisher.sendParticipantRegister(participantRegisterMsg);
}
}
@@ -113,7 +114,8 @@ class ParticipantMessagesTest {
synchronized (lockit) {
ParticipantMessagePublisher participantMessagePublisher =
- new ParticipantMessagePublisher(Collections.singletonList(Mockito.mock(TopicSink.class)));
+ new ParticipantMessagePublisher();
+ participantMessagePublisher.active(Collections.singletonList(Mockito.mock(TopicSink.class)));
participantMessagePublisher.sendParticipantDeregister(participantDeregisterMsg);
}
}
@@ -153,8 +155,8 @@ class ParticipantMessagesTest {
participantUpdateAckMsg.setResult(true);
synchronized (lockit) {
- ParticipantMessagePublisher participantMessagePublisher =
- new ParticipantMessagePublisher(Collections.singletonList(Mockito.mock(TopicSink.class)));
+ ParticipantMessagePublisher participantMessagePublisher = new ParticipantMessagePublisher();
+ participantMessagePublisher.active(Collections.singletonList(Mockito.mock(TopicSink.class)));
participantMessagePublisher.sendParticipantUpdateAck(participantUpdateAckMsg);
}
}
@@ -163,8 +165,8 @@ class ParticipantMessagesTest {
void testParticipantStatusHeartbeat() throws Exception {
final ParticipantStatus heartbeat = participantHandler.makeHeartbeat(true);
synchronized (lockit) {
- ParticipantMessagePublisher publisher =
- new ParticipantMessagePublisher(Collections.singletonList(Mockito.mock(TopicSink.class)));
+ ParticipantMessagePublisher publisher = new ParticipantMessagePublisher();
+ publisher.active(Collections.singletonList(Mockito.mock(TopicSink.class)));
assertThatCode(() -> publisher.sendHeartbeat(heartbeat)).doesNotThrowAnyException();
}
}
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ControlLoopStateChangeListener.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ControlLoopStateChangeListener.java
index 0b9110bd0..d24f32f2f 100644
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ControlLoopStateChangeListener.java
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ControlLoopStateChangeListener.java
@@ -21,6 +21,7 @@
package org.onap.policy.clamp.controlloop.participant.intermediary.comm;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ControlLoopStateChange;
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageType;
import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler;
import org.springframework.stereotype.Component;
@@ -39,4 +40,9 @@ public class ControlLoopStateChangeListener extends ParticipantListener<ControlL
super(ControlLoopStateChange.class, participantHandler,
participantHandler::handleControlLoopStateChange);
}
+
+ @Override
+ public String getType() {
+ return ParticipantMessageType.CONTROL_LOOP_STATE_CHANGE.name();
+ }
}
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ControlLoopUpdateListener.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ControlLoopUpdateListener.java
index 56bc1fd9a..f9dec1863 100644
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ControlLoopUpdateListener.java
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ControlLoopUpdateListener.java
@@ -21,6 +21,7 @@
package org.onap.policy.clamp.controlloop.participant.intermediary.comm;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ControlLoopUpdate;
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageType;
import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler;
import org.springframework.stereotype.Component;
@@ -38,4 +39,9 @@ public class ControlLoopUpdateListener extends ParticipantListener<ControlLoopUp
public ControlLoopUpdateListener(final ParticipantHandler participantHandler) {
super(ControlLoopUpdate.class, participantHandler, participantHandler::handleControlLoopUpdate);
}
+
+ @Override
+ public String getType() {
+ return ParticipantMessageType.CONTROL_LOOP_UPDATE.name();
+ }
}
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantAckListener.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantAckListener.java
index 4b7224938..113f75dd5 100644
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantAckListener.java
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantAckListener.java
@@ -24,6 +24,7 @@ package org.onap.policy.clamp.controlloop.participant.intermediary.comm;
import java.util.function.Consumer;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantAckMessage;
+import org.onap.policy.clamp.controlloop.participant.intermediary.handler.Listener;
import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler;
import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
import org.onap.policy.common.endpoints.listeners.ScoListener;
@@ -32,7 +33,8 @@ import org.onap.policy.common.utils.coder.StandardCoderObject;
/**
* Abstract Listener for Participant Ack messages sent by runtime.
*/
-public abstract class ParticipantAckListener<T extends ParticipantAckMessage> extends ScoListener<T> {
+public abstract class ParticipantAckListener<T extends ParticipantAckMessage> extends ScoListener<T>
+ implements Listener {
private final ParticipantHandler participantHandler;
private final Consumer<T> consumer;
@@ -52,6 +54,13 @@ public abstract class ParticipantAckListener<T extends ParticipantAckMessage> ex
@Override
public void onTopicEvent(CommInfrastructure infra, String topic, StandardCoderObject sco, T message) {
- consumer.accept(message);
+ if (participantHandler.appliesTo(message)) {
+ consumer.accept(message);
+ }
+ }
+
+ @Override
+ public ScoListener<T> getScoListener() {
+ return this;
}
}
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantDeregisterAckListener.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantDeregisterAckListener.java
index e20f481f8..5440e005b 100644
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantDeregisterAckListener.java
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantDeregisterAckListener.java
@@ -21,6 +21,7 @@
package org.onap.policy.clamp.controlloop.participant.intermediary.comm;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantDeregisterAck;
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageType;
import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler;
import org.springframework.stereotype.Component;
@@ -39,4 +40,9 @@ public class ParticipantDeregisterAckListener extends ParticipantAckListener<Par
public ParticipantDeregisterAckListener(final ParticipantHandler participantHandler) {
super(ParticipantDeregisterAck.class, participantHandler, participantHandler::handleParticipantDeregisterAck);
}
+
+ @Override
+ public String getType() {
+ return ParticipantMessageType.PARTICIPANT_DEREGISTER_ACK.name();
+ }
}
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantListener.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantListener.java
index c6ad900b3..67af5c844 100644
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantListener.java
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantListener.java
@@ -22,6 +22,7 @@ package org.onap.policy.clamp.controlloop.participant.intermediary.comm;
import java.util.function.Consumer;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessage;
+import org.onap.policy.clamp.controlloop.participant.intermediary.handler.Listener;
import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler;
import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
import org.onap.policy.common.endpoints.listeners.ScoListener;
@@ -30,7 +31,7 @@ import org.onap.policy.common.utils.coder.StandardCoderObject;
/**
* Abstract Listener for Participant messages sent by CLAMP.
*/
-public abstract class ParticipantListener<T extends ParticipantMessage> extends ScoListener<T> {
+public abstract class ParticipantListener<T extends ParticipantMessage> extends ScoListener<T> implements Listener {
private final ParticipantHandler participantHandler;
private final Consumer<T> consumer;
@@ -54,4 +55,9 @@ public abstract class ParticipantListener<T extends ParticipantMessage> extends
consumer.accept(message);
}
}
+
+ @Override
+ public ScoListener<T> getScoListener() {
+ return this;
+ }
}
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantMessagePublisher.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantMessagePublisher.java
index d8cc9eb6b..2941e9fd8 100644
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantMessagePublisher.java
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantMessagePublisher.java
@@ -21,35 +21,43 @@
package org.onap.policy.clamp.controlloop.participant.intermediary.comm;
import java.util.List;
+import javax.ws.rs.core.Response.Status;
+import org.onap.policy.clamp.controlloop.common.exception.ControlLoopRuntimeException;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ControlLoopAck;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantDeregister;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantRegister;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantStatus;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantUpdateAck;
+import org.onap.policy.clamp.controlloop.participant.intermediary.handler.Publisher;
import org.onap.policy.common.endpoints.event.comm.TopicSink;
import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
/**
* This class is used to send Participant Status messages to clamp using TopicSinkClient.
*
*/
-public class ParticipantMessagePublisher {
+@Component
+public class ParticipantMessagePublisher implements Publisher {
private static final Logger LOGGER = LoggerFactory.getLogger(ParticipantMessagePublisher.class);
- private final TopicSinkClient topicSinkClient;
+ private boolean active = false;
+ private TopicSinkClient topicSinkClient;
/**
* Constructor for instantiating ParticipantMessagePublisher.
*
* @param topicSinks the topic sinks
*/
- public ParticipantMessagePublisher(List<TopicSink> topicSinks) {
+ @Override
+ public void active(List<TopicSink> topicSinks) {
if (topicSinks.size() != 1) {
throw new IllegalArgumentException("Configuration unsupported, Topic sinks greater than 1");
}
this.topicSinkClient = new TopicSinkClient(topicSinks.get(0));
+ active = true;
}
/**
@@ -58,6 +66,9 @@ public class ParticipantMessagePublisher {
* @param participantStatus the Participant Status
*/
public void sendParticipantStatus(final ParticipantStatus participantStatus) {
+ if (!active) {
+ throw new ControlLoopRuntimeException(Status.NOT_ACCEPTABLE, "Not Active!");
+ }
topicSinkClient.send(participantStatus);
LOGGER.debug("Sent Participant Status message to CLAMP - {}", participantStatus);
}
@@ -68,6 +79,9 @@ public class ParticipantMessagePublisher {
* @param participantRegister the Participant Status
*/
public void sendParticipantRegister(final ParticipantRegister participantRegister) {
+ if (!active) {
+ throw new ControlLoopRuntimeException(Status.NOT_ACCEPTABLE, "Not Active!");
+ }
topicSinkClient.send(participantRegister);
LOGGER.debug("Sent Participant Register message to CLAMP - {}", participantRegister);
}
@@ -78,6 +92,9 @@ public class ParticipantMessagePublisher {
* @param participantDeregister the Participant Status
*/
public void sendParticipantDeregister(final ParticipantDeregister participantDeregister) {
+ if (!active) {
+ throw new ControlLoopRuntimeException(Status.NOT_ACCEPTABLE, "Not Active!");
+ }
topicSinkClient.send(participantDeregister);
LOGGER.debug("Sent Participant Deregister message to CLAMP - {}", participantDeregister);
}
@@ -88,6 +105,9 @@ public class ParticipantMessagePublisher {
* @param participantUpdateAck the Participant Update Ack
*/
public void sendParticipantUpdateAck(final ParticipantUpdateAck participantUpdateAck) {
+ if (!active) {
+ throw new ControlLoopRuntimeException(Status.NOT_ACCEPTABLE, "Not Active!");
+ }
topicSinkClient.send(participantUpdateAck);
LOGGER.debug("Sent Participant Update Ack message to CLAMP - {}", participantUpdateAck);
}
@@ -98,6 +118,9 @@ public class ParticipantMessagePublisher {
* @param controlLoopAck ControlLoop Update/StateChange Ack
*/
public void sendControlLoopAck(final ControlLoopAck controlLoopAck) {
+ if (!active) {
+ throw new ControlLoopRuntimeException(Status.NOT_ACCEPTABLE, "Not Active!");
+ }
topicSinkClient.send(controlLoopAck);
LOGGER.debug("Sent ControlLoop Update/StateChange Ack to runtime - {}", controlLoopAck);
}
@@ -108,7 +131,15 @@ public class ParticipantMessagePublisher {
* @param participantStatus the Participant Status
*/
public void sendHeartbeat(final ParticipantStatus participantStatus) {
+ if (!active) {
+ throw new ControlLoopRuntimeException(Status.NOT_ACCEPTABLE, "Not Active!");
+ }
topicSinkClient.send(participantStatus);
LOGGER.debug("Sent Participant heartbeat to CLAMP - {}", participantStatus);
}
+
+ @Override
+ public void stop() {
+ active = false;
+ }
}
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantRegisterAckListener.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantRegisterAckListener.java
index a15a2a850..7be460815 100644
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantRegisterAckListener.java
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantRegisterAckListener.java
@@ -20,6 +20,7 @@
package org.onap.policy.clamp.controlloop.participant.intermediary.comm;
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageType;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantRegisterAck;
import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler;
import org.springframework.stereotype.Component;
@@ -39,4 +40,9 @@ public class ParticipantRegisterAckListener extends ParticipantAckListener<Parti
public ParticipantRegisterAckListener(final ParticipantHandler participantHandler) {
super(ParticipantRegisterAck.class, participantHandler, participantHandler::handleParticipantRegisterAck);
}
+
+ @Override
+ public String getType() {
+ return ParticipantMessageType.PARTICIPANT_REGISTER_ACK.name();
+ }
}
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantStatusReqListener.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantStatusReqListener.java
index 0881edb19..9e978fe75 100644
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantStatusReqListener.java
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantStatusReqListener.java
@@ -20,6 +20,7 @@
package org.onap.policy.clamp.controlloop.participant.intermediary.comm;
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageType;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantStatusReq;
import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler;
import org.springframework.stereotype.Component;
@@ -38,4 +39,9 @@ public class ParticipantStatusReqListener extends ParticipantListener<Participan
public ParticipantStatusReqListener(final ParticipantHandler participantHandler) {
super(ParticipantStatusReq.class, participantHandler, participantHandler::handleParticipantStatusReq);
}
+
+ @Override
+ public String getType() {
+ return ParticipantMessageType.PARTICIPANT_STATUS_REQ.name();
+ }
}
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantUpdateListener.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantUpdateListener.java
index 42bd52d9a..da45501e7 100644
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantUpdateListener.java
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantUpdateListener.java
@@ -20,6 +20,7 @@
package org.onap.policy.clamp.controlloop.participant.intermediary.comm;
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageType;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantUpdate;
import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler;
import org.springframework.stereotype.Component;
@@ -38,4 +39,9 @@ public class ParticipantUpdateListener extends ParticipantListener<ParticipantUp
public ParticipantUpdateListener(final ParticipantHandler participantHandler) {
super(ParticipantUpdate.class, participantHandler, participantHandler::handleParticipantUpdate);
}
+
+ @Override
+ public String getType() {
+ return ParticipantMessageType.PARTICIPANT_UPDATE.name();
+ }
}
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/config/BeanFactory.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/config/BeanFactory.java
deleted file mode 100644
index e363504a5..000000000
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/config/BeanFactory.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * Copyright (C) 2021 Nordix Foundation.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.clamp.controlloop.participant.intermediary.config;
-
-import java.util.List;
-import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ParticipantMessagePublisher;
-import org.onap.policy.clamp.controlloop.participant.intermediary.parameters.ParticipantParameters;
-import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
-import org.onap.policy.common.endpoints.event.comm.TopicSink;
-import org.onap.policy.common.endpoints.listeners.MessageTypeDispatcher;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-
-@Configuration
-public class BeanFactory {
-
- // Name of the message type for messages on topics
- private static final String[] MSG_TYPE_NAMES = {"messageType"};
-
- /**
- * create ParticipantMessagePublisher.
- *
- * @param parameters the ParticipantParameters
- * @return ParticipantMessagePublisher
- */
- @Bean
- public ParticipantMessagePublisher publisher(final ParticipantParameters parameters) {
- List<TopicSink> topicSinks = TopicEndpointManager.getManager()
- .addTopicSinks(parameters.getIntermediaryParameters().getClampControlLoopTopics().getTopicSinks());
- return new ParticipantMessagePublisher(topicSinks);
- }
-
- @Bean
- public MessageTypeDispatcher msgDispatcher() {
- return new MessageTypeDispatcher(MSG_TYPE_NAMES);
- }
-}
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/IntermediaryActivator.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/IntermediaryActivator.java
index 4fc0ae1b1..754bf2887 100644
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/IntermediaryActivator.java
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/IntermediaryActivator.java
@@ -23,20 +23,13 @@ package org.onap.policy.clamp.controlloop.participant.intermediary.handler;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
-import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageType;
import org.onap.policy.clamp.controlloop.participant.intermediary.api.ParticipantIntermediaryApi;
-import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ControlLoopStateChangeListener;
-import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ControlLoopUpdateListener;
-import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ParticipantDeregisterAckListener;
-import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ParticipantRegisterAckListener;
-import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ParticipantStatusReqListener;
-import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ParticipantUpdateListener;
import org.onap.policy.clamp.controlloop.participant.intermediary.parameters.ParticipantParameters;
import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
+import org.onap.policy.common.endpoints.event.comm.TopicSink;
import org.onap.policy.common.endpoints.event.comm.TopicSource;
import org.onap.policy.common.endpoints.listeners.MessageTypeDispatcher;
import org.onap.policy.common.utils.services.ServiceManagerContainer;
-import org.springframework.context.ApplicationContext;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.context.event.EventListener;
@@ -48,32 +41,50 @@ import org.springframework.stereotype.Component;
@Component
public class IntermediaryActivator extends ServiceManagerContainer implements Closeable {
- private final ApplicationContext applicationContext;
+ private static final String[] MSG_TYPE_NAMES = {"messageType"};
// Topics from which the participant receives and to which the participant sends messages
+ private List<TopicSink> topicSinks;
private List<TopicSource> topicSources;
ParticipantIntermediaryApi participantIntermediaryApi;
+ private final MessageTypeDispatcher msgDispatcher;
+
/**
* Instantiate the activator for participant.
*
- * @param applicationContext ApplicationContext
* @param parameters the ParticipantParameters
+ * @param publishers list of Publishers
+ * @param listeners list of Listeners
*/
- public IntermediaryActivator(final ApplicationContext applicationContext, final ParticipantParameters parameters,
- ParticipantIntermediaryApi participantIntermediaryApi) {
- this.applicationContext = applicationContext;
+ public IntermediaryActivator(final ParticipantParameters parameters,
+ ParticipantIntermediaryApi participantIntermediaryApi, List<Publisher> publishers,
+ List<Listener> listeners) {
this.participantIntermediaryApi = participantIntermediaryApi;
+ topicSinks = TopicEndpointManager.getManager()
+ .addTopicSinks(parameters.getIntermediaryParameters().getClampControlLoopTopics().getTopicSinks());
+
topicSources = TopicEndpointManager.getManager()
.addTopicSources(parameters.getIntermediaryParameters().getClampControlLoopTopics().getTopicSources());
- // @formatter:off
+ msgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES);
+ // @formatter:off
addAction("Topic endpoint management",
- () -> TopicEndpointManager.getManager().start(),
- () -> TopicEndpointManager.getManager().shutdown());
+ () -> TopicEndpointManager.getManager().start(),
+ () -> TopicEndpointManager.getManager().shutdown());
+
+ publishers.forEach(publisher ->
+ addAction("Publisher " + publisher.getClass().getSimpleName(),
+ () -> publisher.active(topicSinks),
+ publisher::stop));
+
+ listeners.forEach(listener ->
+ addAction("Listener " + listener.getClass().getSimpleName(),
+ () -> msgDispatcher.register(listener.getType(), listener.getScoListener()),
+ () -> msgDispatcher.unregister(listener.getType())));
addAction("Topic Message Dispatcher", this::registerMsgDispatcher, this::unregisterMsgDispatcher);
// @formatter:on
@@ -117,26 +128,6 @@ public class IntermediaryActivator extends ServiceManagerContainer implements Cl
* Registers the dispatcher with the topic source(s).
*/
private void registerMsgDispatcher() {
- MessageTypeDispatcher msgDispatcher = applicationContext.getBean(MessageTypeDispatcher.class);
-
- msgDispatcher.register(ParticipantMessageType.PARTICIPANT_STATUS_REQ.name(),
- applicationContext.getBean(ParticipantStatusReqListener.class));
-
- msgDispatcher.register(ParticipantMessageType.CONTROL_LOOP_STATE_CHANGE.name(),
- applicationContext.getBean(ControlLoopStateChangeListener.class));
-
- msgDispatcher.register(ParticipantMessageType.CONTROL_LOOP_UPDATE.name(),
- applicationContext.getBean(ControlLoopUpdateListener.class));
-
- msgDispatcher.register(ParticipantMessageType.PARTICIPANT_REGISTER_ACK.name(),
- applicationContext.getBean(ParticipantRegisterAckListener.class));
-
- msgDispatcher.register(ParticipantMessageType.PARTICIPANT_DEREGISTER_ACK.name(),
- applicationContext.getBean(ParticipantDeregisterAckListener.class));
-
- msgDispatcher.register(ParticipantMessageType.PARTICIPANT_UPDATE.name(),
- applicationContext.getBean(ParticipantUpdateListener.class));
-
for (final TopicSource source : topicSources) {
source.register(msgDispatcher);
}
@@ -146,8 +137,6 @@ public class IntermediaryActivator extends ServiceManagerContainer implements Cl
* Unregisters the dispatcher from the topic source(s).
*/
private void unregisterMsgDispatcher() {
- MessageTypeDispatcher msgDispatcher = applicationContext.getBean(MessageTypeDispatcher.class);
-
for (final TopicSource source : topicSources) {
source.unregister(msgDispatcher);
}
@@ -155,6 +144,8 @@ public class IntermediaryActivator extends ServiceManagerContainer implements Cl
@Override
public void close() throws IOException {
- super.shutdown();
+ if (isAlive()) {
+ super.shutdown();
+ }
}
}
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/Listener.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/Listener.java
new file mode 100644
index 000000000..bca71afda
--- /dev/null
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/Listener.java
@@ -0,0 +1,40 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2021 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.controlloop.participant.intermediary.handler;
+
+import org.onap.policy.common.endpoints.listeners.ScoListener;
+
+public interface Listener {
+
+ /**
+ * Get the type of message of interest to the listener.
+ *
+ * @return type of message of interest to the listener
+ */
+ String getType();
+
+ /**
+ * Get listener to register.
+ *
+ * @return listener to register
+ */
+ <T> ScoListener<T> getScoListener();
+}
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ParticipantHandler.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ParticipantHandler.java
index 1947fda1a..66e09e7f6 100644
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ParticipantHandler.java
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ParticipantHandler.java
@@ -43,6 +43,7 @@ import org.onap.policy.clamp.controlloop.models.controlloop.concepts.Participant
import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ParticipantStatistics;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ControlLoopStateChange;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ControlLoopUpdate;
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantAckMessage;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantDeregister;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantDeregisterAck;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessage;
@@ -197,6 +198,16 @@ public class ParticipantHandler implements Closeable {
}
/**
+ * Check if a participant message applies to this participant handler.
+ *
+ * @param participantMsg the message to check
+ * @return true if it applies, false otherwise
+ */
+ public boolean appliesTo(ParticipantAckMessage participantMsg) {
+ return participantMsg.appliesTo(participantType, participantId);
+ }
+
+ /**
* Method to send ParticipantRegister message to controlloop runtime.
*/
public void sendParticipantRegister() {
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/Publisher.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/Publisher.java
new file mode 100644
index 000000000..287d7c055
--- /dev/null
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/Publisher.java
@@ -0,0 +1,34 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2021 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.controlloop.participant.intermediary.handler;
+
+import java.util.List;
+import org.onap.policy.common.endpoints.event.comm.TopicSink;
+
+/**
+ * Publisher.
+ */
+public interface Publisher {
+
+ void active(List<TopicSink> topicSinks);
+
+ void stop();
+}
diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/config/messaging/MessageDispatcherActivator.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/config/messaging/MessageDispatcherActivator.java
index 891dab9ae..d196dd193 100644
--- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/config/messaging/MessageDispatcherActivator.java
+++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/config/messaging/MessageDispatcherActivator.java
@@ -23,8 +23,6 @@ package org.onap.policy.clamp.controlloop.runtime.config.messaging;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
-import java.util.stream.Stream;
-import javax.ws.rs.core.Response.Status;
import lombok.Getter;
import org.onap.policy.clamp.controlloop.common.exception.ControlLoopRuntimeException;
import org.onap.policy.clamp.controlloop.runtime.main.parameters.ClRuntimeParameterGroup;
@@ -33,6 +31,7 @@ import org.onap.policy.common.endpoints.event.comm.TopicSink;
import org.onap.policy.common.endpoints.event.comm.TopicSource;
import org.onap.policy.common.endpoints.listeners.MessageTypeDispatcher;
import org.onap.policy.common.utils.services.ServiceManagerContainer;
+import org.springframework.context.event.ContextClosedEvent;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
@@ -53,36 +52,31 @@ public class MessageDispatcherActivator extends ServiceManagerContainer implemen
* Constructor.
*
* @param clRuntimeParameterGroup the parameters for the control loop runtime service
- * @param publishers array of Publishers
- * @param listeners array of Listeners
+ * @param publishers list of Publishers
+ * @param listeners list of Listeners
* @throws ControlLoopRuntimeException if the activator does not start
*/
- public MessageDispatcherActivator(final ClRuntimeParameterGroup clRuntimeParameterGroup, Publisher[] publishers,
- Listener[] listeners) {
+ public MessageDispatcherActivator(final ClRuntimeParameterGroup clRuntimeParameterGroup, List<Publisher> publishers,
+ List<Listener> listeners) {
topicSinks = TopicEndpointManager.getManager()
.addTopicSinks(clRuntimeParameterGroup.getTopicParameterGroup().getTopicSinks());
topicSources = TopicEndpointManager.getManager()
.addTopicSources(clRuntimeParameterGroup.getTopicParameterGroup().getTopicSources());
- try {
- msgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES);
- } catch (final RuntimeException e) {
- throw new ControlLoopRuntimeException(Status.INTERNAL_SERVER_ERROR,
- "topic message dispatcher failed to start", e);
- }
+ msgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES);
// @formatter:off
addAction("Topic endpoint management",
() -> TopicEndpointManager.getManager().start(),
() -> TopicEndpointManager.getManager().shutdown());
- Stream.of(publishers).forEach(publisher ->
+ publishers.forEach(publisher ->
addAction("Publisher " + publisher.getClass().getSimpleName(),
() -> publisher.active(topicSinks),
- () -> publisher.stop()));
+ publisher::stop));
- Stream.of(listeners).forEach(listener ->
+ listeners.forEach(listener ->
addAction("Listener " + listener.getClass().getSimpleName(),
() -> msgDispatcher.register(listener.getType(), listener.getScoListener()),
() -> msgDispatcher.unregister(listener.getType())));
@@ -121,10 +115,22 @@ public class MessageDispatcherActivator extends ServiceManagerContainer implemen
}
}
+ /**
+ * Handle ContextClosedEvent.
+ *
+ * @param ctxClosedEvent ContextClosedEvent
+ */
+ @EventListener
+ public void handleContextClosedEvent(ContextClosedEvent ctxClosedEvent) {
+ if (isAlive()) {
+ stop();
+ }
+ }
+
@Override
public void close() throws IOException {
if (isAlive()) {
- stop();
+ super.shutdown();
}
}
}
diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionHandler.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionHandler.java
index 2cc0f94e2..b39573461 100644
--- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionHandler.java
+++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionHandler.java
@@ -139,7 +139,8 @@ public class SupervisionHandler {
public void handleParticipantMessage(ParticipantRegister participantRegisterMessage) {
LOGGER.debug("Participant Register received {}", participantRegisterMessage);
- participantRegisterAckPublisher.send(participantRegisterMessage.getMessageId());
+ participantRegisterAckPublisher.send(participantRegisterMessage.getMessageId(),
+ participantRegisterMessage.getParticipantId(), participantRegisterMessage.getParticipantType());
participantUpdatePublisher.send(participantRegisterMessage.getParticipantId(),
participantRegisterMessage.getParticipantType(), true);
@@ -358,15 +359,15 @@ public class SupervisionHandler {
throws PfModelException, ControlLoopException {
if (participantStatusMessage.getControlLoopInfoList() != null) {
for (ControlLoopInfo clEntry : participantStatusMessage.getControlLoopInfoList()) {
- var dbControlLoop = controlLoopProvider.getControlLoop(
- new ToscaConceptIdentifier(clEntry.getControlLoopId()));
+ var dbControlLoop =
+ controlLoopProvider.getControlLoop(new ToscaConceptIdentifier(clEntry.getControlLoopId()));
if (dbControlLoop == null) {
exceptionOccured(Response.Status.NOT_FOUND,
"PARTICIPANT_STATUS control loop not found in database: " + clEntry.getControlLoopId());
}
dbControlLoop.setState(clEntry.getState());
- monitoringProvider.createClElementStatistics(clEntry.getControlLoopStatistics()
- .getClElementStatisticsList().getClElementStatistics());
+ monitoringProvider.createClElementStatistics(
+ clEntry.getControlLoopStatistics().getClElementStatisticsList().getClElementStatistics());
}
}
}
diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionScanner.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionScanner.java
index 151b04cbf..a05337991 100644
--- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionScanner.java
+++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionScanner.java
@@ -147,7 +147,7 @@ public class SupervisionScanner {
if (participantUpdateCounter.count(id)) {
LOGGER.debug("retry message ParticipantUpdate");
- participantUpdatePublisher.send(id.getLeft(), id.getRight());
+ participantUpdatePublisher.send(id.getLeft(), id.getRight(), true);
} else {
LOGGER.debug("report Participant Update fault");
participantUpdateCounter.setFault(id);
diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantRegisterAckPublisher.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantRegisterAckPublisher.java
index 73860b5c2..8cbaec8b1 100644
--- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantRegisterAckPublisher.java
+++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantRegisterAckPublisher.java
@@ -22,6 +22,7 @@ package org.onap.policy.clamp.controlloop.runtime.supervision.comm;
import java.util.UUID;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantRegisterAck;
+import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
import org.springframework.stereotype.Component;
/**
@@ -34,9 +35,13 @@ public class ParticipantRegisterAckPublisher extends AbstractParticipantAckPubli
* Send ParticipantRegisterAck to Participant.
*
* @param responseTo the original request id in the request.
+ * @param participantId the participant Id
+ * @param participantType the participant Type
*/
- public void send(UUID responseTo) {
+ public void send(UUID responseTo, ToscaConceptIdentifier participantId, ToscaConceptIdentifier participantType) {
var message = new ParticipantRegisterAck();
+ message.setParticipantId(participantId);
+ message.setParticipantType(participantType);
message.setResponseTo(responseTo);
message.setMessage("Participant Register Ack");
message.setResult(true);
diff --git a/runtime-controlloop/src/test/java/org/onap/policy/clamp/controlloop/runtime/config/messaging/MessageDispatcherActivatorTest.java b/runtime-controlloop/src/test/java/org/onap/policy/clamp/controlloop/runtime/config/messaging/MessageDispatcherActivatorTest.java
index 461c8b558..936bb1444 100644
--- a/runtime-controlloop/src/test/java/org/onap/policy/clamp/controlloop/runtime/config/messaging/MessageDispatcherActivatorTest.java
+++ b/runtime-controlloop/src/test/java/org/onap/policy/clamp/controlloop/runtime/config/messaging/MessageDispatcherActivatorTest.java
@@ -31,6 +31,7 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import java.util.List;
import org.junit.jupiter.api.Test;
import org.onap.policy.clamp.controlloop.runtime.main.parameters.ClRuntimeParameterGroup;
import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ParticipantStatusListener;
@@ -56,7 +57,7 @@ class MessageDispatcherActivatorTest {
var publisherFirst = spy(mock(Publisher.class));
var publisherSecond = spy(mock(Publisher.class));
- var publishers = new Publisher[] {publisherFirst, publisherSecond};
+ var publishers = List.of(publisherFirst, publisherSecond);
var listenerFirst = spy(mock(ParticipantStatusListener.class));
when(listenerFirst.getType()).thenReturn(TOPIC_FIRST);
@@ -66,7 +67,7 @@ class MessageDispatcherActivatorTest {
when(listenerSecond.getType()).thenReturn(TOPIC_SECOND);
when(listenerSecond.getScoListener()).thenReturn(listenerSecond);
- var listeners = new Listener[] {listenerFirst, listenerSecond};
+ List<Listener> listeners = List.of(listenerFirst, listenerSecond);
try (var activator = new MessageDispatcherActivator(parameterGroup, publishers, listeners)) {