aboutsummaryrefslogtreecommitdiffstats
path: root/participant/participant-intermediary/src/main
diff options
context:
space:
mode:
authorFrancescoFioraEst <francesco.fiora@est.tech>2021-08-18 15:25:59 +0100
committerFrancescoFioraEst <francesco.fiora@est.tech>2021-08-26 13:44:57 +0100
commit6d02de6b9ea3f4e6fc588813fd2177c732a2af92 (patch)
tree71d74f431b35e950767be889a2b6d7ed1de7af45 /participant/participant-intermediary/src/main
parent281a36c50d68f29e0e47dfec10ee8be38f5e5761 (diff)
Fix issue in event handling in participants
Fix issue in event handling in participants and refactor Participant Publisher and Listener Issue-ID: POLICY-3544 Change-Id: Ic92ffa79d303adfb1c3319fbfefb1faef911a9d4 Signed-off-by: FrancescoFioraEst <francesco.fiora@est.tech>
Diffstat (limited to 'participant/participant-intermediary/src/main')
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ControlLoopStateChangeListener.java6
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ControlLoopUpdateListener.java6
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantAckListener.java13
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantDeregisterAckListener.java6
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantListener.java8
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantMessagePublisher.java37
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantRegisterAckListener.java6
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantStatusReqListener.java6
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantUpdateListener.java6
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/config/BeanFactory.java55
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/IntermediaryActivator.java69
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/Listener.java40
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ParticipantHandler.java11
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/Publisher.java34
14 files changed, 203 insertions, 100 deletions
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ControlLoopStateChangeListener.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ControlLoopStateChangeListener.java
index 0b9110bd0..d24f32f2f 100644
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ControlLoopStateChangeListener.java
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ControlLoopStateChangeListener.java
@@ -21,6 +21,7 @@
package org.onap.policy.clamp.controlloop.participant.intermediary.comm;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ControlLoopStateChange;
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageType;
import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler;
import org.springframework.stereotype.Component;
@@ -39,4 +40,9 @@ public class ControlLoopStateChangeListener extends ParticipantListener<ControlL
super(ControlLoopStateChange.class, participantHandler,
participantHandler::handleControlLoopStateChange);
}
+
+ @Override
+ public String getType() {
+ return ParticipantMessageType.CONTROL_LOOP_STATE_CHANGE.name();
+ }
}
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ControlLoopUpdateListener.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ControlLoopUpdateListener.java
index 56bc1fd9a..f9dec1863 100644
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ControlLoopUpdateListener.java
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ControlLoopUpdateListener.java
@@ -21,6 +21,7 @@
package org.onap.policy.clamp.controlloop.participant.intermediary.comm;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ControlLoopUpdate;
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageType;
import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler;
import org.springframework.stereotype.Component;
@@ -38,4 +39,9 @@ public class ControlLoopUpdateListener extends ParticipantListener<ControlLoopUp
public ControlLoopUpdateListener(final ParticipantHandler participantHandler) {
super(ControlLoopUpdate.class, participantHandler, participantHandler::handleControlLoopUpdate);
}
+
+ @Override
+ public String getType() {
+ return ParticipantMessageType.CONTROL_LOOP_UPDATE.name();
+ }
}
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantAckListener.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantAckListener.java
index 4b7224938..113f75dd5 100644
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantAckListener.java
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantAckListener.java
@@ -24,6 +24,7 @@ package org.onap.policy.clamp.controlloop.participant.intermediary.comm;
import java.util.function.Consumer;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantAckMessage;
+import org.onap.policy.clamp.controlloop.participant.intermediary.handler.Listener;
import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler;
import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
import org.onap.policy.common.endpoints.listeners.ScoListener;
@@ -32,7 +33,8 @@ import org.onap.policy.common.utils.coder.StandardCoderObject;
/**
* Abstract Listener for Participant Ack messages sent by runtime.
*/
-public abstract class ParticipantAckListener<T extends ParticipantAckMessage> extends ScoListener<T> {
+public abstract class ParticipantAckListener<T extends ParticipantAckMessage> extends ScoListener<T>
+ implements Listener {
private final ParticipantHandler participantHandler;
private final Consumer<T> consumer;
@@ -52,6 +54,13 @@ public abstract class ParticipantAckListener<T extends ParticipantAckMessage> ex
@Override
public void onTopicEvent(CommInfrastructure infra, String topic, StandardCoderObject sco, T message) {
- consumer.accept(message);
+ if (participantHandler.appliesTo(message)) {
+ consumer.accept(message);
+ }
+ }
+
+ @Override
+ public ScoListener<T> getScoListener() {
+ return this;
}
}
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantDeregisterAckListener.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantDeregisterAckListener.java
index e20f481f8..5440e005b 100644
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantDeregisterAckListener.java
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantDeregisterAckListener.java
@@ -21,6 +21,7 @@
package org.onap.policy.clamp.controlloop.participant.intermediary.comm;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantDeregisterAck;
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageType;
import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler;
import org.springframework.stereotype.Component;
@@ -39,4 +40,9 @@ public class ParticipantDeregisterAckListener extends ParticipantAckListener<Par
public ParticipantDeregisterAckListener(final ParticipantHandler participantHandler) {
super(ParticipantDeregisterAck.class, participantHandler, participantHandler::handleParticipantDeregisterAck);
}
+
+ @Override
+ public String getType() {
+ return ParticipantMessageType.PARTICIPANT_DEREGISTER_ACK.name();
+ }
}
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantListener.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantListener.java
index c6ad900b3..67af5c844 100644
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantListener.java
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantListener.java
@@ -22,6 +22,7 @@ package org.onap.policy.clamp.controlloop.participant.intermediary.comm;
import java.util.function.Consumer;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessage;
+import org.onap.policy.clamp.controlloop.participant.intermediary.handler.Listener;
import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler;
import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
import org.onap.policy.common.endpoints.listeners.ScoListener;
@@ -30,7 +31,7 @@ import org.onap.policy.common.utils.coder.StandardCoderObject;
/**
* Abstract Listener for Participant messages sent by CLAMP.
*/
-public abstract class ParticipantListener<T extends ParticipantMessage> extends ScoListener<T> {
+public abstract class ParticipantListener<T extends ParticipantMessage> extends ScoListener<T> implements Listener {
private final ParticipantHandler participantHandler;
private final Consumer<T> consumer;
@@ -54,4 +55,9 @@ public abstract class ParticipantListener<T extends ParticipantMessage> extends
consumer.accept(message);
}
}
+
+ @Override
+ public ScoListener<T> getScoListener() {
+ return this;
+ }
}
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantMessagePublisher.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantMessagePublisher.java
index d8cc9eb6b..2941e9fd8 100644
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantMessagePublisher.java
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantMessagePublisher.java
@@ -21,35 +21,43 @@
package org.onap.policy.clamp.controlloop.participant.intermediary.comm;
import java.util.List;
+import javax.ws.rs.core.Response.Status;
+import org.onap.policy.clamp.controlloop.common.exception.ControlLoopRuntimeException;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ControlLoopAck;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantDeregister;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantRegister;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantStatus;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantUpdateAck;
+import org.onap.policy.clamp.controlloop.participant.intermediary.handler.Publisher;
import org.onap.policy.common.endpoints.event.comm.TopicSink;
import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
/**
* This class is used to send Participant Status messages to clamp using TopicSinkClient.
*
*/
-public class ParticipantMessagePublisher {
+@Component
+public class ParticipantMessagePublisher implements Publisher {
private static final Logger LOGGER = LoggerFactory.getLogger(ParticipantMessagePublisher.class);
- private final TopicSinkClient topicSinkClient;
+ private boolean active = false;
+ private TopicSinkClient topicSinkClient;
/**
* Constructor for instantiating ParticipantMessagePublisher.
*
* @param topicSinks the topic sinks
*/
- public ParticipantMessagePublisher(List<TopicSink> topicSinks) {
+ @Override
+ public void active(List<TopicSink> topicSinks) {
if (topicSinks.size() != 1) {
throw new IllegalArgumentException("Configuration unsupported, Topic sinks greater than 1");
}
this.topicSinkClient = new TopicSinkClient(topicSinks.get(0));
+ active = true;
}
/**
@@ -58,6 +66,9 @@ public class ParticipantMessagePublisher {
* @param participantStatus the Participant Status
*/
public void sendParticipantStatus(final ParticipantStatus participantStatus) {
+ if (!active) {
+ throw new ControlLoopRuntimeException(Status.NOT_ACCEPTABLE, "Not Active!");
+ }
topicSinkClient.send(participantStatus);
LOGGER.debug("Sent Participant Status message to CLAMP - {}", participantStatus);
}
@@ -68,6 +79,9 @@ public class ParticipantMessagePublisher {
* @param participantRegister the Participant Status
*/
public void sendParticipantRegister(final ParticipantRegister participantRegister) {
+ if (!active) {
+ throw new ControlLoopRuntimeException(Status.NOT_ACCEPTABLE, "Not Active!");
+ }
topicSinkClient.send(participantRegister);
LOGGER.debug("Sent Participant Register message to CLAMP - {}", participantRegister);
}
@@ -78,6 +92,9 @@ public class ParticipantMessagePublisher {
* @param participantDeregister the Participant Status
*/
public void sendParticipantDeregister(final ParticipantDeregister participantDeregister) {
+ if (!active) {
+ throw new ControlLoopRuntimeException(Status.NOT_ACCEPTABLE, "Not Active!");
+ }
topicSinkClient.send(participantDeregister);
LOGGER.debug("Sent Participant Deregister message to CLAMP - {}", participantDeregister);
}
@@ -88,6 +105,9 @@ public class ParticipantMessagePublisher {
* @param participantUpdateAck the Participant Update Ack
*/
public void sendParticipantUpdateAck(final ParticipantUpdateAck participantUpdateAck) {
+ if (!active) {
+ throw new ControlLoopRuntimeException(Status.NOT_ACCEPTABLE, "Not Active!");
+ }
topicSinkClient.send(participantUpdateAck);
LOGGER.debug("Sent Participant Update Ack message to CLAMP - {}", participantUpdateAck);
}
@@ -98,6 +118,9 @@ public class ParticipantMessagePublisher {
* @param controlLoopAck ControlLoop Update/StateChange Ack
*/
public void sendControlLoopAck(final ControlLoopAck controlLoopAck) {
+ if (!active) {
+ throw new ControlLoopRuntimeException(Status.NOT_ACCEPTABLE, "Not Active!");
+ }
topicSinkClient.send(controlLoopAck);
LOGGER.debug("Sent ControlLoop Update/StateChange Ack to runtime - {}", controlLoopAck);
}
@@ -108,7 +131,15 @@ public class ParticipantMessagePublisher {
* @param participantStatus the Participant Status
*/
public void sendHeartbeat(final ParticipantStatus participantStatus) {
+ if (!active) {
+ throw new ControlLoopRuntimeException(Status.NOT_ACCEPTABLE, "Not Active!");
+ }
topicSinkClient.send(participantStatus);
LOGGER.debug("Sent Participant heartbeat to CLAMP - {}", participantStatus);
}
+
+ @Override
+ public void stop() {
+ active = false;
+ }
}
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantRegisterAckListener.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantRegisterAckListener.java
index a15a2a850..7be460815 100644
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantRegisterAckListener.java
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantRegisterAckListener.java
@@ -20,6 +20,7 @@
package org.onap.policy.clamp.controlloop.participant.intermediary.comm;
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageType;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantRegisterAck;
import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler;
import org.springframework.stereotype.Component;
@@ -39,4 +40,9 @@ public class ParticipantRegisterAckListener extends ParticipantAckListener<Parti
public ParticipantRegisterAckListener(final ParticipantHandler participantHandler) {
super(ParticipantRegisterAck.class, participantHandler, participantHandler::handleParticipantRegisterAck);
}
+
+ @Override
+ public String getType() {
+ return ParticipantMessageType.PARTICIPANT_REGISTER_ACK.name();
+ }
}
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantStatusReqListener.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantStatusReqListener.java
index 0881edb19..9e978fe75 100644
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantStatusReqListener.java
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantStatusReqListener.java
@@ -20,6 +20,7 @@
package org.onap.policy.clamp.controlloop.participant.intermediary.comm;
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageType;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantStatusReq;
import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler;
import org.springframework.stereotype.Component;
@@ -38,4 +39,9 @@ public class ParticipantStatusReqListener extends ParticipantListener<Participan
public ParticipantStatusReqListener(final ParticipantHandler participantHandler) {
super(ParticipantStatusReq.class, participantHandler, participantHandler::handleParticipantStatusReq);
}
+
+ @Override
+ public String getType() {
+ return ParticipantMessageType.PARTICIPANT_STATUS_REQ.name();
+ }
}
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantUpdateListener.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantUpdateListener.java
index 42bd52d9a..da45501e7 100644
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantUpdateListener.java
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantUpdateListener.java
@@ -20,6 +20,7 @@
package org.onap.policy.clamp.controlloop.participant.intermediary.comm;
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageType;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantUpdate;
import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler;
import org.springframework.stereotype.Component;
@@ -38,4 +39,9 @@ public class ParticipantUpdateListener extends ParticipantListener<ParticipantUp
public ParticipantUpdateListener(final ParticipantHandler participantHandler) {
super(ParticipantUpdate.class, participantHandler, participantHandler::handleParticipantUpdate);
}
+
+ @Override
+ public String getType() {
+ return ParticipantMessageType.PARTICIPANT_UPDATE.name();
+ }
}
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/config/BeanFactory.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/config/BeanFactory.java
deleted file mode 100644
index e363504a5..000000000
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/config/BeanFactory.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * Copyright (C) 2021 Nordix Foundation.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.clamp.controlloop.participant.intermediary.config;
-
-import java.util.List;
-import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ParticipantMessagePublisher;
-import org.onap.policy.clamp.controlloop.participant.intermediary.parameters.ParticipantParameters;
-import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
-import org.onap.policy.common.endpoints.event.comm.TopicSink;
-import org.onap.policy.common.endpoints.listeners.MessageTypeDispatcher;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-
-@Configuration
-public class BeanFactory {
-
- // Name of the message type for messages on topics
- private static final String[] MSG_TYPE_NAMES = {"messageType"};
-
- /**
- * create ParticipantMessagePublisher.
- *
- * @param parameters the ParticipantParameters
- * @return ParticipantMessagePublisher
- */
- @Bean
- public ParticipantMessagePublisher publisher(final ParticipantParameters parameters) {
- List<TopicSink> topicSinks = TopicEndpointManager.getManager()
- .addTopicSinks(parameters.getIntermediaryParameters().getClampControlLoopTopics().getTopicSinks());
- return new ParticipantMessagePublisher(topicSinks);
- }
-
- @Bean
- public MessageTypeDispatcher msgDispatcher() {
- return new MessageTypeDispatcher(MSG_TYPE_NAMES);
- }
-}
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/IntermediaryActivator.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/IntermediaryActivator.java
index 4fc0ae1b1..754bf2887 100644
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/IntermediaryActivator.java
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/IntermediaryActivator.java
@@ -23,20 +23,13 @@ package org.onap.policy.clamp.controlloop.participant.intermediary.handler;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
-import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageType;
import org.onap.policy.clamp.controlloop.participant.intermediary.api.ParticipantIntermediaryApi;
-import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ControlLoopStateChangeListener;
-import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ControlLoopUpdateListener;
-import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ParticipantDeregisterAckListener;
-import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ParticipantRegisterAckListener;
-import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ParticipantStatusReqListener;
-import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ParticipantUpdateListener;
import org.onap.policy.clamp.controlloop.participant.intermediary.parameters.ParticipantParameters;
import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
+import org.onap.policy.common.endpoints.event.comm.TopicSink;
import org.onap.policy.common.endpoints.event.comm.TopicSource;
import org.onap.policy.common.endpoints.listeners.MessageTypeDispatcher;
import org.onap.policy.common.utils.services.ServiceManagerContainer;
-import org.springframework.context.ApplicationContext;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.context.event.EventListener;
@@ -48,32 +41,50 @@ import org.springframework.stereotype.Component;
@Component
public class IntermediaryActivator extends ServiceManagerContainer implements Closeable {
- private final ApplicationContext applicationContext;
+ private static final String[] MSG_TYPE_NAMES = {"messageType"};
// Topics from which the participant receives and to which the participant sends messages
+ private List<TopicSink> topicSinks;
private List<TopicSource> topicSources;
ParticipantIntermediaryApi participantIntermediaryApi;
+ private final MessageTypeDispatcher msgDispatcher;
+
/**
* Instantiate the activator for participant.
*
- * @param applicationContext ApplicationContext
* @param parameters the ParticipantParameters
+ * @param publishers list of Publishers
+ * @param listeners list of Listeners
*/
- public IntermediaryActivator(final ApplicationContext applicationContext, final ParticipantParameters parameters,
- ParticipantIntermediaryApi participantIntermediaryApi) {
- this.applicationContext = applicationContext;
+ public IntermediaryActivator(final ParticipantParameters parameters,
+ ParticipantIntermediaryApi participantIntermediaryApi, List<Publisher> publishers,
+ List<Listener> listeners) {
this.participantIntermediaryApi = participantIntermediaryApi;
+ topicSinks = TopicEndpointManager.getManager()
+ .addTopicSinks(parameters.getIntermediaryParameters().getClampControlLoopTopics().getTopicSinks());
+
topicSources = TopicEndpointManager.getManager()
.addTopicSources(parameters.getIntermediaryParameters().getClampControlLoopTopics().getTopicSources());
- // @formatter:off
+ msgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES);
+ // @formatter:off
addAction("Topic endpoint management",
- () -> TopicEndpointManager.getManager().start(),
- () -> TopicEndpointManager.getManager().shutdown());
+ () -> TopicEndpointManager.getManager().start(),
+ () -> TopicEndpointManager.getManager().shutdown());
+
+ publishers.forEach(publisher ->
+ addAction("Publisher " + publisher.getClass().getSimpleName(),
+ () -> publisher.active(topicSinks),
+ publisher::stop));
+
+ listeners.forEach(listener ->
+ addAction("Listener " + listener.getClass().getSimpleName(),
+ () -> msgDispatcher.register(listener.getType(), listener.getScoListener()),
+ () -> msgDispatcher.unregister(listener.getType())));
addAction("Topic Message Dispatcher", this::registerMsgDispatcher, this::unregisterMsgDispatcher);
// @formatter:on
@@ -117,26 +128,6 @@ public class IntermediaryActivator extends ServiceManagerContainer implements Cl
* Registers the dispatcher with the topic source(s).
*/
private void registerMsgDispatcher() {
- MessageTypeDispatcher msgDispatcher = applicationContext.getBean(MessageTypeDispatcher.class);
-
- msgDispatcher.register(ParticipantMessageType.PARTICIPANT_STATUS_REQ.name(),
- applicationContext.getBean(ParticipantStatusReqListener.class));
-
- msgDispatcher.register(ParticipantMessageType.CONTROL_LOOP_STATE_CHANGE.name(),
- applicationContext.getBean(ControlLoopStateChangeListener.class));
-
- msgDispatcher.register(ParticipantMessageType.CONTROL_LOOP_UPDATE.name(),
- applicationContext.getBean(ControlLoopUpdateListener.class));
-
- msgDispatcher.register(ParticipantMessageType.PARTICIPANT_REGISTER_ACK.name(),
- applicationContext.getBean(ParticipantRegisterAckListener.class));
-
- msgDispatcher.register(ParticipantMessageType.PARTICIPANT_DEREGISTER_ACK.name(),
- applicationContext.getBean(ParticipantDeregisterAckListener.class));
-
- msgDispatcher.register(ParticipantMessageType.PARTICIPANT_UPDATE.name(),
- applicationContext.getBean(ParticipantUpdateListener.class));
-
for (final TopicSource source : topicSources) {
source.register(msgDispatcher);
}
@@ -146,8 +137,6 @@ public class IntermediaryActivator extends ServiceManagerContainer implements Cl
* Unregisters the dispatcher from the topic source(s).
*/
private void unregisterMsgDispatcher() {
- MessageTypeDispatcher msgDispatcher = applicationContext.getBean(MessageTypeDispatcher.class);
-
for (final TopicSource source : topicSources) {
source.unregister(msgDispatcher);
}
@@ -155,6 +144,8 @@ public class IntermediaryActivator extends ServiceManagerContainer implements Cl
@Override
public void close() throws IOException {
- super.shutdown();
+ if (isAlive()) {
+ super.shutdown();
+ }
}
}
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/Listener.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/Listener.java
new file mode 100644
index 000000000..bca71afda
--- /dev/null
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/Listener.java
@@ -0,0 +1,40 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2021 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.controlloop.participant.intermediary.handler;
+
+import org.onap.policy.common.endpoints.listeners.ScoListener;
+
+public interface Listener {
+
+ /**
+ * Get the type of message of interest to the listener.
+ *
+ * @return type of message of interest to the listener
+ */
+ String getType();
+
+ /**
+ * Get listener to register.
+ *
+ * @return listener to register
+ */
+ <T> ScoListener<T> getScoListener();
+}
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ParticipantHandler.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ParticipantHandler.java
index 1947fda1a..66e09e7f6 100644
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ParticipantHandler.java
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ParticipantHandler.java
@@ -43,6 +43,7 @@ import org.onap.policy.clamp.controlloop.models.controlloop.concepts.Participant
import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ParticipantStatistics;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ControlLoopStateChange;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ControlLoopUpdate;
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantAckMessage;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantDeregister;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantDeregisterAck;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessage;
@@ -197,6 +198,16 @@ public class ParticipantHandler implements Closeable {
}
/**
+ * Check if a participant message applies to this participant handler.
+ *
+ * @param participantMsg the message to check
+ * @return true if it applies, false otherwise
+ */
+ public boolean appliesTo(ParticipantAckMessage participantMsg) {
+ return participantMsg.appliesTo(participantType, participantId);
+ }
+
+ /**
* Method to send ParticipantRegister message to controlloop runtime.
*/
public void sendParticipantRegister() {
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/Publisher.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/Publisher.java
new file mode 100644
index 000000000..287d7c055
--- /dev/null
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/Publisher.java
@@ -0,0 +1,34 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2021 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.controlloop.participant.intermediary.handler;
+
+import java.util.List;
+import org.onap.policy.common.endpoints.event.comm.TopicSink;
+
+/**
+ * Publisher.
+ */
+public interface Publisher {
+
+ void active(List<TopicSink> topicSinks);
+
+ void stop();
+}