summaryrefslogtreecommitdiffstats
path: root/runtime-controlloop/src/main/java
diff options
context:
space:
mode:
authorFrancescoFioraEst <francesco.fiora@est.tech>2021-07-20 15:49:57 +0100
committerFrancescoFioraEst <francesco.fiora@est.tech>2021-07-26 14:55:31 +0100
commitd5c645873589e0b56a6ad0edd5bd0d480896f765 (patch)
tree928119de9a51ed93651bc2a8fd5876b82d65c454 /runtime-controlloop/src/main/java
parent395f61064ab29a9506a18bae505ebf1da3315e94 (diff)
Add Retry and Timeout handling
Implementaton of Supervision, Retry and Timeout handling on all Participant messages Issue-ID: POLICY-3455 Change-Id: Idfd53ea0b8f5bb1272703256b983a6cbeeb4fdf4 Signed-off-by: FrancescoFioraEst <francesco.fiora@est.tech>
Diffstat (limited to 'runtime-controlloop/src/main/java')
-rw-r--r--runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/Application.java2
-rw-r--r--runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/rest/CommissioningController.java14
-rw-r--r--runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/rest/InstantiationController.java20
-rw-r--r--runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/rest/MonitoringQueryController.java17
-rw-r--r--runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/MessageIntercept.java32
-rw-r--r--runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionAspect.java71
-rw-r--r--runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionHandler.java96
-rw-r--r--runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionScanner.java151
-rw-r--r--runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ControlLoopUpdatePublisher.java33
-rw-r--r--runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantDeregisterAckPublisher.java13
-rw-r--r--runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantRegisterAckPublisher.java13
-rw-r--r--runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantUpdateAckListener.java1
-rw-r--r--runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantUpdatePublisher.java50
13 files changed, 343 insertions, 170 deletions
diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/Application.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/Application.java
index 28814b354..5fbd36c06 100644
--- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/Application.java
+++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/Application.java
@@ -24,7 +24,9 @@ import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.ConfigurationPropertiesScan;
import org.springframework.context.annotation.ComponentScan;
+import org.springframework.scheduling.annotation.EnableScheduling;
+@EnableScheduling
@SpringBootApplication
@ComponentScan({"org.onap.policy.clamp.controlloop.models.controlloop.persistence.provider",
"org.onap.policy.clamp.controlloop.runtime"})
diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/rest/CommissioningController.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/rest/CommissioningController.java
index 74548e724..67c615dcd 100644
--- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/rest/CommissioningController.java
+++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/rest/CommissioningController.java
@@ -62,6 +62,8 @@ public class CommissioningController extends AbstractRestController {
private static final Logger LOGGER = LoggerFactory.getLogger(CommissioningController.class);
+ private static final String TAGS = "Clamp Control Loop Commissioning API";
+
private final CommissioningProvider provider;
/**
@@ -88,7 +90,7 @@ public class CommissioningController extends AbstractRestController {
value = "Commissions control loop definitions",
notes = "Commissions control loop definitions, returning the commissioned control loop definition IDs",
response = CommissioningResponse.class,
- tags = {"Control Loop Commissioning API"},
+ tags = {TAGS},
authorizations = @Authorization(value = AUTHORIZATION_TYPE),
responseHeaders = {
@ResponseHeader(
@@ -158,7 +160,7 @@ public class CommissioningController extends AbstractRestController {
@ApiOperation(value = "Delete a commissioned control loop",
notes = "Deletes a Commissioned Control Loop, returning optional error details",
response = CommissioningResponse.class,
- tags = {"Clamp Control Loop Commissioning API"},
+ tags = {TAGS},
authorizations = @Authorization(value = AUTHORIZATION_TYPE),
responseHeaders = {
@ResponseHeader(
@@ -233,7 +235,7 @@ public class CommissioningController extends AbstractRestController {
notes = "Queries details of the requested commissioned control loop definitions, "
+ "returning all control loop details",
response = ToscaNodeTemplate.class,
- tags = {"Clamp Control Loop Commissioning API"},
+ tags = {TAGS},
authorizations = @Authorization(value = AUTHORIZATION_TYPE),
responseHeaders = {
@ResponseHeader(
@@ -302,7 +304,7 @@ public class CommissioningController extends AbstractRestController {
notes = "Queries details of the requested commissioned tosca service template, "
+ "returning all tosca service template details",
response = ToscaServiceTemplate.class,
- tags = {"Clamp Control Loop Commissioning API"},
+ tags = {TAGS},
authorizations = @Authorization(value = AUTHORIZATION_TYPE),
responseHeaders = {
@ResponseHeader(
@@ -379,7 +381,7 @@ public class CommissioningController extends AbstractRestController {
notes = "Queries details of the requested commissioned tosca service template json schema, "
+ "returning all tosca service template json schema details",
response = ToscaServiceTemplate.class,
- tags = {"Clamp Control Loop Commissioning API"},
+ tags = {TAGS},
authorizations = @Authorization(value = AUTHORIZATION_TYPE),
responseHeaders = {
@ResponseHeader(
@@ -448,7 +450,7 @@ public class CommissioningController extends AbstractRestController {
notes = "Queries details of the requested commissioned control loop element definitions, "
+ "returning all control loop elements' details",
response = ToscaNodeTemplate.class,
- tags = {"Clamp Control Loop Commissioning API"},
+ tags = {TAGS},
authorizations = @Authorization(value = AUTHORIZATION_TYPE),
responseHeaders = {
@ResponseHeader(
diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/rest/InstantiationController.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/rest/InstantiationController.java
index aba585e29..5a320e8dc 100644
--- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/rest/InstantiationController.java
+++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/rest/InstantiationController.java
@@ -59,6 +59,8 @@ public class InstantiationController extends AbstractRestController {
private static final Logger LOGGER = LoggerFactory.getLogger(InstantiationController.class);
+ private static final String TAGS = "Clamp Control Loop Instantiation API";
+
// The CL provider for instantiation requests
private final ControlLoopInstantiationProvider provider;
@@ -86,7 +88,7 @@ public class InstantiationController extends AbstractRestController {
value = "Commissions control loop definitions",
notes = "Commissions control loop definitions, returning the control loop IDs",
response = InstantiationResponse.class,
- tags = {"Control Loop Instantiation API"},
+ tags = {TAGS},
authorizations = @Authorization(value = AUTHORIZATION_TYPE),
responseHeaders = {
@ResponseHeader(
@@ -154,9 +156,7 @@ public class InstantiationController extends AbstractRestController {
@ApiOperation(value = "Query details of the requested control loops",
notes = "Queries details of the requested control loops, returning all control loop details",
response = ControlLoops.class,
- tags = {
- "Clamp control loop Instantiation API"
- },
+ tags = {TAGS},
authorizations = @Authorization(value = AUTHORIZATION_TYPE),
responseHeaders = {
@ResponseHeader(
@@ -220,9 +220,7 @@ public class InstantiationController extends AbstractRestController {
value = "Updates control loop definitions",
notes = "Updates control loop definitions, returning the updated control loop definition IDs",
response = InstantiationResponse.class,
- tags = {
- "Control Loop Instantiation API"
- },
+ tags = {TAGS},
authorizations = @Authorization(value = AUTHORIZATION_TYPE),
responseHeaders = {
@ResponseHeader(
@@ -290,9 +288,7 @@ public class InstantiationController extends AbstractRestController {
@ApiOperation(value = "Delete a control loop",
notes = "Deletes a control loop, returning optional error details",
response = InstantiationResponse.class,
- tags = {
- "Clamp Control Loop Instantiation API"
- },
+ tags = {TAGS},
authorizations = @Authorization(value = AUTHORIZATION_TYPE),
responseHeaders = {
@ResponseHeader(
@@ -362,9 +358,7 @@ public class InstantiationController extends AbstractRestController {
@ApiOperation(value = "Issue a command to the requested control loops",
notes = "Issues a command to a control loop, ordering a state change on the control loop",
response = InstantiationResponse.class,
- tags = {
- "Clamp Control Loop Instantiation API"
- },
+ tags = {TAGS},
authorizations = @Authorization(value = AUTHORIZATION_TYPE),
responseHeaders = {
@ResponseHeader(
diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/rest/MonitoringQueryController.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/rest/MonitoringQueryController.java
index 86531597a..7ac95003e 100644
--- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/rest/MonitoringQueryController.java
+++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/rest/MonitoringQueryController.java
@@ -52,6 +52,7 @@ import org.springframework.web.bind.annotation.RestController;
public class MonitoringQueryController extends AbstractRestController {
private static final Logger LOGGER = LoggerFactory.getLogger(MonitoringQueryController.class);
+ private static final String TAGS = "Clamp Control Loop Monitoring API";
private final MonitoringProvider provider;
/**
@@ -80,9 +81,7 @@ public class MonitoringQueryController extends AbstractRestController {
@ApiOperation(value = "Query details of the requested participant stats",
notes = "Queries details of the requested participant stats, returning all participant stats",
response = ParticipantStatisticsList.class,
- tags = {
- "Clamp control loop Monitoring API"
- },
+ tags = {TAGS},
authorizations = @Authorization(value = AUTHORIZATION_TYPE),
responseHeaders = {
@ResponseHeader(
@@ -168,9 +167,7 @@ public class MonitoringQueryController extends AbstractRestController {
@ApiOperation(value = "Query details of all the participant stats in a control loop",
notes = "Queries details of the participant stats, returning all participant stats",
response = ClElementStatisticsList.class,
- tags = {
- "Clamp control loop Monitoring API"
- },
+ tags = {TAGS},
authorizations = @Authorization(value = AUTHORIZATION_TYPE),
responseHeaders = {
@ResponseHeader(
@@ -235,9 +232,7 @@ public class MonitoringQueryController extends AbstractRestController {
@ApiOperation(value = "Query details of the requested cl element stats in a control loop",
notes = "Queries details of the requested cl element stats, returning all clElement stats",
response = ClElementStatisticsList.class,
- tags = {
- "Clamp control loop Monitoring API"
- },
+ tags = {TAGS},
authorizations = @Authorization(value = AUTHORIZATION_TYPE),
responseHeaders = {
@ResponseHeader(
@@ -306,9 +301,7 @@ public class MonitoringQueryController extends AbstractRestController {
@ApiOperation(value = "Query details of the requested cl element stats",
notes = "Queries details of the requested cl element stats, returning all clElement stats",
response = ClElementStatisticsList.class,
- tags = {
- "Clamp control loop Monitoring API"
- },
+ tags = {TAGS},
authorizations = @Authorization(value = AUTHORIZATION_TYPE),
responseHeaders = {
@ResponseHeader(
diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/MessageIntercept.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/MessageIntercept.java
new file mode 100644
index 000000000..c23ed833d
--- /dev/null
+++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/MessageIntercept.java
@@ -0,0 +1,32 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2021 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.controlloop.runtime.supervision;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.METHOD)
+public @interface MessageIntercept {
+
+}
diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionAspect.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionAspect.java
new file mode 100644
index 000000000..293b5d5da
--- /dev/null
+++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionAspect.java
@@ -0,0 +1,71 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2021 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.controlloop.runtime.supervision;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import lombok.RequiredArgsConstructor;
+import org.aspectj.lang.annotation.After;
+import org.aspectj.lang.annotation.Aspect;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+
+@Aspect
+@Component
+@RequiredArgsConstructor
+public class SupervisionAspect implements Closeable {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(SupervisionAspect.class);
+
+ private final SupervisionScanner supervisionScanner;
+
+ private ThreadPoolExecutor executor =
+ new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
+
+ @Scheduled(
+ fixedRateString = "${runtime.participantParameters.heartBeatMs}",
+ initialDelayString = "${runtime.participantParameters.heartBeatMs}")
+ public void schedule() {
+ LOGGER.info("Add scheduled scanning");
+ executor.execute(() -> supervisionScanner.run(true));
+ }
+
+ /**
+ * Intercept Messages from participant and run Supervision Scan.
+ */
+ @After("@annotation(MessageIntercept)")
+ public void doCheck() {
+ if (executor.getQueue().size() < 2) {
+ LOGGER.debug("Add scanning Message");
+ executor.execute(() -> supervisionScanner.run(false));
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ executor.shutdown();
+ }
+}
diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionHandler.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionHandler.java
index 56a1ba9b3..dadfe0de2 100644
--- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionHandler.java
+++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionHandler.java
@@ -20,45 +20,27 @@
package org.onap.policy.clamp.controlloop.runtime.supervision;
-import java.time.Instant;
-import java.util.LinkedHashMap;
import java.util.List;
-import java.util.Map;
-import java.util.UUID;
import javax.ws.rs.core.Response;
import lombok.AllArgsConstructor;
import org.apache.commons.collections4.CollectionUtils;
import org.onap.policy.clamp.controlloop.common.exception.ControlLoopException;
import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoop;
import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopElement;
-import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopElementDefinition;
import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopState;
import org.onap.policy.clamp.controlloop.models.controlloop.concepts.Participant;
import org.onap.policy.clamp.controlloop.models.controlloop.persistence.provider.ControlLoopProvider;
import org.onap.policy.clamp.controlloop.models.controlloop.persistence.provider.ParticipantProvider;
-import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ControlLoopStateChange;
-import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ControlLoopUpdate;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantDeregister;
-import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantDeregisterAck;
-import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageType;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantRegister;
-import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantRegisterAck;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantStatus;
-import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantUpdate;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantUpdateAck;
-import org.onap.policy.clamp.controlloop.runtime.commissioning.CommissioningProvider;
-import org.onap.policy.clamp.controlloop.runtime.main.parameters.ClRuntimeParameterGroup;
import org.onap.policy.clamp.controlloop.runtime.monitoring.MonitoringProvider;
import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ControlLoopStateChangePublisher;
import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ControlLoopUpdatePublisher;
import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ParticipantDeregisterAckPublisher;
import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ParticipantRegisterAckPublisher;
-import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ParticipantStatusListener;
import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ParticipantUpdatePublisher;
-import org.onap.policy.common.endpoints.event.comm.TopicSink;
-import org.onap.policy.common.endpoints.listeners.MessageTypeDispatcher;
-import org.onap.policy.common.utils.services.ServiceManager;
-import org.onap.policy.common.utils.services.ServiceManagerException;
import org.onap.policy.models.base.PfModelException;
import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
import org.slf4j.Logger;
@@ -84,7 +66,6 @@ public class SupervisionHandler {
private final ControlLoopProvider controlLoopProvider;
private final ParticipantProvider participantProvider;
private final MonitoringProvider monitoringProvider;
- private final CommissioningProvider commissioningProvider;
// Publishers for participant communication
private final ControlLoopUpdatePublisher controlLoopUpdatePublisher;
@@ -130,6 +111,7 @@ public class SupervisionHandler {
*
* @param participantStatusMessage the ParticipantStatus message received from a participant
*/
+ @MessageIntercept
public void handleParticipantMessage(ParticipantStatus participantStatusMessage) {
LOGGER.debug("Participant Status received {}", participantStatusMessage);
try {
@@ -151,10 +133,14 @@ public class SupervisionHandler {
*
* @param participantRegisterMessage the ParticipantRegister message received from a participant
*/
+ @MessageIntercept
public void handleParticipantMessage(ParticipantRegister participantRegisterMessage) {
LOGGER.debug("Participant Register received {}", participantRegisterMessage);
- sendParticipantAckMessage(participantRegisterMessage);
- sendParticipantUpdate(participantRegisterMessage);
+
+ participantRegisterAckPublisher.send(participantRegisterMessage.getMessageId());
+
+ participantUpdatePublisher.send(participantRegisterMessage.getParticipantId(),
+ participantRegisterMessage.getParticipantType());
}
/**
@@ -162,9 +148,10 @@ public class SupervisionHandler {
*
* @param participantDeregisterMessage the ParticipantDeregister message received from a participant
*/
+ @MessageIntercept
public void handleParticipantMessage(ParticipantDeregister participantDeregisterMessage) {
LOGGER.debug("Participant Deregister received {}", participantDeregisterMessage);
- sendParticipantAckMessage(participantDeregisterMessage);
+ participantDeregisterAckPublisher.send(participantDeregisterMessage.getMessageId());
}
/**
@@ -172,6 +159,7 @@ public class SupervisionHandler {
*
* @param participantUpdateAckMessage the ParticipantUpdateAck message received from a participant
*/
+ @MessageIntercept
public void handleParticipantMessage(ParticipantUpdateAck participantUpdateAckMessage) {
LOGGER.debug("Participant Update Ack received {}", participantUpdateAckMessage);
}
@@ -289,70 +277,6 @@ public class SupervisionHandler {
}
}
- private void sendControlLoopUpdate(ControlLoop controlLoop) throws PfModelException {
- var controlLoopUpdateMsg = new ControlLoopUpdate();
- controlLoopUpdateMsg.setControlLoopId(controlLoop.getKey().asIdentifier());
- controlLoopUpdateMsg.setControlLoop(controlLoop);
- // TODO: We should look up the correct TOSCA node template here for the control loop
- // Tiny hack implemented to return the tosca service template entry from the database and be passed onto dmaap
- controlLoopUpdateMsg.setControlLoopDefinition(commissioningProvider.getToscaServiceTemplate(null, null));
- controlLoopUpdatePublisher.send(controlLoopUpdateMsg);
- }
-
- private void sendControlLoopStateChange(ControlLoop controlLoop) {
- var clsc = new ControlLoopStateChange();
- clsc.setControlLoopId(controlLoop.getKey().asIdentifier());
- clsc.setMessageId(UUID.randomUUID());
- clsc.setOrderedState(controlLoop.getOrderedState());
- controlLoopStateChangePublisher.send(clsc);
- }
-
- private void sendParticipantUpdate(ParticipantRegister participantRegisterMessage) {
- var message = new ParticipantUpdate();
- message.setParticipantId(participantRegisterMessage.getParticipantId());
- message.setParticipantType(participantRegisterMessage.getParticipantType());
- message.setTimestamp(Instant.now());
-
- ControlLoopElementDefinition clDefinition = new ControlLoopElementDefinition();
- clDefinition.setId(UUID.randomUUID());
-
- try {
- clDefinition.setControlLoopElementToscaServiceTemplate(commissioningProvider
- .getToscaServiceTemplate(null, null));
- } catch (PfModelException pfme) {
- LOGGER.warn("Get of tosca service template failed, cannot send participantupdate", pfme);
- return;
- }
-
- Map<UUID, ControlLoopElementDefinition> controlLoopElementDefinitionMap = new LinkedHashMap<>();
- controlLoopElementDefinitionMap.put(UUID.randomUUID(), clDefinition);
-
- Map<ToscaConceptIdentifier, Map<UUID, ControlLoopElementDefinition>>
- participantDefinitionUpdateMap = new LinkedHashMap<>();
- participantDefinitionUpdateMap.put(participantRegisterMessage.getParticipantId(),
- controlLoopElementDefinitionMap);
- message.setParticipantDefinitionUpdateMap(participantDefinitionUpdateMap);
-
- LOGGER.debug("Participant Update sent", message);
- participantUpdatePublisher.send(message);
- }
-
- private void sendParticipantAckMessage(ParticipantRegister participantRegisterMessage) {
- var message = new ParticipantRegisterAck();
- message.setResponseTo(participantRegisterMessage.getMessageId());
- message.setMessage("Participant Register Ack");
- message.setResult(true);
- participantRegisterAckPublisher.send(message);
- }
-
- private void sendParticipantAckMessage(ParticipantDeregister participantDeregisterMessage) {
- var message = new ParticipantDeregisterAck();
- message.setResponseTo(participantDeregisterMessage.getMessageId());
- message.setMessage("Participant Deregister Ack");
- message.setResult(true);
- participantDeregisterAckPublisher.send(message);
- }
-
private void superviseParticipant(ParticipantStatus participantStatusMessage)
throws PfModelException, ControlLoopException {
if (participantStatusMessage.getParticipantId() == null) {
diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionScanner.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionScanner.java
index 68f5830c0..b360f6703 100644
--- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionScanner.java
+++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionScanner.java
@@ -20,15 +20,21 @@
package org.onap.policy.clamp.controlloop.runtime.supervision;
-import java.io.Closeable;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import lombok.Getter;
+import lombok.Setter;
import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoop;
import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopElement;
+import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopState;
import org.onap.policy.clamp.controlloop.models.controlloop.persistence.provider.ControlLoopProvider;
import org.onap.policy.clamp.controlloop.runtime.main.parameters.ClRuntimeParameterGroup;
+import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ControlLoopStateChangePublisher;
+import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ControlLoopUpdatePublisher;
import org.onap.policy.models.base.PfModelException;
+import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
@@ -37,35 +43,83 @@ import org.springframework.stereotype.Component;
* This class is used to scan the control loops in the database and check if they are in the correct state.
*/
@Component
-public class SupervisionScanner implements Runnable, Closeable {
+public class SupervisionScanner {
private static final Logger LOGGER = LoggerFactory.getLogger(SupervisionScanner.class);
- private ControlLoopProvider controlLoopProvider;
- private ScheduledExecutorService timerPool;
+ @Getter
+ @Setter
+ static class HandleCounter {
+ private int maxRetryCount;
+ private long maxWaitMs;
+ private Map<ToscaConceptIdentifier, Integer> mapCounter = new HashMap<>();
+ private Set<ToscaConceptIdentifier> mapFault = new HashSet<>();
+
+ public void clear(ToscaConceptIdentifier id) {
+ mapCounter.put(id, 0);
+ mapFault.remove(id);
+ }
+
+ public void setFault(ToscaConceptIdentifier id) {
+ mapCounter.put(id, 0);
+ mapFault.add(id);
+ }
+
+ public boolean count(ToscaConceptIdentifier id) {
+ int counter = mapCounter.getOrDefault(id, 0) + 1;
+ if (counter <= maxRetryCount) {
+ mapCounter.put(id, counter);
+ return true;
+ }
+ return false;
+ }
+
+ public boolean isFault(ToscaConceptIdentifier id) {
+ return mapFault.contains(id);
+ }
+
+ public int getCounter(ToscaConceptIdentifier id) {
+ return mapCounter.getOrDefault(id, 0);
+ }
+ }
+
+ private HandleCounter stateChange = new HandleCounter();
+
+ private final ControlLoopProvider controlLoopProvider;
+ private final ControlLoopStateChangePublisher controlLoopStateChangePublisher;
+ private final ControlLoopUpdatePublisher controlLoopUpdatePublisher;
/**
* Constructor for instantiating SupervisionScanner.
*
- * @param clRuntimeParameterGroup the parameters for the control loop runtime
* @param controlLoopProvider the provider to use to read control loops from the database
+ * @param controlLoopStateChangePublisher the ControlLoopStateChange Publisher
+ * @param clRuntimeParameterGroup the parameters for the control loop runtime
*/
public SupervisionScanner(final ControlLoopProvider controlLoopProvider,
- ClRuntimeParameterGroup clRuntimeParameterGroup) {
+ final ControlLoopStateChangePublisher controlLoopStateChangePublisher,
+ ControlLoopUpdatePublisher controlLoopUpdatePublisher,
+ final ClRuntimeParameterGroup clRuntimeParameterGroup) {
this.controlLoopProvider = controlLoopProvider;
+ this.controlLoopStateChangePublisher = controlLoopStateChangePublisher;
+ this.controlLoopUpdatePublisher = controlLoopUpdatePublisher;
- // Kick off the timer
- timerPool = makeTimerPool();
- timerPool.scheduleAtFixedRate(this, 0, clRuntimeParameterGroup.getSupervisionScannerIntervalSec(),
- TimeUnit.SECONDS);
+ stateChange.setMaxRetryCount(
+ clRuntimeParameterGroup.getParticipantParameters().getUpdateParameters().getMaxRetryCount());
+ stateChange.setMaxWaitMs(
+ clRuntimeParameterGroup.getParticipantParameters().getUpdateParameters().getMaxWaitMs());
}
- @Override
- public void run() {
+ /**
+ * Run Scanning.
+ *
+ * @param counterCheck if true activate counter and retry
+ */
+ public void run(boolean counterCheck) {
LOGGER.debug("Scanning control loops in the database . . .");
try {
for (ControlLoop controlLoop : controlLoopProvider.getControlLoops(null, null)) {
- scanControlLoop(controlLoop);
+ scanControlLoop(controlLoop, counterCheck);
}
} catch (PfModelException pfme) {
LOGGER.warn("error reading control loops from database", pfme);
@@ -74,40 +128,65 @@ public class SupervisionScanner implements Runnable, Closeable {
LOGGER.debug("Control loop scan complete . . .");
}
- @Override
- public void close() {
- timerPool.shutdown();
- }
-
- private void scanControlLoop(final ControlLoop controlLoop) throws PfModelException {
+ private void scanControlLoop(final ControlLoop controlLoop, boolean counterCheck) throws PfModelException {
LOGGER.debug("scanning control loop {} . . .", controlLoop.getKey().asIdentifier());
if (controlLoop.getState().equals(controlLoop.getOrderedState().asState())) {
LOGGER.debug("control loop {} scanned, OK", controlLoop.getKey().asIdentifier());
+
+ // Clear missed report counter on Control Loop
+ clearFaultAndCounter(controlLoop);
return;
}
+ boolean completed = true;
for (ControlLoopElement element : controlLoop.getElements().values()) {
if (!element.getState().equals(element.getOrderedState().asState())) {
- LOGGER.debug("control loop scan: transitioning from state {} to {}", controlLoop.getState(),
- controlLoop.getOrderedState());
- return;
+ completed = false;
+ break;
}
}
- LOGGER.debug("control loop scan: transition from state {} to {} completed", controlLoop.getState(),
- controlLoop.getOrderedState());
+ if (completed) {
+ LOGGER.debug("control loop scan: transition from state {} to {} completed", controlLoop.getState(),
+ controlLoop.getOrderedState());
+
+ controlLoop.setState(controlLoop.getOrderedState().asState());
+ controlLoopProvider.updateControlLoop(controlLoop);
- controlLoop.setState(controlLoop.getOrderedState().asState());
- controlLoopProvider.updateControlLoop(controlLoop);
+ // Clear missed report counter on Control Loop
+ clearFaultAndCounter(controlLoop);
+ } else {
+ LOGGER.debug("control loop scan: transition from state {} to {} not completed", controlLoop.getState(),
+ controlLoop.getOrderedState());
+ if (counterCheck) {
+ handleCounter(controlLoop);
+ }
+ }
}
- /**
- * Makes a new timer pool.
- *
- * @return a new timer pool
- */
- protected ScheduledExecutorService makeTimerPool() {
- return Executors.newScheduledThreadPool(1);
+ private void clearFaultAndCounter(ControlLoop controlLoop) {
+ stateChange.clear(controlLoop.getKey().asIdentifier());
+ }
+
+ private void handleCounter(ControlLoop controlLoop) {
+ ToscaConceptIdentifier id = controlLoop.getKey().asIdentifier();
+ if (stateChange.isFault(id)) {
+ LOGGER.debug("report ControlLoop fault");
+ return;
+ }
+
+ if (stateChange.count(id)) {
+ if (ControlLoopState.UNINITIALISED2PASSIVE.equals(controlLoop.getState())) {
+ LOGGER.debug("retry message ControlLoopUpdate");
+ controlLoopUpdatePublisher.send(controlLoop);
+ } else {
+ LOGGER.debug("retry message ControlLoopStateChange");
+ controlLoopStateChangePublisher.send(controlLoop);
+ }
+ } else {
+ LOGGER.debug("report ControlLoop fault");
+ stateChange.setFault(id);
+ }
}
}
diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ControlLoopUpdatePublisher.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ControlLoopUpdatePublisher.java
index e562343ff..e366ba49a 100644
--- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ControlLoopUpdatePublisher.java
+++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ControlLoopUpdatePublisher.java
@@ -20,42 +20,43 @@
package org.onap.policy.clamp.controlloop.runtime.supervision.comm;
+import lombok.AllArgsConstructor;
import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoop;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ControlLoopUpdate;
import org.onap.policy.clamp.controlloop.runtime.commissioning.CommissioningProvider;
import org.onap.policy.models.base.PfModelException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
/**
* This class is used to send ControlLoopUpdate messages to participants on DMaaP.
*/
@Component
+@AllArgsConstructor
public class ControlLoopUpdatePublisher extends AbstractParticipantPublisher<ControlLoopUpdate> {
- private final CommissioningProvider commissioningProvider;
+ private static final Logger LOGGER = LoggerFactory.getLogger(ControlLoopUpdatePublisher.class);
- /**
- * Constructor.
- *
- * @param commissioningProvider the CommissioningProvider
- */
- public ControlLoopUpdatePublisher(CommissioningProvider commissioningProvider) {
- this.commissioningProvider = commissioningProvider;
- }
+ private final CommissioningProvider commissioningProvider;
/**
* Send ControlLoopUpdate to Participant.
*
* @param controlLoop the ControlLoop
- * @throws PfModelException on errors getting the Control Loop Definition
*/
- public void send(ControlLoop controlLoop) throws PfModelException {
- var pclu = new ControlLoopUpdate();
- pclu.setControlLoopId(controlLoop.getKey().asIdentifier());
- pclu.setControlLoop(controlLoop);
+ public void send(ControlLoop controlLoop) {
+ var controlLoopUpdateMsg = new ControlLoopUpdate();
+ controlLoopUpdateMsg.setControlLoopId(controlLoop.getKey().asIdentifier());
+ controlLoopUpdateMsg.setControlLoop(controlLoop);
// TODO: We should look up the correct TOSCA node template here for the control loop
// Tiny hack implemented to return the tosca service template entry from the database and be passed onto dmaap
- pclu.setControlLoopDefinition(commissioningProvider.getToscaServiceTemplate(null, null));
- super.send(pclu);
+ try {
+ controlLoopUpdateMsg.setControlLoopDefinition(commissioningProvider.getToscaServiceTemplate(null, null));
+ } catch (PfModelException pfme) {
+ LOGGER.warn("Get of tosca service template failed, cannot send ParticipantControlLoopUpdate", pfme);
+ return;
+ }
+ super.send(controlLoopUpdateMsg);
}
}
diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantDeregisterAckPublisher.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantDeregisterAckPublisher.java
index c0fcb3e7d..e92b6ee1b 100644
--- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantDeregisterAckPublisher.java
+++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantDeregisterAckPublisher.java
@@ -20,6 +20,7 @@
package org.onap.policy.clamp.controlloop.runtime.supervision.comm;
+import java.util.UUID;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantDeregisterAck;
import org.springframework.stereotype.Component;
@@ -29,4 +30,16 @@ import org.springframework.stereotype.Component;
@Component
public class ParticipantDeregisterAckPublisher extends AbstractParticipantAckPublisher<ParticipantDeregisterAck> {
+ /**
+ * Sent ParticipantDeregisterAck to Participant.
+ *
+ * @param responseTo the original request id in the request.
+ */
+ public void send(UUID responseTo) {
+ var message = new ParticipantDeregisterAck();
+ message.setResponseTo(responseTo);
+ message.setMessage("Participant Deregister Ack");
+ message.setResult(true);
+ super.send(message);
+ }
}
diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantRegisterAckPublisher.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantRegisterAckPublisher.java
index 2c0c4b393..73860b5c2 100644
--- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantRegisterAckPublisher.java
+++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantRegisterAckPublisher.java
@@ -20,6 +20,7 @@
package org.onap.policy.clamp.controlloop.runtime.supervision.comm;
+import java.util.UUID;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantRegisterAck;
import org.springframework.stereotype.Component;
@@ -29,4 +30,16 @@ import org.springframework.stereotype.Component;
@Component
public class ParticipantRegisterAckPublisher extends AbstractParticipantAckPublisher<ParticipantRegisterAck> {
+ /**
+ * Send ParticipantRegisterAck to Participant.
+ *
+ * @param responseTo the original request id in the request.
+ */
+ public void send(UUID responseTo) {
+ var message = new ParticipantRegisterAck();
+ message.setResponseTo(responseTo);
+ message.setMessage("Participant Register Ack");
+ message.setResult(true);
+ super.send(message);
+ }
}
diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantUpdateAckListener.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantUpdateAckListener.java
index b8538b1f7..4eeb0a8ce 100644
--- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantUpdateAckListener.java
+++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantUpdateAckListener.java
@@ -21,7 +21,6 @@
package org.onap.policy.clamp.controlloop.runtime.supervision.comm;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageType;
-import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantRegister;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantUpdateAck;
import org.onap.policy.clamp.controlloop.runtime.config.messaging.Listener;
import org.onap.policy.clamp.controlloop.runtime.supervision.SupervisionHandler;
diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantUpdatePublisher.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantUpdatePublisher.java
index 5af5f1f54..88cf90d02 100644
--- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantUpdatePublisher.java
+++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantUpdatePublisher.java
@@ -20,13 +20,63 @@
package org.onap.policy.clamp.controlloop.runtime.supervision.comm;
+import java.time.Instant;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.UUID;
+import lombok.AllArgsConstructor;
+import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopElementDefinition;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantUpdate;
+import org.onap.policy.clamp.controlloop.runtime.commissioning.CommissioningProvider;
+import org.onap.policy.models.base.PfModelException;
+import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
/**
* This class is used to send ParticipantUpdate messages to participants on DMaaP.
*/
@Component
+@AllArgsConstructor
public class ParticipantUpdatePublisher extends AbstractParticipantPublisher<ParticipantUpdate> {
+ private static final Logger LOGGER = LoggerFactory.getLogger(ParticipantUpdatePublisher.class);
+
+ private final CommissioningProvider commissioningProvider;
+
+ /**
+ * Send ParticipantUpdate to Participant.
+ *
+ * @param participantId the participant Id
+ * @param participantType the participant Type
+ */
+ public void send(ToscaConceptIdentifier participantId, ToscaConceptIdentifier participantType) {
+ var message = new ParticipantUpdate();
+ message.setParticipantId(participantId);
+ message.setParticipantType(participantType);
+ message.setTimestamp(Instant.now());
+
+ var clDefinition = new ControlLoopElementDefinition();
+ clDefinition.setId(UUID.randomUUID());
+
+ try {
+ clDefinition.setControlLoopElementToscaServiceTemplate(
+ commissioningProvider.getToscaServiceTemplate(null, null));
+ } catch (PfModelException pfme) {
+ LOGGER.warn("Get of tosca service template failed, cannot send participantupdate", pfme);
+ return;
+ }
+
+ Map<UUID, ControlLoopElementDefinition> controlLoopElementDefinitionMap = new LinkedHashMap<>();
+ controlLoopElementDefinitionMap.put(UUID.randomUUID(), clDefinition);
+
+ Map<ToscaConceptIdentifier, Map<UUID, ControlLoopElementDefinition>> participantDefinitionUpdateMap =
+ new LinkedHashMap<>();
+ participantDefinitionUpdateMap.put(participantId, controlLoopElementDefinitionMap);
+ message.setParticipantDefinitionUpdateMap(participantDefinitionUpdateMap);
+
+ LOGGER.debug("Participant Update sent {}", message);
+ super.send(message);
+ }
}