aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--models/src/main/java/org/onap/policy/clamp/controlloop/models/controlloop/concepts/ControlLoopElementAck.java40
-rw-r--r--models/src/main/java/org/onap/policy/clamp/controlloop/models/messages/dmaap/participant/ControlLoopAck.java4
-rw-r--r--models/src/test/java/org/onap/policy/clamp/controlloop/models/messages/dmaap/participant/ControlLoopAckTest.java17
-rw-r--r--models/src/test/java/org/onap/policy/clamp/controlloop/models/messages/dmaap/participant/ControlLoopUpdateTest.java5
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/MessageSender.java116
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ControlLoopHandler.java54
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ParticipantHandler.java83
7 files changed, 150 insertions, 169 deletions
diff --git a/models/src/main/java/org/onap/policy/clamp/controlloop/models/controlloop/concepts/ControlLoopElementAck.java b/models/src/main/java/org/onap/policy/clamp/controlloop/models/controlloop/concepts/ControlLoopElementAck.java
new file mode 100644
index 000000000..a5918fe94
--- /dev/null
+++ b/models/src/main/java/org/onap/policy/clamp/controlloop/models/controlloop/concepts/ControlLoopElementAck.java
@@ -0,0 +1,40 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2021 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.controlloop.models.controlloop.concepts;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+
+@Getter
+@Setter
+@AllArgsConstructor
+@ToString
+public class ControlLoopElementAck {
+
+ // Result: Success/Fail.
+ private Boolean result;
+
+ // Message indicating reason for failure
+ private String message;
+
+}
diff --git a/models/src/main/java/org/onap/policy/clamp/controlloop/models/messages/dmaap/participant/ControlLoopAck.java b/models/src/main/java/org/onap/policy/clamp/controlloop/models/messages/dmaap/participant/ControlLoopAck.java
index 55ba7faf5..8e3604916 100644
--- a/models/src/main/java/org/onap/policy/clamp/controlloop/models/messages/dmaap/participant/ControlLoopAck.java
+++ b/models/src/main/java/org/onap/policy/clamp/controlloop/models/messages/dmaap/participant/ControlLoopAck.java
@@ -27,7 +27,7 @@ import java.util.function.UnaryOperator;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
-import org.apache.commons.lang3.tuple.Pair;
+import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopElementAck;
import org.onap.policy.models.base.PfUtils;
import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
@@ -45,7 +45,7 @@ public class ControlLoopAck extends ParticipantAckMessage {
// A map with ControlLoopElementID as its key, and a pair of result and message as value per
// ControlLoopElement.
- private Map<UUID, Pair<Boolean, String>> controlLoopResultMap = new LinkedHashMap<>();
+ private Map<UUID, ControlLoopElementAck> controlLoopResultMap = new LinkedHashMap<>();
/**
* Constructor for instantiating ParticipantRegisterAck class with message name.
diff --git a/models/src/test/java/org/onap/policy/clamp/controlloop/models/messages/dmaap/participant/ControlLoopAckTest.java b/models/src/test/java/org/onap/policy/clamp/controlloop/models/messages/dmaap/participant/ControlLoopAckTest.java
index d7d7e4327..5fded7341 100644
--- a/models/src/test/java/org/onap/policy/clamp/controlloop/models/messages/dmaap/participant/ControlLoopAckTest.java
+++ b/models/src/test/java/org/onap/policy/clamp/controlloop/models/messages/dmaap/participant/ControlLoopAckTest.java
@@ -22,35 +22,36 @@ package org.onap.policy.clamp.controlloop.models.messages.dmaap.participant;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.Assert.assertEquals;
+import static org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageUtils.assertSerializable;
import static org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageUtils.removeVariableFields;
import java.util.Map;
import java.util.UUID;
-import org.apache.commons.lang3.tuple.Pair;
import org.junit.jupiter.api.Test;
+import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopElementAck;
+import org.onap.policy.common.utils.coder.CoderException;
import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
class ControlLoopAckTest {
@Test
- void testCopyConstructor() {
+ void testCopyConstructor() throws CoderException {
assertThatThrownBy(() -> new ControlLoopAck((ControlLoopAck) null))
.isInstanceOf(NullPointerException.class);
- final ControlLoopAck orig = new ControlLoopAck(ParticipantMessageType.CONTROL_LOOP_UPDATE);
+ final var orig = new ControlLoopAck(ParticipantMessageType.CONTROL_LOOP_UPDATE);
// verify with null values
assertEquals(removeVariableFields(orig.toString()),
removeVariableFields(new ControlLoopAck(orig).toString()));
// verify with all values
- ToscaConceptIdentifier id = new ToscaConceptIdentifier("id", "1.2.3");
+ var id = new ToscaConceptIdentifier("id", "1.2.3");
orig.setControlLoopId(id);
orig.setParticipantId(id);
orig.setParticipantType(id);
-
- Pair<Boolean, String> clElementResult = Pair.of(true, "ControlLoopElement result");
- final Map<UUID, Pair<Boolean, String>> controlLoopResultMap = Map.of(UUID.randomUUID(), clElementResult);
+ var clElementResult = new ControlLoopElementAck(true, "ControlLoopElement result");
+ final var controlLoopResultMap = Map.of(UUID.randomUUID(), clElementResult);
orig.setControlLoopResultMap(controlLoopResultMap);
orig.setResponseTo(UUID.randomUUID());
@@ -59,5 +60,7 @@ class ControlLoopAckTest {
assertEquals(removeVariableFields(orig.toString()),
removeVariableFields(new ControlLoopAck(orig).toString()));
+
+ assertSerializable(orig, ControlLoopAck.class);
}
}
diff --git a/models/src/test/java/org/onap/policy/clamp/controlloop/models/messages/dmaap/participant/ControlLoopUpdateTest.java b/models/src/test/java/org/onap/policy/clamp/controlloop/models/messages/dmaap/participant/ControlLoopUpdateTest.java
index 3aafe56f1..1b155a12a 100644
--- a/models/src/test/java/org/onap/policy/clamp/controlloop/models/messages/dmaap/participant/ControlLoopUpdateTest.java
+++ b/models/src/test/java/org/onap/policy/clamp/controlloop/models/messages/dmaap/participant/ControlLoopUpdateTest.java
@@ -22,6 +22,7 @@ package org.onap.policy.clamp.controlloop.models.messages.dmaap.participant;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.Assert.assertEquals;
+import static org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageUtils.assertSerializable;
import static org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageUtils.removeVariableFields;
import java.time.Instant;
@@ -33,6 +34,7 @@ import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoop
import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopOrderedState;
import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopState;
import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ParticipantUpdates;
+import org.onap.policy.common.utils.coder.CoderException;
import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
/**
@@ -40,7 +42,7 @@ import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
*/
class ControlLoopUpdateTest {
@Test
- void testCopyConstructor() {
+ void testCopyConstructor() throws CoderException {
assertThatThrownBy(() -> new ControlLoopUpdate(null)).isInstanceOf(NullPointerException.class);
ControlLoopUpdate orig = new ControlLoopUpdate();
@@ -71,5 +73,6 @@ class ControlLoopUpdateTest {
ControlLoopUpdate other = new ControlLoopUpdate(orig);
assertEquals(removeVariableFields(orig.toString()), removeVariableFields(other.toString()));
+ assertSerializable(orig, ControlLoopUpdate.class);
}
}
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/MessageSender.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/MessageSender.java
index 680acd276..e11c883b4 100644
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/MessageSender.java
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/MessageSender.java
@@ -25,52 +25,41 @@ import java.util.TimerTask;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoop;
-import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopElement;
-import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoops;
-import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ControlLoopAck;
-import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantDeregister;
-import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantRegister;
-import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantStatus;
-import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantUpdateAck;
-import org.onap.policy.clamp.controlloop.participant.intermediary.api.ControlLoopElementListener;
import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler;
-import org.onap.policy.models.base.PfModelException;
-import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
+import org.onap.policy.clamp.controlloop.participant.intermediary.parameters.ParticipantParameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
/**
* This class sends messages from participants to CLAMP.
*/
+@Component
public class MessageSender extends TimerTask implements Closeable {
private static final Logger LOGGER = LoggerFactory.getLogger(MessageSender.class);
private final ParticipantHandler participantHandler;
- private final ParticipantMessagePublisher publisher;
private ScheduledExecutorService timerPool;
/**
* Constructor, set the publisher.
*
* @param participantHandler the participant handler to use for gathering information
- * @param publisher the publisher to use for sending messages
- * @param interval time interval to send Participant Status periodic messages
+ * @param parameters the parameters of the participant
*/
- public MessageSender(ParticipantHandler participantHandler, ParticipantMessagePublisher publisher,
- long interval) {
+ public MessageSender(ParticipantHandler participantHandler, ParticipantParameters parameters) {
this.participantHandler = participantHandler;
- this.publisher = publisher;
// Kick off the timer
timerPool = makeTimerPool();
+ var interval = parameters.getIntermediaryParameters().getReportingTimeIntervalMs();
timerPool.scheduleAtFixedRate(this, interval, interval, TimeUnit.MILLISECONDS);
}
@Override
public void run() {
LOGGER.debug("Sent heartbeat to CLAMP");
- this.sendHeartbeat();
+ participantHandler.sendHeartbeat();
}
@Override
@@ -79,97 +68,6 @@ public class MessageSender extends TimerTask implements Closeable {
}
/**
- * Send a response message for this participant.
- *
- * @param ackMessage the details to include in the response message
- */
- public void sendAckResponse(ControlLoopAck ackMessage) {
- sendAckResponse(null, ackMessage);
- }
-
- /**
- * Dispatch a response message for this participant.
- *
- * @param controlLoopId the control loop to which this message is a response
- * @param ackMessage the details to include in the response message
- */
- public void sendAckResponse(ToscaConceptIdentifier controlLoopId, ControlLoopAck ackMessage) {
- // Participant related fields
- ackMessage.setParticipantType(participantHandler.getParticipantType());
- ackMessage.setParticipantId(participantHandler.getParticipantId());
- publisher.sendControlLoopAck(ackMessage);
- }
-
- /**
- * Send a ParticipantRegister message for this participant.
- *
- * @param message the participantRegister message
- */
- public void sendParticipantRegister(ParticipantRegister message) {
- publisher.sendParticipantRegister(message);
- }
-
- /**
- * Send a ParticipantDeregister message for this participant.
- *
- * @param message the participantDeRegister message
- */
- public void sendParticipantDeregister(ParticipantDeregister message) {
- publisher.sendParticipantDeregister(message);
- }
-
- /**
- * Send a ParticipantUpdateAck message for this participant update.
- *
- * @param message the participantUpdateAck message
- */
- public void sendParticipantUpdateAck(ParticipantUpdateAck message) {
- publisher.sendParticipantUpdateAck(message);
- }
-
- /**
- * Send a ParticipantStatus message for this participant.
- *
- * @param participantStatus the ParticipantStatus message
- */
- public void sendParticipantStatus(ParticipantStatus participantStatus) {
- var controlLoops = participantHandler.getControlLoopHandler().getControlLoops();
- for (ControlLoopElementListener clElementListener :
- participantHandler.getControlLoopHandler().getListeners()) {
- updateClElementStatistics(controlLoops, clElementListener);
- }
-
- publisher.sendParticipantStatus(participantStatus);
- }
-
- /**
- * Dispatch a heartbeat for this participant.
- */
- public void sendHeartbeat() {
- publisher.sendHeartbeat(participantHandler.makeHeartbeat(false));
- }
-
- /**
- * Update ControlLoopElement statistics. The control loop elements listening will be
- * notified to retrieve statistics from respective controlloop elements, and controlloopelements
- * data on the handler will be updated.
- *
- * @param controlLoops the control loops
- * @param clElementListener control loop element listener
- */
- public void updateClElementStatistics(ControlLoops controlLoops, ControlLoopElementListener clElementListener) {
- for (ControlLoop controlLoop : controlLoops.getControlLoopList()) {
- for (ControlLoopElement element : controlLoop.getElements().values()) {
- try {
- clElementListener.handleStatistics(element.getId());
- } catch (PfModelException e) {
- LOGGER.debug("Getting statistics for Control loop element failed");
- }
- }
- }
- }
-
- /**
* Makes a new timer pool.
*
* @return a new timer pool
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ControlLoopHandler.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ControlLoopHandler.java
index 0e276f390..8b4c61dca 100644
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ControlLoopHandler.java
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ControlLoopHandler.java
@@ -29,12 +29,11 @@ import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import lombok.Getter;
-import lombok.NoArgsConstructor;
import org.apache.commons.collections4.CollectionUtils;
-import org.apache.commons.lang3.tuple.Pair;
import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ClElementStatistics;
import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoop;
import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopElement;
+import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopElementAck;
import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopElementDefinition;
import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopOrderedState;
import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopState;
@@ -45,24 +44,25 @@ import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.Contr
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ControlLoopUpdate;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageType;
import org.onap.policy.clamp.controlloop.participant.intermediary.api.ControlLoopElementListener;
-import org.onap.policy.clamp.controlloop.participant.intermediary.comm.MessageSender;
-import org.onap.policy.clamp.controlloop.participant.intermediary.parameters.ParticipantIntermediaryParameters;
+import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ParticipantMessagePublisher;
+import org.onap.policy.clamp.controlloop.participant.intermediary.parameters.ParticipantParameters;
import org.onap.policy.models.base.PfModelException;
import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
import org.onap.policy.models.tosca.authorative.concepts.ToscaNodeTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
/*
* This class is responsible for managing the state of all control loops in the participant.
*/
-@NoArgsConstructor
+@Component
public class ControlLoopHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(ControlLoopHandler.class);
- private ToscaConceptIdentifier participantType = null;
- private ToscaConceptIdentifier participantId = null;
- private MessageSender messageSender = null;
+ private final ToscaConceptIdentifier participantType;
+ private final ToscaConceptIdentifier participantId;
+ private final ParticipantMessagePublisher publisher;
@Getter
private final Map<ToscaConceptIdentifier, ControlLoop> controlLoopMap = new LinkedHashMap<>();
@@ -77,12 +77,12 @@ public class ControlLoopHandler {
* Constructor, set the participant ID and messageSender.
*
* @param parameters the parameters of the participant
- * @param messageSender the messageSender for sending responses to messages
+ * @param publisher the ParticipantMessage Publisher
*/
- public ControlLoopHandler(ParticipantIntermediaryParameters parameters, MessageSender messageSender) {
- this.participantType = parameters.getParticipantType();
- this.participantId = parameters.getParticipantId();
- this.messageSender = messageSender;
+ public ControlLoopHandler(ParticipantParameters parameters, ParticipantMessagePublisher publisher) {
+ this.participantType = parameters.getIntermediaryParameters().getParticipantType();
+ this.participantId = parameters.getIntermediaryParameters().getParticipantId();
+ this.publisher = publisher;
}
public void registerControlLoopElementListener(ControlLoopElementListener listener) {
@@ -104,18 +104,20 @@ public class ControlLoopHandler {
LOGGER.warn("Cannot update Control loop element state, id is null");
}
- var controlLoopStateChangeAck =
- new ControlLoopAck(ParticipantMessageType.CONTROLLOOP_STATECHANGE_ACK);
ControlLoopElement clElement = elementsOnThisParticipant.get(id);
if (clElement != null) {
+ var controlLoopStateChangeAck =
+ new ControlLoopAck(ParticipantMessageType.CONTROLLOOP_STATECHANGE_ACK);
+ controlLoopStateChangeAck.setParticipantId(participantId);
+ controlLoopStateChangeAck.setParticipantType(participantType);
clElement.setOrderedState(orderedState);
clElement.setState(newState);
controlLoopStateChangeAck.getControlLoopResultMap().put(clElement.getId(),
- Pair.of(true, "Control loop element {} state changed to {}\", id, newState)"));
+ new ControlLoopElementAck(true, "Control loop element {} state changed to {}\", id, newState)"));
LOGGER.debug("Control loop element {} state changed to {}", id, newState);
controlLoopStateChangeAck.setMessage("ControlLoopElement state changed to {} " + newState);
controlLoopStateChangeAck.setResult(true);
- messageSender.sendAckResponse(controlLoopStateChangeAck);
+ publisher.sendControlLoopAck(controlLoopStateChangeAck);
return clElement;
}
return null;
@@ -147,15 +149,17 @@ public class ControlLoopHandler {
}
var controlLoop = controlLoopMap.get(stateChangeMsg.getControlLoopId());
- var controlLoopAck = new ControlLoopAck(ParticipantMessageType.CONTROL_LOOP_STATE_CHANGE);
if (controlLoop == null) {
+ var controlLoopAck = new ControlLoopAck(ParticipantMessageType.CONTROL_LOOP_STATE_CHANGE);
+ controlLoopAck.setParticipantId(participantId);
+ controlLoopAck.setParticipantType(participantType);
controlLoopAck.setMessage("Control loop " + stateChangeMsg.getControlLoopId()
+ " does not use this participant " + participantId);
controlLoopAck.setResult(false);
controlLoopAck.setResponseTo(stateChangeMsg.getMessageId());
controlLoopAck.setControlLoopId(stateChangeMsg.getControlLoopId());
- messageSender.sendAckResponse(controlLoopAck);
+ publisher.sendControlLoopAck(controlLoopAck);
LOGGER.debug("Control loop {} does not use this participant", stateChangeMsg.getControlLoopId());
return;
}
@@ -200,17 +204,19 @@ public class ControlLoopHandler {
var controlLoop = controlLoopMap.get(updateMsg.getControlLoopId());
- var controlLoopUpdateAck = new ControlLoopAck(ParticipantMessageType.CONTROLLOOP_UPDATE_ACK);
-
// TODO: Updates to existing ControlLoops are not supported yet (Addition/Removal of ControlLoop
// elements to existing ControlLoop has to be supported).
if (controlLoop != null) {
+ var controlLoopUpdateAck = new ControlLoopAck(ParticipantMessageType.CONTROLLOOP_UPDATE_ACK);
+ controlLoopUpdateAck.setParticipantId(participantId);
+ controlLoopUpdateAck.setParticipantType(participantType);
+
controlLoopUpdateAck.setMessage("Control loop " + updateMsg.getControlLoopId()
+ " already defined on participant " + participantId);
controlLoopUpdateAck.setResult(false);
controlLoopUpdateAck.setResponseTo(updateMsg.getMessageId());
controlLoopUpdateAck.setControlLoopId(updateMsg.getControlLoopId());
- messageSender.sendAckResponse(controlLoopUpdateAck);
+ publisher.sendControlLoopAck(controlLoopUpdateAck);
return;
}
@@ -319,10 +325,12 @@ public class ControlLoopHandler {
if (orderedState.equals(controlLoop.getOrderedState())) {
var controlLoopAck = new ControlLoopAck(ParticipantMessageType.CONTROL_LOOP_STATE_CHANGE);
+ controlLoopAck.setParticipantId(participantId);
+ controlLoopAck.setParticipantType(participantType);
controlLoopAck.setMessage("Control loop is already in state" + orderedState);
controlLoopAck.setResult(false);
controlLoopAck.setControlLoopId(controlLoop.getDefinition());
- messageSender.sendAckResponse(controlLoopAck);
+ publisher.sendControlLoopAck(controlLoopAck);
return;
}
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ParticipantHandler.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ParticipantHandler.java
index 66e09e7f6..d3f6c4d25 100644
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ParticipantHandler.java
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ParticipantHandler.java
@@ -22,7 +22,6 @@
package org.onap.policy.clamp.controlloop.participant.intermediary.handler;
-import java.io.Closeable;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
@@ -33,9 +32,11 @@ import lombok.Getter;
import lombok.Setter;
import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ClElementStatisticsList;
import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoop;
+import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopElement;
import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopElementDefinition;
import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopInfo;
import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopStatistics;
+import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoops;
import org.onap.policy.clamp.controlloop.models.controlloop.concepts.Participant;
import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ParticipantDefinition;
import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ParticipantHealthStatus;
@@ -53,9 +54,10 @@ import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.Parti
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantStatusReq;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantUpdate;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantUpdateAck;
-import org.onap.policy.clamp.controlloop.participant.intermediary.comm.MessageSender;
+import org.onap.policy.clamp.controlloop.participant.intermediary.api.ControlLoopElementListener;
import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ParticipantMessagePublisher;
import org.onap.policy.clamp.controlloop.participant.intermediary.parameters.ParticipantParameters;
+import org.onap.policy.models.base.PfModelException;
import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
import org.onap.policy.models.tosca.authorative.concepts.ToscaServiceTemplate;
import org.slf4j.Logger;
@@ -67,14 +69,14 @@ import org.springframework.stereotype.Component;
*/
@Getter
@Component
-public class ParticipantHandler implements Closeable {
+public class ParticipantHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(ParticipantHandler.class);
private final ToscaConceptIdentifier participantType;
private final ToscaConceptIdentifier participantId;
- private final MessageSender sender;
private final ControlLoopHandler controlLoopHandler;
private final ParticipantStatistics participantStatistics;
+ private final ParticipantMessagePublisher publisher;
@Setter
private ParticipantState state = ParticipantState.UNKNOWN;
@@ -82,10 +84,9 @@ public class ParticipantHandler implements Closeable {
@Setter
private ParticipantHealthStatus healthStatus = ParticipantHealthStatus.UNKNOWN;
- private List<ControlLoopElementDefinition> clElementDefsOnThisParticipant =
- new ArrayList<>();
+ private final List<ControlLoopElementDefinition> clElementDefsOnThisParticipant = new ArrayList<>();
- public ToscaServiceTemplate toscaServiceTemplate = new ToscaServiceTemplate();
+ private ToscaServiceTemplate toscaServiceTemplate = new ToscaServiceTemplate();
/**
* Constructor, set the participant ID and sender.
@@ -93,13 +94,12 @@ public class ParticipantHandler implements Closeable {
* @param parameters the parameters of the participant
* @param publisher the publisher for sending responses to messages
*/
- public ParticipantHandler(ParticipantParameters parameters, ParticipantMessagePublisher publisher) {
+ public ParticipantHandler(ParticipantParameters parameters, ParticipantMessagePublisher publisher,
+ ControlLoopHandler controlLoopHandler) {
this.participantType = parameters.getIntermediaryParameters().getParticipantType();
this.participantId = parameters.getIntermediaryParameters().getParticipantId();
- this.sender =
- new MessageSender(this, publisher,
- parameters.getIntermediaryParameters().getReportingTimeIntervalMs());
- this.controlLoopHandler = new ControlLoopHandler(parameters.getIntermediaryParameters(), sender);
+ this.publisher = publisher;
+ this.controlLoopHandler = controlLoopHandler;
this.participantStatistics = new ParticipantStatistics();
this.participantStatistics.setParticipantId(participantId);
this.participantStatistics.setState(state);
@@ -107,18 +107,40 @@ public class ParticipantHandler implements Closeable {
this.participantStatistics.setTimeStamp(Instant.now());
}
- @Override
- public void close() {
- sender.close();
- }
-
/**
* Method which handles a participant health check event from clamp.
*
* @param participantStatusReqMsg participant participantStatusReq message
*/
public void handleParticipantStatusReq(final ParticipantStatusReq participantStatusReqMsg) {
- sender.sendParticipantStatus(makeHeartbeat(true));
+ var controlLoops = controlLoopHandler.getControlLoops();
+ for (ControlLoopElementListener clElementListener : controlLoopHandler.getListeners()) {
+ updateClElementStatistics(controlLoops, clElementListener);
+ }
+
+ var participantStatus = makeHeartbeat(true);
+ publisher.sendParticipantStatus(participantStatus);
+ }
+
+ /**
+ * Update ControlLoopElement statistics. The control loop elements listening will be
+ * notified to retrieve statistics from respective controlloop elements, and controlloopelements
+ * data on the handler will be updated.
+ *
+ * @param controlLoops the control loops
+ * @param clElementListener control loop element listener
+ */
+ private void updateClElementStatistics(ControlLoops controlLoops, ControlLoopElementListener clElementListener) {
+ for (ControlLoop controlLoop : controlLoops.getControlLoopList()) {
+ for (ControlLoopElement element : controlLoop.getElements().values()) {
+ try {
+ clElementListener.handleStatistics(element.getId());
+ } catch (PfModelException e) {
+ LOGGER.debug("Getting statistics for Control loop element failed for element ID {}",
+ element.getId(), e);
+ }
+ }
+ }
}
/**
@@ -165,7 +187,7 @@ public class ParticipantHandler implements Closeable {
var participantUpdateAck = new ParticipantUpdateAck();
handleStateChange(participantState, participantUpdateAck);
- sender.sendParticipantUpdateAck(participantUpdateAck);
+ publisher.sendParticipantUpdateAck(participantUpdateAck);
return getParticipant(definition.getName(), definition.getVersion());
}
@@ -215,7 +237,7 @@ public class ParticipantHandler implements Closeable {
participantRegister.setParticipantId(participantId);
participantRegister.setParticipantType(participantType);
- sender.sendParticipantRegister(participantRegister);
+ publisher.sendParticipantRegister(participantRegister);
}
/**
@@ -236,7 +258,7 @@ public class ParticipantHandler implements Closeable {
participantDeregister.setParticipantId(participantId);
participantDeregister.setParticipantType(participantType);
- sender.sendParticipantDeregister(participantDeregister);
+ publisher.sendParticipantDeregister(participantDeregister);
}
/**
@@ -267,7 +289,8 @@ public class ParticipantHandler implements Closeable {
// This message is to commission the controlloop
for (ParticipantDefinition participantDefinition : participantUpdateMsg.getParticipantDefinitionUpdates()) {
if (participantDefinition.getParticipantId().equals(participantType)) {
- clElementDefsOnThisParticipant = participantDefinition.getControlLoopElementDefinitionList();
+ clElementDefsOnThisParticipant.clear();
+ clElementDefsOnThisParticipant.addAll(participantDefinition.getControlLoopElementDefinitionList());
break;
}
}
@@ -289,7 +312,14 @@ public class ParticipantHandler implements Closeable {
participantUpdateAck.setParticipantId(participantId);
participantUpdateAck.setParticipantType(participantType);
- sender.sendParticipantUpdateAck(participantUpdateAck);
+ publisher.sendParticipantUpdateAck(participantUpdateAck);
+ }
+
+ /**
+ * Dispatch a heartbeat for this participant.
+ */
+ public void sendHeartbeat() {
+ publisher.sendHeartbeat(makeHeartbeat(false));
}
/**
@@ -322,15 +352,14 @@ public class ParticipantHandler implements Closeable {
private List<ControlLoopInfo> getControlLoopInfoList() {
List<ControlLoopInfo> controlLoopInfoList = new ArrayList<>();
- for (Map.Entry<ToscaConceptIdentifier, ControlLoop> entry :
- controlLoopHandler.getControlLoopMap().entrySet()) {
+ for (Map.Entry<ToscaConceptIdentifier, ControlLoop> entry : controlLoopHandler.getControlLoopMap().entrySet()) {
ControlLoopInfo clInfo = new ControlLoopInfo();
clInfo.setControlLoopId(entry.getKey());
ControlLoopStatistics clStatitistics = new ControlLoopStatistics();
clStatitistics.setControlLoopId(entry.getKey());
ClElementStatisticsList clElementStatisticsList = new ClElementStatisticsList();
- clElementStatisticsList.setClElementStatistics(
- entry.getValue().getControlLoopElementStatisticsList(entry.getValue()));
+ clElementStatisticsList
+ .setClElementStatistics(entry.getValue().getControlLoopElementStatisticsList(entry.getValue()));
clStatitistics.setClElementStatisticsList(clElementStatisticsList);
clInfo.setControlLoopStatistics(clStatitistics);
clInfo.setState(entry.getValue().getState());