diff options
5 files changed, 168 insertions, 32 deletions
diff --git a/main/src/main/java/org/onap/policy/pdpx/main/comm/XacmlPdpHearbeatPublisher.java b/main/src/main/java/org/onap/policy/pdpx/main/comm/XacmlPdpHearbeatPublisher.java index 3177c096..8c1f7928 100644 --- a/main/src/main/java/org/onap/policy/pdpx/main/comm/XacmlPdpHearbeatPublisher.java +++ b/main/src/main/java/org/onap/policy/pdpx/main/comm/XacmlPdpHearbeatPublisher.java @@ -1,6 +1,6 @@ /*- * ============LICENSE_START======================================================= - * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2019, 2021 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,18 +25,24 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import lombok.Getter; -import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClient; +import org.onap.policy.common.endpoints.event.comm.client.BidirectionalTopicClient; +import org.onap.policy.common.utils.coder.Coder; +import org.onap.policy.common.utils.coder.CoderException; +import org.onap.policy.common.utils.coder.StandardCoder; import org.onap.policy.models.pdp.concepts.PdpStatus; +import org.onap.policy.models.pdp.concepts.PdpTopicCheck; import org.onap.policy.pdpx.main.XacmlState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class XacmlPdpHearbeatPublisher implements Runnable { - public static final int DEFAULT_INTERVAL_MS = 60000; + public static final int DEFAULT_HB_INTERVAL_MS = 60000; private static final Logger LOGGER = LoggerFactory.getLogger(XacmlPdpHearbeatPublisher.class); + private static final Coder CODER = new StandardCoder(); - private final TopicSinkClient topicSinkClient; + private final BidirectionalTopicClient topicChecker; + private final long probeHeartbeatTopicMs; /** * Tracks the state of this PDP. @@ -47,7 +53,7 @@ public class XacmlPdpHearbeatPublisher implements Runnable { * Current timer interval, in milliseconds. */ @Getter - private long intervalMs = DEFAULT_INTERVAL_MS; + private long intervalMs = DEFAULT_HB_INTERVAL_MS; private ScheduledExecutorService timerThread; @@ -57,26 +63,53 @@ public class XacmlPdpHearbeatPublisher implements Runnable { /** * Constructor for instantiating XacmlPdpPublisher. * - * @param topicSinkClient used to send heart beat message + * @param topicChecker used to check the topic before sending heart beat message + * @param probeHeartbeatTopicMs frequency, in milliseconds, with which to probe the + * heartbeat topic before sending the first heartbeat. Zero disables probing * @param state tracks the state of this PDP */ - public XacmlPdpHearbeatPublisher(TopicSinkClient topicSinkClient, XacmlState state) { - this.topicSinkClient = topicSinkClient; + public XacmlPdpHearbeatPublisher(BidirectionalTopicClient topicChecker, long probeHeartbeatTopicMs, + XacmlState state) { + LOGGER.info("heartbeat topic probe {}ms", probeHeartbeatTopicMs); + this.topicChecker = topicChecker; + this.probeHeartbeatTopicMs = probeHeartbeatTopicMs; this.currentState = state; } @Override public void run() { - PdpStatus message = currentState.genHeartbeat(); - LOGGER.info("Sending Xacml PDP heartbeat to the PAP - {}", message); + try { + if (!isTopicReady()) { + return; + } + + PdpStatus message = currentState.genHeartbeat(); + LOGGER.info("Sending Xacml PDP heartbeat to the PAP - {}", message); + + String json = CODER.encode(message); + topicChecker.send(json); - topicSinkClient.send(message); + } catch (RuntimeException | CoderException e) { + LOGGER.warn("send to {} failed because of {}", topicChecker.getSink().getTopic(), e.getMessage(), e); + } + } + + private boolean isTopicReady() throws CoderException { + if (probeHeartbeatTopicMs <= 0 || topicChecker.isReady()) { + return true; + } + + var check = new PdpTopicCheck(); + check.setName(XacmlState.PDP_NAME); + return topicChecker.awaitReady(check, probeHeartbeatTopicMs); } /** * Method to terminate the heart beat. */ public synchronized void terminate() { + topicChecker.stopWaiting(); + if (timerThread != null) { timerThread.shutdownNow(); timerThread = null; diff --git a/main/src/main/java/org/onap/policy/pdpx/main/parameters/XacmlPdpParameterGroup.java b/main/src/main/java/org/onap/policy/pdpx/main/parameters/XacmlPdpParameterGroup.java index b994fe9e..a9332e90 100644 --- a/main/src/main/java/org/onap/policy/pdpx/main/parameters/XacmlPdpParameterGroup.java +++ b/main/src/main/java/org/onap/policy/pdpx/main/parameters/XacmlPdpParameterGroup.java @@ -22,6 +22,7 @@ package org.onap.policy.pdpx.main.parameters; import lombok.Getter; +import lombok.NoArgsConstructor; import org.apache.commons.lang3.StringUtils; import org.onap.policy.common.endpoints.parameters.RestClientParameters; import org.onap.policy.common.endpoints.parameters.RestServerParameters; @@ -29,6 +30,7 @@ import org.onap.policy.common.endpoints.parameters.TopicParameterGroup; import org.onap.policy.common.parameters.BeanValidationResult; import org.onap.policy.common.parameters.ParameterGroupImpl; import org.onap.policy.common.parameters.ValidationStatus; +import org.onap.policy.common.parameters.annotations.Min; import org.onap.policy.common.parameters.annotations.NotBlank; import org.onap.policy.common.parameters.annotations.NotNull; import org.onap.policy.common.parameters.annotations.Valid; @@ -41,6 +43,7 @@ import org.onap.policy.models.base.Validated; @Getter @NotNull @NotBlank +@NoArgsConstructor public class XacmlPdpParameterGroup extends ParameterGroupImpl { public static final String PARAM_POLICY_API = "policyApiParameters"; @@ -54,6 +57,13 @@ public class XacmlPdpParameterGroup extends ParameterGroupImpl { private TopicParameterGroup topicParameterGroup; @Valid private XacmlApplicationParameters applicationParameters; + /** + * Frequency, in seconds, with which to probe the heartbeat topic before sending the + * first heartbeat. Set to zero to disable probing. + */ + @Min(0) + private long probeHeartbeatTopicSec = 4; + /** * Create the xacml pdp parameter group. diff --git a/main/src/main/java/org/onap/policy/pdpx/main/startstop/XacmlPdpActivator.java b/main/src/main/java/org/onap/policy/pdpx/main/startstop/XacmlPdpActivator.java index 4dd8a9b3..050d8b24 100644 --- a/main/src/main/java/org/onap/policy/pdpx/main/startstop/XacmlPdpActivator.java +++ b/main/src/main/java/org/onap/policy/pdpx/main/startstop/XacmlPdpActivator.java @@ -20,13 +20,12 @@ package org.onap.policy.pdpx.main.startstop; -import java.util.Arrays; import lombok.Getter; import lombok.Setter; import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager; -import org.onap.policy.common.endpoints.event.comm.TopicSource; +import org.onap.policy.common.endpoints.event.comm.client.BidirectionalTopicClient; +import org.onap.policy.common.endpoints.event.comm.client.BidirectionalTopicClientException; import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClient; -import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClientException; import org.onap.policy.common.endpoints.http.client.HttpClient; import org.onap.policy.common.endpoints.http.client.HttpClientConfigException; import org.onap.policy.common.endpoints.http.client.HttpClientFactoryInstance; @@ -70,6 +69,11 @@ public class XacmlPdpActivator extends ServiceManagerContainer { private final XacmlPdpParameterGroup xacmlPdpParameterGroup; /** + * POLICY-PDP-PAP client. + */ + private BidirectionalTopicClient topicClient; + + /** * Listens for messages on the topic, decodes them into a {@link PdpStatus} message, and then * dispatches them to appropriate listener. */ @@ -108,8 +112,11 @@ public class XacmlPdpActivator extends ServiceManagerContainer { this.xacmlPdpParameterGroup = xacmlPdpParameterGroup; this.msgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES); - sinkClient = new TopicSinkClient(TOPIC); - heartbeat = new XacmlPdpHearbeatPublisher(sinkClient, state); + topicClient = new BidirectionalTopicClient(TOPIC, TOPIC); + sinkClient = new TopicSinkClient(topicClient.getSink()); + + heartbeat = new XacmlPdpHearbeatPublisher(topicClient, + xacmlPdpParameterGroup.getProbeHeartbeatTopicSec() * 1000, state); /* * since the dispatcher isn't registered with the topic yet, we can go ahead @@ -123,7 +130,7 @@ public class XacmlPdpActivator extends ServiceManagerContainer { restServer = new XacmlPdpRestServer(xacmlPdpParameterGroup.getRestServerParameters(), XacmlPdpAafFilter.class, XacmlPdpRestController.class); - } catch (RuntimeException | TopicSinkClientException | HttpClientConfigException e) { + } catch (RuntimeException | HttpClientConfigException | BidirectionalTopicClientException e) { throw new PolicyXacmlPdpRuntimeException(e.getMessage(), e); } @@ -197,18 +204,14 @@ public class XacmlPdpActivator extends ServiceManagerContainer { * Registers the dispatcher with the topic source(s). */ private void registerMsgDispatcher() { - for (TopicSource source : TopicEndpointManager.getManager().getTopicSources(Arrays.asList(TOPIC))) { - source.register(msgDispatcher); - } + topicClient.getSource().register(msgDispatcher); } /** * Unregisters the dispatcher from the topic source(s). */ private void unregisterMsgDispatcher() { - for (TopicSource source : TopicEndpointManager.getManager().getTopicSources(Arrays.asList(TOPIC))) { - source.unregister(msgDispatcher); - } + topicClient.getSource().unregister(msgDispatcher); } /** diff --git a/main/src/test/java/org/onap/policy/pdpx/main/comm/XacmlPdpHearbeatPublisherTest.java b/main/src/test/java/org/onap/policy/pdpx/main/comm/XacmlPdpHearbeatPublisherTest.java index 7f902113..51689584 100644 --- a/main/src/test/java/org/onap/policy/pdpx/main/comm/XacmlPdpHearbeatPublisherTest.java +++ b/main/src/test/java/org/onap/policy/pdpx/main/comm/XacmlPdpHearbeatPublisherTest.java @@ -42,7 +42,9 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; -import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClient; +import org.onap.policy.common.endpoints.event.comm.TopicSink; +import org.onap.policy.common.endpoints.event.comm.client.BidirectionalTopicClient; +import org.onap.policy.common.utils.coder.CoderException; import org.onap.policy.models.pdp.concepts.PdpStatus; import org.onap.policy.pdpx.main.XacmlState; @@ -54,7 +56,10 @@ public class XacmlPdpHearbeatPublisherTest { private static final long INTERVAL_INVALID = 0; @Mock - private TopicSinkClient client; + private TopicSink sink; + + @Mock + private BidirectionalTopicClient checker; @Mock private XacmlState state; @@ -68,7 +73,6 @@ public class XacmlPdpHearbeatPublisherTest { @Mock private ScheduledFuture<?> timer2; - @Mock private PdpStatus status; private Queue<ScheduledFuture<?>> timers; @@ -81,13 +85,17 @@ public class XacmlPdpHearbeatPublisherTest { */ @Before public void setUp() { + when(sink.getTopic()).thenReturn("my-topic"); + when(checker.getSink()).thenReturn(sink); + when(checker.isReady()).thenReturn(true); when(state.genHeartbeat()).thenReturn(status); + status = new PdpStatus(); timers = new LinkedList<>(Arrays.asList(timer1, timer2)); when(executor.scheduleWithFixedDelay(any(), anyLong(), anyLong(), any())).thenAnswer(args -> timers.remove()); - publisher = new MyPublisher(client, state); + publisher = new MyPublisher(checker, 10, state); } @Test @@ -95,7 +103,75 @@ public class XacmlPdpHearbeatPublisherTest { publisher.run(); verify(state).genHeartbeat(); - verify(client).send(status); + verify(checker).send(any()); + } + + /** + * Tests the run() method when the probe is disabled. + */ + @Test + public void testRunNoProbe() throws CoderException { + publisher = new MyPublisher(checker, 0, state); + + publisher.run(); + + verify(checker, never()).isReady(); + verify(checker, never()).awaitReady(any(), anyLong()); + + verify(state).genHeartbeat(); + verify(checker).send(any()); + } + + /** + * Tests the run() method when the topic is not ready, and then becomes ready. + */ + @Test + public void testRunNotReady() throws CoderException { + // not ready yet + when(checker.isReady()).thenReturn(false); + when(checker.awaitReady(any(), anyLong())).thenReturn(false); + + publisher.run(); + verify(state, never()).genHeartbeat(); + verify(checker, never()).send(any()); + + // isReady is still false, but awaitReady is now true - should generate heartbeat + when(checker.awaitReady(any(), anyLong())).thenReturn(true); + + publisher.run(); + verify(state).genHeartbeat(); + verify(checker).send(any()); + + // now isReady is true, too - should not rerun awaitReady + when(checker.isReady()).thenReturn(true); + + publisher.run(); + verify(state, times(2)).genHeartbeat(); + verify(checker, times(2)).send(any()); + verify(checker, times(2)).awaitReady(any(), anyLong()); + } + + /** + * Tests the run() method when the checker throws an exception. + */ + @Test + public void testRunCheckerEx() throws CoderException { + // force it to call awaitReady + when(checker.isReady()).thenReturn(false); + + when(checker.awaitReady(any(), anyLong())) + .thenThrow(new CoderException("expected exception")) + .thenReturn(true); + + // exception thrown - should not generate heartbeat + publisher.run(); + verify(state, never()).genHeartbeat(); + verify(checker, never()).send(any()); + + // no exception this time - SHOULD generate heartbeat + publisher.run(); + verify(state).genHeartbeat(); + verify(checker).send(any()); } @Test @@ -103,6 +179,8 @@ public class XacmlPdpHearbeatPublisherTest { // not yet started publisher.terminate(); + verify(checker).stopWaiting(); + // now start it and then try again publisher.start(); @@ -156,7 +234,7 @@ public class XacmlPdpHearbeatPublisherTest { public void testStart() { publisher.start(); - verify(executor).scheduleWithFixedDelay(publisher, 0, XacmlPdpHearbeatPublisher.DEFAULT_INTERVAL_MS, + verify(executor).scheduleWithFixedDelay(publisher, 0, XacmlPdpHearbeatPublisher.DEFAULT_HB_INTERVAL_MS, TimeUnit.MILLISECONDS); // repeat - nothing more should happen @@ -168,7 +246,7 @@ public class XacmlPdpHearbeatPublisherTest { @Test public void testMakeTimerThread() { // create a plain listener to test the "real" makeTimer() method - publisher = new XacmlPdpHearbeatPublisher(client, state); + publisher = new XacmlPdpHearbeatPublisher(checker, 1, state); assertThatCode(() -> { publisher.start(); @@ -179,8 +257,8 @@ public class XacmlPdpHearbeatPublisherTest { private class MyPublisher extends XacmlPdpHearbeatPublisher { - public MyPublisher(TopicSinkClient topicSinkClient, XacmlState state) { - super(topicSinkClient, state); + public MyPublisher(BidirectionalTopicClient topicChecker, long probeHeartbeatTopicMs, XacmlState state) { + super(topicChecker, probeHeartbeatTopicMs, state); } @Override diff --git a/main/src/test/java/org/onap/policy/pdpx/main/startstop/TestXacmlPdpActivator.java b/main/src/test/java/org/onap/policy/pdpx/main/startstop/TestXacmlPdpActivator.java index 4286ccf5..9025722c 100644 --- a/main/src/test/java/org/onap/policy/pdpx/main/startstop/TestXacmlPdpActivator.java +++ b/main/src/test/java/org/onap/policy/pdpx/main/startstop/TestXacmlPdpActivator.java @@ -35,6 +35,7 @@ import org.onap.policy.pdpx.main.PolicyXacmlPdpException; import org.onap.policy.pdpx.main.parameters.CommonTestData; import org.onap.policy.pdpx.main.parameters.XacmlPdpParameterGroup; import org.onap.policy.pdpx.main.parameters.XacmlPdpParameterHandler; +import org.powermock.reflect.Whitebox; /** @@ -42,6 +43,8 @@ import org.onap.policy.pdpx.main.parameters.XacmlPdpParameterHandler; * */ public class TestXacmlPdpActivator extends CommonRest { + private static final String PROBE_FIELD_NAME = "probeHeartbeatTopicSec"; + private static XacmlPdpParameterGroup parGroup; private XacmlPdpActivator activator = null; @@ -67,6 +70,7 @@ public class TestXacmlPdpActivator extends CommonRest { @Override @Before public void setUp() { + Whitebox.setInternalState(parGroup, PROBE_FIELD_NAME, 4); activator = new XacmlPdpActivator(parGroup); } @@ -100,6 +104,14 @@ public class TestXacmlPdpActivator extends CommonRest { } @Test + public void testXacmlPdpActivator_NoProbe() throws Exception { + Whitebox.setInternalState(parGroup, PROBE_FIELD_NAME, 0); + activator = new XacmlPdpActivator(parGroup); + activator.start(); + assertTrue(activator.isAlive()); + } + + @Test public void testGetCurrent_testSetCurrent() { XacmlPdpActivator.setCurrent(activator); assertSame(activator, XacmlPdpActivator.getCurrent()); |