aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorFrancescoFioraEst <francesco.fiora@est.tech>2024-09-25 17:04:00 +0100
committerFrancesco Fiora <francesco.fiora@est.tech>2024-09-30 09:43:38 +0000
commit15302061ba7e9572ed4ea8c3aa0255d01ef07310 (patch)
tree94e97d4daafdfd7ed063306377270a5a5017ce6c
parent6ed514f31be046cf31caeada994e7b9c036b5f90 (diff)
Handle timeout as hard timeout
Issue-ID: POLICY-5132 Change-Id: Ic78d346972e955f7700118ffa74be11643a885f0 Signed-off-by: FrancescoFioraEst <francesco.fiora@est.tech>
-rw-r--r--models/src/main/java/org/onap/policy/clamp/models/acm/concepts/ParticipantRestartAc.java2
-rw-r--r--models/src/main/java/org/onap/policy/clamp/models/acm/messages/kafka/participant/ParticipantSync.java3
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AcDefinitionHandler.java25
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/CacheProvider.java1
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ThreadHandler.java8
-rw-r--r--participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AcDefinitionHandlerTest.java23
-rw-r--r--runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionScanner.java17
-rw-r--r--runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantSyncPublisher.java2
-rw-r--r--runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionScannerTest.java23
9 files changed, 85 insertions, 19 deletions
diff --git a/models/src/main/java/org/onap/policy/clamp/models/acm/concepts/ParticipantRestartAc.java b/models/src/main/java/org/onap/policy/clamp/models/acm/concepts/ParticipantRestartAc.java
index 3312752fa..5d4b8ac77 100644
--- a/models/src/main/java/org/onap/policy/clamp/models/acm/concepts/ParticipantRestartAc.java
+++ b/models/src/main/java/org/onap/policy/clamp/models/acm/concepts/ParticipantRestartAc.java
@@ -39,6 +39,7 @@ public class ParticipantRestartAc {
private DeployState deployState;
private LockState lockState;
+ private StateChangeResult stateChangeResult;
private List<AcElementRestart> acElementList = new ArrayList<>();
@@ -51,6 +52,7 @@ public class ParticipantRestartAc {
this.automationCompositionId = copyConstructor.automationCompositionId;
this.deployState = copyConstructor.deployState;
this.lockState = copyConstructor.lockState;
+ this.stateChangeResult = copyConstructor.stateChangeResult;
this.acElementList = PfUtils.mapList(copyConstructor.acElementList, AcElementRestart::new);
}
}
diff --git a/models/src/main/java/org/onap/policy/clamp/models/acm/messages/kafka/participant/ParticipantSync.java b/models/src/main/java/org/onap/policy/clamp/models/acm/messages/kafka/participant/ParticipantSync.java
index 85b715b86..2780a5b1e 100644
--- a/models/src/main/java/org/onap/policy/clamp/models/acm/messages/kafka/participant/ParticipantSync.java
+++ b/models/src/main/java/org/onap/policy/clamp/models/acm/messages/kafka/participant/ParticipantSync.java
@@ -31,6 +31,7 @@ import lombok.ToString;
import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
import org.onap.policy.clamp.models.acm.concepts.ParticipantDefinition;
import org.onap.policy.clamp.models.acm.concepts.ParticipantRestartAc;
+import org.onap.policy.clamp.models.acm.concepts.StateChangeResult;
import org.onap.policy.models.base.PfUtils;
@Getter
@@ -50,6 +51,7 @@ public class ParticipantSync extends ParticipantMessage {
private Set<UUID> excludeReplicas = new HashSet<>();
private boolean restarting = false;
private boolean delete = false;
+ private StateChangeResult stateChangeResult;
/**
* Constructor.
@@ -72,5 +74,6 @@ public class ParticipantSync extends ParticipantMessage {
this.excludeReplicas = new HashSet<>(source.excludeReplicas);
this.restarting = source.restarting;
this.delete = source.delete;
+ this.stateChangeResult = source.getStateChangeResult();
}
}
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AcDefinitionHandler.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AcDefinitionHandler.java
index b38df515a..9e3efce57 100644
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AcDefinitionHandler.java
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AcDefinitionHandler.java
@@ -114,16 +114,15 @@ public class AcDefinitionHandler {
public void handleParticipantSync(ParticipantSync participantSyncMsg) {
if (participantSyncMsg.isDelete()) {
- if (AcTypeState.COMMISSIONED.equals(participantSyncMsg.getState())) {
- cacheProvider.removeElementDefinition(participantSyncMsg.getCompositionId());
- }
- for (var automationcomposition : participantSyncMsg.getAutomationcompositionList()) {
- cacheProvider.removeAutomationComposition(automationcomposition.getAutomationCompositionId());
- }
+ deleteScenario(participantSyncMsg);
return;
}
if (!participantSyncMsg.getParticipantDefinitionUpdates().isEmpty()) {
+ if (StateChangeResult.TIMEOUT.equals(participantSyncMsg.getStateChangeResult())) {
+ listener.cleanExecution(participantSyncMsg.getCompositionId(), participantSyncMsg.getMessageId());
+ }
+
List<AutomationCompositionElementDefinition> list = new ArrayList<>();
for (var participantDefinition : participantSyncMsg.getParticipantDefinitionUpdates()) {
list.addAll(participantDefinition.getAutomationCompositionElementDefinitionList());
@@ -134,6 +133,20 @@ public class AcDefinitionHandler {
for (var automationcomposition : participantSyncMsg.getAutomationcompositionList()) {
cacheProvider
.initializeAutomationComposition(participantSyncMsg.getCompositionId(), automationcomposition);
+ if (StateChangeResult.TIMEOUT.equals(automationcomposition.getStateChangeResult())) {
+ for (var element : automationcomposition.getAcElementList()) {
+ listener.cleanExecution(element.getId(), participantSyncMsg.getMessageId());
+ }
+ }
+ }
+ }
+
+ private void deleteScenario(ParticipantSync participantSyncMsg) {
+ if (AcTypeState.COMMISSIONED.equals(participantSyncMsg.getState())) {
+ cacheProvider.removeElementDefinition(participantSyncMsg.getCompositionId());
+ }
+ for (var automationcomposition : participantSyncMsg.getAutomationcompositionList()) {
+ cacheProvider.removeAutomationComposition(automationcomposition.getAutomationCompositionId());
}
}
}
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/CacheProvider.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/CacheProvider.java
index 65ad627d1..3837ec629 100644
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/CacheProvider.java
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/CacheProvider.java
@@ -243,6 +243,7 @@ public class CacheProvider {
automationComposition.setLockState(participantRestartAc.getLockState());
automationComposition.setInstanceId(participantRestartAc.getAutomationCompositionId());
automationComposition.setElements(acElementMap);
+ automationComposition.setStateChangeResult(participantRestartAc.getStateChangeResult());
automationCompositions.put(automationComposition.getInstanceId(), automationComposition);
}
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ThreadHandler.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ThreadHandler.java
index 9a43bf4c3..c422b22b5 100644
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ThreadHandler.java
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ThreadHandler.java
@@ -218,7 +218,13 @@ public class ThreadHandler implements Closeable {
executionMap.remove(instanceElement.elementId());
}
- private void cleanExecution(UUID execIdentificationId, UUID messageId) {
+ /**
+ * Clean Execution.
+ *
+ * @param execIdentificationId the identification Id
+ * @param messageId the messageId
+ */
+ public void cleanExecution(UUID execIdentificationId, UUID messageId) {
var process = executionMap.get(execIdentificationId);
if (process != null) {
if (!process.isDone()) {
diff --git a/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AcDefinitionHandlerTest.java b/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AcDefinitionHandlerTest.java
index c6259a28f..d72f5deea 100644
--- a/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AcDefinitionHandlerTest.java
+++ b/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AcDefinitionHandlerTest.java
@@ -35,6 +35,7 @@ import org.onap.policy.clamp.acm.participant.intermediary.comm.ParticipantMessag
import org.onap.policy.clamp.acm.participant.intermediary.main.parameters.CommonTestData;
import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
import org.onap.policy.clamp.models.acm.concepts.ParticipantDefinition;
+import org.onap.policy.clamp.models.acm.concepts.StateChangeResult;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantPrime;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantPrimeAck;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantSync;
@@ -110,6 +111,28 @@ class AcDefinitionHandlerTest {
}
@Test
+ void syncCompositionDefinitionTimeout() {
+ var participantSyncMsg = new ParticipantSync();
+ participantSyncMsg.setState(AcTypeState.PRIMED);
+ participantSyncMsg.setStateChangeResult(StateChangeResult.TIMEOUT);
+ participantSyncMsg.setCompositionId(UUID.randomUUID());
+ participantSyncMsg.getParticipantDefinitionUpdates().add(createParticipantDefinition());
+ var participantRestartAc = CommonTestData.createParticipantRestartAc();
+ participantRestartAc.setStateChangeResult(StateChangeResult.TIMEOUT);
+ participantSyncMsg.setAutomationcompositionList(List.of(participantRestartAc));
+
+ var cacheProvider = mock(CacheProvider.class);
+ var listener = mock(ThreadHandler.class);
+ var ach = new AcDefinitionHandler(cacheProvider, mock(ParticipantMessagePublisher.class), listener);
+ ach.handleParticipantSync(participantSyncMsg);
+ verify(cacheProvider).initializeAutomationComposition(any(UUID.class), any());
+ verify(cacheProvider).addElementDefinition(any(), any());
+ verify(listener).cleanExecution(participantSyncMsg.getCompositionId(), participantSyncMsg.getMessageId());
+ var elementId = participantRestartAc.getAcElementList().get(0).getId();
+ verify(listener).cleanExecution(elementId, participantSyncMsg.getMessageId());
+ }
+
+ @Test
void syncDeleteTest() {
var participantSyncMsg = new ParticipantSync();
participantSyncMsg.setState(AcTypeState.COMMISSIONED);
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionScanner.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionScanner.java
index c4cebb430..db67e5eea 100644
--- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionScanner.java
+++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionScanner.java
@@ -198,7 +198,7 @@ public class SupervisionScanner {
if (nextSpNotCompleted != automationComposition.getPhase()) {
sendAutomationCompositionMsg(automationComposition, serviceTemplate, nextSpNotCompleted);
} else {
- handleTimeout(automationComposition);
+ handleTimeout(automationComposition, serviceTemplate);
}
}
}
@@ -217,7 +217,7 @@ public class SupervisionScanner {
if (completed) {
complete(automationComposition, serviceTemplate);
} else {
- handleTimeout(automationComposition);
+ handleTimeout(automationComposition, serviceTemplate);
}
}
@@ -250,11 +250,11 @@ public class SupervisionScanner {
automationComposition.getDeployState(), automationComposition.getLockState());
if (minStageNotCompleted != automationComposition.getPhase()) {
- savePahese(automationComposition, minStageNotCompleted);
+ savePhase(automationComposition, minStageNotCompleted);
LOGGER.debug("retry message AutomationCompositionMigration");
automationCompositionMigrationPublisher.send(automationComposition, minStageNotCompleted);
} else {
- handleTimeout(automationComposition);
+ handleTimeout(automationComposition, serviceTemplate);
}
}
}
@@ -300,10 +300,12 @@ public class SupervisionScanner {
acDefinition.setStateChangeResult(StateChangeResult.TIMEOUT);
acDefinitionProvider.updateAcDefinitionState(acDefinition.getCompositionId(),
acDefinition.getState(), acDefinition.getStateChangeResult());
+ participantSyncPublisher.sendSync(acDefinition, null);
}
}
- private void handleTimeout(AutomationComposition automationComposition) {
+ private void handleTimeout(AutomationComposition automationComposition,
+ ToscaServiceTemplate serviceTemplate) {
LOGGER.debug("automation composition scan: transition from state {} to {} {} not completed",
automationComposition.getDeployState(), automationComposition.getLockState(),
automationComposition.getSubState());
@@ -318,10 +320,11 @@ public class SupervisionScanner {
LOGGER.debug("Report timeout for the ac instance {}", automationComposition.getInstanceId());
automationComposition.setStateChangeResult(StateChangeResult.TIMEOUT);
automationCompositionProvider.updateAcState(automationComposition);
+ participantSyncPublisher.sendSync(serviceTemplate, automationComposition);
}
}
- private void savePahese(AutomationComposition automationComposition, int startPhase) {
+ private void savePhase(AutomationComposition automationComposition, int startPhase) {
automationComposition.setLastMsg(TimestampHelper.now());
automationComposition.setPhase(startPhase);
automationCompositionProvider.updateAcState(automationComposition);
@@ -329,7 +332,7 @@ public class SupervisionScanner {
private void sendAutomationCompositionMsg(AutomationComposition automationComposition,
ToscaServiceTemplate serviceTemplate, int startPhase) {
- savePahese(automationComposition, startPhase);
+ savePhase(automationComposition, startPhase);
if (DeployState.DEPLOYING.equals(automationComposition.getDeployState())) {
LOGGER.debug("retry message AutomationCompositionDeploy");
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantSyncPublisher.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantSyncPublisher.java
index eb1db6f0e..d90b6f667 100644
--- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantSyncPublisher.java
+++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantSyncPublisher.java
@@ -101,6 +101,7 @@ public class ParticipantSyncPublisher extends AbstractParticipantPublisher<Parti
message.getExcludeReplicas().add(excludeReplicaId);
}
message.setState(acDefinition.getState());
+ message.setStateChangeResult(acDefinition.getStateChangeResult());
message.setMessageId(UUID.randomUUID());
message.setTimestamp(Instant.now());
if (AcTypeState.COMMISSIONED.equals(acDefinition.getState())) {
@@ -131,6 +132,7 @@ public class ParticipantSyncPublisher extends AbstractParticipantPublisher<Parti
syncAc.setAutomationCompositionId(automationComposition.getInstanceId());
syncAc.setDeployState(automationComposition.getDeployState());
syncAc.setLockState(automationComposition.getLockState());
+ syncAc.setStateChangeResult(automationComposition.getStateChangeResult());
if (DeployState.DELETED.equals(automationComposition.getDeployState())) {
message.setDelete(true);
} else {
diff --git a/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionScannerTest.java b/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionScannerTest.java
index 5cefd5f09..17cc8ad3b 100644
--- a/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionScannerTest.java
+++ b/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionScannerTest.java
@@ -115,10 +115,11 @@ class SupervisionScannerTest {
void testAcDefinitionPrimeTimeout() {
var acDefinition = createAutomationCompositionDefinition(AcTypeState.PRIMING, StateChangeResult.NO_ERROR);
var acDefinitionProvider = createAcDefinitionProvider(acDefinition);
+ var participantSyncPublisher = mock(ParticipantSyncPublisher.class);
var acRuntimeParameterGroup = CommonTestData.geParameterGroup("dbScanner");
var supervisionScanner = new SupervisionScanner(mock(AutomationCompositionProvider.class), acDefinitionProvider,
mock(AutomationCompositionStateChangePublisher.class), mock(AutomationCompositionDeployPublisher.class),
- mock(ParticipantSyncPublisher.class), null, acRuntimeParameterGroup);
+ participantSyncPublisher, null, acRuntimeParameterGroup);
supervisionScanner.run();
// Ac Definition in Priming state
verify(acDefinitionProvider, times(0)).updateAcDefinitionState(any(), any(), any());
@@ -126,25 +127,30 @@ class SupervisionScannerTest {
acRuntimeParameterGroup.getParticipantParameters().setMaxStatusWaitMs(-1);
supervisionScanner = new SupervisionScanner(mock(AutomationCompositionProvider.class), acDefinitionProvider,
mock(AutomationCompositionStateChangePublisher.class), mock(AutomationCompositionDeployPublisher.class),
- mock(ParticipantSyncPublisher.class), null, acRuntimeParameterGroup);
+ participantSyncPublisher, null, acRuntimeParameterGroup);
supervisionScanner.run();
// set Timeout
verify(acDefinitionProvider).updateAcDefinitionState(acDefinition.getCompositionId(), acDefinition.getState(),
StateChangeResult.TIMEOUT);
+ verify(participantSyncPublisher).sendSync(any(AutomationCompositionDefinition.class), any());
clearInvocations(acDefinitionProvider);
+ clearInvocations(participantSyncPublisher);
acDefinition.setStateChangeResult(StateChangeResult.TIMEOUT);
supervisionScanner.run();
// already in Timeout
verify(acDefinitionProvider, times(0)).updateAcDefinitionState(any(), any(), any());
+ verify(participantSyncPublisher, times(0)).sendSync(acDefinition, null);
clearInvocations(acDefinitionProvider);
+ clearInvocations(participantSyncPublisher);
// retry by the user
acDefinition.setStateChangeResult(StateChangeResult.NO_ERROR);
supervisionScanner.run();
// set Timeout
verify(acDefinitionProvider).updateAcDefinitionState(acDefinition.getCompositionId(), acDefinition.getState(),
StateChangeResult.TIMEOUT);
+ verify(participantSyncPublisher).sendSync(any(AutomationCompositionDefinition.class), any());
clearInvocations(acDefinitionProvider);
for (var element : acDefinition.getElementStateMap().values()) {
@@ -263,31 +269,38 @@ class SupervisionScannerTest {
when(automationCompositionProvider.updateAcState(any())).thenReturn(automationComposition);
var automationCompositionDeployPublisher = mock(AutomationCompositionDeployPublisher.class);
var automationCompositionStateChangePublisher = mock(AutomationCompositionStateChangePublisher.class);
+ var participantSyncPublisher = mock(ParticipantSyncPublisher.class);
var acRuntimeParameterGroup = CommonTestData.geParameterGroup("dbScanner");
acRuntimeParameterGroup.getParticipantParameters().setMaxStatusWaitMs(-1);
// verify timeout scenario
var scannerObj2 = new SupervisionScanner(automationCompositionProvider, createAcDefinitionProvider(),
automationCompositionStateChangePublisher, automationCompositionDeployPublisher,
- mock(ParticipantSyncPublisher.class), null, acRuntimeParameterGroup);
+ participantSyncPublisher, null, acRuntimeParameterGroup);
automationComposition.setStateChangeResult(StateChangeResult.NO_ERROR);
automationComposition.setLastMsg(TimestampHelper.now());
scannerObj2.run();
- verify(automationCompositionProvider, times(1)).updateAcState(any(AutomationComposition.class));
+ verify(automationCompositionProvider).updateAcState(any(AutomationComposition.class));
+ verify(participantSyncPublisher).sendSync(any(ToscaServiceTemplate.class), any(AutomationComposition.class));
assertEquals(StateChangeResult.TIMEOUT, automationComposition.getStateChangeResult());
//already in TIMEOUT
clearInvocations(automationCompositionProvider);
+ clearInvocations(participantSyncPublisher);
scannerObj2.run();
verify(automationCompositionProvider, times(0)).updateAutomationComposition(any(AutomationComposition.class));
+ verify(participantSyncPublisher, times(0))
+ .sendSync(any(ToscaServiceTemplate.class), any(AutomationComposition.class));
clearInvocations(automationCompositionProvider);
+ clearInvocations(participantSyncPublisher);
for (Map.Entry<UUID, AutomationCompositionElement> entry : automationComposition.getElements().entrySet()) {
entry.getValue().setDeployState(DeployState.DEPLOYED);
}
scannerObj2.run();
- verify(automationCompositionProvider, times(1)).updateAcState(any(AutomationComposition.class));
+ verify(automationCompositionProvider).updateAcState(any(AutomationComposition.class));
+ verify(participantSyncPublisher).sendSync(any(ToscaServiceTemplate.class), any(AutomationComposition.class));
assertEquals(StateChangeResult.NO_ERROR, automationComposition.getStateChangeResult());
}