diff options
author | FrancescoFioraEst <francesco.fiora@est.tech> | 2023-06-20 16:02:40 +0100 |
---|---|---|
committer | Francesco Fiora <francesco.fiora@est.tech> | 2023-06-21 10:44:05 +0000 |
commit | 3af93b5db64ea87b16eb25d14bce3c79ee2193de (patch) | |
tree | be4e2fe166323360d66e3a28c0ccc31f00d7e7a8 /participant | |
parent | a859910c602a5384ddd3e1b85e95bc800c099640 (diff) |
Add multiple messages support in Intermediary
Issue-ID: POLICY-4708
Change-Id: I6401eebb5730dde2c62eabcbbe3b34539238ec04
Signed-off-by: FrancescoFioraEst <francesco.fiora@est.tech>
Diffstat (limited to 'participant')
9 files changed, 579 insertions, 147 deletions
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/api/AutomationCompositionElementListener.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/api/AutomationCompositionElementListener.java index 6ca4d3cc7..48b60dc76 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/api/AutomationCompositionElementListener.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/api/AutomationCompositionElementListener.java @@ -37,7 +37,7 @@ public interface AutomationCompositionElementListener { * @param automationCompositionElementId the ID of the automation composition element * @throws PfModelException in case of a model exception */ - public void undeploy(UUID automationCompositionId, UUID automationCompositionElementId) throws PfModelException; + void undeploy(UUID automationCompositionId, UUID automationCompositionElementId) throws PfModelException; /** * Handle an update on a automation composition element. @@ -47,20 +47,20 @@ public interface AutomationCompositionElementListener { * @param properties properties Map * @throws PfModelException from Policy framework */ - public void deploy(UUID automationCompositionId, AcElementDeploy element, Map<String, Object> properties) + void deploy(UUID automationCompositionId, AcElementDeploy element, Map<String, Object> properties) throws PfModelException; - public void lock(UUID automationCompositionId, UUID automationCompositionElementId) throws PfModelException; + void lock(UUID automationCompositionId, UUID automationCompositionElementId) throws PfModelException; - public void unlock(UUID automationCompositionId, UUID automationCompositionElementId) throws PfModelException; + void unlock(UUID automationCompositionId, UUID automationCompositionElementId) throws PfModelException; - public void delete(UUID automationCompositionId, UUID automationCompositionElementId) throws PfModelException; + void delete(UUID automationCompositionId, UUID automationCompositionElementId) throws PfModelException; - public void update(UUID automationCompositionId, AcElementDeploy element, Map<String, Object> properties) + void update(UUID automationCompositionId, AcElementDeploy element, Map<String, Object> properties) throws PfModelException; - public void prime(UUID compositionId, List<AutomationCompositionElementDefinition> elementDefinitionList) + void prime(UUID compositionId, List<AutomationCompositionElementDefinition> elementDefinitionList) throws PfModelException; - public void deprime(UUID compositionId) throws PfModelException; + void deprime(UUID compositionId) throws PfModelException; } diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionHandler.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionHandler.java index 7e713654d..71c9d9abc 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionHandler.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionHandler.java @@ -24,13 +24,13 @@ package org.onap.policy.clamp.acm.participant.intermediary.handler; import java.util.HashMap; import java.util.List; import java.util.UUID; -import org.onap.policy.clamp.acm.participant.intermediary.api.AutomationCompositionElementListener; import org.onap.policy.clamp.acm.participant.intermediary.comm.ParticipantMessagePublisher; import org.onap.policy.clamp.models.acm.concepts.AcElementDeploy; import org.onap.policy.clamp.models.acm.concepts.AutomationComposition; import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionElementDefinition; import org.onap.policy.clamp.models.acm.concepts.ParticipantDeploy; import org.onap.policy.clamp.models.acm.concepts.ParticipantUtils; +import org.onap.policy.clamp.models.acm.concepts.StateChangeResult; import org.onap.policy.clamp.models.acm.messages.dmaap.participant.AutomationCompositionDeploy; import org.onap.policy.clamp.models.acm.messages.dmaap.participant.AutomationCompositionDeployAck; import org.onap.policy.clamp.models.acm.messages.dmaap.participant.AutomationCompositionStateChange; @@ -39,7 +39,6 @@ import org.onap.policy.clamp.models.acm.messages.dmaap.participant.PropertiesUpd import org.onap.policy.clamp.models.acm.messages.rest.instantiation.DeployOrder; import org.onap.policy.clamp.models.acm.messages.rest.instantiation.LockOrder; import org.onap.policy.clamp.models.acm.persistence.provider.AcInstanceStateResolver; -import org.onap.policy.models.base.PfModelException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; @@ -53,7 +52,7 @@ public class AutomationCompositionHandler { private final CacheProvider cacheProvider; private final ParticipantMessagePublisher publisher; - private final AutomationCompositionElementListener listener; + private final ThreadHandler listener; private final AcInstanceStateResolver acInstanceStateResolver; /** @@ -64,7 +63,7 @@ public class AutomationCompositionHandler { * @param listener the ThreadHandler Listener */ public AutomationCompositionHandler(CacheProvider cacheProvider, ParticipantMessagePublisher publisher, - AutomationCompositionElementListener listener) { + ThreadHandler listener) { this.cacheProvider = cacheProvider; this.publisher = publisher; this.listener = listener; @@ -84,17 +83,20 @@ public class AutomationCompositionHandler { var automationComposition = cacheProvider.getAutomationComposition(stateChangeMsg.getAutomationCompositionId()); if (automationComposition == null) { - var automationCompositionAck = - new AutomationCompositionDeployAck(ParticipantMessageType.AUTOMATION_COMPOSITION_STATECHANGE_ACK); - automationCompositionAck.setParticipantId(cacheProvider.getParticipantId()); - automationCompositionAck.setMessage("Automation composition " + stateChangeMsg.getAutomationCompositionId() - + " does not use this participant " + cacheProvider.getParticipantId()); - automationCompositionAck.setResult(false); - automationCompositionAck.setResponseTo(stateChangeMsg.getMessageId()); - automationCompositionAck.setAutomationCompositionId(stateChangeMsg.getAutomationCompositionId()); - publisher.sendAutomationCompositionAck(automationCompositionAck); - LOGGER.debug("Automation composition {} does not use this participant", - stateChangeMsg.getAutomationCompositionId()); + if (DeployOrder.DELETE.equals(stateChangeMsg.getDeployOrderedState())) { + var automationCompositionAck = new AutomationCompositionDeployAck( + ParticipantMessageType.AUTOMATION_COMPOSITION_STATECHANGE_ACK); + automationCompositionAck.setParticipantId(cacheProvider.getParticipantId()); + automationCompositionAck.setMessage("Already deleted or never used"); + automationCompositionAck.setResult(true); + automationCompositionAck.setStateChangeResult(StateChangeResult.NO_ERROR); + automationCompositionAck.setResponseTo(stateChangeMsg.getMessageId()); + automationCompositionAck.setAutomationCompositionId(stateChangeMsg.getAutomationCompositionId()); + publisher.sendAutomationCompositionAck(automationCompositionAck); + } else { + LOGGER.debug("Automation composition {} does not use this participant", + stateChangeMsg.getAutomationCompositionId()); + } return; } @@ -106,18 +108,18 @@ public class AutomationCompositionHandler { } if (DeployOrder.NONE.equals(stateChangeMsg.getDeployOrderedState())) { - handleLockOrderState(automationComposition, stateChangeMsg.getLockOrderedState(), - stateChangeMsg.getStartPhase()); + handleLockOrderState(stateChangeMsg.getMessageId(), automationComposition, + stateChangeMsg.getLockOrderedState(), stateChangeMsg.getStartPhase()); } else { - handleDeployOrderState(automationComposition, stateChangeMsg.getDeployOrderedState(), - stateChangeMsg.getStartPhase()); + handleDeployOrderState(stateChangeMsg.getMessageId(), automationComposition, + stateChangeMsg.getDeployOrderedState(), stateChangeMsg.getStartPhase()); } } private boolean checkConsistantOrderState(AutomationComposition automationComposition, DeployOrder deployOrder, LockOrder lockOrder) { if (DeployOrder.UPDATE.equals(deployOrder)) { - return false; + return true; } return acInstanceStateResolver.resolve(deployOrder, lockOrder, automationComposition.getDeployState(), automationComposition.getLockState(), automationComposition.getStateChangeResult()) != null; @@ -126,19 +128,20 @@ public class AutomationCompositionHandler { /** * Method to handle state changes. * + * @param messageId the messageId * @param automationComposition participant response * @param orderedState automation composition ordered state * @param startPhaseMsg startPhase from message */ - private void handleDeployOrderState(final AutomationComposition automationComposition, DeployOrder orderedState, - Integer startPhaseMsg) { + private void handleDeployOrderState(UUID messageId, final AutomationComposition automationComposition, + DeployOrder orderedState, Integer startPhaseMsg) { switch (orderedState) { case UNDEPLOY: - handleUndeployState(automationComposition, startPhaseMsg); + handleUndeployState(messageId, automationComposition, startPhaseMsg); break; case DELETE: - handleDeleteState(automationComposition, startPhaseMsg); + handleDeleteState(messageId, automationComposition, startPhaseMsg); break; default: @@ -150,19 +153,20 @@ public class AutomationCompositionHandler { /** * Method to handle state changes. * + * @param messageId the messageId * @param automationComposition participant response * @param orderedState automation composition ordered state * @param startPhaseMsg startPhase from message */ - private void handleLockOrderState(final AutomationComposition automationComposition, LockOrder orderedState, - Integer startPhaseMsg) { + private void handleLockOrderState(UUID messageId, final AutomationComposition automationComposition, + LockOrder orderedState, Integer startPhaseMsg) { switch (orderedState) { case LOCK: - handleLockState(automationComposition, startPhaseMsg); + handleLockState(messageId, automationComposition, startPhaseMsg); break; case UNLOCK: - handleUnlockState(automationComposition, startPhaseMsg); + handleUnlockState(messageId, automationComposition, startPhaseMsg); break; default: LOGGER.error("StateChange message has no state, state is null {}", automationComposition.getKey()); @@ -188,7 +192,7 @@ public class AutomationCompositionHandler { updateExistingElementsOnThisParticipant(updateMsg.getAutomationCompositionId(), participantDeploy); - callParticipantUpdateProperty(participantDeploy.getAcElementList(), + callParticipantUpdateProperty(updateMsg.getMessageId(), participantDeploy.getAcElementList(), updateMsg.getAutomationCompositionId()); } } @@ -212,35 +216,28 @@ public class AutomationCompositionHandler { cacheProvider.initializeAutomationComposition(deployMsg.getCompositionId(), deployMsg.getAutomationCompositionId(), participantDeploy); } - callParticipanDeploy(participantDeploy.getAcElementList(), deployMsg.getStartPhase(), - deployMsg.getAutomationCompositionId()); + callParticipanDeploy(deployMsg.getMessageId(), participantDeploy.getAcElementList(), + deployMsg.getStartPhase(), deployMsg.getAutomationCompositionId()); } } } - private void callParticipanDeploy(List<AcElementDeploy> acElements, Integer startPhaseMsg, UUID instanceId) { - try { - for (var element : acElements) { - var commonProperties = cacheProvider.getCommonProperties(instanceId, element.getId()); - int startPhase = ParticipantUtils.findStartPhase(commonProperties); - if (startPhaseMsg.equals(startPhase)) { - var map = new HashMap<>(commonProperties); - map.putAll(element.getProperties()); - listener.deploy(instanceId, element, map); - } + private void callParticipanDeploy(UUID messageId, List<AcElementDeploy> acElements, Integer startPhaseMsg, + UUID instanceId) { + for (var element : acElements) { + var commonProperties = cacheProvider.getCommonProperties(instanceId, element.getId()); + int startPhase = ParticipantUtils.findStartPhase(commonProperties); + if (startPhaseMsg.equals(startPhase)) { + var map = new HashMap<>(commonProperties); + map.putAll(element.getProperties()); + listener.deploy(messageId, instanceId, element, map); } - } catch (PfModelException e) { - LOGGER.debug("Automation composition element Deploy failed {}", instanceId); } } - private void callParticipantUpdateProperty(List<AcElementDeploy> acElements, UUID instanceId) { - try { - for (var element : acElements) { - listener.update(instanceId, element, element.getProperties()); - } - } catch (PfModelException e) { - LOGGER.debug("Automation composition element update failed {}", instanceId); + private void callParticipantUpdateProperty(UUID messageId, List<AcElementDeploy> acElements, UUID instanceId) { + for (var element : acElements) { + listener.update(messageId, instanceId, element, element.getProperties()); } } @@ -255,101 +252,86 @@ public class AutomationCompositionHandler { /** * Method to handle when the new state from participant is UNINITIALISED state. * + * @param messageId the messageId * @param automationComposition participant response * @param startPhaseMsg startPhase from message */ - private void handleUndeployState(final AutomationComposition automationComposition, Integer startPhaseMsg) { - try { - for (var acElement : automationComposition.getElements().values()) { - int startPhase = ParticipantUtils.findStartPhase( - cacheProvider.getCommonProperties(automationComposition.getInstanceId(), acElement.getId())); - if (startPhaseMsg.equals(startPhase)) { - listener.undeploy(automationComposition.getInstanceId(), acElement.getId()); - } + private void handleUndeployState(UUID messageId, final AutomationComposition automationComposition, + Integer startPhaseMsg) { + for (var acElement : automationComposition.getElements().values()) { + int startPhase = ParticipantUtils.findStartPhase( + cacheProvider.getCommonProperties(automationComposition.getInstanceId(), acElement.getId())); + if (startPhaseMsg.equals(startPhase)) { + listener.undeploy(messageId, automationComposition.getInstanceId(), acElement.getId()); } - } catch (PfModelException e) { - LOGGER.debug("Automation composition element Undeploy failed {}", automationComposition.getInstanceId()); } } - private void handleDeleteState(final AutomationComposition automationComposition, Integer startPhaseMsg) { - try { - for (var acElement : automationComposition.getElements().values()) { - int startPhase = ParticipantUtils.findStartPhase( - cacheProvider.getCommonProperties(automationComposition.getInstanceId(), acElement.getId())); - if (startPhaseMsg.equals(startPhase)) { - listener.delete(automationComposition.getInstanceId(), acElement.getId()); - } + private void handleDeleteState(UUID messageId, final AutomationComposition automationComposition, + Integer startPhaseMsg) { + for (var acElement : automationComposition.getElements().values()) { + int startPhase = ParticipantUtils.findStartPhase( + cacheProvider.getCommonProperties(automationComposition.getInstanceId(), acElement.getId())); + if (startPhaseMsg.equals(startPhase)) { + listener.delete(messageId, automationComposition.getInstanceId(), acElement.getId()); } - } catch (PfModelException e) { - LOGGER.debug("Automation composition element Delete failed {}", automationComposition.getInstanceId()); } } /** * Method to handle when the new state from participant is PASSIVE state. * + * @param messageId the messageId * @param automationComposition participant response * @param startPhaseMsg startPhase from message */ - private void handleLockState(final AutomationComposition automationComposition, Integer startPhaseMsg) { - try { - for (var acElement : automationComposition.getElements().values()) { - int startPhase = ParticipantUtils.findStartPhase( - cacheProvider.getCommonProperties(automationComposition.getInstanceId(), acElement.getId())); - if (startPhaseMsg.equals(startPhase)) { - listener.lock(automationComposition.getInstanceId(), acElement.getId()); - } + private void handleLockState(UUID messageId, final AutomationComposition automationComposition, + Integer startPhaseMsg) { + for (var acElement : automationComposition.getElements().values()) { + int startPhase = ParticipantUtils.findStartPhase( + cacheProvider.getCommonProperties(automationComposition.getInstanceId(), acElement.getId())); + if (startPhaseMsg.equals(startPhase)) { + listener.lock(messageId, automationComposition.getInstanceId(), acElement.getId()); } - } catch (PfModelException e) { - LOGGER.debug("Automation composition element Lock failed {}", automationComposition.getInstanceId()); } } /** * Method to handle when the new state from participant is RUNNING state. * + * @param messageId the messageId * @param automationComposition participant response * @param startPhaseMsg startPhase from message */ - private void handleUnlockState(final AutomationComposition automationComposition, Integer startPhaseMsg) { - try { - for (var acElement : automationComposition.getElements().values()) { - int startPhase = ParticipantUtils.findStartPhase( - cacheProvider.getCommonProperties(automationComposition.getInstanceId(), acElement.getId())); - if (startPhaseMsg.equals(startPhase)) { - listener.unlock(automationComposition.getInstanceId(), acElement.getId()); - } + private void handleUnlockState(UUID messageId, final AutomationComposition automationComposition, + Integer startPhaseMsg) { + for (var acElement : automationComposition.getElements().values()) { + int startPhase = ParticipantUtils.findStartPhase( + cacheProvider.getCommonProperties(automationComposition.getInstanceId(), acElement.getId())); + if (startPhaseMsg.equals(startPhase)) { + listener.unlock(messageId, automationComposition.getInstanceId(), acElement.getId()); } - } catch (PfModelException e) { - LOGGER.debug("Automation composition element Unlock failed {}", automationComposition.getInstanceId()); } } /** * Handles prime a Composition Definition. * + * @param messageId the messageId * @param compositionId the compositionId * @param list the list of AutomationCompositionElementDefinition */ - public void prime(UUID compositionId, List<AutomationCompositionElementDefinition> list) { - try { - listener.prime(compositionId, list); - } catch (PfModelException e) { - LOGGER.debug("Composition prime failed {}", compositionId); - } + public void prime(UUID messageId, UUID compositionId, List<AutomationCompositionElementDefinition> list) { + listener.prime(messageId, compositionId, list); } /** * Handles deprime a Composition Definition. * + * @param messageId the messageId * @param compositionId the compositionId */ - public void deprime(UUID compositionId) { - try { - listener.deprime(compositionId); - } catch (PfModelException e) { - LOGGER.debug("Composition deprime failed {}", compositionId); - } + public void deprime(UUID messageId, UUID compositionId) { + listener.deprime(messageId, compositionId); } } diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionOutHandler.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionOutHandler.java index 2c34652c8..cfd61c4fe 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionOutHandler.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionOutHandler.java @@ -99,6 +99,7 @@ public class AutomationCompositionOutHandler { new AutomationCompositionDeployAck(ParticipantMessageType.AUTOMATION_COMPOSITION_STATECHANGE_ACK); automationCompositionStateChangeAck.setParticipantId(cacheProvider.getParticipantId()); automationCompositionStateChangeAck.setMessage(message); + automationCompositionStateChangeAck.setResponseTo(cacheProvider.getMsgIdentification().get(element.getId())); automationCompositionStateChangeAck.setStateChangeResult(stateChangeResult); automationCompositionStateChangeAck.setAutomationCompositionId(automationCompositionId); automationCompositionStateChangeAck.getAutomationCompositionResultMap().put(element.getId(), @@ -107,6 +108,7 @@ public class AutomationCompositionOutHandler { LOGGER.debug("Automation composition element {} state changed to {}", elementId, deployState); automationCompositionStateChangeAck.setResult(true); publisher.sendAutomationCompositionAck(automationCompositionStateChangeAck); + cacheProvider.getMsgIdentification().remove(element.getId()); } private void handleDeployState(AutomationComposition automationComposition, AutomationCompositionElement element, @@ -213,10 +215,12 @@ public class AutomationCompositionOutHandler { participantPrimeAck.setCompositionId(compositionId); participantPrimeAck.setMessage(message); participantPrimeAck.setResult(true); + participantPrimeAck.setResponseTo(cacheProvider.getMsgIdentification().get(compositionId)); participantPrimeAck.setCompositionState(state); participantPrimeAck.setStateChangeResult(stateChangeResult); participantPrimeAck.setParticipantId(cacheProvider.getParticipantId()); participantPrimeAck.setState(ParticipantState.ON_LINE); publisher.sendParticipantPrimeAck(participantPrimeAck); + cacheProvider.getMsgIdentification().remove(compositionId); } } diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/CacheProvider.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/CacheProvider.java index 09e75e8d5..119cc11b5 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/CacheProvider.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/CacheProvider.java @@ -55,6 +55,9 @@ public class CacheProvider { private final Map<UUID, Map<ToscaConceptIdentifier, AutomationCompositionElementDefinition>> acElementsDefinitions = new ConcurrentHashMap<>(); + @Getter + private final Map<UUID, UUID> msgIdentification = new ConcurrentHashMap<>(); + /** * Constructor. * diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandler.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandler.java index 0e7e193f6..3a3a0cced 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandler.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandler.java @@ -186,11 +186,13 @@ public class ParticipantHandler { } } cacheProvider.addElementDefinition(participantPrimeMsg.getCompositionId(), list); - automationCompositionHandler.prime(participantPrimeMsg.getCompositionId(), list); + automationCompositionHandler.prime(participantPrimeMsg.getMessageId(), + participantPrimeMsg.getCompositionId(), list); } else { // deprime cacheProvider.removeElementDefinition(participantPrimeMsg.getCompositionId()); - automationCompositionHandler.deprime(participantPrimeMsg.getCompositionId()); + automationCompositionHandler.deprime(participantPrimeMsg.getMessageId(), + participantPrimeMsg.getCompositionId()); } } diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ThreadHandler.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ThreadHandler.java new file mode 100644 index 000000000..b5866d71f --- /dev/null +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ThreadHandler.java @@ -0,0 +1,275 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2023 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.acm.participant.intermediary.handler; + +import java.io.Closeable; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import lombok.RequiredArgsConstructor; +import org.onap.policy.clamp.acm.participant.intermediary.api.AutomationCompositionElementListener; +import org.onap.policy.clamp.acm.participant.intermediary.api.ParticipantIntermediaryApi; +import org.onap.policy.clamp.models.acm.concepts.AcElementDeploy; +import org.onap.policy.clamp.models.acm.concepts.AcTypeState; +import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionElementDefinition; +import org.onap.policy.clamp.models.acm.concepts.DeployState; +import org.onap.policy.clamp.models.acm.concepts.LockState; +import org.onap.policy.clamp.models.acm.concepts.StateChangeResult; +import org.onap.policy.models.base.PfModelException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +@Component +@RequiredArgsConstructor +public class ThreadHandler implements Closeable { + private static final Logger LOGGER = LoggerFactory.getLogger(ThreadHandler.class); + + private final AutomationCompositionElementListener listener; + private final ParticipantIntermediaryApi intermediaryApi; + private final CacheProvider cacheProvider; + + private final Map<UUID, Future> executionMap = new ConcurrentHashMap<>(); + + private final ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); + + /** + * Handle an update on a automation composition element. + * + * @param messageId the messageId + * @param instanceId the automationComposition Id + * @param element the information on the automation composition element + * @param properties properties Map + */ + public void deploy(UUID messageId, UUID instanceId, AcElementDeploy element, Map<String, Object> properties) { + cleanExecution(element.getId(), messageId); + var result = executor.submit(() -> this.deployProcess(instanceId, element, properties)); + executionMap.put(element.getId(), result); + } + + private void deployProcess(UUID instanceId, AcElementDeploy element, Map<String, Object> properties) { + try { + listener.deploy(instanceId, element, properties); + } catch (PfModelException e) { + LOGGER.error("Automation composition element deploy failed {} {}", instanceId, e.getMessage()); + intermediaryApi.updateAutomationCompositionElementState(instanceId, element.getId(), DeployState.UNDEPLOYED, + null, StateChangeResult.FAILED, "Automation composition element deploy failed"); + } + executionMap.remove(element.getId()); + } + + /** + * Handle a automation composition element state change. + * + * @param messageId the messageId + * @param instanceId the automationComposition Id + * @param elementId the ID of the automation composition element + */ + public void undeploy(UUID messageId, UUID instanceId, UUID elementId) { + cleanExecution(elementId, messageId); + var result = executor.submit(() -> this.undeployProcess(instanceId, elementId)); + executionMap.put(elementId, result); + } + + private void undeployProcess(UUID instanceId, UUID elementId) { + try { + listener.undeploy(instanceId, elementId); + } catch (PfModelException e) { + LOGGER.error("Automation composition element undeploy failed {} {}", instanceId, e.getMessage()); + intermediaryApi.updateAutomationCompositionElementState(instanceId, elementId, DeployState.DEPLOYED, null, + StateChangeResult.FAILED, "Automation composition element undeploy failed"); + } + executionMap.remove(elementId); + } + + /** + * Handle a automation composition element lock. + * + * @param messageId the messageId + * @param instanceId the automationComposition Id + * @param elementId the ID of the automation composition element + */ + public void lock(UUID messageId, UUID instanceId, UUID elementId) { + cleanExecution(elementId, messageId); + var result = executor.submit(() -> this.lockProcess(instanceId, elementId)); + executionMap.put(elementId, result); + } + + private void lockProcess(UUID instanceId, UUID elementId) { + try { + listener.lock(instanceId, elementId); + } catch (PfModelException e) { + LOGGER.error("Automation composition element lock failed {} {}", instanceId, e.getMessage()); + intermediaryApi.updateAutomationCompositionElementState(instanceId, elementId, null, LockState.UNLOCKED, + StateChangeResult.FAILED, "Automation composition element lock failed"); + } + executionMap.remove(elementId); + } + + /** + * Handle a automation composition element unlock. + * + * @param messageId the messageId + * @param instanceId the automationComposition Id + * @param elementId the ID of the automation composition element + */ + public void unlock(UUID messageId, UUID instanceId, UUID elementId) { + cleanExecution(elementId, messageId); + var result = executor.submit(() -> this.unlockProcess(instanceId, elementId)); + executionMap.put(elementId, result); + } + + private void unlockProcess(UUID instanceId, UUID elementId) { + try { + listener.unlock(instanceId, elementId); + } catch (PfModelException e) { + LOGGER.error("Automation composition element unlock failed {} {}", instanceId, e.getMessage()); + intermediaryApi.updateAutomationCompositionElementState(instanceId, elementId, null, LockState.LOCKED, + StateChangeResult.FAILED, "Automation composition element unlock failed"); + } + executionMap.remove(elementId); + } + + /** + * Handle a automation composition element delete. + * + * @param messageId the messageId + * @param instanceId the automationComposition Id + * @param elementId the ID of the automation composition element + */ + public void delete(UUID messageId, UUID instanceId, UUID elementId) { + cleanExecution(elementId, messageId); + var result = executor.submit(() -> this.deleteProcess(instanceId, elementId)); + executionMap.put(elementId, result); + } + + private void deleteProcess(UUID instanceId, UUID elementId) { + try { + listener.delete(instanceId, elementId); + } catch (PfModelException e) { + LOGGER.error("Automation composition element delete failed {} {}", instanceId, e.getMessage()); + intermediaryApi.updateAutomationCompositionElementState(instanceId, elementId, DeployState.UNDEPLOYED, null, + StateChangeResult.FAILED, "Automation composition element delete failed"); + } + executionMap.remove(elementId); + } + + /** + * Handle a automation composition element properties update. + * + * @param messageId the messageId + * @param instanceId the automationComposition Id + * @param element the information on the automation composition element + * @param properties properties Map + */ + public void update(UUID messageId, UUID instanceId, AcElementDeploy element, Map<String, Object> properties) { + cleanExecution(element.getId(), messageId); + var result = executor.submit(() -> this.updateProcess(instanceId, element, properties)); + executionMap.put(element.getId(), result); + } + + private void updateProcess(UUID instanceId, AcElementDeploy element, Map<String, Object> properties) { + try { + listener.update(instanceId, element, properties); + } catch (PfModelException e) { + LOGGER.error("Automation composition element update failed {} {}", instanceId, e.getMessage()); + intermediaryApi.updateAutomationCompositionElementState(instanceId, element.getId(), DeployState.DEPLOYED, + null, StateChangeResult.FAILED, "Automation composition element update failed"); + } + executionMap.remove(element.getId()); + } + + private void cleanExecution(UUID execIdentificationId, UUID messageId) { + var process = executionMap.get(execIdentificationId); + if (process != null) { + if (!process.isDone()) { + process.cancel(true); + } + executionMap.remove(execIdentificationId); + } + cacheProvider.getMsgIdentification().put(execIdentificationId, messageId); + } + + /** + * Handles prime a Composition Definition. + * + * @param messageId the messageId + * @param compositionId the compositionId + * @param list the list of AutomationCompositionElementDefinition + */ + public void prime(UUID messageId, UUID compositionId, List<AutomationCompositionElementDefinition> list) { + cleanExecution(compositionId, messageId); + var result = executor.submit(() -> this.primeProcess(compositionId, list)); + executionMap.put(compositionId, result); + } + + private void primeProcess(UUID compositionId, List<AutomationCompositionElementDefinition> list) { + try { + listener.prime(compositionId, list); + executionMap.remove(compositionId); + } catch (PfModelException e) { + LOGGER.error("Composition Defintion prime failed {} {}", compositionId, e.getMessage()); + intermediaryApi.updateCompositionState(compositionId, AcTypeState.COMMISSIONED, StateChangeResult.FAILED, + "Composition Defintion prime failed"); + } + } + + /** + * Handles deprime a Composition Definition. + * + * @param messageId the messageId + * @param compositionId the compositionId + */ + public void deprime(UUID messageId, UUID compositionId) { + cleanExecution(compositionId, messageId); + var result = executor.submit(() -> this.deprimeProcess(compositionId)); + executionMap.put(compositionId, result); + } + + private void deprimeProcess(UUID compositionId) { + try { + listener.deprime(compositionId); + executionMap.remove(compositionId); + } catch (PfModelException e) { + LOGGER.error("Composition Defintion deprime failed {} {}", compositionId, e.getMessage()); + intermediaryApi.updateCompositionState(compositionId, AcTypeState.PRIMED, StateChangeResult.FAILED, + "Composition Defintion deprime failed"); + } + } + + + /** + * Closes this stream and releases any system resources associated + * with it. If the stream is already closed then invoking this + * method has no effect. + * + * @throws IOException if an I/O error occurs + */ + @Override + public void close() throws IOException { + executor.shutdown(); + } +} diff --git a/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionHandlerTest.java b/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionHandlerTest.java index b4397b49f..dd49ee318 100644 --- a/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionHandlerTest.java +++ b/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionHandlerTest.java @@ -32,7 +32,6 @@ import java.util.Map; import java.util.UUID; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; -import org.onap.policy.clamp.acm.participant.intermediary.api.AutomationCompositionElementListener; import org.onap.policy.clamp.acm.participant.intermediary.comm.ParticipantMessagePublisher; import org.onap.policy.clamp.acm.participant.intermediary.main.parameters.CommonTestData; import org.onap.policy.clamp.models.acm.concepts.AcElementDeploy; @@ -44,7 +43,6 @@ import org.onap.policy.clamp.models.acm.messages.dmaap.participant.AutomationCom import org.onap.policy.clamp.models.acm.messages.dmaap.participant.PropertiesUpdate; import org.onap.policy.clamp.models.acm.messages.rest.instantiation.DeployOrder; import org.onap.policy.clamp.models.acm.messages.rest.instantiation.LockOrder; -import org.onap.policy.models.base.PfModelException; import org.springframework.test.context.junit.jupiter.SpringExtension; @ExtendWith(SpringExtension.class) @@ -54,13 +52,14 @@ class AutomationCompositionHandlerTest { void handleAutomationCompositionStateChangeNullTest() { var participantMessagePublisher = mock(ParticipantMessagePublisher.class); var cacheProvider = mock(CacheProvider.class); - var ach = new AutomationCompositionHandler(cacheProvider, participantMessagePublisher, - mock(AutomationCompositionElementListener.class)); + var ach = + new AutomationCompositionHandler(cacheProvider, participantMessagePublisher, mock(ThreadHandler.class)); var automationCompositionStateChange = new AutomationCompositionStateChange(); assertDoesNotThrow(() -> ach.handleAutomationCompositionStateChange(automationCompositionStateChange)); automationCompositionStateChange.setAutomationCompositionId(UUID.randomUUID()); + automationCompositionStateChange.setDeployOrderedState(DeployOrder.DELETE); assertDoesNotThrow(() -> ach.handleAutomationCompositionStateChange(automationCompositionStateChange)); verify(participantMessagePublisher).sendAutomationCompositionAck(any(AutomationCompositionDeployAck.class)); @@ -73,7 +72,7 @@ class AutomationCompositionHandlerTest { } @Test - void handleAutomationCompositionStateChangeUndeployTest() throws PfModelException { + void handleAutomationCompositionStateChangeUndeployTest() { var automationComposition = CommonTestData.getTestAutomationCompositionMap().values().iterator().next(); var automationCompositionStateChange = CommonTestData.getStateChange(CommonTestData.getParticipantId(), automationComposition.getInstanceId(), DeployOrder.UNDEPLOY, LockOrder.NONE); @@ -83,14 +82,14 @@ class AutomationCompositionHandlerTest { when(cacheProvider.getCommonProperties(any(UUID.class), any(UUID.class))).thenReturn(Map.of()); var participantMessagePublisher = mock(ParticipantMessagePublisher.class); - var listener = mock(AutomationCompositionElementListener.class); + var listener = mock(ThreadHandler.class); var ach = new AutomationCompositionHandler(cacheProvider, participantMessagePublisher, listener); ach.handleAutomationCompositionStateChange(automationCompositionStateChange); - verify(listener, times(automationComposition.getElements().size())).undeploy(any(), any()); + verify(listener, times(automationComposition.getElements().size())).undeploy(any(), any(), any()); } @Test - void handleAutomationCompositionStateChangeLockTest() throws PfModelException { + void handleAutomationCompositionStateChangeLockTest() { var automationComposition = CommonTestData.getTestAutomationCompositionMap().values().iterator().next(); var automationCompositionStateChange = CommonTestData.getStateChange(CommonTestData.getParticipantId(), automationComposition.getInstanceId(), DeployOrder.NONE, LockOrder.LOCK); @@ -100,14 +99,14 @@ class AutomationCompositionHandlerTest { when(cacheProvider.getCommonProperties(any(UUID.class), any(UUID.class))).thenReturn(Map.of()); var participantMessagePublisher = mock(ParticipantMessagePublisher.class); - var listener = mock(AutomationCompositionElementListener.class); + var listener = mock(ThreadHandler.class); var ach = new AutomationCompositionHandler(cacheProvider, participantMessagePublisher, listener); ach.handleAutomationCompositionStateChange(automationCompositionStateChange); - verify(listener, times(automationComposition.getElements().size())).lock(any(), any()); + verify(listener, times(automationComposition.getElements().size())).lock(any(), any(), any()); } @Test - void handleAutomationCompositionStateChangeUnlockTest() throws PfModelException { + void handleAutomationCompositionStateChangeUnlockTest() { var automationComposition = CommonTestData.getTestAutomationCompositionMap().values().iterator().next(); var automationCompositionStateChange = CommonTestData.getStateChange(CommonTestData.getParticipantId(), automationComposition.getInstanceId(), DeployOrder.NONE, LockOrder.UNLOCK); @@ -117,14 +116,14 @@ class AutomationCompositionHandlerTest { when(cacheProvider.getCommonProperties(any(UUID.class), any(UUID.class))).thenReturn(Map.of()); var participantMessagePublisher = mock(ParticipantMessagePublisher.class); - var listener = mock(AutomationCompositionElementListener.class); + var listener = mock(ThreadHandler.class); var ach = new AutomationCompositionHandler(cacheProvider, participantMessagePublisher, listener); ach.handleAutomationCompositionStateChange(automationCompositionStateChange); - verify(listener, times(automationComposition.getElements().size())).unlock(any(), any()); + verify(listener, times(automationComposition.getElements().size())).unlock(any(), any(), any()); } @Test - void handleAutomationCompositionStateChangeDeleteTest() throws PfModelException { + void handleAutomationCompositionStateChangeDeleteTest() { var automationComposition = CommonTestData.getTestAutomationCompositionMap().values().iterator().next(); var automationCompositionStateChange = CommonTestData.getStateChange(CommonTestData.getParticipantId(), automationComposition.getInstanceId(), DeployOrder.DELETE, LockOrder.NONE); @@ -134,16 +133,16 @@ class AutomationCompositionHandlerTest { when(cacheProvider.getCommonProperties(any(UUID.class), any(UUID.class))).thenReturn(Map.of()); var participantMessagePublisher = mock(ParticipantMessagePublisher.class); - var listener = mock(AutomationCompositionElementListener.class); + var listener = mock(ThreadHandler.class); var ach = new AutomationCompositionHandler(cacheProvider, participantMessagePublisher, listener); ach.handleAutomationCompositionStateChange(automationCompositionStateChange); - verify(listener, times(automationComposition.getElements().size())).delete(any(), any()); + verify(listener, times(automationComposition.getElements().size())).delete(any(), any(), any()); } @Test - void handleAcPropertyUpdateTest() throws PfModelException { + void handleAcPropertyUpdateTest() { var cacheProvider = mock(CacheProvider.class); - var listener = mock(AutomationCompositionElementListener.class); + var listener = mock(ThreadHandler.class); var participantMessagePublisher = mock(ParticipantMessagePublisher.class); var ach = new AutomationCompositionHandler(cacheProvider, participantMessagePublisher, listener); @@ -166,13 +165,13 @@ class AutomationCompositionHandlerTest { participantDeploy.getAcElementList().add(acElementDeploy); ach.handleAcPropertyUpdate(updateMsg); - verify(listener).update(any(), any(), any()); + verify(listener).update(any(), any(), any(), any()); } @Test - void handleAutomationCompositionDeployTest() throws PfModelException { + void handleAutomationCompositionDeployTest() { var cacheProvider = mock(CacheProvider.class); - var listener = mock(AutomationCompositionElementListener.class); + var listener = mock(ThreadHandler.class); var participantMessagePublisher = mock(ParticipantMessagePublisher.class); var ach = new AutomationCompositionHandler(cacheProvider, participantMessagePublisher, listener); @@ -196,27 +195,29 @@ class AutomationCompositionHandlerTest { participantDeploy.getAcElementList().add(acElementDeploy); } ach.handleAutomationCompositionDeploy(deployMsg); - verify(listener, times(automationComposition.getElements().size())).deploy(any(), any(), any()); + verify(listener, times(automationComposition.getElements().size())).deploy(any(), any(), any(), any()); } @Test - void handleComposiotPrimeTest() throws PfModelException { - var listener = mock(AutomationCompositionElementListener.class); + void handleComposiotPrimeTest() { + var listener = mock(ThreadHandler.class); var ach = new AutomationCompositionHandler(mock(CacheProvider.class), mock(ParticipantMessagePublisher.class), listener); var compositionId = UUID.randomUUID(); var list = List.of(new AutomationCompositionElementDefinition()); - ach.prime(compositionId, list); - verify(listener).prime(compositionId, list); + var messageId = UUID.randomUUID(); + ach.prime(messageId, compositionId, list); + verify(listener).prime(messageId, compositionId, list); } @Test - void handleComposiotDeprimeTest() throws PfModelException { - var listener = mock(AutomationCompositionElementListener.class); + void handleComposiotDeprimeTest() { + var listener = mock(ThreadHandler.class); var ach = new AutomationCompositionHandler(mock(CacheProvider.class), mock(ParticipantMessagePublisher.class), listener); var compositionId = UUID.randomUUID(); - ach.deprime(compositionId); - verify(listener).deprime(compositionId); + var messageId = UUID.randomUUID(); + ach.deprime(messageId, compositionId); + verify(listener).deprime(messageId, compositionId); } } diff --git a/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandlerTest.java b/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandlerTest.java index 895d4ed59..237cab224 100644 --- a/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandlerTest.java +++ b/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandlerTest.java @@ -158,17 +158,20 @@ class ParticipantHandlerTest { void handleParticipantPrimeTest() { var cacheProvider = mock(CacheProvider.class); when(cacheProvider.getParticipantId()).thenReturn(CommonTestData.getParticipantId()); + + var participantPrime = new ParticipantPrime(); + participantPrime.setCompositionId(UUID.randomUUID()); + participantPrime.setMessageId(UUID.randomUUID()); + participantPrime.setParticipantDefinitionUpdates(List.of(createParticipantDefinition())); + var publisher = mock(ParticipantMessagePublisher.class); var acHandler = mock(AutomationCompositionHandler.class); var participantHandler = new ParticipantHandler(acHandler, mock(AutomationCompositionOutHandler.class), publisher, cacheProvider); - var participantPrime = new ParticipantPrime(); - participantPrime.setCompositionId(UUID.randomUUID()); - participantPrime.setParticipantDefinitionUpdates(List.of(createParticipantDefinition())); participantHandler.handleParticipantPrime(participantPrime); verify(cacheProvider).addElementDefinition(any(), any()); - verify(acHandler).prime(any(), any()); + verify(acHandler).prime(any(), any(), any()); } @Test @@ -182,9 +185,11 @@ class ParticipantHandlerTest { var participantPrime = new ParticipantPrime(); var compositionId = UUID.randomUUID(); participantPrime.setCompositionId(compositionId); + var messageId = UUID.randomUUID(); + participantPrime.setMessageId(messageId); participantHandler.handleParticipantPrime(participantPrime); verify(cacheProvider).removeElementDefinition(compositionId); - verify(acHandler).deprime(compositionId); + verify(acHandler).deprime(messageId, compositionId); } @Test diff --git a/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ThreadHandlerTest.java b/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ThreadHandlerTest.java new file mode 100644 index 000000000..767a916b5 --- /dev/null +++ b/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ThreadHandlerTest.java @@ -0,0 +1,160 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2023 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.acm.participant.intermediary.handler; + +import static org.mockito.Mockito.clearInvocations; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.verify; + +import java.util.List; +import java.util.Map; +import java.util.UUID; +import javax.ws.rs.core.Response.Status; +import org.junit.jupiter.api.Test; +import org.onap.policy.clamp.acm.participant.intermediary.api.AutomationCompositionElementListener; +import org.onap.policy.clamp.acm.participant.intermediary.api.ParticipantIntermediaryApi; +import org.onap.policy.clamp.models.acm.concepts.AcElementDeploy; +import org.onap.policy.clamp.models.acm.concepts.AcTypeState; +import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionElementDefinition; +import org.onap.policy.clamp.models.acm.concepts.DeployState; +import org.onap.policy.clamp.models.acm.concepts.LockState; +import org.onap.policy.clamp.models.acm.concepts.StateChangeResult; +import org.onap.policy.models.base.PfModelException; + +class ThreadHandlerTest { + + private static final int TIMEOUT = 400; + + @Test + void test() throws PfModelException { + var listener = mock(AutomationCompositionElementListener.class); + var intermediaryApi = mock(ParticipantIntermediaryApi.class); + var threadHandler = new ThreadHandler(listener, intermediaryApi, mock(CacheProvider.class)); + + var compositionId = UUID.randomUUID(); + var list = List.of(new AutomationCompositionElementDefinition()); + var messageId = UUID.randomUUID(); + threadHandler.prime(messageId, compositionId, list); + verify(listener, timeout(TIMEOUT)).prime(compositionId, list); + + clearInvocations(listener); + var element = new AcElementDeploy(); + var elementId = UUID.randomUUID(); + element.setId(elementId); + Map<String, Object> properties = Map.of("key", "value"); + var instanceId = UUID.randomUUID(); + threadHandler.deploy(messageId, instanceId, element, properties); + verify(listener, timeout(TIMEOUT)).deploy(instanceId, element, properties); + + clearInvocations(listener); + threadHandler.update(messageId, instanceId, element, properties); + verify(listener, timeout(TIMEOUT)).update(instanceId, element, properties); + + clearInvocations(listener); + threadHandler.lock(messageId, instanceId, elementId); + verify(listener, timeout(TIMEOUT)).lock(instanceId, elementId); + + clearInvocations(listener); + threadHandler.unlock(messageId, instanceId, elementId); + verify(listener, timeout(TIMEOUT)).unlock(instanceId, elementId); + + clearInvocations(listener); + threadHandler.undeploy(messageId, instanceId, elementId); + verify(listener, timeout(TIMEOUT)).undeploy(instanceId, elementId); + + clearInvocations(listener); + threadHandler.delete(messageId, instanceId, elementId); + verify(listener, timeout(TIMEOUT)).delete(instanceId, elementId); + + clearInvocations(listener); + threadHandler.deprime(messageId, compositionId); + verify(listener, timeout(TIMEOUT)).deprime(compositionId); + } + + @Test + void testException() throws PfModelException { + var listener = mock(AutomationCompositionElementListener.class); + var intermediaryApi = mock(ParticipantIntermediaryApi.class); + var threadHandler = new ThreadHandler(listener, intermediaryApi, mock(CacheProvider.class)); + + var compositionId = UUID.randomUUID(); + var list = List.of(new AutomationCompositionElementDefinition()); + doThrow(new PfModelException(Status.INTERNAL_SERVER_ERROR, "Error")).when(listener).prime(compositionId, list); + var messageId = UUID.randomUUID(); + threadHandler.prime(messageId, compositionId, list); + verify(intermediaryApi, timeout(TIMEOUT)).updateCompositionState(compositionId, AcTypeState.COMMISSIONED, + StateChangeResult.FAILED, "Composition Defintion prime failed"); + + clearInvocations(intermediaryApi); + var element = new AcElementDeploy(); + var elementId = UUID.randomUUID(); + element.setId(elementId); + Map<String, Object> properties = Map.of("key", "value"); + var instanceId = UUID.randomUUID(); + doThrow(new PfModelException(Status.INTERNAL_SERVER_ERROR, "Error")).when(listener).deploy(instanceId, element, + properties); + threadHandler.deploy(messageId, instanceId, element, properties); + verify(intermediaryApi, timeout(TIMEOUT)).updateAutomationCompositionElementState(instanceId, elementId, + DeployState.UNDEPLOYED, null, StateChangeResult.FAILED, "Automation composition element deploy failed"); + + clearInvocations(listener); + doThrow(new PfModelException(Status.INTERNAL_SERVER_ERROR, "Error")).when(listener).update(instanceId, element, + properties); + threadHandler.update(messageId, instanceId, element, properties); + verify(intermediaryApi, timeout(TIMEOUT)).updateAutomationCompositionElementState(instanceId, elementId, + DeployState.DEPLOYED, null, StateChangeResult.FAILED, "Automation composition element update failed"); + + clearInvocations(listener); + doThrow(new PfModelException(Status.INTERNAL_SERVER_ERROR, "Error")).when(listener).lock(instanceId, elementId); + threadHandler.lock(messageId, instanceId, elementId); + verify(intermediaryApi, timeout(TIMEOUT)).updateAutomationCompositionElementState(instanceId, elementId, null, + LockState.UNLOCKED, StateChangeResult.FAILED, "Automation composition element lock failed"); + + clearInvocations(listener); + doThrow(new PfModelException(Status.INTERNAL_SERVER_ERROR, "Error")).when(listener).unlock(instanceId, + elementId); + threadHandler.unlock(messageId, instanceId, elementId); + verify(intermediaryApi, timeout(TIMEOUT)).updateAutomationCompositionElementState(instanceId, elementId, null, + LockState.LOCKED, StateChangeResult.FAILED, "Automation composition element unlock failed"); + + clearInvocations(listener); + doThrow(new PfModelException(Status.INTERNAL_SERVER_ERROR, "Error")).when(listener).undeploy(instanceId, + elementId); + threadHandler.undeploy(messageId, instanceId, elementId); + verify(intermediaryApi, timeout(TIMEOUT)).updateAutomationCompositionElementState(instanceId, elementId, + DeployState.DEPLOYED, null, StateChangeResult.FAILED, "Automation composition element undeploy failed"); + + clearInvocations(listener); + doThrow(new PfModelException(Status.INTERNAL_SERVER_ERROR, "Error")).when(listener).delete(instanceId, + elementId); + threadHandler.delete(messageId, instanceId, elementId); + verify(intermediaryApi, timeout(TIMEOUT)).updateAutomationCompositionElementState(instanceId, elementId, + DeployState.UNDEPLOYED, null, StateChangeResult.FAILED, "Automation composition element delete failed"); + + clearInvocations(listener); + doThrow(new PfModelException(Status.INTERNAL_SERVER_ERROR, "Error")).when(listener).deprime(compositionId); + threadHandler.deprime(messageId, compositionId); + verify(intermediaryApi, timeout(TIMEOUT)).updateCompositionState(compositionId, + AcTypeState.PRIMED, StateChangeResult.FAILED, "Composition Defintion deprime failed"); + } +} |