From 6d02de6b9ea3f4e6fc588813fd2177c732a2af92 Mon Sep 17 00:00:00 2001 From: FrancescoFioraEst Date: Wed, 18 Aug 2021 15:25:59 +0100 Subject: Fix issue in event handling in participants Fix issue in event handling in participants and refactor Participant Publisher and Listener Issue-ID: POLICY-3544 Change-Id: Ic92ffa79d303adfb1c3319fbfefb1faef911a9d4 Signed-off-by: FrancescoFioraEst --- .../comm/ControlLoopStateChangeListener.java | 6 ++ .../comm/ControlLoopUpdateListener.java | 6 ++ .../intermediary/comm/ParticipantAckListener.java | 13 +++- .../comm/ParticipantDeregisterAckListener.java | 6 ++ .../intermediary/comm/ParticipantListener.java | 8 ++- .../comm/ParticipantMessagePublisher.java | 37 +++++++++++- .../comm/ParticipantRegisterAckListener.java | 6 ++ .../comm/ParticipantStatusReqListener.java | 6 ++ .../comm/ParticipantUpdateListener.java | 6 ++ .../intermediary/config/BeanFactory.java | 55 ----------------- .../handler/IntermediaryActivator.java | 69 ++++++++++------------ .../participant/intermediary/handler/Listener.java | 40 +++++++++++++ .../intermediary/handler/ParticipantHandler.java | 11 ++++ .../intermediary/handler/Publisher.java | 34 +++++++++++ 14 files changed, 203 insertions(+), 100 deletions(-) delete mode 100644 participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/config/BeanFactory.java create mode 100644 participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/Listener.java create mode 100644 participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/Publisher.java (limited to 'participant/participant-intermediary/src/main/java') diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ControlLoopStateChangeListener.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ControlLoopStateChangeListener.java index 0b9110bd0..d24f32f2f 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ControlLoopStateChangeListener.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ControlLoopStateChangeListener.java @@ -21,6 +21,7 @@ package org.onap.policy.clamp.controlloop.participant.intermediary.comm; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ControlLoopStateChange; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageType; import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler; import org.springframework.stereotype.Component; @@ -39,4 +40,9 @@ public class ControlLoopStateChangeListener extends ParticipantListener extends ScoListener { +public abstract class ParticipantAckListener extends ScoListener + implements Listener { private final ParticipantHandler participantHandler; private final Consumer consumer; @@ -52,6 +54,13 @@ public abstract class ParticipantAckListener ex @Override public void onTopicEvent(CommInfrastructure infra, String topic, StandardCoderObject sco, T message) { - consumer.accept(message); + if (participantHandler.appliesTo(message)) { + consumer.accept(message); + } + } + + @Override + public ScoListener getScoListener() { + return this; } } diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantDeregisterAckListener.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantDeregisterAckListener.java index e20f481f8..5440e005b 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantDeregisterAckListener.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantDeregisterAckListener.java @@ -21,6 +21,7 @@ package org.onap.policy.clamp.controlloop.participant.intermediary.comm; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantDeregisterAck; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageType; import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler; import org.springframework.stereotype.Component; @@ -39,4 +40,9 @@ public class ParticipantDeregisterAckListener extends ParticipantAckListener extends ScoListener { +public abstract class ParticipantListener extends ScoListener implements Listener { private final ParticipantHandler participantHandler; private final Consumer consumer; @@ -54,4 +55,9 @@ public abstract class ParticipantListener extends consumer.accept(message); } } + + @Override + public ScoListener getScoListener() { + return this; + } } diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantMessagePublisher.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantMessagePublisher.java index d8cc9eb6b..2941e9fd8 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantMessagePublisher.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantMessagePublisher.java @@ -21,35 +21,43 @@ package org.onap.policy.clamp.controlloop.participant.intermediary.comm; import java.util.List; +import javax.ws.rs.core.Response.Status; +import org.onap.policy.clamp.controlloop.common.exception.ControlLoopRuntimeException; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ControlLoopAck; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantDeregister; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantRegister; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantStatus; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantUpdateAck; +import org.onap.policy.clamp.controlloop.participant.intermediary.handler.Publisher; import org.onap.policy.common.endpoints.event.comm.TopicSink; import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; /** * This class is used to send Participant Status messages to clamp using TopicSinkClient. * */ -public class ParticipantMessagePublisher { +@Component +public class ParticipantMessagePublisher implements Publisher { private static final Logger LOGGER = LoggerFactory.getLogger(ParticipantMessagePublisher.class); - private final TopicSinkClient topicSinkClient; + private boolean active = false; + private TopicSinkClient topicSinkClient; /** * Constructor for instantiating ParticipantMessagePublisher. * * @param topicSinks the topic sinks */ - public ParticipantMessagePublisher(List topicSinks) { + @Override + public void active(List topicSinks) { if (topicSinks.size() != 1) { throw new IllegalArgumentException("Configuration unsupported, Topic sinks greater than 1"); } this.topicSinkClient = new TopicSinkClient(topicSinks.get(0)); + active = true; } /** @@ -58,6 +66,9 @@ public class ParticipantMessagePublisher { * @param participantStatus the Participant Status */ public void sendParticipantStatus(final ParticipantStatus participantStatus) { + if (!active) { + throw new ControlLoopRuntimeException(Status.NOT_ACCEPTABLE, "Not Active!"); + } topicSinkClient.send(participantStatus); LOGGER.debug("Sent Participant Status message to CLAMP - {}", participantStatus); } @@ -68,6 +79,9 @@ public class ParticipantMessagePublisher { * @param participantRegister the Participant Status */ public void sendParticipantRegister(final ParticipantRegister participantRegister) { + if (!active) { + throw new ControlLoopRuntimeException(Status.NOT_ACCEPTABLE, "Not Active!"); + } topicSinkClient.send(participantRegister); LOGGER.debug("Sent Participant Register message to CLAMP - {}", participantRegister); } @@ -78,6 +92,9 @@ public class ParticipantMessagePublisher { * @param participantDeregister the Participant Status */ public void sendParticipantDeregister(final ParticipantDeregister participantDeregister) { + if (!active) { + throw new ControlLoopRuntimeException(Status.NOT_ACCEPTABLE, "Not Active!"); + } topicSinkClient.send(participantDeregister); LOGGER.debug("Sent Participant Deregister message to CLAMP - {}", participantDeregister); } @@ -88,6 +105,9 @@ public class ParticipantMessagePublisher { * @param participantUpdateAck the Participant Update Ack */ public void sendParticipantUpdateAck(final ParticipantUpdateAck participantUpdateAck) { + if (!active) { + throw new ControlLoopRuntimeException(Status.NOT_ACCEPTABLE, "Not Active!"); + } topicSinkClient.send(participantUpdateAck); LOGGER.debug("Sent Participant Update Ack message to CLAMP - {}", participantUpdateAck); } @@ -98,6 +118,9 @@ public class ParticipantMessagePublisher { * @param controlLoopAck ControlLoop Update/StateChange Ack */ public void sendControlLoopAck(final ControlLoopAck controlLoopAck) { + if (!active) { + throw new ControlLoopRuntimeException(Status.NOT_ACCEPTABLE, "Not Active!"); + } topicSinkClient.send(controlLoopAck); LOGGER.debug("Sent ControlLoop Update/StateChange Ack to runtime - {}", controlLoopAck); } @@ -108,7 +131,15 @@ public class ParticipantMessagePublisher { * @param participantStatus the Participant Status */ public void sendHeartbeat(final ParticipantStatus participantStatus) { + if (!active) { + throw new ControlLoopRuntimeException(Status.NOT_ACCEPTABLE, "Not Active!"); + } topicSinkClient.send(participantStatus); LOGGER.debug("Sent Participant heartbeat to CLAMP - {}", participantStatus); } + + @Override + public void stop() { + active = false; + } } diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantRegisterAckListener.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantRegisterAckListener.java index a15a2a850..7be460815 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantRegisterAckListener.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantRegisterAckListener.java @@ -20,6 +20,7 @@ package org.onap.policy.clamp.controlloop.participant.intermediary.comm; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageType; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantRegisterAck; import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler; import org.springframework.stereotype.Component; @@ -39,4 +40,9 @@ public class ParticipantRegisterAckListener extends ParticipantAckListener topicSinks = TopicEndpointManager.getManager() - .addTopicSinks(parameters.getIntermediaryParameters().getClampControlLoopTopics().getTopicSinks()); - return new ParticipantMessagePublisher(topicSinks); - } - - @Bean - public MessageTypeDispatcher msgDispatcher() { - return new MessageTypeDispatcher(MSG_TYPE_NAMES); - } -} diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/IntermediaryActivator.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/IntermediaryActivator.java index 4fc0ae1b1..754bf2887 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/IntermediaryActivator.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/IntermediaryActivator.java @@ -23,20 +23,13 @@ package org.onap.policy.clamp.controlloop.participant.intermediary.handler; import java.io.Closeable; import java.io.IOException; import java.util.List; -import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageType; import org.onap.policy.clamp.controlloop.participant.intermediary.api.ParticipantIntermediaryApi; -import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ControlLoopStateChangeListener; -import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ControlLoopUpdateListener; -import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ParticipantDeregisterAckListener; -import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ParticipantRegisterAckListener; -import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ParticipantStatusReqListener; -import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ParticipantUpdateListener; import org.onap.policy.clamp.controlloop.participant.intermediary.parameters.ParticipantParameters; import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager; +import org.onap.policy.common.endpoints.event.comm.TopicSink; import org.onap.policy.common.endpoints.event.comm.TopicSource; import org.onap.policy.common.endpoints.listeners.MessageTypeDispatcher; import org.onap.policy.common.utils.services.ServiceManagerContainer; -import org.springframework.context.ApplicationContext; import org.springframework.context.event.ContextClosedEvent; import org.springframework.context.event.ContextRefreshedEvent; import org.springframework.context.event.EventListener; @@ -48,32 +41,50 @@ import org.springframework.stereotype.Component; @Component public class IntermediaryActivator extends ServiceManagerContainer implements Closeable { - private final ApplicationContext applicationContext; + private static final String[] MSG_TYPE_NAMES = {"messageType"}; // Topics from which the participant receives and to which the participant sends messages + private List topicSinks; private List topicSources; ParticipantIntermediaryApi participantIntermediaryApi; + private final MessageTypeDispatcher msgDispatcher; + /** * Instantiate the activator for participant. * - * @param applicationContext ApplicationContext * @param parameters the ParticipantParameters + * @param publishers list of Publishers + * @param listeners list of Listeners */ - public IntermediaryActivator(final ApplicationContext applicationContext, final ParticipantParameters parameters, - ParticipantIntermediaryApi participantIntermediaryApi) { - this.applicationContext = applicationContext; + public IntermediaryActivator(final ParticipantParameters parameters, + ParticipantIntermediaryApi participantIntermediaryApi, List publishers, + List listeners) { this.participantIntermediaryApi = participantIntermediaryApi; + topicSinks = TopicEndpointManager.getManager() + .addTopicSinks(parameters.getIntermediaryParameters().getClampControlLoopTopics().getTopicSinks()); + topicSources = TopicEndpointManager.getManager() .addTopicSources(parameters.getIntermediaryParameters().getClampControlLoopTopics().getTopicSources()); - // @formatter:off + msgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES); + // @formatter:off addAction("Topic endpoint management", - () -> TopicEndpointManager.getManager().start(), - () -> TopicEndpointManager.getManager().shutdown()); + () -> TopicEndpointManager.getManager().start(), + () -> TopicEndpointManager.getManager().shutdown()); + + publishers.forEach(publisher -> + addAction("Publisher " + publisher.getClass().getSimpleName(), + () -> publisher.active(topicSinks), + publisher::stop)); + + listeners.forEach(listener -> + addAction("Listener " + listener.getClass().getSimpleName(), + () -> msgDispatcher.register(listener.getType(), listener.getScoListener()), + () -> msgDispatcher.unregister(listener.getType()))); addAction("Topic Message Dispatcher", this::registerMsgDispatcher, this::unregisterMsgDispatcher); // @formatter:on @@ -117,26 +128,6 @@ public class IntermediaryActivator extends ServiceManagerContainer implements Cl * Registers the dispatcher with the topic source(s). */ private void registerMsgDispatcher() { - MessageTypeDispatcher msgDispatcher = applicationContext.getBean(MessageTypeDispatcher.class); - - msgDispatcher.register(ParticipantMessageType.PARTICIPANT_STATUS_REQ.name(), - applicationContext.getBean(ParticipantStatusReqListener.class)); - - msgDispatcher.register(ParticipantMessageType.CONTROL_LOOP_STATE_CHANGE.name(), - applicationContext.getBean(ControlLoopStateChangeListener.class)); - - msgDispatcher.register(ParticipantMessageType.CONTROL_LOOP_UPDATE.name(), - applicationContext.getBean(ControlLoopUpdateListener.class)); - - msgDispatcher.register(ParticipantMessageType.PARTICIPANT_REGISTER_ACK.name(), - applicationContext.getBean(ParticipantRegisterAckListener.class)); - - msgDispatcher.register(ParticipantMessageType.PARTICIPANT_DEREGISTER_ACK.name(), - applicationContext.getBean(ParticipantDeregisterAckListener.class)); - - msgDispatcher.register(ParticipantMessageType.PARTICIPANT_UPDATE.name(), - applicationContext.getBean(ParticipantUpdateListener.class)); - for (final TopicSource source : topicSources) { source.register(msgDispatcher); } @@ -146,8 +137,6 @@ public class IntermediaryActivator extends ServiceManagerContainer implements Cl * Unregisters the dispatcher from the topic source(s). */ private void unregisterMsgDispatcher() { - MessageTypeDispatcher msgDispatcher = applicationContext.getBean(MessageTypeDispatcher.class); - for (final TopicSource source : topicSources) { source.unregister(msgDispatcher); } @@ -155,6 +144,8 @@ public class IntermediaryActivator extends ServiceManagerContainer implements Cl @Override public void close() throws IOException { - super.shutdown(); + if (isAlive()) { + super.shutdown(); + } } } diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/Listener.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/Listener.java new file mode 100644 index 000000000..bca71afda --- /dev/null +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/Listener.java @@ -0,0 +1,40 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2021 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.controlloop.participant.intermediary.handler; + +import org.onap.policy.common.endpoints.listeners.ScoListener; + +public interface Listener { + + /** + * Get the type of message of interest to the listener. + * + * @return type of message of interest to the listener + */ + String getType(); + + /** + * Get listener to register. + * + * @return listener to register + */ + ScoListener getScoListener(); +} diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ParticipantHandler.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ParticipantHandler.java index 1947fda1a..66e09e7f6 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ParticipantHandler.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ParticipantHandler.java @@ -43,6 +43,7 @@ import org.onap.policy.clamp.controlloop.models.controlloop.concepts.Participant import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ParticipantStatistics; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ControlLoopStateChange; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ControlLoopUpdate; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantAckMessage; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantDeregister; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantDeregisterAck; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessage; @@ -196,6 +197,16 @@ public class ParticipantHandler implements Closeable { return participantMsg.appliesTo(participantType, participantId); } + /** + * Check if a participant message applies to this participant handler. + * + * @param participantMsg the message to check + * @return true if it applies, false otherwise + */ + public boolean appliesTo(ParticipantAckMessage participantMsg) { + return participantMsg.appliesTo(participantType, participantId); + } + /** * Method to send ParticipantRegister message to controlloop runtime. */ diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/Publisher.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/Publisher.java new file mode 100644 index 000000000..287d7c055 --- /dev/null +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/Publisher.java @@ -0,0 +1,34 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2021 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.controlloop.participant.intermediary.handler; + +import java.util.List; +import org.onap.policy.common.endpoints.event.comm.TopicSink; + +/** + * Publisher. + */ +public interface Publisher { + + void active(List topicSinks); + + void stop(); +} -- cgit 1.2.3-korg