diff options
author | FrancescoFioraEst <francesco.fiora@est.tech> | 2023-06-20 16:02:40 +0100 |
---|---|---|
committer | Francesco Fiora <francesco.fiora@est.tech> | 2023-06-21 10:44:05 +0000 |
commit | 3af93b5db64ea87b16eb25d14bce3c79ee2193de (patch) | |
tree | be4e2fe166323360d66e3a28c0ccc31f00d7e7a8 /participant/participant-intermediary/src/main | |
parent | a859910c602a5384ddd3e1b85e95bc800c099640 (diff) |
Add multiple messages support in Intermediary
Issue-ID: POLICY-4708
Change-Id: I6401eebb5730dde2c62eabcbbe3b34539238ec04
Signed-off-by: FrancescoFioraEst <francesco.fiora@est.tech>
Diffstat (limited to 'participant/participant-intermediary/src/main')
6 files changed, 378 insertions, 112 deletions
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/api/AutomationCompositionElementListener.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/api/AutomationCompositionElementListener.java index 6ca4d3cc7..48b60dc76 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/api/AutomationCompositionElementListener.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/api/AutomationCompositionElementListener.java @@ -37,7 +37,7 @@ public interface AutomationCompositionElementListener { * @param automationCompositionElementId the ID of the automation composition element * @throws PfModelException in case of a model exception */ - public void undeploy(UUID automationCompositionId, UUID automationCompositionElementId) throws PfModelException; + void undeploy(UUID automationCompositionId, UUID automationCompositionElementId) throws PfModelException; /** * Handle an update on a automation composition element. @@ -47,20 +47,20 @@ public interface AutomationCompositionElementListener { * @param properties properties Map * @throws PfModelException from Policy framework */ - public void deploy(UUID automationCompositionId, AcElementDeploy element, Map<String, Object> properties) + void deploy(UUID automationCompositionId, AcElementDeploy element, Map<String, Object> properties) throws PfModelException; - public void lock(UUID automationCompositionId, UUID automationCompositionElementId) throws PfModelException; + void lock(UUID automationCompositionId, UUID automationCompositionElementId) throws PfModelException; - public void unlock(UUID automationCompositionId, UUID automationCompositionElementId) throws PfModelException; + void unlock(UUID automationCompositionId, UUID automationCompositionElementId) throws PfModelException; - public void delete(UUID automationCompositionId, UUID automationCompositionElementId) throws PfModelException; + void delete(UUID automationCompositionId, UUID automationCompositionElementId) throws PfModelException; - public void update(UUID automationCompositionId, AcElementDeploy element, Map<String, Object> properties) + void update(UUID automationCompositionId, AcElementDeploy element, Map<String, Object> properties) throws PfModelException; - public void prime(UUID compositionId, List<AutomationCompositionElementDefinition> elementDefinitionList) + void prime(UUID compositionId, List<AutomationCompositionElementDefinition> elementDefinitionList) throws PfModelException; - public void deprime(UUID compositionId) throws PfModelException; + void deprime(UUID compositionId) throws PfModelException; } diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionHandler.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionHandler.java index 7e713654d..71c9d9abc 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionHandler.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionHandler.java @@ -24,13 +24,13 @@ package org.onap.policy.clamp.acm.participant.intermediary.handler; import java.util.HashMap; import java.util.List; import java.util.UUID; -import org.onap.policy.clamp.acm.participant.intermediary.api.AutomationCompositionElementListener; import org.onap.policy.clamp.acm.participant.intermediary.comm.ParticipantMessagePublisher; import org.onap.policy.clamp.models.acm.concepts.AcElementDeploy; import org.onap.policy.clamp.models.acm.concepts.AutomationComposition; import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionElementDefinition; import org.onap.policy.clamp.models.acm.concepts.ParticipantDeploy; import org.onap.policy.clamp.models.acm.concepts.ParticipantUtils; +import org.onap.policy.clamp.models.acm.concepts.StateChangeResult; import org.onap.policy.clamp.models.acm.messages.dmaap.participant.AutomationCompositionDeploy; import org.onap.policy.clamp.models.acm.messages.dmaap.participant.AutomationCompositionDeployAck; import org.onap.policy.clamp.models.acm.messages.dmaap.participant.AutomationCompositionStateChange; @@ -39,7 +39,6 @@ import org.onap.policy.clamp.models.acm.messages.dmaap.participant.PropertiesUpd import org.onap.policy.clamp.models.acm.messages.rest.instantiation.DeployOrder; import org.onap.policy.clamp.models.acm.messages.rest.instantiation.LockOrder; import org.onap.policy.clamp.models.acm.persistence.provider.AcInstanceStateResolver; -import org.onap.policy.models.base.PfModelException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; @@ -53,7 +52,7 @@ public class AutomationCompositionHandler { private final CacheProvider cacheProvider; private final ParticipantMessagePublisher publisher; - private final AutomationCompositionElementListener listener; + private final ThreadHandler listener; private final AcInstanceStateResolver acInstanceStateResolver; /** @@ -64,7 +63,7 @@ public class AutomationCompositionHandler { * @param listener the ThreadHandler Listener */ public AutomationCompositionHandler(CacheProvider cacheProvider, ParticipantMessagePublisher publisher, - AutomationCompositionElementListener listener) { + ThreadHandler listener) { this.cacheProvider = cacheProvider; this.publisher = publisher; this.listener = listener; @@ -84,17 +83,20 @@ public class AutomationCompositionHandler { var automationComposition = cacheProvider.getAutomationComposition(stateChangeMsg.getAutomationCompositionId()); if (automationComposition == null) { - var automationCompositionAck = - new AutomationCompositionDeployAck(ParticipantMessageType.AUTOMATION_COMPOSITION_STATECHANGE_ACK); - automationCompositionAck.setParticipantId(cacheProvider.getParticipantId()); - automationCompositionAck.setMessage("Automation composition " + stateChangeMsg.getAutomationCompositionId() - + " does not use this participant " + cacheProvider.getParticipantId()); - automationCompositionAck.setResult(false); - automationCompositionAck.setResponseTo(stateChangeMsg.getMessageId()); - automationCompositionAck.setAutomationCompositionId(stateChangeMsg.getAutomationCompositionId()); - publisher.sendAutomationCompositionAck(automationCompositionAck); - LOGGER.debug("Automation composition {} does not use this participant", - stateChangeMsg.getAutomationCompositionId()); + if (DeployOrder.DELETE.equals(stateChangeMsg.getDeployOrderedState())) { + var automationCompositionAck = new AutomationCompositionDeployAck( + ParticipantMessageType.AUTOMATION_COMPOSITION_STATECHANGE_ACK); + automationCompositionAck.setParticipantId(cacheProvider.getParticipantId()); + automationCompositionAck.setMessage("Already deleted or never used"); + automationCompositionAck.setResult(true); + automationCompositionAck.setStateChangeResult(StateChangeResult.NO_ERROR); + automationCompositionAck.setResponseTo(stateChangeMsg.getMessageId()); + automationCompositionAck.setAutomationCompositionId(stateChangeMsg.getAutomationCompositionId()); + publisher.sendAutomationCompositionAck(automationCompositionAck); + } else { + LOGGER.debug("Automation composition {} does not use this participant", + stateChangeMsg.getAutomationCompositionId()); + } return; } @@ -106,18 +108,18 @@ public class AutomationCompositionHandler { } if (DeployOrder.NONE.equals(stateChangeMsg.getDeployOrderedState())) { - handleLockOrderState(automationComposition, stateChangeMsg.getLockOrderedState(), - stateChangeMsg.getStartPhase()); + handleLockOrderState(stateChangeMsg.getMessageId(), automationComposition, + stateChangeMsg.getLockOrderedState(), stateChangeMsg.getStartPhase()); } else { - handleDeployOrderState(automationComposition, stateChangeMsg.getDeployOrderedState(), - stateChangeMsg.getStartPhase()); + handleDeployOrderState(stateChangeMsg.getMessageId(), automationComposition, + stateChangeMsg.getDeployOrderedState(), stateChangeMsg.getStartPhase()); } } private boolean checkConsistantOrderState(AutomationComposition automationComposition, DeployOrder deployOrder, LockOrder lockOrder) { if (DeployOrder.UPDATE.equals(deployOrder)) { - return false; + return true; } return acInstanceStateResolver.resolve(deployOrder, lockOrder, automationComposition.getDeployState(), automationComposition.getLockState(), automationComposition.getStateChangeResult()) != null; @@ -126,19 +128,20 @@ public class AutomationCompositionHandler { /** * Method to handle state changes. * + * @param messageId the messageId * @param automationComposition participant response * @param orderedState automation composition ordered state * @param startPhaseMsg startPhase from message */ - private void handleDeployOrderState(final AutomationComposition automationComposition, DeployOrder orderedState, - Integer startPhaseMsg) { + private void handleDeployOrderState(UUID messageId, final AutomationComposition automationComposition, + DeployOrder orderedState, Integer startPhaseMsg) { switch (orderedState) { case UNDEPLOY: - handleUndeployState(automationComposition, startPhaseMsg); + handleUndeployState(messageId, automationComposition, startPhaseMsg); break; case DELETE: - handleDeleteState(automationComposition, startPhaseMsg); + handleDeleteState(messageId, automationComposition, startPhaseMsg); break; default: @@ -150,19 +153,20 @@ public class AutomationCompositionHandler { /** * Method to handle state changes. * + * @param messageId the messageId * @param automationComposition participant response * @param orderedState automation composition ordered state * @param startPhaseMsg startPhase from message */ - private void handleLockOrderState(final AutomationComposition automationComposition, LockOrder orderedState, - Integer startPhaseMsg) { + private void handleLockOrderState(UUID messageId, final AutomationComposition automationComposition, + LockOrder orderedState, Integer startPhaseMsg) { switch (orderedState) { case LOCK: - handleLockState(automationComposition, startPhaseMsg); + handleLockState(messageId, automationComposition, startPhaseMsg); break; case UNLOCK: - handleUnlockState(automationComposition, startPhaseMsg); + handleUnlockState(messageId, automationComposition, startPhaseMsg); break; default: LOGGER.error("StateChange message has no state, state is null {}", automationComposition.getKey()); @@ -188,7 +192,7 @@ public class AutomationCompositionHandler { updateExistingElementsOnThisParticipant(updateMsg.getAutomationCompositionId(), participantDeploy); - callParticipantUpdateProperty(participantDeploy.getAcElementList(), + callParticipantUpdateProperty(updateMsg.getMessageId(), participantDeploy.getAcElementList(), updateMsg.getAutomationCompositionId()); } } @@ -212,35 +216,28 @@ public class AutomationCompositionHandler { cacheProvider.initializeAutomationComposition(deployMsg.getCompositionId(), deployMsg.getAutomationCompositionId(), participantDeploy); } - callParticipanDeploy(participantDeploy.getAcElementList(), deployMsg.getStartPhase(), - deployMsg.getAutomationCompositionId()); + callParticipanDeploy(deployMsg.getMessageId(), participantDeploy.getAcElementList(), + deployMsg.getStartPhase(), deployMsg.getAutomationCompositionId()); } } } - private void callParticipanDeploy(List<AcElementDeploy> acElements, Integer startPhaseMsg, UUID instanceId) { - try { - for (var element : acElements) { - var commonProperties = cacheProvider.getCommonProperties(instanceId, element.getId()); - int startPhase = ParticipantUtils.findStartPhase(commonProperties); - if (startPhaseMsg.equals(startPhase)) { - var map = new HashMap<>(commonProperties); - map.putAll(element.getProperties()); - listener.deploy(instanceId, element, map); - } + private void callParticipanDeploy(UUID messageId, List<AcElementDeploy> acElements, Integer startPhaseMsg, + UUID instanceId) { + for (var element : acElements) { + var commonProperties = cacheProvider.getCommonProperties(instanceId, element.getId()); + int startPhase = ParticipantUtils.findStartPhase(commonProperties); + if (startPhaseMsg.equals(startPhase)) { + var map = new HashMap<>(commonProperties); + map.putAll(element.getProperties()); + listener.deploy(messageId, instanceId, element, map); } - } catch (PfModelException e) { - LOGGER.debug("Automation composition element Deploy failed {}", instanceId); } } - private void callParticipantUpdateProperty(List<AcElementDeploy> acElements, UUID instanceId) { - try { - for (var element : acElements) { - listener.update(instanceId, element, element.getProperties()); - } - } catch (PfModelException e) { - LOGGER.debug("Automation composition element update failed {}", instanceId); + private void callParticipantUpdateProperty(UUID messageId, List<AcElementDeploy> acElements, UUID instanceId) { + for (var element : acElements) { + listener.update(messageId, instanceId, element, element.getProperties()); } } @@ -255,101 +252,86 @@ public class AutomationCompositionHandler { /** * Method to handle when the new state from participant is UNINITIALISED state. * + * @param messageId the messageId * @param automationComposition participant response * @param startPhaseMsg startPhase from message */ - private void handleUndeployState(final AutomationComposition automationComposition, Integer startPhaseMsg) { - try { - for (var acElement : automationComposition.getElements().values()) { - int startPhase = ParticipantUtils.findStartPhase( - cacheProvider.getCommonProperties(automationComposition.getInstanceId(), acElement.getId())); - if (startPhaseMsg.equals(startPhase)) { - listener.undeploy(automationComposition.getInstanceId(), acElement.getId()); - } + private void handleUndeployState(UUID messageId, final AutomationComposition automationComposition, + Integer startPhaseMsg) { + for (var acElement : automationComposition.getElements().values()) { + int startPhase = ParticipantUtils.findStartPhase( + cacheProvider.getCommonProperties(automationComposition.getInstanceId(), acElement.getId())); + if (startPhaseMsg.equals(startPhase)) { + listener.undeploy(messageId, automationComposition.getInstanceId(), acElement.getId()); } - } catch (PfModelException e) { - LOGGER.debug("Automation composition element Undeploy failed {}", automationComposition.getInstanceId()); } } - private void handleDeleteState(final AutomationComposition automationComposition, Integer startPhaseMsg) { - try { - for (var acElement : automationComposition.getElements().values()) { - int startPhase = ParticipantUtils.findStartPhase( - cacheProvider.getCommonProperties(automationComposition.getInstanceId(), acElement.getId())); - if (startPhaseMsg.equals(startPhase)) { - listener.delete(automationComposition.getInstanceId(), acElement.getId()); - } + private void handleDeleteState(UUID messageId, final AutomationComposition automationComposition, + Integer startPhaseMsg) { + for (var acElement : automationComposition.getElements().values()) { + int startPhase = ParticipantUtils.findStartPhase( + cacheProvider.getCommonProperties(automationComposition.getInstanceId(), acElement.getId())); + if (startPhaseMsg.equals(startPhase)) { + listener.delete(messageId, automationComposition.getInstanceId(), acElement.getId()); } - } catch (PfModelException e) { - LOGGER.debug("Automation composition element Delete failed {}", automationComposition.getInstanceId()); } } /** * Method to handle when the new state from participant is PASSIVE state. * + * @param messageId the messageId * @param automationComposition participant response * @param startPhaseMsg startPhase from message */ - private void handleLockState(final AutomationComposition automationComposition, Integer startPhaseMsg) { - try { - for (var acElement : automationComposition.getElements().values()) { - int startPhase = ParticipantUtils.findStartPhase( - cacheProvider.getCommonProperties(automationComposition.getInstanceId(), acElement.getId())); - if (startPhaseMsg.equals(startPhase)) { - listener.lock(automationComposition.getInstanceId(), acElement.getId()); - } + private void handleLockState(UUID messageId, final AutomationComposition automationComposition, + Integer startPhaseMsg) { + for (var acElement : automationComposition.getElements().values()) { + int startPhase = ParticipantUtils.findStartPhase( + cacheProvider.getCommonProperties(automationComposition.getInstanceId(), acElement.getId())); + if (startPhaseMsg.equals(startPhase)) { + listener.lock(messageId, automationComposition.getInstanceId(), acElement.getId()); } - } catch (PfModelException e) { - LOGGER.debug("Automation composition element Lock failed {}", automationComposition.getInstanceId()); } } /** * Method to handle when the new state from participant is RUNNING state. * + * @param messageId the messageId * @param automationComposition participant response * @param startPhaseMsg startPhase from message */ - private void handleUnlockState(final AutomationComposition automationComposition, Integer startPhaseMsg) { - try { - for (var acElement : automationComposition.getElements().values()) { - int startPhase = ParticipantUtils.findStartPhase( - cacheProvider.getCommonProperties(automationComposition.getInstanceId(), acElement.getId())); - if (startPhaseMsg.equals(startPhase)) { - listener.unlock(automationComposition.getInstanceId(), acElement.getId()); - } + private void handleUnlockState(UUID messageId, final AutomationComposition automationComposition, + Integer startPhaseMsg) { + for (var acElement : automationComposition.getElements().values()) { + int startPhase = ParticipantUtils.findStartPhase( + cacheProvider.getCommonProperties(automationComposition.getInstanceId(), acElement.getId())); + if (startPhaseMsg.equals(startPhase)) { + listener.unlock(messageId, automationComposition.getInstanceId(), acElement.getId()); } - } catch (PfModelException e) { - LOGGER.debug("Automation composition element Unlock failed {}", automationComposition.getInstanceId()); } } /** * Handles prime a Composition Definition. * + * @param messageId the messageId * @param compositionId the compositionId * @param list the list of AutomationCompositionElementDefinition */ - public void prime(UUID compositionId, List<AutomationCompositionElementDefinition> list) { - try { - listener.prime(compositionId, list); - } catch (PfModelException e) { - LOGGER.debug("Composition prime failed {}", compositionId); - } + public void prime(UUID messageId, UUID compositionId, List<AutomationCompositionElementDefinition> list) { + listener.prime(messageId, compositionId, list); } /** * Handles deprime a Composition Definition. * + * @param messageId the messageId * @param compositionId the compositionId */ - public void deprime(UUID compositionId) { - try { - listener.deprime(compositionId); - } catch (PfModelException e) { - LOGGER.debug("Composition deprime failed {}", compositionId); - } + public void deprime(UUID messageId, UUID compositionId) { + listener.deprime(messageId, compositionId); } } diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionOutHandler.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionOutHandler.java index 2c34652c8..cfd61c4fe 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionOutHandler.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionOutHandler.java @@ -99,6 +99,7 @@ public class AutomationCompositionOutHandler { new AutomationCompositionDeployAck(ParticipantMessageType.AUTOMATION_COMPOSITION_STATECHANGE_ACK); automationCompositionStateChangeAck.setParticipantId(cacheProvider.getParticipantId()); automationCompositionStateChangeAck.setMessage(message); + automationCompositionStateChangeAck.setResponseTo(cacheProvider.getMsgIdentification().get(element.getId())); automationCompositionStateChangeAck.setStateChangeResult(stateChangeResult); automationCompositionStateChangeAck.setAutomationCompositionId(automationCompositionId); automationCompositionStateChangeAck.getAutomationCompositionResultMap().put(element.getId(), @@ -107,6 +108,7 @@ public class AutomationCompositionOutHandler { LOGGER.debug("Automation composition element {} state changed to {}", elementId, deployState); automationCompositionStateChangeAck.setResult(true); publisher.sendAutomationCompositionAck(automationCompositionStateChangeAck); + cacheProvider.getMsgIdentification().remove(element.getId()); } private void handleDeployState(AutomationComposition automationComposition, AutomationCompositionElement element, @@ -213,10 +215,12 @@ public class AutomationCompositionOutHandler { participantPrimeAck.setCompositionId(compositionId); participantPrimeAck.setMessage(message); participantPrimeAck.setResult(true); + participantPrimeAck.setResponseTo(cacheProvider.getMsgIdentification().get(compositionId)); participantPrimeAck.setCompositionState(state); participantPrimeAck.setStateChangeResult(stateChangeResult); participantPrimeAck.setParticipantId(cacheProvider.getParticipantId()); participantPrimeAck.setState(ParticipantState.ON_LINE); publisher.sendParticipantPrimeAck(participantPrimeAck); + cacheProvider.getMsgIdentification().remove(compositionId); } } diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/CacheProvider.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/CacheProvider.java index 09e75e8d5..119cc11b5 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/CacheProvider.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/CacheProvider.java @@ -55,6 +55,9 @@ public class CacheProvider { private final Map<UUID, Map<ToscaConceptIdentifier, AutomationCompositionElementDefinition>> acElementsDefinitions = new ConcurrentHashMap<>(); + @Getter + private final Map<UUID, UUID> msgIdentification = new ConcurrentHashMap<>(); + /** * Constructor. * diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandler.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandler.java index 0e7e193f6..3a3a0cced 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandler.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandler.java @@ -186,11 +186,13 @@ public class ParticipantHandler { } } cacheProvider.addElementDefinition(participantPrimeMsg.getCompositionId(), list); - automationCompositionHandler.prime(participantPrimeMsg.getCompositionId(), list); + automationCompositionHandler.prime(participantPrimeMsg.getMessageId(), + participantPrimeMsg.getCompositionId(), list); } else { // deprime cacheProvider.removeElementDefinition(participantPrimeMsg.getCompositionId()); - automationCompositionHandler.deprime(participantPrimeMsg.getCompositionId()); + automationCompositionHandler.deprime(participantPrimeMsg.getMessageId(), + participantPrimeMsg.getCompositionId()); } } diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ThreadHandler.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ThreadHandler.java new file mode 100644 index 000000000..b5866d71f --- /dev/null +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ThreadHandler.java @@ -0,0 +1,275 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2023 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.acm.participant.intermediary.handler; + +import java.io.Closeable; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import lombok.RequiredArgsConstructor; +import org.onap.policy.clamp.acm.participant.intermediary.api.AutomationCompositionElementListener; +import org.onap.policy.clamp.acm.participant.intermediary.api.ParticipantIntermediaryApi; +import org.onap.policy.clamp.models.acm.concepts.AcElementDeploy; +import org.onap.policy.clamp.models.acm.concepts.AcTypeState; +import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionElementDefinition; +import org.onap.policy.clamp.models.acm.concepts.DeployState; +import org.onap.policy.clamp.models.acm.concepts.LockState; +import org.onap.policy.clamp.models.acm.concepts.StateChangeResult; +import org.onap.policy.models.base.PfModelException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +@Component +@RequiredArgsConstructor +public class ThreadHandler implements Closeable { + private static final Logger LOGGER = LoggerFactory.getLogger(ThreadHandler.class); + + private final AutomationCompositionElementListener listener; + private final ParticipantIntermediaryApi intermediaryApi; + private final CacheProvider cacheProvider; + + private final Map<UUID, Future> executionMap = new ConcurrentHashMap<>(); + + private final ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); + + /** + * Handle an update on a automation composition element. + * + * @param messageId the messageId + * @param instanceId the automationComposition Id + * @param element the information on the automation composition element + * @param properties properties Map + */ + public void deploy(UUID messageId, UUID instanceId, AcElementDeploy element, Map<String, Object> properties) { + cleanExecution(element.getId(), messageId); + var result = executor.submit(() -> this.deployProcess(instanceId, element, properties)); + executionMap.put(element.getId(), result); + } + + private void deployProcess(UUID instanceId, AcElementDeploy element, Map<String, Object> properties) { + try { + listener.deploy(instanceId, element, properties); + } catch (PfModelException e) { + LOGGER.error("Automation composition element deploy failed {} {}", instanceId, e.getMessage()); + intermediaryApi.updateAutomationCompositionElementState(instanceId, element.getId(), DeployState.UNDEPLOYED, + null, StateChangeResult.FAILED, "Automation composition element deploy failed"); + } + executionMap.remove(element.getId()); + } + + /** + * Handle a automation composition element state change. + * + * @param messageId the messageId + * @param instanceId the automationComposition Id + * @param elementId the ID of the automation composition element + */ + public void undeploy(UUID messageId, UUID instanceId, UUID elementId) { + cleanExecution(elementId, messageId); + var result = executor.submit(() -> this.undeployProcess(instanceId, elementId)); + executionMap.put(elementId, result); + } + + private void undeployProcess(UUID instanceId, UUID elementId) { + try { + listener.undeploy(instanceId, elementId); + } catch (PfModelException e) { + LOGGER.error("Automation composition element undeploy failed {} {}", instanceId, e.getMessage()); + intermediaryApi.updateAutomationCompositionElementState(instanceId, elementId, DeployState.DEPLOYED, null, + StateChangeResult.FAILED, "Automation composition element undeploy failed"); + } + executionMap.remove(elementId); + } + + /** + * Handle a automation composition element lock. + * + * @param messageId the messageId + * @param instanceId the automationComposition Id + * @param elementId the ID of the automation composition element + */ + public void lock(UUID messageId, UUID instanceId, UUID elementId) { + cleanExecution(elementId, messageId); + var result = executor.submit(() -> this.lockProcess(instanceId, elementId)); + executionMap.put(elementId, result); + } + + private void lockProcess(UUID instanceId, UUID elementId) { + try { + listener.lock(instanceId, elementId); + } catch (PfModelException e) { + LOGGER.error("Automation composition element lock failed {} {}", instanceId, e.getMessage()); + intermediaryApi.updateAutomationCompositionElementState(instanceId, elementId, null, LockState.UNLOCKED, + StateChangeResult.FAILED, "Automation composition element lock failed"); + } + executionMap.remove(elementId); + } + + /** + * Handle a automation composition element unlock. + * + * @param messageId the messageId + * @param instanceId the automationComposition Id + * @param elementId the ID of the automation composition element + */ + public void unlock(UUID messageId, UUID instanceId, UUID elementId) { + cleanExecution(elementId, messageId); + var result = executor.submit(() -> this.unlockProcess(instanceId, elementId)); + executionMap.put(elementId, result); + } + + private void unlockProcess(UUID instanceId, UUID elementId) { + try { + listener.unlock(instanceId, elementId); + } catch (PfModelException e) { + LOGGER.error("Automation composition element unlock failed {} {}", instanceId, e.getMessage()); + intermediaryApi.updateAutomationCompositionElementState(instanceId, elementId, null, LockState.LOCKED, + StateChangeResult.FAILED, "Automation composition element unlock failed"); + } + executionMap.remove(elementId); + } + + /** + * Handle a automation composition element delete. + * + * @param messageId the messageId + * @param instanceId the automationComposition Id + * @param elementId the ID of the automation composition element + */ + public void delete(UUID messageId, UUID instanceId, UUID elementId) { + cleanExecution(elementId, messageId); + var result = executor.submit(() -> this.deleteProcess(instanceId, elementId)); + executionMap.put(elementId, result); + } + + private void deleteProcess(UUID instanceId, UUID elementId) { + try { + listener.delete(instanceId, elementId); + } catch (PfModelException e) { + LOGGER.error("Automation composition element delete failed {} {}", instanceId, e.getMessage()); + intermediaryApi.updateAutomationCompositionElementState(instanceId, elementId, DeployState.UNDEPLOYED, null, + StateChangeResult.FAILED, "Automation composition element delete failed"); + } + executionMap.remove(elementId); + } + + /** + * Handle a automation composition element properties update. + * + * @param messageId the messageId + * @param instanceId the automationComposition Id + * @param element the information on the automation composition element + * @param properties properties Map + */ + public void update(UUID messageId, UUID instanceId, AcElementDeploy element, Map<String, Object> properties) { + cleanExecution(element.getId(), messageId); + var result = executor.submit(() -> this.updateProcess(instanceId, element, properties)); + executionMap.put(element.getId(), result); + } + + private void updateProcess(UUID instanceId, AcElementDeploy element, Map<String, Object> properties) { + try { + listener.update(instanceId, element, properties); + } catch (PfModelException e) { + LOGGER.error("Automation composition element update failed {} {}", instanceId, e.getMessage()); + intermediaryApi.updateAutomationCompositionElementState(instanceId, element.getId(), DeployState.DEPLOYED, + null, StateChangeResult.FAILED, "Automation composition element update failed"); + } + executionMap.remove(element.getId()); + } + + private void cleanExecution(UUID execIdentificationId, UUID messageId) { + var process = executionMap.get(execIdentificationId); + if (process != null) { + if (!process.isDone()) { + process.cancel(true); + } + executionMap.remove(execIdentificationId); + } + cacheProvider.getMsgIdentification().put(execIdentificationId, messageId); + } + + /** + * Handles prime a Composition Definition. + * + * @param messageId the messageId + * @param compositionId the compositionId + * @param list the list of AutomationCompositionElementDefinition + */ + public void prime(UUID messageId, UUID compositionId, List<AutomationCompositionElementDefinition> list) { + cleanExecution(compositionId, messageId); + var result = executor.submit(() -> this.primeProcess(compositionId, list)); + executionMap.put(compositionId, result); + } + + private void primeProcess(UUID compositionId, List<AutomationCompositionElementDefinition> list) { + try { + listener.prime(compositionId, list); + executionMap.remove(compositionId); + } catch (PfModelException e) { + LOGGER.error("Composition Defintion prime failed {} {}", compositionId, e.getMessage()); + intermediaryApi.updateCompositionState(compositionId, AcTypeState.COMMISSIONED, StateChangeResult.FAILED, + "Composition Defintion prime failed"); + } + } + + /** + * Handles deprime a Composition Definition. + * + * @param messageId the messageId + * @param compositionId the compositionId + */ + public void deprime(UUID messageId, UUID compositionId) { + cleanExecution(compositionId, messageId); + var result = executor.submit(() -> this.deprimeProcess(compositionId)); + executionMap.put(compositionId, result); + } + + private void deprimeProcess(UUID compositionId) { + try { + listener.deprime(compositionId); + executionMap.remove(compositionId); + } catch (PfModelException e) { + LOGGER.error("Composition Defintion deprime failed {} {}", compositionId, e.getMessage()); + intermediaryApi.updateCompositionState(compositionId, AcTypeState.PRIMED, StateChangeResult.FAILED, + "Composition Defintion deprime failed"); + } + } + + + /** + * Closes this stream and releases any system resources associated + * with it. If the stream is already closed then invoking this + * method has no effect. + * + * @throws IOException if an I/O error occurs + */ + @Override + public void close() throws IOException { + executor.shutdown(); + } +} |