aboutsummaryrefslogtreecommitdiffstats
path: root/main/src/main/java/org/onap/policy/pdpx/main/comm/XacmlPdpHearbeatPublisher.java
diff options
context:
space:
mode:
Diffstat (limited to 'main/src/main/java/org/onap/policy/pdpx/main/comm/XacmlPdpHearbeatPublisher.java')
-rw-r--r--main/src/main/java/org/onap/policy/pdpx/main/comm/XacmlPdpHearbeatPublisher.java55
1 files changed, 44 insertions, 11 deletions
diff --git a/main/src/main/java/org/onap/policy/pdpx/main/comm/XacmlPdpHearbeatPublisher.java b/main/src/main/java/org/onap/policy/pdpx/main/comm/XacmlPdpHearbeatPublisher.java
index 3177c096..8c1f7928 100644
--- a/main/src/main/java/org/onap/policy/pdpx/main/comm/XacmlPdpHearbeatPublisher.java
+++ b/main/src/main/java/org/onap/policy/pdpx/main/comm/XacmlPdpHearbeatPublisher.java
@@ -1,6 +1,6 @@
/*-
* ============LICENSE_START=======================================================
- * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2019, 2021 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -25,18 +25,24 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import lombok.Getter;
-import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClient;
+import org.onap.policy.common.endpoints.event.comm.client.BidirectionalTopicClient;
+import org.onap.policy.common.utils.coder.Coder;
+import org.onap.policy.common.utils.coder.CoderException;
+import org.onap.policy.common.utils.coder.StandardCoder;
import org.onap.policy.models.pdp.concepts.PdpStatus;
+import org.onap.policy.models.pdp.concepts.PdpTopicCheck;
import org.onap.policy.pdpx.main.XacmlState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class XacmlPdpHearbeatPublisher implements Runnable {
- public static final int DEFAULT_INTERVAL_MS = 60000;
+ public static final int DEFAULT_HB_INTERVAL_MS = 60000;
private static final Logger LOGGER = LoggerFactory.getLogger(XacmlPdpHearbeatPublisher.class);
+ private static final Coder CODER = new StandardCoder();
- private final TopicSinkClient topicSinkClient;
+ private final BidirectionalTopicClient topicChecker;
+ private final long probeHeartbeatTopicMs;
/**
* Tracks the state of this PDP.
@@ -47,7 +53,7 @@ public class XacmlPdpHearbeatPublisher implements Runnable {
* Current timer interval, in milliseconds.
*/
@Getter
- private long intervalMs = DEFAULT_INTERVAL_MS;
+ private long intervalMs = DEFAULT_HB_INTERVAL_MS;
private ScheduledExecutorService timerThread;
@@ -57,26 +63,53 @@ public class XacmlPdpHearbeatPublisher implements Runnable {
/**
* Constructor for instantiating XacmlPdpPublisher.
*
- * @param topicSinkClient used to send heart beat message
+ * @param topicChecker used to check the topic before sending heart beat message
+ * @param probeHeartbeatTopicMs frequency, in milliseconds, with which to probe the
+ * heartbeat topic before sending the first heartbeat. Zero disables probing
* @param state tracks the state of this PDP
*/
- public XacmlPdpHearbeatPublisher(TopicSinkClient topicSinkClient, XacmlState state) {
- this.topicSinkClient = topicSinkClient;
+ public XacmlPdpHearbeatPublisher(BidirectionalTopicClient topicChecker, long probeHeartbeatTopicMs,
+ XacmlState state) {
+ LOGGER.info("heartbeat topic probe {}ms", probeHeartbeatTopicMs);
+ this.topicChecker = topicChecker;
+ this.probeHeartbeatTopicMs = probeHeartbeatTopicMs;
this.currentState = state;
}
@Override
public void run() {
- PdpStatus message = currentState.genHeartbeat();
- LOGGER.info("Sending Xacml PDP heartbeat to the PAP - {}", message);
+ try {
+ if (!isTopicReady()) {
+ return;
+ }
+
+ PdpStatus message = currentState.genHeartbeat();
+ LOGGER.info("Sending Xacml PDP heartbeat to the PAP - {}", message);
+
+ String json = CODER.encode(message);
+ topicChecker.send(json);
- topicSinkClient.send(message);
+ } catch (RuntimeException | CoderException e) {
+ LOGGER.warn("send to {} failed because of {}", topicChecker.getSink().getTopic(), e.getMessage(), e);
+ }
+ }
+
+ private boolean isTopicReady() throws CoderException {
+ if (probeHeartbeatTopicMs <= 0 || topicChecker.isReady()) {
+ return true;
+ }
+
+ var check = new PdpTopicCheck();
+ check.setName(XacmlState.PDP_NAME);
+ return topicChecker.awaitReady(check, probeHeartbeatTopicMs);
}
/**
* Method to terminate the heart beat.
*/
public synchronized void terminate() {
+ topicChecker.stopWaiting();
+
if (timerThread != null) {
timerThread.shutdownNow();
timerThread = null;