diff options
Diffstat (limited to 'tosca-controlloop/runtime/src/main/java/org/onap/policy')
18 files changed, 978 insertions, 230 deletions
diff --git a/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/commissioning/CommissioningHandler.java b/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/commissioning/CommissioningHandler.java index ab917b74e..88e8b1df9 100644 --- a/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/commissioning/CommissioningHandler.java +++ b/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/commissioning/CommissioningHandler.java @@ -65,26 +65,6 @@ public final class CommissioningHandler extends ControlLoopHandler { } @Override - public void startAndRegisterListeners(MessageTypeDispatcher msgDispatcher) { - // No topic communication on this handler - } - - @Override - public void startAndRegisterPublishers(List<TopicSink> topicSinks) { - // No topic communication on this handler - } - - @Override - public void stopAndUnregisterPublishers() { - // No topic communication on this handler - } - - @Override - public void stopAndUnregisterListeners(MessageTypeDispatcher msgDispatcher) { - // No topic communication on this handler - } - - @Override public void startProviders() { provider = new CommissioningProvider(getDatabaseProviderParameters()); } diff --git a/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/commissioning/CommissioningProvider.java b/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/commissioning/CommissioningProvider.java index 41d85726e..50f6787b9 100644 --- a/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/commissioning/CommissioningProvider.java +++ b/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/commissioning/CommissioningProvider.java @@ -40,6 +40,7 @@ import org.onap.policy.models.provider.PolicyModelsProviderParameters; import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier; import org.onap.policy.models.tosca.authorative.concepts.ToscaNodeTemplate; import org.onap.policy.models.tosca.authorative.concepts.ToscaServiceTemplate; +import org.onap.policy.models.tosca.authorative.concepts.ToscaServiceTemplates; import org.onap.policy.models.tosca.authorative.concepts.ToscaTypedEntityFilter; /** @@ -190,4 +191,18 @@ public class CommissioningProvider implements Closeable { return controlLoopElementList; } + + /** + * Get the requested control loop definitions. + * + * @param name the name of the definition to get, null for all definitions + * @param version the version of the definition to get, null for all definitions + * @return the control loop definitions + * @throws PfModelException on errors getting control loop definitions + */ + public ToscaServiceTemplate getToscaServiceTemplate(String name, String version) throws PfModelException { + ToscaServiceTemplates serviceTemplates = new ToscaServiceTemplates(); + serviceTemplates.setServiceTemplates(modelsProvider.getServiceTemplateList(name, version)); + return serviceTemplates.getServiceTemplates().get(0); + } } diff --git a/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/commissioning/rest/CommissioningController.java b/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/commissioning/rest/CommissioningController.java index cd6c08e30..18e1f7787 100644 --- a/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/commissioning/rest/CommissioningController.java +++ b/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/commissioning/rest/CommissioningController.java @@ -44,6 +44,7 @@ import org.onap.policy.clamp.controlloop.runtime.commissioning.CommissioningProv import org.onap.policy.clamp.controlloop.runtime.main.rest.RestController; import org.onap.policy.models.base.PfModelException; import org.onap.policy.models.base.PfModelRuntimeException; +import org.onap.policy.models.errors.concepts.ErrorResponse; import org.onap.policy.models.errors.concepts.ErrorResponseInfo; import org.onap.policy.models.tosca.authorative.concepts.ToscaNodeTemplate; import org.onap.policy.models.tosca.authorative.concepts.ToscaServiceTemplate; @@ -131,7 +132,9 @@ public class CommissioningController extends RestController { } catch (PfModelRuntimeException | PfModelException e) { LOGGER.warn("Commissioning of the control loops failed", e); - return createCommissioningErrorResponse(e, requestId); + CommissioningResponse resp = new CommissioningResponse(); + resp.setErrorDetails(e.getErrorResponse().getErrorMessage()); + return returnResponse(e.getErrorResponse().getResponseCode(), requestId, resp); } } @@ -201,7 +204,9 @@ public class CommissioningController extends RestController { } catch (PfModelRuntimeException | PfModelException e) { LOGGER.warn("Decommisssioning of control loop failed", e); - return createCommissioningErrorResponse(e, requestId); + CommissioningResponse resp = new CommissioningResponse(); + resp.setErrorDetails(e.getErrorResponse().getErrorMessage()); + return returnResponse(e.getErrorResponse().getResponseCode(), requestId, resp); } } @@ -255,9 +260,9 @@ public class CommissioningController extends RestController { // @formatter:on public Response query(@HeaderParam(REQUEST_ID_NAME) @ApiParam(REQUEST_ID_PARAM_DESCRIPTION) UUID requestId, @ApiParam(value = "Control Loop definition name", required = true) - @QueryParam("name") String name, + @QueryParam("name") String name, @ApiParam(value = "Control Loop definition version", required = true) - @QueryParam("version") String version) { + @QueryParam("version") String version) { try { List<ToscaNodeTemplate> response = provider.getControlLoopDefinitions(name, version); @@ -266,7 +271,9 @@ public class CommissioningController extends RestController { } catch (PfModelRuntimeException | PfModelException e) { LOGGER.warn("Get of control loop definitions failed", e); - return createCommissioningErrorResponse(e, requestId); + CommissioningResponse resp = new CommissioningResponse(); + resp.setErrorDetails(e.getErrorResponse().getErrorMessage()); + return returnResponse(e.getErrorResponse().getResponseCode(), requestId, resp); } } @@ -319,33 +326,35 @@ public class CommissioningController extends RestController { ) // @formatter:on public Response queryElements(@HeaderParam(REQUEST_ID_NAME) @ApiParam(REQUEST_ID_PARAM_DESCRIPTION) UUID requestId, - @ApiParam(value = "Control Loop definition name", required = true) - @QueryParam("name") String name, - @ApiParam(value = "Control Loop definition version", required = true) - @QueryParam("version") String version) throws Exception { + @ApiParam(value = "Control Loop definition name", required = true) + @QueryParam("name") String name, + @ApiParam(value = "Control Loop definition version", required = true) + @QueryParam("version") String version) throws Exception { try { List<ToscaNodeTemplate> nodeTemplate = provider.getControlLoopDefinitions(name, version); //Prevent ambiguous queries with multiple returns if (nodeTemplate.size() > 1) { - throw new Exception(); + CommissioningResponse resp = new CommissioningResponse(); + resp.setErrorDetails("Multiple ControlLoops are not supported"); + return returnResponse(Response.Status.NOT_ACCEPTABLE, requestId, resp); } + List<ToscaNodeTemplate> response = provider.getControlLoopElementDefinitions(nodeTemplate.get(0)); return addLoggingHeaders(addVersionControlHeaders(Response.status(Status.OK)), requestId).entity(response) .build(); } catch (PfModelRuntimeException | PfModelException e) { LOGGER.warn("Get of control loop element definitions failed", e); - return createCommissioningErrorResponse(e, requestId); + CommissioningResponse resp = new CommissioningResponse(); + resp.setErrorDetails(e.getErrorResponse().getErrorMessage()); + return returnResponse(e.getErrorResponse().getResponseCode(), requestId, resp); } } - private Response createCommissioningErrorResponse(ErrorResponseInfo e, UUID requestId) { - CommissioningResponse resp = new CommissioningResponse(); - resp.setErrorDetails(e.getErrorResponse().getErrorMessage()); - return addLoggingHeaders(addVersionControlHeaders(Response.status(e.getErrorResponse().getResponseCode())), + private Response returnResponse(Response.Status status, UUID requestId, CommissioningResponse resp) { + return addLoggingHeaders(addVersionControlHeaders(Response.status(status)), requestId).entity(resp).build(); } - } diff --git a/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/instantiation/ControlLoopInstantiationProvider.java b/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/instantiation/ControlLoopInstantiationProvider.java index 6fd5f3225..eb72d9219 100644 --- a/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/instantiation/ControlLoopInstantiationProvider.java +++ b/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/instantiation/ControlLoopInstantiationProvider.java @@ -39,6 +39,7 @@ import org.onap.policy.clamp.controlloop.models.controlloop.persistence.provider import org.onap.policy.clamp.controlloop.models.messages.rest.instantiation.InstantiationCommand; import org.onap.policy.clamp.controlloop.models.messages.rest.instantiation.InstantiationResponse; import org.onap.policy.clamp.controlloop.runtime.commissioning.CommissioningProvider; +import org.onap.policy.clamp.controlloop.runtime.supervision.SupervisionHandler; import org.onap.policy.common.parameters.BeanValidationResult; import org.onap.policy.common.parameters.ObjectValidationResult; import org.onap.policy.common.parameters.ValidationResult; @@ -140,19 +141,21 @@ public class ControlLoopInstantiationProvider implements Closeable { */ private BeanValidationResult validateControlLoops(ControlLoops controlLoops) throws PfModelException { - BeanValidationResult validationResult = new BeanValidationResult("ControlLoops", controlLoops); + BeanValidationResult result = new BeanValidationResult("ControlLoops", controlLoops); for (ControlLoop controlLoop : controlLoops.getControlLoopList()) { + BeanValidationResult subResult = new BeanValidationResult( + "entry " + controlLoop.getDefinition().getName(), controlLoop); List<ToscaNodeTemplate> toscaNodeTemplates = commissioningProvider.getControlLoopDefinitions( controlLoop.getDefinition().getName(), controlLoop.getDefinition().getVersion()); if (toscaNodeTemplates.isEmpty()) { - validationResult + subResult .addResult(new ObjectValidationResult("ControlLoop", controlLoop.getDefinition().getName(), ValidationStatus.INVALID, "Commissioned control loop definition not FOUND")); } else if (toscaNodeTemplates.size() > 1) { - validationResult + subResult .addResult(new ObjectValidationResult("ControlLoop", controlLoop.getDefinition().getName(), ValidationStatus.INVALID, "Commissioned control loop definition not VALID")); } else { @@ -167,12 +170,13 @@ public class ControlLoopInstantiationProvider implements Closeable { .collect(Collectors.toMap(ToscaConceptIdentifier::getName, UnaryOperator.identity())); // @formatter:on - for (ControlLoopElement element : controlLoop.getElements()) { - validationResult.addResult(validateDefinition(definitions, element.getDefinition())); + for (ControlLoopElement element : controlLoop.getElements().values()) { + subResult.addResult(validateDefinition(definitions, element.getDefinition())); } } + result.addResult(subResult); } - return validationResult; + return result; } /** @@ -183,15 +187,13 @@ public class ControlLoopInstantiationProvider implements Closeable { * @result result the validation result */ private ValidationResult validateDefinition(Map<String, ToscaConceptIdentifier> definitions, - ToscaConceptIdentifier definition) { - BeanValidationResult result = new BeanValidationResult(definition.getName(), definition); + ToscaConceptIdentifier definition) { + BeanValidationResult result = new BeanValidationResult("entry " + definition.getName(), definition); ToscaConceptIdentifier identifier = definitions.get(definition.getName()); if (identifier == null) { result.setResult(ValidationStatus.INVALID, "Not FOUND"); } else if (!identifier.equals(definition)) { result.setResult(ValidationStatus.INVALID, "Version not matching"); - } else { - result.setResult(ValidationStatus.CLEAN); } return (result.isClean() ? null : result); } @@ -264,6 +266,8 @@ public class ControlLoopInstantiationProvider implements Closeable { controlLoopProvider.updateControlLoops(controlLoops); } + SupervisionHandler supervisionHandler = SupervisionHandler.getInstance(); + supervisionHandler.triggerControlLoopSupervision(command.getControlLoopIdentifierList()); InstantiationResponse response = new InstantiationResponse(); response.setAffectedControlLoops(command.getControlLoopIdentifierList()); diff --git a/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/instantiation/InstantiationHandler.java b/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/instantiation/InstantiationHandler.java index fd5288fda..d81e54ccf 100644 --- a/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/instantiation/InstantiationHandler.java +++ b/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/instantiation/InstantiationHandler.java @@ -21,13 +21,11 @@ package org.onap.policy.clamp.controlloop.runtime.instantiation; import java.io.IOException; -import java.util.HashSet; import java.util.List; import java.util.Set; import javax.ws.rs.core.Response; import lombok.Getter; import org.onap.policy.clamp.controlloop.common.handler.ControlLoopHandler; -import org.onap.policy.clamp.controlloop.runtime.commissioning.rest.CommissioningController; import org.onap.policy.clamp.controlloop.runtime.instantiation.rest.InstantiationController; import org.onap.policy.clamp.controlloop.runtime.main.parameters.ClRuntimeParameterGroup; import org.onap.policy.common.endpoints.event.comm.TopicSink; @@ -36,11 +34,9 @@ import org.onap.policy.common.utils.services.Registry; import org.onap.policy.models.base.PfModelRuntimeException; /** - * This class handles instantiation of control loop instances, - * so only one object of this type should be built at a time. + * This class handles instantiation of control loop instances. * - * </p> - * It is effectively a singleton that is started at system start + * <p/>It is effectively a singleton that is started at system start */ public final class InstantiationHandler extends ControlLoopHandler { @@ -71,26 +67,6 @@ public final class InstantiationHandler extends ControlLoopHandler { } @Override - public void startAndRegisterListeners(MessageTypeDispatcher msgDispatcher) { - // No topic communication on this handler - } - - @Override - public void startAndRegisterPublishers(List<TopicSink> topicSinks) { - // No topic communication on this handler - } - - @Override - public void stopAndUnregisterPublishers() { - // No topic communication on this handler - } - - @Override - public void stopAndUnregisterListeners(MessageTypeDispatcher msgDispatcher) { - // No topic communication on this handler - } - - @Override public void startProviders() { controlLoopInstantiationProvider = new ControlLoopInstantiationProvider(getDatabaseProviderParameters()); } diff --git a/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/instantiation/rest/InstantiationController.java b/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/instantiation/rest/InstantiationController.java index 807da5d68..7581aaf74 100644 --- a/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/instantiation/rest/InstantiationController.java +++ b/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/instantiation/rest/InstantiationController.java @@ -412,5 +412,5 @@ public class InstantiationController extends RestController { return addLoggingHeaders(addVersionControlHeaders(Response.status(e.getErrorResponse().getResponseCode())), requestId).entity(resp).build(); } - } + diff --git a/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/parameters/ClRuntimeParameterHandler.java b/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/parameters/ClRuntimeParameterHandler.java index a7f5ff34d..a463ad171 100644 --- a/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/parameters/ClRuntimeParameterHandler.java +++ b/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/parameters/ClRuntimeParameterHandler.java @@ -24,7 +24,7 @@ import java.io.File; import javax.ws.rs.core.Response; import org.onap.policy.clamp.controlloop.common.exception.ControlLoopException; import org.onap.policy.clamp.controlloop.runtime.main.startstop.ClRuntimeCommandLineArguments; -import org.onap.policy.common.parameters.GroupValidationResult; +import org.onap.policy.common.parameters.ValidationResult; import org.onap.policy.common.utils.coder.Coder; import org.onap.policy.common.utils.coder.CoderException; import org.onap.policy.common.utils.coder.StandardCoder; @@ -66,7 +66,7 @@ public class ClRuntimeParameterHandler { } // validate the parameters - final GroupValidationResult validationResult = clRuntimeParameterGroup.validate(); + final ValidationResult validationResult = clRuntimeParameterGroup.validate(); if (!validationResult.isValid()) { throw new ControlLoopException(Response.Status.NOT_ACCEPTABLE, "validation error(s) on parameters from \"" + arguments.getConfigurationFilePath() + "\"\n" + validationResult.getResult()); diff --git a/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/parameters/ParticipantUpdateParameters.java b/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/parameters/ParticipantUpdateParameters.java index f70e7d590..2af5be534 100644 --- a/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/parameters/ParticipantUpdateParameters.java +++ b/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/parameters/ParticipantUpdateParameters.java @@ -51,3 +51,4 @@ public class ParticipantUpdateParameters extends ParameterGroupImpl { super(ParticipantUpdateParameters.class.getSimpleName()); } } + diff --git a/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/startstop/ClRuntimeActivator.java b/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/startstop/ClRuntimeActivator.java index 5959586da..a4238a9c4 100644 --- a/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/startstop/ClRuntimeActivator.java +++ b/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/startstop/ClRuntimeActivator.java @@ -32,6 +32,7 @@ import org.onap.policy.clamp.controlloop.runtime.instantiation.InstantiationHand import org.onap.policy.clamp.controlloop.runtime.main.parameters.ClRuntimeParameterGroup; import org.onap.policy.clamp.controlloop.runtime.main.rest.ControlLoopAafFilter; import org.onap.policy.clamp.controlloop.runtime.monitoring.MonitoringHandler; +import org.onap.policy.clamp.controlloop.runtime.supervision.SupervisionHandler; import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager; import org.onap.policy.common.endpoints.event.comm.TopicSink; import org.onap.policy.common.endpoints.event.comm.TopicSource; @@ -65,6 +66,7 @@ public class ClRuntimeActivator extends ServiceManagerContainer { * @param clRuntimeParameterGroup the parameters for the control loop runtime service */ public ClRuntimeActivator(final ClRuntimeParameterGroup clRuntimeParameterGroup) { + if (clRuntimeParameterGroup == null || !clRuntimeParameterGroup.isValid()) { throw new ControlLoopRuntimeException(Status.INTERNAL_SERVER_ERROR, "ParameterGroup not valid"); } @@ -86,28 +88,38 @@ public class ClRuntimeActivator extends ServiceManagerContainer { final AtomicReference<ControlLoopHandler> commissioningHandler = new AtomicReference<>(); final AtomicReference<ControlLoopHandler> instantiationHandler = new AtomicReference<>(); + final AtomicReference<ControlLoopHandler> supervisionHandler = new AtomicReference<>(); final AtomicReference<ControlLoopHandler> monitoringHandler = new AtomicReference<>(); - final AtomicReference<RestServer> restServer = new AtomicReference<>(); + // @formatter:off addAction("Control loop runtime parameters", - () -> ParameterService.register(clRuntimeParameterGroup), - () -> ParameterService.deregister(clRuntimeParameterGroup.getName())); + () -> ParameterService.register(clRuntimeParameterGroup), + () -> ParameterService.deregister(clRuntimeParameterGroup.getName())); + addAction("Topic endpoint management", - () -> TopicEndpointManager.getManager().start(), - () -> TopicEndpointManager.getManager().shutdown()); + () -> TopicEndpointManager.getManager().start(), + () -> TopicEndpointManager.getManager().shutdown()); + addAction("Commissioning Handler", () -> commissioningHandler.set(new CommissioningHandler(clRuntimeParameterGroup)), () -> commissioningHandler.get().close()); + addAction("Instantiation Handler", - () -> instantiationHandler.set(new InstantiationHandler(clRuntimeParameterGroup)), - () -> instantiationHandler.get().close()); + () -> instantiationHandler.set(new InstantiationHandler(clRuntimeParameterGroup)), + () -> instantiationHandler.get().close()); + + addAction("Supervision Handler", + () -> supervisionHandler.set(new SupervisionHandler(clRuntimeParameterGroup)), + () -> supervisionHandler.get().close()); + addAction("Monitoring Handler", () -> monitoringHandler.set(new MonitoringHandler(clRuntimeParameterGroup)), () -> monitoringHandler.get().close()); addHandlerActions("Commissioning", commissioningHandler); addHandlerActions("Instantiation", instantiationHandler); + addHandlerActions("Supervision", supervisionHandler); addHandlerActions("Monitoring", monitoringHandler); addAction("Topic Message Dispatcher", this::registerMsgDispatcher, this::unregisterMsgDispatcher); @@ -115,34 +127,34 @@ public class ClRuntimeActivator extends ServiceManagerContainer { clRuntimeParameterGroup.getRestServerParameters().setName(clRuntimeParameterGroup.getName()); addAction("REST server", - () -> { - Set<Class<?>> providerClasses = new HashSet<>(); - providerClasses.addAll(commissioningHandler.get().getProviderClasses()); - providerClasses.addAll(instantiationHandler.get().getProviderClasses()); - providerClasses.addAll(monitoringHandler.get().getProviderClasses()); - - RestServer server = new RestServer(clRuntimeParameterGroup.getRestServerParameters(), - ControlLoopAafFilter.class, - providerClasses.toArray(new Class<?>[providerClasses.size()])); - restServer.set(server); - restServer.get().start(); - }, - () -> restServer.get().stop()); + () -> { + Set<Class<?>> providerClasses = new HashSet<>(); + providerClasses.addAll(commissioningHandler.get().getProviderClasses()); + providerClasses.addAll(instantiationHandler.get().getProviderClasses()); + providerClasses.addAll(supervisionHandler.get().getProviderClasses()); + providerClasses.addAll(monitoringHandler.get().getProviderClasses()); + + RestServer server = new RestServer(clRuntimeParameterGroup.getRestServerParameters(), + ControlLoopAafFilter.class, + providerClasses.toArray(new Class<?>[providerClasses.size()])); + + restServer.set(server); + restServer.get().start(); + }, + () -> restServer.get().stop()); // @formatter:on } private void addHandlerActions(final String name, final AtomicReference<ControlLoopHandler> handler) { addAction(name + " Providers", - () -> handler.get().startProviders(), - () -> handler.get().stopProviders()); - + () -> handler.get().startProviders(), + () -> handler.get().stopProviders()); addAction(name + " Listeners", - () -> handler.get().startAndRegisterListeners(msgDispatcher), - () -> handler.get().stopAndUnregisterListeners(msgDispatcher)); - + () -> handler.get().startAndRegisterListeners(msgDispatcher), + () -> handler.get().stopAndUnregisterListeners(msgDispatcher)); addAction(name + " Publishers", - () -> handler.get().startAndRegisterPublishers(topicSinks), - () -> handler.get().stopAndUnregisterPublishers()); + () -> handler.get().startAndRegisterPublishers(topicSinks), + () -> handler.get().stopAndUnregisterPublishers()); } /** diff --git a/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/startstop/ClRuntimeCommandLineArguments.java b/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/startstop/ClRuntimeCommandLineArguments.java index fa25b6ddb..f36bb858b 100644 --- a/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/startstop/ClRuntimeCommandLineArguments.java +++ b/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/startstop/ClRuntimeCommandLineArguments.java @@ -26,55 +26,58 @@ import java.io.StringWriter; import java.net.URL; import java.util.Arrays; import javax.ws.rs.core.Response; +import lombok.Getter; +import lombok.Setter; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.DefaultParser; import org.apache.commons.cli.HelpFormatter; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; +import org.apache.commons.lang3.StringUtils; import org.onap.policy.clamp.controlloop.common.exception.ControlLoopException; +import org.onap.policy.clamp.controlloop.common.exception.ControlLoopRuntimeException; +import org.onap.policy.clamp.controlloop.common.startstop.CommonCommandLineArguments; import org.onap.policy.common.utils.resources.ResourceUtils; - /** * This class reads and handles command line parameters for the control loop runtime service. - * */ public class ClRuntimeCommandLineArguments { private static final String FILE_MESSAGE_PREAMBLE = " file \""; private static final int HELP_LINE_LENGTH = 120; private final Options options; + private final CommonCommandLineArguments commonCommandLineArguments; + + @Getter() + @Setter() private String configurationFilePath = null; /** * Construct the options for the control loop runtime component. */ public ClRuntimeCommandLineArguments() { - //@formatter:off options = new Options(); - options.addOption(Option.builder("h") - .longOpt("help") - .desc("outputs the usage of this command") - .required(false) - .type(Boolean.class) - .build()); - options.addOption(Option.builder("v") - .longOpt("version") - .desc("outputs the version of control loop runtime") - .required(false) - .type(Boolean.class) - .build()); - options.addOption(Option.builder("c") - .longOpt("config-file") - .desc("the full path to the configuration file to use, " - + "the configuration file must be a Json file containing the control loop runtime parameters") - .hasArg() - .argName("CONFIG_FILE") - .required(false) - .type(String.class) - .build()); - //@formatter:on + commonCommandLineArguments = new CommonCommandLineArguments(options); + } + + /** + * Construct the options for the CLI editor and parse in the given arguments. + * + * @param args The command line arguments + */ + public ClRuntimeCommandLineArguments(final String[] args) { + // Set up the options with the default constructor + this(); + + // Parse the arguments + try { + parse(args); + } catch (final ControlLoopException e) { + throw new ControlLoopRuntimeException(Response.Status.NOT_ACCEPTABLE, + "parse error on control loop runtime parameters", e); + } } /** @@ -87,13 +90,12 @@ public class ClRuntimeCommandLineArguments { public String parse(final String[] args) throws ControlLoopException { // Clear all our arguments setConfigurationFilePath(null); - CommandLine commandLine = null; try { commandLine = new DefaultParser().parse(options, args); } catch (final ParseException e) { throw new ControlLoopException(Response.Status.NOT_ACCEPTABLE, - "invalid command line arguments specified", e); + "invalid command line arguments specified : " + e.getMessage()); } // Arguments left over after Commons CLI does its stuff @@ -104,16 +106,12 @@ public class ClRuntimeCommandLineArguments { "too many command line arguments specified : " + Arrays.toString(args)); } - if (remainingArgs.length == 1) { - configurationFilePath = remainingArgs[0]; - } - if (commandLine.hasOption('h')) { - return help(Main.class.getName()); + return commonCommandLineArguments.help(Main.class.getName(), options); } if (commandLine.hasOption('v')) { - return version(); + return commonCommandLineArguments.version(); } if (commandLine.hasOption('c')) { @@ -129,42 +127,7 @@ public class ClRuntimeCommandLineArguments { * @throws ControlLoopException on command argument validation errors */ public void validate() throws ControlLoopException { - validateReadableFile("control loop runtime configuration", configurationFilePath); - } - - /** - * Print version information for control loop runtime. - * - * @return the version string - */ - public String version() { - return ResourceUtils.getResourceAsString("version.txt"); - } - - /** - * Print help information for control loop runtime. - * - * @param mainClassName the main class name - * @return the help string - */ - public String help(final String mainClassName) { - final HelpFormatter helpFormatter = new HelpFormatter(); - final StringWriter stringWriter = new StringWriter(); - final PrintWriter printWriter = new PrintWriter(stringWriter); - - helpFormatter.printHelp(printWriter, HELP_LINE_LENGTH, mainClassName + " [options...]", "options", options, 0, - 0, ""); - - return stringWriter.toString(); - } - - /** - * Gets the configuration file path. - * - * @return the configuration file path - */ - public String getConfigurationFilePath() { - return configurationFilePath; + commonCommandLineArguments.validate(configurationFilePath); } /** @@ -185,39 +148,4 @@ public class ClRuntimeCommandLineArguments { this.configurationFilePath = configurationFilePath; } - - /** - * Validate readable file. - * - * @param fileTag the file tag - * @param fileName the file name - * @throws ControlLoopException on the file name passed as a parameter - */ - private void validateReadableFile(final String fileTag, final String fileName) throws ControlLoopException { - if (fileName == null || fileName.length() == 0) { - throw new ControlLoopException(Response.Status.NOT_ACCEPTABLE, - fileTag + " file was not specified as an argument"); - } - - // The file name refers to a resource on the local file system - final URL fileUrl = ResourceUtils.getUrl4Resource(fileName); - if (fileUrl == null) { - throw new ControlLoopException(Response.Status.NOT_ACCEPTABLE, - fileTag + FILE_MESSAGE_PREAMBLE + fileName + "\" does not exist"); - } - - final File theFile = new File(fileUrl.getPath()); - if (!theFile.exists()) { - throw new ControlLoopException(Response.Status.NOT_ACCEPTABLE, - fileTag + FILE_MESSAGE_PREAMBLE + fileName + "\" does not exist"); - } - if (!theFile.isFile()) { - throw new ControlLoopException(Response.Status.NOT_ACCEPTABLE, - fileTag + FILE_MESSAGE_PREAMBLE + fileName + "\" is not a normal file"); - } - if (!theFile.canRead()) { - throw new ControlLoopException(Response.Status.NOT_ACCEPTABLE, - fileTag + FILE_MESSAGE_PREAMBLE + fileName + "\" is ureadable"); - } - } } diff --git a/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/monitoring/MonitoringHandler.java b/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/monitoring/MonitoringHandler.java index 04f458e7d..a7ad9180a 100644 --- a/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/monitoring/MonitoringHandler.java +++ b/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/monitoring/MonitoringHandler.java @@ -69,26 +69,6 @@ public class MonitoringHandler extends ControlLoopHandler { } @Override - public void startAndRegisterListeners(MessageTypeDispatcher msgDispatcher) { - // No topic communication on this handler - } - - @Override - public void startAndRegisterPublishers(List<TopicSink> topicSinks) { - // No topic communication on this handler - } - - @Override - public void stopAndUnregisterPublishers() { - // No topic communication on this handler - } - - @Override - public void stopAndUnregisterListeners(MessageTypeDispatcher msgDispatcher) { - // No topic communication on this handler - } - - @Override public void startProviders() { monitoringProvider = new MonitoringProvider(getDatabaseProviderParameters()); } diff --git a/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/monitoring/MonitoringProvider.java b/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/monitoring/MonitoringProvider.java index e46e66501..193f8d557 100644 --- a/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/monitoring/MonitoringProvider.java +++ b/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/monitoring/MonitoringProvider.java @@ -206,7 +206,7 @@ public class MonitoringProvider implements Closeable { ControlLoop controlLoop = controlLoopProvider.getControlLoop(new ToscaConceptIdentifier(name, version)); if (controlLoop != null) { - clElements.addAll(controlLoop.getElements()); + clElements.addAll(controlLoop.getElements().values()); //Collect control loop element statistics for each cl element. for (ControlLoopElement clElement : clElements) { clElementStats.addAll(fetchFilteredClElementStatistics(clElement.getParticipantId().getName(), @@ -235,7 +235,7 @@ public class MonitoringProvider implements Closeable { List<ToscaConceptIdentifier> participantIds = new ArrayList<>(); ControlLoop controlLoop = controlLoopProvider.getControlLoop(new ToscaConceptIdentifier(name, version)); if (controlLoop != null) { - for (ControlLoopElement clElement : controlLoop.getElements()) { + for (ControlLoopElement clElement : controlLoop.getElements().values()) { participantIds.add(clElement.getParticipantId()); } } @@ -256,7 +256,7 @@ public class MonitoringProvider implements Closeable { Map<String, ToscaConceptIdentifier> clElementId = new HashMap<>(); ControlLoop controlLoop = controlLoopProvider.getControlLoop(new ToscaConceptIdentifier(name, version)); if (controlLoop != null) { - for (ControlLoopElement clElement : controlLoop.getElements()) { + for (ControlLoopElement clElement : controlLoop.getElements().values()) { clElementId.put(clElement.getId().toString(), clElement.getParticipantId()); } } diff --git a/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionHandler.java b/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionHandler.java new file mode 100644 index 000000000..63bff00fc --- /dev/null +++ b/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionHandler.java @@ -0,0 +1,450 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2021 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.controlloop.runtime.supervision; + +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.Response.Status; +import org.apache.commons.collections4.CollectionUtils; +import org.onap.policy.clamp.controlloop.common.exception.ControlLoopException; +import org.onap.policy.clamp.controlloop.common.exception.ControlLoopRuntimeException; +import org.onap.policy.clamp.controlloop.common.handler.ControlLoopHandler; +import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoop; +import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopElement; +import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopState; +import org.onap.policy.clamp.controlloop.models.controlloop.concepts.Participant; +import org.onap.policy.clamp.controlloop.models.controlloop.persistence.provider.ControlLoopProvider; +import org.onap.policy.clamp.controlloop.models.controlloop.persistence.provider.ParticipantProvider; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantControlLoopStateChange; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantControlLoopUpdate; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageType; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantStatus; +import org.onap.policy.clamp.controlloop.runtime.commissioning.CommissioningHandler; +import org.onap.policy.clamp.controlloop.runtime.commissioning.CommissioningProvider; +import org.onap.policy.clamp.controlloop.runtime.main.parameters.ClRuntimeParameterGroup; +import org.onap.policy.clamp.controlloop.runtime.monitoring.MonitoringHandler; +import org.onap.policy.clamp.controlloop.runtime.monitoring.MonitoringProvider; +import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ParticipantControlLoopStateChangePublisher; +import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ParticipantControlLoopUpdatePublisher; +import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ParticipantStateChangePublisher; +import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ParticipantStatusListener; +import org.onap.policy.common.endpoints.event.comm.TopicSink; +import org.onap.policy.common.endpoints.listeners.MessageTypeDispatcher; +import org.onap.policy.common.utils.services.Registry; +import org.onap.policy.common.utils.services.ServiceManager; +import org.onap.policy.common.utils.services.ServiceManagerException; +import org.onap.policy.models.base.PfModelException; +import org.onap.policy.models.base.PfModelRuntimeException; +import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class handles supervision of control loop instances, so only one object of this type should be built at a time. + * + * <p/> It is effectively a singleton that is started at system start. + */ +public class SupervisionHandler extends ControlLoopHandler { + private static final Logger LOGGER = LoggerFactory.getLogger(SupervisionHandler.class); + + private static final String CONTROL_LOOP_CANNOT_TRANSITION_FROM_STATE = "Control loop can't transition from state "; + private static final String CONTROL_LOOP_IS_ALREADY_IN_STATE = "Control loop is already in state "; + private static final String TO_STATE = " to state "; + private static final String AND_TRANSITIONING_TO_STATE = " and transitioning to state "; + + private ControlLoopProvider controlLoopProvider; + private ParticipantProvider participantProvider; + private CommissioningProvider commissioningProvider; + private MonitoringProvider monitoringProvider; + + // Publishers for participant communication + private ParticipantStateChangePublisher stateChangePublisher; + private ParticipantControlLoopUpdatePublisher controlLoopUpdatePublisher; + private ParticipantControlLoopStateChangePublisher controlLoopStateChangePublisher; + + // Database scanner + private SupervisionScanner scanner; + + /** + * Used to manage the services. + */ + private ServiceManager manager; + private ServiceManager publisherManager; + + /** + * Gets the SupervisionHandler. + * + * @return SupervisionHandler + */ + public static SupervisionHandler getInstance() { + return Registry.get(SupervisionHandler.class.getName()); + } + + /** + * Create a handler. + * + * @param clRuntimeParameterGroup the parameters for the control loop runtime + */ + public SupervisionHandler(ClRuntimeParameterGroup clRuntimeParameterGroup) { + super(clRuntimeParameterGroup.getDatabaseProviderParameters()); + // @formatter:off + this.manager = new ServiceManager() + .addAction("ControlLoop Provider", + () -> controlLoopProvider = new ControlLoopProvider(getDatabaseProviderParameters()), + () -> controlLoopProvider = null) + .addAction("Participant Provider", + () -> participantProvider = new ParticipantProvider(getDatabaseProviderParameters()), + () -> participantProvider = null); + // @formatter:on + } + + /** + * Supervision trigger called when a command is issued on control loops. + * + * </p> Causes supervision to start or continue supervision on the control loops in question. + * + * @param controlLoopIdentifierList the control loops for which the supervision command has been issued + * @throws ControlLoopException on supervision triggering exceptions + */ + public void triggerControlLoopSupervision(List<ToscaConceptIdentifier> controlLoopIdentifierList) + throws ControlLoopException { + + LOGGER.debug("triggering control loop supervision on control loops {}", controlLoopIdentifierList); + + if (CollectionUtils.isEmpty(controlLoopIdentifierList)) { + // This is just to force throwing of the exception in certain circumstances. + exceptionOccured(Response.Status.NOT_ACCEPTABLE, + "The list of control loops for supervision is empty"); + } + + for (ToscaConceptIdentifier controlLoopId : controlLoopIdentifierList) { + try { + ControlLoop controlLoop = controlLoopProvider.getControlLoop(controlLoopId); + + superviseControlLoop(controlLoop); + + controlLoopProvider.updateControlLoop(controlLoop); + } catch (PfModelException pfme) { + throw new ControlLoopException(pfme.getErrorResponse().getResponseCode(), pfme.getMessage(), pfme); + } + } + } + + @Override + public void startAndRegisterListeners(MessageTypeDispatcher msgDispatcher) { + msgDispatcher.register(ParticipantMessageType.PARTICIPANT_STATUS.name(), new ParticipantStatusListener()); + } + + @Override + public void startAndRegisterPublishers(List<TopicSink> topicSinks) { + // TODO: Use a parameter for the timeout + // @formatter:off + this.publisherManager = new ServiceManager() + .addAction("Supervision scanner", + () -> scanner = new SupervisionScanner(controlLoopProvider, 10000), + () -> scanner = null) + .addAction("ControlLoopUpdate publisher", + () -> controlLoopUpdatePublisher = new ParticipantControlLoopUpdatePublisher(topicSinks, -1), + () -> controlLoopUpdatePublisher.terminate()) + .addAction("StateChange Publisher", + () -> stateChangePublisher = new ParticipantStateChangePublisher(topicSinks, 10000), + () -> stateChangePublisher.terminate()) + .addAction("ControlLoopStateChange Publisher", + () -> controlLoopStateChangePublisher = + new ParticipantControlLoopStateChangePublisher(topicSinks, -1), + () -> controlLoopStateChangePublisher.terminate()); + // @formatter:on + try { + publisherManager.start(); + } catch (final ServiceManagerException exp) { + throw new ControlLoopRuntimeException(Status.INTERNAL_SERVER_ERROR, + "Supervision handler start of publishers or scanner failed", exp); + } + } + + @Override + public void stopAndUnregisterPublishers() { + try { + publisherManager.stop(); + } catch (final ServiceManagerException exp) { + throw new ControlLoopRuntimeException(Status.INTERNAL_SERVER_ERROR, + "Supervision handler stop of publishers or scanner failed", exp); + } + } + + @Override + public void stopAndUnregisterListeners(MessageTypeDispatcher msgDispatcher) { + msgDispatcher.unregister(ParticipantMessageType.PARTICIPANT_STATUS.name()); + } + + /** + * Handle a ParticipantStatus message from a participant. + * + * @param participantStatusMessage the ParticipantStatus message received from a participant + */ + public void handleParticipantStatusMessage(ParticipantStatus participantStatusMessage) { + LOGGER.debug("Participant Status received {}", participantStatusMessage); + + try { + superviseParticipant(participantStatusMessage); + } catch (PfModelException | ControlLoopException svExc) { + LOGGER.warn("error supervising participant {}", participantStatusMessage.getParticipantId(), svExc); + return; + } + + try { + superviseControlLoops(participantStatusMessage); + } catch (PfModelException | ControlLoopException svExc) { + LOGGER.warn("error supervising participant {}", participantStatusMessage.getParticipantId(), svExc); + } + } + + /** + * Supervise a control loop, performing whatever actions need to be performed on the control loop. + * + * @param controlLoop the control loop to supervises + * @throws ControlLoopException on supervision errors + */ + private void superviseControlLoop(ControlLoop controlLoop) throws ControlLoopException, PfModelException { + switch (controlLoop.getOrderedState()) { + case UNINITIALISED: + superviseControlLoopUninitialization(controlLoop); + break; + + case PASSIVE: + superviseControlLoopPassivation(controlLoop); + break; + + case RUNNING: + superviseControlLoopActivation(controlLoop); + break; + + default: + exceptionOccured(Response.Status.NOT_ACCEPTABLE, + "A control loop cannot be commanded to go into state " + controlLoop.getOrderedState().name()); + } + } + + /** + * Supervise a control loop uninitialisation, performing whatever actions need to be performed on the control loop, + * control loop ordered state is UNINITIALIZED. + * + * @param controlLoop the control loop to supervises + * @throws ControlLoopException on supervision errors + */ + private void superviseControlLoopUninitialization(ControlLoop controlLoop) throws ControlLoopException { + switch (controlLoop.getState()) { + case UNINITIALISED: + exceptionOccured(Response.Status.NOT_ACCEPTABLE, + CONTROL_LOOP_IS_ALREADY_IN_STATE + controlLoop.getState().name()); + break; + + case UNINITIALISED2PASSIVE: + case PASSIVE: + controlLoop.setState(ControlLoopState.PASSIVE2UNINITIALISED); + sendControlLoopStateChange(controlLoop); + break; + + case PASSIVE2UNINITIALISED: + exceptionOccured(Response.Status.NOT_ACCEPTABLE, CONTROL_LOOP_IS_ALREADY_IN_STATE + + controlLoop.getState().name() + AND_TRANSITIONING_TO_STATE + controlLoop.getOrderedState()); + break; + + default: + exceptionOccured(Response.Status.NOT_ACCEPTABLE, CONTROL_LOOP_CANNOT_TRANSITION_FROM_STATE + + controlLoop.getState().name() + TO_STATE + controlLoop.getOrderedState()); + break; + } + } + + private void superviseControlLoopPassivation(ControlLoop controlLoop) + throws ControlLoopException, PfModelException { + switch (controlLoop.getState()) { + case PASSIVE: + exceptionOccured(Response.Status.NOT_ACCEPTABLE, + CONTROL_LOOP_IS_ALREADY_IN_STATE + controlLoop.getState().name()); + break; + case UNINITIALISED: + controlLoop.setState(ControlLoopState.UNINITIALISED2PASSIVE); + sendControlLoopUpdate(controlLoop); + break; + + case UNINITIALISED2PASSIVE: + case RUNNING2PASSIVE: + exceptionOccured(Response.Status.NOT_ACCEPTABLE, CONTROL_LOOP_IS_ALREADY_IN_STATE + + controlLoop.getState().name() + AND_TRANSITIONING_TO_STATE + controlLoop.getOrderedState()); + break; + + case RUNNING: + controlLoop.setState(ControlLoopState.RUNNING2PASSIVE); + sendControlLoopStateChange(controlLoop); + break; + + default: + exceptionOccured(Response.Status.NOT_ACCEPTABLE, CONTROL_LOOP_CANNOT_TRANSITION_FROM_STATE + + controlLoop.getState().name() + TO_STATE + controlLoop.getOrderedState()); + break; + } + } + + private void superviseControlLoopActivation(ControlLoop controlLoop) throws ControlLoopException { + switch (controlLoop.getState()) { + case RUNNING: + exceptionOccured(Response.Status.NOT_ACCEPTABLE, + CONTROL_LOOP_IS_ALREADY_IN_STATE + controlLoop.getState().name()); + break; + + case PASSIVE2RUNNING: + exceptionOccured(Response.Status.NOT_ACCEPTABLE, CONTROL_LOOP_IS_ALREADY_IN_STATE + + controlLoop.getState().name() + AND_TRANSITIONING_TO_STATE + controlLoop.getOrderedState()); + break; + + case PASSIVE: + controlLoop.setState(ControlLoopState.PASSIVE2RUNNING); + sendControlLoopStateChange(controlLoop); + break; + + default: + exceptionOccured(Response.Status.NOT_ACCEPTABLE, CONTROL_LOOP_CANNOT_TRANSITION_FROM_STATE + + controlLoop.getState().name() + TO_STATE + controlLoop.getOrderedState()); + break; + } + } + + private void sendControlLoopUpdate(ControlLoop controlLoop) throws PfModelException { + ParticipantControlLoopUpdate pclu = new ParticipantControlLoopUpdate(); + pclu.setControlLoopId(controlLoop.getKey().asIdentifier()); + pclu.setControlLoop(controlLoop); + // TODO: We should look up the correct TOSCA node template here for the control loop + // Tiny hack implemented to return the tosca service template entry from the database and be passed onto dmaap + commissioningProvider = CommissioningHandler.getInstance().getProvider(); + pclu.setControlLoopDefinition(commissioningProvider.getToscaServiceTemplate(null, null)); + controlLoopUpdatePublisher.send(pclu); + } + + private void sendControlLoopStateChange(ControlLoop controlLoop) { + ParticipantControlLoopStateChange clsc = new ParticipantControlLoopStateChange(); + clsc.setControlLoopId(controlLoop.getKey().asIdentifier()); + clsc.setMessageId(UUID.randomUUID()); + clsc.setOrderedState(controlLoop.getOrderedState()); + + controlLoopStateChangePublisher.send(clsc); + } + + private void superviseParticipant(ParticipantStatus participantStatusMessage) + throws PfModelException, ControlLoopException { + if (participantStatusMessage.getParticipantId() == null) { + exceptionOccured(Response.Status.NOT_FOUND, + "Participant ID on PARTICIPANT_STATUS message is null"); + } + + List<Participant> participantList = + participantProvider.getParticipants(participantStatusMessage.getParticipantId().getName(), + participantStatusMessage.getParticipantId().getVersion()); + + if (CollectionUtils.isEmpty(participantList)) { + Participant participant = new Participant(); + participant.setName(participantStatusMessage.getParticipantId().getName()); + participant.setVersion(participantStatusMessage.getParticipantId().getVersion()); + participant.setDefinition(new ToscaConceptIdentifier("unknown", "0.0.0")); + participant.setParticipantState(participantStatusMessage.getState()); + participant.setHealthStatus(participantStatusMessage.getHealthStatus()); + + participantList.add(participant); + participantProvider.createParticipants(participantList); + } else { + for (Participant participant : participantList) { + participant.setParticipantState(participantStatusMessage.getState()); + participant.setHealthStatus(participantStatusMessage.getHealthStatus()); + } + participantProvider.updateParticipants(participantList); + } + + monitoringProvider = MonitoringHandler.getInstance().getMonitoringProvider(); + monitoringProvider.createParticipantStatistics( + List.of(participantStatusMessage.getParticipantStatistics())); + } + + private void superviseControlLoops(ParticipantStatus participantStatusMessage) + throws PfModelException, ControlLoopException { + if (CollectionUtils.isEmpty(participantStatusMessage.getControlLoops().getControlLoopList())) { + return; + } + + for (ControlLoop controlLoop : participantStatusMessage.getControlLoops().getControlLoopList()) { + if (controlLoop == null) { + exceptionOccured(Response.Status.NOT_FOUND, + "PARTICIPANT_STATUS message references unknown control loop: " + controlLoop); + } + + ControlLoop dbControlLoop = controlLoopProvider + .getControlLoop(new ToscaConceptIdentifier(controlLoop.getName(), controlLoop.getVersion())); + if (dbControlLoop == null) { + exceptionOccured(Response.Status.NOT_FOUND, + "PARTICIPANT_STATUS control loop not found in database: " + controlLoop); + } + + for (ControlLoopElement element : controlLoop.getElements().values()) { + ControlLoopElement dbElement = dbControlLoop.getElements().get(element.getId()); + + if (dbElement == null) { + exceptionOccured(Response.Status.NOT_FOUND, + "PARTICIPANT_STATUS message references unknown control loop element: " + element); + } + + // Replace element entry in the database + dbControlLoop.getElements().put(element.getId(), element); + } + controlLoopProvider.updateControlLoop(dbControlLoop); + } + + monitoringProvider = MonitoringHandler.getInstance().getMonitoringProvider(); + for (ControlLoop controlLoop : participantStatusMessage.getControlLoops().getControlLoopList()) { + monitoringProvider.createClElementStatistics(controlLoop.getControlLoopElementStatisticsList(controlLoop)); + } + } + + @Override + public void startProviders() { + try { + manager.start(); + } catch (final ServiceManagerException exp) { + throw new ControlLoopRuntimeException(Status.INTERNAL_SERVER_ERROR, + "Supervision handler start of providers failed", exp); + } + } + + @Override + public void stopProviders() { + try { + manager.stop(); + } catch (final ServiceManagerException exp) { + throw new ControlLoopRuntimeException(Status.INTERNAL_SERVER_ERROR, + "Supervision handler stop of providers failed", exp); + } + } + + private void exceptionOccured(Response.Status status, String reason) throws ControlLoopException { + throw new ControlLoopException(status, reason); + } +} diff --git a/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionScanner.java b/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionScanner.java new file mode 100644 index 000000000..0ccfddff3 --- /dev/null +++ b/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionScanner.java @@ -0,0 +1,116 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2021 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.controlloop.runtime.supervision; + +import java.io.Closeable; +import java.util.Collection; +import java.util.List; +import java.util.TimerTask; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoop; +import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopElement; +import org.onap.policy.clamp.controlloop.models.controlloop.persistence.provider.ControlLoopProvider; +import org.onap.policy.models.base.PfModelException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class is used to scan the control loops in the database and check if they are in the correct state. + */ +public class SupervisionScanner implements Runnable, Closeable { + private static final Logger LOGGER = LoggerFactory.getLogger(SupervisionScanner.class); + + private ControlLoopProvider controlLoopProvider; + private ScheduledExecutorService timerPool; + + /** + * Constructor for instantiating SupervisionScanner. + * + * @param controlLoopProvider the provider to use to read control loops from the database + * @param interval time interval to perform scans + */ + public SupervisionScanner(final ControlLoopProvider controlLoopProvider, final long interval) { + this.controlLoopProvider = controlLoopProvider; + + // Kick off the timer + timerPool = makeTimerPool(); + timerPool.scheduleAtFixedRate(this, 0, interval, TimeUnit.SECONDS); + } + + @Override + public void run() { + LOGGER.debug("Scanning control loops in the database . . ."); + + try { + for (ControlLoop controlLoop : controlLoopProvider.getControlLoops(null, null)) { + scanControlLoop(controlLoop); + } + } catch (PfModelException pfme) { + LOGGER.warn("error reading control loops from database", pfme); + } + + LOGGER.debug("Control loop scan complete . . ."); + } + + @Override + public void close() { + timerPool.shutdown(); + } + + private void scanControlLoop(final ControlLoop controlLoop) throws PfModelException { + LOGGER.debug("scanning control loop {} . . .", controlLoop.getKey().asIdentifier()); + + if (controlLoop.getState().equals(controlLoop.getOrderedState().asState())) { + LOGGER.debug("control loop {} scanned, OK", controlLoop.getKey().asIdentifier()); + return; + } + + for (ControlLoopElement element : controlLoop.getElements().values()) { + if (!element.getState().equals(element.getOrderedState().asState())) { + LOGGER.debug("control loop scan: transitioning from state {} to {}", controlLoop.getState(), + controlLoop.getOrderedState()); + return; + } + } + + LOGGER.debug("control loop scan: transition from state {} to {} completed", controlLoop.getState(), + controlLoop.getOrderedState()); + + controlLoop.setState(controlLoop.getOrderedState().asState()); + controlLoopProvider.updateControlLoop(controlLoop); + } + + /** + * Makes a new timer pool. + * + * @return a new timer pool + */ + protected ScheduledExecutorService makeTimerPool() { + return Executors.newScheduledThreadPool(1); + } +} diff --git a/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantControlLoopStateChangePublisher.java b/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantControlLoopStateChangePublisher.java new file mode 100644 index 000000000..c9c8ab851 --- /dev/null +++ b/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantControlLoopStateChangePublisher.java @@ -0,0 +1,75 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2021 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.controlloop.runtime.supervision.comm; + +import java.util.List; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantControlLoopStateChange; +import org.onap.policy.common.endpoints.event.comm.TopicSink; +import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class is used to send ParticipantControlLoopStateChangePublisher messages to participants on DMaaP. + */ +public class ParticipantControlLoopStateChangePublisher { + private static final Logger LOGGER = LoggerFactory.getLogger(ParticipantControlLoopStateChangePublisher.class); + + private TopicSinkClient topicSinkClient; + + /** + * Constructor for instantiating ParticipantControlLoopStateChangePublisherPublisher. + * + * @param topicSinks the topic sinks + * @param interval time interval to send ParticipantControlLoopStateChangePublisher messages + */ + public ParticipantControlLoopStateChangePublisher(final List<TopicSink> topicSinks, final long interval) { + // TODO: Should not be dependent on the order of topic sinks in the config + this.topicSinkClient = new TopicSinkClient(topicSinks.get(0)); + } + + /** + * Terminates the current timer. + */ + public void terminate() { + // This is a user initiated message and doesn't need a timer. + } + + /** + * Get the current time interval used by the timer task. + * + * @return interval the current time interval + */ + public long getInterval() { + // This is a user initiated message and doesn't need a timer. + return -1; + } + + /** + * Method to send ParticipantControlLoopStateChangePublisher status message to participants on demand. + * + * @param controlLoopStateChange the ParticipantControlLoopStateChangePublisher message + */ + public void send(final ParticipantControlLoopStateChange controlLoopStateChange) { + topicSinkClient.send(controlLoopStateChange); + LOGGER.debug("Sent ParticipantControlLoopStateChange to Participants - {}", controlLoopStateChange); + } +} diff --git a/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantControlLoopUpdatePublisher.java b/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantControlLoopUpdatePublisher.java new file mode 100644 index 000000000..3c5d230c5 --- /dev/null +++ b/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantControlLoopUpdatePublisher.java @@ -0,0 +1,75 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2021 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.controlloop.runtime.supervision.comm; + +import java.util.List; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantControlLoopUpdate; +import org.onap.policy.common.endpoints.event.comm.TopicSink; +import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class is used to send ParticipantControlLoopUpdate messages to participants on DMaaP. + */ +public class ParticipantControlLoopUpdatePublisher { + private static final Logger LOGGER = LoggerFactory.getLogger(ParticipantControlLoopUpdatePublisher.class); + + private TopicSinkClient topicSinkClient; + + /** + * Constructor for instantiating ParticipantUpdatePublisher. + * + * @param topicSinks the topic sinks + * @param interval time interval to send ParticipantControlLoopUpdate messages + */ + public ParticipantControlLoopUpdatePublisher(final List<TopicSink> topicSinks, final long interval) { + // TODO: Should not be dependent on the order of topic sinks in the config + this.topicSinkClient = new TopicSinkClient(topicSinks.get(0)); + } + + /** + * Terminates the current timer. + */ + public void terminate() { + // This is a user initiated message and doesn't need a timer. + } + + /** + * Get the current time interval used by the timer task. + * + * @return interval the current time interval + */ + public long getInterval() { + // This is a user initiated message and doesn't need a timer. + return -1; + } + + /** + * Method to send ParticipantControlLoopUpdate status message to participants on demand. + * + * @param participantControlLoopUpdate the ParticipantControlLoopUpdate message + */ + public void send(final ParticipantControlLoopUpdate participantControlLoopUpdate) { + topicSinkClient.send(participantControlLoopUpdate); + LOGGER.debug("Sent ParticipantControlLoopUpdate to Participants - {}", participantControlLoopUpdate); + } +} diff --git a/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantStateChangePublisher.java b/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantStateChangePublisher.java new file mode 100644 index 000000000..099039115 --- /dev/null +++ b/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantStateChangePublisher.java @@ -0,0 +1,74 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2021 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.controlloop.runtime.supervision.comm; + +import java.util.List; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantStateChange; +import org.onap.policy.common.endpoints.event.comm.TopicSink; +import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class is used to send ParticipantStateChange messages to participants on DMaaP. + */ +public class ParticipantStateChangePublisher { + private static final Logger LOGGER = LoggerFactory.getLogger(ParticipantStateChangePublisher.class); + + private TopicSinkClient topicSinkClient; + + /** + * Constructor for instantiating ParticipantStateChangePublisher. + * + * @param topicSinks the topic sinks + * @param interval time interval to send ParticipantStateChange messages + */ + public ParticipantStateChangePublisher(final List<TopicSink> topicSinks, final long interval) { + // TODO: Should not be dependent on the order of topic sinks in the config + this.topicSinkClient = new TopicSinkClient(topicSinks.get(0)); + } + + /** + * Terminates the current timer. + */ + public void terminate() { + // Nothing to terminate, this publisher does not have a timer + } + + /** + * Get the current time interval used by the timer task. + * + * @return interval the current time interval + */ + public long getInterval() { + return -1; + } + + /** + * Method to send ParticipantStateChange status message to participants on demand. + * + * @param participantStateChange the ParticipantStateChange message + */ + public void send(final ParticipantStateChange participantStateChange) { + topicSinkClient.send(participantStateChange); + LOGGER.debug("Sent ParticipantStateChange to Participants - {}", participantStateChange); + } +} diff --git a/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantStatusListener.java b/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantStatusListener.java new file mode 100644 index 000000000..a05f4aa20 --- /dev/null +++ b/tosca-controlloop/runtime/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantStatusListener.java @@ -0,0 +1,53 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2021 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.controlloop.runtime.supervision.comm; + +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantStatus; +import org.onap.policy.clamp.controlloop.runtime.supervision.SupervisionHandler; +import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; +import org.onap.policy.common.endpoints.listeners.ScoListener; +import org.onap.policy.common.utils.coder.StandardCoderObject; +import org.onap.policy.common.utils.services.Registry; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Listener for ParticipantStatus messages sent by participants. + */ +public class ParticipantStatusListener extends ScoListener<ParticipantStatus> { + private static final Logger LOGGER = LoggerFactory.getLogger(ParticipantStatusListener.class); + + private final SupervisionHandler supervisionHandler = Registry.get(SupervisionHandler.class.getName()); + + /** + * Constructs the object. + */ + public ParticipantStatusListener() { + super(ParticipantStatus.class); + } + + @Override + public void onTopicEvent(final CommInfrastructure infra, final String topic, final StandardCoderObject sco, + final ParticipantStatus participantStatusMessage) { + LOGGER.debug("ParticipantStatus message received from participant - {}", participantStatusMessage); + supervisionHandler.handleParticipantStatusMessage(participantStatusMessage); + } +} |