aboutsummaryrefslogtreecommitdiffstats
path: root/tosca-controlloop/runtime/src/main/java/org
diff options
context:
space:
mode:
Diffstat (limited to 'tosca-controlloop/runtime/src/main/java/org')
-rw-r--r--tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/commissioning/CommissioningHandler.java20
-rw-r--r--tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/commissioning/CommissioningProvider.java15
-rw-r--r--tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/commissioning/rest/CommissioningController.java41
-rw-r--r--tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/instantiation/ControlLoopInstantiationProvider.java24
-rw-r--r--tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/instantiation/InstantiationHandler.java28
-rw-r--r--tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/instantiation/rest/InstantiationController.java2
-rw-r--r--tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/parameters/ClRuntimeParameterHandler.java4
-rw-r--r--tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/parameters/ParticipantUpdateParameters.java1
-rw-r--r--tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/startstop/ClRuntimeActivator.java68
-rw-r--r--tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/startstop/ClRuntimeCommandLineArguments.java136
-rw-r--r--tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/monitoring/MonitoringHandler.java20
-rw-r--r--tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/monitoring/MonitoringProvider.java6
-rw-r--r--tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionHandler.java450
-rw-r--r--tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionScanner.java116
-rw-r--r--tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantControlLoopStateChangePublisher.java75
-rw-r--r--tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantControlLoopUpdatePublisher.java75
-rw-r--r--tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantStateChangePublisher.java74
-rw-r--r--tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantStatusListener.java53
18 files changed, 978 insertions, 230 deletions
diff --git a/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/commissioning/CommissioningHandler.java b/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/commissioning/CommissioningHandler.java
index ab917b74e..88e8b1df9 100644
--- a/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/commissioning/CommissioningHandler.java
+++ b/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/commissioning/CommissioningHandler.java
@@ -65,26 +65,6 @@ public final class CommissioningHandler extends ControlLoopHandler {
}
@Override
- public void startAndRegisterListeners(MessageTypeDispatcher msgDispatcher) {
- // No topic communication on this handler
- }
-
- @Override
- public void startAndRegisterPublishers(List<TopicSink> topicSinks) {
- // No topic communication on this handler
- }
-
- @Override
- public void stopAndUnregisterPublishers() {
- // No topic communication on this handler
- }
-
- @Override
- public void stopAndUnregisterListeners(MessageTypeDispatcher msgDispatcher) {
- // No topic communication on this handler
- }
-
- @Override
public void startProviders() {
provider = new CommissioningProvider(getDatabaseProviderParameters());
}
diff --git a/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/commissioning/CommissioningProvider.java b/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/commissioning/CommissioningProvider.java
index 41d85726e..50f6787b9 100644
--- a/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/commissioning/CommissioningProvider.java
+++ b/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/commissioning/CommissioningProvider.java
@@ -40,6 +40,7 @@ import org.onap.policy.models.provider.PolicyModelsProviderParameters;
import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
import org.onap.policy.models.tosca.authorative.concepts.ToscaNodeTemplate;
import org.onap.policy.models.tosca.authorative.concepts.ToscaServiceTemplate;
+import org.onap.policy.models.tosca.authorative.concepts.ToscaServiceTemplates;
import org.onap.policy.models.tosca.authorative.concepts.ToscaTypedEntityFilter;
/**
@@ -190,4 +191,18 @@ public class CommissioningProvider implements Closeable {
return controlLoopElementList;
}
+
+ /**
+ * Get the requested control loop definitions.
+ *
+ * @param name the name of the definition to get, null for all definitions
+ * @param version the version of the definition to get, null for all definitions
+ * @return the control loop definitions
+ * @throws PfModelException on errors getting control loop definitions
+ */
+ public ToscaServiceTemplate getToscaServiceTemplate(String name, String version) throws PfModelException {
+ ToscaServiceTemplates serviceTemplates = new ToscaServiceTemplates();
+ serviceTemplates.setServiceTemplates(modelsProvider.getServiceTemplateList(name, version));
+ return serviceTemplates.getServiceTemplates().get(0);
+ }
}
diff --git a/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/commissioning/rest/CommissioningController.java b/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/commissioning/rest/CommissioningController.java
index cd6c08e30..18e1f7787 100644
--- a/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/commissioning/rest/CommissioningController.java
+++ b/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/commissioning/rest/CommissioningController.java
@@ -44,6 +44,7 @@ import org.onap.policy.clamp.controlloop.runtime.commissioning.CommissioningProv
import org.onap.policy.clamp.controlloop.runtime.main.rest.RestController;
import org.onap.policy.models.base.PfModelException;
import org.onap.policy.models.base.PfModelRuntimeException;
+import org.onap.policy.models.errors.concepts.ErrorResponse;
import org.onap.policy.models.errors.concepts.ErrorResponseInfo;
import org.onap.policy.models.tosca.authorative.concepts.ToscaNodeTemplate;
import org.onap.policy.models.tosca.authorative.concepts.ToscaServiceTemplate;
@@ -131,7 +132,9 @@ public class CommissioningController extends RestController {
} catch (PfModelRuntimeException | PfModelException e) {
LOGGER.warn("Commissioning of the control loops failed", e);
- return createCommissioningErrorResponse(e, requestId);
+ CommissioningResponse resp = new CommissioningResponse();
+ resp.setErrorDetails(e.getErrorResponse().getErrorMessage());
+ return returnResponse(e.getErrorResponse().getResponseCode(), requestId, resp);
}
}
@@ -201,7 +204,9 @@ public class CommissioningController extends RestController {
} catch (PfModelRuntimeException | PfModelException e) {
LOGGER.warn("Decommisssioning of control loop failed", e);
- return createCommissioningErrorResponse(e, requestId);
+ CommissioningResponse resp = new CommissioningResponse();
+ resp.setErrorDetails(e.getErrorResponse().getErrorMessage());
+ return returnResponse(e.getErrorResponse().getResponseCode(), requestId, resp);
}
}
@@ -255,9 +260,9 @@ public class CommissioningController extends RestController {
// @formatter:on
public Response query(@HeaderParam(REQUEST_ID_NAME) @ApiParam(REQUEST_ID_PARAM_DESCRIPTION) UUID requestId,
@ApiParam(value = "Control Loop definition name", required = true)
- @QueryParam("name") String name,
+ @QueryParam("name") String name,
@ApiParam(value = "Control Loop definition version", required = true)
- @QueryParam("version") String version) {
+ @QueryParam("version") String version) {
try {
List<ToscaNodeTemplate> response = provider.getControlLoopDefinitions(name, version);
@@ -266,7 +271,9 @@ public class CommissioningController extends RestController {
} catch (PfModelRuntimeException | PfModelException e) {
LOGGER.warn("Get of control loop definitions failed", e);
- return createCommissioningErrorResponse(e, requestId);
+ CommissioningResponse resp = new CommissioningResponse();
+ resp.setErrorDetails(e.getErrorResponse().getErrorMessage());
+ return returnResponse(e.getErrorResponse().getResponseCode(), requestId, resp);
}
}
@@ -319,33 +326,35 @@ public class CommissioningController extends RestController {
)
// @formatter:on
public Response queryElements(@HeaderParam(REQUEST_ID_NAME) @ApiParam(REQUEST_ID_PARAM_DESCRIPTION) UUID requestId,
- @ApiParam(value = "Control Loop definition name", required = true)
- @QueryParam("name") String name,
- @ApiParam(value = "Control Loop definition version", required = true)
- @QueryParam("version") String version) throws Exception {
+ @ApiParam(value = "Control Loop definition name", required = true)
+ @QueryParam("name") String name,
+ @ApiParam(value = "Control Loop definition version", required = true)
+ @QueryParam("version") String version) throws Exception {
try {
List<ToscaNodeTemplate> nodeTemplate = provider.getControlLoopDefinitions(name, version);
//Prevent ambiguous queries with multiple returns
if (nodeTemplate.size() > 1) {
- throw new Exception();
+ CommissioningResponse resp = new CommissioningResponse();
+ resp.setErrorDetails("Multiple ControlLoops are not supported");
+ return returnResponse(Response.Status.NOT_ACCEPTABLE, requestId, resp);
}
+
List<ToscaNodeTemplate> response = provider.getControlLoopElementDefinitions(nodeTemplate.get(0));
return addLoggingHeaders(addVersionControlHeaders(Response.status(Status.OK)), requestId).entity(response)
.build();
} catch (PfModelRuntimeException | PfModelException e) {
LOGGER.warn("Get of control loop element definitions failed", e);
- return createCommissioningErrorResponse(e, requestId);
+ CommissioningResponse resp = new CommissioningResponse();
+ resp.setErrorDetails(e.getErrorResponse().getErrorMessage());
+ return returnResponse(e.getErrorResponse().getResponseCode(), requestId, resp);
}
}
- private Response createCommissioningErrorResponse(ErrorResponseInfo e, UUID requestId) {
- CommissioningResponse resp = new CommissioningResponse();
- resp.setErrorDetails(e.getErrorResponse().getErrorMessage());
- return addLoggingHeaders(addVersionControlHeaders(Response.status(e.getErrorResponse().getResponseCode())),
+ private Response returnResponse(Response.Status status, UUID requestId, CommissioningResponse resp) {
+ return addLoggingHeaders(addVersionControlHeaders(Response.status(status)),
requestId).entity(resp).build();
}
-
}
diff --git a/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/instantiation/ControlLoopInstantiationProvider.java b/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/instantiation/ControlLoopInstantiationProvider.java
index 6fd5f3225..eb72d9219 100644
--- a/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/instantiation/ControlLoopInstantiationProvider.java
+++ b/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/instantiation/ControlLoopInstantiationProvider.java
@@ -39,6 +39,7 @@ import org.onap.policy.clamp.controlloop.models.controlloop.persistence.provider
import org.onap.policy.clamp.controlloop.models.messages.rest.instantiation.InstantiationCommand;
import org.onap.policy.clamp.controlloop.models.messages.rest.instantiation.InstantiationResponse;
import org.onap.policy.clamp.controlloop.runtime.commissioning.CommissioningProvider;
+import org.onap.policy.clamp.controlloop.runtime.supervision.SupervisionHandler;
import org.onap.policy.common.parameters.BeanValidationResult;
import org.onap.policy.common.parameters.ObjectValidationResult;
import org.onap.policy.common.parameters.ValidationResult;
@@ -140,19 +141,21 @@ public class ControlLoopInstantiationProvider implements Closeable {
*/
private BeanValidationResult validateControlLoops(ControlLoops controlLoops) throws PfModelException {
- BeanValidationResult validationResult = new BeanValidationResult("ControlLoops", controlLoops);
+ BeanValidationResult result = new BeanValidationResult("ControlLoops", controlLoops);
for (ControlLoop controlLoop : controlLoops.getControlLoopList()) {
+ BeanValidationResult subResult = new BeanValidationResult(
+ "entry " + controlLoop.getDefinition().getName(), controlLoop);
List<ToscaNodeTemplate> toscaNodeTemplates = commissioningProvider.getControlLoopDefinitions(
controlLoop.getDefinition().getName(), controlLoop.getDefinition().getVersion());
if (toscaNodeTemplates.isEmpty()) {
- validationResult
+ subResult
.addResult(new ObjectValidationResult("ControlLoop", controlLoop.getDefinition().getName(),
ValidationStatus.INVALID, "Commissioned control loop definition not FOUND"));
} else if (toscaNodeTemplates.size() > 1) {
- validationResult
+ subResult
.addResult(new ObjectValidationResult("ControlLoop", controlLoop.getDefinition().getName(),
ValidationStatus.INVALID, "Commissioned control loop definition not VALID"));
} else {
@@ -167,12 +170,13 @@ public class ControlLoopInstantiationProvider implements Closeable {
.collect(Collectors.toMap(ToscaConceptIdentifier::getName, UnaryOperator.identity()));
// @formatter:on
- for (ControlLoopElement element : controlLoop.getElements()) {
- validationResult.addResult(validateDefinition(definitions, element.getDefinition()));
+ for (ControlLoopElement element : controlLoop.getElements().values()) {
+ subResult.addResult(validateDefinition(definitions, element.getDefinition()));
}
}
+ result.addResult(subResult);
}
- return validationResult;
+ return result;
}
/**
@@ -183,15 +187,13 @@ public class ControlLoopInstantiationProvider implements Closeable {
* @result result the validation result
*/
private ValidationResult validateDefinition(Map<String, ToscaConceptIdentifier> definitions,
- ToscaConceptIdentifier definition) {
- BeanValidationResult result = new BeanValidationResult(definition.getName(), definition);
+ ToscaConceptIdentifier definition) {
+ BeanValidationResult result = new BeanValidationResult("entry " + definition.getName(), definition);
ToscaConceptIdentifier identifier = definitions.get(definition.getName());
if (identifier == null) {
result.setResult(ValidationStatus.INVALID, "Not FOUND");
} else if (!identifier.equals(definition)) {
result.setResult(ValidationStatus.INVALID, "Version not matching");
- } else {
- result.setResult(ValidationStatus.CLEAN);
}
return (result.isClean() ? null : result);
}
@@ -264,6 +266,8 @@ public class ControlLoopInstantiationProvider implements Closeable {
controlLoopProvider.updateControlLoops(controlLoops);
}
+ SupervisionHandler supervisionHandler = SupervisionHandler.getInstance();
+ supervisionHandler.triggerControlLoopSupervision(command.getControlLoopIdentifierList());
InstantiationResponse response = new InstantiationResponse();
response.setAffectedControlLoops(command.getControlLoopIdentifierList());
diff --git a/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/instantiation/InstantiationHandler.java b/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/instantiation/InstantiationHandler.java
index fd5288fda..d81e54ccf 100644
--- a/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/instantiation/InstantiationHandler.java
+++ b/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/instantiation/InstantiationHandler.java
@@ -21,13 +21,11 @@
package org.onap.policy.clamp.controlloop.runtime.instantiation;
import java.io.IOException;
-import java.util.HashSet;
import java.util.List;
import java.util.Set;
import javax.ws.rs.core.Response;
import lombok.Getter;
import org.onap.policy.clamp.controlloop.common.handler.ControlLoopHandler;
-import org.onap.policy.clamp.controlloop.runtime.commissioning.rest.CommissioningController;
import org.onap.policy.clamp.controlloop.runtime.instantiation.rest.InstantiationController;
import org.onap.policy.clamp.controlloop.runtime.main.parameters.ClRuntimeParameterGroup;
import org.onap.policy.common.endpoints.event.comm.TopicSink;
@@ -36,11 +34,9 @@ import org.onap.policy.common.utils.services.Registry;
import org.onap.policy.models.base.PfModelRuntimeException;
/**
- * This class handles instantiation of control loop instances,
- * so only one object of this type should be built at a time.
+ * This class handles instantiation of control loop instances.
*
- * </p>
- * It is effectively a singleton that is started at system start
+ * <p/>It is effectively a singleton that is started at system start
*/
public final class InstantiationHandler extends ControlLoopHandler {
@@ -71,26 +67,6 @@ public final class InstantiationHandler extends ControlLoopHandler {
}
@Override
- public void startAndRegisterListeners(MessageTypeDispatcher msgDispatcher) {
- // No topic communication on this handler
- }
-
- @Override
- public void startAndRegisterPublishers(List<TopicSink> topicSinks) {
- // No topic communication on this handler
- }
-
- @Override
- public void stopAndUnregisterPublishers() {
- // No topic communication on this handler
- }
-
- @Override
- public void stopAndUnregisterListeners(MessageTypeDispatcher msgDispatcher) {
- // No topic communication on this handler
- }
-
- @Override
public void startProviders() {
controlLoopInstantiationProvider = new ControlLoopInstantiationProvider(getDatabaseProviderParameters());
}
diff --git a/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/instantiation/rest/InstantiationController.java b/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/instantiation/rest/InstantiationController.java
index 807da5d68..7581aaf74 100644
--- a/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/instantiation/rest/InstantiationController.java
+++ b/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/instantiation/rest/InstantiationController.java
@@ -412,5 +412,5 @@ public class InstantiationController extends RestController {
return addLoggingHeaders(addVersionControlHeaders(Response.status(e.getErrorResponse().getResponseCode())),
requestId).entity(resp).build();
}
-
}
+
diff --git a/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/parameters/ClRuntimeParameterHandler.java b/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/parameters/ClRuntimeParameterHandler.java
index a7f5ff34d..a463ad171 100644
--- a/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/parameters/ClRuntimeParameterHandler.java
+++ b/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/parameters/ClRuntimeParameterHandler.java
@@ -24,7 +24,7 @@ import java.io.File;
import javax.ws.rs.core.Response;
import org.onap.policy.clamp.controlloop.common.exception.ControlLoopException;
import org.onap.policy.clamp.controlloop.runtime.main.startstop.ClRuntimeCommandLineArguments;
-import org.onap.policy.common.parameters.GroupValidationResult;
+import org.onap.policy.common.parameters.ValidationResult;
import org.onap.policy.common.utils.coder.Coder;
import org.onap.policy.common.utils.coder.CoderException;
import org.onap.policy.common.utils.coder.StandardCoder;
@@ -66,7 +66,7 @@ public class ClRuntimeParameterHandler {
}
// validate the parameters
- final GroupValidationResult validationResult = clRuntimeParameterGroup.validate();
+ final ValidationResult validationResult = clRuntimeParameterGroup.validate();
if (!validationResult.isValid()) {
throw new ControlLoopException(Response.Status.NOT_ACCEPTABLE, "validation error(s) on parameters from \""
+ arguments.getConfigurationFilePath() + "\"\n" + validationResult.getResult());
diff --git a/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/parameters/ParticipantUpdateParameters.java b/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/parameters/ParticipantUpdateParameters.java
index f70e7d590..2af5be534 100644
--- a/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/parameters/ParticipantUpdateParameters.java
+++ b/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/parameters/ParticipantUpdateParameters.java
@@ -51,3 +51,4 @@ public class ParticipantUpdateParameters extends ParameterGroupImpl {
super(ParticipantUpdateParameters.class.getSimpleName());
}
}
+
diff --git a/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/startstop/ClRuntimeActivator.java b/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/startstop/ClRuntimeActivator.java
index 5959586da..a4238a9c4 100644
--- a/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/startstop/ClRuntimeActivator.java
+++ b/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/startstop/ClRuntimeActivator.java
@@ -32,6 +32,7 @@ import org.onap.policy.clamp.controlloop.runtime.instantiation.InstantiationHand
import org.onap.policy.clamp.controlloop.runtime.main.parameters.ClRuntimeParameterGroup;
import org.onap.policy.clamp.controlloop.runtime.main.rest.ControlLoopAafFilter;
import org.onap.policy.clamp.controlloop.runtime.monitoring.MonitoringHandler;
+import org.onap.policy.clamp.controlloop.runtime.supervision.SupervisionHandler;
import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
import org.onap.policy.common.endpoints.event.comm.TopicSink;
import org.onap.policy.common.endpoints.event.comm.TopicSource;
@@ -65,6 +66,7 @@ public class ClRuntimeActivator extends ServiceManagerContainer {
* @param clRuntimeParameterGroup the parameters for the control loop runtime service
*/
public ClRuntimeActivator(final ClRuntimeParameterGroup clRuntimeParameterGroup) {
+
if (clRuntimeParameterGroup == null || !clRuntimeParameterGroup.isValid()) {
throw new ControlLoopRuntimeException(Status.INTERNAL_SERVER_ERROR, "ParameterGroup not valid");
}
@@ -86,28 +88,38 @@ public class ClRuntimeActivator extends ServiceManagerContainer {
final AtomicReference<ControlLoopHandler> commissioningHandler = new AtomicReference<>();
final AtomicReference<ControlLoopHandler> instantiationHandler = new AtomicReference<>();
+ final AtomicReference<ControlLoopHandler> supervisionHandler = new AtomicReference<>();
final AtomicReference<ControlLoopHandler> monitoringHandler = new AtomicReference<>();
-
final AtomicReference<RestServer> restServer = new AtomicReference<>();
+
// @formatter:off
addAction("Control loop runtime parameters",
- () -> ParameterService.register(clRuntimeParameterGroup),
- () -> ParameterService.deregister(clRuntimeParameterGroup.getName()));
+ () -> ParameterService.register(clRuntimeParameterGroup),
+ () -> ParameterService.deregister(clRuntimeParameterGroup.getName()));
+
addAction("Topic endpoint management",
- () -> TopicEndpointManager.getManager().start(),
- () -> TopicEndpointManager.getManager().shutdown());
+ () -> TopicEndpointManager.getManager().start(),
+ () -> TopicEndpointManager.getManager().shutdown());
+
addAction("Commissioning Handler",
() -> commissioningHandler.set(new CommissioningHandler(clRuntimeParameterGroup)),
() -> commissioningHandler.get().close());
+
addAction("Instantiation Handler",
- () -> instantiationHandler.set(new InstantiationHandler(clRuntimeParameterGroup)),
- () -> instantiationHandler.get().close());
+ () -> instantiationHandler.set(new InstantiationHandler(clRuntimeParameterGroup)),
+ () -> instantiationHandler.get().close());
+
+ addAction("Supervision Handler",
+ () -> supervisionHandler.set(new SupervisionHandler(clRuntimeParameterGroup)),
+ () -> supervisionHandler.get().close());
+
addAction("Monitoring Handler",
() -> monitoringHandler.set(new MonitoringHandler(clRuntimeParameterGroup)),
() -> monitoringHandler.get().close());
addHandlerActions("Commissioning", commissioningHandler);
addHandlerActions("Instantiation", instantiationHandler);
+ addHandlerActions("Supervision", supervisionHandler);
addHandlerActions("Monitoring", monitoringHandler);
addAction("Topic Message Dispatcher", this::registerMsgDispatcher, this::unregisterMsgDispatcher);
@@ -115,34 +127,34 @@ public class ClRuntimeActivator extends ServiceManagerContainer {
clRuntimeParameterGroup.getRestServerParameters().setName(clRuntimeParameterGroup.getName());
addAction("REST server",
- () -> {
- Set<Class<?>> providerClasses = new HashSet<>();
- providerClasses.addAll(commissioningHandler.get().getProviderClasses());
- providerClasses.addAll(instantiationHandler.get().getProviderClasses());
- providerClasses.addAll(monitoringHandler.get().getProviderClasses());
-
- RestServer server = new RestServer(clRuntimeParameterGroup.getRestServerParameters(),
- ControlLoopAafFilter.class,
- providerClasses.toArray(new Class<?>[providerClasses.size()]));
- restServer.set(server);
- restServer.get().start();
- },
- () -> restServer.get().stop());
+ () -> {
+ Set<Class<?>> providerClasses = new HashSet<>();
+ providerClasses.addAll(commissioningHandler.get().getProviderClasses());
+ providerClasses.addAll(instantiationHandler.get().getProviderClasses());
+ providerClasses.addAll(supervisionHandler.get().getProviderClasses());
+ providerClasses.addAll(monitoringHandler.get().getProviderClasses());
+
+ RestServer server = new RestServer(clRuntimeParameterGroup.getRestServerParameters(),
+ ControlLoopAafFilter.class,
+ providerClasses.toArray(new Class<?>[providerClasses.size()]));
+
+ restServer.set(server);
+ restServer.get().start();
+ },
+ () -> restServer.get().stop());
// @formatter:on
}
private void addHandlerActions(final String name, final AtomicReference<ControlLoopHandler> handler) {
addAction(name + " Providers",
- () -> handler.get().startProviders(),
- () -> handler.get().stopProviders());
-
+ () -> handler.get().startProviders(),
+ () -> handler.get().stopProviders());
addAction(name + " Listeners",
- () -> handler.get().startAndRegisterListeners(msgDispatcher),
- () -> handler.get().stopAndUnregisterListeners(msgDispatcher));
-
+ () -> handler.get().startAndRegisterListeners(msgDispatcher),
+ () -> handler.get().stopAndUnregisterListeners(msgDispatcher));
addAction(name + " Publishers",
- () -> handler.get().startAndRegisterPublishers(topicSinks),
- () -> handler.get().stopAndUnregisterPublishers());
+ () -> handler.get().startAndRegisterPublishers(topicSinks),
+ () -> handler.get().stopAndUnregisterPublishers());
}
/**
diff --git a/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/startstop/ClRuntimeCommandLineArguments.java b/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/startstop/ClRuntimeCommandLineArguments.java
index fa25b6ddb..f36bb858b 100644
--- a/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/startstop/ClRuntimeCommandLineArguments.java
+++ b/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/startstop/ClRuntimeCommandLineArguments.java
@@ -26,55 +26,58 @@ import java.io.StringWriter;
import java.net.URL;
import java.util.Arrays;
import javax.ws.rs.core.Response;
+import lombok.Getter;
+import lombok.Setter;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
+import org.apache.commons.lang3.StringUtils;
import org.onap.policy.clamp.controlloop.common.exception.ControlLoopException;
+import org.onap.policy.clamp.controlloop.common.exception.ControlLoopRuntimeException;
+import org.onap.policy.clamp.controlloop.common.startstop.CommonCommandLineArguments;
import org.onap.policy.common.utils.resources.ResourceUtils;
-
/**
* This class reads and handles command line parameters for the control loop runtime service.
- *
*/
public class ClRuntimeCommandLineArguments {
private static final String FILE_MESSAGE_PREAMBLE = " file \"";
private static final int HELP_LINE_LENGTH = 120;
private final Options options;
+ private final CommonCommandLineArguments commonCommandLineArguments;
+
+ @Getter()
+ @Setter()
private String configurationFilePath = null;
/**
* Construct the options for the control loop runtime component.
*/
public ClRuntimeCommandLineArguments() {
- //@formatter:off
options = new Options();
- options.addOption(Option.builder("h")
- .longOpt("help")
- .desc("outputs the usage of this command")
- .required(false)
- .type(Boolean.class)
- .build());
- options.addOption(Option.builder("v")
- .longOpt("version")
- .desc("outputs the version of control loop runtime")
- .required(false)
- .type(Boolean.class)
- .build());
- options.addOption(Option.builder("c")
- .longOpt("config-file")
- .desc("the full path to the configuration file to use, "
- + "the configuration file must be a Json file containing the control loop runtime parameters")
- .hasArg()
- .argName("CONFIG_FILE")
- .required(false)
- .type(String.class)
- .build());
- //@formatter:on
+ commonCommandLineArguments = new CommonCommandLineArguments(options);
+ }
+
+ /**
+ * Construct the options for the CLI editor and parse in the given arguments.
+ *
+ * @param args The command line arguments
+ */
+ public ClRuntimeCommandLineArguments(final String[] args) {
+ // Set up the options with the default constructor
+ this();
+
+ // Parse the arguments
+ try {
+ parse(args);
+ } catch (final ControlLoopException e) {
+ throw new ControlLoopRuntimeException(Response.Status.NOT_ACCEPTABLE,
+ "parse error on control loop runtime parameters", e);
+ }
}
/**
@@ -87,13 +90,12 @@ public class ClRuntimeCommandLineArguments {
public String parse(final String[] args) throws ControlLoopException {
// Clear all our arguments
setConfigurationFilePath(null);
-
CommandLine commandLine = null;
try {
commandLine = new DefaultParser().parse(options, args);
} catch (final ParseException e) {
throw new ControlLoopException(Response.Status.NOT_ACCEPTABLE,
- "invalid command line arguments specified", e);
+ "invalid command line arguments specified : " + e.getMessage());
}
// Arguments left over after Commons CLI does its stuff
@@ -104,16 +106,12 @@ public class ClRuntimeCommandLineArguments {
"too many command line arguments specified : " + Arrays.toString(args));
}
- if (remainingArgs.length == 1) {
- configurationFilePath = remainingArgs[0];
- }
-
if (commandLine.hasOption('h')) {
- return help(Main.class.getName());
+ return commonCommandLineArguments.help(Main.class.getName(), options);
}
if (commandLine.hasOption('v')) {
- return version();
+ return commonCommandLineArguments.version();
}
if (commandLine.hasOption('c')) {
@@ -129,42 +127,7 @@ public class ClRuntimeCommandLineArguments {
* @throws ControlLoopException on command argument validation errors
*/
public void validate() throws ControlLoopException {
- validateReadableFile("control loop runtime configuration", configurationFilePath);
- }
-
- /**
- * Print version information for control loop runtime.
- *
- * @return the version string
- */
- public String version() {
- return ResourceUtils.getResourceAsString("version.txt");
- }
-
- /**
- * Print help information for control loop runtime.
- *
- * @param mainClassName the main class name
- * @return the help string
- */
- public String help(final String mainClassName) {
- final HelpFormatter helpFormatter = new HelpFormatter();
- final StringWriter stringWriter = new StringWriter();
- final PrintWriter printWriter = new PrintWriter(stringWriter);
-
- helpFormatter.printHelp(printWriter, HELP_LINE_LENGTH, mainClassName + " [options...]", "options", options, 0,
- 0, "");
-
- return stringWriter.toString();
- }
-
- /**
- * Gets the configuration file path.
- *
- * @return the configuration file path
- */
- public String getConfigurationFilePath() {
- return configurationFilePath;
+ commonCommandLineArguments.validate(configurationFilePath);
}
/**
@@ -185,39 +148,4 @@ public class ClRuntimeCommandLineArguments {
this.configurationFilePath = configurationFilePath;
}
-
- /**
- * Validate readable file.
- *
- * @param fileTag the file tag
- * @param fileName the file name
- * @throws ControlLoopException on the file name passed as a parameter
- */
- private void validateReadableFile(final String fileTag, final String fileName) throws ControlLoopException {
- if (fileName == null || fileName.length() == 0) {
- throw new ControlLoopException(Response.Status.NOT_ACCEPTABLE,
- fileTag + " file was not specified as an argument");
- }
-
- // The file name refers to a resource on the local file system
- final URL fileUrl = ResourceUtils.getUrl4Resource(fileName);
- if (fileUrl == null) {
- throw new ControlLoopException(Response.Status.NOT_ACCEPTABLE,
- fileTag + FILE_MESSAGE_PREAMBLE + fileName + "\" does not exist");
- }
-
- final File theFile = new File(fileUrl.getPath());
- if (!theFile.exists()) {
- throw new ControlLoopException(Response.Status.NOT_ACCEPTABLE,
- fileTag + FILE_MESSAGE_PREAMBLE + fileName + "\" does not exist");
- }
- if (!theFile.isFile()) {
- throw new ControlLoopException(Response.Status.NOT_ACCEPTABLE,
- fileTag + FILE_MESSAGE_PREAMBLE + fileName + "\" is not a normal file");
- }
- if (!theFile.canRead()) {
- throw new ControlLoopException(Response.Status.NOT_ACCEPTABLE,
- fileTag + FILE_MESSAGE_PREAMBLE + fileName + "\" is ureadable");
- }
- }
}
diff --git a/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/monitoring/MonitoringHandler.java b/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/monitoring/MonitoringHandler.java
index 04f458e7d..a7ad9180a 100644
--- a/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/monitoring/MonitoringHandler.java
+++ b/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/monitoring/MonitoringHandler.java
@@ -69,26 +69,6 @@ public class MonitoringHandler extends ControlLoopHandler {
}
@Override
- public void startAndRegisterListeners(MessageTypeDispatcher msgDispatcher) {
- // No topic communication on this handler
- }
-
- @Override
- public void startAndRegisterPublishers(List<TopicSink> topicSinks) {
- // No topic communication on this handler
- }
-
- @Override
- public void stopAndUnregisterPublishers() {
- // No topic communication on this handler
- }
-
- @Override
- public void stopAndUnregisterListeners(MessageTypeDispatcher msgDispatcher) {
- // No topic communication on this handler
- }
-
- @Override
public void startProviders() {
monitoringProvider = new MonitoringProvider(getDatabaseProviderParameters());
}
diff --git a/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/monitoring/MonitoringProvider.java b/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/monitoring/MonitoringProvider.java
index e46e66501..193f8d557 100644
--- a/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/monitoring/MonitoringProvider.java
+++ b/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/monitoring/MonitoringProvider.java
@@ -206,7 +206,7 @@ public class MonitoringProvider implements Closeable {
ControlLoop controlLoop = controlLoopProvider.getControlLoop(new ToscaConceptIdentifier(name,
version));
if (controlLoop != null) {
- clElements.addAll(controlLoop.getElements());
+ clElements.addAll(controlLoop.getElements().values());
//Collect control loop element statistics for each cl element.
for (ControlLoopElement clElement : clElements) {
clElementStats.addAll(fetchFilteredClElementStatistics(clElement.getParticipantId().getName(),
@@ -235,7 +235,7 @@ public class MonitoringProvider implements Closeable {
List<ToscaConceptIdentifier> participantIds = new ArrayList<>();
ControlLoop controlLoop = controlLoopProvider.getControlLoop(new ToscaConceptIdentifier(name, version));
if (controlLoop != null) {
- for (ControlLoopElement clElement : controlLoop.getElements()) {
+ for (ControlLoopElement clElement : controlLoop.getElements().values()) {
participantIds.add(clElement.getParticipantId());
}
}
@@ -256,7 +256,7 @@ public class MonitoringProvider implements Closeable {
Map<String, ToscaConceptIdentifier> clElementId = new HashMap<>();
ControlLoop controlLoop = controlLoopProvider.getControlLoop(new ToscaConceptIdentifier(name, version));
if (controlLoop != null) {
- for (ControlLoopElement clElement : controlLoop.getElements()) {
+ for (ControlLoopElement clElement : controlLoop.getElements().values()) {
clElementId.put(clElement.getId().toString(), clElement.getParticipantId());
}
}
diff --git a/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionHandler.java b/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionHandler.java
new file mode 100644
index 000000000..63bff00fc
--- /dev/null
+++ b/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionHandler.java
@@ -0,0 +1,450 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2021 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.controlloop.runtime.supervision;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
+import org.apache.commons.collections4.CollectionUtils;
+import org.onap.policy.clamp.controlloop.common.exception.ControlLoopException;
+import org.onap.policy.clamp.controlloop.common.exception.ControlLoopRuntimeException;
+import org.onap.policy.clamp.controlloop.common.handler.ControlLoopHandler;
+import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoop;
+import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopElement;
+import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopState;
+import org.onap.policy.clamp.controlloop.models.controlloop.concepts.Participant;
+import org.onap.policy.clamp.controlloop.models.controlloop.persistence.provider.ControlLoopProvider;
+import org.onap.policy.clamp.controlloop.models.controlloop.persistence.provider.ParticipantProvider;
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantControlLoopStateChange;
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantControlLoopUpdate;
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageType;
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantStatus;
+import org.onap.policy.clamp.controlloop.runtime.commissioning.CommissioningHandler;
+import org.onap.policy.clamp.controlloop.runtime.commissioning.CommissioningProvider;
+import org.onap.policy.clamp.controlloop.runtime.main.parameters.ClRuntimeParameterGroup;
+import org.onap.policy.clamp.controlloop.runtime.monitoring.MonitoringHandler;
+import org.onap.policy.clamp.controlloop.runtime.monitoring.MonitoringProvider;
+import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ParticipantControlLoopStateChangePublisher;
+import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ParticipantControlLoopUpdatePublisher;
+import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ParticipantStateChangePublisher;
+import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ParticipantStatusListener;
+import org.onap.policy.common.endpoints.event.comm.TopicSink;
+import org.onap.policy.common.endpoints.listeners.MessageTypeDispatcher;
+import org.onap.policy.common.utils.services.Registry;
+import org.onap.policy.common.utils.services.ServiceManager;
+import org.onap.policy.common.utils.services.ServiceManagerException;
+import org.onap.policy.models.base.PfModelException;
+import org.onap.policy.models.base.PfModelRuntimeException;
+import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class handles supervision of control loop instances, so only one object of this type should be built at a time.
+ *
+ * <p/> It is effectively a singleton that is started at system start.
+ */
+public class SupervisionHandler extends ControlLoopHandler {
+ private static final Logger LOGGER = LoggerFactory.getLogger(SupervisionHandler.class);
+
+ private static final String CONTROL_LOOP_CANNOT_TRANSITION_FROM_STATE = "Control loop can't transition from state ";
+ private static final String CONTROL_LOOP_IS_ALREADY_IN_STATE = "Control loop is already in state ";
+ private static final String TO_STATE = " to state ";
+ private static final String AND_TRANSITIONING_TO_STATE = " and transitioning to state ";
+
+ private ControlLoopProvider controlLoopProvider;
+ private ParticipantProvider participantProvider;
+ private CommissioningProvider commissioningProvider;
+ private MonitoringProvider monitoringProvider;
+
+ // Publishers for participant communication
+ private ParticipantStateChangePublisher stateChangePublisher;
+ private ParticipantControlLoopUpdatePublisher controlLoopUpdatePublisher;
+ private ParticipantControlLoopStateChangePublisher controlLoopStateChangePublisher;
+
+ // Database scanner
+ private SupervisionScanner scanner;
+
+ /**
+ * Used to manage the services.
+ */
+ private ServiceManager manager;
+ private ServiceManager publisherManager;
+
+ /**
+ * Gets the SupervisionHandler.
+ *
+ * @return SupervisionHandler
+ */
+ public static SupervisionHandler getInstance() {
+ return Registry.get(SupervisionHandler.class.getName());
+ }
+
+ /**
+ * Create a handler.
+ *
+ * @param clRuntimeParameterGroup the parameters for the control loop runtime
+ */
+ public SupervisionHandler(ClRuntimeParameterGroup clRuntimeParameterGroup) {
+ super(clRuntimeParameterGroup.getDatabaseProviderParameters());
+ // @formatter:off
+ this.manager = new ServiceManager()
+ .addAction("ControlLoop Provider",
+ () -> controlLoopProvider = new ControlLoopProvider(getDatabaseProviderParameters()),
+ () -> controlLoopProvider = null)
+ .addAction("Participant Provider",
+ () -> participantProvider = new ParticipantProvider(getDatabaseProviderParameters()),
+ () -> participantProvider = null);
+ // @formatter:on
+ }
+
+ /**
+ * Supervision trigger called when a command is issued on control loops.
+ *
+ * </p> Causes supervision to start or continue supervision on the control loops in question.
+ *
+ * @param controlLoopIdentifierList the control loops for which the supervision command has been issued
+ * @throws ControlLoopException on supervision triggering exceptions
+ */
+ public void triggerControlLoopSupervision(List<ToscaConceptIdentifier> controlLoopIdentifierList)
+ throws ControlLoopException {
+
+ LOGGER.debug("triggering control loop supervision on control loops {}", controlLoopIdentifierList);
+
+ if (CollectionUtils.isEmpty(controlLoopIdentifierList)) {
+ // This is just to force throwing of the exception in certain circumstances.
+ exceptionOccured(Response.Status.NOT_ACCEPTABLE,
+ "The list of control loops for supervision is empty");
+ }
+
+ for (ToscaConceptIdentifier controlLoopId : controlLoopIdentifierList) {
+ try {
+ ControlLoop controlLoop = controlLoopProvider.getControlLoop(controlLoopId);
+
+ superviseControlLoop(controlLoop);
+
+ controlLoopProvider.updateControlLoop(controlLoop);
+ } catch (PfModelException pfme) {
+ throw new ControlLoopException(pfme.getErrorResponse().getResponseCode(), pfme.getMessage(), pfme);
+ }
+ }
+ }
+
+ @Override
+ public void startAndRegisterListeners(MessageTypeDispatcher msgDispatcher) {
+ msgDispatcher.register(ParticipantMessageType.PARTICIPANT_STATUS.name(), new ParticipantStatusListener());
+ }
+
+ @Override
+ public void startAndRegisterPublishers(List<TopicSink> topicSinks) {
+ // TODO: Use a parameter for the timeout
+ // @formatter:off
+ this.publisherManager = new ServiceManager()
+ .addAction("Supervision scanner",
+ () -> scanner = new SupervisionScanner(controlLoopProvider, 10000),
+ () -> scanner = null)
+ .addAction("ControlLoopUpdate publisher",
+ () -> controlLoopUpdatePublisher = new ParticipantControlLoopUpdatePublisher(topicSinks, -1),
+ () -> controlLoopUpdatePublisher.terminate())
+ .addAction("StateChange Publisher",
+ () -> stateChangePublisher = new ParticipantStateChangePublisher(topicSinks, 10000),
+ () -> stateChangePublisher.terminate())
+ .addAction("ControlLoopStateChange Publisher",
+ () -> controlLoopStateChangePublisher =
+ new ParticipantControlLoopStateChangePublisher(topicSinks, -1),
+ () -> controlLoopStateChangePublisher.terminate());
+ // @formatter:on
+ try {
+ publisherManager.start();
+ } catch (final ServiceManagerException exp) {
+ throw new ControlLoopRuntimeException(Status.INTERNAL_SERVER_ERROR,
+ "Supervision handler start of publishers or scanner failed", exp);
+ }
+ }
+
+ @Override
+ public void stopAndUnregisterPublishers() {
+ try {
+ publisherManager.stop();
+ } catch (final ServiceManagerException exp) {
+ throw new ControlLoopRuntimeException(Status.INTERNAL_SERVER_ERROR,
+ "Supervision handler stop of publishers or scanner failed", exp);
+ }
+ }
+
+ @Override
+ public void stopAndUnregisterListeners(MessageTypeDispatcher msgDispatcher) {
+ msgDispatcher.unregister(ParticipantMessageType.PARTICIPANT_STATUS.name());
+ }
+
+ /**
+ * Handle a ParticipantStatus message from a participant.
+ *
+ * @param participantStatusMessage the ParticipantStatus message received from a participant
+ */
+ public void handleParticipantStatusMessage(ParticipantStatus participantStatusMessage) {
+ LOGGER.debug("Participant Status received {}", participantStatusMessage);
+
+ try {
+ superviseParticipant(participantStatusMessage);
+ } catch (PfModelException | ControlLoopException svExc) {
+ LOGGER.warn("error supervising participant {}", participantStatusMessage.getParticipantId(), svExc);
+ return;
+ }
+
+ try {
+ superviseControlLoops(participantStatusMessage);
+ } catch (PfModelException | ControlLoopException svExc) {
+ LOGGER.warn("error supervising participant {}", participantStatusMessage.getParticipantId(), svExc);
+ }
+ }
+
+ /**
+ * Supervise a control loop, performing whatever actions need to be performed on the control loop.
+ *
+ * @param controlLoop the control loop to supervises
+ * @throws ControlLoopException on supervision errors
+ */
+ private void superviseControlLoop(ControlLoop controlLoop) throws ControlLoopException, PfModelException {
+ switch (controlLoop.getOrderedState()) {
+ case UNINITIALISED:
+ superviseControlLoopUninitialization(controlLoop);
+ break;
+
+ case PASSIVE:
+ superviseControlLoopPassivation(controlLoop);
+ break;
+
+ case RUNNING:
+ superviseControlLoopActivation(controlLoop);
+ break;
+
+ default:
+ exceptionOccured(Response.Status.NOT_ACCEPTABLE,
+ "A control loop cannot be commanded to go into state " + controlLoop.getOrderedState().name());
+ }
+ }
+
+ /**
+ * Supervise a control loop uninitialisation, performing whatever actions need to be performed on the control loop,
+ * control loop ordered state is UNINITIALIZED.
+ *
+ * @param controlLoop the control loop to supervises
+ * @throws ControlLoopException on supervision errors
+ */
+ private void superviseControlLoopUninitialization(ControlLoop controlLoop) throws ControlLoopException {
+ switch (controlLoop.getState()) {
+ case UNINITIALISED:
+ exceptionOccured(Response.Status.NOT_ACCEPTABLE,
+ CONTROL_LOOP_IS_ALREADY_IN_STATE + controlLoop.getState().name());
+ break;
+
+ case UNINITIALISED2PASSIVE:
+ case PASSIVE:
+ controlLoop.setState(ControlLoopState.PASSIVE2UNINITIALISED);
+ sendControlLoopStateChange(controlLoop);
+ break;
+
+ case PASSIVE2UNINITIALISED:
+ exceptionOccured(Response.Status.NOT_ACCEPTABLE, CONTROL_LOOP_IS_ALREADY_IN_STATE
+ + controlLoop.getState().name() + AND_TRANSITIONING_TO_STATE + controlLoop.getOrderedState());
+ break;
+
+ default:
+ exceptionOccured(Response.Status.NOT_ACCEPTABLE, CONTROL_LOOP_CANNOT_TRANSITION_FROM_STATE
+ + controlLoop.getState().name() + TO_STATE + controlLoop.getOrderedState());
+ break;
+ }
+ }
+
+ private void superviseControlLoopPassivation(ControlLoop controlLoop)
+ throws ControlLoopException, PfModelException {
+ switch (controlLoop.getState()) {
+ case PASSIVE:
+ exceptionOccured(Response.Status.NOT_ACCEPTABLE,
+ CONTROL_LOOP_IS_ALREADY_IN_STATE + controlLoop.getState().name());
+ break;
+ case UNINITIALISED:
+ controlLoop.setState(ControlLoopState.UNINITIALISED2PASSIVE);
+ sendControlLoopUpdate(controlLoop);
+ break;
+
+ case UNINITIALISED2PASSIVE:
+ case RUNNING2PASSIVE:
+ exceptionOccured(Response.Status.NOT_ACCEPTABLE, CONTROL_LOOP_IS_ALREADY_IN_STATE
+ + controlLoop.getState().name() + AND_TRANSITIONING_TO_STATE + controlLoop.getOrderedState());
+ break;
+
+ case RUNNING:
+ controlLoop.setState(ControlLoopState.RUNNING2PASSIVE);
+ sendControlLoopStateChange(controlLoop);
+ break;
+
+ default:
+ exceptionOccured(Response.Status.NOT_ACCEPTABLE, CONTROL_LOOP_CANNOT_TRANSITION_FROM_STATE
+ + controlLoop.getState().name() + TO_STATE + controlLoop.getOrderedState());
+ break;
+ }
+ }
+
+ private void superviseControlLoopActivation(ControlLoop controlLoop) throws ControlLoopException {
+ switch (controlLoop.getState()) {
+ case RUNNING:
+ exceptionOccured(Response.Status.NOT_ACCEPTABLE,
+ CONTROL_LOOP_IS_ALREADY_IN_STATE + controlLoop.getState().name());
+ break;
+
+ case PASSIVE2RUNNING:
+ exceptionOccured(Response.Status.NOT_ACCEPTABLE, CONTROL_LOOP_IS_ALREADY_IN_STATE
+ + controlLoop.getState().name() + AND_TRANSITIONING_TO_STATE + controlLoop.getOrderedState());
+ break;
+
+ case PASSIVE:
+ controlLoop.setState(ControlLoopState.PASSIVE2RUNNING);
+ sendControlLoopStateChange(controlLoop);
+ break;
+
+ default:
+ exceptionOccured(Response.Status.NOT_ACCEPTABLE, CONTROL_LOOP_CANNOT_TRANSITION_FROM_STATE
+ + controlLoop.getState().name() + TO_STATE + controlLoop.getOrderedState());
+ break;
+ }
+ }
+
+ private void sendControlLoopUpdate(ControlLoop controlLoop) throws PfModelException {
+ ParticipantControlLoopUpdate pclu = new ParticipantControlLoopUpdate();
+ pclu.setControlLoopId(controlLoop.getKey().asIdentifier());
+ pclu.setControlLoop(controlLoop);
+ // TODO: We should look up the correct TOSCA node template here for the control loop
+ // Tiny hack implemented to return the tosca service template entry from the database and be passed onto dmaap
+ commissioningProvider = CommissioningHandler.getInstance().getProvider();
+ pclu.setControlLoopDefinition(commissioningProvider.getToscaServiceTemplate(null, null));
+ controlLoopUpdatePublisher.send(pclu);
+ }
+
+ private void sendControlLoopStateChange(ControlLoop controlLoop) {
+ ParticipantControlLoopStateChange clsc = new ParticipantControlLoopStateChange();
+ clsc.setControlLoopId(controlLoop.getKey().asIdentifier());
+ clsc.setMessageId(UUID.randomUUID());
+ clsc.setOrderedState(controlLoop.getOrderedState());
+
+ controlLoopStateChangePublisher.send(clsc);
+ }
+
+ private void superviseParticipant(ParticipantStatus participantStatusMessage)
+ throws PfModelException, ControlLoopException {
+ if (participantStatusMessage.getParticipantId() == null) {
+ exceptionOccured(Response.Status.NOT_FOUND,
+ "Participant ID on PARTICIPANT_STATUS message is null");
+ }
+
+ List<Participant> participantList =
+ participantProvider.getParticipants(participantStatusMessage.getParticipantId().getName(),
+ participantStatusMessage.getParticipantId().getVersion());
+
+ if (CollectionUtils.isEmpty(participantList)) {
+ Participant participant = new Participant();
+ participant.setName(participantStatusMessage.getParticipantId().getName());
+ participant.setVersion(participantStatusMessage.getParticipantId().getVersion());
+ participant.setDefinition(new ToscaConceptIdentifier("unknown", "0.0.0"));
+ participant.setParticipantState(participantStatusMessage.getState());
+ participant.setHealthStatus(participantStatusMessage.getHealthStatus());
+
+ participantList.add(participant);
+ participantProvider.createParticipants(participantList);
+ } else {
+ for (Participant participant : participantList) {
+ participant.setParticipantState(participantStatusMessage.getState());
+ participant.setHealthStatus(participantStatusMessage.getHealthStatus());
+ }
+ participantProvider.updateParticipants(participantList);
+ }
+
+ monitoringProvider = MonitoringHandler.getInstance().getMonitoringProvider();
+ monitoringProvider.createParticipantStatistics(
+ List.of(participantStatusMessage.getParticipantStatistics()));
+ }
+
+ private void superviseControlLoops(ParticipantStatus participantStatusMessage)
+ throws PfModelException, ControlLoopException {
+ if (CollectionUtils.isEmpty(participantStatusMessage.getControlLoops().getControlLoopList())) {
+ return;
+ }
+
+ for (ControlLoop controlLoop : participantStatusMessage.getControlLoops().getControlLoopList()) {
+ if (controlLoop == null) {
+ exceptionOccured(Response.Status.NOT_FOUND,
+ "PARTICIPANT_STATUS message references unknown control loop: " + controlLoop);
+ }
+
+ ControlLoop dbControlLoop = controlLoopProvider
+ .getControlLoop(new ToscaConceptIdentifier(controlLoop.getName(), controlLoop.getVersion()));
+ if (dbControlLoop == null) {
+ exceptionOccured(Response.Status.NOT_FOUND,
+ "PARTICIPANT_STATUS control loop not found in database: " + controlLoop);
+ }
+
+ for (ControlLoopElement element : controlLoop.getElements().values()) {
+ ControlLoopElement dbElement = dbControlLoop.getElements().get(element.getId());
+
+ if (dbElement == null) {
+ exceptionOccured(Response.Status.NOT_FOUND,
+ "PARTICIPANT_STATUS message references unknown control loop element: " + element);
+ }
+
+ // Replace element entry in the database
+ dbControlLoop.getElements().put(element.getId(), element);
+ }
+ controlLoopProvider.updateControlLoop(dbControlLoop);
+ }
+
+ monitoringProvider = MonitoringHandler.getInstance().getMonitoringProvider();
+ for (ControlLoop controlLoop : participantStatusMessage.getControlLoops().getControlLoopList()) {
+ monitoringProvider.createClElementStatistics(controlLoop.getControlLoopElementStatisticsList(controlLoop));
+ }
+ }
+
+ @Override
+ public void startProviders() {
+ try {
+ manager.start();
+ } catch (final ServiceManagerException exp) {
+ throw new ControlLoopRuntimeException(Status.INTERNAL_SERVER_ERROR,
+ "Supervision handler start of providers failed", exp);
+ }
+ }
+
+ @Override
+ public void stopProviders() {
+ try {
+ manager.stop();
+ } catch (final ServiceManagerException exp) {
+ throw new ControlLoopRuntimeException(Status.INTERNAL_SERVER_ERROR,
+ "Supervision handler stop of providers failed", exp);
+ }
+ }
+
+ private void exceptionOccured(Response.Status status, String reason) throws ControlLoopException {
+ throw new ControlLoopException(status, reason);
+ }
+}
diff --git a/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionScanner.java b/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionScanner.java
new file mode 100644
index 000000000..0ccfddff3
--- /dev/null
+++ b/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionScanner.java
@@ -0,0 +1,116 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2021 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.controlloop.runtime.supervision;
+
+import java.io.Closeable;
+import java.util.Collection;
+import java.util.List;
+import java.util.TimerTask;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoop;
+import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopElement;
+import org.onap.policy.clamp.controlloop.models.controlloop.persistence.provider.ControlLoopProvider;
+import org.onap.policy.models.base.PfModelException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class is used to scan the control loops in the database and check if they are in the correct state.
+ */
+public class SupervisionScanner implements Runnable, Closeable {
+ private static final Logger LOGGER = LoggerFactory.getLogger(SupervisionScanner.class);
+
+ private ControlLoopProvider controlLoopProvider;
+ private ScheduledExecutorService timerPool;
+
+ /**
+ * Constructor for instantiating SupervisionScanner.
+ *
+ * @param controlLoopProvider the provider to use to read control loops from the database
+ * @param interval time interval to perform scans
+ */
+ public SupervisionScanner(final ControlLoopProvider controlLoopProvider, final long interval) {
+ this.controlLoopProvider = controlLoopProvider;
+
+ // Kick off the timer
+ timerPool = makeTimerPool();
+ timerPool.scheduleAtFixedRate(this, 0, interval, TimeUnit.SECONDS);
+ }
+
+ @Override
+ public void run() {
+ LOGGER.debug("Scanning control loops in the database . . .");
+
+ try {
+ for (ControlLoop controlLoop : controlLoopProvider.getControlLoops(null, null)) {
+ scanControlLoop(controlLoop);
+ }
+ } catch (PfModelException pfme) {
+ LOGGER.warn("error reading control loops from database", pfme);
+ }
+
+ LOGGER.debug("Control loop scan complete . . .");
+ }
+
+ @Override
+ public void close() {
+ timerPool.shutdown();
+ }
+
+ private void scanControlLoop(final ControlLoop controlLoop) throws PfModelException {
+ LOGGER.debug("scanning control loop {} . . .", controlLoop.getKey().asIdentifier());
+
+ if (controlLoop.getState().equals(controlLoop.getOrderedState().asState())) {
+ LOGGER.debug("control loop {} scanned, OK", controlLoop.getKey().asIdentifier());
+ return;
+ }
+
+ for (ControlLoopElement element : controlLoop.getElements().values()) {
+ if (!element.getState().equals(element.getOrderedState().asState())) {
+ LOGGER.debug("control loop scan: transitioning from state {} to {}", controlLoop.getState(),
+ controlLoop.getOrderedState());
+ return;
+ }
+ }
+
+ LOGGER.debug("control loop scan: transition from state {} to {} completed", controlLoop.getState(),
+ controlLoop.getOrderedState());
+
+ controlLoop.setState(controlLoop.getOrderedState().asState());
+ controlLoopProvider.updateControlLoop(controlLoop);
+ }
+
+ /**
+ * Makes a new timer pool.
+ *
+ * @return a new timer pool
+ */
+ protected ScheduledExecutorService makeTimerPool() {
+ return Executors.newScheduledThreadPool(1);
+ }
+}
diff --git a/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantControlLoopStateChangePublisher.java b/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantControlLoopStateChangePublisher.java
new file mode 100644
index 000000000..c9c8ab851
--- /dev/null
+++ b/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantControlLoopStateChangePublisher.java
@@ -0,0 +1,75 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2021 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.controlloop.runtime.supervision.comm;
+
+import java.util.List;
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantControlLoopStateChange;
+import org.onap.policy.common.endpoints.event.comm.TopicSink;
+import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class is used to send ParticipantControlLoopStateChangePublisher messages to participants on DMaaP.
+ */
+public class ParticipantControlLoopStateChangePublisher {
+ private static final Logger LOGGER = LoggerFactory.getLogger(ParticipantControlLoopStateChangePublisher.class);
+
+ private TopicSinkClient topicSinkClient;
+
+ /**
+ * Constructor for instantiating ParticipantControlLoopStateChangePublisherPublisher.
+ *
+ * @param topicSinks the topic sinks
+ * @param interval time interval to send ParticipantControlLoopStateChangePublisher messages
+ */
+ public ParticipantControlLoopStateChangePublisher(final List<TopicSink> topicSinks, final long interval) {
+ // TODO: Should not be dependent on the order of topic sinks in the config
+ this.topicSinkClient = new TopicSinkClient(topicSinks.get(0));
+ }
+
+ /**
+ * Terminates the current timer.
+ */
+ public void terminate() {
+ // This is a user initiated message and doesn't need a timer.
+ }
+
+ /**
+ * Get the current time interval used by the timer task.
+ *
+ * @return interval the current time interval
+ */
+ public long getInterval() {
+ // This is a user initiated message and doesn't need a timer.
+ return -1;
+ }
+
+ /**
+ * Method to send ParticipantControlLoopStateChangePublisher status message to participants on demand.
+ *
+ * @param controlLoopStateChange the ParticipantControlLoopStateChangePublisher message
+ */
+ public void send(final ParticipantControlLoopStateChange controlLoopStateChange) {
+ topicSinkClient.send(controlLoopStateChange);
+ LOGGER.debug("Sent ParticipantControlLoopStateChange to Participants - {}", controlLoopStateChange);
+ }
+}
diff --git a/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantControlLoopUpdatePublisher.java b/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantControlLoopUpdatePublisher.java
new file mode 100644
index 000000000..3c5d230c5
--- /dev/null
+++ b/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantControlLoopUpdatePublisher.java
@@ -0,0 +1,75 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2021 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.controlloop.runtime.supervision.comm;
+
+import java.util.List;
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantControlLoopUpdate;
+import org.onap.policy.common.endpoints.event.comm.TopicSink;
+import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class is used to send ParticipantControlLoopUpdate messages to participants on DMaaP.
+ */
+public class ParticipantControlLoopUpdatePublisher {
+ private static final Logger LOGGER = LoggerFactory.getLogger(ParticipantControlLoopUpdatePublisher.class);
+
+ private TopicSinkClient topicSinkClient;
+
+ /**
+ * Constructor for instantiating ParticipantUpdatePublisher.
+ *
+ * @param topicSinks the topic sinks
+ * @param interval time interval to send ParticipantControlLoopUpdate messages
+ */
+ public ParticipantControlLoopUpdatePublisher(final List<TopicSink> topicSinks, final long interval) {
+ // TODO: Should not be dependent on the order of topic sinks in the config
+ this.topicSinkClient = new TopicSinkClient(topicSinks.get(0));
+ }
+
+ /**
+ * Terminates the current timer.
+ */
+ public void terminate() {
+ // This is a user initiated message and doesn't need a timer.
+ }
+
+ /**
+ * Get the current time interval used by the timer task.
+ *
+ * @return interval the current time interval
+ */
+ public long getInterval() {
+ // This is a user initiated message and doesn't need a timer.
+ return -1;
+ }
+
+ /**
+ * Method to send ParticipantControlLoopUpdate status message to participants on demand.
+ *
+ * @param participantControlLoopUpdate the ParticipantControlLoopUpdate message
+ */
+ public void send(final ParticipantControlLoopUpdate participantControlLoopUpdate) {
+ topicSinkClient.send(participantControlLoopUpdate);
+ LOGGER.debug("Sent ParticipantControlLoopUpdate to Participants - {}", participantControlLoopUpdate);
+ }
+}
diff --git a/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantStateChangePublisher.java b/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantStateChangePublisher.java
new file mode 100644
index 000000000..099039115
--- /dev/null
+++ b/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantStateChangePublisher.java
@@ -0,0 +1,74 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2021 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.controlloop.runtime.supervision.comm;
+
+import java.util.List;
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantStateChange;
+import org.onap.policy.common.endpoints.event.comm.TopicSink;
+import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class is used to send ParticipantStateChange messages to participants on DMaaP.
+ */
+public class ParticipantStateChangePublisher {
+ private static final Logger LOGGER = LoggerFactory.getLogger(ParticipantStateChangePublisher.class);
+
+ private TopicSinkClient topicSinkClient;
+
+ /**
+ * Constructor for instantiating ParticipantStateChangePublisher.
+ *
+ * @param topicSinks the topic sinks
+ * @param interval time interval to send ParticipantStateChange messages
+ */
+ public ParticipantStateChangePublisher(final List<TopicSink> topicSinks, final long interval) {
+ // TODO: Should not be dependent on the order of topic sinks in the config
+ this.topicSinkClient = new TopicSinkClient(topicSinks.get(0));
+ }
+
+ /**
+ * Terminates the current timer.
+ */
+ public void terminate() {
+ // Nothing to terminate, this publisher does not have a timer
+ }
+
+ /**
+ * Get the current time interval used by the timer task.
+ *
+ * @return interval the current time interval
+ */
+ public long getInterval() {
+ return -1;
+ }
+
+ /**
+ * Method to send ParticipantStateChange status message to participants on demand.
+ *
+ * @param participantStateChange the ParticipantStateChange message
+ */
+ public void send(final ParticipantStateChange participantStateChange) {
+ topicSinkClient.send(participantStateChange);
+ LOGGER.debug("Sent ParticipantStateChange to Participants - {}", participantStateChange);
+ }
+}
diff --git a/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantStatusListener.java b/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantStatusListener.java
new file mode 100644
index 000000000..a05f4aa20
--- /dev/null
+++ b/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantStatusListener.java
@@ -0,0 +1,53 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2021 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.controlloop.runtime.supervision.comm;
+
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantStatus;
+import org.onap.policy.clamp.controlloop.runtime.supervision.SupervisionHandler;
+import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.endpoints.listeners.ScoListener;
+import org.onap.policy.common.utils.coder.StandardCoderObject;
+import org.onap.policy.common.utils.services.Registry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Listener for ParticipantStatus messages sent by participants.
+ */
+public class ParticipantStatusListener extends ScoListener<ParticipantStatus> {
+ private static final Logger LOGGER = LoggerFactory.getLogger(ParticipantStatusListener.class);
+
+ private final SupervisionHandler supervisionHandler = Registry.get(SupervisionHandler.class.getName());
+
+ /**
+ * Constructs the object.
+ */
+ public ParticipantStatusListener() {
+ super(ParticipantStatus.class);
+ }
+
+ @Override
+ public void onTopicEvent(final CommInfrastructure infra, final String topic, final StandardCoderObject sco,
+ final ParticipantStatus participantStatusMessage) {
+ LOGGER.debug("ParticipantStatus message received from participant - {}", participantStatusMessage);
+ supervisionHandler.handleParticipantStatusMessage(participantStatusMessage);
+ }
+}