diff options
author | FrancescoFioraEst <francesco.fiora@est.tech> | 2021-06-21 16:45:27 +0100 |
---|---|---|
committer | FrancescoFioraEst <francesco.fiora@est.tech> | 2021-06-28 10:36:54 +0100 |
commit | 7f7f0c5790dc63e43a59c53154e796239ab34cbb (patch) | |
tree | 39a8682b04502e99bc8575ec636613de193ade03 /participant/participant-intermediary/src | |
parent | 077406d881b8b03456f76d246a4db44d13780999 (diff) |
Convert Intermediary Participant to Spring and refactor Participants
Issue-ID: POLICY-3370
Change-Id: I1541fc47b35c91f9ec86ab768bc3cec2cd78647c
Signed-off-by: FrancescoFioraEst <francesco.fiora@est.tech>
Diffstat (limited to 'participant/participant-intermediary/src')
13 files changed, 250 insertions, 295 deletions
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/api/ParticipantIntermediaryApi.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/api/ParticipantIntermediaryApi.java index adc9c2393..a87299bdc 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/api/ParticipantIntermediaryApi.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/api/ParticipantIntermediaryApi.java @@ -31,25 +31,12 @@ import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoop import org.onap.policy.clamp.controlloop.models.controlloop.concepts.Participant; import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ParticipantState; import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ParticipantStatistics; -import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler; -import org.onap.policy.clamp.controlloop.participant.intermediary.parameters.ParticipantIntermediaryParameters; import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier; /** * This interface is used by participant implementations to use the participant intermediary. */ public interface ParticipantIntermediaryApi { - /** - * Initialise the participant intermediary. - * - * @param parameters the parameters for the intermediary - */ - void init(ParticipantIntermediaryParameters parameters); - - /** - * Close the intermediary. - */ - void close(); /** * Register a listener for control loop elements that are mediated by the intermediary. @@ -128,12 +115,4 @@ public interface ParticipantIntermediaryApi { */ void updateControlLoopElementStatistics(UUID id, ClElementStatistics elementStatistics); - /** - * Return participantHandler, This will not be used in real world, but for junits, - * if participantHandler is not returned, there is no way to test state change messages - * without dmaap simulator. - * - * @return the participant handler - */ - ParticipantHandler getParticipantHandler(); } diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/api/impl/ParticipantIntermediaryApiImpl.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/api/impl/ParticipantIntermediaryApiImpl.java index 839088d72..838f47544 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/api/impl/ParticipantIntermediaryApiImpl.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/api/impl/ParticipantIntermediaryApiImpl.java @@ -35,45 +35,41 @@ import org.onap.policy.clamp.controlloop.models.controlloop.concepts.Participant import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ParticipantStatistics; import org.onap.policy.clamp.controlloop.participant.intermediary.api.ControlLoopElementListener; import org.onap.policy.clamp.controlloop.participant.intermediary.api.ParticipantIntermediaryApi; -import org.onap.policy.clamp.controlloop.participant.intermediary.handler.IntermediaryActivator; import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler; -import org.onap.policy.clamp.controlloop.participant.intermediary.parameters.ParticipantIntermediaryParameters; import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier; +import org.springframework.stereotype.Component; /** * This class is api implementation used by participant intermediary. */ +@Component public class ParticipantIntermediaryApiImpl implements ParticipantIntermediaryApi { - // The activator for the participant intermediary - private IntermediaryActivator activator; + // The handler for the participant intermediary + private ParticipantHandler participantHandler; - @Override - public void init(ParticipantIntermediaryParameters parameters) { - activator = new IntermediaryActivator(parameters); - - activator.start(); - } - - @Override - public void close() { - activator.shutdown(); + /** + * Constructor. + * + * @param participantHandler ParticipantHandler + */ + public ParticipantIntermediaryApiImpl(ParticipantHandler participantHandler) { + this.participantHandler = participantHandler; } @Override public void registerControlLoopElementListener(ControlLoopElementListener controlLoopElementListener) { - activator.getParticipantHandler().getControlLoopHandler() - .registerControlLoopElementListener(controlLoopElementListener); + participantHandler.getControlLoopHandler().registerControlLoopElementListener(controlLoopElementListener); } @Override public List<Participant> getParticipants(String name, String version) { - return List.of(activator.getParticipantHandler().getParticipant(name, version)); + return List.of(participantHandler.getParticipant(name, version)); } @Override public Participant updateParticipantState(ToscaConceptIdentifier definition, ParticipantState state) { - return activator.getParticipantHandler().updateParticipantState(definition, state); + return participantHandler.updateParticipantState(definition, state); } @Override @@ -83,13 +79,13 @@ public class ParticipantIntermediaryApiImpl implements ParticipantIntermediaryAp @Override public ControlLoops getControlLoops(String name, String version) { - return activator.getParticipantHandler().getControlLoopHandler().getControlLoops(); + return participantHandler.getControlLoopHandler().getControlLoops(); } @Override public Map<UUID, ControlLoopElement> getControlLoopElements(String name, String version) { - List<ControlLoop> controlLoops = activator.getParticipantHandler() - .getControlLoopHandler().getControlLoops().getControlLoopList(); + List<ControlLoop> controlLoops = + participantHandler.getControlLoopHandler().getControlLoops().getControlLoopList(); for (ControlLoop controlLoop : controlLoops) { if (name.equals(controlLoop.getDefinition().getName())) { @@ -101,8 +97,8 @@ public class ParticipantIntermediaryApiImpl implements ParticipantIntermediaryAp @Override public ControlLoopElement getControlLoopElement(UUID id) { - List<ControlLoop> controlLoops = activator.getParticipantHandler() - .getControlLoopHandler().getControlLoops().getControlLoopList(); + List<ControlLoop> controlLoops = + participantHandler.getControlLoopHandler().getControlLoops().getControlLoopList(); for (ControlLoop controlLoop : controlLoops) { ControlLoopElement clElement = controlLoop.getElements().get(id); @@ -116,18 +112,11 @@ public class ParticipantIntermediaryApiImpl implements ParticipantIntermediaryAp @Override public ControlLoopElement updateControlLoopElementState(UUID id, ControlLoopOrderedState currentState, ControlLoopState newState) { - return activator.getParticipantHandler().getControlLoopHandler() - .updateControlLoopElementState(id, currentState, newState); + return participantHandler.getControlLoopHandler().updateControlLoopElementState(id, currentState, newState); } @Override public void updateControlLoopElementStatistics(UUID id, ClElementStatistics elementStatistics) { - activator.getParticipantHandler().getControlLoopHandler() - .updateControlLoopElementStatistics(id, elementStatistics); - } - - @Override - public ParticipantHandler getParticipantHandler() { - return activator.getParticipantHandler(); + participantHandler.getControlLoopHandler().updateControlLoopElementStatistics(id, elementStatistics); } } diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ControlLoopStateChangeListener.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ControlLoopStateChangeListener.java index 50b8b9cdc..e46c6db1b 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ControlLoopStateChangeListener.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ControlLoopStateChangeListener.java @@ -20,23 +20,15 @@ package org.onap.policy.clamp.controlloop.participant.intermediary.comm; -import java.io.Closeable; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantControlLoopStateChange; import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler; -import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; -import org.onap.policy.common.endpoints.listeners.ScoListener; -import org.onap.policy.common.utils.coder.StandardCoderObject; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; /** * Listener for Participant State Change messages sent by CLAMP. */ -public class ControlLoopStateChangeListener extends ScoListener<ParticipantControlLoopStateChange> - implements Closeable { - private static final Logger LOGGER = LoggerFactory.getLogger(ControlLoopStateChangeListener.class); - - private final ParticipantHandler participantHandler; +@Component +public class ControlLoopStateChangeListener extends ParticipantListener<ParticipantControlLoopStateChange> { /** * Constructs the object. @@ -44,25 +36,7 @@ public class ControlLoopStateChangeListener extends ScoListener<ParticipantContr * @param participantHandler the handler for managing the state of the participant */ public ControlLoopStateChangeListener(final ParticipantHandler participantHandler) { - super(ParticipantControlLoopStateChange.class); - this.participantHandler = participantHandler; - } - - @Override - public void onTopicEvent(final CommInfrastructure infra, final String topic, final StandardCoderObject sco, - final ParticipantControlLoopStateChange controlLoopStateChangeMsg) { - LOGGER.debug("Control Loop State Change received from CLAMP - {}", controlLoopStateChangeMsg); - - if (participantHandler.canHandle(controlLoopStateChangeMsg)) { - LOGGER.debug("Message for this participant"); - participantHandler.getControlLoopHandler().handleControlLoopStateChange(controlLoopStateChangeMsg); - } else { - LOGGER.debug("Message not for this participant"); - } - } - - @Override - public void close() { - // No explicit action on this class + super(ParticipantControlLoopStateChange.class, participantHandler, + participantHandler::handleControlLoopStateChange); } } diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ControlLoopUpdateListener.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ControlLoopUpdateListener.java index ab2437c1c..d15643e0f 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ControlLoopUpdateListener.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ControlLoopUpdateListener.java @@ -20,22 +20,15 @@ package org.onap.policy.clamp.controlloop.participant.intermediary.comm; -import java.io.Closeable; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantControlLoopUpdate; import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler; -import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; -import org.onap.policy.common.endpoints.listeners.ScoListener; -import org.onap.policy.common.utils.coder.StandardCoderObject; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; /** * Listener for Control Loop Update messages sent by CLAMP. */ -public class ControlLoopUpdateListener extends ScoListener<ParticipantControlLoopUpdate> implements Closeable { - private static final Logger LOGGER = LoggerFactory.getLogger(ControlLoopUpdateListener.class); - - private final ParticipantHandler participantHandler; +@Component +public class ControlLoopUpdateListener extends ParticipantListener<ParticipantControlLoopUpdate> { /** * Constructs the object. @@ -43,25 +36,6 @@ public class ControlLoopUpdateListener extends ScoListener<ParticipantControlLoo * @param participantHandler the handler for managing the state of the participant */ public ControlLoopUpdateListener(final ParticipantHandler participantHandler) { - super(ParticipantControlLoopUpdate.class); - this.participantHandler = participantHandler; - } - - @Override - public void onTopicEvent(final CommInfrastructure infra, final String topic, final StandardCoderObject sco, - final ParticipantControlLoopUpdate participantControlLoopUpdateMsg) { - LOGGER.debug("Control Loop update received from CLAMP - {}", participantControlLoopUpdateMsg); - - if (participantHandler.canHandle(participantControlLoopUpdateMsg)) { - LOGGER.debug("Message for this participant"); - participantHandler.getControlLoopHandler().handleControlLoopUpdate(participantControlLoopUpdateMsg); - } else { - LOGGER.debug("Message not for this participant"); - } - } - - @Override - public void close() { - // No explicit action on this class + super(ParticipantControlLoopUpdate.class, participantHandler, participantHandler::handleControlLoopUpdate); } } diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantHealthCheckListener.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantHealthCheckListener.java index e0e6be329..15f5140eb 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantHealthCheckListener.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantHealthCheckListener.java @@ -20,22 +20,15 @@ package org.onap.policy.clamp.controlloop.participant.intermediary.comm; -import java.io.Closeable; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantHealthCheck; import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler; -import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; -import org.onap.policy.common.endpoints.listeners.ScoListener; -import org.onap.policy.common.utils.coder.StandardCoderObject; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; /** * Listener for Participant health status messages sent by CLAMP. */ -public class ParticipantHealthCheckListener extends ScoListener<ParticipantHealthCheck> implements Closeable { - private static final Logger LOGGER = LoggerFactory.getLogger(ParticipantHealthCheckListener.class); - - private final ParticipantHandler participantHandler; +@Component +public class ParticipantHealthCheckListener extends ParticipantListener<ParticipantHealthCheck> { /** * Constructs the object. @@ -43,27 +36,6 @@ public class ParticipantHealthCheckListener extends ScoListener<ParticipantHealt * @param participantHandler the handler for managing the state and health of the participant */ public ParticipantHealthCheckListener(final ParticipantHandler participantHandler) { - super(ParticipantHealthCheck.class); - this.participantHandler = participantHandler; - } - - @Override - public void onTopicEvent(final CommInfrastructure infra, final String topic, final StandardCoderObject sco, - final ParticipantHealthCheck participantHealthCheckMsg) { - LOGGER.debug("Participant Health Check message received from CLAMP - {}", participantHealthCheckMsg); - - - if (participantHandler.canHandle(participantHealthCheckMsg)) { - LOGGER.debug("Message for this participant"); - participantHandler.handleParticipantHealthCheck(participantHealthCheckMsg); - } else { - LOGGER.debug("Message not for this participant"); - } - - } - - @Override - public void close() { - // No explicit action on this class + super(ParticipantHealthCheck.class, participantHandler, participantHandler::handleParticipantHealthCheck); } } diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantListener.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantListener.java new file mode 100644 index 000000000..c6ad900b3 --- /dev/null +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantListener.java @@ -0,0 +1,57 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2021 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.controlloop.participant.intermediary.comm; + +import java.util.function.Consumer; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessage; +import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler; +import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; +import org.onap.policy.common.endpoints.listeners.ScoListener; +import org.onap.policy.common.utils.coder.StandardCoderObject; + +/** + * Abstract Listener for Participant messages sent by CLAMP. + */ +public abstract class ParticipantListener<T extends ParticipantMessage> extends ScoListener<T> { + + private final ParticipantHandler participantHandler; + private final Consumer<T> consumer; + + /** + * Constructs the object. + * + * @param clazz class of message this handles + * @param participantHandler ParticipantHandler + * @param consumer function that handles the message + */ + protected ParticipantListener(Class<T> clazz, ParticipantHandler participantHandler, Consumer<T> consumer) { + super(clazz); + this.participantHandler = participantHandler; + this.consumer = consumer; + } + + @Override + public void onTopicEvent(CommInfrastructure infra, String topic, StandardCoderObject sco, T message) { + if (participantHandler.appliesTo(message)) { + consumer.accept(message); + } + } +} diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantStateChangeListener.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantStateChangeListener.java index c1a8b5b4a..ec6548a7c 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantStateChangeListener.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantStateChangeListener.java @@ -20,23 +20,16 @@ package org.onap.policy.clamp.controlloop.participant.intermediary.comm; -import java.io.Closeable; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantStateChange; import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler; -import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; -import org.onap.policy.common.endpoints.listeners.ScoListener; -import org.onap.policy.common.utils.coder.StandardCoderObject; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; /** * Listener for Participant State Change messages sent by CLAMP. * */ -public class ParticipantStateChangeListener extends ScoListener<ParticipantStateChange> implements Closeable { - private static final Logger LOGGER = LoggerFactory.getLogger(ParticipantStateChangeListener.class); - - private final ParticipantHandler participantHandler; +@Component +public class ParticipantStateChangeListener extends ParticipantListener<ParticipantStateChange> { /** * Constructs the object. @@ -44,25 +37,6 @@ public class ParticipantStateChangeListener extends ScoListener<ParticipantState * @param participantHandler the handler for managing the state of the participant */ public ParticipantStateChangeListener(final ParticipantHandler participantHandler) { - super(ParticipantStateChange.class); - this.participantHandler = participantHandler; - } - - @Override - public void onTopicEvent(final CommInfrastructure infra, final String topic, final StandardCoderObject sco, - final ParticipantStateChange participantStateChangeMsg) { - LOGGER.debug("Participant State Change received from CLAMP - {}", participantStateChangeMsg); - - if (participantHandler.canHandle(participantStateChangeMsg)) { - LOGGER.debug("Message for this participant"); - participantHandler.handleParticipantStateChange(participantStateChangeMsg); - } else { - LOGGER.debug("Message not for this participant"); - } - } - - @Override - public void close() { - // No explicit action on this class + super(ParticipantStateChange.class, participantHandler, participantHandler::handleParticipantStateChange); } } diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantStatusPublisher.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantStatusPublisher.java index bc53b4e9d..78b998453 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantStatusPublisher.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantStatusPublisher.java @@ -20,7 +20,6 @@ package org.onap.policy.clamp.controlloop.participant.intermediary.comm; -import java.io.Closeable; import java.util.List; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantStatus; import org.onap.policy.common.endpoints.event.comm.TopicSink; @@ -32,7 +31,7 @@ import org.slf4j.LoggerFactory; * This class is used to send Participant Status messages to clamp using TopicSinkClient. * */ -public class ParticipantStatusPublisher implements Closeable { +public class ParticipantStatusPublisher { private static final Logger LOGGER = LoggerFactory.getLogger(ParticipantStatusPublisher.class); private final TopicSinkClient topicSinkClient; @@ -55,9 +54,4 @@ public class ParticipantStatusPublisher implements Closeable { topicSinkClient.send(participantStatus); LOGGER.debug("Sent Participant Status message to CLAMP - {}", participantStatus); } - - @Override - public void close() { - // No explicit action on this class - } } diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/config/BeanFactory.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/config/BeanFactory.java new file mode 100644 index 000000000..dc7d87eec --- /dev/null +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/config/BeanFactory.java @@ -0,0 +1,55 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2021 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.controlloop.participant.intermediary.config; + +import java.util.List; +import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ParticipantStatusPublisher; +import org.onap.policy.clamp.controlloop.participant.intermediary.parameters.ParticipantParameters; +import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager; +import org.onap.policy.common.endpoints.event.comm.TopicSink; +import org.onap.policy.common.endpoints.listeners.MessageTypeDispatcher; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class BeanFactory { + + // Name of the message type for messages on topics + private static final String[] MSG_TYPE_NAMES = {"messageType"}; + + /** + * create ParticipantStatusPublisher. + * + * @param parameters the ParticipantParameters + * @return ParticipantStatusPublisher + */ + @Bean + public ParticipantStatusPublisher publisher(final ParticipantParameters parameters) { + List<TopicSink> topicSinks = TopicEndpointManager.getManager() + .addTopicSinks(parameters.getIntermediaryParameters().getClampControlLoopTopics().getTopicSinks()); + return new ParticipantStatusPublisher(topicSinks); + } + + @Bean + public MessageTypeDispatcher msgDispatcher() { + return new MessageTypeDispatcher(MSG_TYPE_NAMES); + } +} diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ControlLoopHandler.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ControlLoopHandler.java index 3eebd177f..50048ffc2 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ControlLoopHandler.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ControlLoopHandler.java @@ -20,7 +20,6 @@ package org.onap.policy.clamp.controlloop.participant.intermediary.handler; -import java.io.Closeable; import java.util.ArrayList; import java.util.LinkedHashMap; import java.util.List; @@ -51,7 +50,7 @@ import org.slf4j.LoggerFactory; * This class is responsible for managing the state of all control loops in the participant. */ @NoArgsConstructor -public class ControlLoopHandler implements Closeable { +public class ControlLoopHandler { private static final Logger LOGGER = LoggerFactory.getLogger(ControlLoopHandler.class); private ToscaConceptIdentifier participantType = null; @@ -77,11 +76,6 @@ public class ControlLoopHandler implements Closeable { this.messageSender = messageSender; } - @Override - public void close() { - // No explicit action on this class - } - public void registerControlLoopElementListener(ControlLoopElementListener listener) { listeners.add(listener); } diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/IntermediaryActivator.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/IntermediaryActivator.java index 3eae27267..2d789d40d 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/IntermediaryActivator.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/IntermediaryActivator.java @@ -20,125 +20,100 @@ package org.onap.policy.clamp.controlloop.participant.intermediary.handler; +import java.io.Closeable; +import java.io.IOException; import java.util.List; -import java.util.concurrent.atomic.AtomicReference; -import javax.ws.rs.core.Response.Status; -import lombok.Getter; -import org.onap.policy.clamp.controlloop.common.exception.ControlLoopRuntimeException; -import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantControlLoopStateChange; -import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantControlLoopUpdate; -import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantHealthCheck; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageType; -import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantStateChange; import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ControlLoopStateChangeListener; import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ControlLoopUpdateListener; import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ParticipantHealthCheckListener; import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ParticipantStateChangeListener; -import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ParticipantStatusPublisher; -import org.onap.policy.clamp.controlloop.participant.intermediary.parameters.ParticipantIntermediaryParameters; +import org.onap.policy.clamp.controlloop.participant.intermediary.parameters.ParticipantParameters; import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager; -import org.onap.policy.common.endpoints.event.comm.TopicSink; import org.onap.policy.common.endpoints.event.comm.TopicSource; import org.onap.policy.common.endpoints.listeners.MessageTypeDispatcher; -import org.onap.policy.common.endpoints.listeners.ScoListener; import org.onap.policy.common.utils.services.ServiceManagerContainer; +import org.springframework.context.ApplicationContext; +import org.springframework.context.event.ContextClosedEvent; +import org.springframework.context.event.ContextRefreshedEvent; +import org.springframework.context.event.EventListener; +import org.springframework.stereotype.Component; /** * This class activates the Participant Intermediary together with all its handlers. */ -public class IntermediaryActivator extends ServiceManagerContainer { - // Name of the message type for messages on topics - private static final String[] MSG_TYPE_NAMES = {"messageType"}; +@Component +public class IntermediaryActivator extends ServiceManagerContainer implements Closeable { - @Getter - private final ParticipantIntermediaryParameters parameters; + private final ApplicationContext applicationContext; // Topics from which the participant receives and to which the participant sends messages - private List<TopicSink> topicSinks; private List<TopicSource> topicSources; - // The participant handler for this intermediary - final AtomicReference<ParticipantHandler> participantHandler = new AtomicReference<>(); - - /** - * Listens for messages on the topic, decodes them into a message, and then dispatches them. - */ - private final MessageTypeDispatcher msgDispatcher; - /** * Instantiate the activator for participant. * - * @param parameters the parameters for the participant intermediary - * @throws ControlLoopRuntimeException when the activation fails + * @param applicationContext ApplicationContext + * @param parameters the ParticipantParameters */ - public IntermediaryActivator(final ParticipantIntermediaryParameters parameters) { - this.parameters = parameters; - - topicSinks = - TopicEndpointManager.getManager().addTopicSinks(parameters.getClampControlLoopTopics().getTopicSinks()); + public IntermediaryActivator(final ApplicationContext applicationContext, final ParticipantParameters parameters) { + this.applicationContext = applicationContext; - topicSources = - TopicEndpointManager.getManager().addTopicSources(parameters.getClampControlLoopTopics().getTopicSources()); - - try { - this.msgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES); - } catch (final RuntimeException e) { - throw new ControlLoopRuntimeException(Status.INTERNAL_SERVER_ERROR, - "topic message dispatcher failed to start", e); - } + topicSources = TopicEndpointManager.getManager() + .addTopicSources(parameters.getIntermediaryParameters().getClampControlLoopTopics().getTopicSources()); // @formatter:off - final AtomicReference<ParticipantStatusPublisher> statusPublisher = new AtomicReference<>(); - final AtomicReference<ParticipantStateChangeListener> participantStateChangeListener = new AtomicReference<>(); - final AtomicReference<ParticipantHealthCheckListener> participantHealthCheckListener = new AtomicReference<>(); - final AtomicReference<ControlLoopStateChangeListener> controlLoopStateChangeListener = new AtomicReference<>(); - final AtomicReference<ControlLoopUpdateListener> controlLoopUpdateListener = new AtomicReference<>(); addAction("Topic endpoint management", () -> TopicEndpointManager.getManager().start(), () -> TopicEndpointManager.getManager().shutdown()); - addAction("Participant Status Publisher", - () -> statusPublisher.set(new ParticipantStatusPublisher(topicSinks)), - () -> statusPublisher.get().close()); - - addAction("Participant Handler", - () -> participantHandler.set(new ParticipantHandler(parameters, statusPublisher.get())), - () -> participantHandler.get().close()); - - addAction("Participant State Change Listener", - () -> participantStateChangeListener.set(new ParticipantStateChangeListener(participantHandler.get())), - () -> participantStateChangeListener.get().close()); - - addAction("Participant Health Check Listener", - () -> participantHealthCheckListener.set(new ParticipantHealthCheckListener(participantHandler.get())), - () -> participantHealthCheckListener.get().close()); - - addAction("Control Loop State Change Listener", - () -> controlLoopStateChangeListener.set(new ControlLoopStateChangeListener(participantHandler.get())), - () -> controlLoopStateChangeListener.get().close()); - - addAction("Control Loop Update Listener", - () -> controlLoopUpdateListener.set(new ControlLoopUpdateListener(participantHandler.get())), - () -> controlLoopUpdateListener.get().close()); - addAction("Topic Message Dispatcher", this::registerMsgDispatcher, this::unregisterMsgDispatcher); // @formatter:on } /** + * Handle ContextRefreshEvent. + * + * @param ctxRefreshedEvent ContextRefreshedEvent + */ + @EventListener + public void handleContextRefreshEvent(ContextRefreshedEvent ctxRefreshedEvent) { + if (!isAlive()) { + start(); + } + } + + /** + * Handle ContextClosedEvent. + * + * @param ctxClosedEvent ContextClosedEvent + */ + @EventListener + public void handleContextClosedEvent(ContextClosedEvent ctxClosedEvent) { + if (isAlive()) { + stop(); + } + } + + /** * Registers the dispatcher with the topic source(s). */ private void registerMsgDispatcher() { + MessageTypeDispatcher msgDispatcher = applicationContext.getBean(MessageTypeDispatcher.class); + msgDispatcher.register(ParticipantMessageType.PARTICIPANT_STATE_CHANGE.name(), - (ScoListener<ParticipantStateChange>) new ParticipantStateChangeListener(participantHandler.get())); + applicationContext.getBean(ParticipantStateChangeListener.class)); + msgDispatcher.register(ParticipantMessageType.PARTICIPANT_HEALTH_CHECK.name(), - (ScoListener<ParticipantHealthCheck>) new ParticipantHealthCheckListener(participantHandler.get())); + applicationContext.getBean(ParticipantHealthCheckListener.class)); + msgDispatcher.register(ParticipantMessageType.PARTICIPANT_CONTROL_LOOP_STATE_CHANGE.name(), - (ScoListener<ParticipantControlLoopStateChange>) new ControlLoopStateChangeListener( - participantHandler.get())); + applicationContext.getBean(ControlLoopStateChangeListener.class)); + msgDispatcher.register(ParticipantMessageType.PARTICIPANT_CONTROL_LOOP_UPDATE.name(), - (ScoListener<ParticipantControlLoopUpdate>) new ControlLoopUpdateListener(participantHandler.get())); + applicationContext.getBean(ControlLoopUpdateListener.class)); + for (final TopicSource source : topicSources) { source.register(msgDispatcher); } @@ -148,17 +123,15 @@ public class IntermediaryActivator extends ServiceManagerContainer { * Unregisters the dispatcher from the topic source(s). */ private void unregisterMsgDispatcher() { + MessageTypeDispatcher msgDispatcher = applicationContext.getBean(MessageTypeDispatcher.class); + for (final TopicSource source : topicSources) { source.unregister(msgDispatcher); } } - /** - * Return the participant handler. - * - * @return the participant handler - */ - public ParticipantHandler getParticipantHandler() { - return participantHandler.get(); + @Override + public void close() throws IOException { + super.shutdown(); } } diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ParticipantHandler.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ParticipantHandler.java index 5e414b175..1c54658fa 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ParticipantHandler.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ParticipantHandler.java @@ -28,6 +28,8 @@ import org.onap.policy.clamp.controlloop.models.controlloop.concepts.Participant import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ParticipantHealthStatus; import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ParticipantState; import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ParticipantStatistics; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantControlLoopStateChange; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantControlLoopUpdate; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantHealthCheck; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessage; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantResponseDetails; @@ -35,15 +37,17 @@ import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.Parti import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantStateChange; import org.onap.policy.clamp.controlloop.participant.intermediary.comm.MessageSender; import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ParticipantStatusPublisher; -import org.onap.policy.clamp.controlloop.participant.intermediary.parameters.ParticipantIntermediaryParameters; +import org.onap.policy.clamp.controlloop.participant.intermediary.parameters.ParticipantParameters; import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; /** * This class is responsible for managing the state of a participant. */ @Getter +@Component public class ParticipantHandler implements Closeable { private static final Logger LOGGER = LoggerFactory.getLogger(ParticipantHandler.class); @@ -65,18 +69,18 @@ public class ParticipantHandler implements Closeable { * @param parameters the parameters of the participant * @param publisher the publisher for sending responses to messages */ - public ParticipantHandler(ParticipantIntermediaryParameters parameters, ParticipantStatusPublisher publisher) { - this.participantType = parameters.getParticipantType(); - this.participantId = parameters.getParticipantId(); - this.sender = new MessageSender(this, publisher, parameters.getReportingTimeInterval()); - this.controlLoopHandler = new ControlLoopHandler(parameters, sender); + public ParticipantHandler(ParticipantParameters parameters, ParticipantStatusPublisher publisher) { + this.participantType = parameters.getIntermediaryParameters().getParticipantType(); + this.participantId = parameters.getIntermediaryParameters().getParticipantId(); + this.sender = + new MessageSender(this, publisher, parameters.getIntermediaryParameters().getReportingTimeInterval()); + this.controlLoopHandler = new ControlLoopHandler(parameters.getIntermediaryParameters(), sender); this.participantStatistics = new ParticipantStatistics(); } @Override public void close() { sender.close(); - controlLoopHandler.close(); } /** @@ -133,6 +137,24 @@ public class ParticipantHandler implements Closeable { } /** + * Handle a control loop update message. + * + * @param updateMsg the update message + */ + public void handleControlLoopUpdate(ParticipantControlLoopUpdate updateMsg) { + controlLoopHandler.handleControlLoopUpdate(updateMsg); + } + + /** + * Handle a control loop state change message. + * + * @param stateChangeMsg the state change message + */ + public void handleControlLoopStateChange(ParticipantControlLoopStateChange stateChangeMsg) { + controlLoopHandler.handleControlLoopStateChange(stateChangeMsg); + } + + /** * Method to handle when the new state from participant is active. * * @param response participant response @@ -233,4 +255,14 @@ public class ParticipantHandler implements Closeable { public boolean canHandle(ParticipantMessage partipantMsg) { return partipantMsg.appliesTo(participantType, participantId); } + + /** + * Check if a participant message applies to this participant handler. + * + * @param partipantMsg the message to check + * @return true if it applies, false otherwise + */ + public boolean appliesTo(ParticipantMessage partipantMsg) { + return partipantMsg.appliesTo(participantType, participantId); + } } diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/api/ParticipantIntermediaryFactory.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/parameters/ParticipantParameters.java index d7cc4b2ed..c350b1b95 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/api/ParticipantIntermediaryFactory.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/parameters/ParticipantParameters.java @@ -18,21 +18,9 @@ * ============LICENSE_END========================================================= */ -package org.onap.policy.clamp.controlloop.participant.intermediary.api; +package org.onap.policy.clamp.controlloop.participant.intermediary.parameters; -import org.onap.policy.clamp.controlloop.participant.intermediary.api.impl.ParticipantIntermediaryApiImpl; +public interface ParticipantParameters { -/** - * Factory class for creating {@link ParticipantIntermediaryApi} instances. - */ -public class ParticipantIntermediaryFactory { - - /** - * Create an implementation of the {@link ParticipantIntermediaryApi} interface. - * - * @return the implementation of the API - */ - public ParticipantIntermediaryApi createApiImplementation() { - return new ParticipantIntermediaryApiImpl(); - } + ParticipantIntermediaryParameters getIntermediaryParameters(); } |