summaryrefslogtreecommitdiffstats
path: root/runtime-acm/src/main/java
diff options
context:
space:
mode:
authorFrancescoFioraEst <francesco.fiora@est.tech>2022-12-19 14:43:11 +0000
committerFrancescoFioraEst <francesco.fiora@est.tech>2022-12-20 10:35:32 +0000
commitc71863696d729286502f951a64c3d2193c6641e7 (patch)
tree024363ddde071c3f59d7468bdc9ddfd011d3701e /runtime-acm/src/main/java
parent66208f7d9ec65d859803ed347c3fdecd2d99022f (diff)
Add compositionId into Messages between ACM and Participants
Issue-ID: POLICY-4489 Change-Id: I1fb8c04eacee040d5c944c522ca59a2a9a50376b Signed-off-by: FrancescoFioraEst <francesco.fiora@est.tech>
Diffstat (limited to 'runtime-acm/src/main/java')
-rw-r--r--runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/commissioning/CommissioningProvider.java4
-rw-r--r--runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAspect.java29
-rw-r--r--runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionHandler.java113
-rw-r--r--runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionScanner.java101
-rw-r--r--runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AutomationCompositionStateChangePublisher.java1
-rw-r--r--runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AutomationCompositionUpdatePublisher.java4
-rw-r--r--runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantUpdatePublisher.java51
7 files changed, 118 insertions, 185 deletions
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/commissioning/CommissioningProvider.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/commissioning/CommissioningProvider.java
index 66a67a304..1d3b2992b 100644
--- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/commissioning/CommissioningProvider.java
+++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/commissioning/CommissioningProvider.java
@@ -98,7 +98,7 @@ public class CommissioningProvider {
serviceTemplate = acmDefinition.getServiceTemplate();
var participantList = participantProvider.getParticipants();
if (!participantList.isEmpty()) {
- supervisionHandler.handleSendCommissionMessage(serviceTemplate.getName(), serviceTemplate.getVersion());
+ supervisionHandler.handleSendCommissionMessage(acmDefinition);
}
return createCommissioningResponse(acmDefinition.getCompositionId(), serviceTemplate);
}
@@ -145,7 +145,7 @@ public class CommissioningProvider {
}
var participantList = participantProvider.getParticipants();
if (!participantList.isEmpty()) {
- supervisionHandler.handleSendDeCommissionMessage();
+ supervisionHandler.handleSendDeCommissionMessage(compositionId);
}
var serviceTemplate = acDefinitionProvider.deleteAcDefintion(compositionId);
return createCommissioningResponse(compositionId, serviceTemplate);
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAspect.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAspect.java
index ea851da81..8d1f98388 100644
--- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAspect.java
+++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAspect.java
@@ -1,6 +1,6 @@
/*-
* ============LICENSE_START=======================================================
- * Copyright (C) 2021 Nordix Foundation.
+ * Copyright (C) 2021-2022 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -26,14 +26,10 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import lombok.RequiredArgsConstructor;
-import org.apache.commons.lang3.tuple.ImmutablePair;
import org.aspectj.lang.annotation.After;
-import org.aspectj.lang.annotation.AfterReturning;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
-import org.onap.policy.clamp.models.acm.messages.dmaap.participant.ParticipantRegister;
import org.onap.policy.clamp.models.acm.messages.dmaap.participant.ParticipantStatus;
-import org.onap.policy.clamp.models.acm.messages.dmaap.participant.ParticipantUpdateAck;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Scheduled;
@@ -75,29 +71,6 @@ public class SupervisionAspect implements Closeable {
executor.execute(() -> supervisionScanner.handleParticipantStatus(participantStatusMessage.getParticipantId()));
}
- /**
- * Intercepts participant Register Message
- * if there is a Commissioning starts an execution of handleParticipantRegister.
- *
- * @param participantRegisterMessage the ParticipantRegister message
- * @param isCommissioning is Commissioning
- */
- @AfterReturning(
- value = "@annotation(MessageIntercept) && args(participantRegisterMessage,..)",
- returning = "isCommissioning")
- public void handleParticipantRegister(ParticipantRegister participantRegisterMessage, boolean isCommissioning) {
- if (isCommissioning) {
- executor.execute(() -> supervisionScanner.handleParticipantRegister(new ImmutablePair<>(
- participantRegisterMessage.getParticipantId(), participantRegisterMessage.getParticipantType())));
- }
- }
-
- @Before("@annotation(MessageIntercept) && args(participantUpdateAckMessage,..)")
- public void handleParticipantUpdateAck(ParticipantUpdateAck participantUpdateAckMessage) {
- executor.execute(() -> supervisionScanner.handleParticipantUpdateAck(new ImmutablePair<>(
- participantUpdateAckMessage.getParticipantId(), participantUpdateAckMessage.getParticipantType())));
- }
-
@Override
public void close() throws IOException {
executor.shutdown();
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionHandler.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionHandler.java
index de67360f8..9a80508e5 100644
--- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionHandler.java
+++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionHandler.java
@@ -34,6 +34,7 @@ import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantRegisterAck
import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantUpdatePublisher;
import org.onap.policy.clamp.common.acm.exception.AutomationCompositionException;
import org.onap.policy.clamp.models.acm.concepts.AutomationComposition;
+import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition;
import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionElementAck;
import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionState;
import org.onap.policy.clamp.models.acm.concepts.Participant;
@@ -67,9 +68,9 @@ public class SupervisionHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(SupervisionHandler.class);
private static final String AUTOMATION_COMPOSITION_CANNOT_TRANSITION_FROM_STATE =
- "Automation composition can't transition from state ";
+ "Automation composition can't transition from state ";
private static final String AUTOMATION_COMPOSITION_IS_ALREADY_IN_STATE =
- "Automation composition is already in state ";
+ "Automation composition is already in state ";
private static final String TO_STATE = " to state ";
private static final String AND_TRANSITIONING_TO_STATE = " and transitioning to state ";
@@ -107,7 +108,7 @@ public class SupervisionHandler {
*/
@MessageIntercept
@Timed(value = "listener.participant_register", description = "PARTICIPANT_REGISTER messages received")
- public boolean handleParticipantMessage(ParticipantRegister participantRegisterMessage) {
+ public void handleParticipantMessage(ParticipantRegister participantRegisterMessage) {
LOGGER.debug("Participant Register received {}", participantRegisterMessage);
try {
checkParticipant(participantRegisterMessage, ParticipantState.UNKNOWN, ParticipantHealthStatus.UNKNOWN);
@@ -115,12 +116,11 @@ public class SupervisionHandler {
LOGGER.warn("error saving participant {}", participantRegisterMessage.getParticipantId(), svExc);
}
- var isCommissioning = participantUpdatePublisher.sendCommissioning(null, null,
- participantRegisterMessage.getParticipantId(), participantRegisterMessage.getParticipantType());
+ participantUpdatePublisher.sendCommissioning(participantRegisterMessage.getParticipantId(),
+ participantRegisterMessage.getParticipantType());
participantRegisterAckPublisher.send(participantRegisterMessage.getMessageId(),
- participantRegisterMessage.getParticipantId(), participantRegisterMessage.getParticipantType());
- return isCommissioning;
+ participantRegisterMessage.getParticipantId(), participantRegisterMessage.getParticipantType());
}
/**
@@ -134,8 +134,8 @@ public class SupervisionHandler {
LOGGER.debug("Participant Deregister received {}", participantDeregisterMessage);
try {
var participantOpt =
- participantProvider.findParticipant(participantDeregisterMessage.getParticipantId().getName(),
- participantDeregisterMessage.getParticipantId().getVersion());
+ participantProvider.findParticipant(participantDeregisterMessage.getParticipantId().getName(),
+ participantDeregisterMessage.getParticipantId().getVersion());
if (participantOpt.isPresent()) {
var participant = participantOpt.get();
@@ -145,7 +145,7 @@ public class SupervisionHandler {
}
} catch (PfModelException pfme) {
LOGGER.warn("Model exception occured with participant id {}",
- participantDeregisterMessage.getParticipantId());
+ participantDeregisterMessage.getParticipantId());
}
participantDeregisterAckPublisher.send(participantDeregisterMessage.getMessageId());
@@ -162,8 +162,8 @@ public class SupervisionHandler {
LOGGER.debug("Participant Update Ack received {}", participantUpdateAckMessage);
try {
var participantOpt =
- participantProvider.findParticipant(participantUpdateAckMessage.getParticipantId().getName(),
- participantUpdateAckMessage.getParticipantId().getVersion());
+ participantProvider.findParticipant(participantUpdateAckMessage.getParticipantId().getName(),
+ participantUpdateAckMessage.getParticipantId().getVersion());
if (participantOpt.isPresent()) {
var participant = participantOpt.get();
@@ -174,29 +174,28 @@ public class SupervisionHandler {
}
} catch (PfModelException pfme) {
LOGGER.warn("Model exception occured with participant id {}",
- participantUpdateAckMessage.getParticipantId());
+ participantUpdateAckMessage.getParticipantId());
}
}
/**
* Send commissioning update message to dmaap.
*
- * @param name the ToscaServiceTemplate name
- * @param version the ToscaServiceTemplate version
+ * @param acmDefinition the AutomationComposition Definition
*/
- public void handleSendCommissionMessage(String name, String version) {
- LOGGER.debug("Participant update message with serviveTemplate {} {} being sent to all participants", name,
- version);
- participantUpdatePublisher.sendComissioningBroadcast(name, version);
+ public void handleSendCommissionMessage(AutomationCompositionDefinition acmDefinition) {
+ LOGGER.debug("Participant update message with serviveTemplate {} being sent to all participants",
+ acmDefinition.getCompositionId());
+ participantUpdatePublisher.sendComissioningBroadcast(acmDefinition);
}
/**
* Send decommissioning update message to dmaap.
*
*/
- public void handleSendDeCommissionMessage() {
- LOGGER.debug("Participant update message being sent");
- participantUpdatePublisher.sendDecomisioning();
+ public void handleSendDeCommissionMessage(UUID compositionId) {
+ LOGGER.debug("Participant update message being sent {}", compositionId);
+ participantUpdatePublisher.sendDecomisioning(compositionId);
}
/**
@@ -205,8 +204,9 @@ public class SupervisionHandler {
* @param automationCompositionAckMessage the AutomationCompositionAck message received from a participant
*/
@MessageIntercept
- @Timed(value = "listener.automation_composition_update_ack",
- description = "AUTOMATION_COMPOSITION_UPDATE_ACK messages received")
+ @Timed(
+ value = "listener.automation_composition_update_ack",
+ description = "AUTOMATION_COMPOSITION_UPDATE_ACK messages received")
public void handleAutomationCompositionUpdateAckMessage(AutomationCompositionAck automationCompositionAckMessage) {
LOGGER.debug("AutomationComposition Update Ack message received {}", automationCompositionAckMessage);
setAcElementStateInDb(automationCompositionAckMessage);
@@ -218,10 +218,11 @@ public class SupervisionHandler {
* @param automationCompositionAckMessage the AutomationCompositionAck message received from a participant
*/
@MessageIntercept
- @Timed(value = "listener.automation_composition_statechange_ack",
- description = "AUTOMATION_COMPOSITION_STATECHANGE_ACK messages received")
+ @Timed(
+ value = "listener.automation_composition_statechange_ack",
+ description = "AUTOMATION_COMPOSITION_STATECHANGE_ACK messages received")
public void handleAutomationCompositionStateChangeAckMessage(
- AutomationCompositionAck automationCompositionAckMessage) {
+ AutomationCompositionAck automationCompositionAckMessage) {
LOGGER.debug("AutomationComposition StateChange Ack message received {}", automationCompositionAckMessage);
setAcElementStateInDb(automationCompositionAckMessage);
}
@@ -245,7 +246,7 @@ public class SupervisionHandler {
}
private boolean updateState(AutomationComposition automationComposition,
- Set<Map.Entry<UUID, AutomationCompositionElementAck>> automationCompositionResultSet) {
+ Set<Map.Entry<UUID, AutomationCompositionElementAck>> automationCompositionResultSet) {
var updated = false;
for (var acElementAck : automationCompositionResultSet) {
var element = automationComposition.getElements().get(acElementAck.getKey());
@@ -262,9 +263,9 @@ public class SupervisionHandler {
if (acElements != null) {
Boolean primedFlag = true;
var checkOpt = automationComposition.getElements().values().stream()
- .filter(acElement -> (!acElement.getState().equals(AutomationCompositionState.PASSIVE)
- || !acElement.getState().equals(AutomationCompositionState.RUNNING)))
- .findAny();
+ .filter(acElement -> (!acElement.getState().equals(AutomationCompositionState.PASSIVE)
+ || !acElement.getState().equals(AutomationCompositionState.RUNNING)))
+ .findAny();
if (checkOpt.isEmpty()) {
primedFlag = false;
}
@@ -283,7 +284,7 @@ public class SupervisionHandler {
* @throws AutomationCompositionException on supervision errors
*/
public void triggerAutomationCompositionSupervision(AutomationComposition automationComposition)
- throws AutomationCompositionException {
+ throws AutomationCompositionException {
switch (automationComposition.getOrderedState()) {
case UNINITIALISED:
superviseAutomationCompositionUninitialization(automationComposition);
@@ -299,8 +300,8 @@ public class SupervisionHandler {
default:
exceptionOccured(Response.Status.NOT_ACCEPTABLE,
- "A automation composition cannot be commanded to go into state "
- + automationComposition.getOrderedState().name());
+ "A automation composition cannot be commanded to go into state "
+ + automationComposition.getOrderedState().name());
}
}
@@ -313,39 +314,39 @@ public class SupervisionHandler {
* @throws AutomationCompositionException on supervision errors
*/
private void superviseAutomationCompositionUninitialization(AutomationComposition automationComposition)
- throws AutomationCompositionException {
+ throws AutomationCompositionException {
switch (automationComposition.getState()) {
case UNINITIALISED:
exceptionOccured(Response.Status.NOT_ACCEPTABLE,
- AUTOMATION_COMPOSITION_IS_ALREADY_IN_STATE + automationComposition.getState().name());
+ AUTOMATION_COMPOSITION_IS_ALREADY_IN_STATE + automationComposition.getState().name());
break;
case UNINITIALISED2PASSIVE:
case PASSIVE:
automationComposition.setState(AutomationCompositionState.PASSIVE2UNINITIALISED);
automationCompositionStateChangePublisher.send(automationComposition,
- getFirstStartPhase(automationComposition));
+ getFirstStartPhase(automationComposition));
break;
case PASSIVE2UNINITIALISED:
exceptionOccured(Response.Status.NOT_ACCEPTABLE,
- AUTOMATION_COMPOSITION_IS_ALREADY_IN_STATE + automationComposition.getState().name()
- + AND_TRANSITIONING_TO_STATE + automationComposition.getOrderedState());
+ AUTOMATION_COMPOSITION_IS_ALREADY_IN_STATE + automationComposition.getState().name()
+ + AND_TRANSITIONING_TO_STATE + automationComposition.getOrderedState());
break;
default:
exceptionOccured(Response.Status.NOT_ACCEPTABLE, AUTOMATION_COMPOSITION_CANNOT_TRANSITION_FROM_STATE
- + automationComposition.getState().name() + TO_STATE + automationComposition.getOrderedState());
+ + automationComposition.getState().name() + TO_STATE + automationComposition.getOrderedState());
break;
}
}
private void superviseAutomationCompositionPassivation(AutomationComposition automationComposition)
- throws AutomationCompositionException {
+ throws AutomationCompositionException {
switch (automationComposition.getState()) {
case PASSIVE:
exceptionOccured(Response.Status.NOT_ACCEPTABLE,
- AUTOMATION_COMPOSITION_IS_ALREADY_IN_STATE + automationComposition.getState().name());
+ AUTOMATION_COMPOSITION_IS_ALREADY_IN_STATE + automationComposition.getState().name());
break;
case UNINITIALISED:
automationComposition.setState(AutomationCompositionState.UNINITIALISED2PASSIVE);
@@ -355,46 +356,46 @@ public class SupervisionHandler {
case UNINITIALISED2PASSIVE:
case RUNNING2PASSIVE:
exceptionOccured(Response.Status.NOT_ACCEPTABLE,
- AUTOMATION_COMPOSITION_IS_ALREADY_IN_STATE + automationComposition.getState().name()
- + AND_TRANSITIONING_TO_STATE + automationComposition.getOrderedState());
+ AUTOMATION_COMPOSITION_IS_ALREADY_IN_STATE + automationComposition.getState().name()
+ + AND_TRANSITIONING_TO_STATE + automationComposition.getOrderedState());
break;
case RUNNING:
automationComposition.setState(AutomationCompositionState.RUNNING2PASSIVE);
automationCompositionStateChangePublisher.send(automationComposition,
- getFirstStartPhase(automationComposition));
+ getFirstStartPhase(automationComposition));
break;
default:
exceptionOccured(Response.Status.NOT_ACCEPTABLE, AUTOMATION_COMPOSITION_CANNOT_TRANSITION_FROM_STATE
- + automationComposition.getState().name() + TO_STATE + automationComposition.getOrderedState());
+ + automationComposition.getState().name() + TO_STATE + automationComposition.getOrderedState());
break;
}
}
private void superviseAutomationCompositionActivation(AutomationComposition automationComposition)
- throws AutomationCompositionException {
+ throws AutomationCompositionException {
switch (automationComposition.getState()) {
case RUNNING:
exceptionOccured(Response.Status.NOT_ACCEPTABLE,
- AUTOMATION_COMPOSITION_IS_ALREADY_IN_STATE + automationComposition.getState().name());
+ AUTOMATION_COMPOSITION_IS_ALREADY_IN_STATE + automationComposition.getState().name());
break;
case PASSIVE2RUNNING:
exceptionOccured(Response.Status.NOT_ACCEPTABLE,
- AUTOMATION_COMPOSITION_IS_ALREADY_IN_STATE + automationComposition.getState().name()
- + AND_TRANSITIONING_TO_STATE + automationComposition.getOrderedState());
+ AUTOMATION_COMPOSITION_IS_ALREADY_IN_STATE + automationComposition.getState().name()
+ + AND_TRANSITIONING_TO_STATE + automationComposition.getOrderedState());
break;
case PASSIVE:
automationComposition.setState(AutomationCompositionState.PASSIVE2RUNNING);
automationCompositionStateChangePublisher.send(automationComposition,
- getFirstStartPhase(automationComposition));
+ getFirstStartPhase(automationComposition));
break;
default:
exceptionOccured(Response.Status.NOT_ACCEPTABLE, AUTOMATION_COMPOSITION_CANNOT_TRANSITION_FROM_STATE
- + automationComposition.getState().name() + TO_STATE + automationComposition.getOrderedState());
+ + automationComposition.getState().name() + TO_STATE + automationComposition.getOrderedState());
break;
}
}
@@ -405,12 +406,12 @@ public class SupervisionHandler {
}
private void checkParticipant(ParticipantMessage participantMessage, ParticipantState participantState,
- ParticipantHealthStatus healthStatus) throws AutomationCompositionException, PfModelException {
+ ParticipantHealthStatus healthStatus) throws AutomationCompositionException, PfModelException {
if (participantMessage.getParticipantId() == null) {
exceptionOccured(Response.Status.NOT_FOUND, "Participant ID on PARTICIPANT_STATUS message is null");
}
var participantOpt = participantProvider.findParticipant(participantMessage.getParticipantId().getName(),
- participantMessage.getParticipantId().getVersion());
+ participantMessage.getParticipantId().getVersion());
if (participantOpt.isEmpty()) {
var participant = new Participant();
@@ -432,10 +433,10 @@ public class SupervisionHandler {
}
private void superviseParticipant(ParticipantStatus participantStatusMessage)
- throws PfModelException, AutomationCompositionException {
+ throws PfModelException, AutomationCompositionException {
checkParticipant(participantStatusMessage, participantStatusMessage.getState(),
- participantStatusMessage.getHealthStatus());
+ participantStatusMessage.getHealthStatus());
}
private void exceptionOccured(Response.Status status, String reason) throws AutomationCompositionException {
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionScanner.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionScanner.java
index 129569b6b..a71f49c10 100644
--- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionScanner.java
+++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionScanner.java
@@ -24,14 +24,12 @@ package org.onap.policy.clamp.acm.runtime.supervision;
import java.util.HashMap;
import java.util.Map;
-import org.apache.commons.lang3.tuple.Pair;
+import java.util.UUID;
import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup;
import org.onap.policy.clamp.acm.runtime.supervision.comm.AutomationCompositionStateChangePublisher;
import org.onap.policy.clamp.acm.runtime.supervision.comm.AutomationCompositionUpdatePublisher;
import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantStatusReqPublisher;
-import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantUpdatePublisher;
import org.onap.policy.clamp.models.acm.concepts.AutomationComposition;
-import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionElement;
import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionState;
import org.onap.policy.clamp.models.acm.concepts.Participant;
import org.onap.policy.clamp.models.acm.concepts.ParticipantHealthStatus;
@@ -41,7 +39,6 @@ import org.onap.policy.clamp.models.acm.persistence.provider.AutomationCompositi
import org.onap.policy.clamp.models.acm.persistence.provider.ParticipantProvider;
import org.onap.policy.models.base.PfModelException;
import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
-import org.onap.policy.models.tosca.authorative.concepts.ToscaNodeTemplate;
import org.onap.policy.models.tosca.authorative.concepts.ToscaServiceTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,12 +51,10 @@ import org.springframework.stereotype.Component;
public class SupervisionScanner {
private static final Logger LOGGER = LoggerFactory.getLogger(SupervisionScanner.class);
- private final HandleCounter<ToscaConceptIdentifier> automationCompositionCounter = new HandleCounter<>();
+ private final HandleCounter<UUID> automationCompositionCounter = new HandleCounter<>();
private final HandleCounter<ToscaConceptIdentifier> participantStatusCounter = new HandleCounter<>();
- private final HandleCounter<Pair<ToscaConceptIdentifier, ToscaConceptIdentifier>> participantUpdateCounter =
- new HandleCounter<>();
- private final Map<ToscaConceptIdentifier, Integer> phaseMap = new HashMap<>();
+ private final Map<UUID, Integer> phaseMap = new HashMap<>();
private final AutomationCompositionProvider automationCompositionProvider;
private final AcDefinitionProvider acDefinitionProvider;
@@ -67,7 +62,6 @@ public class SupervisionScanner {
private final AutomationCompositionUpdatePublisher automationCompositionUpdatePublisher;
private final ParticipantProvider participantProvider;
private final ParticipantStatusReqPublisher participantStatusReqPublisher;
- private final ParticipantUpdatePublisher participantUpdatePublisher;
/**
* Constructor for instantiating SupervisionScanner.
@@ -78,7 +72,6 @@ public class SupervisionScanner {
* @param automationCompositionUpdatePublisher the AutomationCompositionUpdate Publisher
* @param participantProvider the Participant Provider
* @param participantStatusReqPublisher the Participant StatusReq Publisher
- * @param participantUpdatePublisher the Participant Update Publisher
* @param acRuntimeParameterGroup the parameters for the automation composition runtime
*/
public SupervisionScanner(final AutomationCompositionProvider automationCompositionProvider,
@@ -86,7 +79,6 @@ public class SupervisionScanner {
final AutomationCompositionStateChangePublisher automationCompositionStateChangePublisher,
AutomationCompositionUpdatePublisher automationCompositionUpdatePublisher,
ParticipantProvider participantProvider, ParticipantStatusReqPublisher participantStatusReqPublisher,
- ParticipantUpdatePublisher participantUpdatePublisher,
final AcRuntimeParameterGroup acRuntimeParameterGroup) {
this.automationCompositionProvider = automationCompositionProvider;
this.acDefinitionProvider = acDefinitionProvider;
@@ -94,18 +86,12 @@ public class SupervisionScanner {
this.automationCompositionUpdatePublisher = automationCompositionUpdatePublisher;
this.participantProvider = participantProvider;
this.participantStatusReqPublisher = participantStatusReqPublisher;
- this.participantUpdatePublisher = participantUpdatePublisher;
automationCompositionCounter.setMaxRetryCount(
acRuntimeParameterGroup.getParticipantParameters().getUpdateParameters().getMaxRetryCount());
automationCompositionCounter
.setMaxWaitMs(acRuntimeParameterGroup.getParticipantParameters().getMaxStatusWaitMs());
- participantUpdateCounter.setMaxRetryCount(
- acRuntimeParameterGroup.getParticipantParameters().getUpdateParameters().getMaxRetryCount());
- participantUpdateCounter
- .setMaxWaitMs(acRuntimeParameterGroup.getParticipantParameters().getUpdateParameters().getMaxWaitMs());
-
participantStatusCounter.setMaxRetryCount(
acRuntimeParameterGroup.getParticipantParameters().getUpdateParameters().getMaxRetryCount());
participantStatusCounter.setMaxWaitMs(acRuntimeParameterGroup.getParticipantParameters().getMaxStatusWaitMs());
@@ -130,48 +116,17 @@ public class SupervisionScanner {
}
}
- try {
- var list = acDefinitionProvider.getAllAcDefinitions();
- for (var acDefinition : list) {
- var acList =
- automationCompositionProvider.getAcInstancesByCompositionId(acDefinition.getCompositionId());
- for (var automationComposition : acList) {
- scanAutomationComposition(automationComposition, acDefinition.getServiceTemplate(), counterCheck);
- }
+ var list = acDefinitionProvider.getAllAcDefinitions();
+ for (var acDefinition : list) {
+ var acList = automationCompositionProvider.getAcInstancesByCompositionId(acDefinition.getCompositionId());
+ for (var automationComposition : acList) {
+ scanAutomationComposition(automationComposition, acDefinition.getServiceTemplate(), counterCheck);
}
- } catch (PfModelException pfme) {
- LOGGER.warn("error reading automation compositions from database", pfme);
- }
-
- if (counterCheck) {
- scanParticipantUpdate();
}
LOGGER.debug("Automation composition scan complete . . .");
}
- private void scanParticipantUpdate() {
- LOGGER.debug("Scanning participants to update . . .");
-
- for (var id : participantUpdateCounter.keySet()) {
- if (participantUpdateCounter.isFault(id)) {
- LOGGER.debug("report Participant Update fault");
-
- } else if (participantUpdateCounter.getDuration(id) > participantUpdateCounter.getMaxWaitMs()) {
-
- if (participantUpdateCounter.count(id)) {
- LOGGER.debug("retry message ParticipantUpdate");
- participantUpdatePublisher.sendCommissioning(null, null, id.getLeft(), id.getRight());
- } else {
- LOGGER.debug("report Participant Update fault");
- participantUpdateCounter.setFault(id);
- }
- }
- }
-
- LOGGER.debug("Participants to update scan complete . . .");
- }
-
private void scanParticipantStatus(Participant participant) throws PfModelException {
ToscaConceptIdentifier id = participant.getKey().asIdentifier();
if (participantStatusCounter.isFault(id)) {
@@ -199,20 +154,12 @@ public class SupervisionScanner {
participantStatusCounter.clear(id);
}
- public void handleParticipantRegister(Pair<ToscaConceptIdentifier, ToscaConceptIdentifier> id) {
- participantUpdateCounter.clear(id);
- }
-
- public void handleParticipantUpdateAck(Pair<ToscaConceptIdentifier, ToscaConceptIdentifier> id) {
- participantUpdateCounter.remove(id);
- }
-
private void scanAutomationComposition(final AutomationComposition automationComposition,
- ToscaServiceTemplate toscaServiceTemplate, boolean counterCheck) throws PfModelException {
- LOGGER.debug("scanning automation composition {} . . .", automationComposition.getKey().asIdentifier());
+ ToscaServiceTemplate toscaServiceTemplate, boolean counterCheck) {
+ LOGGER.debug("scanning automation composition {} . . .", automationComposition.getInstanceId());
if (automationComposition.getState().equals(automationComposition.getOrderedState().asState())) {
- LOGGER.debug("automation composition {} scanned, OK", automationComposition.getKey().asIdentifier());
+ LOGGER.debug("automation composition {} scanned, OK", automationComposition.getInstanceId());
// Clear missed report counter on automation composition
clearFaultAndCounter(automationComposition);
@@ -224,8 +171,8 @@ public class SupervisionScanner {
var maxSpNotCompleted = 0; // max startPhase not completed
var defaultMin = 1000; // min startPhase
var defaultMax = 0; // max startPhase
- for (AutomationCompositionElement element : automationComposition.getElements().values()) {
- ToscaNodeTemplate toscaNodeTemplate = toscaServiceTemplate.getToscaTopologyTemplate().getNodeTemplates()
+ for (var element : automationComposition.getElements().values()) {
+ var toscaNodeTemplate = toscaServiceTemplate.getToscaTopologyTemplate().getNodeTemplates()
.get(element.getDefinition().getName());
int startPhase = ParticipantUtils.findStartPhase(toscaNodeTemplate.getProperties());
defaultMin = Math.min(defaultMin, startPhase);
@@ -262,36 +209,36 @@ public class SupervisionScanner {
? defaultMin
: defaultMax;
- if (nextSpNotCompleted != phaseMap.getOrDefault(automationComposition.getKey().asIdentifier(),
+ if (nextSpNotCompleted != phaseMap.getOrDefault(automationComposition.getInstanceId(),
firstStartPhase)) {
- phaseMap.put(automationComposition.getKey().asIdentifier(), nextSpNotCompleted);
+ phaseMap.put(automationComposition.getInstanceId(), nextSpNotCompleted);
sendAutomationCompositionMsg(automationComposition, nextSpNotCompleted);
} else if (counterCheck) {
- phaseMap.put(automationComposition.getKey().asIdentifier(), nextSpNotCompleted);
+ phaseMap.put(automationComposition.getInstanceId(), nextSpNotCompleted);
handleCounter(automationComposition, nextSpNotCompleted);
}
}
}
private void clearFaultAndCounter(AutomationComposition automationComposition) {
- automationCompositionCounter.clear(automationComposition.getKey().asIdentifier());
- phaseMap.clear();
+ automationCompositionCounter.clear(automationComposition.getInstanceId());
+ phaseMap.remove(automationComposition.getInstanceId());
}
private void handleCounter(AutomationComposition automationComposition, int startPhase) {
- ToscaConceptIdentifier id = automationComposition.getKey().asIdentifier();
- if (automationCompositionCounter.isFault(id)) {
+ var instanceId = automationComposition.getInstanceId();
+ if (automationCompositionCounter.isFault(instanceId)) {
LOGGER.debug("report AutomationComposition fault");
return;
}
- if (automationCompositionCounter.getDuration(id) > automationCompositionCounter.getMaxWaitMs()) {
- if (automationCompositionCounter.count(id)) {
- phaseMap.put(id, startPhase);
+ if (automationCompositionCounter.getDuration(instanceId) > automationCompositionCounter.getMaxWaitMs()) {
+ if (automationCompositionCounter.count(instanceId)) {
+ phaseMap.put(instanceId, startPhase);
sendAutomationCompositionMsg(automationComposition, startPhase);
} else {
LOGGER.debug("report AutomationComposition fault");
- automationCompositionCounter.setFault(id);
+ automationCompositionCounter.setFault(instanceId);
}
}
}
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AutomationCompositionStateChangePublisher.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AutomationCompositionStateChangePublisher.java
index 4c3cc958b..8d6378e3f 100644
--- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AutomationCompositionStateChangePublisher.java
+++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AutomationCompositionStateChangePublisher.java
@@ -43,6 +43,7 @@ public class AutomationCompositionStateChangePublisher
description = "AUTOMATION_COMPOSITION_STATE_CHANGE messages published")
public void send(AutomationComposition automationComposition, int startPhase) {
var acsc = new AutomationCompositionStateChange();
+ acsc.setCompositionId(automationComposition.getCompositionId());
acsc.setAutomationCompositionId(automationComposition.getInstanceId());
acsc.setMessageId(UUID.randomUUID());
acsc.setOrderedState(automationComposition.getOrderedState());
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AutomationCompositionUpdatePublisher.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AutomationCompositionUpdatePublisher.java
index c10e47bb4..7b114c920 100644
--- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AutomationCompositionUpdatePublisher.java
+++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AutomationCompositionUpdatePublisher.java
@@ -29,7 +29,6 @@ import java.util.List;
import java.util.UUID;
import lombok.AllArgsConstructor;
import org.onap.policy.clamp.models.acm.concepts.AutomationComposition;
-import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionElement;
import org.onap.policy.clamp.models.acm.concepts.ParticipantUpdates;
import org.onap.policy.clamp.models.acm.messages.dmaap.participant.AutomationCompositionUpdate;
import org.onap.policy.clamp.models.acm.persistence.provider.AcDefinitionProvider;
@@ -69,6 +68,7 @@ public class AutomationCompositionUpdatePublisher extends AbstractParticipantPub
description = "AUTOMATION_COMPOSITION_UPDATE messages published")
public void send(AutomationComposition automationComposition, int startPhase) {
var automationCompositionUpdateMsg = new AutomationCompositionUpdate();
+ automationCompositionUpdateMsg.setCompositionId(automationComposition.getCompositionId());
automationCompositionUpdateMsg.setStartPhase(startPhase);
automationCompositionUpdateMsg.setAutomationCompositionId(automationComposition.getInstanceId());
automationCompositionUpdateMsg.setMessageId(UUID.randomUUID());
@@ -76,7 +76,7 @@ public class AutomationCompositionUpdatePublisher extends AbstractParticipantPub
var toscaServiceTemplate = acDefinitionProvider.getAcDefinition(automationComposition.getCompositionId());
List<ParticipantUpdates> participantUpdates = new ArrayList<>();
- for (AutomationCompositionElement element : automationComposition.getElements().values()) {
+ for (var element : automationComposition.getElements().values()) {
AcmUtils.setAcPolicyInfo(element, toscaServiceTemplate);
AcmUtils.prepareParticipantUpdate(element, participantUpdates);
}
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantUpdatePublisher.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantUpdatePublisher.java
index fa5e423d7..af9023815 100644
--- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantUpdatePublisher.java
+++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantUpdatePublisher.java
@@ -26,7 +26,9 @@ import io.micrometer.core.annotation.Timed;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
+import java.util.UUID;
import lombok.AllArgsConstructor;
+import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition;
import org.onap.policy.clamp.models.acm.concepts.ParticipantDefinition;
import org.onap.policy.clamp.models.acm.concepts.ParticipantUtils;
import org.onap.policy.clamp.models.acm.messages.dmaap.participant.ParticipantUpdate;
@@ -51,47 +53,56 @@ public class ParticipantUpdatePublisher extends AbstractParticipantPublisher<Par
/**
* Send ParticipantUpdate to all Participants.
*
- * @param name the ToscaServiceTemplate name
- * @param version the ToscaServiceTemplate version
+ * @param acmDefinition the AutomationComposition Definition
*/
@Timed(value = "publisher.participant_update", description = "PARTICIPANT_UPDATE messages published")
- public void sendComissioningBroadcast(String name, String version) {
- sendCommissioning(name, version, null, null);
+ public void sendComissioningBroadcast(AutomationCompositionDefinition acmDefinition) {
+ sendCommissioning(acmDefinition, null, null);
}
/**
* Send ParticipantUpdate to Participant
* if participantType and participantId are null then message is broadcast.
*
- * @param name the ToscaServiceTemplate name
- * @param version the ToscaServiceTemplate version
* @param participantType the ParticipantType
* @param participantId the ParticipantId
*/
@Timed(value = "publisher.participant_update", description = "PARTICIPANT_UPDATE messages published")
- public boolean sendCommissioning(String name, String version, ToscaConceptIdentifier participantType,
- ToscaConceptIdentifier participantId) {
+ public void sendCommissioning(ToscaConceptIdentifier participantType, ToscaConceptIdentifier participantId) {
+ var list = acDefinitionProvider.getAllAcDefinitions();
+ if (list.isEmpty()) {
+ LOGGER.warn("No tosca service template found, cannot send participantupdate");
+ }
+ for (var acmDefinition : list) {
+ sendCommissioning(acmDefinition, participantType, participantId);
+ }
+ }
+
+ /**
+ * Send ParticipantUpdate to Participant
+ * if participantType and participantId are null then message is broadcast.
+ *
+ * @param acmDefinition the AutomationComposition Definition
+ * @param participantType the ParticipantType
+ * @param participantId the ParticipantId
+ */
+ @Timed(value = "publisher.participant_update", description = "PARTICIPANT_UPDATE messages published")
+ public void sendCommissioning(AutomationCompositionDefinition acmDefinition,
+ ToscaConceptIdentifier participantType, ToscaConceptIdentifier participantId) {
var message = new ParticipantUpdate();
+ message.setCompositionId(acmDefinition.getCompositionId());
message.setParticipantType(participantType);
message.setParticipantId(participantId);
message.setTimestamp(Instant.now());
- var list = acDefinitionProvider.getServiceTemplateList(name, version);
- if (list.isEmpty()) {
- LOGGER.warn("No tosca service template found, cannot send participantupdate {} {}", name, version);
- return false;
- }
- var toscaServiceTemplate = list.get(0);
- var commonPropertiesMap = AcmUtils.getCommonOrInstancePropertiesFromNodeTypes(true, toscaServiceTemplate);
-
+ var toscaServiceTemplate = acmDefinition.getServiceTemplate();
List<ParticipantDefinition> participantDefinitionUpdates = new ArrayList<>();
for (var toscaInputEntry : toscaServiceTemplate.getToscaTopologyTemplate().getNodeTemplates().entrySet()) {
if (ParticipantUtils.checkIfNodeTemplateIsAutomationCompositionElement(toscaInputEntry.getValue(),
toscaServiceTemplate)) {
AcmUtils.prepareParticipantDefinitionUpdate(
ParticipantUtils.findParticipantType(toscaInputEntry.getValue().getProperties()),
- toscaInputEntry.getKey(), toscaInputEntry.getValue(), participantDefinitionUpdates,
- commonPropertiesMap);
+ toscaInputEntry.getKey(), toscaInputEntry.getValue(), participantDefinitionUpdates);
}
}
@@ -99,15 +110,15 @@ public class ParticipantUpdatePublisher extends AbstractParticipantPublisher<Par
message.setParticipantDefinitionUpdates(participantDefinitionUpdates);
LOGGER.debug("Participant Update sent {}", message);
super.send(message);
- return true;
}
/**
* Send ParticipantUpdate to Participant after that commissioning has been removed.
*/
@Timed(value = "publisher.participant_update", description = "PARTICIPANT_UPDATE messages published")
- public void sendDecomisioning() {
+ public void sendDecomisioning(UUID compositionId) {
var message = new ParticipantUpdate();
+ message.setCompositionId(compositionId);
message.setTimestamp(Instant.now());
// DeCommission the automation composition but deleting participantdefinitions on participants
message.setParticipantDefinitionUpdates(null);