From 7f7f0c5790dc63e43a59c53154e796239ab34cbb Mon Sep 17 00:00:00 2001 From: FrancescoFioraEst Date: Mon, 21 Jun 2021 16:45:27 +0100 Subject: Convert Intermediary Participant to Spring and refactor Participants Issue-ID: POLICY-3370 Change-Id: I1541fc47b35c91f9ec86ab768bc3cec2cd78647c Signed-off-by: FrancescoFioraEst --- .../api/ParticipantIntermediaryApi.java | 21 ---- .../api/ParticipantIntermediaryFactory.java | 38 ------ .../api/impl/ParticipantIntermediaryApiImpl.java | 53 ++++---- .../comm/ControlLoopStateChangeListener.java | 36 +----- .../comm/ControlLoopUpdateListener.java | 34 +---- .../comm/ParticipantHealthCheckListener.java | 36 +----- .../intermediary/comm/ParticipantListener.java | 57 +++++++++ .../comm/ParticipantStateChangeListener.java | 34 +---- .../comm/ParticipantStatusPublisher.java | 8 +- .../intermediary/config/BeanFactory.java | 55 ++++++++ .../intermediary/handler/ControlLoopHandler.java | 8 +- .../handler/IntermediaryActivator.java | 139 +++++++++------------ .../intermediary/handler/ParticipantHandler.java | 46 +++++-- .../parameters/ParticipantParameters.java | 26 ++++ 14 files changed, 273 insertions(+), 318 deletions(-) delete mode 100644 participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/api/ParticipantIntermediaryFactory.java create mode 100644 participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantListener.java create mode 100644 participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/config/BeanFactory.java create mode 100644 participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/parameters/ParticipantParameters.java (limited to 'participant/participant-intermediary/src') diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/api/ParticipantIntermediaryApi.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/api/ParticipantIntermediaryApi.java index adc9c2393..a87299bdc 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/api/ParticipantIntermediaryApi.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/api/ParticipantIntermediaryApi.java @@ -31,25 +31,12 @@ import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoop import org.onap.policy.clamp.controlloop.models.controlloop.concepts.Participant; import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ParticipantState; import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ParticipantStatistics; -import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler; -import org.onap.policy.clamp.controlloop.participant.intermediary.parameters.ParticipantIntermediaryParameters; import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier; /** * This interface is used by participant implementations to use the participant intermediary. */ public interface ParticipantIntermediaryApi { - /** - * Initialise the participant intermediary. - * - * @param parameters the parameters for the intermediary - */ - void init(ParticipantIntermediaryParameters parameters); - - /** - * Close the intermediary. - */ - void close(); /** * Register a listener for control loop elements that are mediated by the intermediary. @@ -128,12 +115,4 @@ public interface ParticipantIntermediaryApi { */ void updateControlLoopElementStatistics(UUID id, ClElementStatistics elementStatistics); - /** - * Return participantHandler, This will not be used in real world, but for junits, - * if participantHandler is not returned, there is no way to test state change messages - * without dmaap simulator. - * - * @return the participant handler - */ - ParticipantHandler getParticipantHandler(); } diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/api/ParticipantIntermediaryFactory.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/api/ParticipantIntermediaryFactory.java deleted file mode 100644 index d7cc4b2ed..000000000 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/api/ParticipantIntermediaryFactory.java +++ /dev/null @@ -1,38 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * Copyright (C) 2021 Nordix Foundation. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.clamp.controlloop.participant.intermediary.api; - -import org.onap.policy.clamp.controlloop.participant.intermediary.api.impl.ParticipantIntermediaryApiImpl; - -/** - * Factory class for creating {@link ParticipantIntermediaryApi} instances. - */ -public class ParticipantIntermediaryFactory { - - /** - * Create an implementation of the {@link ParticipantIntermediaryApi} interface. - * - * @return the implementation of the API - */ - public ParticipantIntermediaryApi createApiImplementation() { - return new ParticipantIntermediaryApiImpl(); - } -} diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/api/impl/ParticipantIntermediaryApiImpl.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/api/impl/ParticipantIntermediaryApiImpl.java index 839088d72..838f47544 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/api/impl/ParticipantIntermediaryApiImpl.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/api/impl/ParticipantIntermediaryApiImpl.java @@ -35,45 +35,41 @@ import org.onap.policy.clamp.controlloop.models.controlloop.concepts.Participant import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ParticipantStatistics; import org.onap.policy.clamp.controlloop.participant.intermediary.api.ControlLoopElementListener; import org.onap.policy.clamp.controlloop.participant.intermediary.api.ParticipantIntermediaryApi; -import org.onap.policy.clamp.controlloop.participant.intermediary.handler.IntermediaryActivator; import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler; -import org.onap.policy.clamp.controlloop.participant.intermediary.parameters.ParticipantIntermediaryParameters; import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier; +import org.springframework.stereotype.Component; /** * This class is api implementation used by participant intermediary. */ +@Component public class ParticipantIntermediaryApiImpl implements ParticipantIntermediaryApi { - // The activator for the participant intermediary - private IntermediaryActivator activator; + // The handler for the participant intermediary + private ParticipantHandler participantHandler; - @Override - public void init(ParticipantIntermediaryParameters parameters) { - activator = new IntermediaryActivator(parameters); - - activator.start(); - } - - @Override - public void close() { - activator.shutdown(); + /** + * Constructor. + * + * @param participantHandler ParticipantHandler + */ + public ParticipantIntermediaryApiImpl(ParticipantHandler participantHandler) { + this.participantHandler = participantHandler; } @Override public void registerControlLoopElementListener(ControlLoopElementListener controlLoopElementListener) { - activator.getParticipantHandler().getControlLoopHandler() - .registerControlLoopElementListener(controlLoopElementListener); + participantHandler.getControlLoopHandler().registerControlLoopElementListener(controlLoopElementListener); } @Override public List getParticipants(String name, String version) { - return List.of(activator.getParticipantHandler().getParticipant(name, version)); + return List.of(participantHandler.getParticipant(name, version)); } @Override public Participant updateParticipantState(ToscaConceptIdentifier definition, ParticipantState state) { - return activator.getParticipantHandler().updateParticipantState(definition, state); + return participantHandler.updateParticipantState(definition, state); } @Override @@ -83,13 +79,13 @@ public class ParticipantIntermediaryApiImpl implements ParticipantIntermediaryAp @Override public ControlLoops getControlLoops(String name, String version) { - return activator.getParticipantHandler().getControlLoopHandler().getControlLoops(); + return participantHandler.getControlLoopHandler().getControlLoops(); } @Override public Map getControlLoopElements(String name, String version) { - List controlLoops = activator.getParticipantHandler() - .getControlLoopHandler().getControlLoops().getControlLoopList(); + List controlLoops = + participantHandler.getControlLoopHandler().getControlLoops().getControlLoopList(); for (ControlLoop controlLoop : controlLoops) { if (name.equals(controlLoop.getDefinition().getName())) { @@ -101,8 +97,8 @@ public class ParticipantIntermediaryApiImpl implements ParticipantIntermediaryAp @Override public ControlLoopElement getControlLoopElement(UUID id) { - List controlLoops = activator.getParticipantHandler() - .getControlLoopHandler().getControlLoops().getControlLoopList(); + List controlLoops = + participantHandler.getControlLoopHandler().getControlLoops().getControlLoopList(); for (ControlLoop controlLoop : controlLoops) { ControlLoopElement clElement = controlLoop.getElements().get(id); @@ -116,18 +112,11 @@ public class ParticipantIntermediaryApiImpl implements ParticipantIntermediaryAp @Override public ControlLoopElement updateControlLoopElementState(UUID id, ControlLoopOrderedState currentState, ControlLoopState newState) { - return activator.getParticipantHandler().getControlLoopHandler() - .updateControlLoopElementState(id, currentState, newState); + return participantHandler.getControlLoopHandler().updateControlLoopElementState(id, currentState, newState); } @Override public void updateControlLoopElementStatistics(UUID id, ClElementStatistics elementStatistics) { - activator.getParticipantHandler().getControlLoopHandler() - .updateControlLoopElementStatistics(id, elementStatistics); - } - - @Override - public ParticipantHandler getParticipantHandler() { - return activator.getParticipantHandler(); + participantHandler.getControlLoopHandler().updateControlLoopElementStatistics(id, elementStatistics); } } diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ControlLoopStateChangeListener.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ControlLoopStateChangeListener.java index 50b8b9cdc..e46c6db1b 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ControlLoopStateChangeListener.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ControlLoopStateChangeListener.java @@ -20,23 +20,15 @@ package org.onap.policy.clamp.controlloop.participant.intermediary.comm; -import java.io.Closeable; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantControlLoopStateChange; import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler; -import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; -import org.onap.policy.common.endpoints.listeners.ScoListener; -import org.onap.policy.common.utils.coder.StandardCoderObject; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; /** * Listener for Participant State Change messages sent by CLAMP. */ -public class ControlLoopStateChangeListener extends ScoListener - implements Closeable { - private static final Logger LOGGER = LoggerFactory.getLogger(ControlLoopStateChangeListener.class); - - private final ParticipantHandler participantHandler; +@Component +public class ControlLoopStateChangeListener extends ParticipantListener { /** * Constructs the object. @@ -44,25 +36,7 @@ public class ControlLoopStateChangeListener extends ScoListener implements Closeable { - private static final Logger LOGGER = LoggerFactory.getLogger(ControlLoopUpdateListener.class); - - private final ParticipantHandler participantHandler; +@Component +public class ControlLoopUpdateListener extends ParticipantListener { /** * Constructs the object. @@ -43,25 +36,6 @@ public class ControlLoopUpdateListener extends ScoListener implements Closeable { - private static final Logger LOGGER = LoggerFactory.getLogger(ParticipantHealthCheckListener.class); - - private final ParticipantHandler participantHandler; +@Component +public class ParticipantHealthCheckListener extends ParticipantListener { /** * Constructs the object. @@ -43,27 +36,6 @@ public class ParticipantHealthCheckListener extends ScoListener extends ScoListener { + + private final ParticipantHandler participantHandler; + private final Consumer consumer; + + /** + * Constructs the object. + * + * @param clazz class of message this handles + * @param participantHandler ParticipantHandler + * @param consumer function that handles the message + */ + protected ParticipantListener(Class clazz, ParticipantHandler participantHandler, Consumer consumer) { + super(clazz); + this.participantHandler = participantHandler; + this.consumer = consumer; + } + + @Override + public void onTopicEvent(CommInfrastructure infra, String topic, StandardCoderObject sco, T message) { + if (participantHandler.appliesTo(message)) { + consumer.accept(message); + } + } +} diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantStateChangeListener.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantStateChangeListener.java index c1a8b5b4a..ec6548a7c 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantStateChangeListener.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantStateChangeListener.java @@ -20,23 +20,16 @@ package org.onap.policy.clamp.controlloop.participant.intermediary.comm; -import java.io.Closeable; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantStateChange; import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler; -import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; -import org.onap.policy.common.endpoints.listeners.ScoListener; -import org.onap.policy.common.utils.coder.StandardCoderObject; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; /** * Listener for Participant State Change messages sent by CLAMP. * */ -public class ParticipantStateChangeListener extends ScoListener implements Closeable { - private static final Logger LOGGER = LoggerFactory.getLogger(ParticipantStateChangeListener.class); - - private final ParticipantHandler participantHandler; +@Component +public class ParticipantStateChangeListener extends ParticipantListener { /** * Constructs the object. @@ -44,25 +37,6 @@ public class ParticipantStateChangeListener extends ScoListener topicSinks = TopicEndpointManager.getManager() + .addTopicSinks(parameters.getIntermediaryParameters().getClampControlLoopTopics().getTopicSinks()); + return new ParticipantStatusPublisher(topicSinks); + } + + @Bean + public MessageTypeDispatcher msgDispatcher() { + return new MessageTypeDispatcher(MSG_TYPE_NAMES); + } +} diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ControlLoopHandler.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ControlLoopHandler.java index 3eebd177f..50048ffc2 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ControlLoopHandler.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ControlLoopHandler.java @@ -20,7 +20,6 @@ package org.onap.policy.clamp.controlloop.participant.intermediary.handler; -import java.io.Closeable; import java.util.ArrayList; import java.util.LinkedHashMap; import java.util.List; @@ -51,7 +50,7 @@ import org.slf4j.LoggerFactory; * This class is responsible for managing the state of all control loops in the participant. */ @NoArgsConstructor -public class ControlLoopHandler implements Closeable { +public class ControlLoopHandler { private static final Logger LOGGER = LoggerFactory.getLogger(ControlLoopHandler.class); private ToscaConceptIdentifier participantType = null; @@ -77,11 +76,6 @@ public class ControlLoopHandler implements Closeable { this.messageSender = messageSender; } - @Override - public void close() { - // No explicit action on this class - } - public void registerControlLoopElementListener(ControlLoopElementListener listener) { listeners.add(listener); } diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/IntermediaryActivator.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/IntermediaryActivator.java index 3eae27267..2d789d40d 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/IntermediaryActivator.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/IntermediaryActivator.java @@ -20,125 +20,100 @@ package org.onap.policy.clamp.controlloop.participant.intermediary.handler; +import java.io.Closeable; +import java.io.IOException; import java.util.List; -import java.util.concurrent.atomic.AtomicReference; -import javax.ws.rs.core.Response.Status; -import lombok.Getter; -import org.onap.policy.clamp.controlloop.common.exception.ControlLoopRuntimeException; -import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantControlLoopStateChange; -import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantControlLoopUpdate; -import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantHealthCheck; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageType; -import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantStateChange; import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ControlLoopStateChangeListener; import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ControlLoopUpdateListener; import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ParticipantHealthCheckListener; import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ParticipantStateChangeListener; -import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ParticipantStatusPublisher; -import org.onap.policy.clamp.controlloop.participant.intermediary.parameters.ParticipantIntermediaryParameters; +import org.onap.policy.clamp.controlloop.participant.intermediary.parameters.ParticipantParameters; import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager; -import org.onap.policy.common.endpoints.event.comm.TopicSink; import org.onap.policy.common.endpoints.event.comm.TopicSource; import org.onap.policy.common.endpoints.listeners.MessageTypeDispatcher; -import org.onap.policy.common.endpoints.listeners.ScoListener; import org.onap.policy.common.utils.services.ServiceManagerContainer; +import org.springframework.context.ApplicationContext; +import org.springframework.context.event.ContextClosedEvent; +import org.springframework.context.event.ContextRefreshedEvent; +import org.springframework.context.event.EventListener; +import org.springframework.stereotype.Component; /** * This class activates the Participant Intermediary together with all its handlers. */ -public class IntermediaryActivator extends ServiceManagerContainer { - // Name of the message type for messages on topics - private static final String[] MSG_TYPE_NAMES = {"messageType"}; +@Component +public class IntermediaryActivator extends ServiceManagerContainer implements Closeable { - @Getter - private final ParticipantIntermediaryParameters parameters; + private final ApplicationContext applicationContext; // Topics from which the participant receives and to which the participant sends messages - private List topicSinks; private List topicSources; - // The participant handler for this intermediary - final AtomicReference participantHandler = new AtomicReference<>(); - - /** - * Listens for messages on the topic, decodes them into a message, and then dispatches them. - */ - private final MessageTypeDispatcher msgDispatcher; - /** * Instantiate the activator for participant. * - * @param parameters the parameters for the participant intermediary - * @throws ControlLoopRuntimeException when the activation fails + * @param applicationContext ApplicationContext + * @param parameters the ParticipantParameters */ - public IntermediaryActivator(final ParticipantIntermediaryParameters parameters) { - this.parameters = parameters; - - topicSinks = - TopicEndpointManager.getManager().addTopicSinks(parameters.getClampControlLoopTopics().getTopicSinks()); + public IntermediaryActivator(final ApplicationContext applicationContext, final ParticipantParameters parameters) { + this.applicationContext = applicationContext; - topicSources = - TopicEndpointManager.getManager().addTopicSources(parameters.getClampControlLoopTopics().getTopicSources()); - - try { - this.msgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES); - } catch (final RuntimeException e) { - throw new ControlLoopRuntimeException(Status.INTERNAL_SERVER_ERROR, - "topic message dispatcher failed to start", e); - } + topicSources = TopicEndpointManager.getManager() + .addTopicSources(parameters.getIntermediaryParameters().getClampControlLoopTopics().getTopicSources()); // @formatter:off - final AtomicReference statusPublisher = new AtomicReference<>(); - final AtomicReference participantStateChangeListener = new AtomicReference<>(); - final AtomicReference participantHealthCheckListener = new AtomicReference<>(); - final AtomicReference controlLoopStateChangeListener = new AtomicReference<>(); - final AtomicReference controlLoopUpdateListener = new AtomicReference<>(); addAction("Topic endpoint management", () -> TopicEndpointManager.getManager().start(), () -> TopicEndpointManager.getManager().shutdown()); - addAction("Participant Status Publisher", - () -> statusPublisher.set(new ParticipantStatusPublisher(topicSinks)), - () -> statusPublisher.get().close()); - - addAction("Participant Handler", - () -> participantHandler.set(new ParticipantHandler(parameters, statusPublisher.get())), - () -> participantHandler.get().close()); - - addAction("Participant State Change Listener", - () -> participantStateChangeListener.set(new ParticipantStateChangeListener(participantHandler.get())), - () -> participantStateChangeListener.get().close()); - - addAction("Participant Health Check Listener", - () -> participantHealthCheckListener.set(new ParticipantHealthCheckListener(participantHandler.get())), - () -> participantHealthCheckListener.get().close()); - - addAction("Control Loop State Change Listener", - () -> controlLoopStateChangeListener.set(new ControlLoopStateChangeListener(participantHandler.get())), - () -> controlLoopStateChangeListener.get().close()); - - addAction("Control Loop Update Listener", - () -> controlLoopUpdateListener.set(new ControlLoopUpdateListener(participantHandler.get())), - () -> controlLoopUpdateListener.get().close()); - addAction("Topic Message Dispatcher", this::registerMsgDispatcher, this::unregisterMsgDispatcher); // @formatter:on } + /** + * Handle ContextRefreshEvent. + * + * @param ctxRefreshedEvent ContextRefreshedEvent + */ + @EventListener + public void handleContextRefreshEvent(ContextRefreshedEvent ctxRefreshedEvent) { + if (!isAlive()) { + start(); + } + } + + /** + * Handle ContextClosedEvent. + * + * @param ctxClosedEvent ContextClosedEvent + */ + @EventListener + public void handleContextClosedEvent(ContextClosedEvent ctxClosedEvent) { + if (isAlive()) { + stop(); + } + } + /** * Registers the dispatcher with the topic source(s). */ private void registerMsgDispatcher() { + MessageTypeDispatcher msgDispatcher = applicationContext.getBean(MessageTypeDispatcher.class); + msgDispatcher.register(ParticipantMessageType.PARTICIPANT_STATE_CHANGE.name(), - (ScoListener) new ParticipantStateChangeListener(participantHandler.get())); + applicationContext.getBean(ParticipantStateChangeListener.class)); + msgDispatcher.register(ParticipantMessageType.PARTICIPANT_HEALTH_CHECK.name(), - (ScoListener) new ParticipantHealthCheckListener(participantHandler.get())); + applicationContext.getBean(ParticipantHealthCheckListener.class)); + msgDispatcher.register(ParticipantMessageType.PARTICIPANT_CONTROL_LOOP_STATE_CHANGE.name(), - (ScoListener) new ControlLoopStateChangeListener( - participantHandler.get())); + applicationContext.getBean(ControlLoopStateChangeListener.class)); + msgDispatcher.register(ParticipantMessageType.PARTICIPANT_CONTROL_LOOP_UPDATE.name(), - (ScoListener) new ControlLoopUpdateListener(participantHandler.get())); + applicationContext.getBean(ControlLoopUpdateListener.class)); + for (final TopicSource source : topicSources) { source.register(msgDispatcher); } @@ -148,17 +123,15 @@ public class IntermediaryActivator extends ServiceManagerContainer { * Unregisters the dispatcher from the topic source(s). */ private void unregisterMsgDispatcher() { + MessageTypeDispatcher msgDispatcher = applicationContext.getBean(MessageTypeDispatcher.class); + for (final TopicSource source : topicSources) { source.unregister(msgDispatcher); } } - /** - * Return the participant handler. - * - * @return the participant handler - */ - public ParticipantHandler getParticipantHandler() { - return participantHandler.get(); + @Override + public void close() throws IOException { + super.shutdown(); } } diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ParticipantHandler.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ParticipantHandler.java index 5e414b175..1c54658fa 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ParticipantHandler.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ParticipantHandler.java @@ -28,6 +28,8 @@ import org.onap.policy.clamp.controlloop.models.controlloop.concepts.Participant import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ParticipantHealthStatus; import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ParticipantState; import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ParticipantStatistics; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantControlLoopStateChange; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantControlLoopUpdate; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantHealthCheck; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessage; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantResponseDetails; @@ -35,15 +37,17 @@ import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.Parti import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantStateChange; import org.onap.policy.clamp.controlloop.participant.intermediary.comm.MessageSender; import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ParticipantStatusPublisher; -import org.onap.policy.clamp.controlloop.participant.intermediary.parameters.ParticipantIntermediaryParameters; +import org.onap.policy.clamp.controlloop.participant.intermediary.parameters.ParticipantParameters; import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; /** * This class is responsible for managing the state of a participant. */ @Getter +@Component public class ParticipantHandler implements Closeable { private static final Logger LOGGER = LoggerFactory.getLogger(ParticipantHandler.class); @@ -65,18 +69,18 @@ public class ParticipantHandler implements Closeable { * @param parameters the parameters of the participant * @param publisher the publisher for sending responses to messages */ - public ParticipantHandler(ParticipantIntermediaryParameters parameters, ParticipantStatusPublisher publisher) { - this.participantType = parameters.getParticipantType(); - this.participantId = parameters.getParticipantId(); - this.sender = new MessageSender(this, publisher, parameters.getReportingTimeInterval()); - this.controlLoopHandler = new ControlLoopHandler(parameters, sender); + public ParticipantHandler(ParticipantParameters parameters, ParticipantStatusPublisher publisher) { + this.participantType = parameters.getIntermediaryParameters().getParticipantType(); + this.participantId = parameters.getIntermediaryParameters().getParticipantId(); + this.sender = + new MessageSender(this, publisher, parameters.getIntermediaryParameters().getReportingTimeInterval()); + this.controlLoopHandler = new ControlLoopHandler(parameters.getIntermediaryParameters(), sender); this.participantStatistics = new ParticipantStatistics(); } @Override public void close() { sender.close(); - controlLoopHandler.close(); } /** @@ -132,6 +136,24 @@ public class ParticipantHandler implements Closeable { sender.sendResponse(response); } + /** + * Handle a control loop update message. + * + * @param updateMsg the update message + */ + public void handleControlLoopUpdate(ParticipantControlLoopUpdate updateMsg) { + controlLoopHandler.handleControlLoopUpdate(updateMsg); + } + + /** + * Handle a control loop state change message. + * + * @param stateChangeMsg the state change message + */ + public void handleControlLoopStateChange(ParticipantControlLoopStateChange stateChangeMsg) { + controlLoopHandler.handleControlLoopStateChange(stateChangeMsg); + } + /** * Method to handle when the new state from participant is active. * @@ -233,4 +255,14 @@ public class ParticipantHandler implements Closeable { public boolean canHandle(ParticipantMessage partipantMsg) { return partipantMsg.appliesTo(participantType, participantId); } + + /** + * Check if a participant message applies to this participant handler. + * + * @param partipantMsg the message to check + * @return true if it applies, false otherwise + */ + public boolean appliesTo(ParticipantMessage partipantMsg) { + return partipantMsg.appliesTo(participantType, participantId); + } } diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/parameters/ParticipantParameters.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/parameters/ParticipantParameters.java new file mode 100644 index 000000000..c350b1b95 --- /dev/null +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/parameters/ParticipantParameters.java @@ -0,0 +1,26 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2021 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.controlloop.participant.intermediary.parameters; + +public interface ParticipantParameters { + + ParticipantIntermediaryParameters getIntermediaryParameters(); +} -- cgit 1.2.3-korg