aboutsummaryrefslogtreecommitdiffstats
path: root/main/src/main/java/org/onap/policy/pdpx/main/startstop/XacmlPdpActivator.java
diff options
context:
space:
mode:
Diffstat (limited to 'main/src/main/java/org/onap/policy/pdpx/main/startstop/XacmlPdpActivator.java')
-rw-r--r--main/src/main/java/org/onap/policy/pdpx/main/startstop/XacmlPdpActivator.java27
1 files changed, 15 insertions, 12 deletions
diff --git a/main/src/main/java/org/onap/policy/pdpx/main/startstop/XacmlPdpActivator.java b/main/src/main/java/org/onap/policy/pdpx/main/startstop/XacmlPdpActivator.java
index 4dd8a9b3..050d8b24 100644
--- a/main/src/main/java/org/onap/policy/pdpx/main/startstop/XacmlPdpActivator.java
+++ b/main/src/main/java/org/onap/policy/pdpx/main/startstop/XacmlPdpActivator.java
@@ -20,13 +20,12 @@
package org.onap.policy.pdpx.main.startstop;
-import java.util.Arrays;
import lombok.Getter;
import lombok.Setter;
import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
-import org.onap.policy.common.endpoints.event.comm.TopicSource;
+import org.onap.policy.common.endpoints.event.comm.client.BidirectionalTopicClient;
+import org.onap.policy.common.endpoints.event.comm.client.BidirectionalTopicClientException;
import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClient;
-import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClientException;
import org.onap.policy.common.endpoints.http.client.HttpClient;
import org.onap.policy.common.endpoints.http.client.HttpClientConfigException;
import org.onap.policy.common.endpoints.http.client.HttpClientFactoryInstance;
@@ -70,6 +69,11 @@ public class XacmlPdpActivator extends ServiceManagerContainer {
private final XacmlPdpParameterGroup xacmlPdpParameterGroup;
/**
+ * POLICY-PDP-PAP client.
+ */
+ private BidirectionalTopicClient topicClient;
+
+ /**
* Listens for messages on the topic, decodes them into a {@link PdpStatus} message, and then
* dispatches them to appropriate listener.
*/
@@ -108,8 +112,11 @@ public class XacmlPdpActivator extends ServiceManagerContainer {
this.xacmlPdpParameterGroup = xacmlPdpParameterGroup;
this.msgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES);
- sinkClient = new TopicSinkClient(TOPIC);
- heartbeat = new XacmlPdpHearbeatPublisher(sinkClient, state);
+ topicClient = new BidirectionalTopicClient(TOPIC, TOPIC);
+ sinkClient = new TopicSinkClient(topicClient.getSink());
+
+ heartbeat = new XacmlPdpHearbeatPublisher(topicClient,
+ xacmlPdpParameterGroup.getProbeHeartbeatTopicSec() * 1000, state);
/*
* since the dispatcher isn't registered with the topic yet, we can go ahead
@@ -123,7 +130,7 @@ public class XacmlPdpActivator extends ServiceManagerContainer {
restServer = new XacmlPdpRestServer(xacmlPdpParameterGroup.getRestServerParameters(),
XacmlPdpAafFilter.class, XacmlPdpRestController.class);
- } catch (RuntimeException | TopicSinkClientException | HttpClientConfigException e) {
+ } catch (RuntimeException | HttpClientConfigException | BidirectionalTopicClientException e) {
throw new PolicyXacmlPdpRuntimeException(e.getMessage(), e);
}
@@ -197,18 +204,14 @@ public class XacmlPdpActivator extends ServiceManagerContainer {
* Registers the dispatcher with the topic source(s).
*/
private void registerMsgDispatcher() {
- for (TopicSource source : TopicEndpointManager.getManager().getTopicSources(Arrays.asList(TOPIC))) {
- source.register(msgDispatcher);
- }
+ topicClient.getSource().register(msgDispatcher);
}
/**
* Unregisters the dispatcher from the topic source(s).
*/
private void unregisterMsgDispatcher() {
- for (TopicSource source : TopicEndpointManager.getManager().getTopicSources(Arrays.asList(TOPIC))) {
- source.unregister(msgDispatcher);
- }
+ topicClient.getSource().unregister(msgDispatcher);
}
/**