diff options
author | FrancescoFioraEst <francesco.fiora@est.tech> | 2024-06-18 16:32:30 +0100 |
---|---|---|
committer | Francesco Fiora <francesco.fiora@est.tech> | 2024-06-19 08:16:09 +0000 |
commit | 9cdfa4dc5aadaaf8ec11223c4991b61c0aa6d0b0 (patch) | |
tree | d91f07e3bf3054698f3702ce6c660af3ddc040d6 /runtime-acm | |
parent | 29d86f951b30f4941ee63b0d2badef810b856e53 (diff) |
Add support for sync messages in ACM-runtime
Issue-ID: POLICY-5035
Change-Id: Ibcf1c6a414a7ba9d1cafd42041551bb0fb198088
Signed-off-by: FrancescoFioraEst <francesco.fiora@est.tech>
Diffstat (limited to 'runtime-acm')
24 files changed, 500 insertions, 325 deletions
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/commissioning/CommissioningProvider.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/commissioning/CommissioningProvider.java index 74ccb9cc6..8a56fbb1e 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/commissioning/CommissioningProvider.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/commissioning/CommissioningProvider.java @@ -29,7 +29,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import lombok.RequiredArgsConstructor; import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup; -import org.onap.policy.clamp.acm.runtime.participants.AcmParticipantProvider; import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantPrimePublisher; import org.onap.policy.clamp.models.acm.concepts.AcTypeState; import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition; @@ -39,6 +38,7 @@ import org.onap.policy.clamp.models.acm.messages.rest.commissioning.Commissionin import org.onap.policy.clamp.models.acm.persistence.provider.AcDefinitionProvider; import org.onap.policy.clamp.models.acm.persistence.provider.AcTypeStateResolver; import org.onap.policy.clamp.models.acm.persistence.provider.AutomationCompositionProvider; +import org.onap.policy.clamp.models.acm.persistence.provider.ParticipantProvider; import org.onap.policy.clamp.models.acm.utils.TimestampHelper; import org.onap.policy.models.base.PfModelRuntimeException; import org.onap.policy.models.tosca.authorative.concepts.ToscaServiceTemplate; @@ -56,7 +56,7 @@ public class CommissioningProvider { private final AcDefinitionProvider acDefinitionProvider; private final AutomationCompositionProvider acProvider; - private final AcmParticipantProvider acmParticipantProvider; + private final ParticipantProvider participantProvider; private final AcTypeStateResolver acTypeStateResolver; private final ParticipantPrimePublisher participantPrimePublisher; private final AcRuntimeParameterGroup acRuntimeParameterGroup; @@ -229,7 +229,7 @@ public class CommissioningProvider { } } if (!participantIds.isEmpty()) { - acmParticipantProvider.verifyParticipantState(participantIds); + participantProvider.verifyParticipantState(participantIds); } acmDefinition.setState(AcTypeState.DEPRIMING); acmDefinition.setLastMsg(TimestampHelper.now()); diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/instantiation/AutomationCompositionInstantiationProvider.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/instantiation/AutomationCompositionInstantiationProvider.java index 220636b9d..2bf08220a 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/instantiation/AutomationCompositionInstantiationProvider.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/instantiation/AutomationCompositionInstantiationProvider.java @@ -29,7 +29,6 @@ import java.util.stream.Collectors; import lombok.NonNull; import lombok.RequiredArgsConstructor; import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup; -import org.onap.policy.clamp.acm.runtime.participants.AcmParticipantProvider; import org.onap.policy.clamp.acm.runtime.supervision.SupervisionAcHandler; import org.onap.policy.clamp.models.acm.concepts.AcTypeState; import org.onap.policy.clamp.models.acm.concepts.AutomationComposition; @@ -43,6 +42,7 @@ import org.onap.policy.clamp.models.acm.messages.rest.instantiation.Instantiatio import org.onap.policy.clamp.models.acm.persistence.provider.AcDefinitionProvider; import org.onap.policy.clamp.models.acm.persistence.provider.AcInstanceStateResolver; import org.onap.policy.clamp.models.acm.persistence.provider.AutomationCompositionProvider; +import org.onap.policy.clamp.models.acm.persistence.provider.ParticipantProvider; import org.onap.policy.clamp.models.acm.utils.AcmUtils; import org.onap.policy.common.parameters.BeanValidationResult; import org.onap.policy.common.parameters.ObjectValidationResult; @@ -69,7 +69,7 @@ public class AutomationCompositionInstantiationProvider { private final AcDefinitionProvider acDefinitionProvider; private final AcInstanceStateResolver acInstanceStateResolver; private final SupervisionAcHandler supervisionAcHandler; - private final AcmParticipantProvider acmParticipantProvider; + private final ParticipantProvider participantProvider; private final AcRuntimeParameterGroup acRuntimeParameterGroup; /** @@ -265,7 +265,7 @@ public class AutomationCompositionInstantiationProvider { var participantIds = acDefinitionOpt.get().getElementStateMap().values().stream() .map(NodeTemplateState::getParticipantId).collect(Collectors.toSet()); - acmParticipantProvider.verifyParticipantState(participantIds); + participantProvider.verifyParticipantState(participantIds); result.addResult(AcmUtils.validateAutomationComposition(automationComposition, acDefinitionOpt.get().getServiceTemplate(), @@ -331,7 +331,7 @@ public class AutomationCompositionInstantiationProvider { var acDefinition = acDefinitionProvider.getAcDefinition(automationComposition.getCompositionId()); var participantIds = acDefinition.getElementStateMap().values().stream() .map(NodeTemplateState::getParticipantId).collect(Collectors.toSet()); - acmParticipantProvider.verifyParticipantState(participantIds); + participantProvider.verifyParticipantState(participantIds); supervisionAcHandler.delete(automationComposition, acDefinition); var response = new InstantiationResponse(); response.setInstanceId(automationComposition.getInstanceId()); @@ -374,7 +374,7 @@ public class AutomationCompositionInstantiationProvider { var participantIds = acDefinition.getElementStateMap().values().stream() .map(NodeTemplateState::getParticipantId).collect(Collectors.toSet()); - acmParticipantProvider.verifyParticipantState(participantIds); + participantProvider.verifyParticipantState(participantIds); var result = acInstanceStateResolver.resolve(acInstanceStateUpdate.getDeployOrder(), acInstanceStateUpdate.getLockOrder(), automationComposition.getDeployState(), automationComposition.getLockState(), automationComposition.getStateChangeResult()); diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/participants/AcmParticipantProvider.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/participants/AcmParticipantProvider.java index 282389a74..62ba7b017 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/participants/AcmParticipantProvider.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/participants/AcmParticipantProvider.java @@ -20,12 +20,10 @@ package org.onap.policy.clamp.acm.runtime.participants; -import jakarta.ws.rs.core.Response; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.UUID; import lombok.RequiredArgsConstructor; import org.apache.commons.collections4.MapUtils; @@ -33,9 +31,7 @@ import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantStatusReqPu import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionElement; import org.onap.policy.clamp.models.acm.concepts.NodeTemplateState; import org.onap.policy.clamp.models.acm.concepts.ParticipantInformation; -import org.onap.policy.clamp.models.acm.concepts.ParticipantState; import org.onap.policy.clamp.models.acm.persistence.provider.ParticipantProvider; -import org.onap.policy.models.base.PfModelRuntimeException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; @@ -94,12 +90,11 @@ public class AcmParticipantProvider { * @param participantId The UUID of the participant to send request to */ public void sendParticipantStatusRequest(UUID participantId) { - var participant = this.participantProvider.getParticipantById(participantId); + // check if participant is present + this.participantProvider.getParticipantById(participantId); LOGGER.debug("Requesting Participant Status Now ParticipantStatusReq"); participantStatusReqPublisher.send(participantId); - participant.setParticipantState(ParticipantState.OFF_LINE); - participantProvider.saveParticipant(participant); } /** @@ -110,22 +105,6 @@ public class AcmParticipantProvider { this.participantStatusReqPublisher.send((UUID) null); } - /** - * Verify Participant state. - * - * @param participantIds The list of UUIDs of the participants to get - * @throws PfModelRuntimeException in case the participant is offline - */ - public void verifyParticipantState(Set<UUID> participantIds) { - for (UUID participantId : participantIds) { - var participant = this.participantProvider.getParticipantById(participantId); - if (! participant.getParticipantState().equals(ParticipantState.ON_LINE)) { - throw new PfModelRuntimeException(Response.Status.CONFLICT, - "Participant: " + participantId + " is OFFLINE"); - } - } - } - private Map<UUID, AutomationCompositionElement> getAutomationCompositionElementsForParticipant(UUID participantId) { var automationCompositionElements = participantProvider .getAutomationCompositionElements(participantId); diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAcHandler.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAcHandler.java index 802c6603b..3e2057ed5 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAcHandler.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAcHandler.java @@ -33,6 +33,7 @@ import org.onap.policy.clamp.acm.runtime.supervision.comm.AcElementPropertiesPub import org.onap.policy.clamp.acm.runtime.supervision.comm.AutomationCompositionDeployPublisher; import org.onap.policy.clamp.acm.runtime.supervision.comm.AutomationCompositionMigrationPublisher; import org.onap.policy.clamp.acm.runtime.supervision.comm.AutomationCompositionStateChangePublisher; +import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantSyncPublisher; import org.onap.policy.clamp.models.acm.concepts.AcElementDeployAck; import org.onap.policy.clamp.models.acm.concepts.AutomationComposition; import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition; @@ -42,6 +43,7 @@ import org.onap.policy.clamp.models.acm.concepts.LockState; import org.onap.policy.clamp.models.acm.concepts.ParticipantUtils; import org.onap.policy.clamp.models.acm.concepts.StateChangeResult; import org.onap.policy.clamp.models.acm.messages.kafka.participant.AutomationCompositionDeployAck; +import org.onap.policy.clamp.models.acm.persistence.provider.AcDefinitionProvider; import org.onap.policy.clamp.models.acm.persistence.provider.AutomationCompositionProvider; import org.onap.policy.clamp.models.acm.utils.AcmUtils; import org.slf4j.Logger; @@ -58,12 +60,14 @@ public class SupervisionAcHandler { private static final Logger LOGGER = LoggerFactory.getLogger(SupervisionAcHandler.class); private final AutomationCompositionProvider automationCompositionProvider; + private final AcDefinitionProvider acDefinitionProvider; // Publishers for participant communication private final AutomationCompositionDeployPublisher automationCompositionDeployPublisher; private final AutomationCompositionStateChangePublisher automationCompositionStateChangePublisher; private final AcElementPropertiesPublisher acElementPropertiesPublisher; private final AutomationCompositionMigrationPublisher acCompositionMigrationPublisher; + private final ParticipantSyncPublisher participantSyncPublisher; private final ExecutorService executor = Context.taskWrapping(Executors.newFixedThreadPool(1)); @@ -260,6 +264,8 @@ public class SupervisionAcHandler { automationCompositionAckMessage.getStateChangeResult()); if (updated) { automationCompositionProvider.updateAutomationComposition(automationComposition); + var acDefinition = acDefinitionProvider.getAcDefinition(automationComposition.getCompositionId()); + participantSyncPublisher.sendSync(acDefinition.getServiceTemplate(), automationComposition); } } diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAspect.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAspect.java index 8f3a4c2eb..9ef979f8e 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAspect.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAspect.java @@ -41,9 +41,9 @@ public class SupervisionAspect implements Closeable { private static final Logger LOGGER = LoggerFactory.getLogger(SupervisionAspect.class); private final SupervisionScanner supervisionScanner; - private final SupervisionPartecipantScanner partecipantScanner; + private final SupervisionParticipantScanner participantScanner; - private ThreadPoolExecutor executor = + private final ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()); @Scheduled( @@ -56,7 +56,7 @@ public class SupervisionAspect implements Closeable { private void executeScan() { supervisionScanner.run(); - partecipantScanner.run(); + participantScanner.run(); } /** diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionHandler.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionHandler.java index 963e4830e..a4e470495 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionHandler.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionHandler.java @@ -23,9 +23,10 @@ package org.onap.policy.clamp.acm.runtime.supervision; import io.micrometer.core.annotation.Timed; import lombok.AllArgsConstructor; -import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup; +import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantSyncPublisher; import org.onap.policy.clamp.models.acm.concepts.AcTypeState; import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition; +import org.onap.policy.clamp.models.acm.concepts.NodeTemplateState; import org.onap.policy.clamp.models.acm.concepts.StateChangeResult; import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantPrimeAck; import org.onap.policy.clamp.models.acm.persistence.provider.AcDefinitionProvider; @@ -43,7 +44,7 @@ public class SupervisionHandler { private static final Logger LOGGER = LoggerFactory.getLogger(SupervisionHandler.class); private final AcDefinitionProvider acDefinitionProvider; - private final AcRuntimeParameterGroup acRuntimeParameterGroup; + private final ParticipantSyncPublisher participantSyncPublisher; /** * Handle a ParticipantPrimeAck message from a participant. @@ -82,12 +83,7 @@ public class SupervisionHandler { boolean completed = true; boolean restarting = false; for (var element : acDefinition.getElementStateMap().values()) { - if (participantPrimeAckMessage.getParticipantId().equals(element.getParticipantId())) { - element.setMessage(participantPrimeAckMessage.getMessage()); - element.setState(participantPrimeAckMessage.getCompositionState()); - element.setRestarting(null); - acDefinitionProvider.updateAcDefinitionElement(element, acDefinition.getCompositionId()); - } + handlePrimeAckElement(participantPrimeAckMessage, element); if (!finalState.equals(element.getState())) { completed = false; } @@ -110,6 +106,18 @@ public class SupervisionHandler { if (toUpdate) { acDefinitionProvider.updateAcDefinitionState(acDefinition.getCompositionId(), acDefinition.getState(), acDefinition.getStateChangeResult(), acDefinition.getRestarting()); + if (!participantPrimeAckMessage.getParticipantId().equals(participantPrimeAckMessage.getReplicaId())) { + participantSyncPublisher.sendSync(acDefinition, participantPrimeAckMessage.getReplicaId()); + } + } + } + + private void handlePrimeAckElement(ParticipantPrimeAck participantPrimeAckMessage, NodeTemplateState element) { + if (participantPrimeAckMessage.getParticipantId().equals(element.getParticipantId())) { + element.setMessage(participantPrimeAckMessage.getMessage()); + element.setState(participantPrimeAckMessage.getCompositionState()); + element.setRestarting(null); + acDefinitionProvider.updateAcDefinitionElement(element, participantPrimeAckMessage.getCompositionId()); } } } diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionParticipantHandler.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionParticipantHandler.java index 609e036be..4c8c5815c 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionParticipantHandler.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionParticipantHandler.java @@ -21,7 +21,6 @@ package org.onap.policy.clamp.acm.runtime.supervision; import io.micrometer.core.annotation.Timed; -import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -32,11 +31,13 @@ import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantDeregisterAckPublisher; import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantRegisterAckPublisher; import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantRestartPublisher; +import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantSyncPublisher; import org.onap.policy.clamp.models.acm.concepts.AcTypeState; import org.onap.policy.clamp.models.acm.concepts.AutomationComposition; import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition; import org.onap.policy.clamp.models.acm.concepts.Participant; import org.onap.policy.clamp.models.acm.concepts.ParticipantDefinition; +import org.onap.policy.clamp.models.acm.concepts.ParticipantReplica; import org.onap.policy.clamp.models.acm.concepts.ParticipantState; import org.onap.policy.clamp.models.acm.concepts.ParticipantSupportedElementType; import org.onap.policy.clamp.models.acm.concepts.StateChangeResult; @@ -65,6 +66,7 @@ public class SupervisionParticipantHandler { private final AutomationCompositionProvider automationCompositionProvider; private final AcDefinitionProvider acDefinitionProvider; private final ParticipantRestartPublisher participantRestartPublisher; + private final ParticipantSyncPublisher participantSyncPublisher; private final AcRuntimeParameterGroup acRuntimeParameterGroup; /** @@ -72,21 +74,11 @@ public class SupervisionParticipantHandler { * * @param participantRegisterMsg the ParticipantRegister message received from a participant */ - @MessageIntercept @Timed(value = "listener.participant_register", description = "PARTICIPANT_REGISTER messages received") public void handleParticipantMessage(ParticipantRegister participantRegisterMsg) { - var participantOpt = participantProvider.findParticipant(participantRegisterMsg.getParticipantId()); - - if (participantOpt.isPresent()) { - var participant = participantOpt.get(); - checkOnline(participant); - handleRestart(participant.getParticipantId()); - } else { - var participant = createParticipant(participantRegisterMsg.getParticipantId(), - listToMap(participantRegisterMsg.getParticipantSupportedElementType())); - participantProvider.saveParticipant(participant); - - } + saveIfNotPresent(participantRegisterMsg.getReplicaId(), + participantRegisterMsg.getParticipantId(), + participantRegisterMsg.getParticipantSupportedElementType(), true); participantRegisterAckPublisher.send(participantRegisterMsg.getMessageId(), participantRegisterMsg.getParticipantId()); @@ -97,15 +89,13 @@ public class SupervisionParticipantHandler { * * @param participantDeregisterMsg the ParticipantDeregister message received from a participant */ - @MessageIntercept @Timed(value = "listener.participant_deregister", description = "PARTICIPANT_DEREGISTER messages received") public void handleParticipantMessage(ParticipantDeregister participantDeregisterMsg) { - var participantOpt = participantProvider.findParticipant(participantDeregisterMsg.getParticipantId()); - - if (participantOpt.isPresent()) { - var participant = participantOpt.get(); - participant.setParticipantState(ParticipantState.OFF_LINE); - participantProvider.saveParticipant(participant); + var replicaId = participantDeregisterMsg.getReplicaId() != null + ? participantDeregisterMsg.getReplicaId() : participantDeregisterMsg.getParticipantId(); + var replicaOpt = participantProvider.findParticipantReplica(replicaId); + if (replicaOpt.isPresent()) { + participantProvider.deleteParticipantReplica(replicaId); } participantDeregisterAckPublisher.send(participantDeregisterMsg.getMessageId()); @@ -116,32 +106,57 @@ public class SupervisionParticipantHandler { * * @param participantStatusMsg the ParticipantStatus message received from a participant */ - @MessageIntercept @Timed(value = "listener.participant_status", description = "PARTICIPANT_STATUS messages received") public void handleParticipantMessage(ParticipantStatus participantStatusMsg) { + saveIfNotPresent(participantStatusMsg.getReplicaId(), participantStatusMsg.getParticipantId(), + participantStatusMsg.getParticipantSupportedElementType(), false); - var participantOpt = participantProvider.findParticipant(participantStatusMsg.getParticipantId()); - if (participantOpt.isEmpty()) { - var participant = createParticipant(participantStatusMsg.getParticipantId(), - listToMap(participantStatusMsg.getParticipantSupportedElementType())); - participantProvider.saveParticipant(participant); - } else { - checkOnline(participantOpt.get()); - } if (!participantStatusMsg.getAutomationCompositionInfoList().isEmpty()) { automationCompositionProvider.upgradeStates(participantStatusMsg.getAutomationCompositionInfoList()); } if (!participantStatusMsg.getParticipantDefinitionUpdates().isEmpty() && participantStatusMsg.getCompositionId() != null) { updateAcDefinitionOutProperties(participantStatusMsg.getCompositionId(), - participantStatusMsg.getParticipantDefinitionUpdates()); + participantStatusMsg.getReplicaId(), participantStatusMsg.getParticipantDefinitionUpdates()); } } - private void updateAcDefinitionOutProperties(UUID composotionId, List<ParticipantDefinition> list) { - var acDefinitionOpt = acDefinitionProvider.findAcDefinition(composotionId); + private void saveIfNotPresent(UUID msgReplicaId, UUID participantId, + List<ParticipantSupportedElementType> participantSupportedElementType, boolean registration) { + var replicaId = msgReplicaId != null ? msgReplicaId : participantId; + var replicaOpt = participantProvider.findParticipantReplica(replicaId); + if (replicaOpt.isPresent()) { + var replica = replicaOpt.get(); + checkOnline(replica); + } else { + var participant = getParticipant(participantId, listToMap(participantSupportedElementType)); + participant.getReplicas().put(replicaId, createReplica(replicaId)); + participantProvider.saveParticipant(participant); + } + if (registration) { + handleRestart(participantId, replicaId); + } + } + + private Participant getParticipant(UUID participantId, + Map<UUID, ParticipantSupportedElementType> participantSupportedElementType) { + var participantOpt = participantProvider.findParticipant(participantId); + return participantOpt.orElseGet(() -> createParticipant(participantId, participantSupportedElementType)); + } + + private ParticipantReplica createReplica(UUID replicaId) { + var replica = new ParticipantReplica(); + replica.setReplicaId(replicaId); + replica.setParticipantState(ParticipantState.ON_LINE); + replica.setLastMsg(TimestampHelper.now()); + return replica; + + } + + private void updateAcDefinitionOutProperties(UUID compositionId, UUID replicaId, List<ParticipantDefinition> list) { + var acDefinitionOpt = acDefinitionProvider.findAcDefinition(compositionId); if (acDefinitionOpt.isEmpty()) { - LOGGER.error("Ac Definition with id {} not found", composotionId); + LOGGER.error("Ac Definition with id {} not found", compositionId); return; } var acDefinition = acDefinitionOpt.get(); @@ -155,26 +170,32 @@ public class SupervisionParticipantHandler { } acDefinitionProvider.updateAcDefinition(acDefinition, acRuntimeParameterGroup.getAcmParameters().getToscaCompositionName()); + participantSyncPublisher.sendSync(acDefinition, replicaId); } - private void checkOnline(Participant participant) { - if (ParticipantState.OFF_LINE.equals(participant.getParticipantState())) { - participant.setParticipantState(ParticipantState.ON_LINE); + private void checkOnline(ParticipantReplica replica) { + if (ParticipantState.OFF_LINE.equals(replica.getParticipantState())) { + replica.setParticipantState(ParticipantState.ON_LINE); } - participant.setLastMsg(TimestampHelper.now()); - participantProvider.saveParticipant(participant); + replica.setLastMsg(TimestampHelper.now()); + participantProvider.saveParticipantReplica(replica); } - private void handleRestart(UUID participantId) { + private void handleRestart(UUID participantId, UUID replicaId) { var compositionIds = participantProvider.getCompositionIds(participantId); + var oldParticipant = participantId.equals(replicaId); for (var compositionId : compositionIds) { var acDefinition = acDefinitionProvider.getAcDefinition(compositionId); LOGGER.debug("Scan Composition {} for restart", acDefinition.getCompositionId()); - handleRestart(participantId, acDefinition); + if (oldParticipant) { + handleRestart(participantId, acDefinition); + } else { + handleSyncRestart(participantId, replicaId, acDefinition); + } } } - private void handleRestart(UUID participantId, AutomationCompositionDefinition acDefinition) { + private void handleRestart(final UUID participantId, AutomationCompositionDefinition acDefinition) { if (AcTypeState.COMMISSIONED.equals(acDefinition.getState())) { LOGGER.debug("Composition {} COMMISSIONED", acDefinition.getCompositionId()); return; @@ -185,14 +206,6 @@ public class SupervisionParticipantHandler { elementState.setRestarting(true); } } - var automationCompositionList = - automationCompositionProvider.getAcInstancesByCompositionId(acDefinition.getCompositionId()); - List<AutomationComposition> automationCompositions = new ArrayList<>(); - for (var automationComposition : automationCompositionList) { - if (isAcToBeRestarted(participantId, automationComposition)) { - automationCompositions.add(automationComposition); - } - } // expected final state if (StateChangeResult.TIMEOUT.equals(acDefinition.getStateChangeResult())) { acDefinition.setStateChangeResult(StateChangeResult.NO_ERROR); @@ -200,6 +213,11 @@ public class SupervisionParticipantHandler { acDefinition.setRestarting(true); acDefinitionProvider.updateAcDefinition(acDefinition, acRuntimeParameterGroup.getAcmParameters().getToscaCompositionName()); + + var automationCompositionList = + automationCompositionProvider.getAcInstancesByCompositionId(acDefinition.getCompositionId()); + var automationCompositions = automationCompositionList.stream() + .filter(ac -> isAcToBeRestarted(participantId, ac)).toList(); participantRestartPublisher.send(participantId, acDefinition, automationCompositions); } @@ -222,13 +240,34 @@ public class SupervisionParticipantHandler { return toAdd; } + private void handleSyncRestart(final UUID participantId, UUID replicaId, + AutomationCompositionDefinition acDefinition) { + if (AcTypeState.COMMISSIONED.equals(acDefinition.getState())) { + LOGGER.debug("Composition {} COMMISSIONED", acDefinition.getCompositionId()); + return; + } + LOGGER.debug("Composition to be send in Restart message {}", acDefinition.getCompositionId()); + var automationCompositionList = + automationCompositionProvider.getAcInstancesByCompositionId(acDefinition.getCompositionId()); + var automationCompositions = automationCompositionList.stream() + .filter(ac -> isAcToBeSyncRestarted(participantId, ac)).toList(); + participantSyncPublisher.sendRestartMsg(participantId, replicaId, acDefinition, automationCompositions); + } + + private boolean isAcToBeSyncRestarted(UUID participantId, AutomationComposition automationComposition) { + for (var element : automationComposition.getElements().values()) { + if (participantId.equals(element.getParticipantId())) { + return true; + } + } + return false; + } + private Participant createParticipant(UUID participantId, Map<UUID, ParticipantSupportedElementType> participantSupportedElementType) { var participant = new Participant(); participant.setParticipantId(participantId); participant.setParticipantSupportedElementTypes(participantSupportedElementType); - participant.setParticipantState(ParticipantState.ON_LINE); - participant.setLastMsg(TimestampHelper.now()); return participant; } diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionPartecipantScanner.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionParticipantScanner.java index b7f0be8eb..4ada199b6 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionPartecipantScanner.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionParticipantScanner.java @@ -21,8 +21,7 @@ package org.onap.policy.clamp.acm.runtime.supervision; import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup; -import org.onap.policy.clamp.models.acm.concepts.Participant; -import org.onap.policy.clamp.models.acm.concepts.ParticipantState; +import org.onap.policy.clamp.models.acm.concepts.ParticipantReplica; import org.onap.policy.clamp.models.acm.persistence.provider.ParticipantProvider; import org.onap.policy.clamp.models.acm.utils.TimestampHelper; import org.slf4j.Logger; @@ -33,20 +32,20 @@ import org.springframework.stereotype.Component; * This class is used to scan the automation compositions in the database and check if they are in the correct state. */ @Component -public class SupervisionPartecipantScanner { - private static final Logger LOGGER = LoggerFactory.getLogger(SupervisionPartecipantScanner.class); +public class SupervisionParticipantScanner { + private static final Logger LOGGER = LoggerFactory.getLogger(SupervisionParticipantScanner.class); private final long maxWaitMs; private final ParticipantProvider participantProvider; /** - * Constructor for instantiating SupervisionPartecipantScanner. + * Constructor for instantiating SupervisionParticipantScanner. * * @param participantProvider the Participant Provider * @param acRuntimeParameterGroup the parameters for the automation composition runtime */ - public SupervisionPartecipantScanner(final ParticipantProvider participantProvider, + public SupervisionParticipantScanner(final ParticipantProvider participantProvider, final AcRuntimeParameterGroup acRuntimeParameterGroup) { this.participantProvider = participantProvider; this.maxWaitMs = acRuntimeParameterGroup.getParticipantParameters().getMaxStatusWaitMs(); @@ -56,27 +55,17 @@ public class SupervisionPartecipantScanner { * Run Scanning. */ public void run() { - LOGGER.debug("Scanning participans in the database . . ."); - - for (var participant : participantProvider.getParticipants()) { - scanParticipantStatus(participant); - } - - LOGGER.debug("Participans scan complete . . ."); + LOGGER.debug("Scanning participants in the database . . ."); + participantProvider.findReplicasOnLine().forEach(this::scanParticipantReplicaStatus); + LOGGER.debug("Participants scan complete . . ."); } - private void scanParticipantStatus(Participant participant) { - var id = participant.getParticipantId(); - if (ParticipantState.OFF_LINE.equals(participant.getParticipantState())) { - LOGGER.debug("report Participant is still OFF_LINE {}", id); - return; - } + private void scanParticipantReplicaStatus(ParticipantReplica replica) { var now = TimestampHelper.nowEpochMilli(); - var lastMsg = TimestampHelper.toEpochMilli(participant.getLastMsg()); + var lastMsg = TimestampHelper.toEpochMilli(replica.getLastMsg()); if ((now - lastMsg) > maxWaitMs) { - LOGGER.debug("report Participant OFF_LINE {}", id); - participant.setParticipantState(ParticipantState.OFF_LINE); - participantProvider.saveParticipant(participant); + LOGGER.debug("Participant OFF_LINE {}", replica.getReplicaId()); + participantProvider.deleteParticipantReplica(replica.getReplicaId()); } } } diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionScanner.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionScanner.java index 06d464671..75a2f0540 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionScanner.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionScanner.java @@ -27,6 +27,7 @@ import java.util.UUID; import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup; import org.onap.policy.clamp.acm.runtime.supervision.comm.AutomationCompositionDeployPublisher; import org.onap.policy.clamp.acm.runtime.supervision.comm.AutomationCompositionStateChangePublisher; +import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantSyncPublisher; import org.onap.policy.clamp.models.acm.concepts.AcTypeState; import org.onap.policy.clamp.models.acm.concepts.AutomationComposition; import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition; @@ -55,6 +56,7 @@ public class SupervisionScanner { private final AcDefinitionProvider acDefinitionProvider; private final AutomationCompositionStateChangePublisher automationCompositionStateChangePublisher; private final AutomationCompositionDeployPublisher automationCompositionDeployPublisher; + private final ParticipantSyncPublisher participantSyncPublisher; /** * Constructor for instantiating SupervisionScanner. @@ -69,11 +71,13 @@ public class SupervisionScanner { final AcDefinitionProvider acDefinitionProvider, final AutomationCompositionStateChangePublisher automationCompositionStateChangePublisher, final AutomationCompositionDeployPublisher automationCompositionDeployPublisher, + final ParticipantSyncPublisher participantSyncPublisher, final AcRuntimeParameterGroup acRuntimeParameterGroup) { this.automationCompositionProvider = automationCompositionProvider; this.acDefinitionProvider = acDefinitionProvider; this.automationCompositionStateChangePublisher = automationCompositionStateChangePublisher; this.automationCompositionDeployPublisher = automationCompositionDeployPublisher; + this.participantSyncPublisher = participantSyncPublisher; this.maxStatusWaitMs = acRuntimeParameterGroup.getParticipantParameters().getMaxStatusWaitMs(); } @@ -118,6 +122,7 @@ public class SupervisionScanner { if (completed) { acDefinitionProvider.updateAcDefinitionState(acDefinition.getCompositionId(), finalState, StateChangeResult.NO_ERROR, null); + participantSyncPublisher.sendSync(acDefinition, null); } else { handleTimeout(acDefinition); } @@ -132,7 +137,6 @@ public class SupervisionScanner { || StateChangeResult.FAILED.equals(automationComposition.getStateChangeResult())) { LOGGER.debug("automation composition {} scanned, OK", automationComposition.getInstanceId()); - // Clear Timeout on automation composition return; } @@ -158,7 +162,7 @@ public class SupervisionScanner { LOGGER.debug("automation composition scan: transition state {} {} completed", automationComposition.getDeployState(), automationComposition.getLockState()); - complete(automationComposition); + complete(automationComposition, serviceTemplate); } else { LOGGER.debug("automation composition scan: transition state {} {} not completed", automationComposition.getDeployState(), automationComposition.getLockState()); @@ -183,7 +187,8 @@ public class SupervisionScanner { } } - private void complete(final AutomationComposition automationComposition) { + private void complete(final AutomationComposition automationComposition, + ToscaServiceTemplate serviceTemplate) { var deployState = automationComposition.getDeployState(); if (DeployState.MIGRATING.equals(automationComposition.getDeployState())) { // migration scenario @@ -201,6 +206,7 @@ public class SupervisionScanner { } else { automationCompositionProvider.updateAutomationComposition(automationComposition); } + participantSyncPublisher.sendSync(serviceTemplate, automationComposition); } private void handleTimeout(AutomationCompositionDefinition acDefinition) { diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantPrimePublisher.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantPrimePublisher.java index 89763a2b6..b0848bd51 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantPrimePublisher.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantPrimePublisher.java @@ -31,7 +31,6 @@ import java.util.Map; import java.util.UUID; import lombok.AllArgsConstructor; import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup; -import org.onap.policy.clamp.acm.runtime.participants.AcmParticipantProvider; import org.onap.policy.clamp.models.acm.concepts.AcTypeState; import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition; import org.onap.policy.clamp.models.acm.concepts.ParticipantDefinition; @@ -55,7 +54,6 @@ public class ParticipantPrimePublisher extends AbstractParticipantPublisher<Part private static final Logger LOGGER = LoggerFactory.getLogger(ParticipantPrimePublisher.class); private final ParticipantProvider participantProvider; - private final AcmParticipantProvider acmParticipantProvider; private final AcRuntimeParameterGroup acRuntimeParameterGroup; /** @@ -99,9 +97,7 @@ public class ParticipantPrimePublisher extends AbstractParticipantPublisher<Part var elementState = acmDefinition.getElementStateMap().get(elementEntry.getKey()); elementState.setState(AcTypeState.PRIMING); participantIds.add(elementState.getParticipantId()); - var type = new ToscaConceptIdentifier(elementEntry.getValue().getType(), - elementEntry.getValue().getTypeVersion()); - supportedElementMap.put(type, elementState.getParticipantId()); + supportedElementMap.put(AcmUtils.getType(elementEntry.getValue()), elementState.getParticipantId()); } } else { // scenario Prime participants not assigned yet @@ -109,16 +105,14 @@ public class ParticipantPrimePublisher extends AbstractParticipantPublisher<Part for (var elementEntry : acElements) { var elementState = acmDefinition.getElementStateMap().get(elementEntry.getKey()); elementState.setState(AcTypeState.PRIMING); - var type = new ToscaConceptIdentifier(elementEntry.getValue().getType(), - elementEntry.getValue().getTypeVersion()); - var participantId = supportedElementMap.get(type); + var participantId = supportedElementMap.get(AcmUtils.getType(elementEntry.getValue())); if (participantId != null) { elementState.setParticipantId(participantId); participantIds.add(participantId); } } } - acmParticipantProvider.verifyParticipantState(participantIds); + participantProvider.verifyParticipantState(participantIds); return AcmUtils.prepareParticipantPriming(acElements, supportedElementMap); } diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantRestartPublisher.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantRestartPublisher.java index 4f28eab8e..3fe46a928 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantRestartPublisher.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantRestartPublisher.java @@ -22,22 +22,15 @@ package org.onap.policy.clamp.acm.runtime.supervision.comm; import io.micrometer.core.annotation.Timed; import java.time.Instant; -import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; -import java.util.Map.Entry; import java.util.UUID; import lombok.AllArgsConstructor; import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup; import org.onap.policy.clamp.models.acm.concepts.AutomationComposition; import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition; -import org.onap.policy.clamp.models.acm.concepts.ParticipantDefinition; import org.onap.policy.clamp.models.acm.concepts.ParticipantRestartAc; import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantRestart; import org.onap.policy.clamp.models.acm.utils.AcmUtils; -import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier; -import org.onap.policy.models.tosca.authorative.concepts.ToscaNodeTemplate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; @@ -66,52 +59,18 @@ public class ParticipantRestartPublisher extends AbstractParticipantPublisher<Pa message.setMessageId(UUID.randomUUID()); message.setTimestamp(Instant.now()); message.setState(acmDefinition.getState()); - message.setParticipantDefinitionUpdates(prepareParticipantRestarting(participantId, acmDefinition)); + message.setParticipantDefinitionUpdates( + AcmUtils.prepareParticipantRestarting(participantId, acmDefinition, + acRuntimeParameterGroup.getAcmParameters().getToscaElementName())); var toscaServiceTemplateFragment = AcmUtils.getToscaServiceTemplateFragment(acmDefinition.getServiceTemplate()); for (var automationComposition : automationCompositions) { - var restartAc = new ParticipantRestartAc(); - restartAc.setAutomationCompositionId(automationComposition.getInstanceId()); - for (var element : automationComposition.getElements().values()) { - if (participantId.equals(element.getParticipantId())) { - var acElementRestart = AcmUtils.createAcElementRestart(element); - acElementRestart.setToscaServiceTemplateFragment(toscaServiceTemplateFragment); - restartAc.getAcElementList().add(acElementRestart); - } - } + var restartAc = AcmUtils + .createAcRestart(automationComposition, participantId, toscaServiceTemplateFragment); message.getAutomationcompositionList().add(restartAc); } LOGGER.debug("Participant Restart sent {}", message); super.send(message); } - - protected List<ParticipantDefinition> prepareParticipantRestarting(UUID participantId, - AutomationCompositionDefinition acmDefinition) { - var acElements = AcmUtils.extractAcElementsFromServiceTemplate(acmDefinition.getServiceTemplate(), - acRuntimeParameterGroup.getAcmParameters().getToscaElementName()); - - // list of entry filtered by participantId - List<Entry<String, ToscaNodeTemplate>> elementList = new ArrayList<>(); - Map<ToscaConceptIdentifier, UUID> supportedElementMap = new HashMap<>(); - for (var elementEntry : acElements) { - var elementState = acmDefinition.getElementStateMap().get(elementEntry.getKey()); - if (participantId.equals(elementState.getParticipantId())) { - var type = new ToscaConceptIdentifier(elementEntry.getValue().getType(), - elementEntry.getValue().getTypeVersion()); - supportedElementMap.put(type, participantId); - elementList.add(elementEntry); - } - } - var list = AcmUtils.prepareParticipantPriming(elementList, supportedElementMap); - for (var participantDefinition : list) { - for (var elementDe : participantDefinition.getAutomationCompositionElementDefinitionList()) { - var state = acmDefinition.getElementStateMap().get(elementDe.getAcElementDefinitionId().getName()); - if (state != null) { - elementDe.setOutProperties(state.getOutProperties()); - } - } - } - return list; - } } diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantStatusReqPublisher.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantStatusReqPublisher.java index 76feee72f..2eb434b64 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantStatusReqPublisher.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantStatusReqPublisher.java @@ -40,7 +40,7 @@ public class ParticipantStatusReqPublisher extends AbstractParticipantPublisher< */ @Timed(value = "publisher.participant_status_req", description = "PARTICIPANT_STATUS_REQ messages published") public void send(UUID participantId) { - ParticipantStatusReq message = new ParticipantStatusReq(); + var message = new ParticipantStatusReq(); message.setParticipantId(participantId); message.setTimestamp(Instant.now()); diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantSyncPublisher.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantSyncPublisher.java index ae7eda1ee..b63bc0a6b 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantSyncPublisher.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantSyncPublisher.java @@ -23,69 +23,58 @@ package org.onap.policy.clamp.acm.runtime.supervision.comm; import io.micrometer.core.annotation.Timed; import java.time.Instant; import java.util.List; -import java.util.Optional; import java.util.UUID; +import lombok.AllArgsConstructor; import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup; -import org.onap.policy.clamp.acm.runtime.main.parameters.Topics; +import org.onap.policy.clamp.models.acm.concepts.AcTypeState; import org.onap.policy.clamp.models.acm.concepts.AutomationComposition; import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition; +import org.onap.policy.clamp.models.acm.concepts.DeployState; import org.onap.policy.clamp.models.acm.concepts.ParticipantRestartAc; import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantSync; import org.onap.policy.clamp.models.acm.utils.AcmUtils; -import org.onap.policy.common.endpoints.event.comm.TopicSink; +import org.onap.policy.models.tosca.authorative.concepts.ToscaServiceTemplate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; - @Component -public class ParticipantSyncPublisher extends ParticipantRestartPublisher { +@AllArgsConstructor +public class ParticipantSyncPublisher extends AbstractParticipantPublisher<ParticipantSync> { private static final Logger LOGGER = LoggerFactory.getLogger(ParticipantSyncPublisher.class); - private final AcRuntimeParameterGroup acRuntimeParameterGroup; - public ParticipantSyncPublisher(AcRuntimeParameterGroup acRuntimeParameterGroup) { - super(acRuntimeParameterGroup); - this.acRuntimeParameterGroup = acRuntimeParameterGroup; - } - - /** - * Send sync msg to Participant. + * Send Restart sync msg to Participant by participantId. * - * @param participantId the ParticipantId + * @param participantId the participantId + * @param replicaId the replicaId * @param acmDefinition the AutomationComposition Definition * @param automationCompositions the list of automationCompositions */ - @Override @Timed(value = "publisher.participant_sync_msg", description = "Participant Sync published") - public void send(UUID participantId, AutomationCompositionDefinition acmDefinition, + public void sendRestartMsg(UUID participantId, UUID replicaId, AutomationCompositionDefinition acmDefinition, List<AutomationComposition> automationCompositions) { var message = new ParticipantSync(); message.setParticipantId(participantId); + message.setReplicaId(replicaId); + message.setRestarting(true); message.setCompositionId(acmDefinition.getCompositionId()); message.setMessageId(UUID.randomUUID()); message.setTimestamp(Instant.now()); message.setState(acmDefinition.getState()); - message.setParticipantDefinitionUpdates(prepareParticipantRestarting(participantId, acmDefinition)); + message.setParticipantDefinitionUpdates(AcmUtils.prepareParticipantRestarting(participantId, acmDefinition, + acRuntimeParameterGroup.getAcmParameters().getToscaElementName())); var toscaServiceTemplateFragment = AcmUtils.getToscaServiceTemplateFragment(acmDefinition.getServiceTemplate()); for (var automationComposition : automationCompositions) { - var syncAc = new ParticipantRestartAc(); - syncAc.setAutomationCompositionId(automationComposition.getInstanceId()); - for (var element : automationComposition.getElements().values()) { - if (participantId.equals(element.getParticipantId())) { - var acElementSync = AcmUtils.createAcElementRestart(element); - acElementSync.setToscaServiceTemplateFragment(toscaServiceTemplateFragment); - syncAc.getAcElementList().add(acElementSync); - } - } + var syncAc = AcmUtils.createAcRestart(automationComposition, participantId, toscaServiceTemplateFragment); message.getAutomationcompositionList().add(syncAc); } - LOGGER.debug("Participant Sync sent {}", message); + LOGGER.debug("Participant Restarting Sync sent {}", message); super.send(message); } @@ -98,4 +87,64 @@ public class ParticipantSyncPublisher extends ParticipantRestartPublisher { return false; } + /** + * Send AutomationCompositionDefinition sync msg to all Participants. + * + * @param acDefinition the AutomationComposition Definition + * @param excludeReplicaId the replica to be excluded + */ + @Timed(value = "publisher.participant_sync_msg", description = "Participant Sync published") + public void sendSync(AutomationCompositionDefinition acDefinition, UUID excludeReplicaId) { + var message = new ParticipantSync(); + message.setCompositionId(acDefinition.getCompositionId()); + if (excludeReplicaId != null) { + message.getExcludeReplicas().add(excludeReplicaId); + } + message.setState(acDefinition.getState()); + message.setMessageId(UUID.randomUUID()); + message.setTimestamp(Instant.now()); + if (AcTypeState.COMMISSIONED.equals(acDefinition.getState())) { + message.setDelete(true); + } else { + message.setParticipantDefinitionUpdates(AcmUtils.prepareParticipantRestarting(null, acDefinition, + acRuntimeParameterGroup.getAcmParameters().getToscaElementName())); + } + LOGGER.debug("Participant AutomationCompositionDefinition Sync sent {}", message); + super.send(message); + } + + /** + * Send AutomationComposition sync msg to all Participants. + * + * @param serviceTemplate the ServiceTemplate + * @param automationComposition the automationComposition + */ + @Timed(value = "publisher.participant_sync_msg", description = "Participant Sync published") + public void sendSync(ToscaServiceTemplate serviceTemplate, AutomationComposition automationComposition) { + var message = new ParticipantSync(); + message.setCompositionId(automationComposition.getCompositionId()); + message.setAutomationCompositionId(automationComposition.getInstanceId()); + message.setState(AcTypeState.PRIMED); + message.setMessageId(UUID.randomUUID()); + message.setTimestamp(Instant.now()); + var syncAc = new ParticipantRestartAc(); + syncAc.setAutomationCompositionId(automationComposition.getInstanceId()); + syncAc.setDeployState(automationComposition.getDeployState()); + syncAc.setLockState(automationComposition.getLockState()); + if (DeployState.DELETED.equals(automationComposition.getDeployState())) { + message.setDelete(true); + } else { + var toscaServiceTemplateFragment = AcmUtils.getToscaServiceTemplateFragment(serviceTemplate); + for (var element : automationComposition.getElements().values()) { + var acElementSync = AcmUtils.createAcElementRestart(element); + acElementSync.setToscaServiceTemplateFragment(toscaServiceTemplateFragment); + syncAc.getAcElementList().add(acElementSync); + + } + } + message.getAutomationcompositionList().add(syncAc); + + LOGGER.debug("Participant AutomationComposition Sync sent {}", message); + super.send(message); + } } diff --git a/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/commissioning/CommissioningProviderTest.java b/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/commissioning/CommissioningProviderTest.java index 5c26ea3bd..413719110 100644 --- a/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/commissioning/CommissioningProviderTest.java +++ b/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/commissioning/CommissioningProviderTest.java @@ -1,6 +1,6 @@ /*- * ============LICENSE_START======================================================= - * Copyright (C) 2021-2023 Nordix Foundation. + * Copyright (C) 2021-2024 Nordix Foundation. * Modifications Copyright (C) 2021 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); @@ -36,7 +36,6 @@ import java.util.UUID; import org.junit.jupiter.api.Test; import org.onap.policy.clamp.acm.runtime.instantiation.InstantiationUtils; import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup; -import org.onap.policy.clamp.acm.runtime.participants.AcmParticipantProvider; import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantPrimePublisher; import org.onap.policy.clamp.acm.runtime.util.CommonTestData; import org.onap.policy.clamp.models.acm.concepts.AcTypeState; @@ -47,6 +46,7 @@ import org.onap.policy.clamp.models.acm.messages.rest.commissioning.PrimeOrder; import org.onap.policy.clamp.models.acm.persistence.provider.AcDefinitionProvider; import org.onap.policy.clamp.models.acm.persistence.provider.AcTypeStateResolver; import org.onap.policy.clamp.models.acm.persistence.provider.AutomationCompositionProvider; +import org.onap.policy.clamp.models.acm.persistence.provider.ParticipantProvider; import org.onap.policy.models.tosca.authorative.concepts.ToscaServiceTemplate; class CommissioningProviderTest { @@ -165,7 +165,7 @@ class CommissioningProviderTest { var participantPrimePublisher = mock(ParticipantPrimePublisher.class); var provider = new CommissioningProvider(acDefinitionProvider, mock(AutomationCompositionProvider.class), - mock(AcmParticipantProvider.class), new AcTypeStateResolver(), participantPrimePublisher, + mock(ParticipantProvider.class), new AcTypeStateResolver(), participantPrimePublisher, CommonTestData.getTestParamaterGroup()); var acTypeStateUpdate = new AcTypeStateUpdate(); @@ -184,15 +184,15 @@ class CommissioningProviderTest { when(acDefinitionProvider.getAcDefinition(compositionId)).thenReturn(acmDefinition); var participantPrimePublisher = mock(ParticipantPrimePublisher.class); - var acmParticipantProvider = mock(AcmParticipantProvider.class); + var participantProvider = mock(ParticipantProvider.class); var provider = new CommissioningProvider(acDefinitionProvider, mock(AutomationCompositionProvider.class), - acmParticipantProvider, new AcTypeStateResolver(), participantPrimePublisher, + participantProvider, new AcTypeStateResolver(), participantPrimePublisher, CommonTestData.getTestParamaterGroup()); var acTypeStateUpdate = new AcTypeStateUpdate(); acTypeStateUpdate.setPrimeOrder(PrimeOrder.DEPRIME); - doNothing().when(acmParticipantProvider).verifyParticipantState(any()); + doNothing().when(participantProvider).verifyParticipantState(any()); provider.compositionDefinitionPriming(compositionId, acTypeStateUpdate); verify(participantPrimePublisher, timeout(1000).times(1)).sendDepriming(compositionId); } @@ -201,7 +201,7 @@ class CommissioningProviderTest { void testBadRequest() { var acProvider = mock(AutomationCompositionProvider.class); var provider = new CommissioningProvider(mock(AcDefinitionProvider.class), acProvider, - mock(AcmParticipantProvider.class), new AcTypeStateResolver(), mock(ParticipantPrimePublisher.class), + mock(ParticipantProvider.class), new AcTypeStateResolver(), mock(ParticipantPrimePublisher.class), mock(AcRuntimeParameterGroup.class)); var compositionId = UUID.randomUUID(); @@ -225,7 +225,7 @@ class CommissioningProviderTest { when(acDefinitionProvider.getAcDefinition(compositionId)).thenReturn(acmDefinition); var provider = new CommissioningProvider(acDefinitionProvider, mock(AutomationCompositionProvider.class), - mock(AcmParticipantProvider.class), new AcTypeStateResolver(), mock(ParticipantPrimePublisher.class), + mock(ParticipantProvider.class), new AcTypeStateResolver(), mock(ParticipantPrimePublisher.class), mock(AcRuntimeParameterGroup.class)); assertThatThrownBy(() -> provider.updateCompositionDefinition(compositionId, toscaServiceTemplate)) @@ -245,7 +245,7 @@ class CommissioningProviderTest { when(acDefinitionProvider.getAcDefinition(compositionId)).thenReturn(acmDefinition); var provider = new CommissioningProvider(acDefinitionProvider, mock(AutomationCompositionProvider.class), - mock(AcmParticipantProvider.class), new AcTypeStateResolver(), mock(ParticipantPrimePublisher.class), + mock(ParticipantProvider.class), new AcTypeStateResolver(), mock(ParticipantPrimePublisher.class), mock(AcRuntimeParameterGroup.class)); var acTypeStateUpdate = new AcTypeStateUpdate(); diff --git a/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/instantiation/AutomationCompositionInstantiationProviderTest.java b/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/instantiation/AutomationCompositionInstantiationProviderTest.java index fbd8330fc..2ee6a152e 100644 --- a/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/instantiation/AutomationCompositionInstantiationProviderTest.java +++ b/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/instantiation/AutomationCompositionInstantiationProviderTest.java @@ -37,7 +37,6 @@ import java.util.UUID; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup; -import org.onap.policy.clamp.acm.runtime.participants.AcmParticipantProvider; import org.onap.policy.clamp.acm.runtime.supervision.SupervisionAcHandler; import org.onap.policy.clamp.acm.runtime.util.CommonTestData; import org.onap.policy.clamp.models.acm.concepts.AcTypeState; @@ -52,6 +51,7 @@ import org.onap.policy.clamp.models.acm.messages.rest.instantiation.LockOrder; import org.onap.policy.clamp.models.acm.persistence.provider.AcDefinitionProvider; import org.onap.policy.clamp.models.acm.persistence.provider.AcInstanceStateResolver; import org.onap.policy.clamp.models.acm.persistence.provider.AutomationCompositionProvider; +import org.onap.policy.clamp.models.acm.persistence.provider.ParticipantProvider; import org.onap.policy.clamp.models.acm.persistence.provider.ProviderUtils; import org.onap.policy.models.tosca.authorative.concepts.ToscaServiceTemplate; import org.onap.policy.models.tosca.simple.concepts.JpaToscaServiceTemplate; @@ -101,9 +101,9 @@ class AutomationCompositionInstantiationProviderTest { when(acDefinitionProvider.getAcDefinition(compositionId)).thenReturn(acDefinition); var acProvider = mock(AutomationCompositionProvider.class); var supervisionAcHandler = mock(SupervisionAcHandler.class); - var acmParticipantProvider = mock(AcmParticipantProvider.class); + var participantProvider = mock(ParticipantProvider.class); var instantiationProvider = new AutomationCompositionInstantiationProvider(acProvider, acDefinitionProvider, - null, supervisionAcHandler, acmParticipantProvider, + null, supervisionAcHandler, participantProvider, CommonTestData.getTestParamaterGroup()); var automationCompositionCreate = InstantiationUtils.getAutomationCompositionFromResource(AC_INSTANTIATION_CREATE_JSON, "Crud"); @@ -141,7 +141,7 @@ class AutomationCompositionInstantiationProviderTest { when(acProvider.deleteAutomationComposition(automationCompositionUpdate.getInstanceId())) .thenReturn(automationCompositionUpdate); - doNothing().when(acmParticipantProvider).verifyParticipantState(any()); + doNothing().when(participantProvider).verifyParticipantState(any()); instantiationProvider.deleteAutomationComposition(automationCompositionCreate.getCompositionId(), automationCompositionCreate.getInstanceId()); @@ -167,9 +167,9 @@ class AutomationCompositionInstantiationProviderTest { when(acProvider.updateAutomationComposition(acmFromDb)).thenReturn(acmFromDb); var supervisionAcHandler = mock(SupervisionAcHandler.class); - var acmParticipantProvider = mock(AcmParticipantProvider.class); + var participantProvider = mock(ParticipantProvider.class); var instantiationProvider = new AutomationCompositionInstantiationProvider(acProvider, acDefinitionProvider, - null, supervisionAcHandler, acmParticipantProvider, + null, supervisionAcHandler, participantProvider, CommonTestData.getTestParamaterGroup()); var instantiationResponse = instantiationProvider.updateAutomationComposition( automationCompositionUpdate.getCompositionId(), automationCompositionUpdate); @@ -201,7 +201,7 @@ class AutomationCompositionInstantiationProviderTest { var instantiationProvider = new AutomationCompositionInstantiationProvider(acProvider, mock(AcDefinitionProvider.class), null, - mock(SupervisionAcHandler.class), mock(AcmParticipantProvider.class), + mock(SupervisionAcHandler.class), mock(ParticipantProvider.class), mock(AcRuntimeParameterGroup.class)); var compositionId = automationCompositionUpdate.getCompositionId(); @@ -232,7 +232,7 @@ class AutomationCompositionInstantiationProviderTest { var instantiationProvider = new AutomationCompositionInstantiationProvider(acProvider, mock(AcDefinitionProvider.class), null, - mock(SupervisionAcHandler.class), mock(AcmParticipantProvider.class), + mock(SupervisionAcHandler.class), mock(ParticipantProvider.class), mock(AcRuntimeParameterGroup.class)); var compositionId = automationCompositionUpdate.getCompositionId(); @@ -273,9 +273,9 @@ class AutomationCompositionInstantiationProviderTest { .thenReturn(automationCompositionUpdate); var supervisionAcHandler = mock(SupervisionAcHandler.class); - var acmParticipantProvider = mock(AcmParticipantProvider.class); + var participantProvider = mock(ParticipantProvider.class); var instantiationProvider = new AutomationCompositionInstantiationProvider(acProvider, acDefinitionProvider, - null, supervisionAcHandler, acmParticipantProvider, + null, supervisionAcHandler, participantProvider, mock(AcRuntimeParameterGroup.class)); assertThatThrownBy( () -> instantiationProvider.updateAutomationComposition(compositionId, automationCompositionUpdate)) @@ -303,9 +303,9 @@ class AutomationCompositionInstantiationProviderTest { when(acProvider.updateAutomationComposition(automationComposition)).thenReturn(automationComposition); var supervisionAcHandler = mock(SupervisionAcHandler.class); - var acmParticipantProvider = mock(AcmParticipantProvider.class); + var participantProvider = mock(ParticipantProvider.class); var instantiationProvider = new AutomationCompositionInstantiationProvider(acProvider, acDefinitionProvider, - null, supervisionAcHandler, acmParticipantProvider, new AcRuntimeParameterGroup()); + null, supervisionAcHandler, participantProvider, new AcRuntimeParameterGroup()); assertThatThrownBy(() -> instantiationProvider .updateAutomationComposition(automationComposition.getCompositionId(), automationComposition)) @@ -352,9 +352,9 @@ class AutomationCompositionInstantiationProviderTest { automationComposition.getElements().clear(); var supervisionAcHandler = mock(SupervisionAcHandler.class); - var acmParticipantProvider = mock(AcmParticipantProvider.class); + var participantProvider = mock(ParticipantProvider.class); var instantiationProvider = new AutomationCompositionInstantiationProvider(acProvider, acDefinitionProvider, - null, supervisionAcHandler, acmParticipantProvider, new AcRuntimeParameterGroup()); + null, supervisionAcHandler, participantProvider, new AcRuntimeParameterGroup()); assertThatThrownBy(() -> instantiationProvider .updateAutomationComposition(automationComposition.getCompositionId(), acMigrate)) @@ -373,11 +373,11 @@ class AutomationCompositionInstantiationProviderTest { when(acDefinitionProvider.getAcDefinition(compositionId)).thenReturn(acDefinition); automationComposition.setCompositionId(compositionId); var supervisionAcHandler = mock(SupervisionAcHandler.class); - var acmParticipantProvider = mock(AcmParticipantProvider.class); + var participantProvider = mock(ParticipantProvider.class); var acRuntimeParameterGroup = mock(AcRuntimeParameterGroup.class); var instantiationProvider = new AutomationCompositionInstantiationProvider(acProvider, acDefinitionProvider, - null, supervisionAcHandler, acmParticipantProvider, acRuntimeParameterGroup); + null, supervisionAcHandler, participantProvider, acRuntimeParameterGroup); when(acProvider.getAutomationComposition(automationComposition.getInstanceId())) .thenReturn(automationComposition); @@ -436,10 +436,10 @@ class AutomationCompositionInstantiationProviderTest { var acProvider = mock(AutomationCompositionProvider.class); when(acProvider.createAutomationComposition(automationCompositionCreate)) .thenReturn(automationCompositionCreate); - var acmParticipantProvider = mock(AcmParticipantProvider.class); + var participantProvider = mock(ParticipantProvider.class); var instantiationProvider = new AutomationCompositionInstantiationProvider(acProvider, acDefinitionProvider, - null, null, acmParticipantProvider, + null, null, participantProvider, CommonTestData.getTestParamaterGroup()); var instantiationResponse = instantiationProvider.createAutomationComposition( @@ -457,7 +457,7 @@ class AutomationCompositionInstantiationProviderTest { @Test void testCreateAutomationCompositions_CommissionedAcElementNotFound() { var acDefinitionProvider = mock(AcDefinitionProvider.class); - var acmParticipantProvider = mock(AcmParticipantProvider.class); + var participantProvider = mock(ParticipantProvider.class); var acDefinition = CommonTestData.createAcDefinition(serviceTemplate, AcTypeState.PRIMED); var compositionId = acDefinition.getCompositionId(); when(acDefinitionProvider.findAcDefinition(compositionId)).thenReturn(Optional.of(acDefinition)); @@ -467,7 +467,7 @@ class AutomationCompositionInstantiationProviderTest { var acProvider = mock(AutomationCompositionProvider.class); var provider = new AutomationCompositionInstantiationProvider(acProvider, acDefinitionProvider, null, null, - acmParticipantProvider, CommonTestData.getTestParamaterGroup()); + participantProvider, CommonTestData.getTestParamaterGroup()); assertThatThrownBy(() -> provider.createAutomationComposition(compositionId, automationComposition)) .hasMessageMatching(AC_ELEMENT_NAME_NOT_FOUND); @@ -572,9 +572,9 @@ class AutomationCompositionInstantiationProviderTest { when(acProvider.getAutomationComposition(instanceId)).thenReturn(automationComposition); var supervisionAcHandler = mock(SupervisionAcHandler.class); - var acmParticipantProvider = mock(AcmParticipantProvider.class); + var participantProvider = mock(ParticipantProvider.class); var provider = new AutomationCompositionInstantiationProvider(acProvider, acDefinitionProvider, - new AcInstanceStateResolver(), supervisionAcHandler, acmParticipantProvider, + new AcInstanceStateResolver(), supervisionAcHandler, participantProvider, mock(AcRuntimeParameterGroup.class)); var acInstanceStateUpdate = new AcInstanceStateUpdate(); diff --git a/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/instantiation/rest/InstantiationControllerTest.java b/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/instantiation/rest/InstantiationControllerTest.java index bcfdea1dd..ca58fad51 100644 --- a/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/instantiation/rest/InstantiationControllerTest.java +++ b/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/instantiation/rest/InstantiationControllerTest.java @@ -374,6 +374,9 @@ class InstantiationControllerTest extends CommonRestController { } private void saveDummyParticipantInDb() { - participantProvider.saveParticipant(CommonTestData.createParticipant(CommonTestData.getParticipantId())); + var participant = CommonTestData.createParticipant(CommonTestData.getParticipantId()); + var replica = CommonTestData.createParticipantReplica(CommonTestData.getReplicaId()); + participant.getReplicas().put(replica.getReplicaId(), replica); + participantProvider.saveParticipant(participant); } } diff --git a/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAcHandlerTest.java b/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAcHandlerTest.java index 8f39c9e2e..0bec9d0ce 100644 --- a/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAcHandlerTest.java +++ b/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAcHandlerTest.java @@ -39,16 +39,19 @@ import org.onap.policy.clamp.acm.runtime.supervision.comm.AcElementPropertiesPub import org.onap.policy.clamp.acm.runtime.supervision.comm.AutomationCompositionDeployPublisher; import org.onap.policy.clamp.acm.runtime.supervision.comm.AutomationCompositionMigrationPublisher; import org.onap.policy.clamp.acm.runtime.supervision.comm.AutomationCompositionStateChangePublisher; +import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantSyncPublisher; import org.onap.policy.clamp.acm.runtime.util.CommonTestData; import org.onap.policy.clamp.models.acm.concepts.AcElementDeployAck; import org.onap.policy.clamp.models.acm.concepts.AcTypeState; import org.onap.policy.clamp.models.acm.concepts.AutomationComposition; +import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition; import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionElement; import org.onap.policy.clamp.models.acm.concepts.DeployState; import org.onap.policy.clamp.models.acm.concepts.LockState; import org.onap.policy.clamp.models.acm.concepts.StateChangeResult; import org.onap.policy.clamp.models.acm.messages.kafka.participant.AutomationCompositionDeployAck; import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantMessageType; +import org.onap.policy.clamp.models.acm.persistence.provider.AcDefinitionProvider; import org.onap.policy.clamp.models.acm.persistence.provider.AutomationCompositionProvider; class SupervisionAcHandlerTest { @@ -64,9 +67,14 @@ class SupervisionAcHandlerTest { when(automationCompositionProvider.findAutomationComposition(IDENTIFIER)) .thenReturn(Optional.of(automationComposition)); - var handler = new SupervisionAcHandler(automationCompositionProvider, + var acDefinitionProvider = mock(AcDefinitionProvider.class); + when(acDefinitionProvider.getAcDefinition(automationComposition.getCompositionId())) + .thenReturn(new AutomationCompositionDefinition()); + + var handler = new SupervisionAcHandler(automationCompositionProvider, acDefinitionProvider, mock(AutomationCompositionDeployPublisher.class), mock(AutomationCompositionStateChangePublisher.class), - mock(AcElementPropertiesPublisher.class), null); + mock(AcElementPropertiesPublisher.class), null, + mock(ParticipantSyncPublisher.class)); var automationCompositionAckMessage = getAutomationCompositionDeployAck(ParticipantMessageType.AUTOMATION_COMPOSITION_STATECHANGE_ACK, @@ -100,14 +108,19 @@ class SupervisionAcHandlerTest { when(automationCompositionProvider.findAutomationComposition(IDENTIFIER)) .thenReturn(Optional.of(automationComposition)); + var acDefinitionProvider = mock(AcDefinitionProvider.class); + when(acDefinitionProvider.getAcDefinition(automationComposition.getCompositionId())) + .thenReturn(new AutomationCompositionDefinition()); + var automationCompositionAckMessage = getAutomationCompositionDeployAck(ParticipantMessageType.AUTOMATION_COMPOSITION_DEPLOY_ACK, automationComposition, DeployState.DEPLOYED, LockState.LOCKED); automationCompositionAckMessage.setParticipantId(CommonTestData.getParticipantId()); - var handler = new SupervisionAcHandler(automationCompositionProvider, + var handler = new SupervisionAcHandler(automationCompositionProvider, acDefinitionProvider, mock(AutomationCompositionDeployPublisher.class), mock(AutomationCompositionStateChangePublisher.class), - mock(AcElementPropertiesPublisher.class), null); + mock(AcElementPropertiesPublisher.class), null, + mock(ParticipantSyncPublisher.class)); handler.handleAutomationCompositionUpdateAckMessage(automationCompositionAckMessage); @@ -142,9 +155,9 @@ class SupervisionAcHandlerTest { var automationCompositionStateChangePublisher = mock(AutomationCompositionStateChangePublisher.class); - var handler = new SupervisionAcHandler(automationCompositionProvider, + var handler = new SupervisionAcHandler(automationCompositionProvider, mock(AcDefinitionProvider.class), mock(AutomationCompositionDeployPublisher.class), automationCompositionStateChangePublisher, null, - null); + null, mock(ParticipantSyncPublisher.class)); handler.handleAutomationCompositionUpdateAckMessage(automationCompositionAckMessage); @@ -156,8 +169,9 @@ class SupervisionAcHandlerTest { void testDeployFailed() { var automationCompositionDeployPublisher = mock(AutomationCompositionDeployPublisher.class); var automationCompositionProvider = mock(AutomationCompositionProvider.class); - var handler = new SupervisionAcHandler(automationCompositionProvider, automationCompositionDeployPublisher, - mock(AutomationCompositionStateChangePublisher.class), mock(AcElementPropertiesPublisher.class), null); + var handler = new SupervisionAcHandler(automationCompositionProvider, mock(AcDefinitionProvider.class), + automationCompositionDeployPublisher, mock(AutomationCompositionStateChangePublisher.class), + mock(AcElementPropertiesPublisher.class), null, mock(ParticipantSyncPublisher.class)); var serviceTemplate = InstantiationUtils.getToscaServiceTemplate(TOSCA_SERVICE_TEMPLATE_YAML); var acDefinition = CommonTestData.createAcDefinition(serviceTemplate, AcTypeState.PRIMED); @@ -174,9 +188,10 @@ class SupervisionAcHandlerTest { void testUndeploy() { var automationCompositionProvider = mock(AutomationCompositionProvider.class); var acStateChangePublisher = mock(AutomationCompositionStateChangePublisher.class); - var handler = new SupervisionAcHandler(automationCompositionProvider, + var handler = new SupervisionAcHandler(automationCompositionProvider, mock(AcDefinitionProvider.class), mock(AutomationCompositionDeployPublisher.class), acStateChangePublisher, - mock(AcElementPropertiesPublisher.class), null); + mock(AcElementPropertiesPublisher.class), null, + mock(ParticipantSyncPublisher.class)); var serviceTemplate = InstantiationUtils.getToscaServiceTemplate(TOSCA_SERVICE_TEMPLATE_YAML); var acDefinition = CommonTestData.createAcDefinition(serviceTemplate, AcTypeState.PRIMED); var automationComposition = @@ -191,9 +206,10 @@ class SupervisionAcHandlerTest { void testUndeployFailed() { var acStateChangePublisher = mock(AutomationCompositionStateChangePublisher.class); var automationCompositionProvider = mock(AutomationCompositionProvider.class); - var handler = new SupervisionAcHandler(automationCompositionProvider, + var handler = new SupervisionAcHandler(automationCompositionProvider, mock(AcDefinitionProvider.class), mock(AutomationCompositionDeployPublisher.class), acStateChangePublisher, - mock(AcElementPropertiesPublisher.class), null); + mock(AcElementPropertiesPublisher.class), null, + mock(ParticipantSyncPublisher.class)); var serviceTemplate = InstantiationUtils.getToscaServiceTemplate(TOSCA_SERVICE_TEMPLATE_YAML); var acDefinition = CommonTestData.createAcDefinition(serviceTemplate, AcTypeState.PRIMED); @@ -211,9 +227,10 @@ class SupervisionAcHandlerTest { void testUnlock() { var automationCompositionProvider = mock(AutomationCompositionProvider.class); var acStateChangePublisher = mock(AutomationCompositionStateChangePublisher.class); - var handler = new SupervisionAcHandler(automationCompositionProvider, + var handler = new SupervisionAcHandler(automationCompositionProvider, mock(AcDefinitionProvider.class), mock(AutomationCompositionDeployPublisher.class), acStateChangePublisher, - mock(AcElementPropertiesPublisher.class), null); + mock(AcElementPropertiesPublisher.class), null, + mock(ParticipantSyncPublisher.class)); var serviceTemplate = InstantiationUtils.getToscaServiceTemplate(TOSCA_SERVICE_TEMPLATE_YAML); var acDefinition = CommonTestData.createAcDefinition(serviceTemplate, AcTypeState.PRIMED); var automationComposition = @@ -228,9 +245,10 @@ class SupervisionAcHandlerTest { void testUnlockFailed() { var automationCompositionProvider = mock(AutomationCompositionProvider.class); var acStateChangePublisher = mock(AutomationCompositionStateChangePublisher.class); - var handler = new SupervisionAcHandler(automationCompositionProvider, + var handler = new SupervisionAcHandler(automationCompositionProvider, mock(AcDefinitionProvider.class), mock(AutomationCompositionDeployPublisher.class), acStateChangePublisher, - mock(AcElementPropertiesPublisher.class), null); + mock(AcElementPropertiesPublisher.class), null, + mock(ParticipantSyncPublisher.class)); var serviceTemplate = InstantiationUtils.getToscaServiceTemplate(TOSCA_SERVICE_TEMPLATE_YAML); var acDefinition = CommonTestData.createAcDefinition(serviceTemplate, AcTypeState.PRIMED); var automationComposition = @@ -247,9 +265,10 @@ class SupervisionAcHandlerTest { void testLock() { var automationCompositionProvider = mock(AutomationCompositionProvider.class); var acStateChangePublisher = mock(AutomationCompositionStateChangePublisher.class); - var handler = new SupervisionAcHandler(automationCompositionProvider, + var handler = new SupervisionAcHandler(automationCompositionProvider, mock(AcDefinitionProvider.class), mock(AutomationCompositionDeployPublisher.class), acStateChangePublisher, - mock(AcElementPropertiesPublisher.class), null); + mock(AcElementPropertiesPublisher.class), null, + mock(ParticipantSyncPublisher.class)); var serviceTemplate = InstantiationUtils.getToscaServiceTemplate(TOSCA_SERVICE_TEMPLATE_YAML); var acDefinition = CommonTestData.createAcDefinition(serviceTemplate, AcTypeState.PRIMED); var automationComposition = @@ -264,9 +283,10 @@ class SupervisionAcHandlerTest { void testLockFailed() { var automationCompositionProvider = mock(AutomationCompositionProvider.class); var acStateChangePublisher = mock(AutomationCompositionStateChangePublisher.class); - var handler = new SupervisionAcHandler(automationCompositionProvider, + var handler = new SupervisionAcHandler(automationCompositionProvider, mock(AcDefinitionProvider.class), mock(AutomationCompositionDeployPublisher.class), acStateChangePublisher, - mock(AcElementPropertiesPublisher.class), null); + mock(AcElementPropertiesPublisher.class), null, + mock(ParticipantSyncPublisher.class)); var serviceTemplate = InstantiationUtils.getToscaServiceTemplate(TOSCA_SERVICE_TEMPLATE_YAML); var acDefinition = CommonTestData.createAcDefinition(serviceTemplate, AcTypeState.PRIMED); var automationComposition = @@ -294,9 +314,10 @@ class SupervisionAcHandlerTest { .setParticipantId(automationComposition.getElements().values().iterator().next().getParticipantId()); automationCompositionAckMessage.setAutomationCompositionId(IDENTIFIER); - var handler = new SupervisionAcHandler(automationCompositionProvider, + var handler = new SupervisionAcHandler(automationCompositionProvider, mock(AcDefinitionProvider.class), mock(AutomationCompositionDeployPublisher.class), mock(AutomationCompositionStateChangePublisher.class), - mock(AcElementPropertiesPublisher.class), null); + mock(AcElementPropertiesPublisher.class), null, + mock(ParticipantSyncPublisher.class)); handler.handleAutomationCompositionUpdateAckMessage(automationCompositionAckMessage); @@ -308,8 +329,9 @@ class SupervisionAcHandlerTest { void testUpdate() { var acElementPropertiesPublisher = mock(AcElementPropertiesPublisher.class); var handler = new SupervisionAcHandler(mock(AutomationCompositionProvider.class), - mock(AutomationCompositionDeployPublisher.class), mock(AutomationCompositionStateChangePublisher.class), - acElementPropertiesPublisher, null); + mock(AcDefinitionProvider.class), mock(AutomationCompositionDeployPublisher.class), + mock(AutomationCompositionStateChangePublisher.class), acElementPropertiesPublisher, null, + mock(ParticipantSyncPublisher.class)); var automationComposition = InstantiationUtils.getAutomationCompositionFromResource(AC_INSTANTIATION_CREATE_JSON, "Lock"); handler.update(automationComposition); @@ -320,8 +342,9 @@ class SupervisionAcHandlerTest { void testMigrate() { var automationCompositionProvider = mock(AutomationCompositionProvider.class); var acCompositionMigrationPublisher = mock(AutomationCompositionMigrationPublisher.class); - var handler = new SupervisionAcHandler(automationCompositionProvider, null, null, null, - acCompositionMigrationPublisher); + var handler = new SupervisionAcHandler(automationCompositionProvider, mock(AcDefinitionProvider.class), + null, null, null, + acCompositionMigrationPublisher, mock(ParticipantSyncPublisher.class)); var automationComposition = InstantiationUtils.getAutomationCompositionFromResource(AC_INSTANTIATION_CREATE_JSON, "Migrate"); handler.migrate(automationComposition, UUID.randomUUID()); diff --git a/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAspectTest.java b/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAspectTest.java index f78344bcb..7a72e0ef5 100644 --- a/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAspectTest.java +++ b/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAspectTest.java @@ -32,19 +32,19 @@ class SupervisionAspectTest { @Test void testSchedule() throws Exception { var supervisionScanner = mock(SupervisionScanner.class); - var partecipantScanner = mock(SupervisionPartecipantScanner.class); - try (var supervisionAspect = new SupervisionAspect(supervisionScanner, partecipantScanner)) { + var participantScanner = mock(SupervisionParticipantScanner.class); + try (var supervisionAspect = new SupervisionAspect(supervisionScanner, participantScanner)) { supervisionAspect.schedule(); verify(supervisionScanner, timeout(500)).run(); - verify(partecipantScanner, timeout(500)).run(); + verify(participantScanner, timeout(500)).run(); } } @Test void testDoCheck() throws Exception { var supervisionScanner = mock(SupervisionScanner.class); - var partecipantScanner = mock(SupervisionPartecipantScanner.class); - try (var supervisionAspect = new SupervisionAspect(supervisionScanner, partecipantScanner)) { + var participantScanner = mock(SupervisionParticipantScanner.class); + try (var supervisionAspect = new SupervisionAspect(supervisionScanner, participantScanner)) { supervisionAspect.doCheck(); supervisionAspect.doCheck(); verify(supervisionScanner, timeout(500).times(2)).run(); diff --git a/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionHandlerTest.java b/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionHandlerTest.java index 448666f8f..e8be3b6b7 100644 --- a/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionHandlerTest.java +++ b/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionHandlerTest.java @@ -30,7 +30,7 @@ import static org.onap.policy.clamp.acm.runtime.util.CommonTestData.TOSCA_SERVIC import java.util.Optional; import org.junit.jupiter.api.Test; import org.onap.policy.clamp.acm.runtime.instantiation.InstantiationUtils; -import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup; +import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantSyncPublisher; import org.onap.policy.clamp.acm.runtime.util.CommonTestData; import org.onap.policy.clamp.models.acm.concepts.AcTypeState; import org.onap.policy.clamp.models.acm.concepts.ParticipantState; @@ -46,8 +46,7 @@ class SupervisionHandlerTest { participantPrimeAckMessage.setParticipantId(CommonTestData.getParticipantId()); participantPrimeAckMessage.setState(ParticipantState.ON_LINE); var acDefinitionProvider = mock(AcDefinitionProvider.class); - var acRuntimeParameterGroup = mock(AcRuntimeParameterGroup.class); - var handler = new SupervisionHandler(acDefinitionProvider, acRuntimeParameterGroup); + var handler = new SupervisionHandler(acDefinitionProvider, mock(ParticipantSyncPublisher.class)); handler.handleParticipantMessage(participantPrimeAckMessage); verify(acDefinitionProvider).findAcDefinition(any()); @@ -66,9 +65,7 @@ class SupervisionHandlerTest { var acDefinitionProvider = mock(AcDefinitionProvider.class); when(acDefinitionProvider.findAcDefinition(acDefinition.getCompositionId())) .thenReturn(Optional.of(acDefinition)); - var acRuntimeParameterGroup = mock(AcRuntimeParameterGroup.class); - - var handler = new SupervisionHandler(acDefinitionProvider, acRuntimeParameterGroup); + var handler = new SupervisionHandler(acDefinitionProvider, mock(ParticipantSyncPublisher.class)); handler.handleParticipantMessage(participantPrimeAckMessage); verify(acDefinitionProvider).findAcDefinition(any()); @@ -93,7 +90,7 @@ class SupervisionHandlerTest { when(acDefinitionProvider.findAcDefinition(acDefinition.getCompositionId())) .thenReturn(Optional.of(acDefinition)); - var handler = new SupervisionHandler(acDefinitionProvider, CommonTestData.getTestParamaterGroup()); + var handler = new SupervisionHandler(acDefinitionProvider, mock(ParticipantSyncPublisher.class)); handler.handleParticipantMessage(participantPrimeAckMessage); verify(acDefinitionProvider).findAcDefinition(any()); @@ -120,7 +117,7 @@ class SupervisionHandlerTest { when(acDefinitionProvider.findAcDefinition(acDefinition.getCompositionId())) .thenReturn(Optional.of(acDefinition)); - var handler = new SupervisionHandler(acDefinitionProvider, CommonTestData.getTestParamaterGroup()); + var handler = new SupervisionHandler(acDefinitionProvider, mock(ParticipantSyncPublisher.class)); handler.handleParticipantMessage(participantPrimeAckMessage); verify(acDefinitionProvider).findAcDefinition(any()); @@ -150,7 +147,7 @@ class SupervisionHandlerTest { when(acDefinitionProvider.findAcDefinition(acDefinition.getCompositionId())) .thenReturn(Optional.of(acDefinition)); - var handler = new SupervisionHandler(acDefinitionProvider, CommonTestData.getTestParamaterGroup()); + var handler = new SupervisionHandler(acDefinitionProvider, mock(ParticipantSyncPublisher.class)); handler.handleParticipantMessage(participantPrimeAckMessage); verify(acDefinitionProvider).findAcDefinition(any()); diff --git a/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionParticipantHandlerTest.java b/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionParticipantHandlerTest.java index e352d2f2a..bebaa3319 100644 --- a/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionParticipantHandlerTest.java +++ b/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionParticipantHandlerTest.java @@ -23,6 +23,7 @@ package org.onap.policy.clamp.acm.runtime.supervision; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -37,6 +38,7 @@ import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantDeregisterAckPublisher; import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantRegisterAckPublisher; import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantRestartPublisher; +import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantSyncPublisher; import org.onap.policy.clamp.acm.runtime.util.CommonTestData; import org.onap.policy.clamp.models.acm.concepts.AcTypeState; import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition; @@ -45,6 +47,7 @@ import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionInfo; import org.onap.policy.clamp.models.acm.concepts.NodeTemplateState; import org.onap.policy.clamp.models.acm.concepts.Participant; import org.onap.policy.clamp.models.acm.concepts.ParticipantDefinition; +import org.onap.policy.clamp.models.acm.concepts.ParticipantReplica; import org.onap.policy.clamp.models.acm.concepts.ParticipantState; import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantDeregister; import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantRegister; @@ -60,25 +63,26 @@ class SupervisionParticipantHandlerTest { @Test void testHandleParticipantDeregister() { - var participant = CommonTestData.createParticipant(CommonTestData.getParticipantId()); + var replica = CommonTestData.createParticipantReplica(CommonTestData.getReplicaId()); var participantProvider = mock(ParticipantProvider.class); - when(participantProvider.findParticipant(CommonTestData.getParticipantId())) - .thenReturn(Optional.of(participant)); + when(participantProvider.findParticipantReplica(replica.getReplicaId())) + .thenReturn(Optional.of(replica)); var participantDeregisterMessage = new ParticipantDeregister(); participantDeregisterMessage.setMessageId(UUID.randomUUID()); participantDeregisterMessage.setParticipantId(CommonTestData.getParticipantId()); + participantDeregisterMessage.setReplicaId(replica.getReplicaId()); var participantDeregisterAckPublisher = mock(ParticipantDeregisterAckPublisher.class); var handler = new SupervisionParticipantHandler(participantProvider, mock(ParticipantRegisterAckPublisher.class), participantDeregisterAckPublisher, mock(AutomationCompositionProvider.class), mock(AcDefinitionProvider.class), mock(ParticipantRestartPublisher.class), - mock(AcRuntimeParameterGroup.class)); + mock(ParticipantSyncPublisher.class), mock(AcRuntimeParameterGroup.class)); handler.handleParticipantMessage(participantDeregisterMessage); - verify(participantProvider).saveParticipant(any()); + verify(participantProvider).deleteParticipantReplica(CommonTestData.getReplicaId()); verify(participantDeregisterAckPublisher).send(participantDeregisterMessage.getMessageId()); } @@ -95,7 +99,7 @@ class SupervisionParticipantHandlerTest { var handler = new SupervisionParticipantHandler(participantProvider, participantRegisterAckPublisher, mock(ParticipantDeregisterAckPublisher.class), mock(AutomationCompositionProvider.class), mock(AcDefinitionProvider.class), mock(ParticipantRestartPublisher.class), - mock(AcRuntimeParameterGroup.class)); + mock(ParticipantSyncPublisher.class), mock(AcRuntimeParameterGroup.class)); handler.handleParticipantMessage(participantRegisterMessage); verify(participantProvider).saveParticipant(any()); @@ -109,13 +113,18 @@ class SupervisionParticipantHandlerTest { participantRegisterMessage.setMessageId(UUID.randomUUID()); var participantId = CommonTestData.getParticipantId(); participantRegisterMessage.setParticipantId(participantId); + participantRegisterMessage.setReplicaId(participantId); var supportedElementType = CommonTestData.createParticipantSupportedElementType(); participantRegisterMessage.setParticipantSupportedElementType(List.of(supportedElementType)); var participant = new Participant(); + var replica = new ParticipantReplica(); + replica.setReplicaId(participantId); participant.setParticipantId(participantId); + participant.getReplicas().put(replica.getReplicaId(), replica); var participantProvider = mock(ParticipantProvider.class); when(participantProvider.findParticipant(participantId)).thenReturn(Optional.of(participant)); + when(participantProvider.findParticipantReplica(participantId)).thenReturn(Optional.of(replica)); var compositionId = UUID.randomUUID(); var composition2Id = UUID.randomUUID(); when(participantProvider.getCompositionIds(participantId)).thenReturn(Set.of(compositionId, composition2Id)); @@ -145,7 +154,8 @@ class SupervisionParticipantHandlerTest { var participantRestartPublisher = mock(ParticipantRestartPublisher.class); var handler = new SupervisionParticipantHandler(participantProvider, participantRegisterAckPublisher, mock(ParticipantDeregisterAckPublisher.class), automationCompositionProvider, acDefinitionProvider, - participantRestartPublisher, CommonTestData.getTestParamaterGroup()); + participantRestartPublisher, mock(ParticipantSyncPublisher.class), + CommonTestData.getTestParamaterGroup()); handler.handleParticipantMessage(participantRegisterMessage); verify(participantRegisterAckPublisher).send(participantRegisterMessage.getMessageId(), participantId); @@ -155,6 +165,65 @@ class SupervisionParticipantHandlerTest { } @Test + void testHandleParticipantSyncRestart() { + var participantRegisterMessage = new ParticipantRegister(); + participantRegisterMessage.setMessageId(UUID.randomUUID()); + var participantId = CommonTestData.getParticipantId(); + participantRegisterMessage.setParticipantId(participantId); + var replicaId = CommonTestData.getReplicaId(); + participantRegisterMessage.setReplicaId(replicaId); + var supportedElementType = CommonTestData.createParticipantSupportedElementType(); + participantRegisterMessage.setParticipantSupportedElementType(List.of(supportedElementType)); + + var participant = new Participant(); + var replica = new ParticipantReplica(); + replica.setReplicaId(replicaId); + participant.setParticipantId(participantId); + participant.getReplicas().put(replica.getReplicaId(), replica); + var participantProvider = mock(ParticipantProvider.class); + when(participantProvider.findParticipant(participantId)).thenReturn(Optional.of(participant)); + when(participantProvider.findParticipantReplica(replicaId)).thenReturn(Optional.of(replica)); + var compositionId = UUID.randomUUID(); + var composition2Id = UUID.randomUUID(); + when(participantProvider.getCompositionIds(participantId)).thenReturn(Set.of(compositionId, composition2Id)); + + var acDefinitionProvider = mock(AcDefinitionProvider.class); + var acDefinition = new AutomationCompositionDefinition(); + acDefinition.setState(AcTypeState.COMMISSIONED); + acDefinition.setCompositionId(composition2Id); + when(acDefinitionProvider.getAcDefinition(composition2Id)).thenReturn(acDefinition); + + acDefinition = new AutomationCompositionDefinition(); + acDefinition.setCompositionId(compositionId); + acDefinition.setState(AcTypeState.PRIMED); + var nodeTemplateState = new NodeTemplateState(); + nodeTemplateState.setParticipantId(participantId); + acDefinition.setElementStateMap(Map.of("code", nodeTemplateState)); + when(acDefinitionProvider.getAcDefinition(compositionId)).thenReturn(acDefinition); + + var automationComposition = + InstantiationUtils.getAutomationCompositionFromResource(AC_INSTANTIATION_CREATE_JSON, "Crud"); + automationComposition.getElements().values().iterator().next().setParticipantId(participantId); + var automationCompositionProvider = mock(AutomationCompositionProvider.class); + when(automationCompositionProvider.getAcInstancesByCompositionId(compositionId)) + .thenReturn(List.of(automationComposition)); + + var participantRegisterAckPublisher = mock(ParticipantRegisterAckPublisher.class); + var participantSyncPublisher = mock(ParticipantSyncPublisher.class); + var handler = new SupervisionParticipantHandler(participantProvider, participantRegisterAckPublisher, + mock(ParticipantDeregisterAckPublisher.class), automationCompositionProvider, acDefinitionProvider, + mock(ParticipantRestartPublisher.class), participantSyncPublisher, + CommonTestData.getTestParamaterGroup()); + handler.handleParticipantMessage(participantRegisterMessage); + + verify(participantRegisterAckPublisher).send(participantRegisterMessage.getMessageId(), participantId); + verify(acDefinitionProvider, times(0)).updateAcDefinition(any(AutomationCompositionDefinition.class), + eq(CommonTestData.TOSCA_COMP_NAME)); + verify(participantSyncPublisher) + .sendRestartMsg(any(), any(), any(AutomationCompositionDefinition.class), any()); + } + + @Test void testHandleParticipantStatus() { var participantStatusMessage = createParticipantStatus(); participantStatusMessage.setAutomationCompositionInfoList(List.of(new AutomationCompositionInfo())); @@ -165,7 +234,7 @@ class SupervisionParticipantHandlerTest { new SupervisionParticipantHandler(participantProvider, mock(ParticipantRegisterAckPublisher.class), mock(ParticipantDeregisterAckPublisher.class), automationCompositionProvider, mock(AcDefinitionProvider.class), mock(ParticipantRestartPublisher.class), - mock(AcRuntimeParameterGroup.class)); + mock(ParticipantSyncPublisher.class), mock(AcRuntimeParameterGroup.class)); var participant = CommonTestData.createParticipant(CommonTestData.getParticipantId()); when(participantProvider.findParticipant(CommonTestData.getParticipantId())) .thenReturn(Optional.of(participant)); @@ -201,7 +270,7 @@ class SupervisionParticipantHandlerTest { new SupervisionParticipantHandler(participantProvider, mock(ParticipantRegisterAckPublisher.class), mock(ParticipantDeregisterAckPublisher.class), mock(AutomationCompositionProvider.class), acDefinitionProvider, mock(ParticipantRestartPublisher.class), - CommonTestData.getTestParamaterGroup()); + mock(ParticipantSyncPublisher.class), CommonTestData.getTestParamaterGroup()); handler.handleParticipantMessage(participantStatusMessage); verify(acDefinitionProvider).updateAcDefinition(acDefinition, CommonTestData.TOSCA_COMP_NAME); @@ -218,7 +287,7 @@ class SupervisionParticipantHandlerTest { new SupervisionParticipantHandler(participantProvider, mock(ParticipantRegisterAckPublisher.class), mock(ParticipantDeregisterAckPublisher.class), automationCompositionProvider, mock(AcDefinitionProvider.class), mock(ParticipantRestartPublisher.class), - mock(AcRuntimeParameterGroup.class)); + mock(ParticipantSyncPublisher.class), mock(AcRuntimeParameterGroup.class)); handler.handleParticipantMessage(participantStatusMessage); verify(participantProvider).saveParticipant(any()); @@ -236,9 +305,8 @@ class SupervisionParticipantHandlerTest { new SupervisionParticipantHandler(participantProvider, mock(ParticipantRegisterAckPublisher.class), mock(ParticipantDeregisterAckPublisher.class), automationCompositionProvider, mock(AcDefinitionProvider.class), mock(ParticipantRestartPublisher.class), - mock(AcRuntimeParameterGroup.class)); + mock(ParticipantSyncPublisher.class), mock(AcRuntimeParameterGroup.class)); var participant = CommonTestData.createParticipant(CommonTestData.getParticipantId()); - participant.setParticipantState(ParticipantState.OFF_LINE); when(participantProvider.findParticipant(CommonTestData.getParticipantId())) .thenReturn(Optional.of(participant)); handler.handleParticipantMessage(participantStatusMessage); diff --git a/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionParticipantScannerTest.java b/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionParticipantScannerTest.java index 690ad9672..0ae1c1a06 100644 --- a/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionParticipantScannerTest.java +++ b/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionParticipantScannerTest.java @@ -29,28 +29,26 @@ import static org.mockito.Mockito.when; import java.util.List; import org.junit.jupiter.api.Test; import org.onap.policy.clamp.acm.runtime.util.CommonTestData; -import org.onap.policy.clamp.models.acm.concepts.ParticipantState; import org.onap.policy.clamp.models.acm.persistence.provider.ParticipantProvider; class SupervisionParticipantScannerTest { @Test void testScanParticipant() { - var acRuntimeParameterGroup = CommonTestData.geParameterGroup("dbScanParticipant"); - acRuntimeParameterGroup.getParticipantParameters().setMaxStatusWaitMs(-1); - - var participant = CommonTestData.createParticipant(CommonTestData.getParticipantId()); var participantProvider = mock(ParticipantProvider.class); - when(participantProvider.getParticipants()).thenReturn(List.of(participant)); + var replica = CommonTestData.createParticipantReplica(CommonTestData.getReplicaId()); + when(participantProvider.findReplicasOnLine()).thenReturn(List.of(replica)); - var supervisionScanner = new SupervisionPartecipantScanner(participantProvider, acRuntimeParameterGroup); + var acRuntimeParameterGroup = CommonTestData.geParameterGroup("dbScanParticipant"); + var supervisionScanner = new SupervisionParticipantScanner(participantProvider, acRuntimeParameterGroup); - participant.setParticipantState(ParticipantState.OFF_LINE); + acRuntimeParameterGroup.getParticipantParameters().setMaxStatusWaitMs(100000); supervisionScanner.run(); - verify(participantProvider, times(0)).saveParticipant(any()); + verify(participantProvider, times(0)).saveParticipantReplica(any()); - participant.setParticipantState(ParticipantState.ON_LINE); + acRuntimeParameterGroup.getParticipantParameters().setMaxStatusWaitMs(-1); + supervisionScanner = new SupervisionParticipantScanner(participantProvider, acRuntimeParameterGroup); supervisionScanner.run(); - verify(participantProvider, times(1)).saveParticipant(any()); + verify(participantProvider).deleteParticipantReplica(CommonTestData.getReplicaId()); } } diff --git a/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionScannerTest.java b/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionScannerTest.java index d5163be14..fa5929f0b 100644 --- a/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionScannerTest.java +++ b/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionScannerTest.java @@ -39,6 +39,7 @@ import org.junit.jupiter.api.Test; import org.onap.policy.clamp.acm.runtime.instantiation.InstantiationUtils; import org.onap.policy.clamp.acm.runtime.supervision.comm.AutomationCompositionDeployPublisher; import org.onap.policy.clamp.acm.runtime.supervision.comm.AutomationCompositionStateChangePublisher; +import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantSyncPublisher; import org.onap.policy.clamp.acm.runtime.util.CommonTestData; import org.onap.policy.clamp.models.acm.concepts.AcTypeState; import org.onap.policy.clamp.models.acm.concepts.AutomationComposition; @@ -101,7 +102,7 @@ class SupervisionScannerTest { var acRuntimeParameterGroup = CommonTestData.geParameterGroup("dbScanner"); var supervisionScanner = new SupervisionScanner(mock(AutomationCompositionProvider.class), acDefinitionProvider, mock(AutomationCompositionStateChangePublisher.class), mock(AutomationCompositionDeployPublisher.class), - acRuntimeParameterGroup); + mock(ParticipantSyncPublisher.class), acRuntimeParameterGroup); supervisionScanner.run(); verify(acDefinitionProvider, times(0)).updateAcDefinitionState(any(), any(), any(), any()); } @@ -113,7 +114,7 @@ class SupervisionScannerTest { var acRuntimeParameterGroup = CommonTestData.geParameterGroup("dbScanner"); var supervisionScanner = new SupervisionScanner(mock(AutomationCompositionProvider.class), acDefinitionProvider, mock(AutomationCompositionStateChangePublisher.class), mock(AutomationCompositionDeployPublisher.class), - acRuntimeParameterGroup); + mock(ParticipantSyncPublisher.class), acRuntimeParameterGroup); supervisionScanner.run(); // Ac Definition in Priming state verify(acDefinitionProvider, times(0)).updateAcDefinitionState(any(), any(), any(), any()); @@ -121,7 +122,7 @@ class SupervisionScannerTest { acRuntimeParameterGroup.getParticipantParameters().setMaxStatusWaitMs(-1); supervisionScanner = new SupervisionScanner(mock(AutomationCompositionProvider.class), acDefinitionProvider, mock(AutomationCompositionStateChangePublisher.class), mock(AutomationCompositionDeployPublisher.class), - acRuntimeParameterGroup); + mock(ParticipantSyncPublisher.class), acRuntimeParameterGroup); supervisionScanner.run(); // set Timeout verify(acDefinitionProvider).updateAcDefinitionState(acDefinition.getCompositionId(), acDefinition.getState(), @@ -164,7 +165,7 @@ class SupervisionScannerTest { var supervisionScanner = new SupervisionScanner(automationCompositionProvider, createAcDefinitionProvider(), automationCompositionStateChangePublisher, automationCompositionDeployPublisher, - acRuntimeParameterGroup); + mock(ParticipantSyncPublisher.class), acRuntimeParameterGroup); // not in transition supervisionScanner.run(); @@ -192,7 +193,7 @@ class SupervisionScannerTest { var supervisionScanner = new SupervisionScanner(automationCompositionProvider, createAcDefinitionProvider(), automationCompositionStateChangePublisher, automationCompositionDeployPublisher, - acRuntimeParameterGroup); + mock(ParticipantSyncPublisher.class), acRuntimeParameterGroup); supervisionScanner.run(); verify(automationCompositionProvider).updateAutomationComposition(any(AutomationComposition.class)); @@ -213,7 +214,7 @@ class SupervisionScannerTest { var supervisionScanner = new SupervisionScanner(automationCompositionProvider, createAcDefinitionProvider(), automationCompositionStateChangePublisher, automationCompositionDeployPublisher, - acRuntimeParameterGroup); + mock(ParticipantSyncPublisher.class), acRuntimeParameterGroup); supervisionScanner.run(); verify(automationCompositionProvider).deleteAutomationComposition(automationComposition.getInstanceId()); @@ -232,7 +233,7 @@ class SupervisionScannerTest { var supervisionScanner = new SupervisionScanner(automationCompositionProvider, createAcDefinitionProvider(), automationCompositionStateChangePublisher, automationCompositionDeployPublisher, - acRuntimeParameterGroup); + mock(ParticipantSyncPublisher.class), acRuntimeParameterGroup); supervisionScanner.run(); verify(automationCompositionProvider, times(0)).updateAutomationComposition(any(AutomationComposition.class)); @@ -263,7 +264,7 @@ class SupervisionScannerTest { // verify timeout scenario var scannerObj2 = new SupervisionScanner(automationCompositionProvider, createAcDefinitionProvider(), automationCompositionStateChangePublisher, automationCompositionDeployPublisher, - acRuntimeParameterGroup); + mock(ParticipantSyncPublisher.class), acRuntimeParameterGroup); automationComposition.setStateChangeResult(StateChangeResult.NO_ERROR); automationComposition.setLastMsg(TimestampHelper.now()); @@ -312,7 +313,7 @@ class SupervisionScannerTest { var supervisionScanner = new SupervisionScanner(automationCompositionProvider, createAcDefinitionProvider(), automationCompositionStateChangePublisher, automationCompositionDeployPublisher, - acRuntimeParameterGroup); + mock(ParticipantSyncPublisher.class), acRuntimeParameterGroup); supervisionScanner.run(); @@ -347,7 +348,7 @@ class SupervisionScannerTest { var supervisionScanner = new SupervisionScanner(automationCompositionProvider, createAcDefinitionProvider(), automationCompositionStateChangePublisher, automationCompositionDeployPublisher, - acRuntimeParameterGroup); + mock(ParticipantSyncPublisher.class), acRuntimeParameterGroup); supervisionScanner.run(); verify(automationCompositionProvider, times(0)).updateAutomationComposition(any(AutomationComposition.class)); @@ -390,7 +391,7 @@ class SupervisionScannerTest { var supervisionScanner = new SupervisionScanner(automationCompositionProvider, createAcDefinitionProvider(), automationCompositionStateChangePublisher, automationCompositionDeployPublisher, - acRuntimeParameterGroup); + mock(ParticipantSyncPublisher.class), acRuntimeParameterGroup); supervisionScanner.run(); diff --git a/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/comm/SupervisionMessagesTest.java b/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/comm/SupervisionMessagesTest.java index 766380ac4..cab5adf9c 100644 --- a/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/comm/SupervisionMessagesTest.java +++ b/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/comm/SupervisionMessagesTest.java @@ -35,7 +35,6 @@ import java.util.UUID; import org.junit.jupiter.api.Test; import org.onap.policy.clamp.acm.runtime.instantiation.InstantiationUtils; import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup; -import org.onap.policy.clamp.acm.runtime.participants.AcmParticipantProvider; import org.onap.policy.clamp.acm.runtime.supervision.SupervisionAcHandler; import org.onap.policy.clamp.acm.runtime.supervision.SupervisionHandler; import org.onap.policy.clamp.acm.runtime.supervision.SupervisionParticipantHandler; @@ -149,7 +148,7 @@ class SupervisionMessagesTest { @Test void testParticipantPrimePublisherDecommissioning() { var publisher = new ParticipantPrimePublisher(mock(ParticipantProvider.class), - mock(AcmParticipantProvider.class), mock(AcRuntimeParameterGroup.class)); + mock(AcRuntimeParameterGroup.class)); var topicSink = mock(TopicSink.class); publisher.active(topicSink); publisher.sendDepriming(UUID.randomUUID()); @@ -170,8 +169,7 @@ class SupervisionMessagesTest { participantId); var participantProvider = mock(ParticipantProvider.class); when(participantProvider.getSupportedElementMap()).thenReturn(supportedElementMap); - var publisher = new ParticipantPrimePublisher(participantProvider, mock(AcmParticipantProvider.class), - CommonTestData.getTestParamaterGroup()); + var publisher = new ParticipantPrimePublisher(participantProvider, CommonTestData.getTestParamaterGroup()); var topicSink = mock(TopicSink.class); publisher.active(topicSink); var serviceTemplate = InstantiationUtils.getToscaServiceTemplate(TOSCA_SERVICE_TEMPLATE_YAML); @@ -262,29 +260,70 @@ class SupervisionMessagesTest { } @Test - void testParticipantSyncPublisher() { + void testParticipantSyncPublisherAutomationComposition() { var publisher = new ParticipantSyncPublisher(CommonTestData.getTestParamaterGroup()); var topicSink = mock(TopicSink.class); publisher.active(topicSink); var serviceTemplate = InstantiationUtils.getToscaServiceTemplate(TOSCA_SERVICE_TEMPLATE_YAML); - var acmDefinition = new AutomationCompositionDefinition(); - acmDefinition.setCompositionId(UUID.randomUUID()); - acmDefinition.setServiceTemplate(serviceTemplate); - var acElements = AcmUtils - .extractAcElementsFromServiceTemplate(serviceTemplate, ""); - acmDefinition.setElementStateMap(AcmUtils.createElementStateMap(acElements, AcTypeState.PRIMED)); - var automationComposition = InstantiationUtils.getAutomationCompositionFromResource(AC_INSTANTIATION_UPDATE_JSON, "Crud"); + publisher.sendSync(serviceTemplate, automationComposition); + verify(topicSink).send(anyString()); + } + + @Test + void testParticipantSyncPublisherAcDefinition() { + var publisher = new ParticipantSyncPublisher(CommonTestData.getTestParamaterGroup()); + var topicSink = mock(TopicSink.class); + publisher.active(topicSink); + + var acmDefinition = getAcmDefinition(); + publisher.sendSync(acmDefinition, null); + verify(topicSink).send(anyString()); + } + + @Test + void testParticipantSyncPublisherAcDefinitionCommissioned() { + var publisher = new ParticipantSyncPublisher(CommonTestData.getTestParamaterGroup()); + var topicSink = mock(TopicSink.class); + publisher.active(topicSink); + var acmDefinition = getAcmDefinition(); + acmDefinition.setState(AcTypeState.COMMISSIONED); + publisher.sendSync(acmDefinition, UUID.randomUUID()); + verify(topicSink).send(anyString()); + } + + @Test + void testParticipantSyncPublisherRestart() { + var publisher = new ParticipantSyncPublisher(CommonTestData.getTestParamaterGroup()); + var topicSink = mock(TopicSink.class); + publisher.active(topicSink); + + var automationComposition = + InstantiationUtils.getAutomationCompositionFromResource(AC_INSTANTIATION_UPDATE_JSON, "Crud"); var participantId = automationComposition.getElements().values().iterator().next().getParticipantId(); + var acmDefinition = getAcmDefinition(); acmDefinition.getElementStateMap().values().iterator().next().setParticipantId(participantId); - - publisher.send(participantId, acmDefinition, List.of(automationComposition)); + var replicaId = UUID.randomUUID(); + publisher.sendRestartMsg(participantId, replicaId, acmDefinition, List.of(automationComposition)); verify(topicSink).send(anyString()); } + private AutomationCompositionDefinition getAcmDefinition() { + var serviceTemplate = InstantiationUtils.getToscaServiceTemplate(TOSCA_SERVICE_TEMPLATE_YAML); + var acmDefinition = new AutomationCompositionDefinition(); + acmDefinition.setCompositionId(UUID.randomUUID()); + acmDefinition.setState(AcTypeState.PRIMED); + acmDefinition.setServiceTemplate(serviceTemplate); + var acElements = AcmUtils + .extractAcElementsFromServiceTemplate(serviceTemplate, TOSCA_ELEMENT_NAME); + acmDefinition.setElementStateMap(AcmUtils.createElementStateMap(acElements, AcTypeState.PRIMED)); + acmDefinition.getElementStateMap().values().forEach(element -> element.setParticipantId(UUID.randomUUID())); + return acmDefinition; + } + @Test void testParticipantRegisterListener() { final var participantRegister = new ParticipantRegister(); diff --git a/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/util/CommonTestData.java b/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/util/CommonTestData.java index e031e0f5a..c3b5ff919 100644 --- a/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/util/CommonTestData.java +++ b/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/util/CommonTestData.java @@ -28,6 +28,7 @@ import org.onap.policy.clamp.common.acm.exception.AutomationCompositionRuntimeEx import org.onap.policy.clamp.models.acm.concepts.AcTypeState; import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition; import org.onap.policy.clamp.models.acm.concepts.Participant; +import org.onap.policy.clamp.models.acm.concepts.ParticipantReplica; import org.onap.policy.clamp.models.acm.concepts.ParticipantState; import org.onap.policy.clamp.models.acm.concepts.ParticipantSupportedElementType; import org.onap.policy.clamp.models.acm.utils.AcmUtils; @@ -88,12 +89,24 @@ public class CommonTestData { public static Participant createParticipant(UUID participantId) { var participant = new Participant(); participant.setParticipantId(participantId); - participant.setParticipantState(ParticipantState.ON_LINE); - participant.setLastMsg(TimestampHelper.now()); return participant; } /** + * Create a new ParticipantReplica. + * + * @param replicaId the replica id + * @return a new ParticipantReplica + */ + public static ParticipantReplica createParticipantReplica(UUID replicaId) { + var replica = new ParticipantReplica(); + replica.setReplicaId(replicaId); + replica.setParticipantState(ParticipantState.ON_LINE); + replica.setLastMsg(TimestampHelper.now()); + return replica; + } + + /** * Create a new ParticipantSupportedElementType. * * @return a new ParticipantSupportedElementType @@ -105,6 +118,10 @@ public class CommonTestData { return supportedElementType; } + public static UUID getReplicaId() { + return UUID.fromString("201c62b3-8918-41b9-a747-d21eb79c6c09"); + } + public static UUID getParticipantId() { return UUID.fromString("101c62b3-8918-41b9-a747-d21eb79c6c03"); } |