summaryrefslogtreecommitdiffstats
path: root/runtime-controlloop/src/main/java/org
diff options
context:
space:
mode:
authorFrancescoFioraEst <francesco.fiora@est.tech>2021-08-20 15:05:26 +0100
committerFrancescoFioraEst <francesco.fiora@est.tech>2021-08-24 14:42:12 +0100
commit4bd0c9befe3256958d6f0a862dde4973dcedda9b (patch)
treeb060bcb62b9f7c49bf968b894d68c7865887bc68 /runtime-controlloop/src/main/java/org
parentbf06e83f40cdbfcfc1428040bf8fa58a518cdea4 (diff)
Intermittent issue in event handling between Cl runtime and participants
Issue-ID: POLICY-3544 Change-Id: I40c5dc537b17986d01ab0d213e7ea7c9cdb7d59e Signed-off-by: FrancescoFioraEst <francesco.fiora@est.tech>
Diffstat (limited to 'runtime-controlloop/src/main/java/org')
-rw-r--r--runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/HandleCounter.java15
-rw-r--r--runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionAspect.java15
-rw-r--r--runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionHandler.java3
-rw-r--r--runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionScanner.java70
4 files changed, 89 insertions, 14 deletions
diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/HandleCounter.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/HandleCounter.java
index 2151dc143..7e070d700 100644
--- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/HandleCounter.java
+++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/HandleCounter.java
@@ -57,6 +57,17 @@ public class HandleCounter<K> {
mapTimer.put(id, getEpochMilli());
}
+ /**
+ * Remove counter, timer and fault by id.
+ *
+ * @param id the id
+ */
+ public void remove(K id) {
+ mapFault.remove(id);
+ mapCounter.remove(id);
+ mapTimer.remove(id);
+ }
+
public void setFault(K id) {
mapCounter.put(id, 0);
mapFault.add(id);
@@ -88,4 +99,8 @@ public class HandleCounter<K> {
protected long getEpochMilli() {
return Instant.now().toEpochMilli();
}
+
+ public Set<K> keySet() {
+ return mapCounter.keySet();
+ }
}
diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionAspect.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionAspect.java
index d0d18ab1a..fbb2742a7 100644
--- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionAspect.java
+++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionAspect.java
@@ -26,10 +26,13 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import lombok.RequiredArgsConstructor;
+import org.apache.commons.lang3.tuple.ImmutablePair;
import org.aspectj.lang.annotation.After;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantRegister;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantStatus;
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantUpdateAck;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Scheduled;
@@ -71,6 +74,18 @@ public class SupervisionAspect implements Closeable {
executor.execute(() -> supervisionScanner.handleParticipantStatus(participantStatusMessage.getParticipantId()));
}
+ @Before("@annotation(MessageIntercept) && args(participantRegisterMessage,..)")
+ public void handleParticipantRegister(ParticipantRegister participantRegisterMessage) {
+ executor.execute(() -> supervisionScanner.handleParticipantRegister(new ImmutablePair<>(
+ participantRegisterMessage.getParticipantId(), participantRegisterMessage.getParticipantType())));
+ }
+
+ @Before("@annotation(MessageIntercept) && args(participantUpdateAckMessage,..)")
+ public void handleParticipantUpdateAck(ParticipantUpdateAck participantUpdateAckMessage) {
+ executor.execute(() -> supervisionScanner.handleParticipantUpdateAck(new ImmutablePair<>(
+ participantUpdateAckMessage.getParticipantId(), participantUpdateAckMessage.getParticipantType())));
+ }
+
@Override
public void close() throws IOException {
executor.shutdown();
diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionHandler.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionHandler.java
index 0e2ff5ca9..db7d34895 100644
--- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionHandler.java
+++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionHandler.java
@@ -21,7 +21,6 @@
package org.onap.policy.clamp.controlloop.runtime.supervision;
import java.util.List;
-import java.util.Map;
import javax.ws.rs.core.Response;
import lombok.AllArgsConstructor;
import org.apache.commons.collections4.CollectionUtils;
@@ -225,7 +224,7 @@ public class SupervisionHandler {
}
private void superviseControlLoopPassivation(ControlLoop controlLoop)
- throws ControlLoopException, PfModelException {
+ throws ControlLoopException {
switch (controlLoop.getState()) {
case PASSIVE:
exceptionOccured(Response.Status.NOT_ACCEPTABLE,
diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionScanner.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionScanner.java
index 7be407c3f..151b04cbf 100644
--- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionScanner.java
+++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionScanner.java
@@ -21,6 +21,7 @@
package org.onap.policy.clamp.controlloop.runtime.supervision;
import java.util.List;
+import org.apache.commons.lang3.tuple.Pair;
import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoop;
import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopElement;
import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopState;
@@ -32,6 +33,7 @@ import org.onap.policy.clamp.controlloop.runtime.main.parameters.ClRuntimeParame
import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ControlLoopStateChangePublisher;
import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ControlLoopUpdatePublisher;
import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ParticipantStatusReqPublisher;
+import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ParticipantUpdatePublisher;
import org.onap.policy.models.base.PfModelException;
import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
import org.slf4j.Logger;
@@ -46,15 +48,18 @@ public class SupervisionScanner {
private static final Logger LOGGER = LoggerFactory.getLogger(SupervisionScanner.class);
private HandleCounter<ToscaConceptIdentifier> controlLoopCounter = new HandleCounter<>();
- private HandleCounter<ToscaConceptIdentifier> participantCounter = new HandleCounter<>();
+ private HandleCounter<ToscaConceptIdentifier> participantStatusCounter = new HandleCounter<>();
+ private HandleCounter<Pair<ToscaConceptIdentifier, ToscaConceptIdentifier>> participantUpdateCounter =
+ new HandleCounter<>();
private final ControlLoopProvider controlLoopProvider;
private final ControlLoopStateChangePublisher controlLoopStateChangePublisher;
private final ControlLoopUpdatePublisher controlLoopUpdatePublisher;
private final ParticipantProvider participantProvider;
private final ParticipantStatusReqPublisher participantStatusReqPublisher;
+ private final ParticipantUpdatePublisher participantUpdatePublisher;
- private final long maxMessageAgeMs;
+ private final long maxWaitMs;
/**
* Constructor for instantiating SupervisionScanner.
@@ -64,30 +69,38 @@ public class SupervisionScanner {
* @param controlLoopUpdatePublisher the ControlLoopUpdate Publisher
* @param participantProvider the Participant Provider
* @param participantStatusReqPublisher the Participant StatusReq Publisher
+ * @param participantUpdatePublisher the Participant Update Publisher
* @param clRuntimeParameterGroup the parameters for the control loop runtime
*/
public SupervisionScanner(final ControlLoopProvider controlLoopProvider,
final ControlLoopStateChangePublisher controlLoopStateChangePublisher,
ControlLoopUpdatePublisher controlLoopUpdatePublisher, ParticipantProvider participantProvider,
ParticipantStatusReqPublisher participantStatusReqPublisher,
+ ParticipantUpdatePublisher participantUpdatePublisher,
final ClRuntimeParameterGroup clRuntimeParameterGroup) {
this.controlLoopProvider = controlLoopProvider;
this.controlLoopStateChangePublisher = controlLoopStateChangePublisher;
this.controlLoopUpdatePublisher = controlLoopUpdatePublisher;
this.participantProvider = participantProvider;
this.participantStatusReqPublisher = participantStatusReqPublisher;
+ this.participantUpdatePublisher = participantUpdatePublisher;
controlLoopCounter.setMaxRetryCount(
clRuntimeParameterGroup.getParticipantParameters().getUpdateParameters().getMaxRetryCount());
controlLoopCounter
.setMaxWaitMs(clRuntimeParameterGroup.getParticipantParameters().getUpdateParameters().getMaxWaitMs());
- participantCounter.setMaxRetryCount(
+ participantUpdateCounter.setMaxRetryCount(
clRuntimeParameterGroup.getParticipantParameters().getUpdateParameters().getMaxRetryCount());
- participantCounter
+ participantUpdateCounter
.setMaxWaitMs(clRuntimeParameterGroup.getParticipantParameters().getUpdateParameters().getMaxWaitMs());
- maxMessageAgeMs = clRuntimeParameterGroup.getParticipantParameters().getMaxMessageAgeMs();
+ participantStatusCounter.setMaxRetryCount(
+ clRuntimeParameterGroup.getParticipantParameters().getUpdateParameters().getMaxRetryCount());
+ participantStatusCounter
+ .setMaxWaitMs(clRuntimeParameterGroup.getParticipantParameters().getUpdateParameters().getMaxWaitMs());
+
+ maxWaitMs = clRuntimeParameterGroup.getParticipantParameters().getUpdateParameters().getMaxWaitMs();
}
/**
@@ -101,7 +114,7 @@ public class SupervisionScanner {
if (counterCheck) {
try {
for (Participant participant : participantProvider.getParticipants(null, null)) {
- scanParticipant(participant);
+ scanParticipantStatus(participant);
}
} catch (PfModelException pfme) {
LOGGER.warn("error reading participant from database", pfme);
@@ -116,24 +129,49 @@ public class SupervisionScanner {
} catch (PfModelException pfme) {
LOGGER.warn("error reading control loops from database", pfme);
}
+ if (counterCheck) {
+ scanParticipantUpdate();
+ }
LOGGER.debug("Control loop scan complete . . .");
}
- private void scanParticipant(Participant participant) throws PfModelException {
+ private void scanParticipantUpdate() {
+ LOGGER.debug("Scanning participants to update . . .");
+
+ for (var id : participantUpdateCounter.keySet()) {
+ if (participantUpdateCounter.isFault(id)) {
+ LOGGER.debug("report Participant Update fault");
+
+ } else if (participantUpdateCounter.getDuration(id) > maxWaitMs) {
+
+ if (participantUpdateCounter.count(id)) {
+ LOGGER.debug("retry message ParticipantUpdate");
+ participantUpdatePublisher.send(id.getLeft(), id.getRight());
+ } else {
+ LOGGER.debug("report Participant Update fault");
+ participantUpdateCounter.setFault(id);
+ }
+ }
+ }
+
+ LOGGER.debug("Participants to update scan complete . . .");
+ }
+
+ private void scanParticipantStatus(Participant participant) throws PfModelException {
ToscaConceptIdentifier id = participant.getKey().asIdentifier();
- if (participantCounter.isFault(id)) {
+ if (participantStatusCounter.isFault(id)) {
LOGGER.debug("report Participant fault");
return;
}
- if (participantCounter.getDuration(id) > maxMessageAgeMs) {
- if (participantCounter.count(id)) {
+ if (participantStatusCounter.getDuration(id) > maxWaitMs) {
+ if (participantStatusCounter.count(id)) {
LOGGER.debug("retry message ParticipantStatusReq");
participantStatusReqPublisher.send(id);
participant.setHealthStatus(ParticipantHealthStatus.NOT_HEALTHY);
} else {
LOGGER.debug("report Participant fault");
- participantCounter.setFault(id);
+ participantStatusCounter.setFault(id);
participant.setHealthStatus(ParticipantHealthStatus.OFF_LINE);
}
participantProvider.updateParticipants(List.of(participant));
@@ -144,7 +182,15 @@ public class SupervisionScanner {
* handle participant Status message.
*/
public void handleParticipantStatus(ToscaConceptIdentifier id) {
- participantCounter.clear(id);
+ participantStatusCounter.clear(id);
+ }
+
+ public void handleParticipantRegister(Pair<ToscaConceptIdentifier, ToscaConceptIdentifier> id) {
+ participantUpdateCounter.clear(id);
+ }
+
+ public void handleParticipantUpdateAck(Pair<ToscaConceptIdentifier, ToscaConceptIdentifier> id) {
+ participantUpdateCounter.remove(id);
}
private void scanControlLoop(final ControlLoop controlLoop, boolean counterCheck) throws PfModelException {