summaryrefslogtreecommitdiffstats
path: root/runtime-controlloop/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'runtime-controlloop/src/main')
-rw-r--r--runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/parameters/ClRuntimeParameterGroup.java3
-rw-r--r--runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionHandler.java126
-rw-r--r--runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/AbstractParticipantAckPublisher.java62
-rw-r--r--runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantDeregisterAckPublisher.java32
-rw-r--r--runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantDeregisterListener.java67
-rw-r--r--runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantRegisterAckPublisher.java32
-rw-r--r--runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantRegisterListener.java67
-rw-r--r--runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantStatusListener.java2
-rw-r--r--runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantUpdateAckListener.java68
-rw-r--r--runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantUpdatePublisher.java32
10 files changed, 488 insertions, 3 deletions
diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/parameters/ClRuntimeParameterGroup.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/parameters/ClRuntimeParameterGroup.java
index d1fa31261..433eeeebb 100644
--- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/parameters/ClRuntimeParameterGroup.java
+++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/parameters/ClRuntimeParameterGroup.java
@@ -45,6 +45,9 @@ public class ClRuntimeParameterGroup extends ParameterGroupImpl {
private long participantStateChangeIntervalSec;
private long participantClUpdateIntervalSec;
private long participantClStateChangeIntervalSec;
+ private long participantRegisterAckIntervalSec;
+ private long participantDeregisterAckIntervalSec;
+ private long participantUpdateIntervalSec;
/**
* Create the Control Loop parameter group.
diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionHandler.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionHandler.java
index aba545750..5e94d293e 100644
--- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionHandler.java
+++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionHandler.java
@@ -20,21 +20,46 @@
package org.onap.policy.clamp.controlloop.runtime.supervision;
+import java.time.Instant;
+import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Map;
+import java.util.UUID;
import javax.ws.rs.core.Response;
import lombok.AllArgsConstructor;
import org.apache.commons.collections4.CollectionUtils;
import org.onap.policy.clamp.controlloop.common.exception.ControlLoopException;
import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoop;
import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopElement;
+import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopElementDefinition;
import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopState;
import org.onap.policy.clamp.controlloop.models.controlloop.concepts.Participant;
import org.onap.policy.clamp.controlloop.models.controlloop.persistence.provider.ControlLoopProvider;
import org.onap.policy.clamp.controlloop.models.controlloop.persistence.provider.ParticipantProvider;
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantControlLoopStateChange;
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantControlLoopUpdate;
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantDeregister;
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantDeregisterAck;
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageType;
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantRegister;
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantRegisterAck;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantStatus;
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantUpdate;
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantUpdateAck;
+import org.onap.policy.clamp.controlloop.runtime.commissioning.CommissioningProvider;
+import org.onap.policy.clamp.controlloop.runtime.main.parameters.ClRuntimeParameterGroup;
import org.onap.policy.clamp.controlloop.runtime.monitoring.MonitoringProvider;
import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ParticipantControlLoopStateChangePublisher;
import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ParticipantControlLoopUpdatePublisher;
+import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ParticipantDeregisterAckPublisher;
+import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ParticipantRegisterAckPublisher;
+import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ParticipantStateChangePublisher;
+import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ParticipantStatusListener;
+import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ParticipantUpdatePublisher;
+import org.onap.policy.common.endpoints.event.comm.TopicSink;
+import org.onap.policy.common.endpoints.listeners.MessageTypeDispatcher;
+import org.onap.policy.common.utils.services.ServiceManager;
+import org.onap.policy.common.utils.services.ServiceManagerException;
import org.onap.policy.models.base.PfModelException;
import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
import org.slf4j.Logger;
@@ -60,10 +85,14 @@ public class SupervisionHandler {
private final ControlLoopProvider controlLoopProvider;
private final ParticipantProvider participantProvider;
private final MonitoringProvider monitoringProvider;
+ private final CommissioningProvider commissioningProvider;
// Publishers for participant communication
private final ParticipantControlLoopUpdatePublisher controlLoopUpdatePublisher;
private final ParticipantControlLoopStateChangePublisher controlLoopStateChangePublisher;
+ private final ParticipantRegisterAckPublisher participantRegisterAckPublisher;
+ private final ParticipantDeregisterAckPublisher participantDeregisterAckPublisher;
+ private final ParticipantUpdatePublisher participantUpdatePublisher;
/**
* Supervision trigger called when a command is issued on control loops.
@@ -102,9 +131,8 @@ public class SupervisionHandler {
*
* @param participantStatusMessage the ParticipantStatus message received from a participant
*/
- public void handleParticipantStatusMessage(ParticipantStatus participantStatusMessage) {
+ public void handleParticipantMessage(ParticipantStatus participantStatusMessage) {
LOGGER.debug("Participant Status received {}", participantStatusMessage);
-
try {
superviseParticipant(participantStatusMessage);
} catch (PfModelException | ControlLoopException svExc) {
@@ -120,6 +148,36 @@ public class SupervisionHandler {
}
/**
+ * Handle a ParticipantRegister message from a participant.
+ *
+ * @param participantRegisterMessage the ParticipantRegister message received from a participant
+ */
+ public void handleParticipantMessage(ParticipantRegister participantRegisterMessage) {
+ LOGGER.debug("Participant Register received {}", participantRegisterMessage);
+ sendParticipantAckMessage(participantRegisterMessage);
+ sendParticipantUpdate(participantRegisterMessage);
+ }
+
+ /**
+ * Handle a ParticipantDeregister message from a participant.
+ *
+ * @param participantDeregisterMessage the ParticipantDeregister message received from a participant
+ */
+ public void handleParticipantMessage(ParticipantDeregister participantDeregisterMessage) {
+ LOGGER.debug("Participant Deregister received {}", participantDeregisterMessage);
+ sendParticipantAckMessage(participantDeregisterMessage);
+ }
+
+ /**
+ * Handle a ParticipantUpdateAck message from a participant.
+ *
+ * @param participantUpdateAckMessage the ParticipantUpdateAck message received from a participant
+ */
+ public void handleParticipantMessage(ParticipantUpdateAck participantUpdateAckMessage) {
+ LOGGER.debug("Participant Update Ack received {}", participantUpdateAckMessage);
+ }
+
+ /**
* Supervise a control loop, performing whatever actions need to be performed on the control loop.
*
* @param controlLoop the control loop to supervises
@@ -232,6 +290,70 @@ public class SupervisionHandler {
}
}
+ private void sendControlLoopUpdate(ControlLoop controlLoop) throws PfModelException {
+ var pclu = new ParticipantControlLoopUpdate();
+ pclu.setControlLoopId(controlLoop.getKey().asIdentifier());
+ pclu.setControlLoop(controlLoop);
+ // TODO: We should look up the correct TOSCA node template here for the control loop
+ // Tiny hack implemented to return the tosca service template entry from the database and be passed onto dmaap
+ pclu.setControlLoopDefinition(commissioningProvider.getToscaServiceTemplate(null, null));
+ controlLoopUpdatePublisher.send(pclu);
+ }
+
+ private void sendControlLoopStateChange(ControlLoop controlLoop) {
+ var clsc = new ParticipantControlLoopStateChange();
+ clsc.setControlLoopId(controlLoop.getKey().asIdentifier());
+ clsc.setMessageId(UUID.randomUUID());
+ clsc.setOrderedState(controlLoop.getOrderedState());
+ controlLoopStateChangePublisher.send(clsc);
+ }
+
+ private void sendParticipantUpdate(ParticipantRegister participantRegisterMessage) {
+ var message = new ParticipantUpdate();
+ message.setParticipantId(participantRegisterMessage.getParticipantId());
+ message.setParticipantType(participantRegisterMessage.getParticipantType());
+ message.setTimestamp(Instant.now());
+
+ ControlLoopElementDefinition clDefinition = new ControlLoopElementDefinition();
+ clDefinition.setId(UUID.randomUUID());
+
+ try {
+ clDefinition.setControlLoopElementToscaServiceTemplate(commissioningProvider
+ .getToscaServiceTemplate(null, null));
+ } catch (PfModelException pfme) {
+ LOGGER.warn("Get of tosca service template failed, cannot send participantupdate", pfme);
+ return;
+ }
+
+ Map<UUID, ControlLoopElementDefinition> controlLoopElementDefinitionMap = new LinkedHashMap<>();
+ controlLoopElementDefinitionMap.put(UUID.randomUUID(), clDefinition);
+
+ Map<ToscaConceptIdentifier, Map<UUID, ControlLoopElementDefinition>>
+ participantDefinitionUpdateMap = new LinkedHashMap<>();
+ participantDefinitionUpdateMap.put(participantRegisterMessage.getParticipantId(),
+ controlLoopElementDefinitionMap);
+ message.setParticipantDefinitionUpdateMap(participantDefinitionUpdateMap);
+
+ LOGGER.debug("Participant Update sent", message);
+ participantUpdatePublisher.send(message);
+ }
+
+ private void sendParticipantAckMessage(ParticipantRegister participantRegisterMessage) {
+ var message = new ParticipantRegisterAck();
+ message.setResponseTo(participantRegisterMessage.getMessageId());
+ message.setMessage("Participant Register Ack");
+ message.setResult(true);
+ participantRegisterAckPublisher.send(message);
+ }
+
+ private void sendParticipantAckMessage(ParticipantDeregister participantDeregisterMessage) {
+ var message = new ParticipantDeregisterAck();
+ message.setResponseTo(participantDeregisterMessage.getMessageId());
+ message.setMessage("Participant Deregister Ack");
+ message.setResult(true);
+ participantDeregisterAckPublisher.send(message);
+ }
+
private void superviseParticipant(ParticipantStatus participantStatusMessage)
throws PfModelException, ControlLoopException {
if (participantStatusMessage.getParticipantId() == null) {
diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/AbstractParticipantAckPublisher.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/AbstractParticipantAckPublisher.java
new file mode 100644
index 000000000..4b4ca9915
--- /dev/null
+++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/AbstractParticipantAckPublisher.java
@@ -0,0 +1,62 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2021 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.controlloop.runtime.supervision.comm;
+
+import java.util.List;
+import javax.ws.rs.core.Response.Status;
+import org.onap.policy.clamp.controlloop.common.exception.ControlLoopRuntimeException;
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantAckMessage;
+import org.onap.policy.clamp.controlloop.runtime.config.messaging.Publisher;
+import org.onap.policy.common.endpoints.event.comm.TopicSink;
+import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClient;
+
+public abstract class AbstractParticipantAckPublisher<E extends ParticipantAckMessage> implements Publisher {
+
+ private TopicSinkClient topicSinkClient;
+ private boolean active = false;
+
+ /**
+ * Method to send Participant message to participants on demand.
+ *
+ * @param participantMessage the Participant message
+ */
+ public void send(final E participantMessage) {
+ if (!active) {
+ throw new ControlLoopRuntimeException(Status.NOT_ACCEPTABLE, "Not Active!");
+ }
+ topicSinkClient.send(participantMessage);
+ }
+
+
+ @Override
+ public void active(List<TopicSink> topicSinks) {
+ if (topicSinks.size() != 1) {
+ throw new IllegalArgumentException("Topic Sink must be one");
+ }
+ this.topicSinkClient = new TopicSinkClient(topicSinks.get(0));
+ active = true;
+ }
+
+ @Override
+ public void stop() {
+ active = false;
+ }
+}
diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantDeregisterAckPublisher.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantDeregisterAckPublisher.java
new file mode 100644
index 000000000..c0fcb3e7d
--- /dev/null
+++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantDeregisterAckPublisher.java
@@ -0,0 +1,32 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2021 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.controlloop.runtime.supervision.comm;
+
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantDeregisterAck;
+import org.springframework.stereotype.Component;
+
+/**
+ * This class is used to send ParticipantDeregisterAck messages to participants on DMaaP.
+ */
+@Component
+public class ParticipantDeregisterAckPublisher extends AbstractParticipantAckPublisher<ParticipantDeregisterAck> {
+
+}
diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantDeregisterListener.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantDeregisterListener.java
new file mode 100644
index 000000000..a03ff0a63
--- /dev/null
+++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantDeregisterListener.java
@@ -0,0 +1,67 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2021 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.controlloop.runtime.supervision.comm;
+
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantDeregister;
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageType;
+import org.onap.policy.clamp.controlloop.runtime.config.messaging.Listener;
+import org.onap.policy.clamp.controlloop.runtime.supervision.SupervisionHandler;
+import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.endpoints.listeners.ScoListener;
+import org.onap.policy.common.utils.coder.StandardCoderObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+/**
+ * Listener for ParticipantDeregister messages sent by participants.
+ */
+@Component
+public class ParticipantDeregisterListener extends ScoListener<ParticipantDeregister> implements Listener {
+ private static final Logger LOGGER = LoggerFactory.getLogger(ParticipantDeregisterListener.class);
+
+ private final SupervisionHandler supervisionHandler;
+
+ /**
+ * Constructs the object.
+ */
+ public ParticipantDeregisterListener(SupervisionHandler supervisionHandler) {
+ super(ParticipantDeregister.class);
+ this.supervisionHandler = supervisionHandler;
+ }
+
+ @Override
+ public void onTopicEvent(final CommInfrastructure infra, final String topic, final StandardCoderObject sco,
+ final ParticipantDeregister participantDeregisterMessage) {
+ LOGGER.debug("ParticipantDeregister message received from participant - {}", participantDeregisterMessage);
+ supervisionHandler.handleParticipantMessage(participantDeregisterMessage);
+ }
+
+ @Override
+ public String getType() {
+ return ParticipantMessageType.PARTICIPANT_DEREGISTER.name();
+ }
+
+ @Override
+ public ScoListener<ParticipantDeregister> getScoListener() {
+ return this;
+ }
+}
diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantRegisterAckPublisher.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantRegisterAckPublisher.java
new file mode 100644
index 000000000..2c0c4b393
--- /dev/null
+++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantRegisterAckPublisher.java
@@ -0,0 +1,32 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2021 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.controlloop.runtime.supervision.comm;
+
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantRegisterAck;
+import org.springframework.stereotype.Component;
+
+/**
+ * This class is used to send ParticipantRegisterAck messages to participants on DMaaP.
+ */
+@Component
+public class ParticipantRegisterAckPublisher extends AbstractParticipantAckPublisher<ParticipantRegisterAck> {
+
+}
diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantRegisterListener.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantRegisterListener.java
new file mode 100644
index 000000000..a4d8c7697
--- /dev/null
+++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantRegisterListener.java
@@ -0,0 +1,67 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2021 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.controlloop.runtime.supervision.comm;
+
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageType;
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantRegister;
+import org.onap.policy.clamp.controlloop.runtime.config.messaging.Listener;
+import org.onap.policy.clamp.controlloop.runtime.supervision.SupervisionHandler;
+import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.endpoints.listeners.ScoListener;
+import org.onap.policy.common.utils.coder.StandardCoderObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+/**
+ * Listener for ParticipantRegister messages sent by participants.
+ */
+@Component
+public class ParticipantRegisterListener extends ScoListener<ParticipantRegister> implements Listener {
+ private static final Logger LOGGER = LoggerFactory.getLogger(ParticipantRegisterListener.class);
+
+ private final SupervisionHandler supervisionHandler;
+
+ /**
+ * Constructs the object.
+ */
+ public ParticipantRegisterListener(SupervisionHandler supervisionHandler) {
+ super(ParticipantRegister.class);
+ this.supervisionHandler = supervisionHandler;
+ }
+
+ @Override
+ public void onTopicEvent(final CommInfrastructure infra, final String topic, final StandardCoderObject sco,
+ final ParticipantRegister participantRegisterMessage) {
+ LOGGER.debug("ParticipantRegister message received from participant - {}", participantRegisterMessage);
+ supervisionHandler.handleParticipantMessage(participantRegisterMessage);
+ }
+
+ @Override
+ public String getType() {
+ return ParticipantMessageType.PARTICIPANT_REGISTER.name();
+ }
+
+ @Override
+ public ScoListener<ParticipantRegister> getScoListener() {
+ return this;
+ }
+}
diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantStatusListener.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantStatusListener.java
index 8fa076240..9da886026 100644
--- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantStatusListener.java
+++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantStatusListener.java
@@ -52,7 +52,7 @@ public class ParticipantStatusListener extends ScoListener<ParticipantStatus> im
public void onTopicEvent(final CommInfrastructure infra, final String topic, final StandardCoderObject sco,
final ParticipantStatus participantStatusMessage) {
LOGGER.debug("ParticipantStatus message received from participant - {}", participantStatusMessage);
- supervisionHandler.handleParticipantStatusMessage(participantStatusMessage);
+ supervisionHandler.handleParticipantMessage(participantStatusMessage);
}
@Override
diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantUpdateAckListener.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantUpdateAckListener.java
new file mode 100644
index 000000000..b8538b1f7
--- /dev/null
+++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantUpdateAckListener.java
@@ -0,0 +1,68 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2021 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.controlloop.runtime.supervision.comm;
+
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageType;
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantRegister;
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantUpdateAck;
+import org.onap.policy.clamp.controlloop.runtime.config.messaging.Listener;
+import org.onap.policy.clamp.controlloop.runtime.supervision.SupervisionHandler;
+import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.endpoints.listeners.ScoListener;
+import org.onap.policy.common.utils.coder.StandardCoderObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+/**
+ * Listener for ParticipantUpdateAck messages sent by participants.
+ */
+@Component
+public class ParticipantUpdateAckListener extends ScoListener<ParticipantUpdateAck> implements Listener {
+ private static final Logger LOGGER = LoggerFactory.getLogger(ParticipantUpdateAckListener.class);
+
+ private final SupervisionHandler supervisionHandler;
+
+ /**
+ * Constructs the object.
+ */
+ public ParticipantUpdateAckListener(SupervisionHandler supervisionHandler) {
+ super(ParticipantUpdateAck.class);
+ this.supervisionHandler = supervisionHandler;
+ }
+
+ @Override
+ public void onTopicEvent(final CommInfrastructure infra, final String topic, final StandardCoderObject sco,
+ final ParticipantUpdateAck participantUpdateAckMessage) {
+ LOGGER.debug("ParticipantUpdateAck message received from participant - {}", participantUpdateAckMessage);
+ supervisionHandler.handleParticipantMessage(participantUpdateAckMessage);
+ }
+
+ @Override
+ public String getType() {
+ return ParticipantMessageType.PARTICIPANT_UPDATE_ACK.name();
+ }
+
+ @Override
+ public ScoListener<ParticipantUpdateAck> getScoListener() {
+ return this;
+ }
+}
diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantUpdatePublisher.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantUpdatePublisher.java
new file mode 100644
index 000000000..5af5f1f54
--- /dev/null
+++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantUpdatePublisher.java
@@ -0,0 +1,32 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2021 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.controlloop.runtime.supervision.comm;
+
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantUpdate;
+import org.springframework.stereotype.Component;
+
+/**
+ * This class is used to send ParticipantUpdate messages to participants on DMaaP.
+ */
+@Component
+public class ParticipantUpdatePublisher extends AbstractParticipantPublisher<ParticipantUpdate> {
+
+}