diff options
Diffstat (limited to 'tosca-controlloop/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/IntermediaryActivator.java')
-rw-r--r-- | tosca-controlloop/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/IntermediaryActivator.java | 36 |
1 files changed, 36 insertions, 0 deletions
diff --git a/tosca-controlloop/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/IntermediaryActivator.java b/tosca-controlloop/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/IntermediaryActivator.java index dd0cf30a8..be2fa1a30 100644 --- a/tosca-controlloop/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/IntermediaryActivator.java +++ b/tosca-controlloop/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/IntermediaryActivator.java @@ -26,13 +26,22 @@ import javax.ws.rs.core.Response.Status; import lombok.Getter; import lombok.experimental.Delegate; import org.onap.policy.clamp.controlloop.common.exception.ControlLoopRuntimeException; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantControlLoopStateChange; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantControlLoopUpdate; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantHealthCheck; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageType; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantStateChange; +import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ControlLoopStateChangeListener; import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ControlLoopUpdateListener; +import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ParticipantHealthCheckListener; +import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ParticipantStateChangeListener; import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ParticipantStatusPublisher; import org.onap.policy.clamp.controlloop.participant.intermediary.parameters.ParticipantIntermediaryParameters; import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager; import org.onap.policy.common.endpoints.event.comm.TopicSink; import org.onap.policy.common.endpoints.event.comm.TopicSource; import org.onap.policy.common.endpoints.listeners.MessageTypeDispatcher; +import org.onap.policy.common.endpoints.listeners.ScoListener; import org.onap.policy.common.utils.services.ServiceManagerContainer; /** @@ -80,6 +89,9 @@ public class IntermediaryActivator extends ServiceManagerContainer { // @formatter:off final AtomicReference<ParticipantStatusPublisher> statusPublisher = new AtomicReference<>(); + final AtomicReference<ParticipantStateChangeListener> participantStateChangeListener = new AtomicReference<>(); + final AtomicReference<ParticipantHealthCheckListener> participantHealthCheckListener = new AtomicReference<>(); + final AtomicReference<ControlLoopStateChangeListener> controlLoopStateChangeListener = new AtomicReference<>(); final AtomicReference<ControlLoopUpdateListener> controlLoopUpdateListener = new AtomicReference<>(); addAction("Topic endpoint management", @@ -94,6 +106,18 @@ public class IntermediaryActivator extends ServiceManagerContainer { () -> participantHandler.set(new ParticipantHandler(parameters, statusPublisher.get())), () -> participantHandler.get().close()); + addAction("Participant State Change Listener", + () -> participantStateChangeListener.set(new ParticipantStateChangeListener(participantHandler.get())), + () -> participantStateChangeListener.get().close()); + + addAction("Participant Health Check Listener", + () -> participantHealthCheckListener.set(new ParticipantHealthCheckListener(participantHandler.get())), + () -> participantHealthCheckListener.get().close()); + + addAction("Control Loop State Change Listener", + () -> controlLoopStateChangeListener.set(new ControlLoopStateChangeListener(participantHandler.get())), + () -> controlLoopStateChangeListener.get().close()); + addAction("Control Loop Update Listener", () -> controlLoopUpdateListener.set(new ControlLoopUpdateListener(participantHandler.get())), () -> controlLoopUpdateListener.get().close()); @@ -106,6 +130,18 @@ public class IntermediaryActivator extends ServiceManagerContainer { * Registers the dispatcher with the topic source(s). */ private void registerMsgDispatcher() { + msgDispatcher.register(ParticipantMessageType.PARTICIPANT_STATE_CHANGE.name(), + (ScoListener<ParticipantStateChange>) new ParticipantStateChangeListener( + participantHandler.get())); + msgDispatcher.register(ParticipantMessageType.PARTICIPANT_HEALTH_CHECK.name(), + (ScoListener<ParticipantHealthCheck>) new ParticipantHealthCheckListener( + participantHandler.get())); + msgDispatcher.register(ParticipantMessageType.PARTICIPANT_CONTROL_LOOP_STATE_CHANGE.name(), + (ScoListener<ParticipantControlLoopStateChange>) new ControlLoopStateChangeListener( + participantHandler.get())); + msgDispatcher.register(ParticipantMessageType.PARTICIPANT_CONTROL_LOOP_UPDATE.name(), + (ScoListener<ParticipantControlLoopUpdate>) new ControlLoopUpdateListener( + participantHandler.get())); for (final TopicSource source : topicSources) { source.register(msgDispatcher); } |