diff options
Diffstat (limited to 'runtime-controlloop/src/main/java')
28 files changed, 742 insertions, 466 deletions
diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/Application.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/Application.java index a2b6f62d4..28814b354 100644 --- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/Application.java +++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/Application.java @@ -22,8 +22,13 @@ package org.onap.policy.clamp.controlloop.runtime; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.context.properties.ConfigurationPropertiesScan; +import org.springframework.context.annotation.ComponentScan; @SpringBootApplication +@ComponentScan({"org.onap.policy.clamp.controlloop.models.controlloop.persistence.provider", + "org.onap.policy.clamp.controlloop.runtime"}) +@ConfigurationPropertiesScan("org.onap.policy.clamp.controlloop.runtime.main.parameters") public class Application { public static void main(String[] args) { diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/commissioning/CommissioningProvider.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/commissioning/CommissioningProvider.java index f291c4e89..d9dee50bc 100644 --- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/commissioning/CommissioningProvider.java +++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/commissioning/CommissioningProvider.java @@ -25,8 +25,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.PropertyNamingStrategies; import com.fasterxml.jackson.module.jsonSchema.JsonSchema; import com.fasterxml.jackson.module.jsonSchema.factories.SchemaFactoryWrapper; -import java.io.Closeable; -import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -36,11 +34,8 @@ import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.MapUtils; import org.onap.policy.clamp.controlloop.models.controlloop.persistence.provider.ControlLoopProvider; import org.onap.policy.clamp.controlloop.models.messages.rest.commissioning.CommissioningResponse; -import org.onap.policy.clamp.controlloop.runtime.main.parameters.ClRuntimeParameterGroup; import org.onap.policy.models.base.PfModelException; -import org.onap.policy.models.base.PfModelRuntimeException; import org.onap.policy.models.provider.PolicyModelsProvider; -import org.onap.policy.models.provider.PolicyModelsProviderFactory; import org.onap.policy.models.tosca.authorative.concepts.ToscaCapabilityType; import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier; import org.onap.policy.models.tosca.authorative.concepts.ToscaDataType; @@ -59,7 +54,7 @@ import org.springframework.stereotype.Component; * the callers. */ @Component -public class CommissioningProvider implements Closeable { +public class CommissioningProvider { public static final String CONTROL_LOOP_NODE_TYPE = "org.onap.policy.clamp.controlloop.ControlLoop"; private final PolicyModelsProvider modelsProvider; @@ -70,32 +65,12 @@ public class CommissioningProvider implements Closeable { /** * Create a commissioning provider. * - * @param controlLoopParameters the parameters for access to the database - * @throws PfModelRuntimeException on errors creating the database provider + * @param modelsProvider the PolicyModelsProvider + * @param clProvider the ControlLoopProvider */ - public CommissioningProvider(ClRuntimeParameterGroup controlLoopParameters) { - try { - modelsProvider = new PolicyModelsProviderFactory() - .createPolicyModelsProvider(controlLoopParameters.getDatabaseProviderParameters()); - } catch (PfModelException e) { - throw new PfModelRuntimeException(e); - } - - try { - clProvider = new ControlLoopProvider(controlLoopParameters.getDatabaseProviderParameters()); - } catch (PfModelException e) { - throw new PfModelRuntimeException(e); - } - } - - @Override - public void close() throws IOException { - try { - modelsProvider.close(); - clProvider.close(); - } catch (PfModelException e) { - throw new IOException("error closing modelsProvider", e); - } + public CommissioningProvider(PolicyModelsProvider modelsProvider, ControlLoopProvider clProvider) { + this.modelsProvider = modelsProvider; + this.clProvider = clProvider; } /** @@ -252,7 +227,8 @@ public class CommissioningProvider implements Closeable { mapper.acceptJsonFormatVisitor(mapper.constructType(ToscaTopologyTemplate.class), visitor); break; case "node_templates": - mapper.acceptJsonFormatVisitor(mapper.constructType(ToscaNodeTemplate.class), visitor); + mapper.acceptJsonFormatVisitor(mapper.getTypeFactory() + .constructCollectionType(List.class, ToscaNodeTemplate.class), visitor); break; default: mapper.acceptJsonFormatVisitor(mapper.constructType(ToscaServiceTemplate.class), visitor); diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/config/ActivatorConfig.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/config/PolicyModelConfig.java index 1d6b92e77..8a151d886 100644 --- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/config/ActivatorConfig.java +++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/config/PolicyModelConfig.java @@ -21,26 +21,25 @@ package org.onap.policy.clamp.controlloop.runtime.config; import org.onap.policy.clamp.controlloop.runtime.main.parameters.ClRuntimeParameterGroup; -import org.onap.policy.clamp.controlloop.runtime.main.startstop.ClRuntimeActivator; -import org.onap.policy.clamp.controlloop.runtime.supervision.SupervisionHandler; +import org.onap.policy.models.base.PfModelException; +import org.onap.policy.models.provider.PolicyModelsProvider; +import org.onap.policy.models.provider.PolicyModelsProviderFactory; +import org.onap.policy.models.provider.PolicyModelsProviderParameters; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration -public class ActivatorConfig { +public class PolicyModelConfig { - /** - * Create and start ClRuntimeActivator. - * - * @param clRuntimeParameterGroup the parameters for the control loop runtime service - * @param supervisionHandler the SupervisionHandler - * @return ClRuntimeActivator - */ @Bean - public ClRuntimeActivator clRuntimeActivator(ClRuntimeParameterGroup clRuntimeParameterGroup, - SupervisionHandler supervisionHandler) { - var clRuntimeActivator = new ClRuntimeActivator(clRuntimeParameterGroup, supervisionHandler); - clRuntimeActivator.start(); - return clRuntimeActivator; + public PolicyModelsProviderParameters policyModelsProviderParameters( + ClRuntimeParameterGroup clRuntimeParameterGroup) { + return clRuntimeParameterGroup.getDatabaseProviderParameters(); + } + + @Bean + public PolicyModelsProvider policyModelsProvider(PolicyModelsProviderParameters policyModelsProviderParameters) + throws PfModelException { + return new PolicyModelsProviderFactory().createPolicyModelsProvider(policyModelsProviderParameters); } } diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/config/PropertiesConfig.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/config/messaging/Listener.java index 04bd35da3..b67ddf2a7 100644 --- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/config/PropertiesConfig.java +++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/config/messaging/Listener.java @@ -18,21 +18,23 @@ * ============LICENSE_END========================================================= */ -package org.onap.policy.clamp.controlloop.runtime.config; +package org.onap.policy.clamp.controlloop.runtime.config.messaging; -import org.onap.policy.clamp.controlloop.common.exception.ControlLoopException; -import org.onap.policy.clamp.controlloop.runtime.main.parameters.ClRuntimeParameterGroup; -import org.onap.policy.clamp.controlloop.runtime.main.parameters.ClRuntimeParameterHandler; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; +import org.onap.policy.common.endpoints.listeners.ScoListener; -@Configuration -public class PropertiesConfig { +public interface Listener { - @Bean - public ClRuntimeParameterGroup clRuntimeParameterGroup(@Value("${runtime.file}") String file) - throws ControlLoopException { - return new ClRuntimeParameterHandler().getParameters(file); - } + /** + * Get the type of message of interest to the listener. + * + * @return type of message of interest to the listener + */ + String getType(); + + /** + * Get listener to register. + * + * @return listener to register + */ + <T> ScoListener<T> getScoListener(); } diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/startstop/ClRuntimeActivator.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/config/messaging/MessageDispatcherActivator.java index 323f76178..891dab9ae 100644 --- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/startstop/ClRuntimeActivator.java +++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/config/messaging/MessageDispatcherActivator.java @@ -18,53 +18,47 @@ * ============LICENSE_END========================================================= */ -package org.onap.policy.clamp.controlloop.runtime.main.startstop; +package org.onap.policy.clamp.controlloop.runtime.config.messaging; import java.io.Closeable; import java.io.IOException; import java.util.List; +import java.util.stream.Stream; import javax.ws.rs.core.Response.Status; import lombok.Getter; import org.onap.policy.clamp.controlloop.common.exception.ControlLoopRuntimeException; import org.onap.policy.clamp.controlloop.runtime.main.parameters.ClRuntimeParameterGroup; -import org.onap.policy.clamp.controlloop.runtime.supervision.SupervisionHandler; import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager; import org.onap.policy.common.endpoints.event.comm.TopicSink; import org.onap.policy.common.endpoints.event.comm.TopicSource; import org.onap.policy.common.endpoints.listeners.MessageTypeDispatcher; import org.onap.policy.common.utils.services.ServiceManagerContainer; +import org.springframework.context.event.ContextRefreshedEvent; +import org.springframework.context.event.EventListener; +import org.springframework.stereotype.Component; -/** - * This class activates the control loop runtime component as a complete service together with all its controllers, - * listeners & handlers. - */ -public class ClRuntimeActivator extends ServiceManagerContainer implements Closeable { - // Name of the message type for messages on topics - private static final String[] MSG_TYPE_NAMES = {"messageType"}; +@Component +public class MessageDispatcherActivator extends ServiceManagerContainer implements Closeable { - @Getter - private final ClRuntimeParameterGroup parameterGroup; + private static final String[] MSG_TYPE_NAMES = {"messageType"}; // Topics from which the application receives and to which the application sends messages private List<TopicSink> topicSinks; private List<TopicSource> topicSources; - /** - * Listens for messages on the topic, decodes them into a message, and then dispatches them. - */ + @Getter private final MessageTypeDispatcher msgDispatcher; /** - * Instantiate the activator for the control loop runtime as a complete service. + * Constructor. * * @param clRuntimeParameterGroup the parameters for the control loop runtime service - * @param supervisionHandler SupervisionHandler + * @param publishers array of Publishers + * @param listeners array of Listeners * @throws ControlLoopRuntimeException if the activator does not start */ - public ClRuntimeActivator(final ClRuntimeParameterGroup clRuntimeParameterGroup, - SupervisionHandler supervisionHandler) { - this.parameterGroup = clRuntimeParameterGroup; - + public MessageDispatcherActivator(final ClRuntimeParameterGroup clRuntimeParameterGroup, Publisher[] publishers, + Listener[] listeners) { topicSinks = TopicEndpointManager.getManager() .addTopicSinks(clRuntimeParameterGroup.getTopicParameterGroup().getTopicSinks()); @@ -80,15 +74,18 @@ public class ClRuntimeActivator extends ServiceManagerContainer implements Close // @formatter:off addAction("Topic endpoint management", - () -> TopicEndpointManager.getManager().start(), - () -> TopicEndpointManager.getManager().shutdown()); + () -> TopicEndpointManager.getManager().start(), + () -> TopicEndpointManager.getManager().shutdown()); - addAction("Supervision Providers", () -> supervisionHandler.startProviders(), - () -> supervisionHandler.stopProviders()); - addAction("Supervision Listeners", () -> supervisionHandler.startAndRegisterListeners(msgDispatcher), - () -> supervisionHandler.stopAndUnregisterListeners(msgDispatcher)); - addAction("Supervision Publishers", () -> supervisionHandler.startAndRegisterPublishers(topicSinks), - () -> supervisionHandler.stopAndUnregisterPublishers()); + Stream.of(publishers).forEach(publisher -> + addAction("Publisher " + publisher.getClass().getSimpleName(), + () -> publisher.active(topicSinks), + () -> publisher.stop())); + + Stream.of(listeners).forEach(listener -> + addAction("Listener " + listener.getClass().getSimpleName(), + () -> msgDispatcher.register(listener.getType(), listener.getScoListener()), + () -> msgDispatcher.unregister(listener.getType()))); addAction("Topic Message Dispatcher", this::registerMsgDispatcher, this::unregisterMsgDispatcher); // @formatter:on @@ -112,6 +109,18 @@ public class ClRuntimeActivator extends ServiceManagerContainer implements Close } } + /** + * Start Manager after the application is Started. + * + * @param cre Refreshed Event + */ + @EventListener + public void handleContextStart(ContextRefreshedEvent cre) { + if (!isAlive()) { + start(); + } + } + @Override public void close() throws IOException { if (isAlive()) { diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/config/messaging/Publisher.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/config/messaging/Publisher.java new file mode 100644 index 000000000..3cd4dff85 --- /dev/null +++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/config/messaging/Publisher.java @@ -0,0 +1,34 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2021 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.controlloop.runtime.config.messaging; + +import java.util.List; +import org.onap.policy.common.endpoints.event.comm.TopicSink; + +/** + * Publisher. + */ +public interface Publisher { + + void active(List<TopicSink> topicSinks); + + void stop(); +} diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/instantiation/ControlLoopInstantiationProvider.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/instantiation/ControlLoopInstantiationProvider.java index c01a0b989..1011f620c 100644 --- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/instantiation/ControlLoopInstantiationProvider.java +++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/instantiation/ControlLoopInstantiationProvider.java @@ -20,8 +20,6 @@ package org.onap.policy.clamp.controlloop.runtime.instantiation; -import java.io.Closeable; -import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -30,6 +28,7 @@ import java.util.function.UnaryOperator; import java.util.stream.Collectors; import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; +import lombok.AllArgsConstructor; import org.onap.policy.clamp.controlloop.common.exception.ControlLoopException; import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoop; import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopElement; @@ -39,14 +38,12 @@ import org.onap.policy.clamp.controlloop.models.controlloop.persistence.provider import org.onap.policy.clamp.controlloop.models.messages.rest.instantiation.InstantiationCommand; import org.onap.policy.clamp.controlloop.models.messages.rest.instantiation.InstantiationResponse; import org.onap.policy.clamp.controlloop.runtime.commissioning.CommissioningProvider; -import org.onap.policy.clamp.controlloop.runtime.main.parameters.ClRuntimeParameterGroup; import org.onap.policy.clamp.controlloop.runtime.supervision.SupervisionHandler; import org.onap.policy.common.parameters.BeanValidationResult; import org.onap.policy.common.parameters.ObjectValidationResult; import org.onap.policy.common.parameters.ValidationResult; import org.onap.policy.common.parameters.ValidationStatus; import org.onap.policy.models.base.PfModelException; -import org.onap.policy.models.base.PfModelRuntimeException; import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier; import org.onap.policy.models.tosca.authorative.concepts.ToscaNodeTemplate; import org.springframework.stereotype.Component; @@ -55,7 +52,8 @@ import org.springframework.stereotype.Component; * This class is dedicated to the Instantiation of Commissioned control loop. */ @Component -public class ControlLoopInstantiationProvider implements Closeable { +@AllArgsConstructor +public class ControlLoopInstantiationProvider { private final ControlLoopProvider controlLoopProvider; private final CommissioningProvider commissioningProvider; private final SupervisionHandler supervisionHandler; @@ -63,30 +61,6 @@ public class ControlLoopInstantiationProvider implements Closeable { private static final Object lockit = new Object(); /** - * Create a instantiation provider. - * - * @param controlLoopParameters the parameters for access to the database - * @param commissioningProvider CommissioningProvider - * @param supervisionHandler SupervisionHandler - * @throws PfModelRuntimeException on errors creating a provider - */ - public ControlLoopInstantiationProvider(ClRuntimeParameterGroup controlLoopParameters, - CommissioningProvider commissioningProvider, SupervisionHandler supervisionHandler) { - this.commissioningProvider = commissioningProvider; - this.supervisionHandler = supervisionHandler; - try { - controlLoopProvider = new ControlLoopProvider(controlLoopParameters.getDatabaseProviderParameters()); - } catch (PfModelException e) { - throw new PfModelRuntimeException(e); - } - } - - @Override - public void close() throws IOException { - controlLoopProvider.close(); - } - - /** * Create control loops. * * @param controlLoops the control loop diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/parameters/ClRuntimeParameterGroup.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/parameters/ClRuntimeParameterGroup.java index d1fa31261..f46b90294 100644 --- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/parameters/ClRuntimeParameterGroup.java +++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/parameters/ClRuntimeParameterGroup.java @@ -20,38 +20,52 @@ package org.onap.policy.clamp.controlloop.runtime.main.parameters; -import javax.validation.constraints.NotBlank; +import javax.validation.Valid; +import javax.validation.constraints.Min; +import javax.validation.constraints.NotNull; import lombok.Getter; -import org.onap.policy.common.endpoints.parameters.RestServerParameters; +import lombok.Setter; import org.onap.policy.common.endpoints.parameters.TopicParameterGroup; -import org.onap.policy.common.parameters.ParameterGroupImpl; -import org.onap.policy.common.parameters.annotations.NotNull; +import org.onap.policy.common.parameters.validation.ParameterGroupConstraint; import org.onap.policy.models.provider.PolicyModelsProviderParameters; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.validation.annotation.Validated; /** * Class to hold all parameters needed for the Control Loop runtime component. * */ -@NotNull -@NotBlank +@Validated @Getter -public class ClRuntimeParameterGroup extends ParameterGroupImpl { - private RestServerParameters restServerParameters; +@Setter +@ConfigurationProperties(prefix = "runtime") +public class ClRuntimeParameterGroup { + + @NotNull + @ParameterGroupConstraint private PolicyModelsProviderParameters databaseProviderParameters; + + @Valid + @NotNull private ParticipantParameters participantParameters; + + @NotNull + @ParameterGroupConstraint private TopicParameterGroup topicParameterGroup; + @Min(value = 0) private long supervisionScannerIntervalSec; + + @Min(value = 0) private long participantStateChangeIntervalSec; + + @Min(value = 0) private long participantClUpdateIntervalSec; + + @Min(value = 0) private long participantClStateChangeIntervalSec; + private long participantRegisterAckIntervalSec; + private long participantDeregisterAckIntervalSec; + private long participantUpdateIntervalSec; - /** - * Create the Control Loop parameter group. - * - * @param name the parameter group name - */ - public ClRuntimeParameterGroup(final String name) { - super(name); - } } diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/parameters/ClRuntimeParameterHandler.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/parameters/ClRuntimeParameterHandler.java deleted file mode 100644 index bcf1124d2..000000000 --- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/parameters/ClRuntimeParameterHandler.java +++ /dev/null @@ -1,72 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * Copyright (C) 2021 Nordix Foundation. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.clamp.controlloop.runtime.main.parameters; - -import java.io.File; -import javax.ws.rs.core.Response; -import org.onap.policy.clamp.controlloop.common.exception.ControlLoopException; -import org.onap.policy.common.parameters.ValidationResult; -import org.onap.policy.common.utils.coder.Coder; -import org.onap.policy.common.utils.coder.CoderException; -import org.onap.policy.common.utils.coder.StandardCoder; - -/** - * This class handles reading, parsing and validating of control loop runtime parameters from JSON files. - */ -public class ClRuntimeParameterHandler { - - private static final Coder CODER = new StandardCoder(); - - /** - * Read the parameters from the parameter file. - * - * @param path the path passed to control loop runtime - * @return the parameters read from the configuration file - * @throws ControlLoopException on parameter exceptions - */ - public ClRuntimeParameterGroup getParameters(final String path) throws ControlLoopException { - ClRuntimeParameterGroup clRuntimeParameterGroup = null; - - // Read the parameters - try { - // Read the parameters from JSON - File file = new File(path); - clRuntimeParameterGroup = CODER.decode(file, ClRuntimeParameterGroup.class); - } catch (final CoderException e) { - throw new ControlLoopException(Response.Status.NOT_ACCEPTABLE, - "error reading parameters from \"" + path + "\"\n" + "(" + e.getClass().getSimpleName() + ")", e); - } - - // The JSON processing returns null if there is an empty file - if (clRuntimeParameterGroup == null) { - throw new ControlLoopException(Response.Status.NOT_ACCEPTABLE, "no parameters found in \"" + path + "\""); - } - - // validate the parameters - final ValidationResult validationResult = clRuntimeParameterGroup.validate(); - if (!validationResult.isValid()) { - throw new ControlLoopException(Response.Status.NOT_ACCEPTABLE, - "validation error(s) on parameters from \"" + path + "\"\n" + validationResult.getResult()); - } - - return clRuntimeParameterGroup; - } -} diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/parameters/ParticipantParameters.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/parameters/ParticipantParameters.java index dfc1b2806..a4e84af0d 100644 --- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/parameters/ParticipantParameters.java +++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/parameters/ParticipantParameters.java @@ -19,19 +19,20 @@ package org.onap.policy.clamp.controlloop.runtime.main.parameters; import java.util.concurrent.TimeUnit; +import javax.validation.Valid; +import javax.validation.constraints.Min; +import javax.validation.constraints.NotNull; import lombok.Getter; -import org.onap.policy.common.parameters.ParameterGroupImpl; -import org.onap.policy.common.parameters.annotations.Min; -import org.onap.policy.common.parameters.annotations.NotBlank; -import org.onap.policy.common.parameters.annotations.NotNull; +import lombok.Setter; +import org.springframework.validation.annotation.Validated; /** * Parameters for communicating with participants. */ -@NotNull -@NotBlank @Getter -public class ParticipantParameters extends ParameterGroupImpl { +@Setter +@Validated +public class ParticipantParameters { /** * Default maximum message age, in milliseconds, that should be examined. Any message @@ -39,21 +40,18 @@ public class ParticipantParameters extends ParameterGroupImpl { */ public static final long DEFAULT_MAX_AGE_MS = TimeUnit.MILLISECONDS.convert(10, TimeUnit.MINUTES); - @Min(1) private long heartBeatMs; @Min(1) private long maxMessageAgeMs = DEFAULT_MAX_AGE_MS; + @Valid + @NotNull private ParticipantUpdateParameters updateParameters; - private ParticipantStateChangeParameters stateChangeParameters; + @Valid + @NotNull + private ParticipantStateChangeParameters stateChangeParameters; - /** - * Constructs the object. - */ - public ParticipantParameters() { - super(ParticipantParameters.class.getSimpleName()); - } } diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/parameters/ParticipantStateChangeParameters.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/parameters/ParticipantStateChangeParameters.java index 2eea4ab51..a3e2eee2e 100644 --- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/parameters/ParticipantStateChangeParameters.java +++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/parameters/ParticipantStateChangeParameters.java @@ -18,19 +18,18 @@ package org.onap.policy.clamp.controlloop.runtime.main.parameters; +import javax.validation.constraints.Min; import lombok.Getter; -import org.onap.policy.common.parameters.ParameterGroupImpl; -import org.onap.policy.common.parameters.annotations.Min; -import org.onap.policy.common.parameters.annotations.NotBlank; -import org.onap.policy.common.parameters.annotations.NotNull; +import lombok.Setter; +import org.springframework.validation.annotation.Validated; /** * Parameters for Participant STATE-CHANGE requests. */ -@NotNull -@NotBlank @Getter -public class ParticipantStateChangeParameters extends ParameterGroupImpl { +@Setter +@Validated +public class ParticipantStateChangeParameters { /** * Maximum number of times to re-send a request to a PDP. @@ -44,10 +43,4 @@ public class ParticipantStateChangeParameters extends ParameterGroupImpl { @Min(value = 0) private long maxWaitMs; - /** - * Constructs the object. - */ - public ParticipantStateChangeParameters() { - super(ParticipantStateChangeParameters.class.getSimpleName()); - } } diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/parameters/ParticipantUpdateParameters.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/parameters/ParticipantUpdateParameters.java index 2af5be534..8102fe90e 100644 --- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/parameters/ParticipantUpdateParameters.java +++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/parameters/ParticipantUpdateParameters.java @@ -18,19 +18,18 @@ package org.onap.policy.clamp.controlloop.runtime.main.parameters; +import javax.validation.constraints.Min; import lombok.Getter; -import org.onap.policy.common.parameters.ParameterGroupImpl; -import org.onap.policy.common.parameters.annotations.Min; -import org.onap.policy.common.parameters.annotations.NotBlank; -import org.onap.policy.common.parameters.annotations.NotNull; +import lombok.Setter; +import org.springframework.validation.annotation.Validated; /** * Parameters for Participant UPDATE requests. */ -@NotNull -@NotBlank @Getter -public class ParticipantUpdateParameters extends ParameterGroupImpl { +@Setter +@Validated +public class ParticipantUpdateParameters { /** * Maximum number of times to re-send a request to a PDP. @@ -44,11 +43,4 @@ public class ParticipantUpdateParameters extends ParameterGroupImpl { @Min(value = 0) private long maxWaitMs; - /** - * Constructs the object. - */ - public ParticipantUpdateParameters() { - super(ParticipantUpdateParameters.class.getSimpleName()); - } } - diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/rest/CommissioningController.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/rest/CommissioningController.java index b50e7a0ed..74548e724 100644 --- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/rest/CommissioningController.java +++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/rest/CommissioningController.java @@ -23,8 +23,6 @@ package org.onap.policy.clamp.controlloop.runtime.main.rest; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.PropertyNamingStrategies; -import com.fasterxml.jackson.module.jsonSchema.JsonSchema; -import com.fasterxml.jackson.module.jsonSchema.factories.SchemaFactoryWrapper; import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiParam; import io.swagger.annotations.ApiResponse; @@ -347,15 +345,24 @@ public class CommissioningController extends AbstractRestController { required = false) String version) { try { - return ResponseEntity.ok().body(provider.getToscaServiceTemplate(name, version)); + ObjectMapper mapper = new ObjectMapper(); + mapper.setPropertyNamingStrategy(PropertyNamingStrategies.SNAKE_CASE); + String response = mapper.writerWithDefaultPrettyPrinter() + .writeValueAsString(provider.getToscaServiceTemplate(name, version)); + + return ResponseEntity.ok().body(response); } catch (PfModelRuntimeException | PfModelException e) { LOGGER.warn("Get of tosca service template failed", e); var resp = new CommissioningResponse(); resp.setErrorDetails(e.getErrorResponse().getErrorMessage()); return ResponseEntity.status(e.getErrorResponse().getResponseCode().getStatusCode()).body(resp); + } catch (JsonProcessingException e) { + LOGGER.warn("Get of tosca service template failed", e); + var resp = new CommissioningResponse(); + resp.setErrorDetails(e.getMessage()); + return ResponseEntity.status(Status.BAD_REQUEST.getStatusCode()).body(resp); } - } /** diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/monitoring/MonitoringProvider.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/monitoring/MonitoringProvider.java index a68505877..1f6246bd6 100644 --- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/monitoring/MonitoringProvider.java +++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/monitoring/MonitoringProvider.java @@ -20,13 +20,12 @@ package org.onap.policy.clamp.controlloop.runtime.monitoring; -import java.io.Closeable; -import java.io.IOException; import java.time.Instant; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import lombok.AllArgsConstructor; import lombok.NonNull; import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ClElementStatistics; import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ClElementStatisticsList; @@ -36,7 +35,6 @@ import org.onap.policy.clamp.controlloop.models.controlloop.concepts.Participant import org.onap.policy.clamp.controlloop.models.controlloop.persistence.provider.ClElementStatisticsProvider; import org.onap.policy.clamp.controlloop.models.controlloop.persistence.provider.ControlLoopProvider; import org.onap.policy.clamp.controlloop.models.controlloop.persistence.provider.ParticipantStatisticsProvider; -import org.onap.policy.clamp.controlloop.runtime.main.parameters.ClRuntimeParameterGroup; import org.onap.policy.models.base.PfModelException; import org.onap.policy.models.base.PfModelRuntimeException; import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier; @@ -46,7 +44,8 @@ import org.springframework.stereotype.Component; * This class provides information about statistics data of CL elements and CL Participants in database to callers. */ @Component -public class MonitoringProvider implements Closeable { +@AllArgsConstructor +public class MonitoringProvider { private static final String DESC_ORDER = "DESC"; private final ParticipantStatisticsProvider participantStatisticsProvider; @@ -54,32 +53,6 @@ public class MonitoringProvider implements Closeable { private final ControlLoopProvider controlLoopProvider; /** - * Create a Monitoring provider. - * - * @param controlLoopParameters the parameters for access to the database - * @throws PfModelRuntimeException on errors creating the provider - */ - public MonitoringProvider(ClRuntimeParameterGroup controlLoopParameters) { - - try { - participantStatisticsProvider = - new ParticipantStatisticsProvider(controlLoopParameters.getDatabaseProviderParameters()); - clElementStatisticsProvider = - new ClElementStatisticsProvider(controlLoopParameters.getDatabaseProviderParameters()); - controlLoopProvider = new ControlLoopProvider(controlLoopParameters.getDatabaseProviderParameters()); - } catch (PfModelException e) { - throw new PfModelRuntimeException(e); - } - } - - @Override - public void close() throws IOException { - controlLoopProvider.close(); - clElementStatisticsProvider.close(); - participantStatisticsProvider.close(); - } - - /** * Create participant statistics. * * @param participantStatistics the participant statistics diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionHandler.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionHandler.java index e1b4be48b..5e94d293e 100644 --- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionHandler.java +++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionHandler.java @@ -20,31 +20,42 @@ package org.onap.policy.clamp.controlloop.runtime.supervision; +import java.time.Instant; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; import java.util.UUID; import javax.ws.rs.core.Response; -import javax.ws.rs.core.Response.Status; +import lombok.AllArgsConstructor; import org.apache.commons.collections4.CollectionUtils; import org.onap.policy.clamp.controlloop.common.exception.ControlLoopException; -import org.onap.policy.clamp.controlloop.common.exception.ControlLoopRuntimeException; -import org.onap.policy.clamp.controlloop.common.handler.ControlLoopHandler; import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoop; import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopElement; +import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopElementDefinition; import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopState; import org.onap.policy.clamp.controlloop.models.controlloop.concepts.Participant; import org.onap.policy.clamp.controlloop.models.controlloop.persistence.provider.ControlLoopProvider; import org.onap.policy.clamp.controlloop.models.controlloop.persistence.provider.ParticipantProvider; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantControlLoopStateChange; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantControlLoopUpdate; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantDeregister; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantDeregisterAck; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageType; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantRegister; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantRegisterAck; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantStatus; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantUpdate; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantUpdateAck; import org.onap.policy.clamp.controlloop.runtime.commissioning.CommissioningProvider; import org.onap.policy.clamp.controlloop.runtime.main.parameters.ClRuntimeParameterGroup; import org.onap.policy.clamp.controlloop.runtime.monitoring.MonitoringProvider; import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ParticipantControlLoopStateChangePublisher; import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ParticipantControlLoopUpdatePublisher; +import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ParticipantDeregisterAckPublisher; +import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ParticipantRegisterAckPublisher; import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ParticipantStateChangePublisher; import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ParticipantStatusListener; +import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ParticipantUpdatePublisher; import org.onap.policy.common.endpoints.event.comm.TopicSink; import org.onap.policy.common.endpoints.listeners.MessageTypeDispatcher; import org.onap.policy.common.utils.services.ServiceManager; @@ -58,10 +69,12 @@ import org.springframework.stereotype.Component; /** * This class handles supervision of control loop instances, so only one object of this type should be built at a time. * - * <p/> It is effectively a singleton that is started at system start. + * <p/> + * It is effectively a singleton that is started at system start. */ @Component -public class SupervisionHandler extends ControlLoopHandler { +@AllArgsConstructor +public class SupervisionHandler { private static final Logger LOGGER = LoggerFactory.getLogger(SupervisionHandler.class); private static final String CONTROL_LOOP_CANNOT_TRANSITION_FROM_STATE = "Control loop can't transition from state "; @@ -69,59 +82,17 @@ public class SupervisionHandler extends ControlLoopHandler { private static final String TO_STATE = " to state "; private static final String AND_TRANSITIONING_TO_STATE = " and transitioning to state "; - private ControlLoopProvider controlLoopProvider; - private ParticipantProvider participantProvider; + private final ControlLoopProvider controlLoopProvider; + private final ParticipantProvider participantProvider; private final MonitoringProvider monitoringProvider; private final CommissioningProvider commissioningProvider; // Publishers for participant communication - private ParticipantStateChangePublisher stateChangePublisher; - private ParticipantControlLoopUpdatePublisher controlLoopUpdatePublisher; - private ParticipantControlLoopStateChangePublisher controlLoopStateChangePublisher; - - private long supervisionScannerIntervalSec; - private long participantStateChangeIntervalSec; - private long participantClUpdateIntervalSec; - private long participantClStateChangeIntervalSec; - - // Database scanner - private SupervisionScanner scanner; - - /** - * Used to manage the services. - */ - private ServiceManager manager; - private ServiceManager publisherManager; - - /** - * Create a handler. - * - * @param clRuntimeParameterGroup the parameters for the control loop runtime - * @param monitoringProvider the MonitoringProvider - * @param commissioningProvider the CommissioningProvider - */ - public SupervisionHandler(ClRuntimeParameterGroup clRuntimeParameterGroup, MonitoringProvider monitoringProvider, - CommissioningProvider commissioningProvider) { - super(clRuntimeParameterGroup.getDatabaseProviderParameters()); - this.monitoringProvider = monitoringProvider; - this.commissioningProvider = commissioningProvider; - - // @formatter:off - this.manager = new ServiceManager() - .addAction("ControlLoop Provider", - () -> controlLoopProvider = new ControlLoopProvider(getDatabaseProviderParameters()), - () -> controlLoopProvider = null) - .addAction("Participant Provider", - () -> participantProvider = new ParticipantProvider(getDatabaseProviderParameters()), - () -> participantProvider = null); - // @formatter:on - - supervisionScannerIntervalSec = clRuntimeParameterGroup.getSupervisionScannerIntervalSec(); - participantStateChangeIntervalSec = clRuntimeParameterGroup.getParticipantClStateChangeIntervalSec(); - participantClUpdateIntervalSec = clRuntimeParameterGroup.getParticipantClUpdateIntervalSec(); - participantClStateChangeIntervalSec = clRuntimeParameterGroup.getParticipantClStateChangeIntervalSec(); - - } + private final ParticipantControlLoopUpdatePublisher controlLoopUpdatePublisher; + private final ParticipantControlLoopStateChangePublisher controlLoopStateChangePublisher; + private final ParticipantRegisterAckPublisher participantRegisterAckPublisher; + private final ParticipantDeregisterAckPublisher participantDeregisterAckPublisher; + private final ParticipantUpdatePublisher participantUpdatePublisher; /** * Supervision trigger called when a command is issued on control loops. @@ -155,63 +126,13 @@ public class SupervisionHandler extends ControlLoopHandler { } } - @Override - public void startAndRegisterListeners(MessageTypeDispatcher msgDispatcher) { - msgDispatcher.register(ParticipantMessageType.PARTICIPANT_STATUS.name(), new ParticipantStatusListener(this)); - } - - @Override - public void startAndRegisterPublishers(List<TopicSink> topicSinks) { - // @formatter:off - this.publisherManager = new ServiceManager() - .addAction("Supervision scanner", - () -> scanner = - new SupervisionScanner(controlLoopProvider, supervisionScannerIntervalSec), - () -> scanner.close()) - .addAction("ControlLoopUpdate publisher", - () -> controlLoopUpdatePublisher = - new ParticipantControlLoopUpdatePublisher(topicSinks, participantClUpdateIntervalSec), - () -> controlLoopUpdatePublisher.terminate()) - .addAction("StateChange Publisher", - () -> stateChangePublisher = - new ParticipantStateChangePublisher(topicSinks, participantStateChangeIntervalSec), - () -> stateChangePublisher.terminate()) - .addAction("ControlLoopStateChange Publisher", - () -> controlLoopStateChangePublisher = - new ParticipantControlLoopStateChangePublisher(topicSinks, participantClStateChangeIntervalSec), - () -> controlLoopStateChangePublisher.terminate()); - // @formatter:on - try { - publisherManager.start(); - } catch (final ServiceManagerException exp) { - throw new ControlLoopRuntimeException(Status.INTERNAL_SERVER_ERROR, - "Supervision handler start of publishers or scanner failed", exp); - } - } - - @Override - public void stopAndUnregisterPublishers() { - try { - publisherManager.stop(); - } catch (final ServiceManagerException exp) { - throw new ControlLoopRuntimeException(Status.INTERNAL_SERVER_ERROR, - "Supervision handler stop of publishers or scanner failed", exp); - } - } - - @Override - public void stopAndUnregisterListeners(MessageTypeDispatcher msgDispatcher) { - msgDispatcher.unregister(ParticipantMessageType.PARTICIPANT_STATUS.name()); - } - /** * Handle a ParticipantStatus message from a participant. * * @param participantStatusMessage the ParticipantStatus message received from a participant */ - public void handleParticipantStatusMessage(ParticipantStatus participantStatusMessage) { + public void handleParticipantMessage(ParticipantStatus participantStatusMessage) { LOGGER.debug("Participant Status received {}", participantStatusMessage); - try { superviseParticipant(participantStatusMessage); } catch (PfModelException | ControlLoopException svExc) { @@ -227,6 +148,36 @@ public class SupervisionHandler extends ControlLoopHandler { } /** + * Handle a ParticipantRegister message from a participant. + * + * @param participantRegisterMessage the ParticipantRegister message received from a participant + */ + public void handleParticipantMessage(ParticipantRegister participantRegisterMessage) { + LOGGER.debug("Participant Register received {}", participantRegisterMessage); + sendParticipantAckMessage(participantRegisterMessage); + sendParticipantUpdate(participantRegisterMessage); + } + + /** + * Handle a ParticipantDeregister message from a participant. + * + * @param participantDeregisterMessage the ParticipantDeregister message received from a participant + */ + public void handleParticipantMessage(ParticipantDeregister participantDeregisterMessage) { + LOGGER.debug("Participant Deregister received {}", participantDeregisterMessage); + sendParticipantAckMessage(participantDeregisterMessage); + } + + /** + * Handle a ParticipantUpdateAck message from a participant. + * + * @param participantUpdateAckMessage the ParticipantUpdateAck message received from a participant + */ + public void handleParticipantMessage(ParticipantUpdateAck participantUpdateAckMessage) { + LOGGER.debug("Participant Update Ack received {}", participantUpdateAckMessage); + } + + /** * Supervise a control loop, performing whatever actions need to be performed on the control loop. * * @param controlLoop the control loop to supervises @@ -270,7 +221,7 @@ public class SupervisionHandler extends ControlLoopHandler { case UNINITIALISED2PASSIVE: case PASSIVE: controlLoop.setState(ControlLoopState.PASSIVE2UNINITIALISED); - sendControlLoopStateChange(controlLoop); + controlLoopStateChangePublisher.send(controlLoop); break; case PASSIVE2UNINITIALISED: @@ -294,7 +245,7 @@ public class SupervisionHandler extends ControlLoopHandler { break; case UNINITIALISED: controlLoop.setState(ControlLoopState.UNINITIALISED2PASSIVE); - sendControlLoopUpdate(controlLoop); + controlLoopUpdatePublisher.send(controlLoop); break; case UNINITIALISED2PASSIVE: @@ -305,7 +256,7 @@ public class SupervisionHandler extends ControlLoopHandler { case RUNNING: controlLoop.setState(ControlLoopState.RUNNING2PASSIVE); - sendControlLoopStateChange(controlLoop); + controlLoopStateChangePublisher.send(controlLoop); break; default: @@ -329,7 +280,7 @@ public class SupervisionHandler extends ControlLoopHandler { case PASSIVE: controlLoop.setState(ControlLoopState.PASSIVE2RUNNING); - sendControlLoopStateChange(controlLoop); + controlLoopStateChangePublisher.send(controlLoop); break; default: @@ -354,10 +305,55 @@ public class SupervisionHandler extends ControlLoopHandler { clsc.setControlLoopId(controlLoop.getKey().asIdentifier()); clsc.setMessageId(UUID.randomUUID()); clsc.setOrderedState(controlLoop.getOrderedState()); - controlLoopStateChangePublisher.send(clsc); } + private void sendParticipantUpdate(ParticipantRegister participantRegisterMessage) { + var message = new ParticipantUpdate(); + message.setParticipantId(participantRegisterMessage.getParticipantId()); + message.setParticipantType(participantRegisterMessage.getParticipantType()); + message.setTimestamp(Instant.now()); + + ControlLoopElementDefinition clDefinition = new ControlLoopElementDefinition(); + clDefinition.setId(UUID.randomUUID()); + + try { + clDefinition.setControlLoopElementToscaServiceTemplate(commissioningProvider + .getToscaServiceTemplate(null, null)); + } catch (PfModelException pfme) { + LOGGER.warn("Get of tosca service template failed, cannot send participantupdate", pfme); + return; + } + + Map<UUID, ControlLoopElementDefinition> controlLoopElementDefinitionMap = new LinkedHashMap<>(); + controlLoopElementDefinitionMap.put(UUID.randomUUID(), clDefinition); + + Map<ToscaConceptIdentifier, Map<UUID, ControlLoopElementDefinition>> + participantDefinitionUpdateMap = new LinkedHashMap<>(); + participantDefinitionUpdateMap.put(participantRegisterMessage.getParticipantId(), + controlLoopElementDefinitionMap); + message.setParticipantDefinitionUpdateMap(participantDefinitionUpdateMap); + + LOGGER.debug("Participant Update sent", message); + participantUpdatePublisher.send(message); + } + + private void sendParticipantAckMessage(ParticipantRegister participantRegisterMessage) { + var message = new ParticipantRegisterAck(); + message.setResponseTo(participantRegisterMessage.getMessageId()); + message.setMessage("Participant Register Ack"); + message.setResult(true); + participantRegisterAckPublisher.send(message); + } + + private void sendParticipantAckMessage(ParticipantDeregister participantDeregisterMessage) { + var message = new ParticipantDeregisterAck(); + message.setResponseTo(participantDeregisterMessage.getMessageId()); + message.setMessage("Participant Deregister Ack"); + message.setResult(true); + participantDeregisterAckPublisher.send(message); + } + private void superviseParticipant(ParticipantStatus participantStatusMessage) throws PfModelException, ControlLoopException { if (participantStatusMessage.getParticipantId() == null) { @@ -427,26 +423,6 @@ public class SupervisionHandler extends ControlLoopHandler { } } - @Override - public void startProviders() { - try { - manager.start(); - } catch (final ServiceManagerException exp) { - throw new ControlLoopRuntimeException(Status.INTERNAL_SERVER_ERROR, - "Supervision handler start of providers failed", exp); - } - } - - @Override - public void stopProviders() { - try { - manager.stop(); - } catch (final ServiceManagerException exp) { - throw new ControlLoopRuntimeException(Status.INTERNAL_SERVER_ERROR, - "Supervision handler stop of providers failed", exp); - } - } - private void exceptionOccured(Response.Status status, String reason) throws ControlLoopException { throw new ControlLoopException(status, reason); } diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionScanner.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionScanner.java index 4f3faf8af..68f5830c0 100644 --- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionScanner.java +++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionScanner.java @@ -27,13 +27,16 @@ import java.util.concurrent.TimeUnit; import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoop; import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopElement; import org.onap.policy.clamp.controlloop.models.controlloop.persistence.provider.ControlLoopProvider; +import org.onap.policy.clamp.controlloop.runtime.main.parameters.ClRuntimeParameterGroup; import org.onap.policy.models.base.PfModelException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; /** * This class is used to scan the control loops in the database and check if they are in the correct state. */ +@Component public class SupervisionScanner implements Runnable, Closeable { private static final Logger LOGGER = LoggerFactory.getLogger(SupervisionScanner.class); @@ -43,15 +46,17 @@ public class SupervisionScanner implements Runnable, Closeable { /** * Constructor for instantiating SupervisionScanner. * + * @param clRuntimeParameterGroup the parameters for the control loop runtime * @param controlLoopProvider the provider to use to read control loops from the database - * @param interval time interval to perform scans */ - public SupervisionScanner(final ControlLoopProvider controlLoopProvider, final long interval) { + public SupervisionScanner(final ControlLoopProvider controlLoopProvider, + ClRuntimeParameterGroup clRuntimeParameterGroup) { this.controlLoopProvider = controlLoopProvider; // Kick off the timer timerPool = makeTimerPool(); - timerPool.scheduleAtFixedRate(this, 0, interval, TimeUnit.SECONDS); + timerPool.scheduleAtFixedRate(this, 0, clRuntimeParameterGroup.getSupervisionScannerIntervalSec(), + TimeUnit.SECONDS); } @Override diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/AbstractParticipantAckPublisher.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/AbstractParticipantAckPublisher.java new file mode 100644 index 000000000..4b4ca9915 --- /dev/null +++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/AbstractParticipantAckPublisher.java @@ -0,0 +1,62 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2021 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.controlloop.runtime.supervision.comm; + +import java.util.List; +import javax.ws.rs.core.Response.Status; +import org.onap.policy.clamp.controlloop.common.exception.ControlLoopRuntimeException; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantAckMessage; +import org.onap.policy.clamp.controlloop.runtime.config.messaging.Publisher; +import org.onap.policy.common.endpoints.event.comm.TopicSink; +import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClient; + +public abstract class AbstractParticipantAckPublisher<E extends ParticipantAckMessage> implements Publisher { + + private TopicSinkClient topicSinkClient; + private boolean active = false; + + /** + * Method to send Participant message to participants on demand. + * + * @param participantMessage the Participant message + */ + public void send(final E participantMessage) { + if (!active) { + throw new ControlLoopRuntimeException(Status.NOT_ACCEPTABLE, "Not Active!"); + } + topicSinkClient.send(participantMessage); + } + + + @Override + public void active(List<TopicSink> topicSinks) { + if (topicSinks.size() != 1) { + throw new IllegalArgumentException("Topic Sink must be one"); + } + this.topicSinkClient = new TopicSinkClient(topicSinks.get(0)); + active = true; + } + + @Override + public void stop() { + active = false; + } +} diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/AbstractParticipantPublisher.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/AbstractParticipantPublisher.java index c54856101..3c87b05b4 100644 --- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/AbstractParticipantPublisher.java +++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/AbstractParticipantPublisher.java @@ -21,35 +21,17 @@ package org.onap.policy.clamp.controlloop.runtime.supervision.comm; import java.util.List; -import lombok.Getter; +import javax.ws.rs.core.Response.Status; +import org.onap.policy.clamp.controlloop.common.exception.ControlLoopRuntimeException; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessage; +import org.onap.policy.clamp.controlloop.runtime.config.messaging.Publisher; import org.onap.policy.common.endpoints.event.comm.TopicSink; import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClient; -public abstract class AbstractParticipantPublisher<E extends ParticipantMessage> { +public abstract class AbstractParticipantPublisher<E extends ParticipantMessage> implements Publisher { - private final TopicSinkClient topicSinkClient; - - @Getter - private final long intervalSec; - - /** - * Constructor. - * - * @param topicSinks the topic sinks - * @param intervalSec time interval to send ParticipantStateChange messages - */ - protected AbstractParticipantPublisher(final List<TopicSink> topicSinks, long intervalSec) { - this.topicSinkClient = new TopicSinkClient(topicSinks.get(0)); - this.intervalSec = intervalSec; - } - - /** - * Terminates the current timer. - */ - public void terminate() { - // Nothing to terminate, this publisher does not have a timer - } + private TopicSinkClient topicSinkClient; + private boolean active = false; /** * Method to send Participant message to participants on demand. @@ -57,6 +39,24 @@ public abstract class AbstractParticipantPublisher<E extends ParticipantMessage> * @param participantMessage the Participant message */ public void send(final E participantMessage) { + if (!active) { + throw new ControlLoopRuntimeException(Status.NOT_ACCEPTABLE, "Not Active!"); + } topicSinkClient.send(participantMessage); } + + + @Override + public void active(List<TopicSink> topicSinks) { + if (topicSinks.size() != 1) { + throw new IllegalArgumentException("Topic Sink must be one"); + } + this.topicSinkClient = new TopicSinkClient(topicSinks.get(0)); + active = true; + } + + @Override + public void stop() { + active = false; + } } diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantControlLoopStateChangePublisher.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantControlLoopStateChangePublisher.java index c9d0a4fe4..734ccb842 100644 --- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantControlLoopStateChangePublisher.java +++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantControlLoopStateChangePublisher.java @@ -20,23 +20,29 @@ package org.onap.policy.clamp.controlloop.runtime.supervision.comm; -import java.util.List; +import java.util.UUID; +import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoop; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantControlLoopStateChange; -import org.onap.policy.common.endpoints.event.comm.TopicSink; +import org.springframework.stereotype.Component; /** * This class is used to send ParticipantControlLoopStateChangePublisher messages to participants on DMaaP. */ +@Component public class ParticipantControlLoopStateChangePublisher extends AbstractParticipantPublisher<ParticipantControlLoopStateChange> { /** - * Constructor for instantiating ParticipantControlLoopStateChangePublisherPublisher. + * Send ControlLoopStateChange to Participant. * - * @param topicSinks the topic sinks - * @param interval time interval to send ParticipantControlLoopStateChangePublisher messages + * @param controlLoop the ControlLoop */ - public ParticipantControlLoopStateChangePublisher(final List<TopicSink> topicSinks, final long interval) { - super(topicSinks, interval); + public void send(ControlLoop controlLoop) { + var clsc = new ParticipantControlLoopStateChange(); + clsc.setControlLoopId(controlLoop.getKey().asIdentifier()); + clsc.setMessageId(UUID.randomUUID()); + clsc.setOrderedState(controlLoop.getOrderedState()); + + super.send(clsc); } } diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantControlLoopUpdatePublisher.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantControlLoopUpdatePublisher.java index fbbd95fbc..8d40c5e69 100644 --- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantControlLoopUpdatePublisher.java +++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantControlLoopUpdatePublisher.java @@ -20,22 +20,42 @@ package org.onap.policy.clamp.controlloop.runtime.supervision.comm; -import java.util.List; +import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoop; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantControlLoopUpdate; -import org.onap.policy.common.endpoints.event.comm.TopicSink; +import org.onap.policy.clamp.controlloop.runtime.commissioning.CommissioningProvider; +import org.onap.policy.models.base.PfModelException; +import org.springframework.stereotype.Component; /** * This class is used to send ParticipantControlLoopUpdate messages to participants on DMaaP. */ +@Component public class ParticipantControlLoopUpdatePublisher extends AbstractParticipantPublisher<ParticipantControlLoopUpdate> { + private final CommissioningProvider commissioningProvider; + + /** + * Constructor. + * + * @param commissioningProvider the CommissioningProvider + */ + public ParticipantControlLoopUpdatePublisher(CommissioningProvider commissioningProvider) { + this.commissioningProvider = commissioningProvider; + } + /** - * Constructor for instantiating ParticipantUpdatePublisher. + * Send ControlLoopUpdate to Participant. * - * @param topicSinks the topic sinks - * @param interval time interval to send ParticipantControlLoopUpdate messages + * @param controlLoop the ControlLoop + * @throws PfModelException on errors getting the Control Loop Definition */ - public ParticipantControlLoopUpdatePublisher(final List<TopicSink> topicSinks, final long interval) { - super(topicSinks, interval); + public void send(ControlLoop controlLoop) throws PfModelException { + var pclu = new ParticipantControlLoopUpdate(); + pclu.setControlLoopId(controlLoop.getKey().asIdentifier()); + pclu.setControlLoop(controlLoop); + // TODO: We should look up the correct TOSCA node template here for the control loop + // Tiny hack implemented to return the tosca service template entry from the database and be passed onto dmaap + pclu.setControlLoopDefinition(commissioningProvider.getToscaServiceTemplate(null, null)); + super.send(pclu); } } diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantDeregisterAckPublisher.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantDeregisterAckPublisher.java new file mode 100644 index 000000000..c0fcb3e7d --- /dev/null +++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantDeregisterAckPublisher.java @@ -0,0 +1,32 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2021 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.controlloop.runtime.supervision.comm; + +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantDeregisterAck; +import org.springframework.stereotype.Component; + +/** + * This class is used to send ParticipantDeregisterAck messages to participants on DMaaP. + */ +@Component +public class ParticipantDeregisterAckPublisher extends AbstractParticipantAckPublisher<ParticipantDeregisterAck> { + +} diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantDeregisterListener.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantDeregisterListener.java new file mode 100644 index 000000000..a03ff0a63 --- /dev/null +++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantDeregisterListener.java @@ -0,0 +1,67 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2021 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.controlloop.runtime.supervision.comm; + +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantDeregister; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageType; +import org.onap.policy.clamp.controlloop.runtime.config.messaging.Listener; +import org.onap.policy.clamp.controlloop.runtime.supervision.SupervisionHandler; +import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; +import org.onap.policy.common.endpoints.listeners.ScoListener; +import org.onap.policy.common.utils.coder.StandardCoderObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +/** + * Listener for ParticipantDeregister messages sent by participants. + */ +@Component +public class ParticipantDeregisterListener extends ScoListener<ParticipantDeregister> implements Listener { + private static final Logger LOGGER = LoggerFactory.getLogger(ParticipantDeregisterListener.class); + + private final SupervisionHandler supervisionHandler; + + /** + * Constructs the object. + */ + public ParticipantDeregisterListener(SupervisionHandler supervisionHandler) { + super(ParticipantDeregister.class); + this.supervisionHandler = supervisionHandler; + } + + @Override + public void onTopicEvent(final CommInfrastructure infra, final String topic, final StandardCoderObject sco, + final ParticipantDeregister participantDeregisterMessage) { + LOGGER.debug("ParticipantDeregister message received from participant - {}", participantDeregisterMessage); + supervisionHandler.handleParticipantMessage(participantDeregisterMessage); + } + + @Override + public String getType() { + return ParticipantMessageType.PARTICIPANT_DEREGISTER.name(); + } + + @Override + public ScoListener<ParticipantDeregister> getScoListener() { + return this; + } +} diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantRegisterAckPublisher.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantRegisterAckPublisher.java new file mode 100644 index 000000000..2c0c4b393 --- /dev/null +++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantRegisterAckPublisher.java @@ -0,0 +1,32 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2021 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.controlloop.runtime.supervision.comm; + +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantRegisterAck; +import org.springframework.stereotype.Component; + +/** + * This class is used to send ParticipantRegisterAck messages to participants on DMaaP. + */ +@Component +public class ParticipantRegisterAckPublisher extends AbstractParticipantAckPublisher<ParticipantRegisterAck> { + +} diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantRegisterListener.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantRegisterListener.java new file mode 100644 index 000000000..a4d8c7697 --- /dev/null +++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantRegisterListener.java @@ -0,0 +1,67 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2021 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.controlloop.runtime.supervision.comm; + +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageType; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantRegister; +import org.onap.policy.clamp.controlloop.runtime.config.messaging.Listener; +import org.onap.policy.clamp.controlloop.runtime.supervision.SupervisionHandler; +import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; +import org.onap.policy.common.endpoints.listeners.ScoListener; +import org.onap.policy.common.utils.coder.StandardCoderObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +/** + * Listener for ParticipantRegister messages sent by participants. + */ +@Component +public class ParticipantRegisterListener extends ScoListener<ParticipantRegister> implements Listener { + private static final Logger LOGGER = LoggerFactory.getLogger(ParticipantRegisterListener.class); + + private final SupervisionHandler supervisionHandler; + + /** + * Constructs the object. + */ + public ParticipantRegisterListener(SupervisionHandler supervisionHandler) { + super(ParticipantRegister.class); + this.supervisionHandler = supervisionHandler; + } + + @Override + public void onTopicEvent(final CommInfrastructure infra, final String topic, final StandardCoderObject sco, + final ParticipantRegister participantRegisterMessage) { + LOGGER.debug("ParticipantRegister message received from participant - {}", participantRegisterMessage); + supervisionHandler.handleParticipantMessage(participantRegisterMessage); + } + + @Override + public String getType() { + return ParticipantMessageType.PARTICIPANT_REGISTER.name(); + } + + @Override + public ScoListener<ParticipantRegister> getScoListener() { + return this; + } +} diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantStateChangePublisher.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantStateChangePublisher.java index 20cdea6f4..b63cbdf03 100644 --- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantStateChangePublisher.java +++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantStateChangePublisher.java @@ -20,22 +20,13 @@ package org.onap.policy.clamp.controlloop.runtime.supervision.comm; -import java.util.List; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantStateChange; -import org.onap.policy.common.endpoints.event.comm.TopicSink; +import org.springframework.stereotype.Component; /** * This class is used to send ParticipantStateChange messages to participants on DMaaP. */ +@Component public class ParticipantStateChangePublisher extends AbstractParticipantPublisher<ParticipantStateChange> { - /** - * Constructor for instantiating ParticipantStateChangePublisher. - * - * @param topicSinks the topic sinks - * @param interval time interval to send ParticipantStateChange messages - */ - public ParticipantStateChangePublisher(List<TopicSink> topicSinks, long interval) { - super(topicSinks, interval); - } } diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantStatusListener.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantStatusListener.java index 88b838613..9da886026 100644 --- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantStatusListener.java +++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantStatusListener.java @@ -20,18 +20,22 @@ package org.onap.policy.clamp.controlloop.runtime.supervision.comm; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageType; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantStatus; +import org.onap.policy.clamp.controlloop.runtime.config.messaging.Listener; import org.onap.policy.clamp.controlloop.runtime.supervision.SupervisionHandler; import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; import org.onap.policy.common.endpoints.listeners.ScoListener; import org.onap.policy.common.utils.coder.StandardCoderObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; /** * Listener for ParticipantStatus messages sent by participants. */ -public class ParticipantStatusListener extends ScoListener<ParticipantStatus> { +@Component +public class ParticipantStatusListener extends ScoListener<ParticipantStatus> implements Listener { private static final Logger LOGGER = LoggerFactory.getLogger(ParticipantStatusListener.class); private final SupervisionHandler supervisionHandler; @@ -48,6 +52,16 @@ public class ParticipantStatusListener extends ScoListener<ParticipantStatus> { public void onTopicEvent(final CommInfrastructure infra, final String topic, final StandardCoderObject sco, final ParticipantStatus participantStatusMessage) { LOGGER.debug("ParticipantStatus message received from participant - {}", participantStatusMessage); - supervisionHandler.handleParticipantStatusMessage(participantStatusMessage); + supervisionHandler.handleParticipantMessage(participantStatusMessage); + } + + @Override + public String getType() { + return ParticipantMessageType.PARTICIPANT_STATUS.name(); + } + + @Override + public ScoListener<ParticipantStatus> getScoListener() { + return this; } } diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantUpdateAckListener.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantUpdateAckListener.java new file mode 100644 index 000000000..b8538b1f7 --- /dev/null +++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantUpdateAckListener.java @@ -0,0 +1,68 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2021 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.controlloop.runtime.supervision.comm; + +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageType; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantRegister; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantUpdateAck; +import org.onap.policy.clamp.controlloop.runtime.config.messaging.Listener; +import org.onap.policy.clamp.controlloop.runtime.supervision.SupervisionHandler; +import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; +import org.onap.policy.common.endpoints.listeners.ScoListener; +import org.onap.policy.common.utils.coder.StandardCoderObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +/** + * Listener for ParticipantUpdateAck messages sent by participants. + */ +@Component +public class ParticipantUpdateAckListener extends ScoListener<ParticipantUpdateAck> implements Listener { + private static final Logger LOGGER = LoggerFactory.getLogger(ParticipantUpdateAckListener.class); + + private final SupervisionHandler supervisionHandler; + + /** + * Constructs the object. + */ + public ParticipantUpdateAckListener(SupervisionHandler supervisionHandler) { + super(ParticipantUpdateAck.class); + this.supervisionHandler = supervisionHandler; + } + + @Override + public void onTopicEvent(final CommInfrastructure infra, final String topic, final StandardCoderObject sco, + final ParticipantUpdateAck participantUpdateAckMessage) { + LOGGER.debug("ParticipantUpdateAck message received from participant - {}", participantUpdateAckMessage); + supervisionHandler.handleParticipantMessage(participantUpdateAckMessage); + } + + @Override + public String getType() { + return ParticipantMessageType.PARTICIPANT_UPDATE_ACK.name(); + } + + @Override + public ScoListener<ParticipantUpdateAck> getScoListener() { + return this; + } +} diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantUpdatePublisher.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantUpdatePublisher.java new file mode 100644 index 000000000..5af5f1f54 --- /dev/null +++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantUpdatePublisher.java @@ -0,0 +1,32 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2021 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.controlloop.runtime.supervision.comm; + +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantUpdate; +import org.springframework.stereotype.Component; + +/** + * This class is used to send ParticipantUpdate messages to participants on DMaaP. + */ +@Component +public class ParticipantUpdatePublisher extends AbstractParticipantPublisher<ParticipantUpdate> { + +} |