aboutsummaryrefslogtreecommitdiffstats
path: root/runtime-acm/src/main
diff options
context:
space:
mode:
authorRamesh Murugan Iyer <ramesh.murugan.iyer@est.tech>2024-06-19 12:25:33 +0000
committerGerrit Code Review <gerrit@onap.org>2024-06-19 12:25:33 +0000
commita1ce07d06745bfe966ffc000ad2be84789a555d3 (patch)
treecae1549e985875afa33a237893c16c7f5c1249ec /runtime-acm/src/main
parentbe79e86eb56d7d72c7504cee491b79498f78d6ba (diff)
parent9cdfa4dc5aadaaf8ec11223c4991b61c0aa6d0b0 (diff)
Merge "Add support for sync messages in ACM-runtime"
Diffstat (limited to 'runtime-acm/src/main')
-rw-r--r--runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/commissioning/CommissioningProvider.java6
-rw-r--r--runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/instantiation/AutomationCompositionInstantiationProvider.java10
-rw-r--r--runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/participants/AcmParticipantProvider.java25
-rw-r--r--runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAcHandler.java6
-rw-r--r--runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAspect.java6
-rw-r--r--runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionHandler.java24
-rw-r--r--runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionParticipantHandler.java143
-rw-r--r--runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionParticipantScanner.java (renamed from runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionPartecipantScanner.java)35
-rw-r--r--runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionScanner.java12
-rw-r--r--runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantPrimePublisher.java12
-rw-r--r--runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantRestartPublisher.java51
-rw-r--r--runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantStatusReqPublisher.java2
-rw-r--r--runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantSyncPublisher.java103
13 files changed, 232 insertions, 203 deletions
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/commissioning/CommissioningProvider.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/commissioning/CommissioningProvider.java
index 74ccb9cc6..8a56fbb1e 100644
--- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/commissioning/CommissioningProvider.java
+++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/commissioning/CommissioningProvider.java
@@ -29,7 +29,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import lombok.RequiredArgsConstructor;
import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup;
-import org.onap.policy.clamp.acm.runtime.participants.AcmParticipantProvider;
import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantPrimePublisher;
import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition;
@@ -39,6 +38,7 @@ import org.onap.policy.clamp.models.acm.messages.rest.commissioning.Commissionin
import org.onap.policy.clamp.models.acm.persistence.provider.AcDefinitionProvider;
import org.onap.policy.clamp.models.acm.persistence.provider.AcTypeStateResolver;
import org.onap.policy.clamp.models.acm.persistence.provider.AutomationCompositionProvider;
+import org.onap.policy.clamp.models.acm.persistence.provider.ParticipantProvider;
import org.onap.policy.clamp.models.acm.utils.TimestampHelper;
import org.onap.policy.models.base.PfModelRuntimeException;
import org.onap.policy.models.tosca.authorative.concepts.ToscaServiceTemplate;
@@ -56,7 +56,7 @@ public class CommissioningProvider {
private final AcDefinitionProvider acDefinitionProvider;
private final AutomationCompositionProvider acProvider;
- private final AcmParticipantProvider acmParticipantProvider;
+ private final ParticipantProvider participantProvider;
private final AcTypeStateResolver acTypeStateResolver;
private final ParticipantPrimePublisher participantPrimePublisher;
private final AcRuntimeParameterGroup acRuntimeParameterGroup;
@@ -229,7 +229,7 @@ public class CommissioningProvider {
}
}
if (!participantIds.isEmpty()) {
- acmParticipantProvider.verifyParticipantState(participantIds);
+ participantProvider.verifyParticipantState(participantIds);
}
acmDefinition.setState(AcTypeState.DEPRIMING);
acmDefinition.setLastMsg(TimestampHelper.now());
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/instantiation/AutomationCompositionInstantiationProvider.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/instantiation/AutomationCompositionInstantiationProvider.java
index 220636b9d..2bf08220a 100644
--- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/instantiation/AutomationCompositionInstantiationProvider.java
+++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/instantiation/AutomationCompositionInstantiationProvider.java
@@ -29,7 +29,6 @@ import java.util.stream.Collectors;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup;
-import org.onap.policy.clamp.acm.runtime.participants.AcmParticipantProvider;
import org.onap.policy.clamp.acm.runtime.supervision.SupervisionAcHandler;
import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
import org.onap.policy.clamp.models.acm.concepts.AutomationComposition;
@@ -43,6 +42,7 @@ import org.onap.policy.clamp.models.acm.messages.rest.instantiation.Instantiatio
import org.onap.policy.clamp.models.acm.persistence.provider.AcDefinitionProvider;
import org.onap.policy.clamp.models.acm.persistence.provider.AcInstanceStateResolver;
import org.onap.policy.clamp.models.acm.persistence.provider.AutomationCompositionProvider;
+import org.onap.policy.clamp.models.acm.persistence.provider.ParticipantProvider;
import org.onap.policy.clamp.models.acm.utils.AcmUtils;
import org.onap.policy.common.parameters.BeanValidationResult;
import org.onap.policy.common.parameters.ObjectValidationResult;
@@ -69,7 +69,7 @@ public class AutomationCompositionInstantiationProvider {
private final AcDefinitionProvider acDefinitionProvider;
private final AcInstanceStateResolver acInstanceStateResolver;
private final SupervisionAcHandler supervisionAcHandler;
- private final AcmParticipantProvider acmParticipantProvider;
+ private final ParticipantProvider participantProvider;
private final AcRuntimeParameterGroup acRuntimeParameterGroup;
/**
@@ -265,7 +265,7 @@ public class AutomationCompositionInstantiationProvider {
var participantIds = acDefinitionOpt.get().getElementStateMap().values().stream()
.map(NodeTemplateState::getParticipantId).collect(Collectors.toSet());
- acmParticipantProvider.verifyParticipantState(participantIds);
+ participantProvider.verifyParticipantState(participantIds);
result.addResult(AcmUtils.validateAutomationComposition(automationComposition,
acDefinitionOpt.get().getServiceTemplate(),
@@ -331,7 +331,7 @@ public class AutomationCompositionInstantiationProvider {
var acDefinition = acDefinitionProvider.getAcDefinition(automationComposition.getCompositionId());
var participantIds = acDefinition.getElementStateMap().values().stream()
.map(NodeTemplateState::getParticipantId).collect(Collectors.toSet());
- acmParticipantProvider.verifyParticipantState(participantIds);
+ participantProvider.verifyParticipantState(participantIds);
supervisionAcHandler.delete(automationComposition, acDefinition);
var response = new InstantiationResponse();
response.setInstanceId(automationComposition.getInstanceId());
@@ -374,7 +374,7 @@ public class AutomationCompositionInstantiationProvider {
var participantIds = acDefinition.getElementStateMap().values().stream()
.map(NodeTemplateState::getParticipantId).collect(Collectors.toSet());
- acmParticipantProvider.verifyParticipantState(participantIds);
+ participantProvider.verifyParticipantState(participantIds);
var result = acInstanceStateResolver.resolve(acInstanceStateUpdate.getDeployOrder(),
acInstanceStateUpdate.getLockOrder(), automationComposition.getDeployState(),
automationComposition.getLockState(), automationComposition.getStateChangeResult());
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/participants/AcmParticipantProvider.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/participants/AcmParticipantProvider.java
index 282389a74..62ba7b017 100644
--- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/participants/AcmParticipantProvider.java
+++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/participants/AcmParticipantProvider.java
@@ -20,12 +20,10 @@
package org.onap.policy.clamp.acm.runtime.participants;
-import jakarta.ws.rs.core.Response;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.UUID;
import lombok.RequiredArgsConstructor;
import org.apache.commons.collections4.MapUtils;
@@ -33,9 +31,7 @@ import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantStatusReqPu
import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionElement;
import org.onap.policy.clamp.models.acm.concepts.NodeTemplateState;
import org.onap.policy.clamp.models.acm.concepts.ParticipantInformation;
-import org.onap.policy.clamp.models.acm.concepts.ParticipantState;
import org.onap.policy.clamp.models.acm.persistence.provider.ParticipantProvider;
-import org.onap.policy.models.base.PfModelRuntimeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
@@ -94,12 +90,11 @@ public class AcmParticipantProvider {
* @param participantId The UUID of the participant to send request to
*/
public void sendParticipantStatusRequest(UUID participantId) {
- var participant = this.participantProvider.getParticipantById(participantId);
+ // check if participant is present
+ this.participantProvider.getParticipantById(participantId);
LOGGER.debug("Requesting Participant Status Now ParticipantStatusReq");
participantStatusReqPublisher.send(participantId);
- participant.setParticipantState(ParticipantState.OFF_LINE);
- participantProvider.saveParticipant(participant);
}
/**
@@ -110,22 +105,6 @@ public class AcmParticipantProvider {
this.participantStatusReqPublisher.send((UUID) null);
}
- /**
- * Verify Participant state.
- *
- * @param participantIds The list of UUIDs of the participants to get
- * @throws PfModelRuntimeException in case the participant is offline
- */
- public void verifyParticipantState(Set<UUID> participantIds) {
- for (UUID participantId : participantIds) {
- var participant = this.participantProvider.getParticipantById(participantId);
- if (! participant.getParticipantState().equals(ParticipantState.ON_LINE)) {
- throw new PfModelRuntimeException(Response.Status.CONFLICT,
- "Participant: " + participantId + " is OFFLINE");
- }
- }
- }
-
private Map<UUID, AutomationCompositionElement> getAutomationCompositionElementsForParticipant(UUID participantId) {
var automationCompositionElements = participantProvider
.getAutomationCompositionElements(participantId);
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAcHandler.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAcHandler.java
index 802c6603b..3e2057ed5 100644
--- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAcHandler.java
+++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAcHandler.java
@@ -33,6 +33,7 @@ import org.onap.policy.clamp.acm.runtime.supervision.comm.AcElementPropertiesPub
import org.onap.policy.clamp.acm.runtime.supervision.comm.AutomationCompositionDeployPublisher;
import org.onap.policy.clamp.acm.runtime.supervision.comm.AutomationCompositionMigrationPublisher;
import org.onap.policy.clamp.acm.runtime.supervision.comm.AutomationCompositionStateChangePublisher;
+import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantSyncPublisher;
import org.onap.policy.clamp.models.acm.concepts.AcElementDeployAck;
import org.onap.policy.clamp.models.acm.concepts.AutomationComposition;
import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition;
@@ -42,6 +43,7 @@ import org.onap.policy.clamp.models.acm.concepts.LockState;
import org.onap.policy.clamp.models.acm.concepts.ParticipantUtils;
import org.onap.policy.clamp.models.acm.concepts.StateChangeResult;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.AutomationCompositionDeployAck;
+import org.onap.policy.clamp.models.acm.persistence.provider.AcDefinitionProvider;
import org.onap.policy.clamp.models.acm.persistence.provider.AutomationCompositionProvider;
import org.onap.policy.clamp.models.acm.utils.AcmUtils;
import org.slf4j.Logger;
@@ -58,12 +60,14 @@ public class SupervisionAcHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(SupervisionAcHandler.class);
private final AutomationCompositionProvider automationCompositionProvider;
+ private final AcDefinitionProvider acDefinitionProvider;
// Publishers for participant communication
private final AutomationCompositionDeployPublisher automationCompositionDeployPublisher;
private final AutomationCompositionStateChangePublisher automationCompositionStateChangePublisher;
private final AcElementPropertiesPublisher acElementPropertiesPublisher;
private final AutomationCompositionMigrationPublisher acCompositionMigrationPublisher;
+ private final ParticipantSyncPublisher participantSyncPublisher;
private final ExecutorService executor = Context.taskWrapping(Executors.newFixedThreadPool(1));
@@ -260,6 +264,8 @@ public class SupervisionAcHandler {
automationCompositionAckMessage.getStateChangeResult());
if (updated) {
automationCompositionProvider.updateAutomationComposition(automationComposition);
+ var acDefinition = acDefinitionProvider.getAcDefinition(automationComposition.getCompositionId());
+ participantSyncPublisher.sendSync(acDefinition.getServiceTemplate(), automationComposition);
}
}
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAspect.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAspect.java
index 8f3a4c2eb..9ef979f8e 100644
--- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAspect.java
+++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAspect.java
@@ -41,9 +41,9 @@ public class SupervisionAspect implements Closeable {
private static final Logger LOGGER = LoggerFactory.getLogger(SupervisionAspect.class);
private final SupervisionScanner supervisionScanner;
- private final SupervisionPartecipantScanner partecipantScanner;
+ private final SupervisionParticipantScanner participantScanner;
- private ThreadPoolExecutor executor =
+ private final ThreadPoolExecutor executor =
new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
@Scheduled(
@@ -56,7 +56,7 @@ public class SupervisionAspect implements Closeable {
private void executeScan() {
supervisionScanner.run();
- partecipantScanner.run();
+ participantScanner.run();
}
/**
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionHandler.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionHandler.java
index 963e4830e..a4e470495 100644
--- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionHandler.java
+++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionHandler.java
@@ -23,9 +23,10 @@ package org.onap.policy.clamp.acm.runtime.supervision;
import io.micrometer.core.annotation.Timed;
import lombok.AllArgsConstructor;
-import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup;
+import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantSyncPublisher;
import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition;
+import org.onap.policy.clamp.models.acm.concepts.NodeTemplateState;
import org.onap.policy.clamp.models.acm.concepts.StateChangeResult;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantPrimeAck;
import org.onap.policy.clamp.models.acm.persistence.provider.AcDefinitionProvider;
@@ -43,7 +44,7 @@ public class SupervisionHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(SupervisionHandler.class);
private final AcDefinitionProvider acDefinitionProvider;
- private final AcRuntimeParameterGroup acRuntimeParameterGroup;
+ private final ParticipantSyncPublisher participantSyncPublisher;
/**
* Handle a ParticipantPrimeAck message from a participant.
@@ -82,12 +83,7 @@ public class SupervisionHandler {
boolean completed = true;
boolean restarting = false;
for (var element : acDefinition.getElementStateMap().values()) {
- if (participantPrimeAckMessage.getParticipantId().equals(element.getParticipantId())) {
- element.setMessage(participantPrimeAckMessage.getMessage());
- element.setState(participantPrimeAckMessage.getCompositionState());
- element.setRestarting(null);
- acDefinitionProvider.updateAcDefinitionElement(element, acDefinition.getCompositionId());
- }
+ handlePrimeAckElement(participantPrimeAckMessage, element);
if (!finalState.equals(element.getState())) {
completed = false;
}
@@ -110,6 +106,18 @@ public class SupervisionHandler {
if (toUpdate) {
acDefinitionProvider.updateAcDefinitionState(acDefinition.getCompositionId(), acDefinition.getState(),
acDefinition.getStateChangeResult(), acDefinition.getRestarting());
+ if (!participantPrimeAckMessage.getParticipantId().equals(participantPrimeAckMessage.getReplicaId())) {
+ participantSyncPublisher.sendSync(acDefinition, participantPrimeAckMessage.getReplicaId());
+ }
+ }
+ }
+
+ private void handlePrimeAckElement(ParticipantPrimeAck participantPrimeAckMessage, NodeTemplateState element) {
+ if (participantPrimeAckMessage.getParticipantId().equals(element.getParticipantId())) {
+ element.setMessage(participantPrimeAckMessage.getMessage());
+ element.setState(participantPrimeAckMessage.getCompositionState());
+ element.setRestarting(null);
+ acDefinitionProvider.updateAcDefinitionElement(element, participantPrimeAckMessage.getCompositionId());
}
}
}
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionParticipantHandler.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionParticipantHandler.java
index 609e036be..4c8c5815c 100644
--- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionParticipantHandler.java
+++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionParticipantHandler.java
@@ -21,7 +21,6 @@
package org.onap.policy.clamp.acm.runtime.supervision;
import io.micrometer.core.annotation.Timed;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -32,11 +31,13 @@ import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup
import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantDeregisterAckPublisher;
import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantRegisterAckPublisher;
import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantRestartPublisher;
+import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantSyncPublisher;
import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
import org.onap.policy.clamp.models.acm.concepts.AutomationComposition;
import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition;
import org.onap.policy.clamp.models.acm.concepts.Participant;
import org.onap.policy.clamp.models.acm.concepts.ParticipantDefinition;
+import org.onap.policy.clamp.models.acm.concepts.ParticipantReplica;
import org.onap.policy.clamp.models.acm.concepts.ParticipantState;
import org.onap.policy.clamp.models.acm.concepts.ParticipantSupportedElementType;
import org.onap.policy.clamp.models.acm.concepts.StateChangeResult;
@@ -65,6 +66,7 @@ public class SupervisionParticipantHandler {
private final AutomationCompositionProvider automationCompositionProvider;
private final AcDefinitionProvider acDefinitionProvider;
private final ParticipantRestartPublisher participantRestartPublisher;
+ private final ParticipantSyncPublisher participantSyncPublisher;
private final AcRuntimeParameterGroup acRuntimeParameterGroup;
/**
@@ -72,21 +74,11 @@ public class SupervisionParticipantHandler {
*
* @param participantRegisterMsg the ParticipantRegister message received from a participant
*/
- @MessageIntercept
@Timed(value = "listener.participant_register", description = "PARTICIPANT_REGISTER messages received")
public void handleParticipantMessage(ParticipantRegister participantRegisterMsg) {
- var participantOpt = participantProvider.findParticipant(participantRegisterMsg.getParticipantId());
-
- if (participantOpt.isPresent()) {
- var participant = participantOpt.get();
- checkOnline(participant);
- handleRestart(participant.getParticipantId());
- } else {
- var participant = createParticipant(participantRegisterMsg.getParticipantId(),
- listToMap(participantRegisterMsg.getParticipantSupportedElementType()));
- participantProvider.saveParticipant(participant);
-
- }
+ saveIfNotPresent(participantRegisterMsg.getReplicaId(),
+ participantRegisterMsg.getParticipantId(),
+ participantRegisterMsg.getParticipantSupportedElementType(), true);
participantRegisterAckPublisher.send(participantRegisterMsg.getMessageId(),
participantRegisterMsg.getParticipantId());
@@ -97,15 +89,13 @@ public class SupervisionParticipantHandler {
*
* @param participantDeregisterMsg the ParticipantDeregister message received from a participant
*/
- @MessageIntercept
@Timed(value = "listener.participant_deregister", description = "PARTICIPANT_DEREGISTER messages received")
public void handleParticipantMessage(ParticipantDeregister participantDeregisterMsg) {
- var participantOpt = participantProvider.findParticipant(participantDeregisterMsg.getParticipantId());
-
- if (participantOpt.isPresent()) {
- var participant = participantOpt.get();
- participant.setParticipantState(ParticipantState.OFF_LINE);
- participantProvider.saveParticipant(participant);
+ var replicaId = participantDeregisterMsg.getReplicaId() != null
+ ? participantDeregisterMsg.getReplicaId() : participantDeregisterMsg.getParticipantId();
+ var replicaOpt = participantProvider.findParticipantReplica(replicaId);
+ if (replicaOpt.isPresent()) {
+ participantProvider.deleteParticipantReplica(replicaId);
}
participantDeregisterAckPublisher.send(participantDeregisterMsg.getMessageId());
@@ -116,32 +106,57 @@ public class SupervisionParticipantHandler {
*
* @param participantStatusMsg the ParticipantStatus message received from a participant
*/
- @MessageIntercept
@Timed(value = "listener.participant_status", description = "PARTICIPANT_STATUS messages received")
public void handleParticipantMessage(ParticipantStatus participantStatusMsg) {
+ saveIfNotPresent(participantStatusMsg.getReplicaId(), participantStatusMsg.getParticipantId(),
+ participantStatusMsg.getParticipantSupportedElementType(), false);
- var participantOpt = participantProvider.findParticipant(participantStatusMsg.getParticipantId());
- if (participantOpt.isEmpty()) {
- var participant = createParticipant(participantStatusMsg.getParticipantId(),
- listToMap(participantStatusMsg.getParticipantSupportedElementType()));
- participantProvider.saveParticipant(participant);
- } else {
- checkOnline(participantOpt.get());
- }
if (!participantStatusMsg.getAutomationCompositionInfoList().isEmpty()) {
automationCompositionProvider.upgradeStates(participantStatusMsg.getAutomationCompositionInfoList());
}
if (!participantStatusMsg.getParticipantDefinitionUpdates().isEmpty()
&& participantStatusMsg.getCompositionId() != null) {
updateAcDefinitionOutProperties(participantStatusMsg.getCompositionId(),
- participantStatusMsg.getParticipantDefinitionUpdates());
+ participantStatusMsg.getReplicaId(), participantStatusMsg.getParticipantDefinitionUpdates());
}
}
- private void updateAcDefinitionOutProperties(UUID composotionId, List<ParticipantDefinition> list) {
- var acDefinitionOpt = acDefinitionProvider.findAcDefinition(composotionId);
+ private void saveIfNotPresent(UUID msgReplicaId, UUID participantId,
+ List<ParticipantSupportedElementType> participantSupportedElementType, boolean registration) {
+ var replicaId = msgReplicaId != null ? msgReplicaId : participantId;
+ var replicaOpt = participantProvider.findParticipantReplica(replicaId);
+ if (replicaOpt.isPresent()) {
+ var replica = replicaOpt.get();
+ checkOnline(replica);
+ } else {
+ var participant = getParticipant(participantId, listToMap(participantSupportedElementType));
+ participant.getReplicas().put(replicaId, createReplica(replicaId));
+ participantProvider.saveParticipant(participant);
+ }
+ if (registration) {
+ handleRestart(participantId, replicaId);
+ }
+ }
+
+ private Participant getParticipant(UUID participantId,
+ Map<UUID, ParticipantSupportedElementType> participantSupportedElementType) {
+ var participantOpt = participantProvider.findParticipant(participantId);
+ return participantOpt.orElseGet(() -> createParticipant(participantId, participantSupportedElementType));
+ }
+
+ private ParticipantReplica createReplica(UUID replicaId) {
+ var replica = new ParticipantReplica();
+ replica.setReplicaId(replicaId);
+ replica.setParticipantState(ParticipantState.ON_LINE);
+ replica.setLastMsg(TimestampHelper.now());
+ return replica;
+
+ }
+
+ private void updateAcDefinitionOutProperties(UUID compositionId, UUID replicaId, List<ParticipantDefinition> list) {
+ var acDefinitionOpt = acDefinitionProvider.findAcDefinition(compositionId);
if (acDefinitionOpt.isEmpty()) {
- LOGGER.error("Ac Definition with id {} not found", composotionId);
+ LOGGER.error("Ac Definition with id {} not found", compositionId);
return;
}
var acDefinition = acDefinitionOpt.get();
@@ -155,26 +170,32 @@ public class SupervisionParticipantHandler {
}
acDefinitionProvider.updateAcDefinition(acDefinition,
acRuntimeParameterGroup.getAcmParameters().getToscaCompositionName());
+ participantSyncPublisher.sendSync(acDefinition, replicaId);
}
- private void checkOnline(Participant participant) {
- if (ParticipantState.OFF_LINE.equals(participant.getParticipantState())) {
- participant.setParticipantState(ParticipantState.ON_LINE);
+ private void checkOnline(ParticipantReplica replica) {
+ if (ParticipantState.OFF_LINE.equals(replica.getParticipantState())) {
+ replica.setParticipantState(ParticipantState.ON_LINE);
}
- participant.setLastMsg(TimestampHelper.now());
- participantProvider.saveParticipant(participant);
+ replica.setLastMsg(TimestampHelper.now());
+ participantProvider.saveParticipantReplica(replica);
}
- private void handleRestart(UUID participantId) {
+ private void handleRestart(UUID participantId, UUID replicaId) {
var compositionIds = participantProvider.getCompositionIds(participantId);
+ var oldParticipant = participantId.equals(replicaId);
for (var compositionId : compositionIds) {
var acDefinition = acDefinitionProvider.getAcDefinition(compositionId);
LOGGER.debug("Scan Composition {} for restart", acDefinition.getCompositionId());
- handleRestart(participantId, acDefinition);
+ if (oldParticipant) {
+ handleRestart(participantId, acDefinition);
+ } else {
+ handleSyncRestart(participantId, replicaId, acDefinition);
+ }
}
}
- private void handleRestart(UUID participantId, AutomationCompositionDefinition acDefinition) {
+ private void handleRestart(final UUID participantId, AutomationCompositionDefinition acDefinition) {
if (AcTypeState.COMMISSIONED.equals(acDefinition.getState())) {
LOGGER.debug("Composition {} COMMISSIONED", acDefinition.getCompositionId());
return;
@@ -185,14 +206,6 @@ public class SupervisionParticipantHandler {
elementState.setRestarting(true);
}
}
- var automationCompositionList =
- automationCompositionProvider.getAcInstancesByCompositionId(acDefinition.getCompositionId());
- List<AutomationComposition> automationCompositions = new ArrayList<>();
- for (var automationComposition : automationCompositionList) {
- if (isAcToBeRestarted(participantId, automationComposition)) {
- automationCompositions.add(automationComposition);
- }
- }
// expected final state
if (StateChangeResult.TIMEOUT.equals(acDefinition.getStateChangeResult())) {
acDefinition.setStateChangeResult(StateChangeResult.NO_ERROR);
@@ -200,6 +213,11 @@ public class SupervisionParticipantHandler {
acDefinition.setRestarting(true);
acDefinitionProvider.updateAcDefinition(acDefinition,
acRuntimeParameterGroup.getAcmParameters().getToscaCompositionName());
+
+ var automationCompositionList =
+ automationCompositionProvider.getAcInstancesByCompositionId(acDefinition.getCompositionId());
+ var automationCompositions = automationCompositionList.stream()
+ .filter(ac -> isAcToBeRestarted(participantId, ac)).toList();
participantRestartPublisher.send(participantId, acDefinition, automationCompositions);
}
@@ -222,13 +240,34 @@ public class SupervisionParticipantHandler {
return toAdd;
}
+ private void handleSyncRestart(final UUID participantId, UUID replicaId,
+ AutomationCompositionDefinition acDefinition) {
+ if (AcTypeState.COMMISSIONED.equals(acDefinition.getState())) {
+ LOGGER.debug("Composition {} COMMISSIONED", acDefinition.getCompositionId());
+ return;
+ }
+ LOGGER.debug("Composition to be send in Restart message {}", acDefinition.getCompositionId());
+ var automationCompositionList =
+ automationCompositionProvider.getAcInstancesByCompositionId(acDefinition.getCompositionId());
+ var automationCompositions = automationCompositionList.stream()
+ .filter(ac -> isAcToBeSyncRestarted(participantId, ac)).toList();
+ participantSyncPublisher.sendRestartMsg(participantId, replicaId, acDefinition, automationCompositions);
+ }
+
+ private boolean isAcToBeSyncRestarted(UUID participantId, AutomationComposition automationComposition) {
+ for (var element : automationComposition.getElements().values()) {
+ if (participantId.equals(element.getParticipantId())) {
+ return true;
+ }
+ }
+ return false;
+ }
+
private Participant createParticipant(UUID participantId,
Map<UUID, ParticipantSupportedElementType> participantSupportedElementType) {
var participant = new Participant();
participant.setParticipantId(participantId);
participant.setParticipantSupportedElementTypes(participantSupportedElementType);
- participant.setParticipantState(ParticipantState.ON_LINE);
- participant.setLastMsg(TimestampHelper.now());
return participant;
}
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionPartecipantScanner.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionParticipantScanner.java
index b7f0be8eb..4ada199b6 100644
--- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionPartecipantScanner.java
+++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionParticipantScanner.java
@@ -21,8 +21,7 @@
package org.onap.policy.clamp.acm.runtime.supervision;
import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup;
-import org.onap.policy.clamp.models.acm.concepts.Participant;
-import org.onap.policy.clamp.models.acm.concepts.ParticipantState;
+import org.onap.policy.clamp.models.acm.concepts.ParticipantReplica;
import org.onap.policy.clamp.models.acm.persistence.provider.ParticipantProvider;
import org.onap.policy.clamp.models.acm.utils.TimestampHelper;
import org.slf4j.Logger;
@@ -33,20 +32,20 @@ import org.springframework.stereotype.Component;
* This class is used to scan the automation compositions in the database and check if they are in the correct state.
*/
@Component
-public class SupervisionPartecipantScanner {
- private static final Logger LOGGER = LoggerFactory.getLogger(SupervisionPartecipantScanner.class);
+public class SupervisionParticipantScanner {
+ private static final Logger LOGGER = LoggerFactory.getLogger(SupervisionParticipantScanner.class);
private final long maxWaitMs;
private final ParticipantProvider participantProvider;
/**
- * Constructor for instantiating SupervisionPartecipantScanner.
+ * Constructor for instantiating SupervisionParticipantScanner.
*
* @param participantProvider the Participant Provider
* @param acRuntimeParameterGroup the parameters for the automation composition runtime
*/
- public SupervisionPartecipantScanner(final ParticipantProvider participantProvider,
+ public SupervisionParticipantScanner(final ParticipantProvider participantProvider,
final AcRuntimeParameterGroup acRuntimeParameterGroup) {
this.participantProvider = participantProvider;
this.maxWaitMs = acRuntimeParameterGroup.getParticipantParameters().getMaxStatusWaitMs();
@@ -56,27 +55,17 @@ public class SupervisionPartecipantScanner {
* Run Scanning.
*/
public void run() {
- LOGGER.debug("Scanning participans in the database . . .");
-
- for (var participant : participantProvider.getParticipants()) {
- scanParticipantStatus(participant);
- }
-
- LOGGER.debug("Participans scan complete . . .");
+ LOGGER.debug("Scanning participants in the database . . .");
+ participantProvider.findReplicasOnLine().forEach(this::scanParticipantReplicaStatus);
+ LOGGER.debug("Participants scan complete . . .");
}
- private void scanParticipantStatus(Participant participant) {
- var id = participant.getParticipantId();
- if (ParticipantState.OFF_LINE.equals(participant.getParticipantState())) {
- LOGGER.debug("report Participant is still OFF_LINE {}", id);
- return;
- }
+ private void scanParticipantReplicaStatus(ParticipantReplica replica) {
var now = TimestampHelper.nowEpochMilli();
- var lastMsg = TimestampHelper.toEpochMilli(participant.getLastMsg());
+ var lastMsg = TimestampHelper.toEpochMilli(replica.getLastMsg());
if ((now - lastMsg) > maxWaitMs) {
- LOGGER.debug("report Participant OFF_LINE {}", id);
- participant.setParticipantState(ParticipantState.OFF_LINE);
- participantProvider.saveParticipant(participant);
+ LOGGER.debug("Participant OFF_LINE {}", replica.getReplicaId());
+ participantProvider.deleteParticipantReplica(replica.getReplicaId());
}
}
}
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionScanner.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionScanner.java
index 06d464671..75a2f0540 100644
--- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionScanner.java
+++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionScanner.java
@@ -27,6 +27,7 @@ import java.util.UUID;
import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup;
import org.onap.policy.clamp.acm.runtime.supervision.comm.AutomationCompositionDeployPublisher;
import org.onap.policy.clamp.acm.runtime.supervision.comm.AutomationCompositionStateChangePublisher;
+import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantSyncPublisher;
import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
import org.onap.policy.clamp.models.acm.concepts.AutomationComposition;
import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition;
@@ -55,6 +56,7 @@ public class SupervisionScanner {
private final AcDefinitionProvider acDefinitionProvider;
private final AutomationCompositionStateChangePublisher automationCompositionStateChangePublisher;
private final AutomationCompositionDeployPublisher automationCompositionDeployPublisher;
+ private final ParticipantSyncPublisher participantSyncPublisher;
/**
* Constructor for instantiating SupervisionScanner.
@@ -69,11 +71,13 @@ public class SupervisionScanner {
final AcDefinitionProvider acDefinitionProvider,
final AutomationCompositionStateChangePublisher automationCompositionStateChangePublisher,
final AutomationCompositionDeployPublisher automationCompositionDeployPublisher,
+ final ParticipantSyncPublisher participantSyncPublisher,
final AcRuntimeParameterGroup acRuntimeParameterGroup) {
this.automationCompositionProvider = automationCompositionProvider;
this.acDefinitionProvider = acDefinitionProvider;
this.automationCompositionStateChangePublisher = automationCompositionStateChangePublisher;
this.automationCompositionDeployPublisher = automationCompositionDeployPublisher;
+ this.participantSyncPublisher = participantSyncPublisher;
this.maxStatusWaitMs = acRuntimeParameterGroup.getParticipantParameters().getMaxStatusWaitMs();
}
@@ -118,6 +122,7 @@ public class SupervisionScanner {
if (completed) {
acDefinitionProvider.updateAcDefinitionState(acDefinition.getCompositionId(), finalState,
StateChangeResult.NO_ERROR, null);
+ participantSyncPublisher.sendSync(acDefinition, null);
} else {
handleTimeout(acDefinition);
}
@@ -132,7 +137,6 @@ public class SupervisionScanner {
|| StateChangeResult.FAILED.equals(automationComposition.getStateChangeResult())) {
LOGGER.debug("automation composition {} scanned, OK", automationComposition.getInstanceId());
- // Clear Timeout on automation composition
return;
}
@@ -158,7 +162,7 @@ public class SupervisionScanner {
LOGGER.debug("automation composition scan: transition state {} {} completed",
automationComposition.getDeployState(), automationComposition.getLockState());
- complete(automationComposition);
+ complete(automationComposition, serviceTemplate);
} else {
LOGGER.debug("automation composition scan: transition state {} {} not completed",
automationComposition.getDeployState(), automationComposition.getLockState());
@@ -183,7 +187,8 @@ public class SupervisionScanner {
}
}
- private void complete(final AutomationComposition automationComposition) {
+ private void complete(final AutomationComposition automationComposition,
+ ToscaServiceTemplate serviceTemplate) {
var deployState = automationComposition.getDeployState();
if (DeployState.MIGRATING.equals(automationComposition.getDeployState())) {
// migration scenario
@@ -201,6 +206,7 @@ public class SupervisionScanner {
} else {
automationCompositionProvider.updateAutomationComposition(automationComposition);
}
+ participantSyncPublisher.sendSync(serviceTemplate, automationComposition);
}
private void handleTimeout(AutomationCompositionDefinition acDefinition) {
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantPrimePublisher.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantPrimePublisher.java
index 89763a2b6..b0848bd51 100644
--- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantPrimePublisher.java
+++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantPrimePublisher.java
@@ -31,7 +31,6 @@ import java.util.Map;
import java.util.UUID;
import lombok.AllArgsConstructor;
import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup;
-import org.onap.policy.clamp.acm.runtime.participants.AcmParticipantProvider;
import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition;
import org.onap.policy.clamp.models.acm.concepts.ParticipantDefinition;
@@ -55,7 +54,6 @@ public class ParticipantPrimePublisher extends AbstractParticipantPublisher<Part
private static final Logger LOGGER = LoggerFactory.getLogger(ParticipantPrimePublisher.class);
private final ParticipantProvider participantProvider;
- private final AcmParticipantProvider acmParticipantProvider;
private final AcRuntimeParameterGroup acRuntimeParameterGroup;
/**
@@ -99,9 +97,7 @@ public class ParticipantPrimePublisher extends AbstractParticipantPublisher<Part
var elementState = acmDefinition.getElementStateMap().get(elementEntry.getKey());
elementState.setState(AcTypeState.PRIMING);
participantIds.add(elementState.getParticipantId());
- var type = new ToscaConceptIdentifier(elementEntry.getValue().getType(),
- elementEntry.getValue().getTypeVersion());
- supportedElementMap.put(type, elementState.getParticipantId());
+ supportedElementMap.put(AcmUtils.getType(elementEntry.getValue()), elementState.getParticipantId());
}
} else {
// scenario Prime participants not assigned yet
@@ -109,16 +105,14 @@ public class ParticipantPrimePublisher extends AbstractParticipantPublisher<Part
for (var elementEntry : acElements) {
var elementState = acmDefinition.getElementStateMap().get(elementEntry.getKey());
elementState.setState(AcTypeState.PRIMING);
- var type = new ToscaConceptIdentifier(elementEntry.getValue().getType(),
- elementEntry.getValue().getTypeVersion());
- var participantId = supportedElementMap.get(type);
+ var participantId = supportedElementMap.get(AcmUtils.getType(elementEntry.getValue()));
if (participantId != null) {
elementState.setParticipantId(participantId);
participantIds.add(participantId);
}
}
}
- acmParticipantProvider.verifyParticipantState(participantIds);
+ participantProvider.verifyParticipantState(participantIds);
return AcmUtils.prepareParticipantPriming(acElements, supportedElementMap);
}
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantRestartPublisher.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantRestartPublisher.java
index 4f28eab8e..3fe46a928 100644
--- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantRestartPublisher.java
+++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantRestartPublisher.java
@@ -22,22 +22,15 @@ package org.onap.policy.clamp.acm.runtime.supervision.comm;
import io.micrometer.core.annotation.Timed;
import java.time.Instant;
-import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
import java.util.UUID;
import lombok.AllArgsConstructor;
import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup;
import org.onap.policy.clamp.models.acm.concepts.AutomationComposition;
import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition;
-import org.onap.policy.clamp.models.acm.concepts.ParticipantDefinition;
import org.onap.policy.clamp.models.acm.concepts.ParticipantRestartAc;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantRestart;
import org.onap.policy.clamp.models.acm.utils.AcmUtils;
-import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
-import org.onap.policy.models.tosca.authorative.concepts.ToscaNodeTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
@@ -66,52 +59,18 @@ public class ParticipantRestartPublisher extends AbstractParticipantPublisher<Pa
message.setMessageId(UUID.randomUUID());
message.setTimestamp(Instant.now());
message.setState(acmDefinition.getState());
- message.setParticipantDefinitionUpdates(prepareParticipantRestarting(participantId, acmDefinition));
+ message.setParticipantDefinitionUpdates(
+ AcmUtils.prepareParticipantRestarting(participantId, acmDefinition,
+ acRuntimeParameterGroup.getAcmParameters().getToscaElementName()));
var toscaServiceTemplateFragment = AcmUtils.getToscaServiceTemplateFragment(acmDefinition.getServiceTemplate());
for (var automationComposition : automationCompositions) {
- var restartAc = new ParticipantRestartAc();
- restartAc.setAutomationCompositionId(automationComposition.getInstanceId());
- for (var element : automationComposition.getElements().values()) {
- if (participantId.equals(element.getParticipantId())) {
- var acElementRestart = AcmUtils.createAcElementRestart(element);
- acElementRestart.setToscaServiceTemplateFragment(toscaServiceTemplateFragment);
- restartAc.getAcElementList().add(acElementRestart);
- }
- }
+ var restartAc = AcmUtils
+ .createAcRestart(automationComposition, participantId, toscaServiceTemplateFragment);
message.getAutomationcompositionList().add(restartAc);
}
LOGGER.debug("Participant Restart sent {}", message);
super.send(message);
}
-
- protected List<ParticipantDefinition> prepareParticipantRestarting(UUID participantId,
- AutomationCompositionDefinition acmDefinition) {
- var acElements = AcmUtils.extractAcElementsFromServiceTemplate(acmDefinition.getServiceTemplate(),
- acRuntimeParameterGroup.getAcmParameters().getToscaElementName());
-
- // list of entry filtered by participantId
- List<Entry<String, ToscaNodeTemplate>> elementList = new ArrayList<>();
- Map<ToscaConceptIdentifier, UUID> supportedElementMap = new HashMap<>();
- for (var elementEntry : acElements) {
- var elementState = acmDefinition.getElementStateMap().get(elementEntry.getKey());
- if (participantId.equals(elementState.getParticipantId())) {
- var type = new ToscaConceptIdentifier(elementEntry.getValue().getType(),
- elementEntry.getValue().getTypeVersion());
- supportedElementMap.put(type, participantId);
- elementList.add(elementEntry);
- }
- }
- var list = AcmUtils.prepareParticipantPriming(elementList, supportedElementMap);
- for (var participantDefinition : list) {
- for (var elementDe : participantDefinition.getAutomationCompositionElementDefinitionList()) {
- var state = acmDefinition.getElementStateMap().get(elementDe.getAcElementDefinitionId().getName());
- if (state != null) {
- elementDe.setOutProperties(state.getOutProperties());
- }
- }
- }
- return list;
- }
}
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantStatusReqPublisher.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantStatusReqPublisher.java
index 76feee72f..2eb434b64 100644
--- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantStatusReqPublisher.java
+++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantStatusReqPublisher.java
@@ -40,7 +40,7 @@ public class ParticipantStatusReqPublisher extends AbstractParticipantPublisher<
*/
@Timed(value = "publisher.participant_status_req", description = "PARTICIPANT_STATUS_REQ messages published")
public void send(UUID participantId) {
- ParticipantStatusReq message = new ParticipantStatusReq();
+ var message = new ParticipantStatusReq();
message.setParticipantId(participantId);
message.setTimestamp(Instant.now());
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantSyncPublisher.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantSyncPublisher.java
index ae7eda1ee..b63bc0a6b 100644
--- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantSyncPublisher.java
+++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantSyncPublisher.java
@@ -23,69 +23,58 @@ package org.onap.policy.clamp.acm.runtime.supervision.comm;
import io.micrometer.core.annotation.Timed;
import java.time.Instant;
import java.util.List;
-import java.util.Optional;
import java.util.UUID;
+import lombok.AllArgsConstructor;
import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup;
-import org.onap.policy.clamp.acm.runtime.main.parameters.Topics;
+import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
import org.onap.policy.clamp.models.acm.concepts.AutomationComposition;
import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition;
+import org.onap.policy.clamp.models.acm.concepts.DeployState;
import org.onap.policy.clamp.models.acm.concepts.ParticipantRestartAc;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantSync;
import org.onap.policy.clamp.models.acm.utils.AcmUtils;
-import org.onap.policy.common.endpoints.event.comm.TopicSink;
+import org.onap.policy.models.tosca.authorative.concepts.ToscaServiceTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
-
@Component
-public class ParticipantSyncPublisher extends ParticipantRestartPublisher {
+@AllArgsConstructor
+public class ParticipantSyncPublisher extends AbstractParticipantPublisher<ParticipantSync> {
private static final Logger LOGGER = LoggerFactory.getLogger(ParticipantSyncPublisher.class);
-
private final AcRuntimeParameterGroup acRuntimeParameterGroup;
- public ParticipantSyncPublisher(AcRuntimeParameterGroup acRuntimeParameterGroup) {
- super(acRuntimeParameterGroup);
- this.acRuntimeParameterGroup = acRuntimeParameterGroup;
- }
-
-
/**
- * Send sync msg to Participant.
+ * Send Restart sync msg to Participant by participantId.
*
- * @param participantId the ParticipantId
+ * @param participantId the participantId
+ * @param replicaId the replicaId
* @param acmDefinition the AutomationComposition Definition
* @param automationCompositions the list of automationCompositions
*/
- @Override
@Timed(value = "publisher.participant_sync_msg", description = "Participant Sync published")
- public void send(UUID participantId, AutomationCompositionDefinition acmDefinition,
+ public void sendRestartMsg(UUID participantId, UUID replicaId, AutomationCompositionDefinition acmDefinition,
List<AutomationComposition> automationCompositions) {
var message = new ParticipantSync();
message.setParticipantId(participantId);
+ message.setReplicaId(replicaId);
+ message.setRestarting(true);
message.setCompositionId(acmDefinition.getCompositionId());
message.setMessageId(UUID.randomUUID());
message.setTimestamp(Instant.now());
message.setState(acmDefinition.getState());
- message.setParticipantDefinitionUpdates(prepareParticipantRestarting(participantId, acmDefinition));
+ message.setParticipantDefinitionUpdates(AcmUtils.prepareParticipantRestarting(participantId, acmDefinition,
+ acRuntimeParameterGroup.getAcmParameters().getToscaElementName()));
var toscaServiceTemplateFragment = AcmUtils.getToscaServiceTemplateFragment(acmDefinition.getServiceTemplate());
for (var automationComposition : automationCompositions) {
- var syncAc = new ParticipantRestartAc();
- syncAc.setAutomationCompositionId(automationComposition.getInstanceId());
- for (var element : automationComposition.getElements().values()) {
- if (participantId.equals(element.getParticipantId())) {
- var acElementSync = AcmUtils.createAcElementRestart(element);
- acElementSync.setToscaServiceTemplateFragment(toscaServiceTemplateFragment);
- syncAc.getAcElementList().add(acElementSync);
- }
- }
+ var syncAc = AcmUtils.createAcRestart(automationComposition, participantId, toscaServiceTemplateFragment);
message.getAutomationcompositionList().add(syncAc);
}
- LOGGER.debug("Participant Sync sent {}", message);
+ LOGGER.debug("Participant Restarting Sync sent {}", message);
super.send(message);
}
@@ -98,4 +87,64 @@ public class ParticipantSyncPublisher extends ParticipantRestartPublisher {
return false;
}
+ /**
+ * Send AutomationCompositionDefinition sync msg to all Participants.
+ *
+ * @param acDefinition the AutomationComposition Definition
+ * @param excludeReplicaId the replica to be excluded
+ */
+ @Timed(value = "publisher.participant_sync_msg", description = "Participant Sync published")
+ public void sendSync(AutomationCompositionDefinition acDefinition, UUID excludeReplicaId) {
+ var message = new ParticipantSync();
+ message.setCompositionId(acDefinition.getCompositionId());
+ if (excludeReplicaId != null) {
+ message.getExcludeReplicas().add(excludeReplicaId);
+ }
+ message.setState(acDefinition.getState());
+ message.setMessageId(UUID.randomUUID());
+ message.setTimestamp(Instant.now());
+ if (AcTypeState.COMMISSIONED.equals(acDefinition.getState())) {
+ message.setDelete(true);
+ } else {
+ message.setParticipantDefinitionUpdates(AcmUtils.prepareParticipantRestarting(null, acDefinition,
+ acRuntimeParameterGroup.getAcmParameters().getToscaElementName()));
+ }
+ LOGGER.debug("Participant AutomationCompositionDefinition Sync sent {}", message);
+ super.send(message);
+ }
+
+ /**
+ * Send AutomationComposition sync msg to all Participants.
+ *
+ * @param serviceTemplate the ServiceTemplate
+ * @param automationComposition the automationComposition
+ */
+ @Timed(value = "publisher.participant_sync_msg", description = "Participant Sync published")
+ public void sendSync(ToscaServiceTemplate serviceTemplate, AutomationComposition automationComposition) {
+ var message = new ParticipantSync();
+ message.setCompositionId(automationComposition.getCompositionId());
+ message.setAutomationCompositionId(automationComposition.getInstanceId());
+ message.setState(AcTypeState.PRIMED);
+ message.setMessageId(UUID.randomUUID());
+ message.setTimestamp(Instant.now());
+ var syncAc = new ParticipantRestartAc();
+ syncAc.setAutomationCompositionId(automationComposition.getInstanceId());
+ syncAc.setDeployState(automationComposition.getDeployState());
+ syncAc.setLockState(automationComposition.getLockState());
+ if (DeployState.DELETED.equals(automationComposition.getDeployState())) {
+ message.setDelete(true);
+ } else {
+ var toscaServiceTemplateFragment = AcmUtils.getToscaServiceTemplateFragment(serviceTemplate);
+ for (var element : automationComposition.getElements().values()) {
+ var acElementSync = AcmUtils.createAcElementRestart(element);
+ acElementSync.setToscaServiceTemplateFragment(toscaServiceTemplateFragment);
+ syncAc.getAcElementList().add(acElementSync);
+
+ }
+ }
+ message.getAutomationcompositionList().add(syncAc);
+
+ LOGGER.debug("Participant AutomationComposition Sync sent {}", message);
+ super.send(message);
+ }
}