diff options
Diffstat (limited to 'participant/participant-intermediary/src/main/java')
5 files changed, 27 insertions, 25 deletions
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantMessagePublisher.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantMessagePublisher.java index 926de854b..8fa2ec366 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantMessagePublisher.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantMessagePublisher.java @@ -126,18 +126,6 @@ public class ParticipantMessagePublisher implements Publisher { LOGGER.debug("Sent AutomationComposition Update/StateChange Ack to runtime - {}", automationCompositionAck); } - /** - * Method to send Participant heartbeat to clamp on demand. - * - * @param participantStatus the Participant Status - */ - @Timed(value = "publisher.participant_status", description = "PARTICIPANT_STATUS messages published") - public void sendHeartbeat(final ParticipantStatus participantStatus) { - validate(); - topicSinkClient.send(participantStatus); - LOGGER.debug("Sent Participant heartbeat to CLAMP - {}", participantStatus); - } - private void validate() { if (!active) { throw new AutomationCompositionRuntimeException(Status.NOT_ACCEPTABLE, NOT_ACTIVE_TEXT); diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionOutHandler.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionOutHandler.java index 9f9b25af4..0ed333ebd 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionOutHandler.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionOutHandler.java @@ -267,7 +267,7 @@ public class AutomationCompositionOutHandler { participantDefinition.setParticipantId(cacheProvider.getParticipantId()); participantDefinition.setAutomationCompositionElementDefinitionList(List.of(acElementDefinition)); statusMsg.setParticipantDefinitionUpdates(List.of(participantDefinition)); - publisher.sendHeartbeat(statusMsg); + publisher.sendParticipantStatus(statusMsg); } private AutomationCompositionElementDefinition getAutomationCompositionElementDefinition( diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/CacheProvider.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/CacheProvider.java index f51e8baad..343f8a9e8 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/CacheProvider.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/CacheProvider.java @@ -28,6 +28,7 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import lombok.Getter; import lombok.NonNull; +import lombok.Setter; import org.onap.policy.clamp.acm.participant.intermediary.api.CompositionElementDto; import org.onap.policy.clamp.acm.participant.intermediary.parameters.ParticipantParameters; import org.onap.policy.clamp.models.acm.concepts.AutomationComposition; @@ -48,6 +49,10 @@ public class CacheProvider { @Getter private final UUID participantId; + @Getter + @Setter + private boolean registered = false; + private final List<ParticipantSupportedElementType> supportedAcElementTypes; @Getter diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/IntermediaryActivator.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/IntermediaryActivator.java index 6586ae4b3..a77d5242a 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/IntermediaryActivator.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/IntermediaryActivator.java @@ -24,6 +24,8 @@ package org.onap.policy.clamp.acm.participant.intermediary.handler; import java.io.Closeable; import java.io.IOException; import java.util.List; +import java.util.Timer; +import java.util.TimerTask; import lombok.Getter; import org.onap.policy.clamp.acm.participant.intermediary.parameters.ParticipantParameters; import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager; @@ -78,16 +80,16 @@ public class IntermediaryActivator extends ServiceManagerContainer implements Cl () -> TopicEndpointManager.getManager().start(), () -> TopicEndpointManager.getManager().shutdown()); + listeners.forEach(listener -> + addAction("Listener " + listener.getClass().getSimpleName(), + () -> msgDispatcher.register(listener.getType(), listener.getScoListener()), + () -> msgDispatcher.unregister(listener.getType()))); + publishers.forEach(publisher -> addAction("Publisher " + publisher.getClass().getSimpleName(), () -> publisher.active(topicSinks), publisher::stop)); - listeners.forEach(listener -> - addAction("Listener " + listener.getClass().getSimpleName(), - () -> msgDispatcher.register(listener.getType(), listener.getScoListener()), - () -> msgDispatcher.unregister(listener.getType()))); - addAction("Topic Message Dispatcher", this::registerMsgDispatcher, this::unregisterMsgDispatcher); // @formatter:on } @@ -101,7 +103,13 @@ public class IntermediaryActivator extends ServiceManagerContainer implements Cl public void handleContextRefreshEvent(ContextRefreshedEvent ctxRefreshedEvent) { if (!isAlive()) { start(); - sendParticipantRegister(); + var task = new TimerTask() { + @Override + public void run() { + new Thread(participantHandler::sendParticipantRegister).start(); + } + }; + new Timer().schedule(task, 5000); } } @@ -118,10 +126,6 @@ public class IntermediaryActivator extends ServiceManagerContainer implements Cl } } - private void sendParticipantRegister() { - participantHandler.sendParticipantRegister(); - } - private void sendParticipantDeregister() { participantHandler.sendParticipantDeregister(); } diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandler.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandler.java index a4a92f883..0865dca8e 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandler.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandler.java @@ -66,7 +66,7 @@ public class ParticipantHandler { */ @Timed(value = "listener.participant_status_req", description = "PARTICIPANT_STATUS_REQ messages received") public void handleParticipantStatusReq(final ParticipantStatusReq participantStatusReqMsg) { - publisher.sendParticipantStatus(makeHeartbeat()); + sendHeartbeat(); } /** @@ -159,6 +159,7 @@ public class ParticipantHandler { public void handleParticipantRegisterAck(ParticipantRegisterAck participantRegisterAckMsg) { LOGGER.debug("ParticipantRegisterAck message received as responseTo {}", participantRegisterAckMsg.getResponseTo()); + cacheProvider.setRegistered(true); publisher.sendParticipantStatus(makeHeartbeat()); } @@ -210,7 +211,11 @@ public class ParticipantHandler { */ public void sendHeartbeat() { if (publisher.isActive()) { - publisher.sendHeartbeat(makeHeartbeat()); + if (!cacheProvider.isRegistered()) { + sendParticipantRegister(); + } else { + publisher.sendParticipantStatus(makeHeartbeat()); + } } } |