summaryrefslogtreecommitdiffstats
path: root/runtime-controlloop/src/main
diff options
context:
space:
mode:
authorJim Hahn <jrh3@att.com>2021-08-25 22:12:26 +0000
committerGerrit Code Review <gerrit@onap.org>2021-08-25 22:12:26 +0000
commitdb82b84e140e78e5f4145ed6d73fe089134dd173 (patch)
tree768c725bdf82d8e5dcf4b6b175a54de63b62898b /runtime-controlloop/src/main
parente9fc4bb89eb992bc3ff51fffcda2e6187bcc8ab7 (diff)
parent4bd0c9befe3256958d6f0a862dde4973dcedda9b (diff)
Merge "Intermittent issue in event handling between Cl runtime and participants"
Diffstat (limited to 'runtime-controlloop/src/main')
-rw-r--r--runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/HandleCounter.java15
-rw-r--r--runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionAspect.java15
-rw-r--r--runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionHandler.java2
-rw-r--r--runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionScanner.java70
-rw-r--r--runtime-controlloop/src/main/resources/application.yaml5
5 files changed, 92 insertions, 15 deletions
diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/HandleCounter.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/HandleCounter.java
index 2151dc143..7e070d700 100644
--- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/HandleCounter.java
+++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/HandleCounter.java
@@ -57,6 +57,17 @@ public class HandleCounter<K> {
mapTimer.put(id, getEpochMilli());
}
+ /**
+ * Remove counter, timer and fault by id.
+ *
+ * @param id the id
+ */
+ public void remove(K id) {
+ mapFault.remove(id);
+ mapCounter.remove(id);
+ mapTimer.remove(id);
+ }
+
public void setFault(K id) {
mapCounter.put(id, 0);
mapFault.add(id);
@@ -88,4 +99,8 @@ public class HandleCounter<K> {
protected long getEpochMilli() {
return Instant.now().toEpochMilli();
}
+
+ public Set<K> keySet() {
+ return mapCounter.keySet();
+ }
}
diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionAspect.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionAspect.java
index d0d18ab1a..fbb2742a7 100644
--- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionAspect.java
+++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionAspect.java
@@ -26,10 +26,13 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import lombok.RequiredArgsConstructor;
+import org.apache.commons.lang3.tuple.ImmutablePair;
import org.aspectj.lang.annotation.After;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantRegister;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantStatus;
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantUpdateAck;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Scheduled;
@@ -71,6 +74,18 @@ public class SupervisionAspect implements Closeable {
executor.execute(() -> supervisionScanner.handleParticipantStatus(participantStatusMessage.getParticipantId()));
}
+ @Before("@annotation(MessageIntercept) && args(participantRegisterMessage,..)")
+ public void handleParticipantRegister(ParticipantRegister participantRegisterMessage) {
+ executor.execute(() -> supervisionScanner.handleParticipantRegister(new ImmutablePair<>(
+ participantRegisterMessage.getParticipantId(), participantRegisterMessage.getParticipantType())));
+ }
+
+ @Before("@annotation(MessageIntercept) && args(participantUpdateAckMessage,..)")
+ public void handleParticipantUpdateAck(ParticipantUpdateAck participantUpdateAckMessage) {
+ executor.execute(() -> supervisionScanner.handleParticipantUpdateAck(new ImmutablePair<>(
+ participantUpdateAckMessage.getParticipantId(), participantUpdateAckMessage.getParticipantType())));
+ }
+
@Override
public void close() throws IOException {
executor.shutdown();
diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionHandler.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionHandler.java
index 7cc5d7fee..d06698ec4 100644
--- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionHandler.java
+++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionHandler.java
@@ -249,7 +249,7 @@ public class SupervisionHandler {
}
private void superviseControlLoopPassivation(ControlLoop controlLoop)
- throws ControlLoopException, PfModelException {
+ throws ControlLoopException {
switch (controlLoop.getState()) {
case PASSIVE:
exceptionOccured(Response.Status.NOT_ACCEPTABLE,
diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionScanner.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionScanner.java
index 7be407c3f..151b04cbf 100644
--- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionScanner.java
+++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionScanner.java
@@ -21,6 +21,7 @@
package org.onap.policy.clamp.controlloop.runtime.supervision;
import java.util.List;
+import org.apache.commons.lang3.tuple.Pair;
import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoop;
import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopElement;
import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopState;
@@ -32,6 +33,7 @@ import org.onap.policy.clamp.controlloop.runtime.main.parameters.ClRuntimeParame
import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ControlLoopStateChangePublisher;
import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ControlLoopUpdatePublisher;
import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ParticipantStatusReqPublisher;
+import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ParticipantUpdatePublisher;
import org.onap.policy.models.base.PfModelException;
import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
import org.slf4j.Logger;
@@ -46,15 +48,18 @@ public class SupervisionScanner {
private static final Logger LOGGER = LoggerFactory.getLogger(SupervisionScanner.class);
private HandleCounter<ToscaConceptIdentifier> controlLoopCounter = new HandleCounter<>();
- private HandleCounter<ToscaConceptIdentifier> participantCounter = new HandleCounter<>();
+ private HandleCounter<ToscaConceptIdentifier> participantStatusCounter = new HandleCounter<>();
+ private HandleCounter<Pair<ToscaConceptIdentifier, ToscaConceptIdentifier>> participantUpdateCounter =
+ new HandleCounter<>();
private final ControlLoopProvider controlLoopProvider;
private final ControlLoopStateChangePublisher controlLoopStateChangePublisher;
private final ControlLoopUpdatePublisher controlLoopUpdatePublisher;
private final ParticipantProvider participantProvider;
private final ParticipantStatusReqPublisher participantStatusReqPublisher;
+ private final ParticipantUpdatePublisher participantUpdatePublisher;
- private final long maxMessageAgeMs;
+ private final long maxWaitMs;
/**
* Constructor for instantiating SupervisionScanner.
@@ -64,30 +69,38 @@ public class SupervisionScanner {
* @param controlLoopUpdatePublisher the ControlLoopUpdate Publisher
* @param participantProvider the Participant Provider
* @param participantStatusReqPublisher the Participant StatusReq Publisher
+ * @param participantUpdatePublisher the Participant Update Publisher
* @param clRuntimeParameterGroup the parameters for the control loop runtime
*/
public SupervisionScanner(final ControlLoopProvider controlLoopProvider,
final ControlLoopStateChangePublisher controlLoopStateChangePublisher,
ControlLoopUpdatePublisher controlLoopUpdatePublisher, ParticipantProvider participantProvider,
ParticipantStatusReqPublisher participantStatusReqPublisher,
+ ParticipantUpdatePublisher participantUpdatePublisher,
final ClRuntimeParameterGroup clRuntimeParameterGroup) {
this.controlLoopProvider = controlLoopProvider;
this.controlLoopStateChangePublisher = controlLoopStateChangePublisher;
this.controlLoopUpdatePublisher = controlLoopUpdatePublisher;
this.participantProvider = participantProvider;
this.participantStatusReqPublisher = participantStatusReqPublisher;
+ this.participantUpdatePublisher = participantUpdatePublisher;
controlLoopCounter.setMaxRetryCount(
clRuntimeParameterGroup.getParticipantParameters().getUpdateParameters().getMaxRetryCount());
controlLoopCounter
.setMaxWaitMs(clRuntimeParameterGroup.getParticipantParameters().getUpdateParameters().getMaxWaitMs());
- participantCounter.setMaxRetryCount(
+ participantUpdateCounter.setMaxRetryCount(
clRuntimeParameterGroup.getParticipantParameters().getUpdateParameters().getMaxRetryCount());
- participantCounter
+ participantUpdateCounter
.setMaxWaitMs(clRuntimeParameterGroup.getParticipantParameters().getUpdateParameters().getMaxWaitMs());
- maxMessageAgeMs = clRuntimeParameterGroup.getParticipantParameters().getMaxMessageAgeMs();
+ participantStatusCounter.setMaxRetryCount(
+ clRuntimeParameterGroup.getParticipantParameters().getUpdateParameters().getMaxRetryCount());
+ participantStatusCounter
+ .setMaxWaitMs(clRuntimeParameterGroup.getParticipantParameters().getUpdateParameters().getMaxWaitMs());
+
+ maxWaitMs = clRuntimeParameterGroup.getParticipantParameters().getUpdateParameters().getMaxWaitMs();
}
/**
@@ -101,7 +114,7 @@ public class SupervisionScanner {
if (counterCheck) {
try {
for (Participant participant : participantProvider.getParticipants(null, null)) {
- scanParticipant(participant);
+ scanParticipantStatus(participant);
}
} catch (PfModelException pfme) {
LOGGER.warn("error reading participant from database", pfme);
@@ -116,24 +129,49 @@ public class SupervisionScanner {
} catch (PfModelException pfme) {
LOGGER.warn("error reading control loops from database", pfme);
}
+ if (counterCheck) {
+ scanParticipantUpdate();
+ }
LOGGER.debug("Control loop scan complete . . .");
}
- private void scanParticipant(Participant participant) throws PfModelException {
+ private void scanParticipantUpdate() {
+ LOGGER.debug("Scanning participants to update . . .");
+
+ for (var id : participantUpdateCounter.keySet()) {
+ if (participantUpdateCounter.isFault(id)) {
+ LOGGER.debug("report Participant Update fault");
+
+ } else if (participantUpdateCounter.getDuration(id) > maxWaitMs) {
+
+ if (participantUpdateCounter.count(id)) {
+ LOGGER.debug("retry message ParticipantUpdate");
+ participantUpdatePublisher.send(id.getLeft(), id.getRight());
+ } else {
+ LOGGER.debug("report Participant Update fault");
+ participantUpdateCounter.setFault(id);
+ }
+ }
+ }
+
+ LOGGER.debug("Participants to update scan complete . . .");
+ }
+
+ private void scanParticipantStatus(Participant participant) throws PfModelException {
ToscaConceptIdentifier id = participant.getKey().asIdentifier();
- if (participantCounter.isFault(id)) {
+ if (participantStatusCounter.isFault(id)) {
LOGGER.debug("report Participant fault");
return;
}
- if (participantCounter.getDuration(id) > maxMessageAgeMs) {
- if (participantCounter.count(id)) {
+ if (participantStatusCounter.getDuration(id) > maxWaitMs) {
+ if (participantStatusCounter.count(id)) {
LOGGER.debug("retry message ParticipantStatusReq");
participantStatusReqPublisher.send(id);
participant.setHealthStatus(ParticipantHealthStatus.NOT_HEALTHY);
} else {
LOGGER.debug("report Participant fault");
- participantCounter.setFault(id);
+ participantStatusCounter.setFault(id);
participant.setHealthStatus(ParticipantHealthStatus.OFF_LINE);
}
participantProvider.updateParticipants(List.of(participant));
@@ -144,7 +182,15 @@ public class SupervisionScanner {
* handle participant Status message.
*/
public void handleParticipantStatus(ToscaConceptIdentifier id) {
- participantCounter.clear(id);
+ participantStatusCounter.clear(id);
+ }
+
+ public void handleParticipantRegister(Pair<ToscaConceptIdentifier, ToscaConceptIdentifier> id) {
+ participantUpdateCounter.clear(id);
+ }
+
+ public void handleParticipantUpdateAck(Pair<ToscaConceptIdentifier, ToscaConceptIdentifier> id) {
+ participantUpdateCounter.remove(id);
}
private void scanControlLoop(final ControlLoop controlLoop, boolean counterCheck) throws PfModelException {
diff --git a/runtime-controlloop/src/main/resources/application.yaml b/runtime-controlloop/src/main/resources/application.yaml
index d0e5500d6..ea98aaa8c 100644
--- a/runtime-controlloop/src/main/resources/application.yaml
+++ b/runtime-controlloop/src/main/resources/application.yaml
@@ -24,9 +24,10 @@ runtime:
participantClStateChangeIntervalSec: 1000
participantParameters:
heartBeatMs: 120000
+ maxMessageAgeMs: 600000
updateParameters:
- maxRetryCount: 1
- maxWaitMs: 30000
+ maxRetryCount: 3
+ maxWaitMs: 100000
databaseProviderParameters:
name: PolicyProviderParameterGroup
implementation: org.onap.policy.models.provider.impl.DatabasePolicyModelsProviderImpl