diff options
author | 2024-09-25 17:04:00 +0100 | |
---|---|---|
committer | 2024-09-30 09:43:38 +0000 | |
commit | 15302061ba7e9572ed4ea8c3aa0255d01ef07310 (patch) | |
tree | 94e97d4daafdfd7ed063306377270a5a5017ce6c | |
parent | 6ed514f31be046cf31caeada994e7b9c036b5f90 (diff) |
Handle timeout as hard timeout
Issue-ID: POLICY-5132
Change-Id: Ic78d346972e955f7700118ffa74be11643a885f0
Signed-off-by: FrancescoFioraEst <francesco.fiora@est.tech>
9 files changed, 85 insertions, 19 deletions
diff --git a/models/src/main/java/org/onap/policy/clamp/models/acm/concepts/ParticipantRestartAc.java b/models/src/main/java/org/onap/policy/clamp/models/acm/concepts/ParticipantRestartAc.java index 3312752fa..5d4b8ac77 100644 --- a/models/src/main/java/org/onap/policy/clamp/models/acm/concepts/ParticipantRestartAc.java +++ b/models/src/main/java/org/onap/policy/clamp/models/acm/concepts/ParticipantRestartAc.java @@ -39,6 +39,7 @@ public class ParticipantRestartAc { private DeployState deployState; private LockState lockState; + private StateChangeResult stateChangeResult; private List<AcElementRestart> acElementList = new ArrayList<>(); @@ -51,6 +52,7 @@ public class ParticipantRestartAc { this.automationCompositionId = copyConstructor.automationCompositionId; this.deployState = copyConstructor.deployState; this.lockState = copyConstructor.lockState; + this.stateChangeResult = copyConstructor.stateChangeResult; this.acElementList = PfUtils.mapList(copyConstructor.acElementList, AcElementRestart::new); } } diff --git a/models/src/main/java/org/onap/policy/clamp/models/acm/messages/kafka/participant/ParticipantSync.java b/models/src/main/java/org/onap/policy/clamp/models/acm/messages/kafka/participant/ParticipantSync.java index 85b715b86..2780a5b1e 100644 --- a/models/src/main/java/org/onap/policy/clamp/models/acm/messages/kafka/participant/ParticipantSync.java +++ b/models/src/main/java/org/onap/policy/clamp/models/acm/messages/kafka/participant/ParticipantSync.java @@ -31,6 +31,7 @@ import lombok.ToString; import org.onap.policy.clamp.models.acm.concepts.AcTypeState; import org.onap.policy.clamp.models.acm.concepts.ParticipantDefinition; import org.onap.policy.clamp.models.acm.concepts.ParticipantRestartAc; +import org.onap.policy.clamp.models.acm.concepts.StateChangeResult; import org.onap.policy.models.base.PfUtils; @Getter @@ -50,6 +51,7 @@ public class ParticipantSync extends ParticipantMessage { private Set<UUID> excludeReplicas = new HashSet<>(); private boolean restarting = false; private boolean delete = false; + private StateChangeResult stateChangeResult; /** * Constructor. @@ -72,5 +74,6 @@ public class ParticipantSync extends ParticipantMessage { this.excludeReplicas = new HashSet<>(source.excludeReplicas); this.restarting = source.restarting; this.delete = source.delete; + this.stateChangeResult = source.getStateChangeResult(); } } diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AcDefinitionHandler.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AcDefinitionHandler.java index b38df515a..9e3efce57 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AcDefinitionHandler.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AcDefinitionHandler.java @@ -114,16 +114,15 @@ public class AcDefinitionHandler { public void handleParticipantSync(ParticipantSync participantSyncMsg) { if (participantSyncMsg.isDelete()) { - if (AcTypeState.COMMISSIONED.equals(participantSyncMsg.getState())) { - cacheProvider.removeElementDefinition(participantSyncMsg.getCompositionId()); - } - for (var automationcomposition : participantSyncMsg.getAutomationcompositionList()) { - cacheProvider.removeAutomationComposition(automationcomposition.getAutomationCompositionId()); - } + deleteScenario(participantSyncMsg); return; } if (!participantSyncMsg.getParticipantDefinitionUpdates().isEmpty()) { + if (StateChangeResult.TIMEOUT.equals(participantSyncMsg.getStateChangeResult())) { + listener.cleanExecution(participantSyncMsg.getCompositionId(), participantSyncMsg.getMessageId()); + } + List<AutomationCompositionElementDefinition> list = new ArrayList<>(); for (var participantDefinition : participantSyncMsg.getParticipantDefinitionUpdates()) { list.addAll(participantDefinition.getAutomationCompositionElementDefinitionList()); @@ -134,6 +133,20 @@ public class AcDefinitionHandler { for (var automationcomposition : participantSyncMsg.getAutomationcompositionList()) { cacheProvider .initializeAutomationComposition(participantSyncMsg.getCompositionId(), automationcomposition); + if (StateChangeResult.TIMEOUT.equals(automationcomposition.getStateChangeResult())) { + for (var element : automationcomposition.getAcElementList()) { + listener.cleanExecution(element.getId(), participantSyncMsg.getMessageId()); + } + } + } + } + + private void deleteScenario(ParticipantSync participantSyncMsg) { + if (AcTypeState.COMMISSIONED.equals(participantSyncMsg.getState())) { + cacheProvider.removeElementDefinition(participantSyncMsg.getCompositionId()); + } + for (var automationcomposition : participantSyncMsg.getAutomationcompositionList()) { + cacheProvider.removeAutomationComposition(automationcomposition.getAutomationCompositionId()); } } } diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/CacheProvider.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/CacheProvider.java index 65ad627d1..3837ec629 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/CacheProvider.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/CacheProvider.java @@ -243,6 +243,7 @@ public class CacheProvider { automationComposition.setLockState(participantRestartAc.getLockState()); automationComposition.setInstanceId(participantRestartAc.getAutomationCompositionId()); automationComposition.setElements(acElementMap); + automationComposition.setStateChangeResult(participantRestartAc.getStateChangeResult()); automationCompositions.put(automationComposition.getInstanceId(), automationComposition); } diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ThreadHandler.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ThreadHandler.java index 9a43bf4c3..c422b22b5 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ThreadHandler.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ThreadHandler.java @@ -218,7 +218,13 @@ public class ThreadHandler implements Closeable { executionMap.remove(instanceElement.elementId()); } - private void cleanExecution(UUID execIdentificationId, UUID messageId) { + /** + * Clean Execution. + * + * @param execIdentificationId the identification Id + * @param messageId the messageId + */ + public void cleanExecution(UUID execIdentificationId, UUID messageId) { var process = executionMap.get(execIdentificationId); if (process != null) { if (!process.isDone()) { diff --git a/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AcDefinitionHandlerTest.java b/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AcDefinitionHandlerTest.java index c6259a28f..d72f5deea 100644 --- a/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AcDefinitionHandlerTest.java +++ b/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AcDefinitionHandlerTest.java @@ -35,6 +35,7 @@ import org.onap.policy.clamp.acm.participant.intermediary.comm.ParticipantMessag import org.onap.policy.clamp.acm.participant.intermediary.main.parameters.CommonTestData; import org.onap.policy.clamp.models.acm.concepts.AcTypeState; import org.onap.policy.clamp.models.acm.concepts.ParticipantDefinition; +import org.onap.policy.clamp.models.acm.concepts.StateChangeResult; import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantPrime; import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantPrimeAck; import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantSync; @@ -110,6 +111,28 @@ class AcDefinitionHandlerTest { } @Test + void syncCompositionDefinitionTimeout() { + var participantSyncMsg = new ParticipantSync(); + participantSyncMsg.setState(AcTypeState.PRIMED); + participantSyncMsg.setStateChangeResult(StateChangeResult.TIMEOUT); + participantSyncMsg.setCompositionId(UUID.randomUUID()); + participantSyncMsg.getParticipantDefinitionUpdates().add(createParticipantDefinition()); + var participantRestartAc = CommonTestData.createParticipantRestartAc(); + participantRestartAc.setStateChangeResult(StateChangeResult.TIMEOUT); + participantSyncMsg.setAutomationcompositionList(List.of(participantRestartAc)); + + var cacheProvider = mock(CacheProvider.class); + var listener = mock(ThreadHandler.class); + var ach = new AcDefinitionHandler(cacheProvider, mock(ParticipantMessagePublisher.class), listener); + ach.handleParticipantSync(participantSyncMsg); + verify(cacheProvider).initializeAutomationComposition(any(UUID.class), any()); + verify(cacheProvider).addElementDefinition(any(), any()); + verify(listener).cleanExecution(participantSyncMsg.getCompositionId(), participantSyncMsg.getMessageId()); + var elementId = participantRestartAc.getAcElementList().get(0).getId(); + verify(listener).cleanExecution(elementId, participantSyncMsg.getMessageId()); + } + + @Test void syncDeleteTest() { var participantSyncMsg = new ParticipantSync(); participantSyncMsg.setState(AcTypeState.COMMISSIONED); diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionScanner.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionScanner.java index c4cebb430..db67e5eea 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionScanner.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionScanner.java @@ -198,7 +198,7 @@ public class SupervisionScanner { if (nextSpNotCompleted != automationComposition.getPhase()) { sendAutomationCompositionMsg(automationComposition, serviceTemplate, nextSpNotCompleted); } else { - handleTimeout(automationComposition); + handleTimeout(automationComposition, serviceTemplate); } } } @@ -217,7 +217,7 @@ public class SupervisionScanner { if (completed) { complete(automationComposition, serviceTemplate); } else { - handleTimeout(automationComposition); + handleTimeout(automationComposition, serviceTemplate); } } @@ -250,11 +250,11 @@ public class SupervisionScanner { automationComposition.getDeployState(), automationComposition.getLockState()); if (minStageNotCompleted != automationComposition.getPhase()) { - savePahese(automationComposition, minStageNotCompleted); + savePhase(automationComposition, minStageNotCompleted); LOGGER.debug("retry message AutomationCompositionMigration"); automationCompositionMigrationPublisher.send(automationComposition, minStageNotCompleted); } else { - handleTimeout(automationComposition); + handleTimeout(automationComposition, serviceTemplate); } } } @@ -300,10 +300,12 @@ public class SupervisionScanner { acDefinition.setStateChangeResult(StateChangeResult.TIMEOUT); acDefinitionProvider.updateAcDefinitionState(acDefinition.getCompositionId(), acDefinition.getState(), acDefinition.getStateChangeResult()); + participantSyncPublisher.sendSync(acDefinition, null); } } - private void handleTimeout(AutomationComposition automationComposition) { + private void handleTimeout(AutomationComposition automationComposition, + ToscaServiceTemplate serviceTemplate) { LOGGER.debug("automation composition scan: transition from state {} to {} {} not completed", automationComposition.getDeployState(), automationComposition.getLockState(), automationComposition.getSubState()); @@ -318,10 +320,11 @@ public class SupervisionScanner { LOGGER.debug("Report timeout for the ac instance {}", automationComposition.getInstanceId()); automationComposition.setStateChangeResult(StateChangeResult.TIMEOUT); automationCompositionProvider.updateAcState(automationComposition); + participantSyncPublisher.sendSync(serviceTemplate, automationComposition); } } - private void savePahese(AutomationComposition automationComposition, int startPhase) { + private void savePhase(AutomationComposition automationComposition, int startPhase) { automationComposition.setLastMsg(TimestampHelper.now()); automationComposition.setPhase(startPhase); automationCompositionProvider.updateAcState(automationComposition); @@ -329,7 +332,7 @@ public class SupervisionScanner { private void sendAutomationCompositionMsg(AutomationComposition automationComposition, ToscaServiceTemplate serviceTemplate, int startPhase) { - savePahese(automationComposition, startPhase); + savePhase(automationComposition, startPhase); if (DeployState.DEPLOYING.equals(automationComposition.getDeployState())) { LOGGER.debug("retry message AutomationCompositionDeploy"); diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantSyncPublisher.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantSyncPublisher.java index eb1db6f0e..d90b6f667 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantSyncPublisher.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantSyncPublisher.java @@ -101,6 +101,7 @@ public class ParticipantSyncPublisher extends AbstractParticipantPublisher<Parti message.getExcludeReplicas().add(excludeReplicaId); } message.setState(acDefinition.getState()); + message.setStateChangeResult(acDefinition.getStateChangeResult()); message.setMessageId(UUID.randomUUID()); message.setTimestamp(Instant.now()); if (AcTypeState.COMMISSIONED.equals(acDefinition.getState())) { @@ -131,6 +132,7 @@ public class ParticipantSyncPublisher extends AbstractParticipantPublisher<Parti syncAc.setAutomationCompositionId(automationComposition.getInstanceId()); syncAc.setDeployState(automationComposition.getDeployState()); syncAc.setLockState(automationComposition.getLockState()); + syncAc.setStateChangeResult(automationComposition.getStateChangeResult()); if (DeployState.DELETED.equals(automationComposition.getDeployState())) { message.setDelete(true); } else { diff --git a/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionScannerTest.java b/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionScannerTest.java index 5cefd5f09..17cc8ad3b 100644 --- a/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionScannerTest.java +++ b/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionScannerTest.java @@ -115,10 +115,11 @@ class SupervisionScannerTest { void testAcDefinitionPrimeTimeout() { var acDefinition = createAutomationCompositionDefinition(AcTypeState.PRIMING, StateChangeResult.NO_ERROR); var acDefinitionProvider = createAcDefinitionProvider(acDefinition); + var participantSyncPublisher = mock(ParticipantSyncPublisher.class); var acRuntimeParameterGroup = CommonTestData.geParameterGroup("dbScanner"); var supervisionScanner = new SupervisionScanner(mock(AutomationCompositionProvider.class), acDefinitionProvider, mock(AutomationCompositionStateChangePublisher.class), mock(AutomationCompositionDeployPublisher.class), - mock(ParticipantSyncPublisher.class), null, acRuntimeParameterGroup); + participantSyncPublisher, null, acRuntimeParameterGroup); supervisionScanner.run(); // Ac Definition in Priming state verify(acDefinitionProvider, times(0)).updateAcDefinitionState(any(), any(), any()); @@ -126,25 +127,30 @@ class SupervisionScannerTest { acRuntimeParameterGroup.getParticipantParameters().setMaxStatusWaitMs(-1); supervisionScanner = new SupervisionScanner(mock(AutomationCompositionProvider.class), acDefinitionProvider, mock(AutomationCompositionStateChangePublisher.class), mock(AutomationCompositionDeployPublisher.class), - mock(ParticipantSyncPublisher.class), null, acRuntimeParameterGroup); + participantSyncPublisher, null, acRuntimeParameterGroup); supervisionScanner.run(); // set Timeout verify(acDefinitionProvider).updateAcDefinitionState(acDefinition.getCompositionId(), acDefinition.getState(), StateChangeResult.TIMEOUT); + verify(participantSyncPublisher).sendSync(any(AutomationCompositionDefinition.class), any()); clearInvocations(acDefinitionProvider); + clearInvocations(participantSyncPublisher); acDefinition.setStateChangeResult(StateChangeResult.TIMEOUT); supervisionScanner.run(); // already in Timeout verify(acDefinitionProvider, times(0)).updateAcDefinitionState(any(), any(), any()); + verify(participantSyncPublisher, times(0)).sendSync(acDefinition, null); clearInvocations(acDefinitionProvider); + clearInvocations(participantSyncPublisher); // retry by the user acDefinition.setStateChangeResult(StateChangeResult.NO_ERROR); supervisionScanner.run(); // set Timeout verify(acDefinitionProvider).updateAcDefinitionState(acDefinition.getCompositionId(), acDefinition.getState(), StateChangeResult.TIMEOUT); + verify(participantSyncPublisher).sendSync(any(AutomationCompositionDefinition.class), any()); clearInvocations(acDefinitionProvider); for (var element : acDefinition.getElementStateMap().values()) { @@ -263,31 +269,38 @@ class SupervisionScannerTest { when(automationCompositionProvider.updateAcState(any())).thenReturn(automationComposition); var automationCompositionDeployPublisher = mock(AutomationCompositionDeployPublisher.class); var automationCompositionStateChangePublisher = mock(AutomationCompositionStateChangePublisher.class); + var participantSyncPublisher = mock(ParticipantSyncPublisher.class); var acRuntimeParameterGroup = CommonTestData.geParameterGroup("dbScanner"); acRuntimeParameterGroup.getParticipantParameters().setMaxStatusWaitMs(-1); // verify timeout scenario var scannerObj2 = new SupervisionScanner(automationCompositionProvider, createAcDefinitionProvider(), automationCompositionStateChangePublisher, automationCompositionDeployPublisher, - mock(ParticipantSyncPublisher.class), null, acRuntimeParameterGroup); + participantSyncPublisher, null, acRuntimeParameterGroup); automationComposition.setStateChangeResult(StateChangeResult.NO_ERROR); automationComposition.setLastMsg(TimestampHelper.now()); scannerObj2.run(); - verify(automationCompositionProvider, times(1)).updateAcState(any(AutomationComposition.class)); + verify(automationCompositionProvider).updateAcState(any(AutomationComposition.class)); + verify(participantSyncPublisher).sendSync(any(ToscaServiceTemplate.class), any(AutomationComposition.class)); assertEquals(StateChangeResult.TIMEOUT, automationComposition.getStateChangeResult()); //already in TIMEOUT clearInvocations(automationCompositionProvider); + clearInvocations(participantSyncPublisher); scannerObj2.run(); verify(automationCompositionProvider, times(0)).updateAutomationComposition(any(AutomationComposition.class)); + verify(participantSyncPublisher, times(0)) + .sendSync(any(ToscaServiceTemplate.class), any(AutomationComposition.class)); clearInvocations(automationCompositionProvider); + clearInvocations(participantSyncPublisher); for (Map.Entry<UUID, AutomationCompositionElement> entry : automationComposition.getElements().entrySet()) { entry.getValue().setDeployState(DeployState.DEPLOYED); } scannerObj2.run(); - verify(automationCompositionProvider, times(1)).updateAcState(any(AutomationComposition.class)); + verify(automationCompositionProvider).updateAcState(any(AutomationComposition.class)); + verify(participantSyncPublisher).sendSync(any(ToscaServiceTemplate.class), any(AutomationComposition.class)); assertEquals(StateChangeResult.NO_ERROR, automationComposition.getStateChangeResult()); } |