diff options
Diffstat (limited to 'main/src/main/java/org/onap/policy/pdpx/main/startstop/XacmlPdpActivator.java')
-rw-r--r-- | main/src/main/java/org/onap/policy/pdpx/main/startstop/XacmlPdpActivator.java | 27 |
1 files changed, 15 insertions, 12 deletions
diff --git a/main/src/main/java/org/onap/policy/pdpx/main/startstop/XacmlPdpActivator.java b/main/src/main/java/org/onap/policy/pdpx/main/startstop/XacmlPdpActivator.java index 4dd8a9b3..050d8b24 100644 --- a/main/src/main/java/org/onap/policy/pdpx/main/startstop/XacmlPdpActivator.java +++ b/main/src/main/java/org/onap/policy/pdpx/main/startstop/XacmlPdpActivator.java @@ -20,13 +20,12 @@ package org.onap.policy.pdpx.main.startstop; -import java.util.Arrays; import lombok.Getter; import lombok.Setter; import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager; -import org.onap.policy.common.endpoints.event.comm.TopicSource; +import org.onap.policy.common.endpoints.event.comm.client.BidirectionalTopicClient; +import org.onap.policy.common.endpoints.event.comm.client.BidirectionalTopicClientException; import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClient; -import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClientException; import org.onap.policy.common.endpoints.http.client.HttpClient; import org.onap.policy.common.endpoints.http.client.HttpClientConfigException; import org.onap.policy.common.endpoints.http.client.HttpClientFactoryInstance; @@ -70,6 +69,11 @@ public class XacmlPdpActivator extends ServiceManagerContainer { private final XacmlPdpParameterGroup xacmlPdpParameterGroup; /** + * POLICY-PDP-PAP client. + */ + private BidirectionalTopicClient topicClient; + + /** * Listens for messages on the topic, decodes them into a {@link PdpStatus} message, and then * dispatches them to appropriate listener. */ @@ -108,8 +112,11 @@ public class XacmlPdpActivator extends ServiceManagerContainer { this.xacmlPdpParameterGroup = xacmlPdpParameterGroup; this.msgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES); - sinkClient = new TopicSinkClient(TOPIC); - heartbeat = new XacmlPdpHearbeatPublisher(sinkClient, state); + topicClient = new BidirectionalTopicClient(TOPIC, TOPIC); + sinkClient = new TopicSinkClient(topicClient.getSink()); + + heartbeat = new XacmlPdpHearbeatPublisher(topicClient, + xacmlPdpParameterGroup.getProbeHeartbeatTopicSec() * 1000, state); /* * since the dispatcher isn't registered with the topic yet, we can go ahead @@ -123,7 +130,7 @@ public class XacmlPdpActivator extends ServiceManagerContainer { restServer = new XacmlPdpRestServer(xacmlPdpParameterGroup.getRestServerParameters(), XacmlPdpAafFilter.class, XacmlPdpRestController.class); - } catch (RuntimeException | TopicSinkClientException | HttpClientConfigException e) { + } catch (RuntimeException | HttpClientConfigException | BidirectionalTopicClientException e) { throw new PolicyXacmlPdpRuntimeException(e.getMessage(), e); } @@ -197,18 +204,14 @@ public class XacmlPdpActivator extends ServiceManagerContainer { * Registers the dispatcher with the topic source(s). */ private void registerMsgDispatcher() { - for (TopicSource source : TopicEndpointManager.getManager().getTopicSources(Arrays.asList(TOPIC))) { - source.register(msgDispatcher); - } + topicClient.getSource().register(msgDispatcher); } /** * Unregisters the dispatcher from the topic source(s). */ private void unregisterMsgDispatcher() { - for (TopicSource source : TopicEndpointManager.getManager().getTopicSources(Arrays.asList(TOPIC))) { - source.unregister(msgDispatcher); - } + topicClient.getSource().unregister(msgDispatcher); } /** |