From b13d8dc3a73bc372dabe47ebd88ed1892ee688ea Mon Sep 17 00:00:00 2001 From: FrancescoFioraEst Date: Tue, 11 Jul 2023 11:19:34 +0100 Subject: Add support participant restart in ACM runtime Issue-ID: POLICY-4744 Change-Id: I33d31751be7ca5d7c215a2b465564d3ab0c7bee6 Signed-off-by: FrancescoFioraEst --- .../commissioning/CommissioningProvider.java | 8 +- ...AutomationCompositionInstantiationProvider.java | 17 +++- .../runtime/supervision/SupervisionAcHandler.java | 16 ++- .../acm/runtime/supervision/SupervisionAspect.java | 8 +- .../runtime/supervision/SupervisionHandler.java | 15 ++- .../supervision/SupervisionPartecipantScanner.java | 18 ++-- .../supervision/SupervisionParticipantHandler.java | 112 +++++++++++++++++---- .../comm/ParticipantRestartPublisher.java | 16 +-- 8 files changed, 162 insertions(+), 48 deletions(-) (limited to 'runtime-acm/src/main') diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/commissioning/CommissioningProvider.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/commissioning/CommissioningProvider.java index c79e31726..961f12a5c 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/commissioning/CommissioningProvider.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/commissioning/CommissioningProvider.java @@ -130,7 +130,7 @@ public class CommissioningProvider { var acDefinition = acDefinitionProvider.getAcDefinition(compositionId); if (!AcTypeState.COMMISSIONED.equals(acDefinition.getState())) { throw new PfModelRuntimeException(Status.BAD_REQUEST, - "ACM not in COMMISSIONED state, Update of ACM Definition not allowed"); + "ACM not in COMMISSIONED state, Delete of ACM Definition not allowed"); } var serviceTemplate = acDefinitionProvider.deleteAcDefintion(compositionId); return createCommissioningResponse(compositionId, serviceTemplate); @@ -184,6 +184,10 @@ public class CommissioningProvider { throw new PfModelRuntimeException(Status.BAD_REQUEST, "There are instances, Priming/Depriming not allowed"); } var acmDefinition = acDefinitionProvider.getAcDefinition(compositionId); + if (acmDefinition.getRestarting() != null) { + throw new PfModelRuntimeException(Status.BAD_REQUEST, + "There is a restarting process, Priming/Depriming not allowed"); + } var stateOrdered = acTypeStateResolver.resolve(acTypeStateUpdate.getPrimeOrder(), acmDefinition.getState(), acmDefinition.getStateChangeResult()); switch (stateOrdered) { @@ -220,7 +224,7 @@ public class CommissioningProvider { participantIds.add(participantId); } } - if (! participantIds.isEmpty()) { + if (!participantIds.isEmpty()) { acmParticipantProvider.verifyParticipantState(participantIds); } acmDefinition.setState(AcTypeState.DEPRIMING); diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/instantiation/AutomationCompositionInstantiationProvider.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/instantiation/AutomationCompositionInstantiationProvider.java index ab3b00d6d..710a975f0 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/instantiation/AutomationCompositionInstantiationProvider.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/instantiation/AutomationCompositionInstantiationProvider.java @@ -155,6 +155,9 @@ public class AutomationCompositionInstantiationProvider { .putAll(automationComposition.getElements().get(elementId).getProperties()); } } + if (automationComposition.getRestarting() != null) { + throw new PfModelRuntimeException(Status.BAD_REQUEST, "There is a restarting process, Update not allowed"); + } var validationResult = validateAutomationComposition(acToBeUpdated); if (!validationResult.isValid()) { throw new PfModelRuntimeException(Response.Status.BAD_REQUEST, validationResult.getResult()); @@ -187,10 +190,16 @@ public class AutomationCompositionInstantiationProvider { return result; } if (!AcTypeState.PRIMED.equals(acDefinitionOpt.get().getState())) { - result.addResult(new ObjectValidationResult("ServiceTemplate", acDefinitionOpt.get().getState(), + result.addResult(new ObjectValidationResult("ServiceTemplate.state", acDefinitionOpt.get().getState(), ValidationStatus.INVALID, "Commissioned automation composition definition not primed")); return result; } + if (acDefinitionOpt.get().getRestarting() != null) { + result.addResult( + new ObjectValidationResult("ServiceTemplate.restarting", acDefinitionOpt.get().getRestarting(), + ValidationStatus.INVALID, "There is a restarting process in composition")); + return result; + } var participantIds = acDefinitionOpt.get().getElementStateMap().values().stream() .map(NodeTemplateState::getParticipantId).collect(Collectors.toSet()); @@ -245,8 +254,10 @@ public class AutomationCompositionInstantiationProvider { throw new PfModelRuntimeException(Response.Status.BAD_REQUEST, "Automation composition state is still " + automationComposition.getDeployState()); } - var acDefinition = acDefinitionProvider - .getAcDefinition(automationComposition.getCompositionId()); + if (automationComposition.getRestarting() != null) { + throw new PfModelRuntimeException(Status.BAD_REQUEST, "There is a restarting process, Delete not allowed"); + } + var acDefinition = acDefinitionProvider.getAcDefinition(automationComposition.getCompositionId()); if (acDefinition != null) { var participantIds = acDefinition.getElementStateMap().values().stream() .map(NodeTemplateState::getParticipantId).collect(Collectors.toSet()); diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAcHandler.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAcHandler.java index 536e3e246..5a2079b0d 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAcHandler.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAcHandler.java @@ -22,6 +22,7 @@ package org.onap.policy.clamp.acm.runtime.supervision; import io.micrometer.core.annotation.Timed; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.UUID; import lombok.AllArgsConstructor; @@ -31,6 +32,7 @@ import org.onap.policy.clamp.acm.runtime.supervision.comm.AutomationCompositionS import org.onap.policy.clamp.models.acm.concepts.AcElementDeployAck; import org.onap.policy.clamp.models.acm.concepts.AutomationComposition; import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition; +import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionElement; import org.onap.policy.clamp.models.acm.concepts.DeployState; import org.onap.policy.clamp.models.acm.concepts.LockState; import org.onap.policy.clamp.models.acm.concepts.ParticipantUtils; @@ -78,8 +80,8 @@ public class SupervisionAcHandler { automationComposition.setStateChangeResult(StateChangeResult.NO_ERROR); automationCompositionProvider.updateAutomationComposition(automationComposition); var startPhase = ParticipantUtils.getFirstStartPhase(automationComposition, acDefinition.getServiceTemplate()); - automationCompositionDeployPublisher.send(automationComposition, acDefinition.getServiceTemplate(), - startPhase, true); + automationCompositionDeployPublisher.send(automationComposition, acDefinition.getServiceTemplate(), startPhase, + true); } /** @@ -261,9 +263,19 @@ public class SupervisionAcHandler { element.setUseState(acElementAck.getValue().getUseState()); element.setDeployState(acElementAck.getValue().getDeployState()); element.setLockState(acElementAck.getValue().getLockState()); + element.setRestarting(null); updated = true; } } + + if (automationComposition.getRestarting() != null) { + var restarting = automationComposition.getElements().values().stream() + .map(AutomationCompositionElement::getRestarting).filter(Objects::nonNull).findAny(); + if (restarting.isEmpty()) { + automationComposition.setRestarting(null); + } + } + return updated; } } diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAspect.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAspect.java index 7303fc84f..079f218ee 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAspect.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAspect.java @@ -68,13 +68,13 @@ public class SupervisionAspect implements Closeable { public void doCheck() { if (executor.getQueue().size() < 2) { LOGGER.debug("Add scanning Message"); - executor.execute(() -> supervisionScanner.run()); + executor.execute(supervisionScanner::run); } } - @Before("@annotation(MessageIntercept) && args(participantStatusMessage,..)") - public void handleParticipantStatus(ParticipantStatus participantStatusMessage) { - executor.execute(() -> partecipantScanner.handleParticipantStatus(participantStatusMessage.getParticipantId())); + @Before("@annotation(MessageIntercept) && args(participantStatusMsg,..)") + public void handleParticipantStatus(ParticipantStatus participantStatusMsg) { + executor.execute(() -> partecipantScanner.handleParticipantStatus(participantStatusMsg.getParticipantId())); } @Override diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionHandler.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionHandler.java index a18ea19a9..4f58801c3 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionHandler.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionHandler.java @@ -58,7 +58,7 @@ public class SupervisionHandler { } var acDefinition = acDefinitionOpt.get(); if (!AcTypeState.PRIMING.equals(acDefinition.getState()) - && !AcTypeState.DEPRIMING.equals(acDefinition.getState())) { + && !AcTypeState.DEPRIMING.equals(acDefinition.getState()) && acDefinition.getRestarting() == null) { LOGGER.error("AC Definition {} already primed/deprimed with participant {}", participantPrimeAckMessage.getCompositionId(), participantPrimeAckMessage.getParticipantId()); return; @@ -68,8 +68,8 @@ public class SupervisionHandler { private void handleParticipantPrimeAck(ParticipantPrimeAck participantPrimeAckMessage, AutomationCompositionDefinition acDefinition) { - var finalState = - AcTypeState.PRIMING.equals(acDefinition.getState()) ? AcTypeState.PRIMED : AcTypeState.COMMISSIONED; + var finalState = AcTypeState.PRIMING.equals(acDefinition.getState()) + || AcTypeState.PRIMED.equals(acDefinition.getState()) ? AcTypeState.PRIMED : AcTypeState.COMMISSIONED; var msgInErrors = StateChangeResult.FAILED.equals(participantPrimeAckMessage.getStateChangeResult()); boolean inProgress = !StateChangeResult.FAILED.equals(acDefinition.getStateChangeResult()); if (inProgress && msgInErrors) { @@ -77,14 +77,19 @@ public class SupervisionHandler { } boolean completed = true; + boolean restarting = false; for (var element : acDefinition.getElementStateMap().values()) { if (participantPrimeAckMessage.getParticipantId().equals(element.getParticipantId())) { element.setMessage(participantPrimeAckMessage.getMessage()); element.setState(participantPrimeAckMessage.getCompositionState()); + element.setRestarting(null); } if (!finalState.equals(element.getState())) { completed = false; } + if (element.getRestarting() != null) { + restarting = true; + } } if (inProgress && !msgInErrors && completed) { @@ -92,7 +97,9 @@ public class SupervisionHandler { if (StateChangeResult.TIMEOUT.equals(acDefinition.getStateChangeResult())) { acDefinition.setStateChangeResult(StateChangeResult.NO_ERROR); } - + } + if (!restarting) { + acDefinition.setRestarting(null); } acDefinitionProvider.updateAcDefinition(acDefinition); } diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionPartecipantScanner.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionPartecipantScanner.java index 092fc3572..c07c584cd 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionPartecipantScanner.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionPartecipantScanner.java @@ -46,9 +46,8 @@ public class SupervisionPartecipantScanner { * @param participantProvider the Participant Provider * @param acRuntimeParameterGroup the parameters for the automation composition runtime */ - public SupervisionPartecipantScanner( - final ParticipantProvider participantProvider, - final AcRuntimeParameterGroup acRuntimeParameterGroup) { + public SupervisionPartecipantScanner(final ParticipantProvider participantProvider, + final AcRuntimeParameterGroup acRuntimeParameterGroup) { this.participantProvider = participantProvider; participantStatusTimeout.setMaxWaitMs(acRuntimeParameterGroup.getParticipantParameters().getMaxStatusWaitMs()); @@ -70,11 +69,17 @@ public class SupervisionPartecipantScanner { private void scanParticipantStatus(Participant participant) { var id = participant.getParticipantId(); if (participantStatusTimeout.isTimeout(id)) { - LOGGER.debug("report Participant fault"); - return; + if (ParticipantState.ON_LINE.equals(participant.getParticipantState())) { + // restart scenario + LOGGER.debug("Participant is back ON_LINE {}", id); + participantStatusTimeout.clear(id); + } else { + LOGGER.debug("report Participant is still OFF_LINE {}", id); + return; + } } if (participantStatusTimeout.getDuration(id) > participantStatusTimeout.getMaxWaitMs()) { - LOGGER.debug("report Participant fault"); + LOGGER.debug("report Participant OFF_LINE {}", id); participantStatusTimeout.setTimeout(id); participant.setParticipantState(ParticipantState.OFF_LINE); participantProvider.updateParticipant(participant); @@ -85,6 +90,7 @@ public class SupervisionPartecipantScanner { * handle participant Status message. */ public void handleParticipantStatus(UUID id) { + LOGGER.debug("Participant is ON_LINE {}", id); participantStatusTimeout.clear(id); } } diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionParticipantHandler.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionParticipantHandler.java index 6d438038d..e0d11ed9f 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionParticipantHandler.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionParticipantHandler.java @@ -21,6 +21,7 @@ package org.onap.policy.clamp.acm.runtime.supervision; import io.micrometer.core.annotation.Timed; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -29,13 +30,18 @@ import lombok.AllArgsConstructor; import org.apache.commons.collections4.MapUtils; import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantDeregisterAckPublisher; import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantRegisterAckPublisher; +import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantRestartPublisher; +import org.onap.policy.clamp.models.acm.concepts.AcTypeState; +import org.onap.policy.clamp.models.acm.concepts.AutomationComposition; +import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition; import org.onap.policy.clamp.models.acm.concepts.Participant; import org.onap.policy.clamp.models.acm.concepts.ParticipantState; import org.onap.policy.clamp.models.acm.concepts.ParticipantSupportedElementType; +import org.onap.policy.clamp.models.acm.concepts.StateChangeResult; import org.onap.policy.clamp.models.acm.messages.dmaap.participant.ParticipantDeregister; -import org.onap.policy.clamp.models.acm.messages.dmaap.participant.ParticipantMessage; import org.onap.policy.clamp.models.acm.messages.dmaap.participant.ParticipantRegister; import org.onap.policy.clamp.models.acm.messages.dmaap.participant.ParticipantStatus; +import org.onap.policy.clamp.models.acm.persistence.provider.AcDefinitionProvider; import org.onap.policy.clamp.models.acm.persistence.provider.AutomationCompositionProvider; import org.onap.policy.clamp.models.acm.persistence.provider.ParticipantProvider; import org.slf4j.Logger; @@ -54,6 +60,8 @@ public class SupervisionParticipantHandler { private final ParticipantRegisterAckPublisher participantRegisterAckPublisher; private final ParticipantDeregisterAckPublisher participantDeregisterAckPublisher; private final AutomationCompositionProvider automationCompositionProvider; + private final AcDefinitionProvider acDefinitionProvider; + private final ParticipantRestartPublisher participantRestartPublisher; /** * Handle a ParticipantRegister message from a participant. @@ -64,11 +72,24 @@ public class SupervisionParticipantHandler { @Timed(value = "listener.participant_register", description = "PARTICIPANT_REGISTER messages received") public void handleParticipantMessage(ParticipantRegister participantRegisterMsg) { LOGGER.debug("Participant Register received {}", participantRegisterMsg); - saveParticipantStatus(participantRegisterMsg, - listToMap(participantRegisterMsg.getParticipantSupportedElementType())); + var participantOpt = participantProvider.findParticipant(participantRegisterMsg.getParticipantId()); + + if (participantOpt.isPresent()) { + var participant = participantOpt.get(); + if (ParticipantState.OFF_LINE.equals(participant.getParticipantState())) { + participant.setParticipantState(ParticipantState.ON_LINE); + participantProvider.saveParticipant(participant); + } + handleRestart(participant.getParticipantId()); + } else { + var participant = createParticipant(participantRegisterMsg.getParticipantId(), + listToMap(participantRegisterMsg.getParticipantSupportedElementType())); + participantProvider.saveParticipant(participant); + + } participantRegisterAckPublisher.send(participantRegisterMsg.getMessageId(), - participantRegisterMsg.getParticipantId()); + participantRegisterMsg.getParticipantId()); } /** @@ -100,30 +121,81 @@ public class SupervisionParticipantHandler { @Timed(value = "listener.participant_status", description = "PARTICIPANT_STATUS messages received") public void handleParticipantMessage(ParticipantStatus participantStatusMsg) { LOGGER.debug("Participant Status received {}", participantStatusMsg); - saveParticipantStatus(participantStatusMsg, - listToMap(participantStatusMsg.getParticipantSupportedElementType())); + + var participantOpt = participantProvider.findParticipant(participantStatusMsg.getParticipantId()); + if (participantOpt.isEmpty()) { + var participant = createParticipant(participantStatusMsg.getParticipantId(), + listToMap(participantStatusMsg.getParticipantSupportedElementType())); + participantProvider.saveParticipant(participant); + } if (!participantStatusMsg.getAutomationCompositionInfoList().isEmpty()) { automationCompositionProvider.upgradeStates(participantStatusMsg.getAutomationCompositionInfoList()); } } - private void saveParticipantStatus(ParticipantMessage participantMessage, - Map participantSupportedElementType) { - var participantOpt = participantProvider.findParticipant(participantMessage.getParticipantId()); - - if (participantOpt.isEmpty()) { - var participant = new Participant(); - participant.setParticipantId(participantMessage.getParticipantId()); - participant.setParticipantSupportedElementTypes(participantSupportedElementType); - participant.setParticipantState(ParticipantState.ON_LINE); + private void handleRestart(UUID participantId) { + var compositionIds = participantProvider.getCompositionIds(participantId); + for (var compositionId : compositionIds) { + var acDefinition = acDefinitionProvider.getAcDefinition(compositionId); + LOGGER.debug("Scan Composition {} for restart", acDefinition.getCompositionId()); + handleRestart(participantId, acDefinition); + } + } - participantProvider.saveParticipant(participant); - } else { - var participant = participantOpt.get(); - participant.setParticipantState(ParticipantState.ON_LINE); + private void handleRestart(UUID participantId, AutomationCompositionDefinition acDefinition) { + if (AcTypeState.COMMISSIONED.equals(acDefinition.getState())) { + LOGGER.debug("Composition {} COMMISSIONED", acDefinition.getCompositionId()); + return; + } + LOGGER.debug("Composition to be send in Restart message {}", acDefinition.getCompositionId()); + for (var elementState : acDefinition.getElementStateMap().values()) { + if (participantId.equals(elementState.getParticipantId())) { + elementState.setRestarting(true); + } + } + var automationCompositionList = + automationCompositionProvider.getAcInstancesByCompositionId(acDefinition.getCompositionId()); + List automationCompositions = new ArrayList<>(); + for (var automationComposition : automationCompositionList) { + if (isAcToBeRestarted(participantId, automationComposition)) { + automationCompositions.add(automationComposition); + } + } + // expected final state + if (StateChangeResult.TIMEOUT.equals(acDefinition.getStateChangeResult())) { + acDefinition.setStateChangeResult(StateChangeResult.NO_ERROR); + } + acDefinition.setRestarting(true); + acDefinitionProvider.updateAcDefinition(acDefinition); + participantRestartPublisher.send(participantId, acDefinition, automationCompositions); + } - participantProvider.updateParticipant(participant); + private boolean isAcToBeRestarted(UUID participantId, AutomationComposition automationComposition) { + boolean toAdd = false; + for (var element : automationComposition.getElements().values()) { + if (participantId.equals(element.getParticipantId())) { + element.setRestarting(true); + toAdd = true; + } + } + if (toAdd) { + automationComposition.setRestarting(true); + // expected final state + if (StateChangeResult.TIMEOUT.equals(automationComposition.getStateChangeResult())) { + automationComposition.setStateChangeResult(StateChangeResult.NO_ERROR); + } + automationCompositionProvider.updateAutomationComposition(automationComposition); } + return toAdd; + } + + private Participant createParticipant(UUID participantId, + Map participantSupportedElementType) { + var participant = new Participant(); + participant.setParticipantId(participantId); + participant.setParticipantSupportedElementTypes(participantSupportedElementType); + participant.setParticipantState(ParticipantState.ON_LINE); + return participant; } private Map listToMap(List elementList) { diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantRestartPublisher.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantRestartPublisher.java index cb00c8e4d..b086b1914 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantRestartPublisher.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantRestartPublisher.java @@ -34,7 +34,6 @@ import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition import org.onap.policy.clamp.models.acm.concepts.ParticipantDefinition; import org.onap.policy.clamp.models.acm.concepts.ParticipantRestartAc; import org.onap.policy.clamp.models.acm.messages.dmaap.participant.ParticipantRestart; -import org.onap.policy.clamp.models.acm.messages.rest.instantiation.DeployOrder; import org.onap.policy.clamp.models.acm.utils.AcmUtils; import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier; import org.onap.policy.models.tosca.authorative.concepts.ToscaNodeTemplate; @@ -51,6 +50,9 @@ public class ParticipantRestartPublisher extends AbstractParticipantPublisher