diff options
Diffstat (limited to 'runtime-controlloop/src/main/java')
4 files changed, 217 insertions, 52 deletions
diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/HandleCounter.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/HandleCounter.java new file mode 100644 index 000000000..2151dc143 --- /dev/null +++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/HandleCounter.java @@ -0,0 +1,91 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2021 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.controlloop.runtime.supervision; + +import java.time.Instant; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import lombok.Getter; +import lombok.Setter; + +public class HandleCounter<K> { + @Getter + @Setter + private long maxWaitMs; + + @Getter + @Setter + private int maxRetryCount; + + private Map<K, Integer> mapCounter = new HashMap<>(); + private Set<K> mapFault = new HashSet<>(); + private Map<K, Long> mapTimer = new HashMap<>(); + + public long getDuration(K id) { + mapTimer.putIfAbsent(id, getEpochMilli()); + return getEpochMilli() - mapTimer.get(id); + } + + /** + * Reset timer and clear counter and fault by id. + * + * @param id the id + */ + public void clear(K id) { + mapFault.remove(id); + mapCounter.put(id, 0); + mapTimer.put(id, getEpochMilli()); + } + + public void setFault(K id) { + mapCounter.put(id, 0); + mapFault.add(id); + } + + /** + * Increment RetryCount by id e return true if minor or equal of maxRetryCount. + * + * @param id the identifier + * @return false if count is major of maxRetryCount + */ + public boolean count(K id) { + int counter = mapCounter.getOrDefault(id, 0) + 1; + if (counter <= maxRetryCount) { + mapCounter.put(id, counter); + return true; + } + return false; + } + + public boolean isFault(K id) { + return mapFault.contains(id); + } + + public int getCounter(K id) { + return mapCounter.getOrDefault(id, 0); + } + + protected long getEpochMilli() { + return Instant.now().toEpochMilli(); + } +} diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionAspect.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionAspect.java index 293b5d5da..d0d18ab1a 100644 --- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionAspect.java +++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionAspect.java @@ -28,6 +28,8 @@ import java.util.concurrent.TimeUnit; import lombok.RequiredArgsConstructor; import org.aspectj.lang.annotation.After; import org.aspectj.lang.annotation.Aspect; +import org.aspectj.lang.annotation.Before; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.scheduling.annotation.Scheduled; @@ -64,6 +66,11 @@ public class SupervisionAspect implements Closeable { } } + @Before("@annotation(MessageIntercept) && args(participantStatusMessage,..)") + public void handleParticipantStatus(ParticipantStatus participantStatusMessage) { + executor.execute(() -> supervisionScanner.handleParticipantStatus(participantStatusMessage.getParticipantId())); + } + @Override public void close() throws IOException { executor.shutdown(); diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionScanner.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionScanner.java index b360f6703..7be407c3f 100644 --- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionScanner.java +++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionScanner.java @@ -20,19 +20,18 @@ package org.onap.policy.clamp.controlloop.runtime.supervision; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import lombok.Getter; -import lombok.Setter; +import java.util.List; import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoop; import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopElement; import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopState; +import org.onap.policy.clamp.controlloop.models.controlloop.concepts.Participant; +import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ParticipantHealthStatus; import org.onap.policy.clamp.controlloop.models.controlloop.persistence.provider.ControlLoopProvider; +import org.onap.policy.clamp.controlloop.models.controlloop.persistence.provider.ParticipantProvider; import org.onap.policy.clamp.controlloop.runtime.main.parameters.ClRuntimeParameterGroup; import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ControlLoopStateChangePublisher; import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ControlLoopUpdatePublisher; +import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ParticipantStatusReqPublisher; import org.onap.policy.models.base.PfModelException; import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier; import org.slf4j.Logger; @@ -46,67 +45,49 @@ import org.springframework.stereotype.Component; public class SupervisionScanner { private static final Logger LOGGER = LoggerFactory.getLogger(SupervisionScanner.class); - @Getter - @Setter - static class HandleCounter { - private int maxRetryCount; - private long maxWaitMs; - private Map<ToscaConceptIdentifier, Integer> mapCounter = new HashMap<>(); - private Set<ToscaConceptIdentifier> mapFault = new HashSet<>(); - - public void clear(ToscaConceptIdentifier id) { - mapCounter.put(id, 0); - mapFault.remove(id); - } - - public void setFault(ToscaConceptIdentifier id) { - mapCounter.put(id, 0); - mapFault.add(id); - } - - public boolean count(ToscaConceptIdentifier id) { - int counter = mapCounter.getOrDefault(id, 0) + 1; - if (counter <= maxRetryCount) { - mapCounter.put(id, counter); - return true; - } - return false; - } - - public boolean isFault(ToscaConceptIdentifier id) { - return mapFault.contains(id); - } - - public int getCounter(ToscaConceptIdentifier id) { - return mapCounter.getOrDefault(id, 0); - } - } - - private HandleCounter stateChange = new HandleCounter(); + private HandleCounter<ToscaConceptIdentifier> controlLoopCounter = new HandleCounter<>(); + private HandleCounter<ToscaConceptIdentifier> participantCounter = new HandleCounter<>(); private final ControlLoopProvider controlLoopProvider; private final ControlLoopStateChangePublisher controlLoopStateChangePublisher; private final ControlLoopUpdatePublisher controlLoopUpdatePublisher; + private final ParticipantProvider participantProvider; + private final ParticipantStatusReqPublisher participantStatusReqPublisher; + + private final long maxMessageAgeMs; /** * Constructor for instantiating SupervisionScanner. * * @param controlLoopProvider the provider to use to read control loops from the database - * @param controlLoopStateChangePublisher the ControlLoopStateChange Publisher + * @param controlLoopStateChangePublisher the ControlLoop StateChange Publisher + * @param controlLoopUpdatePublisher the ControlLoopUpdate Publisher + * @param participantProvider the Participant Provider + * @param participantStatusReqPublisher the Participant StatusReq Publisher * @param clRuntimeParameterGroup the parameters for the control loop runtime */ public SupervisionScanner(final ControlLoopProvider controlLoopProvider, final ControlLoopStateChangePublisher controlLoopStateChangePublisher, - ControlLoopUpdatePublisher controlLoopUpdatePublisher, + ControlLoopUpdatePublisher controlLoopUpdatePublisher, ParticipantProvider participantProvider, + ParticipantStatusReqPublisher participantStatusReqPublisher, final ClRuntimeParameterGroup clRuntimeParameterGroup) { this.controlLoopProvider = controlLoopProvider; this.controlLoopStateChangePublisher = controlLoopStateChangePublisher; this.controlLoopUpdatePublisher = controlLoopUpdatePublisher; + this.participantProvider = participantProvider; + this.participantStatusReqPublisher = participantStatusReqPublisher; + + controlLoopCounter.setMaxRetryCount( + clRuntimeParameterGroup.getParticipantParameters().getUpdateParameters().getMaxRetryCount()); + controlLoopCounter + .setMaxWaitMs(clRuntimeParameterGroup.getParticipantParameters().getUpdateParameters().getMaxWaitMs()); - stateChange.setMaxRetryCount( + participantCounter.setMaxRetryCount( clRuntimeParameterGroup.getParticipantParameters().getUpdateParameters().getMaxRetryCount()); - stateChange.setMaxWaitMs( - clRuntimeParameterGroup.getParticipantParameters().getUpdateParameters().getMaxWaitMs()); + participantCounter + .setMaxWaitMs(clRuntimeParameterGroup.getParticipantParameters().getUpdateParameters().getMaxWaitMs()); + + maxMessageAgeMs = clRuntimeParameterGroup.getParticipantParameters().getMaxMessageAgeMs(); } /** @@ -117,6 +98,17 @@ public class SupervisionScanner { public void run(boolean counterCheck) { LOGGER.debug("Scanning control loops in the database . . ."); + if (counterCheck) { + try { + for (Participant participant : participantProvider.getParticipants(null, null)) { + scanParticipant(participant); + } + } catch (PfModelException pfme) { + LOGGER.warn("error reading participant from database", pfme); + return; + } + } + try { for (ControlLoop controlLoop : controlLoopProvider.getControlLoops(null, null)) { scanControlLoop(controlLoop, counterCheck); @@ -128,6 +120,33 @@ public class SupervisionScanner { LOGGER.debug("Control loop scan complete . . ."); } + private void scanParticipant(Participant participant) throws PfModelException { + ToscaConceptIdentifier id = participant.getKey().asIdentifier(); + if (participantCounter.isFault(id)) { + LOGGER.debug("report Participant fault"); + return; + } + if (participantCounter.getDuration(id) > maxMessageAgeMs) { + if (participantCounter.count(id)) { + LOGGER.debug("retry message ParticipantStatusReq"); + participantStatusReqPublisher.send(id); + participant.setHealthStatus(ParticipantHealthStatus.NOT_HEALTHY); + } else { + LOGGER.debug("report Participant fault"); + participantCounter.setFault(id); + participant.setHealthStatus(ParticipantHealthStatus.OFF_LINE); + } + participantProvider.updateParticipants(List.of(participant)); + } + } + + /** + * handle participant Status message. + */ + public void handleParticipantStatus(ToscaConceptIdentifier id) { + participantCounter.clear(id); + } + private void scanControlLoop(final ControlLoop controlLoop, boolean counterCheck) throws PfModelException { LOGGER.debug("scanning control loop {} . . .", controlLoop.getKey().asIdentifier()); @@ -166,17 +185,17 @@ public class SupervisionScanner { } private void clearFaultAndCounter(ControlLoop controlLoop) { - stateChange.clear(controlLoop.getKey().asIdentifier()); + controlLoopCounter.clear(controlLoop.getKey().asIdentifier()); } private void handleCounter(ControlLoop controlLoop) { ToscaConceptIdentifier id = controlLoop.getKey().asIdentifier(); - if (stateChange.isFault(id)) { + if (controlLoopCounter.isFault(id)) { LOGGER.debug("report ControlLoop fault"); return; } - if (stateChange.count(id)) { + if (controlLoopCounter.count(id)) { if (ControlLoopState.UNINITIALISED2PASSIVE.equals(controlLoop.getState())) { LOGGER.debug("retry message ControlLoopUpdate"); controlLoopUpdatePublisher.send(controlLoop); @@ -186,7 +205,7 @@ public class SupervisionScanner { } } else { LOGGER.debug("report ControlLoop fault"); - stateChange.setFault(id); + controlLoopCounter.setFault(id); } } } diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantStatusReqPublisher.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantStatusReqPublisher.java new file mode 100644 index 000000000..69d598285 --- /dev/null +++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantStatusReqPublisher.java @@ -0,0 +1,48 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2021 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.controlloop.runtime.supervision.comm; + +import java.time.Instant; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantStatusReq; +import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +@Component +public class ParticipantStatusReqPublisher extends AbstractParticipantPublisher<ParticipantStatusReq> { + + private static final Logger LOGGER = LoggerFactory.getLogger(ParticipantStatusReqPublisher.class); + + /** + * Send ParticipantStatusReq to Participant. + * + * @param participantId the participant Id + */ + public void send(ToscaConceptIdentifier participantId) { + ParticipantStatusReq message = new ParticipantStatusReq(); + message.setParticipantId(participantId); + message.setTimestamp(Instant.now()); + + LOGGER.debug("Participant StatusReq sent {}", message); + super.send(message); + } +} |