diff options
author | Jim Hahn <jrh3@att.com> | 2021-07-06 10:19:52 -0400 |
---|---|---|
committer | Jim Hahn <jrh3@att.com> | 2021-07-06 10:55:13 -0400 |
commit | 18bf88fc5aed7586249319af690fb5c09451ad03 (patch) | |
tree | 8419adca544445d64c37714b5867c7e927945317 /main/src | |
parent | 4f37fcf24755332c256f72104df36a43cb4c7560 (diff) |
Use separate subscription for heartbeats
Separated the heartbeat processing onto its own POLICY-HEARTBEAT topic,
still on the "real" (i.e., "effective") POLICY-PDP-PAP topic, like we
had originally been doing with the statistics. With this change,
statistics processing continues to be part of the heartbeat
class/processing, thus a separate class is not required to listen for,
and handle, statistics.
This new subscription uses a shared consumer group so that only one PAP
will process any given heartbeat message, which should reduce the
likelihood of DB contention and duplicate keys.
This also means that the "saveStatisticsInDb" flag will still be used,
which is a more obvious mechanism for controlling the storing of
statistics than the presence/absence of a topic in the config file.
Issue-ID: POLICY-3460
Change-Id: Ia07132b1c7aef006af86fddbe677fb1243a4e2c3
Signed-off-by: Jim Hahn <jrh3@att.com>
Diffstat (limited to 'main/src')
5 files changed, 61 insertions, 24 deletions
diff --git a/main/src/main/java/org/onap/policy/pap/main/PapConstants.java b/main/src/main/java/org/onap/policy/pap/main/PapConstants.java index 7d4cb7b4..f38a82e0 100644 --- a/main/src/main/java/org/onap/policy/pap/main/PapConstants.java +++ b/main/src/main/java/org/onap/policy/pap/main/PapConstants.java @@ -25,7 +25,7 @@ import org.onap.policy.common.utils.network.NetworkUtil; /** * Names of various items contained in the Registry. */ -public class PapConstants { +public final class PapConstants { // Registry keys public static final String REG_PAP_ACTIVATOR = "object:activator/pap"; @@ -38,6 +38,7 @@ public class PapConstants { // topic names public static final String TOPIC_POLICY_PDP_PAP = "POLICY-PDP-PAP"; public static final String TOPIC_POLICY_NOTIFICATION = "POLICY-NOTIFICATION"; + public static final String TOPIC_POLICY_HEARTBEAT = "POLICY-HEARTBEAT"; // policy components names public static final String POLICY_API = "api"; diff --git a/main/src/main/java/org/onap/policy/pap/main/startstop/PapActivator.java b/main/src/main/java/org/onap/policy/pap/main/startstop/PapActivator.java index 617275b4..3b089400 100644 --- a/main/src/main/java/org/onap/policy/pap/main/startstop/PapActivator.java +++ b/main/src/main/java/org/onap/policy/pap/main/startstop/PapActivator.java @@ -22,12 +22,13 @@ package org.onap.policy.pap.main.startstop; -import java.util.Arrays; +import java.util.List; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager; +import org.onap.policy.common.endpoints.event.comm.TopicListener; import org.onap.policy.common.endpoints.event.comm.TopicSource; import org.onap.policy.common.endpoints.http.client.HttpClientFactoryInstance; import org.onap.policy.common.endpoints.http.server.RestServer; @@ -84,15 +85,17 @@ public class PapActivator extends ServiceManagerContainer { /** * Listens for messages on the topic, decodes them into a {@link PdpStatus} message, and then dispatches them to - * {@link #reqIdDispatcher}. + * {@link #responseReqIdDispatcher}. */ - private final MessageTypeDispatcher msgDispatcher; + private final MessageTypeDispatcher responseMsgDispatcher; + private final MessageTypeDispatcher heartbeatMsgDispatcher; /** * Listens for {@link PdpStatus} messages and then routes them to the listener associated with the ID of the * originating request. */ - private final RequestIdDispatcher<PdpStatus> reqIdDispatcher; + private final RequestIdDispatcher<PdpStatus> responseReqIdDispatcher; + private final RequestIdDispatcher<PdpStatus> heartbeatReqIdDispatcher; /** * Listener for anonymous {@link PdpStatus} messages either for registration or heartbeat. @@ -111,8 +114,10 @@ public class PapActivator extends ServiceManagerContainer { try { this.papParameterGroup = papParameterGroup; - this.msgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES); - this.reqIdDispatcher = new RequestIdDispatcher<>(PdpStatus.class, REQ_ID_NAMES); + this.responseMsgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES); + this.heartbeatMsgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES); + this.responseReqIdDispatcher = new RequestIdDispatcher<>(PdpStatus.class, REQ_ID_NAMES); + this.heartbeatReqIdDispatcher = new RequestIdDispatcher<>(PdpStatus.class, REQ_ID_NAMES); this.pdpHeartbeatListener = new PdpHeartbeatListener(papParameterGroup.getPdpParameters(), papParameterGroup.isSavePdpStatisticsInDb()); @@ -149,16 +154,24 @@ public class PapActivator extends ServiceManagerContainer { () -> Registry.unregister(PapConstants.REG_PAP_DAO_FACTORY)); addAction("Pdp Heartbeat Listener", - () -> reqIdDispatcher.register(pdpHeartbeatListener), - () -> reqIdDispatcher.unregister(pdpHeartbeatListener)); + () -> heartbeatReqIdDispatcher.register(pdpHeartbeatListener), + () -> heartbeatReqIdDispatcher.unregister(pdpHeartbeatListener)); - addAction("Request ID Dispatcher", - () -> msgDispatcher.register(PdpMessageType.PDP_STATUS.name(), this.reqIdDispatcher), - () -> msgDispatcher.unregister(PdpMessageType.PDP_STATUS.name())); + addAction("Response Request ID Dispatcher", + () -> responseMsgDispatcher.register(PdpMessageType.PDP_STATUS.name(), this.responseReqIdDispatcher), + () -> responseMsgDispatcher.unregister(PdpMessageType.PDP_STATUS.name())); - addAction("Message Dispatcher", - this::registerMsgDispatcher, - this::unregisterMsgDispatcher); + addAction("Heartbeat Request ID Dispatcher", + () -> heartbeatMsgDispatcher.register(PdpMessageType.PDP_STATUS.name(), this.heartbeatReqIdDispatcher), + () -> heartbeatMsgDispatcher.unregister(PdpMessageType.PDP_STATUS.name())); + + addAction("Response Message Dispatcher", + () -> registerMsgDispatcher(responseMsgDispatcher, PapConstants.TOPIC_POLICY_PDP_PAP), + () -> unregisterMsgDispatcher(responseMsgDispatcher, PapConstants.TOPIC_POLICY_PDP_PAP)); + + addAction("Heartbeat Message Dispatcher", + () -> registerMsgDispatcher(heartbeatMsgDispatcher, PapConstants.TOPIC_POLICY_HEARTBEAT), + () -> unregisterMsgDispatcher(heartbeatMsgDispatcher, PapConstants.TOPIC_POLICY_HEARTBEAT)); addAction("topics", TopicEndpointManager.getManager()::start, @@ -215,7 +228,7 @@ public class PapActivator extends ServiceManagerContainer { .params(pdpParams) .policyNotifier(notifier.get()) .pdpPublisher(pdpPub.get()) - .responseDispatcher(reqIdDispatcher) + .responseDispatcher(responseReqIdDispatcher) .stateChangeTimers(pdpStChgTimers.get()) .updateTimers(pdpUpdTimers.get()) .savePdpStatistics(papParameterGroup.isSavePdpStatisticsInDb()) @@ -288,21 +301,23 @@ public class PapActivator extends ServiceManagerContainer { /** * Registers the dispatcher with the topic source(s). + * @param dispatcher dispatcher to register + * @param topic topic of interest */ - private void registerMsgDispatcher() { - for (final TopicSource source : TopicEndpointManager.getManager() - .getTopicSources(Arrays.asList(PapConstants.TOPIC_POLICY_PDP_PAP))) { - source.register(msgDispatcher); + private void registerMsgDispatcher(TopicListener dispatcher, String topic) { + for (final TopicSource source : TopicEndpointManager.getManager().getTopicSources(List.of(topic))) { + source.register(dispatcher); } } /** * Unregisters the dispatcher from the topic source(s). + * @param dispatcher dispatcher to unregister + * @param topic topic of interest */ - private void unregisterMsgDispatcher() { - for (final TopicSource source : TopicEndpointManager.getManager() - .getTopicSources(Arrays.asList(PapConstants.TOPIC_POLICY_PDP_PAP))) { - source.unregister(msgDispatcher); + private void unregisterMsgDispatcher(TopicListener dispatcher, String topic) { + for (final TopicSource source : TopicEndpointManager.getManager().getTopicSources(List.of(topic))) { + source.unregister(dispatcher); } } } diff --git a/main/src/test/resources/e2e/PapConfigParameters.json b/main/src/test/resources/e2e/PapConfigParameters.json index 610ded8c..b10f5801 100644 --- a/main/src/test/resources/e2e/PapConfigParameters.json +++ b/main/src/test/resources/e2e/PapConfigParameters.json @@ -32,6 +32,13 @@ "topic" : "POLICY-PDP-PAP", "servers" : [ "message-router" ], "topicCommInfrastructure" : "noop" + }, + { + "topic" : "POLICY-HEARTBEAT", + "effectiveTopic": "POLICY-PDP-PAP", + "consumerGroup": "policy-pap", + "servers" : [ "message-router" ], + "topicCommInfrastructure" : "noop" }], "topicSinks" : [{ "topic" : "POLICY-PDP-PAP", diff --git a/main/src/test/resources/parameters/PapConfigParameters.json b/main/src/test/resources/parameters/PapConfigParameters.json index 452bc9a7..09adfcec 100644 --- a/main/src/test/resources/parameters/PapConfigParameters.json +++ b/main/src/test/resources/parameters/PapConfigParameters.json @@ -33,6 +33,13 @@ "topic" : "POLICY-PDP-PAP", "servers" : [ "message-router" ], "topicCommInfrastructure" : "noop" + }, + { + "topic" : "POLICY-HEARTBEAT", + "effectiveTopic": "POLICY-PDP-PAP", + "consumerGroup": "policy-pap", + "servers" : [ "message-router" ], + "topicCommInfrastructure" : "noop" }], "topicSinks" : [{ "topic" : "POLICY-PDP-PAP", diff --git a/main/src/test/resources/parameters/PapConfigParametersStd.json b/main/src/test/resources/parameters/PapConfigParametersStd.json index 0868aa41..0f881430 100644 --- a/main/src/test/resources/parameters/PapConfigParametersStd.json +++ b/main/src/test/resources/parameters/PapConfigParametersStd.json @@ -34,6 +34,13 @@ "topic" : "POLICY-PDP-PAP", "servers" : [ "message-router" ], "topicCommInfrastructure" : "noop" + }, + { + "topic" : "POLICY-HEARTBEAT", + "effectiveTopic": "POLICY-PDP-PAP", + "consumerGroup": "policy-pap", + "servers" : [ "message-router" ], + "topicCommInfrastructure" : "noop" }], "topicSinks" : [{ "topic" : "POLICY-PDP-PAP", |