diff options
Diffstat (limited to 'main/src/main/java/org/onap/policy/pdpx/main/comm/XacmlPdpHearbeatPublisher.java')
-rw-r--r-- | main/src/main/java/org/onap/policy/pdpx/main/comm/XacmlPdpHearbeatPublisher.java | 55 |
1 files changed, 44 insertions, 11 deletions
diff --git a/main/src/main/java/org/onap/policy/pdpx/main/comm/XacmlPdpHearbeatPublisher.java b/main/src/main/java/org/onap/policy/pdpx/main/comm/XacmlPdpHearbeatPublisher.java index 3177c096..8c1f7928 100644 --- a/main/src/main/java/org/onap/policy/pdpx/main/comm/XacmlPdpHearbeatPublisher.java +++ b/main/src/main/java/org/onap/policy/pdpx/main/comm/XacmlPdpHearbeatPublisher.java @@ -1,6 +1,6 @@ /*- * ============LICENSE_START======================================================= - * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2019, 2021 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,18 +25,24 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import lombok.Getter; -import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClient; +import org.onap.policy.common.endpoints.event.comm.client.BidirectionalTopicClient; +import org.onap.policy.common.utils.coder.Coder; +import org.onap.policy.common.utils.coder.CoderException; +import org.onap.policy.common.utils.coder.StandardCoder; import org.onap.policy.models.pdp.concepts.PdpStatus; +import org.onap.policy.models.pdp.concepts.PdpTopicCheck; import org.onap.policy.pdpx.main.XacmlState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class XacmlPdpHearbeatPublisher implements Runnable { - public static final int DEFAULT_INTERVAL_MS = 60000; + public static final int DEFAULT_HB_INTERVAL_MS = 60000; private static final Logger LOGGER = LoggerFactory.getLogger(XacmlPdpHearbeatPublisher.class); + private static final Coder CODER = new StandardCoder(); - private final TopicSinkClient topicSinkClient; + private final BidirectionalTopicClient topicChecker; + private final long probeHeartbeatTopicMs; /** * Tracks the state of this PDP. @@ -47,7 +53,7 @@ public class XacmlPdpHearbeatPublisher implements Runnable { * Current timer interval, in milliseconds. */ @Getter - private long intervalMs = DEFAULT_INTERVAL_MS; + private long intervalMs = DEFAULT_HB_INTERVAL_MS; private ScheduledExecutorService timerThread; @@ -57,26 +63,53 @@ public class XacmlPdpHearbeatPublisher implements Runnable { /** * Constructor for instantiating XacmlPdpPublisher. * - * @param topicSinkClient used to send heart beat message + * @param topicChecker used to check the topic before sending heart beat message + * @param probeHeartbeatTopicMs frequency, in milliseconds, with which to probe the + * heartbeat topic before sending the first heartbeat. Zero disables probing * @param state tracks the state of this PDP */ - public XacmlPdpHearbeatPublisher(TopicSinkClient topicSinkClient, XacmlState state) { - this.topicSinkClient = topicSinkClient; + public XacmlPdpHearbeatPublisher(BidirectionalTopicClient topicChecker, long probeHeartbeatTopicMs, + XacmlState state) { + LOGGER.info("heartbeat topic probe {}ms", probeHeartbeatTopicMs); + this.topicChecker = topicChecker; + this.probeHeartbeatTopicMs = probeHeartbeatTopicMs; this.currentState = state; } @Override public void run() { - PdpStatus message = currentState.genHeartbeat(); - LOGGER.info("Sending Xacml PDP heartbeat to the PAP - {}", message); + try { + if (!isTopicReady()) { + return; + } + + PdpStatus message = currentState.genHeartbeat(); + LOGGER.info("Sending Xacml PDP heartbeat to the PAP - {}", message); + + String json = CODER.encode(message); + topicChecker.send(json); - topicSinkClient.send(message); + } catch (RuntimeException | CoderException e) { + LOGGER.warn("send to {} failed because of {}", topicChecker.getSink().getTopic(), e.getMessage(), e); + } + } + + private boolean isTopicReady() throws CoderException { + if (probeHeartbeatTopicMs <= 0 || topicChecker.isReady()) { + return true; + } + + var check = new PdpTopicCheck(); + check.setName(XacmlState.PDP_NAME); + return topicChecker.awaitReady(check, probeHeartbeatTopicMs); } /** * Method to terminate the heart beat. */ public synchronized void terminate() { + topicChecker.stopWaiting(); + if (timerThread != null) { timerThread.shutdownNow(); timerThread = null; |