diff options
Diffstat (limited to 'runtime-controlloop/src/main/java/org')
4 files changed, 89 insertions, 14 deletions
diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/HandleCounter.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/HandleCounter.java index 2151dc143..7e070d700 100644 --- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/HandleCounter.java +++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/HandleCounter.java @@ -57,6 +57,17 @@ public class HandleCounter<K> { mapTimer.put(id, getEpochMilli()); } + /** + * Remove counter, timer and fault by id. + * + * @param id the id + */ + public void remove(K id) { + mapFault.remove(id); + mapCounter.remove(id); + mapTimer.remove(id); + } + public void setFault(K id) { mapCounter.put(id, 0); mapFault.add(id); @@ -88,4 +99,8 @@ public class HandleCounter<K> { protected long getEpochMilli() { return Instant.now().toEpochMilli(); } + + public Set<K> keySet() { + return mapCounter.keySet(); + } } diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionAspect.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionAspect.java index d0d18ab1a..fbb2742a7 100644 --- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionAspect.java +++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionAspect.java @@ -26,10 +26,13 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import lombok.RequiredArgsConstructor; +import org.apache.commons.lang3.tuple.ImmutablePair; import org.aspectj.lang.annotation.After; import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.annotation.Before; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantRegister; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantStatus; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantUpdateAck; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.scheduling.annotation.Scheduled; @@ -71,6 +74,18 @@ public class SupervisionAspect implements Closeable { executor.execute(() -> supervisionScanner.handleParticipantStatus(participantStatusMessage.getParticipantId())); } + @Before("@annotation(MessageIntercept) && args(participantRegisterMessage,..)") + public void handleParticipantRegister(ParticipantRegister participantRegisterMessage) { + executor.execute(() -> supervisionScanner.handleParticipantRegister(new ImmutablePair<>( + participantRegisterMessage.getParticipantId(), participantRegisterMessage.getParticipantType()))); + } + + @Before("@annotation(MessageIntercept) && args(participantUpdateAckMessage,..)") + public void handleParticipantUpdateAck(ParticipantUpdateAck participantUpdateAckMessage) { + executor.execute(() -> supervisionScanner.handleParticipantUpdateAck(new ImmutablePair<>( + participantUpdateAckMessage.getParticipantId(), participantUpdateAckMessage.getParticipantType()))); + } + @Override public void close() throws IOException { executor.shutdown(); diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionHandler.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionHandler.java index 0e2ff5ca9..db7d34895 100644 --- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionHandler.java +++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionHandler.java @@ -21,7 +21,6 @@ package org.onap.policy.clamp.controlloop.runtime.supervision; import java.util.List; -import java.util.Map; import javax.ws.rs.core.Response; import lombok.AllArgsConstructor; import org.apache.commons.collections4.CollectionUtils; @@ -225,7 +224,7 @@ public class SupervisionHandler { } private void superviseControlLoopPassivation(ControlLoop controlLoop) - throws ControlLoopException, PfModelException { + throws ControlLoopException { switch (controlLoop.getState()) { case PASSIVE: exceptionOccured(Response.Status.NOT_ACCEPTABLE, diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionScanner.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionScanner.java index 7be407c3f..151b04cbf 100644 --- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionScanner.java +++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionScanner.java @@ -21,6 +21,7 @@ package org.onap.policy.clamp.controlloop.runtime.supervision; import java.util.List; +import org.apache.commons.lang3.tuple.Pair; import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoop; import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopElement; import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopState; @@ -32,6 +33,7 @@ import org.onap.policy.clamp.controlloop.runtime.main.parameters.ClRuntimeParame import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ControlLoopStateChangePublisher; import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ControlLoopUpdatePublisher; import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ParticipantStatusReqPublisher; +import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ParticipantUpdatePublisher; import org.onap.policy.models.base.PfModelException; import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier; import org.slf4j.Logger; @@ -46,15 +48,18 @@ public class SupervisionScanner { private static final Logger LOGGER = LoggerFactory.getLogger(SupervisionScanner.class); private HandleCounter<ToscaConceptIdentifier> controlLoopCounter = new HandleCounter<>(); - private HandleCounter<ToscaConceptIdentifier> participantCounter = new HandleCounter<>(); + private HandleCounter<ToscaConceptIdentifier> participantStatusCounter = new HandleCounter<>(); + private HandleCounter<Pair<ToscaConceptIdentifier, ToscaConceptIdentifier>> participantUpdateCounter = + new HandleCounter<>(); private final ControlLoopProvider controlLoopProvider; private final ControlLoopStateChangePublisher controlLoopStateChangePublisher; private final ControlLoopUpdatePublisher controlLoopUpdatePublisher; private final ParticipantProvider participantProvider; private final ParticipantStatusReqPublisher participantStatusReqPublisher; + private final ParticipantUpdatePublisher participantUpdatePublisher; - private final long maxMessageAgeMs; + private final long maxWaitMs; /** * Constructor for instantiating SupervisionScanner. @@ -64,30 +69,38 @@ public class SupervisionScanner { * @param controlLoopUpdatePublisher the ControlLoopUpdate Publisher * @param participantProvider the Participant Provider * @param participantStatusReqPublisher the Participant StatusReq Publisher + * @param participantUpdatePublisher the Participant Update Publisher * @param clRuntimeParameterGroup the parameters for the control loop runtime */ public SupervisionScanner(final ControlLoopProvider controlLoopProvider, final ControlLoopStateChangePublisher controlLoopStateChangePublisher, ControlLoopUpdatePublisher controlLoopUpdatePublisher, ParticipantProvider participantProvider, ParticipantStatusReqPublisher participantStatusReqPublisher, + ParticipantUpdatePublisher participantUpdatePublisher, final ClRuntimeParameterGroup clRuntimeParameterGroup) { this.controlLoopProvider = controlLoopProvider; this.controlLoopStateChangePublisher = controlLoopStateChangePublisher; this.controlLoopUpdatePublisher = controlLoopUpdatePublisher; this.participantProvider = participantProvider; this.participantStatusReqPublisher = participantStatusReqPublisher; + this.participantUpdatePublisher = participantUpdatePublisher; controlLoopCounter.setMaxRetryCount( clRuntimeParameterGroup.getParticipantParameters().getUpdateParameters().getMaxRetryCount()); controlLoopCounter .setMaxWaitMs(clRuntimeParameterGroup.getParticipantParameters().getUpdateParameters().getMaxWaitMs()); - participantCounter.setMaxRetryCount( + participantUpdateCounter.setMaxRetryCount( clRuntimeParameterGroup.getParticipantParameters().getUpdateParameters().getMaxRetryCount()); - participantCounter + participantUpdateCounter .setMaxWaitMs(clRuntimeParameterGroup.getParticipantParameters().getUpdateParameters().getMaxWaitMs()); - maxMessageAgeMs = clRuntimeParameterGroup.getParticipantParameters().getMaxMessageAgeMs(); + participantStatusCounter.setMaxRetryCount( + clRuntimeParameterGroup.getParticipantParameters().getUpdateParameters().getMaxRetryCount()); + participantStatusCounter + .setMaxWaitMs(clRuntimeParameterGroup.getParticipantParameters().getUpdateParameters().getMaxWaitMs()); + + maxWaitMs = clRuntimeParameterGroup.getParticipantParameters().getUpdateParameters().getMaxWaitMs(); } /** @@ -101,7 +114,7 @@ public class SupervisionScanner { if (counterCheck) { try { for (Participant participant : participantProvider.getParticipants(null, null)) { - scanParticipant(participant); + scanParticipantStatus(participant); } } catch (PfModelException pfme) { LOGGER.warn("error reading participant from database", pfme); @@ -116,24 +129,49 @@ public class SupervisionScanner { } catch (PfModelException pfme) { LOGGER.warn("error reading control loops from database", pfme); } + if (counterCheck) { + scanParticipantUpdate(); + } LOGGER.debug("Control loop scan complete . . ."); } - private void scanParticipant(Participant participant) throws PfModelException { + private void scanParticipantUpdate() { + LOGGER.debug("Scanning participants to update . . ."); + + for (var id : participantUpdateCounter.keySet()) { + if (participantUpdateCounter.isFault(id)) { + LOGGER.debug("report Participant Update fault"); + + } else if (participantUpdateCounter.getDuration(id) > maxWaitMs) { + + if (participantUpdateCounter.count(id)) { + LOGGER.debug("retry message ParticipantUpdate"); + participantUpdatePublisher.send(id.getLeft(), id.getRight()); + } else { + LOGGER.debug("report Participant Update fault"); + participantUpdateCounter.setFault(id); + } + } + } + + LOGGER.debug("Participants to update scan complete . . ."); + } + + private void scanParticipantStatus(Participant participant) throws PfModelException { ToscaConceptIdentifier id = participant.getKey().asIdentifier(); - if (participantCounter.isFault(id)) { + if (participantStatusCounter.isFault(id)) { LOGGER.debug("report Participant fault"); return; } - if (participantCounter.getDuration(id) > maxMessageAgeMs) { - if (participantCounter.count(id)) { + if (participantStatusCounter.getDuration(id) > maxWaitMs) { + if (participantStatusCounter.count(id)) { LOGGER.debug("retry message ParticipantStatusReq"); participantStatusReqPublisher.send(id); participant.setHealthStatus(ParticipantHealthStatus.NOT_HEALTHY); } else { LOGGER.debug("report Participant fault"); - participantCounter.setFault(id); + participantStatusCounter.setFault(id); participant.setHealthStatus(ParticipantHealthStatus.OFF_LINE); } participantProvider.updateParticipants(List.of(participant)); @@ -144,7 +182,15 @@ public class SupervisionScanner { * handle participant Status message. */ public void handleParticipantStatus(ToscaConceptIdentifier id) { - participantCounter.clear(id); + participantStatusCounter.clear(id); + } + + public void handleParticipantRegister(Pair<ToscaConceptIdentifier, ToscaConceptIdentifier> id) { + participantUpdateCounter.clear(id); + } + + public void handleParticipantUpdateAck(Pair<ToscaConceptIdentifier, ToscaConceptIdentifier> id) { + participantUpdateCounter.remove(id); } private void scanControlLoop(final ControlLoop controlLoop, boolean counterCheck) throws PfModelException { |