diff options
author | Jim Hahn <jrh3@att.com> | 2021-07-07 12:26:04 +0000 |
---|---|---|
committer | Gerrit Code Review <gerrit@onap.org> | 2021-07-07 12:26:04 +0000 |
commit | fdbd773c3a09b8002c51393c529597b671b2d042 (patch) | |
tree | b4778cba67aeaffa353fa4413840d27d7efb8cbe /main | |
parent | a87b9d9e191b4dee5d821db55d1ff4fd70b8d73a (diff) | |
parent | 18bf88fc5aed7586249319af690fb5c09451ad03 (diff) |
Merge "Use separate subscription for heartbeats"
Diffstat (limited to 'main')
5 files changed, 61 insertions, 24 deletions
diff --git a/main/src/main/java/org/onap/policy/pap/main/PapConstants.java b/main/src/main/java/org/onap/policy/pap/main/PapConstants.java index 7d4cb7b4..f38a82e0 100644 --- a/main/src/main/java/org/onap/policy/pap/main/PapConstants.java +++ b/main/src/main/java/org/onap/policy/pap/main/PapConstants.java @@ -25,7 +25,7 @@ import org.onap.policy.common.utils.network.NetworkUtil; /** * Names of various items contained in the Registry. */ -public class PapConstants { +public final class PapConstants { // Registry keys public static final String REG_PAP_ACTIVATOR = "object:activator/pap"; @@ -38,6 +38,7 @@ public class PapConstants { // topic names public static final String TOPIC_POLICY_PDP_PAP = "POLICY-PDP-PAP"; public static final String TOPIC_POLICY_NOTIFICATION = "POLICY-NOTIFICATION"; + public static final String TOPIC_POLICY_HEARTBEAT = "POLICY-HEARTBEAT"; // policy components names public static final String POLICY_API = "api"; diff --git a/main/src/main/java/org/onap/policy/pap/main/startstop/PapActivator.java b/main/src/main/java/org/onap/policy/pap/main/startstop/PapActivator.java index 617275b4..3b089400 100644 --- a/main/src/main/java/org/onap/policy/pap/main/startstop/PapActivator.java +++ b/main/src/main/java/org/onap/policy/pap/main/startstop/PapActivator.java @@ -22,12 +22,13 @@ package org.onap.policy.pap.main.startstop; -import java.util.Arrays; +import java.util.List; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager; +import org.onap.policy.common.endpoints.event.comm.TopicListener; import org.onap.policy.common.endpoints.event.comm.TopicSource; import org.onap.policy.common.endpoints.http.client.HttpClientFactoryInstance; import org.onap.policy.common.endpoints.http.server.RestServer; @@ -84,15 +85,17 @@ public class PapActivator extends ServiceManagerContainer { /** * Listens for messages on the topic, decodes them into a {@link PdpStatus} message, and then dispatches them to - * {@link #reqIdDispatcher}. + * {@link #responseReqIdDispatcher}. */ - private final MessageTypeDispatcher msgDispatcher; + private final MessageTypeDispatcher responseMsgDispatcher; + private final MessageTypeDispatcher heartbeatMsgDispatcher; /** * Listens for {@link PdpStatus} messages and then routes them to the listener associated with the ID of the * originating request. */ - private final RequestIdDispatcher<PdpStatus> reqIdDispatcher; + private final RequestIdDispatcher<PdpStatus> responseReqIdDispatcher; + private final RequestIdDispatcher<PdpStatus> heartbeatReqIdDispatcher; /** * Listener for anonymous {@link PdpStatus} messages either for registration or heartbeat. @@ -111,8 +114,10 @@ public class PapActivator extends ServiceManagerContainer { try { this.papParameterGroup = papParameterGroup; - this.msgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES); - this.reqIdDispatcher = new RequestIdDispatcher<>(PdpStatus.class, REQ_ID_NAMES); + this.responseMsgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES); + this.heartbeatMsgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES); + this.responseReqIdDispatcher = new RequestIdDispatcher<>(PdpStatus.class, REQ_ID_NAMES); + this.heartbeatReqIdDispatcher = new RequestIdDispatcher<>(PdpStatus.class, REQ_ID_NAMES); this.pdpHeartbeatListener = new PdpHeartbeatListener(papParameterGroup.getPdpParameters(), papParameterGroup.isSavePdpStatisticsInDb()); @@ -149,16 +154,24 @@ public class PapActivator extends ServiceManagerContainer { () -> Registry.unregister(PapConstants.REG_PAP_DAO_FACTORY)); addAction("Pdp Heartbeat Listener", - () -> reqIdDispatcher.register(pdpHeartbeatListener), - () -> reqIdDispatcher.unregister(pdpHeartbeatListener)); + () -> heartbeatReqIdDispatcher.register(pdpHeartbeatListener), + () -> heartbeatReqIdDispatcher.unregister(pdpHeartbeatListener)); - addAction("Request ID Dispatcher", - () -> msgDispatcher.register(PdpMessageType.PDP_STATUS.name(), this.reqIdDispatcher), - () -> msgDispatcher.unregister(PdpMessageType.PDP_STATUS.name())); + addAction("Response Request ID Dispatcher", + () -> responseMsgDispatcher.register(PdpMessageType.PDP_STATUS.name(), this.responseReqIdDispatcher), + () -> responseMsgDispatcher.unregister(PdpMessageType.PDP_STATUS.name())); - addAction("Message Dispatcher", - this::registerMsgDispatcher, - this::unregisterMsgDispatcher); + addAction("Heartbeat Request ID Dispatcher", + () -> heartbeatMsgDispatcher.register(PdpMessageType.PDP_STATUS.name(), this.heartbeatReqIdDispatcher), + () -> heartbeatMsgDispatcher.unregister(PdpMessageType.PDP_STATUS.name())); + + addAction("Response Message Dispatcher", + () -> registerMsgDispatcher(responseMsgDispatcher, PapConstants.TOPIC_POLICY_PDP_PAP), + () -> unregisterMsgDispatcher(responseMsgDispatcher, PapConstants.TOPIC_POLICY_PDP_PAP)); + + addAction("Heartbeat Message Dispatcher", + () -> registerMsgDispatcher(heartbeatMsgDispatcher, PapConstants.TOPIC_POLICY_HEARTBEAT), + () -> unregisterMsgDispatcher(heartbeatMsgDispatcher, PapConstants.TOPIC_POLICY_HEARTBEAT)); addAction("topics", TopicEndpointManager.getManager()::start, @@ -215,7 +228,7 @@ public class PapActivator extends ServiceManagerContainer { .params(pdpParams) .policyNotifier(notifier.get()) .pdpPublisher(pdpPub.get()) - .responseDispatcher(reqIdDispatcher) + .responseDispatcher(responseReqIdDispatcher) .stateChangeTimers(pdpStChgTimers.get()) .updateTimers(pdpUpdTimers.get()) .savePdpStatistics(papParameterGroup.isSavePdpStatisticsInDb()) @@ -288,21 +301,23 @@ public class PapActivator extends ServiceManagerContainer { /** * Registers the dispatcher with the topic source(s). + * @param dispatcher dispatcher to register + * @param topic topic of interest */ - private void registerMsgDispatcher() { - for (final TopicSource source : TopicEndpointManager.getManager() - .getTopicSources(Arrays.asList(PapConstants.TOPIC_POLICY_PDP_PAP))) { - source.register(msgDispatcher); + private void registerMsgDispatcher(TopicListener dispatcher, String topic) { + for (final TopicSource source : TopicEndpointManager.getManager().getTopicSources(List.of(topic))) { + source.register(dispatcher); } } /** * Unregisters the dispatcher from the topic source(s). + * @param dispatcher dispatcher to unregister + * @param topic topic of interest */ - private void unregisterMsgDispatcher() { - for (final TopicSource source : TopicEndpointManager.getManager() - .getTopicSources(Arrays.asList(PapConstants.TOPIC_POLICY_PDP_PAP))) { - source.unregister(msgDispatcher); + private void unregisterMsgDispatcher(TopicListener dispatcher, String topic) { + for (final TopicSource source : TopicEndpointManager.getManager().getTopicSources(List.of(topic))) { + source.unregister(dispatcher); } } } diff --git a/main/src/test/resources/e2e/PapConfigParameters.json b/main/src/test/resources/e2e/PapConfigParameters.json index 610ded8c..b10f5801 100644 --- a/main/src/test/resources/e2e/PapConfigParameters.json +++ b/main/src/test/resources/e2e/PapConfigParameters.json @@ -32,6 +32,13 @@ "topic" : "POLICY-PDP-PAP", "servers" : [ "message-router" ], "topicCommInfrastructure" : "noop" + }, + { + "topic" : "POLICY-HEARTBEAT", + "effectiveTopic": "POLICY-PDP-PAP", + "consumerGroup": "policy-pap", + "servers" : [ "message-router" ], + "topicCommInfrastructure" : "noop" }], "topicSinks" : [{ "topic" : "POLICY-PDP-PAP", diff --git a/main/src/test/resources/parameters/PapConfigParameters.json b/main/src/test/resources/parameters/PapConfigParameters.json index 452bc9a7..09adfcec 100644 --- a/main/src/test/resources/parameters/PapConfigParameters.json +++ b/main/src/test/resources/parameters/PapConfigParameters.json @@ -33,6 +33,13 @@ "topic" : "POLICY-PDP-PAP", "servers" : [ "message-router" ], "topicCommInfrastructure" : "noop" + }, + { + "topic" : "POLICY-HEARTBEAT", + "effectiveTopic": "POLICY-PDP-PAP", + "consumerGroup": "policy-pap", + "servers" : [ "message-router" ], + "topicCommInfrastructure" : "noop" }], "topicSinks" : [{ "topic" : "POLICY-PDP-PAP", diff --git a/main/src/test/resources/parameters/PapConfigParametersStd.json b/main/src/test/resources/parameters/PapConfigParametersStd.json index 0868aa41..0f881430 100644 --- a/main/src/test/resources/parameters/PapConfigParametersStd.json +++ b/main/src/test/resources/parameters/PapConfigParametersStd.json @@ -34,6 +34,13 @@ "topic" : "POLICY-PDP-PAP", "servers" : [ "message-router" ], "topicCommInfrastructure" : "noop" + }, + { + "topic" : "POLICY-HEARTBEAT", + "effectiveTopic": "POLICY-PDP-PAP", + "consumerGroup": "policy-pap", + "servers" : [ "message-router" ], + "topicCommInfrastructure" : "noop" }], "topicSinks" : [{ "topic" : "POLICY-PDP-PAP", |