diff options
8 files changed, 45 insertions, 34 deletions
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantMessagePublisher.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantMessagePublisher.java index 926de854b..8fa2ec366 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantMessagePublisher.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantMessagePublisher.java @@ -126,18 +126,6 @@ public class ParticipantMessagePublisher implements Publisher { LOGGER.debug("Sent AutomationComposition Update/StateChange Ack to runtime - {}", automationCompositionAck); } - /** - * Method to send Participant heartbeat to clamp on demand. - * - * @param participantStatus the Participant Status - */ - @Timed(value = "publisher.participant_status", description = "PARTICIPANT_STATUS messages published") - public void sendHeartbeat(final ParticipantStatus participantStatus) { - validate(); - topicSinkClient.send(participantStatus); - LOGGER.debug("Sent Participant heartbeat to CLAMP - {}", participantStatus); - } - private void validate() { if (!active) { throw new AutomationCompositionRuntimeException(Status.NOT_ACCEPTABLE, NOT_ACTIVE_TEXT); diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionOutHandler.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionOutHandler.java index 9f9b25af4..0ed333ebd 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionOutHandler.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionOutHandler.java @@ -267,7 +267,7 @@ public class AutomationCompositionOutHandler { participantDefinition.setParticipantId(cacheProvider.getParticipantId()); participantDefinition.setAutomationCompositionElementDefinitionList(List.of(acElementDefinition)); statusMsg.setParticipantDefinitionUpdates(List.of(participantDefinition)); - publisher.sendHeartbeat(statusMsg); + publisher.sendParticipantStatus(statusMsg); } private AutomationCompositionElementDefinition getAutomationCompositionElementDefinition( diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/CacheProvider.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/CacheProvider.java index f51e8baad..343f8a9e8 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/CacheProvider.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/CacheProvider.java @@ -28,6 +28,7 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import lombok.Getter; import lombok.NonNull; +import lombok.Setter; import org.onap.policy.clamp.acm.participant.intermediary.api.CompositionElementDto; import org.onap.policy.clamp.acm.participant.intermediary.parameters.ParticipantParameters; import org.onap.policy.clamp.models.acm.concepts.AutomationComposition; @@ -48,6 +49,10 @@ public class CacheProvider { @Getter private final UUID participantId; + @Getter + @Setter + private boolean registered = false; + private final List<ParticipantSupportedElementType> supportedAcElementTypes; @Getter diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/IntermediaryActivator.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/IntermediaryActivator.java index 6586ae4b3..a77d5242a 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/IntermediaryActivator.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/IntermediaryActivator.java @@ -24,6 +24,8 @@ package org.onap.policy.clamp.acm.participant.intermediary.handler; import java.io.Closeable; import java.io.IOException; import java.util.List; +import java.util.Timer; +import java.util.TimerTask; import lombok.Getter; import org.onap.policy.clamp.acm.participant.intermediary.parameters.ParticipantParameters; import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager; @@ -78,16 +80,16 @@ public class IntermediaryActivator extends ServiceManagerContainer implements Cl () -> TopicEndpointManager.getManager().start(), () -> TopicEndpointManager.getManager().shutdown()); + listeners.forEach(listener -> + addAction("Listener " + listener.getClass().getSimpleName(), + () -> msgDispatcher.register(listener.getType(), listener.getScoListener()), + () -> msgDispatcher.unregister(listener.getType()))); + publishers.forEach(publisher -> addAction("Publisher " + publisher.getClass().getSimpleName(), () -> publisher.active(topicSinks), publisher::stop)); - listeners.forEach(listener -> - addAction("Listener " + listener.getClass().getSimpleName(), - () -> msgDispatcher.register(listener.getType(), listener.getScoListener()), - () -> msgDispatcher.unregister(listener.getType()))); - addAction("Topic Message Dispatcher", this::registerMsgDispatcher, this::unregisterMsgDispatcher); // @formatter:on } @@ -101,7 +103,13 @@ public class IntermediaryActivator extends ServiceManagerContainer implements Cl public void handleContextRefreshEvent(ContextRefreshedEvent ctxRefreshedEvent) { if (!isAlive()) { start(); - sendParticipantRegister(); + var task = new TimerTask() { + @Override + public void run() { + new Thread(participantHandler::sendParticipantRegister).start(); + } + }; + new Timer().schedule(task, 5000); } } @@ -118,10 +126,6 @@ public class IntermediaryActivator extends ServiceManagerContainer implements Cl } } - private void sendParticipantRegister() { - participantHandler.sendParticipantRegister(); - } - private void sendParticipantDeregister() { participantHandler.sendParticipantDeregister(); } diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandler.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandler.java index a4a92f883..0865dca8e 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandler.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandler.java @@ -66,7 +66,7 @@ public class ParticipantHandler { */ @Timed(value = "listener.participant_status_req", description = "PARTICIPANT_STATUS_REQ messages received") public void handleParticipantStatusReq(final ParticipantStatusReq participantStatusReqMsg) { - publisher.sendParticipantStatus(makeHeartbeat()); + sendHeartbeat(); } /** @@ -159,6 +159,7 @@ public class ParticipantHandler { public void handleParticipantRegisterAck(ParticipantRegisterAck participantRegisterAckMsg) { LOGGER.debug("ParticipantRegisterAck message received as responseTo {}", participantRegisterAckMsg.getResponseTo()); + cacheProvider.setRegistered(true); publisher.sendParticipantStatus(makeHeartbeat()); } @@ -210,7 +211,11 @@ public class ParticipantHandler { */ public void sendHeartbeat() { if (publisher.isActive()) { - publisher.sendHeartbeat(makeHeartbeat()); + if (!cacheProvider.isRegistered()) { + sendParticipantRegister(); + } else { + publisher.sendParticipantStatus(makeHeartbeat()); + } } } diff --git a/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantCommTest.java b/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantCommTest.java index 754fc6522..10f9d4586 100644 --- a/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantCommTest.java +++ b/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantCommTest.java @@ -93,8 +93,6 @@ class ParticipantCommTest { var participantStatus = new ParticipantStatus(); assertDoesNotThrow(() -> publisher.sendParticipantStatus(participantStatus)); - assertDoesNotThrow(() -> publisher.sendHeartbeat(participantStatus)); - var participantRegister = new ParticipantRegister(); assertDoesNotThrow(() -> publisher.sendParticipantRegister(participantRegister)); @@ -115,7 +113,6 @@ class ParticipantCommTest { var participantStatus = new ParticipantStatus(); assertThrows(AutomationCompositionRuntimeException.class, () -> publisher.sendParticipantStatus(participantStatus)); - assertThrows(AutomationCompositionRuntimeException.class, () -> publisher.sendHeartbeat(participantStatus)); var participantRegister = new ParticipantRegister(); assertThrows(AutomationCompositionRuntimeException.class, diff --git a/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionOutHandlerTest.java b/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionOutHandlerTest.java index 76fbd068f..eed5319f8 100644 --- a/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionOutHandlerTest.java +++ b/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionOutHandlerTest.java @@ -206,16 +206,16 @@ class AutomationCompositionOutHandlerTest { var acOutHandler = new AutomationCompositionOutHandler(publisher, cacheProvider); acOutHandler.sendAcDefinitionInfo(null, null, Map.of()); - verify(publisher, times(0)).sendHeartbeat(any(ParticipantStatus.class)); + verify(publisher, times(0)).sendParticipantStatus(any(ParticipantStatus.class)); acOutHandler.sendAcDefinitionInfo(UUID.randomUUID(), null, Map.of()); - verify(publisher, times(0)).sendHeartbeat(any(ParticipantStatus.class)); + verify(publisher, times(0)).sendParticipantStatus(any(ParticipantStatus.class)); acOutHandler.sendAcDefinitionInfo(compositionId, new ToscaConceptIdentifier("wrong", "1.0.0"), Map.of()); - verify(publisher, times(0)).sendHeartbeat(any(ParticipantStatus.class)); + verify(publisher, times(0)).sendParticipantStatus(any(ParticipantStatus.class)); acOutHandler.sendAcDefinitionInfo(compositionId, elementId, Map.of()); - verify(publisher).sendHeartbeat(any(ParticipantStatus.class)); + verify(publisher).sendParticipantStatus(any(ParticipantStatus.class)); } @Test diff --git a/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandlerTest.java b/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandlerTest.java index 8aac93138..cd28d41fb 100644 --- a/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandlerTest.java +++ b/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandlerTest.java @@ -24,6 +24,7 @@ import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.clearInvocations; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -58,10 +59,16 @@ class ParticipantHandlerTest { @Test void handleParticipantStatusReqTest() { var publisher = mock(ParticipantMessagePublisher.class); + when(publisher.isActive()).thenReturn(true); var cacheProvider = mock(CacheProvider.class); var participantHandler = new ParticipantHandler(mock(AutomationCompositionHandler.class), mock(AcLockHandler.class), mock(AcDefinitionHandler.class), publisher, cacheProvider); participantHandler.handleParticipantStatusReq(new ParticipantStatusReq()); + verify(publisher).sendParticipantRegister(any(ParticipantRegister.class)); + + when(cacheProvider.isRegistered()).thenReturn(true); + clearInvocations(publisher); + participantHandler.handleParticipantStatusReq(new ParticipantStatusReq()); verify(publisher).sendParticipantStatus(any(ParticipantStatus.class)); } @@ -213,6 +220,7 @@ class ParticipantHandlerTest { void sendHeartbeatTest() { var cacheProvider = mock(CacheProvider.class); when(cacheProvider.getParticipantId()).thenReturn(CommonTestData.getParticipantId()); + when(cacheProvider.isRegistered()).thenReturn(false); when(cacheProvider.getAutomationCompositions()).thenReturn(CommonTestData.getTestAutomationCompositionMap()); var publisher = mock(ParticipantMessagePublisher.class); when(publisher.isActive()).thenReturn(true); @@ -220,7 +228,11 @@ class ParticipantHandlerTest { var participantHandler = new ParticipantHandler(mock(AutomationCompositionHandler.class), mock(AcLockHandler.class), acHandler, publisher, cacheProvider); participantHandler.sendHeartbeat(); - verify(publisher).sendHeartbeat(any(ParticipantStatus.class)); - } + verify(publisher).sendParticipantRegister(any(ParticipantRegister.class)); + when(cacheProvider.isRegistered()).thenReturn(true); + clearInvocations(publisher); + participantHandler.sendHeartbeat(); + verify(publisher).sendParticipantStatus(any(ParticipantStatus.class)); + } } |