aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--packages/policy-clamp-tarball/src/main/resources/etc/ClRuntimeParameters.yaml8
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ControlLoopHandler.java15
-rw-r--r--runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/HandleCounter.java15
-rw-r--r--runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionAspect.java15
-rw-r--r--runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionHandler.java2
-rw-r--r--runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionScanner.java70
-rw-r--r--runtime-controlloop/src/main/resources/application.yaml5
-rw-r--r--runtime-controlloop/src/test/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionScannerTest.java31
8 files changed, 121 insertions, 40 deletions
diff --git a/packages/policy-clamp-tarball/src/main/resources/etc/ClRuntimeParameters.yaml b/packages/policy-clamp-tarball/src/main/resources/etc/ClRuntimeParameters.yaml
index 3970d15f1..0aa3fb782 100644
--- a/packages/policy-clamp-tarball/src/main/resources/etc/ClRuntimeParameters.yaml
+++ b/packages/policy-clamp-tarball/src/main/resources/etc/ClRuntimeParameters.yaml
@@ -24,12 +24,10 @@ runtime:
participantClStateChangeIntervalSec: 1000
participantParameters:
heartBeatMs: 120000
+ maxMessageAgeMs: 600000
updateParameters:
- maxRetryCount: 1
- maxWaitMs: 30000
- stateChangeParameters:
- maxRetryCount: 1
- maxWaitMs: 30000
+ maxRetryCount: 3
+ maxWaitMs: 100000
databaseProviderParameters:
name: PolicyProviderParameterGroup
implementation: org.onap.policy.models.provider.impl.DatabasePolicyModelsProviderImpl
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ControlLoopHandler.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ControlLoopHandler.java
index daf9ebe39..2bc21f713 100644
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ControlLoopHandler.java
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ControlLoopHandler.java
@@ -27,6 +27,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.stream.Collectors;
import lombok.Getter;
import lombok.NoArgsConstructor;
import org.apache.commons.collections4.CollectionUtils;
@@ -250,13 +251,13 @@ public class ControlLoopHandler {
}
private List<ControlLoopElement> storeElementsOnThisParticipant(List<ParticipantUpdates> participantUpdates) {
- List<ControlLoopElement> clElementMap = new ArrayList<>();
- for (ParticipantUpdates participantUpdate : participantUpdates) {
- if (participantUpdate.getParticipantId().equals(participantType)) {
- clElementMap = participantUpdate.getControlLoopElementList();
- }
- }
- for (ControlLoopElement element : clElementMap) {
+ var clElementMap =
+ participantUpdates.stream()
+ .flatMap(participantUpdate -> participantUpdate.getControlLoopElementList().stream())
+ .filter(element -> participantType.equals(element.getParticipantType()))
+ .collect(Collectors.toList());
+
+ for (var element : clElementMap) {
elementsOnThisParticipant.put(element.getId(), element);
}
return clElementMap;
diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/HandleCounter.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/HandleCounter.java
index 2151dc143..7e070d700 100644
--- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/HandleCounter.java
+++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/HandleCounter.java
@@ -57,6 +57,17 @@ public class HandleCounter<K> {
mapTimer.put(id, getEpochMilli());
}
+ /**
+ * Remove counter, timer and fault by id.
+ *
+ * @param id the id
+ */
+ public void remove(K id) {
+ mapFault.remove(id);
+ mapCounter.remove(id);
+ mapTimer.remove(id);
+ }
+
public void setFault(K id) {
mapCounter.put(id, 0);
mapFault.add(id);
@@ -88,4 +99,8 @@ public class HandleCounter<K> {
protected long getEpochMilli() {
return Instant.now().toEpochMilli();
}
+
+ public Set<K> keySet() {
+ return mapCounter.keySet();
+ }
}
diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionAspect.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionAspect.java
index d0d18ab1a..fbb2742a7 100644
--- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionAspect.java
+++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionAspect.java
@@ -26,10 +26,13 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import lombok.RequiredArgsConstructor;
+import org.apache.commons.lang3.tuple.ImmutablePair;
import org.aspectj.lang.annotation.After;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantRegister;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantStatus;
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantUpdateAck;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Scheduled;
@@ -71,6 +74,18 @@ public class SupervisionAspect implements Closeable {
executor.execute(() -> supervisionScanner.handleParticipantStatus(participantStatusMessage.getParticipantId()));
}
+ @Before("@annotation(MessageIntercept) && args(participantRegisterMessage,..)")
+ public void handleParticipantRegister(ParticipantRegister participantRegisterMessage) {
+ executor.execute(() -> supervisionScanner.handleParticipantRegister(new ImmutablePair<>(
+ participantRegisterMessage.getParticipantId(), participantRegisterMessage.getParticipantType())));
+ }
+
+ @Before("@annotation(MessageIntercept) && args(participantUpdateAckMessage,..)")
+ public void handleParticipantUpdateAck(ParticipantUpdateAck participantUpdateAckMessage) {
+ executor.execute(() -> supervisionScanner.handleParticipantUpdateAck(new ImmutablePair<>(
+ participantUpdateAckMessage.getParticipantId(), participantUpdateAckMessage.getParticipantType())));
+ }
+
@Override
public void close() throws IOException {
executor.shutdown();
diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionHandler.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionHandler.java
index 7cc5d7fee..d06698ec4 100644
--- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionHandler.java
+++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionHandler.java
@@ -249,7 +249,7 @@ public class SupervisionHandler {
}
private void superviseControlLoopPassivation(ControlLoop controlLoop)
- throws ControlLoopException, PfModelException {
+ throws ControlLoopException {
switch (controlLoop.getState()) {
case PASSIVE:
exceptionOccured(Response.Status.NOT_ACCEPTABLE,
diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionScanner.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionScanner.java
index 7be407c3f..151b04cbf 100644
--- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionScanner.java
+++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionScanner.java
@@ -21,6 +21,7 @@
package org.onap.policy.clamp.controlloop.runtime.supervision;
import java.util.List;
+import org.apache.commons.lang3.tuple.Pair;
import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoop;
import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopElement;
import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopState;
@@ -32,6 +33,7 @@ import org.onap.policy.clamp.controlloop.runtime.main.parameters.ClRuntimeParame
import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ControlLoopStateChangePublisher;
import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ControlLoopUpdatePublisher;
import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ParticipantStatusReqPublisher;
+import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ParticipantUpdatePublisher;
import org.onap.policy.models.base.PfModelException;
import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
import org.slf4j.Logger;
@@ -46,15 +48,18 @@ public class SupervisionScanner {
private static final Logger LOGGER = LoggerFactory.getLogger(SupervisionScanner.class);
private HandleCounter<ToscaConceptIdentifier> controlLoopCounter = new HandleCounter<>();
- private HandleCounter<ToscaConceptIdentifier> participantCounter = new HandleCounter<>();
+ private HandleCounter<ToscaConceptIdentifier> participantStatusCounter = new HandleCounter<>();
+ private HandleCounter<Pair<ToscaConceptIdentifier, ToscaConceptIdentifier>> participantUpdateCounter =
+ new HandleCounter<>();
private final ControlLoopProvider controlLoopProvider;
private final ControlLoopStateChangePublisher controlLoopStateChangePublisher;
private final ControlLoopUpdatePublisher controlLoopUpdatePublisher;
private final ParticipantProvider participantProvider;
private final ParticipantStatusReqPublisher participantStatusReqPublisher;
+ private final ParticipantUpdatePublisher participantUpdatePublisher;
- private final long maxMessageAgeMs;
+ private final long maxWaitMs;
/**
* Constructor for instantiating SupervisionScanner.
@@ -64,30 +69,38 @@ public class SupervisionScanner {
* @param controlLoopUpdatePublisher the ControlLoopUpdate Publisher
* @param participantProvider the Participant Provider
* @param participantStatusReqPublisher the Participant StatusReq Publisher
+ * @param participantUpdatePublisher the Participant Update Publisher
* @param clRuntimeParameterGroup the parameters for the control loop runtime
*/
public SupervisionScanner(final ControlLoopProvider controlLoopProvider,
final ControlLoopStateChangePublisher controlLoopStateChangePublisher,
ControlLoopUpdatePublisher controlLoopUpdatePublisher, ParticipantProvider participantProvider,
ParticipantStatusReqPublisher participantStatusReqPublisher,
+ ParticipantUpdatePublisher participantUpdatePublisher,
final ClRuntimeParameterGroup clRuntimeParameterGroup) {
this.controlLoopProvider = controlLoopProvider;
this.controlLoopStateChangePublisher = controlLoopStateChangePublisher;
this.controlLoopUpdatePublisher = controlLoopUpdatePublisher;
this.participantProvider = participantProvider;
this.participantStatusReqPublisher = participantStatusReqPublisher;
+ this.participantUpdatePublisher = participantUpdatePublisher;
controlLoopCounter.setMaxRetryCount(
clRuntimeParameterGroup.getParticipantParameters().getUpdateParameters().getMaxRetryCount());
controlLoopCounter
.setMaxWaitMs(clRuntimeParameterGroup.getParticipantParameters().getUpdateParameters().getMaxWaitMs());
- participantCounter.setMaxRetryCount(
+ participantUpdateCounter.setMaxRetryCount(
clRuntimeParameterGroup.getParticipantParameters().getUpdateParameters().getMaxRetryCount());
- participantCounter
+ participantUpdateCounter
.setMaxWaitMs(clRuntimeParameterGroup.getParticipantParameters().getUpdateParameters().getMaxWaitMs());
- maxMessageAgeMs = clRuntimeParameterGroup.getParticipantParameters().getMaxMessageAgeMs();
+ participantStatusCounter.setMaxRetryCount(
+ clRuntimeParameterGroup.getParticipantParameters().getUpdateParameters().getMaxRetryCount());
+ participantStatusCounter
+ .setMaxWaitMs(clRuntimeParameterGroup.getParticipantParameters().getUpdateParameters().getMaxWaitMs());
+
+ maxWaitMs = clRuntimeParameterGroup.getParticipantParameters().getUpdateParameters().getMaxWaitMs();
}
/**
@@ -101,7 +114,7 @@ public class SupervisionScanner {
if (counterCheck) {
try {
for (Participant participant : participantProvider.getParticipants(null, null)) {
- scanParticipant(participant);
+ scanParticipantStatus(participant);
}
} catch (PfModelException pfme) {
LOGGER.warn("error reading participant from database", pfme);
@@ -116,24 +129,49 @@ public class SupervisionScanner {
} catch (PfModelException pfme) {
LOGGER.warn("error reading control loops from database", pfme);
}
+ if (counterCheck) {
+ scanParticipantUpdate();
+ }
LOGGER.debug("Control loop scan complete . . .");
}
- private void scanParticipant(Participant participant) throws PfModelException {
+ private void scanParticipantUpdate() {
+ LOGGER.debug("Scanning participants to update . . .");
+
+ for (var id : participantUpdateCounter.keySet()) {
+ if (participantUpdateCounter.isFault(id)) {
+ LOGGER.debug("report Participant Update fault");
+
+ } else if (participantUpdateCounter.getDuration(id) > maxWaitMs) {
+
+ if (participantUpdateCounter.count(id)) {
+ LOGGER.debug("retry message ParticipantUpdate");
+ participantUpdatePublisher.send(id.getLeft(), id.getRight());
+ } else {
+ LOGGER.debug("report Participant Update fault");
+ participantUpdateCounter.setFault(id);
+ }
+ }
+ }
+
+ LOGGER.debug("Participants to update scan complete . . .");
+ }
+
+ private void scanParticipantStatus(Participant participant) throws PfModelException {
ToscaConceptIdentifier id = participant.getKey().asIdentifier();
- if (participantCounter.isFault(id)) {
+ if (participantStatusCounter.isFault(id)) {
LOGGER.debug("report Participant fault");
return;
}
- if (participantCounter.getDuration(id) > maxMessageAgeMs) {
- if (participantCounter.count(id)) {
+ if (participantStatusCounter.getDuration(id) > maxWaitMs) {
+ if (participantStatusCounter.count(id)) {
LOGGER.debug("retry message ParticipantStatusReq");
participantStatusReqPublisher.send(id);
participant.setHealthStatus(ParticipantHealthStatus.NOT_HEALTHY);
} else {
LOGGER.debug("report Participant fault");
- participantCounter.setFault(id);
+ participantStatusCounter.setFault(id);
participant.setHealthStatus(ParticipantHealthStatus.OFF_LINE);
}
participantProvider.updateParticipants(List.of(participant));
@@ -144,7 +182,15 @@ public class SupervisionScanner {
* handle participant Status message.
*/
public void handleParticipantStatus(ToscaConceptIdentifier id) {
- participantCounter.clear(id);
+ participantStatusCounter.clear(id);
+ }
+
+ public void handleParticipantRegister(Pair<ToscaConceptIdentifier, ToscaConceptIdentifier> id) {
+ participantUpdateCounter.clear(id);
+ }
+
+ public void handleParticipantUpdateAck(Pair<ToscaConceptIdentifier, ToscaConceptIdentifier> id) {
+ participantUpdateCounter.remove(id);
}
private void scanControlLoop(final ControlLoop controlLoop, boolean counterCheck) throws PfModelException {
diff --git a/runtime-controlloop/src/main/resources/application.yaml b/runtime-controlloop/src/main/resources/application.yaml
index d0e5500d6..ea98aaa8c 100644
--- a/runtime-controlloop/src/main/resources/application.yaml
+++ b/runtime-controlloop/src/main/resources/application.yaml
@@ -24,9 +24,10 @@ runtime:
participantClStateChangeIntervalSec: 1000
participantParameters:
heartBeatMs: 120000
+ maxMessageAgeMs: 600000
updateParameters:
- maxRetryCount: 1
- maxWaitMs: 30000
+ maxRetryCount: 3
+ maxWaitMs: 100000
databaseProviderParameters:
name: PolicyProviderParameterGroup
implementation: org.onap.policy.models.provider.impl.DatabasePolicyModelsProviderImpl
diff --git a/runtime-controlloop/src/test/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionScannerTest.java b/runtime-controlloop/src/test/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionScannerTest.java
index 485f58dba..717858ebe 100644
--- a/runtime-controlloop/src/test/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionScannerTest.java
+++ b/runtime-controlloop/src/test/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionScannerTest.java
@@ -43,6 +43,7 @@ import org.onap.policy.clamp.controlloop.models.controlloop.persistence.provider
import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ControlLoopStateChangePublisher;
import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ControlLoopUpdatePublisher;
import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ParticipantStatusReqPublisher;
+import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ParticipantUpdatePublisher;
import org.onap.policy.clamp.controlloop.runtime.util.CommonTestData;
import org.onap.policy.models.base.PfModelException;
import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
@@ -56,14 +57,15 @@ class SupervisionScannerTest {
var controlLoopUpdatePublisher = mock(ControlLoopUpdatePublisher.class);
var participantProvider = mock(ParticipantProvider.class);
var participantStatusReqPublisher = mock(ParticipantStatusReqPublisher.class);
+ var participantUpdatePublisher = mock(ParticipantUpdatePublisher.class);
var clRuntimeParameterGroup = CommonTestData.geParameterGroup("dbScanner");
var controlLoop = new ControlLoop();
when(controlLoopProvider.getControlLoops(null, null)).thenReturn(List.of(controlLoop));
- var supervisionScanner =
- new SupervisionScanner(controlLoopProvider, controlLoopStateChangePublisher, controlLoopUpdatePublisher,
- participantProvider, participantStatusReqPublisher, clRuntimeParameterGroup);
+ var supervisionScanner = new SupervisionScanner(controlLoopProvider, controlLoopStateChangePublisher,
+ controlLoopUpdatePublisher, participantProvider, participantStatusReqPublisher,
+ participantUpdatePublisher, clRuntimeParameterGroup);
supervisionScanner.run(false);
verify(controlLoopProvider, times(0)).updateControlLoop(any(ControlLoop.class));
@@ -82,11 +84,12 @@ class SupervisionScannerTest {
var controlLoopStateChangePublisher = mock(ControlLoopStateChangePublisher.class);
var participantProvider = mock(ParticipantProvider.class);
var participantStatusReqPublisher = mock(ParticipantStatusReqPublisher.class);
+ var participantUpdatePublisher = mock(ParticipantUpdatePublisher.class);
var clRuntimeParameterGroup = CommonTestData.geParameterGroup("dbScanner");
- var supervisionScanner =
- new SupervisionScanner(controlLoopProvider, controlLoopStateChangePublisher, controlLoopUpdatePublisher,
- participantProvider, participantStatusReqPublisher, clRuntimeParameterGroup);
+ var supervisionScanner = new SupervisionScanner(controlLoopProvider, controlLoopStateChangePublisher,
+ controlLoopUpdatePublisher, participantProvider, participantStatusReqPublisher,
+ participantUpdatePublisher, clRuntimeParameterGroup);
supervisionScanner.run(false);
verify(controlLoopProvider, times(1)).updateControlLoop(any(ControlLoop.class));
@@ -107,11 +110,12 @@ class SupervisionScannerTest {
var controlLoopUpdatePublisher = mock(ControlLoopUpdatePublisher.class);
var participantStatusReqPublisher = mock(ParticipantStatusReqPublisher.class);
var controlLoopStateChangePublisher = mock(ControlLoopStateChangePublisher.class);
+ var participantUpdatePublisher = mock(ParticipantUpdatePublisher.class);
var clRuntimeParameterGroup = CommonTestData.geParameterGroup("dbScanner");
- var supervisionScanner =
- new SupervisionScanner(controlLoopProvider, controlLoopStateChangePublisher, controlLoopUpdatePublisher,
- participantProvider, participantStatusReqPublisher, clRuntimeParameterGroup);
+ var supervisionScanner = new SupervisionScanner(controlLoopProvider, controlLoopStateChangePublisher,
+ controlLoopUpdatePublisher, participantProvider, participantStatusReqPublisher,
+ participantUpdatePublisher, clRuntimeParameterGroup);
supervisionScanner.handleParticipantStatus(participant.getKey().asIdentifier());
supervisionScanner.run(true);
@@ -126,7 +130,7 @@ class SupervisionScannerTest {
when(controlLoopProvider.getControlLoops(null, null)).thenReturn(List.of(controlLoop));
var clRuntimeParameterGroup = CommonTestData.geParameterGroup("dbScanParticipant");
- clRuntimeParameterGroup.getParticipantParameters().setMaxMessageAgeMs(0);
+ clRuntimeParameterGroup.getParticipantParameters().getUpdateParameters().setMaxWaitMs(0);
var participant = new Participant();
participant.setName("Participant0");
@@ -140,10 +144,11 @@ class SupervisionScannerTest {
var controlLoopUpdatePublisher = mock(ControlLoopUpdatePublisher.class);
var participantStatusReqPublisher = mock(ParticipantStatusReqPublisher.class);
var controlLoopStateChangePublisher = mock(ControlLoopStateChangePublisher.class);
+ var participantUpdatePublisher = mock(ParticipantUpdatePublisher.class);
- var supervisionScanner =
- new SupervisionScanner(controlLoopProvider, controlLoopStateChangePublisher, controlLoopUpdatePublisher,
- participantProvider, participantStatusReqPublisher, clRuntimeParameterGroup);
+ var supervisionScanner = new SupervisionScanner(controlLoopProvider, controlLoopStateChangePublisher,
+ controlLoopUpdatePublisher, participantProvider, participantStatusReqPublisher,
+ participantUpdatePublisher, clRuntimeParameterGroup);
supervisionScanner.handleParticipantStatus(participant.getKey().asIdentifier());
supervisionScanner.run(true);