From a48f784beca5e7aa189217c52cfa83452cf8fc47 Mon Sep 17 00:00:00 2001 From: FrancescoFioraEst Date: Tue, 4 Jun 2024 09:11:31 +0100 Subject: Add support for unique replica id generation in the participants Issue-ID: POLICY-5032 Change-Id: I9c36b87d1f03f03089d2c62308e0975e24f6e99a Signed-off-by: FrancescoFioraEst --- .../messages/kafka/participant/ParticipantAckMessage.java | 12 +++++++++--- .../acm/messages/kafka/participant/ParticipantMessage.java | 11 ++++++++--- .../kafka/participant/ParticipantAckMessageTest.java | 11 ++++++----- .../messages/kafka/participant/ParticipantMessageTest.java | 11 ++++++----- .../onap/policy/clamp/models/acm/utils/CommonTestData.java | 11 ++++++++++- .../intermediary/handler/AcDefinitionHandler.java | 1 + .../intermediary/handler/AutomationCompositionHandler.java | 1 + .../handler/AutomationCompositionOutHandler.java | 3 +++ .../acm/participant/intermediary/handler/CacheProvider.java | 4 ++++ .../participant/intermediary/handler/ParticipantHandler.java | 7 +++++-- .../intermediary/handler/ParticipantHandlerTest.java | 1 + .../intermediary/main/parameters/CommonTestData.java | 5 +++++ 12 files changed, 59 insertions(+), 19 deletions(-) diff --git a/models/src/main/java/org/onap/policy/clamp/models/acm/messages/kafka/participant/ParticipantAckMessage.java b/models/src/main/java/org/onap/policy/clamp/models/acm/messages/kafka/participant/ParticipantAckMessage.java index fa3e3d08e..7ea3310be 100644 --- a/models/src/main/java/org/onap/policy/clamp/models/acm/messages/kafka/participant/ParticipantAckMessage.java +++ b/models/src/main/java/org/onap/policy/clamp/models/acm/messages/kafka/participant/ParticipantAckMessage.java @@ -57,6 +57,8 @@ public class ParticipantAckMessage { */ private UUID participantId; + private UUID replicaId; + /** * Participant State, or {@code null} for messages from participants. */ @@ -82,7 +84,9 @@ public class ParticipantAckMessage { this.stateChangeResult = source.stateChangeResult; this.message = source.message; this.messageType = source.messageType; + this.compositionId = source.compositionId; this.participantId = source.participantId; + this.replicaId = source.replicaId; this.state = source.state; } @@ -90,15 +94,17 @@ public class ParticipantAckMessage { * Determines if this message applies to this participant type. * * @param participantId id of the participant to match against + * @param replicaId id of the participant to match against * @return {@code true} if this message applies to this participant, {@code false} otherwise */ - public boolean appliesTo(@NonNull final UUID participantId) { + public boolean appliesTo(@NonNull final UUID participantId, @NonNull final UUID replicaId) { // Broadcast message to all participants - if (this.participantId == null) { + if ((this.participantId == null) + || (participantId.equals(this.participantId) && this.replicaId == null)) { return true; } // Targeted message at this specific participant - return participantId.equals(this.participantId); + return replicaId.equals(this.replicaId); } } diff --git a/models/src/main/java/org/onap/policy/clamp/models/acm/messages/kafka/participant/ParticipantMessage.java b/models/src/main/java/org/onap/policy/clamp/models/acm/messages/kafka/participant/ParticipantMessage.java index 304db8eb5..f8aea947b 100644 --- a/models/src/main/java/org/onap/policy/clamp/models/acm/messages/kafka/participant/ParticipantMessage.java +++ b/models/src/main/java/org/onap/policy/clamp/models/acm/messages/kafka/participant/ParticipantMessage.java @@ -51,6 +51,8 @@ public class ParticipantMessage { */ private UUID participantId; + private UUID replicaId; + /** * Automation Composition ID, or {@code null} for messages to participants. */ @@ -75,6 +77,7 @@ public class ParticipantMessage { public ParticipantMessage(final ParticipantMessage source) { this.messageType = source.messageType; this.participantId = source.participantId; + this.replicaId = source.replicaId; this.automationCompositionId = source.automationCompositionId; this.compositionId = source.compositionId; } @@ -83,15 +86,17 @@ public class ParticipantMessage { * Determines if this message applies to this participant type. * * @param participantId id of the participant to match against + * @param replicaId id of the participant to match against * @return {@code true} if this message applies to this participant, {@code false} otherwise */ - public boolean appliesTo(@NonNull final UUID participantId) { + public boolean appliesTo(@NonNull final UUID participantId, @NonNull final UUID replicaId) { // Broadcast message to all participants - if (this.participantId == null) { + if ((this.participantId == null) + || (participantId.equals(this.participantId) && this.replicaId == null)) { return true; } // Targeted message at this specific participant - return participantId.equals(this.participantId); + return replicaId.equals(this.replicaId); } } diff --git a/models/src/test/java/org/onap/policy/clamp/models/acm/messages/kafka/participant/ParticipantAckMessageTest.java b/models/src/test/java/org/onap/policy/clamp/models/acm/messages/kafka/participant/ParticipantAckMessageTest.java index b6af01c44..72e4efb49 100644 --- a/models/src/test/java/org/onap/policy/clamp/models/acm/messages/kafka/participant/ParticipantAckMessageTest.java +++ b/models/src/test/java/org/onap/policy/clamp/models/acm/messages/kafka/participant/ParticipantAckMessageTest.java @@ -57,7 +57,8 @@ class ParticipantAckMessageTest { @Test void testAppliesTo_NullParticipantId() { message = makeMessage(); - assertThatThrownBy(() -> message.appliesTo(null)).isInstanceOf(NullPointerException.class); + assertThatThrownBy(() -> message.appliesTo(UUID.randomUUID(), null)).isInstanceOf(NullPointerException.class); + assertThatThrownBy(() -> message.appliesTo(null, UUID.randomUUID())).isInstanceOf(NullPointerException.class); } @Test @@ -65,8 +66,8 @@ class ParticipantAckMessageTest { message = makeMessage(); // ParticipantId matches - assertTrue(message.appliesTo(CommonTestData.getParticipantId())); - assertFalse(message.appliesTo(CommonTestData.getRndParticipantId())); + assertTrue(message.appliesTo(CommonTestData.getParticipantId(), CommonTestData.getReplicaId())); + assertFalse(message.appliesTo(CommonTestData.getRndParticipantId(), CommonTestData.getReplicaId())); } @Test @@ -74,8 +75,8 @@ class ParticipantAckMessageTest { message = makeMessage(); // ParticipantId does not match - assertFalse(message.appliesTo(CommonTestData.getRndParticipantId())); - assertTrue(message.appliesTo(CommonTestData.getParticipantId())); + assertFalse(message.appliesTo(CommonTestData.getRndParticipantId(), CommonTestData.getReplicaId())); + assertTrue(message.appliesTo(CommonTestData.getParticipantId(), CommonTestData.getReplicaId())); } private ParticipantAckMessage makeMessage() { diff --git a/models/src/test/java/org/onap/policy/clamp/models/acm/messages/kafka/participant/ParticipantMessageTest.java b/models/src/test/java/org/onap/policy/clamp/models/acm/messages/kafka/participant/ParticipantMessageTest.java index 541d8ef31..db31d0f01 100644 --- a/models/src/test/java/org/onap/policy/clamp/models/acm/messages/kafka/participant/ParticipantMessageTest.java +++ b/models/src/test/java/org/onap/policy/clamp/models/acm/messages/kafka/participant/ParticipantMessageTest.java @@ -61,7 +61,8 @@ class ParticipantMessageTest { void testAppliesTo_NullParticipantId() { message = makeMessage(); - assertThatThrownBy(() -> message.appliesTo(null)).isInstanceOf(NullPointerException.class); + assertThatThrownBy(() -> message.appliesTo(UUID.randomUUID(), null)).isInstanceOf(NullPointerException.class); + assertThatThrownBy(() -> message.appliesTo(null, UUID.randomUUID())).isInstanceOf(NullPointerException.class); } @Test @@ -69,15 +70,15 @@ class ParticipantMessageTest { message = makeMessage(); // ParticipantId matches - assertTrue(message.appliesTo(CommonTestData.getParticipantId())); - assertFalse(message.appliesTo(CommonTestData.getRndParticipantId())); + assertTrue(message.appliesTo(CommonTestData.getParticipantId(), CommonTestData.getReplicaId())); + assertFalse(message.appliesTo(CommonTestData.getRndParticipantId(), CommonTestData.getReplicaId())); } @Test void testAppliesTo_ParticipantIdNoMatch() { message = makeMessage(); - assertFalse(message.appliesTo(CommonTestData.getRndParticipantId())); - assertTrue(message.appliesTo(CommonTestData.getParticipantId())); + assertFalse(message.appliesTo(CommonTestData.getRndParticipantId(), CommonTestData.getReplicaId())); + assertTrue(message.appliesTo(CommonTestData.getParticipantId(), CommonTestData.getReplicaId())); } private ParticipantMessage makeMessage() { diff --git a/models/src/test/java/org/onap/policy/clamp/models/acm/utils/CommonTestData.java b/models/src/test/java/org/onap/policy/clamp/models/acm/utils/CommonTestData.java index 131c8eefd..b8075c3ef 100644 --- a/models/src/test/java/org/onap/policy/clamp/models/acm/utils/CommonTestData.java +++ b/models/src/test/java/org/onap/policy/clamp/models/acm/utils/CommonTestData.java @@ -38,9 +38,9 @@ import org.onap.policy.models.tosca.authorative.concepts.ToscaServiceTemplate; public class CommonTestData { public static final UUID PARTCICIPANT_ID = UUID.randomUUID(); + public static final UUID REPLICA_ID = UUID.randomUUID(); private static final StandardYamlCoder YAML_TRANSLATOR = new StandardYamlCoder(); - /** * Returns participantId for test cases. * @@ -50,6 +50,15 @@ public class CommonTestData { return PARTCICIPANT_ID; } + /** + * Returns participantId for test cases. + * + * @return participant Id + */ + public static UUID getReplicaId() { + return REPLICA_ID; + } + /** * Returns participantId for test Jpa cases. * diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AcDefinitionHandler.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AcDefinitionHandler.java index e1d4b0959..d3ad4cf3e 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AcDefinitionHandler.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AcDefinitionHandler.java @@ -91,6 +91,7 @@ public class AcDefinitionHandler { participantPrimeAck.setCompositionState(AcTypeState.COMMISSIONED); participantPrimeAck.setStateChangeResult(StateChangeResult.NO_ERROR); participantPrimeAck.setParticipantId(cacheProvider.getParticipantId()); + participantPrimeAck.setReplicaId(cacheProvider.getReplicaId()); participantPrimeAck.setState(ParticipantState.ON_LINE); publisher.sendParticipantPrimeAck(participantPrimeAck); return; diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionHandler.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionHandler.java index 5c54861f7..a3eafd844 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionHandler.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionHandler.java @@ -77,6 +77,7 @@ public class AutomationCompositionHandler { var automationCompositionAck = new AutomationCompositionDeployAck( ParticipantMessageType.AUTOMATION_COMPOSITION_STATECHANGE_ACK); automationCompositionAck.setParticipantId(cacheProvider.getParticipantId()); + automationCompositionAck.setReplicaId(cacheProvider.getReplicaId()); automationCompositionAck.setMessage("Already deleted or never used"); automationCompositionAck.setResult(true); automationCompositionAck.setStateChangeResult(StateChangeResult.NO_ERROR); diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionOutHandler.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionOutHandler.java index 0ed333ebd..1f4c036e7 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionOutHandler.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionOutHandler.java @@ -103,6 +103,7 @@ public class AutomationCompositionOutHandler { var automationCompositionStateChangeAck = new AutomationCompositionDeployAck(ParticipantMessageType.AUTOMATION_COMPOSITION_STATECHANGE_ACK); automationCompositionStateChangeAck.setParticipantId(cacheProvider.getParticipantId()); + automationCompositionStateChangeAck.setReplicaId(cacheProvider.getReplicaId()); automationCompositionStateChangeAck.setMessage(message); automationCompositionStateChangeAck.setResponseTo(cacheProvider.getMsgIdentification().get(element.getId())); automationCompositionStateChangeAck.setStateChangeResult(stateChangeResult); @@ -228,6 +229,7 @@ public class AutomationCompositionOutHandler { participantPrimeAck.setCompositionState(state); participantPrimeAck.setStateChangeResult(stateChangeResult); participantPrimeAck.setParticipantId(cacheProvider.getParticipantId()); + participantPrimeAck.setReplicaId(cacheProvider.getReplicaId()); participantPrimeAck.setState(ParticipantState.ON_LINE); publisher.sendParticipantPrimeAck(participantPrimeAck); cacheProvider.getMsgIdentification().remove(compositionId); @@ -286,6 +288,7 @@ public class AutomationCompositionOutHandler { private ParticipantStatus createParticipantStatus() { var statusMsg = new ParticipantStatus(); statusMsg.setParticipantId(cacheProvider.getParticipantId()); + statusMsg.setReplicaId(cacheProvider.getReplicaId()); statusMsg.setState(ParticipantState.ON_LINE); statusMsg.setParticipantSupportedElementType(cacheProvider.getSupportedAcElementTypes()); return statusMsg; diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/CacheProvider.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/CacheProvider.java index 343f8a9e8..b85a3c35a 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/CacheProvider.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/CacheProvider.java @@ -53,6 +53,9 @@ public class CacheProvider { @Setter private boolean registered = false; + @Getter + private final UUID replicaId; + private final List supportedAcElementTypes; @Getter @@ -73,6 +76,7 @@ public class CacheProvider { public CacheProvider(ParticipantParameters parameters) { this.participantId = parameters.getIntermediaryParameters().getParticipantId(); this.supportedAcElementTypes = parameters.getIntermediaryParameters().getParticipantSupportedElementTypes(); + this.replicaId = UUID.randomUUID(); } public List getSupportedAcElementTypes() { diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandler.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandler.java index 0865dca8e..54a05912a 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandler.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandler.java @@ -126,7 +126,7 @@ public class ParticipantHandler { * @return true if it applies, false otherwise */ public boolean appliesTo(ParticipantMessage participantMsg) { - return participantMsg.appliesTo(cacheProvider.getParticipantId()); + return participantMsg.appliesTo(cacheProvider.getParticipantId(), cacheProvider.getReplicaId()); } /** @@ -136,7 +136,7 @@ public class ParticipantHandler { * @return true if it applies, false otherwise */ public boolean appliesTo(ParticipantAckMessage participantMsg) { - return participantMsg.appliesTo(cacheProvider.getParticipantId()); + return participantMsg.appliesTo(cacheProvider.getParticipantId(), cacheProvider.getReplicaId()); } /** @@ -145,6 +145,7 @@ public class ParticipantHandler { public void sendParticipantRegister() { var participantRegister = new ParticipantRegister(); participantRegister.setParticipantId(cacheProvider.getParticipantId()); + participantRegister.setReplicaId(cacheProvider.getReplicaId()); participantRegister.setParticipantSupportedElementType(cacheProvider.getSupportedAcElementTypes()); publisher.sendParticipantRegister(participantRegister); @@ -169,6 +170,7 @@ public class ParticipantHandler { public void sendParticipantDeregister() { var participantDeregister = new ParticipantDeregister(); participantDeregister.setParticipantId(cacheProvider.getParticipantId()); + participantDeregister.setReplicaId(cacheProvider.getReplicaId()); publisher.sendParticipantDeregister(participantDeregister); } @@ -225,6 +227,7 @@ public class ParticipantHandler { private ParticipantStatus makeHeartbeat() { var heartbeat = new ParticipantStatus(); heartbeat.setParticipantId(cacheProvider.getParticipantId()); + heartbeat.setReplicaId(cacheProvider.getReplicaId()); heartbeat.setState(ParticipantState.ON_LINE); heartbeat.setParticipantSupportedElementType(cacheProvider.getSupportedAcElementTypes()); diff --git a/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandlerTest.java b/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandlerTest.java index cd28d41fb..eb1db475b 100644 --- a/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandlerTest.java +++ b/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandlerTest.java @@ -125,6 +125,7 @@ class ParticipantHandlerTest { void appliesToTest() { var cacheProvider = mock(CacheProvider.class); when(cacheProvider.getParticipantId()).thenReturn(CommonTestData.getParticipantId()); + when(cacheProvider.getReplicaId()).thenReturn(CommonTestData.getReplicaId()); var participantHandler = new ParticipantHandler(mock(AutomationCompositionHandler.class), mock(AcLockHandler.class), mock(AcDefinitionHandler.class), mock(ParticipantMessagePublisher.class), cacheProvider); diff --git a/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/main/parameters/CommonTestData.java b/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/main/parameters/CommonTestData.java index 3011c91f5..1536a0be0 100644 --- a/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/main/parameters/CommonTestData.java +++ b/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/main/parameters/CommonTestData.java @@ -62,6 +62,7 @@ public class CommonTestData { public static final UUID AC_ID_0 = UUID.randomUUID(); public static final UUID AC_ID_1 = UUID.randomUUID(); public static final UUID PARTCICIPANT_ID = UUID.randomUUID(); + public static final UUID REPLICA_ID = UUID.randomUUID(); /** * Get ParticipantIntermediaryParameters. @@ -160,6 +161,10 @@ public class CommonTestData { return PARTCICIPANT_ID; } + public static UUID getReplicaId() { + return REPLICA_ID; + } + public static UUID getRndParticipantId() { return UUID.randomUUID(); } -- cgit 1.2.3-korg