aboutsummaryrefslogtreecommitdiffstats
path: root/runtime-acm
diff options
context:
space:
mode:
authorFrancescoFioraEst <francesco.fiora@est.tech>2024-06-18 16:32:30 +0100
committerFrancesco Fiora <francesco.fiora@est.tech>2024-06-19 08:16:09 +0000
commit9cdfa4dc5aadaaf8ec11223c4991b61c0aa6d0b0 (patch)
treed91f07e3bf3054698f3702ce6c660af3ddc040d6 /runtime-acm
parent29d86f951b30f4941ee63b0d2badef810b856e53 (diff)
Add support for sync messages in ACM-runtime
Issue-ID: POLICY-5035 Change-Id: Ibcf1c6a414a7ba9d1cafd42041551bb0fb198088 Signed-off-by: FrancescoFioraEst <francesco.fiora@est.tech>
Diffstat (limited to 'runtime-acm')
-rw-r--r--runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/commissioning/CommissioningProvider.java6
-rw-r--r--runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/instantiation/AutomationCompositionInstantiationProvider.java10
-rw-r--r--runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/participants/AcmParticipantProvider.java25
-rw-r--r--runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAcHandler.java6
-rw-r--r--runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAspect.java6
-rw-r--r--runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionHandler.java24
-rw-r--r--runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionParticipantHandler.java143
-rw-r--r--runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionParticipantScanner.java (renamed from runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionPartecipantScanner.java)35
-rw-r--r--runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionScanner.java12
-rw-r--r--runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantPrimePublisher.java12
-rw-r--r--runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantRestartPublisher.java51
-rw-r--r--runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantStatusReqPublisher.java2
-rw-r--r--runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantSyncPublisher.java103
-rw-r--r--runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/commissioning/CommissioningProviderTest.java18
-rw-r--r--runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/instantiation/AutomationCompositionInstantiationProviderTest.java44
-rw-r--r--runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/instantiation/rest/InstantiationControllerTest.java5
-rw-r--r--runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAcHandlerTest.java75
-rw-r--r--runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAspectTest.java10
-rw-r--r--runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionHandlerTest.java15
-rw-r--r--runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionParticipantHandlerTest.java92
-rw-r--r--runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionParticipantScannerTest.java20
-rw-r--r--runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionScannerTest.java23
-rw-r--r--runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/comm/SupervisionMessagesTest.java67
-rw-r--r--runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/util/CommonTestData.java21
24 files changed, 500 insertions, 325 deletions
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/commissioning/CommissioningProvider.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/commissioning/CommissioningProvider.java
index 74ccb9cc6..8a56fbb1e 100644
--- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/commissioning/CommissioningProvider.java
+++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/commissioning/CommissioningProvider.java
@@ -29,7 +29,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import lombok.RequiredArgsConstructor;
import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup;
-import org.onap.policy.clamp.acm.runtime.participants.AcmParticipantProvider;
import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantPrimePublisher;
import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition;
@@ -39,6 +38,7 @@ import org.onap.policy.clamp.models.acm.messages.rest.commissioning.Commissionin
import org.onap.policy.clamp.models.acm.persistence.provider.AcDefinitionProvider;
import org.onap.policy.clamp.models.acm.persistence.provider.AcTypeStateResolver;
import org.onap.policy.clamp.models.acm.persistence.provider.AutomationCompositionProvider;
+import org.onap.policy.clamp.models.acm.persistence.provider.ParticipantProvider;
import org.onap.policy.clamp.models.acm.utils.TimestampHelper;
import org.onap.policy.models.base.PfModelRuntimeException;
import org.onap.policy.models.tosca.authorative.concepts.ToscaServiceTemplate;
@@ -56,7 +56,7 @@ public class CommissioningProvider {
private final AcDefinitionProvider acDefinitionProvider;
private final AutomationCompositionProvider acProvider;
- private final AcmParticipantProvider acmParticipantProvider;
+ private final ParticipantProvider participantProvider;
private final AcTypeStateResolver acTypeStateResolver;
private final ParticipantPrimePublisher participantPrimePublisher;
private final AcRuntimeParameterGroup acRuntimeParameterGroup;
@@ -229,7 +229,7 @@ public class CommissioningProvider {
}
}
if (!participantIds.isEmpty()) {
- acmParticipantProvider.verifyParticipantState(participantIds);
+ participantProvider.verifyParticipantState(participantIds);
}
acmDefinition.setState(AcTypeState.DEPRIMING);
acmDefinition.setLastMsg(TimestampHelper.now());
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/instantiation/AutomationCompositionInstantiationProvider.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/instantiation/AutomationCompositionInstantiationProvider.java
index 220636b9d..2bf08220a 100644
--- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/instantiation/AutomationCompositionInstantiationProvider.java
+++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/instantiation/AutomationCompositionInstantiationProvider.java
@@ -29,7 +29,6 @@ import java.util.stream.Collectors;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup;
-import org.onap.policy.clamp.acm.runtime.participants.AcmParticipantProvider;
import org.onap.policy.clamp.acm.runtime.supervision.SupervisionAcHandler;
import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
import org.onap.policy.clamp.models.acm.concepts.AutomationComposition;
@@ -43,6 +42,7 @@ import org.onap.policy.clamp.models.acm.messages.rest.instantiation.Instantiatio
import org.onap.policy.clamp.models.acm.persistence.provider.AcDefinitionProvider;
import org.onap.policy.clamp.models.acm.persistence.provider.AcInstanceStateResolver;
import org.onap.policy.clamp.models.acm.persistence.provider.AutomationCompositionProvider;
+import org.onap.policy.clamp.models.acm.persistence.provider.ParticipantProvider;
import org.onap.policy.clamp.models.acm.utils.AcmUtils;
import org.onap.policy.common.parameters.BeanValidationResult;
import org.onap.policy.common.parameters.ObjectValidationResult;
@@ -69,7 +69,7 @@ public class AutomationCompositionInstantiationProvider {
private final AcDefinitionProvider acDefinitionProvider;
private final AcInstanceStateResolver acInstanceStateResolver;
private final SupervisionAcHandler supervisionAcHandler;
- private final AcmParticipantProvider acmParticipantProvider;
+ private final ParticipantProvider participantProvider;
private final AcRuntimeParameterGroup acRuntimeParameterGroup;
/**
@@ -265,7 +265,7 @@ public class AutomationCompositionInstantiationProvider {
var participantIds = acDefinitionOpt.get().getElementStateMap().values().stream()
.map(NodeTemplateState::getParticipantId).collect(Collectors.toSet());
- acmParticipantProvider.verifyParticipantState(participantIds);
+ participantProvider.verifyParticipantState(participantIds);
result.addResult(AcmUtils.validateAutomationComposition(automationComposition,
acDefinitionOpt.get().getServiceTemplate(),
@@ -331,7 +331,7 @@ public class AutomationCompositionInstantiationProvider {
var acDefinition = acDefinitionProvider.getAcDefinition(automationComposition.getCompositionId());
var participantIds = acDefinition.getElementStateMap().values().stream()
.map(NodeTemplateState::getParticipantId).collect(Collectors.toSet());
- acmParticipantProvider.verifyParticipantState(participantIds);
+ participantProvider.verifyParticipantState(participantIds);
supervisionAcHandler.delete(automationComposition, acDefinition);
var response = new InstantiationResponse();
response.setInstanceId(automationComposition.getInstanceId());
@@ -374,7 +374,7 @@ public class AutomationCompositionInstantiationProvider {
var participantIds = acDefinition.getElementStateMap().values().stream()
.map(NodeTemplateState::getParticipantId).collect(Collectors.toSet());
- acmParticipantProvider.verifyParticipantState(participantIds);
+ participantProvider.verifyParticipantState(participantIds);
var result = acInstanceStateResolver.resolve(acInstanceStateUpdate.getDeployOrder(),
acInstanceStateUpdate.getLockOrder(), automationComposition.getDeployState(),
automationComposition.getLockState(), automationComposition.getStateChangeResult());
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/participants/AcmParticipantProvider.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/participants/AcmParticipantProvider.java
index 282389a74..62ba7b017 100644
--- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/participants/AcmParticipantProvider.java
+++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/participants/AcmParticipantProvider.java
@@ -20,12 +20,10 @@
package org.onap.policy.clamp.acm.runtime.participants;
-import jakarta.ws.rs.core.Response;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.UUID;
import lombok.RequiredArgsConstructor;
import org.apache.commons.collections4.MapUtils;
@@ -33,9 +31,7 @@ import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantStatusReqPu
import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionElement;
import org.onap.policy.clamp.models.acm.concepts.NodeTemplateState;
import org.onap.policy.clamp.models.acm.concepts.ParticipantInformation;
-import org.onap.policy.clamp.models.acm.concepts.ParticipantState;
import org.onap.policy.clamp.models.acm.persistence.provider.ParticipantProvider;
-import org.onap.policy.models.base.PfModelRuntimeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
@@ -94,12 +90,11 @@ public class AcmParticipantProvider {
* @param participantId The UUID of the participant to send request to
*/
public void sendParticipantStatusRequest(UUID participantId) {
- var participant = this.participantProvider.getParticipantById(participantId);
+ // check if participant is present
+ this.participantProvider.getParticipantById(participantId);
LOGGER.debug("Requesting Participant Status Now ParticipantStatusReq");
participantStatusReqPublisher.send(participantId);
- participant.setParticipantState(ParticipantState.OFF_LINE);
- participantProvider.saveParticipant(participant);
}
/**
@@ -110,22 +105,6 @@ public class AcmParticipantProvider {
this.participantStatusReqPublisher.send((UUID) null);
}
- /**
- * Verify Participant state.
- *
- * @param participantIds The list of UUIDs of the participants to get
- * @throws PfModelRuntimeException in case the participant is offline
- */
- public void verifyParticipantState(Set<UUID> participantIds) {
- for (UUID participantId : participantIds) {
- var participant = this.participantProvider.getParticipantById(participantId);
- if (! participant.getParticipantState().equals(ParticipantState.ON_LINE)) {
- throw new PfModelRuntimeException(Response.Status.CONFLICT,
- "Participant: " + participantId + " is OFFLINE");
- }
- }
- }
-
private Map<UUID, AutomationCompositionElement> getAutomationCompositionElementsForParticipant(UUID participantId) {
var automationCompositionElements = participantProvider
.getAutomationCompositionElements(participantId);
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAcHandler.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAcHandler.java
index 802c6603b..3e2057ed5 100644
--- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAcHandler.java
+++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAcHandler.java
@@ -33,6 +33,7 @@ import org.onap.policy.clamp.acm.runtime.supervision.comm.AcElementPropertiesPub
import org.onap.policy.clamp.acm.runtime.supervision.comm.AutomationCompositionDeployPublisher;
import org.onap.policy.clamp.acm.runtime.supervision.comm.AutomationCompositionMigrationPublisher;
import org.onap.policy.clamp.acm.runtime.supervision.comm.AutomationCompositionStateChangePublisher;
+import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantSyncPublisher;
import org.onap.policy.clamp.models.acm.concepts.AcElementDeployAck;
import org.onap.policy.clamp.models.acm.concepts.AutomationComposition;
import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition;
@@ -42,6 +43,7 @@ import org.onap.policy.clamp.models.acm.concepts.LockState;
import org.onap.policy.clamp.models.acm.concepts.ParticipantUtils;
import org.onap.policy.clamp.models.acm.concepts.StateChangeResult;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.AutomationCompositionDeployAck;
+import org.onap.policy.clamp.models.acm.persistence.provider.AcDefinitionProvider;
import org.onap.policy.clamp.models.acm.persistence.provider.AutomationCompositionProvider;
import org.onap.policy.clamp.models.acm.utils.AcmUtils;
import org.slf4j.Logger;
@@ -58,12 +60,14 @@ public class SupervisionAcHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(SupervisionAcHandler.class);
private final AutomationCompositionProvider automationCompositionProvider;
+ private final AcDefinitionProvider acDefinitionProvider;
// Publishers for participant communication
private final AutomationCompositionDeployPublisher automationCompositionDeployPublisher;
private final AutomationCompositionStateChangePublisher automationCompositionStateChangePublisher;
private final AcElementPropertiesPublisher acElementPropertiesPublisher;
private final AutomationCompositionMigrationPublisher acCompositionMigrationPublisher;
+ private final ParticipantSyncPublisher participantSyncPublisher;
private final ExecutorService executor = Context.taskWrapping(Executors.newFixedThreadPool(1));
@@ -260,6 +264,8 @@ public class SupervisionAcHandler {
automationCompositionAckMessage.getStateChangeResult());
if (updated) {
automationCompositionProvider.updateAutomationComposition(automationComposition);
+ var acDefinition = acDefinitionProvider.getAcDefinition(automationComposition.getCompositionId());
+ participantSyncPublisher.sendSync(acDefinition.getServiceTemplate(), automationComposition);
}
}
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAspect.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAspect.java
index 8f3a4c2eb..9ef979f8e 100644
--- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAspect.java
+++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAspect.java
@@ -41,9 +41,9 @@ public class SupervisionAspect implements Closeable {
private static final Logger LOGGER = LoggerFactory.getLogger(SupervisionAspect.class);
private final SupervisionScanner supervisionScanner;
- private final SupervisionPartecipantScanner partecipantScanner;
+ private final SupervisionParticipantScanner participantScanner;
- private ThreadPoolExecutor executor =
+ private final ThreadPoolExecutor executor =
new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
@Scheduled(
@@ -56,7 +56,7 @@ public class SupervisionAspect implements Closeable {
private void executeScan() {
supervisionScanner.run();
- partecipantScanner.run();
+ participantScanner.run();
}
/**
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionHandler.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionHandler.java
index 963e4830e..a4e470495 100644
--- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionHandler.java
+++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionHandler.java
@@ -23,9 +23,10 @@ package org.onap.policy.clamp.acm.runtime.supervision;
import io.micrometer.core.annotation.Timed;
import lombok.AllArgsConstructor;
-import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup;
+import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantSyncPublisher;
import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition;
+import org.onap.policy.clamp.models.acm.concepts.NodeTemplateState;
import org.onap.policy.clamp.models.acm.concepts.StateChangeResult;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantPrimeAck;
import org.onap.policy.clamp.models.acm.persistence.provider.AcDefinitionProvider;
@@ -43,7 +44,7 @@ public class SupervisionHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(SupervisionHandler.class);
private final AcDefinitionProvider acDefinitionProvider;
- private final AcRuntimeParameterGroup acRuntimeParameterGroup;
+ private final ParticipantSyncPublisher participantSyncPublisher;
/**
* Handle a ParticipantPrimeAck message from a participant.
@@ -82,12 +83,7 @@ public class SupervisionHandler {
boolean completed = true;
boolean restarting = false;
for (var element : acDefinition.getElementStateMap().values()) {
- if (participantPrimeAckMessage.getParticipantId().equals(element.getParticipantId())) {
- element.setMessage(participantPrimeAckMessage.getMessage());
- element.setState(participantPrimeAckMessage.getCompositionState());
- element.setRestarting(null);
- acDefinitionProvider.updateAcDefinitionElement(element, acDefinition.getCompositionId());
- }
+ handlePrimeAckElement(participantPrimeAckMessage, element);
if (!finalState.equals(element.getState())) {
completed = false;
}
@@ -110,6 +106,18 @@ public class SupervisionHandler {
if (toUpdate) {
acDefinitionProvider.updateAcDefinitionState(acDefinition.getCompositionId(), acDefinition.getState(),
acDefinition.getStateChangeResult(), acDefinition.getRestarting());
+ if (!participantPrimeAckMessage.getParticipantId().equals(participantPrimeAckMessage.getReplicaId())) {
+ participantSyncPublisher.sendSync(acDefinition, participantPrimeAckMessage.getReplicaId());
+ }
+ }
+ }
+
+ private void handlePrimeAckElement(ParticipantPrimeAck participantPrimeAckMessage, NodeTemplateState element) {
+ if (participantPrimeAckMessage.getParticipantId().equals(element.getParticipantId())) {
+ element.setMessage(participantPrimeAckMessage.getMessage());
+ element.setState(participantPrimeAckMessage.getCompositionState());
+ element.setRestarting(null);
+ acDefinitionProvider.updateAcDefinitionElement(element, participantPrimeAckMessage.getCompositionId());
}
}
}
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionParticipantHandler.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionParticipantHandler.java
index 609e036be..4c8c5815c 100644
--- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionParticipantHandler.java
+++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionParticipantHandler.java
@@ -21,7 +21,6 @@
package org.onap.policy.clamp.acm.runtime.supervision;
import io.micrometer.core.annotation.Timed;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -32,11 +31,13 @@ import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup
import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantDeregisterAckPublisher;
import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantRegisterAckPublisher;
import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantRestartPublisher;
+import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantSyncPublisher;
import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
import org.onap.policy.clamp.models.acm.concepts.AutomationComposition;
import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition;
import org.onap.policy.clamp.models.acm.concepts.Participant;
import org.onap.policy.clamp.models.acm.concepts.ParticipantDefinition;
+import org.onap.policy.clamp.models.acm.concepts.ParticipantReplica;
import org.onap.policy.clamp.models.acm.concepts.ParticipantState;
import org.onap.policy.clamp.models.acm.concepts.ParticipantSupportedElementType;
import org.onap.policy.clamp.models.acm.concepts.StateChangeResult;
@@ -65,6 +66,7 @@ public class SupervisionParticipantHandler {
private final AutomationCompositionProvider automationCompositionProvider;
private final AcDefinitionProvider acDefinitionProvider;
private final ParticipantRestartPublisher participantRestartPublisher;
+ private final ParticipantSyncPublisher participantSyncPublisher;
private final AcRuntimeParameterGroup acRuntimeParameterGroup;
/**
@@ -72,21 +74,11 @@ public class SupervisionParticipantHandler {
*
* @param participantRegisterMsg the ParticipantRegister message received from a participant
*/
- @MessageIntercept
@Timed(value = "listener.participant_register", description = "PARTICIPANT_REGISTER messages received")
public void handleParticipantMessage(ParticipantRegister participantRegisterMsg) {
- var participantOpt = participantProvider.findParticipant(participantRegisterMsg.getParticipantId());
-
- if (participantOpt.isPresent()) {
- var participant = participantOpt.get();
- checkOnline(participant);
- handleRestart(participant.getParticipantId());
- } else {
- var participant = createParticipant(participantRegisterMsg.getParticipantId(),
- listToMap(participantRegisterMsg.getParticipantSupportedElementType()));
- participantProvider.saveParticipant(participant);
-
- }
+ saveIfNotPresent(participantRegisterMsg.getReplicaId(),
+ participantRegisterMsg.getParticipantId(),
+ participantRegisterMsg.getParticipantSupportedElementType(), true);
participantRegisterAckPublisher.send(participantRegisterMsg.getMessageId(),
participantRegisterMsg.getParticipantId());
@@ -97,15 +89,13 @@ public class SupervisionParticipantHandler {
*
* @param participantDeregisterMsg the ParticipantDeregister message received from a participant
*/
- @MessageIntercept
@Timed(value = "listener.participant_deregister", description = "PARTICIPANT_DEREGISTER messages received")
public void handleParticipantMessage(ParticipantDeregister participantDeregisterMsg) {
- var participantOpt = participantProvider.findParticipant(participantDeregisterMsg.getParticipantId());
-
- if (participantOpt.isPresent()) {
- var participant = participantOpt.get();
- participant.setParticipantState(ParticipantState.OFF_LINE);
- participantProvider.saveParticipant(participant);
+ var replicaId = participantDeregisterMsg.getReplicaId() != null
+ ? participantDeregisterMsg.getReplicaId() : participantDeregisterMsg.getParticipantId();
+ var replicaOpt = participantProvider.findParticipantReplica(replicaId);
+ if (replicaOpt.isPresent()) {
+ participantProvider.deleteParticipantReplica(replicaId);
}
participantDeregisterAckPublisher.send(participantDeregisterMsg.getMessageId());
@@ -116,32 +106,57 @@ public class SupervisionParticipantHandler {
*
* @param participantStatusMsg the ParticipantStatus message received from a participant
*/
- @MessageIntercept
@Timed(value = "listener.participant_status", description = "PARTICIPANT_STATUS messages received")
public void handleParticipantMessage(ParticipantStatus participantStatusMsg) {
+ saveIfNotPresent(participantStatusMsg.getReplicaId(), participantStatusMsg.getParticipantId(),
+ participantStatusMsg.getParticipantSupportedElementType(), false);
- var participantOpt = participantProvider.findParticipant(participantStatusMsg.getParticipantId());
- if (participantOpt.isEmpty()) {
- var participant = createParticipant(participantStatusMsg.getParticipantId(),
- listToMap(participantStatusMsg.getParticipantSupportedElementType()));
- participantProvider.saveParticipant(participant);
- } else {
- checkOnline(participantOpt.get());
- }
if (!participantStatusMsg.getAutomationCompositionInfoList().isEmpty()) {
automationCompositionProvider.upgradeStates(participantStatusMsg.getAutomationCompositionInfoList());
}
if (!participantStatusMsg.getParticipantDefinitionUpdates().isEmpty()
&& participantStatusMsg.getCompositionId() != null) {
updateAcDefinitionOutProperties(participantStatusMsg.getCompositionId(),
- participantStatusMsg.getParticipantDefinitionUpdates());
+ participantStatusMsg.getReplicaId(), participantStatusMsg.getParticipantDefinitionUpdates());
}
}
- private void updateAcDefinitionOutProperties(UUID composotionId, List<ParticipantDefinition> list) {
- var acDefinitionOpt = acDefinitionProvider.findAcDefinition(composotionId);
+ private void saveIfNotPresent(UUID msgReplicaId, UUID participantId,
+ List<ParticipantSupportedElementType> participantSupportedElementType, boolean registration) {
+ var replicaId = msgReplicaId != null ? msgReplicaId : participantId;
+ var replicaOpt = participantProvider.findParticipantReplica(replicaId);
+ if (replicaOpt.isPresent()) {
+ var replica = replicaOpt.get();
+ checkOnline(replica);
+ } else {
+ var participant = getParticipant(participantId, listToMap(participantSupportedElementType));
+ participant.getReplicas().put(replicaId, createReplica(replicaId));
+ participantProvider.saveParticipant(participant);
+ }
+ if (registration) {
+ handleRestart(participantId, replicaId);
+ }
+ }
+
+ private Participant getParticipant(UUID participantId,
+ Map<UUID, ParticipantSupportedElementType> participantSupportedElementType) {
+ var participantOpt = participantProvider.findParticipant(participantId);
+ return participantOpt.orElseGet(() -> createParticipant(participantId, participantSupportedElementType));
+ }
+
+ private ParticipantReplica createReplica(UUID replicaId) {
+ var replica = new ParticipantReplica();
+ replica.setReplicaId(replicaId);
+ replica.setParticipantState(ParticipantState.ON_LINE);
+ replica.setLastMsg(TimestampHelper.now());
+ return replica;
+
+ }
+
+ private void updateAcDefinitionOutProperties(UUID compositionId, UUID replicaId, List<ParticipantDefinition> list) {
+ var acDefinitionOpt = acDefinitionProvider.findAcDefinition(compositionId);
if (acDefinitionOpt.isEmpty()) {
- LOGGER.error("Ac Definition with id {} not found", composotionId);
+ LOGGER.error("Ac Definition with id {} not found", compositionId);
return;
}
var acDefinition = acDefinitionOpt.get();
@@ -155,26 +170,32 @@ public class SupervisionParticipantHandler {
}
acDefinitionProvider.updateAcDefinition(acDefinition,
acRuntimeParameterGroup.getAcmParameters().getToscaCompositionName());
+ participantSyncPublisher.sendSync(acDefinition, replicaId);
}
- private void checkOnline(Participant participant) {
- if (ParticipantState.OFF_LINE.equals(participant.getParticipantState())) {
- participant.setParticipantState(ParticipantState.ON_LINE);
+ private void checkOnline(ParticipantReplica replica) {
+ if (ParticipantState.OFF_LINE.equals(replica.getParticipantState())) {
+ replica.setParticipantState(ParticipantState.ON_LINE);
}
- participant.setLastMsg(TimestampHelper.now());
- participantProvider.saveParticipant(participant);
+ replica.setLastMsg(TimestampHelper.now());
+ participantProvider.saveParticipantReplica(replica);
}
- private void handleRestart(UUID participantId) {
+ private void handleRestart(UUID participantId, UUID replicaId) {
var compositionIds = participantProvider.getCompositionIds(participantId);
+ var oldParticipant = participantId.equals(replicaId);
for (var compositionId : compositionIds) {
var acDefinition = acDefinitionProvider.getAcDefinition(compositionId);
LOGGER.debug("Scan Composition {} for restart", acDefinition.getCompositionId());
- handleRestart(participantId, acDefinition);
+ if (oldParticipant) {
+ handleRestart(participantId, acDefinition);
+ } else {
+ handleSyncRestart(participantId, replicaId, acDefinition);
+ }
}
}
- private void handleRestart(UUID participantId, AutomationCompositionDefinition acDefinition) {
+ private void handleRestart(final UUID participantId, AutomationCompositionDefinition acDefinition) {
if (AcTypeState.COMMISSIONED.equals(acDefinition.getState())) {
LOGGER.debug("Composition {} COMMISSIONED", acDefinition.getCompositionId());
return;
@@ -185,14 +206,6 @@ public class SupervisionParticipantHandler {
elementState.setRestarting(true);
}
}
- var automationCompositionList =
- automationCompositionProvider.getAcInstancesByCompositionId(acDefinition.getCompositionId());
- List<AutomationComposition> automationCompositions = new ArrayList<>();
- for (var automationComposition : automationCompositionList) {
- if (isAcToBeRestarted(participantId, automationComposition)) {
- automationCompositions.add(automationComposition);
- }
- }
// expected final state
if (StateChangeResult.TIMEOUT.equals(acDefinition.getStateChangeResult())) {
acDefinition.setStateChangeResult(StateChangeResult.NO_ERROR);
@@ -200,6 +213,11 @@ public class SupervisionParticipantHandler {
acDefinition.setRestarting(true);
acDefinitionProvider.updateAcDefinition(acDefinition,
acRuntimeParameterGroup.getAcmParameters().getToscaCompositionName());
+
+ var automationCompositionList =
+ automationCompositionProvider.getAcInstancesByCompositionId(acDefinition.getCompositionId());
+ var automationCompositions = automationCompositionList.stream()
+ .filter(ac -> isAcToBeRestarted(participantId, ac)).toList();
participantRestartPublisher.send(participantId, acDefinition, automationCompositions);
}
@@ -222,13 +240,34 @@ public class SupervisionParticipantHandler {
return toAdd;
}
+ private void handleSyncRestart(final UUID participantId, UUID replicaId,
+ AutomationCompositionDefinition acDefinition) {
+ if (AcTypeState.COMMISSIONED.equals(acDefinition.getState())) {
+ LOGGER.debug("Composition {} COMMISSIONED", acDefinition.getCompositionId());
+ return;
+ }
+ LOGGER.debug("Composition to be send in Restart message {}", acDefinition.getCompositionId());
+ var automationCompositionList =
+ automationCompositionProvider.getAcInstancesByCompositionId(acDefinition.getCompositionId());
+ var automationCompositions = automationCompositionList.stream()
+ .filter(ac -> isAcToBeSyncRestarted(participantId, ac)).toList();
+ participantSyncPublisher.sendRestartMsg(participantId, replicaId, acDefinition, automationCompositions);
+ }
+
+ private boolean isAcToBeSyncRestarted(UUID participantId, AutomationComposition automationComposition) {
+ for (var element : automationComposition.getElements().values()) {
+ if (participantId.equals(element.getParticipantId())) {
+ return true;
+ }
+ }
+ return false;
+ }
+
private Participant createParticipant(UUID participantId,
Map<UUID, ParticipantSupportedElementType> participantSupportedElementType) {
var participant = new Participant();
participant.setParticipantId(participantId);
participant.setParticipantSupportedElementTypes(participantSupportedElementType);
- participant.setParticipantState(ParticipantState.ON_LINE);
- participant.setLastMsg(TimestampHelper.now());
return participant;
}
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionPartecipantScanner.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionParticipantScanner.java
index b7f0be8eb..4ada199b6 100644
--- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionPartecipantScanner.java
+++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionParticipantScanner.java
@@ -21,8 +21,7 @@
package org.onap.policy.clamp.acm.runtime.supervision;
import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup;
-import org.onap.policy.clamp.models.acm.concepts.Participant;
-import org.onap.policy.clamp.models.acm.concepts.ParticipantState;
+import org.onap.policy.clamp.models.acm.concepts.ParticipantReplica;
import org.onap.policy.clamp.models.acm.persistence.provider.ParticipantProvider;
import org.onap.policy.clamp.models.acm.utils.TimestampHelper;
import org.slf4j.Logger;
@@ -33,20 +32,20 @@ import org.springframework.stereotype.Component;
* This class is used to scan the automation compositions in the database and check if they are in the correct state.
*/
@Component
-public class SupervisionPartecipantScanner {
- private static final Logger LOGGER = LoggerFactory.getLogger(SupervisionPartecipantScanner.class);
+public class SupervisionParticipantScanner {
+ private static final Logger LOGGER = LoggerFactory.getLogger(SupervisionParticipantScanner.class);
private final long maxWaitMs;
private final ParticipantProvider participantProvider;
/**
- * Constructor for instantiating SupervisionPartecipantScanner.
+ * Constructor for instantiating SupervisionParticipantScanner.
*
* @param participantProvider the Participant Provider
* @param acRuntimeParameterGroup the parameters for the automation composition runtime
*/
- public SupervisionPartecipantScanner(final ParticipantProvider participantProvider,
+ public SupervisionParticipantScanner(final ParticipantProvider participantProvider,
final AcRuntimeParameterGroup acRuntimeParameterGroup) {
this.participantProvider = participantProvider;
this.maxWaitMs = acRuntimeParameterGroup.getParticipantParameters().getMaxStatusWaitMs();
@@ -56,27 +55,17 @@ public class SupervisionPartecipantScanner {
* Run Scanning.
*/
public void run() {
- LOGGER.debug("Scanning participans in the database . . .");
-
- for (var participant : participantProvider.getParticipants()) {
- scanParticipantStatus(participant);
- }
-
- LOGGER.debug("Participans scan complete . . .");
+ LOGGER.debug("Scanning participants in the database . . .");
+ participantProvider.findReplicasOnLine().forEach(this::scanParticipantReplicaStatus);
+ LOGGER.debug("Participants scan complete . . .");
}
- private void scanParticipantStatus(Participant participant) {
- var id = participant.getParticipantId();
- if (ParticipantState.OFF_LINE.equals(participant.getParticipantState())) {
- LOGGER.debug("report Participant is still OFF_LINE {}", id);
- return;
- }
+ private void scanParticipantReplicaStatus(ParticipantReplica replica) {
var now = TimestampHelper.nowEpochMilli();
- var lastMsg = TimestampHelper.toEpochMilli(participant.getLastMsg());
+ var lastMsg = TimestampHelper.toEpochMilli(replica.getLastMsg());
if ((now - lastMsg) > maxWaitMs) {
- LOGGER.debug("report Participant OFF_LINE {}", id);
- participant.setParticipantState(ParticipantState.OFF_LINE);
- participantProvider.saveParticipant(participant);
+ LOGGER.debug("Participant OFF_LINE {}", replica.getReplicaId());
+ participantProvider.deleteParticipantReplica(replica.getReplicaId());
}
}
}
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionScanner.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionScanner.java
index 06d464671..75a2f0540 100644
--- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionScanner.java
+++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionScanner.java
@@ -27,6 +27,7 @@ import java.util.UUID;
import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup;
import org.onap.policy.clamp.acm.runtime.supervision.comm.AutomationCompositionDeployPublisher;
import org.onap.policy.clamp.acm.runtime.supervision.comm.AutomationCompositionStateChangePublisher;
+import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantSyncPublisher;
import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
import org.onap.policy.clamp.models.acm.concepts.AutomationComposition;
import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition;
@@ -55,6 +56,7 @@ public class SupervisionScanner {
private final AcDefinitionProvider acDefinitionProvider;
private final AutomationCompositionStateChangePublisher automationCompositionStateChangePublisher;
private final AutomationCompositionDeployPublisher automationCompositionDeployPublisher;
+ private final ParticipantSyncPublisher participantSyncPublisher;
/**
* Constructor for instantiating SupervisionScanner.
@@ -69,11 +71,13 @@ public class SupervisionScanner {
final AcDefinitionProvider acDefinitionProvider,
final AutomationCompositionStateChangePublisher automationCompositionStateChangePublisher,
final AutomationCompositionDeployPublisher automationCompositionDeployPublisher,
+ final ParticipantSyncPublisher participantSyncPublisher,
final AcRuntimeParameterGroup acRuntimeParameterGroup) {
this.automationCompositionProvider = automationCompositionProvider;
this.acDefinitionProvider = acDefinitionProvider;
this.automationCompositionStateChangePublisher = automationCompositionStateChangePublisher;
this.automationCompositionDeployPublisher = automationCompositionDeployPublisher;
+ this.participantSyncPublisher = participantSyncPublisher;
this.maxStatusWaitMs = acRuntimeParameterGroup.getParticipantParameters().getMaxStatusWaitMs();
}
@@ -118,6 +122,7 @@ public class SupervisionScanner {
if (completed) {
acDefinitionProvider.updateAcDefinitionState(acDefinition.getCompositionId(), finalState,
StateChangeResult.NO_ERROR, null);
+ participantSyncPublisher.sendSync(acDefinition, null);
} else {
handleTimeout(acDefinition);
}
@@ -132,7 +137,6 @@ public class SupervisionScanner {
|| StateChangeResult.FAILED.equals(automationComposition.getStateChangeResult())) {
LOGGER.debug("automation composition {} scanned, OK", automationComposition.getInstanceId());
- // Clear Timeout on automation composition
return;
}
@@ -158,7 +162,7 @@ public class SupervisionScanner {
LOGGER.debug("automation composition scan: transition state {} {} completed",
automationComposition.getDeployState(), automationComposition.getLockState());
- complete(automationComposition);
+ complete(automationComposition, serviceTemplate);
} else {
LOGGER.debug("automation composition scan: transition state {} {} not completed",
automationComposition.getDeployState(), automationComposition.getLockState());
@@ -183,7 +187,8 @@ public class SupervisionScanner {
}
}
- private void complete(final AutomationComposition automationComposition) {
+ private void complete(final AutomationComposition automationComposition,
+ ToscaServiceTemplate serviceTemplate) {
var deployState = automationComposition.getDeployState();
if (DeployState.MIGRATING.equals(automationComposition.getDeployState())) {
// migration scenario
@@ -201,6 +206,7 @@ public class SupervisionScanner {
} else {
automationCompositionProvider.updateAutomationComposition(automationComposition);
}
+ participantSyncPublisher.sendSync(serviceTemplate, automationComposition);
}
private void handleTimeout(AutomationCompositionDefinition acDefinition) {
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantPrimePublisher.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantPrimePublisher.java
index 89763a2b6..b0848bd51 100644
--- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantPrimePublisher.java
+++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantPrimePublisher.java
@@ -31,7 +31,6 @@ import java.util.Map;
import java.util.UUID;
import lombok.AllArgsConstructor;
import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup;
-import org.onap.policy.clamp.acm.runtime.participants.AcmParticipantProvider;
import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition;
import org.onap.policy.clamp.models.acm.concepts.ParticipantDefinition;
@@ -55,7 +54,6 @@ public class ParticipantPrimePublisher extends AbstractParticipantPublisher<Part
private static final Logger LOGGER = LoggerFactory.getLogger(ParticipantPrimePublisher.class);
private final ParticipantProvider participantProvider;
- private final AcmParticipantProvider acmParticipantProvider;
private final AcRuntimeParameterGroup acRuntimeParameterGroup;
/**
@@ -99,9 +97,7 @@ public class ParticipantPrimePublisher extends AbstractParticipantPublisher<Part
var elementState = acmDefinition.getElementStateMap().get(elementEntry.getKey());
elementState.setState(AcTypeState.PRIMING);
participantIds.add(elementState.getParticipantId());
- var type = new ToscaConceptIdentifier(elementEntry.getValue().getType(),
- elementEntry.getValue().getTypeVersion());
- supportedElementMap.put(type, elementState.getParticipantId());
+ supportedElementMap.put(AcmUtils.getType(elementEntry.getValue()), elementState.getParticipantId());
}
} else {
// scenario Prime participants not assigned yet
@@ -109,16 +105,14 @@ public class ParticipantPrimePublisher extends AbstractParticipantPublisher<Part
for (var elementEntry : acElements) {
var elementState = acmDefinition.getElementStateMap().get(elementEntry.getKey());
elementState.setState(AcTypeState.PRIMING);
- var type = new ToscaConceptIdentifier(elementEntry.getValue().getType(),
- elementEntry.getValue().getTypeVersion());
- var participantId = supportedElementMap.get(type);
+ var participantId = supportedElementMap.get(AcmUtils.getType(elementEntry.getValue()));
if (participantId != null) {
elementState.setParticipantId(participantId);
participantIds.add(participantId);
}
}
}
- acmParticipantProvider.verifyParticipantState(participantIds);
+ participantProvider.verifyParticipantState(participantIds);
return AcmUtils.prepareParticipantPriming(acElements, supportedElementMap);
}
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantRestartPublisher.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantRestartPublisher.java
index 4f28eab8e..3fe46a928 100644
--- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantRestartPublisher.java
+++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantRestartPublisher.java
@@ -22,22 +22,15 @@ package org.onap.policy.clamp.acm.runtime.supervision.comm;
import io.micrometer.core.annotation.Timed;
import java.time.Instant;
-import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
import java.util.UUID;
import lombok.AllArgsConstructor;
import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup;
import org.onap.policy.clamp.models.acm.concepts.AutomationComposition;
import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition;
-import org.onap.policy.clamp.models.acm.concepts.ParticipantDefinition;
import org.onap.policy.clamp.models.acm.concepts.ParticipantRestartAc;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantRestart;
import org.onap.policy.clamp.models.acm.utils.AcmUtils;
-import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
-import org.onap.policy.models.tosca.authorative.concepts.ToscaNodeTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
@@ -66,52 +59,18 @@ public class ParticipantRestartPublisher extends AbstractParticipantPublisher<Pa
message.setMessageId(UUID.randomUUID());
message.setTimestamp(Instant.now());
message.setState(acmDefinition.getState());
- message.setParticipantDefinitionUpdates(prepareParticipantRestarting(participantId, acmDefinition));
+ message.setParticipantDefinitionUpdates(
+ AcmUtils.prepareParticipantRestarting(participantId, acmDefinition,
+ acRuntimeParameterGroup.getAcmParameters().getToscaElementName()));
var toscaServiceTemplateFragment = AcmUtils.getToscaServiceTemplateFragment(acmDefinition.getServiceTemplate());
for (var automationComposition : automationCompositions) {
- var restartAc = new ParticipantRestartAc();
- restartAc.setAutomationCompositionId(automationComposition.getInstanceId());
- for (var element : automationComposition.getElements().values()) {
- if (participantId.equals(element.getParticipantId())) {
- var acElementRestart = AcmUtils.createAcElementRestart(element);
- acElementRestart.setToscaServiceTemplateFragment(toscaServiceTemplateFragment);
- restartAc.getAcElementList().add(acElementRestart);
- }
- }
+ var restartAc = AcmUtils
+ .createAcRestart(automationComposition, participantId, toscaServiceTemplateFragment);
message.getAutomationcompositionList().add(restartAc);
}
LOGGER.debug("Participant Restart sent {}", message);
super.send(message);
}
-
- protected List<ParticipantDefinition> prepareParticipantRestarting(UUID participantId,
- AutomationCompositionDefinition acmDefinition) {
- var acElements = AcmUtils.extractAcElementsFromServiceTemplate(acmDefinition.getServiceTemplate(),
- acRuntimeParameterGroup.getAcmParameters().getToscaElementName());
-
- // list of entry filtered by participantId
- List<Entry<String, ToscaNodeTemplate>> elementList = new ArrayList<>();
- Map<ToscaConceptIdentifier, UUID> supportedElementMap = new HashMap<>();
- for (var elementEntry : acElements) {
- var elementState = acmDefinition.getElementStateMap().get(elementEntry.getKey());
- if (participantId.equals(elementState.getParticipantId())) {
- var type = new ToscaConceptIdentifier(elementEntry.getValue().getType(),
- elementEntry.getValue().getTypeVersion());
- supportedElementMap.put(type, participantId);
- elementList.add(elementEntry);
- }
- }
- var list = AcmUtils.prepareParticipantPriming(elementList, supportedElementMap);
- for (var participantDefinition : list) {
- for (var elementDe : participantDefinition.getAutomationCompositionElementDefinitionList()) {
- var state = acmDefinition.getElementStateMap().get(elementDe.getAcElementDefinitionId().getName());
- if (state != null) {
- elementDe.setOutProperties(state.getOutProperties());
- }
- }
- }
- return list;
- }
}
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantStatusReqPublisher.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantStatusReqPublisher.java
index 76feee72f..2eb434b64 100644
--- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantStatusReqPublisher.java
+++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantStatusReqPublisher.java
@@ -40,7 +40,7 @@ public class ParticipantStatusReqPublisher extends AbstractParticipantPublisher<
*/
@Timed(value = "publisher.participant_status_req", description = "PARTICIPANT_STATUS_REQ messages published")
public void send(UUID participantId) {
- ParticipantStatusReq message = new ParticipantStatusReq();
+ var message = new ParticipantStatusReq();
message.setParticipantId(participantId);
message.setTimestamp(Instant.now());
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantSyncPublisher.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantSyncPublisher.java
index ae7eda1ee..b63bc0a6b 100644
--- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantSyncPublisher.java
+++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantSyncPublisher.java
@@ -23,69 +23,58 @@ package org.onap.policy.clamp.acm.runtime.supervision.comm;
import io.micrometer.core.annotation.Timed;
import java.time.Instant;
import java.util.List;
-import java.util.Optional;
import java.util.UUID;
+import lombok.AllArgsConstructor;
import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup;
-import org.onap.policy.clamp.acm.runtime.main.parameters.Topics;
+import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
import org.onap.policy.clamp.models.acm.concepts.AutomationComposition;
import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition;
+import org.onap.policy.clamp.models.acm.concepts.DeployState;
import org.onap.policy.clamp.models.acm.concepts.ParticipantRestartAc;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantSync;
import org.onap.policy.clamp.models.acm.utils.AcmUtils;
-import org.onap.policy.common.endpoints.event.comm.TopicSink;
+import org.onap.policy.models.tosca.authorative.concepts.ToscaServiceTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
-
@Component
-public class ParticipantSyncPublisher extends ParticipantRestartPublisher {
+@AllArgsConstructor
+public class ParticipantSyncPublisher extends AbstractParticipantPublisher<ParticipantSync> {
private static final Logger LOGGER = LoggerFactory.getLogger(ParticipantSyncPublisher.class);
-
private final AcRuntimeParameterGroup acRuntimeParameterGroup;
- public ParticipantSyncPublisher(AcRuntimeParameterGroup acRuntimeParameterGroup) {
- super(acRuntimeParameterGroup);
- this.acRuntimeParameterGroup = acRuntimeParameterGroup;
- }
-
-
/**
- * Send sync msg to Participant.
+ * Send Restart sync msg to Participant by participantId.
*
- * @param participantId the ParticipantId
+ * @param participantId the participantId
+ * @param replicaId the replicaId
* @param acmDefinition the AutomationComposition Definition
* @param automationCompositions the list of automationCompositions
*/
- @Override
@Timed(value = "publisher.participant_sync_msg", description = "Participant Sync published")
- public void send(UUID participantId, AutomationCompositionDefinition acmDefinition,
+ public void sendRestartMsg(UUID participantId, UUID replicaId, AutomationCompositionDefinition acmDefinition,
List<AutomationComposition> automationCompositions) {
var message = new ParticipantSync();
message.setParticipantId(participantId);
+ message.setReplicaId(replicaId);
+ message.setRestarting(true);
message.setCompositionId(acmDefinition.getCompositionId());
message.setMessageId(UUID.randomUUID());
message.setTimestamp(Instant.now());
message.setState(acmDefinition.getState());
- message.setParticipantDefinitionUpdates(prepareParticipantRestarting(participantId, acmDefinition));
+ message.setParticipantDefinitionUpdates(AcmUtils.prepareParticipantRestarting(participantId, acmDefinition,
+ acRuntimeParameterGroup.getAcmParameters().getToscaElementName()));
var toscaServiceTemplateFragment = AcmUtils.getToscaServiceTemplateFragment(acmDefinition.getServiceTemplate());
for (var automationComposition : automationCompositions) {
- var syncAc = new ParticipantRestartAc();
- syncAc.setAutomationCompositionId(automationComposition.getInstanceId());
- for (var element : automationComposition.getElements().values()) {
- if (participantId.equals(element.getParticipantId())) {
- var acElementSync = AcmUtils.createAcElementRestart(element);
- acElementSync.setToscaServiceTemplateFragment(toscaServiceTemplateFragment);
- syncAc.getAcElementList().add(acElementSync);
- }
- }
+ var syncAc = AcmUtils.createAcRestart(automationComposition, participantId, toscaServiceTemplateFragment);
message.getAutomationcompositionList().add(syncAc);
}
- LOGGER.debug("Participant Sync sent {}", message);
+ LOGGER.debug("Participant Restarting Sync sent {}", message);
super.send(message);
}
@@ -98,4 +87,64 @@ public class ParticipantSyncPublisher extends ParticipantRestartPublisher {
return false;
}
+ /**
+ * Send AutomationCompositionDefinition sync msg to all Participants.
+ *
+ * @param acDefinition the AutomationComposition Definition
+ * @param excludeReplicaId the replica to be excluded
+ */
+ @Timed(value = "publisher.participant_sync_msg", description = "Participant Sync published")
+ public void sendSync(AutomationCompositionDefinition acDefinition, UUID excludeReplicaId) {
+ var message = new ParticipantSync();
+ message.setCompositionId(acDefinition.getCompositionId());
+ if (excludeReplicaId != null) {
+ message.getExcludeReplicas().add(excludeReplicaId);
+ }
+ message.setState(acDefinition.getState());
+ message.setMessageId(UUID.randomUUID());
+ message.setTimestamp(Instant.now());
+ if (AcTypeState.COMMISSIONED.equals(acDefinition.getState())) {
+ message.setDelete(true);
+ } else {
+ message.setParticipantDefinitionUpdates(AcmUtils.prepareParticipantRestarting(null, acDefinition,
+ acRuntimeParameterGroup.getAcmParameters().getToscaElementName()));
+ }
+ LOGGER.debug("Participant AutomationCompositionDefinition Sync sent {}", message);
+ super.send(message);
+ }
+
+ /**
+ * Send AutomationComposition sync msg to all Participants.
+ *
+ * @param serviceTemplate the ServiceTemplate
+ * @param automationComposition the automationComposition
+ */
+ @Timed(value = "publisher.participant_sync_msg", description = "Participant Sync published")
+ public void sendSync(ToscaServiceTemplate serviceTemplate, AutomationComposition automationComposition) {
+ var message = new ParticipantSync();
+ message.setCompositionId(automationComposition.getCompositionId());
+ message.setAutomationCompositionId(automationComposition.getInstanceId());
+ message.setState(AcTypeState.PRIMED);
+ message.setMessageId(UUID.randomUUID());
+ message.setTimestamp(Instant.now());
+ var syncAc = new ParticipantRestartAc();
+ syncAc.setAutomationCompositionId(automationComposition.getInstanceId());
+ syncAc.setDeployState(automationComposition.getDeployState());
+ syncAc.setLockState(automationComposition.getLockState());
+ if (DeployState.DELETED.equals(automationComposition.getDeployState())) {
+ message.setDelete(true);
+ } else {
+ var toscaServiceTemplateFragment = AcmUtils.getToscaServiceTemplateFragment(serviceTemplate);
+ for (var element : automationComposition.getElements().values()) {
+ var acElementSync = AcmUtils.createAcElementRestart(element);
+ acElementSync.setToscaServiceTemplateFragment(toscaServiceTemplateFragment);
+ syncAc.getAcElementList().add(acElementSync);
+
+ }
+ }
+ message.getAutomationcompositionList().add(syncAc);
+
+ LOGGER.debug("Participant AutomationComposition Sync sent {}", message);
+ super.send(message);
+ }
}
diff --git a/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/commissioning/CommissioningProviderTest.java b/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/commissioning/CommissioningProviderTest.java
index 5c26ea3bd..413719110 100644
--- a/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/commissioning/CommissioningProviderTest.java
+++ b/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/commissioning/CommissioningProviderTest.java
@@ -1,6 +1,6 @@
/*-
* ============LICENSE_START=======================================================
- * Copyright (C) 2021-2023 Nordix Foundation.
+ * Copyright (C) 2021-2024 Nordix Foundation.
* Modifications Copyright (C) 2021 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -36,7 +36,6 @@ import java.util.UUID;
import org.junit.jupiter.api.Test;
import org.onap.policy.clamp.acm.runtime.instantiation.InstantiationUtils;
import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup;
-import org.onap.policy.clamp.acm.runtime.participants.AcmParticipantProvider;
import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantPrimePublisher;
import org.onap.policy.clamp.acm.runtime.util.CommonTestData;
import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
@@ -47,6 +46,7 @@ import org.onap.policy.clamp.models.acm.messages.rest.commissioning.PrimeOrder;
import org.onap.policy.clamp.models.acm.persistence.provider.AcDefinitionProvider;
import org.onap.policy.clamp.models.acm.persistence.provider.AcTypeStateResolver;
import org.onap.policy.clamp.models.acm.persistence.provider.AutomationCompositionProvider;
+import org.onap.policy.clamp.models.acm.persistence.provider.ParticipantProvider;
import org.onap.policy.models.tosca.authorative.concepts.ToscaServiceTemplate;
class CommissioningProviderTest {
@@ -165,7 +165,7 @@ class CommissioningProviderTest {
var participantPrimePublisher = mock(ParticipantPrimePublisher.class);
var provider = new CommissioningProvider(acDefinitionProvider, mock(AutomationCompositionProvider.class),
- mock(AcmParticipantProvider.class), new AcTypeStateResolver(), participantPrimePublisher,
+ mock(ParticipantProvider.class), new AcTypeStateResolver(), participantPrimePublisher,
CommonTestData.getTestParamaterGroup());
var acTypeStateUpdate = new AcTypeStateUpdate();
@@ -184,15 +184,15 @@ class CommissioningProviderTest {
when(acDefinitionProvider.getAcDefinition(compositionId)).thenReturn(acmDefinition);
var participantPrimePublisher = mock(ParticipantPrimePublisher.class);
- var acmParticipantProvider = mock(AcmParticipantProvider.class);
+ var participantProvider = mock(ParticipantProvider.class);
var provider = new CommissioningProvider(acDefinitionProvider, mock(AutomationCompositionProvider.class),
- acmParticipantProvider, new AcTypeStateResolver(), participantPrimePublisher,
+ participantProvider, new AcTypeStateResolver(), participantPrimePublisher,
CommonTestData.getTestParamaterGroup());
var acTypeStateUpdate = new AcTypeStateUpdate();
acTypeStateUpdate.setPrimeOrder(PrimeOrder.DEPRIME);
- doNothing().when(acmParticipantProvider).verifyParticipantState(any());
+ doNothing().when(participantProvider).verifyParticipantState(any());
provider.compositionDefinitionPriming(compositionId, acTypeStateUpdate);
verify(participantPrimePublisher, timeout(1000).times(1)).sendDepriming(compositionId);
}
@@ -201,7 +201,7 @@ class CommissioningProviderTest {
void testBadRequest() {
var acProvider = mock(AutomationCompositionProvider.class);
var provider = new CommissioningProvider(mock(AcDefinitionProvider.class), acProvider,
- mock(AcmParticipantProvider.class), new AcTypeStateResolver(), mock(ParticipantPrimePublisher.class),
+ mock(ParticipantProvider.class), new AcTypeStateResolver(), mock(ParticipantPrimePublisher.class),
mock(AcRuntimeParameterGroup.class));
var compositionId = UUID.randomUUID();
@@ -225,7 +225,7 @@ class CommissioningProviderTest {
when(acDefinitionProvider.getAcDefinition(compositionId)).thenReturn(acmDefinition);
var provider = new CommissioningProvider(acDefinitionProvider, mock(AutomationCompositionProvider.class),
- mock(AcmParticipantProvider.class), new AcTypeStateResolver(), mock(ParticipantPrimePublisher.class),
+ mock(ParticipantProvider.class), new AcTypeStateResolver(), mock(ParticipantPrimePublisher.class),
mock(AcRuntimeParameterGroup.class));
assertThatThrownBy(() -> provider.updateCompositionDefinition(compositionId, toscaServiceTemplate))
@@ -245,7 +245,7 @@ class CommissioningProviderTest {
when(acDefinitionProvider.getAcDefinition(compositionId)).thenReturn(acmDefinition);
var provider = new CommissioningProvider(acDefinitionProvider, mock(AutomationCompositionProvider.class),
- mock(AcmParticipantProvider.class), new AcTypeStateResolver(), mock(ParticipantPrimePublisher.class),
+ mock(ParticipantProvider.class), new AcTypeStateResolver(), mock(ParticipantPrimePublisher.class),
mock(AcRuntimeParameterGroup.class));
var acTypeStateUpdate = new AcTypeStateUpdate();
diff --git a/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/instantiation/AutomationCompositionInstantiationProviderTest.java b/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/instantiation/AutomationCompositionInstantiationProviderTest.java
index fbd8330fc..2ee6a152e 100644
--- a/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/instantiation/AutomationCompositionInstantiationProviderTest.java
+++ b/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/instantiation/AutomationCompositionInstantiationProviderTest.java
@@ -37,7 +37,6 @@ import java.util.UUID;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup;
-import org.onap.policy.clamp.acm.runtime.participants.AcmParticipantProvider;
import org.onap.policy.clamp.acm.runtime.supervision.SupervisionAcHandler;
import org.onap.policy.clamp.acm.runtime.util.CommonTestData;
import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
@@ -52,6 +51,7 @@ import org.onap.policy.clamp.models.acm.messages.rest.instantiation.LockOrder;
import org.onap.policy.clamp.models.acm.persistence.provider.AcDefinitionProvider;
import org.onap.policy.clamp.models.acm.persistence.provider.AcInstanceStateResolver;
import org.onap.policy.clamp.models.acm.persistence.provider.AutomationCompositionProvider;
+import org.onap.policy.clamp.models.acm.persistence.provider.ParticipantProvider;
import org.onap.policy.clamp.models.acm.persistence.provider.ProviderUtils;
import org.onap.policy.models.tosca.authorative.concepts.ToscaServiceTemplate;
import org.onap.policy.models.tosca.simple.concepts.JpaToscaServiceTemplate;
@@ -101,9 +101,9 @@ class AutomationCompositionInstantiationProviderTest {
when(acDefinitionProvider.getAcDefinition(compositionId)).thenReturn(acDefinition);
var acProvider = mock(AutomationCompositionProvider.class);
var supervisionAcHandler = mock(SupervisionAcHandler.class);
- var acmParticipantProvider = mock(AcmParticipantProvider.class);
+ var participantProvider = mock(ParticipantProvider.class);
var instantiationProvider = new AutomationCompositionInstantiationProvider(acProvider, acDefinitionProvider,
- null, supervisionAcHandler, acmParticipantProvider,
+ null, supervisionAcHandler, participantProvider,
CommonTestData.getTestParamaterGroup());
var automationCompositionCreate =
InstantiationUtils.getAutomationCompositionFromResource(AC_INSTANTIATION_CREATE_JSON, "Crud");
@@ -141,7 +141,7 @@ class AutomationCompositionInstantiationProviderTest {
when(acProvider.deleteAutomationComposition(automationCompositionUpdate.getInstanceId()))
.thenReturn(automationCompositionUpdate);
- doNothing().when(acmParticipantProvider).verifyParticipantState(any());
+ doNothing().when(participantProvider).verifyParticipantState(any());
instantiationProvider.deleteAutomationComposition(automationCompositionCreate.getCompositionId(),
automationCompositionCreate.getInstanceId());
@@ -167,9 +167,9 @@ class AutomationCompositionInstantiationProviderTest {
when(acProvider.updateAutomationComposition(acmFromDb)).thenReturn(acmFromDb);
var supervisionAcHandler = mock(SupervisionAcHandler.class);
- var acmParticipantProvider = mock(AcmParticipantProvider.class);
+ var participantProvider = mock(ParticipantProvider.class);
var instantiationProvider = new AutomationCompositionInstantiationProvider(acProvider, acDefinitionProvider,
- null, supervisionAcHandler, acmParticipantProvider,
+ null, supervisionAcHandler, participantProvider,
CommonTestData.getTestParamaterGroup());
var instantiationResponse = instantiationProvider.updateAutomationComposition(
automationCompositionUpdate.getCompositionId(), automationCompositionUpdate);
@@ -201,7 +201,7 @@ class AutomationCompositionInstantiationProviderTest {
var instantiationProvider =
new AutomationCompositionInstantiationProvider(acProvider, mock(AcDefinitionProvider.class), null,
- mock(SupervisionAcHandler.class), mock(AcmParticipantProvider.class),
+ mock(SupervisionAcHandler.class), mock(ParticipantProvider.class),
mock(AcRuntimeParameterGroup.class));
var compositionId = automationCompositionUpdate.getCompositionId();
@@ -232,7 +232,7 @@ class AutomationCompositionInstantiationProviderTest {
var instantiationProvider =
new AutomationCompositionInstantiationProvider(acProvider, mock(AcDefinitionProvider.class), null,
- mock(SupervisionAcHandler.class), mock(AcmParticipantProvider.class),
+ mock(SupervisionAcHandler.class), mock(ParticipantProvider.class),
mock(AcRuntimeParameterGroup.class));
var compositionId = automationCompositionUpdate.getCompositionId();
@@ -273,9 +273,9 @@ class AutomationCompositionInstantiationProviderTest {
.thenReturn(automationCompositionUpdate);
var supervisionAcHandler = mock(SupervisionAcHandler.class);
- var acmParticipantProvider = mock(AcmParticipantProvider.class);
+ var participantProvider = mock(ParticipantProvider.class);
var instantiationProvider = new AutomationCompositionInstantiationProvider(acProvider, acDefinitionProvider,
- null, supervisionAcHandler, acmParticipantProvider,
+ null, supervisionAcHandler, participantProvider,
mock(AcRuntimeParameterGroup.class));
assertThatThrownBy(
() -> instantiationProvider.updateAutomationComposition(compositionId, automationCompositionUpdate))
@@ -303,9 +303,9 @@ class AutomationCompositionInstantiationProviderTest {
when(acProvider.updateAutomationComposition(automationComposition)).thenReturn(automationComposition);
var supervisionAcHandler = mock(SupervisionAcHandler.class);
- var acmParticipantProvider = mock(AcmParticipantProvider.class);
+ var participantProvider = mock(ParticipantProvider.class);
var instantiationProvider = new AutomationCompositionInstantiationProvider(acProvider, acDefinitionProvider,
- null, supervisionAcHandler, acmParticipantProvider, new AcRuntimeParameterGroup());
+ null, supervisionAcHandler, participantProvider, new AcRuntimeParameterGroup());
assertThatThrownBy(() -> instantiationProvider
.updateAutomationComposition(automationComposition.getCompositionId(), automationComposition))
@@ -352,9 +352,9 @@ class AutomationCompositionInstantiationProviderTest {
automationComposition.getElements().clear();
var supervisionAcHandler = mock(SupervisionAcHandler.class);
- var acmParticipantProvider = mock(AcmParticipantProvider.class);
+ var participantProvider = mock(ParticipantProvider.class);
var instantiationProvider = new AutomationCompositionInstantiationProvider(acProvider, acDefinitionProvider,
- null, supervisionAcHandler, acmParticipantProvider, new AcRuntimeParameterGroup());
+ null, supervisionAcHandler, participantProvider, new AcRuntimeParameterGroup());
assertThatThrownBy(() -> instantiationProvider
.updateAutomationComposition(automationComposition.getCompositionId(), acMigrate))
@@ -373,11 +373,11 @@ class AutomationCompositionInstantiationProviderTest {
when(acDefinitionProvider.getAcDefinition(compositionId)).thenReturn(acDefinition);
automationComposition.setCompositionId(compositionId);
var supervisionAcHandler = mock(SupervisionAcHandler.class);
- var acmParticipantProvider = mock(AcmParticipantProvider.class);
+ var participantProvider = mock(ParticipantProvider.class);
var acRuntimeParameterGroup = mock(AcRuntimeParameterGroup.class);
var instantiationProvider = new AutomationCompositionInstantiationProvider(acProvider, acDefinitionProvider,
- null, supervisionAcHandler, acmParticipantProvider, acRuntimeParameterGroup);
+ null, supervisionAcHandler, participantProvider, acRuntimeParameterGroup);
when(acProvider.getAutomationComposition(automationComposition.getInstanceId()))
.thenReturn(automationComposition);
@@ -436,10 +436,10 @@ class AutomationCompositionInstantiationProviderTest {
var acProvider = mock(AutomationCompositionProvider.class);
when(acProvider.createAutomationComposition(automationCompositionCreate))
.thenReturn(automationCompositionCreate);
- var acmParticipantProvider = mock(AcmParticipantProvider.class);
+ var participantProvider = mock(ParticipantProvider.class);
var instantiationProvider = new AutomationCompositionInstantiationProvider(acProvider, acDefinitionProvider,
- null, null, acmParticipantProvider,
+ null, null, participantProvider,
CommonTestData.getTestParamaterGroup());
var instantiationResponse = instantiationProvider.createAutomationComposition(
@@ -457,7 +457,7 @@ class AutomationCompositionInstantiationProviderTest {
@Test
void testCreateAutomationCompositions_CommissionedAcElementNotFound() {
var acDefinitionProvider = mock(AcDefinitionProvider.class);
- var acmParticipantProvider = mock(AcmParticipantProvider.class);
+ var participantProvider = mock(ParticipantProvider.class);
var acDefinition = CommonTestData.createAcDefinition(serviceTemplate, AcTypeState.PRIMED);
var compositionId = acDefinition.getCompositionId();
when(acDefinitionProvider.findAcDefinition(compositionId)).thenReturn(Optional.of(acDefinition));
@@ -467,7 +467,7 @@ class AutomationCompositionInstantiationProviderTest {
var acProvider = mock(AutomationCompositionProvider.class);
var provider = new AutomationCompositionInstantiationProvider(acProvider, acDefinitionProvider, null, null,
- acmParticipantProvider, CommonTestData.getTestParamaterGroup());
+ participantProvider, CommonTestData.getTestParamaterGroup());
assertThatThrownBy(() -> provider.createAutomationComposition(compositionId, automationComposition))
.hasMessageMatching(AC_ELEMENT_NAME_NOT_FOUND);
@@ -572,9 +572,9 @@ class AutomationCompositionInstantiationProviderTest {
when(acProvider.getAutomationComposition(instanceId)).thenReturn(automationComposition);
var supervisionAcHandler = mock(SupervisionAcHandler.class);
- var acmParticipantProvider = mock(AcmParticipantProvider.class);
+ var participantProvider = mock(ParticipantProvider.class);
var provider = new AutomationCompositionInstantiationProvider(acProvider, acDefinitionProvider,
- new AcInstanceStateResolver(), supervisionAcHandler, acmParticipantProvider,
+ new AcInstanceStateResolver(), supervisionAcHandler, participantProvider,
mock(AcRuntimeParameterGroup.class));
var acInstanceStateUpdate = new AcInstanceStateUpdate();
diff --git a/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/instantiation/rest/InstantiationControllerTest.java b/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/instantiation/rest/InstantiationControllerTest.java
index bcfdea1dd..ca58fad51 100644
--- a/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/instantiation/rest/InstantiationControllerTest.java
+++ b/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/instantiation/rest/InstantiationControllerTest.java
@@ -374,6 +374,9 @@ class InstantiationControllerTest extends CommonRestController {
}
private void saveDummyParticipantInDb() {
- participantProvider.saveParticipant(CommonTestData.createParticipant(CommonTestData.getParticipantId()));
+ var participant = CommonTestData.createParticipant(CommonTestData.getParticipantId());
+ var replica = CommonTestData.createParticipantReplica(CommonTestData.getReplicaId());
+ participant.getReplicas().put(replica.getReplicaId(), replica);
+ participantProvider.saveParticipant(participant);
}
}
diff --git a/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAcHandlerTest.java b/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAcHandlerTest.java
index 8f39c9e2e..0bec9d0ce 100644
--- a/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAcHandlerTest.java
+++ b/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAcHandlerTest.java
@@ -39,16 +39,19 @@ import org.onap.policy.clamp.acm.runtime.supervision.comm.AcElementPropertiesPub
import org.onap.policy.clamp.acm.runtime.supervision.comm.AutomationCompositionDeployPublisher;
import org.onap.policy.clamp.acm.runtime.supervision.comm.AutomationCompositionMigrationPublisher;
import org.onap.policy.clamp.acm.runtime.supervision.comm.AutomationCompositionStateChangePublisher;
+import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantSyncPublisher;
import org.onap.policy.clamp.acm.runtime.util.CommonTestData;
import org.onap.policy.clamp.models.acm.concepts.AcElementDeployAck;
import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
import org.onap.policy.clamp.models.acm.concepts.AutomationComposition;
+import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition;
import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionElement;
import org.onap.policy.clamp.models.acm.concepts.DeployState;
import org.onap.policy.clamp.models.acm.concepts.LockState;
import org.onap.policy.clamp.models.acm.concepts.StateChangeResult;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.AutomationCompositionDeployAck;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantMessageType;
+import org.onap.policy.clamp.models.acm.persistence.provider.AcDefinitionProvider;
import org.onap.policy.clamp.models.acm.persistence.provider.AutomationCompositionProvider;
class SupervisionAcHandlerTest {
@@ -64,9 +67,14 @@ class SupervisionAcHandlerTest {
when(automationCompositionProvider.findAutomationComposition(IDENTIFIER))
.thenReturn(Optional.of(automationComposition));
- var handler = new SupervisionAcHandler(automationCompositionProvider,
+ var acDefinitionProvider = mock(AcDefinitionProvider.class);
+ when(acDefinitionProvider.getAcDefinition(automationComposition.getCompositionId()))
+ .thenReturn(new AutomationCompositionDefinition());
+
+ var handler = new SupervisionAcHandler(automationCompositionProvider, acDefinitionProvider,
mock(AutomationCompositionDeployPublisher.class), mock(AutomationCompositionStateChangePublisher.class),
- mock(AcElementPropertiesPublisher.class), null);
+ mock(AcElementPropertiesPublisher.class), null,
+ mock(ParticipantSyncPublisher.class));
var automationCompositionAckMessage =
getAutomationCompositionDeployAck(ParticipantMessageType.AUTOMATION_COMPOSITION_STATECHANGE_ACK,
@@ -100,14 +108,19 @@ class SupervisionAcHandlerTest {
when(automationCompositionProvider.findAutomationComposition(IDENTIFIER))
.thenReturn(Optional.of(automationComposition));
+ var acDefinitionProvider = mock(AcDefinitionProvider.class);
+ when(acDefinitionProvider.getAcDefinition(automationComposition.getCompositionId()))
+ .thenReturn(new AutomationCompositionDefinition());
+
var automationCompositionAckMessage =
getAutomationCompositionDeployAck(ParticipantMessageType.AUTOMATION_COMPOSITION_DEPLOY_ACK,
automationComposition, DeployState.DEPLOYED, LockState.LOCKED);
automationCompositionAckMessage.setParticipantId(CommonTestData.getParticipantId());
- var handler = new SupervisionAcHandler(automationCompositionProvider,
+ var handler = new SupervisionAcHandler(automationCompositionProvider, acDefinitionProvider,
mock(AutomationCompositionDeployPublisher.class), mock(AutomationCompositionStateChangePublisher.class),
- mock(AcElementPropertiesPublisher.class), null);
+ mock(AcElementPropertiesPublisher.class), null,
+ mock(ParticipantSyncPublisher.class));
handler.handleAutomationCompositionUpdateAckMessage(automationCompositionAckMessage);
@@ -142,9 +155,9 @@ class SupervisionAcHandlerTest {
var automationCompositionStateChangePublisher = mock(AutomationCompositionStateChangePublisher.class);
- var handler = new SupervisionAcHandler(automationCompositionProvider,
+ var handler = new SupervisionAcHandler(automationCompositionProvider, mock(AcDefinitionProvider.class),
mock(AutomationCompositionDeployPublisher.class), automationCompositionStateChangePublisher, null,
- null);
+ null, mock(ParticipantSyncPublisher.class));
handler.handleAutomationCompositionUpdateAckMessage(automationCompositionAckMessage);
@@ -156,8 +169,9 @@ class SupervisionAcHandlerTest {
void testDeployFailed() {
var automationCompositionDeployPublisher = mock(AutomationCompositionDeployPublisher.class);
var automationCompositionProvider = mock(AutomationCompositionProvider.class);
- var handler = new SupervisionAcHandler(automationCompositionProvider, automationCompositionDeployPublisher,
- mock(AutomationCompositionStateChangePublisher.class), mock(AcElementPropertiesPublisher.class), null);
+ var handler = new SupervisionAcHandler(automationCompositionProvider, mock(AcDefinitionProvider.class),
+ automationCompositionDeployPublisher, mock(AutomationCompositionStateChangePublisher.class),
+ mock(AcElementPropertiesPublisher.class), null, mock(ParticipantSyncPublisher.class));
var serviceTemplate = InstantiationUtils.getToscaServiceTemplate(TOSCA_SERVICE_TEMPLATE_YAML);
var acDefinition = CommonTestData.createAcDefinition(serviceTemplate, AcTypeState.PRIMED);
@@ -174,9 +188,10 @@ class SupervisionAcHandlerTest {
void testUndeploy() {
var automationCompositionProvider = mock(AutomationCompositionProvider.class);
var acStateChangePublisher = mock(AutomationCompositionStateChangePublisher.class);
- var handler = new SupervisionAcHandler(automationCompositionProvider,
+ var handler = new SupervisionAcHandler(automationCompositionProvider, mock(AcDefinitionProvider.class),
mock(AutomationCompositionDeployPublisher.class), acStateChangePublisher,
- mock(AcElementPropertiesPublisher.class), null);
+ mock(AcElementPropertiesPublisher.class), null,
+ mock(ParticipantSyncPublisher.class));
var serviceTemplate = InstantiationUtils.getToscaServiceTemplate(TOSCA_SERVICE_TEMPLATE_YAML);
var acDefinition = CommonTestData.createAcDefinition(serviceTemplate, AcTypeState.PRIMED);
var automationComposition =
@@ -191,9 +206,10 @@ class SupervisionAcHandlerTest {
void testUndeployFailed() {
var acStateChangePublisher = mock(AutomationCompositionStateChangePublisher.class);
var automationCompositionProvider = mock(AutomationCompositionProvider.class);
- var handler = new SupervisionAcHandler(automationCompositionProvider,
+ var handler = new SupervisionAcHandler(automationCompositionProvider, mock(AcDefinitionProvider.class),
mock(AutomationCompositionDeployPublisher.class), acStateChangePublisher,
- mock(AcElementPropertiesPublisher.class), null);
+ mock(AcElementPropertiesPublisher.class), null,
+ mock(ParticipantSyncPublisher.class));
var serviceTemplate = InstantiationUtils.getToscaServiceTemplate(TOSCA_SERVICE_TEMPLATE_YAML);
var acDefinition = CommonTestData.createAcDefinition(serviceTemplate, AcTypeState.PRIMED);
@@ -211,9 +227,10 @@ class SupervisionAcHandlerTest {
void testUnlock() {
var automationCompositionProvider = mock(AutomationCompositionProvider.class);
var acStateChangePublisher = mock(AutomationCompositionStateChangePublisher.class);
- var handler = new SupervisionAcHandler(automationCompositionProvider,
+ var handler = new SupervisionAcHandler(automationCompositionProvider, mock(AcDefinitionProvider.class),
mock(AutomationCompositionDeployPublisher.class), acStateChangePublisher,
- mock(AcElementPropertiesPublisher.class), null);
+ mock(AcElementPropertiesPublisher.class), null,
+ mock(ParticipantSyncPublisher.class));
var serviceTemplate = InstantiationUtils.getToscaServiceTemplate(TOSCA_SERVICE_TEMPLATE_YAML);
var acDefinition = CommonTestData.createAcDefinition(serviceTemplate, AcTypeState.PRIMED);
var automationComposition =
@@ -228,9 +245,10 @@ class SupervisionAcHandlerTest {
void testUnlockFailed() {
var automationCompositionProvider = mock(AutomationCompositionProvider.class);
var acStateChangePublisher = mock(AutomationCompositionStateChangePublisher.class);
- var handler = new SupervisionAcHandler(automationCompositionProvider,
+ var handler = new SupervisionAcHandler(automationCompositionProvider, mock(AcDefinitionProvider.class),
mock(AutomationCompositionDeployPublisher.class), acStateChangePublisher,
- mock(AcElementPropertiesPublisher.class), null);
+ mock(AcElementPropertiesPublisher.class), null,
+ mock(ParticipantSyncPublisher.class));
var serviceTemplate = InstantiationUtils.getToscaServiceTemplate(TOSCA_SERVICE_TEMPLATE_YAML);
var acDefinition = CommonTestData.createAcDefinition(serviceTemplate, AcTypeState.PRIMED);
var automationComposition =
@@ -247,9 +265,10 @@ class SupervisionAcHandlerTest {
void testLock() {
var automationCompositionProvider = mock(AutomationCompositionProvider.class);
var acStateChangePublisher = mock(AutomationCompositionStateChangePublisher.class);
- var handler = new SupervisionAcHandler(automationCompositionProvider,
+ var handler = new SupervisionAcHandler(automationCompositionProvider, mock(AcDefinitionProvider.class),
mock(AutomationCompositionDeployPublisher.class), acStateChangePublisher,
- mock(AcElementPropertiesPublisher.class), null);
+ mock(AcElementPropertiesPublisher.class), null,
+ mock(ParticipantSyncPublisher.class));
var serviceTemplate = InstantiationUtils.getToscaServiceTemplate(TOSCA_SERVICE_TEMPLATE_YAML);
var acDefinition = CommonTestData.createAcDefinition(serviceTemplate, AcTypeState.PRIMED);
var automationComposition =
@@ -264,9 +283,10 @@ class SupervisionAcHandlerTest {
void testLockFailed() {
var automationCompositionProvider = mock(AutomationCompositionProvider.class);
var acStateChangePublisher = mock(AutomationCompositionStateChangePublisher.class);
- var handler = new SupervisionAcHandler(automationCompositionProvider,
+ var handler = new SupervisionAcHandler(automationCompositionProvider, mock(AcDefinitionProvider.class),
mock(AutomationCompositionDeployPublisher.class), acStateChangePublisher,
- mock(AcElementPropertiesPublisher.class), null);
+ mock(AcElementPropertiesPublisher.class), null,
+ mock(ParticipantSyncPublisher.class));
var serviceTemplate = InstantiationUtils.getToscaServiceTemplate(TOSCA_SERVICE_TEMPLATE_YAML);
var acDefinition = CommonTestData.createAcDefinition(serviceTemplate, AcTypeState.PRIMED);
var automationComposition =
@@ -294,9 +314,10 @@ class SupervisionAcHandlerTest {
.setParticipantId(automationComposition.getElements().values().iterator().next().getParticipantId());
automationCompositionAckMessage.setAutomationCompositionId(IDENTIFIER);
- var handler = new SupervisionAcHandler(automationCompositionProvider,
+ var handler = new SupervisionAcHandler(automationCompositionProvider, mock(AcDefinitionProvider.class),
mock(AutomationCompositionDeployPublisher.class), mock(AutomationCompositionStateChangePublisher.class),
- mock(AcElementPropertiesPublisher.class), null);
+ mock(AcElementPropertiesPublisher.class), null,
+ mock(ParticipantSyncPublisher.class));
handler.handleAutomationCompositionUpdateAckMessage(automationCompositionAckMessage);
@@ -308,8 +329,9 @@ class SupervisionAcHandlerTest {
void testUpdate() {
var acElementPropertiesPublisher = mock(AcElementPropertiesPublisher.class);
var handler = new SupervisionAcHandler(mock(AutomationCompositionProvider.class),
- mock(AutomationCompositionDeployPublisher.class), mock(AutomationCompositionStateChangePublisher.class),
- acElementPropertiesPublisher, null);
+ mock(AcDefinitionProvider.class), mock(AutomationCompositionDeployPublisher.class),
+ mock(AutomationCompositionStateChangePublisher.class), acElementPropertiesPublisher, null,
+ mock(ParticipantSyncPublisher.class));
var automationComposition =
InstantiationUtils.getAutomationCompositionFromResource(AC_INSTANTIATION_CREATE_JSON, "Lock");
handler.update(automationComposition);
@@ -320,8 +342,9 @@ class SupervisionAcHandlerTest {
void testMigrate() {
var automationCompositionProvider = mock(AutomationCompositionProvider.class);
var acCompositionMigrationPublisher = mock(AutomationCompositionMigrationPublisher.class);
- var handler = new SupervisionAcHandler(automationCompositionProvider, null, null, null,
- acCompositionMigrationPublisher);
+ var handler = new SupervisionAcHandler(automationCompositionProvider, mock(AcDefinitionProvider.class),
+ null, null, null,
+ acCompositionMigrationPublisher, mock(ParticipantSyncPublisher.class));
var automationComposition =
InstantiationUtils.getAutomationCompositionFromResource(AC_INSTANTIATION_CREATE_JSON, "Migrate");
handler.migrate(automationComposition, UUID.randomUUID());
diff --git a/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAspectTest.java b/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAspectTest.java
index f78344bcb..7a72e0ef5 100644
--- a/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAspectTest.java
+++ b/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAspectTest.java
@@ -32,19 +32,19 @@ class SupervisionAspectTest {
@Test
void testSchedule() throws Exception {
var supervisionScanner = mock(SupervisionScanner.class);
- var partecipantScanner = mock(SupervisionPartecipantScanner.class);
- try (var supervisionAspect = new SupervisionAspect(supervisionScanner, partecipantScanner)) {
+ var participantScanner = mock(SupervisionParticipantScanner.class);
+ try (var supervisionAspect = new SupervisionAspect(supervisionScanner, participantScanner)) {
supervisionAspect.schedule();
verify(supervisionScanner, timeout(500)).run();
- verify(partecipantScanner, timeout(500)).run();
+ verify(participantScanner, timeout(500)).run();
}
}
@Test
void testDoCheck() throws Exception {
var supervisionScanner = mock(SupervisionScanner.class);
- var partecipantScanner = mock(SupervisionPartecipantScanner.class);
- try (var supervisionAspect = new SupervisionAspect(supervisionScanner, partecipantScanner)) {
+ var participantScanner = mock(SupervisionParticipantScanner.class);
+ try (var supervisionAspect = new SupervisionAspect(supervisionScanner, participantScanner)) {
supervisionAspect.doCheck();
supervisionAspect.doCheck();
verify(supervisionScanner, timeout(500).times(2)).run();
diff --git a/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionHandlerTest.java b/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionHandlerTest.java
index 448666f8f..e8be3b6b7 100644
--- a/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionHandlerTest.java
+++ b/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionHandlerTest.java
@@ -30,7 +30,7 @@ import static org.onap.policy.clamp.acm.runtime.util.CommonTestData.TOSCA_SERVIC
import java.util.Optional;
import org.junit.jupiter.api.Test;
import org.onap.policy.clamp.acm.runtime.instantiation.InstantiationUtils;
-import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup;
+import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantSyncPublisher;
import org.onap.policy.clamp.acm.runtime.util.CommonTestData;
import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
import org.onap.policy.clamp.models.acm.concepts.ParticipantState;
@@ -46,8 +46,7 @@ class SupervisionHandlerTest {
participantPrimeAckMessage.setParticipantId(CommonTestData.getParticipantId());
participantPrimeAckMessage.setState(ParticipantState.ON_LINE);
var acDefinitionProvider = mock(AcDefinitionProvider.class);
- var acRuntimeParameterGroup = mock(AcRuntimeParameterGroup.class);
- var handler = new SupervisionHandler(acDefinitionProvider, acRuntimeParameterGroup);
+ var handler = new SupervisionHandler(acDefinitionProvider, mock(ParticipantSyncPublisher.class));
handler.handleParticipantMessage(participantPrimeAckMessage);
verify(acDefinitionProvider).findAcDefinition(any());
@@ -66,9 +65,7 @@ class SupervisionHandlerTest {
var acDefinitionProvider = mock(AcDefinitionProvider.class);
when(acDefinitionProvider.findAcDefinition(acDefinition.getCompositionId()))
.thenReturn(Optional.of(acDefinition));
- var acRuntimeParameterGroup = mock(AcRuntimeParameterGroup.class);
-
- var handler = new SupervisionHandler(acDefinitionProvider, acRuntimeParameterGroup);
+ var handler = new SupervisionHandler(acDefinitionProvider, mock(ParticipantSyncPublisher.class));
handler.handleParticipantMessage(participantPrimeAckMessage);
verify(acDefinitionProvider).findAcDefinition(any());
@@ -93,7 +90,7 @@ class SupervisionHandlerTest {
when(acDefinitionProvider.findAcDefinition(acDefinition.getCompositionId()))
.thenReturn(Optional.of(acDefinition));
- var handler = new SupervisionHandler(acDefinitionProvider, CommonTestData.getTestParamaterGroup());
+ var handler = new SupervisionHandler(acDefinitionProvider, mock(ParticipantSyncPublisher.class));
handler.handleParticipantMessage(participantPrimeAckMessage);
verify(acDefinitionProvider).findAcDefinition(any());
@@ -120,7 +117,7 @@ class SupervisionHandlerTest {
when(acDefinitionProvider.findAcDefinition(acDefinition.getCompositionId()))
.thenReturn(Optional.of(acDefinition));
- var handler = new SupervisionHandler(acDefinitionProvider, CommonTestData.getTestParamaterGroup());
+ var handler = new SupervisionHandler(acDefinitionProvider, mock(ParticipantSyncPublisher.class));
handler.handleParticipantMessage(participantPrimeAckMessage);
verify(acDefinitionProvider).findAcDefinition(any());
@@ -150,7 +147,7 @@ class SupervisionHandlerTest {
when(acDefinitionProvider.findAcDefinition(acDefinition.getCompositionId()))
.thenReturn(Optional.of(acDefinition));
- var handler = new SupervisionHandler(acDefinitionProvider, CommonTestData.getTestParamaterGroup());
+ var handler = new SupervisionHandler(acDefinitionProvider, mock(ParticipantSyncPublisher.class));
handler.handleParticipantMessage(participantPrimeAckMessage);
verify(acDefinitionProvider).findAcDefinition(any());
diff --git a/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionParticipantHandlerTest.java b/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionParticipantHandlerTest.java
index e352d2f2a..bebaa3319 100644
--- a/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionParticipantHandlerTest.java
+++ b/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionParticipantHandlerTest.java
@@ -23,6 +23,7 @@ package org.onap.policy.clamp.acm.runtime.supervision;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -37,6 +38,7 @@ import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup
import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantDeregisterAckPublisher;
import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantRegisterAckPublisher;
import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantRestartPublisher;
+import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantSyncPublisher;
import org.onap.policy.clamp.acm.runtime.util.CommonTestData;
import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition;
@@ -45,6 +47,7 @@ import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionInfo;
import org.onap.policy.clamp.models.acm.concepts.NodeTemplateState;
import org.onap.policy.clamp.models.acm.concepts.Participant;
import org.onap.policy.clamp.models.acm.concepts.ParticipantDefinition;
+import org.onap.policy.clamp.models.acm.concepts.ParticipantReplica;
import org.onap.policy.clamp.models.acm.concepts.ParticipantState;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantDeregister;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantRegister;
@@ -60,25 +63,26 @@ class SupervisionParticipantHandlerTest {
@Test
void testHandleParticipantDeregister() {
- var participant = CommonTestData.createParticipant(CommonTestData.getParticipantId());
+ var replica = CommonTestData.createParticipantReplica(CommonTestData.getReplicaId());
var participantProvider = mock(ParticipantProvider.class);
- when(participantProvider.findParticipant(CommonTestData.getParticipantId()))
- .thenReturn(Optional.of(participant));
+ when(participantProvider.findParticipantReplica(replica.getReplicaId()))
+ .thenReturn(Optional.of(replica));
var participantDeregisterMessage = new ParticipantDeregister();
participantDeregisterMessage.setMessageId(UUID.randomUUID());
participantDeregisterMessage.setParticipantId(CommonTestData.getParticipantId());
+ participantDeregisterMessage.setReplicaId(replica.getReplicaId());
var participantDeregisterAckPublisher = mock(ParticipantDeregisterAckPublisher.class);
var handler =
new SupervisionParticipantHandler(participantProvider, mock(ParticipantRegisterAckPublisher.class),
participantDeregisterAckPublisher, mock(AutomationCompositionProvider.class),
mock(AcDefinitionProvider.class), mock(ParticipantRestartPublisher.class),
- mock(AcRuntimeParameterGroup.class));
+ mock(ParticipantSyncPublisher.class), mock(AcRuntimeParameterGroup.class));
handler.handleParticipantMessage(participantDeregisterMessage);
- verify(participantProvider).saveParticipant(any());
+ verify(participantProvider).deleteParticipantReplica(CommonTestData.getReplicaId());
verify(participantDeregisterAckPublisher).send(participantDeregisterMessage.getMessageId());
}
@@ -95,7 +99,7 @@ class SupervisionParticipantHandlerTest {
var handler = new SupervisionParticipantHandler(participantProvider, participantRegisterAckPublisher,
mock(ParticipantDeregisterAckPublisher.class), mock(AutomationCompositionProvider.class),
mock(AcDefinitionProvider.class), mock(ParticipantRestartPublisher.class),
- mock(AcRuntimeParameterGroup.class));
+ mock(ParticipantSyncPublisher.class), mock(AcRuntimeParameterGroup.class));
handler.handleParticipantMessage(participantRegisterMessage);
verify(participantProvider).saveParticipant(any());
@@ -109,13 +113,18 @@ class SupervisionParticipantHandlerTest {
participantRegisterMessage.setMessageId(UUID.randomUUID());
var participantId = CommonTestData.getParticipantId();
participantRegisterMessage.setParticipantId(participantId);
+ participantRegisterMessage.setReplicaId(participantId);
var supportedElementType = CommonTestData.createParticipantSupportedElementType();
participantRegisterMessage.setParticipantSupportedElementType(List.of(supportedElementType));
var participant = new Participant();
+ var replica = new ParticipantReplica();
+ replica.setReplicaId(participantId);
participant.setParticipantId(participantId);
+ participant.getReplicas().put(replica.getReplicaId(), replica);
var participantProvider = mock(ParticipantProvider.class);
when(participantProvider.findParticipant(participantId)).thenReturn(Optional.of(participant));
+ when(participantProvider.findParticipantReplica(participantId)).thenReturn(Optional.of(replica));
var compositionId = UUID.randomUUID();
var composition2Id = UUID.randomUUID();
when(participantProvider.getCompositionIds(participantId)).thenReturn(Set.of(compositionId, composition2Id));
@@ -145,7 +154,8 @@ class SupervisionParticipantHandlerTest {
var participantRestartPublisher = mock(ParticipantRestartPublisher.class);
var handler = new SupervisionParticipantHandler(participantProvider, participantRegisterAckPublisher,
mock(ParticipantDeregisterAckPublisher.class), automationCompositionProvider, acDefinitionProvider,
- participantRestartPublisher, CommonTestData.getTestParamaterGroup());
+ participantRestartPublisher, mock(ParticipantSyncPublisher.class),
+ CommonTestData.getTestParamaterGroup());
handler.handleParticipantMessage(participantRegisterMessage);
verify(participantRegisterAckPublisher).send(participantRegisterMessage.getMessageId(), participantId);
@@ -155,6 +165,65 @@ class SupervisionParticipantHandlerTest {
}
@Test
+ void testHandleParticipantSyncRestart() {
+ var participantRegisterMessage = new ParticipantRegister();
+ participantRegisterMessage.setMessageId(UUID.randomUUID());
+ var participantId = CommonTestData.getParticipantId();
+ participantRegisterMessage.setParticipantId(participantId);
+ var replicaId = CommonTestData.getReplicaId();
+ participantRegisterMessage.setReplicaId(replicaId);
+ var supportedElementType = CommonTestData.createParticipantSupportedElementType();
+ participantRegisterMessage.setParticipantSupportedElementType(List.of(supportedElementType));
+
+ var participant = new Participant();
+ var replica = new ParticipantReplica();
+ replica.setReplicaId(replicaId);
+ participant.setParticipantId(participantId);
+ participant.getReplicas().put(replica.getReplicaId(), replica);
+ var participantProvider = mock(ParticipantProvider.class);
+ when(participantProvider.findParticipant(participantId)).thenReturn(Optional.of(participant));
+ when(participantProvider.findParticipantReplica(replicaId)).thenReturn(Optional.of(replica));
+ var compositionId = UUID.randomUUID();
+ var composition2Id = UUID.randomUUID();
+ when(participantProvider.getCompositionIds(participantId)).thenReturn(Set.of(compositionId, composition2Id));
+
+ var acDefinitionProvider = mock(AcDefinitionProvider.class);
+ var acDefinition = new AutomationCompositionDefinition();
+ acDefinition.setState(AcTypeState.COMMISSIONED);
+ acDefinition.setCompositionId(composition2Id);
+ when(acDefinitionProvider.getAcDefinition(composition2Id)).thenReturn(acDefinition);
+
+ acDefinition = new AutomationCompositionDefinition();
+ acDefinition.setCompositionId(compositionId);
+ acDefinition.setState(AcTypeState.PRIMED);
+ var nodeTemplateState = new NodeTemplateState();
+ nodeTemplateState.setParticipantId(participantId);
+ acDefinition.setElementStateMap(Map.of("code", nodeTemplateState));
+ when(acDefinitionProvider.getAcDefinition(compositionId)).thenReturn(acDefinition);
+
+ var automationComposition =
+ InstantiationUtils.getAutomationCompositionFromResource(AC_INSTANTIATION_CREATE_JSON, "Crud");
+ automationComposition.getElements().values().iterator().next().setParticipantId(participantId);
+ var automationCompositionProvider = mock(AutomationCompositionProvider.class);
+ when(automationCompositionProvider.getAcInstancesByCompositionId(compositionId))
+ .thenReturn(List.of(automationComposition));
+
+ var participantRegisterAckPublisher = mock(ParticipantRegisterAckPublisher.class);
+ var participantSyncPublisher = mock(ParticipantSyncPublisher.class);
+ var handler = new SupervisionParticipantHandler(participantProvider, participantRegisterAckPublisher,
+ mock(ParticipantDeregisterAckPublisher.class), automationCompositionProvider, acDefinitionProvider,
+ mock(ParticipantRestartPublisher.class), participantSyncPublisher,
+ CommonTestData.getTestParamaterGroup());
+ handler.handleParticipantMessage(participantRegisterMessage);
+
+ verify(participantRegisterAckPublisher).send(participantRegisterMessage.getMessageId(), participantId);
+ verify(acDefinitionProvider, times(0)).updateAcDefinition(any(AutomationCompositionDefinition.class),
+ eq(CommonTestData.TOSCA_COMP_NAME));
+ verify(participantSyncPublisher)
+ .sendRestartMsg(any(), any(), any(AutomationCompositionDefinition.class), any());
+ }
+
+ @Test
void testHandleParticipantStatus() {
var participantStatusMessage = createParticipantStatus();
participantStatusMessage.setAutomationCompositionInfoList(List.of(new AutomationCompositionInfo()));
@@ -165,7 +234,7 @@ class SupervisionParticipantHandlerTest {
new SupervisionParticipantHandler(participantProvider, mock(ParticipantRegisterAckPublisher.class),
mock(ParticipantDeregisterAckPublisher.class), automationCompositionProvider,
mock(AcDefinitionProvider.class), mock(ParticipantRestartPublisher.class),
- mock(AcRuntimeParameterGroup.class));
+ mock(ParticipantSyncPublisher.class), mock(AcRuntimeParameterGroup.class));
var participant = CommonTestData.createParticipant(CommonTestData.getParticipantId());
when(participantProvider.findParticipant(CommonTestData.getParticipantId()))
.thenReturn(Optional.of(participant));
@@ -201,7 +270,7 @@ class SupervisionParticipantHandlerTest {
new SupervisionParticipantHandler(participantProvider, mock(ParticipantRegisterAckPublisher.class),
mock(ParticipantDeregisterAckPublisher.class), mock(AutomationCompositionProvider.class),
acDefinitionProvider, mock(ParticipantRestartPublisher.class),
- CommonTestData.getTestParamaterGroup());
+ mock(ParticipantSyncPublisher.class), CommonTestData.getTestParamaterGroup());
handler.handleParticipantMessage(participantStatusMessage);
verify(acDefinitionProvider).updateAcDefinition(acDefinition, CommonTestData.TOSCA_COMP_NAME);
@@ -218,7 +287,7 @@ class SupervisionParticipantHandlerTest {
new SupervisionParticipantHandler(participantProvider, mock(ParticipantRegisterAckPublisher.class),
mock(ParticipantDeregisterAckPublisher.class), automationCompositionProvider,
mock(AcDefinitionProvider.class), mock(ParticipantRestartPublisher.class),
- mock(AcRuntimeParameterGroup.class));
+ mock(ParticipantSyncPublisher.class), mock(AcRuntimeParameterGroup.class));
handler.handleParticipantMessage(participantStatusMessage);
verify(participantProvider).saveParticipant(any());
@@ -236,9 +305,8 @@ class SupervisionParticipantHandlerTest {
new SupervisionParticipantHandler(participantProvider, mock(ParticipantRegisterAckPublisher.class),
mock(ParticipantDeregisterAckPublisher.class), automationCompositionProvider,
mock(AcDefinitionProvider.class), mock(ParticipantRestartPublisher.class),
- mock(AcRuntimeParameterGroup.class));
+ mock(ParticipantSyncPublisher.class), mock(AcRuntimeParameterGroup.class));
var participant = CommonTestData.createParticipant(CommonTestData.getParticipantId());
- participant.setParticipantState(ParticipantState.OFF_LINE);
when(participantProvider.findParticipant(CommonTestData.getParticipantId()))
.thenReturn(Optional.of(participant));
handler.handleParticipantMessage(participantStatusMessage);
diff --git a/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionParticipantScannerTest.java b/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionParticipantScannerTest.java
index 690ad9672..0ae1c1a06 100644
--- a/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionParticipantScannerTest.java
+++ b/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionParticipantScannerTest.java
@@ -29,28 +29,26 @@ import static org.mockito.Mockito.when;
import java.util.List;
import org.junit.jupiter.api.Test;
import org.onap.policy.clamp.acm.runtime.util.CommonTestData;
-import org.onap.policy.clamp.models.acm.concepts.ParticipantState;
import org.onap.policy.clamp.models.acm.persistence.provider.ParticipantProvider;
class SupervisionParticipantScannerTest {
@Test
void testScanParticipant() {
- var acRuntimeParameterGroup = CommonTestData.geParameterGroup("dbScanParticipant");
- acRuntimeParameterGroup.getParticipantParameters().setMaxStatusWaitMs(-1);
-
- var participant = CommonTestData.createParticipant(CommonTestData.getParticipantId());
var participantProvider = mock(ParticipantProvider.class);
- when(participantProvider.getParticipants()).thenReturn(List.of(participant));
+ var replica = CommonTestData.createParticipantReplica(CommonTestData.getReplicaId());
+ when(participantProvider.findReplicasOnLine()).thenReturn(List.of(replica));
- var supervisionScanner = new SupervisionPartecipantScanner(participantProvider, acRuntimeParameterGroup);
+ var acRuntimeParameterGroup = CommonTestData.geParameterGroup("dbScanParticipant");
+ var supervisionScanner = new SupervisionParticipantScanner(participantProvider, acRuntimeParameterGroup);
- participant.setParticipantState(ParticipantState.OFF_LINE);
+ acRuntimeParameterGroup.getParticipantParameters().setMaxStatusWaitMs(100000);
supervisionScanner.run();
- verify(participantProvider, times(0)).saveParticipant(any());
+ verify(participantProvider, times(0)).saveParticipantReplica(any());
- participant.setParticipantState(ParticipantState.ON_LINE);
+ acRuntimeParameterGroup.getParticipantParameters().setMaxStatusWaitMs(-1);
+ supervisionScanner = new SupervisionParticipantScanner(participantProvider, acRuntimeParameterGroup);
supervisionScanner.run();
- verify(participantProvider, times(1)).saveParticipant(any());
+ verify(participantProvider).deleteParticipantReplica(CommonTestData.getReplicaId());
}
}
diff --git a/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionScannerTest.java b/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionScannerTest.java
index d5163be14..fa5929f0b 100644
--- a/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionScannerTest.java
+++ b/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionScannerTest.java
@@ -39,6 +39,7 @@ import org.junit.jupiter.api.Test;
import org.onap.policy.clamp.acm.runtime.instantiation.InstantiationUtils;
import org.onap.policy.clamp.acm.runtime.supervision.comm.AutomationCompositionDeployPublisher;
import org.onap.policy.clamp.acm.runtime.supervision.comm.AutomationCompositionStateChangePublisher;
+import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantSyncPublisher;
import org.onap.policy.clamp.acm.runtime.util.CommonTestData;
import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
import org.onap.policy.clamp.models.acm.concepts.AutomationComposition;
@@ -101,7 +102,7 @@ class SupervisionScannerTest {
var acRuntimeParameterGroup = CommonTestData.geParameterGroup("dbScanner");
var supervisionScanner = new SupervisionScanner(mock(AutomationCompositionProvider.class), acDefinitionProvider,
mock(AutomationCompositionStateChangePublisher.class), mock(AutomationCompositionDeployPublisher.class),
- acRuntimeParameterGroup);
+ mock(ParticipantSyncPublisher.class), acRuntimeParameterGroup);
supervisionScanner.run();
verify(acDefinitionProvider, times(0)).updateAcDefinitionState(any(), any(), any(), any());
}
@@ -113,7 +114,7 @@ class SupervisionScannerTest {
var acRuntimeParameterGroup = CommonTestData.geParameterGroup("dbScanner");
var supervisionScanner = new SupervisionScanner(mock(AutomationCompositionProvider.class), acDefinitionProvider,
mock(AutomationCompositionStateChangePublisher.class), mock(AutomationCompositionDeployPublisher.class),
- acRuntimeParameterGroup);
+ mock(ParticipantSyncPublisher.class), acRuntimeParameterGroup);
supervisionScanner.run();
// Ac Definition in Priming state
verify(acDefinitionProvider, times(0)).updateAcDefinitionState(any(), any(), any(), any());
@@ -121,7 +122,7 @@ class SupervisionScannerTest {
acRuntimeParameterGroup.getParticipantParameters().setMaxStatusWaitMs(-1);
supervisionScanner = new SupervisionScanner(mock(AutomationCompositionProvider.class), acDefinitionProvider,
mock(AutomationCompositionStateChangePublisher.class), mock(AutomationCompositionDeployPublisher.class),
- acRuntimeParameterGroup);
+ mock(ParticipantSyncPublisher.class), acRuntimeParameterGroup);
supervisionScanner.run();
// set Timeout
verify(acDefinitionProvider).updateAcDefinitionState(acDefinition.getCompositionId(), acDefinition.getState(),
@@ -164,7 +165,7 @@ class SupervisionScannerTest {
var supervisionScanner = new SupervisionScanner(automationCompositionProvider, createAcDefinitionProvider(),
automationCompositionStateChangePublisher, automationCompositionDeployPublisher,
- acRuntimeParameterGroup);
+ mock(ParticipantSyncPublisher.class), acRuntimeParameterGroup);
// not in transition
supervisionScanner.run();
@@ -192,7 +193,7 @@ class SupervisionScannerTest {
var supervisionScanner = new SupervisionScanner(automationCompositionProvider, createAcDefinitionProvider(),
automationCompositionStateChangePublisher, automationCompositionDeployPublisher,
- acRuntimeParameterGroup);
+ mock(ParticipantSyncPublisher.class), acRuntimeParameterGroup);
supervisionScanner.run();
verify(automationCompositionProvider).updateAutomationComposition(any(AutomationComposition.class));
@@ -213,7 +214,7 @@ class SupervisionScannerTest {
var supervisionScanner = new SupervisionScanner(automationCompositionProvider, createAcDefinitionProvider(),
automationCompositionStateChangePublisher, automationCompositionDeployPublisher,
- acRuntimeParameterGroup);
+ mock(ParticipantSyncPublisher.class), acRuntimeParameterGroup);
supervisionScanner.run();
verify(automationCompositionProvider).deleteAutomationComposition(automationComposition.getInstanceId());
@@ -232,7 +233,7 @@ class SupervisionScannerTest {
var supervisionScanner = new SupervisionScanner(automationCompositionProvider, createAcDefinitionProvider(),
automationCompositionStateChangePublisher, automationCompositionDeployPublisher,
- acRuntimeParameterGroup);
+ mock(ParticipantSyncPublisher.class), acRuntimeParameterGroup);
supervisionScanner.run();
verify(automationCompositionProvider, times(0)).updateAutomationComposition(any(AutomationComposition.class));
@@ -263,7 +264,7 @@ class SupervisionScannerTest {
// verify timeout scenario
var scannerObj2 = new SupervisionScanner(automationCompositionProvider, createAcDefinitionProvider(),
automationCompositionStateChangePublisher, automationCompositionDeployPublisher,
- acRuntimeParameterGroup);
+ mock(ParticipantSyncPublisher.class), acRuntimeParameterGroup);
automationComposition.setStateChangeResult(StateChangeResult.NO_ERROR);
automationComposition.setLastMsg(TimestampHelper.now());
@@ -312,7 +313,7 @@ class SupervisionScannerTest {
var supervisionScanner = new SupervisionScanner(automationCompositionProvider, createAcDefinitionProvider(),
automationCompositionStateChangePublisher, automationCompositionDeployPublisher,
- acRuntimeParameterGroup);
+ mock(ParticipantSyncPublisher.class), acRuntimeParameterGroup);
supervisionScanner.run();
@@ -347,7 +348,7 @@ class SupervisionScannerTest {
var supervisionScanner = new SupervisionScanner(automationCompositionProvider, createAcDefinitionProvider(),
automationCompositionStateChangePublisher, automationCompositionDeployPublisher,
- acRuntimeParameterGroup);
+ mock(ParticipantSyncPublisher.class), acRuntimeParameterGroup);
supervisionScanner.run();
verify(automationCompositionProvider, times(0)).updateAutomationComposition(any(AutomationComposition.class));
@@ -390,7 +391,7 @@ class SupervisionScannerTest {
var supervisionScanner = new SupervisionScanner(automationCompositionProvider, createAcDefinitionProvider(),
automationCompositionStateChangePublisher, automationCompositionDeployPublisher,
- acRuntimeParameterGroup);
+ mock(ParticipantSyncPublisher.class), acRuntimeParameterGroup);
supervisionScanner.run();
diff --git a/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/comm/SupervisionMessagesTest.java b/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/comm/SupervisionMessagesTest.java
index 766380ac4..cab5adf9c 100644
--- a/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/comm/SupervisionMessagesTest.java
+++ b/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/comm/SupervisionMessagesTest.java
@@ -35,7 +35,6 @@ import java.util.UUID;
import org.junit.jupiter.api.Test;
import org.onap.policy.clamp.acm.runtime.instantiation.InstantiationUtils;
import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup;
-import org.onap.policy.clamp.acm.runtime.participants.AcmParticipantProvider;
import org.onap.policy.clamp.acm.runtime.supervision.SupervisionAcHandler;
import org.onap.policy.clamp.acm.runtime.supervision.SupervisionHandler;
import org.onap.policy.clamp.acm.runtime.supervision.SupervisionParticipantHandler;
@@ -149,7 +148,7 @@ class SupervisionMessagesTest {
@Test
void testParticipantPrimePublisherDecommissioning() {
var publisher = new ParticipantPrimePublisher(mock(ParticipantProvider.class),
- mock(AcmParticipantProvider.class), mock(AcRuntimeParameterGroup.class));
+ mock(AcRuntimeParameterGroup.class));
var topicSink = mock(TopicSink.class);
publisher.active(topicSink);
publisher.sendDepriming(UUID.randomUUID());
@@ -170,8 +169,7 @@ class SupervisionMessagesTest {
participantId);
var participantProvider = mock(ParticipantProvider.class);
when(participantProvider.getSupportedElementMap()).thenReturn(supportedElementMap);
- var publisher = new ParticipantPrimePublisher(participantProvider, mock(AcmParticipantProvider.class),
- CommonTestData.getTestParamaterGroup());
+ var publisher = new ParticipantPrimePublisher(participantProvider, CommonTestData.getTestParamaterGroup());
var topicSink = mock(TopicSink.class);
publisher.active(topicSink);
var serviceTemplate = InstantiationUtils.getToscaServiceTemplate(TOSCA_SERVICE_TEMPLATE_YAML);
@@ -262,29 +260,70 @@ class SupervisionMessagesTest {
}
@Test
- void testParticipantSyncPublisher() {
+ void testParticipantSyncPublisherAutomationComposition() {
var publisher = new ParticipantSyncPublisher(CommonTestData.getTestParamaterGroup());
var topicSink = mock(TopicSink.class);
publisher.active(topicSink);
var serviceTemplate = InstantiationUtils.getToscaServiceTemplate(TOSCA_SERVICE_TEMPLATE_YAML);
- var acmDefinition = new AutomationCompositionDefinition();
- acmDefinition.setCompositionId(UUID.randomUUID());
- acmDefinition.setServiceTemplate(serviceTemplate);
- var acElements = AcmUtils
- .extractAcElementsFromServiceTemplate(serviceTemplate, "");
- acmDefinition.setElementStateMap(AcmUtils.createElementStateMap(acElements, AcTypeState.PRIMED));
-
var automationComposition =
InstantiationUtils.getAutomationCompositionFromResource(AC_INSTANTIATION_UPDATE_JSON, "Crud");
+ publisher.sendSync(serviceTemplate, automationComposition);
+ verify(topicSink).send(anyString());
+ }
+
+ @Test
+ void testParticipantSyncPublisherAcDefinition() {
+ var publisher = new ParticipantSyncPublisher(CommonTestData.getTestParamaterGroup());
+ var topicSink = mock(TopicSink.class);
+ publisher.active(topicSink);
+
+ var acmDefinition = getAcmDefinition();
+ publisher.sendSync(acmDefinition, null);
+ verify(topicSink).send(anyString());
+ }
+
+ @Test
+ void testParticipantSyncPublisherAcDefinitionCommissioned() {
+ var publisher = new ParticipantSyncPublisher(CommonTestData.getTestParamaterGroup());
+ var topicSink = mock(TopicSink.class);
+ publisher.active(topicSink);
+ var acmDefinition = getAcmDefinition();
+ acmDefinition.setState(AcTypeState.COMMISSIONED);
+ publisher.sendSync(acmDefinition, UUID.randomUUID());
+ verify(topicSink).send(anyString());
+ }
+
+ @Test
+ void testParticipantSyncPublisherRestart() {
+ var publisher = new ParticipantSyncPublisher(CommonTestData.getTestParamaterGroup());
+ var topicSink = mock(TopicSink.class);
+ publisher.active(topicSink);
+
+ var automationComposition =
+ InstantiationUtils.getAutomationCompositionFromResource(AC_INSTANTIATION_UPDATE_JSON, "Crud");
var participantId = automationComposition.getElements().values().iterator().next().getParticipantId();
+ var acmDefinition = getAcmDefinition();
acmDefinition.getElementStateMap().values().iterator().next().setParticipantId(participantId);
-
- publisher.send(participantId, acmDefinition, List.of(automationComposition));
+ var replicaId = UUID.randomUUID();
+ publisher.sendRestartMsg(participantId, replicaId, acmDefinition, List.of(automationComposition));
verify(topicSink).send(anyString());
}
+ private AutomationCompositionDefinition getAcmDefinition() {
+ var serviceTemplate = InstantiationUtils.getToscaServiceTemplate(TOSCA_SERVICE_TEMPLATE_YAML);
+ var acmDefinition = new AutomationCompositionDefinition();
+ acmDefinition.setCompositionId(UUID.randomUUID());
+ acmDefinition.setState(AcTypeState.PRIMED);
+ acmDefinition.setServiceTemplate(serviceTemplate);
+ var acElements = AcmUtils
+ .extractAcElementsFromServiceTemplate(serviceTemplate, TOSCA_ELEMENT_NAME);
+ acmDefinition.setElementStateMap(AcmUtils.createElementStateMap(acElements, AcTypeState.PRIMED));
+ acmDefinition.getElementStateMap().values().forEach(element -> element.setParticipantId(UUID.randomUUID()));
+ return acmDefinition;
+ }
+
@Test
void testParticipantRegisterListener() {
final var participantRegister = new ParticipantRegister();
diff --git a/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/util/CommonTestData.java b/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/util/CommonTestData.java
index e031e0f5a..c3b5ff919 100644
--- a/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/util/CommonTestData.java
+++ b/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/util/CommonTestData.java
@@ -28,6 +28,7 @@ import org.onap.policy.clamp.common.acm.exception.AutomationCompositionRuntimeEx
import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition;
import org.onap.policy.clamp.models.acm.concepts.Participant;
+import org.onap.policy.clamp.models.acm.concepts.ParticipantReplica;
import org.onap.policy.clamp.models.acm.concepts.ParticipantState;
import org.onap.policy.clamp.models.acm.concepts.ParticipantSupportedElementType;
import org.onap.policy.clamp.models.acm.utils.AcmUtils;
@@ -88,12 +89,24 @@ public class CommonTestData {
public static Participant createParticipant(UUID participantId) {
var participant = new Participant();
participant.setParticipantId(participantId);
- participant.setParticipantState(ParticipantState.ON_LINE);
- participant.setLastMsg(TimestampHelper.now());
return participant;
}
/**
+ * Create a new ParticipantReplica.
+ *
+ * @param replicaId the replica id
+ * @return a new ParticipantReplica
+ */
+ public static ParticipantReplica createParticipantReplica(UUID replicaId) {
+ var replica = new ParticipantReplica();
+ replica.setReplicaId(replicaId);
+ replica.setParticipantState(ParticipantState.ON_LINE);
+ replica.setLastMsg(TimestampHelper.now());
+ return replica;
+ }
+
+ /**
* Create a new ParticipantSupportedElementType.
*
* @return a new ParticipantSupportedElementType
@@ -105,6 +118,10 @@ public class CommonTestData {
return supportedElementType;
}
+ public static UUID getReplicaId() {
+ return UUID.fromString("201c62b3-8918-41b9-a747-d21eb79c6c09");
+ }
+
public static UUID getParticipantId() {
return UUID.fromString("101c62b3-8918-41b9-a747-d21eb79c6c03");
}