summaryrefslogtreecommitdiffstats
path: root/runtime-controlloop/src
diff options
context:
space:
mode:
Diffstat (limited to 'runtime-controlloop/src')
-rw-r--r--runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/Application.java2
-rw-r--r--runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/parameters/ClRuntimeParameterGroup.java3
-rw-r--r--runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/parameters/ParticipantParameters.java5
-rw-r--r--runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/parameters/ParticipantStateChangeParameters.java46
-rw-r--r--runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/rest/CommissioningController.java14
-rw-r--r--runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/rest/InstantiationController.java20
-rw-r--r--runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/rest/MonitoringQueryController.java17
-rw-r--r--runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/MessageIntercept.java (renamed from runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantStateChangePublisher.java)18
-rw-r--r--runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionAspect.java71
-rw-r--r--runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionHandler.java101
-rw-r--r--runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionScanner.java151
-rw-r--r--runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ControlLoopStateChangePublisher.java (renamed from runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantControlLoopStateChangePublisher.java)10
-rw-r--r--runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ControlLoopUpdatePublisher.java33
-rw-r--r--runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantDeregisterAckPublisher.java13
-rw-r--r--runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantRegisterAckPublisher.java13
-rw-r--r--runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantUpdateAckListener.java1
-rw-r--r--runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantUpdatePublisher.java50
-rw-r--r--runtime-controlloop/src/main/resources/application.yaml28
-rw-r--r--runtime-controlloop/src/test/java/org/onap/policy/clamp/controlloop/runtime/instantiation/ControlLoopInstantiationProviderTest.java6
-rw-r--r--runtime-controlloop/src/test/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/SupervisionMessagesTest.java6
-rw-r--r--runtime-controlloop/src/test/resources/application_test.properties1
-rw-r--r--runtime-controlloop/src/test/resources/parameters/TestParameters.json1
22 files changed, 347 insertions, 263 deletions
diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/Application.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/Application.java
index 28814b354..5fbd36c06 100644
--- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/Application.java
+++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/Application.java
@@ -24,7 +24,9 @@ import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.ConfigurationPropertiesScan;
import org.springframework.context.annotation.ComponentScan;
+import org.springframework.scheduling.annotation.EnableScheduling;
+@EnableScheduling
@SpringBootApplication
@ComponentScan({"org.onap.policy.clamp.controlloop.models.controlloop.persistence.provider",
"org.onap.policy.clamp.controlloop.runtime"})
diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/parameters/ClRuntimeParameterGroup.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/parameters/ClRuntimeParameterGroup.java
index f46b90294..86473caa8 100644
--- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/parameters/ClRuntimeParameterGroup.java
+++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/parameters/ClRuntimeParameterGroup.java
@@ -57,9 +57,6 @@ public class ClRuntimeParameterGroup {
private long supervisionScannerIntervalSec;
@Min(value = 0)
- private long participantStateChangeIntervalSec;
-
- @Min(value = 0)
private long participantClUpdateIntervalSec;
@Min(value = 0)
diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/parameters/ParticipantParameters.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/parameters/ParticipantParameters.java
index a4e84af0d..47a99ca29 100644
--- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/parameters/ParticipantParameters.java
+++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/parameters/ParticipantParameters.java
@@ -49,9 +49,4 @@ public class ParticipantParameters {
@Valid
@NotNull
private ParticipantUpdateParameters updateParameters;
-
- @Valid
- @NotNull
- private ParticipantStateChangeParameters stateChangeParameters;
-
}
diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/parameters/ParticipantStateChangeParameters.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/parameters/ParticipantStateChangeParameters.java
deleted file mode 100644
index a3e2eee2e..000000000
--- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/parameters/ParticipantStateChangeParameters.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * Copyright (C) 2021 Nordix Foundation.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.clamp.controlloop.runtime.main.parameters;
-
-import javax.validation.constraints.Min;
-import lombok.Getter;
-import lombok.Setter;
-import org.springframework.validation.annotation.Validated;
-
-/**
- * Parameters for Participant STATE-CHANGE requests.
- */
-@Getter
-@Setter
-@Validated
-public class ParticipantStateChangeParameters {
-
- /**
- * Maximum number of times to re-send a request to a PDP.
- */
- @Min(value = 0)
- private int maxRetryCount;
-
- /**
- * Maximum time to wait, in milliseconds, for a PDP response.
- */
- @Min(value = 0)
- private long maxWaitMs;
-
-}
diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/rest/CommissioningController.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/rest/CommissioningController.java
index cc4ce16f7..8fe3c0c88 100644
--- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/rest/CommissioningController.java
+++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/rest/CommissioningController.java
@@ -62,6 +62,8 @@ public class CommissioningController extends AbstractRestController {
private static final Logger LOGGER = LoggerFactory.getLogger(CommissioningController.class);
+ private static final String TAGS = "Clamp Control Loop Commissioning API";
+
private final CommissioningProvider provider;
/**
@@ -88,7 +90,7 @@ public class CommissioningController extends AbstractRestController {
value = "Commissions control loop definitions",
notes = "Commissions control loop definitions, returning the commissioned control loop definition IDs",
response = CommissioningResponse.class,
- tags = {"Control Loop Commissioning API"},
+ tags = {TAGS},
authorizations = @Authorization(value = AUTHORIZATION_TYPE),
responseHeaders = {
@ResponseHeader(
@@ -158,7 +160,7 @@ public class CommissioningController extends AbstractRestController {
@ApiOperation(value = "Delete a commissioned control loop",
notes = "Deletes a Commissioned Control Loop, returning optional error details",
response = CommissioningResponse.class,
- tags = {"Clamp Control Loop Commissioning API"},
+ tags = {TAGS},
authorizations = @Authorization(value = AUTHORIZATION_TYPE),
responseHeaders = {
@ResponseHeader(
@@ -233,7 +235,7 @@ public class CommissioningController extends AbstractRestController {
notes = "Queries details of the requested commissioned control loop definitions, "
+ "returning all control loop details",
response = ToscaNodeTemplate.class,
- tags = {"Clamp Control Loop Commissioning API"},
+ tags = {TAGS},
authorizations = @Authorization(value = AUTHORIZATION_TYPE),
responseHeaders = {
@ResponseHeader(
@@ -302,7 +304,7 @@ public class CommissioningController extends AbstractRestController {
notes = "Queries details of the requested commissioned tosca service template, "
+ "returning all tosca service template details",
response = ToscaServiceTemplate.class,
- tags = {"Clamp Control Loop Commissioning API"},
+ tags = {TAGS},
authorizations = @Authorization(value = AUTHORIZATION_TYPE),
responseHeaders = {
@ResponseHeader(
@@ -379,7 +381,7 @@ public class CommissioningController extends AbstractRestController {
notes = "Queries details of the requested commissioned tosca service template json schema, "
+ "returning all tosca service template json schema details",
response = ToscaServiceTemplate.class,
- tags = {"Clamp Control Loop Commissioning API"},
+ tags = {TAGS},
authorizations = @Authorization(value = AUTHORIZATION_TYPE),
responseHeaders = {
@ResponseHeader(
@@ -521,7 +523,7 @@ public class CommissioningController extends AbstractRestController {
notes = "Queries details of the requested commissioned control loop element definitions, "
+ "returning all control loop elements' details",
response = ToscaNodeTemplate.class,
- tags = {"Clamp Control Loop Commissioning API"},
+ tags = {TAGS},
authorizations = @Authorization(value = AUTHORIZATION_TYPE),
responseHeaders = {
@ResponseHeader(
diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/rest/InstantiationController.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/rest/InstantiationController.java
index aba585e29..5a320e8dc 100644
--- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/rest/InstantiationController.java
+++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/rest/InstantiationController.java
@@ -59,6 +59,8 @@ public class InstantiationController extends AbstractRestController {
private static final Logger LOGGER = LoggerFactory.getLogger(InstantiationController.class);
+ private static final String TAGS = "Clamp Control Loop Instantiation API";
+
// The CL provider for instantiation requests
private final ControlLoopInstantiationProvider provider;
@@ -86,7 +88,7 @@ public class InstantiationController extends AbstractRestController {
value = "Commissions control loop definitions",
notes = "Commissions control loop definitions, returning the control loop IDs",
response = InstantiationResponse.class,
- tags = {"Control Loop Instantiation API"},
+ tags = {TAGS},
authorizations = @Authorization(value = AUTHORIZATION_TYPE),
responseHeaders = {
@ResponseHeader(
@@ -154,9 +156,7 @@ public class InstantiationController extends AbstractRestController {
@ApiOperation(value = "Query details of the requested control loops",
notes = "Queries details of the requested control loops, returning all control loop details",
response = ControlLoops.class,
- tags = {
- "Clamp control loop Instantiation API"
- },
+ tags = {TAGS},
authorizations = @Authorization(value = AUTHORIZATION_TYPE),
responseHeaders = {
@ResponseHeader(
@@ -220,9 +220,7 @@ public class InstantiationController extends AbstractRestController {
value = "Updates control loop definitions",
notes = "Updates control loop definitions, returning the updated control loop definition IDs",
response = InstantiationResponse.class,
- tags = {
- "Control Loop Instantiation API"
- },
+ tags = {TAGS},
authorizations = @Authorization(value = AUTHORIZATION_TYPE),
responseHeaders = {
@ResponseHeader(
@@ -290,9 +288,7 @@ public class InstantiationController extends AbstractRestController {
@ApiOperation(value = "Delete a control loop",
notes = "Deletes a control loop, returning optional error details",
response = InstantiationResponse.class,
- tags = {
- "Clamp Control Loop Instantiation API"
- },
+ tags = {TAGS},
authorizations = @Authorization(value = AUTHORIZATION_TYPE),
responseHeaders = {
@ResponseHeader(
@@ -362,9 +358,7 @@ public class InstantiationController extends AbstractRestController {
@ApiOperation(value = "Issue a command to the requested control loops",
notes = "Issues a command to a control loop, ordering a state change on the control loop",
response = InstantiationResponse.class,
- tags = {
- "Clamp Control Loop Instantiation API"
- },
+ tags = {TAGS},
authorizations = @Authorization(value = AUTHORIZATION_TYPE),
responseHeaders = {
@ResponseHeader(
diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/rest/MonitoringQueryController.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/rest/MonitoringQueryController.java
index 86531597a..7ac95003e 100644
--- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/rest/MonitoringQueryController.java
+++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/rest/MonitoringQueryController.java
@@ -52,6 +52,7 @@ import org.springframework.web.bind.annotation.RestController;
public class MonitoringQueryController extends AbstractRestController {
private static final Logger LOGGER = LoggerFactory.getLogger(MonitoringQueryController.class);
+ private static final String TAGS = "Clamp Control Loop Monitoring API";
private final MonitoringProvider provider;
/**
@@ -80,9 +81,7 @@ public class MonitoringQueryController extends AbstractRestController {
@ApiOperation(value = "Query details of the requested participant stats",
notes = "Queries details of the requested participant stats, returning all participant stats",
response = ParticipantStatisticsList.class,
- tags = {
- "Clamp control loop Monitoring API"
- },
+ tags = {TAGS},
authorizations = @Authorization(value = AUTHORIZATION_TYPE),
responseHeaders = {
@ResponseHeader(
@@ -168,9 +167,7 @@ public class MonitoringQueryController extends AbstractRestController {
@ApiOperation(value = "Query details of all the participant stats in a control loop",
notes = "Queries details of the participant stats, returning all participant stats",
response = ClElementStatisticsList.class,
- tags = {
- "Clamp control loop Monitoring API"
- },
+ tags = {TAGS},
authorizations = @Authorization(value = AUTHORIZATION_TYPE),
responseHeaders = {
@ResponseHeader(
@@ -235,9 +232,7 @@ public class MonitoringQueryController extends AbstractRestController {
@ApiOperation(value = "Query details of the requested cl element stats in a control loop",
notes = "Queries details of the requested cl element stats, returning all clElement stats",
response = ClElementStatisticsList.class,
- tags = {
- "Clamp control loop Monitoring API"
- },
+ tags = {TAGS},
authorizations = @Authorization(value = AUTHORIZATION_TYPE),
responseHeaders = {
@ResponseHeader(
@@ -306,9 +301,7 @@ public class MonitoringQueryController extends AbstractRestController {
@ApiOperation(value = "Query details of the requested cl element stats",
notes = "Queries details of the requested cl element stats, returning all clElement stats",
response = ClElementStatisticsList.class,
- tags = {
- "Clamp control loop Monitoring API"
- },
+ tags = {TAGS},
authorizations = @Authorization(value = AUTHORIZATION_TYPE),
responseHeaders = {
@ResponseHeader(
diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantStateChangePublisher.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/MessageIntercept.java
index b63cbdf03..c23ed833d 100644
--- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantStateChangePublisher.java
+++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/MessageIntercept.java
@@ -1,6 +1,6 @@
/*-
* ============LICENSE_START=======================================================
- * Copyright (C) 2021 Nordix Foundation.
+ * Copyright (C) 2021 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -18,15 +18,15 @@
* ============LICENSE_END=========================================================
*/
-package org.onap.policy.clamp.controlloop.runtime.supervision.comm;
+package org.onap.policy.clamp.controlloop.runtime.supervision;
-import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantStateChange;
-import org.springframework.stereotype.Component;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
-/**
- * This class is used to send ParticipantStateChange messages to participants on DMaaP.
- */
-@Component
-public class ParticipantStateChangePublisher extends AbstractParticipantPublisher<ParticipantStateChange> {
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.METHOD)
+public @interface MessageIntercept {
}
diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionAspect.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionAspect.java
new file mode 100644
index 000000000..293b5d5da
--- /dev/null
+++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionAspect.java
@@ -0,0 +1,71 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2021 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.controlloop.runtime.supervision;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import lombok.RequiredArgsConstructor;
+import org.aspectj.lang.annotation.After;
+import org.aspectj.lang.annotation.Aspect;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+
+@Aspect
+@Component
+@RequiredArgsConstructor
+public class SupervisionAspect implements Closeable {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(SupervisionAspect.class);
+
+ private final SupervisionScanner supervisionScanner;
+
+ private ThreadPoolExecutor executor =
+ new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
+
+ @Scheduled(
+ fixedRateString = "${runtime.participantParameters.heartBeatMs}",
+ initialDelayString = "${runtime.participantParameters.heartBeatMs}")
+ public void schedule() {
+ LOGGER.info("Add scheduled scanning");
+ executor.execute(() -> supervisionScanner.run(true));
+ }
+
+ /**
+ * Intercept Messages from participant and run Supervision Scan.
+ */
+ @After("@annotation(MessageIntercept)")
+ public void doCheck() {
+ if (executor.getQueue().size() < 2) {
+ LOGGER.debug("Add scanning Message");
+ executor.execute(() -> supervisionScanner.run(false));
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ executor.shutdown();
+ }
+}
diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionHandler.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionHandler.java
index baf0b176d..dadfe0de2 100644
--- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionHandler.java
+++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionHandler.java
@@ -20,46 +20,27 @@
package org.onap.policy.clamp.controlloop.runtime.supervision;
-import java.time.Instant;
-import java.util.LinkedHashMap;
import java.util.List;
-import java.util.Map;
-import java.util.UUID;
import javax.ws.rs.core.Response;
import lombok.AllArgsConstructor;
import org.apache.commons.collections4.CollectionUtils;
import org.onap.policy.clamp.controlloop.common.exception.ControlLoopException;
import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoop;
import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopElement;
-import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopElementDefinition;
import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopState;
import org.onap.policy.clamp.controlloop.models.controlloop.concepts.Participant;
import org.onap.policy.clamp.controlloop.models.controlloop.persistence.provider.ControlLoopProvider;
import org.onap.policy.clamp.controlloop.models.controlloop.persistence.provider.ParticipantProvider;
-import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ControlLoopUpdate;
-import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantControlLoopStateChange;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantDeregister;
-import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantDeregisterAck;
-import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageType;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantRegister;
-import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantRegisterAck;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantStatus;
-import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantUpdate;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantUpdateAck;
-import org.onap.policy.clamp.controlloop.runtime.commissioning.CommissioningProvider;
-import org.onap.policy.clamp.controlloop.runtime.main.parameters.ClRuntimeParameterGroup;
import org.onap.policy.clamp.controlloop.runtime.monitoring.MonitoringProvider;
+import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ControlLoopStateChangePublisher;
import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ControlLoopUpdatePublisher;
-import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ParticipantControlLoopStateChangePublisher;
import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ParticipantDeregisterAckPublisher;
import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ParticipantRegisterAckPublisher;
-import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ParticipantStateChangePublisher;
-import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ParticipantStatusListener;
import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ParticipantUpdatePublisher;
-import org.onap.policy.common.endpoints.event.comm.TopicSink;
-import org.onap.policy.common.endpoints.listeners.MessageTypeDispatcher;
-import org.onap.policy.common.utils.services.ServiceManager;
-import org.onap.policy.common.utils.services.ServiceManagerException;
import org.onap.policy.models.base.PfModelException;
import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
import org.slf4j.Logger;
@@ -85,11 +66,10 @@ public class SupervisionHandler {
private final ControlLoopProvider controlLoopProvider;
private final ParticipantProvider participantProvider;
private final MonitoringProvider monitoringProvider;
- private final CommissioningProvider commissioningProvider;
// Publishers for participant communication
private final ControlLoopUpdatePublisher controlLoopUpdatePublisher;
- private final ParticipantControlLoopStateChangePublisher controlLoopStateChangePublisher;
+ private final ControlLoopStateChangePublisher controlLoopStateChangePublisher;
private final ParticipantRegisterAckPublisher participantRegisterAckPublisher;
private final ParticipantDeregisterAckPublisher participantDeregisterAckPublisher;
private final ParticipantUpdatePublisher participantUpdatePublisher;
@@ -131,6 +111,7 @@ public class SupervisionHandler {
*
* @param participantStatusMessage the ParticipantStatus message received from a participant
*/
+ @MessageIntercept
public void handleParticipantMessage(ParticipantStatus participantStatusMessage) {
LOGGER.debug("Participant Status received {}", participantStatusMessage);
try {
@@ -152,10 +133,14 @@ public class SupervisionHandler {
*
* @param participantRegisterMessage the ParticipantRegister message received from a participant
*/
+ @MessageIntercept
public void handleParticipantMessage(ParticipantRegister participantRegisterMessage) {
LOGGER.debug("Participant Register received {}", participantRegisterMessage);
- sendParticipantAckMessage(participantRegisterMessage);
- sendParticipantUpdate(participantRegisterMessage);
+
+ participantRegisterAckPublisher.send(participantRegisterMessage.getMessageId());
+
+ participantUpdatePublisher.send(participantRegisterMessage.getParticipantId(),
+ participantRegisterMessage.getParticipantType());
}
/**
@@ -163,9 +148,10 @@ public class SupervisionHandler {
*
* @param participantDeregisterMessage the ParticipantDeregister message received from a participant
*/
+ @MessageIntercept
public void handleParticipantMessage(ParticipantDeregister participantDeregisterMessage) {
LOGGER.debug("Participant Deregister received {}", participantDeregisterMessage);
- sendParticipantAckMessage(participantDeregisterMessage);
+ participantDeregisterAckPublisher.send(participantDeregisterMessage.getMessageId());
}
/**
@@ -173,6 +159,7 @@ public class SupervisionHandler {
*
* @param participantUpdateAckMessage the ParticipantUpdateAck message received from a participant
*/
+ @MessageIntercept
public void handleParticipantMessage(ParticipantUpdateAck participantUpdateAckMessage) {
LOGGER.debug("Participant Update Ack received {}", participantUpdateAckMessage);
}
@@ -290,70 +277,6 @@ public class SupervisionHandler {
}
}
- private void sendControlLoopUpdate(ControlLoop controlLoop) throws PfModelException {
- var controlLoopUpdateMsg = new ControlLoopUpdate();
- controlLoopUpdateMsg.setControlLoopId(controlLoop.getKey().asIdentifier());
- controlLoopUpdateMsg.setControlLoop(controlLoop);
- // TODO: We should look up the correct TOSCA node template here for the control loop
- // Tiny hack implemented to return the tosca service template entry from the database and be passed onto dmaap
- controlLoopUpdateMsg.setControlLoopDefinition(commissioningProvider.getToscaServiceTemplate(null, null));
- controlLoopUpdatePublisher.send(controlLoopUpdateMsg);
- }
-
- private void sendControlLoopStateChange(ControlLoop controlLoop) {
- var clsc = new ParticipantControlLoopStateChange();
- clsc.setControlLoopId(controlLoop.getKey().asIdentifier());
- clsc.setMessageId(UUID.randomUUID());
- clsc.setOrderedState(controlLoop.getOrderedState());
- controlLoopStateChangePublisher.send(clsc);
- }
-
- private void sendParticipantUpdate(ParticipantRegister participantRegisterMessage) {
- var message = new ParticipantUpdate();
- message.setParticipantId(participantRegisterMessage.getParticipantId());
- message.setParticipantType(participantRegisterMessage.getParticipantType());
- message.setTimestamp(Instant.now());
-
- ControlLoopElementDefinition clDefinition = new ControlLoopElementDefinition();
- clDefinition.setId(UUID.randomUUID());
-
- try {
- clDefinition.setControlLoopElementToscaServiceTemplate(commissioningProvider
- .getToscaServiceTemplate(null, null));
- } catch (PfModelException pfme) {
- LOGGER.warn("Get of tosca service template failed, cannot send participantupdate", pfme);
- return;
- }
-
- Map<UUID, ControlLoopElementDefinition> controlLoopElementDefinitionMap = new LinkedHashMap<>();
- controlLoopElementDefinitionMap.put(UUID.randomUUID(), clDefinition);
-
- Map<ToscaConceptIdentifier, Map<UUID, ControlLoopElementDefinition>>
- participantDefinitionUpdateMap = new LinkedHashMap<>();
- participantDefinitionUpdateMap.put(participantRegisterMessage.getParticipantId(),
- controlLoopElementDefinitionMap);
- message.setParticipantDefinitionUpdateMap(participantDefinitionUpdateMap);
-
- LOGGER.debug("Participant Update sent", message);
- participantUpdatePublisher.send(message);
- }
-
- private void sendParticipantAckMessage(ParticipantRegister participantRegisterMessage) {
- var message = new ParticipantRegisterAck();
- message.setResponseTo(participantRegisterMessage.getMessageId());
- message.setMessage("Participant Register Ack");
- message.setResult(true);
- participantRegisterAckPublisher.send(message);
- }
-
- private void sendParticipantAckMessage(ParticipantDeregister participantDeregisterMessage) {
- var message = new ParticipantDeregisterAck();
- message.setResponseTo(participantDeregisterMessage.getMessageId());
- message.setMessage("Participant Deregister Ack");
- message.setResult(true);
- participantDeregisterAckPublisher.send(message);
- }
-
private void superviseParticipant(ParticipantStatus participantStatusMessage)
throws PfModelException, ControlLoopException {
if (participantStatusMessage.getParticipantId() == null) {
diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionScanner.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionScanner.java
index 68f5830c0..b360f6703 100644
--- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionScanner.java
+++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionScanner.java
@@ -20,15 +20,21 @@
package org.onap.policy.clamp.controlloop.runtime.supervision;
-import java.io.Closeable;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import lombok.Getter;
+import lombok.Setter;
import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoop;
import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopElement;
+import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopState;
import org.onap.policy.clamp.controlloop.models.controlloop.persistence.provider.ControlLoopProvider;
import org.onap.policy.clamp.controlloop.runtime.main.parameters.ClRuntimeParameterGroup;
+import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ControlLoopStateChangePublisher;
+import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ControlLoopUpdatePublisher;
import org.onap.policy.models.base.PfModelException;
+import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
@@ -37,35 +43,83 @@ import org.springframework.stereotype.Component;
* This class is used to scan the control loops in the database and check if they are in the correct state.
*/
@Component
-public class SupervisionScanner implements Runnable, Closeable {
+public class SupervisionScanner {
private static final Logger LOGGER = LoggerFactory.getLogger(SupervisionScanner.class);
- private ControlLoopProvider controlLoopProvider;
- private ScheduledExecutorService timerPool;
+ @Getter
+ @Setter
+ static class HandleCounter {
+ private int maxRetryCount;
+ private long maxWaitMs;
+ private Map<ToscaConceptIdentifier, Integer> mapCounter = new HashMap<>();
+ private Set<ToscaConceptIdentifier> mapFault = new HashSet<>();
+
+ public void clear(ToscaConceptIdentifier id) {
+ mapCounter.put(id, 0);
+ mapFault.remove(id);
+ }
+
+ public void setFault(ToscaConceptIdentifier id) {
+ mapCounter.put(id, 0);
+ mapFault.add(id);
+ }
+
+ public boolean count(ToscaConceptIdentifier id) {
+ int counter = mapCounter.getOrDefault(id, 0) + 1;
+ if (counter <= maxRetryCount) {
+ mapCounter.put(id, counter);
+ return true;
+ }
+ return false;
+ }
+
+ public boolean isFault(ToscaConceptIdentifier id) {
+ return mapFault.contains(id);
+ }
+
+ public int getCounter(ToscaConceptIdentifier id) {
+ return mapCounter.getOrDefault(id, 0);
+ }
+ }
+
+ private HandleCounter stateChange = new HandleCounter();
+
+ private final ControlLoopProvider controlLoopProvider;
+ private final ControlLoopStateChangePublisher controlLoopStateChangePublisher;
+ private final ControlLoopUpdatePublisher controlLoopUpdatePublisher;
/**
* Constructor for instantiating SupervisionScanner.
*
- * @param clRuntimeParameterGroup the parameters for the control loop runtime
* @param controlLoopProvider the provider to use to read control loops from the database
+ * @param controlLoopStateChangePublisher the ControlLoopStateChange Publisher
+ * @param clRuntimeParameterGroup the parameters for the control loop runtime
*/
public SupervisionScanner(final ControlLoopProvider controlLoopProvider,
- ClRuntimeParameterGroup clRuntimeParameterGroup) {
+ final ControlLoopStateChangePublisher controlLoopStateChangePublisher,
+ ControlLoopUpdatePublisher controlLoopUpdatePublisher,
+ final ClRuntimeParameterGroup clRuntimeParameterGroup) {
this.controlLoopProvider = controlLoopProvider;
+ this.controlLoopStateChangePublisher = controlLoopStateChangePublisher;
+ this.controlLoopUpdatePublisher = controlLoopUpdatePublisher;
- // Kick off the timer
- timerPool = makeTimerPool();
- timerPool.scheduleAtFixedRate(this, 0, clRuntimeParameterGroup.getSupervisionScannerIntervalSec(),
- TimeUnit.SECONDS);
+ stateChange.setMaxRetryCount(
+ clRuntimeParameterGroup.getParticipantParameters().getUpdateParameters().getMaxRetryCount());
+ stateChange.setMaxWaitMs(
+ clRuntimeParameterGroup.getParticipantParameters().getUpdateParameters().getMaxWaitMs());
}
- @Override
- public void run() {
+ /**
+ * Run Scanning.
+ *
+ * @param counterCheck if true activate counter and retry
+ */
+ public void run(boolean counterCheck) {
LOGGER.debug("Scanning control loops in the database . . .");
try {
for (ControlLoop controlLoop : controlLoopProvider.getControlLoops(null, null)) {
- scanControlLoop(controlLoop);
+ scanControlLoop(controlLoop, counterCheck);
}
} catch (PfModelException pfme) {
LOGGER.warn("error reading control loops from database", pfme);
@@ -74,40 +128,65 @@ public class SupervisionScanner implements Runnable, Closeable {
LOGGER.debug("Control loop scan complete . . .");
}
- @Override
- public void close() {
- timerPool.shutdown();
- }
-
- private void scanControlLoop(final ControlLoop controlLoop) throws PfModelException {
+ private void scanControlLoop(final ControlLoop controlLoop, boolean counterCheck) throws PfModelException {
LOGGER.debug("scanning control loop {} . . .", controlLoop.getKey().asIdentifier());
if (controlLoop.getState().equals(controlLoop.getOrderedState().asState())) {
LOGGER.debug("control loop {} scanned, OK", controlLoop.getKey().asIdentifier());
+
+ // Clear missed report counter on Control Loop
+ clearFaultAndCounter(controlLoop);
return;
}
+ boolean completed = true;
for (ControlLoopElement element : controlLoop.getElements().values()) {
if (!element.getState().equals(element.getOrderedState().asState())) {
- LOGGER.debug("control loop scan: transitioning from state {} to {}", controlLoop.getState(),
- controlLoop.getOrderedState());
- return;
+ completed = false;
+ break;
}
}
- LOGGER.debug("control loop scan: transition from state {} to {} completed", controlLoop.getState(),
- controlLoop.getOrderedState());
+ if (completed) {
+ LOGGER.debug("control loop scan: transition from state {} to {} completed", controlLoop.getState(),
+ controlLoop.getOrderedState());
+
+ controlLoop.setState(controlLoop.getOrderedState().asState());
+ controlLoopProvider.updateControlLoop(controlLoop);
- controlLoop.setState(controlLoop.getOrderedState().asState());
- controlLoopProvider.updateControlLoop(controlLoop);
+ // Clear missed report counter on Control Loop
+ clearFaultAndCounter(controlLoop);
+ } else {
+ LOGGER.debug("control loop scan: transition from state {} to {} not completed", controlLoop.getState(),
+ controlLoop.getOrderedState());
+ if (counterCheck) {
+ handleCounter(controlLoop);
+ }
+ }
}
- /**
- * Makes a new timer pool.
- *
- * @return a new timer pool
- */
- protected ScheduledExecutorService makeTimerPool() {
- return Executors.newScheduledThreadPool(1);
+ private void clearFaultAndCounter(ControlLoop controlLoop) {
+ stateChange.clear(controlLoop.getKey().asIdentifier());
+ }
+
+ private void handleCounter(ControlLoop controlLoop) {
+ ToscaConceptIdentifier id = controlLoop.getKey().asIdentifier();
+ if (stateChange.isFault(id)) {
+ LOGGER.debug("report ControlLoop fault");
+ return;
+ }
+
+ if (stateChange.count(id)) {
+ if (ControlLoopState.UNINITIALISED2PASSIVE.equals(controlLoop.getState())) {
+ LOGGER.debug("retry message ControlLoopUpdate");
+ controlLoopUpdatePublisher.send(controlLoop);
+ } else {
+ LOGGER.debug("retry message ControlLoopStateChange");
+ controlLoopStateChangePublisher.send(controlLoop);
+ }
+ } else {
+ LOGGER.debug("report ControlLoop fault");
+ stateChange.setFault(id);
+ }
}
}
diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantControlLoopStateChangePublisher.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ControlLoopStateChangePublisher.java
index 734ccb842..79d113c14 100644
--- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantControlLoopStateChangePublisher.java
+++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ControlLoopStateChangePublisher.java
@@ -22,15 +22,15 @@ package org.onap.policy.clamp.controlloop.runtime.supervision.comm;
import java.util.UUID;
import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoop;
-import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantControlLoopStateChange;
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ControlLoopStateChange;
import org.springframework.stereotype.Component;
/**
- * This class is used to send ParticipantControlLoopStateChangePublisher messages to participants on DMaaP.
+ * This class is used to send ControlLoopStateChangePublisher messages to participants on DMaaP.
*/
@Component
-public class ParticipantControlLoopStateChangePublisher
- extends AbstractParticipantPublisher<ParticipantControlLoopStateChange> {
+public class ControlLoopStateChangePublisher
+ extends AbstractParticipantPublisher<ControlLoopStateChange> {
/**
* Send ControlLoopStateChange to Participant.
@@ -38,7 +38,7 @@ public class ParticipantControlLoopStateChangePublisher
* @param controlLoop the ControlLoop
*/
public void send(ControlLoop controlLoop) {
- var clsc = new ParticipantControlLoopStateChange();
+ var clsc = new ControlLoopStateChange();
clsc.setControlLoopId(controlLoop.getKey().asIdentifier());
clsc.setMessageId(UUID.randomUUID());
clsc.setOrderedState(controlLoop.getOrderedState());
diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ControlLoopUpdatePublisher.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ControlLoopUpdatePublisher.java
index e562343ff..e366ba49a 100644
--- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ControlLoopUpdatePublisher.java
+++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ControlLoopUpdatePublisher.java
@@ -20,42 +20,43 @@
package org.onap.policy.clamp.controlloop.runtime.supervision.comm;
+import lombok.AllArgsConstructor;
import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoop;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ControlLoopUpdate;
import org.onap.policy.clamp.controlloop.runtime.commissioning.CommissioningProvider;
import org.onap.policy.models.base.PfModelException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
/**
* This class is used to send ControlLoopUpdate messages to participants on DMaaP.
*/
@Component
+@AllArgsConstructor
public class ControlLoopUpdatePublisher extends AbstractParticipantPublisher<ControlLoopUpdate> {
- private final CommissioningProvider commissioningProvider;
+ private static final Logger LOGGER = LoggerFactory.getLogger(ControlLoopUpdatePublisher.class);
- /**
- * Constructor.
- *
- * @param commissioningProvider the CommissioningProvider
- */
- public ControlLoopUpdatePublisher(CommissioningProvider commissioningProvider) {
- this.commissioningProvider = commissioningProvider;
- }
+ private final CommissioningProvider commissioningProvider;
/**
* Send ControlLoopUpdate to Participant.
*
* @param controlLoop the ControlLoop
- * @throws PfModelException on errors getting the Control Loop Definition
*/
- public void send(ControlLoop controlLoop) throws PfModelException {
- var pclu = new ControlLoopUpdate();
- pclu.setControlLoopId(controlLoop.getKey().asIdentifier());
- pclu.setControlLoop(controlLoop);
+ public void send(ControlLoop controlLoop) {
+ var controlLoopUpdateMsg = new ControlLoopUpdate();
+ controlLoopUpdateMsg.setControlLoopId(controlLoop.getKey().asIdentifier());
+ controlLoopUpdateMsg.setControlLoop(controlLoop);
// TODO: We should look up the correct TOSCA node template here for the control loop
// Tiny hack implemented to return the tosca service template entry from the database and be passed onto dmaap
- pclu.setControlLoopDefinition(commissioningProvider.getToscaServiceTemplate(null, null));
- super.send(pclu);
+ try {
+ controlLoopUpdateMsg.setControlLoopDefinition(commissioningProvider.getToscaServiceTemplate(null, null));
+ } catch (PfModelException pfme) {
+ LOGGER.warn("Get of tosca service template failed, cannot send ParticipantControlLoopUpdate", pfme);
+ return;
+ }
+ super.send(controlLoopUpdateMsg);
}
}
diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantDeregisterAckPublisher.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantDeregisterAckPublisher.java
index c0fcb3e7d..e92b6ee1b 100644
--- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantDeregisterAckPublisher.java
+++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantDeregisterAckPublisher.java
@@ -20,6 +20,7 @@
package org.onap.policy.clamp.controlloop.runtime.supervision.comm;
+import java.util.UUID;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantDeregisterAck;
import org.springframework.stereotype.Component;
@@ -29,4 +30,16 @@ import org.springframework.stereotype.Component;
@Component
public class ParticipantDeregisterAckPublisher extends AbstractParticipantAckPublisher<ParticipantDeregisterAck> {
+ /**
+ * Sent ParticipantDeregisterAck to Participant.
+ *
+ * @param responseTo the original request id in the request.
+ */
+ public void send(UUID responseTo) {
+ var message = new ParticipantDeregisterAck();
+ message.setResponseTo(responseTo);
+ message.setMessage("Participant Deregister Ack");
+ message.setResult(true);
+ super.send(message);
+ }
}
diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantRegisterAckPublisher.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantRegisterAckPublisher.java
index 2c0c4b393..73860b5c2 100644
--- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantRegisterAckPublisher.java
+++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantRegisterAckPublisher.java
@@ -20,6 +20,7 @@
package org.onap.policy.clamp.controlloop.runtime.supervision.comm;
+import java.util.UUID;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantRegisterAck;
import org.springframework.stereotype.Component;
@@ -29,4 +30,16 @@ import org.springframework.stereotype.Component;
@Component
public class ParticipantRegisterAckPublisher extends AbstractParticipantAckPublisher<ParticipantRegisterAck> {
+ /**
+ * Send ParticipantRegisterAck to Participant.
+ *
+ * @param responseTo the original request id in the request.
+ */
+ public void send(UUID responseTo) {
+ var message = new ParticipantRegisterAck();
+ message.setResponseTo(responseTo);
+ message.setMessage("Participant Register Ack");
+ message.setResult(true);
+ super.send(message);
+ }
}
diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantUpdateAckListener.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantUpdateAckListener.java
index b8538b1f7..4eeb0a8ce 100644
--- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantUpdateAckListener.java
+++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantUpdateAckListener.java
@@ -21,7 +21,6 @@
package org.onap.policy.clamp.controlloop.runtime.supervision.comm;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageType;
-import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantRegister;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantUpdateAck;
import org.onap.policy.clamp.controlloop.runtime.config.messaging.Listener;
import org.onap.policy.clamp.controlloop.runtime.supervision.SupervisionHandler;
diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantUpdatePublisher.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantUpdatePublisher.java
index 5af5f1f54..88cf90d02 100644
--- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantUpdatePublisher.java
+++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantUpdatePublisher.java
@@ -20,13 +20,63 @@
package org.onap.policy.clamp.controlloop.runtime.supervision.comm;
+import java.time.Instant;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.UUID;
+import lombok.AllArgsConstructor;
+import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopElementDefinition;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantUpdate;
+import org.onap.policy.clamp.controlloop.runtime.commissioning.CommissioningProvider;
+import org.onap.policy.models.base.PfModelException;
+import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
/**
* This class is used to send ParticipantUpdate messages to participants on DMaaP.
*/
@Component
+@AllArgsConstructor
public class ParticipantUpdatePublisher extends AbstractParticipantPublisher<ParticipantUpdate> {
+ private static final Logger LOGGER = LoggerFactory.getLogger(ParticipantUpdatePublisher.class);
+
+ private final CommissioningProvider commissioningProvider;
+
+ /**
+ * Send ParticipantUpdate to Participant.
+ *
+ * @param participantId the participant Id
+ * @param participantType the participant Type
+ */
+ public void send(ToscaConceptIdentifier participantId, ToscaConceptIdentifier participantType) {
+ var message = new ParticipantUpdate();
+ message.setParticipantId(participantId);
+ message.setParticipantType(participantType);
+ message.setTimestamp(Instant.now());
+
+ var clDefinition = new ControlLoopElementDefinition();
+ clDefinition.setId(UUID.randomUUID());
+
+ try {
+ clDefinition.setControlLoopElementToscaServiceTemplate(
+ commissioningProvider.getToscaServiceTemplate(null, null));
+ } catch (PfModelException pfme) {
+ LOGGER.warn("Get of tosca service template failed, cannot send participantupdate", pfme);
+ return;
+ }
+
+ Map<UUID, ControlLoopElementDefinition> controlLoopElementDefinitionMap = new LinkedHashMap<>();
+ controlLoopElementDefinitionMap.put(UUID.randomUUID(), clDefinition);
+
+ Map<ToscaConceptIdentifier, Map<UUID, ControlLoopElementDefinition>> participantDefinitionUpdateMap =
+ new LinkedHashMap<>();
+ participantDefinitionUpdateMap.put(participantId, controlLoopElementDefinitionMap);
+ message.setParticipantDefinitionUpdateMap(participantDefinitionUpdateMap);
+
+ LOGGER.debug("Participant Update sent {}", message);
+ super.send(message);
+ }
}
diff --git a/runtime-controlloop/src/main/resources/application.yaml b/runtime-controlloop/src/main/resources/application.yaml
index 3bc2749ed..d0e5500d6 100644
--- a/runtime-controlloop/src/main/resources/application.yaml
+++ b/runtime-controlloop/src/main/resources/application.yaml
@@ -20,7 +20,6 @@ server:
runtime:
supervisionScannerIntervalSec: 1000
- participantStateChangeIntervalSec: 1000
participantClUpdateIntervalSec: 1000
participantClStateChangeIntervalSec: 1000
participantParameters:
@@ -28,24 +27,25 @@ runtime:
updateParameters:
maxRetryCount: 1
maxWaitMs: 30000
- stateChangeParameters:
- maxRetryCount: 1
- maxWaitMs: 30000
databaseProviderParameters:
name: PolicyProviderParameterGroup
implementation: org.onap.policy.models.provider.impl.DatabasePolicyModelsProviderImpl
databaseDriver: org.mariadb.jdbc.Driver
- databaseUrl: jdbc:mariadb://${mariadb.host:mariadb}:${mariadb.port:3306}/controlloop
+ databaseUrl: jdbc:mariadb://${mariadb.host:localhost}:${mariadb.port:3306}/controlloop
databaseUser: policy
databasePassword: P01icY
persistenceUnit: CommissioningMariaDb
topicParameterGroup:
- topicSources[0]:
- topic: POLICY-CLRUNTIME-PARTICIPANT
- servers[0]: ${topicServer:message-router}
- topicCommInfrastructure: dmaap
- fetchTimeout: 15000
- topicSinks[0]:
- topic: POLICY-CLRUNTIME-PARTICIPANT
- servers[0]: ${topicServer:message-router}
- topicCommInfrastructure: dmaap
+ topicSources:
+ -
+ topic: POLICY-CLRUNTIME-PARTICIPANT
+ servers:
+ - ${topicServer:localhost}
+ topicCommInfrastructure: dmaap
+ fetchTimeout: 15000
+ topicSinks:
+ -
+ topic: POLICY-CLRUNTIME-PARTICIPANT
+ servers:
+ - ${topicServer:localhost}
+ topicCommInfrastructure: dmaap
diff --git a/runtime-controlloop/src/test/java/org/onap/policy/clamp/controlloop/runtime/instantiation/ControlLoopInstantiationProviderTest.java b/runtime-controlloop/src/test/java/org/onap/policy/clamp/controlloop/runtime/instantiation/ControlLoopInstantiationProviderTest.java
index 9fd1ea35f..c414ffa02 100644
--- a/runtime-controlloop/src/test/java/org/onap/policy/clamp/controlloop/runtime/instantiation/ControlLoopInstantiationProviderTest.java
+++ b/runtime-controlloop/src/test/java/org/onap/policy/clamp/controlloop/runtime/instantiation/ControlLoopInstantiationProviderTest.java
@@ -43,8 +43,8 @@ import org.onap.policy.clamp.controlloop.runtime.commissioning.CommissioningProv
import org.onap.policy.clamp.controlloop.runtime.main.parameters.ClRuntimeParameterGroup;
import org.onap.policy.clamp.controlloop.runtime.monitoring.MonitoringProvider;
import org.onap.policy.clamp.controlloop.runtime.supervision.SupervisionHandler;
+import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ControlLoopStateChangePublisher;
import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ControlLoopUpdatePublisher;
-import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ParticipantControlLoopStateChangePublisher;
import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ParticipantDeregisterAckPublisher;
import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ParticipantRegisterAckPublisher;
import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ParticipantUpdatePublisher;
@@ -114,12 +114,12 @@ class ControlLoopInstantiationProviderTest {
new MonitoringProvider(participantStatisticsProvider, clElementStatisticsProvider, clProvider);
var participantProvider = new ParticipantProvider(controlLoopParameters.getDatabaseProviderParameters());
var controlLoopUpdatePublisher = Mockito.mock(ControlLoopUpdatePublisher.class);
- var controlLoopStateChangePublisher = Mockito.mock(ParticipantControlLoopStateChangePublisher.class);
+ var controlLoopStateChangePublisher = Mockito.mock(ControlLoopStateChangePublisher.class);
var participantRegisterAckPublisher = Mockito.mock(ParticipantRegisterAckPublisher.class);
var participantDeregisterAckPublisher = Mockito.mock(ParticipantDeregisterAckPublisher.class);
var participantUpdatePublisher = Mockito.mock(ParticipantUpdatePublisher.class);
supervisionHandler = new SupervisionHandler(clProvider, participantProvider, monitoringProvider,
- commissioningProvider, controlLoopUpdatePublisher, controlLoopStateChangePublisher,
+ controlLoopUpdatePublisher, controlLoopStateChangePublisher,
participantRegisterAckPublisher, participantDeregisterAckPublisher, participantUpdatePublisher);
}
diff --git a/runtime-controlloop/src/test/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/SupervisionMessagesTest.java b/runtime-controlloop/src/test/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/SupervisionMessagesTest.java
index ad6d7ccd6..5f00706e1 100644
--- a/runtime-controlloop/src/test/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/SupervisionMessagesTest.java
+++ b/runtime-controlloop/src/test/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/SupervisionMessagesTest.java
@@ -91,12 +91,12 @@ class SupervisionMessagesTest extends CommonRestController {
new MonitoringProvider(participantStatisticsProvider, clElementStatisticsProvider, clProvider);
var participantProvider = new ParticipantProvider(controlLoopParameters.getDatabaseProviderParameters());
var controlLoopUpdatePublisher = Mockito.mock(ControlLoopUpdatePublisher.class);
- var controlLoopStateChangePublisher = Mockito.mock(ParticipantControlLoopStateChangePublisher.class);
+ var controlLoopStateChangePublisher = Mockito.mock(ControlLoopStateChangePublisher.class);
var participantRegisterAckPublisher = Mockito.mock(ParticipantRegisterAckPublisher.class);
var participantDeregisterAckPublisher = Mockito.mock(ParticipantDeregisterAckPublisher.class);
var participantUpdatePublisher = Mockito.mock(ParticipantUpdatePublisher.class);
supervisionHandler = new SupervisionHandler(clProvider, participantProvider, monitoringProvider,
- commissioningProvider, controlLoopUpdatePublisher, controlLoopStateChangePublisher,
+ controlLoopUpdatePublisher, controlLoopStateChangePublisher,
participantRegisterAckPublisher, participantDeregisterAckPublisher, participantUpdatePublisher);
}
@@ -196,7 +196,7 @@ class SupervisionMessagesTest extends CommonRestController {
participantUpdateMsg.setParticipantDefinitionUpdateMap(participantDefinitionUpdateMap);
synchronized (lockit) {
- ParticipantUpdatePublisher clUpdatePublisher = new ParticipantUpdatePublisher();
+ ParticipantUpdatePublisher clUpdatePublisher = new ParticipantUpdatePublisher(commissioningProvider);
clUpdatePublisher.active(Collections.singletonList(Mockito.mock(TopicSink.class)));
clUpdatePublisher.send(participantUpdateMsg);
}
diff --git a/runtime-controlloop/src/test/resources/application_test.properties b/runtime-controlloop/src/test/resources/application_test.properties
index e51db25ce..ad2a4b12c 100644
--- a/runtime-controlloop/src/test/resources/application_test.properties
+++ b/runtime-controlloop/src/test/resources/application_test.properties
@@ -7,7 +7,6 @@ server.servlet.context-path=/onap/controlloop
server.error.path=/error
runtime.supervisionScannerIntervalSec=1000
-runtime.participantStateChangeIntervalSec=1000
runtime.participantClUpdateIntervalSec=1000
runtime.participantClStateChangeIntervalSec=1000
runtime.participantParameters.heartBeatMs=120000
diff --git a/runtime-controlloop/src/test/resources/parameters/TestParameters.json b/runtime-controlloop/src/test/resources/parameters/TestParameters.json
index 07c90d7b2..99fc43d21 100644
--- a/runtime-controlloop/src/test/resources/parameters/TestParameters.json
+++ b/runtime-controlloop/src/test/resources/parameters/TestParameters.json
@@ -1,7 +1,6 @@
{
"name": "ControlLoopRuntimeGroup",
"supervisionScannerIntervalSec": 1000,
- "participantStateChangeIntervalSec": 1000,
"participantClUpdateIntervalSec": 1000,
"participantClStateChangeIntervalSec": 1000,
"participantParameters": {