summaryrefslogtreecommitdiffstats
path: root/participant/participant-intermediary/src/main
diff options
context:
space:
mode:
authorFrancescoFioraEst <francesco.fiora@est.tech>2023-06-20 16:02:40 +0100
committerFrancesco Fiora <francesco.fiora@est.tech>2023-06-21 10:44:05 +0000
commit3af93b5db64ea87b16eb25d14bce3c79ee2193de (patch)
treebe4e2fe166323360d66e3a28c0ccc31f00d7e7a8 /participant/participant-intermediary/src/main
parenta859910c602a5384ddd3e1b85e95bc800c099640 (diff)
Add multiple messages support in Intermediary
Issue-ID: POLICY-4708 Change-Id: I6401eebb5730dde2c62eabcbbe3b34539238ec04 Signed-off-by: FrancescoFioraEst <francesco.fiora@est.tech>
Diffstat (limited to 'participant/participant-intermediary/src/main')
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/api/AutomationCompositionElementListener.java16
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionHandler.java186
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionOutHandler.java4
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/CacheProvider.java3
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandler.java6
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ThreadHandler.java275
6 files changed, 378 insertions, 112 deletions
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/api/AutomationCompositionElementListener.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/api/AutomationCompositionElementListener.java
index 6ca4d3cc7..48b60dc76 100644
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/api/AutomationCompositionElementListener.java
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/api/AutomationCompositionElementListener.java
@@ -37,7 +37,7 @@ public interface AutomationCompositionElementListener {
* @param automationCompositionElementId the ID of the automation composition element
* @throws PfModelException in case of a model exception
*/
- public void undeploy(UUID automationCompositionId, UUID automationCompositionElementId) throws PfModelException;
+ void undeploy(UUID automationCompositionId, UUID automationCompositionElementId) throws PfModelException;
/**
* Handle an update on a automation composition element.
@@ -47,20 +47,20 @@ public interface AutomationCompositionElementListener {
* @param properties properties Map
* @throws PfModelException from Policy framework
*/
- public void deploy(UUID automationCompositionId, AcElementDeploy element, Map<String, Object> properties)
+ void deploy(UUID automationCompositionId, AcElementDeploy element, Map<String, Object> properties)
throws PfModelException;
- public void lock(UUID automationCompositionId, UUID automationCompositionElementId) throws PfModelException;
+ void lock(UUID automationCompositionId, UUID automationCompositionElementId) throws PfModelException;
- public void unlock(UUID automationCompositionId, UUID automationCompositionElementId) throws PfModelException;
+ void unlock(UUID automationCompositionId, UUID automationCompositionElementId) throws PfModelException;
- public void delete(UUID automationCompositionId, UUID automationCompositionElementId) throws PfModelException;
+ void delete(UUID automationCompositionId, UUID automationCompositionElementId) throws PfModelException;
- public void update(UUID automationCompositionId, AcElementDeploy element, Map<String, Object> properties)
+ void update(UUID automationCompositionId, AcElementDeploy element, Map<String, Object> properties)
throws PfModelException;
- public void prime(UUID compositionId, List<AutomationCompositionElementDefinition> elementDefinitionList)
+ void prime(UUID compositionId, List<AutomationCompositionElementDefinition> elementDefinitionList)
throws PfModelException;
- public void deprime(UUID compositionId) throws PfModelException;
+ void deprime(UUID compositionId) throws PfModelException;
}
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionHandler.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionHandler.java
index 7e713654d..71c9d9abc 100644
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionHandler.java
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionHandler.java
@@ -24,13 +24,13 @@ package org.onap.policy.clamp.acm.participant.intermediary.handler;
import java.util.HashMap;
import java.util.List;
import java.util.UUID;
-import org.onap.policy.clamp.acm.participant.intermediary.api.AutomationCompositionElementListener;
import org.onap.policy.clamp.acm.participant.intermediary.comm.ParticipantMessagePublisher;
import org.onap.policy.clamp.models.acm.concepts.AcElementDeploy;
import org.onap.policy.clamp.models.acm.concepts.AutomationComposition;
import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionElementDefinition;
import org.onap.policy.clamp.models.acm.concepts.ParticipantDeploy;
import org.onap.policy.clamp.models.acm.concepts.ParticipantUtils;
+import org.onap.policy.clamp.models.acm.concepts.StateChangeResult;
import org.onap.policy.clamp.models.acm.messages.dmaap.participant.AutomationCompositionDeploy;
import org.onap.policy.clamp.models.acm.messages.dmaap.participant.AutomationCompositionDeployAck;
import org.onap.policy.clamp.models.acm.messages.dmaap.participant.AutomationCompositionStateChange;
@@ -39,7 +39,6 @@ import org.onap.policy.clamp.models.acm.messages.dmaap.participant.PropertiesUpd
import org.onap.policy.clamp.models.acm.messages.rest.instantiation.DeployOrder;
import org.onap.policy.clamp.models.acm.messages.rest.instantiation.LockOrder;
import org.onap.policy.clamp.models.acm.persistence.provider.AcInstanceStateResolver;
-import org.onap.policy.models.base.PfModelException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
@@ -53,7 +52,7 @@ public class AutomationCompositionHandler {
private final CacheProvider cacheProvider;
private final ParticipantMessagePublisher publisher;
- private final AutomationCompositionElementListener listener;
+ private final ThreadHandler listener;
private final AcInstanceStateResolver acInstanceStateResolver;
/**
@@ -64,7 +63,7 @@ public class AutomationCompositionHandler {
* @param listener the ThreadHandler Listener
*/
public AutomationCompositionHandler(CacheProvider cacheProvider, ParticipantMessagePublisher publisher,
- AutomationCompositionElementListener listener) {
+ ThreadHandler listener) {
this.cacheProvider = cacheProvider;
this.publisher = publisher;
this.listener = listener;
@@ -84,17 +83,20 @@ public class AutomationCompositionHandler {
var automationComposition = cacheProvider.getAutomationComposition(stateChangeMsg.getAutomationCompositionId());
if (automationComposition == null) {
- var automationCompositionAck =
- new AutomationCompositionDeployAck(ParticipantMessageType.AUTOMATION_COMPOSITION_STATECHANGE_ACK);
- automationCompositionAck.setParticipantId(cacheProvider.getParticipantId());
- automationCompositionAck.setMessage("Automation composition " + stateChangeMsg.getAutomationCompositionId()
- + " does not use this participant " + cacheProvider.getParticipantId());
- automationCompositionAck.setResult(false);
- automationCompositionAck.setResponseTo(stateChangeMsg.getMessageId());
- automationCompositionAck.setAutomationCompositionId(stateChangeMsg.getAutomationCompositionId());
- publisher.sendAutomationCompositionAck(automationCompositionAck);
- LOGGER.debug("Automation composition {} does not use this participant",
- stateChangeMsg.getAutomationCompositionId());
+ if (DeployOrder.DELETE.equals(stateChangeMsg.getDeployOrderedState())) {
+ var automationCompositionAck = new AutomationCompositionDeployAck(
+ ParticipantMessageType.AUTOMATION_COMPOSITION_STATECHANGE_ACK);
+ automationCompositionAck.setParticipantId(cacheProvider.getParticipantId());
+ automationCompositionAck.setMessage("Already deleted or never used");
+ automationCompositionAck.setResult(true);
+ automationCompositionAck.setStateChangeResult(StateChangeResult.NO_ERROR);
+ automationCompositionAck.setResponseTo(stateChangeMsg.getMessageId());
+ automationCompositionAck.setAutomationCompositionId(stateChangeMsg.getAutomationCompositionId());
+ publisher.sendAutomationCompositionAck(automationCompositionAck);
+ } else {
+ LOGGER.debug("Automation composition {} does not use this participant",
+ stateChangeMsg.getAutomationCompositionId());
+ }
return;
}
@@ -106,18 +108,18 @@ public class AutomationCompositionHandler {
}
if (DeployOrder.NONE.equals(stateChangeMsg.getDeployOrderedState())) {
- handleLockOrderState(automationComposition, stateChangeMsg.getLockOrderedState(),
- stateChangeMsg.getStartPhase());
+ handleLockOrderState(stateChangeMsg.getMessageId(), automationComposition,
+ stateChangeMsg.getLockOrderedState(), stateChangeMsg.getStartPhase());
} else {
- handleDeployOrderState(automationComposition, stateChangeMsg.getDeployOrderedState(),
- stateChangeMsg.getStartPhase());
+ handleDeployOrderState(stateChangeMsg.getMessageId(), automationComposition,
+ stateChangeMsg.getDeployOrderedState(), stateChangeMsg.getStartPhase());
}
}
private boolean checkConsistantOrderState(AutomationComposition automationComposition, DeployOrder deployOrder,
LockOrder lockOrder) {
if (DeployOrder.UPDATE.equals(deployOrder)) {
- return false;
+ return true;
}
return acInstanceStateResolver.resolve(deployOrder, lockOrder, automationComposition.getDeployState(),
automationComposition.getLockState(), automationComposition.getStateChangeResult()) != null;
@@ -126,19 +128,20 @@ public class AutomationCompositionHandler {
/**
* Method to handle state changes.
*
+ * @param messageId the messageId
* @param automationComposition participant response
* @param orderedState automation composition ordered state
* @param startPhaseMsg startPhase from message
*/
- private void handleDeployOrderState(final AutomationComposition automationComposition, DeployOrder orderedState,
- Integer startPhaseMsg) {
+ private void handleDeployOrderState(UUID messageId, final AutomationComposition automationComposition,
+ DeployOrder orderedState, Integer startPhaseMsg) {
switch (orderedState) {
case UNDEPLOY:
- handleUndeployState(automationComposition, startPhaseMsg);
+ handleUndeployState(messageId, automationComposition, startPhaseMsg);
break;
case DELETE:
- handleDeleteState(automationComposition, startPhaseMsg);
+ handleDeleteState(messageId, automationComposition, startPhaseMsg);
break;
default:
@@ -150,19 +153,20 @@ public class AutomationCompositionHandler {
/**
* Method to handle state changes.
*
+ * @param messageId the messageId
* @param automationComposition participant response
* @param orderedState automation composition ordered state
* @param startPhaseMsg startPhase from message
*/
- private void handleLockOrderState(final AutomationComposition automationComposition, LockOrder orderedState,
- Integer startPhaseMsg) {
+ private void handleLockOrderState(UUID messageId, final AutomationComposition automationComposition,
+ LockOrder orderedState, Integer startPhaseMsg) {
switch (orderedState) {
case LOCK:
- handleLockState(automationComposition, startPhaseMsg);
+ handleLockState(messageId, automationComposition, startPhaseMsg);
break;
case UNLOCK:
- handleUnlockState(automationComposition, startPhaseMsg);
+ handleUnlockState(messageId, automationComposition, startPhaseMsg);
break;
default:
LOGGER.error("StateChange message has no state, state is null {}", automationComposition.getKey());
@@ -188,7 +192,7 @@ public class AutomationCompositionHandler {
updateExistingElementsOnThisParticipant(updateMsg.getAutomationCompositionId(), participantDeploy);
- callParticipantUpdateProperty(participantDeploy.getAcElementList(),
+ callParticipantUpdateProperty(updateMsg.getMessageId(), participantDeploy.getAcElementList(),
updateMsg.getAutomationCompositionId());
}
}
@@ -212,35 +216,28 @@ public class AutomationCompositionHandler {
cacheProvider.initializeAutomationComposition(deployMsg.getCompositionId(),
deployMsg.getAutomationCompositionId(), participantDeploy);
}
- callParticipanDeploy(participantDeploy.getAcElementList(), deployMsg.getStartPhase(),
- deployMsg.getAutomationCompositionId());
+ callParticipanDeploy(deployMsg.getMessageId(), participantDeploy.getAcElementList(),
+ deployMsg.getStartPhase(), deployMsg.getAutomationCompositionId());
}
}
}
- private void callParticipanDeploy(List<AcElementDeploy> acElements, Integer startPhaseMsg, UUID instanceId) {
- try {
- for (var element : acElements) {
- var commonProperties = cacheProvider.getCommonProperties(instanceId, element.getId());
- int startPhase = ParticipantUtils.findStartPhase(commonProperties);
- if (startPhaseMsg.equals(startPhase)) {
- var map = new HashMap<>(commonProperties);
- map.putAll(element.getProperties());
- listener.deploy(instanceId, element, map);
- }
+ private void callParticipanDeploy(UUID messageId, List<AcElementDeploy> acElements, Integer startPhaseMsg,
+ UUID instanceId) {
+ for (var element : acElements) {
+ var commonProperties = cacheProvider.getCommonProperties(instanceId, element.getId());
+ int startPhase = ParticipantUtils.findStartPhase(commonProperties);
+ if (startPhaseMsg.equals(startPhase)) {
+ var map = new HashMap<>(commonProperties);
+ map.putAll(element.getProperties());
+ listener.deploy(messageId, instanceId, element, map);
}
- } catch (PfModelException e) {
- LOGGER.debug("Automation composition element Deploy failed {}", instanceId);
}
}
- private void callParticipantUpdateProperty(List<AcElementDeploy> acElements, UUID instanceId) {
- try {
- for (var element : acElements) {
- listener.update(instanceId, element, element.getProperties());
- }
- } catch (PfModelException e) {
- LOGGER.debug("Automation composition element update failed {}", instanceId);
+ private void callParticipantUpdateProperty(UUID messageId, List<AcElementDeploy> acElements, UUID instanceId) {
+ for (var element : acElements) {
+ listener.update(messageId, instanceId, element, element.getProperties());
}
}
@@ -255,101 +252,86 @@ public class AutomationCompositionHandler {
/**
* Method to handle when the new state from participant is UNINITIALISED state.
*
+ * @param messageId the messageId
* @param automationComposition participant response
* @param startPhaseMsg startPhase from message
*/
- private void handleUndeployState(final AutomationComposition automationComposition, Integer startPhaseMsg) {
- try {
- for (var acElement : automationComposition.getElements().values()) {
- int startPhase = ParticipantUtils.findStartPhase(
- cacheProvider.getCommonProperties(automationComposition.getInstanceId(), acElement.getId()));
- if (startPhaseMsg.equals(startPhase)) {
- listener.undeploy(automationComposition.getInstanceId(), acElement.getId());
- }
+ private void handleUndeployState(UUID messageId, final AutomationComposition automationComposition,
+ Integer startPhaseMsg) {
+ for (var acElement : automationComposition.getElements().values()) {
+ int startPhase = ParticipantUtils.findStartPhase(
+ cacheProvider.getCommonProperties(automationComposition.getInstanceId(), acElement.getId()));
+ if (startPhaseMsg.equals(startPhase)) {
+ listener.undeploy(messageId, automationComposition.getInstanceId(), acElement.getId());
}
- } catch (PfModelException e) {
- LOGGER.debug("Automation composition element Undeploy failed {}", automationComposition.getInstanceId());
}
}
- private void handleDeleteState(final AutomationComposition automationComposition, Integer startPhaseMsg) {
- try {
- for (var acElement : automationComposition.getElements().values()) {
- int startPhase = ParticipantUtils.findStartPhase(
- cacheProvider.getCommonProperties(automationComposition.getInstanceId(), acElement.getId()));
- if (startPhaseMsg.equals(startPhase)) {
- listener.delete(automationComposition.getInstanceId(), acElement.getId());
- }
+ private void handleDeleteState(UUID messageId, final AutomationComposition automationComposition,
+ Integer startPhaseMsg) {
+ for (var acElement : automationComposition.getElements().values()) {
+ int startPhase = ParticipantUtils.findStartPhase(
+ cacheProvider.getCommonProperties(automationComposition.getInstanceId(), acElement.getId()));
+ if (startPhaseMsg.equals(startPhase)) {
+ listener.delete(messageId, automationComposition.getInstanceId(), acElement.getId());
}
- } catch (PfModelException e) {
- LOGGER.debug("Automation composition element Delete failed {}", automationComposition.getInstanceId());
}
}
/**
* Method to handle when the new state from participant is PASSIVE state.
*
+ * @param messageId the messageId
* @param automationComposition participant response
* @param startPhaseMsg startPhase from message
*/
- private void handleLockState(final AutomationComposition automationComposition, Integer startPhaseMsg) {
- try {
- for (var acElement : automationComposition.getElements().values()) {
- int startPhase = ParticipantUtils.findStartPhase(
- cacheProvider.getCommonProperties(automationComposition.getInstanceId(), acElement.getId()));
- if (startPhaseMsg.equals(startPhase)) {
- listener.lock(automationComposition.getInstanceId(), acElement.getId());
- }
+ private void handleLockState(UUID messageId, final AutomationComposition automationComposition,
+ Integer startPhaseMsg) {
+ for (var acElement : automationComposition.getElements().values()) {
+ int startPhase = ParticipantUtils.findStartPhase(
+ cacheProvider.getCommonProperties(automationComposition.getInstanceId(), acElement.getId()));
+ if (startPhaseMsg.equals(startPhase)) {
+ listener.lock(messageId, automationComposition.getInstanceId(), acElement.getId());
}
- } catch (PfModelException e) {
- LOGGER.debug("Automation composition element Lock failed {}", automationComposition.getInstanceId());
}
}
/**
* Method to handle when the new state from participant is RUNNING state.
*
+ * @param messageId the messageId
* @param automationComposition participant response
* @param startPhaseMsg startPhase from message
*/
- private void handleUnlockState(final AutomationComposition automationComposition, Integer startPhaseMsg) {
- try {
- for (var acElement : automationComposition.getElements().values()) {
- int startPhase = ParticipantUtils.findStartPhase(
- cacheProvider.getCommonProperties(automationComposition.getInstanceId(), acElement.getId()));
- if (startPhaseMsg.equals(startPhase)) {
- listener.unlock(automationComposition.getInstanceId(), acElement.getId());
- }
+ private void handleUnlockState(UUID messageId, final AutomationComposition automationComposition,
+ Integer startPhaseMsg) {
+ for (var acElement : automationComposition.getElements().values()) {
+ int startPhase = ParticipantUtils.findStartPhase(
+ cacheProvider.getCommonProperties(automationComposition.getInstanceId(), acElement.getId()));
+ if (startPhaseMsg.equals(startPhase)) {
+ listener.unlock(messageId, automationComposition.getInstanceId(), acElement.getId());
}
- } catch (PfModelException e) {
- LOGGER.debug("Automation composition element Unlock failed {}", automationComposition.getInstanceId());
}
}
/**
* Handles prime a Composition Definition.
*
+ * @param messageId the messageId
* @param compositionId the compositionId
* @param list the list of AutomationCompositionElementDefinition
*/
- public void prime(UUID compositionId, List<AutomationCompositionElementDefinition> list) {
- try {
- listener.prime(compositionId, list);
- } catch (PfModelException e) {
- LOGGER.debug("Composition prime failed {}", compositionId);
- }
+ public void prime(UUID messageId, UUID compositionId, List<AutomationCompositionElementDefinition> list) {
+ listener.prime(messageId, compositionId, list);
}
/**
* Handles deprime a Composition Definition.
*
+ * @param messageId the messageId
* @param compositionId the compositionId
*/
- public void deprime(UUID compositionId) {
- try {
- listener.deprime(compositionId);
- } catch (PfModelException e) {
- LOGGER.debug("Composition deprime failed {}", compositionId);
- }
+ public void deprime(UUID messageId, UUID compositionId) {
+ listener.deprime(messageId, compositionId);
}
}
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionOutHandler.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionOutHandler.java
index 2c34652c8..cfd61c4fe 100644
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionOutHandler.java
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionOutHandler.java
@@ -99,6 +99,7 @@ public class AutomationCompositionOutHandler {
new AutomationCompositionDeployAck(ParticipantMessageType.AUTOMATION_COMPOSITION_STATECHANGE_ACK);
automationCompositionStateChangeAck.setParticipantId(cacheProvider.getParticipantId());
automationCompositionStateChangeAck.setMessage(message);
+ automationCompositionStateChangeAck.setResponseTo(cacheProvider.getMsgIdentification().get(element.getId()));
automationCompositionStateChangeAck.setStateChangeResult(stateChangeResult);
automationCompositionStateChangeAck.setAutomationCompositionId(automationCompositionId);
automationCompositionStateChangeAck.getAutomationCompositionResultMap().put(element.getId(),
@@ -107,6 +108,7 @@ public class AutomationCompositionOutHandler {
LOGGER.debug("Automation composition element {} state changed to {}", elementId, deployState);
automationCompositionStateChangeAck.setResult(true);
publisher.sendAutomationCompositionAck(automationCompositionStateChangeAck);
+ cacheProvider.getMsgIdentification().remove(element.getId());
}
private void handleDeployState(AutomationComposition automationComposition, AutomationCompositionElement element,
@@ -213,10 +215,12 @@ public class AutomationCompositionOutHandler {
participantPrimeAck.setCompositionId(compositionId);
participantPrimeAck.setMessage(message);
participantPrimeAck.setResult(true);
+ participantPrimeAck.setResponseTo(cacheProvider.getMsgIdentification().get(compositionId));
participantPrimeAck.setCompositionState(state);
participantPrimeAck.setStateChangeResult(stateChangeResult);
participantPrimeAck.setParticipantId(cacheProvider.getParticipantId());
participantPrimeAck.setState(ParticipantState.ON_LINE);
publisher.sendParticipantPrimeAck(participantPrimeAck);
+ cacheProvider.getMsgIdentification().remove(compositionId);
}
}
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/CacheProvider.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/CacheProvider.java
index 09e75e8d5..119cc11b5 100644
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/CacheProvider.java
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/CacheProvider.java
@@ -55,6 +55,9 @@ public class CacheProvider {
private final Map<UUID, Map<ToscaConceptIdentifier, AutomationCompositionElementDefinition>> acElementsDefinitions =
new ConcurrentHashMap<>();
+ @Getter
+ private final Map<UUID, UUID> msgIdentification = new ConcurrentHashMap<>();
+
/**
* Constructor.
*
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandler.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandler.java
index 0e7e193f6..3a3a0cced 100644
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandler.java
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandler.java
@@ -186,11 +186,13 @@ public class ParticipantHandler {
}
}
cacheProvider.addElementDefinition(participantPrimeMsg.getCompositionId(), list);
- automationCompositionHandler.prime(participantPrimeMsg.getCompositionId(), list);
+ automationCompositionHandler.prime(participantPrimeMsg.getMessageId(),
+ participantPrimeMsg.getCompositionId(), list);
} else {
// deprime
cacheProvider.removeElementDefinition(participantPrimeMsg.getCompositionId());
- automationCompositionHandler.deprime(participantPrimeMsg.getCompositionId());
+ automationCompositionHandler.deprime(participantPrimeMsg.getMessageId(),
+ participantPrimeMsg.getCompositionId());
}
}
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ThreadHandler.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ThreadHandler.java
new file mode 100644
index 000000000..b5866d71f
--- /dev/null
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ThreadHandler.java
@@ -0,0 +1,275 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2023 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.acm.participant.intermediary.handler;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import lombok.RequiredArgsConstructor;
+import org.onap.policy.clamp.acm.participant.intermediary.api.AutomationCompositionElementListener;
+import org.onap.policy.clamp.acm.participant.intermediary.api.ParticipantIntermediaryApi;
+import org.onap.policy.clamp.models.acm.concepts.AcElementDeploy;
+import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
+import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionElementDefinition;
+import org.onap.policy.clamp.models.acm.concepts.DeployState;
+import org.onap.policy.clamp.models.acm.concepts.LockState;
+import org.onap.policy.clamp.models.acm.concepts.StateChangeResult;
+import org.onap.policy.models.base.PfModelException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+@Component
+@RequiredArgsConstructor
+public class ThreadHandler implements Closeable {
+ private static final Logger LOGGER = LoggerFactory.getLogger(ThreadHandler.class);
+
+ private final AutomationCompositionElementListener listener;
+ private final ParticipantIntermediaryApi intermediaryApi;
+ private final CacheProvider cacheProvider;
+
+ private final Map<UUID, Future> executionMap = new ConcurrentHashMap<>();
+
+ private final ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
+
+ /**
+ * Handle an update on a automation composition element.
+ *
+ * @param messageId the messageId
+ * @param instanceId the automationComposition Id
+ * @param element the information on the automation composition element
+ * @param properties properties Map
+ */
+ public void deploy(UUID messageId, UUID instanceId, AcElementDeploy element, Map<String, Object> properties) {
+ cleanExecution(element.getId(), messageId);
+ var result = executor.submit(() -> this.deployProcess(instanceId, element, properties));
+ executionMap.put(element.getId(), result);
+ }
+
+ private void deployProcess(UUID instanceId, AcElementDeploy element, Map<String, Object> properties) {
+ try {
+ listener.deploy(instanceId, element, properties);
+ } catch (PfModelException e) {
+ LOGGER.error("Automation composition element deploy failed {} {}", instanceId, e.getMessage());
+ intermediaryApi.updateAutomationCompositionElementState(instanceId, element.getId(), DeployState.UNDEPLOYED,
+ null, StateChangeResult.FAILED, "Automation composition element deploy failed");
+ }
+ executionMap.remove(element.getId());
+ }
+
+ /**
+ * Handle a automation composition element state change.
+ *
+ * @param messageId the messageId
+ * @param instanceId the automationComposition Id
+ * @param elementId the ID of the automation composition element
+ */
+ public void undeploy(UUID messageId, UUID instanceId, UUID elementId) {
+ cleanExecution(elementId, messageId);
+ var result = executor.submit(() -> this.undeployProcess(instanceId, elementId));
+ executionMap.put(elementId, result);
+ }
+
+ private void undeployProcess(UUID instanceId, UUID elementId) {
+ try {
+ listener.undeploy(instanceId, elementId);
+ } catch (PfModelException e) {
+ LOGGER.error("Automation composition element undeploy failed {} {}", instanceId, e.getMessage());
+ intermediaryApi.updateAutomationCompositionElementState(instanceId, elementId, DeployState.DEPLOYED, null,
+ StateChangeResult.FAILED, "Automation composition element undeploy failed");
+ }
+ executionMap.remove(elementId);
+ }
+
+ /**
+ * Handle a automation composition element lock.
+ *
+ * @param messageId the messageId
+ * @param instanceId the automationComposition Id
+ * @param elementId the ID of the automation composition element
+ */
+ public void lock(UUID messageId, UUID instanceId, UUID elementId) {
+ cleanExecution(elementId, messageId);
+ var result = executor.submit(() -> this.lockProcess(instanceId, elementId));
+ executionMap.put(elementId, result);
+ }
+
+ private void lockProcess(UUID instanceId, UUID elementId) {
+ try {
+ listener.lock(instanceId, elementId);
+ } catch (PfModelException e) {
+ LOGGER.error("Automation composition element lock failed {} {}", instanceId, e.getMessage());
+ intermediaryApi.updateAutomationCompositionElementState(instanceId, elementId, null, LockState.UNLOCKED,
+ StateChangeResult.FAILED, "Automation composition element lock failed");
+ }
+ executionMap.remove(elementId);
+ }
+
+ /**
+ * Handle a automation composition element unlock.
+ *
+ * @param messageId the messageId
+ * @param instanceId the automationComposition Id
+ * @param elementId the ID of the automation composition element
+ */
+ public void unlock(UUID messageId, UUID instanceId, UUID elementId) {
+ cleanExecution(elementId, messageId);
+ var result = executor.submit(() -> this.unlockProcess(instanceId, elementId));
+ executionMap.put(elementId, result);
+ }
+
+ private void unlockProcess(UUID instanceId, UUID elementId) {
+ try {
+ listener.unlock(instanceId, elementId);
+ } catch (PfModelException e) {
+ LOGGER.error("Automation composition element unlock failed {} {}", instanceId, e.getMessage());
+ intermediaryApi.updateAutomationCompositionElementState(instanceId, elementId, null, LockState.LOCKED,
+ StateChangeResult.FAILED, "Automation composition element unlock failed");
+ }
+ executionMap.remove(elementId);
+ }
+
+ /**
+ * Handle a automation composition element delete.
+ *
+ * @param messageId the messageId
+ * @param instanceId the automationComposition Id
+ * @param elementId the ID of the automation composition element
+ */
+ public void delete(UUID messageId, UUID instanceId, UUID elementId) {
+ cleanExecution(elementId, messageId);
+ var result = executor.submit(() -> this.deleteProcess(instanceId, elementId));
+ executionMap.put(elementId, result);
+ }
+
+ private void deleteProcess(UUID instanceId, UUID elementId) {
+ try {
+ listener.delete(instanceId, elementId);
+ } catch (PfModelException e) {
+ LOGGER.error("Automation composition element delete failed {} {}", instanceId, e.getMessage());
+ intermediaryApi.updateAutomationCompositionElementState(instanceId, elementId, DeployState.UNDEPLOYED, null,
+ StateChangeResult.FAILED, "Automation composition element delete failed");
+ }
+ executionMap.remove(elementId);
+ }
+
+ /**
+ * Handle a automation composition element properties update.
+ *
+ * @param messageId the messageId
+ * @param instanceId the automationComposition Id
+ * @param element the information on the automation composition element
+ * @param properties properties Map
+ */
+ public void update(UUID messageId, UUID instanceId, AcElementDeploy element, Map<String, Object> properties) {
+ cleanExecution(element.getId(), messageId);
+ var result = executor.submit(() -> this.updateProcess(instanceId, element, properties));
+ executionMap.put(element.getId(), result);
+ }
+
+ private void updateProcess(UUID instanceId, AcElementDeploy element, Map<String, Object> properties) {
+ try {
+ listener.update(instanceId, element, properties);
+ } catch (PfModelException e) {
+ LOGGER.error("Automation composition element update failed {} {}", instanceId, e.getMessage());
+ intermediaryApi.updateAutomationCompositionElementState(instanceId, element.getId(), DeployState.DEPLOYED,
+ null, StateChangeResult.FAILED, "Automation composition element update failed");
+ }
+ executionMap.remove(element.getId());
+ }
+
+ private void cleanExecution(UUID execIdentificationId, UUID messageId) {
+ var process = executionMap.get(execIdentificationId);
+ if (process != null) {
+ if (!process.isDone()) {
+ process.cancel(true);
+ }
+ executionMap.remove(execIdentificationId);
+ }
+ cacheProvider.getMsgIdentification().put(execIdentificationId, messageId);
+ }
+
+ /**
+ * Handles prime a Composition Definition.
+ *
+ * @param messageId the messageId
+ * @param compositionId the compositionId
+ * @param list the list of AutomationCompositionElementDefinition
+ */
+ public void prime(UUID messageId, UUID compositionId, List<AutomationCompositionElementDefinition> list) {
+ cleanExecution(compositionId, messageId);
+ var result = executor.submit(() -> this.primeProcess(compositionId, list));
+ executionMap.put(compositionId, result);
+ }
+
+ private void primeProcess(UUID compositionId, List<AutomationCompositionElementDefinition> list) {
+ try {
+ listener.prime(compositionId, list);
+ executionMap.remove(compositionId);
+ } catch (PfModelException e) {
+ LOGGER.error("Composition Defintion prime failed {} {}", compositionId, e.getMessage());
+ intermediaryApi.updateCompositionState(compositionId, AcTypeState.COMMISSIONED, StateChangeResult.FAILED,
+ "Composition Defintion prime failed");
+ }
+ }
+
+ /**
+ * Handles deprime a Composition Definition.
+ *
+ * @param messageId the messageId
+ * @param compositionId the compositionId
+ */
+ public void deprime(UUID messageId, UUID compositionId) {
+ cleanExecution(compositionId, messageId);
+ var result = executor.submit(() -> this.deprimeProcess(compositionId));
+ executionMap.put(compositionId, result);
+ }
+
+ private void deprimeProcess(UUID compositionId) {
+ try {
+ listener.deprime(compositionId);
+ executionMap.remove(compositionId);
+ } catch (PfModelException e) {
+ LOGGER.error("Composition Defintion deprime failed {} {}", compositionId, e.getMessage());
+ intermediaryApi.updateCompositionState(compositionId, AcTypeState.PRIMED, StateChangeResult.FAILED,
+ "Composition Defintion deprime failed");
+ }
+ }
+
+
+ /**
+ * Closes this stream and releases any system resources associated
+ * with it. If the stream is already closed then invoking this
+ * method has no effect.
+ *
+ * @throws IOException if an I/O error occurs
+ */
+ @Override
+ public void close() throws IOException {
+ executor.shutdown();
+ }
+}