diff options
Diffstat (limited to 'participant/participant-intermediary/src')
3 files changed, 94 insertions, 159 deletions
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/MessageSender.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/MessageSender.java index 680acd276..e11c883b4 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/MessageSender.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/MessageSender.java @@ -25,52 +25,41 @@ import java.util.TimerTask; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoop; -import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopElement; -import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoops; -import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ControlLoopAck; -import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantDeregister; -import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantRegister; -import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantStatus; -import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantUpdateAck; -import org.onap.policy.clamp.controlloop.participant.intermediary.api.ControlLoopElementListener; import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler; -import org.onap.policy.models.base.PfModelException; -import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier; +import org.onap.policy.clamp.controlloop.participant.intermediary.parameters.ParticipantParameters; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; /** * This class sends messages from participants to CLAMP. */ +@Component public class MessageSender extends TimerTask implements Closeable { private static final Logger LOGGER = LoggerFactory.getLogger(MessageSender.class); private final ParticipantHandler participantHandler; - private final ParticipantMessagePublisher publisher; private ScheduledExecutorService timerPool; /** * Constructor, set the publisher. * * @param participantHandler the participant handler to use for gathering information - * @param publisher the publisher to use for sending messages - * @param interval time interval to send Participant Status periodic messages + * @param parameters the parameters of the participant */ - public MessageSender(ParticipantHandler participantHandler, ParticipantMessagePublisher publisher, - long interval) { + public MessageSender(ParticipantHandler participantHandler, ParticipantParameters parameters) { this.participantHandler = participantHandler; - this.publisher = publisher; // Kick off the timer timerPool = makeTimerPool(); + var interval = parameters.getIntermediaryParameters().getReportingTimeIntervalMs(); timerPool.scheduleAtFixedRate(this, interval, interval, TimeUnit.MILLISECONDS); } @Override public void run() { LOGGER.debug("Sent heartbeat to CLAMP"); - this.sendHeartbeat(); + participantHandler.sendHeartbeat(); } @Override @@ -79,97 +68,6 @@ public class MessageSender extends TimerTask implements Closeable { } /** - * Send a response message for this participant. - * - * @param ackMessage the details to include in the response message - */ - public void sendAckResponse(ControlLoopAck ackMessage) { - sendAckResponse(null, ackMessage); - } - - /** - * Dispatch a response message for this participant. - * - * @param controlLoopId the control loop to which this message is a response - * @param ackMessage the details to include in the response message - */ - public void sendAckResponse(ToscaConceptIdentifier controlLoopId, ControlLoopAck ackMessage) { - // Participant related fields - ackMessage.setParticipantType(participantHandler.getParticipantType()); - ackMessage.setParticipantId(participantHandler.getParticipantId()); - publisher.sendControlLoopAck(ackMessage); - } - - /** - * Send a ParticipantRegister message for this participant. - * - * @param message the participantRegister message - */ - public void sendParticipantRegister(ParticipantRegister message) { - publisher.sendParticipantRegister(message); - } - - /** - * Send a ParticipantDeregister message for this participant. - * - * @param message the participantDeRegister message - */ - public void sendParticipantDeregister(ParticipantDeregister message) { - publisher.sendParticipantDeregister(message); - } - - /** - * Send a ParticipantUpdateAck message for this participant update. - * - * @param message the participantUpdateAck message - */ - public void sendParticipantUpdateAck(ParticipantUpdateAck message) { - publisher.sendParticipantUpdateAck(message); - } - - /** - * Send a ParticipantStatus message for this participant. - * - * @param participantStatus the ParticipantStatus message - */ - public void sendParticipantStatus(ParticipantStatus participantStatus) { - var controlLoops = participantHandler.getControlLoopHandler().getControlLoops(); - for (ControlLoopElementListener clElementListener : - participantHandler.getControlLoopHandler().getListeners()) { - updateClElementStatistics(controlLoops, clElementListener); - } - - publisher.sendParticipantStatus(participantStatus); - } - - /** - * Dispatch a heartbeat for this participant. - */ - public void sendHeartbeat() { - publisher.sendHeartbeat(participantHandler.makeHeartbeat(false)); - } - - /** - * Update ControlLoopElement statistics. The control loop elements listening will be - * notified to retrieve statistics from respective controlloop elements, and controlloopelements - * data on the handler will be updated. - * - * @param controlLoops the control loops - * @param clElementListener control loop element listener - */ - public void updateClElementStatistics(ControlLoops controlLoops, ControlLoopElementListener clElementListener) { - for (ControlLoop controlLoop : controlLoops.getControlLoopList()) { - for (ControlLoopElement element : controlLoop.getElements().values()) { - try { - clElementListener.handleStatistics(element.getId()); - } catch (PfModelException e) { - LOGGER.debug("Getting statistics for Control loop element failed"); - } - } - } - } - - /** * Makes a new timer pool. * * @return a new timer pool diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ControlLoopHandler.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ControlLoopHandler.java index 0e276f390..8b4c61dca 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ControlLoopHandler.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ControlLoopHandler.java @@ -29,12 +29,11 @@ import java.util.Map; import java.util.UUID; import java.util.stream.Collectors; import lombok.Getter; -import lombok.NoArgsConstructor; import org.apache.commons.collections4.CollectionUtils; -import org.apache.commons.lang3.tuple.Pair; import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ClElementStatistics; import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoop; import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopElement; +import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopElementAck; import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopElementDefinition; import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopOrderedState; import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopState; @@ -45,24 +44,25 @@ import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.Contr import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ControlLoopUpdate; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageType; import org.onap.policy.clamp.controlloop.participant.intermediary.api.ControlLoopElementListener; -import org.onap.policy.clamp.controlloop.participant.intermediary.comm.MessageSender; -import org.onap.policy.clamp.controlloop.participant.intermediary.parameters.ParticipantIntermediaryParameters; +import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ParticipantMessagePublisher; +import org.onap.policy.clamp.controlloop.participant.intermediary.parameters.ParticipantParameters; import org.onap.policy.models.base.PfModelException; import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier; import org.onap.policy.models.tosca.authorative.concepts.ToscaNodeTemplate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; /* * This class is responsible for managing the state of all control loops in the participant. */ -@NoArgsConstructor +@Component public class ControlLoopHandler { private static final Logger LOGGER = LoggerFactory.getLogger(ControlLoopHandler.class); - private ToscaConceptIdentifier participantType = null; - private ToscaConceptIdentifier participantId = null; - private MessageSender messageSender = null; + private final ToscaConceptIdentifier participantType; + private final ToscaConceptIdentifier participantId; + private final ParticipantMessagePublisher publisher; @Getter private final Map<ToscaConceptIdentifier, ControlLoop> controlLoopMap = new LinkedHashMap<>(); @@ -77,12 +77,12 @@ public class ControlLoopHandler { * Constructor, set the participant ID and messageSender. * * @param parameters the parameters of the participant - * @param messageSender the messageSender for sending responses to messages + * @param publisher the ParticipantMessage Publisher */ - public ControlLoopHandler(ParticipantIntermediaryParameters parameters, MessageSender messageSender) { - this.participantType = parameters.getParticipantType(); - this.participantId = parameters.getParticipantId(); - this.messageSender = messageSender; + public ControlLoopHandler(ParticipantParameters parameters, ParticipantMessagePublisher publisher) { + this.participantType = parameters.getIntermediaryParameters().getParticipantType(); + this.participantId = parameters.getIntermediaryParameters().getParticipantId(); + this.publisher = publisher; } public void registerControlLoopElementListener(ControlLoopElementListener listener) { @@ -104,18 +104,20 @@ public class ControlLoopHandler { LOGGER.warn("Cannot update Control loop element state, id is null"); } - var controlLoopStateChangeAck = - new ControlLoopAck(ParticipantMessageType.CONTROLLOOP_STATECHANGE_ACK); ControlLoopElement clElement = elementsOnThisParticipant.get(id); if (clElement != null) { + var controlLoopStateChangeAck = + new ControlLoopAck(ParticipantMessageType.CONTROLLOOP_STATECHANGE_ACK); + controlLoopStateChangeAck.setParticipantId(participantId); + controlLoopStateChangeAck.setParticipantType(participantType); clElement.setOrderedState(orderedState); clElement.setState(newState); controlLoopStateChangeAck.getControlLoopResultMap().put(clElement.getId(), - Pair.of(true, "Control loop element {} state changed to {}\", id, newState)")); + new ControlLoopElementAck(true, "Control loop element {} state changed to {}\", id, newState)")); LOGGER.debug("Control loop element {} state changed to {}", id, newState); controlLoopStateChangeAck.setMessage("ControlLoopElement state changed to {} " + newState); controlLoopStateChangeAck.setResult(true); - messageSender.sendAckResponse(controlLoopStateChangeAck); + publisher.sendControlLoopAck(controlLoopStateChangeAck); return clElement; } return null; @@ -147,15 +149,17 @@ public class ControlLoopHandler { } var controlLoop = controlLoopMap.get(stateChangeMsg.getControlLoopId()); - var controlLoopAck = new ControlLoopAck(ParticipantMessageType.CONTROL_LOOP_STATE_CHANGE); if (controlLoop == null) { + var controlLoopAck = new ControlLoopAck(ParticipantMessageType.CONTROL_LOOP_STATE_CHANGE); + controlLoopAck.setParticipantId(participantId); + controlLoopAck.setParticipantType(participantType); controlLoopAck.setMessage("Control loop " + stateChangeMsg.getControlLoopId() + " does not use this participant " + participantId); controlLoopAck.setResult(false); controlLoopAck.setResponseTo(stateChangeMsg.getMessageId()); controlLoopAck.setControlLoopId(stateChangeMsg.getControlLoopId()); - messageSender.sendAckResponse(controlLoopAck); + publisher.sendControlLoopAck(controlLoopAck); LOGGER.debug("Control loop {} does not use this participant", stateChangeMsg.getControlLoopId()); return; } @@ -200,17 +204,19 @@ public class ControlLoopHandler { var controlLoop = controlLoopMap.get(updateMsg.getControlLoopId()); - var controlLoopUpdateAck = new ControlLoopAck(ParticipantMessageType.CONTROLLOOP_UPDATE_ACK); - // TODO: Updates to existing ControlLoops are not supported yet (Addition/Removal of ControlLoop // elements to existing ControlLoop has to be supported). if (controlLoop != null) { + var controlLoopUpdateAck = new ControlLoopAck(ParticipantMessageType.CONTROLLOOP_UPDATE_ACK); + controlLoopUpdateAck.setParticipantId(participantId); + controlLoopUpdateAck.setParticipantType(participantType); + controlLoopUpdateAck.setMessage("Control loop " + updateMsg.getControlLoopId() + " already defined on participant " + participantId); controlLoopUpdateAck.setResult(false); controlLoopUpdateAck.setResponseTo(updateMsg.getMessageId()); controlLoopUpdateAck.setControlLoopId(updateMsg.getControlLoopId()); - messageSender.sendAckResponse(controlLoopUpdateAck); + publisher.sendControlLoopAck(controlLoopUpdateAck); return; } @@ -319,10 +325,12 @@ public class ControlLoopHandler { if (orderedState.equals(controlLoop.getOrderedState())) { var controlLoopAck = new ControlLoopAck(ParticipantMessageType.CONTROL_LOOP_STATE_CHANGE); + controlLoopAck.setParticipantId(participantId); + controlLoopAck.setParticipantType(participantType); controlLoopAck.setMessage("Control loop is already in state" + orderedState); controlLoopAck.setResult(false); controlLoopAck.setControlLoopId(controlLoop.getDefinition()); - messageSender.sendAckResponse(controlLoopAck); + publisher.sendControlLoopAck(controlLoopAck); return; } diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ParticipantHandler.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ParticipantHandler.java index 66e09e7f6..d3f6c4d25 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ParticipantHandler.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ParticipantHandler.java @@ -22,7 +22,6 @@ package org.onap.policy.clamp.controlloop.participant.intermediary.handler; -import java.io.Closeable; import java.time.Instant; import java.util.ArrayList; import java.util.List; @@ -33,9 +32,11 @@ import lombok.Getter; import lombok.Setter; import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ClElementStatisticsList; import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoop; +import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopElement; import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopElementDefinition; import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopInfo; import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopStatistics; +import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoops; import org.onap.policy.clamp.controlloop.models.controlloop.concepts.Participant; import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ParticipantDefinition; import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ParticipantHealthStatus; @@ -53,9 +54,10 @@ import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.Parti import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantStatusReq; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantUpdate; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantUpdateAck; -import org.onap.policy.clamp.controlloop.participant.intermediary.comm.MessageSender; +import org.onap.policy.clamp.controlloop.participant.intermediary.api.ControlLoopElementListener; import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ParticipantMessagePublisher; import org.onap.policy.clamp.controlloop.participant.intermediary.parameters.ParticipantParameters; +import org.onap.policy.models.base.PfModelException; import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier; import org.onap.policy.models.tosca.authorative.concepts.ToscaServiceTemplate; import org.slf4j.Logger; @@ -67,14 +69,14 @@ import org.springframework.stereotype.Component; */ @Getter @Component -public class ParticipantHandler implements Closeable { +public class ParticipantHandler { private static final Logger LOGGER = LoggerFactory.getLogger(ParticipantHandler.class); private final ToscaConceptIdentifier participantType; private final ToscaConceptIdentifier participantId; - private final MessageSender sender; private final ControlLoopHandler controlLoopHandler; private final ParticipantStatistics participantStatistics; + private final ParticipantMessagePublisher publisher; @Setter private ParticipantState state = ParticipantState.UNKNOWN; @@ -82,10 +84,9 @@ public class ParticipantHandler implements Closeable { @Setter private ParticipantHealthStatus healthStatus = ParticipantHealthStatus.UNKNOWN; - private List<ControlLoopElementDefinition> clElementDefsOnThisParticipant = - new ArrayList<>(); + private final List<ControlLoopElementDefinition> clElementDefsOnThisParticipant = new ArrayList<>(); - public ToscaServiceTemplate toscaServiceTemplate = new ToscaServiceTemplate(); + private ToscaServiceTemplate toscaServiceTemplate = new ToscaServiceTemplate(); /** * Constructor, set the participant ID and sender. @@ -93,13 +94,12 @@ public class ParticipantHandler implements Closeable { * @param parameters the parameters of the participant * @param publisher the publisher for sending responses to messages */ - public ParticipantHandler(ParticipantParameters parameters, ParticipantMessagePublisher publisher) { + public ParticipantHandler(ParticipantParameters parameters, ParticipantMessagePublisher publisher, + ControlLoopHandler controlLoopHandler) { this.participantType = parameters.getIntermediaryParameters().getParticipantType(); this.participantId = parameters.getIntermediaryParameters().getParticipantId(); - this.sender = - new MessageSender(this, publisher, - parameters.getIntermediaryParameters().getReportingTimeIntervalMs()); - this.controlLoopHandler = new ControlLoopHandler(parameters.getIntermediaryParameters(), sender); + this.publisher = publisher; + this.controlLoopHandler = controlLoopHandler; this.participantStatistics = new ParticipantStatistics(); this.participantStatistics.setParticipantId(participantId); this.participantStatistics.setState(state); @@ -107,18 +107,40 @@ public class ParticipantHandler implements Closeable { this.participantStatistics.setTimeStamp(Instant.now()); } - @Override - public void close() { - sender.close(); - } - /** * Method which handles a participant health check event from clamp. * * @param participantStatusReqMsg participant participantStatusReq message */ public void handleParticipantStatusReq(final ParticipantStatusReq participantStatusReqMsg) { - sender.sendParticipantStatus(makeHeartbeat(true)); + var controlLoops = controlLoopHandler.getControlLoops(); + for (ControlLoopElementListener clElementListener : controlLoopHandler.getListeners()) { + updateClElementStatistics(controlLoops, clElementListener); + } + + var participantStatus = makeHeartbeat(true); + publisher.sendParticipantStatus(participantStatus); + } + + /** + * Update ControlLoopElement statistics. The control loop elements listening will be + * notified to retrieve statistics from respective controlloop elements, and controlloopelements + * data on the handler will be updated. + * + * @param controlLoops the control loops + * @param clElementListener control loop element listener + */ + private void updateClElementStatistics(ControlLoops controlLoops, ControlLoopElementListener clElementListener) { + for (ControlLoop controlLoop : controlLoops.getControlLoopList()) { + for (ControlLoopElement element : controlLoop.getElements().values()) { + try { + clElementListener.handleStatistics(element.getId()); + } catch (PfModelException e) { + LOGGER.debug("Getting statistics for Control loop element failed for element ID {}", + element.getId(), e); + } + } + } } /** @@ -165,7 +187,7 @@ public class ParticipantHandler implements Closeable { var participantUpdateAck = new ParticipantUpdateAck(); handleStateChange(participantState, participantUpdateAck); - sender.sendParticipantUpdateAck(participantUpdateAck); + publisher.sendParticipantUpdateAck(participantUpdateAck); return getParticipant(definition.getName(), definition.getVersion()); } @@ -215,7 +237,7 @@ public class ParticipantHandler implements Closeable { participantRegister.setParticipantId(participantId); participantRegister.setParticipantType(participantType); - sender.sendParticipantRegister(participantRegister); + publisher.sendParticipantRegister(participantRegister); } /** @@ -236,7 +258,7 @@ public class ParticipantHandler implements Closeable { participantDeregister.setParticipantId(participantId); participantDeregister.setParticipantType(participantType); - sender.sendParticipantDeregister(participantDeregister); + publisher.sendParticipantDeregister(participantDeregister); } /** @@ -267,7 +289,8 @@ public class ParticipantHandler implements Closeable { // This message is to commission the controlloop for (ParticipantDefinition participantDefinition : participantUpdateMsg.getParticipantDefinitionUpdates()) { if (participantDefinition.getParticipantId().equals(participantType)) { - clElementDefsOnThisParticipant = participantDefinition.getControlLoopElementDefinitionList(); + clElementDefsOnThisParticipant.clear(); + clElementDefsOnThisParticipant.addAll(participantDefinition.getControlLoopElementDefinitionList()); break; } } @@ -289,7 +312,14 @@ public class ParticipantHandler implements Closeable { participantUpdateAck.setParticipantId(participantId); participantUpdateAck.setParticipantType(participantType); - sender.sendParticipantUpdateAck(participantUpdateAck); + publisher.sendParticipantUpdateAck(participantUpdateAck); + } + + /** + * Dispatch a heartbeat for this participant. + */ + public void sendHeartbeat() { + publisher.sendHeartbeat(makeHeartbeat(false)); } /** @@ -322,15 +352,14 @@ public class ParticipantHandler implements Closeable { private List<ControlLoopInfo> getControlLoopInfoList() { List<ControlLoopInfo> controlLoopInfoList = new ArrayList<>(); - for (Map.Entry<ToscaConceptIdentifier, ControlLoop> entry : - controlLoopHandler.getControlLoopMap().entrySet()) { + for (Map.Entry<ToscaConceptIdentifier, ControlLoop> entry : controlLoopHandler.getControlLoopMap().entrySet()) { ControlLoopInfo clInfo = new ControlLoopInfo(); clInfo.setControlLoopId(entry.getKey()); ControlLoopStatistics clStatitistics = new ControlLoopStatistics(); clStatitistics.setControlLoopId(entry.getKey()); ClElementStatisticsList clElementStatisticsList = new ClElementStatisticsList(); - clElementStatisticsList.setClElementStatistics( - entry.getValue().getControlLoopElementStatisticsList(entry.getValue())); + clElementStatisticsList + .setClElementStatistics(entry.getValue().getControlLoopElementStatisticsList(entry.getValue())); clStatitistics.setClElementStatisticsList(clElementStatisticsList); clInfo.setControlLoopStatistics(clStatitistics); clInfo.setState(entry.getValue().getState()); |