diff options
author | FrancescoFioraEst <francesco.fiora@est.tech> | 2021-06-24 16:30:23 +0100 |
---|---|---|
committer | FrancescoFioraEst <francesco.fiora@est.tech> | 2021-06-25 16:28:59 +0100 |
commit | a3482504ead21591265027629da810dac2290603 (patch) | |
tree | 78eaf5ec2eb27f668df95ea447109413a37eeb2d /runtime-controlloop/src/main/java | |
parent | c2f0aac6784db56706931a5fa409e18a3dc872cd (diff) |
Refactor participant event handling
Issue-ID: POLICY-3268
Change-Id: I02b03fd9f9f4399e045e766baace203f42e678cf
Signed-off-by: FrancescoFioraEst <francesco.fiora@est.tech>
Diffstat (limited to 'runtime-controlloop/src/main/java')
6 files changed, 98 insertions, 123 deletions
diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/parameters/ClRuntimeParameterGroup.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/parameters/ClRuntimeParameterGroup.java index ba25a6dad..d1fa31261 100644 --- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/parameters/ClRuntimeParameterGroup.java +++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/parameters/ClRuntimeParameterGroup.java @@ -20,10 +20,8 @@ package org.onap.policy.clamp.controlloop.runtime.main.parameters; -import java.util.List; import javax.validation.constraints.NotBlank; import lombok.Getter; -import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams; import org.onap.policy.common.endpoints.parameters.RestServerParameters; import org.onap.policy.common.endpoints.parameters.TopicParameterGroup; import org.onap.policy.common.parameters.ParameterGroupImpl; @@ -43,6 +41,11 @@ public class ClRuntimeParameterGroup extends ParameterGroupImpl { private ParticipantParameters participantParameters; private TopicParameterGroup topicParameterGroup; + private long supervisionScannerIntervalSec; + private long participantStateChangeIntervalSec; + private long participantClUpdateIntervalSec; + private long participantClStateChangeIntervalSec; + /** * Create the Control Loop parameter group. * diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionHandler.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionHandler.java index 7c7dc3a69..511185da1 100644 --- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionHandler.java +++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionHandler.java @@ -78,6 +78,11 @@ public class SupervisionHandler extends ControlLoopHandler { private ParticipantControlLoopUpdatePublisher controlLoopUpdatePublisher; private ParticipantControlLoopStateChangePublisher controlLoopStateChangePublisher; + private long supervisionScannerIntervalSec; + private long participantStateChangeIntervalSec; + private long participantClUpdateIntervalSec; + private long participantClStateChangeIntervalSec; + // Database scanner private SupervisionScanner scanner; @@ -112,6 +117,12 @@ public class SupervisionHandler extends ControlLoopHandler { () -> participantProvider = new ParticipantProvider(getDatabaseProviderParameters()), () -> participantProvider = null); // @formatter:on + + supervisionScannerIntervalSec = clRuntimeParameterGroup.getSupervisionScannerIntervalSec(); + participantStateChangeIntervalSec = clRuntimeParameterGroup.getParticipantClStateChangeIntervalSec(); + participantClUpdateIntervalSec = clRuntimeParameterGroup.getParticipantClUpdateIntervalSec(); + participantClStateChangeIntervalSec = clRuntimeParameterGroup.getParticipantClStateChangeIntervalSec(); + } /** @@ -129,8 +140,7 @@ public class SupervisionHandler extends ControlLoopHandler { if (CollectionUtils.isEmpty(controlLoopIdentifierList)) { // This is just to force throwing of the exception in certain circumstances. - exceptionOccured(Response.Status.NOT_ACCEPTABLE, - "The list of control loops for supervision is empty"); + exceptionOccured(Response.Status.NOT_ACCEPTABLE, "The list of control loops for supervision is empty"); } for (ToscaConceptIdentifier controlLoopId : controlLoopIdentifierList) { @@ -153,21 +163,23 @@ public class SupervisionHandler extends ControlLoopHandler { @Override public void startAndRegisterPublishers(List<TopicSink> topicSinks) { - // TODO: Use a parameter for the timeout // @formatter:off this.publisherManager = new ServiceManager() .addAction("Supervision scanner", - () -> scanner = new SupervisionScanner(controlLoopProvider, 10000), - () -> scanner = null) + () -> scanner = + new SupervisionScanner(controlLoopProvider, supervisionScannerIntervalSec), + () -> scanner.close()) .addAction("ControlLoopUpdate publisher", - () -> controlLoopUpdatePublisher = new ParticipantControlLoopUpdatePublisher(topicSinks, -1), + () -> controlLoopUpdatePublisher = + new ParticipantControlLoopUpdatePublisher(topicSinks, participantClUpdateIntervalSec), () -> controlLoopUpdatePublisher.terminate()) .addAction("StateChange Publisher", - () -> stateChangePublisher = new ParticipantStateChangePublisher(topicSinks, 10000), + () -> stateChangePublisher = + new ParticipantStateChangePublisher(topicSinks, participantStateChangeIntervalSec), () -> stateChangePublisher.terminate()) .addAction("ControlLoopStateChange Publisher", () -> controlLoopStateChangePublisher = - new ParticipantControlLoopStateChangePublisher(topicSinks, -1), + new ParticipantControlLoopStateChangePublisher(topicSinks, participantClStateChangeIntervalSec), () -> controlLoopStateChangePublisher.terminate()); // @formatter:on try { @@ -222,7 +234,7 @@ public class SupervisionHandler extends ControlLoopHandler { * @throws PfModelException on accessing models in the database * @throws ControlLoopException on supervision errors */ - private void superviseControlLoop(ControlLoop controlLoop) throws ControlLoopException, PfModelException { + private void superviseControlLoop(ControlLoop controlLoop) throws ControlLoopException, PfModelException { switch (controlLoop.getOrderedState()) { case UNINITIALISED: superviseControlLoopUninitialization(controlLoop); @@ -351,8 +363,7 @@ public class SupervisionHandler extends ControlLoopHandler { private void superviseParticipant(ParticipantStatus participantStatusMessage) throws PfModelException, ControlLoopException { if (participantStatusMessage.getParticipantId() == null) { - exceptionOccured(Response.Status.NOT_FOUND, - "Participant ID on PARTICIPANT_STATUS message is null"); + exceptionOccured(Response.Status.NOT_FOUND, "Participant ID on PARTICIPANT_STATUS message is null"); } List<Participant> participantList = @@ -378,8 +389,7 @@ public class SupervisionHandler extends ControlLoopHandler { } monitoringProvider = MonitoringHandler.getInstance().getMonitoringProvider(); - monitoringProvider.createParticipantStatistics( - List.of(participantStatusMessage.getParticipantStatistics())); + monitoringProvider.createParticipantStatistics(List.of(participantStatusMessage.getParticipantStatistics())); } private void superviseControlLoops(ParticipantStatus participantStatusMessage) diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/AbstractParticipantPublisher.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/AbstractParticipantPublisher.java new file mode 100644 index 000000000..c54856101 --- /dev/null +++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/AbstractParticipantPublisher.java @@ -0,0 +1,62 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2021 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.controlloop.runtime.supervision.comm; + +import java.util.List; +import lombok.Getter; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessage; +import org.onap.policy.common.endpoints.event.comm.TopicSink; +import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClient; + +public abstract class AbstractParticipantPublisher<E extends ParticipantMessage> { + + private final TopicSinkClient topicSinkClient; + + @Getter + private final long intervalSec; + + /** + * Constructor. + * + * @param topicSinks the topic sinks + * @param intervalSec time interval to send ParticipantStateChange messages + */ + protected AbstractParticipantPublisher(final List<TopicSink> topicSinks, long intervalSec) { + this.topicSinkClient = new TopicSinkClient(topicSinks.get(0)); + this.intervalSec = intervalSec; + } + + /** + * Terminates the current timer. + */ + public void terminate() { + // Nothing to terminate, this publisher does not have a timer + } + + /** + * Method to send Participant message to participants on demand. + * + * @param participantMessage the Participant message + */ + public void send(final E participantMessage) { + topicSinkClient.send(participantMessage); + } +} diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantControlLoopStateChangePublisher.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantControlLoopStateChangePublisher.java index c9c8ab851..c9d0a4fe4 100644 --- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantControlLoopStateChangePublisher.java +++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantControlLoopStateChangePublisher.java @@ -23,17 +23,12 @@ package org.onap.policy.clamp.controlloop.runtime.supervision.comm; import java.util.List; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantControlLoopStateChange; import org.onap.policy.common.endpoints.event.comm.TopicSink; -import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClient; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * This class is used to send ParticipantControlLoopStateChangePublisher messages to participants on DMaaP. */ -public class ParticipantControlLoopStateChangePublisher { - private static final Logger LOGGER = LoggerFactory.getLogger(ParticipantControlLoopStateChangePublisher.class); - - private TopicSinkClient topicSinkClient; +public class ParticipantControlLoopStateChangePublisher + extends AbstractParticipantPublisher<ParticipantControlLoopStateChange> { /** * Constructor for instantiating ParticipantControlLoopStateChangePublisherPublisher. @@ -42,34 +37,6 @@ public class ParticipantControlLoopStateChangePublisher { * @param interval time interval to send ParticipantControlLoopStateChangePublisher messages */ public ParticipantControlLoopStateChangePublisher(final List<TopicSink> topicSinks, final long interval) { - // TODO: Should not be dependent on the order of topic sinks in the config - this.topicSinkClient = new TopicSinkClient(topicSinks.get(0)); - } - - /** - * Terminates the current timer. - */ - public void terminate() { - // This is a user initiated message and doesn't need a timer. - } - - /** - * Get the current time interval used by the timer task. - * - * @return interval the current time interval - */ - public long getInterval() { - // This is a user initiated message and doesn't need a timer. - return -1; - } - - /** - * Method to send ParticipantControlLoopStateChangePublisher status message to participants on demand. - * - * @param controlLoopStateChange the ParticipantControlLoopStateChangePublisher message - */ - public void send(final ParticipantControlLoopStateChange controlLoopStateChange) { - topicSinkClient.send(controlLoopStateChange); - LOGGER.debug("Sent ParticipantControlLoopStateChange to Participants - {}", controlLoopStateChange); + super(topicSinks, interval); } } diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantControlLoopUpdatePublisher.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantControlLoopUpdatePublisher.java index 3c5d230c5..fbbd95fbc 100644 --- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantControlLoopUpdatePublisher.java +++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantControlLoopUpdatePublisher.java @@ -23,17 +23,11 @@ package org.onap.policy.clamp.controlloop.runtime.supervision.comm; import java.util.List; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantControlLoopUpdate; import org.onap.policy.common.endpoints.event.comm.TopicSink; -import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClient; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * This class is used to send ParticipantControlLoopUpdate messages to participants on DMaaP. */ -public class ParticipantControlLoopUpdatePublisher { - private static final Logger LOGGER = LoggerFactory.getLogger(ParticipantControlLoopUpdatePublisher.class); - - private TopicSinkClient topicSinkClient; +public class ParticipantControlLoopUpdatePublisher extends AbstractParticipantPublisher<ParticipantControlLoopUpdate> { /** * Constructor for instantiating ParticipantUpdatePublisher. @@ -42,34 +36,6 @@ public class ParticipantControlLoopUpdatePublisher { * @param interval time interval to send ParticipantControlLoopUpdate messages */ public ParticipantControlLoopUpdatePublisher(final List<TopicSink> topicSinks, final long interval) { - // TODO: Should not be dependent on the order of topic sinks in the config - this.topicSinkClient = new TopicSinkClient(topicSinks.get(0)); - } - - /** - * Terminates the current timer. - */ - public void terminate() { - // This is a user initiated message and doesn't need a timer. - } - - /** - * Get the current time interval used by the timer task. - * - * @return interval the current time interval - */ - public long getInterval() { - // This is a user initiated message and doesn't need a timer. - return -1; - } - - /** - * Method to send ParticipantControlLoopUpdate status message to participants on demand. - * - * @param participantControlLoopUpdate the ParticipantControlLoopUpdate message - */ - public void send(final ParticipantControlLoopUpdate participantControlLoopUpdate) { - topicSinkClient.send(participantControlLoopUpdate); - LOGGER.debug("Sent ParticipantControlLoopUpdate to Participants - {}", participantControlLoopUpdate); + super(topicSinks, interval); } } diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantStateChangePublisher.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantStateChangePublisher.java index 099039115..20cdea6f4 100644 --- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantStateChangePublisher.java +++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantStateChangePublisher.java @@ -23,17 +23,11 @@ package org.onap.policy.clamp.controlloop.runtime.supervision.comm; import java.util.List; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantStateChange; import org.onap.policy.common.endpoints.event.comm.TopicSink; -import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClient; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * This class is used to send ParticipantStateChange messages to participants on DMaaP. */ -public class ParticipantStateChangePublisher { - private static final Logger LOGGER = LoggerFactory.getLogger(ParticipantStateChangePublisher.class); - - private TopicSinkClient topicSinkClient; +public class ParticipantStateChangePublisher extends AbstractParticipantPublisher<ParticipantStateChange> { /** * Constructor for instantiating ParticipantStateChangePublisher. @@ -41,34 +35,7 @@ public class ParticipantStateChangePublisher { * @param topicSinks the topic sinks * @param interval time interval to send ParticipantStateChange messages */ - public ParticipantStateChangePublisher(final List<TopicSink> topicSinks, final long interval) { - // TODO: Should not be dependent on the order of topic sinks in the config - this.topicSinkClient = new TopicSinkClient(topicSinks.get(0)); - } - - /** - * Terminates the current timer. - */ - public void terminate() { - // Nothing to terminate, this publisher does not have a timer - } - - /** - * Get the current time interval used by the timer task. - * - * @return interval the current time interval - */ - public long getInterval() { - return -1; - } - - /** - * Method to send ParticipantStateChange status message to participants on demand. - * - * @param participantStateChange the ParticipantStateChange message - */ - public void send(final ParticipantStateChange participantStateChange) { - topicSinkClient.send(participantStateChange); - LOGGER.debug("Sent ParticipantStateChange to Participants - {}", participantStateChange); + public ParticipantStateChangePublisher(List<TopicSink> topicSinks, long interval) { + super(topicSinks, interval); } } |