diff options
Diffstat (limited to 'tosca-controlloop/participant/participant-intermediary/src')
8 files changed, 909 insertions, 16 deletions
diff --git a/tosca-controlloop/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/api/ParticipantIntermediaryApi.java b/tosca-controlloop/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/api/ParticipantIntermediaryApi.java index 6c3e029fe..7ea8b4f35 100644 --- a/tosca-controlloop/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/api/ParticipantIntermediaryApi.java +++ b/tosca-controlloop/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/api/ParticipantIntermediaryApi.java @@ -30,6 +30,7 @@ import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoop import org.onap.policy.clamp.controlloop.models.controlloop.concepts.Participant; import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ParticipantState; import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ParticipantStatistics; +import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler; import org.onap.policy.clamp.controlloop.participant.intermediary.parameters.ParticipantIntermediaryParameters; import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier; @@ -42,12 +43,12 @@ public interface ParticipantIntermediaryApi { * * @param parameters the parameters for the intermediary */ - public void init(ParticipantIntermediaryParameters parameters); + void init(ParticipantIntermediaryParameters parameters); /** * Close the intermediary. */ - public void close(); + void close(); /** * Get participants loops from the intermediary API. @@ -56,23 +57,23 @@ public interface ParticipantIntermediaryApi { * @param version the participant version, null for all * @return the participants */ - public List<Participant> getParticipants(String name, String version); + List<Participant> getParticipants(String name, String version); /** * Update the state of a participant. * * @param definition the definition of the participant to update the state on * @param state the state of the participant - * @return updated participant + * @return the participant */ - public Participant updateParticipantState(ToscaConceptIdentifier definition, ParticipantState state); + Participant updateParticipantState(ToscaConceptIdentifier definition, ParticipantState state); /** * Update the statistics of a participant. * * @param participantStatistics the statistics of the participant */ - public void updateParticipantStatistics(ParticipantStatistics participantStatistics); + void updateParticipantStatistics(ParticipantStatistics participantStatistics); /** * Get control loops from the intermediary API. @@ -81,7 +82,7 @@ public interface ParticipantIntermediaryApi { * @param version the control loop element version, null for all * @return the control loop elements */ - public ControlLoops getControlLoops(String name, String version); + ControlLoops getControlLoops(String name, String version); /** * Get control loop elements from the intermediary API. @@ -90,7 +91,7 @@ public interface ParticipantIntermediaryApi { * @param version the control loop element version, null for all * @return the control loop elements */ - public List<ControlLoopElement> getControlLoopElements(String name, String version); + List<ControlLoopElement> getControlLoopElements(String name, String version); /** * Update the state of a control loop. @@ -99,7 +100,7 @@ public interface ParticipantIntermediaryApi { * @param state the state of the control loop * @return ControlLoop updated control loop */ - public ControlLoop updateControlLoopState(ToscaConceptIdentifier definition, ControlLoopOrderedState state); + ControlLoop updateControlLoopState(ToscaConceptIdentifier definition, ControlLoopOrderedState state); /** * Update the state of a control loop element. @@ -108,12 +109,21 @@ public interface ParticipantIntermediaryApi { * @param state the state of the control loop element * @return ControlLoopElement updated control loop element */ - public ControlLoopElement updateControlLoopElementState(UUID id, ControlLoopOrderedState state); + ControlLoopElement updateControlLoopElementState(UUID id, ControlLoopOrderedState state); /** * Update the control loop element statistics. * * @param elementStatistics the updated statistics */ - public void updateControlLoopElementStatistics(ClElementStatistics elementStatistics); + void updateControlLoopElementStatistics(ClElementStatistics elementStatistics); + + /** + * Returns participantHandler, This will not be used in real world, but for junits, + * if participantHandler is not returned, there is no way to test state change messages + * without dmaap simulator. + * + * @return ParticipantHandler returns a participantHandler + */ + ParticipantHandler getParticipantHandler(); } diff --git a/tosca-controlloop/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/api/impl/ParticipantIntermediaryApiImpl.java b/tosca-controlloop/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/api/impl/ParticipantIntermediaryApiImpl.java index 939f927e6..9e494862e 100644 --- a/tosca-controlloop/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/api/impl/ParticipantIntermediaryApiImpl.java +++ b/tosca-controlloop/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/api/impl/ParticipantIntermediaryApiImpl.java @@ -32,6 +32,8 @@ import org.onap.policy.clamp.controlloop.models.controlloop.concepts.Participant import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ParticipantState; import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ParticipantStatistics; import org.onap.policy.clamp.controlloop.participant.intermediary.api.ParticipantIntermediaryApi; +import org.onap.policy.clamp.controlloop.participant.intermediary.handler.IntermediaryActivator; +import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler; import org.onap.policy.clamp.controlloop.participant.intermediary.parameters.ParticipantIntermediaryParameters; import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier; @@ -40,49 +42,74 @@ import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier; */ public class ParticipantIntermediaryApiImpl implements ParticipantIntermediaryApi { + // The activator for the participant intermediary + private IntermediaryActivator activator; + @Override public void init(ParticipantIntermediaryParameters parameters) { + activator = new IntermediaryActivator(parameters); + + activator.start(); } @Override public void close() { + activator.shutdown(); } @Override public List<Participant> getParticipants(String name, String version) { - return Collections.emptyList(); + return List.of(activator.getParticipantHandler().getParticipant(name, version)); } @Override public Participant updateParticipantState(ToscaConceptIdentifier definition, ParticipantState state) { - return null; + return activator.getParticipantHandler().updateParticipantState(definition, state); } @Override public void updateParticipantStatistics(ParticipantStatistics participantStatistics) { + // TODO Auto-generated method stub } @Override public ControlLoops getControlLoops(String name, String version) { - return null; + return activator.getParticipantHandler().getControlLoopHandler().getControlLoops(); } @Override public List<ControlLoopElement> getControlLoopElements(String name, String version) { + List<ControlLoop> controlLoops = activator.getParticipantHandler() + .getControlLoopHandler().getControlLoops().getControlLoopList(); + + for (ControlLoop controlLoop : controlLoops) { + if (controlLoop.getDefinition().getName().equals(name)) { + return controlLoop.getElements(); + } + } return Collections.emptyList(); } @Override public ControlLoop updateControlLoopState(ToscaConceptIdentifier definition, ControlLoopOrderedState state) { - return null; + return activator.getParticipantHandler().getControlLoopHandler() + .updateControlLoopState(definition, state); } @Override public ControlLoopElement updateControlLoopElementState(UUID id, ControlLoopOrderedState state) { - return null; + return activator.getParticipantHandler().getControlLoopHandler() + .updateControlLoopElementState(id, state); } @Override public void updateControlLoopElementStatistics(ClElementStatistics elementStatistics) { + activator.getParticipantHandler().getControlLoopHandler() + .updateControlLoopElementStatistics(elementStatistics); + } + + @Override + public ParticipantHandler getParticipantHandler() { + return activator.getParticipantHandler(); } } diff --git a/tosca-controlloop/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ControlLoopUpdateListener.java b/tosca-controlloop/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ControlLoopUpdateListener.java new file mode 100644 index 000000000..2ba98891f --- /dev/null +++ b/tosca-controlloop/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ControlLoopUpdateListener.java @@ -0,0 +1,61 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2021 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.controlloop.participant.intermediary.comm; + +import java.io.Closeable; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantControlLoopUpdate; +import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler; +import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; +import org.onap.policy.common.endpoints.listeners.ScoListener; +import org.onap.policy.common.utils.coder.StandardCoderObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Listener for Control Loop Update messages sent by CLAMP. + */ +public class ControlLoopUpdateListener extends ScoListener<ParticipantControlLoopUpdate> implements Closeable { + private static final Logger LOGGER = LoggerFactory.getLogger(ControlLoopUpdateListener.class); + + private final ParticipantHandler participantHandler; + + /** + * Constructs the object. + * + * @param participantHandler the handler for managing the state of the participant + */ + public ControlLoopUpdateListener(final ParticipantHandler participantHandler) { + super(ParticipantControlLoopUpdate.class); + this.participantHandler = participantHandler; + } + + @Override + public void onTopicEvent(final CommInfrastructure infra, final String topic, final StandardCoderObject sco, + final ParticipantControlLoopUpdate participantControlLoopUpdateMsg) { + LOGGER.debug("Control Loop update received from CLAMP - {}", participantControlLoopUpdateMsg); + participantHandler.getControlLoopHandler().handleControlLoopUpdate(participantControlLoopUpdateMsg); + } + + @Override + public void close() { + // No explicit action on this class + } +} diff --git a/tosca-controlloop/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/MessageSender.java b/tosca-controlloop/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/MessageSender.java new file mode 100644 index 000000000..20490f81d --- /dev/null +++ b/tosca-controlloop/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/MessageSender.java @@ -0,0 +1,118 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2021 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.controlloop.participant.intermediary.comm; + +import java.io.Closeable; +import java.util.TimerTask; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantResponseDetails; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantResponseStatus; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantStatus; +import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler; +import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class sends messages from participants to CLAMP. + */ +public class MessageSender extends TimerTask implements Closeable { + private static final Logger LOGGER = LoggerFactory.getLogger(MessageSender.class); + + private final ParticipantHandler participantHandler; + private final ParticipantStatusPublisher publisher; + private ScheduledExecutorService timerPool; + + /** + * Constructor, set the publisher. + * + * @param participantHandler the participant handler to use for gathering information + * @param publisher the publisher to use for sending messages + * @param interval time interval to send Participant Status periodic messages + */ + public MessageSender(ParticipantHandler participantHandler, ParticipantStatusPublisher publisher, + long interval) { + this.participantHandler = participantHandler; + this.publisher = publisher; + + // Kick off the timer + timerPool = makeTimerPool(); + timerPool.scheduleAtFixedRate(this, 0, interval, TimeUnit.SECONDS); + } + + @Override + public void run() { + LOGGER.debug("Sent heartbeat to CLAMP"); + + ParticipantResponseDetails response = new ParticipantResponseDetails(); + + response.setResponseTo(null); + response.setResponseStatus(ParticipantResponseStatus.PERIODIC); + response.setResponseMessage("Periodic response from participant"); + } + + @Override + public void close() { + timerPool.shutdown(); + } + + /** + * Send a response message for this participant. + * + * @param response the details to include in the response message + */ + public void sendResponse(ParticipantResponseDetails response) { + sendResponse(null, response); + } + + /** + * Send a response message for this participant. + * + * @param controlLoopId the control loop to which this message is a response + * @param response the details to include in the response message + */ + public void sendResponse(ToscaConceptIdentifier controlLoopId, ParticipantResponseDetails response) { + ParticipantStatus status = new ParticipantStatus(); + + // Participant related fields + status.setParticipantId(participantHandler.getParticipantId()); + status.setState(participantHandler.getState()); + status.setHealthStatus(participantHandler.getHealthStatus()); + + // Control loop related fields + status.setControlLoopId(controlLoopId); + status.setControlLoops(participantHandler.getControlLoopHandler().getControlLoops()); + status.setResponse(response); + + publisher.send(status); + } + + /** + * Makes a new timer pool. + * + * @return a new timer pool + */ + protected ScheduledExecutorService makeTimerPool() { + return Executors.newScheduledThreadPool(1); + } +} diff --git a/tosca-controlloop/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantStatusPublisher.java b/tosca-controlloop/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantStatusPublisher.java new file mode 100644 index 000000000..e909327cd --- /dev/null +++ b/tosca-controlloop/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantStatusPublisher.java @@ -0,0 +1,62 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2021 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.controlloop.participant.intermediary.comm; + +import java.io.Closeable; +import java.util.List; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantStatus; +import org.onap.policy.common.endpoints.event.comm.TopicSink; +import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class is used to send Participant Status messages to clamp using TopicSinkClient. + */ +public class ParticipantStatusPublisher implements Closeable { + private static final Logger LOGGER = LoggerFactory.getLogger(ParticipantStatusPublisher.class); + + private final TopicSinkClient topicSinkClient; + + /** + * Constructor for instantiating ParticipantStatusPublisher. + * + * @param topicSinks the topic sinks + */ + public ParticipantStatusPublisher(List<TopicSink> topicSinks) { + this.topicSinkClient = new TopicSinkClient(topicSinks.get(0)); + } + + /** + * Method to send Participant Status message to clamp on demand. + * + * @param participantStatus the Participant Status + */ + public void send(final ParticipantStatus participantStatus) { + topicSinkClient.send(participantStatus); + LOGGER.debug("Sent Participant Status message to CLAMP - {}", participantStatus); + } + + @Override + public void close() { + // No explicit action on this class + } +} diff --git a/tosca-controlloop/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ControlLoopHandler.java b/tosca-controlloop/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ControlLoopHandler.java new file mode 100644 index 000000000..f27be961b --- /dev/null +++ b/tosca-controlloop/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ControlLoopHandler.java @@ -0,0 +1,282 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2021 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.controlloop.participant.intermediary.handler; + +import java.io.Closeable; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.UUID; +import org.apache.commons.collections4.CollectionUtils; +import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ClElementStatistics; +import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoop; +import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopElement; +import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopOrderedState; +import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopState; +import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoops; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantControlLoopStateChange; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantControlLoopUpdate; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantResponseDetails; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantResponseStatus; +import org.onap.policy.clamp.controlloop.participant.intermediary.comm.MessageSender; +import org.onap.policy.clamp.controlloop.participant.intermediary.parameters.ParticipantIntermediaryParameters; +import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/* + * This class is responsible for managing the state of all control loops in the participant. + */ +public class ControlLoopHandler implements Closeable { + private static final Logger LOGGER = LoggerFactory.getLogger(ControlLoopHandler.class); + + private ToscaConceptIdentifier participantId = null; + private MessageSender sender = null; + + private final Map<ToscaConceptIdentifier, ControlLoop> controlLoopMap = new LinkedHashMap<>(); + private final Map<UUID, ControlLoopElement> elementsOnThisParticipant = new LinkedHashMap<>(); + + public ControlLoopHandler() { + } + + /** + * Constructor, set the participant ID and sender. + * + * @param parameters the parameters of the participant + * @param sender the sender for sending responses to messages + */ + public ControlLoopHandler(ParticipantIntermediaryParameters parameters, MessageSender sender) { + this.participantId = parameters.getParticipantId(); + this.sender = sender; + } + + @Override + public void close() { + // No explicit action on this class + } + + /** + * Handle a control loop element state change message. + * + * @param id controlloop element id + * @param state the updated state + * @return controlLoopElement the updated controlloop element + */ + public ControlLoopElement updateControlLoopElementState(UUID id, ControlLoopOrderedState state) { + + if (id == null) { + return null; + } + + ControlLoopElement clElement = elementsOnThisParticipant.get(id); + if (clElement != null) { + clElement.setOrderedState(state); + LOGGER.debug("Control loop element {} ordered state changed to {}", id, state); + ParticipantResponseDetails response = new ParticipantResponseDetails(); + sender.sendResponse(response); + return elementsOnThisParticipant.get(id); + } + + return null; + } + + public void updateControlLoopElementStatistics(ClElementStatistics elementStatistics) { + // TODO Handle statistics coming from a participant implementation + } + + /** + * Handle a control loop state change message. + * + * @param definition controlloop id + * @param state the updated state + * @return controlLoop the updated controlloop + */ + public ControlLoop updateControlLoopState(ToscaConceptIdentifier definition, ControlLoopOrderedState state) { + if (definition == null) { + return null; + } + + ControlLoop controlLoop = controlLoopMap.get(definition); + if (controlLoop == null) { + LOGGER.debug("Control loop {} does not use this participant", definition.getName()); + return null; + } + + ParticipantResponseDetails response = new ParticipantResponseDetails(); + handleState(controlLoop, response, state); + sender.sendResponse(response); + return controlLoop; + } + + /** + * Handle a control loop state change message. + * + * @param stateChangeMsg the state change message + */ + public void handleControlLoopStateChange(ParticipantControlLoopStateChange stateChangeMsg) { + if (stateChangeMsg.getControlLoopId() == null) { + return; + } + + ControlLoop controlLoop = controlLoopMap.get(stateChangeMsg.getControlLoopId()); + + if (controlLoop == null) { + LOGGER.debug("Control loop {} does not use this participant", stateChangeMsg.getControlLoopId()); + return; + } + + ParticipantResponseDetails response = new ParticipantResponseDetails(stateChangeMsg); + handleState(controlLoop, response, stateChangeMsg.getOrderedState()); + sender.sendResponse(response); + } + + /** + * Method to handle state changes. + * + * @param controlLoop participant response + * @param response participant response + * @param state controlloop ordered state + */ + private void handleState(final ControlLoop controlLoop, final ParticipantResponseDetails response, + ControlLoopOrderedState state) { + switch (state) { + case UNINITIALISED: + handleUninitialisedState(controlLoop, response); + break; + case PASSIVE: + handlePassiveState(controlLoop, response); + break; + case RUNNING: + handleRunningState(controlLoop, response); + break; + default: + LOGGER.debug("StateChange message has no state, state is null {}", controlLoop.getDefinition()); + break; + } + } + + /** + * Handle a control loop update message. + * + * @param updateMsg the update message + */ + public void handleControlLoopUpdate(ParticipantControlLoopUpdate updateMsg) { + if (!updateMsg.appliesTo(participantId)) { + return; + } + + ControlLoop controlLoop = controlLoopMap.get(updateMsg.getControlLoopId()); + + ParticipantResponseDetails response = new ParticipantResponseDetails(updateMsg); + + // TODO: Updates to existing ControlLoops are not supported yet (Addition/Removal of ControlLoop + // elements to existing ControlLoop has to be supported). + if (controlLoop != null) { + response.setResponseStatus(ParticipantResponseStatus.FAIL); + response.setResponseMessage("Control loop " + updateMsg.getControlLoopId() + + " already defined on participant " + participantId); + + sender.sendResponse(response); + return; + } + + controlLoop = updateMsg.getControlLoop(); + controlLoop.getElements().removeIf(element -> participantId.equals(element.getParticipantId())); + + controlLoopMap.put(updateMsg.getControlLoopId(), controlLoop); + for (ControlLoopElement element : updateMsg.getControlLoop().getElements()) { + element.setState(element.getOrderedState().asState()); + elementsOnThisParticipant.put(element.getId(), element); + } + + response.setResponseStatus(ParticipantResponseStatus.SUCCESS); + response.setResponseMessage( + "Control loop " + updateMsg.getControlLoopId() + " defined on participant " + participantId); + + sender.sendResponse(response); + } + + /** + * Method to handle when the new state from participant is UNINITIALISED state. + * + * @param controlLoop participant response + * @param response participant response + */ + private void handleUninitialisedState(final ControlLoop controlLoop, final ParticipantResponseDetails response) { + handleStateChange(controlLoop, ControlLoopState.UNINITIALISED, response); + controlLoopMap.remove(controlLoop.getKey().asIdentifier()); + } + + /** + * Method to handle when the new state from participant is PASSIVE state. + * + * @param controlLoop participant response + * @param response participant response + */ + private void handlePassiveState(final ControlLoop controlLoop, final ParticipantResponseDetails response) { + handleStateChange(controlLoop, ControlLoopState.PASSIVE, response); + } + + /** + * Method to handle when the new state from participant is RUNNING state. + * + * @param controlLoop participant response + * @param response participant response + */ + private void handleRunningState(final ControlLoop controlLoop, final ParticipantResponseDetails response) { + handleStateChange(controlLoop, ControlLoopState.RUNNING, response); + } + + /** + * Method to update the state of control loop elements. + * + * @param controlLoop participant status in memory + * @param state new state of the control loop elements + */ + private void handleStateChange(ControlLoop controlLoop, ControlLoopState newState, + ParticipantResponseDetails response) { + + if (newState.equals(controlLoop.getState())) { + response.setResponseStatus(ParticipantResponseStatus.SUCCESS); + response.setResponseMessage("Control loop is already in state " + newState); + return; + } + + if (!CollectionUtils.isEmpty(controlLoop.getElements())) { + controlLoop.getElements().forEach(element -> element.setState(newState)); + } + + response.setResponseStatus(ParticipantResponseStatus.SUCCESS); + response.setResponseMessage("ControlLoop state changed from " + controlLoop.getState() + " to " + newState); + controlLoop.setState(newState); + } + + /** + * Get control loops as a {@link ConrolLoops} class. + * + * @return the control loops + */ + public ControlLoops getControlLoops() { + ControlLoops controlLoops = new ControlLoops(); + controlLoops.setControlLoopList(new ArrayList<>(controlLoopMap.values())); + return controlLoops; + } +} diff --git a/tosca-controlloop/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/IntermediaryActivator.java b/tosca-controlloop/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/IntermediaryActivator.java new file mode 100644 index 000000000..dd0cf30a8 --- /dev/null +++ b/tosca-controlloop/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/IntermediaryActivator.java @@ -0,0 +1,129 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2021 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.controlloop.participant.intermediary.handler; + +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; +import javax.ws.rs.core.Response.Status; +import lombok.Getter; +import lombok.experimental.Delegate; +import org.onap.policy.clamp.controlloop.common.exception.ControlLoopRuntimeException; +import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ControlLoopUpdateListener; +import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ParticipantStatusPublisher; +import org.onap.policy.clamp.controlloop.participant.intermediary.parameters.ParticipantIntermediaryParameters; +import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager; +import org.onap.policy.common.endpoints.event.comm.TopicSink; +import org.onap.policy.common.endpoints.event.comm.TopicSource; +import org.onap.policy.common.endpoints.listeners.MessageTypeDispatcher; +import org.onap.policy.common.utils.services.ServiceManagerContainer; + +/** + * This class activates the Participant Intermediary together with all its handlers. + */ +public class IntermediaryActivator extends ServiceManagerContainer { + // Name of the message type for messages on topics + private static final String[] MSG_TYPE_NAMES = {"messageType"}; + + @Getter + private final ParticipantIntermediaryParameters parameters; + + // Topics from which the participant receives and to which the participant sends messages + private List<TopicSink> topicSinks; + private List<TopicSource> topicSources; + + // The participant handler for this intermediary + final AtomicReference<ParticipantHandler> participantHandler = new AtomicReference<>(); + + /** + * Listens for messages on the topic, decodes them into a message, and then dispatches them. + */ + private final MessageTypeDispatcher msgDispatcher; + + /** + * Instantiate the activator for participant. + * + * @param parameters the parameters for the participant intermediary + */ + public IntermediaryActivator(final ParticipantIntermediaryParameters parameters) { + this.parameters = parameters; + + topicSinks = + TopicEndpointManager.getManager().addTopicSinks(parameters.getClampControlLoopTopics().getTopicSinks()); + + topicSources = TopicEndpointManager.getManager() + .addTopicSources(parameters.getClampControlLoopTopics().getTopicSources()); + + try { + this.msgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES); + } catch (final RuntimeException e) { + throw new ControlLoopRuntimeException(Status.INTERNAL_SERVER_ERROR, + "topic message dispatcher failed to start", e); + } + + // @formatter:off + final AtomicReference<ParticipantStatusPublisher> statusPublisher = new AtomicReference<>(); + final AtomicReference<ControlLoopUpdateListener> controlLoopUpdateListener = new AtomicReference<>(); + + addAction("Topic endpoint management", + () -> TopicEndpointManager.getManager().start(), + () -> TopicEndpointManager.getManager().shutdown()); + + addAction("Participant Status Publisher", + () -> statusPublisher.set(new ParticipantStatusPublisher(topicSinks)), + () -> statusPublisher.get().close()); + + addAction("Participant Handler", + () -> participantHandler.set(new ParticipantHandler(parameters, statusPublisher.get())), + () -> participantHandler.get().close()); + + addAction("Control Loop Update Listener", + () -> controlLoopUpdateListener.set(new ControlLoopUpdateListener(participantHandler.get())), + () -> controlLoopUpdateListener.get().close()); + + addAction("Topic Message Dispatcher", this::registerMsgDispatcher, this::unregisterMsgDispatcher); + // @formatter:on + } + + /** + * Registers the dispatcher with the topic source(s). + */ + private void registerMsgDispatcher() { + for (final TopicSource source : topicSources) { + source.register(msgDispatcher); + } + } + + /** + * Unregisters the dispatcher from the topic source(s). + */ + private void unregisterMsgDispatcher() { + for (final TopicSource source : topicSources) { + source.unregister(msgDispatcher); + } + } + + /** + * Return the participant handler. + */ + public ParticipantHandler getParticipantHandler() { + return participantHandler.get(); + } +} diff --git a/tosca-controlloop/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ParticipantHandler.java b/tosca-controlloop/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ParticipantHandler.java new file mode 100644 index 000000000..1150471ae --- /dev/null +++ b/tosca-controlloop/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ParticipantHandler.java @@ -0,0 +1,204 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2021 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.controlloop.participant.intermediary.handler; + +import java.io.Closeable; +import java.util.Objects; +import lombok.Getter; +import lombok.Setter; +import org.onap.policy.clamp.controlloop.models.controlloop.concepts.Participant; +import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ParticipantHealthStatus; +import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ParticipantState; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantResponseDetails; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantResponseStatus; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantStateChange; +import org.onap.policy.clamp.controlloop.participant.intermediary.comm.MessageSender; +import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ParticipantStatusPublisher; +import org.onap.policy.clamp.controlloop.participant.intermediary.parameters.ParticipantIntermediaryParameters; +import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class is responsible for managing the state of a participant. + */ +@Getter +public class ParticipantHandler implements Closeable { + private static final Logger LOGGER = LoggerFactory.getLogger(ParticipantHandler.class); + + private final ToscaConceptIdentifier participantId; + private final MessageSender sender; + private final ControlLoopHandler controlLoopHandler; + + @Setter + private ParticipantState state = ParticipantState.UNKNOWN; + + @Setter + private ParticipantHealthStatus healthStatus = ParticipantHealthStatus.UNKNOWN; + + /** + * Constructor, set the participant ID and sender. + * + * @param parameters the parameters of the participant + * @param publisher the publisher for sending responses to messages + */ + public ParticipantHandler(ParticipantIntermediaryParameters parameters, ParticipantStatusPublisher publisher) { + this.participantId = parameters.getParticipantId(); + this.sender = new MessageSender(this, publisher, parameters.getReportingTimeInterval()); + this.controlLoopHandler = new ControlLoopHandler(parameters, sender); + } + + @Override + public void close() { + sender.close(); + controlLoopHandler.close(); + } + + /** + * Method which handles a participant state change event from clamp. + * + * @param stateChangeMsg participant state change message + */ + public void handleParticipantStateChange(final ParticipantStateChange stateChangeMsg) { + + if (!stateChangeMsg.appliesTo(participantId)) { + return; + } + + ParticipantResponseDetails response = new ParticipantResponseDetails(stateChangeMsg); + + switch (stateChangeMsg.getState()) { + case PASSIVE: + handlePassiveState(response); + break; + case ACTIVE: + handleActiveState(response); + break; + case SAFE: + handleSafeState(response); + break; + case TEST: + handleTestState(response); + break; + case TERMINATED: + handleTerminatedState(response); + break; + default: + LOGGER.debug("StateChange message has no state, state is null {}", stateChangeMsg.getParticipantId()); + response.setResponseStatus(ParticipantResponseStatus.FAIL); + response.setResponseMessage("StateChange message has invalid state for participantId " + + stateChangeMsg.getParticipantId()); + break; + } + + sender.sendResponse(response); + } + + /** + * Method to handle when the new state from participant is active. + * + * @param response participant response + */ + private void handleActiveState(final ParticipantResponseDetails response) { + handleStateChange(ParticipantState.ACTIVE, response); + } + + /** + * Method to handle when the new state from participant is passive. + * + * @param response participant response + */ + private void handlePassiveState(final ParticipantResponseDetails response) { + handleStateChange(ParticipantState.PASSIVE, response); + } + + /** + * Method to handle when the new state from participant is safe. + * + * @param response participant response + */ + private void handleSafeState(final ParticipantResponseDetails response) { + handleStateChange(ParticipantState.SAFE, response); + } + + /** + * Method to handle when the new state from participant is TEST. + * + * @param response participant response + */ + private void handleTestState(final ParticipantResponseDetails response) { + handleStateChange(ParticipantState.TEST, response); + } + + /** + * Method to handle when the new state from participant is Terminated. + * + * @param response participant response + */ + private void handleTerminatedState(final ParticipantResponseDetails response) { + handleStateChange(ParticipantState.TERMINATED, response); + } + + private void handleStateChange(ParticipantState newParticipantState, ParticipantResponseDetails response) { + if (state.equals(newParticipantState)) { + response.setResponseStatus(ParticipantResponseStatus.SUCCESS); + response.setResponseMessage("Participant already in state " + newParticipantState); + } else { + response.setResponseStatus(ParticipantResponseStatus.SUCCESS); + response.setResponseMessage("Participant state changed from " + state + " to " + newParticipantState); + state = newParticipantState; + } + } + + /** + * Method to update participant state. + * + * @param definition participant definition + * @param participantState participant state + */ + public Participant updateParticipantState(ToscaConceptIdentifier definition, + ParticipantState participantState) { + if (!Objects.equals(definition, participantId)) { + LOGGER.debug("No participant with this ID {}", definition.getName()); + return null; + } + ParticipantResponseDetails response = new ParticipantResponseDetails(); + handleStateChange(participantState, response); + sender.sendResponse(response); + return getParticipant(definition.getName(), definition.getVersion()); + } + + /** + * Get participants as a {@link Participant} class. + * + * @return the participant + */ + public Participant getParticipant(String name, String version) { + if (participantId.getName().equals(name)) { + Participant participant = new Participant(); + participant.setDefinition(participantId); + participant.setParticipantState(state); + participant.setHealthStatus(healthStatus); + return participant; + } + return null; + } +} |