aboutsummaryrefslogtreecommitdiffstats
path: root/participant/participant-intermediary/src/main/java/org/onap
diff options
context:
space:
mode:
Diffstat (limited to 'participant/participant-intermediary/src/main/java/org/onap')
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantMessagePublisher.java12
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionOutHandler.java2
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/CacheProvider.java5
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/IntermediaryActivator.java24
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandler.java9
5 files changed, 27 insertions, 25 deletions
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantMessagePublisher.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantMessagePublisher.java
index 926de854b..8fa2ec366 100644
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantMessagePublisher.java
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantMessagePublisher.java
@@ -126,18 +126,6 @@ public class ParticipantMessagePublisher implements Publisher {
LOGGER.debug("Sent AutomationComposition Update/StateChange Ack to runtime - {}", automationCompositionAck);
}
- /**
- * Method to send Participant heartbeat to clamp on demand.
- *
- * @param participantStatus the Participant Status
- */
- @Timed(value = "publisher.participant_status", description = "PARTICIPANT_STATUS messages published")
- public void sendHeartbeat(final ParticipantStatus participantStatus) {
- validate();
- topicSinkClient.send(participantStatus);
- LOGGER.debug("Sent Participant heartbeat to CLAMP - {}", participantStatus);
- }
-
private void validate() {
if (!active) {
throw new AutomationCompositionRuntimeException(Status.NOT_ACCEPTABLE, NOT_ACTIVE_TEXT);
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionOutHandler.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionOutHandler.java
index 9f9b25af4..0ed333ebd 100644
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionOutHandler.java
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionOutHandler.java
@@ -267,7 +267,7 @@ public class AutomationCompositionOutHandler {
participantDefinition.setParticipantId(cacheProvider.getParticipantId());
participantDefinition.setAutomationCompositionElementDefinitionList(List.of(acElementDefinition));
statusMsg.setParticipantDefinitionUpdates(List.of(participantDefinition));
- publisher.sendHeartbeat(statusMsg);
+ publisher.sendParticipantStatus(statusMsg);
}
private AutomationCompositionElementDefinition getAutomationCompositionElementDefinition(
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/CacheProvider.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/CacheProvider.java
index f51e8baad..343f8a9e8 100644
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/CacheProvider.java
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/CacheProvider.java
@@ -28,6 +28,7 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import lombok.Getter;
import lombok.NonNull;
+import lombok.Setter;
import org.onap.policy.clamp.acm.participant.intermediary.api.CompositionElementDto;
import org.onap.policy.clamp.acm.participant.intermediary.parameters.ParticipantParameters;
import org.onap.policy.clamp.models.acm.concepts.AutomationComposition;
@@ -48,6 +49,10 @@ public class CacheProvider {
@Getter
private final UUID participantId;
+ @Getter
+ @Setter
+ private boolean registered = false;
+
private final List<ParticipantSupportedElementType> supportedAcElementTypes;
@Getter
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/IntermediaryActivator.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/IntermediaryActivator.java
index 6586ae4b3..a77d5242a 100644
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/IntermediaryActivator.java
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/IntermediaryActivator.java
@@ -24,6 +24,8 @@ package org.onap.policy.clamp.acm.participant.intermediary.handler;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
import lombok.Getter;
import org.onap.policy.clamp.acm.participant.intermediary.parameters.ParticipantParameters;
import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
@@ -78,16 +80,16 @@ public class IntermediaryActivator extends ServiceManagerContainer implements Cl
() -> TopicEndpointManager.getManager().start(),
() -> TopicEndpointManager.getManager().shutdown());
+ listeners.forEach(listener ->
+ addAction("Listener " + listener.getClass().getSimpleName(),
+ () -> msgDispatcher.register(listener.getType(), listener.getScoListener()),
+ () -> msgDispatcher.unregister(listener.getType())));
+
publishers.forEach(publisher ->
addAction("Publisher " + publisher.getClass().getSimpleName(),
() -> publisher.active(topicSinks),
publisher::stop));
- listeners.forEach(listener ->
- addAction("Listener " + listener.getClass().getSimpleName(),
- () -> msgDispatcher.register(listener.getType(), listener.getScoListener()),
- () -> msgDispatcher.unregister(listener.getType())));
-
addAction("Topic Message Dispatcher", this::registerMsgDispatcher, this::unregisterMsgDispatcher);
// @formatter:on
}
@@ -101,7 +103,13 @@ public class IntermediaryActivator extends ServiceManagerContainer implements Cl
public void handleContextRefreshEvent(ContextRefreshedEvent ctxRefreshedEvent) {
if (!isAlive()) {
start();
- sendParticipantRegister();
+ var task = new TimerTask() {
+ @Override
+ public void run() {
+ new Thread(participantHandler::sendParticipantRegister).start();
+ }
+ };
+ new Timer().schedule(task, 5000);
}
}
@@ -118,10 +126,6 @@ public class IntermediaryActivator extends ServiceManagerContainer implements Cl
}
}
- private void sendParticipantRegister() {
- participantHandler.sendParticipantRegister();
- }
-
private void sendParticipantDeregister() {
participantHandler.sendParticipantDeregister();
}
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandler.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandler.java
index a4a92f883..0865dca8e 100644
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandler.java
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandler.java
@@ -66,7 +66,7 @@ public class ParticipantHandler {
*/
@Timed(value = "listener.participant_status_req", description = "PARTICIPANT_STATUS_REQ messages received")
public void handleParticipantStatusReq(final ParticipantStatusReq participantStatusReqMsg) {
- publisher.sendParticipantStatus(makeHeartbeat());
+ sendHeartbeat();
}
/**
@@ -159,6 +159,7 @@ public class ParticipantHandler {
public void handleParticipantRegisterAck(ParticipantRegisterAck participantRegisterAckMsg) {
LOGGER.debug("ParticipantRegisterAck message received as responseTo {}",
participantRegisterAckMsg.getResponseTo());
+ cacheProvider.setRegistered(true);
publisher.sendParticipantStatus(makeHeartbeat());
}
@@ -210,7 +211,11 @@ public class ParticipantHandler {
*/
public void sendHeartbeat() {
if (publisher.isActive()) {
- publisher.sendHeartbeat(makeHeartbeat());
+ if (!cacheProvider.isRegistered()) {
+ sendParticipantRegister();
+ } else {
+ publisher.sendParticipantStatus(makeHeartbeat());
+ }
}
}