From 9cdfa4dc5aadaaf8ec11223c4991b61c0aa6d0b0 Mon Sep 17 00:00:00 2001 From: FrancescoFioraEst Date: Tue, 18 Jun 2024 16:32:30 +0100 Subject: Add support for sync messages in ACM-runtime Issue-ID: POLICY-5035 Change-Id: Ibcf1c6a414a7ba9d1cafd42041551bb0fb198088 Signed-off-by: FrancescoFioraEst --- .../clamp/models/acm/concepts/Participant.java | 8 -- .../models/acm/concepts/ParticipantRestartAc.java | 7 +- .../kafka/participant/ParticipantRestart.java | 2 +- .../acm/persistence/concepts/JpaParticipant.java | 38 +----- .../policy/clamp/models/acm/utils/AcmUtils.java | 69 +++++++++- .../acm/concepts/ParticipantInformationTest.java | 3 - .../clamp/models/acm/concepts/ParticipantTest.java | 10 +- .../persistence/concepts/JpaParticipantTest.java | 39 +----- .../clamp/models/acm/utils/AcmUtilsTest.java | 42 +++++- .../intermediary/handler/ParticipantHandler.java | 1 - .../commissioning/CommissioningProvider.java | 6 +- ...AutomationCompositionInstantiationProvider.java | 10 +- .../participants/AcmParticipantProvider.java | 25 +--- .../runtime/supervision/SupervisionAcHandler.java | 6 + .../acm/runtime/supervision/SupervisionAspect.java | 6 +- .../runtime/supervision/SupervisionHandler.java | 24 ++-- .../supervision/SupervisionPartecipantScanner.java | 82 ------------ .../supervision/SupervisionParticipantHandler.java | 143 +++++++++++++-------- .../supervision/SupervisionParticipantScanner.java | 71 ++++++++++ .../runtime/supervision/SupervisionScanner.java | 12 +- .../comm/ParticipantPrimePublisher.java | 12 +- .../comm/ParticipantRestartPublisher.java | 51 +------- .../comm/ParticipantStatusReqPublisher.java | 2 +- .../supervision/comm/ParticipantSyncPublisher.java | 103 +++++++++++---- .../commissioning/CommissioningProviderTest.java | 18 +-- ...mationCompositionInstantiationProviderTest.java | 44 +++---- .../rest/InstantiationControllerTest.java | 5 +- .../supervision/SupervisionAcHandlerTest.java | 75 +++++++---- .../runtime/supervision/SupervisionAspectTest.java | 10 +- .../supervision/SupervisionHandlerTest.java | 15 +-- .../SupervisionParticipantHandlerTest.java | 92 +++++++++++-- .../SupervisionParticipantScannerTest.java | 20 ++- .../supervision/SupervisionScannerTest.java | 23 ++-- .../supervision/comm/SupervisionMessagesTest.java | 67 ++++++++-- .../clamp/acm/runtime/util/CommonTestData.java | 21 ++- 35 files changed, 683 insertions(+), 479 deletions(-) delete mode 100644 runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionPartecipantScanner.java create mode 100644 runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionParticipantScanner.java diff --git a/models/src/main/java/org/onap/policy/clamp/models/acm/concepts/Participant.java b/models/src/main/java/org/onap/policy/clamp/models/acm/concepts/Participant.java index 6ddec6179..457eb65e3 100644 --- a/models/src/main/java/org/onap/policy/clamp/models/acm/concepts/Participant.java +++ b/models/src/main/java/org/onap/policy/clamp/models/acm/concepts/Participant.java @@ -40,12 +40,6 @@ public class Participant { @NonNull private UUID participantId; - @NonNull - private ParticipantState participantState = ParticipantState.ON_LINE; - - @NonNull - private String lastMsg; - @NonNull private Map participantSupportedElementTypes = new HashMap<>(); @@ -58,9 +52,7 @@ public class Participant { * @param otherParticipant the participant to copy from */ public Participant(Participant otherParticipant) { - this.participantState = otherParticipant.participantState; this.participantId = otherParticipant.participantId; - this.lastMsg = otherParticipant.lastMsg; this.participantSupportedElementTypes = PfUtils.mapMap(otherParticipant.getParticipantSupportedElementTypes(), ParticipantSupportedElementType::new); this.replicas = PfUtils.mapMap(otherParticipant.replicas, ParticipantReplica::new); diff --git a/models/src/main/java/org/onap/policy/clamp/models/acm/concepts/ParticipantRestartAc.java b/models/src/main/java/org/onap/policy/clamp/models/acm/concepts/ParticipantRestartAc.java index e5f4ad4ae..3312752fa 100644 --- a/models/src/main/java/org/onap/policy/clamp/models/acm/concepts/ParticipantRestartAc.java +++ b/models/src/main/java/org/onap/policy/clamp/models/acm/concepts/ParticipantRestartAc.java @@ -1,6 +1,6 @@ /*- * ============LICENSE_START======================================================= - * Copyright (C) 2023 Nordix Foundation. + * Copyright (C) 2023-2024 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -37,6 +37,9 @@ public class ParticipantRestartAc { private UUID automationCompositionId; + private DeployState deployState; + private LockState lockState; + private List acElementList = new ArrayList<>(); /** @@ -46,6 +49,8 @@ public class ParticipantRestartAc { */ public ParticipantRestartAc(ParticipantRestartAc copyConstructor) { this.automationCompositionId = copyConstructor.automationCompositionId; + this.deployState = copyConstructor.deployState; + this.lockState = copyConstructor.lockState; this.acElementList = PfUtils.mapList(copyConstructor.acElementList, AcElementRestart::new); } } diff --git a/models/src/main/java/org/onap/policy/clamp/models/acm/messages/kafka/participant/ParticipantRestart.java b/models/src/main/java/org/onap/policy/clamp/models/acm/messages/kafka/participant/ParticipantRestart.java index 98c7d1071..ff9755ec1 100644 --- a/models/src/main/java/org/onap/policy/clamp/models/acm/messages/kafka/participant/ParticipantRestart.java +++ b/models/src/main/java/org/onap/policy/clamp/models/acm/messages/kafka/participant/ParticipantRestart.java @@ -36,7 +36,7 @@ import org.onap.policy.models.base.PfUtils; public class ParticipantRestart extends ParticipantMessage { // composition state - AcTypeState state; + private AcTypeState state; // element definition private List participantDefinitionUpdates = new ArrayList<>(); diff --git a/models/src/main/java/org/onap/policy/clamp/models/acm/persistence/concepts/JpaParticipant.java b/models/src/main/java/org/onap/policy/clamp/models/acm/persistence/concepts/JpaParticipant.java index f35fff9e7..5bc2fc4cf 100644 --- a/models/src/main/java/org/onap/policy/clamp/models/acm/persistence/concepts/JpaParticipant.java +++ b/models/src/main/java/org/onap/policy/clamp/models/acm/persistence/concepts/JpaParticipant.java @@ -32,7 +32,6 @@ import jakarta.persistence.JoinColumn; import jakarta.persistence.OneToMany; import jakarta.persistence.Table; import java.io.Serializable; -import java.sql.Timestamp; import java.util.ArrayList; import java.util.LinkedHashMap; import java.util.List; @@ -44,8 +43,6 @@ import org.apache.commons.lang3.ObjectUtils; import org.hibernate.annotations.LazyCollection; import org.hibernate.annotations.LazyCollectionOption; import org.onap.policy.clamp.models.acm.concepts.Participant; -import org.onap.policy.clamp.models.acm.concepts.ParticipantState; -import org.onap.policy.clamp.models.acm.utils.TimestampHelper; import org.onap.policy.common.parameters.annotations.NotNull; import org.onap.policy.common.parameters.annotations.Valid; import org.onap.policy.models.base.PfAuthorative; @@ -69,52 +66,43 @@ public class JpaParticipant extends Validated @NotNull private String participantId; - @Column - @NotNull - private ParticipantState participantState; - @Column private String description; - @Column - @NotNull - private Timestamp lastMsg; - @NotNull @OneToMany(fetch = FetchType.EAGER, cascade = CascadeType.ALL) @JoinColumn(name = "participantId", referencedColumnName = "participantId", foreignKey = @ForeignKey(name = "supported_element_fk")) + @SuppressWarnings("squid:S1948") private List<@NotNull @Valid JpaParticipantSupportedElementType> supportedElements; @NotNull - @OneToMany + @OneToMany(fetch = FetchType.LAZY, cascade = CascadeType.ALL) @LazyCollection(LazyCollectionOption.FALSE) @JoinColumn(name = "participantId", referencedColumnName = "participantId", foreignKey = @ForeignKey(name = "participant_replica_fk")) + @SuppressWarnings("squid:S1948") private List<@NotNull @Valid JpaParticipantReplica> replicas; /** * The Default Constructor creates a {@link JpaParticipant} object with a null key. */ public JpaParticipant() { - this(UUID.randomUUID().toString(), ParticipantState.ON_LINE, new ArrayList<>(), new ArrayList<>()); + this(UUID.randomUUID().toString(), new ArrayList<>(), new ArrayList<>()); } /** * The Key Constructor creates a {@link JpaParticipant} object with all mandatory fields. * * @param participantId the participant id - * @param participantState the state of the participant * @param supportedElements the list of supported Element Type * @param replicas the list of replica */ - public JpaParticipant(@NonNull String participantId, @NonNull final ParticipantState participantState, + public JpaParticipant(@NonNull String participantId, @NonNull final List supportedElements, @NonNull final List replicas) { this.participantId = participantId; - this.participantState = participantState; this.supportedElements = supportedElements; - this.lastMsg = TimestampHelper.nowTimestamp(); this.replicas = replicas; } @@ -124,12 +112,10 @@ public class JpaParticipant extends Validated * @param copyConcept the concept to copy from */ public JpaParticipant(@NonNull final JpaParticipant copyConcept) { - this.participantState = copyConcept.participantState; this.description = copyConcept.description; this.participantId = copyConcept.participantId; this.supportedElements = copyConcept.supportedElements; this.replicas = copyConcept.replicas; - this.lastMsg = copyConcept.lastMsg; } /** @@ -145,9 +131,7 @@ public class JpaParticipant extends Validated public Participant toAuthorative() { var participant = new Participant(); - participant.setParticipantState(participantState); participant.setParticipantId(UUID.fromString(participantId)); - participant.setLastMsg(this.lastMsg.toString()); participant.setParticipantSupportedElementTypes(new LinkedHashMap<>(this.supportedElements.size())); for (var element : this.supportedElements) { participant.getParticipantSupportedElementTypes() @@ -161,9 +145,7 @@ public class JpaParticipant extends Validated @Override public void fromAuthorative(@NonNull final Participant participant) { - this.setParticipantState(participant.getParticipantState()); this.participantId = participant.getParticipantId().toString(); - this.lastMsg = TimestampHelper.toTimestamp(participant.getLastMsg()); this.supportedElements = new ArrayList<>(participant.getParticipantSupportedElementTypes().size()); for (var elementEntry : participant.getParticipantSupportedElementTypes().entrySet()) { @@ -196,16 +178,6 @@ public class JpaParticipant extends Validated return result; } - result = lastMsg.compareTo(other.lastMsg); - if (result != 0) { - return result; - } - - result = ObjectUtils.compare(participantState, other.participantState); - if (result != 0) { - return result; - } - return ObjectUtils.compare(description, other.description); } } diff --git a/models/src/main/java/org/onap/policy/clamp/models/acm/utils/AcmUtils.java b/models/src/main/java/org/onap/policy/clamp/models/acm/utils/AcmUtils.java index f19d5db8b..f90e5a807 100644 --- a/models/src/main/java/org/onap/policy/clamp/models/acm/utils/AcmUtils.java +++ b/models/src/main/java/org/onap/policy/clamp/models/acm/utils/AcmUtils.java @@ -42,6 +42,7 @@ import org.onap.policy.clamp.models.acm.concepts.AcElementDeploy; import org.onap.policy.clamp.models.acm.concepts.AcElementRestart; import org.onap.policy.clamp.models.acm.concepts.AcTypeState; import org.onap.policy.clamp.models.acm.concepts.AutomationComposition; +import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition; import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionElement; import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionElementDefinition; import org.onap.policy.clamp.models.acm.concepts.DeployState; @@ -49,6 +50,7 @@ import org.onap.policy.clamp.models.acm.concepts.LockState; import org.onap.policy.clamp.models.acm.concepts.NodeTemplateState; import org.onap.policy.clamp.models.acm.concepts.ParticipantDefinition; import org.onap.policy.clamp.models.acm.concepts.ParticipantDeploy; +import org.onap.policy.clamp.models.acm.concepts.ParticipantRestartAc; import org.onap.policy.clamp.models.acm.messages.rest.instantiation.DeployOrder; import org.onap.policy.clamp.models.acm.messages.rest.instantiation.LockOrder; import org.onap.policy.clamp.models.acm.persistence.concepts.StringToMapConverter; @@ -115,6 +117,10 @@ public final class AcmUtils { return false; } + public static ToscaConceptIdentifier getType(ToscaNodeTemplate nodeTemplate) { + return new ToscaConceptIdentifier(nodeTemplate.getType(), nodeTemplate.getTypeVersion()); + } + /** * Prepare list of ParticipantDefinition for the Priming message. * @@ -126,8 +132,7 @@ public final class AcmUtils { Map> map = new HashMap<>(); for (var elementEntry : acElements) { - var type = new ToscaConceptIdentifier(elementEntry.getValue().getType(), - elementEntry.getValue().getTypeVersion()); + var type = getType(elementEntry.getValue()); var participantId = supportedElementMap.get(type); if (participantId == null) { throw new PfModelRuntimeException(Response.Status.BAD_REQUEST, @@ -431,6 +436,30 @@ public final class AcmUtils { return participantDeploys; } + /** + * Create a new ParticipantRestartAc for restarting scenario. + * + * @param automationComposition the AutomationComposition + * @param participantId the participantId of the participant restarted + * @param serviceTemplateFragment the ToscaServiceTemplate with policies and policy types + * @return the ParticipantRestartAc + */ + public static ParticipantRestartAc createAcRestart(AutomationComposition automationComposition, + UUID participantId, ToscaServiceTemplate serviceTemplateFragment) { + var syncAc = new ParticipantRestartAc(); + syncAc.setDeployState(automationComposition.getDeployState()); + syncAc.setLockState(automationComposition.getLockState()); + syncAc.setAutomationCompositionId(automationComposition.getInstanceId()); + for (var element : automationComposition.getElements().values()) { + if (participantId.equals(element.getParticipantId())) { + var acElementSync = createAcElementRestart(element); + acElementSync.setToscaServiceTemplateFragment(serviceTemplateFragment); + syncAc.getAcElementList().add(acElementSync); + } + } + return syncAc; + } + /** * Create a new AcElementRestart from an AutomationCompositionElement. * @@ -451,6 +480,42 @@ public final class AcmUtils { return acElementRestart; } + /** + * Prepare the list of ParticipantDefinition for Participant Restarting/Sync msg. + * + * @param participantId the participantId + * @param acmDefinition the AutomationCompositionDefinition + * @param toscaElementName the ElementName + * @return List of ParticipantDefinition + */ + public static List prepareParticipantRestarting(UUID participantId, + AutomationCompositionDefinition acmDefinition, String toscaElementName) { + var acElements = extractAcElementsFromServiceTemplate(acmDefinition.getServiceTemplate(), + toscaElementName); + + // list of entry filtered by participantId + List> elementList = new ArrayList<>(); + Map supportedElementMap = new HashMap<>(); + for (var elementEntry : acElements) { + var elementState = acmDefinition.getElementStateMap().get(elementEntry.getKey()); + if (participantId == null || participantId.equals(elementState.getParticipantId())) { + supportedElementMap.put(getType(elementEntry.getValue()), elementState.getParticipantId()); + elementList.add(elementEntry); + } + } + var list = prepareParticipantPriming(elementList, supportedElementMap); + for (var participantDefinition : list) { + for (var elementDe : participantDefinition.getAutomationCompositionElementDefinitionList()) { + var state = acmDefinition.getElementStateMap().get(elementDe.getAcElementDefinitionId().getName()); + if (state != null) { + elementDe.setOutProperties(state.getOutProperties()); + } + } + } + return list; + } + + /** * Recursive Merge. * diff --git a/models/src/test/java/org/onap/policy/clamp/models/acm/concepts/ParticipantInformationTest.java b/models/src/test/java/org/onap/policy/clamp/models/acm/concepts/ParticipantInformationTest.java index a843c8279..1a7a419d6 100644 --- a/models/src/test/java/org/onap/policy/clamp/models/acm/concepts/ParticipantInformationTest.java +++ b/models/src/test/java/org/onap/policy/clamp/models/acm/concepts/ParticipantInformationTest.java @@ -25,7 +25,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import java.util.HashMap; import java.util.UUID; import org.junit.jupiter.api.Test; -import org.onap.policy.clamp.models.acm.utils.TimestampHelper; class ParticipantInformationTest { @@ -33,8 +32,6 @@ class ParticipantInformationTest { void testCopyConstructor() { var participant = new Participant(); participant.setParticipantId(UUID.randomUUID()); - participant.setParticipantState(ParticipantState.ON_LINE); - participant.setLastMsg(TimestampHelper.now()); participant.setParticipantSupportedElementTypes(new HashMap<>()); var participantInfo1 = new ParticipantInformation(); participantInfo1.setParticipant(participant); diff --git a/models/src/test/java/org/onap/policy/clamp/models/acm/concepts/ParticipantTest.java b/models/src/test/java/org/onap/policy/clamp/models/acm/concepts/ParticipantTest.java index 7486d0d70..2c6c60edc 100644 --- a/models/src/test/java/org/onap/policy/clamp/models/acm/concepts/ParticipantTest.java +++ b/models/src/test/java/org/onap/policy/clamp/models/acm/concepts/ParticipantTest.java @@ -1,6 +1,6 @@ /*- * ============LICENSE_START======================================================= - * Copyright (C) 2021-2023 Nordix Foundation. + * Copyright (C) 2021-2024 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,7 +21,6 @@ package org.onap.policy.clamp.models.acm.concepts; import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotEquals; @@ -46,7 +45,6 @@ class ParticipantTest { var p1 = new Participant(); p1.setParticipantId(CommonTestData.getParticipantId()); - p1.setParticipantState(ParticipantState.ON_LINE); assertThat(p1.toString()).contains("Participant("); assertNotEquals(0, p1.hashCode()); @@ -56,11 +54,6 @@ class ParticipantTest { assertNotEquals(p1, p0); var p2 = new Participant(); - - // @formatter:off - assertThatThrownBy(() -> p2.setParticipantState(null)).isInstanceOf(NullPointerException.class); - // @formatter:on - assertEquals(p2, p0); } @@ -68,7 +61,6 @@ class ParticipantTest { void testCopyConstructor() { var p0 = new Participant(); p0.setParticipantId(UUID.randomUUID()); - p0.setParticipantState(ParticipantState.ON_LINE); var supportedElementType = new ParticipantSupportedElementType(); supportedElementType.setId(UUID.randomUUID()); supportedElementType.setTypeName("type"); diff --git a/models/src/test/java/org/onap/policy/clamp/models/acm/persistence/concepts/JpaParticipantTest.java b/models/src/test/java/org/onap/policy/clamp/models/acm/persistence/concepts/JpaParticipantTest.java index e0f2f55c1..d2d253e06 100644 --- a/models/src/test/java/org/onap/policy/clamp/models/acm/persistence/concepts/JpaParticipantTest.java +++ b/models/src/test/java/org/onap/policy/clamp/models/acm/persistence/concepts/JpaParticipantTest.java @@ -27,15 +27,11 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertTrue; -import java.sql.Timestamp; -import java.time.Instant; import java.util.ArrayList; import java.util.LinkedHashMap; import java.util.UUID; import org.junit.jupiter.api.Test; import org.onap.policy.clamp.models.acm.concepts.Participant; -import org.onap.policy.clamp.models.acm.concepts.ParticipantState; -import org.onap.policy.clamp.models.acm.utils.TimestampHelper; /** * Test the {@link JpaParticipant} class. @@ -47,30 +43,23 @@ class JpaParticipantTest { @Test void testJpaParticipantConstructor() { assertThatThrownBy(() -> new JpaParticipant((Participant) null)) - .hasMessageMatching("authorativeConcept is marked .*ull but is null"); + .hasMessageMatching("authorativeConcept is marked .*ull but is null"); assertThatThrownBy(() -> new JpaParticipant((JpaParticipant) null)) .hasMessageMatching("copyConcept is marked .*ull but is null"); - assertThatThrownBy(() -> new JpaParticipant(null, ParticipantState.ON_LINE, - new ArrayList<>(), new ArrayList<>())) + assertThatThrownBy(() -> new JpaParticipant(null, new ArrayList<>(), new ArrayList<>())) .hasMessageMatching(NULL_KEY_ERROR); - assertThatThrownBy(() -> new JpaParticipant(UUID.randomUUID().toString(), null, - new ArrayList<>(), new ArrayList<>())) - .hasMessageMatching("participantState is marked .*ull but is null"); - - assertThatThrownBy(() -> new JpaParticipant(UUID.randomUUID().toString(), ParticipantState.ON_LINE, - null, new ArrayList<>())) + assertThatThrownBy(() -> new JpaParticipant(UUID.randomUUID().toString(), null, new ArrayList<>())) .hasMessageMatching("supportedElements is marked .*ull but is null"); - assertThatThrownBy(() -> new JpaParticipant(UUID.randomUUID().toString(), ParticipantState.ON_LINE, - new ArrayList<>(), null)) - .hasMessageMatching("replicas is marked .*ull but is null"); + assertThatThrownBy(() -> new JpaParticipant(UUID.randomUUID().toString(), new ArrayList<>(), null)) + .hasMessageMatching("replicas is marked .*ull but is null"); assertDoesNotThrow(() -> new JpaParticipant()); assertDoesNotThrow(() -> new JpaParticipant(UUID.randomUUID().toString(), - ParticipantState.ON_LINE, new ArrayList<>(), new ArrayList<>())); + new ArrayList<>(), new ArrayList<>())); } @Test @@ -116,18 +105,6 @@ class JpaParticipantTest { assertEquals(0, testJpaParticipant.compareTo(testJpaParticipant)); assertNotEquals(0, testJpaParticipant.compareTo(new DummyJpaParticipantChild())); - testJpaParticipant.setParticipantState(ParticipantState.OFF_LINE); - assertNotEquals(0, testJpaParticipant.compareTo(otherJpaParticipant)); - testJpaParticipant.setParticipantState(ParticipantState.ON_LINE); - assertEquals(0, testJpaParticipant.compareTo(otherJpaParticipant)); - assertEquals(testJpaParticipant, new JpaParticipant(testJpaParticipant)); - - testJpaParticipant.setLastMsg(Timestamp.from(Instant.EPOCH)); - assertNotEquals(0, testJpaParticipant.compareTo(otherJpaParticipant)); - testJpaParticipant.setLastMsg(otherJpaParticipant.getLastMsg()); - assertEquals(0, testJpaParticipant.compareTo(otherJpaParticipant)); - assertEquals(testJpaParticipant, new JpaParticipant(testJpaParticipant)); - var newJpaParticipant = new JpaParticipant(testJpaParticipant); newJpaParticipant.setParticipantId(testJpaParticipant.getParticipantId()); assertEquals(testJpaParticipant, newJpaParticipant); @@ -143,8 +120,6 @@ class JpaParticipantTest { var p1 = new JpaParticipant(); - p1.setParticipantState(ParticipantState.ON_LINE); - assertThat(p1.toString()).contains("Participant("); assertNotEquals(0, p1.hashCode()); assertNotEquals(p1, p0); @@ -154,14 +129,12 @@ class JpaParticipantTest { var p2 = new JpaParticipant(); p2.setParticipantId(p0.getParticipantId()); - p2.setLastMsg(p0.getLastMsg()); assertEquals(p2, p0); } private Participant createParticipantInstance() { var testParticipant = new Participant(); testParticipant.setParticipantId(UUID.randomUUID()); - testParticipant.setLastMsg(TimestampHelper.now()); testParticipant.setParticipantSupportedElementTypes(new LinkedHashMap<>()); testParticipant.setReplicas(new LinkedHashMap<>()); diff --git a/models/src/test/java/org/onap/policy/clamp/models/acm/utils/AcmUtilsTest.java b/models/src/test/java/org/onap/policy/clamp/models/acm/utils/AcmUtilsTest.java index a5c93e86a..024060f0a 100644 --- a/models/src/test/java/org/onap/policy/clamp/models/acm/utils/AcmUtilsTest.java +++ b/models/src/test/java/org/onap/policy/clamp/models/acm/utils/AcmUtilsTest.java @@ -37,7 +37,9 @@ import java.util.List; import java.util.Map; import java.util.UUID; import org.junit.jupiter.api.Test; +import org.onap.policy.clamp.models.acm.concepts.AcTypeState; import org.onap.policy.clamp.models.acm.concepts.AutomationComposition; +import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition; import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionElement; import org.onap.policy.clamp.models.acm.concepts.DeployState; import org.onap.policy.clamp.models.acm.concepts.LockState; @@ -45,6 +47,7 @@ import org.onap.policy.clamp.models.acm.document.concepts.DocToscaServiceTemplat import org.onap.policy.clamp.models.acm.messages.rest.instantiation.DeployOrder; import org.onap.policy.clamp.models.acm.messages.rest.instantiation.LockOrder; import org.onap.policy.common.utils.coder.StandardCoder; +import org.onap.policy.models.base.PfModelRuntimeException; import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier; import org.onap.policy.models.tosca.authorative.concepts.ToscaDataType; import org.onap.policy.models.tosca.authorative.concepts.ToscaNodeTemplate; @@ -276,9 +279,7 @@ class AcmUtilsTest { } private Map getDummyPolicyTypesMap() { - Map policyTypes = new HashMap<>(); - policyTypes.put("onap.policies.Match", new ToscaPolicyType()); - return policyTypes; + return Map.of("onap.policies.Match", new ToscaPolicyType()); } private Map getDummyToscaDataTypeMap() { @@ -290,11 +291,44 @@ class AcmUtilsTest { private Map getDummyNodeTemplates() { Map nodeTemplates = new HashMap<>(); var nodeTemplate = new ToscaNodeTemplate(); - nodeTemplate.setType("org.onap.policy.clamp.acm.AutomationCompositionElement"); + nodeTemplate.setType(AUTOMATION_COMPOSITION_ELEMENT); nodeTemplates.put("org.onap.dcae.acm.DCAEMicroserviceAutomationCompositionParticipant", nodeTemplate); return nodeTemplates; } + @Test + void testcreateAcRestart() { + var automationComposition = getDummyAutomationComposition(); + automationComposition.setInstanceId(UUID.randomUUID()); + var toscaServiceTemplate = getDummyToscaServiceTemplate(); + var participantId = automationComposition.getElements().values().iterator().next().getParticipantId(); + var serviceTemplateFragment = AcmUtils.getToscaServiceTemplateFragment(toscaServiceTemplate); + var result = AcmUtils.createAcRestart(automationComposition, participantId, serviceTemplateFragment); + assertEquals(result.getAutomationCompositionId(), automationComposition.getInstanceId()); + assertThat(result.getAcElementList()).hasSize(1); + } + + @Test + void testPrepareParticipantRestarting() { + var serviceTemplate = CommonTestData.getToscaServiceTemplate(TOSCA_TEMPLATE_YAML); + var acmDefinition = new AutomationCompositionDefinition(); + acmDefinition.setElementStateMap(Map.of()); + acmDefinition.setServiceTemplate(serviceTemplate); + var acElements = AcmUtils.extractAcElementsFromServiceTemplate(serviceTemplate, AUTOMATION_COMPOSITION_ELEMENT); + acmDefinition.setElementStateMap(AcmUtils.createElementStateMap(acElements, AcTypeState.COMMISSIONED)); + acmDefinition.getElementStateMap() + .values().forEach(element -> element.setParticipantId(UUID.randomUUID())); + var participantId = UUID.randomUUID(); + var result = AcmUtils.prepareParticipantRestarting(participantId, acmDefinition, + AUTOMATION_COMPOSITION_ELEMENT); + assertThat(result).isEmpty(); + + participantId = acmDefinition.getElementStateMap().values().iterator().next().getParticipantId(); + result = AcmUtils.prepareParticipantRestarting(participantId, acmDefinition, + AUTOMATION_COMPOSITION_ELEMENT); + assertThat(result).hasSize(1); + } + @Test void testRecursiveMergeMap() { var oldProperties = """ diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandler.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandler.java index 7ac58ae6c..caa2c5675 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandler.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandler.java @@ -218,7 +218,6 @@ public class ParticipantHandler { public void handleParticipantSync(ParticipantSync participantSyncMsg) { LOGGER.debug("ParticipantSync message received for participantId {}", participantSyncMsg.getParticipantId()); - acDefinitionHandler.handleParticipantRestart(participantSyncMsg); } /** diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/commissioning/CommissioningProvider.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/commissioning/CommissioningProvider.java index 74ccb9cc6..8a56fbb1e 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/commissioning/CommissioningProvider.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/commissioning/CommissioningProvider.java @@ -29,7 +29,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import lombok.RequiredArgsConstructor; import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup; -import org.onap.policy.clamp.acm.runtime.participants.AcmParticipantProvider; import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantPrimePublisher; import org.onap.policy.clamp.models.acm.concepts.AcTypeState; import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition; @@ -39,6 +38,7 @@ import org.onap.policy.clamp.models.acm.messages.rest.commissioning.Commissionin import org.onap.policy.clamp.models.acm.persistence.provider.AcDefinitionProvider; import org.onap.policy.clamp.models.acm.persistence.provider.AcTypeStateResolver; import org.onap.policy.clamp.models.acm.persistence.provider.AutomationCompositionProvider; +import org.onap.policy.clamp.models.acm.persistence.provider.ParticipantProvider; import org.onap.policy.clamp.models.acm.utils.TimestampHelper; import org.onap.policy.models.base.PfModelRuntimeException; import org.onap.policy.models.tosca.authorative.concepts.ToscaServiceTemplate; @@ -56,7 +56,7 @@ public class CommissioningProvider { private final AcDefinitionProvider acDefinitionProvider; private final AutomationCompositionProvider acProvider; - private final AcmParticipantProvider acmParticipantProvider; + private final ParticipantProvider participantProvider; private final AcTypeStateResolver acTypeStateResolver; private final ParticipantPrimePublisher participantPrimePublisher; private final AcRuntimeParameterGroup acRuntimeParameterGroup; @@ -229,7 +229,7 @@ public class CommissioningProvider { } } if (!participantIds.isEmpty()) { - acmParticipantProvider.verifyParticipantState(participantIds); + participantProvider.verifyParticipantState(participantIds); } acmDefinition.setState(AcTypeState.DEPRIMING); acmDefinition.setLastMsg(TimestampHelper.now()); diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/instantiation/AutomationCompositionInstantiationProvider.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/instantiation/AutomationCompositionInstantiationProvider.java index 220636b9d..2bf08220a 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/instantiation/AutomationCompositionInstantiationProvider.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/instantiation/AutomationCompositionInstantiationProvider.java @@ -29,7 +29,6 @@ import java.util.stream.Collectors; import lombok.NonNull; import lombok.RequiredArgsConstructor; import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup; -import org.onap.policy.clamp.acm.runtime.participants.AcmParticipantProvider; import org.onap.policy.clamp.acm.runtime.supervision.SupervisionAcHandler; import org.onap.policy.clamp.models.acm.concepts.AcTypeState; import org.onap.policy.clamp.models.acm.concepts.AutomationComposition; @@ -43,6 +42,7 @@ import org.onap.policy.clamp.models.acm.messages.rest.instantiation.Instantiatio import org.onap.policy.clamp.models.acm.persistence.provider.AcDefinitionProvider; import org.onap.policy.clamp.models.acm.persistence.provider.AcInstanceStateResolver; import org.onap.policy.clamp.models.acm.persistence.provider.AutomationCompositionProvider; +import org.onap.policy.clamp.models.acm.persistence.provider.ParticipantProvider; import org.onap.policy.clamp.models.acm.utils.AcmUtils; import org.onap.policy.common.parameters.BeanValidationResult; import org.onap.policy.common.parameters.ObjectValidationResult; @@ -69,7 +69,7 @@ public class AutomationCompositionInstantiationProvider { private final AcDefinitionProvider acDefinitionProvider; private final AcInstanceStateResolver acInstanceStateResolver; private final SupervisionAcHandler supervisionAcHandler; - private final AcmParticipantProvider acmParticipantProvider; + private final ParticipantProvider participantProvider; private final AcRuntimeParameterGroup acRuntimeParameterGroup; /** @@ -265,7 +265,7 @@ public class AutomationCompositionInstantiationProvider { var participantIds = acDefinitionOpt.get().getElementStateMap().values().stream() .map(NodeTemplateState::getParticipantId).collect(Collectors.toSet()); - acmParticipantProvider.verifyParticipantState(participantIds); + participantProvider.verifyParticipantState(participantIds); result.addResult(AcmUtils.validateAutomationComposition(automationComposition, acDefinitionOpt.get().getServiceTemplate(), @@ -331,7 +331,7 @@ public class AutomationCompositionInstantiationProvider { var acDefinition = acDefinitionProvider.getAcDefinition(automationComposition.getCompositionId()); var participantIds = acDefinition.getElementStateMap().values().stream() .map(NodeTemplateState::getParticipantId).collect(Collectors.toSet()); - acmParticipantProvider.verifyParticipantState(participantIds); + participantProvider.verifyParticipantState(participantIds); supervisionAcHandler.delete(automationComposition, acDefinition); var response = new InstantiationResponse(); response.setInstanceId(automationComposition.getInstanceId()); @@ -374,7 +374,7 @@ public class AutomationCompositionInstantiationProvider { var participantIds = acDefinition.getElementStateMap().values().stream() .map(NodeTemplateState::getParticipantId).collect(Collectors.toSet()); - acmParticipantProvider.verifyParticipantState(participantIds); + participantProvider.verifyParticipantState(participantIds); var result = acInstanceStateResolver.resolve(acInstanceStateUpdate.getDeployOrder(), acInstanceStateUpdate.getLockOrder(), automationComposition.getDeployState(), automationComposition.getLockState(), automationComposition.getStateChangeResult()); diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/participants/AcmParticipantProvider.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/participants/AcmParticipantProvider.java index 282389a74..62ba7b017 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/participants/AcmParticipantProvider.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/participants/AcmParticipantProvider.java @@ -20,12 +20,10 @@ package org.onap.policy.clamp.acm.runtime.participants; -import jakarta.ws.rs.core.Response; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.UUID; import lombok.RequiredArgsConstructor; import org.apache.commons.collections4.MapUtils; @@ -33,9 +31,7 @@ import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantStatusReqPu import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionElement; import org.onap.policy.clamp.models.acm.concepts.NodeTemplateState; import org.onap.policy.clamp.models.acm.concepts.ParticipantInformation; -import org.onap.policy.clamp.models.acm.concepts.ParticipantState; import org.onap.policy.clamp.models.acm.persistence.provider.ParticipantProvider; -import org.onap.policy.models.base.PfModelRuntimeException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; @@ -94,12 +90,11 @@ public class AcmParticipantProvider { * @param participantId The UUID of the participant to send request to */ public void sendParticipantStatusRequest(UUID participantId) { - var participant = this.participantProvider.getParticipantById(participantId); + // check if participant is present + this.participantProvider.getParticipantById(participantId); LOGGER.debug("Requesting Participant Status Now ParticipantStatusReq"); participantStatusReqPublisher.send(participantId); - participant.setParticipantState(ParticipantState.OFF_LINE); - participantProvider.saveParticipant(participant); } /** @@ -110,22 +105,6 @@ public class AcmParticipantProvider { this.participantStatusReqPublisher.send((UUID) null); } - /** - * Verify Participant state. - * - * @param participantIds The list of UUIDs of the participants to get - * @throws PfModelRuntimeException in case the participant is offline - */ - public void verifyParticipantState(Set participantIds) { - for (UUID participantId : participantIds) { - var participant = this.participantProvider.getParticipantById(participantId); - if (! participant.getParticipantState().equals(ParticipantState.ON_LINE)) { - throw new PfModelRuntimeException(Response.Status.CONFLICT, - "Participant: " + participantId + " is OFFLINE"); - } - } - } - private Map getAutomationCompositionElementsForParticipant(UUID participantId) { var automationCompositionElements = participantProvider .getAutomationCompositionElements(participantId); diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAcHandler.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAcHandler.java index 802c6603b..3e2057ed5 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAcHandler.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAcHandler.java @@ -33,6 +33,7 @@ import org.onap.policy.clamp.acm.runtime.supervision.comm.AcElementPropertiesPub import org.onap.policy.clamp.acm.runtime.supervision.comm.AutomationCompositionDeployPublisher; import org.onap.policy.clamp.acm.runtime.supervision.comm.AutomationCompositionMigrationPublisher; import org.onap.policy.clamp.acm.runtime.supervision.comm.AutomationCompositionStateChangePublisher; +import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantSyncPublisher; import org.onap.policy.clamp.models.acm.concepts.AcElementDeployAck; import org.onap.policy.clamp.models.acm.concepts.AutomationComposition; import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition; @@ -42,6 +43,7 @@ import org.onap.policy.clamp.models.acm.concepts.LockState; import org.onap.policy.clamp.models.acm.concepts.ParticipantUtils; import org.onap.policy.clamp.models.acm.concepts.StateChangeResult; import org.onap.policy.clamp.models.acm.messages.kafka.participant.AutomationCompositionDeployAck; +import org.onap.policy.clamp.models.acm.persistence.provider.AcDefinitionProvider; import org.onap.policy.clamp.models.acm.persistence.provider.AutomationCompositionProvider; import org.onap.policy.clamp.models.acm.utils.AcmUtils; import org.slf4j.Logger; @@ -58,12 +60,14 @@ public class SupervisionAcHandler { private static final Logger LOGGER = LoggerFactory.getLogger(SupervisionAcHandler.class); private final AutomationCompositionProvider automationCompositionProvider; + private final AcDefinitionProvider acDefinitionProvider; // Publishers for participant communication private final AutomationCompositionDeployPublisher automationCompositionDeployPublisher; private final AutomationCompositionStateChangePublisher automationCompositionStateChangePublisher; private final AcElementPropertiesPublisher acElementPropertiesPublisher; private final AutomationCompositionMigrationPublisher acCompositionMigrationPublisher; + private final ParticipantSyncPublisher participantSyncPublisher; private final ExecutorService executor = Context.taskWrapping(Executors.newFixedThreadPool(1)); @@ -260,6 +264,8 @@ public class SupervisionAcHandler { automationCompositionAckMessage.getStateChangeResult()); if (updated) { automationCompositionProvider.updateAutomationComposition(automationComposition); + var acDefinition = acDefinitionProvider.getAcDefinition(automationComposition.getCompositionId()); + participantSyncPublisher.sendSync(acDefinition.getServiceTemplate(), automationComposition); } } diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAspect.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAspect.java index 8f3a4c2eb..9ef979f8e 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAspect.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAspect.java @@ -41,9 +41,9 @@ public class SupervisionAspect implements Closeable { private static final Logger LOGGER = LoggerFactory.getLogger(SupervisionAspect.class); private final SupervisionScanner supervisionScanner; - private final SupervisionPartecipantScanner partecipantScanner; + private final SupervisionParticipantScanner participantScanner; - private ThreadPoolExecutor executor = + private final ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()); @Scheduled( @@ -56,7 +56,7 @@ public class SupervisionAspect implements Closeable { private void executeScan() { supervisionScanner.run(); - partecipantScanner.run(); + participantScanner.run(); } /** diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionHandler.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionHandler.java index 963e4830e..a4e470495 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionHandler.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionHandler.java @@ -23,9 +23,10 @@ package org.onap.policy.clamp.acm.runtime.supervision; import io.micrometer.core.annotation.Timed; import lombok.AllArgsConstructor; -import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup; +import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantSyncPublisher; import org.onap.policy.clamp.models.acm.concepts.AcTypeState; import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition; +import org.onap.policy.clamp.models.acm.concepts.NodeTemplateState; import org.onap.policy.clamp.models.acm.concepts.StateChangeResult; import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantPrimeAck; import org.onap.policy.clamp.models.acm.persistence.provider.AcDefinitionProvider; @@ -43,7 +44,7 @@ public class SupervisionHandler { private static final Logger LOGGER = LoggerFactory.getLogger(SupervisionHandler.class); private final AcDefinitionProvider acDefinitionProvider; - private final AcRuntimeParameterGroup acRuntimeParameterGroup; + private final ParticipantSyncPublisher participantSyncPublisher; /** * Handle a ParticipantPrimeAck message from a participant. @@ -82,12 +83,7 @@ public class SupervisionHandler { boolean completed = true; boolean restarting = false; for (var element : acDefinition.getElementStateMap().values()) { - if (participantPrimeAckMessage.getParticipantId().equals(element.getParticipantId())) { - element.setMessage(participantPrimeAckMessage.getMessage()); - element.setState(participantPrimeAckMessage.getCompositionState()); - element.setRestarting(null); - acDefinitionProvider.updateAcDefinitionElement(element, acDefinition.getCompositionId()); - } + handlePrimeAckElement(participantPrimeAckMessage, element); if (!finalState.equals(element.getState())) { completed = false; } @@ -110,6 +106,18 @@ public class SupervisionHandler { if (toUpdate) { acDefinitionProvider.updateAcDefinitionState(acDefinition.getCompositionId(), acDefinition.getState(), acDefinition.getStateChangeResult(), acDefinition.getRestarting()); + if (!participantPrimeAckMessage.getParticipantId().equals(participantPrimeAckMessage.getReplicaId())) { + participantSyncPublisher.sendSync(acDefinition, participantPrimeAckMessage.getReplicaId()); + } + } + } + + private void handlePrimeAckElement(ParticipantPrimeAck participantPrimeAckMessage, NodeTemplateState element) { + if (participantPrimeAckMessage.getParticipantId().equals(element.getParticipantId())) { + element.setMessage(participantPrimeAckMessage.getMessage()); + element.setState(participantPrimeAckMessage.getCompositionState()); + element.setRestarting(null); + acDefinitionProvider.updateAcDefinitionElement(element, participantPrimeAckMessage.getCompositionId()); } } } diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionPartecipantScanner.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionPartecipantScanner.java deleted file mode 100644 index b7f0be8eb..000000000 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionPartecipantScanner.java +++ /dev/null @@ -1,82 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * Copyright (C) 2023-2024 Nordix Foundation. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.clamp.acm.runtime.supervision; - -import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup; -import org.onap.policy.clamp.models.acm.concepts.Participant; -import org.onap.policy.clamp.models.acm.concepts.ParticipantState; -import org.onap.policy.clamp.models.acm.persistence.provider.ParticipantProvider; -import org.onap.policy.clamp.models.acm.utils.TimestampHelper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.stereotype.Component; - -/** - * This class is used to scan the automation compositions in the database and check if they are in the correct state. - */ -@Component -public class SupervisionPartecipantScanner { - private static final Logger LOGGER = LoggerFactory.getLogger(SupervisionPartecipantScanner.class); - - private final long maxWaitMs; - - private final ParticipantProvider participantProvider; - - /** - * Constructor for instantiating SupervisionPartecipantScanner. - * - * @param participantProvider the Participant Provider - * @param acRuntimeParameterGroup the parameters for the automation composition runtime - */ - public SupervisionPartecipantScanner(final ParticipantProvider participantProvider, - final AcRuntimeParameterGroup acRuntimeParameterGroup) { - this.participantProvider = participantProvider; - this.maxWaitMs = acRuntimeParameterGroup.getParticipantParameters().getMaxStatusWaitMs(); - } - - /** - * Run Scanning. - */ - public void run() { - LOGGER.debug("Scanning participans in the database . . ."); - - for (var participant : participantProvider.getParticipants()) { - scanParticipantStatus(participant); - } - - LOGGER.debug("Participans scan complete . . ."); - } - - private void scanParticipantStatus(Participant participant) { - var id = participant.getParticipantId(); - if (ParticipantState.OFF_LINE.equals(participant.getParticipantState())) { - LOGGER.debug("report Participant is still OFF_LINE {}", id); - return; - } - var now = TimestampHelper.nowEpochMilli(); - var lastMsg = TimestampHelper.toEpochMilli(participant.getLastMsg()); - if ((now - lastMsg) > maxWaitMs) { - LOGGER.debug("report Participant OFF_LINE {}", id); - participant.setParticipantState(ParticipantState.OFF_LINE); - participantProvider.saveParticipant(participant); - } - } -} diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionParticipantHandler.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionParticipantHandler.java index 609e036be..4c8c5815c 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionParticipantHandler.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionParticipantHandler.java @@ -21,7 +21,6 @@ package org.onap.policy.clamp.acm.runtime.supervision; import io.micrometer.core.annotation.Timed; -import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -32,11 +31,13 @@ import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantDeregisterAckPublisher; import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantRegisterAckPublisher; import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantRestartPublisher; +import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantSyncPublisher; import org.onap.policy.clamp.models.acm.concepts.AcTypeState; import org.onap.policy.clamp.models.acm.concepts.AutomationComposition; import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition; import org.onap.policy.clamp.models.acm.concepts.Participant; import org.onap.policy.clamp.models.acm.concepts.ParticipantDefinition; +import org.onap.policy.clamp.models.acm.concepts.ParticipantReplica; import org.onap.policy.clamp.models.acm.concepts.ParticipantState; import org.onap.policy.clamp.models.acm.concepts.ParticipantSupportedElementType; import org.onap.policy.clamp.models.acm.concepts.StateChangeResult; @@ -65,6 +66,7 @@ public class SupervisionParticipantHandler { private final AutomationCompositionProvider automationCompositionProvider; private final AcDefinitionProvider acDefinitionProvider; private final ParticipantRestartPublisher participantRestartPublisher; + private final ParticipantSyncPublisher participantSyncPublisher; private final AcRuntimeParameterGroup acRuntimeParameterGroup; /** @@ -72,21 +74,11 @@ public class SupervisionParticipantHandler { * * @param participantRegisterMsg the ParticipantRegister message received from a participant */ - @MessageIntercept @Timed(value = "listener.participant_register", description = "PARTICIPANT_REGISTER messages received") public void handleParticipantMessage(ParticipantRegister participantRegisterMsg) { - var participantOpt = participantProvider.findParticipant(participantRegisterMsg.getParticipantId()); - - if (participantOpt.isPresent()) { - var participant = participantOpt.get(); - checkOnline(participant); - handleRestart(participant.getParticipantId()); - } else { - var participant = createParticipant(participantRegisterMsg.getParticipantId(), - listToMap(participantRegisterMsg.getParticipantSupportedElementType())); - participantProvider.saveParticipant(participant); - - } + saveIfNotPresent(participantRegisterMsg.getReplicaId(), + participantRegisterMsg.getParticipantId(), + participantRegisterMsg.getParticipantSupportedElementType(), true); participantRegisterAckPublisher.send(participantRegisterMsg.getMessageId(), participantRegisterMsg.getParticipantId()); @@ -97,15 +89,13 @@ public class SupervisionParticipantHandler { * * @param participantDeregisterMsg the ParticipantDeregister message received from a participant */ - @MessageIntercept @Timed(value = "listener.participant_deregister", description = "PARTICIPANT_DEREGISTER messages received") public void handleParticipantMessage(ParticipantDeregister participantDeregisterMsg) { - var participantOpt = participantProvider.findParticipant(participantDeregisterMsg.getParticipantId()); - - if (participantOpt.isPresent()) { - var participant = participantOpt.get(); - participant.setParticipantState(ParticipantState.OFF_LINE); - participantProvider.saveParticipant(participant); + var replicaId = participantDeregisterMsg.getReplicaId() != null + ? participantDeregisterMsg.getReplicaId() : participantDeregisterMsg.getParticipantId(); + var replicaOpt = participantProvider.findParticipantReplica(replicaId); + if (replicaOpt.isPresent()) { + participantProvider.deleteParticipantReplica(replicaId); } participantDeregisterAckPublisher.send(participantDeregisterMsg.getMessageId()); @@ -116,32 +106,57 @@ public class SupervisionParticipantHandler { * * @param participantStatusMsg the ParticipantStatus message received from a participant */ - @MessageIntercept @Timed(value = "listener.participant_status", description = "PARTICIPANT_STATUS messages received") public void handleParticipantMessage(ParticipantStatus participantStatusMsg) { + saveIfNotPresent(participantStatusMsg.getReplicaId(), participantStatusMsg.getParticipantId(), + participantStatusMsg.getParticipantSupportedElementType(), false); - var participantOpt = participantProvider.findParticipant(participantStatusMsg.getParticipantId()); - if (participantOpt.isEmpty()) { - var participant = createParticipant(participantStatusMsg.getParticipantId(), - listToMap(participantStatusMsg.getParticipantSupportedElementType())); - participantProvider.saveParticipant(participant); - } else { - checkOnline(participantOpt.get()); - } if (!participantStatusMsg.getAutomationCompositionInfoList().isEmpty()) { automationCompositionProvider.upgradeStates(participantStatusMsg.getAutomationCompositionInfoList()); } if (!participantStatusMsg.getParticipantDefinitionUpdates().isEmpty() && participantStatusMsg.getCompositionId() != null) { updateAcDefinitionOutProperties(participantStatusMsg.getCompositionId(), - participantStatusMsg.getParticipantDefinitionUpdates()); + participantStatusMsg.getReplicaId(), participantStatusMsg.getParticipantDefinitionUpdates()); } } - private void updateAcDefinitionOutProperties(UUID composotionId, List list) { - var acDefinitionOpt = acDefinitionProvider.findAcDefinition(composotionId); + private void saveIfNotPresent(UUID msgReplicaId, UUID participantId, + List participantSupportedElementType, boolean registration) { + var replicaId = msgReplicaId != null ? msgReplicaId : participantId; + var replicaOpt = participantProvider.findParticipantReplica(replicaId); + if (replicaOpt.isPresent()) { + var replica = replicaOpt.get(); + checkOnline(replica); + } else { + var participant = getParticipant(participantId, listToMap(participantSupportedElementType)); + participant.getReplicas().put(replicaId, createReplica(replicaId)); + participantProvider.saveParticipant(participant); + } + if (registration) { + handleRestart(participantId, replicaId); + } + } + + private Participant getParticipant(UUID participantId, + Map participantSupportedElementType) { + var participantOpt = participantProvider.findParticipant(participantId); + return participantOpt.orElseGet(() -> createParticipant(participantId, participantSupportedElementType)); + } + + private ParticipantReplica createReplica(UUID replicaId) { + var replica = new ParticipantReplica(); + replica.setReplicaId(replicaId); + replica.setParticipantState(ParticipantState.ON_LINE); + replica.setLastMsg(TimestampHelper.now()); + return replica; + + } + + private void updateAcDefinitionOutProperties(UUID compositionId, UUID replicaId, List list) { + var acDefinitionOpt = acDefinitionProvider.findAcDefinition(compositionId); if (acDefinitionOpt.isEmpty()) { - LOGGER.error("Ac Definition with id {} not found", composotionId); + LOGGER.error("Ac Definition with id {} not found", compositionId); return; } var acDefinition = acDefinitionOpt.get(); @@ -155,26 +170,32 @@ public class SupervisionParticipantHandler { } acDefinitionProvider.updateAcDefinition(acDefinition, acRuntimeParameterGroup.getAcmParameters().getToscaCompositionName()); + participantSyncPublisher.sendSync(acDefinition, replicaId); } - private void checkOnline(Participant participant) { - if (ParticipantState.OFF_LINE.equals(participant.getParticipantState())) { - participant.setParticipantState(ParticipantState.ON_LINE); + private void checkOnline(ParticipantReplica replica) { + if (ParticipantState.OFF_LINE.equals(replica.getParticipantState())) { + replica.setParticipantState(ParticipantState.ON_LINE); } - participant.setLastMsg(TimestampHelper.now()); - participantProvider.saveParticipant(participant); + replica.setLastMsg(TimestampHelper.now()); + participantProvider.saveParticipantReplica(replica); } - private void handleRestart(UUID participantId) { + private void handleRestart(UUID participantId, UUID replicaId) { var compositionIds = participantProvider.getCompositionIds(participantId); + var oldParticipant = participantId.equals(replicaId); for (var compositionId : compositionIds) { var acDefinition = acDefinitionProvider.getAcDefinition(compositionId); LOGGER.debug("Scan Composition {} for restart", acDefinition.getCompositionId()); - handleRestart(participantId, acDefinition); + if (oldParticipant) { + handleRestart(participantId, acDefinition); + } else { + handleSyncRestart(participantId, replicaId, acDefinition); + } } } - private void handleRestart(UUID participantId, AutomationCompositionDefinition acDefinition) { + private void handleRestart(final UUID participantId, AutomationCompositionDefinition acDefinition) { if (AcTypeState.COMMISSIONED.equals(acDefinition.getState())) { LOGGER.debug("Composition {} COMMISSIONED", acDefinition.getCompositionId()); return; @@ -185,14 +206,6 @@ public class SupervisionParticipantHandler { elementState.setRestarting(true); } } - var automationCompositionList = - automationCompositionProvider.getAcInstancesByCompositionId(acDefinition.getCompositionId()); - List automationCompositions = new ArrayList<>(); - for (var automationComposition : automationCompositionList) { - if (isAcToBeRestarted(participantId, automationComposition)) { - automationCompositions.add(automationComposition); - } - } // expected final state if (StateChangeResult.TIMEOUT.equals(acDefinition.getStateChangeResult())) { acDefinition.setStateChangeResult(StateChangeResult.NO_ERROR); @@ -200,6 +213,11 @@ public class SupervisionParticipantHandler { acDefinition.setRestarting(true); acDefinitionProvider.updateAcDefinition(acDefinition, acRuntimeParameterGroup.getAcmParameters().getToscaCompositionName()); + + var automationCompositionList = + automationCompositionProvider.getAcInstancesByCompositionId(acDefinition.getCompositionId()); + var automationCompositions = automationCompositionList.stream() + .filter(ac -> isAcToBeRestarted(participantId, ac)).toList(); participantRestartPublisher.send(participantId, acDefinition, automationCompositions); } @@ -222,13 +240,34 @@ public class SupervisionParticipantHandler { return toAdd; } + private void handleSyncRestart(final UUID participantId, UUID replicaId, + AutomationCompositionDefinition acDefinition) { + if (AcTypeState.COMMISSIONED.equals(acDefinition.getState())) { + LOGGER.debug("Composition {} COMMISSIONED", acDefinition.getCompositionId()); + return; + } + LOGGER.debug("Composition to be send in Restart message {}", acDefinition.getCompositionId()); + var automationCompositionList = + automationCompositionProvider.getAcInstancesByCompositionId(acDefinition.getCompositionId()); + var automationCompositions = automationCompositionList.stream() + .filter(ac -> isAcToBeSyncRestarted(participantId, ac)).toList(); + participantSyncPublisher.sendRestartMsg(participantId, replicaId, acDefinition, automationCompositions); + } + + private boolean isAcToBeSyncRestarted(UUID participantId, AutomationComposition automationComposition) { + for (var element : automationComposition.getElements().values()) { + if (participantId.equals(element.getParticipantId())) { + return true; + } + } + return false; + } + private Participant createParticipant(UUID participantId, Map participantSupportedElementType) { var participant = new Participant(); participant.setParticipantId(participantId); participant.setParticipantSupportedElementTypes(participantSupportedElementType); - participant.setParticipantState(ParticipantState.ON_LINE); - participant.setLastMsg(TimestampHelper.now()); return participant; } diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionParticipantScanner.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionParticipantScanner.java new file mode 100644 index 000000000..4ada199b6 --- /dev/null +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionParticipantScanner.java @@ -0,0 +1,71 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2023-2024 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.acm.runtime.supervision; + +import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup; +import org.onap.policy.clamp.models.acm.concepts.ParticipantReplica; +import org.onap.policy.clamp.models.acm.persistence.provider.ParticipantProvider; +import org.onap.policy.clamp.models.acm.utils.TimestampHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +/** + * This class is used to scan the automation compositions in the database and check if they are in the correct state. + */ +@Component +public class SupervisionParticipantScanner { + private static final Logger LOGGER = LoggerFactory.getLogger(SupervisionParticipantScanner.class); + + private final long maxWaitMs; + + private final ParticipantProvider participantProvider; + + /** + * Constructor for instantiating SupervisionParticipantScanner. + * + * @param participantProvider the Participant Provider + * @param acRuntimeParameterGroup the parameters for the automation composition runtime + */ + public SupervisionParticipantScanner(final ParticipantProvider participantProvider, + final AcRuntimeParameterGroup acRuntimeParameterGroup) { + this.participantProvider = participantProvider; + this.maxWaitMs = acRuntimeParameterGroup.getParticipantParameters().getMaxStatusWaitMs(); + } + + /** + * Run Scanning. + */ + public void run() { + LOGGER.debug("Scanning participants in the database . . ."); + participantProvider.findReplicasOnLine().forEach(this::scanParticipantReplicaStatus); + LOGGER.debug("Participants scan complete . . ."); + } + + private void scanParticipantReplicaStatus(ParticipantReplica replica) { + var now = TimestampHelper.nowEpochMilli(); + var lastMsg = TimestampHelper.toEpochMilli(replica.getLastMsg()); + if ((now - lastMsg) > maxWaitMs) { + LOGGER.debug("Participant OFF_LINE {}", replica.getReplicaId()); + participantProvider.deleteParticipantReplica(replica.getReplicaId()); + } + } +} diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionScanner.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionScanner.java index 06d464671..75a2f0540 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionScanner.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionScanner.java @@ -27,6 +27,7 @@ import java.util.UUID; import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup; import org.onap.policy.clamp.acm.runtime.supervision.comm.AutomationCompositionDeployPublisher; import org.onap.policy.clamp.acm.runtime.supervision.comm.AutomationCompositionStateChangePublisher; +import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantSyncPublisher; import org.onap.policy.clamp.models.acm.concepts.AcTypeState; import org.onap.policy.clamp.models.acm.concepts.AutomationComposition; import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition; @@ -55,6 +56,7 @@ public class SupervisionScanner { private final AcDefinitionProvider acDefinitionProvider; private final AutomationCompositionStateChangePublisher automationCompositionStateChangePublisher; private final AutomationCompositionDeployPublisher automationCompositionDeployPublisher; + private final ParticipantSyncPublisher participantSyncPublisher; /** * Constructor for instantiating SupervisionScanner. @@ -69,11 +71,13 @@ public class SupervisionScanner { final AcDefinitionProvider acDefinitionProvider, final AutomationCompositionStateChangePublisher automationCompositionStateChangePublisher, final AutomationCompositionDeployPublisher automationCompositionDeployPublisher, + final ParticipantSyncPublisher participantSyncPublisher, final AcRuntimeParameterGroup acRuntimeParameterGroup) { this.automationCompositionProvider = automationCompositionProvider; this.acDefinitionProvider = acDefinitionProvider; this.automationCompositionStateChangePublisher = automationCompositionStateChangePublisher; this.automationCompositionDeployPublisher = automationCompositionDeployPublisher; + this.participantSyncPublisher = participantSyncPublisher; this.maxStatusWaitMs = acRuntimeParameterGroup.getParticipantParameters().getMaxStatusWaitMs(); } @@ -118,6 +122,7 @@ public class SupervisionScanner { if (completed) { acDefinitionProvider.updateAcDefinitionState(acDefinition.getCompositionId(), finalState, StateChangeResult.NO_ERROR, null); + participantSyncPublisher.sendSync(acDefinition, null); } else { handleTimeout(acDefinition); } @@ -132,7 +137,6 @@ public class SupervisionScanner { || StateChangeResult.FAILED.equals(automationComposition.getStateChangeResult())) { LOGGER.debug("automation composition {} scanned, OK", automationComposition.getInstanceId()); - // Clear Timeout on automation composition return; } @@ -158,7 +162,7 @@ public class SupervisionScanner { LOGGER.debug("automation composition scan: transition state {} {} completed", automationComposition.getDeployState(), automationComposition.getLockState()); - complete(automationComposition); + complete(automationComposition, serviceTemplate); } else { LOGGER.debug("automation composition scan: transition state {} {} not completed", automationComposition.getDeployState(), automationComposition.getLockState()); @@ -183,7 +187,8 @@ public class SupervisionScanner { } } - private void complete(final AutomationComposition automationComposition) { + private void complete(final AutomationComposition automationComposition, + ToscaServiceTemplate serviceTemplate) { var deployState = automationComposition.getDeployState(); if (DeployState.MIGRATING.equals(automationComposition.getDeployState())) { // migration scenario @@ -201,6 +206,7 @@ public class SupervisionScanner { } else { automationCompositionProvider.updateAutomationComposition(automationComposition); } + participantSyncPublisher.sendSync(serviceTemplate, automationComposition); } private void handleTimeout(AutomationCompositionDefinition acDefinition) { diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantPrimePublisher.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantPrimePublisher.java index 89763a2b6..b0848bd51 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantPrimePublisher.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantPrimePublisher.java @@ -31,7 +31,6 @@ import java.util.Map; import java.util.UUID; import lombok.AllArgsConstructor; import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup; -import org.onap.policy.clamp.acm.runtime.participants.AcmParticipantProvider; import org.onap.policy.clamp.models.acm.concepts.AcTypeState; import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition; import org.onap.policy.clamp.models.acm.concepts.ParticipantDefinition; @@ -55,7 +54,6 @@ public class ParticipantPrimePublisher extends AbstractParticipantPublisher prepareParticipantRestarting(UUID participantId, - AutomationCompositionDefinition acmDefinition) { - var acElements = AcmUtils.extractAcElementsFromServiceTemplate(acmDefinition.getServiceTemplate(), - acRuntimeParameterGroup.getAcmParameters().getToscaElementName()); - - // list of entry filtered by participantId - List> elementList = new ArrayList<>(); - Map supportedElementMap = new HashMap<>(); - for (var elementEntry : acElements) { - var elementState = acmDefinition.getElementStateMap().get(elementEntry.getKey()); - if (participantId.equals(elementState.getParticipantId())) { - var type = new ToscaConceptIdentifier(elementEntry.getValue().getType(), - elementEntry.getValue().getTypeVersion()); - supportedElementMap.put(type, participantId); - elementList.add(elementEntry); - } - } - var list = AcmUtils.prepareParticipantPriming(elementList, supportedElementMap); - for (var participantDefinition : list) { - for (var elementDe : participantDefinition.getAutomationCompositionElementDefinitionList()) { - var state = acmDefinition.getElementStateMap().get(elementDe.getAcElementDefinitionId().getName()); - if (state != null) { - elementDe.setOutProperties(state.getOutProperties()); - } - } - } - return list; - } } diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantStatusReqPublisher.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantStatusReqPublisher.java index 76feee72f..2eb434b64 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantStatusReqPublisher.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantStatusReqPublisher.java @@ -40,7 +40,7 @@ public class ParticipantStatusReqPublisher extends AbstractParticipantPublisher< */ @Timed(value = "publisher.participant_status_req", description = "PARTICIPANT_STATUS_REQ messages published") public void send(UUID participantId) { - ParticipantStatusReq message = new ParticipantStatusReq(); + var message = new ParticipantStatusReq(); message.setParticipantId(participantId); message.setTimestamp(Instant.now()); diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantSyncPublisher.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantSyncPublisher.java index ae7eda1ee..b63bc0a6b 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantSyncPublisher.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantSyncPublisher.java @@ -23,69 +23,58 @@ package org.onap.policy.clamp.acm.runtime.supervision.comm; import io.micrometer.core.annotation.Timed; import java.time.Instant; import java.util.List; -import java.util.Optional; import java.util.UUID; +import lombok.AllArgsConstructor; import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup; -import org.onap.policy.clamp.acm.runtime.main.parameters.Topics; +import org.onap.policy.clamp.models.acm.concepts.AcTypeState; import org.onap.policy.clamp.models.acm.concepts.AutomationComposition; import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition; +import org.onap.policy.clamp.models.acm.concepts.DeployState; import org.onap.policy.clamp.models.acm.concepts.ParticipantRestartAc; import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantSync; import org.onap.policy.clamp.models.acm.utils.AcmUtils; -import org.onap.policy.common.endpoints.event.comm.TopicSink; +import org.onap.policy.models.tosca.authorative.concepts.ToscaServiceTemplate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; - @Component -public class ParticipantSyncPublisher extends ParticipantRestartPublisher { +@AllArgsConstructor +public class ParticipantSyncPublisher extends AbstractParticipantPublisher { private static final Logger LOGGER = LoggerFactory.getLogger(ParticipantSyncPublisher.class); - private final AcRuntimeParameterGroup acRuntimeParameterGroup; - public ParticipantSyncPublisher(AcRuntimeParameterGroup acRuntimeParameterGroup) { - super(acRuntimeParameterGroup); - this.acRuntimeParameterGroup = acRuntimeParameterGroup; - } - - /** - * Send sync msg to Participant. + * Send Restart sync msg to Participant by participantId. * - * @param participantId the ParticipantId + * @param participantId the participantId + * @param replicaId the replicaId * @param acmDefinition the AutomationComposition Definition * @param automationCompositions the list of automationCompositions */ - @Override @Timed(value = "publisher.participant_sync_msg", description = "Participant Sync published") - public void send(UUID participantId, AutomationCompositionDefinition acmDefinition, + public void sendRestartMsg(UUID participantId, UUID replicaId, AutomationCompositionDefinition acmDefinition, List automationCompositions) { var message = new ParticipantSync(); message.setParticipantId(participantId); + message.setReplicaId(replicaId); + message.setRestarting(true); message.setCompositionId(acmDefinition.getCompositionId()); message.setMessageId(UUID.randomUUID()); message.setTimestamp(Instant.now()); message.setState(acmDefinition.getState()); - message.setParticipantDefinitionUpdates(prepareParticipantRestarting(participantId, acmDefinition)); + message.setParticipantDefinitionUpdates(AcmUtils.prepareParticipantRestarting(participantId, acmDefinition, + acRuntimeParameterGroup.getAcmParameters().getToscaElementName())); var toscaServiceTemplateFragment = AcmUtils.getToscaServiceTemplateFragment(acmDefinition.getServiceTemplate()); for (var automationComposition : automationCompositions) { - var syncAc = new ParticipantRestartAc(); - syncAc.setAutomationCompositionId(automationComposition.getInstanceId()); - for (var element : automationComposition.getElements().values()) { - if (participantId.equals(element.getParticipantId())) { - var acElementSync = AcmUtils.createAcElementRestart(element); - acElementSync.setToscaServiceTemplateFragment(toscaServiceTemplateFragment); - syncAc.getAcElementList().add(acElementSync); - } - } + var syncAc = AcmUtils.createAcRestart(automationComposition, participantId, toscaServiceTemplateFragment); message.getAutomationcompositionList().add(syncAc); } - LOGGER.debug("Participant Sync sent {}", message); + LOGGER.debug("Participant Restarting Sync sent {}", message); super.send(message); } @@ -98,4 +87,64 @@ public class ParticipantSyncPublisher extends ParticipantRestartPublisher { return false; } + /** + * Send AutomationCompositionDefinition sync msg to all Participants. + * + * @param acDefinition the AutomationComposition Definition + * @param excludeReplicaId the replica to be excluded + */ + @Timed(value = "publisher.participant_sync_msg", description = "Participant Sync published") + public void sendSync(AutomationCompositionDefinition acDefinition, UUID excludeReplicaId) { + var message = new ParticipantSync(); + message.setCompositionId(acDefinition.getCompositionId()); + if (excludeReplicaId != null) { + message.getExcludeReplicas().add(excludeReplicaId); + } + message.setState(acDefinition.getState()); + message.setMessageId(UUID.randomUUID()); + message.setTimestamp(Instant.now()); + if (AcTypeState.COMMISSIONED.equals(acDefinition.getState())) { + message.setDelete(true); + } else { + message.setParticipantDefinitionUpdates(AcmUtils.prepareParticipantRestarting(null, acDefinition, + acRuntimeParameterGroup.getAcmParameters().getToscaElementName())); + } + LOGGER.debug("Participant AutomationCompositionDefinition Sync sent {}", message); + super.send(message); + } + + /** + * Send AutomationComposition sync msg to all Participants. + * + * @param serviceTemplate the ServiceTemplate + * @param automationComposition the automationComposition + */ + @Timed(value = "publisher.participant_sync_msg", description = "Participant Sync published") + public void sendSync(ToscaServiceTemplate serviceTemplate, AutomationComposition automationComposition) { + var message = new ParticipantSync(); + message.setCompositionId(automationComposition.getCompositionId()); + message.setAutomationCompositionId(automationComposition.getInstanceId()); + message.setState(AcTypeState.PRIMED); + message.setMessageId(UUID.randomUUID()); + message.setTimestamp(Instant.now()); + var syncAc = new ParticipantRestartAc(); + syncAc.setAutomationCompositionId(automationComposition.getInstanceId()); + syncAc.setDeployState(automationComposition.getDeployState()); + syncAc.setLockState(automationComposition.getLockState()); + if (DeployState.DELETED.equals(automationComposition.getDeployState())) { + message.setDelete(true); + } else { + var toscaServiceTemplateFragment = AcmUtils.getToscaServiceTemplateFragment(serviceTemplate); + for (var element : automationComposition.getElements().values()) { + var acElementSync = AcmUtils.createAcElementRestart(element); + acElementSync.setToscaServiceTemplateFragment(toscaServiceTemplateFragment); + syncAc.getAcElementList().add(acElementSync); + + } + } + message.getAutomationcompositionList().add(syncAc); + + LOGGER.debug("Participant AutomationComposition Sync sent {}", message); + super.send(message); + } } diff --git a/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/commissioning/CommissioningProviderTest.java b/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/commissioning/CommissioningProviderTest.java index 5c26ea3bd..413719110 100644 --- a/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/commissioning/CommissioningProviderTest.java +++ b/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/commissioning/CommissioningProviderTest.java @@ -1,6 +1,6 @@ /*- * ============LICENSE_START======================================================= - * Copyright (C) 2021-2023 Nordix Foundation. + * Copyright (C) 2021-2024 Nordix Foundation. * Modifications Copyright (C) 2021 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); @@ -36,7 +36,6 @@ import java.util.UUID; import org.junit.jupiter.api.Test; import org.onap.policy.clamp.acm.runtime.instantiation.InstantiationUtils; import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup; -import org.onap.policy.clamp.acm.runtime.participants.AcmParticipantProvider; import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantPrimePublisher; import org.onap.policy.clamp.acm.runtime.util.CommonTestData; import org.onap.policy.clamp.models.acm.concepts.AcTypeState; @@ -47,6 +46,7 @@ import org.onap.policy.clamp.models.acm.messages.rest.commissioning.PrimeOrder; import org.onap.policy.clamp.models.acm.persistence.provider.AcDefinitionProvider; import org.onap.policy.clamp.models.acm.persistence.provider.AcTypeStateResolver; import org.onap.policy.clamp.models.acm.persistence.provider.AutomationCompositionProvider; +import org.onap.policy.clamp.models.acm.persistence.provider.ParticipantProvider; import org.onap.policy.models.tosca.authorative.concepts.ToscaServiceTemplate; class CommissioningProviderTest { @@ -165,7 +165,7 @@ class CommissioningProviderTest { var participantPrimePublisher = mock(ParticipantPrimePublisher.class); var provider = new CommissioningProvider(acDefinitionProvider, mock(AutomationCompositionProvider.class), - mock(AcmParticipantProvider.class), new AcTypeStateResolver(), participantPrimePublisher, + mock(ParticipantProvider.class), new AcTypeStateResolver(), participantPrimePublisher, CommonTestData.getTestParamaterGroup()); var acTypeStateUpdate = new AcTypeStateUpdate(); @@ -184,15 +184,15 @@ class CommissioningProviderTest { when(acDefinitionProvider.getAcDefinition(compositionId)).thenReturn(acmDefinition); var participantPrimePublisher = mock(ParticipantPrimePublisher.class); - var acmParticipantProvider = mock(AcmParticipantProvider.class); + var participantProvider = mock(ParticipantProvider.class); var provider = new CommissioningProvider(acDefinitionProvider, mock(AutomationCompositionProvider.class), - acmParticipantProvider, new AcTypeStateResolver(), participantPrimePublisher, + participantProvider, new AcTypeStateResolver(), participantPrimePublisher, CommonTestData.getTestParamaterGroup()); var acTypeStateUpdate = new AcTypeStateUpdate(); acTypeStateUpdate.setPrimeOrder(PrimeOrder.DEPRIME); - doNothing().when(acmParticipantProvider).verifyParticipantState(any()); + doNothing().when(participantProvider).verifyParticipantState(any()); provider.compositionDefinitionPriming(compositionId, acTypeStateUpdate); verify(participantPrimePublisher, timeout(1000).times(1)).sendDepriming(compositionId); } @@ -201,7 +201,7 @@ class CommissioningProviderTest { void testBadRequest() { var acProvider = mock(AutomationCompositionProvider.class); var provider = new CommissioningProvider(mock(AcDefinitionProvider.class), acProvider, - mock(AcmParticipantProvider.class), new AcTypeStateResolver(), mock(ParticipantPrimePublisher.class), + mock(ParticipantProvider.class), new AcTypeStateResolver(), mock(ParticipantPrimePublisher.class), mock(AcRuntimeParameterGroup.class)); var compositionId = UUID.randomUUID(); @@ -225,7 +225,7 @@ class CommissioningProviderTest { when(acDefinitionProvider.getAcDefinition(compositionId)).thenReturn(acmDefinition); var provider = new CommissioningProvider(acDefinitionProvider, mock(AutomationCompositionProvider.class), - mock(AcmParticipantProvider.class), new AcTypeStateResolver(), mock(ParticipantPrimePublisher.class), + mock(ParticipantProvider.class), new AcTypeStateResolver(), mock(ParticipantPrimePublisher.class), mock(AcRuntimeParameterGroup.class)); assertThatThrownBy(() -> provider.updateCompositionDefinition(compositionId, toscaServiceTemplate)) @@ -245,7 +245,7 @@ class CommissioningProviderTest { when(acDefinitionProvider.getAcDefinition(compositionId)).thenReturn(acmDefinition); var provider = new CommissioningProvider(acDefinitionProvider, mock(AutomationCompositionProvider.class), - mock(AcmParticipantProvider.class), new AcTypeStateResolver(), mock(ParticipantPrimePublisher.class), + mock(ParticipantProvider.class), new AcTypeStateResolver(), mock(ParticipantPrimePublisher.class), mock(AcRuntimeParameterGroup.class)); var acTypeStateUpdate = new AcTypeStateUpdate(); diff --git a/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/instantiation/AutomationCompositionInstantiationProviderTest.java b/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/instantiation/AutomationCompositionInstantiationProviderTest.java index fbd8330fc..2ee6a152e 100644 --- a/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/instantiation/AutomationCompositionInstantiationProviderTest.java +++ b/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/instantiation/AutomationCompositionInstantiationProviderTest.java @@ -37,7 +37,6 @@ import java.util.UUID; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup; -import org.onap.policy.clamp.acm.runtime.participants.AcmParticipantProvider; import org.onap.policy.clamp.acm.runtime.supervision.SupervisionAcHandler; import org.onap.policy.clamp.acm.runtime.util.CommonTestData; import org.onap.policy.clamp.models.acm.concepts.AcTypeState; @@ -52,6 +51,7 @@ import org.onap.policy.clamp.models.acm.messages.rest.instantiation.LockOrder; import org.onap.policy.clamp.models.acm.persistence.provider.AcDefinitionProvider; import org.onap.policy.clamp.models.acm.persistence.provider.AcInstanceStateResolver; import org.onap.policy.clamp.models.acm.persistence.provider.AutomationCompositionProvider; +import org.onap.policy.clamp.models.acm.persistence.provider.ParticipantProvider; import org.onap.policy.clamp.models.acm.persistence.provider.ProviderUtils; import org.onap.policy.models.tosca.authorative.concepts.ToscaServiceTemplate; import org.onap.policy.models.tosca.simple.concepts.JpaToscaServiceTemplate; @@ -101,9 +101,9 @@ class AutomationCompositionInstantiationProviderTest { when(acDefinitionProvider.getAcDefinition(compositionId)).thenReturn(acDefinition); var acProvider = mock(AutomationCompositionProvider.class); var supervisionAcHandler = mock(SupervisionAcHandler.class); - var acmParticipantProvider = mock(AcmParticipantProvider.class); + var participantProvider = mock(ParticipantProvider.class); var instantiationProvider = new AutomationCompositionInstantiationProvider(acProvider, acDefinitionProvider, - null, supervisionAcHandler, acmParticipantProvider, + null, supervisionAcHandler, participantProvider, CommonTestData.getTestParamaterGroup()); var automationCompositionCreate = InstantiationUtils.getAutomationCompositionFromResource(AC_INSTANTIATION_CREATE_JSON, "Crud"); @@ -141,7 +141,7 @@ class AutomationCompositionInstantiationProviderTest { when(acProvider.deleteAutomationComposition(automationCompositionUpdate.getInstanceId())) .thenReturn(automationCompositionUpdate); - doNothing().when(acmParticipantProvider).verifyParticipantState(any()); + doNothing().when(participantProvider).verifyParticipantState(any()); instantiationProvider.deleteAutomationComposition(automationCompositionCreate.getCompositionId(), automationCompositionCreate.getInstanceId()); @@ -167,9 +167,9 @@ class AutomationCompositionInstantiationProviderTest { when(acProvider.updateAutomationComposition(acmFromDb)).thenReturn(acmFromDb); var supervisionAcHandler = mock(SupervisionAcHandler.class); - var acmParticipantProvider = mock(AcmParticipantProvider.class); + var participantProvider = mock(ParticipantProvider.class); var instantiationProvider = new AutomationCompositionInstantiationProvider(acProvider, acDefinitionProvider, - null, supervisionAcHandler, acmParticipantProvider, + null, supervisionAcHandler, participantProvider, CommonTestData.getTestParamaterGroup()); var instantiationResponse = instantiationProvider.updateAutomationComposition( automationCompositionUpdate.getCompositionId(), automationCompositionUpdate); @@ -201,7 +201,7 @@ class AutomationCompositionInstantiationProviderTest { var instantiationProvider = new AutomationCompositionInstantiationProvider(acProvider, mock(AcDefinitionProvider.class), null, - mock(SupervisionAcHandler.class), mock(AcmParticipantProvider.class), + mock(SupervisionAcHandler.class), mock(ParticipantProvider.class), mock(AcRuntimeParameterGroup.class)); var compositionId = automationCompositionUpdate.getCompositionId(); @@ -232,7 +232,7 @@ class AutomationCompositionInstantiationProviderTest { var instantiationProvider = new AutomationCompositionInstantiationProvider(acProvider, mock(AcDefinitionProvider.class), null, - mock(SupervisionAcHandler.class), mock(AcmParticipantProvider.class), + mock(SupervisionAcHandler.class), mock(ParticipantProvider.class), mock(AcRuntimeParameterGroup.class)); var compositionId = automationCompositionUpdate.getCompositionId(); @@ -273,9 +273,9 @@ class AutomationCompositionInstantiationProviderTest { .thenReturn(automationCompositionUpdate); var supervisionAcHandler = mock(SupervisionAcHandler.class); - var acmParticipantProvider = mock(AcmParticipantProvider.class); + var participantProvider = mock(ParticipantProvider.class); var instantiationProvider = new AutomationCompositionInstantiationProvider(acProvider, acDefinitionProvider, - null, supervisionAcHandler, acmParticipantProvider, + null, supervisionAcHandler, participantProvider, mock(AcRuntimeParameterGroup.class)); assertThatThrownBy( () -> instantiationProvider.updateAutomationComposition(compositionId, automationCompositionUpdate)) @@ -303,9 +303,9 @@ class AutomationCompositionInstantiationProviderTest { when(acProvider.updateAutomationComposition(automationComposition)).thenReturn(automationComposition); var supervisionAcHandler = mock(SupervisionAcHandler.class); - var acmParticipantProvider = mock(AcmParticipantProvider.class); + var participantProvider = mock(ParticipantProvider.class); var instantiationProvider = new AutomationCompositionInstantiationProvider(acProvider, acDefinitionProvider, - null, supervisionAcHandler, acmParticipantProvider, new AcRuntimeParameterGroup()); + null, supervisionAcHandler, participantProvider, new AcRuntimeParameterGroup()); assertThatThrownBy(() -> instantiationProvider .updateAutomationComposition(automationComposition.getCompositionId(), automationComposition)) @@ -352,9 +352,9 @@ class AutomationCompositionInstantiationProviderTest { automationComposition.getElements().clear(); var supervisionAcHandler = mock(SupervisionAcHandler.class); - var acmParticipantProvider = mock(AcmParticipantProvider.class); + var participantProvider = mock(ParticipantProvider.class); var instantiationProvider = new AutomationCompositionInstantiationProvider(acProvider, acDefinitionProvider, - null, supervisionAcHandler, acmParticipantProvider, new AcRuntimeParameterGroup()); + null, supervisionAcHandler, participantProvider, new AcRuntimeParameterGroup()); assertThatThrownBy(() -> instantiationProvider .updateAutomationComposition(automationComposition.getCompositionId(), acMigrate)) @@ -373,11 +373,11 @@ class AutomationCompositionInstantiationProviderTest { when(acDefinitionProvider.getAcDefinition(compositionId)).thenReturn(acDefinition); automationComposition.setCompositionId(compositionId); var supervisionAcHandler = mock(SupervisionAcHandler.class); - var acmParticipantProvider = mock(AcmParticipantProvider.class); + var participantProvider = mock(ParticipantProvider.class); var acRuntimeParameterGroup = mock(AcRuntimeParameterGroup.class); var instantiationProvider = new AutomationCompositionInstantiationProvider(acProvider, acDefinitionProvider, - null, supervisionAcHandler, acmParticipantProvider, acRuntimeParameterGroup); + null, supervisionAcHandler, participantProvider, acRuntimeParameterGroup); when(acProvider.getAutomationComposition(automationComposition.getInstanceId())) .thenReturn(automationComposition); @@ -436,10 +436,10 @@ class AutomationCompositionInstantiationProviderTest { var acProvider = mock(AutomationCompositionProvider.class); when(acProvider.createAutomationComposition(automationCompositionCreate)) .thenReturn(automationCompositionCreate); - var acmParticipantProvider = mock(AcmParticipantProvider.class); + var participantProvider = mock(ParticipantProvider.class); var instantiationProvider = new AutomationCompositionInstantiationProvider(acProvider, acDefinitionProvider, - null, null, acmParticipantProvider, + null, null, participantProvider, CommonTestData.getTestParamaterGroup()); var instantiationResponse = instantiationProvider.createAutomationComposition( @@ -457,7 +457,7 @@ class AutomationCompositionInstantiationProviderTest { @Test void testCreateAutomationCompositions_CommissionedAcElementNotFound() { var acDefinitionProvider = mock(AcDefinitionProvider.class); - var acmParticipantProvider = mock(AcmParticipantProvider.class); + var participantProvider = mock(ParticipantProvider.class); var acDefinition = CommonTestData.createAcDefinition(serviceTemplate, AcTypeState.PRIMED); var compositionId = acDefinition.getCompositionId(); when(acDefinitionProvider.findAcDefinition(compositionId)).thenReturn(Optional.of(acDefinition)); @@ -467,7 +467,7 @@ class AutomationCompositionInstantiationProviderTest { var acProvider = mock(AutomationCompositionProvider.class); var provider = new AutomationCompositionInstantiationProvider(acProvider, acDefinitionProvider, null, null, - acmParticipantProvider, CommonTestData.getTestParamaterGroup()); + participantProvider, CommonTestData.getTestParamaterGroup()); assertThatThrownBy(() -> provider.createAutomationComposition(compositionId, automationComposition)) .hasMessageMatching(AC_ELEMENT_NAME_NOT_FOUND); @@ -572,9 +572,9 @@ class AutomationCompositionInstantiationProviderTest { when(acProvider.getAutomationComposition(instanceId)).thenReturn(automationComposition); var supervisionAcHandler = mock(SupervisionAcHandler.class); - var acmParticipantProvider = mock(AcmParticipantProvider.class); + var participantProvider = mock(ParticipantProvider.class); var provider = new AutomationCompositionInstantiationProvider(acProvider, acDefinitionProvider, - new AcInstanceStateResolver(), supervisionAcHandler, acmParticipantProvider, + new AcInstanceStateResolver(), supervisionAcHandler, participantProvider, mock(AcRuntimeParameterGroup.class)); var acInstanceStateUpdate = new AcInstanceStateUpdate(); diff --git a/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/instantiation/rest/InstantiationControllerTest.java b/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/instantiation/rest/InstantiationControllerTest.java index bcfdea1dd..ca58fad51 100644 --- a/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/instantiation/rest/InstantiationControllerTest.java +++ b/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/instantiation/rest/InstantiationControllerTest.java @@ -374,6 +374,9 @@ class InstantiationControllerTest extends CommonRestController { } private void saveDummyParticipantInDb() { - participantProvider.saveParticipant(CommonTestData.createParticipant(CommonTestData.getParticipantId())); + var participant = CommonTestData.createParticipant(CommonTestData.getParticipantId()); + var replica = CommonTestData.createParticipantReplica(CommonTestData.getReplicaId()); + participant.getReplicas().put(replica.getReplicaId(), replica); + participantProvider.saveParticipant(participant); } } diff --git a/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAcHandlerTest.java b/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAcHandlerTest.java index 8f39c9e2e..0bec9d0ce 100644 --- a/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAcHandlerTest.java +++ b/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAcHandlerTest.java @@ -39,16 +39,19 @@ import org.onap.policy.clamp.acm.runtime.supervision.comm.AcElementPropertiesPub import org.onap.policy.clamp.acm.runtime.supervision.comm.AutomationCompositionDeployPublisher; import org.onap.policy.clamp.acm.runtime.supervision.comm.AutomationCompositionMigrationPublisher; import org.onap.policy.clamp.acm.runtime.supervision.comm.AutomationCompositionStateChangePublisher; +import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantSyncPublisher; import org.onap.policy.clamp.acm.runtime.util.CommonTestData; import org.onap.policy.clamp.models.acm.concepts.AcElementDeployAck; import org.onap.policy.clamp.models.acm.concepts.AcTypeState; import org.onap.policy.clamp.models.acm.concepts.AutomationComposition; +import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition; import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionElement; import org.onap.policy.clamp.models.acm.concepts.DeployState; import org.onap.policy.clamp.models.acm.concepts.LockState; import org.onap.policy.clamp.models.acm.concepts.StateChangeResult; import org.onap.policy.clamp.models.acm.messages.kafka.participant.AutomationCompositionDeployAck; import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantMessageType; +import org.onap.policy.clamp.models.acm.persistence.provider.AcDefinitionProvider; import org.onap.policy.clamp.models.acm.persistence.provider.AutomationCompositionProvider; class SupervisionAcHandlerTest { @@ -64,9 +67,14 @@ class SupervisionAcHandlerTest { when(automationCompositionProvider.findAutomationComposition(IDENTIFIER)) .thenReturn(Optional.of(automationComposition)); - var handler = new SupervisionAcHandler(automationCompositionProvider, + var acDefinitionProvider = mock(AcDefinitionProvider.class); + when(acDefinitionProvider.getAcDefinition(automationComposition.getCompositionId())) + .thenReturn(new AutomationCompositionDefinition()); + + var handler = new SupervisionAcHandler(automationCompositionProvider, acDefinitionProvider, mock(AutomationCompositionDeployPublisher.class), mock(AutomationCompositionStateChangePublisher.class), - mock(AcElementPropertiesPublisher.class), null); + mock(AcElementPropertiesPublisher.class), null, + mock(ParticipantSyncPublisher.class)); var automationCompositionAckMessage = getAutomationCompositionDeployAck(ParticipantMessageType.AUTOMATION_COMPOSITION_STATECHANGE_ACK, @@ -100,14 +108,19 @@ class SupervisionAcHandlerTest { when(automationCompositionProvider.findAutomationComposition(IDENTIFIER)) .thenReturn(Optional.of(automationComposition)); + var acDefinitionProvider = mock(AcDefinitionProvider.class); + when(acDefinitionProvider.getAcDefinition(automationComposition.getCompositionId())) + .thenReturn(new AutomationCompositionDefinition()); + var automationCompositionAckMessage = getAutomationCompositionDeployAck(ParticipantMessageType.AUTOMATION_COMPOSITION_DEPLOY_ACK, automationComposition, DeployState.DEPLOYED, LockState.LOCKED); automationCompositionAckMessage.setParticipantId(CommonTestData.getParticipantId()); - var handler = new SupervisionAcHandler(automationCompositionProvider, + var handler = new SupervisionAcHandler(automationCompositionProvider, acDefinitionProvider, mock(AutomationCompositionDeployPublisher.class), mock(AutomationCompositionStateChangePublisher.class), - mock(AcElementPropertiesPublisher.class), null); + mock(AcElementPropertiesPublisher.class), null, + mock(ParticipantSyncPublisher.class)); handler.handleAutomationCompositionUpdateAckMessage(automationCompositionAckMessage); @@ -142,9 +155,9 @@ class SupervisionAcHandlerTest { var automationCompositionStateChangePublisher = mock(AutomationCompositionStateChangePublisher.class); - var handler = new SupervisionAcHandler(automationCompositionProvider, + var handler = new SupervisionAcHandler(automationCompositionProvider, mock(AcDefinitionProvider.class), mock(AutomationCompositionDeployPublisher.class), automationCompositionStateChangePublisher, null, - null); + null, mock(ParticipantSyncPublisher.class)); handler.handleAutomationCompositionUpdateAckMessage(automationCompositionAckMessage); @@ -156,8 +169,9 @@ class SupervisionAcHandlerTest { void testDeployFailed() { var automationCompositionDeployPublisher = mock(AutomationCompositionDeployPublisher.class); var automationCompositionProvider = mock(AutomationCompositionProvider.class); - var handler = new SupervisionAcHandler(automationCompositionProvider, automationCompositionDeployPublisher, - mock(AutomationCompositionStateChangePublisher.class), mock(AcElementPropertiesPublisher.class), null); + var handler = new SupervisionAcHandler(automationCompositionProvider, mock(AcDefinitionProvider.class), + automationCompositionDeployPublisher, mock(AutomationCompositionStateChangePublisher.class), + mock(AcElementPropertiesPublisher.class), null, mock(ParticipantSyncPublisher.class)); var serviceTemplate = InstantiationUtils.getToscaServiceTemplate(TOSCA_SERVICE_TEMPLATE_YAML); var acDefinition = CommonTestData.createAcDefinition(serviceTemplate, AcTypeState.PRIMED); @@ -174,9 +188,10 @@ class SupervisionAcHandlerTest { void testUndeploy() { var automationCompositionProvider = mock(AutomationCompositionProvider.class); var acStateChangePublisher = mock(AutomationCompositionStateChangePublisher.class); - var handler = new SupervisionAcHandler(automationCompositionProvider, + var handler = new SupervisionAcHandler(automationCompositionProvider, mock(AcDefinitionProvider.class), mock(AutomationCompositionDeployPublisher.class), acStateChangePublisher, - mock(AcElementPropertiesPublisher.class), null); + mock(AcElementPropertiesPublisher.class), null, + mock(ParticipantSyncPublisher.class)); var serviceTemplate = InstantiationUtils.getToscaServiceTemplate(TOSCA_SERVICE_TEMPLATE_YAML); var acDefinition = CommonTestData.createAcDefinition(serviceTemplate, AcTypeState.PRIMED); var automationComposition = @@ -191,9 +206,10 @@ class SupervisionAcHandlerTest { void testUndeployFailed() { var acStateChangePublisher = mock(AutomationCompositionStateChangePublisher.class); var automationCompositionProvider = mock(AutomationCompositionProvider.class); - var handler = new SupervisionAcHandler(automationCompositionProvider, + var handler = new SupervisionAcHandler(automationCompositionProvider, mock(AcDefinitionProvider.class), mock(AutomationCompositionDeployPublisher.class), acStateChangePublisher, - mock(AcElementPropertiesPublisher.class), null); + mock(AcElementPropertiesPublisher.class), null, + mock(ParticipantSyncPublisher.class)); var serviceTemplate = InstantiationUtils.getToscaServiceTemplate(TOSCA_SERVICE_TEMPLATE_YAML); var acDefinition = CommonTestData.createAcDefinition(serviceTemplate, AcTypeState.PRIMED); @@ -211,9 +227,10 @@ class SupervisionAcHandlerTest { void testUnlock() { var automationCompositionProvider = mock(AutomationCompositionProvider.class); var acStateChangePublisher = mock(AutomationCompositionStateChangePublisher.class); - var handler = new SupervisionAcHandler(automationCompositionProvider, + var handler = new SupervisionAcHandler(automationCompositionProvider, mock(AcDefinitionProvider.class), mock(AutomationCompositionDeployPublisher.class), acStateChangePublisher, - mock(AcElementPropertiesPublisher.class), null); + mock(AcElementPropertiesPublisher.class), null, + mock(ParticipantSyncPublisher.class)); var serviceTemplate = InstantiationUtils.getToscaServiceTemplate(TOSCA_SERVICE_TEMPLATE_YAML); var acDefinition = CommonTestData.createAcDefinition(serviceTemplate, AcTypeState.PRIMED); var automationComposition = @@ -228,9 +245,10 @@ class SupervisionAcHandlerTest { void testUnlockFailed() { var automationCompositionProvider = mock(AutomationCompositionProvider.class); var acStateChangePublisher = mock(AutomationCompositionStateChangePublisher.class); - var handler = new SupervisionAcHandler(automationCompositionProvider, + var handler = new SupervisionAcHandler(automationCompositionProvider, mock(AcDefinitionProvider.class), mock(AutomationCompositionDeployPublisher.class), acStateChangePublisher, - mock(AcElementPropertiesPublisher.class), null); + mock(AcElementPropertiesPublisher.class), null, + mock(ParticipantSyncPublisher.class)); var serviceTemplate = InstantiationUtils.getToscaServiceTemplate(TOSCA_SERVICE_TEMPLATE_YAML); var acDefinition = CommonTestData.createAcDefinition(serviceTemplate, AcTypeState.PRIMED); var automationComposition = @@ -247,9 +265,10 @@ class SupervisionAcHandlerTest { void testLock() { var automationCompositionProvider = mock(AutomationCompositionProvider.class); var acStateChangePublisher = mock(AutomationCompositionStateChangePublisher.class); - var handler = new SupervisionAcHandler(automationCompositionProvider, + var handler = new SupervisionAcHandler(automationCompositionProvider, mock(AcDefinitionProvider.class), mock(AutomationCompositionDeployPublisher.class), acStateChangePublisher, - mock(AcElementPropertiesPublisher.class), null); + mock(AcElementPropertiesPublisher.class), null, + mock(ParticipantSyncPublisher.class)); var serviceTemplate = InstantiationUtils.getToscaServiceTemplate(TOSCA_SERVICE_TEMPLATE_YAML); var acDefinition = CommonTestData.createAcDefinition(serviceTemplate, AcTypeState.PRIMED); var automationComposition = @@ -264,9 +283,10 @@ class SupervisionAcHandlerTest { void testLockFailed() { var automationCompositionProvider = mock(AutomationCompositionProvider.class); var acStateChangePublisher = mock(AutomationCompositionStateChangePublisher.class); - var handler = new SupervisionAcHandler(automationCompositionProvider, + var handler = new SupervisionAcHandler(automationCompositionProvider, mock(AcDefinitionProvider.class), mock(AutomationCompositionDeployPublisher.class), acStateChangePublisher, - mock(AcElementPropertiesPublisher.class), null); + mock(AcElementPropertiesPublisher.class), null, + mock(ParticipantSyncPublisher.class)); var serviceTemplate = InstantiationUtils.getToscaServiceTemplate(TOSCA_SERVICE_TEMPLATE_YAML); var acDefinition = CommonTestData.createAcDefinition(serviceTemplate, AcTypeState.PRIMED); var automationComposition = @@ -294,9 +314,10 @@ class SupervisionAcHandlerTest { .setParticipantId(automationComposition.getElements().values().iterator().next().getParticipantId()); automationCompositionAckMessage.setAutomationCompositionId(IDENTIFIER); - var handler = new SupervisionAcHandler(automationCompositionProvider, + var handler = new SupervisionAcHandler(automationCompositionProvider, mock(AcDefinitionProvider.class), mock(AutomationCompositionDeployPublisher.class), mock(AutomationCompositionStateChangePublisher.class), - mock(AcElementPropertiesPublisher.class), null); + mock(AcElementPropertiesPublisher.class), null, + mock(ParticipantSyncPublisher.class)); handler.handleAutomationCompositionUpdateAckMessage(automationCompositionAckMessage); @@ -308,8 +329,9 @@ class SupervisionAcHandlerTest { void testUpdate() { var acElementPropertiesPublisher = mock(AcElementPropertiesPublisher.class); var handler = new SupervisionAcHandler(mock(AutomationCompositionProvider.class), - mock(AutomationCompositionDeployPublisher.class), mock(AutomationCompositionStateChangePublisher.class), - acElementPropertiesPublisher, null); + mock(AcDefinitionProvider.class), mock(AutomationCompositionDeployPublisher.class), + mock(AutomationCompositionStateChangePublisher.class), acElementPropertiesPublisher, null, + mock(ParticipantSyncPublisher.class)); var automationComposition = InstantiationUtils.getAutomationCompositionFromResource(AC_INSTANTIATION_CREATE_JSON, "Lock"); handler.update(automationComposition); @@ -320,8 +342,9 @@ class SupervisionAcHandlerTest { void testMigrate() { var automationCompositionProvider = mock(AutomationCompositionProvider.class); var acCompositionMigrationPublisher = mock(AutomationCompositionMigrationPublisher.class); - var handler = new SupervisionAcHandler(automationCompositionProvider, null, null, null, - acCompositionMigrationPublisher); + var handler = new SupervisionAcHandler(automationCompositionProvider, mock(AcDefinitionProvider.class), + null, null, null, + acCompositionMigrationPublisher, mock(ParticipantSyncPublisher.class)); var automationComposition = InstantiationUtils.getAutomationCompositionFromResource(AC_INSTANTIATION_CREATE_JSON, "Migrate"); handler.migrate(automationComposition, UUID.randomUUID()); diff --git a/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAspectTest.java b/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAspectTest.java index f78344bcb..7a72e0ef5 100644 --- a/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAspectTest.java +++ b/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAspectTest.java @@ -32,19 +32,19 @@ class SupervisionAspectTest { @Test void testSchedule() throws Exception { var supervisionScanner = mock(SupervisionScanner.class); - var partecipantScanner = mock(SupervisionPartecipantScanner.class); - try (var supervisionAspect = new SupervisionAspect(supervisionScanner, partecipantScanner)) { + var participantScanner = mock(SupervisionParticipantScanner.class); + try (var supervisionAspect = new SupervisionAspect(supervisionScanner, participantScanner)) { supervisionAspect.schedule(); verify(supervisionScanner, timeout(500)).run(); - verify(partecipantScanner, timeout(500)).run(); + verify(participantScanner, timeout(500)).run(); } } @Test void testDoCheck() throws Exception { var supervisionScanner = mock(SupervisionScanner.class); - var partecipantScanner = mock(SupervisionPartecipantScanner.class); - try (var supervisionAspect = new SupervisionAspect(supervisionScanner, partecipantScanner)) { + var participantScanner = mock(SupervisionParticipantScanner.class); + try (var supervisionAspect = new SupervisionAspect(supervisionScanner, participantScanner)) { supervisionAspect.doCheck(); supervisionAspect.doCheck(); verify(supervisionScanner, timeout(500).times(2)).run(); diff --git a/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionHandlerTest.java b/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionHandlerTest.java index 448666f8f..e8be3b6b7 100644 --- a/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionHandlerTest.java +++ b/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionHandlerTest.java @@ -30,7 +30,7 @@ import static org.onap.policy.clamp.acm.runtime.util.CommonTestData.TOSCA_SERVIC import java.util.Optional; import org.junit.jupiter.api.Test; import org.onap.policy.clamp.acm.runtime.instantiation.InstantiationUtils; -import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup; +import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantSyncPublisher; import org.onap.policy.clamp.acm.runtime.util.CommonTestData; import org.onap.policy.clamp.models.acm.concepts.AcTypeState; import org.onap.policy.clamp.models.acm.concepts.ParticipantState; @@ -46,8 +46,7 @@ class SupervisionHandlerTest { participantPrimeAckMessage.setParticipantId(CommonTestData.getParticipantId()); participantPrimeAckMessage.setState(ParticipantState.ON_LINE); var acDefinitionProvider = mock(AcDefinitionProvider.class); - var acRuntimeParameterGroup = mock(AcRuntimeParameterGroup.class); - var handler = new SupervisionHandler(acDefinitionProvider, acRuntimeParameterGroup); + var handler = new SupervisionHandler(acDefinitionProvider, mock(ParticipantSyncPublisher.class)); handler.handleParticipantMessage(participantPrimeAckMessage); verify(acDefinitionProvider).findAcDefinition(any()); @@ -66,9 +65,7 @@ class SupervisionHandlerTest { var acDefinitionProvider = mock(AcDefinitionProvider.class); when(acDefinitionProvider.findAcDefinition(acDefinition.getCompositionId())) .thenReturn(Optional.of(acDefinition)); - var acRuntimeParameterGroup = mock(AcRuntimeParameterGroup.class); - - var handler = new SupervisionHandler(acDefinitionProvider, acRuntimeParameterGroup); + var handler = new SupervisionHandler(acDefinitionProvider, mock(ParticipantSyncPublisher.class)); handler.handleParticipantMessage(participantPrimeAckMessage); verify(acDefinitionProvider).findAcDefinition(any()); @@ -93,7 +90,7 @@ class SupervisionHandlerTest { when(acDefinitionProvider.findAcDefinition(acDefinition.getCompositionId())) .thenReturn(Optional.of(acDefinition)); - var handler = new SupervisionHandler(acDefinitionProvider, CommonTestData.getTestParamaterGroup()); + var handler = new SupervisionHandler(acDefinitionProvider, mock(ParticipantSyncPublisher.class)); handler.handleParticipantMessage(participantPrimeAckMessage); verify(acDefinitionProvider).findAcDefinition(any()); @@ -120,7 +117,7 @@ class SupervisionHandlerTest { when(acDefinitionProvider.findAcDefinition(acDefinition.getCompositionId())) .thenReturn(Optional.of(acDefinition)); - var handler = new SupervisionHandler(acDefinitionProvider, CommonTestData.getTestParamaterGroup()); + var handler = new SupervisionHandler(acDefinitionProvider, mock(ParticipantSyncPublisher.class)); handler.handleParticipantMessage(participantPrimeAckMessage); verify(acDefinitionProvider).findAcDefinition(any()); @@ -150,7 +147,7 @@ class SupervisionHandlerTest { when(acDefinitionProvider.findAcDefinition(acDefinition.getCompositionId())) .thenReturn(Optional.of(acDefinition)); - var handler = new SupervisionHandler(acDefinitionProvider, CommonTestData.getTestParamaterGroup()); + var handler = new SupervisionHandler(acDefinitionProvider, mock(ParticipantSyncPublisher.class)); handler.handleParticipantMessage(participantPrimeAckMessage); verify(acDefinitionProvider).findAcDefinition(any()); diff --git a/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionParticipantHandlerTest.java b/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionParticipantHandlerTest.java index e352d2f2a..bebaa3319 100644 --- a/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionParticipantHandlerTest.java +++ b/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionParticipantHandlerTest.java @@ -23,6 +23,7 @@ package org.onap.policy.clamp.acm.runtime.supervision; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -37,6 +38,7 @@ import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantDeregisterAckPublisher; import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantRegisterAckPublisher; import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantRestartPublisher; +import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantSyncPublisher; import org.onap.policy.clamp.acm.runtime.util.CommonTestData; import org.onap.policy.clamp.models.acm.concepts.AcTypeState; import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition; @@ -45,6 +47,7 @@ import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionInfo; import org.onap.policy.clamp.models.acm.concepts.NodeTemplateState; import org.onap.policy.clamp.models.acm.concepts.Participant; import org.onap.policy.clamp.models.acm.concepts.ParticipantDefinition; +import org.onap.policy.clamp.models.acm.concepts.ParticipantReplica; import org.onap.policy.clamp.models.acm.concepts.ParticipantState; import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantDeregister; import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantRegister; @@ -60,25 +63,26 @@ class SupervisionParticipantHandlerTest { @Test void testHandleParticipantDeregister() { - var participant = CommonTestData.createParticipant(CommonTestData.getParticipantId()); + var replica = CommonTestData.createParticipantReplica(CommonTestData.getReplicaId()); var participantProvider = mock(ParticipantProvider.class); - when(participantProvider.findParticipant(CommonTestData.getParticipantId())) - .thenReturn(Optional.of(participant)); + when(participantProvider.findParticipantReplica(replica.getReplicaId())) + .thenReturn(Optional.of(replica)); var participantDeregisterMessage = new ParticipantDeregister(); participantDeregisterMessage.setMessageId(UUID.randomUUID()); participantDeregisterMessage.setParticipantId(CommonTestData.getParticipantId()); + participantDeregisterMessage.setReplicaId(replica.getReplicaId()); var participantDeregisterAckPublisher = mock(ParticipantDeregisterAckPublisher.class); var handler = new SupervisionParticipantHandler(participantProvider, mock(ParticipantRegisterAckPublisher.class), participantDeregisterAckPublisher, mock(AutomationCompositionProvider.class), mock(AcDefinitionProvider.class), mock(ParticipantRestartPublisher.class), - mock(AcRuntimeParameterGroup.class)); + mock(ParticipantSyncPublisher.class), mock(AcRuntimeParameterGroup.class)); handler.handleParticipantMessage(participantDeregisterMessage); - verify(participantProvider).saveParticipant(any()); + verify(participantProvider).deleteParticipantReplica(CommonTestData.getReplicaId()); verify(participantDeregisterAckPublisher).send(participantDeregisterMessage.getMessageId()); } @@ -95,7 +99,7 @@ class SupervisionParticipantHandlerTest { var handler = new SupervisionParticipantHandler(participantProvider, participantRegisterAckPublisher, mock(ParticipantDeregisterAckPublisher.class), mock(AutomationCompositionProvider.class), mock(AcDefinitionProvider.class), mock(ParticipantRestartPublisher.class), - mock(AcRuntimeParameterGroup.class)); + mock(ParticipantSyncPublisher.class), mock(AcRuntimeParameterGroup.class)); handler.handleParticipantMessage(participantRegisterMessage); verify(participantProvider).saveParticipant(any()); @@ -109,13 +113,18 @@ class SupervisionParticipantHandlerTest { participantRegisterMessage.setMessageId(UUID.randomUUID()); var participantId = CommonTestData.getParticipantId(); participantRegisterMessage.setParticipantId(participantId); + participantRegisterMessage.setReplicaId(participantId); var supportedElementType = CommonTestData.createParticipantSupportedElementType(); participantRegisterMessage.setParticipantSupportedElementType(List.of(supportedElementType)); var participant = new Participant(); + var replica = new ParticipantReplica(); + replica.setReplicaId(participantId); participant.setParticipantId(participantId); + participant.getReplicas().put(replica.getReplicaId(), replica); var participantProvider = mock(ParticipantProvider.class); when(participantProvider.findParticipant(participantId)).thenReturn(Optional.of(participant)); + when(participantProvider.findParticipantReplica(participantId)).thenReturn(Optional.of(replica)); var compositionId = UUID.randomUUID(); var composition2Id = UUID.randomUUID(); when(participantProvider.getCompositionIds(participantId)).thenReturn(Set.of(compositionId, composition2Id)); @@ -145,7 +154,8 @@ class SupervisionParticipantHandlerTest { var participantRestartPublisher = mock(ParticipantRestartPublisher.class); var handler = new SupervisionParticipantHandler(participantProvider, participantRegisterAckPublisher, mock(ParticipantDeregisterAckPublisher.class), automationCompositionProvider, acDefinitionProvider, - participantRestartPublisher, CommonTestData.getTestParamaterGroup()); + participantRestartPublisher, mock(ParticipantSyncPublisher.class), + CommonTestData.getTestParamaterGroup()); handler.handleParticipantMessage(participantRegisterMessage); verify(participantRegisterAckPublisher).send(participantRegisterMessage.getMessageId(), participantId); @@ -154,6 +164,65 @@ class SupervisionParticipantHandlerTest { verify(participantRestartPublisher).send(any(), any(AutomationCompositionDefinition.class), any()); } + @Test + void testHandleParticipantSyncRestart() { + var participantRegisterMessage = new ParticipantRegister(); + participantRegisterMessage.setMessageId(UUID.randomUUID()); + var participantId = CommonTestData.getParticipantId(); + participantRegisterMessage.setParticipantId(participantId); + var replicaId = CommonTestData.getReplicaId(); + participantRegisterMessage.setReplicaId(replicaId); + var supportedElementType = CommonTestData.createParticipantSupportedElementType(); + participantRegisterMessage.setParticipantSupportedElementType(List.of(supportedElementType)); + + var participant = new Participant(); + var replica = new ParticipantReplica(); + replica.setReplicaId(replicaId); + participant.setParticipantId(participantId); + participant.getReplicas().put(replica.getReplicaId(), replica); + var participantProvider = mock(ParticipantProvider.class); + when(participantProvider.findParticipant(participantId)).thenReturn(Optional.of(participant)); + when(participantProvider.findParticipantReplica(replicaId)).thenReturn(Optional.of(replica)); + var compositionId = UUID.randomUUID(); + var composition2Id = UUID.randomUUID(); + when(participantProvider.getCompositionIds(participantId)).thenReturn(Set.of(compositionId, composition2Id)); + + var acDefinitionProvider = mock(AcDefinitionProvider.class); + var acDefinition = new AutomationCompositionDefinition(); + acDefinition.setState(AcTypeState.COMMISSIONED); + acDefinition.setCompositionId(composition2Id); + when(acDefinitionProvider.getAcDefinition(composition2Id)).thenReturn(acDefinition); + + acDefinition = new AutomationCompositionDefinition(); + acDefinition.setCompositionId(compositionId); + acDefinition.setState(AcTypeState.PRIMED); + var nodeTemplateState = new NodeTemplateState(); + nodeTemplateState.setParticipantId(participantId); + acDefinition.setElementStateMap(Map.of("code", nodeTemplateState)); + when(acDefinitionProvider.getAcDefinition(compositionId)).thenReturn(acDefinition); + + var automationComposition = + InstantiationUtils.getAutomationCompositionFromResource(AC_INSTANTIATION_CREATE_JSON, "Crud"); + automationComposition.getElements().values().iterator().next().setParticipantId(participantId); + var automationCompositionProvider = mock(AutomationCompositionProvider.class); + when(automationCompositionProvider.getAcInstancesByCompositionId(compositionId)) + .thenReturn(List.of(automationComposition)); + + var participantRegisterAckPublisher = mock(ParticipantRegisterAckPublisher.class); + var participantSyncPublisher = mock(ParticipantSyncPublisher.class); + var handler = new SupervisionParticipantHandler(participantProvider, participantRegisterAckPublisher, + mock(ParticipantDeregisterAckPublisher.class), automationCompositionProvider, acDefinitionProvider, + mock(ParticipantRestartPublisher.class), participantSyncPublisher, + CommonTestData.getTestParamaterGroup()); + handler.handleParticipantMessage(participantRegisterMessage); + + verify(participantRegisterAckPublisher).send(participantRegisterMessage.getMessageId(), participantId); + verify(acDefinitionProvider, times(0)).updateAcDefinition(any(AutomationCompositionDefinition.class), + eq(CommonTestData.TOSCA_COMP_NAME)); + verify(participantSyncPublisher) + .sendRestartMsg(any(), any(), any(AutomationCompositionDefinition.class), any()); + } + @Test void testHandleParticipantStatus() { var participantStatusMessage = createParticipantStatus(); @@ -165,7 +234,7 @@ class SupervisionParticipantHandlerTest { new SupervisionParticipantHandler(participantProvider, mock(ParticipantRegisterAckPublisher.class), mock(ParticipantDeregisterAckPublisher.class), automationCompositionProvider, mock(AcDefinitionProvider.class), mock(ParticipantRestartPublisher.class), - mock(AcRuntimeParameterGroup.class)); + mock(ParticipantSyncPublisher.class), mock(AcRuntimeParameterGroup.class)); var participant = CommonTestData.createParticipant(CommonTestData.getParticipantId()); when(participantProvider.findParticipant(CommonTestData.getParticipantId())) .thenReturn(Optional.of(participant)); @@ -201,7 +270,7 @@ class SupervisionParticipantHandlerTest { new SupervisionParticipantHandler(participantProvider, mock(ParticipantRegisterAckPublisher.class), mock(ParticipantDeregisterAckPublisher.class), mock(AutomationCompositionProvider.class), acDefinitionProvider, mock(ParticipantRestartPublisher.class), - CommonTestData.getTestParamaterGroup()); + mock(ParticipantSyncPublisher.class), CommonTestData.getTestParamaterGroup()); handler.handleParticipantMessage(participantStatusMessage); verify(acDefinitionProvider).updateAcDefinition(acDefinition, CommonTestData.TOSCA_COMP_NAME); @@ -218,7 +287,7 @@ class SupervisionParticipantHandlerTest { new SupervisionParticipantHandler(participantProvider, mock(ParticipantRegisterAckPublisher.class), mock(ParticipantDeregisterAckPublisher.class), automationCompositionProvider, mock(AcDefinitionProvider.class), mock(ParticipantRestartPublisher.class), - mock(AcRuntimeParameterGroup.class)); + mock(ParticipantSyncPublisher.class), mock(AcRuntimeParameterGroup.class)); handler.handleParticipantMessage(participantStatusMessage); verify(participantProvider).saveParticipant(any()); @@ -236,9 +305,8 @@ class SupervisionParticipantHandlerTest { new SupervisionParticipantHandler(participantProvider, mock(ParticipantRegisterAckPublisher.class), mock(ParticipantDeregisterAckPublisher.class), automationCompositionProvider, mock(AcDefinitionProvider.class), mock(ParticipantRestartPublisher.class), - mock(AcRuntimeParameterGroup.class)); + mock(ParticipantSyncPublisher.class), mock(AcRuntimeParameterGroup.class)); var participant = CommonTestData.createParticipant(CommonTestData.getParticipantId()); - participant.setParticipantState(ParticipantState.OFF_LINE); when(participantProvider.findParticipant(CommonTestData.getParticipantId())) .thenReturn(Optional.of(participant)); handler.handleParticipantMessage(participantStatusMessage); diff --git a/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionParticipantScannerTest.java b/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionParticipantScannerTest.java index 690ad9672..0ae1c1a06 100644 --- a/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionParticipantScannerTest.java +++ b/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionParticipantScannerTest.java @@ -29,28 +29,26 @@ import static org.mockito.Mockito.when; import java.util.List; import org.junit.jupiter.api.Test; import org.onap.policy.clamp.acm.runtime.util.CommonTestData; -import org.onap.policy.clamp.models.acm.concepts.ParticipantState; import org.onap.policy.clamp.models.acm.persistence.provider.ParticipantProvider; class SupervisionParticipantScannerTest { @Test void testScanParticipant() { - var acRuntimeParameterGroup = CommonTestData.geParameterGroup("dbScanParticipant"); - acRuntimeParameterGroup.getParticipantParameters().setMaxStatusWaitMs(-1); - - var participant = CommonTestData.createParticipant(CommonTestData.getParticipantId()); var participantProvider = mock(ParticipantProvider.class); - when(participantProvider.getParticipants()).thenReturn(List.of(participant)); + var replica = CommonTestData.createParticipantReplica(CommonTestData.getReplicaId()); + when(participantProvider.findReplicasOnLine()).thenReturn(List.of(replica)); - var supervisionScanner = new SupervisionPartecipantScanner(participantProvider, acRuntimeParameterGroup); + var acRuntimeParameterGroup = CommonTestData.geParameterGroup("dbScanParticipant"); + var supervisionScanner = new SupervisionParticipantScanner(participantProvider, acRuntimeParameterGroup); - participant.setParticipantState(ParticipantState.OFF_LINE); + acRuntimeParameterGroup.getParticipantParameters().setMaxStatusWaitMs(100000); supervisionScanner.run(); - verify(participantProvider, times(0)).saveParticipant(any()); + verify(participantProvider, times(0)).saveParticipantReplica(any()); - participant.setParticipantState(ParticipantState.ON_LINE); + acRuntimeParameterGroup.getParticipantParameters().setMaxStatusWaitMs(-1); + supervisionScanner = new SupervisionParticipantScanner(participantProvider, acRuntimeParameterGroup); supervisionScanner.run(); - verify(participantProvider, times(1)).saveParticipant(any()); + verify(participantProvider).deleteParticipantReplica(CommonTestData.getReplicaId()); } } diff --git a/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionScannerTest.java b/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionScannerTest.java index d5163be14..fa5929f0b 100644 --- a/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionScannerTest.java +++ b/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionScannerTest.java @@ -39,6 +39,7 @@ import org.junit.jupiter.api.Test; import org.onap.policy.clamp.acm.runtime.instantiation.InstantiationUtils; import org.onap.policy.clamp.acm.runtime.supervision.comm.AutomationCompositionDeployPublisher; import org.onap.policy.clamp.acm.runtime.supervision.comm.AutomationCompositionStateChangePublisher; +import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantSyncPublisher; import org.onap.policy.clamp.acm.runtime.util.CommonTestData; import org.onap.policy.clamp.models.acm.concepts.AcTypeState; import org.onap.policy.clamp.models.acm.concepts.AutomationComposition; @@ -101,7 +102,7 @@ class SupervisionScannerTest { var acRuntimeParameterGroup = CommonTestData.geParameterGroup("dbScanner"); var supervisionScanner = new SupervisionScanner(mock(AutomationCompositionProvider.class), acDefinitionProvider, mock(AutomationCompositionStateChangePublisher.class), mock(AutomationCompositionDeployPublisher.class), - acRuntimeParameterGroup); + mock(ParticipantSyncPublisher.class), acRuntimeParameterGroup); supervisionScanner.run(); verify(acDefinitionProvider, times(0)).updateAcDefinitionState(any(), any(), any(), any()); } @@ -113,7 +114,7 @@ class SupervisionScannerTest { var acRuntimeParameterGroup = CommonTestData.geParameterGroup("dbScanner"); var supervisionScanner = new SupervisionScanner(mock(AutomationCompositionProvider.class), acDefinitionProvider, mock(AutomationCompositionStateChangePublisher.class), mock(AutomationCompositionDeployPublisher.class), - acRuntimeParameterGroup); + mock(ParticipantSyncPublisher.class), acRuntimeParameterGroup); supervisionScanner.run(); // Ac Definition in Priming state verify(acDefinitionProvider, times(0)).updateAcDefinitionState(any(), any(), any(), any()); @@ -121,7 +122,7 @@ class SupervisionScannerTest { acRuntimeParameterGroup.getParticipantParameters().setMaxStatusWaitMs(-1); supervisionScanner = new SupervisionScanner(mock(AutomationCompositionProvider.class), acDefinitionProvider, mock(AutomationCompositionStateChangePublisher.class), mock(AutomationCompositionDeployPublisher.class), - acRuntimeParameterGroup); + mock(ParticipantSyncPublisher.class), acRuntimeParameterGroup); supervisionScanner.run(); // set Timeout verify(acDefinitionProvider).updateAcDefinitionState(acDefinition.getCompositionId(), acDefinition.getState(), @@ -164,7 +165,7 @@ class SupervisionScannerTest { var supervisionScanner = new SupervisionScanner(automationCompositionProvider, createAcDefinitionProvider(), automationCompositionStateChangePublisher, automationCompositionDeployPublisher, - acRuntimeParameterGroup); + mock(ParticipantSyncPublisher.class), acRuntimeParameterGroup); // not in transition supervisionScanner.run(); @@ -192,7 +193,7 @@ class SupervisionScannerTest { var supervisionScanner = new SupervisionScanner(automationCompositionProvider, createAcDefinitionProvider(), automationCompositionStateChangePublisher, automationCompositionDeployPublisher, - acRuntimeParameterGroup); + mock(ParticipantSyncPublisher.class), acRuntimeParameterGroup); supervisionScanner.run(); verify(automationCompositionProvider).updateAutomationComposition(any(AutomationComposition.class)); @@ -213,7 +214,7 @@ class SupervisionScannerTest { var supervisionScanner = new SupervisionScanner(automationCompositionProvider, createAcDefinitionProvider(), automationCompositionStateChangePublisher, automationCompositionDeployPublisher, - acRuntimeParameterGroup); + mock(ParticipantSyncPublisher.class), acRuntimeParameterGroup); supervisionScanner.run(); verify(automationCompositionProvider).deleteAutomationComposition(automationComposition.getInstanceId()); @@ -232,7 +233,7 @@ class SupervisionScannerTest { var supervisionScanner = new SupervisionScanner(automationCompositionProvider, createAcDefinitionProvider(), automationCompositionStateChangePublisher, automationCompositionDeployPublisher, - acRuntimeParameterGroup); + mock(ParticipantSyncPublisher.class), acRuntimeParameterGroup); supervisionScanner.run(); verify(automationCompositionProvider, times(0)).updateAutomationComposition(any(AutomationComposition.class)); @@ -263,7 +264,7 @@ class SupervisionScannerTest { // verify timeout scenario var scannerObj2 = new SupervisionScanner(automationCompositionProvider, createAcDefinitionProvider(), automationCompositionStateChangePublisher, automationCompositionDeployPublisher, - acRuntimeParameterGroup); + mock(ParticipantSyncPublisher.class), acRuntimeParameterGroup); automationComposition.setStateChangeResult(StateChangeResult.NO_ERROR); automationComposition.setLastMsg(TimestampHelper.now()); @@ -312,7 +313,7 @@ class SupervisionScannerTest { var supervisionScanner = new SupervisionScanner(automationCompositionProvider, createAcDefinitionProvider(), automationCompositionStateChangePublisher, automationCompositionDeployPublisher, - acRuntimeParameterGroup); + mock(ParticipantSyncPublisher.class), acRuntimeParameterGroup); supervisionScanner.run(); @@ -347,7 +348,7 @@ class SupervisionScannerTest { var supervisionScanner = new SupervisionScanner(automationCompositionProvider, createAcDefinitionProvider(), automationCompositionStateChangePublisher, automationCompositionDeployPublisher, - acRuntimeParameterGroup); + mock(ParticipantSyncPublisher.class), acRuntimeParameterGroup); supervisionScanner.run(); verify(automationCompositionProvider, times(0)).updateAutomationComposition(any(AutomationComposition.class)); @@ -390,7 +391,7 @@ class SupervisionScannerTest { var supervisionScanner = new SupervisionScanner(automationCompositionProvider, createAcDefinitionProvider(), automationCompositionStateChangePublisher, automationCompositionDeployPublisher, - acRuntimeParameterGroup); + mock(ParticipantSyncPublisher.class), acRuntimeParameterGroup); supervisionScanner.run(); diff --git a/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/comm/SupervisionMessagesTest.java b/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/comm/SupervisionMessagesTest.java index 766380ac4..cab5adf9c 100644 --- a/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/comm/SupervisionMessagesTest.java +++ b/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/comm/SupervisionMessagesTest.java @@ -35,7 +35,6 @@ import java.util.UUID; import org.junit.jupiter.api.Test; import org.onap.policy.clamp.acm.runtime.instantiation.InstantiationUtils; import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup; -import org.onap.policy.clamp.acm.runtime.participants.AcmParticipantProvider; import org.onap.policy.clamp.acm.runtime.supervision.SupervisionAcHandler; import org.onap.policy.clamp.acm.runtime.supervision.SupervisionHandler; import org.onap.policy.clamp.acm.runtime.supervision.SupervisionParticipantHandler; @@ -149,7 +148,7 @@ class SupervisionMessagesTest { @Test void testParticipantPrimePublisherDecommissioning() { var publisher = new ParticipantPrimePublisher(mock(ParticipantProvider.class), - mock(AcmParticipantProvider.class), mock(AcRuntimeParameterGroup.class)); + mock(AcRuntimeParameterGroup.class)); var topicSink = mock(TopicSink.class); publisher.active(topicSink); publisher.sendDepriming(UUID.randomUUID()); @@ -170,8 +169,7 @@ class SupervisionMessagesTest { participantId); var participantProvider = mock(ParticipantProvider.class); when(participantProvider.getSupportedElementMap()).thenReturn(supportedElementMap); - var publisher = new ParticipantPrimePublisher(participantProvider, mock(AcmParticipantProvider.class), - CommonTestData.getTestParamaterGroup()); + var publisher = new ParticipantPrimePublisher(participantProvider, CommonTestData.getTestParamaterGroup()); var topicSink = mock(TopicSink.class); publisher.active(topicSink); var serviceTemplate = InstantiationUtils.getToscaServiceTemplate(TOSCA_SERVICE_TEMPLATE_YAML); @@ -262,29 +260,70 @@ class SupervisionMessagesTest { } @Test - void testParticipantSyncPublisher() { + void testParticipantSyncPublisherAutomationComposition() { var publisher = new ParticipantSyncPublisher(CommonTestData.getTestParamaterGroup()); var topicSink = mock(TopicSink.class); publisher.active(topicSink); var serviceTemplate = InstantiationUtils.getToscaServiceTemplate(TOSCA_SERVICE_TEMPLATE_YAML); - var acmDefinition = new AutomationCompositionDefinition(); - acmDefinition.setCompositionId(UUID.randomUUID()); - acmDefinition.setServiceTemplate(serviceTemplate); - var acElements = AcmUtils - .extractAcElementsFromServiceTemplate(serviceTemplate, ""); - acmDefinition.setElementStateMap(AcmUtils.createElementStateMap(acElements, AcTypeState.PRIMED)); - var automationComposition = InstantiationUtils.getAutomationCompositionFromResource(AC_INSTANTIATION_UPDATE_JSON, "Crud"); + publisher.sendSync(serviceTemplate, automationComposition); + verify(topicSink).send(anyString()); + } + + @Test + void testParticipantSyncPublisherAcDefinition() { + var publisher = new ParticipantSyncPublisher(CommonTestData.getTestParamaterGroup()); + var topicSink = mock(TopicSink.class); + publisher.active(topicSink); + + var acmDefinition = getAcmDefinition(); + publisher.sendSync(acmDefinition, null); + verify(topicSink).send(anyString()); + } + + @Test + void testParticipantSyncPublisherAcDefinitionCommissioned() { + var publisher = new ParticipantSyncPublisher(CommonTestData.getTestParamaterGroup()); + var topicSink = mock(TopicSink.class); + publisher.active(topicSink); + var acmDefinition = getAcmDefinition(); + acmDefinition.setState(AcTypeState.COMMISSIONED); + publisher.sendSync(acmDefinition, UUID.randomUUID()); + verify(topicSink).send(anyString()); + } + + @Test + void testParticipantSyncPublisherRestart() { + var publisher = new ParticipantSyncPublisher(CommonTestData.getTestParamaterGroup()); + var topicSink = mock(TopicSink.class); + publisher.active(topicSink); + + var automationComposition = + InstantiationUtils.getAutomationCompositionFromResource(AC_INSTANTIATION_UPDATE_JSON, "Crud"); var participantId = automationComposition.getElements().values().iterator().next().getParticipantId(); + var acmDefinition = getAcmDefinition(); acmDefinition.getElementStateMap().values().iterator().next().setParticipantId(participantId); - - publisher.send(participantId, acmDefinition, List.of(automationComposition)); + var replicaId = UUID.randomUUID(); + publisher.sendRestartMsg(participantId, replicaId, acmDefinition, List.of(automationComposition)); verify(topicSink).send(anyString()); } + private AutomationCompositionDefinition getAcmDefinition() { + var serviceTemplate = InstantiationUtils.getToscaServiceTemplate(TOSCA_SERVICE_TEMPLATE_YAML); + var acmDefinition = new AutomationCompositionDefinition(); + acmDefinition.setCompositionId(UUID.randomUUID()); + acmDefinition.setState(AcTypeState.PRIMED); + acmDefinition.setServiceTemplate(serviceTemplate); + var acElements = AcmUtils + .extractAcElementsFromServiceTemplate(serviceTemplate, TOSCA_ELEMENT_NAME); + acmDefinition.setElementStateMap(AcmUtils.createElementStateMap(acElements, AcTypeState.PRIMED)); + acmDefinition.getElementStateMap().values().forEach(element -> element.setParticipantId(UUID.randomUUID())); + return acmDefinition; + } + @Test void testParticipantRegisterListener() { final var participantRegister = new ParticipantRegister(); diff --git a/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/util/CommonTestData.java b/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/util/CommonTestData.java index e031e0f5a..c3b5ff919 100644 --- a/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/util/CommonTestData.java +++ b/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/util/CommonTestData.java @@ -28,6 +28,7 @@ import org.onap.policy.clamp.common.acm.exception.AutomationCompositionRuntimeEx import org.onap.policy.clamp.models.acm.concepts.AcTypeState; import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition; import org.onap.policy.clamp.models.acm.concepts.Participant; +import org.onap.policy.clamp.models.acm.concepts.ParticipantReplica; import org.onap.policy.clamp.models.acm.concepts.ParticipantState; import org.onap.policy.clamp.models.acm.concepts.ParticipantSupportedElementType; import org.onap.policy.clamp.models.acm.utils.AcmUtils; @@ -88,11 +89,23 @@ public class CommonTestData { public static Participant createParticipant(UUID participantId) { var participant = new Participant(); participant.setParticipantId(participantId); - participant.setParticipantState(ParticipantState.ON_LINE); - participant.setLastMsg(TimestampHelper.now()); return participant; } + /** + * Create a new ParticipantReplica. + * + * @param replicaId the replica id + * @return a new ParticipantReplica + */ + public static ParticipantReplica createParticipantReplica(UUID replicaId) { + var replica = new ParticipantReplica(); + replica.setReplicaId(replicaId); + replica.setParticipantState(ParticipantState.ON_LINE); + replica.setLastMsg(TimestampHelper.now()); + return replica; + } + /** * Create a new ParticipantSupportedElementType. * @@ -105,6 +118,10 @@ public class CommonTestData { return supportedElementType; } + public static UUID getReplicaId() { + return UUID.fromString("201c62b3-8918-41b9-a747-d21eb79c6c09"); + } + public static UUID getParticipantId() { return UUID.fromString("101c62b3-8918-41b9-a747-d21eb79c6c03"); } -- cgit 1.2.3-korg