diff options
Diffstat (limited to 'participant/participant-intermediary/src/main/java/org')
14 files changed, 1650 insertions, 0 deletions
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/api/ControlLoopElementListener.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/api/ControlLoopElementListener.java new file mode 100644 index 000000000..9e5d2c663 --- /dev/null +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/api/ControlLoopElementListener.java @@ -0,0 +1,62 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2021 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.controlloop.participant.intermediary.api; + +import java.util.UUID; +import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopElement; +import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopOrderedState; +import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopState; +import org.onap.policy.models.base.PfModelException; +import org.onap.policy.models.tosca.authorative.concepts.ToscaServiceTemplate; + +/** + * This interface is implemented by participant implementations to receive updates on control loop elements. + */ +public interface ControlLoopElementListener { + /** + * Handle a control loop element state change. + * + * @param controlLoopElementId the ID of the control loop element + * @param currentState the current state of the control loop element + * @param newState the state to which the control loop element is changing to + * @throws PfModelException in case of a model exception + */ + public void controlLoopElementStateChange(UUID controlLoopElementId, ControlLoopState currentState, + ControlLoopOrderedState newState) throws PfModelException; + + /** + * Handle an update on a control loop element. + * + * @param element the information on the control loop element + * @param controlLoopDefinition toscaServiceTemplate + * @throws PfModelException from Policy framework + */ + public void controlLoopElementUpdate(ControlLoopElement element, + ToscaServiceTemplate controlLoopDefinition) throws PfModelException; + + /** + * Handle controlLoopElement statistics. + * + * @param controlLoopElementId controlLoopElement id + * @throws PfModelException in case of a model exception + */ + public void handleStatistics(UUID controlLoopElementId) throws PfModelException; +} diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/api/ParticipantIntermediaryApi.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/api/ParticipantIntermediaryApi.java new file mode 100644 index 000000000..d31ae1082 --- /dev/null +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/api/ParticipantIntermediaryApi.java @@ -0,0 +1,139 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2021 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.controlloop.participant.intermediary.api; + +import java.util.List; +import java.util.Map; +import java.util.UUID; +import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ClElementStatistics; +import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoop; +import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopElement; +import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopOrderedState; +import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopState; +import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoops; +import org.onap.policy.clamp.controlloop.models.controlloop.concepts.Participant; +import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ParticipantState; +import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ParticipantStatistics; +import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler; +import org.onap.policy.clamp.controlloop.participant.intermediary.parameters.ParticipantIntermediaryParameters; +import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier; + +/** + * This interface is used by participant implementations to use the participant intermediary. + */ +public interface ParticipantIntermediaryApi { + /** + * Initialise the participant intermediary. + * + * @param parameters the parameters for the intermediary + */ + void init(ParticipantIntermediaryParameters parameters); + + /** + * Close the intermediary. + */ + void close(); + + /** + * Register a listener for control loop elements that are mediated by the intermediary. + * + * @param controlLoopElementListener The control loop element listener to register + */ + void registerControlLoopElementListener(ControlLoopElementListener controlLoopElementListener); + + /** + * Get participants loops from the intermediary API. + * + * @param name the participant name, null for all + * @param version the participant version, null for all + * @return the participants + */ + List<Participant> getParticipants(String name, String version); + + /** + * Update the state of a participant. + * + * @param definition the definition of the participant to update the state on + * @param state the state of the participant + * @return the participant + */ + Participant updateParticipantState(ToscaConceptIdentifier definition, ParticipantState state); + + /** + * Update the statistics of a participant. + * + * @param participantStatistics the statistics of the participant + */ + void updateParticipantStatistics(ParticipantStatistics participantStatistics); + + /** + * Get control loops from the intermediary API. + * + * @param name the control loop element name, null for all + * @param version the control loop element version, null for all + * @return the control loop elements + */ + ControlLoops getControlLoops(String name, String version); + + /** + * Get control loop elements from the intermediary API. + * + * @param name the control loop element name, null for all + * @param version the control loop element version, null for all + * @return the control loop elements + */ + Map<UUID, ControlLoopElement> getControlLoopElements(String name, String version); + + /** + * Get control loop element from the intermediary API. + * + * @param id control loop element ID + * @return the control loop element + */ + ControlLoopElement getControlLoopElement(UUID id); + + /** + * Update the state of a control loop element. + * + * @param id the ID of the control loop element to update the state on + * @param currentState the state of the control loop element + * @param newState the state of the control loop element + * @return ControlLoopElement updated control loop element + */ + ControlLoopElement updateControlLoopElementState(UUID id, ControlLoopOrderedState currentState, + ControlLoopState newState); + + /** + * Update the control loop element statistics. + * + * @param id the ID of the control loop element to update the state on + * @param elementStatistics the updated statistics + */ + void updateControlLoopElementStatistics(UUID id, ClElementStatistics elementStatistics); + + /** + * Return participantHandler, This will not be used in real world, but for junits, + * if participantHandler is not returned, there is no way to test state change messages + * without dmaap simulator. + * + */ + ParticipantHandler getParticipantHandler(); +} diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/api/ParticipantIntermediaryFactory.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/api/ParticipantIntermediaryFactory.java new file mode 100644 index 000000000..d7cc4b2ed --- /dev/null +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/api/ParticipantIntermediaryFactory.java @@ -0,0 +1,38 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2021 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.controlloop.participant.intermediary.api; + +import org.onap.policy.clamp.controlloop.participant.intermediary.api.impl.ParticipantIntermediaryApiImpl; + +/** + * Factory class for creating {@link ParticipantIntermediaryApi} instances. + */ +public class ParticipantIntermediaryFactory { + + /** + * Create an implementation of the {@link ParticipantIntermediaryApi} interface. + * + * @return the implementation of the API + */ + public ParticipantIntermediaryApi createApiImplementation() { + return new ParticipantIntermediaryApiImpl(); + } +} diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/api/impl/ParticipantIntermediaryApiImpl.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/api/impl/ParticipantIntermediaryApiImpl.java new file mode 100644 index 000000000..839088d72 --- /dev/null +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/api/impl/ParticipantIntermediaryApiImpl.java @@ -0,0 +1,133 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2021 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.controlloop.participant.intermediary.api.impl; + +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ClElementStatistics; +import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoop; +import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopElement; +import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopOrderedState; +import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopState; +import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoops; +import org.onap.policy.clamp.controlloop.models.controlloop.concepts.Participant; +import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ParticipantState; +import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ParticipantStatistics; +import org.onap.policy.clamp.controlloop.participant.intermediary.api.ControlLoopElementListener; +import org.onap.policy.clamp.controlloop.participant.intermediary.api.ParticipantIntermediaryApi; +import org.onap.policy.clamp.controlloop.participant.intermediary.handler.IntermediaryActivator; +import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler; +import org.onap.policy.clamp.controlloop.participant.intermediary.parameters.ParticipantIntermediaryParameters; +import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier; + +/** + * This class is api implementation used by participant intermediary. + */ +public class ParticipantIntermediaryApiImpl implements ParticipantIntermediaryApi { + + // The activator for the participant intermediary + private IntermediaryActivator activator; + + @Override + public void init(ParticipantIntermediaryParameters parameters) { + activator = new IntermediaryActivator(parameters); + + activator.start(); + } + + @Override + public void close() { + activator.shutdown(); + } + + @Override + public void registerControlLoopElementListener(ControlLoopElementListener controlLoopElementListener) { + activator.getParticipantHandler().getControlLoopHandler() + .registerControlLoopElementListener(controlLoopElementListener); + } + + @Override + public List<Participant> getParticipants(String name, String version) { + return List.of(activator.getParticipantHandler().getParticipant(name, version)); + } + + @Override + public Participant updateParticipantState(ToscaConceptIdentifier definition, ParticipantState state) { + return activator.getParticipantHandler().updateParticipantState(definition, state); + } + + @Override + public void updateParticipantStatistics(ParticipantStatistics participantStatistics) { + // TODO Auto-generated method stub + } + + @Override + public ControlLoops getControlLoops(String name, String version) { + return activator.getParticipantHandler().getControlLoopHandler().getControlLoops(); + } + + @Override + public Map<UUID, ControlLoopElement> getControlLoopElements(String name, String version) { + List<ControlLoop> controlLoops = activator.getParticipantHandler() + .getControlLoopHandler().getControlLoops().getControlLoopList(); + + for (ControlLoop controlLoop : controlLoops) { + if (name.equals(controlLoop.getDefinition().getName())) { + return controlLoop.getElements(); + } + } + return new LinkedHashMap<>(); + } + + @Override + public ControlLoopElement getControlLoopElement(UUID id) { + List<ControlLoop> controlLoops = activator.getParticipantHandler() + .getControlLoopHandler().getControlLoops().getControlLoopList(); + + for (ControlLoop controlLoop : controlLoops) { + ControlLoopElement clElement = controlLoop.getElements().get(id); + if (clElement != null) { + return clElement; + } + } + return null; + } + + @Override + public ControlLoopElement updateControlLoopElementState(UUID id, ControlLoopOrderedState currentState, + ControlLoopState newState) { + return activator.getParticipantHandler().getControlLoopHandler() + .updateControlLoopElementState(id, currentState, newState); + } + + @Override + public void updateControlLoopElementStatistics(UUID id, ClElementStatistics elementStatistics) { + activator.getParticipantHandler().getControlLoopHandler() + .updateControlLoopElementStatistics(id, elementStatistics); + } + + @Override + public ParticipantHandler getParticipantHandler() { + return activator.getParticipantHandler(); + } +} diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ControlLoopStateChangeListener.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ControlLoopStateChangeListener.java new file mode 100644 index 000000000..50b8b9cdc --- /dev/null +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ControlLoopStateChangeListener.java @@ -0,0 +1,68 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2021 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.controlloop.participant.intermediary.comm; + +import java.io.Closeable; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantControlLoopStateChange; +import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler; +import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; +import org.onap.policy.common.endpoints.listeners.ScoListener; +import org.onap.policy.common.utils.coder.StandardCoderObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Listener for Participant State Change messages sent by CLAMP. + */ +public class ControlLoopStateChangeListener extends ScoListener<ParticipantControlLoopStateChange> + implements Closeable { + private static final Logger LOGGER = LoggerFactory.getLogger(ControlLoopStateChangeListener.class); + + private final ParticipantHandler participantHandler; + + /** + * Constructs the object. + * + * @param participantHandler the handler for managing the state of the participant + */ + public ControlLoopStateChangeListener(final ParticipantHandler participantHandler) { + super(ParticipantControlLoopStateChange.class); + this.participantHandler = participantHandler; + } + + @Override + public void onTopicEvent(final CommInfrastructure infra, final String topic, final StandardCoderObject sco, + final ParticipantControlLoopStateChange controlLoopStateChangeMsg) { + LOGGER.debug("Control Loop State Change received from CLAMP - {}", controlLoopStateChangeMsg); + + if (participantHandler.canHandle(controlLoopStateChangeMsg)) { + LOGGER.debug("Message for this participant"); + participantHandler.getControlLoopHandler().handleControlLoopStateChange(controlLoopStateChangeMsg); + } else { + LOGGER.debug("Message not for this participant"); + } + } + + @Override + public void close() { + // No explicit action on this class + } +} diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ControlLoopUpdateListener.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ControlLoopUpdateListener.java new file mode 100644 index 000000000..ab2437c1c --- /dev/null +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ControlLoopUpdateListener.java @@ -0,0 +1,67 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2021 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.controlloop.participant.intermediary.comm; + +import java.io.Closeable; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantControlLoopUpdate; +import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler; +import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; +import org.onap.policy.common.endpoints.listeners.ScoListener; +import org.onap.policy.common.utils.coder.StandardCoderObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Listener for Control Loop Update messages sent by CLAMP. + */ +public class ControlLoopUpdateListener extends ScoListener<ParticipantControlLoopUpdate> implements Closeable { + private static final Logger LOGGER = LoggerFactory.getLogger(ControlLoopUpdateListener.class); + + private final ParticipantHandler participantHandler; + + /** + * Constructs the object. + * + * @param participantHandler the handler for managing the state of the participant + */ + public ControlLoopUpdateListener(final ParticipantHandler participantHandler) { + super(ParticipantControlLoopUpdate.class); + this.participantHandler = participantHandler; + } + + @Override + public void onTopicEvent(final CommInfrastructure infra, final String topic, final StandardCoderObject sco, + final ParticipantControlLoopUpdate participantControlLoopUpdateMsg) { + LOGGER.debug("Control Loop update received from CLAMP - {}", participantControlLoopUpdateMsg); + + if (participantHandler.canHandle(participantControlLoopUpdateMsg)) { + LOGGER.debug("Message for this participant"); + participantHandler.getControlLoopHandler().handleControlLoopUpdate(participantControlLoopUpdateMsg); + } else { + LOGGER.debug("Message not for this participant"); + } + } + + @Override + public void close() { + // No explicit action on this class + } +} diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/MessageSender.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/MessageSender.java new file mode 100644 index 000000000..3128f1eaa --- /dev/null +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/MessageSender.java @@ -0,0 +1,162 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2021 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.controlloop.participant.intermediary.comm; + +import java.io.Closeable; +import java.time.Instant; +import java.util.TimerTask; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoop; +import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopElement; +import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoops; +import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ParticipantStatistics; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantResponseDetails; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantResponseStatus; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantStatus; +import org.onap.policy.clamp.controlloop.participant.intermediary.api.ControlLoopElementListener; +import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ParticipantStatusPublisher; +import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler; +import org.onap.policy.models.base.PfModelException; +import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class sends messages from participants to CLAMP. + */ +public class MessageSender extends TimerTask implements Closeable { + private static final Logger LOGGER = LoggerFactory.getLogger(MessageSender.class); + + private final ParticipantHandler participantHandler; + private final ParticipantStatusPublisher publisher; + private ScheduledExecutorService timerPool; + + /** + * Constructor, set the publisher. + * + * @param participantHandler the participant handler to use for gathering information + * @param publisher the publisher to use for sending messages + * @param interval time interval to send Participant Status periodic messages + */ + public MessageSender(ParticipantHandler participantHandler, ParticipantStatusPublisher publisher, + long interval) { + this.participantHandler = participantHandler; + this.publisher = publisher; + + // Kick off the timer + timerPool = makeTimerPool(); + timerPool.scheduleAtFixedRate(this, 0, interval, TimeUnit.SECONDS); + } + + @Override + public void run() { + LOGGER.debug("Sent heartbeat to CLAMP"); + + ParticipantResponseDetails response = new ParticipantResponseDetails(); + + response.setResponseTo(null); + response.setResponseStatus(ParticipantResponseStatus.PERIODIC); + response.setResponseMessage("Periodic response from participant"); + } + + @Override + public void close() { + timerPool.shutdown(); + } + + /** + * Send a response message for this participant. + * + * @param response the details to include in the response message + */ + public void sendResponse(ParticipantResponseDetails response) { + sendResponse(null, response); + } + + /** + * Dispatch a response message for this participant. + * + * @param controlLoopId the control loop to which this message is a response + * @param response the details to include in the response message + */ + public void sendResponse(ToscaConceptIdentifier controlLoopId, ParticipantResponseDetails response) { + ParticipantStatus status = new ParticipantStatus(); + + // Participant related fields + status.setParticipantType(participantHandler.getParticipantType()); + status.setParticipantId(participantHandler.getParticipantId()); + status.setState(participantHandler.getState()); + status.setHealthStatus(participantHandler.getHealthStatus()); + + // Control loop related fields + ControlLoops controlLoops = participantHandler.getControlLoopHandler().getControlLoops(); + status.setControlLoopId(controlLoopId); + status.setControlLoops(controlLoops); + status.setResponse(response); + + ParticipantStatistics participantStatistics = new ParticipantStatistics(); + participantStatistics.setTimeStamp(Instant.now()); + participantStatistics.setParticipantId(participantHandler.getParticipantId()); + participantStatistics.setHealthStatus(participantHandler.getHealthStatus()); + participantStatistics.setState(participantHandler.getState()); + status.setParticipantStatistics(participantStatistics); + + for (ControlLoopElementListener clElementListener : + participantHandler.getControlLoopHandler().getListeners()) { + updateClElementStatistics(controlLoops, clElementListener); + } + + status.setControlLoops(controlLoops); + + publisher.send(status); + } + + /** + * Update ControlLoopElement statistics. The control loop elements listening will be + * notified to retrieve statistics from respective controlloop elements, and controlloopelements + * data on the handler will be updated. + * + * @param controlLoops the control loops + * @param clElementListener control loop element listener + */ + public void updateClElementStatistics(ControlLoops controlLoops, ControlLoopElementListener clElementListener) { + for (ControlLoop controlLoop : controlLoops.getControlLoopList()) { + for (ControlLoopElement element : controlLoop.getElements().values()) { + try { + clElementListener.handleStatistics(element.getId()); + } catch (PfModelException e) { + LOGGER.debug("Getting statistics for Control loop element failed"); + } + } + } + } + + /** + * Makes a new timer pool. + * + * @return a new timer pool + */ + protected ScheduledExecutorService makeTimerPool() { + return Executors.newScheduledThreadPool(1); + } +} diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantHealthCheckListener.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantHealthCheckListener.java new file mode 100644 index 000000000..e0e6be329 --- /dev/null +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantHealthCheckListener.java @@ -0,0 +1,69 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2021 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.controlloop.participant.intermediary.comm; + +import java.io.Closeable; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantHealthCheck; +import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler; +import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; +import org.onap.policy.common.endpoints.listeners.ScoListener; +import org.onap.policy.common.utils.coder.StandardCoderObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Listener for Participant health status messages sent by CLAMP. + */ +public class ParticipantHealthCheckListener extends ScoListener<ParticipantHealthCheck> implements Closeable { + private static final Logger LOGGER = LoggerFactory.getLogger(ParticipantHealthCheckListener.class); + + private final ParticipantHandler participantHandler; + + /** + * Constructs the object. + * + * @param participantHandler the handler for managing the state and health of the participant + */ + public ParticipantHealthCheckListener(final ParticipantHandler participantHandler) { + super(ParticipantHealthCheck.class); + this.participantHandler = participantHandler; + } + + @Override + public void onTopicEvent(final CommInfrastructure infra, final String topic, final StandardCoderObject sco, + final ParticipantHealthCheck participantHealthCheckMsg) { + LOGGER.debug("Participant Health Check message received from CLAMP - {}", participantHealthCheckMsg); + + + if (participantHandler.canHandle(participantHealthCheckMsg)) { + LOGGER.debug("Message for this participant"); + participantHandler.handleParticipantHealthCheck(participantHealthCheckMsg); + } else { + LOGGER.debug("Message not for this participant"); + } + + } + + @Override + public void close() { + // No explicit action on this class + } +} diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantStateChangeListener.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantStateChangeListener.java new file mode 100644 index 000000000..c1a8b5b4a --- /dev/null +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantStateChangeListener.java @@ -0,0 +1,68 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2021 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.controlloop.participant.intermediary.comm; + +import java.io.Closeable; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantStateChange; +import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler; +import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; +import org.onap.policy.common.endpoints.listeners.ScoListener; +import org.onap.policy.common.utils.coder.StandardCoderObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Listener for Participant State Change messages sent by CLAMP. + * + */ +public class ParticipantStateChangeListener extends ScoListener<ParticipantStateChange> implements Closeable { + private static final Logger LOGGER = LoggerFactory.getLogger(ParticipantStateChangeListener.class); + + private final ParticipantHandler participantHandler; + + /** + * Constructs the object. + * + * @param participantHandler the handler for managing the state of the participant + */ + public ParticipantStateChangeListener(final ParticipantHandler participantHandler) { + super(ParticipantStateChange.class); + this.participantHandler = participantHandler; + } + + @Override + public void onTopicEvent(final CommInfrastructure infra, final String topic, final StandardCoderObject sco, + final ParticipantStateChange participantStateChangeMsg) { + LOGGER.debug("Participant State Change received from CLAMP - {}", participantStateChangeMsg); + + if (participantHandler.canHandle(participantStateChangeMsg)) { + LOGGER.debug("Message for this participant"); + participantHandler.handleParticipantStateChange(participantStateChangeMsg); + } else { + LOGGER.debug("Message not for this participant"); + } + } + + @Override + public void close() { + // No explicit action on this class + } +} diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantStatusPublisher.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantStatusPublisher.java new file mode 100644 index 000000000..bc53b4e9d --- /dev/null +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantStatusPublisher.java @@ -0,0 +1,63 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2021 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.controlloop.participant.intermediary.comm; + +import java.io.Closeable; +import java.util.List; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantStatus; +import org.onap.policy.common.endpoints.event.comm.TopicSink; +import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class is used to send Participant Status messages to clamp using TopicSinkClient. + * + */ +public class ParticipantStatusPublisher implements Closeable { + private static final Logger LOGGER = LoggerFactory.getLogger(ParticipantStatusPublisher.class); + + private final TopicSinkClient topicSinkClient; + + /** + * Constructor for instantiating ParticipantStatusPublisher. + * + * @param topicSinks the topic sinks + */ + public ParticipantStatusPublisher(List<TopicSink> topicSinks) { + this.topicSinkClient = new TopicSinkClient(topicSinks.get(0)); + } + + /** + * Method to send Participant Status message to clamp on demand. + * + * @param participantStatus the Participant Status + */ + public void send(final ParticipantStatus participantStatus) { + topicSinkClient.send(participantStatus); + LOGGER.debug("Sent Participant Status message to CLAMP - {}", participantStatus); + } + + @Override + public void close() { + // No explicit action on this class + } +} diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ControlLoopHandler.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ControlLoopHandler.java new file mode 100644 index 000000000..cc2a66b58 --- /dev/null +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ControlLoopHandler.java @@ -0,0 +1,323 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2021 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.controlloop.participant.intermediary.handler; + +import java.io.Closeable; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import lombok.Getter; +import org.apache.commons.collections4.CollectionUtils; +import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ClElementStatistics; +import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoop; +import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopElement; +import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopOrderedState; +import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopState; +import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoops; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantControlLoopStateChange; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantControlLoopUpdate; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantResponseDetails; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantResponseStatus; +import org.onap.policy.clamp.controlloop.participant.intermediary.api.ControlLoopElementListener; +import org.onap.policy.clamp.controlloop.participant.intermediary.comm.MessageSender; +import org.onap.policy.clamp.controlloop.participant.intermediary.parameters.ParticipantIntermediaryParameters; +import org.onap.policy.models.base.PfModelException; +import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/* + * This class is responsible for managing the state of all control loops in the participant. + */ +public class ControlLoopHandler implements Closeable { + private static final Logger LOGGER = LoggerFactory.getLogger(ControlLoopHandler.class); + + private ToscaConceptIdentifier participantType = null; + private ToscaConceptIdentifier participantId = null; + private MessageSender messageSender = null; + + private final Map<ToscaConceptIdentifier, ControlLoop> controlLoopMap = new LinkedHashMap<>(); + + private final Map<UUID, ControlLoopElement> elementsOnThisParticipant = new LinkedHashMap<>(); + + @Getter + private List<ControlLoopElementListener> listeners = new ArrayList<>(); + + public ControlLoopHandler() {} + + /** + * Constructor, set the participant ID and messageSender. + * + * @param parameters the parameters of the participant + * @param messageSender the messageSender for sending responses to messages + */ + public ControlLoopHandler(ParticipantIntermediaryParameters parameters, MessageSender messageSender) { + this.participantType = parameters.getParticipantType(); + this.participantId = parameters.getParticipantId(); + this.messageSender = messageSender; + } + + @Override + public void close() { + // No explicit action on this class + } + + public void registerControlLoopElementListener(ControlLoopElementListener listener) { + listeners.add(listener); + } + + /** + * Handle a control loop element state change message. + * + * @param id controlloop element id + * @param orderedState the current state + * @param newState the ordered state + * @return controlLoopElement the updated controlloop element + */ + public ControlLoopElement updateControlLoopElementState(UUID id, ControlLoopOrderedState orderedState, + ControlLoopState newState) { + + if (id == null) { + return null; + } + + ControlLoopElement clElement = elementsOnThisParticipant.get(id); + if (clElement != null) { + clElement.setOrderedState(orderedState); + clElement.setState(newState); + LOGGER.debug("Control loop element {} state changed to {}", id, newState); + ParticipantResponseDetails response = new ParticipantResponseDetails(); + response.setResponseStatus(ParticipantResponseStatus.SUCCESS); + response.setResponseMessage("ControlLoopElement state changed to {} " + newState); + messageSender.sendResponse(response); + return clElement; + } + + return null; + } + + /** + * Handle a control loop element statistics. + * + * @param id controlloop element id + * @param elementStatistics control loop element Statistics + */ + public void updateControlLoopElementStatistics(UUID id, ClElementStatistics elementStatistics) { + ControlLoopElement clElement = elementsOnThisParticipant.get(id); + if (clElement != null) { + elementStatistics.setParticipantId(participantId); + elementStatistics.setId(id); + clElement.setClElementStatistics(elementStatistics); + } + } + + /** + * Handle a control loop state change message. + * + * @param stateChangeMsg the state change message + */ + public void handleControlLoopStateChange(ParticipantControlLoopStateChange stateChangeMsg) { + if (stateChangeMsg.getControlLoopId() == null) { + return; + } + + ControlLoop controlLoop = controlLoopMap.get(stateChangeMsg.getControlLoopId()); + + if (controlLoop == null) { + LOGGER.debug("Control loop {} does not use this participant", stateChangeMsg.getControlLoopId()); + return; + } + + ParticipantResponseDetails response = new ParticipantResponseDetails(stateChangeMsg); + handleState(controlLoop, response, stateChangeMsg.getOrderedState()); + messageSender.sendResponse(response); + } + + /** + * Method to handle state changes. + * + * @param controlLoop participant response + * @param response participant response + * @param orderedState controlloop ordered state + */ + private void handleState(final ControlLoop controlLoop, final ParticipantResponseDetails response, + ControlLoopOrderedState orderedState) { + switch (orderedState) { + case UNINITIALISED: + handleUninitialisedState(controlLoop, orderedState, response); + break; + case PASSIVE: + handlePassiveState(controlLoop, orderedState, response); + break; + case RUNNING: + handleRunningState(controlLoop, orderedState, response); + break; + default: + LOGGER.debug("StateChange message has no state, state is null {}", controlLoop.getDefinition()); + break; + } + } + + /** + * Handle a control loop update message. + * + * @param updateMsg the update message + */ + public void handleControlLoopUpdate(ParticipantControlLoopUpdate updateMsg) { + + if (!updateMsg.appliesTo(participantType, participantId)) { + return; + } + + ControlLoop controlLoop = controlLoopMap.get(updateMsg.getControlLoopId()); + + ParticipantResponseDetails response = new ParticipantResponseDetails(updateMsg); + + // TODO: Updates to existing ControlLoops are not supported yet (Addition/Removal of ControlLoop + // elements to existing ControlLoop has to be supported). + if (controlLoop != null) { + response.setResponseStatus(ParticipantResponseStatus.FAIL); + response.setResponseMessage("Control loop " + updateMsg.getControlLoopId() + + " already defined on participant " + participantId); + + messageSender.sendResponse(response); + return; + } + + controlLoop = updateMsg.getControlLoop(); + controlLoop.getElements().values().removeIf(element -> !participantType.equals(element.getParticipantType())); + + controlLoopMap.put(updateMsg.getControlLoopId(), controlLoop); + for (ControlLoopElement element : updateMsg.getControlLoop().getElements().values()) { + element.setState(element.getOrderedState().asState()); + element.setParticipantId(participantId); + elementsOnThisParticipant.put(element.getId(), element); + } + + for (ControlLoopElementListener clElementListener : listeners) { + try { + for (ControlLoopElement element : updateMsg.getControlLoop().getElements().values()) { + clElementListener.controlLoopElementUpdate(element, updateMsg.getControlLoopDefinition()); + } + } catch (PfModelException e) { + LOGGER.debug("Control loop element update failed {}", updateMsg.getControlLoopId()); + } + } + + response.setResponseStatus(ParticipantResponseStatus.SUCCESS); + response.setResponseMessage( + "Control loop " + updateMsg.getControlLoopId() + " defined on participant " + participantId); + + messageSender.sendResponse(response); + } + + /** + * Method to handle when the new state from participant is UNINITIALISED state. + * + * @param controlLoop participant response + * @param orderedState orderedState + * @param response participant response + */ + private void handleUninitialisedState(final ControlLoop controlLoop, final ControlLoopOrderedState orderedState, + final ParticipantResponseDetails response) { + handleStateChange(controlLoop, orderedState, ControlLoopState.UNINITIALISED, response); + controlLoopMap.remove(controlLoop.getKey().asIdentifier()); + + for (ControlLoopElementListener clElementListener : listeners) { + try { + for (ControlLoopElement element : controlLoop.getElements().values()) { + clElementListener.controlLoopElementStateChange(element.getId(), element.getState(), + orderedState); + } + } catch (PfModelException e) { + LOGGER.debug("Control loop element update failed {}", controlLoop.getDefinition()); + } + } + } + + /** + * Method to handle when the new state from participant is PASSIVE state. + * + * @param controlLoop participant response + * @param orderedState orderedState + * @param response participant response + */ + private void handlePassiveState(final ControlLoop controlLoop, final ControlLoopOrderedState orderedState, + final ParticipantResponseDetails response) { + handleStateChange(controlLoop, orderedState, ControlLoopState.PASSIVE, response); + } + + /** + * Method to handle when the new state from participant is RUNNING state. + * + * @param controlLoop participant response + * @param orderedState orderedState + * @param response participant response + */ + private void handleRunningState(final ControlLoop controlLoop, final ControlLoopOrderedState orderedState, + final ParticipantResponseDetails response) { + handleStateChange(controlLoop, orderedState, ControlLoopState.RUNNING, response); + } + + /** + * Method to update the state of control loop elements. + * + * @param controlLoop participant status in memory + * @param orderedState orderedState + * @param state new state of the control loop elements + */ + private void handleStateChange(ControlLoop controlLoop, final ControlLoopOrderedState orderedState, + ControlLoopState newState, ParticipantResponseDetails response) { + + if (orderedState.equals(controlLoop.getOrderedState())) { + response.setResponseStatus(ParticipantResponseStatus.SUCCESS); + response.setResponseMessage("Control loop is already in state " + orderedState); + return; + } + + if (!CollectionUtils.isEmpty(controlLoop.getElements().values())) { + controlLoop.getElements().values().forEach(element -> { + element.setState(newState); + element.setOrderedState(orderedState); + } + ); + } + + response.setResponseStatus(ParticipantResponseStatus.SUCCESS); + response.setResponseMessage("ControlLoop state changed from " + controlLoop.getOrderedState() + + " to " + orderedState); + controlLoop.setOrderedState(orderedState); + } + + + /** + * Get control loops as a {@link ConrolLoops} class. + * + * @return the control loops + */ + public ControlLoops getControlLoops() { + ControlLoops controlLoops = new ControlLoops(); + controlLoops.setControlLoopList(new ArrayList<>(controlLoopMap.values())); + return controlLoops; + } +} diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/IntermediaryActivator.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/IntermediaryActivator.java new file mode 100644 index 000000000..be2fa1a30 --- /dev/null +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/IntermediaryActivator.java @@ -0,0 +1,165 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2021 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.controlloop.participant.intermediary.handler; + +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; +import javax.ws.rs.core.Response.Status; +import lombok.Getter; +import lombok.experimental.Delegate; +import org.onap.policy.clamp.controlloop.common.exception.ControlLoopRuntimeException; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantControlLoopStateChange; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantControlLoopUpdate; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantHealthCheck; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageType; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantStateChange; +import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ControlLoopStateChangeListener; +import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ControlLoopUpdateListener; +import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ParticipantHealthCheckListener; +import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ParticipantStateChangeListener; +import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ParticipantStatusPublisher; +import org.onap.policy.clamp.controlloop.participant.intermediary.parameters.ParticipantIntermediaryParameters; +import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager; +import org.onap.policy.common.endpoints.event.comm.TopicSink; +import org.onap.policy.common.endpoints.event.comm.TopicSource; +import org.onap.policy.common.endpoints.listeners.MessageTypeDispatcher; +import org.onap.policy.common.endpoints.listeners.ScoListener; +import org.onap.policy.common.utils.services.ServiceManagerContainer; + +/** + * This class activates the Participant Intermediary together with all its handlers. + */ +public class IntermediaryActivator extends ServiceManagerContainer { + // Name of the message type for messages on topics + private static final String[] MSG_TYPE_NAMES = {"messageType"}; + + @Getter + private final ParticipantIntermediaryParameters parameters; + + // Topics from which the participant receives and to which the participant sends messages + private List<TopicSink> topicSinks; + private List<TopicSource> topicSources; + + // The participant handler for this intermediary + final AtomicReference<ParticipantHandler> participantHandler = new AtomicReference<>(); + + /** + * Listens for messages on the topic, decodes them into a message, and then dispatches them. + */ + private final MessageTypeDispatcher msgDispatcher; + + /** + * Instantiate the activator for participant. + * + * @param parameters the parameters for the participant intermediary + */ + public IntermediaryActivator(final ParticipantIntermediaryParameters parameters) { + this.parameters = parameters; + + topicSinks = + TopicEndpointManager.getManager().addTopicSinks(parameters.getClampControlLoopTopics().getTopicSinks()); + + topicSources = TopicEndpointManager.getManager() + .addTopicSources(parameters.getClampControlLoopTopics().getTopicSources()); + + try { + this.msgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES); + } catch (final RuntimeException e) { + throw new ControlLoopRuntimeException(Status.INTERNAL_SERVER_ERROR, + "topic message dispatcher failed to start", e); + } + + // @formatter:off + final AtomicReference<ParticipantStatusPublisher> statusPublisher = new AtomicReference<>(); + final AtomicReference<ParticipantStateChangeListener> participantStateChangeListener = new AtomicReference<>(); + final AtomicReference<ParticipantHealthCheckListener> participantHealthCheckListener = new AtomicReference<>(); + final AtomicReference<ControlLoopStateChangeListener> controlLoopStateChangeListener = new AtomicReference<>(); + final AtomicReference<ControlLoopUpdateListener> controlLoopUpdateListener = new AtomicReference<>(); + + addAction("Topic endpoint management", + () -> TopicEndpointManager.getManager().start(), + () -> TopicEndpointManager.getManager().shutdown()); + + addAction("Participant Status Publisher", + () -> statusPublisher.set(new ParticipantStatusPublisher(topicSinks)), + () -> statusPublisher.get().close()); + + addAction("Participant Handler", + () -> participantHandler.set(new ParticipantHandler(parameters, statusPublisher.get())), + () -> participantHandler.get().close()); + + addAction("Participant State Change Listener", + () -> participantStateChangeListener.set(new ParticipantStateChangeListener(participantHandler.get())), + () -> participantStateChangeListener.get().close()); + + addAction("Participant Health Check Listener", + () -> participantHealthCheckListener.set(new ParticipantHealthCheckListener(participantHandler.get())), + () -> participantHealthCheckListener.get().close()); + + addAction("Control Loop State Change Listener", + () -> controlLoopStateChangeListener.set(new ControlLoopStateChangeListener(participantHandler.get())), + () -> controlLoopStateChangeListener.get().close()); + + addAction("Control Loop Update Listener", + () -> controlLoopUpdateListener.set(new ControlLoopUpdateListener(participantHandler.get())), + () -> controlLoopUpdateListener.get().close()); + + addAction("Topic Message Dispatcher", this::registerMsgDispatcher, this::unregisterMsgDispatcher); + // @formatter:on + } + + /** + * Registers the dispatcher with the topic source(s). + */ + private void registerMsgDispatcher() { + msgDispatcher.register(ParticipantMessageType.PARTICIPANT_STATE_CHANGE.name(), + (ScoListener<ParticipantStateChange>) new ParticipantStateChangeListener( + participantHandler.get())); + msgDispatcher.register(ParticipantMessageType.PARTICIPANT_HEALTH_CHECK.name(), + (ScoListener<ParticipantHealthCheck>) new ParticipantHealthCheckListener( + participantHandler.get())); + msgDispatcher.register(ParticipantMessageType.PARTICIPANT_CONTROL_LOOP_STATE_CHANGE.name(), + (ScoListener<ParticipantControlLoopStateChange>) new ControlLoopStateChangeListener( + participantHandler.get())); + msgDispatcher.register(ParticipantMessageType.PARTICIPANT_CONTROL_LOOP_UPDATE.name(), + (ScoListener<ParticipantControlLoopUpdate>) new ControlLoopUpdateListener( + participantHandler.get())); + for (final TopicSource source : topicSources) { + source.register(msgDispatcher); + } + } + + /** + * Unregisters the dispatcher from the topic source(s). + */ + private void unregisterMsgDispatcher() { + for (final TopicSource source : topicSources) { + source.unregister(msgDispatcher); + } + } + + /** + * Return the participant handler. + */ + public ParticipantHandler getParticipantHandler() { + return participantHandler.get(); + } +} diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ParticipantHandler.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ParticipantHandler.java new file mode 100644 index 000000000..980ab6ec1 --- /dev/null +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ParticipantHandler.java @@ -0,0 +1,235 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2021 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.controlloop.participant.intermediary.handler; + +import java.io.Closeable; +import java.util.Objects; +import lombok.Getter; +import lombok.Setter; +import org.onap.policy.clamp.controlloop.models.controlloop.concepts.Participant; +import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ParticipantHealthStatus; +import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ParticipantState; +import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ParticipantStatistics; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantHealthCheck; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessage; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantResponseDetails; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantResponseStatus; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantStateChange; +import org.onap.policy.clamp.controlloop.participant.intermediary.comm.MessageSender; +import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ParticipantStatusPublisher; +import org.onap.policy.clamp.controlloop.participant.intermediary.parameters.ParticipantIntermediaryParameters; +import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class is responsible for managing the state of a participant. + */ +@Getter +public class ParticipantHandler implements Closeable { + private static final Logger LOGGER = LoggerFactory.getLogger(ParticipantHandler.class); + + private final ToscaConceptIdentifier participantType; + private final ToscaConceptIdentifier participantId; + private final MessageSender sender; + private final ControlLoopHandler controlLoopHandler; + private final ParticipantStatistics participantStatistics; + + @Setter + private ParticipantState state = ParticipantState.UNKNOWN; + + @Setter + private ParticipantHealthStatus healthStatus = ParticipantHealthStatus.UNKNOWN; + + /** + * Constructor, set the participant ID and sender. + * + * @param parameters the parameters of the participant + * @param publisher the publisher for sending responses to messages + */ + public ParticipantHandler(ParticipantIntermediaryParameters parameters, ParticipantStatusPublisher publisher) { + this.participantType = parameters.getParticipantType(); + this.participantId = parameters.getParticipantId(); + this.sender = new MessageSender(this, publisher, parameters.getReportingTimeInterval()); + this.controlLoopHandler = new ControlLoopHandler(parameters, sender); + this.participantStatistics = new ParticipantStatistics(); + } + + @Override + public void close() { + sender.close(); + controlLoopHandler.close(); + } + + /** + * Method which handles a participant state change event from clamp. + * + * @param stateChangeMsg participant state change message + */ + public void handleParticipantStateChange(final ParticipantStateChange stateChangeMsg) { + + if (!stateChangeMsg.appliesTo(participantType, participantId)) { + return; + } + + ParticipantResponseDetails response = new ParticipantResponseDetails(stateChangeMsg); + + switch (stateChangeMsg.getState()) { + case PASSIVE: + handlePassiveState(response); + break; + case ACTIVE: + handleActiveState(response); + break; + case SAFE: + handleSafeState(response); + break; + case TEST: + handleTestState(response); + break; + case TERMINATED: + handleTerminatedState(response); + break; + default: + LOGGER.debug("StateChange message has no state, state is null {}", stateChangeMsg.getParticipantId()); + response.setResponseStatus(ParticipantResponseStatus.FAIL); + response.setResponseMessage("StateChange message has invalid state for participantId " + + stateChangeMsg.getParticipantId()); + break; + } + + sender.sendResponse(response); + } + + + /** + * Method which handles a participant health check event from clamp. + * + * @param healthCheckMsg participant health check message + */ + public void handleParticipantHealthCheck(final ParticipantHealthCheck healthCheckMsg) { + ParticipantResponseDetails response = new ParticipantResponseDetails(healthCheckMsg); + response.setResponseStatus(ParticipantResponseStatus.SUCCESS); + response.setResponseMessage(healthStatus.toString()); + + sender.sendResponse(response); + } + + /** + * Method to handle when the new state from participant is active. + * + * @param response participant response + */ + private void handleActiveState(final ParticipantResponseDetails response) { + handleStateChange(ParticipantState.ACTIVE, response); + } + + /** + * Method to handle when the new state from participant is passive. + * + * @param response participant response + */ + private void handlePassiveState(final ParticipantResponseDetails response) { + handleStateChange(ParticipantState.PASSIVE, response); + } + + /** + * Method to handle when the new state from participant is safe. + * + * @param response participant response + */ + private void handleSafeState(final ParticipantResponseDetails response) { + handleStateChange(ParticipantState.SAFE, response); + } + + /** + * Method to handle when the new state from participant is TEST. + * + * @param response participant response + */ + private void handleTestState(final ParticipantResponseDetails response) { + handleStateChange(ParticipantState.TEST, response); + } + + /** + * Method to handle when the new state from participant is Terminated. + * + * @param response participant response + */ + private void handleTerminatedState(final ParticipantResponseDetails response) { + handleStateChange(ParticipantState.TERMINATED, response); + } + + private void handleStateChange(ParticipantState newParticipantState, ParticipantResponseDetails response) { + if (state.equals(newParticipantState)) { + response.setResponseStatus(ParticipantResponseStatus.SUCCESS); + response.setResponseMessage("Participant already in state " + newParticipantState); + } else { + response.setResponseStatus(ParticipantResponseStatus.SUCCESS); + response.setResponseMessage("Participant state changed from " + state + " to " + newParticipantState); + state = newParticipantState; + } + } + + /** + * Method to update participant state. + * + * @param definition participant definition + * @param participantState participant state + */ + public Participant updateParticipantState(ToscaConceptIdentifier definition, + ParticipantState participantState) { + if (!Objects.equals(definition, participantId)) { + LOGGER.debug("No participant with this ID {}", definition.getName()); + return null; + } + ParticipantResponseDetails response = new ParticipantResponseDetails(); + handleStateChange(participantState, response); + sender.sendResponse(response); + return getParticipant(definition.getName(), definition.getVersion()); + } + + /** + * Get participants as a {@link Participant} class. + * + * @return the participant + */ + public Participant getParticipant(String name, String version) { + if (participantId.getName().equals(name)) { + Participant participant = new Participant(); + participant.setDefinition(participantId); + participant.setParticipantState(state); + participant.setHealthStatus(healthStatus); + return participant; + } + return null; + } + + /** + * Check if a participant message applies to this participant handler. + * + * @param partipantMsg the message to check + * @return true if it applies, false otherwise + */ + public boolean canHandle(ParticipantMessage partipantMsg) { + return partipantMsg.appliesTo(participantType, participantId); + } +} diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/parameters/ParticipantIntermediaryParameters.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/parameters/ParticipantIntermediaryParameters.java new file mode 100644 index 000000000..8e3440e42 --- /dev/null +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/parameters/ParticipantIntermediaryParameters.java @@ -0,0 +1,58 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2021 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.controlloop.participant.intermediary.parameters; + +import lombok.Getter; +import org.onap.policy.common.endpoints.parameters.TopicParameterGroup; +import org.onap.policy.common.parameters.ParameterGroupImpl; +import org.onap.policy.common.parameters.annotations.NotBlank; +import org.onap.policy.common.parameters.annotations.NotNull; +import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier; + +/** + * Class to hold all parameters needed for participant component. + */ +@NotNull +@NotBlank +@Getter +public class ParticipantIntermediaryParameters extends ParameterGroupImpl { + // The ID and description of this participant + private ToscaConceptIdentifier participantId; + private String description; + + // The participant type of this participant + private ToscaConceptIdentifier participantType; + + // The time interval for periodic reporting of status to the CLAMP control loop server + private long reportingTimeInterval; + + // DMaaP topics for communicating with the CLAMP control loop server + private TopicParameterGroup clampControlLoopTopics; + + /** + * Create the participant parameter group. + * + * @param instanceId instance id of the event. + */ + public ParticipantIntermediaryParameters(final String instanceId) { + super(instanceId); + } +} |