aboutsummaryrefslogtreecommitdiffstats
path: root/participant/participant-intermediary
diff options
context:
space:
mode:
Diffstat (limited to 'participant/participant-intermediary')
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantMessagePublisher.java12
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionOutHandler.java2
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/CacheProvider.java5
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/IntermediaryActivator.java24
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandler.java9
-rw-r--r--participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantCommTest.java3
-rw-r--r--participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionOutHandlerTest.java8
-rw-r--r--participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandlerTest.java16
8 files changed, 45 insertions, 34 deletions
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantMessagePublisher.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantMessagePublisher.java
index 926de854b..8fa2ec366 100644
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantMessagePublisher.java
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantMessagePublisher.java
@@ -126,18 +126,6 @@ public class ParticipantMessagePublisher implements Publisher {
LOGGER.debug("Sent AutomationComposition Update/StateChange Ack to runtime - {}", automationCompositionAck);
}
- /**
- * Method to send Participant heartbeat to clamp on demand.
- *
- * @param participantStatus the Participant Status
- */
- @Timed(value = "publisher.participant_status", description = "PARTICIPANT_STATUS messages published")
- public void sendHeartbeat(final ParticipantStatus participantStatus) {
- validate();
- topicSinkClient.send(participantStatus);
- LOGGER.debug("Sent Participant heartbeat to CLAMP - {}", participantStatus);
- }
-
private void validate() {
if (!active) {
throw new AutomationCompositionRuntimeException(Status.NOT_ACCEPTABLE, NOT_ACTIVE_TEXT);
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionOutHandler.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionOutHandler.java
index 9f9b25af4..0ed333ebd 100644
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionOutHandler.java
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionOutHandler.java
@@ -267,7 +267,7 @@ public class AutomationCompositionOutHandler {
participantDefinition.setParticipantId(cacheProvider.getParticipantId());
participantDefinition.setAutomationCompositionElementDefinitionList(List.of(acElementDefinition));
statusMsg.setParticipantDefinitionUpdates(List.of(participantDefinition));
- publisher.sendHeartbeat(statusMsg);
+ publisher.sendParticipantStatus(statusMsg);
}
private AutomationCompositionElementDefinition getAutomationCompositionElementDefinition(
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/CacheProvider.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/CacheProvider.java
index f51e8baad..343f8a9e8 100644
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/CacheProvider.java
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/CacheProvider.java
@@ -28,6 +28,7 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import lombok.Getter;
import lombok.NonNull;
+import lombok.Setter;
import org.onap.policy.clamp.acm.participant.intermediary.api.CompositionElementDto;
import org.onap.policy.clamp.acm.participant.intermediary.parameters.ParticipantParameters;
import org.onap.policy.clamp.models.acm.concepts.AutomationComposition;
@@ -48,6 +49,10 @@ public class CacheProvider {
@Getter
private final UUID participantId;
+ @Getter
+ @Setter
+ private boolean registered = false;
+
private final List<ParticipantSupportedElementType> supportedAcElementTypes;
@Getter
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/IntermediaryActivator.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/IntermediaryActivator.java
index 6586ae4b3..a77d5242a 100644
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/IntermediaryActivator.java
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/IntermediaryActivator.java
@@ -24,6 +24,8 @@ package org.onap.policy.clamp.acm.participant.intermediary.handler;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
import lombok.Getter;
import org.onap.policy.clamp.acm.participant.intermediary.parameters.ParticipantParameters;
import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
@@ -78,16 +80,16 @@ public class IntermediaryActivator extends ServiceManagerContainer implements Cl
() -> TopicEndpointManager.getManager().start(),
() -> TopicEndpointManager.getManager().shutdown());
+ listeners.forEach(listener ->
+ addAction("Listener " + listener.getClass().getSimpleName(),
+ () -> msgDispatcher.register(listener.getType(), listener.getScoListener()),
+ () -> msgDispatcher.unregister(listener.getType())));
+
publishers.forEach(publisher ->
addAction("Publisher " + publisher.getClass().getSimpleName(),
() -> publisher.active(topicSinks),
publisher::stop));
- listeners.forEach(listener ->
- addAction("Listener " + listener.getClass().getSimpleName(),
- () -> msgDispatcher.register(listener.getType(), listener.getScoListener()),
- () -> msgDispatcher.unregister(listener.getType())));
-
addAction("Topic Message Dispatcher", this::registerMsgDispatcher, this::unregisterMsgDispatcher);
// @formatter:on
}
@@ -101,7 +103,13 @@ public class IntermediaryActivator extends ServiceManagerContainer implements Cl
public void handleContextRefreshEvent(ContextRefreshedEvent ctxRefreshedEvent) {
if (!isAlive()) {
start();
- sendParticipantRegister();
+ var task = new TimerTask() {
+ @Override
+ public void run() {
+ new Thread(participantHandler::sendParticipantRegister).start();
+ }
+ };
+ new Timer().schedule(task, 5000);
}
}
@@ -118,10 +126,6 @@ public class IntermediaryActivator extends ServiceManagerContainer implements Cl
}
}
- private void sendParticipantRegister() {
- participantHandler.sendParticipantRegister();
- }
-
private void sendParticipantDeregister() {
participantHandler.sendParticipantDeregister();
}
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandler.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandler.java
index a4a92f883..0865dca8e 100644
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandler.java
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandler.java
@@ -66,7 +66,7 @@ public class ParticipantHandler {
*/
@Timed(value = "listener.participant_status_req", description = "PARTICIPANT_STATUS_REQ messages received")
public void handleParticipantStatusReq(final ParticipantStatusReq participantStatusReqMsg) {
- publisher.sendParticipantStatus(makeHeartbeat());
+ sendHeartbeat();
}
/**
@@ -159,6 +159,7 @@ public class ParticipantHandler {
public void handleParticipantRegisterAck(ParticipantRegisterAck participantRegisterAckMsg) {
LOGGER.debug("ParticipantRegisterAck message received as responseTo {}",
participantRegisterAckMsg.getResponseTo());
+ cacheProvider.setRegistered(true);
publisher.sendParticipantStatus(makeHeartbeat());
}
@@ -210,7 +211,11 @@ public class ParticipantHandler {
*/
public void sendHeartbeat() {
if (publisher.isActive()) {
- publisher.sendHeartbeat(makeHeartbeat());
+ if (!cacheProvider.isRegistered()) {
+ sendParticipantRegister();
+ } else {
+ publisher.sendParticipantStatus(makeHeartbeat());
+ }
}
}
diff --git a/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantCommTest.java b/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantCommTest.java
index 754fc6522..10f9d4586 100644
--- a/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantCommTest.java
+++ b/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantCommTest.java
@@ -93,8 +93,6 @@ class ParticipantCommTest {
var participantStatus = new ParticipantStatus();
assertDoesNotThrow(() -> publisher.sendParticipantStatus(participantStatus));
- assertDoesNotThrow(() -> publisher.sendHeartbeat(participantStatus));
-
var participantRegister = new ParticipantRegister();
assertDoesNotThrow(() -> publisher.sendParticipantRegister(participantRegister));
@@ -115,7 +113,6 @@ class ParticipantCommTest {
var participantStatus = new ParticipantStatus();
assertThrows(AutomationCompositionRuntimeException.class,
() -> publisher.sendParticipantStatus(participantStatus));
- assertThrows(AutomationCompositionRuntimeException.class, () -> publisher.sendHeartbeat(participantStatus));
var participantRegister = new ParticipantRegister();
assertThrows(AutomationCompositionRuntimeException.class,
diff --git a/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionOutHandlerTest.java b/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionOutHandlerTest.java
index 76fbd068f..eed5319f8 100644
--- a/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionOutHandlerTest.java
+++ b/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionOutHandlerTest.java
@@ -206,16 +206,16 @@ class AutomationCompositionOutHandlerTest {
var acOutHandler = new AutomationCompositionOutHandler(publisher, cacheProvider);
acOutHandler.sendAcDefinitionInfo(null, null, Map.of());
- verify(publisher, times(0)).sendHeartbeat(any(ParticipantStatus.class));
+ verify(publisher, times(0)).sendParticipantStatus(any(ParticipantStatus.class));
acOutHandler.sendAcDefinitionInfo(UUID.randomUUID(), null, Map.of());
- verify(publisher, times(0)).sendHeartbeat(any(ParticipantStatus.class));
+ verify(publisher, times(0)).sendParticipantStatus(any(ParticipantStatus.class));
acOutHandler.sendAcDefinitionInfo(compositionId, new ToscaConceptIdentifier("wrong", "1.0.0"), Map.of());
- verify(publisher, times(0)).sendHeartbeat(any(ParticipantStatus.class));
+ verify(publisher, times(0)).sendParticipantStatus(any(ParticipantStatus.class));
acOutHandler.sendAcDefinitionInfo(compositionId, elementId, Map.of());
- verify(publisher).sendHeartbeat(any(ParticipantStatus.class));
+ verify(publisher).sendParticipantStatus(any(ParticipantStatus.class));
}
@Test
diff --git a/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandlerTest.java b/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandlerTest.java
index 8aac93138..cd28d41fb 100644
--- a/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandlerTest.java
+++ b/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandlerTest.java
@@ -24,6 +24,7 @@ import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -58,10 +59,16 @@ class ParticipantHandlerTest {
@Test
void handleParticipantStatusReqTest() {
var publisher = mock(ParticipantMessagePublisher.class);
+ when(publisher.isActive()).thenReturn(true);
var cacheProvider = mock(CacheProvider.class);
var participantHandler = new ParticipantHandler(mock(AutomationCompositionHandler.class),
mock(AcLockHandler.class), mock(AcDefinitionHandler.class), publisher, cacheProvider);
participantHandler.handleParticipantStatusReq(new ParticipantStatusReq());
+ verify(publisher).sendParticipantRegister(any(ParticipantRegister.class));
+
+ when(cacheProvider.isRegistered()).thenReturn(true);
+ clearInvocations(publisher);
+ participantHandler.handleParticipantStatusReq(new ParticipantStatusReq());
verify(publisher).sendParticipantStatus(any(ParticipantStatus.class));
}
@@ -213,6 +220,7 @@ class ParticipantHandlerTest {
void sendHeartbeatTest() {
var cacheProvider = mock(CacheProvider.class);
when(cacheProvider.getParticipantId()).thenReturn(CommonTestData.getParticipantId());
+ when(cacheProvider.isRegistered()).thenReturn(false);
when(cacheProvider.getAutomationCompositions()).thenReturn(CommonTestData.getTestAutomationCompositionMap());
var publisher = mock(ParticipantMessagePublisher.class);
when(publisher.isActive()).thenReturn(true);
@@ -220,7 +228,11 @@ class ParticipantHandlerTest {
var participantHandler = new ParticipantHandler(mock(AutomationCompositionHandler.class),
mock(AcLockHandler.class), acHandler, publisher, cacheProvider);
participantHandler.sendHeartbeat();
- verify(publisher).sendHeartbeat(any(ParticipantStatus.class));
- }
+ verify(publisher).sendParticipantRegister(any(ParticipantRegister.class));
+ when(cacheProvider.isRegistered()).thenReturn(true);
+ clearInvocations(publisher);
+ participantHandler.sendHeartbeat();
+ verify(publisher).sendParticipantStatus(any(ParticipantStatus.class));
+ }
}