aboutsummaryrefslogtreecommitdiffstats
path: root/participant/participant-intermediary/src/main/java
diff options
context:
space:
mode:
authorFrancescoFioraEst <francesco.fiora@est.tech>2024-06-19 14:04:57 +0100
committerFrancescoFioraEst <francesco.fiora@est.tech>2024-06-20 11:30:13 +0100
commit0460b264c3b02345cdbb46f05125289897d56304 (patch)
treeccd7ec8da36657237b0a414c416905fe73ea975a /participant/participant-intermediary/src/main/java
parenta1ce07d06745bfe966ffc000ad2be84789a555d3 (diff)
Add sync messages support in ACM-intermediary
Issue-ID: POLICY-5048 Change-Id: I4d3a362251931820e1a481f780586afb9e2c60ed Signed-off-by: FrancescoFioraEst <francesco.fiora@est.tech>
Diffstat (limited to 'participant/participant-intermediary/src/main/java')
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/api/AutomationCompositionElementListener.java8
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/api/impl/AcElementListenerV1.java63
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/api/impl/AcElementListenerV2.java40
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/api/impl/AutomationCompositionElementListenerV1.java53
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantRestartListener.java44
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AcDefinitionHandler.java43
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/CacheProvider.java6
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandler.java21
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ThreadHandler.java51
9 files changed, 106 insertions, 223 deletions
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/api/AutomationCompositionElementListener.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/api/AutomationCompositionElementListener.java
index 505f515d7..6f4039254 100644
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/api/AutomationCompositionElementListener.java
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/api/AutomationCompositionElementListener.java
@@ -20,9 +20,6 @@
package org.onap.policy.clamp.acm.participant.intermediary.api;
-import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
-import org.onap.policy.clamp.models.acm.concepts.DeployState;
-import org.onap.policy.clamp.models.acm.concepts.LockState;
import org.onap.policy.models.base.PfModelException;
/**
@@ -89,11 +86,6 @@ public interface AutomationCompositionElementListener {
void deprime(CompositionDto composition) throws PfModelException;
- void handleRestartComposition(CompositionDto composition, AcTypeState state) throws PfModelException;
-
- void handleRestartInstance(CompositionElementDto compositionElement, InstanceElementDto instanceElement,
- DeployState deployState, LockState lockState) throws PfModelException;
-
/**
* Handle an update on a automation composition element.
*
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/api/impl/AcElementListenerV1.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/api/impl/AcElementListenerV1.java
index 5d4e1fe7c..cf5ac419d 100644
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/api/impl/AcElementListenerV1.java
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/api/impl/AcElementListenerV1.java
@@ -20,6 +20,7 @@
package org.onap.policy.clamp.acm.participant.intermediary.api.impl;
+import jakarta.ws.rs.core.Response;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -44,9 +45,12 @@ import org.onap.policy.models.tosca.authorative.concepts.ToscaNodeTemplate;
* Wrapper of AutomationCompositionElementListener.
* Valid since 7.1.0 release.
*/
-public abstract class AcElementListenerV1 implements AutomationCompositionElementListener {
+public abstract class AcElementListenerV1
+ implements AutomationCompositionElementListener, AutomationCompositionElementListenerV1 {
protected final ParticipantIntermediaryApi intermediaryApi;
+ private static final String NOT_SUPPORTED = "not supported!";
+
protected AcElementListenerV1(ParticipantIntermediaryApi intermediaryApi) {
this.intermediaryApi = intermediaryApi;
}
@@ -64,23 +68,19 @@ public abstract class AcElementListenerV1 implements AutomationCompositionElemen
deploy(instanceElement.instanceId(), element, properties);
}
- public abstract void deploy(UUID instanceId, AcElementDeploy element, Map<String, Object> properties)
- throws PfModelException;
-
@Override
public void undeploy(CompositionElementDto compositionElement, InstanceElementDto instanceElement)
throws PfModelException {
undeploy(instanceElement.instanceId(), instanceElement.elementId());
}
- public abstract void undeploy(UUID instanceId, UUID elementId) throws PfModelException;
-
@Override
public void lock(CompositionElementDto compositionElement, InstanceElementDto instanceElement)
throws PfModelException {
lock(instanceElement.instanceId(), instanceElement.elementId());
}
+ @Override
public void lock(UUID instanceId, UUID elementId) throws PfModelException {
intermediaryApi.updateAutomationCompositionElementState(instanceId, elementId, null, LockState.LOCKED,
StateChangeResult.NO_ERROR, "Locked");
@@ -92,6 +92,7 @@ public abstract class AcElementListenerV1 implements AutomationCompositionElemen
unlock(instanceElement.instanceId(), instanceElement.elementId());
}
+ @Override
public void unlock(UUID instanceId, UUID elementId) throws PfModelException {
intermediaryApi.updateAutomationCompositionElementState(instanceId, elementId, null, LockState.UNLOCKED,
StateChangeResult.NO_ERROR, "Unlocked");
@@ -103,6 +104,7 @@ public abstract class AcElementListenerV1 implements AutomationCompositionElemen
delete(instanceElement.instanceId(), instanceElement.elementId());
}
+ @Override
public void delete(UUID instanceId, UUID elementId) throws PfModelException {
intermediaryApi.updateAutomationCompositionElementState(instanceId, elementId, DeployState.DELETED, null,
StateChangeResult.NO_ERROR, "Deleted");
@@ -150,6 +152,7 @@ public abstract class AcElementListenerV1 implements AutomationCompositionElemen
prime(composition.compositionId(), createAcElementDefinitionList(composition));
}
+ @Override
public void prime(UUID compositionId, List<AutomationCompositionElementDefinition> elementDefinitionList)
throws PfModelException {
intermediaryApi.updateCompositionState(compositionId, AcTypeState.PRIMED, StateChangeResult.NO_ERROR, "Primed");
@@ -160,14 +163,14 @@ public abstract class AcElementListenerV1 implements AutomationCompositionElemen
deprime(composition.compositionId());
}
+ @Override
public void deprime(UUID compositionId) throws PfModelException {
intermediaryApi.updateCompositionState(compositionId, AcTypeState.COMMISSIONED, StateChangeResult.NO_ERROR,
"Deprimed");
}
- @Override
public void handleRestartComposition(CompositionDto composition, AcTypeState state) throws PfModelException {
- handleRestartComposition(composition.compositionId(), createAcElementDefinitionList(composition), state);
+ throw new PfModelException(Response.Status.BAD_REQUEST, NOT_SUPPORTED);
}
/**
@@ -180,24 +183,12 @@ public abstract class AcElementListenerV1 implements AutomationCompositionElemen
*/
public void handleRestartComposition(UUID compositionId,
List<AutomationCompositionElementDefinition> elementDefinitionList, AcTypeState state) throws PfModelException {
- switch (state) {
- case PRIMING -> prime(compositionId, elementDefinitionList);
- case DEPRIMING -> deprime(compositionId);
- default ->
- intermediaryApi.updateCompositionState(compositionId, state, StateChangeResult.NO_ERROR, "Restarted");
- }
+ throw new PfModelException(Response.Status.BAD_REQUEST, NOT_SUPPORTED);
}
- @Override
public void handleRestartInstance(CompositionElementDto compositionElement, InstanceElementDto instanceElement,
DeployState deployState, LockState lockState) throws PfModelException {
- var element = new AcElementDeploy();
- element.setId(instanceElement.elementId());
- element.setDefinition(compositionElement.elementDefinitionId());
- element.setProperties(instanceElement.inProperties());
- Map<String, Object> properties = new HashMap<>(instanceElement.inProperties());
- properties.putAll(compositionElement.inProperties());
- handleRestartInstance(instanceElement.instanceId(), element, properties, deployState, lockState);
+ throw new PfModelException(Response.Status.BAD_REQUEST, NOT_SUPPORTED);
}
/**
@@ -212,33 +203,8 @@ public abstract class AcElementListenerV1 implements AutomationCompositionElemen
*/
public void handleRestartInstance(UUID instanceId, AcElementDeploy element,
Map<String, Object> properties, DeployState deployState, LockState lockState) throws PfModelException {
+ throw new PfModelException(Response.Status.BAD_REQUEST, NOT_SUPPORTED);
- if (DeployState.DEPLOYING.equals(deployState)) {
- deploy(instanceId, element, properties);
- return;
- }
- if (DeployState.UNDEPLOYING.equals(deployState)) {
- undeploy(instanceId, element.getId());
- return;
- }
- if (DeployState.UPDATING.equals(deployState)) {
- update(instanceId, element, properties);
- return;
- }
- if (DeployState.DELETING.equals(deployState)) {
- delete(instanceId, element.getId());
- return;
- }
- if (LockState.LOCKING.equals(lockState)) {
- lock(instanceId, element.getId());
- return;
- }
- if (LockState.UNLOCKING.equals(lockState)) {
- unlock(instanceId, element.getId());
- return;
- }
- intermediaryApi.updateAutomationCompositionElementState(instanceId, element.getId(),
- deployState, lockState, StateChangeResult.NO_ERROR, "Restarted");
}
@Override
@@ -252,6 +218,7 @@ public abstract class AcElementListenerV1 implements AutomationCompositionElemen
element.getProperties());
}
+ @Override
public void migrate(UUID instanceId, AcElementDeploy element, UUID compositionTargetId,
Map<String, Object> properties) throws PfModelException {
intermediaryApi.updateAutomationCompositionElementState(instanceId, element.getId(),
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/api/impl/AcElementListenerV2.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/api/impl/AcElementListenerV2.java
index daf9d6e71..3fe33191f 100644
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/api/impl/AcElementListenerV2.java
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/api/impl/AcElementListenerV2.java
@@ -20,6 +20,7 @@
package org.onap.policy.clamp.acm.participant.intermediary.api.impl;
+import jakarta.ws.rs.core.Response;
import org.onap.policy.clamp.acm.participant.intermediary.api.AutomationCompositionElementListener;
import org.onap.policy.clamp.acm.participant.intermediary.api.CompositionDto;
import org.onap.policy.clamp.acm.participant.intermediary.api.CompositionElementDto;
@@ -38,6 +39,8 @@ import org.onap.policy.models.base.PfModelException;
public abstract class AcElementListenerV2 implements AutomationCompositionElementListener {
protected final ParticipantIntermediaryApi intermediaryApi;
+ private static final String NOT_SUPPORTED = "not supported!";
+
protected AcElementListenerV2(ParticipantIntermediaryApi intermediaryApi) {
this.intermediaryApi = intermediaryApi;
}
@@ -84,46 +87,13 @@ public abstract class AcElementListenerV2 implements AutomationCompositionElemen
StateChangeResult.NO_ERROR, "Deprimed");
}
- @Override
public void handleRestartComposition(CompositionDto composition, AcTypeState state) throws PfModelException {
- switch (state) {
- case PRIMING -> prime(composition);
- case DEPRIMING -> deprime(composition);
- default -> intermediaryApi
- .updateCompositionState(composition.compositionId(), state, StateChangeResult.NO_ERROR, "Restarted");
- }
+ throw new PfModelException(Response.Status.BAD_REQUEST, NOT_SUPPORTED);
}
- @Override
public void handleRestartInstance(CompositionElementDto compositionElement, InstanceElementDto instanceElement,
DeployState deployState, LockState lockState) throws PfModelException {
-
- if (DeployState.DEPLOYING.equals(deployState)) {
- deploy(compositionElement, instanceElement);
- return;
- }
- if (DeployState.UNDEPLOYING.equals(deployState)) {
- undeploy(compositionElement, instanceElement);
- return;
- }
- if (DeployState.UPDATING.equals(deployState)) {
- update(compositionElement, instanceElement, instanceElement);
- return;
- }
- if (DeployState.DELETING.equals(deployState)) {
- delete(compositionElement, instanceElement);
- return;
- }
- if (LockState.LOCKING.equals(lockState)) {
- lock(compositionElement, instanceElement);
- return;
- }
- if (LockState.UNLOCKING.equals(lockState)) {
- unlock(compositionElement, instanceElement);
- return;
- }
- intermediaryApi.updateAutomationCompositionElementState(instanceElement.instanceId(),
- instanceElement.elementId(), deployState, lockState, StateChangeResult.NO_ERROR, "Restarted");
+ throw new PfModelException(Response.Status.BAD_REQUEST, NOT_SUPPORTED);
}
@Override
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/api/impl/AutomationCompositionElementListenerV1.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/api/impl/AutomationCompositionElementListenerV1.java
new file mode 100644
index 000000000..007ba3d80
--- /dev/null
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/api/impl/AutomationCompositionElementListenerV1.java
@@ -0,0 +1,53 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2024 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.acm.participant.intermediary.api.impl;
+
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.onap.policy.clamp.models.acm.concepts.AcElementDeploy;
+import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionElementDefinition;
+import org.onap.policy.models.base.PfModelException;
+
+public interface AutomationCompositionElementListenerV1 {
+
+ void undeploy(UUID automationCompositionId, UUID automationCompositionElementId) throws PfModelException;
+
+ void deploy(UUID automationCompositionId, AcElementDeploy element, Map<String, Object> properties)
+ throws PfModelException;
+
+ void lock(UUID automationCompositionId, UUID automationCompositionElementId) throws PfModelException;
+
+ void unlock(UUID automationCompositionId, UUID automationCompositionElementId) throws PfModelException;
+
+ void delete(UUID automationCompositionId, UUID automationCompositionElementId) throws PfModelException;
+
+ void update(UUID automationCompositionId, AcElementDeploy element, Map<String, Object> properties)
+ throws PfModelException;
+
+ void prime(UUID compositionId, List<AutomationCompositionElementDefinition> elementDefinitionList)
+ throws PfModelException;
+
+ void deprime(UUID compositionId) throws PfModelException;
+
+ void migrate(UUID instanceId, AcElementDeploy element, UUID compositionTargetId,
+ Map<String, Object> properties) throws PfModelException;
+}
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantRestartListener.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantRestartListener.java
deleted file mode 100644
index fd59b02ee..000000000
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantRestartListener.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * Copyright (C) 2023-2024 Nordix Foundation.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.clamp.acm.participant.intermediary.comm;
-
-import org.onap.policy.clamp.acm.participant.intermediary.handler.ParticipantHandler;
-import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantMessageType;
-import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantRestart;
-import org.springframework.stereotype.Component;
-
-@Component
-public class ParticipantRestartListener extends ParticipantListener<ParticipantRestart> {
-
- /**
- * Constructs the object.
- *
- * @param participantHandler the handler for managing the state of the participant
- */
- public ParticipantRestartListener(ParticipantHandler participantHandler) {
- super(ParticipantRestart.class, participantHandler, participantHandler::handleParticipantRestart);
- }
-
- @Override
- public String getType() {
- return ParticipantMessageType.PARTICIPANT_RESTART.name();
- }
-}
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AcDefinitionHandler.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AcDefinitionHandler.java
index d3ad4cf3e..b38df515a 100644
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AcDefinitionHandler.java
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AcDefinitionHandler.java
@@ -34,7 +34,7 @@ import org.onap.policy.clamp.models.acm.concepts.ParticipantState;
import org.onap.policy.clamp.models.acm.concepts.StateChangeResult;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantPrime;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantPrimeAck;
-import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantRestart;
+import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantSync;
import org.springframework.stereotype.Component;
@Component
@@ -107,32 +107,33 @@ public class AcDefinitionHandler {
}
/**
- * Handle a ParticipantRestart message.
+ * Handle a Participant Sync message.
*
- * @param participantRestartMsg the participantRestart message
+ * @param participantSyncMsg the participantRestart message
*/
- public void handleParticipantRestart(ParticipantRestart participantRestartMsg) {
- List<AutomationCompositionElementDefinition> list = new ArrayList<>();
- for (var participantDefinition : participantRestartMsg.getParticipantDefinitionUpdates()) {
- list.addAll(participantDefinition.getAutomationCompositionElementDefinitionList());
+ public void handleParticipantSync(ParticipantSync participantSyncMsg) {
+
+ if (participantSyncMsg.isDelete()) {
+ if (AcTypeState.COMMISSIONED.equals(participantSyncMsg.getState())) {
+ cacheProvider.removeElementDefinition(participantSyncMsg.getCompositionId());
+ }
+ for (var automationcomposition : participantSyncMsg.getAutomationcompositionList()) {
+ cacheProvider.removeAutomationComposition(automationcomposition.getAutomationCompositionId());
+ }
+ return;
}
- if (!AcTypeState.COMMISSIONED.equals(participantRestartMsg.getState())) {
- cacheProvider.addElementDefinition(participantRestartMsg.getCompositionId(), list);
+
+ if (!participantSyncMsg.getParticipantDefinitionUpdates().isEmpty()) {
+ List<AutomationCompositionElementDefinition> list = new ArrayList<>();
+ for (var participantDefinition : participantSyncMsg.getParticipantDefinitionUpdates()) {
+ list.addAll(participantDefinition.getAutomationCompositionElementDefinitionList());
+ }
+ cacheProvider.addElementDefinition(participantSyncMsg.getCompositionId(), list);
}
- for (var automationcomposition : participantRestartMsg.getAutomationcompositionList()) {
+ for (var automationcomposition : participantSyncMsg.getAutomationcompositionList()) {
cacheProvider
- .initializeAutomationComposition(participantRestartMsg.getCompositionId(), automationcomposition);
+ .initializeAutomationComposition(participantSyncMsg.getCompositionId(), automationcomposition);
}
- var inPropertiesMap = list.stream().collect(Collectors.toMap(
- AutomationCompositionElementDefinition::getAcElementDefinitionId,
- el -> el.getAutomationCompositionElementToscaNodeTemplate().getProperties()));
- var outPropertiesMap = list.stream().collect(Collectors.toMap(
- AutomationCompositionElementDefinition::getAcElementDefinitionId,
- AutomationCompositionElementDefinition::getOutProperties));
- var composition =
- new CompositionDto(participantRestartMsg.getCompositionId(), inPropertiesMap, outPropertiesMap);
- listener.restarted(participantRestartMsg.getMessageId(), composition, participantRestartMsg.getState(),
- participantRestartMsg.getAutomationcompositionList());
}
}
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/CacheProvider.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/CacheProvider.java
index b85a3c35a..7a00e0892 100644
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/CacheProvider.java
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/CacheProvider.java
@@ -191,6 +191,9 @@ public class CacheProvider {
ParticipantRestartAc participantRestartAc) {
Map<UUID, AutomationCompositionElement> acElementMap = new LinkedHashMap<>();
for (var element : participantRestartAc.getAcElementList()) {
+ if (!getParticipantId().equals(element.getParticipantId())) {
+ continue;
+ }
var acElement = new AutomationCompositionElement();
acElement.setId(element.getId());
acElement.setParticipantId(getParticipantId());
@@ -201,12 +204,13 @@ public class CacheProvider {
acElement.setUseState(element.getUseState());
acElement.setProperties(element.getProperties());
acElement.setOutProperties(element.getOutProperties());
- acElement.setRestarting(true);
acElementMap.put(element.getId(), acElement);
}
var automationComposition = new AutomationComposition();
automationComposition.setCompositionId(compositionId);
+ automationComposition.setDeployState(participantRestartAc.getDeployState());
+ automationComposition.setLockState(participantRestartAc.getLockState());
automationComposition.setInstanceId(participantRestartAc.getAutomationCompositionId());
automationComposition.setElements(acElementMap);
automationCompositions.put(automationComposition.getInstanceId(), automationComposition);
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandler.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandler.java
index caa2c5675..5ae8f0422 100644
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandler.java
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandler.java
@@ -36,7 +36,6 @@ import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantMe
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantPrime;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantRegister;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantRegisterAck;
-import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantRestart;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantStatus;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantStatusReq;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantSync;
@@ -198,26 +197,18 @@ public class ParticipantHandler {
}
/**
- * Handle a ParticipantRestart message.
- *
- * @param participantRestartMsg the participantRestart message
- */
- @Timed(value = "listener.participant_restart", description = "PARTICIPANT_RESTART messages received")
- public void handleParticipantRestart(ParticipantRestart participantRestartMsg) {
- LOGGER.debug("ParticipantRestart message received for participantId {}",
- participantRestartMsg.getParticipantId());
- acDefinitionHandler.handleParticipantRestart(participantRestartMsg);
- }
-
- /**
* Handle a ParticipantSync message.
*
* @param participantSyncMsg the participantSync message
*/
@Timed(value = "listener.participant_sync_msg", description = "PARTICIPANT_SYNC messages received")
public void handleParticipantSync(ParticipantSync participantSyncMsg) {
- LOGGER.debug("ParticipantSync message received for participantId {}",
- participantSyncMsg.getParticipantId());
+ if (participantSyncMsg.getExcludeReplicas().contains(cacheProvider.getReplicaId())) {
+ LOGGER.debug("Ignore ParticipantSync message {}", participantSyncMsg.getMessageId());
+ return;
+ }
+ LOGGER.debug("ParticipantSync message received for participantId {}", participantSyncMsg.getParticipantId());
+ acDefinitionHandler.handleParticipantSync(participantSyncMsg);
}
/**
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ThreadHandler.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ThreadHandler.java
index 9f3e16777..00e0044b4 100644
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ThreadHandler.java
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ThreadHandler.java
@@ -23,7 +23,6 @@ package org.onap.policy.clamp.acm.participant.intermediary.handler;
import io.opentelemetry.context.Context;
import java.io.Closeable;
import java.io.IOException;
-import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
@@ -39,7 +38,6 @@ import org.onap.policy.clamp.acm.participant.intermediary.api.ParticipantInterme
import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
import org.onap.policy.clamp.models.acm.concepts.DeployState;
import org.onap.policy.clamp.models.acm.concepts.LockState;
-import org.onap.policy.clamp.models.acm.concepts.ParticipantRestartAc;
import org.onap.policy.clamp.models.acm.concepts.StateChangeResult;
import org.onap.policy.models.base.PfModelException;
import org.slf4j.Logger;
@@ -278,55 +276,6 @@ public class ThreadHandler implements Closeable {
}
/**
- * Handles restarted scenario.
- *
- * @param messageId the messageId
- * @param composition the composition
- * @param state the state of the composition
- * @param automationCompositionList list of ParticipantRestartAc
- */
- public void restarted(UUID messageId, CompositionDto composition,
- AcTypeState state, List<ParticipantRestartAc> automationCompositionList) {
- try {
- listener.handleRestartComposition(composition, state);
- } catch (PfModelException e) {
- LOGGER.error("Composition Defintion restarted failed {} {}", composition.compositionId(), e.getMessage());
- intermediaryApi.updateCompositionState(composition.compositionId(), state, StateChangeResult.FAILED,
- "Composition Defintion restarted failed");
- }
-
- for (var automationComposition : automationCompositionList) {
- for (var element : automationComposition.getAcElementList()) {
- var compositionElement = new CompositionElementDto(composition.compositionId(),
- element.getDefinition(), composition.inPropertiesMap().get(element.getDefinition()),
- composition.outPropertiesMap().get(element.getDefinition()));
- var instanceElementDto = new InstanceElementDto(automationComposition.getAutomationCompositionId(),
- element.getId(), element.getToscaServiceTemplateFragment(),
- element.getProperties(), element.getOutProperties());
- cleanExecution(element.getId(), messageId);
- var result = executor.submit(() ->
- this.restartedInstanceProcess(compositionElement, instanceElementDto,
- element.getDeployState(), element.getLockState()));
- executionMap.put(element.getId(), result);
- }
- }
- }
-
- private void restartedInstanceProcess(CompositionElementDto compositionElement,
- InstanceElementDto instanceElementDto, DeployState deployState, LockState lockState) {
- try {
- listener.handleRestartInstance(compositionElement, instanceElementDto, deployState, lockState);
- executionMap.remove(instanceElementDto.elementId());
- } catch (PfModelException e) {
- LOGGER.error("Automation composition element deploy failed {} {}",
- instanceElementDto.elementId(), e.getMessage());
- intermediaryApi.updateAutomationCompositionElementState(instanceElementDto.instanceId(),
- instanceElementDto.elementId(), deployState, lockState, StateChangeResult.FAILED,
- "Automation composition element restart failed");
- }
- }
-
- /**
* Closes this stream and releases any system resources associated
* with it. If the stream is already closed then invoking this
* method has no effect.