diff options
Diffstat (limited to 'main/src/main/java/org/onap')
3 files changed, 69 insertions, 23 deletions
diff --git a/main/src/main/java/org/onap/policy/pdpx/main/comm/XacmlPdpHearbeatPublisher.java b/main/src/main/java/org/onap/policy/pdpx/main/comm/XacmlPdpHearbeatPublisher.java index 3177c096..8c1f7928 100644 --- a/main/src/main/java/org/onap/policy/pdpx/main/comm/XacmlPdpHearbeatPublisher.java +++ b/main/src/main/java/org/onap/policy/pdpx/main/comm/XacmlPdpHearbeatPublisher.java @@ -1,6 +1,6 @@ /*- * ============LICENSE_START======================================================= - * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2019, 2021 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,18 +25,24 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import lombok.Getter; -import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClient; +import org.onap.policy.common.endpoints.event.comm.client.BidirectionalTopicClient; +import org.onap.policy.common.utils.coder.Coder; +import org.onap.policy.common.utils.coder.CoderException; +import org.onap.policy.common.utils.coder.StandardCoder; import org.onap.policy.models.pdp.concepts.PdpStatus; +import org.onap.policy.models.pdp.concepts.PdpTopicCheck; import org.onap.policy.pdpx.main.XacmlState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class XacmlPdpHearbeatPublisher implements Runnable { - public static final int DEFAULT_INTERVAL_MS = 60000; + public static final int DEFAULT_HB_INTERVAL_MS = 60000; private static final Logger LOGGER = LoggerFactory.getLogger(XacmlPdpHearbeatPublisher.class); + private static final Coder CODER = new StandardCoder(); - private final TopicSinkClient topicSinkClient; + private final BidirectionalTopicClient topicChecker; + private final long probeHeartbeatTopicMs; /** * Tracks the state of this PDP. @@ -47,7 +53,7 @@ public class XacmlPdpHearbeatPublisher implements Runnable { * Current timer interval, in milliseconds. */ @Getter - private long intervalMs = DEFAULT_INTERVAL_MS; + private long intervalMs = DEFAULT_HB_INTERVAL_MS; private ScheduledExecutorService timerThread; @@ -57,26 +63,53 @@ public class XacmlPdpHearbeatPublisher implements Runnable { /** * Constructor for instantiating XacmlPdpPublisher. * - * @param topicSinkClient used to send heart beat message + * @param topicChecker used to check the topic before sending heart beat message + * @param probeHeartbeatTopicMs frequency, in milliseconds, with which to probe the + * heartbeat topic before sending the first heartbeat. Zero disables probing * @param state tracks the state of this PDP */ - public XacmlPdpHearbeatPublisher(TopicSinkClient topicSinkClient, XacmlState state) { - this.topicSinkClient = topicSinkClient; + public XacmlPdpHearbeatPublisher(BidirectionalTopicClient topicChecker, long probeHeartbeatTopicMs, + XacmlState state) { + LOGGER.info("heartbeat topic probe {}ms", probeHeartbeatTopicMs); + this.topicChecker = topicChecker; + this.probeHeartbeatTopicMs = probeHeartbeatTopicMs; this.currentState = state; } @Override public void run() { - PdpStatus message = currentState.genHeartbeat(); - LOGGER.info("Sending Xacml PDP heartbeat to the PAP - {}", message); + try { + if (!isTopicReady()) { + return; + } + + PdpStatus message = currentState.genHeartbeat(); + LOGGER.info("Sending Xacml PDP heartbeat to the PAP - {}", message); + + String json = CODER.encode(message); + topicChecker.send(json); - topicSinkClient.send(message); + } catch (RuntimeException | CoderException e) { + LOGGER.warn("send to {} failed because of {}", topicChecker.getSink().getTopic(), e.getMessage(), e); + } + } + + private boolean isTopicReady() throws CoderException { + if (probeHeartbeatTopicMs <= 0 || topicChecker.isReady()) { + return true; + } + + var check = new PdpTopicCheck(); + check.setName(XacmlState.PDP_NAME); + return topicChecker.awaitReady(check, probeHeartbeatTopicMs); } /** * Method to terminate the heart beat. */ public synchronized void terminate() { + topicChecker.stopWaiting(); + if (timerThread != null) { timerThread.shutdownNow(); timerThread = null; diff --git a/main/src/main/java/org/onap/policy/pdpx/main/parameters/XacmlPdpParameterGroup.java b/main/src/main/java/org/onap/policy/pdpx/main/parameters/XacmlPdpParameterGroup.java index b994fe9e..a9332e90 100644 --- a/main/src/main/java/org/onap/policy/pdpx/main/parameters/XacmlPdpParameterGroup.java +++ b/main/src/main/java/org/onap/policy/pdpx/main/parameters/XacmlPdpParameterGroup.java @@ -22,6 +22,7 @@ package org.onap.policy.pdpx.main.parameters; import lombok.Getter; +import lombok.NoArgsConstructor; import org.apache.commons.lang3.StringUtils; import org.onap.policy.common.endpoints.parameters.RestClientParameters; import org.onap.policy.common.endpoints.parameters.RestServerParameters; @@ -29,6 +30,7 @@ import org.onap.policy.common.endpoints.parameters.TopicParameterGroup; import org.onap.policy.common.parameters.BeanValidationResult; import org.onap.policy.common.parameters.ParameterGroupImpl; import org.onap.policy.common.parameters.ValidationStatus; +import org.onap.policy.common.parameters.annotations.Min; import org.onap.policy.common.parameters.annotations.NotBlank; import org.onap.policy.common.parameters.annotations.NotNull; import org.onap.policy.common.parameters.annotations.Valid; @@ -41,6 +43,7 @@ import org.onap.policy.models.base.Validated; @Getter @NotNull @NotBlank +@NoArgsConstructor public class XacmlPdpParameterGroup extends ParameterGroupImpl { public static final String PARAM_POLICY_API = "policyApiParameters"; @@ -54,6 +57,13 @@ public class XacmlPdpParameterGroup extends ParameterGroupImpl { private TopicParameterGroup topicParameterGroup; @Valid private XacmlApplicationParameters applicationParameters; + /** + * Frequency, in seconds, with which to probe the heartbeat topic before sending the + * first heartbeat. Set to zero to disable probing. + */ + @Min(0) + private long probeHeartbeatTopicSec = 4; + /** * Create the xacml pdp parameter group. diff --git a/main/src/main/java/org/onap/policy/pdpx/main/startstop/XacmlPdpActivator.java b/main/src/main/java/org/onap/policy/pdpx/main/startstop/XacmlPdpActivator.java index 4dd8a9b3..050d8b24 100644 --- a/main/src/main/java/org/onap/policy/pdpx/main/startstop/XacmlPdpActivator.java +++ b/main/src/main/java/org/onap/policy/pdpx/main/startstop/XacmlPdpActivator.java @@ -20,13 +20,12 @@ package org.onap.policy.pdpx.main.startstop; -import java.util.Arrays; import lombok.Getter; import lombok.Setter; import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager; -import org.onap.policy.common.endpoints.event.comm.TopicSource; +import org.onap.policy.common.endpoints.event.comm.client.BidirectionalTopicClient; +import org.onap.policy.common.endpoints.event.comm.client.BidirectionalTopicClientException; import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClient; -import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClientException; import org.onap.policy.common.endpoints.http.client.HttpClient; import org.onap.policy.common.endpoints.http.client.HttpClientConfigException; import org.onap.policy.common.endpoints.http.client.HttpClientFactoryInstance; @@ -70,6 +69,11 @@ public class XacmlPdpActivator extends ServiceManagerContainer { private final XacmlPdpParameterGroup xacmlPdpParameterGroup; /** + * POLICY-PDP-PAP client. + */ + private BidirectionalTopicClient topicClient; + + /** * Listens for messages on the topic, decodes them into a {@link PdpStatus} message, and then * dispatches them to appropriate listener. */ @@ -108,8 +112,11 @@ public class XacmlPdpActivator extends ServiceManagerContainer { this.xacmlPdpParameterGroup = xacmlPdpParameterGroup; this.msgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES); - sinkClient = new TopicSinkClient(TOPIC); - heartbeat = new XacmlPdpHearbeatPublisher(sinkClient, state); + topicClient = new BidirectionalTopicClient(TOPIC, TOPIC); + sinkClient = new TopicSinkClient(topicClient.getSink()); + + heartbeat = new XacmlPdpHearbeatPublisher(topicClient, + xacmlPdpParameterGroup.getProbeHeartbeatTopicSec() * 1000, state); /* * since the dispatcher isn't registered with the topic yet, we can go ahead @@ -123,7 +130,7 @@ public class XacmlPdpActivator extends ServiceManagerContainer { restServer = new XacmlPdpRestServer(xacmlPdpParameterGroup.getRestServerParameters(), XacmlPdpAafFilter.class, XacmlPdpRestController.class); - } catch (RuntimeException | TopicSinkClientException | HttpClientConfigException e) { + } catch (RuntimeException | HttpClientConfigException | BidirectionalTopicClientException e) { throw new PolicyXacmlPdpRuntimeException(e.getMessage(), e); } @@ -197,18 +204,14 @@ public class XacmlPdpActivator extends ServiceManagerContainer { * Registers the dispatcher with the topic source(s). */ private void registerMsgDispatcher() { - for (TopicSource source : TopicEndpointManager.getManager().getTopicSources(Arrays.asList(TOPIC))) { - source.register(msgDispatcher); - } + topicClient.getSource().register(msgDispatcher); } /** * Unregisters the dispatcher from the topic source(s). */ private void unregisterMsgDispatcher() { - for (TopicSource source : TopicEndpointManager.getManager().getTopicSources(Arrays.asList(TOPIC))) { - source.unregister(msgDispatcher); - } + topicClient.getSource().unregister(msgDispatcher); } /** |