diff options
author | FrancescoFioraEst <francesco.fiora@est.tech> | 2021-08-18 15:25:59 +0100 |
---|---|---|
committer | FrancescoFioraEst <francesco.fiora@est.tech> | 2021-08-26 13:44:57 +0100 |
commit | 6d02de6b9ea3f4e6fc588813fd2177c732a2af92 (patch) | |
tree | 71d74f431b35e950767be889a2b6d7ed1de7af45 /participant/participant-intermediary | |
parent | 281a36c50d68f29e0e47dfec10ee8be38f5e5761 (diff) |
Fix issue in event handling in participants
Fix issue in event handling in participants
and refactor Participant Publisher and Listener
Issue-ID: POLICY-3544
Change-Id: Ic92ffa79d303adfb1c3319fbfefb1faef911a9d4
Signed-off-by: FrancescoFioraEst <francesco.fiora@est.tech>
Diffstat (limited to 'participant/participant-intermediary')
14 files changed, 203 insertions, 100 deletions
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ControlLoopStateChangeListener.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ControlLoopStateChangeListener.java index 0b9110bd0..d24f32f2f 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ControlLoopStateChangeListener.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ControlLoopStateChangeListener.java @@ -21,6 +21,7 @@ package org.onap.policy.clamp.controlloop.participant.intermediary.comm; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ControlLoopStateChange; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageType; import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler; import org.springframework.stereotype.Component; @@ -39,4 +40,9 @@ public class ControlLoopStateChangeListener extends ParticipantListener<ControlL super(ControlLoopStateChange.class, participantHandler, participantHandler::handleControlLoopStateChange); } + + @Override + public String getType() { + return ParticipantMessageType.CONTROL_LOOP_STATE_CHANGE.name(); + } } diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ControlLoopUpdateListener.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ControlLoopUpdateListener.java index 56bc1fd9a..f9dec1863 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ControlLoopUpdateListener.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ControlLoopUpdateListener.java @@ -21,6 +21,7 @@ package org.onap.policy.clamp.controlloop.participant.intermediary.comm; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ControlLoopUpdate; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageType; import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler; import org.springframework.stereotype.Component; @@ -38,4 +39,9 @@ public class ControlLoopUpdateListener extends ParticipantListener<ControlLoopUp public ControlLoopUpdateListener(final ParticipantHandler participantHandler) { super(ControlLoopUpdate.class, participantHandler, participantHandler::handleControlLoopUpdate); } + + @Override + public String getType() { + return ParticipantMessageType.CONTROL_LOOP_UPDATE.name(); + } } diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantAckListener.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantAckListener.java index 4b7224938..113f75dd5 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantAckListener.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantAckListener.java @@ -24,6 +24,7 @@ package org.onap.policy.clamp.controlloop.participant.intermediary.comm; import java.util.function.Consumer; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantAckMessage; +import org.onap.policy.clamp.controlloop.participant.intermediary.handler.Listener; import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler; import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; import org.onap.policy.common.endpoints.listeners.ScoListener; @@ -32,7 +33,8 @@ import org.onap.policy.common.utils.coder.StandardCoderObject; /** * Abstract Listener for Participant Ack messages sent by runtime. */ -public abstract class ParticipantAckListener<T extends ParticipantAckMessage> extends ScoListener<T> { +public abstract class ParticipantAckListener<T extends ParticipantAckMessage> extends ScoListener<T> + implements Listener { private final ParticipantHandler participantHandler; private final Consumer<T> consumer; @@ -52,6 +54,13 @@ public abstract class ParticipantAckListener<T extends ParticipantAckMessage> ex @Override public void onTopicEvent(CommInfrastructure infra, String topic, StandardCoderObject sco, T message) { - consumer.accept(message); + if (participantHandler.appliesTo(message)) { + consumer.accept(message); + } + } + + @Override + public ScoListener<T> getScoListener() { + return this; } } diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantDeregisterAckListener.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantDeregisterAckListener.java index e20f481f8..5440e005b 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantDeregisterAckListener.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantDeregisterAckListener.java @@ -21,6 +21,7 @@ package org.onap.policy.clamp.controlloop.participant.intermediary.comm; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantDeregisterAck; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageType; import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler; import org.springframework.stereotype.Component; @@ -39,4 +40,9 @@ public class ParticipantDeregisterAckListener extends ParticipantAckListener<Par public ParticipantDeregisterAckListener(final ParticipantHandler participantHandler) { super(ParticipantDeregisterAck.class, participantHandler, participantHandler::handleParticipantDeregisterAck); } + + @Override + public String getType() { + return ParticipantMessageType.PARTICIPANT_DEREGISTER_ACK.name(); + } } diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantListener.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantListener.java index c6ad900b3..67af5c844 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantListener.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantListener.java @@ -22,6 +22,7 @@ package org.onap.policy.clamp.controlloop.participant.intermediary.comm; import java.util.function.Consumer; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessage; +import org.onap.policy.clamp.controlloop.participant.intermediary.handler.Listener; import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler; import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; import org.onap.policy.common.endpoints.listeners.ScoListener; @@ -30,7 +31,7 @@ import org.onap.policy.common.utils.coder.StandardCoderObject; /** * Abstract Listener for Participant messages sent by CLAMP. */ -public abstract class ParticipantListener<T extends ParticipantMessage> extends ScoListener<T> { +public abstract class ParticipantListener<T extends ParticipantMessage> extends ScoListener<T> implements Listener { private final ParticipantHandler participantHandler; private final Consumer<T> consumer; @@ -54,4 +55,9 @@ public abstract class ParticipantListener<T extends ParticipantMessage> extends consumer.accept(message); } } + + @Override + public ScoListener<T> getScoListener() { + return this; + } } diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantMessagePublisher.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantMessagePublisher.java index d8cc9eb6b..2941e9fd8 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantMessagePublisher.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantMessagePublisher.java @@ -21,35 +21,43 @@ package org.onap.policy.clamp.controlloop.participant.intermediary.comm; import java.util.List; +import javax.ws.rs.core.Response.Status; +import org.onap.policy.clamp.controlloop.common.exception.ControlLoopRuntimeException; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ControlLoopAck; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantDeregister; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantRegister; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantStatus; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantUpdateAck; +import org.onap.policy.clamp.controlloop.participant.intermediary.handler.Publisher; import org.onap.policy.common.endpoints.event.comm.TopicSink; import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; /** * This class is used to send Participant Status messages to clamp using TopicSinkClient. * */ -public class ParticipantMessagePublisher { +@Component +public class ParticipantMessagePublisher implements Publisher { private static final Logger LOGGER = LoggerFactory.getLogger(ParticipantMessagePublisher.class); - private final TopicSinkClient topicSinkClient; + private boolean active = false; + private TopicSinkClient topicSinkClient; /** * Constructor for instantiating ParticipantMessagePublisher. * * @param topicSinks the topic sinks */ - public ParticipantMessagePublisher(List<TopicSink> topicSinks) { + @Override + public void active(List<TopicSink> topicSinks) { if (topicSinks.size() != 1) { throw new IllegalArgumentException("Configuration unsupported, Topic sinks greater than 1"); } this.topicSinkClient = new TopicSinkClient(topicSinks.get(0)); + active = true; } /** @@ -58,6 +66,9 @@ public class ParticipantMessagePublisher { * @param participantStatus the Participant Status */ public void sendParticipantStatus(final ParticipantStatus participantStatus) { + if (!active) { + throw new ControlLoopRuntimeException(Status.NOT_ACCEPTABLE, "Not Active!"); + } topicSinkClient.send(participantStatus); LOGGER.debug("Sent Participant Status message to CLAMP - {}", participantStatus); } @@ -68,6 +79,9 @@ public class ParticipantMessagePublisher { * @param participantRegister the Participant Status */ public void sendParticipantRegister(final ParticipantRegister participantRegister) { + if (!active) { + throw new ControlLoopRuntimeException(Status.NOT_ACCEPTABLE, "Not Active!"); + } topicSinkClient.send(participantRegister); LOGGER.debug("Sent Participant Register message to CLAMP - {}", participantRegister); } @@ -78,6 +92,9 @@ public class ParticipantMessagePublisher { * @param participantDeregister the Participant Status */ public void sendParticipantDeregister(final ParticipantDeregister participantDeregister) { + if (!active) { + throw new ControlLoopRuntimeException(Status.NOT_ACCEPTABLE, "Not Active!"); + } topicSinkClient.send(participantDeregister); LOGGER.debug("Sent Participant Deregister message to CLAMP - {}", participantDeregister); } @@ -88,6 +105,9 @@ public class ParticipantMessagePublisher { * @param participantUpdateAck the Participant Update Ack */ public void sendParticipantUpdateAck(final ParticipantUpdateAck participantUpdateAck) { + if (!active) { + throw new ControlLoopRuntimeException(Status.NOT_ACCEPTABLE, "Not Active!"); + } topicSinkClient.send(participantUpdateAck); LOGGER.debug("Sent Participant Update Ack message to CLAMP - {}", participantUpdateAck); } @@ -98,6 +118,9 @@ public class ParticipantMessagePublisher { * @param controlLoopAck ControlLoop Update/StateChange Ack */ public void sendControlLoopAck(final ControlLoopAck controlLoopAck) { + if (!active) { + throw new ControlLoopRuntimeException(Status.NOT_ACCEPTABLE, "Not Active!"); + } topicSinkClient.send(controlLoopAck); LOGGER.debug("Sent ControlLoop Update/StateChange Ack to runtime - {}", controlLoopAck); } @@ -108,7 +131,15 @@ public class ParticipantMessagePublisher { * @param participantStatus the Participant Status */ public void sendHeartbeat(final ParticipantStatus participantStatus) { + if (!active) { + throw new ControlLoopRuntimeException(Status.NOT_ACCEPTABLE, "Not Active!"); + } topicSinkClient.send(participantStatus); LOGGER.debug("Sent Participant heartbeat to CLAMP - {}", participantStatus); } + + @Override + public void stop() { + active = false; + } } diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantRegisterAckListener.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantRegisterAckListener.java index a15a2a850..7be460815 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantRegisterAckListener.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantRegisterAckListener.java @@ -20,6 +20,7 @@ package org.onap.policy.clamp.controlloop.participant.intermediary.comm; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageType; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantRegisterAck; import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler; import org.springframework.stereotype.Component; @@ -39,4 +40,9 @@ public class ParticipantRegisterAckListener extends ParticipantAckListener<Parti public ParticipantRegisterAckListener(final ParticipantHandler participantHandler) { super(ParticipantRegisterAck.class, participantHandler, participantHandler::handleParticipantRegisterAck); } + + @Override + public String getType() { + return ParticipantMessageType.PARTICIPANT_REGISTER_ACK.name(); + } } diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantStatusReqListener.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantStatusReqListener.java index 0881edb19..9e978fe75 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantStatusReqListener.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantStatusReqListener.java @@ -20,6 +20,7 @@ package org.onap.policy.clamp.controlloop.participant.intermediary.comm; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageType; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantStatusReq; import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler; import org.springframework.stereotype.Component; @@ -38,4 +39,9 @@ public class ParticipantStatusReqListener extends ParticipantListener<Participan public ParticipantStatusReqListener(final ParticipantHandler participantHandler) { super(ParticipantStatusReq.class, participantHandler, participantHandler::handleParticipantStatusReq); } + + @Override + public String getType() { + return ParticipantMessageType.PARTICIPANT_STATUS_REQ.name(); + } } diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantUpdateListener.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantUpdateListener.java index 42bd52d9a..da45501e7 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantUpdateListener.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantUpdateListener.java @@ -20,6 +20,7 @@ package org.onap.policy.clamp.controlloop.participant.intermediary.comm; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageType; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantUpdate; import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler; import org.springframework.stereotype.Component; @@ -38,4 +39,9 @@ public class ParticipantUpdateListener extends ParticipantListener<ParticipantUp public ParticipantUpdateListener(final ParticipantHandler participantHandler) { super(ParticipantUpdate.class, participantHandler, participantHandler::handleParticipantUpdate); } + + @Override + public String getType() { + return ParticipantMessageType.PARTICIPANT_UPDATE.name(); + } } diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/config/BeanFactory.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/config/BeanFactory.java deleted file mode 100644 index e363504a5..000000000 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/config/BeanFactory.java +++ /dev/null @@ -1,55 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * Copyright (C) 2021 Nordix Foundation. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.clamp.controlloop.participant.intermediary.config; - -import java.util.List; -import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ParticipantMessagePublisher; -import org.onap.policy.clamp.controlloop.participant.intermediary.parameters.ParticipantParameters; -import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager; -import org.onap.policy.common.endpoints.event.comm.TopicSink; -import org.onap.policy.common.endpoints.listeners.MessageTypeDispatcher; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; - -@Configuration -public class BeanFactory { - - // Name of the message type for messages on topics - private static final String[] MSG_TYPE_NAMES = {"messageType"}; - - /** - * create ParticipantMessagePublisher. - * - * @param parameters the ParticipantParameters - * @return ParticipantMessagePublisher - */ - @Bean - public ParticipantMessagePublisher publisher(final ParticipantParameters parameters) { - List<TopicSink> topicSinks = TopicEndpointManager.getManager() - .addTopicSinks(parameters.getIntermediaryParameters().getClampControlLoopTopics().getTopicSinks()); - return new ParticipantMessagePublisher(topicSinks); - } - - @Bean - public MessageTypeDispatcher msgDispatcher() { - return new MessageTypeDispatcher(MSG_TYPE_NAMES); - } -} diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/IntermediaryActivator.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/IntermediaryActivator.java index 4fc0ae1b1..754bf2887 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/IntermediaryActivator.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/IntermediaryActivator.java @@ -23,20 +23,13 @@ package org.onap.policy.clamp.controlloop.participant.intermediary.handler; import java.io.Closeable; import java.io.IOException; import java.util.List; -import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageType; import org.onap.policy.clamp.controlloop.participant.intermediary.api.ParticipantIntermediaryApi; -import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ControlLoopStateChangeListener; -import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ControlLoopUpdateListener; -import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ParticipantDeregisterAckListener; -import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ParticipantRegisterAckListener; -import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ParticipantStatusReqListener; -import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ParticipantUpdateListener; import org.onap.policy.clamp.controlloop.participant.intermediary.parameters.ParticipantParameters; import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager; +import org.onap.policy.common.endpoints.event.comm.TopicSink; import org.onap.policy.common.endpoints.event.comm.TopicSource; import org.onap.policy.common.endpoints.listeners.MessageTypeDispatcher; import org.onap.policy.common.utils.services.ServiceManagerContainer; -import org.springframework.context.ApplicationContext; import org.springframework.context.event.ContextClosedEvent; import org.springframework.context.event.ContextRefreshedEvent; import org.springframework.context.event.EventListener; @@ -48,32 +41,50 @@ import org.springframework.stereotype.Component; @Component public class IntermediaryActivator extends ServiceManagerContainer implements Closeable { - private final ApplicationContext applicationContext; + private static final String[] MSG_TYPE_NAMES = {"messageType"}; // Topics from which the participant receives and to which the participant sends messages + private List<TopicSink> topicSinks; private List<TopicSource> topicSources; ParticipantIntermediaryApi participantIntermediaryApi; + private final MessageTypeDispatcher msgDispatcher; + /** * Instantiate the activator for participant. * - * @param applicationContext ApplicationContext * @param parameters the ParticipantParameters + * @param publishers list of Publishers + * @param listeners list of Listeners */ - public IntermediaryActivator(final ApplicationContext applicationContext, final ParticipantParameters parameters, - ParticipantIntermediaryApi participantIntermediaryApi) { - this.applicationContext = applicationContext; + public IntermediaryActivator(final ParticipantParameters parameters, + ParticipantIntermediaryApi participantIntermediaryApi, List<Publisher> publishers, + List<Listener> listeners) { this.participantIntermediaryApi = participantIntermediaryApi; + topicSinks = TopicEndpointManager.getManager() + .addTopicSinks(parameters.getIntermediaryParameters().getClampControlLoopTopics().getTopicSinks()); + topicSources = TopicEndpointManager.getManager() .addTopicSources(parameters.getIntermediaryParameters().getClampControlLoopTopics().getTopicSources()); - // @formatter:off + msgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES); + // @formatter:off addAction("Topic endpoint management", - () -> TopicEndpointManager.getManager().start(), - () -> TopicEndpointManager.getManager().shutdown()); + () -> TopicEndpointManager.getManager().start(), + () -> TopicEndpointManager.getManager().shutdown()); + + publishers.forEach(publisher -> + addAction("Publisher " + publisher.getClass().getSimpleName(), + () -> publisher.active(topicSinks), + publisher::stop)); + + listeners.forEach(listener -> + addAction("Listener " + listener.getClass().getSimpleName(), + () -> msgDispatcher.register(listener.getType(), listener.getScoListener()), + () -> msgDispatcher.unregister(listener.getType()))); addAction("Topic Message Dispatcher", this::registerMsgDispatcher, this::unregisterMsgDispatcher); // @formatter:on @@ -117,26 +128,6 @@ public class IntermediaryActivator extends ServiceManagerContainer implements Cl * Registers the dispatcher with the topic source(s). */ private void registerMsgDispatcher() { - MessageTypeDispatcher msgDispatcher = applicationContext.getBean(MessageTypeDispatcher.class); - - msgDispatcher.register(ParticipantMessageType.PARTICIPANT_STATUS_REQ.name(), - applicationContext.getBean(ParticipantStatusReqListener.class)); - - msgDispatcher.register(ParticipantMessageType.CONTROL_LOOP_STATE_CHANGE.name(), - applicationContext.getBean(ControlLoopStateChangeListener.class)); - - msgDispatcher.register(ParticipantMessageType.CONTROL_LOOP_UPDATE.name(), - applicationContext.getBean(ControlLoopUpdateListener.class)); - - msgDispatcher.register(ParticipantMessageType.PARTICIPANT_REGISTER_ACK.name(), - applicationContext.getBean(ParticipantRegisterAckListener.class)); - - msgDispatcher.register(ParticipantMessageType.PARTICIPANT_DEREGISTER_ACK.name(), - applicationContext.getBean(ParticipantDeregisterAckListener.class)); - - msgDispatcher.register(ParticipantMessageType.PARTICIPANT_UPDATE.name(), - applicationContext.getBean(ParticipantUpdateListener.class)); - for (final TopicSource source : topicSources) { source.register(msgDispatcher); } @@ -146,8 +137,6 @@ public class IntermediaryActivator extends ServiceManagerContainer implements Cl * Unregisters the dispatcher from the topic source(s). */ private void unregisterMsgDispatcher() { - MessageTypeDispatcher msgDispatcher = applicationContext.getBean(MessageTypeDispatcher.class); - for (final TopicSource source : topicSources) { source.unregister(msgDispatcher); } @@ -155,6 +144,8 @@ public class IntermediaryActivator extends ServiceManagerContainer implements Cl @Override public void close() throws IOException { - super.shutdown(); + if (isAlive()) { + super.shutdown(); + } } } diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/Listener.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/Listener.java new file mode 100644 index 000000000..bca71afda --- /dev/null +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/Listener.java @@ -0,0 +1,40 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2021 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.controlloop.participant.intermediary.handler; + +import org.onap.policy.common.endpoints.listeners.ScoListener; + +public interface Listener { + + /** + * Get the type of message of interest to the listener. + * + * @return type of message of interest to the listener + */ + String getType(); + + /** + * Get listener to register. + * + * @return listener to register + */ + <T> ScoListener<T> getScoListener(); +} diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ParticipantHandler.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ParticipantHandler.java index 1947fda1a..66e09e7f6 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ParticipantHandler.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ParticipantHandler.java @@ -43,6 +43,7 @@ import org.onap.policy.clamp.controlloop.models.controlloop.concepts.Participant import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ParticipantStatistics; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ControlLoopStateChange; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ControlLoopUpdate; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantAckMessage; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantDeregister; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantDeregisterAck; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessage; @@ -197,6 +198,16 @@ public class ParticipantHandler implements Closeable { } /** + * Check if a participant message applies to this participant handler. + * + * @param participantMsg the message to check + * @return true if it applies, false otherwise + */ + public boolean appliesTo(ParticipantAckMessage participantMsg) { + return participantMsg.appliesTo(participantType, participantId); + } + + /** * Method to send ParticipantRegister message to controlloop runtime. */ public void sendParticipantRegister() { diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/Publisher.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/Publisher.java new file mode 100644 index 000000000..287d7c055 --- /dev/null +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/Publisher.java @@ -0,0 +1,34 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2021 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.controlloop.participant.intermediary.handler; + +import java.util.List; +import org.onap.policy.common.endpoints.event.comm.TopicSink; + +/** + * Publisher. + */ +public interface Publisher { + + void active(List<TopicSink> topicSinks); + + void stop(); +} |