diff options
author | FrancescoFioraEst <francesco.fiora@est.tech> | 2024-06-18 16:32:30 +0100 |
---|---|---|
committer | Francesco Fiora <francesco.fiora@est.tech> | 2024-06-19 08:16:09 +0000 |
commit | 9cdfa4dc5aadaaf8ec11223c4991b61c0aa6d0b0 (patch) | |
tree | d91f07e3bf3054698f3702ce6c660af3ddc040d6 /runtime-acm/src/main | |
parent | 29d86f951b30f4941ee63b0d2badef810b856e53 (diff) |
Add support for sync messages in ACM-runtime
Issue-ID: POLICY-5035
Change-Id: Ibcf1c6a414a7ba9d1cafd42041551bb0fb198088
Signed-off-by: FrancescoFioraEst <francesco.fiora@est.tech>
Diffstat (limited to 'runtime-acm/src/main')
13 files changed, 232 insertions, 203 deletions
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/commissioning/CommissioningProvider.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/commissioning/CommissioningProvider.java index 74ccb9cc6..8a56fbb1e 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/commissioning/CommissioningProvider.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/commissioning/CommissioningProvider.java @@ -29,7 +29,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import lombok.RequiredArgsConstructor; import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup; -import org.onap.policy.clamp.acm.runtime.participants.AcmParticipantProvider; import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantPrimePublisher; import org.onap.policy.clamp.models.acm.concepts.AcTypeState; import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition; @@ -39,6 +38,7 @@ import org.onap.policy.clamp.models.acm.messages.rest.commissioning.Commissionin import org.onap.policy.clamp.models.acm.persistence.provider.AcDefinitionProvider; import org.onap.policy.clamp.models.acm.persistence.provider.AcTypeStateResolver; import org.onap.policy.clamp.models.acm.persistence.provider.AutomationCompositionProvider; +import org.onap.policy.clamp.models.acm.persistence.provider.ParticipantProvider; import org.onap.policy.clamp.models.acm.utils.TimestampHelper; import org.onap.policy.models.base.PfModelRuntimeException; import org.onap.policy.models.tosca.authorative.concepts.ToscaServiceTemplate; @@ -56,7 +56,7 @@ public class CommissioningProvider { private final AcDefinitionProvider acDefinitionProvider; private final AutomationCompositionProvider acProvider; - private final AcmParticipantProvider acmParticipantProvider; + private final ParticipantProvider participantProvider; private final AcTypeStateResolver acTypeStateResolver; private final ParticipantPrimePublisher participantPrimePublisher; private final AcRuntimeParameterGroup acRuntimeParameterGroup; @@ -229,7 +229,7 @@ public class CommissioningProvider { } } if (!participantIds.isEmpty()) { - acmParticipantProvider.verifyParticipantState(participantIds); + participantProvider.verifyParticipantState(participantIds); } acmDefinition.setState(AcTypeState.DEPRIMING); acmDefinition.setLastMsg(TimestampHelper.now()); diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/instantiation/AutomationCompositionInstantiationProvider.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/instantiation/AutomationCompositionInstantiationProvider.java index 220636b9d..2bf08220a 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/instantiation/AutomationCompositionInstantiationProvider.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/instantiation/AutomationCompositionInstantiationProvider.java @@ -29,7 +29,6 @@ import java.util.stream.Collectors; import lombok.NonNull; import lombok.RequiredArgsConstructor; import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup; -import org.onap.policy.clamp.acm.runtime.participants.AcmParticipantProvider; import org.onap.policy.clamp.acm.runtime.supervision.SupervisionAcHandler; import org.onap.policy.clamp.models.acm.concepts.AcTypeState; import org.onap.policy.clamp.models.acm.concepts.AutomationComposition; @@ -43,6 +42,7 @@ import org.onap.policy.clamp.models.acm.messages.rest.instantiation.Instantiatio import org.onap.policy.clamp.models.acm.persistence.provider.AcDefinitionProvider; import org.onap.policy.clamp.models.acm.persistence.provider.AcInstanceStateResolver; import org.onap.policy.clamp.models.acm.persistence.provider.AutomationCompositionProvider; +import org.onap.policy.clamp.models.acm.persistence.provider.ParticipantProvider; import org.onap.policy.clamp.models.acm.utils.AcmUtils; import org.onap.policy.common.parameters.BeanValidationResult; import org.onap.policy.common.parameters.ObjectValidationResult; @@ -69,7 +69,7 @@ public class AutomationCompositionInstantiationProvider { private final AcDefinitionProvider acDefinitionProvider; private final AcInstanceStateResolver acInstanceStateResolver; private final SupervisionAcHandler supervisionAcHandler; - private final AcmParticipantProvider acmParticipantProvider; + private final ParticipantProvider participantProvider; private final AcRuntimeParameterGroup acRuntimeParameterGroup; /** @@ -265,7 +265,7 @@ public class AutomationCompositionInstantiationProvider { var participantIds = acDefinitionOpt.get().getElementStateMap().values().stream() .map(NodeTemplateState::getParticipantId).collect(Collectors.toSet()); - acmParticipantProvider.verifyParticipantState(participantIds); + participantProvider.verifyParticipantState(participantIds); result.addResult(AcmUtils.validateAutomationComposition(automationComposition, acDefinitionOpt.get().getServiceTemplate(), @@ -331,7 +331,7 @@ public class AutomationCompositionInstantiationProvider { var acDefinition = acDefinitionProvider.getAcDefinition(automationComposition.getCompositionId()); var participantIds = acDefinition.getElementStateMap().values().stream() .map(NodeTemplateState::getParticipantId).collect(Collectors.toSet()); - acmParticipantProvider.verifyParticipantState(participantIds); + participantProvider.verifyParticipantState(participantIds); supervisionAcHandler.delete(automationComposition, acDefinition); var response = new InstantiationResponse(); response.setInstanceId(automationComposition.getInstanceId()); @@ -374,7 +374,7 @@ public class AutomationCompositionInstantiationProvider { var participantIds = acDefinition.getElementStateMap().values().stream() .map(NodeTemplateState::getParticipantId).collect(Collectors.toSet()); - acmParticipantProvider.verifyParticipantState(participantIds); + participantProvider.verifyParticipantState(participantIds); var result = acInstanceStateResolver.resolve(acInstanceStateUpdate.getDeployOrder(), acInstanceStateUpdate.getLockOrder(), automationComposition.getDeployState(), automationComposition.getLockState(), automationComposition.getStateChangeResult()); diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/participants/AcmParticipantProvider.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/participants/AcmParticipantProvider.java index 282389a74..62ba7b017 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/participants/AcmParticipantProvider.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/participants/AcmParticipantProvider.java @@ -20,12 +20,10 @@ package org.onap.policy.clamp.acm.runtime.participants; -import jakarta.ws.rs.core.Response; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.UUID; import lombok.RequiredArgsConstructor; import org.apache.commons.collections4.MapUtils; @@ -33,9 +31,7 @@ import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantStatusReqPu import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionElement; import org.onap.policy.clamp.models.acm.concepts.NodeTemplateState; import org.onap.policy.clamp.models.acm.concepts.ParticipantInformation; -import org.onap.policy.clamp.models.acm.concepts.ParticipantState; import org.onap.policy.clamp.models.acm.persistence.provider.ParticipantProvider; -import org.onap.policy.models.base.PfModelRuntimeException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; @@ -94,12 +90,11 @@ public class AcmParticipantProvider { * @param participantId The UUID of the participant to send request to */ public void sendParticipantStatusRequest(UUID participantId) { - var participant = this.participantProvider.getParticipantById(participantId); + // check if participant is present + this.participantProvider.getParticipantById(participantId); LOGGER.debug("Requesting Participant Status Now ParticipantStatusReq"); participantStatusReqPublisher.send(participantId); - participant.setParticipantState(ParticipantState.OFF_LINE); - participantProvider.saveParticipant(participant); } /** @@ -110,22 +105,6 @@ public class AcmParticipantProvider { this.participantStatusReqPublisher.send((UUID) null); } - /** - * Verify Participant state. - * - * @param participantIds The list of UUIDs of the participants to get - * @throws PfModelRuntimeException in case the participant is offline - */ - public void verifyParticipantState(Set<UUID> participantIds) { - for (UUID participantId : participantIds) { - var participant = this.participantProvider.getParticipantById(participantId); - if (! participant.getParticipantState().equals(ParticipantState.ON_LINE)) { - throw new PfModelRuntimeException(Response.Status.CONFLICT, - "Participant: " + participantId + " is OFFLINE"); - } - } - } - private Map<UUID, AutomationCompositionElement> getAutomationCompositionElementsForParticipant(UUID participantId) { var automationCompositionElements = participantProvider .getAutomationCompositionElements(participantId); diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAcHandler.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAcHandler.java index 802c6603b..3e2057ed5 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAcHandler.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAcHandler.java @@ -33,6 +33,7 @@ import org.onap.policy.clamp.acm.runtime.supervision.comm.AcElementPropertiesPub import org.onap.policy.clamp.acm.runtime.supervision.comm.AutomationCompositionDeployPublisher; import org.onap.policy.clamp.acm.runtime.supervision.comm.AutomationCompositionMigrationPublisher; import org.onap.policy.clamp.acm.runtime.supervision.comm.AutomationCompositionStateChangePublisher; +import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantSyncPublisher; import org.onap.policy.clamp.models.acm.concepts.AcElementDeployAck; import org.onap.policy.clamp.models.acm.concepts.AutomationComposition; import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition; @@ -42,6 +43,7 @@ import org.onap.policy.clamp.models.acm.concepts.LockState; import org.onap.policy.clamp.models.acm.concepts.ParticipantUtils; import org.onap.policy.clamp.models.acm.concepts.StateChangeResult; import org.onap.policy.clamp.models.acm.messages.kafka.participant.AutomationCompositionDeployAck; +import org.onap.policy.clamp.models.acm.persistence.provider.AcDefinitionProvider; import org.onap.policy.clamp.models.acm.persistence.provider.AutomationCompositionProvider; import org.onap.policy.clamp.models.acm.utils.AcmUtils; import org.slf4j.Logger; @@ -58,12 +60,14 @@ public class SupervisionAcHandler { private static final Logger LOGGER = LoggerFactory.getLogger(SupervisionAcHandler.class); private final AutomationCompositionProvider automationCompositionProvider; + private final AcDefinitionProvider acDefinitionProvider; // Publishers for participant communication private final AutomationCompositionDeployPublisher automationCompositionDeployPublisher; private final AutomationCompositionStateChangePublisher automationCompositionStateChangePublisher; private final AcElementPropertiesPublisher acElementPropertiesPublisher; private final AutomationCompositionMigrationPublisher acCompositionMigrationPublisher; + private final ParticipantSyncPublisher participantSyncPublisher; private final ExecutorService executor = Context.taskWrapping(Executors.newFixedThreadPool(1)); @@ -260,6 +264,8 @@ public class SupervisionAcHandler { automationCompositionAckMessage.getStateChangeResult()); if (updated) { automationCompositionProvider.updateAutomationComposition(automationComposition); + var acDefinition = acDefinitionProvider.getAcDefinition(automationComposition.getCompositionId()); + participantSyncPublisher.sendSync(acDefinition.getServiceTemplate(), automationComposition); } } diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAspect.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAspect.java index 8f3a4c2eb..9ef979f8e 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAspect.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAspect.java @@ -41,9 +41,9 @@ public class SupervisionAspect implements Closeable { private static final Logger LOGGER = LoggerFactory.getLogger(SupervisionAspect.class); private final SupervisionScanner supervisionScanner; - private final SupervisionPartecipantScanner partecipantScanner; + private final SupervisionParticipantScanner participantScanner; - private ThreadPoolExecutor executor = + private final ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()); @Scheduled( @@ -56,7 +56,7 @@ public class SupervisionAspect implements Closeable { private void executeScan() { supervisionScanner.run(); - partecipantScanner.run(); + participantScanner.run(); } /** diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionHandler.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionHandler.java index 963e4830e..a4e470495 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionHandler.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionHandler.java @@ -23,9 +23,10 @@ package org.onap.policy.clamp.acm.runtime.supervision; import io.micrometer.core.annotation.Timed; import lombok.AllArgsConstructor; -import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup; +import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantSyncPublisher; import org.onap.policy.clamp.models.acm.concepts.AcTypeState; import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition; +import org.onap.policy.clamp.models.acm.concepts.NodeTemplateState; import org.onap.policy.clamp.models.acm.concepts.StateChangeResult; import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantPrimeAck; import org.onap.policy.clamp.models.acm.persistence.provider.AcDefinitionProvider; @@ -43,7 +44,7 @@ public class SupervisionHandler { private static final Logger LOGGER = LoggerFactory.getLogger(SupervisionHandler.class); private final AcDefinitionProvider acDefinitionProvider; - private final AcRuntimeParameterGroup acRuntimeParameterGroup; + private final ParticipantSyncPublisher participantSyncPublisher; /** * Handle a ParticipantPrimeAck message from a participant. @@ -82,12 +83,7 @@ public class SupervisionHandler { boolean completed = true; boolean restarting = false; for (var element : acDefinition.getElementStateMap().values()) { - if (participantPrimeAckMessage.getParticipantId().equals(element.getParticipantId())) { - element.setMessage(participantPrimeAckMessage.getMessage()); - element.setState(participantPrimeAckMessage.getCompositionState()); - element.setRestarting(null); - acDefinitionProvider.updateAcDefinitionElement(element, acDefinition.getCompositionId()); - } + handlePrimeAckElement(participantPrimeAckMessage, element); if (!finalState.equals(element.getState())) { completed = false; } @@ -110,6 +106,18 @@ public class SupervisionHandler { if (toUpdate) { acDefinitionProvider.updateAcDefinitionState(acDefinition.getCompositionId(), acDefinition.getState(), acDefinition.getStateChangeResult(), acDefinition.getRestarting()); + if (!participantPrimeAckMessage.getParticipantId().equals(participantPrimeAckMessage.getReplicaId())) { + participantSyncPublisher.sendSync(acDefinition, participantPrimeAckMessage.getReplicaId()); + } + } + } + + private void handlePrimeAckElement(ParticipantPrimeAck participantPrimeAckMessage, NodeTemplateState element) { + if (participantPrimeAckMessage.getParticipantId().equals(element.getParticipantId())) { + element.setMessage(participantPrimeAckMessage.getMessage()); + element.setState(participantPrimeAckMessage.getCompositionState()); + element.setRestarting(null); + acDefinitionProvider.updateAcDefinitionElement(element, participantPrimeAckMessage.getCompositionId()); } } } diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionParticipantHandler.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionParticipantHandler.java index 609e036be..4c8c5815c 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionParticipantHandler.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionParticipantHandler.java @@ -21,7 +21,6 @@ package org.onap.policy.clamp.acm.runtime.supervision; import io.micrometer.core.annotation.Timed; -import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -32,11 +31,13 @@ import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantDeregisterAckPublisher; import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantRegisterAckPublisher; import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantRestartPublisher; +import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantSyncPublisher; import org.onap.policy.clamp.models.acm.concepts.AcTypeState; import org.onap.policy.clamp.models.acm.concepts.AutomationComposition; import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition; import org.onap.policy.clamp.models.acm.concepts.Participant; import org.onap.policy.clamp.models.acm.concepts.ParticipantDefinition; +import org.onap.policy.clamp.models.acm.concepts.ParticipantReplica; import org.onap.policy.clamp.models.acm.concepts.ParticipantState; import org.onap.policy.clamp.models.acm.concepts.ParticipantSupportedElementType; import org.onap.policy.clamp.models.acm.concepts.StateChangeResult; @@ -65,6 +66,7 @@ public class SupervisionParticipantHandler { private final AutomationCompositionProvider automationCompositionProvider; private final AcDefinitionProvider acDefinitionProvider; private final ParticipantRestartPublisher participantRestartPublisher; + private final ParticipantSyncPublisher participantSyncPublisher; private final AcRuntimeParameterGroup acRuntimeParameterGroup; /** @@ -72,21 +74,11 @@ public class SupervisionParticipantHandler { * * @param participantRegisterMsg the ParticipantRegister message received from a participant */ - @MessageIntercept @Timed(value = "listener.participant_register", description = "PARTICIPANT_REGISTER messages received") public void handleParticipantMessage(ParticipantRegister participantRegisterMsg) { - var participantOpt = participantProvider.findParticipant(participantRegisterMsg.getParticipantId()); - - if (participantOpt.isPresent()) { - var participant = participantOpt.get(); - checkOnline(participant); - handleRestart(participant.getParticipantId()); - } else { - var participant = createParticipant(participantRegisterMsg.getParticipantId(), - listToMap(participantRegisterMsg.getParticipantSupportedElementType())); - participantProvider.saveParticipant(participant); - - } + saveIfNotPresent(participantRegisterMsg.getReplicaId(), + participantRegisterMsg.getParticipantId(), + participantRegisterMsg.getParticipantSupportedElementType(), true); participantRegisterAckPublisher.send(participantRegisterMsg.getMessageId(), participantRegisterMsg.getParticipantId()); @@ -97,15 +89,13 @@ public class SupervisionParticipantHandler { * * @param participantDeregisterMsg the ParticipantDeregister message received from a participant */ - @MessageIntercept @Timed(value = "listener.participant_deregister", description = "PARTICIPANT_DEREGISTER messages received") public void handleParticipantMessage(ParticipantDeregister participantDeregisterMsg) { - var participantOpt = participantProvider.findParticipant(participantDeregisterMsg.getParticipantId()); - - if (participantOpt.isPresent()) { - var participant = participantOpt.get(); - participant.setParticipantState(ParticipantState.OFF_LINE); - participantProvider.saveParticipant(participant); + var replicaId = participantDeregisterMsg.getReplicaId() != null + ? participantDeregisterMsg.getReplicaId() : participantDeregisterMsg.getParticipantId(); + var replicaOpt = participantProvider.findParticipantReplica(replicaId); + if (replicaOpt.isPresent()) { + participantProvider.deleteParticipantReplica(replicaId); } participantDeregisterAckPublisher.send(participantDeregisterMsg.getMessageId()); @@ -116,32 +106,57 @@ public class SupervisionParticipantHandler { * * @param participantStatusMsg the ParticipantStatus message received from a participant */ - @MessageIntercept @Timed(value = "listener.participant_status", description = "PARTICIPANT_STATUS messages received") public void handleParticipantMessage(ParticipantStatus participantStatusMsg) { + saveIfNotPresent(participantStatusMsg.getReplicaId(), participantStatusMsg.getParticipantId(), + participantStatusMsg.getParticipantSupportedElementType(), false); - var participantOpt = participantProvider.findParticipant(participantStatusMsg.getParticipantId()); - if (participantOpt.isEmpty()) { - var participant = createParticipant(participantStatusMsg.getParticipantId(), - listToMap(participantStatusMsg.getParticipantSupportedElementType())); - participantProvider.saveParticipant(participant); - } else { - checkOnline(participantOpt.get()); - } if (!participantStatusMsg.getAutomationCompositionInfoList().isEmpty()) { automationCompositionProvider.upgradeStates(participantStatusMsg.getAutomationCompositionInfoList()); } if (!participantStatusMsg.getParticipantDefinitionUpdates().isEmpty() && participantStatusMsg.getCompositionId() != null) { updateAcDefinitionOutProperties(participantStatusMsg.getCompositionId(), - participantStatusMsg.getParticipantDefinitionUpdates()); + participantStatusMsg.getReplicaId(), participantStatusMsg.getParticipantDefinitionUpdates()); } } - private void updateAcDefinitionOutProperties(UUID composotionId, List<ParticipantDefinition> list) { - var acDefinitionOpt = acDefinitionProvider.findAcDefinition(composotionId); + private void saveIfNotPresent(UUID msgReplicaId, UUID participantId, + List<ParticipantSupportedElementType> participantSupportedElementType, boolean registration) { + var replicaId = msgReplicaId != null ? msgReplicaId : participantId; + var replicaOpt = participantProvider.findParticipantReplica(replicaId); + if (replicaOpt.isPresent()) { + var replica = replicaOpt.get(); + checkOnline(replica); + } else { + var participant = getParticipant(participantId, listToMap(participantSupportedElementType)); + participant.getReplicas().put(replicaId, createReplica(replicaId)); + participantProvider.saveParticipant(participant); + } + if (registration) { + handleRestart(participantId, replicaId); + } + } + + private Participant getParticipant(UUID participantId, + Map<UUID, ParticipantSupportedElementType> participantSupportedElementType) { + var participantOpt = participantProvider.findParticipant(participantId); + return participantOpt.orElseGet(() -> createParticipant(participantId, participantSupportedElementType)); + } + + private ParticipantReplica createReplica(UUID replicaId) { + var replica = new ParticipantReplica(); + replica.setReplicaId(replicaId); + replica.setParticipantState(ParticipantState.ON_LINE); + replica.setLastMsg(TimestampHelper.now()); + return replica; + + } + + private void updateAcDefinitionOutProperties(UUID compositionId, UUID replicaId, List<ParticipantDefinition> list) { + var acDefinitionOpt = acDefinitionProvider.findAcDefinition(compositionId); if (acDefinitionOpt.isEmpty()) { - LOGGER.error("Ac Definition with id {} not found", composotionId); + LOGGER.error("Ac Definition with id {} not found", compositionId); return; } var acDefinition = acDefinitionOpt.get(); @@ -155,26 +170,32 @@ public class SupervisionParticipantHandler { } acDefinitionProvider.updateAcDefinition(acDefinition, acRuntimeParameterGroup.getAcmParameters().getToscaCompositionName()); + participantSyncPublisher.sendSync(acDefinition, replicaId); } - private void checkOnline(Participant participant) { - if (ParticipantState.OFF_LINE.equals(participant.getParticipantState())) { - participant.setParticipantState(ParticipantState.ON_LINE); + private void checkOnline(ParticipantReplica replica) { + if (ParticipantState.OFF_LINE.equals(replica.getParticipantState())) { + replica.setParticipantState(ParticipantState.ON_LINE); } - participant.setLastMsg(TimestampHelper.now()); - participantProvider.saveParticipant(participant); + replica.setLastMsg(TimestampHelper.now()); + participantProvider.saveParticipantReplica(replica); } - private void handleRestart(UUID participantId) { + private void handleRestart(UUID participantId, UUID replicaId) { var compositionIds = participantProvider.getCompositionIds(participantId); + var oldParticipant = participantId.equals(replicaId); for (var compositionId : compositionIds) { var acDefinition = acDefinitionProvider.getAcDefinition(compositionId); LOGGER.debug("Scan Composition {} for restart", acDefinition.getCompositionId()); - handleRestart(participantId, acDefinition); + if (oldParticipant) { + handleRestart(participantId, acDefinition); + } else { + handleSyncRestart(participantId, replicaId, acDefinition); + } } } - private void handleRestart(UUID participantId, AutomationCompositionDefinition acDefinition) { + private void handleRestart(final UUID participantId, AutomationCompositionDefinition acDefinition) { if (AcTypeState.COMMISSIONED.equals(acDefinition.getState())) { LOGGER.debug("Composition {} COMMISSIONED", acDefinition.getCompositionId()); return; @@ -185,14 +206,6 @@ public class SupervisionParticipantHandler { elementState.setRestarting(true); } } - var automationCompositionList = - automationCompositionProvider.getAcInstancesByCompositionId(acDefinition.getCompositionId()); - List<AutomationComposition> automationCompositions = new ArrayList<>(); - for (var automationComposition : automationCompositionList) { - if (isAcToBeRestarted(participantId, automationComposition)) { - automationCompositions.add(automationComposition); - } - } // expected final state if (StateChangeResult.TIMEOUT.equals(acDefinition.getStateChangeResult())) { acDefinition.setStateChangeResult(StateChangeResult.NO_ERROR); @@ -200,6 +213,11 @@ public class SupervisionParticipantHandler { acDefinition.setRestarting(true); acDefinitionProvider.updateAcDefinition(acDefinition, acRuntimeParameterGroup.getAcmParameters().getToscaCompositionName()); + + var automationCompositionList = + automationCompositionProvider.getAcInstancesByCompositionId(acDefinition.getCompositionId()); + var automationCompositions = automationCompositionList.stream() + .filter(ac -> isAcToBeRestarted(participantId, ac)).toList(); participantRestartPublisher.send(participantId, acDefinition, automationCompositions); } @@ -222,13 +240,34 @@ public class SupervisionParticipantHandler { return toAdd; } + private void handleSyncRestart(final UUID participantId, UUID replicaId, + AutomationCompositionDefinition acDefinition) { + if (AcTypeState.COMMISSIONED.equals(acDefinition.getState())) { + LOGGER.debug("Composition {} COMMISSIONED", acDefinition.getCompositionId()); + return; + } + LOGGER.debug("Composition to be send in Restart message {}", acDefinition.getCompositionId()); + var automationCompositionList = + automationCompositionProvider.getAcInstancesByCompositionId(acDefinition.getCompositionId()); + var automationCompositions = automationCompositionList.stream() + .filter(ac -> isAcToBeSyncRestarted(participantId, ac)).toList(); + participantSyncPublisher.sendRestartMsg(participantId, replicaId, acDefinition, automationCompositions); + } + + private boolean isAcToBeSyncRestarted(UUID participantId, AutomationComposition automationComposition) { + for (var element : automationComposition.getElements().values()) { + if (participantId.equals(element.getParticipantId())) { + return true; + } + } + return false; + } + private Participant createParticipant(UUID participantId, Map<UUID, ParticipantSupportedElementType> participantSupportedElementType) { var participant = new Participant(); participant.setParticipantId(participantId); participant.setParticipantSupportedElementTypes(participantSupportedElementType); - participant.setParticipantState(ParticipantState.ON_LINE); - participant.setLastMsg(TimestampHelper.now()); return participant; } diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionPartecipantScanner.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionParticipantScanner.java index b7f0be8eb..4ada199b6 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionPartecipantScanner.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionParticipantScanner.java @@ -21,8 +21,7 @@ package org.onap.policy.clamp.acm.runtime.supervision; import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup; -import org.onap.policy.clamp.models.acm.concepts.Participant; -import org.onap.policy.clamp.models.acm.concepts.ParticipantState; +import org.onap.policy.clamp.models.acm.concepts.ParticipantReplica; import org.onap.policy.clamp.models.acm.persistence.provider.ParticipantProvider; import org.onap.policy.clamp.models.acm.utils.TimestampHelper; import org.slf4j.Logger; @@ -33,20 +32,20 @@ import org.springframework.stereotype.Component; * This class is used to scan the automation compositions in the database and check if they are in the correct state. */ @Component -public class SupervisionPartecipantScanner { - private static final Logger LOGGER = LoggerFactory.getLogger(SupervisionPartecipantScanner.class); +public class SupervisionParticipantScanner { + private static final Logger LOGGER = LoggerFactory.getLogger(SupervisionParticipantScanner.class); private final long maxWaitMs; private final ParticipantProvider participantProvider; /** - * Constructor for instantiating SupervisionPartecipantScanner. + * Constructor for instantiating SupervisionParticipantScanner. * * @param participantProvider the Participant Provider * @param acRuntimeParameterGroup the parameters for the automation composition runtime */ - public SupervisionPartecipantScanner(final ParticipantProvider participantProvider, + public SupervisionParticipantScanner(final ParticipantProvider participantProvider, final AcRuntimeParameterGroup acRuntimeParameterGroup) { this.participantProvider = participantProvider; this.maxWaitMs = acRuntimeParameterGroup.getParticipantParameters().getMaxStatusWaitMs(); @@ -56,27 +55,17 @@ public class SupervisionPartecipantScanner { * Run Scanning. */ public void run() { - LOGGER.debug("Scanning participans in the database . . ."); - - for (var participant : participantProvider.getParticipants()) { - scanParticipantStatus(participant); - } - - LOGGER.debug("Participans scan complete . . ."); + LOGGER.debug("Scanning participants in the database . . ."); + participantProvider.findReplicasOnLine().forEach(this::scanParticipantReplicaStatus); + LOGGER.debug("Participants scan complete . . ."); } - private void scanParticipantStatus(Participant participant) { - var id = participant.getParticipantId(); - if (ParticipantState.OFF_LINE.equals(participant.getParticipantState())) { - LOGGER.debug("report Participant is still OFF_LINE {}", id); - return; - } + private void scanParticipantReplicaStatus(ParticipantReplica replica) { var now = TimestampHelper.nowEpochMilli(); - var lastMsg = TimestampHelper.toEpochMilli(participant.getLastMsg()); + var lastMsg = TimestampHelper.toEpochMilli(replica.getLastMsg()); if ((now - lastMsg) > maxWaitMs) { - LOGGER.debug("report Participant OFF_LINE {}", id); - participant.setParticipantState(ParticipantState.OFF_LINE); - participantProvider.saveParticipant(participant); + LOGGER.debug("Participant OFF_LINE {}", replica.getReplicaId()); + participantProvider.deleteParticipantReplica(replica.getReplicaId()); } } } diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionScanner.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionScanner.java index 06d464671..75a2f0540 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionScanner.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionScanner.java @@ -27,6 +27,7 @@ import java.util.UUID; import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup; import org.onap.policy.clamp.acm.runtime.supervision.comm.AutomationCompositionDeployPublisher; import org.onap.policy.clamp.acm.runtime.supervision.comm.AutomationCompositionStateChangePublisher; +import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantSyncPublisher; import org.onap.policy.clamp.models.acm.concepts.AcTypeState; import org.onap.policy.clamp.models.acm.concepts.AutomationComposition; import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition; @@ -55,6 +56,7 @@ public class SupervisionScanner { private final AcDefinitionProvider acDefinitionProvider; private final AutomationCompositionStateChangePublisher automationCompositionStateChangePublisher; private final AutomationCompositionDeployPublisher automationCompositionDeployPublisher; + private final ParticipantSyncPublisher participantSyncPublisher; /** * Constructor for instantiating SupervisionScanner. @@ -69,11 +71,13 @@ public class SupervisionScanner { final AcDefinitionProvider acDefinitionProvider, final AutomationCompositionStateChangePublisher automationCompositionStateChangePublisher, final AutomationCompositionDeployPublisher automationCompositionDeployPublisher, + final ParticipantSyncPublisher participantSyncPublisher, final AcRuntimeParameterGroup acRuntimeParameterGroup) { this.automationCompositionProvider = automationCompositionProvider; this.acDefinitionProvider = acDefinitionProvider; this.automationCompositionStateChangePublisher = automationCompositionStateChangePublisher; this.automationCompositionDeployPublisher = automationCompositionDeployPublisher; + this.participantSyncPublisher = participantSyncPublisher; this.maxStatusWaitMs = acRuntimeParameterGroup.getParticipantParameters().getMaxStatusWaitMs(); } @@ -118,6 +122,7 @@ public class SupervisionScanner { if (completed) { acDefinitionProvider.updateAcDefinitionState(acDefinition.getCompositionId(), finalState, StateChangeResult.NO_ERROR, null); + participantSyncPublisher.sendSync(acDefinition, null); } else { handleTimeout(acDefinition); } @@ -132,7 +137,6 @@ public class SupervisionScanner { || StateChangeResult.FAILED.equals(automationComposition.getStateChangeResult())) { LOGGER.debug("automation composition {} scanned, OK", automationComposition.getInstanceId()); - // Clear Timeout on automation composition return; } @@ -158,7 +162,7 @@ public class SupervisionScanner { LOGGER.debug("automation composition scan: transition state {} {} completed", automationComposition.getDeployState(), automationComposition.getLockState()); - complete(automationComposition); + complete(automationComposition, serviceTemplate); } else { LOGGER.debug("automation composition scan: transition state {} {} not completed", automationComposition.getDeployState(), automationComposition.getLockState()); @@ -183,7 +187,8 @@ public class SupervisionScanner { } } - private void complete(final AutomationComposition automationComposition) { + private void complete(final AutomationComposition automationComposition, + ToscaServiceTemplate serviceTemplate) { var deployState = automationComposition.getDeployState(); if (DeployState.MIGRATING.equals(automationComposition.getDeployState())) { // migration scenario @@ -201,6 +206,7 @@ public class SupervisionScanner { } else { automationCompositionProvider.updateAutomationComposition(automationComposition); } + participantSyncPublisher.sendSync(serviceTemplate, automationComposition); } private void handleTimeout(AutomationCompositionDefinition acDefinition) { diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantPrimePublisher.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantPrimePublisher.java index 89763a2b6..b0848bd51 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantPrimePublisher.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantPrimePublisher.java @@ -31,7 +31,6 @@ import java.util.Map; import java.util.UUID; import lombok.AllArgsConstructor; import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup; -import org.onap.policy.clamp.acm.runtime.participants.AcmParticipantProvider; import org.onap.policy.clamp.models.acm.concepts.AcTypeState; import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition; import org.onap.policy.clamp.models.acm.concepts.ParticipantDefinition; @@ -55,7 +54,6 @@ public class ParticipantPrimePublisher extends AbstractParticipantPublisher<Part private static final Logger LOGGER = LoggerFactory.getLogger(ParticipantPrimePublisher.class); private final ParticipantProvider participantProvider; - private final AcmParticipantProvider acmParticipantProvider; private final AcRuntimeParameterGroup acRuntimeParameterGroup; /** @@ -99,9 +97,7 @@ public class ParticipantPrimePublisher extends AbstractParticipantPublisher<Part var elementState = acmDefinition.getElementStateMap().get(elementEntry.getKey()); elementState.setState(AcTypeState.PRIMING); participantIds.add(elementState.getParticipantId()); - var type = new ToscaConceptIdentifier(elementEntry.getValue().getType(), - elementEntry.getValue().getTypeVersion()); - supportedElementMap.put(type, elementState.getParticipantId()); + supportedElementMap.put(AcmUtils.getType(elementEntry.getValue()), elementState.getParticipantId()); } } else { // scenario Prime participants not assigned yet @@ -109,16 +105,14 @@ public class ParticipantPrimePublisher extends AbstractParticipantPublisher<Part for (var elementEntry : acElements) { var elementState = acmDefinition.getElementStateMap().get(elementEntry.getKey()); elementState.setState(AcTypeState.PRIMING); - var type = new ToscaConceptIdentifier(elementEntry.getValue().getType(), - elementEntry.getValue().getTypeVersion()); - var participantId = supportedElementMap.get(type); + var participantId = supportedElementMap.get(AcmUtils.getType(elementEntry.getValue())); if (participantId != null) { elementState.setParticipantId(participantId); participantIds.add(participantId); } } } - acmParticipantProvider.verifyParticipantState(participantIds); + participantProvider.verifyParticipantState(participantIds); return AcmUtils.prepareParticipantPriming(acElements, supportedElementMap); } diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantRestartPublisher.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantRestartPublisher.java index 4f28eab8e..3fe46a928 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantRestartPublisher.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantRestartPublisher.java @@ -22,22 +22,15 @@ package org.onap.policy.clamp.acm.runtime.supervision.comm; import io.micrometer.core.annotation.Timed; import java.time.Instant; -import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; -import java.util.Map.Entry; import java.util.UUID; import lombok.AllArgsConstructor; import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup; import org.onap.policy.clamp.models.acm.concepts.AutomationComposition; import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition; -import org.onap.policy.clamp.models.acm.concepts.ParticipantDefinition; import org.onap.policy.clamp.models.acm.concepts.ParticipantRestartAc; import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantRestart; import org.onap.policy.clamp.models.acm.utils.AcmUtils; -import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier; -import org.onap.policy.models.tosca.authorative.concepts.ToscaNodeTemplate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; @@ -66,52 +59,18 @@ public class ParticipantRestartPublisher extends AbstractParticipantPublisher<Pa message.setMessageId(UUID.randomUUID()); message.setTimestamp(Instant.now()); message.setState(acmDefinition.getState()); - message.setParticipantDefinitionUpdates(prepareParticipantRestarting(participantId, acmDefinition)); + message.setParticipantDefinitionUpdates( + AcmUtils.prepareParticipantRestarting(participantId, acmDefinition, + acRuntimeParameterGroup.getAcmParameters().getToscaElementName())); var toscaServiceTemplateFragment = AcmUtils.getToscaServiceTemplateFragment(acmDefinition.getServiceTemplate()); for (var automationComposition : automationCompositions) { - var restartAc = new ParticipantRestartAc(); - restartAc.setAutomationCompositionId(automationComposition.getInstanceId()); - for (var element : automationComposition.getElements().values()) { - if (participantId.equals(element.getParticipantId())) { - var acElementRestart = AcmUtils.createAcElementRestart(element); - acElementRestart.setToscaServiceTemplateFragment(toscaServiceTemplateFragment); - restartAc.getAcElementList().add(acElementRestart); - } - } + var restartAc = AcmUtils + .createAcRestart(automationComposition, participantId, toscaServiceTemplateFragment); message.getAutomationcompositionList().add(restartAc); } LOGGER.debug("Participant Restart sent {}", message); super.send(message); } - - protected List<ParticipantDefinition> prepareParticipantRestarting(UUID participantId, - AutomationCompositionDefinition acmDefinition) { - var acElements = AcmUtils.extractAcElementsFromServiceTemplate(acmDefinition.getServiceTemplate(), - acRuntimeParameterGroup.getAcmParameters().getToscaElementName()); - - // list of entry filtered by participantId - List<Entry<String, ToscaNodeTemplate>> elementList = new ArrayList<>(); - Map<ToscaConceptIdentifier, UUID> supportedElementMap = new HashMap<>(); - for (var elementEntry : acElements) { - var elementState = acmDefinition.getElementStateMap().get(elementEntry.getKey()); - if (participantId.equals(elementState.getParticipantId())) { - var type = new ToscaConceptIdentifier(elementEntry.getValue().getType(), - elementEntry.getValue().getTypeVersion()); - supportedElementMap.put(type, participantId); - elementList.add(elementEntry); - } - } - var list = AcmUtils.prepareParticipantPriming(elementList, supportedElementMap); - for (var participantDefinition : list) { - for (var elementDe : participantDefinition.getAutomationCompositionElementDefinitionList()) { - var state = acmDefinition.getElementStateMap().get(elementDe.getAcElementDefinitionId().getName()); - if (state != null) { - elementDe.setOutProperties(state.getOutProperties()); - } - } - } - return list; - } } diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantStatusReqPublisher.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantStatusReqPublisher.java index 76feee72f..2eb434b64 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantStatusReqPublisher.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantStatusReqPublisher.java @@ -40,7 +40,7 @@ public class ParticipantStatusReqPublisher extends AbstractParticipantPublisher< */ @Timed(value = "publisher.participant_status_req", description = "PARTICIPANT_STATUS_REQ messages published") public void send(UUID participantId) { - ParticipantStatusReq message = new ParticipantStatusReq(); + var message = new ParticipantStatusReq(); message.setParticipantId(participantId); message.setTimestamp(Instant.now()); diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantSyncPublisher.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantSyncPublisher.java index ae7eda1ee..b63bc0a6b 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantSyncPublisher.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantSyncPublisher.java @@ -23,69 +23,58 @@ package org.onap.policy.clamp.acm.runtime.supervision.comm; import io.micrometer.core.annotation.Timed; import java.time.Instant; import java.util.List; -import java.util.Optional; import java.util.UUID; +import lombok.AllArgsConstructor; import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup; -import org.onap.policy.clamp.acm.runtime.main.parameters.Topics; +import org.onap.policy.clamp.models.acm.concepts.AcTypeState; import org.onap.policy.clamp.models.acm.concepts.AutomationComposition; import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition; +import org.onap.policy.clamp.models.acm.concepts.DeployState; import org.onap.policy.clamp.models.acm.concepts.ParticipantRestartAc; import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantSync; import org.onap.policy.clamp.models.acm.utils.AcmUtils; -import org.onap.policy.common.endpoints.event.comm.TopicSink; +import org.onap.policy.models.tosca.authorative.concepts.ToscaServiceTemplate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; - @Component -public class ParticipantSyncPublisher extends ParticipantRestartPublisher { +@AllArgsConstructor +public class ParticipantSyncPublisher extends AbstractParticipantPublisher<ParticipantSync> { private static final Logger LOGGER = LoggerFactory.getLogger(ParticipantSyncPublisher.class); - private final AcRuntimeParameterGroup acRuntimeParameterGroup; - public ParticipantSyncPublisher(AcRuntimeParameterGroup acRuntimeParameterGroup) { - super(acRuntimeParameterGroup); - this.acRuntimeParameterGroup = acRuntimeParameterGroup; - } - - /** - * Send sync msg to Participant. + * Send Restart sync msg to Participant by participantId. * - * @param participantId the ParticipantId + * @param participantId the participantId + * @param replicaId the replicaId * @param acmDefinition the AutomationComposition Definition * @param automationCompositions the list of automationCompositions */ - @Override @Timed(value = "publisher.participant_sync_msg", description = "Participant Sync published") - public void send(UUID participantId, AutomationCompositionDefinition acmDefinition, + public void sendRestartMsg(UUID participantId, UUID replicaId, AutomationCompositionDefinition acmDefinition, List<AutomationComposition> automationCompositions) { var message = new ParticipantSync(); message.setParticipantId(participantId); + message.setReplicaId(replicaId); + message.setRestarting(true); message.setCompositionId(acmDefinition.getCompositionId()); message.setMessageId(UUID.randomUUID()); message.setTimestamp(Instant.now()); message.setState(acmDefinition.getState()); - message.setParticipantDefinitionUpdates(prepareParticipantRestarting(participantId, acmDefinition)); + message.setParticipantDefinitionUpdates(AcmUtils.prepareParticipantRestarting(participantId, acmDefinition, + acRuntimeParameterGroup.getAcmParameters().getToscaElementName())); var toscaServiceTemplateFragment = AcmUtils.getToscaServiceTemplateFragment(acmDefinition.getServiceTemplate()); for (var automationComposition : automationCompositions) { - var syncAc = new ParticipantRestartAc(); - syncAc.setAutomationCompositionId(automationComposition.getInstanceId()); - for (var element : automationComposition.getElements().values()) { - if (participantId.equals(element.getParticipantId())) { - var acElementSync = AcmUtils.createAcElementRestart(element); - acElementSync.setToscaServiceTemplateFragment(toscaServiceTemplateFragment); - syncAc.getAcElementList().add(acElementSync); - } - } + var syncAc = AcmUtils.createAcRestart(automationComposition, participantId, toscaServiceTemplateFragment); message.getAutomationcompositionList().add(syncAc); } - LOGGER.debug("Participant Sync sent {}", message); + LOGGER.debug("Participant Restarting Sync sent {}", message); super.send(message); } @@ -98,4 +87,64 @@ public class ParticipantSyncPublisher extends ParticipantRestartPublisher { return false; } + /** + * Send AutomationCompositionDefinition sync msg to all Participants. + * + * @param acDefinition the AutomationComposition Definition + * @param excludeReplicaId the replica to be excluded + */ + @Timed(value = "publisher.participant_sync_msg", description = "Participant Sync published") + public void sendSync(AutomationCompositionDefinition acDefinition, UUID excludeReplicaId) { + var message = new ParticipantSync(); + message.setCompositionId(acDefinition.getCompositionId()); + if (excludeReplicaId != null) { + message.getExcludeReplicas().add(excludeReplicaId); + } + message.setState(acDefinition.getState()); + message.setMessageId(UUID.randomUUID()); + message.setTimestamp(Instant.now()); + if (AcTypeState.COMMISSIONED.equals(acDefinition.getState())) { + message.setDelete(true); + } else { + message.setParticipantDefinitionUpdates(AcmUtils.prepareParticipantRestarting(null, acDefinition, + acRuntimeParameterGroup.getAcmParameters().getToscaElementName())); + } + LOGGER.debug("Participant AutomationCompositionDefinition Sync sent {}", message); + super.send(message); + } + + /** + * Send AutomationComposition sync msg to all Participants. + * + * @param serviceTemplate the ServiceTemplate + * @param automationComposition the automationComposition + */ + @Timed(value = "publisher.participant_sync_msg", description = "Participant Sync published") + public void sendSync(ToscaServiceTemplate serviceTemplate, AutomationComposition automationComposition) { + var message = new ParticipantSync(); + message.setCompositionId(automationComposition.getCompositionId()); + message.setAutomationCompositionId(automationComposition.getInstanceId()); + message.setState(AcTypeState.PRIMED); + message.setMessageId(UUID.randomUUID()); + message.setTimestamp(Instant.now()); + var syncAc = new ParticipantRestartAc(); + syncAc.setAutomationCompositionId(automationComposition.getInstanceId()); + syncAc.setDeployState(automationComposition.getDeployState()); + syncAc.setLockState(automationComposition.getLockState()); + if (DeployState.DELETED.equals(automationComposition.getDeployState())) { + message.setDelete(true); + } else { + var toscaServiceTemplateFragment = AcmUtils.getToscaServiceTemplateFragment(serviceTemplate); + for (var element : automationComposition.getElements().values()) { + var acElementSync = AcmUtils.createAcElementRestart(element); + acElementSync.setToscaServiceTemplateFragment(toscaServiceTemplateFragment); + syncAc.getAcElementList().add(acElementSync); + + } + } + message.getAutomationcompositionList().add(syncAc); + + LOGGER.debug("Participant AutomationComposition Sync sent {}", message); + super.send(message); + } } |