From 18bf88fc5aed7586249319af690fb5c09451ad03 Mon Sep 17 00:00:00 2001 From: Jim Hahn Date: Tue, 6 Jul 2021 10:19:52 -0400 Subject: Use separate subscription for heartbeats Separated the heartbeat processing onto its own POLICY-HEARTBEAT topic, still on the "real" (i.e., "effective") POLICY-PDP-PAP topic, like we had originally been doing with the statistics. With this change, statistics processing continues to be part of the heartbeat class/processing, thus a separate class is not required to listen for, and handle, statistics. This new subscription uses a shared consumer group so that only one PAP will process any given heartbeat message, which should reduce the likelihood of DB contention and duplicate keys. This also means that the "saveStatisticsInDb" flag will still be used, which is a more obvious mechanism for controlling the storing of statistics than the presence/absence of a topic in the config file. Issue-ID: POLICY-3460 Change-Id: Ia07132b1c7aef006af86fddbe677fb1243a4e2c3 Signed-off-by: Jim Hahn --- .../org/onap/policy/pap/main/PapConstants.java | 3 +- .../policy/pap/main/startstop/PapActivator.java | 61 ++++++++++++++-------- .../test/resources/e2e/PapConfigParameters.json | 7 +++ .../resources/parameters/PapConfigParameters.json | 7 +++ .../parameters/PapConfigParametersStd.json | 7 +++ 5 files changed, 61 insertions(+), 24 deletions(-) (limited to 'main') diff --git a/main/src/main/java/org/onap/policy/pap/main/PapConstants.java b/main/src/main/java/org/onap/policy/pap/main/PapConstants.java index 7d4cb7b4..f38a82e0 100644 --- a/main/src/main/java/org/onap/policy/pap/main/PapConstants.java +++ b/main/src/main/java/org/onap/policy/pap/main/PapConstants.java @@ -25,7 +25,7 @@ import org.onap.policy.common.utils.network.NetworkUtil; /** * Names of various items contained in the Registry. */ -public class PapConstants { +public final class PapConstants { // Registry keys public static final String REG_PAP_ACTIVATOR = "object:activator/pap"; @@ -38,6 +38,7 @@ public class PapConstants { // topic names public static final String TOPIC_POLICY_PDP_PAP = "POLICY-PDP-PAP"; public static final String TOPIC_POLICY_NOTIFICATION = "POLICY-NOTIFICATION"; + public static final String TOPIC_POLICY_HEARTBEAT = "POLICY-HEARTBEAT"; // policy components names public static final String POLICY_API = "api"; diff --git a/main/src/main/java/org/onap/policy/pap/main/startstop/PapActivator.java b/main/src/main/java/org/onap/policy/pap/main/startstop/PapActivator.java index 617275b4..3b089400 100644 --- a/main/src/main/java/org/onap/policy/pap/main/startstop/PapActivator.java +++ b/main/src/main/java/org/onap/policy/pap/main/startstop/PapActivator.java @@ -22,12 +22,13 @@ package org.onap.policy.pap.main.startstop; -import java.util.Arrays; +import java.util.List; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager; +import org.onap.policy.common.endpoints.event.comm.TopicListener; import org.onap.policy.common.endpoints.event.comm.TopicSource; import org.onap.policy.common.endpoints.http.client.HttpClientFactoryInstance; import org.onap.policy.common.endpoints.http.server.RestServer; @@ -84,15 +85,17 @@ public class PapActivator extends ServiceManagerContainer { /** * Listens for messages on the topic, decodes them into a {@link PdpStatus} message, and then dispatches them to - * {@link #reqIdDispatcher}. + * {@link #responseReqIdDispatcher}. */ - private final MessageTypeDispatcher msgDispatcher; + private final MessageTypeDispatcher responseMsgDispatcher; + private final MessageTypeDispatcher heartbeatMsgDispatcher; /** * Listens for {@link PdpStatus} messages and then routes them to the listener associated with the ID of the * originating request. */ - private final RequestIdDispatcher reqIdDispatcher; + private final RequestIdDispatcher responseReqIdDispatcher; + private final RequestIdDispatcher heartbeatReqIdDispatcher; /** * Listener for anonymous {@link PdpStatus} messages either for registration or heartbeat. @@ -111,8 +114,10 @@ public class PapActivator extends ServiceManagerContainer { try { this.papParameterGroup = papParameterGroup; - this.msgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES); - this.reqIdDispatcher = new RequestIdDispatcher<>(PdpStatus.class, REQ_ID_NAMES); + this.responseMsgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES); + this.heartbeatMsgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES); + this.responseReqIdDispatcher = new RequestIdDispatcher<>(PdpStatus.class, REQ_ID_NAMES); + this.heartbeatReqIdDispatcher = new RequestIdDispatcher<>(PdpStatus.class, REQ_ID_NAMES); this.pdpHeartbeatListener = new PdpHeartbeatListener(papParameterGroup.getPdpParameters(), papParameterGroup.isSavePdpStatisticsInDb()); @@ -149,16 +154,24 @@ public class PapActivator extends ServiceManagerContainer { () -> Registry.unregister(PapConstants.REG_PAP_DAO_FACTORY)); addAction("Pdp Heartbeat Listener", - () -> reqIdDispatcher.register(pdpHeartbeatListener), - () -> reqIdDispatcher.unregister(pdpHeartbeatListener)); + () -> heartbeatReqIdDispatcher.register(pdpHeartbeatListener), + () -> heartbeatReqIdDispatcher.unregister(pdpHeartbeatListener)); - addAction("Request ID Dispatcher", - () -> msgDispatcher.register(PdpMessageType.PDP_STATUS.name(), this.reqIdDispatcher), - () -> msgDispatcher.unregister(PdpMessageType.PDP_STATUS.name())); + addAction("Response Request ID Dispatcher", + () -> responseMsgDispatcher.register(PdpMessageType.PDP_STATUS.name(), this.responseReqIdDispatcher), + () -> responseMsgDispatcher.unregister(PdpMessageType.PDP_STATUS.name())); - addAction("Message Dispatcher", - this::registerMsgDispatcher, - this::unregisterMsgDispatcher); + addAction("Heartbeat Request ID Dispatcher", + () -> heartbeatMsgDispatcher.register(PdpMessageType.PDP_STATUS.name(), this.heartbeatReqIdDispatcher), + () -> heartbeatMsgDispatcher.unregister(PdpMessageType.PDP_STATUS.name())); + + addAction("Response Message Dispatcher", + () -> registerMsgDispatcher(responseMsgDispatcher, PapConstants.TOPIC_POLICY_PDP_PAP), + () -> unregisterMsgDispatcher(responseMsgDispatcher, PapConstants.TOPIC_POLICY_PDP_PAP)); + + addAction("Heartbeat Message Dispatcher", + () -> registerMsgDispatcher(heartbeatMsgDispatcher, PapConstants.TOPIC_POLICY_HEARTBEAT), + () -> unregisterMsgDispatcher(heartbeatMsgDispatcher, PapConstants.TOPIC_POLICY_HEARTBEAT)); addAction("topics", TopicEndpointManager.getManager()::start, @@ -215,7 +228,7 @@ public class PapActivator extends ServiceManagerContainer { .params(pdpParams) .policyNotifier(notifier.get()) .pdpPublisher(pdpPub.get()) - .responseDispatcher(reqIdDispatcher) + .responseDispatcher(responseReqIdDispatcher) .stateChangeTimers(pdpStChgTimers.get()) .updateTimers(pdpUpdTimers.get()) .savePdpStatistics(papParameterGroup.isSavePdpStatisticsInDb()) @@ -288,21 +301,23 @@ public class PapActivator extends ServiceManagerContainer { /** * Registers the dispatcher with the topic source(s). + * @param dispatcher dispatcher to register + * @param topic topic of interest */ - private void registerMsgDispatcher() { - for (final TopicSource source : TopicEndpointManager.getManager() - .getTopicSources(Arrays.asList(PapConstants.TOPIC_POLICY_PDP_PAP))) { - source.register(msgDispatcher); + private void registerMsgDispatcher(TopicListener dispatcher, String topic) { + for (final TopicSource source : TopicEndpointManager.getManager().getTopicSources(List.of(topic))) { + source.register(dispatcher); } } /** * Unregisters the dispatcher from the topic source(s). + * @param dispatcher dispatcher to unregister + * @param topic topic of interest */ - private void unregisterMsgDispatcher() { - for (final TopicSource source : TopicEndpointManager.getManager() - .getTopicSources(Arrays.asList(PapConstants.TOPIC_POLICY_PDP_PAP))) { - source.unregister(msgDispatcher); + private void unregisterMsgDispatcher(TopicListener dispatcher, String topic) { + for (final TopicSource source : TopicEndpointManager.getManager().getTopicSources(List.of(topic))) { + source.unregister(dispatcher); } } } diff --git a/main/src/test/resources/e2e/PapConfigParameters.json b/main/src/test/resources/e2e/PapConfigParameters.json index 610ded8c..b10f5801 100644 --- a/main/src/test/resources/e2e/PapConfigParameters.json +++ b/main/src/test/resources/e2e/PapConfigParameters.json @@ -32,6 +32,13 @@ "topic" : "POLICY-PDP-PAP", "servers" : [ "message-router" ], "topicCommInfrastructure" : "noop" + }, + { + "topic" : "POLICY-HEARTBEAT", + "effectiveTopic": "POLICY-PDP-PAP", + "consumerGroup": "policy-pap", + "servers" : [ "message-router" ], + "topicCommInfrastructure" : "noop" }], "topicSinks" : [{ "topic" : "POLICY-PDP-PAP", diff --git a/main/src/test/resources/parameters/PapConfigParameters.json b/main/src/test/resources/parameters/PapConfigParameters.json index 452bc9a7..09adfcec 100644 --- a/main/src/test/resources/parameters/PapConfigParameters.json +++ b/main/src/test/resources/parameters/PapConfigParameters.json @@ -33,6 +33,13 @@ "topic" : "POLICY-PDP-PAP", "servers" : [ "message-router" ], "topicCommInfrastructure" : "noop" + }, + { + "topic" : "POLICY-HEARTBEAT", + "effectiveTopic": "POLICY-PDP-PAP", + "consumerGroup": "policy-pap", + "servers" : [ "message-router" ], + "topicCommInfrastructure" : "noop" }], "topicSinks" : [{ "topic" : "POLICY-PDP-PAP", diff --git a/main/src/test/resources/parameters/PapConfigParametersStd.json b/main/src/test/resources/parameters/PapConfigParametersStd.json index 0868aa41..0f881430 100644 --- a/main/src/test/resources/parameters/PapConfigParametersStd.json +++ b/main/src/test/resources/parameters/PapConfigParametersStd.json @@ -34,6 +34,13 @@ "topic" : "POLICY-PDP-PAP", "servers" : [ "message-router" ], "topicCommInfrastructure" : "noop" + }, + { + "topic" : "POLICY-HEARTBEAT", + "effectiveTopic": "POLICY-PDP-PAP", + "consumerGroup": "policy-pap", + "servers" : [ "message-router" ], + "topicCommInfrastructure" : "noop" }], "topicSinks" : [{ "topic" : "POLICY-PDP-PAP", -- cgit 1.2.3-korg