From c16142da394ba515aa639aa87d88137a73c0c8ac Mon Sep 17 00:00:00 2001 From: Sirisha_Manchikanti Date: Wed, 14 Jul 2021 23:48:53 +0100 Subject: Add support for new messages in controloop runtime Receive ParticipantRegister, ParticipantDeregister and ParticipantUpdateAck messages from participants, Send ParticipantRegisterAck, ParticipantDeregisterAck and ParticipantUpdate from controlloop runtime to participants. Issue-ID: POLICY-3414 Signed-off-by: Sirisha_Manchikanti Change-Id: Ib5cb7d582974e34e8a226f640747c596ac5b5beb --- .../main/parameters/ClRuntimeParameterGroup.java | 3 + .../runtime/supervision/SupervisionHandler.java | 126 ++++++++++++++++++++- .../comm/AbstractParticipantAckPublisher.java | 62 ++++++++++ .../comm/ParticipantDeregisterAckPublisher.java | 32 ++++++ .../comm/ParticipantDeregisterListener.java | 67 +++++++++++ .../comm/ParticipantRegisterAckPublisher.java | 32 ++++++ .../comm/ParticipantRegisterListener.java | 67 +++++++++++ .../comm/ParticipantStatusListener.java | 2 +- .../comm/ParticipantUpdateAckListener.java | 68 +++++++++++ .../comm/ParticipantUpdatePublisher.java | 32 ++++++ 10 files changed, 488 insertions(+), 3 deletions(-) create mode 100644 runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/AbstractParticipantAckPublisher.java create mode 100644 runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantDeregisterAckPublisher.java create mode 100644 runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantDeregisterListener.java create mode 100644 runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantRegisterAckPublisher.java create mode 100644 runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantRegisterListener.java create mode 100644 runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantUpdateAckListener.java create mode 100644 runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantUpdatePublisher.java (limited to 'runtime-controlloop/src/main') diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/parameters/ClRuntimeParameterGroup.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/parameters/ClRuntimeParameterGroup.java index d1fa31261..433eeeebb 100644 --- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/parameters/ClRuntimeParameterGroup.java +++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/main/parameters/ClRuntimeParameterGroup.java @@ -45,6 +45,9 @@ public class ClRuntimeParameterGroup extends ParameterGroupImpl { private long participantStateChangeIntervalSec; private long participantClUpdateIntervalSec; private long participantClStateChangeIntervalSec; + private long participantRegisterAckIntervalSec; + private long participantDeregisterAckIntervalSec; + private long participantUpdateIntervalSec; /** * Create the Control Loop parameter group. diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionHandler.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionHandler.java index aba545750..5e94d293e 100644 --- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionHandler.java +++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/SupervisionHandler.java @@ -20,21 +20,46 @@ package org.onap.policy.clamp.controlloop.runtime.supervision; +import java.time.Instant; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; +import java.util.UUID; import javax.ws.rs.core.Response; import lombok.AllArgsConstructor; import org.apache.commons.collections4.CollectionUtils; import org.onap.policy.clamp.controlloop.common.exception.ControlLoopException; import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoop; import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopElement; +import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopElementDefinition; import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopState; import org.onap.policy.clamp.controlloop.models.controlloop.concepts.Participant; import org.onap.policy.clamp.controlloop.models.controlloop.persistence.provider.ControlLoopProvider; import org.onap.policy.clamp.controlloop.models.controlloop.persistence.provider.ParticipantProvider; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantControlLoopStateChange; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantControlLoopUpdate; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantDeregister; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantDeregisterAck; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageType; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantRegister; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantRegisterAck; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantStatus; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantUpdate; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantUpdateAck; +import org.onap.policy.clamp.controlloop.runtime.commissioning.CommissioningProvider; +import org.onap.policy.clamp.controlloop.runtime.main.parameters.ClRuntimeParameterGroup; import org.onap.policy.clamp.controlloop.runtime.monitoring.MonitoringProvider; import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ParticipantControlLoopStateChangePublisher; import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ParticipantControlLoopUpdatePublisher; +import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ParticipantDeregisterAckPublisher; +import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ParticipantRegisterAckPublisher; +import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ParticipantStateChangePublisher; +import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ParticipantStatusListener; +import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ParticipantUpdatePublisher; +import org.onap.policy.common.endpoints.event.comm.TopicSink; +import org.onap.policy.common.endpoints.listeners.MessageTypeDispatcher; +import org.onap.policy.common.utils.services.ServiceManager; +import org.onap.policy.common.utils.services.ServiceManagerException; import org.onap.policy.models.base.PfModelException; import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier; import org.slf4j.Logger; @@ -60,10 +85,14 @@ public class SupervisionHandler { private final ControlLoopProvider controlLoopProvider; private final ParticipantProvider participantProvider; private final MonitoringProvider monitoringProvider; + private final CommissioningProvider commissioningProvider; // Publishers for participant communication private final ParticipantControlLoopUpdatePublisher controlLoopUpdatePublisher; private final ParticipantControlLoopStateChangePublisher controlLoopStateChangePublisher; + private final ParticipantRegisterAckPublisher participantRegisterAckPublisher; + private final ParticipantDeregisterAckPublisher participantDeregisterAckPublisher; + private final ParticipantUpdatePublisher participantUpdatePublisher; /** * Supervision trigger called when a command is issued on control loops. @@ -102,9 +131,8 @@ public class SupervisionHandler { * * @param participantStatusMessage the ParticipantStatus message received from a participant */ - public void handleParticipantStatusMessage(ParticipantStatus participantStatusMessage) { + public void handleParticipantMessage(ParticipantStatus participantStatusMessage) { LOGGER.debug("Participant Status received {}", participantStatusMessage); - try { superviseParticipant(participantStatusMessage); } catch (PfModelException | ControlLoopException svExc) { @@ -119,6 +147,36 @@ public class SupervisionHandler { } } + /** + * Handle a ParticipantRegister message from a participant. + * + * @param participantRegisterMessage the ParticipantRegister message received from a participant + */ + public void handleParticipantMessage(ParticipantRegister participantRegisterMessage) { + LOGGER.debug("Participant Register received {}", participantRegisterMessage); + sendParticipantAckMessage(participantRegisterMessage); + sendParticipantUpdate(participantRegisterMessage); + } + + /** + * Handle a ParticipantDeregister message from a participant. + * + * @param participantDeregisterMessage the ParticipantDeregister message received from a participant + */ + public void handleParticipantMessage(ParticipantDeregister participantDeregisterMessage) { + LOGGER.debug("Participant Deregister received {}", participantDeregisterMessage); + sendParticipantAckMessage(participantDeregisterMessage); + } + + /** + * Handle a ParticipantUpdateAck message from a participant. + * + * @param participantUpdateAckMessage the ParticipantUpdateAck message received from a participant + */ + public void handleParticipantMessage(ParticipantUpdateAck participantUpdateAckMessage) { + LOGGER.debug("Participant Update Ack received {}", participantUpdateAckMessage); + } + /** * Supervise a control loop, performing whatever actions need to be performed on the control loop. * @@ -232,6 +290,70 @@ public class SupervisionHandler { } } + private void sendControlLoopUpdate(ControlLoop controlLoop) throws PfModelException { + var pclu = new ParticipantControlLoopUpdate(); + pclu.setControlLoopId(controlLoop.getKey().asIdentifier()); + pclu.setControlLoop(controlLoop); + // TODO: We should look up the correct TOSCA node template here for the control loop + // Tiny hack implemented to return the tosca service template entry from the database and be passed onto dmaap + pclu.setControlLoopDefinition(commissioningProvider.getToscaServiceTemplate(null, null)); + controlLoopUpdatePublisher.send(pclu); + } + + private void sendControlLoopStateChange(ControlLoop controlLoop) { + var clsc = new ParticipantControlLoopStateChange(); + clsc.setControlLoopId(controlLoop.getKey().asIdentifier()); + clsc.setMessageId(UUID.randomUUID()); + clsc.setOrderedState(controlLoop.getOrderedState()); + controlLoopStateChangePublisher.send(clsc); + } + + private void sendParticipantUpdate(ParticipantRegister participantRegisterMessage) { + var message = new ParticipantUpdate(); + message.setParticipantId(participantRegisterMessage.getParticipantId()); + message.setParticipantType(participantRegisterMessage.getParticipantType()); + message.setTimestamp(Instant.now()); + + ControlLoopElementDefinition clDefinition = new ControlLoopElementDefinition(); + clDefinition.setId(UUID.randomUUID()); + + try { + clDefinition.setControlLoopElementToscaServiceTemplate(commissioningProvider + .getToscaServiceTemplate(null, null)); + } catch (PfModelException pfme) { + LOGGER.warn("Get of tosca service template failed, cannot send participantupdate", pfme); + return; + } + + Map controlLoopElementDefinitionMap = new LinkedHashMap<>(); + controlLoopElementDefinitionMap.put(UUID.randomUUID(), clDefinition); + + Map> + participantDefinitionUpdateMap = new LinkedHashMap<>(); + participantDefinitionUpdateMap.put(participantRegisterMessage.getParticipantId(), + controlLoopElementDefinitionMap); + message.setParticipantDefinitionUpdateMap(participantDefinitionUpdateMap); + + LOGGER.debug("Participant Update sent", message); + participantUpdatePublisher.send(message); + } + + private void sendParticipantAckMessage(ParticipantRegister participantRegisterMessage) { + var message = new ParticipantRegisterAck(); + message.setResponseTo(participantRegisterMessage.getMessageId()); + message.setMessage("Participant Register Ack"); + message.setResult(true); + participantRegisterAckPublisher.send(message); + } + + private void sendParticipantAckMessage(ParticipantDeregister participantDeregisterMessage) { + var message = new ParticipantDeregisterAck(); + message.setResponseTo(participantDeregisterMessage.getMessageId()); + message.setMessage("Participant Deregister Ack"); + message.setResult(true); + participantDeregisterAckPublisher.send(message); + } + private void superviseParticipant(ParticipantStatus participantStatusMessage) throws PfModelException, ControlLoopException { if (participantStatusMessage.getParticipantId() == null) { diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/AbstractParticipantAckPublisher.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/AbstractParticipantAckPublisher.java new file mode 100644 index 000000000..4b4ca9915 --- /dev/null +++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/AbstractParticipantAckPublisher.java @@ -0,0 +1,62 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2021 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.controlloop.runtime.supervision.comm; + +import java.util.List; +import javax.ws.rs.core.Response.Status; +import org.onap.policy.clamp.controlloop.common.exception.ControlLoopRuntimeException; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantAckMessage; +import org.onap.policy.clamp.controlloop.runtime.config.messaging.Publisher; +import org.onap.policy.common.endpoints.event.comm.TopicSink; +import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClient; + +public abstract class AbstractParticipantAckPublisher implements Publisher { + + private TopicSinkClient topicSinkClient; + private boolean active = false; + + /** + * Method to send Participant message to participants on demand. + * + * @param participantMessage the Participant message + */ + public void send(final E participantMessage) { + if (!active) { + throw new ControlLoopRuntimeException(Status.NOT_ACCEPTABLE, "Not Active!"); + } + topicSinkClient.send(participantMessage); + } + + + @Override + public void active(List topicSinks) { + if (topicSinks.size() != 1) { + throw new IllegalArgumentException("Topic Sink must be one"); + } + this.topicSinkClient = new TopicSinkClient(topicSinks.get(0)); + active = true; + } + + @Override + public void stop() { + active = false; + } +} diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantDeregisterAckPublisher.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantDeregisterAckPublisher.java new file mode 100644 index 000000000..c0fcb3e7d --- /dev/null +++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantDeregisterAckPublisher.java @@ -0,0 +1,32 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2021 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.controlloop.runtime.supervision.comm; + +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantDeregisterAck; +import org.springframework.stereotype.Component; + +/** + * This class is used to send ParticipantDeregisterAck messages to participants on DMaaP. + */ +@Component +public class ParticipantDeregisterAckPublisher extends AbstractParticipantAckPublisher { + +} diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantDeregisterListener.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantDeregisterListener.java new file mode 100644 index 000000000..a03ff0a63 --- /dev/null +++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantDeregisterListener.java @@ -0,0 +1,67 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2021 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.controlloop.runtime.supervision.comm; + +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantDeregister; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageType; +import org.onap.policy.clamp.controlloop.runtime.config.messaging.Listener; +import org.onap.policy.clamp.controlloop.runtime.supervision.SupervisionHandler; +import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; +import org.onap.policy.common.endpoints.listeners.ScoListener; +import org.onap.policy.common.utils.coder.StandardCoderObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +/** + * Listener for ParticipantDeregister messages sent by participants. + */ +@Component +public class ParticipantDeregisterListener extends ScoListener implements Listener { + private static final Logger LOGGER = LoggerFactory.getLogger(ParticipantDeregisterListener.class); + + private final SupervisionHandler supervisionHandler; + + /** + * Constructs the object. + */ + public ParticipantDeregisterListener(SupervisionHandler supervisionHandler) { + super(ParticipantDeregister.class); + this.supervisionHandler = supervisionHandler; + } + + @Override + public void onTopicEvent(final CommInfrastructure infra, final String topic, final StandardCoderObject sco, + final ParticipantDeregister participantDeregisterMessage) { + LOGGER.debug("ParticipantDeregister message received from participant - {}", participantDeregisterMessage); + supervisionHandler.handleParticipantMessage(participantDeregisterMessage); + } + + @Override + public String getType() { + return ParticipantMessageType.PARTICIPANT_DEREGISTER.name(); + } + + @Override + public ScoListener getScoListener() { + return this; + } +} diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantRegisterAckPublisher.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantRegisterAckPublisher.java new file mode 100644 index 000000000..2c0c4b393 --- /dev/null +++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantRegisterAckPublisher.java @@ -0,0 +1,32 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2021 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.controlloop.runtime.supervision.comm; + +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantRegisterAck; +import org.springframework.stereotype.Component; + +/** + * This class is used to send ParticipantRegisterAck messages to participants on DMaaP. + */ +@Component +public class ParticipantRegisterAckPublisher extends AbstractParticipantAckPublisher { + +} diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantRegisterListener.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantRegisterListener.java new file mode 100644 index 000000000..a4d8c7697 --- /dev/null +++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantRegisterListener.java @@ -0,0 +1,67 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2021 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.controlloop.runtime.supervision.comm; + +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageType; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantRegister; +import org.onap.policy.clamp.controlloop.runtime.config.messaging.Listener; +import org.onap.policy.clamp.controlloop.runtime.supervision.SupervisionHandler; +import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; +import org.onap.policy.common.endpoints.listeners.ScoListener; +import org.onap.policy.common.utils.coder.StandardCoderObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +/** + * Listener for ParticipantRegister messages sent by participants. + */ +@Component +public class ParticipantRegisterListener extends ScoListener implements Listener { + private static final Logger LOGGER = LoggerFactory.getLogger(ParticipantRegisterListener.class); + + private final SupervisionHandler supervisionHandler; + + /** + * Constructs the object. + */ + public ParticipantRegisterListener(SupervisionHandler supervisionHandler) { + super(ParticipantRegister.class); + this.supervisionHandler = supervisionHandler; + } + + @Override + public void onTopicEvent(final CommInfrastructure infra, final String topic, final StandardCoderObject sco, + final ParticipantRegister participantRegisterMessage) { + LOGGER.debug("ParticipantRegister message received from participant - {}", participantRegisterMessage); + supervisionHandler.handleParticipantMessage(participantRegisterMessage); + } + + @Override + public String getType() { + return ParticipantMessageType.PARTICIPANT_REGISTER.name(); + } + + @Override + public ScoListener getScoListener() { + return this; + } +} diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantStatusListener.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantStatusListener.java index 8fa076240..9da886026 100644 --- a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantStatusListener.java +++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantStatusListener.java @@ -52,7 +52,7 @@ public class ParticipantStatusListener extends ScoListener im public void onTopicEvent(final CommInfrastructure infra, final String topic, final StandardCoderObject sco, final ParticipantStatus participantStatusMessage) { LOGGER.debug("ParticipantStatus message received from participant - {}", participantStatusMessage); - supervisionHandler.handleParticipantStatusMessage(participantStatusMessage); + supervisionHandler.handleParticipantMessage(participantStatusMessage); } @Override diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantUpdateAckListener.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantUpdateAckListener.java new file mode 100644 index 000000000..b8538b1f7 --- /dev/null +++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantUpdateAckListener.java @@ -0,0 +1,68 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2021 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.controlloop.runtime.supervision.comm; + +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageType; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantRegister; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantUpdateAck; +import org.onap.policy.clamp.controlloop.runtime.config.messaging.Listener; +import org.onap.policy.clamp.controlloop.runtime.supervision.SupervisionHandler; +import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; +import org.onap.policy.common.endpoints.listeners.ScoListener; +import org.onap.policy.common.utils.coder.StandardCoderObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +/** + * Listener for ParticipantUpdateAck messages sent by participants. + */ +@Component +public class ParticipantUpdateAckListener extends ScoListener implements Listener { + private static final Logger LOGGER = LoggerFactory.getLogger(ParticipantUpdateAckListener.class); + + private final SupervisionHandler supervisionHandler; + + /** + * Constructs the object. + */ + public ParticipantUpdateAckListener(SupervisionHandler supervisionHandler) { + super(ParticipantUpdateAck.class); + this.supervisionHandler = supervisionHandler; + } + + @Override + public void onTopicEvent(final CommInfrastructure infra, final String topic, final StandardCoderObject sco, + final ParticipantUpdateAck participantUpdateAckMessage) { + LOGGER.debug("ParticipantUpdateAck message received from participant - {}", participantUpdateAckMessage); + supervisionHandler.handleParticipantMessage(participantUpdateAckMessage); + } + + @Override + public String getType() { + return ParticipantMessageType.PARTICIPANT_UPDATE_ACK.name(); + } + + @Override + public ScoListener getScoListener() { + return this; + } +} diff --git a/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantUpdatePublisher.java b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantUpdatePublisher.java new file mode 100644 index 000000000..5af5f1f54 --- /dev/null +++ b/runtime-controlloop/src/main/java/org/onap/policy/clamp/controlloop/runtime/supervision/comm/ParticipantUpdatePublisher.java @@ -0,0 +1,32 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2021 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.controlloop.runtime.supervision.comm; + +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantUpdate; +import org.springframework.stereotype.Component; + +/** + * This class is used to send ParticipantUpdate messages to participants on DMaaP. + */ +@Component +public class ParticipantUpdatePublisher extends AbstractParticipantPublisher { + +} -- cgit 1.2.3-korg