diff options
8 files changed, 121 insertions, 40 deletions
diff --git a/packages/policy-clamp-tarball/src/main/resources/etc/ClRuntimeParameters.yaml b/packages/policy-clamp-tarball/src/main/resources/etc/ClRuntimeParameters.yaml index 3970d15f1..0aa3fb782 100644 --- a/packages/policy-clamp-tarball/src/main/resources/etc/ClRuntimeParameters.yaml +++ b/packages/policy-clamp-tarball/src/main/resources/etc/ClRuntimeParameters.yaml @@ -24,12 +24,10 @@ runtime: participantClStateChangeIntervalSec: 1000 participantParameters: heartBeatMs: 120000 + maxMessageAgeMs: 600000 updateParameters: - maxRetryCount: 1 - maxWaitMs: 30000 - stateChangeParameters: - maxRetryCount: 1 - maxWaitMs: 30000 + maxRetryCount: 3 + maxWaitMs: 100000 databaseProviderParameters: name: PolicyProviderParameterGroup implementation: org.onap.policy.models.provider.impl.DatabasePolicyModelsProviderImpl diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ControlLoopHandler.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ControlLoopHandler.java index fbe940c7f..0e276f390 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ControlLoopHandler.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ControlLoopHandler.java @@ -27,6 +27,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.stream.Collectors; import lombok.Getter; import lombok.NoArgsConstructor; import org.apache.commons.collections4.CollectionUtils; @@ -245,13 +246,13 @@ public class ControlLoopHandler { } private List<ControlLoopElement> storeElementsOnThisParticipant(List<ParticipantUpdates> participantUpdates) { - List<ControlLoopElement> clElementMap = new ArrayList<>(); - for (ParticipantUpdates participantUpdate : participantUpdates) { - if (participantUpdate.getParticipantId().equals(participantType)) { - clElementMap = participantUpdate.getControlLoopElementList(); - } - } - for (ControlLoopElement element : clElementMap) { + var clElementMap = + participantUpdates.stream() + .flatMap(participantUpdate -> participantUpdate.getControlLoopElementList().stream()) + .filter(element -> participantType.equals(element.getParticipantType())) + .collect(Collectors.toList()); + + for (var element : clElementMap) { elementsOnThisParticipant.put(element.getId(), element); } return clElementMap; diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/HandleCounter.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/HandleCounter.java index 2151dc143..7e070d700 100644 --- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/HandleCounter.java +++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/HandleCounter.java @@ -57,6 +57,17 @@ public class HandleCounter<K> { mapTimer.put(id, getEpochMilli()); } + /** + * Remove counter, timer and fault by id. + * + * @param id the id + */ + public void remove(K id) { + mapFault.remove(id); + mapCounter.remove(id); + mapTimer.remove(id); + } + public void setFault(K id) { mapCounter.put(id, 0); mapFault.add(id); @@ -88,4 +99,8 @@ public class HandleCounter<K> { protected long getEpochMilli() { return Instant.now().toEpochMilli(); } + + public Set<K> keySet() { + return mapCounter.keySet(); + } } diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionAspect.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionAspect.java index d0d18ab1a..fbb2742a7 100644 --- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionAspect.java +++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionAspect.java @@ -26,10 +26,13 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import lombok.RequiredArgsConstructor; +import org.apache.commons.lang3.tuple.ImmutablePair; import org.aspectj.lang.annotation.After; import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.annotation.Before; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantRegister; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantStatus; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantUpdateAck; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.scheduling.annotation.Scheduled; @@ -71,6 +74,18 @@ public class SupervisionAspect implements Closeable { executor.execute(() -> supervisionScanner.handleParticipantStatus(participantStatusMessage.getParticipantId())); } + @Before("@annotation(MessageIntercept) && args(participantRegisterMessage,..)") + public void handleParticipantRegister(ParticipantRegister participantRegisterMessage) { + executor.execute(() -> supervisionScanner.handleParticipantRegister(new ImmutablePair<>( + participantRegisterMessage.getParticipantId(), participantRegisterMessage.getParticipantType()))); + } + + @Before("@annotation(MessageIntercept) && args(participantUpdateAckMessage,..)") + public void handleParticipantUpdateAck(ParticipantUpdateAck participantUpdateAckMessage) { + executor.execute(() -> supervisionScanner.handleParticipantUpdateAck(new ImmutablePair<>( + participantUpdateAckMessage.getParticipantId(), participantUpdateAckMessage.getParticipantType()))); + } + @Override public void close() throws IOException { executor.shutdown(); diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionHandler.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionHandler.java index 561d4fdf5..2cc0f94e2 100644 --- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionHandler.java +++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionHandler.java @@ -270,7 +270,7 @@ public class SupervisionHandler { } private void superviseControlLoopPassivation(ControlLoop controlLoop) - throws ControlLoopException, PfModelException { + throws ControlLoopException { switch (controlLoop.getState()) { case PASSIVE: exceptionOccured(Response.Status.NOT_ACCEPTABLE, diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionScanner.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionScanner.java index 7be407c3f..151b04cbf 100644 --- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionScanner.java +++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionScanner.java @@ -21,6 +21,7 @@ package org.onap.policy.clamp.controlloop.runtime.supervision; import java.util.List; +import org.apache.commons.lang3.tuple.Pair; import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoop; import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopElement; import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopState; @@ -32,6 +33,7 @@ import org.onap.policy.clamp.controlloop.runtime.main.parameters.ClRuntimeParame import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ControlLoopStateChangePublisher; import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ControlLoopUpdatePublisher; import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ParticipantStatusReqPublisher; +import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ParticipantUpdatePublisher; import org.onap.policy.models.base.PfModelException; import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier; import org.slf4j.Logger; @@ -46,15 +48,18 @@ public class SupervisionScanner { private static final Logger LOGGER = LoggerFactory.getLogger(SupervisionScanner.class); private HandleCounter<ToscaConceptIdentifier> controlLoopCounter = new HandleCounter<>(); - private HandleCounter<ToscaConceptIdentifier> participantCounter = new HandleCounter<>(); + private HandleCounter<ToscaConceptIdentifier> participantStatusCounter = new HandleCounter<>(); + private HandleCounter<Pair<ToscaConceptIdentifier, ToscaConceptIdentifier>> participantUpdateCounter = + new HandleCounter<>(); private final ControlLoopProvider controlLoopProvider; private final ControlLoopStateChangePublisher controlLoopStateChangePublisher; private final ControlLoopUpdatePublisher controlLoopUpdatePublisher; private final ParticipantProvider participantProvider; private final ParticipantStatusReqPublisher participantStatusReqPublisher; + private final ParticipantUpdatePublisher participantUpdatePublisher; - private final long maxMessageAgeMs; + private final long maxWaitMs; /** * Constructor for instantiating SupervisionScanner. @@ -64,30 +69,38 @@ public class SupervisionScanner { * @param controlLoopUpdatePublisher the ControlLoopUpdate Publisher * @param participantProvider the Participant Provider * @param participantStatusReqPublisher the Participant StatusReq Publisher + * @param participantUpdatePublisher the Participant Update Publisher * @param clRuntimeParameterGroup the parameters for the control loop runtime */ public SupervisionScanner(final ControlLoopProvider controlLoopProvider, final ControlLoopStateChangePublisher controlLoopStateChangePublisher, ControlLoopUpdatePublisher controlLoopUpdatePublisher, ParticipantProvider participantProvider, ParticipantStatusReqPublisher participantStatusReqPublisher, + ParticipantUpdatePublisher participantUpdatePublisher, final ClRuntimeParameterGroup clRuntimeParameterGroup) { this.controlLoopProvider = controlLoopProvider; this.controlLoopStateChangePublisher = controlLoopStateChangePublisher; this.controlLoopUpdatePublisher = controlLoopUpdatePublisher; this.participantProvider = participantProvider; this.participantStatusReqPublisher = participantStatusReqPublisher; + this.participantUpdatePublisher = participantUpdatePublisher; controlLoopCounter.setMaxRetryCount( clRuntimeParameterGroup.getParticipantParameters().getUpdateParameters().getMaxRetryCount()); controlLoopCounter .setMaxWaitMs(clRuntimeParameterGroup.getParticipantParameters().getUpdateParameters().getMaxWaitMs()); - participantCounter.setMaxRetryCount( + participantUpdateCounter.setMaxRetryCount( clRuntimeParameterGroup.getParticipantParameters().getUpdateParameters().getMaxRetryCount()); - participantCounter + participantUpdateCounter .setMaxWaitMs(clRuntimeParameterGroup.getParticipantParameters().getUpdateParameters().getMaxWaitMs()); - maxMessageAgeMs = clRuntimeParameterGroup.getParticipantParameters().getMaxMessageAgeMs(); + participantStatusCounter.setMaxRetryCount( + clRuntimeParameterGroup.getParticipantParameters().getUpdateParameters().getMaxRetryCount()); + participantStatusCounter + .setMaxWaitMs(clRuntimeParameterGroup.getParticipantParameters().getUpdateParameters().getMaxWaitMs()); + + maxWaitMs = clRuntimeParameterGroup.getParticipantParameters().getUpdateParameters().getMaxWaitMs(); } /** @@ -101,7 +114,7 @@ public class SupervisionScanner { if (counterCheck) { try { for (Participant participant : participantProvider.getParticipants(null, null)) { - scanParticipant(participant); + scanParticipantStatus(participant); } } catch (PfModelException pfme) { LOGGER.warn("error reading participant from database", pfme); @@ -116,24 +129,49 @@ public class SupervisionScanner { } catch (PfModelException pfme) { LOGGER.warn("error reading control loops from database", pfme); } + if (counterCheck) { + scanParticipantUpdate(); + } LOGGER.debug("Control loop scan complete . . ."); } - private void scanParticipant(Participant participant) throws PfModelException { + private void scanParticipantUpdate() { + LOGGER.debug("Scanning participants to update . . ."); + + for (var id : participantUpdateCounter.keySet()) { + if (participantUpdateCounter.isFault(id)) { + LOGGER.debug("report Participant Update fault"); + + } else if (participantUpdateCounter.getDuration(id) > maxWaitMs) { + + if (participantUpdateCounter.count(id)) { + LOGGER.debug("retry message ParticipantUpdate"); + participantUpdatePublisher.send(id.getLeft(), id.getRight()); + } else { + LOGGER.debug("report Participant Update fault"); + participantUpdateCounter.setFault(id); + } + } + } + + LOGGER.debug("Participants to update scan complete . . ."); + } + + private void scanParticipantStatus(Participant participant) throws PfModelException { ToscaConceptIdentifier id = participant.getKey().asIdentifier(); - if (participantCounter.isFault(id)) { + if (participantStatusCounter.isFault(id)) { LOGGER.debug("report Participant fault"); return; } - if (participantCounter.getDuration(id) > maxMessageAgeMs) { - if (participantCounter.count(id)) { + if (participantStatusCounter.getDuration(id) > maxWaitMs) { + if (participantStatusCounter.count(id)) { LOGGER.debug("retry message ParticipantStatusReq"); participantStatusReqPublisher.send(id); participant.setHealthStatus(ParticipantHealthStatus.NOT_HEALTHY); } else { LOGGER.debug("report Participant fault"); - participantCounter.setFault(id); + participantStatusCounter.setFault(id); participant.setHealthStatus(ParticipantHealthStatus.OFF_LINE); } participantProvider.updateParticipants(List.of(participant)); @@ -144,7 +182,15 @@ public class SupervisionScanner { * handle participant Status message. */ public void handleParticipantStatus(ToscaConceptIdentifier id) { - participantCounter.clear(id); + participantStatusCounter.clear(id); + } + + public void handleParticipantRegister(Pair<ToscaConceptIdentifier, ToscaConceptIdentifier> id) { + participantUpdateCounter.clear(id); + } + + public void handleParticipantUpdateAck(Pair<ToscaConceptIdentifier, ToscaConceptIdentifier> id) { + participantUpdateCounter.remove(id); } private void scanControlLoop(final ControlLoop controlLoop, boolean counterCheck) throws PfModelException { diff --git a/runtime-controlloop/src/main/resources/application.yaml b/runtime-controlloop/src/main/resources/application.yaml index d0e5500d6..ea98aaa8c 100644 --- a/runtime-controlloop/src/main/resources/application.yaml +++ b/runtime-controlloop/src/main/resources/application.yaml @@ -24,9 +24,10 @@ runtime: participantClStateChangeIntervalSec: 1000 participantParameters: heartBeatMs: 120000 + maxMessageAgeMs: 600000 updateParameters: - maxRetryCount: 1 - maxWaitMs: 30000 + maxRetryCount: 3 + maxWaitMs: 100000 databaseProviderParameters: name: PolicyProviderParameterGroup implementation: org.onap.policy.models.provider.impl.DatabasePolicyModelsProviderImpl diff --git a/runtime-controlloop/src/test/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionScannerTest.java b/runtime-controlloop/src/test/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionScannerTest.java index 485f58dba..717858ebe 100644 --- a/runtime-controlloop/src/test/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionScannerTest.java +++ b/runtime-controlloop/src/test/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionScannerTest.java @@ -43,6 +43,7 @@ import org.onap.policy.clamp.controlloop.models.controlloop.persistence.provider import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ControlLoopStateChangePublisher; import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ControlLoopUpdatePublisher; import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ParticipantStatusReqPublisher; +import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ParticipantUpdatePublisher; import org.onap.policy.clamp.controlloop.runtime.util.CommonTestData; import org.onap.policy.models.base.PfModelException; import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier; @@ -56,14 +57,15 @@ class SupervisionScannerTest { var controlLoopUpdatePublisher = mock(ControlLoopUpdatePublisher.class); var participantProvider = mock(ParticipantProvider.class); var participantStatusReqPublisher = mock(ParticipantStatusReqPublisher.class); + var participantUpdatePublisher = mock(ParticipantUpdatePublisher.class); var clRuntimeParameterGroup = CommonTestData.geParameterGroup("dbScanner"); var controlLoop = new ControlLoop(); when(controlLoopProvider.getControlLoops(null, null)).thenReturn(List.of(controlLoop)); - var supervisionScanner = - new SupervisionScanner(controlLoopProvider, controlLoopStateChangePublisher, controlLoopUpdatePublisher, - participantProvider, participantStatusReqPublisher, clRuntimeParameterGroup); + var supervisionScanner = new SupervisionScanner(controlLoopProvider, controlLoopStateChangePublisher, + controlLoopUpdatePublisher, participantProvider, participantStatusReqPublisher, + participantUpdatePublisher, clRuntimeParameterGroup); supervisionScanner.run(false); verify(controlLoopProvider, times(0)).updateControlLoop(any(ControlLoop.class)); @@ -82,11 +84,12 @@ class SupervisionScannerTest { var controlLoopStateChangePublisher = mock(ControlLoopStateChangePublisher.class); var participantProvider = mock(ParticipantProvider.class); var participantStatusReqPublisher = mock(ParticipantStatusReqPublisher.class); + var participantUpdatePublisher = mock(ParticipantUpdatePublisher.class); var clRuntimeParameterGroup = CommonTestData.geParameterGroup("dbScanner"); - var supervisionScanner = - new SupervisionScanner(controlLoopProvider, controlLoopStateChangePublisher, controlLoopUpdatePublisher, - participantProvider, participantStatusReqPublisher, clRuntimeParameterGroup); + var supervisionScanner = new SupervisionScanner(controlLoopProvider, controlLoopStateChangePublisher, + controlLoopUpdatePublisher, participantProvider, participantStatusReqPublisher, + participantUpdatePublisher, clRuntimeParameterGroup); supervisionScanner.run(false); verify(controlLoopProvider, times(1)).updateControlLoop(any(ControlLoop.class)); @@ -107,11 +110,12 @@ class SupervisionScannerTest { var controlLoopUpdatePublisher = mock(ControlLoopUpdatePublisher.class); var participantStatusReqPublisher = mock(ParticipantStatusReqPublisher.class); var controlLoopStateChangePublisher = mock(ControlLoopStateChangePublisher.class); + var participantUpdatePublisher = mock(ParticipantUpdatePublisher.class); var clRuntimeParameterGroup = CommonTestData.geParameterGroup("dbScanner"); - var supervisionScanner = - new SupervisionScanner(controlLoopProvider, controlLoopStateChangePublisher, controlLoopUpdatePublisher, - participantProvider, participantStatusReqPublisher, clRuntimeParameterGroup); + var supervisionScanner = new SupervisionScanner(controlLoopProvider, controlLoopStateChangePublisher, + controlLoopUpdatePublisher, participantProvider, participantStatusReqPublisher, + participantUpdatePublisher, clRuntimeParameterGroup); supervisionScanner.handleParticipantStatus(participant.getKey().asIdentifier()); supervisionScanner.run(true); @@ -126,7 +130,7 @@ class SupervisionScannerTest { when(controlLoopProvider.getControlLoops(null, null)).thenReturn(List.of(controlLoop)); var clRuntimeParameterGroup = CommonTestData.geParameterGroup("dbScanParticipant"); - clRuntimeParameterGroup.getParticipantParameters().setMaxMessageAgeMs(0); + clRuntimeParameterGroup.getParticipantParameters().getUpdateParameters().setMaxWaitMs(0); var participant = new Participant(); participant.setName("Participant0"); @@ -140,10 +144,11 @@ class SupervisionScannerTest { var controlLoopUpdatePublisher = mock(ControlLoopUpdatePublisher.class); var participantStatusReqPublisher = mock(ParticipantStatusReqPublisher.class); var controlLoopStateChangePublisher = mock(ControlLoopStateChangePublisher.class); + var participantUpdatePublisher = mock(ParticipantUpdatePublisher.class); - var supervisionScanner = - new SupervisionScanner(controlLoopProvider, controlLoopStateChangePublisher, controlLoopUpdatePublisher, - participantProvider, participantStatusReqPublisher, clRuntimeParameterGroup); + var supervisionScanner = new SupervisionScanner(controlLoopProvider, controlLoopStateChangePublisher, + controlLoopUpdatePublisher, participantProvider, participantStatusReqPublisher, + participantUpdatePublisher, clRuntimeParameterGroup); supervisionScanner.handleParticipantStatus(participant.getKey().asIdentifier()); supervisionScanner.run(true); |