From d5c645873589e0b56a6ad0edd5bd0d480896f765 Mon Sep 17 00:00:00 2001 From: FrancescoFioraEst Date: Tue, 20 Jul 2021 15:49:57 +0100 Subject: Add Retry and Timeout handling Implementaton of Supervision, Retry and Timeout handling on all Participant messages Issue-ID: POLICY-3455 Change-Id: Idfd53ea0b8f5bb1272703256b983a6cbeeb4fdf4 Signed-off-by: FrancescoFioraEst --- .../clamp/controlloop/runtime/Application.java | 2 + .../runtime/main/rest/CommissioningController.java | 14 +- .../runtime/main/rest/InstantiationController.java | 20 +-- .../main/rest/MonitoringQueryController.java | 17 +-- .../runtime/supervision/MessageIntercept.java | 32 +++++ .../runtime/supervision/SupervisionAspect.java | 71 ++++++++++ .../runtime/supervision/SupervisionHandler.java | 96 ++----------- .../runtime/supervision/SupervisionScanner.java | 151 ++++++++++++++++----- .../comm/ControlLoopUpdatePublisher.java | 33 ++--- .../comm/ParticipantDeregisterAckPublisher.java | 13 ++ .../comm/ParticipantRegisterAckPublisher.java | 13 ++ .../comm/ParticipantUpdateAckListener.java | 1 - .../comm/ParticipantUpdatePublisher.java | 50 +++++++ .../src/main/resources/application.yaml | 3 - 14 files changed, 343 insertions(+), 173 deletions(-) create mode 100644 runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/MessageIntercept.java create mode 100644 runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionAspect.java (limited to 'runtime-controlloop/src/main') diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/Application.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/Application.java index 28814b354..5fbd36c06 100644 --- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/Application.java +++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/Application.java @@ -24,7 +24,9 @@ import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.context.properties.ConfigurationPropertiesScan; import org.springframework.context.annotation.ComponentScan; +import org.springframework.scheduling.annotation.EnableScheduling; +@EnableScheduling @SpringBootApplication @ComponentScan({"org.onap.policy.clamp.controlloop.models.controlloop.persistence.provider", "org.onap.policy.clamp.controlloop.runtime"}) diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/rest/CommissioningController.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/rest/CommissioningController.java index 74548e724..67c615dcd 100644 --- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/rest/CommissioningController.java +++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/rest/CommissioningController.java @@ -62,6 +62,8 @@ public class CommissioningController extends AbstractRestController { private static final Logger LOGGER = LoggerFactory.getLogger(CommissioningController.class); + private static final String TAGS = "Clamp Control Loop Commissioning API"; + private final CommissioningProvider provider; /** @@ -88,7 +90,7 @@ public class CommissioningController extends AbstractRestController { value = "Commissions control loop definitions", notes = "Commissions control loop definitions, returning the commissioned control loop definition IDs", response = CommissioningResponse.class, - tags = {"Control Loop Commissioning API"}, + tags = {TAGS}, authorizations = @Authorization(value = AUTHORIZATION_TYPE), responseHeaders = { @ResponseHeader( @@ -158,7 +160,7 @@ public class CommissioningController extends AbstractRestController { @ApiOperation(value = "Delete a commissioned control loop", notes = "Deletes a Commissioned Control Loop, returning optional error details", response = CommissioningResponse.class, - tags = {"Clamp Control Loop Commissioning API"}, + tags = {TAGS}, authorizations = @Authorization(value = AUTHORIZATION_TYPE), responseHeaders = { @ResponseHeader( @@ -233,7 +235,7 @@ public class CommissioningController extends AbstractRestController { notes = "Queries details of the requested commissioned control loop definitions, " + "returning all control loop details", response = ToscaNodeTemplate.class, - tags = {"Clamp Control Loop Commissioning API"}, + tags = {TAGS}, authorizations = @Authorization(value = AUTHORIZATION_TYPE), responseHeaders = { @ResponseHeader( @@ -302,7 +304,7 @@ public class CommissioningController extends AbstractRestController { notes = "Queries details of the requested commissioned tosca service template, " + "returning all tosca service template details", response = ToscaServiceTemplate.class, - tags = {"Clamp Control Loop Commissioning API"}, + tags = {TAGS}, authorizations = @Authorization(value = AUTHORIZATION_TYPE), responseHeaders = { @ResponseHeader( @@ -379,7 +381,7 @@ public class CommissioningController extends AbstractRestController { notes = "Queries details of the requested commissioned tosca service template json schema, " + "returning all tosca service template json schema details", response = ToscaServiceTemplate.class, - tags = {"Clamp Control Loop Commissioning API"}, + tags = {TAGS}, authorizations = @Authorization(value = AUTHORIZATION_TYPE), responseHeaders = { @ResponseHeader( @@ -448,7 +450,7 @@ public class CommissioningController extends AbstractRestController { notes = "Queries details of the requested commissioned control loop element definitions, " + "returning all control loop elements' details", response = ToscaNodeTemplate.class, - tags = {"Clamp Control Loop Commissioning API"}, + tags = {TAGS}, authorizations = @Authorization(value = AUTHORIZATION_TYPE), responseHeaders = { @ResponseHeader( diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/rest/InstantiationController.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/rest/InstantiationController.java index aba585e29..5a320e8dc 100644 --- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/rest/InstantiationController.java +++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/rest/InstantiationController.java @@ -59,6 +59,8 @@ public class InstantiationController extends AbstractRestController { private static final Logger LOGGER = LoggerFactory.getLogger(InstantiationController.class); + private static final String TAGS = "Clamp Control Loop Instantiation API"; + // The CL provider for instantiation requests private final ControlLoopInstantiationProvider provider; @@ -86,7 +88,7 @@ public class InstantiationController extends AbstractRestController { value = "Commissions control loop definitions", notes = "Commissions control loop definitions, returning the control loop IDs", response = InstantiationResponse.class, - tags = {"Control Loop Instantiation API"}, + tags = {TAGS}, authorizations = @Authorization(value = AUTHORIZATION_TYPE), responseHeaders = { @ResponseHeader( @@ -154,9 +156,7 @@ public class InstantiationController extends AbstractRestController { @ApiOperation(value = "Query details of the requested control loops", notes = "Queries details of the requested control loops, returning all control loop details", response = ControlLoops.class, - tags = { - "Clamp control loop Instantiation API" - }, + tags = {TAGS}, authorizations = @Authorization(value = AUTHORIZATION_TYPE), responseHeaders = { @ResponseHeader( @@ -220,9 +220,7 @@ public class InstantiationController extends AbstractRestController { value = "Updates control loop definitions", notes = "Updates control loop definitions, returning the updated control loop definition IDs", response = InstantiationResponse.class, - tags = { - "Control Loop Instantiation API" - }, + tags = {TAGS}, authorizations = @Authorization(value = AUTHORIZATION_TYPE), responseHeaders = { @ResponseHeader( @@ -290,9 +288,7 @@ public class InstantiationController extends AbstractRestController { @ApiOperation(value = "Delete a control loop", notes = "Deletes a control loop, returning optional error details", response = InstantiationResponse.class, - tags = { - "Clamp Control Loop Instantiation API" - }, + tags = {TAGS}, authorizations = @Authorization(value = AUTHORIZATION_TYPE), responseHeaders = { @ResponseHeader( @@ -362,9 +358,7 @@ public class InstantiationController extends AbstractRestController { @ApiOperation(value = "Issue a command to the requested control loops", notes = "Issues a command to a control loop, ordering a state change on the control loop", response = InstantiationResponse.class, - tags = { - "Clamp Control Loop Instantiation API" - }, + tags = {TAGS}, authorizations = @Authorization(value = AUTHORIZATION_TYPE), responseHeaders = { @ResponseHeader( diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/rest/MonitoringQueryController.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/rest/MonitoringQueryController.java index 86531597a..7ac95003e 100644 --- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/rest/MonitoringQueryController.java +++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/rest/MonitoringQueryController.java @@ -52,6 +52,7 @@ import org.springframework.web.bind.annotation.RestController; public class MonitoringQueryController extends AbstractRestController { private static final Logger LOGGER = LoggerFactory.getLogger(MonitoringQueryController.class); + private static final String TAGS = "Clamp Control Loop Monitoring API"; private final MonitoringProvider provider; /** @@ -80,9 +81,7 @@ public class MonitoringQueryController extends AbstractRestController { @ApiOperation(value = "Query details of the requested participant stats", notes = "Queries details of the requested participant stats, returning all participant stats", response = ParticipantStatisticsList.class, - tags = { - "Clamp control loop Monitoring API" - }, + tags = {TAGS}, authorizations = @Authorization(value = AUTHORIZATION_TYPE), responseHeaders = { @ResponseHeader( @@ -168,9 +167,7 @@ public class MonitoringQueryController extends AbstractRestController { @ApiOperation(value = "Query details of all the participant stats in a control loop", notes = "Queries details of the participant stats, returning all participant stats", response = ClElementStatisticsList.class, - tags = { - "Clamp control loop Monitoring API" - }, + tags = {TAGS}, authorizations = @Authorization(value = AUTHORIZATION_TYPE), responseHeaders = { @ResponseHeader( @@ -235,9 +232,7 @@ public class MonitoringQueryController extends AbstractRestController { @ApiOperation(value = "Query details of the requested cl element stats in a control loop", notes = "Queries details of the requested cl element stats, returning all clElement stats", response = ClElementStatisticsList.class, - tags = { - "Clamp control loop Monitoring API" - }, + tags = {TAGS}, authorizations = @Authorization(value = AUTHORIZATION_TYPE), responseHeaders = { @ResponseHeader( @@ -306,9 +301,7 @@ public class MonitoringQueryController extends AbstractRestController { @ApiOperation(value = "Query details of the requested cl element stats", notes = "Queries details of the requested cl element stats, returning all clElement stats", response = ClElementStatisticsList.class, - tags = { - "Clamp control loop Monitoring API" - }, + tags = {TAGS}, authorizations = @Authorization(value = AUTHORIZATION_TYPE), responseHeaders = { @ResponseHeader( diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/MessageIntercept.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/MessageIntercept.java new file mode 100644 index 000000000..c23ed833d --- /dev/null +++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/MessageIntercept.java @@ -0,0 +1,32 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2021 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.controlloop.runtime.supervision; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.METHOD) +public @interface MessageIntercept { + +} diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionAspect.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionAspect.java new file mode 100644 index 000000000..293b5d5da --- /dev/null +++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionAspect.java @@ -0,0 +1,71 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2021 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.controlloop.runtime.supervision; + +import java.io.Closeable; +import java.io.IOException; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import lombok.RequiredArgsConstructor; +import org.aspectj.lang.annotation.After; +import org.aspectj.lang.annotation.Aspect; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +@Aspect +@Component +@RequiredArgsConstructor +public class SupervisionAspect implements Closeable { + + private static final Logger LOGGER = LoggerFactory.getLogger(SupervisionAspect.class); + + private final SupervisionScanner supervisionScanner; + + private ThreadPoolExecutor executor = + new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()); + + @Scheduled( + fixedRateString = "${runtime.participantParameters.heartBeatMs}", + initialDelayString = "${runtime.participantParameters.heartBeatMs}") + public void schedule() { + LOGGER.info("Add scheduled scanning"); + executor.execute(() -> supervisionScanner.run(true)); + } + + /** + * Intercept Messages from participant and run Supervision Scan. + */ + @After("@annotation(MessageIntercept)") + public void doCheck() { + if (executor.getQueue().size() < 2) { + LOGGER.debug("Add scanning Message"); + executor.execute(() -> supervisionScanner.run(false)); + } + } + + @Override + public void close() throws IOException { + executor.shutdown(); + } +} diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionHandler.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionHandler.java index 56a1ba9b3..dadfe0de2 100644 --- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionHandler.java +++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionHandler.java @@ -20,45 +20,27 @@ package org.onap.policy.clamp.controlloop.runtime.supervision; -import java.time.Instant; -import java.util.LinkedHashMap; import java.util.List; -import java.util.Map; -import java.util.UUID; import javax.ws.rs.core.Response; import lombok.AllArgsConstructor; import org.apache.commons.collections4.CollectionUtils; import org.onap.policy.clamp.controlloop.common.exception.ControlLoopException; import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoop; import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopElement; -import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopElementDefinition; import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopState; import org.onap.policy.clamp.controlloop.models.controlloop.concepts.Participant; import org.onap.policy.clamp.controlloop.models.controlloop.persistence.provider.ControlLoopProvider; import org.onap.policy.clamp.controlloop.models.controlloop.persistence.provider.ParticipantProvider; -import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ControlLoopStateChange; -import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ControlLoopUpdate; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantDeregister; -import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantDeregisterAck; -import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageType; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantRegister; -import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantRegisterAck; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantStatus; -import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantUpdate; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantUpdateAck; -import org.onap.policy.clamp.controlloop.runtime.commissioning.CommissioningProvider; -import org.onap.policy.clamp.controlloop.runtime.main.parameters.ClRuntimeParameterGroup; import org.onap.policy.clamp.controlloop.runtime.monitoring.MonitoringProvider; import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ControlLoopStateChangePublisher; import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ControlLoopUpdatePublisher; import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ParticipantDeregisterAckPublisher; import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ParticipantRegisterAckPublisher; -import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ParticipantStatusListener; import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ParticipantUpdatePublisher; -import org.onap.policy.common.endpoints.event.comm.TopicSink; -import org.onap.policy.common.endpoints.listeners.MessageTypeDispatcher; -import org.onap.policy.common.utils.services.ServiceManager; -import org.onap.policy.common.utils.services.ServiceManagerException; import org.onap.policy.models.base.PfModelException; import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier; import org.slf4j.Logger; @@ -84,7 +66,6 @@ public class SupervisionHandler { private final ControlLoopProvider controlLoopProvider; private final ParticipantProvider participantProvider; private final MonitoringProvider monitoringProvider; - private final CommissioningProvider commissioningProvider; // Publishers for participant communication private final ControlLoopUpdatePublisher controlLoopUpdatePublisher; @@ -130,6 +111,7 @@ public class SupervisionHandler { * * @param participantStatusMessage the ParticipantStatus message received from a participant */ + @MessageIntercept public void handleParticipantMessage(ParticipantStatus participantStatusMessage) { LOGGER.debug("Participant Status received {}", participantStatusMessage); try { @@ -151,10 +133,14 @@ public class SupervisionHandler { * * @param participantRegisterMessage the ParticipantRegister message received from a participant */ + @MessageIntercept public void handleParticipantMessage(ParticipantRegister participantRegisterMessage) { LOGGER.debug("Participant Register received {}", participantRegisterMessage); - sendParticipantAckMessage(participantRegisterMessage); - sendParticipantUpdate(participantRegisterMessage); + + participantRegisterAckPublisher.send(participantRegisterMessage.getMessageId()); + + participantUpdatePublisher.send(participantRegisterMessage.getParticipantId(), + participantRegisterMessage.getParticipantType()); } /** @@ -162,9 +148,10 @@ public class SupervisionHandler { * * @param participantDeregisterMessage the ParticipantDeregister message received from a participant */ + @MessageIntercept public void handleParticipantMessage(ParticipantDeregister participantDeregisterMessage) { LOGGER.debug("Participant Deregister received {}", participantDeregisterMessage); - sendParticipantAckMessage(participantDeregisterMessage); + participantDeregisterAckPublisher.send(participantDeregisterMessage.getMessageId()); } /** @@ -172,6 +159,7 @@ public class SupervisionHandler { * * @param participantUpdateAckMessage the ParticipantUpdateAck message received from a participant */ + @MessageIntercept public void handleParticipantMessage(ParticipantUpdateAck participantUpdateAckMessage) { LOGGER.debug("Participant Update Ack received {}", participantUpdateAckMessage); } @@ -289,70 +277,6 @@ public class SupervisionHandler { } } - private void sendControlLoopUpdate(ControlLoop controlLoop) throws PfModelException { - var controlLoopUpdateMsg = new ControlLoopUpdate(); - controlLoopUpdateMsg.setControlLoopId(controlLoop.getKey().asIdentifier()); - controlLoopUpdateMsg.setControlLoop(controlLoop); - // TODO: We should look up the correct TOSCA node template here for the control loop - // Tiny hack implemented to return the tosca service template entry from the database and be passed onto dmaap - controlLoopUpdateMsg.setControlLoopDefinition(commissioningProvider.getToscaServiceTemplate(null, null)); - controlLoopUpdatePublisher.send(controlLoopUpdateMsg); - } - - private void sendControlLoopStateChange(ControlLoop controlLoop) { - var clsc = new ControlLoopStateChange(); - clsc.setControlLoopId(controlLoop.getKey().asIdentifier()); - clsc.setMessageId(UUID.randomUUID()); - clsc.setOrderedState(controlLoop.getOrderedState()); - controlLoopStateChangePublisher.send(clsc); - } - - private void sendParticipantUpdate(ParticipantRegister participantRegisterMessage) { - var message = new ParticipantUpdate(); - message.setParticipantId(participantRegisterMessage.getParticipantId()); - message.setParticipantType(participantRegisterMessage.getParticipantType()); - message.setTimestamp(Instant.now()); - - ControlLoopElementDefinition clDefinition = new ControlLoopElementDefinition(); - clDefinition.setId(UUID.randomUUID()); - - try { - clDefinition.setControlLoopElementToscaServiceTemplate(commissioningProvider - .getToscaServiceTemplate(null, null)); - } catch (PfModelException pfme) { - LOGGER.warn("Get of tosca service template failed, cannot send participantupdate", pfme); - return; - } - - Map controlLoopElementDefinitionMap = new LinkedHashMap<>(); - controlLoopElementDefinitionMap.put(UUID.randomUUID(), clDefinition); - - Map> - participantDefinitionUpdateMap = new LinkedHashMap<>(); - participantDefinitionUpdateMap.put(participantRegisterMessage.getParticipantId(), - controlLoopElementDefinitionMap); - message.setParticipantDefinitionUpdateMap(participantDefinitionUpdateMap); - - LOGGER.debug("Participant Update sent", message); - participantUpdatePublisher.send(message); - } - - private void sendParticipantAckMessage(ParticipantRegister participantRegisterMessage) { - var message = new ParticipantRegisterAck(); - message.setResponseTo(participantRegisterMessage.getMessageId()); - message.setMessage("Participant Register Ack"); - message.setResult(true); - participantRegisterAckPublisher.send(message); - } - - private void sendParticipantAckMessage(ParticipantDeregister participantDeregisterMessage) { - var message = new ParticipantDeregisterAck(); - message.setResponseTo(participantDeregisterMessage.getMessageId()); - message.setMessage("Participant Deregister Ack"); - message.setResult(true); - participantDeregisterAckPublisher.send(message); - } - private void superviseParticipant(ParticipantStatus participantStatusMessage) throws PfModelException, ControlLoopException { if (participantStatusMessage.getParticipantId() == null) { diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionScanner.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionScanner.java index 68f5830c0..b360f6703 100644 --- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionScanner.java +++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionScanner.java @@ -20,15 +20,21 @@ package org.onap.policy.clamp.controlloop.runtime.supervision; -import java.io.Closeable; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import lombok.Getter; +import lombok.Setter; import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoop; import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopElement; +import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopState; import org.onap.policy.clamp.controlloop.models.controlloop.persistence.provider.ControlLoopProvider; import org.onap.policy.clamp.controlloop.runtime.main.parameters.ClRuntimeParameterGroup; +import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ControlLoopStateChangePublisher; +import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ControlLoopUpdatePublisher; import org.onap.policy.models.base.PfModelException; +import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; @@ -37,35 +43,83 @@ import org.springframework.stereotype.Component; * This class is used to scan the control loops in the database and check if they are in the correct state. */ @Component -public class SupervisionScanner implements Runnable, Closeable { +public class SupervisionScanner { private static final Logger LOGGER = LoggerFactory.getLogger(SupervisionScanner.class); - private ControlLoopProvider controlLoopProvider; - private ScheduledExecutorService timerPool; + @Getter + @Setter + static class HandleCounter { + private int maxRetryCount; + private long maxWaitMs; + private Map mapCounter = new HashMap<>(); + private Set mapFault = new HashSet<>(); + + public void clear(ToscaConceptIdentifier id) { + mapCounter.put(id, 0); + mapFault.remove(id); + } + + public void setFault(ToscaConceptIdentifier id) { + mapCounter.put(id, 0); + mapFault.add(id); + } + + public boolean count(ToscaConceptIdentifier id) { + int counter = mapCounter.getOrDefault(id, 0) + 1; + if (counter <= maxRetryCount) { + mapCounter.put(id, counter); + return true; + } + return false; + } + + public boolean isFault(ToscaConceptIdentifier id) { + return mapFault.contains(id); + } + + public int getCounter(ToscaConceptIdentifier id) { + return mapCounter.getOrDefault(id, 0); + } + } + + private HandleCounter stateChange = new HandleCounter(); + + private final ControlLoopProvider controlLoopProvider; + private final ControlLoopStateChangePublisher controlLoopStateChangePublisher; + private final ControlLoopUpdatePublisher controlLoopUpdatePublisher; /** * Constructor for instantiating SupervisionScanner. * - * @param clRuntimeParameterGroup the parameters for the control loop runtime * @param controlLoopProvider the provider to use to read control loops from the database + * @param controlLoopStateChangePublisher the ControlLoopStateChange Publisher + * @param clRuntimeParameterGroup the parameters for the control loop runtime */ public SupervisionScanner(final ControlLoopProvider controlLoopProvider, - ClRuntimeParameterGroup clRuntimeParameterGroup) { + final ControlLoopStateChangePublisher controlLoopStateChangePublisher, + ControlLoopUpdatePublisher controlLoopUpdatePublisher, + final ClRuntimeParameterGroup clRuntimeParameterGroup) { this.controlLoopProvider = controlLoopProvider; + this.controlLoopStateChangePublisher = controlLoopStateChangePublisher; + this.controlLoopUpdatePublisher = controlLoopUpdatePublisher; - // Kick off the timer - timerPool = makeTimerPool(); - timerPool.scheduleAtFixedRate(this, 0, clRuntimeParameterGroup.getSupervisionScannerIntervalSec(), - TimeUnit.SECONDS); + stateChange.setMaxRetryCount( + clRuntimeParameterGroup.getParticipantParameters().getUpdateParameters().getMaxRetryCount()); + stateChange.setMaxWaitMs( + clRuntimeParameterGroup.getParticipantParameters().getUpdateParameters().getMaxWaitMs()); } - @Override - public void run() { + /** + * Run Scanning. + * + * @param counterCheck if true activate counter and retry + */ + public void run(boolean counterCheck) { LOGGER.debug("Scanning control loops in the database . . ."); try { for (ControlLoop controlLoop : controlLoopProvider.getControlLoops(null, null)) { - scanControlLoop(controlLoop); + scanControlLoop(controlLoop, counterCheck); } } catch (PfModelException pfme) { LOGGER.warn("error reading control loops from database", pfme); @@ -74,40 +128,65 @@ public class SupervisionScanner implements Runnable, Closeable { LOGGER.debug("Control loop scan complete . . ."); } - @Override - public void close() { - timerPool.shutdown(); - } - - private void scanControlLoop(final ControlLoop controlLoop) throws PfModelException { + private void scanControlLoop(final ControlLoop controlLoop, boolean counterCheck) throws PfModelException { LOGGER.debug("scanning control loop {} . . .", controlLoop.getKey().asIdentifier()); if (controlLoop.getState().equals(controlLoop.getOrderedState().asState())) { LOGGER.debug("control loop {} scanned, OK", controlLoop.getKey().asIdentifier()); + + // Clear missed report counter on Control Loop + clearFaultAndCounter(controlLoop); return; } + boolean completed = true; for (ControlLoopElement element : controlLoop.getElements().values()) { if (!element.getState().equals(element.getOrderedState().asState())) { - LOGGER.debug("control loop scan: transitioning from state {} to {}", controlLoop.getState(), - controlLoop.getOrderedState()); - return; + completed = false; + break; } } - LOGGER.debug("control loop scan: transition from state {} to {} completed", controlLoop.getState(), - controlLoop.getOrderedState()); + if (completed) { + LOGGER.debug("control loop scan: transition from state {} to {} completed", controlLoop.getState(), + controlLoop.getOrderedState()); + + controlLoop.setState(controlLoop.getOrderedState().asState()); + controlLoopProvider.updateControlLoop(controlLoop); - controlLoop.setState(controlLoop.getOrderedState().asState()); - controlLoopProvider.updateControlLoop(controlLoop); + // Clear missed report counter on Control Loop + clearFaultAndCounter(controlLoop); + } else { + LOGGER.debug("control loop scan: transition from state {} to {} not completed", controlLoop.getState(), + controlLoop.getOrderedState()); + if (counterCheck) { + handleCounter(controlLoop); + } + } } - /** - * Makes a new timer pool. - * - * @return a new timer pool - */ - protected ScheduledExecutorService makeTimerPool() { - return Executors.newScheduledThreadPool(1); + private void clearFaultAndCounter(ControlLoop controlLoop) { + stateChange.clear(controlLoop.getKey().asIdentifier()); + } + + private void handleCounter(ControlLoop controlLoop) { + ToscaConceptIdentifier id = controlLoop.getKey().asIdentifier(); + if (stateChange.isFault(id)) { + LOGGER.debug("report ControlLoop fault"); + return; + } + + if (stateChange.count(id)) { + if (ControlLoopState.UNINITIALISED2PASSIVE.equals(controlLoop.getState())) { + LOGGER.debug("retry message ControlLoopUpdate"); + controlLoopUpdatePublisher.send(controlLoop); + } else { + LOGGER.debug("retry message ControlLoopStateChange"); + controlLoopStateChangePublisher.send(controlLoop); + } + } else { + LOGGER.debug("report ControlLoop fault"); + stateChange.setFault(id); + } } } diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ControlLoopUpdatePublisher.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ControlLoopUpdatePublisher.java index e562343ff..e366ba49a 100644 --- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ControlLoopUpdatePublisher.java +++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ControlLoopUpdatePublisher.java @@ -20,42 +20,43 @@ package org.onap.policy.clamp.controlloop.runtime.supervision.comm; +import lombok.AllArgsConstructor; import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoop; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ControlLoopUpdate; import org.onap.policy.clamp.controlloop.runtime.commissioning.CommissioningProvider; import org.onap.policy.models.base.PfModelException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; /** * This class is used to send ControlLoopUpdate messages to participants on DMaaP. */ @Component +@AllArgsConstructor public class ControlLoopUpdatePublisher extends AbstractParticipantPublisher { - private final CommissioningProvider commissioningProvider; + private static final Logger LOGGER = LoggerFactory.getLogger(ControlLoopUpdatePublisher.class); - /** - * Constructor. - * - * @param commissioningProvider the CommissioningProvider - */ - public ControlLoopUpdatePublisher(CommissioningProvider commissioningProvider) { - this.commissioningProvider = commissioningProvider; - } + private final CommissioningProvider commissioningProvider; /** * Send ControlLoopUpdate to Participant. * * @param controlLoop the ControlLoop - * @throws PfModelException on errors getting the Control Loop Definition */ - public void send(ControlLoop controlLoop) throws PfModelException { - var pclu = new ControlLoopUpdate(); - pclu.setControlLoopId(controlLoop.getKey().asIdentifier()); - pclu.setControlLoop(controlLoop); + public void send(ControlLoop controlLoop) { + var controlLoopUpdateMsg = new ControlLoopUpdate(); + controlLoopUpdateMsg.setControlLoopId(controlLoop.getKey().asIdentifier()); + controlLoopUpdateMsg.setControlLoop(controlLoop); // TODO: We should look up the correct TOSCA node template here for the control loop // Tiny hack implemented to return the tosca service template entry from the database and be passed onto dmaap - pclu.setControlLoopDefinition(commissioningProvider.getToscaServiceTemplate(null, null)); - super.send(pclu); + try { + controlLoopUpdateMsg.setControlLoopDefinition(commissioningProvider.getToscaServiceTemplate(null, null)); + } catch (PfModelException pfme) { + LOGGER.warn("Get of tosca service template failed, cannot send ParticipantControlLoopUpdate", pfme); + return; + } + super.send(controlLoopUpdateMsg); } } diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantDeregisterAckPublisher.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantDeregisterAckPublisher.java index c0fcb3e7d..e92b6ee1b 100644 --- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantDeregisterAckPublisher.java +++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantDeregisterAckPublisher.java @@ -20,6 +20,7 @@ package org.onap.policy.clamp.controlloop.runtime.supervision.comm; +import java.util.UUID; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantDeregisterAck; import org.springframework.stereotype.Component; @@ -29,4 +30,16 @@ import org.springframework.stereotype.Component; @Component public class ParticipantDeregisterAckPublisher extends AbstractParticipantAckPublisher { + /** + * Sent ParticipantDeregisterAck to Participant. + * + * @param responseTo the original request id in the request. + */ + public void send(UUID responseTo) { + var message = new ParticipantDeregisterAck(); + message.setResponseTo(responseTo); + message.setMessage("Participant Deregister Ack"); + message.setResult(true); + super.send(message); + } } diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantRegisterAckPublisher.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantRegisterAckPublisher.java index 2c0c4b393..73860b5c2 100644 --- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantRegisterAckPublisher.java +++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantRegisterAckPublisher.java @@ -20,6 +20,7 @@ package org.onap.policy.clamp.controlloop.runtime.supervision.comm; +import java.util.UUID; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantRegisterAck; import org.springframework.stereotype.Component; @@ -29,4 +30,16 @@ import org.springframework.stereotype.Component; @Component public class ParticipantRegisterAckPublisher extends AbstractParticipantAckPublisher { + /** + * Send ParticipantRegisterAck to Participant. + * + * @param responseTo the original request id in the request. + */ + public void send(UUID responseTo) { + var message = new ParticipantRegisterAck(); + message.setResponseTo(responseTo); + message.setMessage("Participant Register Ack"); + message.setResult(true); + super.send(message); + } } diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantUpdateAckListener.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantUpdateAckListener.java index b8538b1f7..4eeb0a8ce 100644 --- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantUpdateAckListener.java +++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantUpdateAckListener.java @@ -21,7 +21,6 @@ package org.onap.policy.clamp.controlloop.runtime.supervision.comm; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageType; -import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantRegister; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantUpdateAck; import org.onap.policy.clamp.controlloop.runtime.config.messaging.Listener; import org.onap.policy.clamp.controlloop.runtime.supervision.SupervisionHandler; diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantUpdatePublisher.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantUpdatePublisher.java index 5af5f1f54..88cf90d02 100644 --- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantUpdatePublisher.java +++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantUpdatePublisher.java @@ -20,13 +20,63 @@ package org.onap.policy.clamp.controlloop.runtime.supervision.comm; +import java.time.Instant; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.UUID; +import lombok.AllArgsConstructor; +import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopElementDefinition; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantUpdate; +import org.onap.policy.clamp.controlloop.runtime.commissioning.CommissioningProvider; +import org.onap.policy.models.base.PfModelException; +import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; /** * This class is used to send ParticipantUpdate messages to participants on DMaaP. */ @Component +@AllArgsConstructor public class ParticipantUpdatePublisher extends AbstractParticipantPublisher { + private static final Logger LOGGER = LoggerFactory.getLogger(ParticipantUpdatePublisher.class); + + private final CommissioningProvider commissioningProvider; + + /** + * Send ParticipantUpdate to Participant. + * + * @param participantId the participant Id + * @param participantType the participant Type + */ + public void send(ToscaConceptIdentifier participantId, ToscaConceptIdentifier participantType) { + var message = new ParticipantUpdate(); + message.setParticipantId(participantId); + message.setParticipantType(participantType); + message.setTimestamp(Instant.now()); + + var clDefinition = new ControlLoopElementDefinition(); + clDefinition.setId(UUID.randomUUID()); + + try { + clDefinition.setControlLoopElementToscaServiceTemplate( + commissioningProvider.getToscaServiceTemplate(null, null)); + } catch (PfModelException pfme) { + LOGGER.warn("Get of tosca service template failed, cannot send participantupdate", pfme); + return; + } + + Map controlLoopElementDefinitionMap = new LinkedHashMap<>(); + controlLoopElementDefinitionMap.put(UUID.randomUUID(), clDefinition); + + Map> participantDefinitionUpdateMap = + new LinkedHashMap<>(); + participantDefinitionUpdateMap.put(participantId, controlLoopElementDefinitionMap); + message.setParticipantDefinitionUpdateMap(participantDefinitionUpdateMap); + + LOGGER.debug("Participant Update sent {}", message); + super.send(message); + } } diff --git a/runtime-controlloop/src/main/resources/application.yaml b/runtime-controlloop/src/main/resources/application.yaml index 01466abf6..1d36b6724 100644 --- a/runtime-controlloop/src/main/resources/application.yaml +++ b/runtime-controlloop/src/main/resources/application.yaml @@ -27,9 +27,6 @@ runtime: updateParameters: maxRetryCount: 1 maxWaitMs: 30000 - stateChangeParameters: - maxRetryCount: 1 - maxWaitMs: 30000 databaseProviderParameters: name: PolicyProviderParameterGroup implementation: org.onap.policy.models.provider.impl.DatabasePolicyModelsProviderImpl -- cgit 1.2.3-korg