summaryrefslogtreecommitdiffstats
path: root/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionHandler.java
diff options
context:
space:
mode:
Diffstat (limited to 'runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionHandler.java')
-rw-r--r--runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionHandler.java518
1 files changed, 518 insertions, 0 deletions
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionHandler.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionHandler.java
new file mode 100644
index 000000000..055acb28f
--- /dev/null
+++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionHandler.java
@@ -0,0 +1,518 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2021 Nordix Foundation.
+ * Modifications Copyright (C) 2021 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.acm.runtime.supervision;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
+import lombok.AllArgsConstructor;
+import org.apache.commons.collections4.CollectionUtils;
+import org.onap.policy.clamp.acm.runtime.monitoring.MonitoringProvider;
+import org.onap.policy.clamp.acm.runtime.supervision.comm.AutomationCompositionStateChangePublisher;
+import org.onap.policy.clamp.acm.runtime.supervision.comm.AutomationCompositionUpdatePublisher;
+import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantDeregisterAckPublisher;
+import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantRegisterAckPublisher;
+import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantUpdatePublisher;
+import org.onap.policy.clamp.common.acm.exception.AutomationCompositionException;
+import org.onap.policy.clamp.models.acm.concepts.AutomationComposition;
+import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionElementAck;
+import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionInfo;
+import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionState;
+import org.onap.policy.clamp.models.acm.concepts.Participant;
+import org.onap.policy.clamp.models.acm.concepts.ParticipantHealthStatus;
+import org.onap.policy.clamp.models.acm.concepts.ParticipantState;
+import org.onap.policy.clamp.models.acm.concepts.ParticipantUtils;
+import org.onap.policy.clamp.models.acm.messages.dmaap.participant.AutomationCompositionAck;
+import org.onap.policy.clamp.models.acm.messages.dmaap.participant.ParticipantDeregister;
+import org.onap.policy.clamp.models.acm.messages.dmaap.participant.ParticipantMessage;
+import org.onap.policy.clamp.models.acm.messages.dmaap.participant.ParticipantRegister;
+import org.onap.policy.clamp.models.acm.messages.dmaap.participant.ParticipantStatus;
+import org.onap.policy.clamp.models.acm.messages.dmaap.participant.ParticipantUpdateAck;
+import org.onap.policy.clamp.models.acm.persistence.provider.AutomationCompositionProvider;
+import org.onap.policy.clamp.models.acm.persistence.provider.ParticipantProvider;
+import org.onap.policy.clamp.models.acm.persistence.provider.ServiceTemplateProvider;
+import org.onap.policy.models.base.PfModelException;
+import org.onap.policy.models.base.PfModelRuntimeException;
+import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
+import org.onap.policy.models.tosca.authorative.concepts.ToscaServiceTemplate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+/**
+ * This class handles supervision of automation composition instances, so only one object of this type should be built
+ * at a time.
+ *
+ * <p/>
+ * It is effectively a singleton that is started at system start.
+ */
+@Component
+@AllArgsConstructor
+public class SupervisionHandler {
+ private static final Logger LOGGER = LoggerFactory.getLogger(SupervisionHandler.class);
+
+ private static final String AUTOMATION_COMPOSITION_CANNOT_TRANSITION_FROM_STATE =
+ "Automation composition can't transition from state ";
+ private static final String AUTOMATION_COMPOSITION_IS_ALREADY_IN_STATE =
+ "Automation composition is already in state ";
+ private static final String TO_STATE = " to state ";
+ private static final String AND_TRANSITIONING_TO_STATE = " and transitioning to state ";
+
+ private final AutomationCompositionProvider automationCompositionProvider;
+ private final ParticipantProvider participantProvider;
+ private final MonitoringProvider monitoringProvider;
+ private final ServiceTemplateProvider serviceTemplateProvider;
+
+ // Publishers for participant communication
+ private final AutomationCompositionUpdatePublisher automationCompositionUpdatePublisher;
+ private final AutomationCompositionStateChangePublisher automationCompositionStateChangePublisher;
+ private final ParticipantRegisterAckPublisher participantRegisterAckPublisher;
+ private final ParticipantDeregisterAckPublisher participantDeregisterAckPublisher;
+ private final ParticipantUpdatePublisher participantUpdatePublisher;
+
+ /**
+ * Supervision trigger called when a command is issued on automation compositions.
+ *
+ * <p/>
+ * Causes supervision to start or continue supervision on the automation compositions in question.
+ *
+ * @param automationCompositionIdentifierList the automation compositions for which the supervision command has been
+ * issued
+ * @throws AutomationCompositionException on supervision triggering exceptions
+ */
+ public void triggerAutomationCompositionSupervision(
+ List<ToscaConceptIdentifier> automationCompositionIdentifierList) throws AutomationCompositionException {
+
+ LOGGER.debug("triggering automation composition supervision on automation compositions {}",
+ automationCompositionIdentifierList);
+
+ if (CollectionUtils.isEmpty(automationCompositionIdentifierList)) {
+ // This is just to force throwing of the exception in certain circumstances.
+ exceptionOccured(Response.Status.NOT_ACCEPTABLE,
+ "The list of automation compositions for supervision is empty");
+ }
+
+ for (ToscaConceptIdentifier automationCompositionId : automationCompositionIdentifierList) {
+ try {
+ var automationComposition =
+ automationCompositionProvider.getAutomationComposition(automationCompositionId);
+
+ superviseAutomationComposition(automationComposition);
+
+ automationCompositionProvider.saveAutomationComposition(automationComposition);
+ } catch (PfModelException pfme) {
+ throw new AutomationCompositionException(pfme.getErrorResponse().getResponseCode(), pfme.getMessage(),
+ pfme);
+ }
+ }
+ }
+
+ /**
+ * Handle a ParticipantStatus message from a participant.
+ *
+ * @param participantStatusMessage the ParticipantStatus message received from a participant
+ */
+ @MessageIntercept
+ public void handleParticipantMessage(ParticipantStatus participantStatusMessage) {
+ LOGGER.debug("Participant Status received {}", participantStatusMessage);
+ try {
+ superviseParticipant(participantStatusMessage);
+ } catch (PfModelException | AutomationCompositionException svExc) {
+ LOGGER.warn("error supervising participant {}", participantStatusMessage.getParticipantId(), svExc);
+ return;
+ }
+
+ try {
+ superviseAutomationCompositions(participantStatusMessage);
+ } catch (PfModelException | AutomationCompositionException svExc) {
+ LOGGER.warn("error supervising participant {}", participantStatusMessage.getParticipantId(), svExc);
+ }
+ }
+
+ /**
+ * Handle a ParticipantRegister message from a participant.
+ *
+ * @param participantRegisterMessage the ParticipantRegister message received from a participant
+ */
+ @MessageIntercept
+ public boolean handleParticipantMessage(ParticipantRegister participantRegisterMessage) {
+ LOGGER.debug("Participant Register received {}", participantRegisterMessage);
+ try {
+ checkParticipant(participantRegisterMessage, ParticipantState.UNKNOWN, ParticipantHealthStatus.UNKNOWN);
+ } catch (PfModelException | AutomationCompositionException svExc) {
+ LOGGER.warn("error saving participant {}", participantRegisterMessage.getParticipantId(), svExc);
+ }
+
+ var isCommissioning = participantUpdatePublisher.sendCommissioning(null, null,
+ participantRegisterMessage.getParticipantId(), participantRegisterMessage.getParticipantType());
+
+ participantRegisterAckPublisher.send(participantRegisterMessage.getMessageId(),
+ participantRegisterMessage.getParticipantId(), participantRegisterMessage.getParticipantType());
+ return isCommissioning;
+ }
+
+ /**
+ * Handle a ParticipantDeregister message from a participant.
+ *
+ * @param participantDeregisterMessage the ParticipantDeregister message received from a participant
+ */
+ @MessageIntercept
+ public void handleParticipantMessage(ParticipantDeregister participantDeregisterMessage) {
+ LOGGER.debug("Participant Deregister received {}", participantDeregisterMessage);
+ try {
+ var participantOpt =
+ participantProvider.findParticipant(participantDeregisterMessage.getParticipantId().getName(),
+ participantDeregisterMessage.getParticipantId().getVersion());
+
+ if (participantOpt.isPresent()) {
+ var participant = participantOpt.get();
+ participant.setParticipantState(ParticipantState.TERMINATED);
+ participant.setHealthStatus(ParticipantHealthStatus.OFF_LINE);
+ participantProvider.saveParticipant(participant);
+ }
+ } catch (PfModelException pfme) {
+ LOGGER.warn("Model exception occured with participant id {}",
+ participantDeregisterMessage.getParticipantId());
+ }
+
+ participantDeregisterAckPublisher.send(participantDeregisterMessage.getMessageId());
+ }
+
+ /**
+ * Handle a ParticipantUpdateAck message from a participant.
+ *
+ * @param participantUpdateAckMessage the ParticipantUpdateAck message received from a participant
+ */
+ @MessageIntercept
+ public void handleParticipantMessage(ParticipantUpdateAck participantUpdateAckMessage) {
+ LOGGER.debug("Participant Update Ack received {}", participantUpdateAckMessage);
+ try {
+ var participantOpt =
+ participantProvider.findParticipant(participantUpdateAckMessage.getParticipantId().getName(),
+ participantUpdateAckMessage.getParticipantId().getVersion());
+
+ if (participantOpt.isPresent()) {
+ var participant = participantOpt.get();
+ participant.setParticipantState(participantUpdateAckMessage.getState());
+ participantProvider.saveParticipant(participant);
+ } else {
+ LOGGER.warn("Participant not found in database {}", participantUpdateAckMessage.getParticipantId());
+ }
+ } catch (PfModelException pfme) {
+ LOGGER.warn("Model exception occured with participant id {}",
+ participantUpdateAckMessage.getParticipantId());
+ }
+ }
+
+ /**
+ * Send commissioning update message to dmaap.
+ *
+ * @param name the ToscaServiceTemplate name
+ * @param version the ToscaServiceTemplate version
+ */
+ public void handleSendCommissionMessage(String name, String version) {
+ LOGGER.debug("Participant update message with serviveTemplate {} {} being sent to all participants", name,
+ version);
+ participantUpdatePublisher.sendComissioningBroadcast(name, version);
+ }
+
+ /**
+ * Send decommissioning update message to dmaap.
+ *
+ */
+ public void handleSendDeCommissionMessage() {
+ LOGGER.debug("Participant update message being sent");
+ participantUpdatePublisher.sendDecomisioning();
+ }
+
+ /**
+ * Handle a AutomationComposition update acknowledge message from a participant.
+ *
+ * @param automationCompositionAckMessage the AutomationCompositionAck message received from a participant
+ */
+ @MessageIntercept
+ public void handleAutomationCompositionUpdateAckMessage(AutomationCompositionAck automationCompositionAckMessage) {
+ LOGGER.debug("AutomationComposition Update Ack message received {}", automationCompositionAckMessage);
+ setAcElementStateInDb(automationCompositionAckMessage);
+ }
+
+ /**
+ * Handle a AutomationComposition statechange acknowledge message from a participant.
+ *
+ * @param automationCompositionAckMessage the AutomationCompositionAck message received from a participant
+ */
+ @MessageIntercept
+ public void handleAutomationCompositionStateChangeAckMessage(
+ AutomationCompositionAck automationCompositionAckMessage) {
+ LOGGER.debug("AutomationComposition StateChange Ack message received {}", automationCompositionAckMessage);
+ setAcElementStateInDb(automationCompositionAckMessage);
+ }
+
+ private void setAcElementStateInDb(AutomationCompositionAck automationCompositionAckMessage) {
+ if (automationCompositionAckMessage.getAutomationCompositionResultMap() != null) {
+ try {
+ var automationComposition = automationCompositionProvider
+ .getAutomationComposition(automationCompositionAckMessage.getAutomationCompositionId());
+ if (automationComposition != null) {
+ var updated = updateState(automationComposition,
+ automationCompositionAckMessage.getAutomationCompositionResultMap().entrySet());
+ updated |= setPrimed(automationComposition);
+ if (updated) {
+ automationCompositionProvider.saveAutomationComposition(automationComposition);
+ }
+ } else {
+ LOGGER.warn("AutomationComposition not found in database {}",
+ automationCompositionAckMessage.getAutomationCompositionId());
+ }
+ } catch (PfModelException pfme) {
+ LOGGER.warn("Model exception occured with AutomationComposition Id {}",
+ automationCompositionAckMessage.getAutomationCompositionId());
+ }
+ }
+ }
+
+ private boolean updateState(AutomationComposition automationComposition,
+ Set<Map.Entry<UUID, AutomationCompositionElementAck>> automationCompositionResultSet) {
+ var updated = false;
+ for (var acElementAck : automationCompositionResultSet) {
+ var element = automationComposition.getElements().get(acElementAck.getKey());
+ if (element != null) {
+ element.setState(acElementAck.getValue().getState());
+ updated = true;
+ }
+ }
+ return updated;
+ }
+
+ private boolean setPrimed(AutomationComposition automationComposition) {
+ var acElements = automationComposition.getElements().values();
+ if (acElements != null) {
+ Boolean primedFlag = true;
+ var checkOpt = automationComposition.getElements().values().stream()
+ .filter(acElement -> (!acElement.getState().equals(AutomationCompositionState.PASSIVE)
+ || !acElement.getState().equals(AutomationCompositionState.RUNNING)))
+ .findAny();
+ if (checkOpt.isEmpty()) {
+ primedFlag = false;
+ }
+ automationComposition.setPrimed(primedFlag);
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
+ * Supervise a automation composition, performing whatever actions need to be performed on the automation
+ * composition.
+ *
+ * @param automationComposition the automation composition to supervises
+ * @throws AutomationCompositionException on supervision errors
+ */
+ private void superviseAutomationComposition(AutomationComposition automationComposition)
+ throws AutomationCompositionException {
+ switch (automationComposition.getOrderedState()) {
+ case UNINITIALISED:
+ superviseAutomationCompositionUninitialization(automationComposition);
+ break;
+
+ case PASSIVE:
+ superviseAutomationCompositionPassivation(automationComposition);
+ break;
+
+ case RUNNING:
+ superviseAutomationCompositionActivation(automationComposition);
+ break;
+
+ default:
+ exceptionOccured(Response.Status.NOT_ACCEPTABLE,
+ "A automation composition cannot be commanded to go into state "
+ + automationComposition.getOrderedState().name());
+ }
+ }
+
+ /**
+ * Supervise a automation composition uninitialisation, performing whatever actions need to be performed on the
+ * automation composition,
+ * automation composition ordered state is UNINITIALIZED.
+ *
+ * @param automationComposition the automation composition to supervises
+ * @throws AutomationCompositionException on supervision errors
+ */
+ private void superviseAutomationCompositionUninitialization(AutomationComposition automationComposition)
+ throws AutomationCompositionException {
+ switch (automationComposition.getState()) {
+ case UNINITIALISED:
+ exceptionOccured(Response.Status.NOT_ACCEPTABLE,
+ AUTOMATION_COMPOSITION_IS_ALREADY_IN_STATE + automationComposition.getState().name());
+ break;
+
+ case UNINITIALISED2PASSIVE:
+ case PASSIVE:
+ automationComposition.setState(AutomationCompositionState.PASSIVE2UNINITIALISED);
+ automationCompositionStateChangePublisher.send(automationComposition,
+ getFirstStartPhase(automationComposition));
+ break;
+
+ case PASSIVE2UNINITIALISED:
+ exceptionOccured(Response.Status.NOT_ACCEPTABLE,
+ AUTOMATION_COMPOSITION_IS_ALREADY_IN_STATE + automationComposition.getState().name()
+ + AND_TRANSITIONING_TO_STATE + automationComposition.getOrderedState());
+ break;
+
+ default:
+ exceptionOccured(Response.Status.NOT_ACCEPTABLE, AUTOMATION_COMPOSITION_CANNOT_TRANSITION_FROM_STATE
+ + automationComposition.getState().name() + TO_STATE + automationComposition.getOrderedState());
+ break;
+ }
+ }
+
+ private void superviseAutomationCompositionPassivation(AutomationComposition automationComposition)
+ throws AutomationCompositionException {
+ switch (automationComposition.getState()) {
+ case PASSIVE:
+ exceptionOccured(Response.Status.NOT_ACCEPTABLE,
+ AUTOMATION_COMPOSITION_IS_ALREADY_IN_STATE + automationComposition.getState().name());
+ break;
+ case UNINITIALISED:
+ automationComposition.setState(AutomationCompositionState.UNINITIALISED2PASSIVE);
+ automationCompositionUpdatePublisher.send(automationComposition);
+ break;
+
+ case UNINITIALISED2PASSIVE:
+ case RUNNING2PASSIVE:
+ exceptionOccured(Response.Status.NOT_ACCEPTABLE,
+ AUTOMATION_COMPOSITION_IS_ALREADY_IN_STATE + automationComposition.getState().name()
+ + AND_TRANSITIONING_TO_STATE + automationComposition.getOrderedState());
+ break;
+
+ case RUNNING:
+ automationComposition.setState(AutomationCompositionState.RUNNING2PASSIVE);
+ automationCompositionStateChangePublisher.send(automationComposition,
+ getFirstStartPhase(automationComposition));
+ break;
+
+ default:
+ exceptionOccured(Response.Status.NOT_ACCEPTABLE, AUTOMATION_COMPOSITION_CANNOT_TRANSITION_FROM_STATE
+ + automationComposition.getState().name() + TO_STATE + automationComposition.getOrderedState());
+ break;
+ }
+ }
+
+ private void superviseAutomationCompositionActivation(AutomationComposition automationComposition)
+ throws AutomationCompositionException {
+ switch (automationComposition.getState()) {
+ case RUNNING:
+ exceptionOccured(Response.Status.NOT_ACCEPTABLE,
+ AUTOMATION_COMPOSITION_IS_ALREADY_IN_STATE + automationComposition.getState().name());
+ break;
+
+ case PASSIVE2RUNNING:
+ exceptionOccured(Response.Status.NOT_ACCEPTABLE,
+ AUTOMATION_COMPOSITION_IS_ALREADY_IN_STATE + automationComposition.getState().name()
+ + AND_TRANSITIONING_TO_STATE + automationComposition.getOrderedState());
+ break;
+
+ case PASSIVE:
+ automationComposition.setState(AutomationCompositionState.PASSIVE2RUNNING);
+ automationCompositionStateChangePublisher.send(automationComposition,
+ getFirstStartPhase(automationComposition));
+ break;
+
+ default:
+ exceptionOccured(Response.Status.NOT_ACCEPTABLE, AUTOMATION_COMPOSITION_CANNOT_TRANSITION_FROM_STATE
+ + automationComposition.getState().name() + TO_STATE + automationComposition.getOrderedState());
+ break;
+ }
+ }
+
+ private int getFirstStartPhase(AutomationComposition automationComposition) {
+ ToscaServiceTemplate toscaServiceTemplate = null;
+ try {
+ toscaServiceTemplate = serviceTemplateProvider.getAllServiceTemplates().get(0);
+ } catch (PfModelException e) {
+ throw new PfModelRuntimeException(Status.BAD_REQUEST, "Canont load ToscaServiceTemplate from DB", e);
+ }
+ return ParticipantUtils.getFirstStartPhase(automationComposition, toscaServiceTemplate);
+ }
+
+ private void checkParticipant(ParticipantMessage participantMessage, ParticipantState participantState,
+ ParticipantHealthStatus healthStatus) throws AutomationCompositionException, PfModelException {
+ if (participantMessage.getParticipantId() == null) {
+ exceptionOccured(Response.Status.NOT_FOUND, "Participant ID on PARTICIPANT_STATUS message is null");
+ }
+ var participantOpt = participantProvider.findParticipant(participantMessage.getParticipantId().getName(),
+ participantMessage.getParticipantId().getVersion());
+
+ if (participantOpt.isEmpty()) {
+ var participant = new Participant();
+ participant.setName(participantMessage.getParticipantId().getName());
+ participant.setVersion(participantMessage.getParticipantId().getVersion());
+ participant.setDefinition(participantMessage.getParticipantId());
+ participant.setParticipantType(participantMessage.getParticipantType());
+ participant.setParticipantState(participantState);
+ participant.setHealthStatus(healthStatus);
+
+ participantProvider.saveParticipant(participant);
+ } else {
+ var participant = participantOpt.get();
+ participant.setParticipantState(participantState);
+ participant.setHealthStatus(healthStatus);
+
+ participantProvider.saveParticipant(participant);
+ }
+ }
+
+ private void superviseParticipant(ParticipantStatus participantStatusMessage)
+ throws PfModelException, AutomationCompositionException {
+
+ checkParticipant(participantStatusMessage, participantStatusMessage.getState(),
+ participantStatusMessage.getHealthStatus());
+
+ monitoringProvider.createParticipantStatistics(List.of(participantStatusMessage.getParticipantStatistics()));
+ }
+
+ private void superviseAutomationCompositions(ParticipantStatus participantStatusMessage)
+ throws PfModelException, AutomationCompositionException {
+ if (participantStatusMessage.getAutomationCompositionInfoList() != null) {
+ for (AutomationCompositionInfo acEntry : participantStatusMessage.getAutomationCompositionInfoList()) {
+ var dbAutomationComposition = automationCompositionProvider
+ .getAutomationComposition(new ToscaConceptIdentifier(acEntry.getAutomationCompositionId()));
+ if (dbAutomationComposition == null) {
+ exceptionOccured(Response.Status.NOT_FOUND,
+ "PARTICIPANT_STATUS automation composition not found in database: "
+ + acEntry.getAutomationCompositionId());
+ }
+ dbAutomationComposition.setState(acEntry.getState());
+ monitoringProvider.createAcElementStatistics(
+ acEntry.getAutomationCompositionStatistics().getAcElementStatisticsList().getAcElementStatistics());
+ }
+ }
+ }
+
+ private void exceptionOccured(Response.Status status, String reason) throws AutomationCompositionException {
+ throw new AutomationCompositionException(status, reason);
+ }
+}