diff options
author | Sirisha_Manchikanti <sirisha.manchikanti@est.tech> | 2021-07-14 23:20:30 +0100 |
---|---|---|
committer | Sirisha_Manchikanti <sirisha.manchikanti@est.tech> | 2021-07-15 22:46:45 +0100 |
commit | c8d0fd08038a956d2588e540d8c31c46acf21377 (patch) | |
tree | e33f4c00b606292ca81880dd75db491c900e1af0 /participant/participant-intermediary/src | |
parent | 258fdc2ddb8b5e130ccc2b287c10c3fd782b7ee9 (diff) |
Handle participant register,deregister,update messages
Send ParticipantRegister, ParticipantDeregister and ParticipantUpdateAck
from participant to controlloop runtime
Receive ParticipantRegisterAck, ParticipantDeregisterAck and
ParticipantUpdate messages from controlloop runtime
Issue-ID: POLICY-3414
Signed-off-by: Sirisha_Manchikanti <sirisha.manchikanti@est.tech>
Change-Id: Ifb3c4fa64bdd2d50ec2302b35f342847f4994c5d
Diffstat (limited to 'participant/participant-intermediary/src')
11 files changed, 399 insertions, 26 deletions
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/api/ParticipantIntermediaryApi.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/api/ParticipantIntermediaryApi.java index a87299bdc..7e448dc15 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/api/ParticipantIntermediaryApi.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/api/ParticipantIntermediaryApi.java @@ -46,6 +46,16 @@ public interface ParticipantIntermediaryApi { void registerControlLoopElementListener(ControlLoopElementListener controlLoopElementListener); /** + * Send participant register message to controlloop runtime. + */ + void sendParticipantRegister(); + + /** + * Send participant deregister message to controlloop runtime. + */ + void sendParticipantDeregister(); + + /** * Get participants loops from the intermediary API. * * @param name the participant name, null for all @@ -114,5 +124,4 @@ public interface ParticipantIntermediaryApi { * @param elementStatistics the updated statistics */ void updateControlLoopElementStatistics(UUID id, ClElementStatistics elementStatistics); - } diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/api/impl/ParticipantIntermediaryApiImpl.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/api/impl/ParticipantIntermediaryApiImpl.java index 838f47544..9652f1a8d 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/api/impl/ParticipantIntermediaryApiImpl.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/api/impl/ParticipantIntermediaryApiImpl.java @@ -63,6 +63,16 @@ public class ParticipantIntermediaryApiImpl implements ParticipantIntermediaryAp } @Override + public void sendParticipantRegister() { + participantHandler.sendParticipantRegister(); + } + + @Override + public void sendParticipantDeregister() { + participantHandler.sendParticipantDeregister(); + } + + @Override public List<Participant> getParticipants(String name, String version) { return List.of(participantHandler.getParticipant(name, version)); } diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/MessageSender.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/MessageSender.java index 6926bc30b..1bfce1374 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/MessageSender.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/MessageSender.java @@ -30,9 +30,12 @@ import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoop import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopElement; import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoops; import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ParticipantStatistics; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantDeregister; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantRegister; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantResponseDetails; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantResponseStatus; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantStatus; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantUpdateAck; import org.onap.policy.clamp.controlloop.participant.intermediary.api.ControlLoopElementListener; import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler; import org.onap.policy.models.base.PfModelException; @@ -47,7 +50,7 @@ public class MessageSender extends TimerTask implements Closeable { private static final Logger LOGGER = LoggerFactory.getLogger(MessageSender.class); private final ParticipantHandler participantHandler; - private final ParticipantStatusPublisher publisher; + private final ParticipantMessagePublisher publisher; private ScheduledExecutorService timerPool; /** @@ -57,7 +60,7 @@ public class MessageSender extends TimerTask implements Closeable { * @param publisher the publisher to use for sending messages * @param interval time interval to send Participant Status periodic messages */ - public MessageSender(ParticipantHandler participantHandler, ParticipantStatusPublisher publisher, + public MessageSender(ParticipantHandler participantHandler, ParticipantMessagePublisher publisher, long interval) { this.participantHandler = participantHandler; this.publisher = publisher; @@ -127,11 +130,38 @@ public class MessageSender extends TimerTask implements Closeable { status.setControlLoops(controlLoops); - publisher.send(status); + publisher.sendParticipantStatus(status); } /** - * Update ControlLoopElement statistics. The control loop elements listening will be + * Send a ParticipantRegister message for this participant. + * + * @param message the participantRegister message + */ + public void sendParticipantRegister(ParticipantRegister message) { + publisher.sendParticipantRegister(message); + } + + /** + * Send a ParticipantDeregister message for this participant. + * + * @param message the participantDeRegister message + */ + public void sendParticipantDeregister(ParticipantDeregister message) { + publisher.sendParticipantDeregister(message); + } + + /** + * Send a ParticipantUpdateAck message for this participant update. + * + * @param message the participantUpdateAck message + */ + public void sendParticipantUpdateAck(ParticipantUpdateAck message) { + publisher.sendParticipantUpdateAck(message); + } + + /** + * Update ControlLoopElement statistics. The control loop elements listening will be * notified to retrieve statistics from respective controlloop elements, and controlloopelements * data on the handler will be updated. * diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantAckListener.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantAckListener.java new file mode 100644 index 000000000..262b21630 --- /dev/null +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantAckListener.java @@ -0,0 +1,57 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2021 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.controlloop.participant.intermediary.comm; + +import java.util.function.Consumer; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantAckMessage; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessage; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantRegisterAck; +import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler; +import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; +import org.onap.policy.common.endpoints.listeners.ScoListener; +import org.onap.policy.common.utils.coder.StandardCoderObject; + +/** + * Abstract Listener for Participant Ack messages sent by runtime. + */ +public abstract class ParticipantAckListener<T extends ParticipantAckMessage> extends ScoListener<T> { + + private final ParticipantHandler participantHandler; + private final Consumer<T> consumer; + + /** + * Constructs the object. + * + * @param clazz class of message this handles + * @param participantHandler ParticipantHandler + * @param consumer function that handles the message + */ + protected ParticipantAckListener(Class<T> clazz, ParticipantHandler participantHandler, Consumer<T> consumer) { + super(clazz); + this.participantHandler = participantHandler; + this.consumer = consumer; + } + + @Override + public void onTopicEvent(CommInfrastructure infra, String topic, StandardCoderObject sco, T message) { + consumer.accept(message); + } +} diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantDeregisterAckListener.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantDeregisterAckListener.java new file mode 100644 index 000000000..e20f481f8 --- /dev/null +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantDeregisterAckListener.java @@ -0,0 +1,42 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2021 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.controlloop.participant.intermediary.comm; + +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantDeregisterAck; +import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler; +import org.springframework.stereotype.Component; + +/** + * Listener for Participant Deregister Ack messages sent by runtime. + * + */ +@Component +public class ParticipantDeregisterAckListener extends ParticipantAckListener<ParticipantDeregisterAck> { + + /** + * Constructs the object. + * + * @param participantHandler the handler for managing the state of the participant + */ + public ParticipantDeregisterAckListener(final ParticipantHandler participantHandler) { + super(ParticipantDeregisterAck.class, participantHandler, participantHandler::handleParticipantDeregisterAck); + } +} diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantStatusPublisher.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantMessagePublisher.java index 78b998453..9e1b84620 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantStatusPublisher.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantMessagePublisher.java @@ -21,7 +21,10 @@ package org.onap.policy.clamp.controlloop.participant.intermediary.comm; import java.util.List; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantDeregister; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantRegister; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantStatus; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantUpdateAck; import org.onap.policy.common.endpoints.event.comm.TopicSink; import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClient; import org.slf4j.Logger; @@ -31,17 +34,20 @@ import org.slf4j.LoggerFactory; * This class is used to send Participant Status messages to clamp using TopicSinkClient. * */ -public class ParticipantStatusPublisher { - private static final Logger LOGGER = LoggerFactory.getLogger(ParticipantStatusPublisher.class); +public class ParticipantMessagePublisher { + private static final Logger LOGGER = LoggerFactory.getLogger(ParticipantMessagePublisher.class); private final TopicSinkClient topicSinkClient; /** - * Constructor for instantiating ParticipantStatusPublisher. + * Constructor for instantiating ParticipantMessagePublisher. * * @param topicSinks the topic sinks */ - public ParticipantStatusPublisher(List<TopicSink> topicSinks) { + public ParticipantMessagePublisher(List<TopicSink> topicSinks) { + if (topicSinks.size() != 1) { + throw new IllegalArgumentException("Configuration unsupported, Topic sinks greater than 1"); + } this.topicSinkClient = new TopicSinkClient(topicSinks.get(0)); } @@ -50,8 +56,38 @@ public class ParticipantStatusPublisher { * * @param participantStatus the Participant Status */ - public void send(final ParticipantStatus participantStatus) { + public void sendParticipantStatus(final ParticipantStatus participantStatus) { topicSinkClient.send(participantStatus); LOGGER.debug("Sent Participant Status message to CLAMP - {}", participantStatus); } + + /** + * Method to send Participant Status message to clamp on demand. + * + * @param participantRegister the Participant Status + */ + public void sendParticipantRegister(final ParticipantRegister participantRegister) { + topicSinkClient.send(participantRegister); + LOGGER.debug("Sent Participant Register message to CLAMP - {}", participantRegister); + } + + /** + * Method to send Participant Status message to clamp on demand. + * + * @param participantDeregister the Participant Status + */ + public void sendParticipantDeregister(final ParticipantDeregister participantDeregister) { + topicSinkClient.send(participantDeregister); + LOGGER.debug("Sent Participant Deregister message to CLAMP - {}", participantDeregister); + } + + /** + * Method to send Participant Update Ack message to runtime. + * + * @param participantUpdateAck the Participant Update Ack + */ + public void sendParticipantUpdateAck(final ParticipantUpdateAck participantUpdateAck) { + topicSinkClient.send(participantUpdateAck); + LOGGER.debug("Sent Participant Update Ack message to CLAMP - {}", participantUpdateAck); + } } diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantRegisterAckListener.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantRegisterAckListener.java new file mode 100644 index 000000000..a15a2a850 --- /dev/null +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantRegisterAckListener.java @@ -0,0 +1,42 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2021 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.controlloop.participant.intermediary.comm; + +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantRegisterAck; +import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler; +import org.springframework.stereotype.Component; + +/** + * Listener for Participant Register Ack messages sent by runtime. + * + */ +@Component +public class ParticipantRegisterAckListener extends ParticipantAckListener<ParticipantRegisterAck> { + + /** + * Constructs the object. + * + * @param participantHandler the handler for managing the state of the participant + */ + public ParticipantRegisterAckListener(final ParticipantHandler participantHandler) { + super(ParticipantRegisterAck.class, participantHandler, participantHandler::handleParticipantRegisterAck); + } +} diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantUpdateListener.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantUpdateListener.java new file mode 100644 index 000000000..42bd52d9a --- /dev/null +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantUpdateListener.java @@ -0,0 +1,41 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2021 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.controlloop.participant.intermediary.comm; + +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantUpdate; +import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler; +import org.springframework.stereotype.Component; + +/** + * Listener for Participant Update messages sent by runtime. + */ +@Component +public class ParticipantUpdateListener extends ParticipantListener<ParticipantUpdate> { + + /** + * Constructs the object. + * + * @param participantHandler the handler for managing the state of the participant + */ + public ParticipantUpdateListener(final ParticipantHandler participantHandler) { + super(ParticipantUpdate.class, participantHandler, participantHandler::handleParticipantUpdate); + } +} diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/config/BeanFactory.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/config/BeanFactory.java index dc7d87eec..e363504a5 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/config/BeanFactory.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/config/BeanFactory.java @@ -21,7 +21,7 @@ package org.onap.policy.clamp.controlloop.participant.intermediary.config; import java.util.List; -import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ParticipantStatusPublisher; +import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ParticipantMessagePublisher; import org.onap.policy.clamp.controlloop.participant.intermediary.parameters.ParticipantParameters; import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager; import org.onap.policy.common.endpoints.event.comm.TopicSink; @@ -36,16 +36,16 @@ public class BeanFactory { private static final String[] MSG_TYPE_NAMES = {"messageType"}; /** - * create ParticipantStatusPublisher. + * create ParticipantMessagePublisher. * * @param parameters the ParticipantParameters - * @return ParticipantStatusPublisher + * @return ParticipantMessagePublisher */ @Bean - public ParticipantStatusPublisher publisher(final ParticipantParameters parameters) { + public ParticipantMessagePublisher publisher(final ParticipantParameters parameters) { List<TopicSink> topicSinks = TopicEndpointManager.getManager() .addTopicSinks(parameters.getIntermediaryParameters().getClampControlLoopTopics().getTopicSinks()); - return new ParticipantStatusPublisher(topicSinks); + return new ParticipantMessagePublisher(topicSinks); } @Bean diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/IntermediaryActivator.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/IntermediaryActivator.java index 2d789d40d..0aa536746 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/IntermediaryActivator.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/IntermediaryActivator.java @@ -24,10 +24,14 @@ import java.io.Closeable; import java.io.IOException; import java.util.List; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageType; +import org.onap.policy.clamp.controlloop.participant.intermediary.api.ParticipantIntermediaryApi; import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ControlLoopStateChangeListener; import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ControlLoopUpdateListener; +import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ParticipantDeregisterAckListener; import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ParticipantHealthCheckListener; +import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ParticipantRegisterAckListener; import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ParticipantStateChangeListener; +import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ParticipantUpdateListener; import org.onap.policy.clamp.controlloop.participant.intermediary.parameters.ParticipantParameters; import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager; import org.onap.policy.common.endpoints.event.comm.TopicSource; @@ -50,14 +54,18 @@ public class IntermediaryActivator extends ServiceManagerContainer implements Cl // Topics from which the participant receives and to which the participant sends messages private List<TopicSource> topicSources; + ParticipantIntermediaryApi participantIntermediaryApi; + /** * Instantiate the activator for participant. * * @param applicationContext ApplicationContext * @param parameters the ParticipantParameters */ - public IntermediaryActivator(final ApplicationContext applicationContext, final ParticipantParameters parameters) { + public IntermediaryActivator(final ApplicationContext applicationContext, final ParticipantParameters parameters, + ParticipantIntermediaryApi participantIntermediaryApi) { this.applicationContext = applicationContext; + this.participantIntermediaryApi = participantIntermediaryApi; topicSources = TopicEndpointManager.getManager() .addTopicSources(parameters.getIntermediaryParameters().getClampControlLoopTopics().getTopicSources()); @@ -81,6 +89,7 @@ public class IntermediaryActivator extends ServiceManagerContainer implements Cl public void handleContextRefreshEvent(ContextRefreshedEvent ctxRefreshedEvent) { if (!isAlive()) { start(); + sendParticipantRegister(); } } @@ -92,10 +101,19 @@ public class IntermediaryActivator extends ServiceManagerContainer implements Cl @EventListener public void handleContextClosedEvent(ContextClosedEvent ctxClosedEvent) { if (isAlive()) { + sendParticipantDeregister(); stop(); } } + private void sendParticipantRegister() { + participantIntermediaryApi.sendParticipantRegister(); + } + + private void sendParticipantDeregister() { + participantIntermediaryApi.sendParticipantDeregister(); + } + /** * Registers the dispatcher with the topic source(s). */ @@ -114,6 +132,15 @@ public class IntermediaryActivator extends ServiceManagerContainer implements Cl msgDispatcher.register(ParticipantMessageType.PARTICIPANT_CONTROL_LOOP_UPDATE.name(), applicationContext.getBean(ControlLoopUpdateListener.class)); + msgDispatcher.register(ParticipantMessageType.PARTICIPANT_REGISTER_ACK.name(), + applicationContext.getBean(ParticipantRegisterAckListener.class)); + + msgDispatcher.register(ParticipantMessageType.PARTICIPANT_DEREGISTER_ACK.name(), + applicationContext.getBean(ParticipantDeregisterAckListener.class)); + + msgDispatcher.register(ParticipantMessageType.PARTICIPANT_UPDATE.name(), + applicationContext.getBean(ParticipantUpdateListener.class)); + for (final TopicSource source : topicSources) { source.register(msgDispatcher); } diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ParticipantHandler.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ParticipantHandler.java index 1c54658fa..a8913c1f0 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ParticipantHandler.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ParticipantHandler.java @@ -21,22 +21,32 @@ package org.onap.policy.clamp.controlloop.participant.intermediary.handler; import java.io.Closeable; +import java.util.LinkedHashMap; +import java.util.Map; import java.util.Objects; +import java.util.UUID; import lombok.Getter; import lombok.Setter; +import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopElementDefinition; import org.onap.policy.clamp.controlloop.models.controlloop.concepts.Participant; import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ParticipantHealthStatus; import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ParticipantState; import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ParticipantStatistics; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantControlLoopStateChange; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantControlLoopUpdate; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantDeregister; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantDeregisterAck; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantHealthCheck; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessage; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantRegister; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantRegisterAck; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantResponseDetails; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantResponseStatus; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantStateChange; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantUpdate; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantUpdateAck; import org.onap.policy.clamp.controlloop.participant.intermediary.comm.MessageSender; -import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ParticipantStatusPublisher; +import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ParticipantMessagePublisher; import org.onap.policy.clamp.controlloop.participant.intermediary.parameters.ParticipantParameters; import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier; import org.slf4j.Logger; @@ -63,13 +73,15 @@ public class ParticipantHandler implements Closeable { @Setter private ParticipantHealthStatus healthStatus = ParticipantHealthStatus.UNKNOWN; + private final Map<UUID, ControlLoopElementDefinition> clElementDefsOnThisParticipant = new LinkedHashMap<>(); + /** * Constructor, set the participant ID and sender. * * @param parameters the parameters of the participant * @param publisher the publisher for sending responses to messages */ - public ParticipantHandler(ParticipantParameters parameters, ParticipantStatusPublisher publisher) { + public ParticipantHandler(ParticipantParameters parameters, ParticipantMessagePublisher publisher) { this.participantType = parameters.getIntermediaryParameters().getParticipantType(); this.participantId = parameters.getIntermediaryParameters().getParticipantId(); this.sender = @@ -249,20 +261,87 @@ public class ParticipantHandler implements Closeable { /** * Check if a participant message applies to this participant handler. * - * @param partipantMsg the message to check + * @param participantMsg the message to check * @return true if it applies, false otherwise */ - public boolean canHandle(ParticipantMessage partipantMsg) { - return partipantMsg.appliesTo(participantType, participantId); + public boolean appliesTo(ParticipantMessage participantMsg) { + return participantMsg.appliesTo(participantType, participantId); } /** - * Check if a participant message applies to this participant handler. + * Method to send ParticipantRegister message to controlloop runtime. + */ + public void sendParticipantRegister() { + var participantRegister = new ParticipantRegister(); + participantRegister.setParticipantId(participantId); + participantRegister.setParticipantType(participantType); + + sender.sendParticipantRegister(participantRegister); + } + + /** + * Handle a participantRegister Ack message. * - * @param partipantMsg the message to check - * @return true if it applies, false otherwise + * @param participantRegisterAckMsg the participantRegisterAck message + */ + public void handleParticipantRegisterAck(ParticipantRegisterAck participantRegisterAckMsg) { + LOGGER.debug("ParticipantRegisterAck message received as responseTo {}", + participantRegisterAckMsg.getResponseTo()); + } + + /** + * Method to send ParticipantDeregister message to controlloop runtime. + */ + public void sendParticipantDeregister() { + var participantDeregister = new ParticipantDeregister(); + participantDeregister.setParticipantId(participantId); + participantDeregister.setParticipantType(participantType); + + sender.sendParticipantDeregister(participantDeregister); + } + + /** + * Handle a participantDeregister Ack message. + * + * @param participantDeregisterAckMsg the participantDeregisterAck message */ - public boolean appliesTo(ParticipantMessage partipantMsg) { - return partipantMsg.appliesTo(participantType, participantId); + public void handleParticipantDeregisterAck(ParticipantDeregisterAck participantDeregisterAckMsg) { + LOGGER.debug("ParticipantDeregisterAck message received as responseTo {}", + participantDeregisterAckMsg.getResponseTo()); + } + + /** + * Handle a ParticipantUpdate message. + * + * @param participantUpdateMsg the ParticipantUpdate message + */ + public void handleParticipantUpdate(ParticipantUpdate participantUpdateMsg) { + LOGGER.debug("ParticipantUpdate message received for participantId {}", + participantUpdateMsg.getParticipantId()); + + if (!participantUpdateMsg.appliesTo(participantType, participantId)) { + return; + } + + Map<UUID, ControlLoopElementDefinition> clDefinitionMap = + participantUpdateMsg.getParticipantDefinitionUpdateMap().get(participantUpdateMsg.getParticipantId()); + + for (ControlLoopElementDefinition element : clDefinitionMap.values()) { + clElementDefsOnThisParticipant.put(element.getId(), element); + } + + sendParticipantUpdateAck(participantUpdateMsg.getMessageId()); + } + + /** + * Method to send ParticipantUpdateAck message to controlloop runtime. + */ + public void sendParticipantUpdateAck(UUID messageId) { + var participantUpdateAck = new ParticipantUpdateAck(); + participantUpdateAck.setResponseTo(messageId); + participantUpdateAck.setMessage("Participant Update Ack message"); + participantUpdateAck.setResult(true); + + sender.sendParticipantUpdateAck(participantUpdateAck); } } |