aboutsummaryrefslogtreecommitdiffstats
path: root/runtime-acm/src/main/java
diff options
context:
space:
mode:
authorFrancescoFioraEst <francesco.fiora@est.tech>2023-01-26 17:31:46 +0000
committerLiam Fallon <liam.fallon@est.tech>2023-01-30 16:39:27 +0000
commit934f7bd443225a6945b0542fa5cb7c043deac426 (patch)
tree55d490b816af0a50f521ee777f82757f28f9a16e /runtime-acm/src/main/java
parenta178851e9f2e148c17d81c29a9310644e0330b9a (diff)
Refactor Prime and Deprime messages in ACM
Issue-ID: POLICY-4502 Change-Id: Ib0ecc513285bf971a0c25cec528dcdeec5ad63a2 Signed-off-by: FrancescoFioraEst <francesco.fiora@est.tech>
Diffstat (limited to 'runtime-acm/src/main/java')
-rw-r--r--runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/commissioning/CommissioningProvider.java68
-rw-r--r--runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/instantiation/AutomationCompositionInstantiationProvider.java7
-rw-r--r--runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/main/rest/CommissioningController.java4
-rw-r--r--runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionHandler.java50
-rw-r--r--runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantUpdatePublisher.java91
5 files changed, 131 insertions, 89 deletions
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/commissioning/CommissioningProvider.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/commissioning/CommissioningProvider.java
index 450c75564..c273a627a 100644
--- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/commissioning/CommissioningProvider.java
+++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/commissioning/CommissioningProvider.java
@@ -25,13 +25,14 @@ import java.util.UUID;
import java.util.stream.Collectors;
import javax.ws.rs.core.Response.Status;
import lombok.RequiredArgsConstructor;
-import org.onap.policy.clamp.acm.runtime.supervision.SupervisionHandler;
+import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantUpdatePublisher;
import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition;
+import org.onap.policy.clamp.models.acm.messages.rest.commissioning.AcTypeStateUpdate;
import org.onap.policy.clamp.models.acm.messages.rest.commissioning.CommissioningResponse;
import org.onap.policy.clamp.models.acm.persistence.provider.AcDefinitionProvider;
+import org.onap.policy.clamp.models.acm.persistence.provider.AcTypeStateResolver;
import org.onap.policy.clamp.models.acm.persistence.provider.AutomationCompositionProvider;
-import org.onap.policy.clamp.models.acm.persistence.provider.ParticipantProvider;
import org.onap.policy.models.base.PfModelException;
import org.onap.policy.models.base.PfModelRuntimeException;
import org.onap.policy.models.tosca.authorative.concepts.ToscaServiceTemplate;
@@ -51,8 +52,8 @@ public class CommissioningProvider {
private final AcDefinitionProvider acDefinitionProvider;
private final AutomationCompositionProvider acProvider;
- private final SupervisionHandler supervisionHandler;
- private final ParticipantProvider participantProvider;
+ private final AcTypeStateResolver acTypeStateResolver;
+ private final ParticipantUpdatePublisher participantUpdatePublisher;
private CommissioningResponse createCommissioningResponse(UUID compositionId,
ToscaServiceTemplate serviceTemplate) {
@@ -80,10 +81,6 @@ public class CommissioningProvider {
var acmDefinition = acDefinitionProvider.createAutomationCompositionDefinition(serviceTemplate);
serviceTemplate = acmDefinition.getServiceTemplate();
- var participantList = participantProvider.getParticipants();
- if (!participantList.isEmpty()) {
- supervisionHandler.handleSendCommissionMessage(acmDefinition);
- }
return createCommissioningResponse(acmDefinition.getCompositionId(), serviceTemplate);
}
@@ -100,7 +97,7 @@ public class CommissioningProvider {
"There are ACM instances, Update of ACM Definition not allowed");
}
var acDefinition = acDefinitionProvider.getAcDefinition(compositionId);
- if (AcTypeState.COMMISSIONED.equals(acDefinition.getState())) {
+ if (!AcTypeState.COMMISSIONED.equals(acDefinition.getState())) {
throw new PfModelRuntimeException(Status.BAD_REQUEST,
"ACM not in COMMISSIONED state, Update of ACM Definition not allowed");
}
@@ -116,14 +113,14 @@ public class CommissioningProvider {
* @return the result of the deletion
*/
public CommissioningResponse deleteAutomationCompositionDefinition(UUID compositionId) {
-
if (verifyIfInstanceExists(compositionId)) {
throw new PfModelRuntimeException(Status.BAD_REQUEST,
"Delete instances, to commission automation composition definitions");
}
- var participantList = participantProvider.getParticipants();
- if (!participantList.isEmpty()) {
- supervisionHandler.handleSendDeCommissionMessage(compositionId);
+ var acDefinition = acDefinitionProvider.getAcDefinition(compositionId);
+ if (!AcTypeState.COMMISSIONED.equals(acDefinition.getState())) {
+ throw new PfModelRuntimeException(Status.BAD_REQUEST,
+ "ACM not in COMMISSIONED state, Update of ACM Definition not allowed");
}
var serviceTemplate = acDefinitionProvider.deleteAcDefintion(compositionId);
return createCommissioningResponse(compositionId, serviceTemplate);
@@ -165,4 +162,49 @@ public class CommissioningProvider {
private boolean verifyIfInstanceExists(UUID compositionId) {
return !acProvider.getAcInstancesByCompositionId(compositionId).isEmpty();
}
+
+ /**
+ * Composition Definition Priming.
+ *
+ * @param compositionId the compositionId
+ * @param acTypeStateUpdate the ACMTypeStateUpdate
+ */
+ public void compositionDefinitionPriming(UUID compositionId, AcTypeStateUpdate acTypeStateUpdate) {
+ if (verifyIfInstanceExists(compositionId)) {
+ throw new PfModelRuntimeException(Status.BAD_REQUEST, "There are instances, Priming/Depriming not allowed");
+ }
+ var acmDefinition = acDefinitionProvider.getAcDefinition(compositionId);
+ var stateOrdered = acTypeStateResolver.resolve(acTypeStateUpdate.getPrimeOrder(), acmDefinition.getState());
+ switch (stateOrdered) {
+ case PRIME:
+ prime(acmDefinition);
+ break;
+
+ case DEPRIME:
+ deprime(acmDefinition);
+
+ break;
+
+ default:
+ throw new PfModelRuntimeException(Status.BAD_REQUEST, "Not valid " + acTypeStateUpdate.getPrimeOrder());
+ }
+ }
+
+ private void prime(AutomationCompositionDefinition acmDefinition) {
+ var prearation = participantUpdatePublisher.prepareParticipantPriming(acmDefinition);
+ acDefinitionProvider.updateAcDefinition(acmDefinition);
+ participantUpdatePublisher.sendPriming(prearation, acmDefinition.getCompositionId(), null);
+ }
+
+ private void deprime(AutomationCompositionDefinition acmDefinition) {
+ if (!AcTypeState.COMMISSIONED.equals(acmDefinition.getState())) {
+ for (var elementState : acmDefinition.getElementStateMap().values()) {
+ elementState.setState(AcTypeState.DEPRIMING);
+ }
+ acmDefinition.setState(AcTypeState.DEPRIMING);
+ acDefinitionProvider.updateAcDefinition(acmDefinition);
+ }
+ participantUpdatePublisher.sendDepriming(acmDefinition.getCompositionId());
+ }
+
}
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/instantiation/AutomationCompositionInstantiationProvider.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/instantiation/AutomationCompositionInstantiationProvider.java
index b7e7644ef..4949c6612 100644
--- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/instantiation/AutomationCompositionInstantiationProvider.java
+++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/instantiation/AutomationCompositionInstantiationProvider.java
@@ -138,12 +138,13 @@ public class AutomationCompositionInstantiationProvider {
private BeanValidationResult validateAutomationComposition(AutomationComposition automationComposition) {
var result = new BeanValidationResult("AutomationComposition", automationComposition);
- var serviceTemplate = acDefinitionProvider.findAcDefinition(automationComposition.getCompositionId());
- if (serviceTemplate.isEmpty()) {
+ var acDefinitionOpt = acDefinitionProvider.findAcDefinition(automationComposition.getCompositionId());
+ if (acDefinitionOpt.isEmpty()) {
result.addResult(new ObjectValidationResult("ServiceTemplate", "", ValidationStatus.INVALID,
"Commissioned automation composition definition not found"));
} else {
- result.addResult(AcmUtils.validateAutomationComposition(automationComposition, serviceTemplate.get()));
+ result.addResult(AcmUtils.validateAutomationComposition(automationComposition,
+ acDefinitionOpt.get().getServiceTemplate()));
}
return result;
}
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/main/rest/CommissioningController.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/main/rest/CommissioningController.java
index 6d7ae7d4d..4799880db 100644
--- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/main/rest/CommissioningController.java
+++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/main/rest/CommissioningController.java
@@ -100,7 +100,7 @@ public class CommissioningController extends AbstractRestController implements A
@Override
public ResponseEntity<Void> compositionDefinitionPriming(UUID compositionId, UUID requestId,
@Valid AcTypeStateUpdate body) {
- // TODO Auto-generated method stub
- return null;
+ provider.compositionDefinitionPriming(compositionId, body);
+ return ResponseEntity.accepted().build();
}
}
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionHandler.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionHandler.java
index 2c5d48717..2542bdb15 100644
--- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionHandler.java
+++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionHandler.java
@@ -29,10 +29,9 @@ import javax.ws.rs.core.Response;
import lombok.AllArgsConstructor;
import org.onap.policy.clamp.acm.runtime.supervision.comm.AutomationCompositionStateChangePublisher;
import org.onap.policy.clamp.acm.runtime.supervision.comm.AutomationCompositionUpdatePublisher;
-import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantUpdatePublisher;
import org.onap.policy.clamp.common.acm.exception.AutomationCompositionException;
+import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
import org.onap.policy.clamp.models.acm.concepts.AutomationComposition;
-import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition;
import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionElementAck;
import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionState;
import org.onap.policy.clamp.models.acm.concepts.ParticipantUtils;
@@ -69,27 +68,6 @@ public class SupervisionHandler {
// Publishers for participant communication
private final AutomationCompositionUpdatePublisher automationCompositionUpdatePublisher;
private final AutomationCompositionStateChangePublisher automationCompositionStateChangePublisher;
- private final ParticipantUpdatePublisher participantUpdatePublisher;
-
- /**
- * Send commissioning update message to dmaap.
- *
- * @param acmDefinition the AutomationComposition Definition
- */
- public void handleSendCommissionMessage(AutomationCompositionDefinition acmDefinition) {
- LOGGER.debug("Participant update message with serviveTemplate {} being sent to all participants",
- acmDefinition.getCompositionId());
- participantUpdatePublisher.sendComissioningBroadcast(acmDefinition);
- }
-
- /**
- * Send decommissioning update message to dmaap.
- *
- */
- public void handleSendDeCommissionMessage(UUID compositionId) {
- LOGGER.debug("Participant update message being sent {}", compositionId);
- participantUpdatePublisher.sendDecomisioning(compositionId);
- }
/**
* Handle a AutomationComposition update acknowledge message from a participant.
@@ -110,10 +88,34 @@ public class SupervisionHandler {
*
* @param participantUpdateAckMessage the ParticipantUpdateAck message received from a participant
*/
- @MessageIntercept
@Timed(value = "listener.participant_update_ack", description = "PARTICIPANT_UPDATE_ACK messages received")
public void handleParticipantMessage(ParticipantUpdateAck participantUpdateAckMessage) {
LOGGER.debug("Participant Update Ack message received {}", participantUpdateAckMessage);
+ var acDefinitionOpt = acDefinitionProvider.findAcDefinition(participantUpdateAckMessage.getCompositionId());
+ if (acDefinitionOpt.isEmpty()) {
+ LOGGER.warn("AC Definition not found in database {}", participantUpdateAckMessage.getCompositionId());
+ return;
+ }
+ var acDefinition = acDefinitionOpt.get();
+ if (!AcTypeState.PRIMING.equals(acDefinition.getState())
+ && !AcTypeState.DEPRIMING.equals(acDefinition.getState())) {
+ LOGGER.warn("AC Definition {} already primed/deprimed with participant {}",
+ participantUpdateAckMessage.getCompositionId(), participantUpdateAckMessage.getParticipantId());
+ return;
+ }
+ var state = AcTypeState.PRIMING.equals(acDefinition.getState()) ? AcTypeState.PRIMED : AcTypeState.COMMISSIONED;
+ boolean completed = true;
+ for (var element : acDefinition.getElementStateMap().values()) {
+ if (participantUpdateAckMessage.getParticipantId().equals(element.getParticipantId())) {
+ element.setState(state);
+ } else if (!state.equals(element.getState())) {
+ completed = false;
+ }
+ }
+ if (completed) {
+ acDefinition.setState(state);
+ }
+ acDefinitionProvider.updateAcDefinition(acDefinition);
}
/**
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantUpdatePublisher.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantUpdatePublisher.java
index e4bfedd9d..1915f1b0d 100644
--- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantUpdatePublisher.java
+++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantUpdatePublisher.java
@@ -24,16 +24,18 @@ package org.onap.policy.clamp.acm.runtime.supervision.comm;
import io.micrometer.core.annotation.Timed;
import java.time.Instant;
-import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.UUID;
import lombok.AllArgsConstructor;
+import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition;
import org.onap.policy.clamp.models.acm.concepts.ParticipantDefinition;
-import org.onap.policy.clamp.models.acm.concepts.ParticipantUtils;
import org.onap.policy.clamp.models.acm.messages.dmaap.participant.ParticipantUpdate;
-import org.onap.policy.clamp.models.acm.persistence.provider.AcDefinitionProvider;
+import org.onap.policy.clamp.models.acm.persistence.provider.ParticipantProvider;
import org.onap.policy.clamp.models.acm.utils.AcmUtils;
+import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
@@ -47,72 +49,67 @@ public class ParticipantUpdatePublisher extends AbstractParticipantPublisher<Par
private static final Logger LOGGER = LoggerFactory.getLogger(ParticipantUpdatePublisher.class);
- private final AcDefinitionProvider acDefinitionProvider;
-
- /**
- * Send ParticipantUpdate to all Participants.
- *
- * @param acmDefinition the AutomationComposition Definition
- */
- @Timed(value = "publisher.participant_update", description = "PARTICIPANT_UPDATE messages published")
- public void sendComissioningBroadcast(AutomationCompositionDefinition acmDefinition) {
- sendCommissioning(acmDefinition, null);
- }
+ private final ParticipantProvider participantProvider;
/**
* Send ParticipantUpdate to Participant
* if participantId is null then message is broadcast.
*
+ * @param participantDefinitions the list of ParticipantDefinition to send
+ * @param compositionId the compositionId
* @param participantId the ParticipantId
*/
@Timed(value = "publisher.participant_update", description = "PARTICIPANT_UPDATE messages published")
- public void sendCommissioning(UUID participantId) {
- var list = acDefinitionProvider.getAllAcDefinitions();
- if (list.isEmpty()) {
- LOGGER.warn("No tosca service template found, cannot send participantupdate");
- }
- for (var acmDefinition : list) {
- sendCommissioning(acmDefinition, participantId);
- }
+ public void sendPriming(List<ParticipantDefinition> participantDefinitions, UUID compositionId,
+ UUID participantId) {
+ var message = new ParticipantUpdate();
+ message.setCompositionId(compositionId);
+ message.setParticipantId(participantId);
+ message.setTimestamp(Instant.now());
+ message.setParticipantDefinitionUpdates(participantDefinitions);
+ LOGGER.debug("Participant Update sent {}", message);
+ super.send(message);
}
/**
- * Send ParticipantUpdate to Participant
- * if participantId is null then message is broadcast.
+ * Pepare the Priming message creating the list of ParticipantDefinition to send
+ * and fill the ElementState map of the AC Definition.
*
* @param acmDefinition the AutomationComposition Definition
- * @param participantId the ParticipantId
+ * @return list of ParticipantDefinition
*/
- @Timed(value = "publisher.participant_update", description = "PARTICIPANT_UPDATE messages published")
- public void sendCommissioning(AutomationCompositionDefinition acmDefinition,
- UUID participantId) {
- var message = new ParticipantUpdate();
- message.setCompositionId(acmDefinition.getCompositionId());
- message.setParticipantId(participantId);
- message.setTimestamp(Instant.now());
-
- var toscaServiceTemplate = acmDefinition.getServiceTemplate();
- List<ParticipantDefinition> participantDefinitionUpdates = new ArrayList<>();
- for (var toscaInputEntry : toscaServiceTemplate.getToscaTopologyTemplate().getNodeTemplates().entrySet()) {
- if (ParticipantUtils.checkIfNodeTemplateIsAutomationCompositionElement(toscaInputEntry.getValue(),
- toscaServiceTemplate)) {
- AcmUtils.prepareParticipantDefinitionUpdate(
- participantId,
- toscaInputEntry.getKey(), toscaInputEntry.getValue(), participantDefinitionUpdates);
+ public List<ParticipantDefinition> prepareParticipantPriming(AutomationCompositionDefinition acmDefinition) {
+ acmDefinition.setState(AcTypeState.PRIMING);
+ var acElements = AcmUtils.extractAcElementsFromServiceTemplate(acmDefinition.getServiceTemplate());
+ Map<ToscaConceptIdentifier, UUID> supportedElementMap = new HashMap<>();
+ if (AcTypeState.PRIMED.equals(acmDefinition.getState())) {
+ // scenario Prime again, participants already assigned
+ for (var elementEntry : acElements) {
+ var elementState = acmDefinition.getElementStateMap().get(elementEntry.getKey());
+ elementState.setState(AcTypeState.PRIMING);
+ var type = new ToscaConceptIdentifier(elementEntry.getValue().getType(),
+ elementEntry.getValue().getTypeVersion());
+ supportedElementMap.put(type, elementState.getParticipantId());
+ }
+ } else {
+ // scenario Prime participants not assigned yet
+ supportedElementMap = participantProvider.getSupportedElementMap();
+ for (var elementEntry : acElements) {
+ var elementState = acmDefinition.getElementStateMap().get(elementEntry.getKey());
+ elementState.setState(AcTypeState.PRIMING);
+ var type = new ToscaConceptIdentifier(elementEntry.getValue().getType(),
+ elementEntry.getValue().getTypeVersion());
+ elementState.setParticipantId(supportedElementMap.get(type));
}
}
-
- // Commission the automation composition but sending participantdefinitions to participants
- message.setParticipantDefinitionUpdates(participantDefinitionUpdates);
- LOGGER.debug("Participant Update sent {}", message);
- super.send(message);
+ return AcmUtils.prepareParticipantPriming(acElements, supportedElementMap);
}
/**
* Send ParticipantUpdate to Participant after that commissioning has been removed.
*/
@Timed(value = "publisher.participant_update", description = "PARTICIPANT_UPDATE messages published")
- public void sendDecomisioning(UUID compositionId) {
+ public void sendDepriming(UUID compositionId) {
var message = new ParticipantUpdate();
message.setCompositionId(compositionId);
message.setTimestamp(Instant.now());