diff options
Diffstat (limited to 'main/src/main/java/org/onap/policy/pap/main/startstop/PapActivator.java')
-rw-r--r-- | main/src/main/java/org/onap/policy/pap/main/startstop/PapActivator.java | 61 |
1 files changed, 38 insertions, 23 deletions
diff --git a/main/src/main/java/org/onap/policy/pap/main/startstop/PapActivator.java b/main/src/main/java/org/onap/policy/pap/main/startstop/PapActivator.java index 617275b4..3b089400 100644 --- a/main/src/main/java/org/onap/policy/pap/main/startstop/PapActivator.java +++ b/main/src/main/java/org/onap/policy/pap/main/startstop/PapActivator.java @@ -22,12 +22,13 @@ package org.onap.policy.pap.main.startstop; -import java.util.Arrays; +import java.util.List; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager; +import org.onap.policy.common.endpoints.event.comm.TopicListener; import org.onap.policy.common.endpoints.event.comm.TopicSource; import org.onap.policy.common.endpoints.http.client.HttpClientFactoryInstance; import org.onap.policy.common.endpoints.http.server.RestServer; @@ -84,15 +85,17 @@ public class PapActivator extends ServiceManagerContainer { /** * Listens for messages on the topic, decodes them into a {@link PdpStatus} message, and then dispatches them to - * {@link #reqIdDispatcher}. + * {@link #responseReqIdDispatcher}. */ - private final MessageTypeDispatcher msgDispatcher; + private final MessageTypeDispatcher responseMsgDispatcher; + private final MessageTypeDispatcher heartbeatMsgDispatcher; /** * Listens for {@link PdpStatus} messages and then routes them to the listener associated with the ID of the * originating request. */ - private final RequestIdDispatcher<PdpStatus> reqIdDispatcher; + private final RequestIdDispatcher<PdpStatus> responseReqIdDispatcher; + private final RequestIdDispatcher<PdpStatus> heartbeatReqIdDispatcher; /** * Listener for anonymous {@link PdpStatus} messages either for registration or heartbeat. @@ -111,8 +114,10 @@ public class PapActivator extends ServiceManagerContainer { try { this.papParameterGroup = papParameterGroup; - this.msgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES); - this.reqIdDispatcher = new RequestIdDispatcher<>(PdpStatus.class, REQ_ID_NAMES); + this.responseMsgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES); + this.heartbeatMsgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES); + this.responseReqIdDispatcher = new RequestIdDispatcher<>(PdpStatus.class, REQ_ID_NAMES); + this.heartbeatReqIdDispatcher = new RequestIdDispatcher<>(PdpStatus.class, REQ_ID_NAMES); this.pdpHeartbeatListener = new PdpHeartbeatListener(papParameterGroup.getPdpParameters(), papParameterGroup.isSavePdpStatisticsInDb()); @@ -149,16 +154,24 @@ public class PapActivator extends ServiceManagerContainer { () -> Registry.unregister(PapConstants.REG_PAP_DAO_FACTORY)); addAction("Pdp Heartbeat Listener", - () -> reqIdDispatcher.register(pdpHeartbeatListener), - () -> reqIdDispatcher.unregister(pdpHeartbeatListener)); + () -> heartbeatReqIdDispatcher.register(pdpHeartbeatListener), + () -> heartbeatReqIdDispatcher.unregister(pdpHeartbeatListener)); - addAction("Request ID Dispatcher", - () -> msgDispatcher.register(PdpMessageType.PDP_STATUS.name(), this.reqIdDispatcher), - () -> msgDispatcher.unregister(PdpMessageType.PDP_STATUS.name())); + addAction("Response Request ID Dispatcher", + () -> responseMsgDispatcher.register(PdpMessageType.PDP_STATUS.name(), this.responseReqIdDispatcher), + () -> responseMsgDispatcher.unregister(PdpMessageType.PDP_STATUS.name())); - addAction("Message Dispatcher", - this::registerMsgDispatcher, - this::unregisterMsgDispatcher); + addAction("Heartbeat Request ID Dispatcher", + () -> heartbeatMsgDispatcher.register(PdpMessageType.PDP_STATUS.name(), this.heartbeatReqIdDispatcher), + () -> heartbeatMsgDispatcher.unregister(PdpMessageType.PDP_STATUS.name())); + + addAction("Response Message Dispatcher", + () -> registerMsgDispatcher(responseMsgDispatcher, PapConstants.TOPIC_POLICY_PDP_PAP), + () -> unregisterMsgDispatcher(responseMsgDispatcher, PapConstants.TOPIC_POLICY_PDP_PAP)); + + addAction("Heartbeat Message Dispatcher", + () -> registerMsgDispatcher(heartbeatMsgDispatcher, PapConstants.TOPIC_POLICY_HEARTBEAT), + () -> unregisterMsgDispatcher(heartbeatMsgDispatcher, PapConstants.TOPIC_POLICY_HEARTBEAT)); addAction("topics", TopicEndpointManager.getManager()::start, @@ -215,7 +228,7 @@ public class PapActivator extends ServiceManagerContainer { .params(pdpParams) .policyNotifier(notifier.get()) .pdpPublisher(pdpPub.get()) - .responseDispatcher(reqIdDispatcher) + .responseDispatcher(responseReqIdDispatcher) .stateChangeTimers(pdpStChgTimers.get()) .updateTimers(pdpUpdTimers.get()) .savePdpStatistics(papParameterGroup.isSavePdpStatisticsInDb()) @@ -288,21 +301,23 @@ public class PapActivator extends ServiceManagerContainer { /** * Registers the dispatcher with the topic source(s). + * @param dispatcher dispatcher to register + * @param topic topic of interest */ - private void registerMsgDispatcher() { - for (final TopicSource source : TopicEndpointManager.getManager() - .getTopicSources(Arrays.asList(PapConstants.TOPIC_POLICY_PDP_PAP))) { - source.register(msgDispatcher); + private void registerMsgDispatcher(TopicListener dispatcher, String topic) { + for (final TopicSource source : TopicEndpointManager.getManager().getTopicSources(List.of(topic))) { + source.register(dispatcher); } } /** * Unregisters the dispatcher from the topic source(s). + * @param dispatcher dispatcher to unregister + * @param topic topic of interest */ - private void unregisterMsgDispatcher() { - for (final TopicSource source : TopicEndpointManager.getManager() - .getTopicSources(Arrays.asList(PapConstants.TOPIC_POLICY_PDP_PAP))) { - source.unregister(msgDispatcher); + private void unregisterMsgDispatcher(TopicListener dispatcher, String topic) { + for (final TopicSource source : TopicEndpointManager.getManager().getTopicSources(List.of(topic))) { + source.unregister(dispatcher); } } } |