diff options
Diffstat (limited to 'main/src')
7 files changed, 210 insertions, 51 deletions
diff --git a/main/src/main/java/org/onap/policy/pdpx/main/XacmlState.java b/main/src/main/java/org/onap/policy/pdpx/main/XacmlState.java index 17995fd6..d1e326f1 100644 --- a/main/src/main/java/org/onap/policy/pdpx/main/XacmlState.java +++ b/main/src/main/java/org/onap/policy/pdpx/main/XacmlState.java @@ -40,6 +40,7 @@ import org.slf4j.LoggerFactory; * Current state of this XACML PDP. */ public class XacmlState { + // The logger for this class private static final Logger LOGGER = LoggerFactory.getLogger(XacmlState.class); /** @@ -112,6 +113,9 @@ public class XacmlState { PdpStatus status2 = makeResponse(message, ""); + // start/stop rest controller based on state change + handleXacmlRestController(); + // these fields aren't needed in the response, so clear them out to avoid sending status2.setPolicies(null); @@ -165,4 +169,23 @@ public class XacmlState { status2.setResponse(resp); return status2; } + + /** + * Manages the Xacml-Pdp rest controller based on the Xacml-Pdp State. + * Current supported states: + * ACTIVE - rest service is running and handling requests + * PASSIVE - rest service is not running + */ + private void handleXacmlRestController() { + if (status.getState() == PdpState.ACTIVE) { + LOGGER.info("State change: {} - Starting rest controller", status.getState()); + XacmlPdpActivator.getCurrent().startXacmlRestController(); + } else if (status.getState() == PdpState.PASSIVE) { + LOGGER.info("State change: {} - Stopping rest controller", status.getState()); + XacmlPdpActivator.getCurrent().stopXacmlRestController(); + } else { + // unsupported state + LOGGER.warn("Unsupported state: {}", status.getState()); + } + } } diff --git a/main/src/main/java/org/onap/policy/pdpx/main/comm/XacmlPdpHearbeatPublisher.java b/main/src/main/java/org/onap/policy/pdpx/main/comm/XacmlPdpHearbeatPublisher.java index 3177c096..8c1f7928 100644 --- a/main/src/main/java/org/onap/policy/pdpx/main/comm/XacmlPdpHearbeatPublisher.java +++ b/main/src/main/java/org/onap/policy/pdpx/main/comm/XacmlPdpHearbeatPublisher.java @@ -1,6 +1,6 @@ /*- * ============LICENSE_START======================================================= - * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2019, 2021 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,18 +25,24 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import lombok.Getter; -import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClient; +import org.onap.policy.common.endpoints.event.comm.client.BidirectionalTopicClient; +import org.onap.policy.common.utils.coder.Coder; +import org.onap.policy.common.utils.coder.CoderException; +import org.onap.policy.common.utils.coder.StandardCoder; import org.onap.policy.models.pdp.concepts.PdpStatus; +import org.onap.policy.models.pdp.concepts.PdpTopicCheck; import org.onap.policy.pdpx.main.XacmlState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class XacmlPdpHearbeatPublisher implements Runnable { - public static final int DEFAULT_INTERVAL_MS = 60000; + public static final int DEFAULT_HB_INTERVAL_MS = 60000; private static final Logger LOGGER = LoggerFactory.getLogger(XacmlPdpHearbeatPublisher.class); + private static final Coder CODER = new StandardCoder(); - private final TopicSinkClient topicSinkClient; + private final BidirectionalTopicClient topicChecker; + private final long probeHeartbeatTopicMs; /** * Tracks the state of this PDP. @@ -47,7 +53,7 @@ public class XacmlPdpHearbeatPublisher implements Runnable { * Current timer interval, in milliseconds. */ @Getter - private long intervalMs = DEFAULT_INTERVAL_MS; + private long intervalMs = DEFAULT_HB_INTERVAL_MS; private ScheduledExecutorService timerThread; @@ -57,26 +63,53 @@ public class XacmlPdpHearbeatPublisher implements Runnable { /** * Constructor for instantiating XacmlPdpPublisher. * - * @param topicSinkClient used to send heart beat message + * @param topicChecker used to check the topic before sending heart beat message + * @param probeHeartbeatTopicMs frequency, in milliseconds, with which to probe the + * heartbeat topic before sending the first heartbeat. Zero disables probing * @param state tracks the state of this PDP */ - public XacmlPdpHearbeatPublisher(TopicSinkClient topicSinkClient, XacmlState state) { - this.topicSinkClient = topicSinkClient; + public XacmlPdpHearbeatPublisher(BidirectionalTopicClient topicChecker, long probeHeartbeatTopicMs, + XacmlState state) { + LOGGER.info("heartbeat topic probe {}ms", probeHeartbeatTopicMs); + this.topicChecker = topicChecker; + this.probeHeartbeatTopicMs = probeHeartbeatTopicMs; this.currentState = state; } @Override public void run() { - PdpStatus message = currentState.genHeartbeat(); - LOGGER.info("Sending Xacml PDP heartbeat to the PAP - {}", message); + try { + if (!isTopicReady()) { + return; + } + + PdpStatus message = currentState.genHeartbeat(); + LOGGER.info("Sending Xacml PDP heartbeat to the PAP - {}", message); + + String json = CODER.encode(message); + topicChecker.send(json); - topicSinkClient.send(message); + } catch (RuntimeException | CoderException e) { + LOGGER.warn("send to {} failed because of {}", topicChecker.getSink().getTopic(), e.getMessage(), e); + } + } + + private boolean isTopicReady() throws CoderException { + if (probeHeartbeatTopicMs <= 0 || topicChecker.isReady()) { + return true; + } + + var check = new PdpTopicCheck(); + check.setName(XacmlState.PDP_NAME); + return topicChecker.awaitReady(check, probeHeartbeatTopicMs); } /** * Method to terminate the heart beat. */ public synchronized void terminate() { + topicChecker.stopWaiting(); + if (timerThread != null) { timerThread.shutdownNow(); timerThread = null; diff --git a/main/src/main/java/org/onap/policy/pdpx/main/parameters/XacmlPdpParameterGroup.java b/main/src/main/java/org/onap/policy/pdpx/main/parameters/XacmlPdpParameterGroup.java index b994fe9e..a9332e90 100644 --- a/main/src/main/java/org/onap/policy/pdpx/main/parameters/XacmlPdpParameterGroup.java +++ b/main/src/main/java/org/onap/policy/pdpx/main/parameters/XacmlPdpParameterGroup.java @@ -22,6 +22,7 @@ package org.onap.policy.pdpx.main.parameters; import lombok.Getter; +import lombok.NoArgsConstructor; import org.apache.commons.lang3.StringUtils; import org.onap.policy.common.endpoints.parameters.RestClientParameters; import org.onap.policy.common.endpoints.parameters.RestServerParameters; @@ -29,6 +30,7 @@ import org.onap.policy.common.endpoints.parameters.TopicParameterGroup; import org.onap.policy.common.parameters.BeanValidationResult; import org.onap.policy.common.parameters.ParameterGroupImpl; import org.onap.policy.common.parameters.ValidationStatus; +import org.onap.policy.common.parameters.annotations.Min; import org.onap.policy.common.parameters.annotations.NotBlank; import org.onap.policy.common.parameters.annotations.NotNull; import org.onap.policy.common.parameters.annotations.Valid; @@ -41,6 +43,7 @@ import org.onap.policy.models.base.Validated; @Getter @NotNull @NotBlank +@NoArgsConstructor public class XacmlPdpParameterGroup extends ParameterGroupImpl { public static final String PARAM_POLICY_API = "policyApiParameters"; @@ -54,6 +57,13 @@ public class XacmlPdpParameterGroup extends ParameterGroupImpl { private TopicParameterGroup topicParameterGroup; @Valid private XacmlApplicationParameters applicationParameters; + /** + * Frequency, in seconds, with which to probe the heartbeat topic before sending the + * first heartbeat. Set to zero to disable probing. + */ + @Min(0) + private long probeHeartbeatTopicSec = 4; + /** * Create the xacml pdp parameter group. diff --git a/main/src/main/java/org/onap/policy/pdpx/main/startstop/XacmlPdpActivator.java b/main/src/main/java/org/onap/policy/pdpx/main/startstop/XacmlPdpActivator.java index 4dd8a9b3..892b3835 100644 --- a/main/src/main/java/org/onap/policy/pdpx/main/startstop/XacmlPdpActivator.java +++ b/main/src/main/java/org/onap/policy/pdpx/main/startstop/XacmlPdpActivator.java @@ -20,13 +20,12 @@ package org.onap.policy.pdpx.main.startstop; -import java.util.Arrays; import lombok.Getter; import lombok.Setter; import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager; -import org.onap.policy.common.endpoints.event.comm.TopicSource; +import org.onap.policy.common.endpoints.event.comm.client.BidirectionalTopicClient; +import org.onap.policy.common.endpoints.event.comm.client.BidirectionalTopicClientException; import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClient; -import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClientException; import org.onap.policy.common.endpoints.http.client.HttpClient; import org.onap.policy.common.endpoints.http.client.HttpClientConfigException; import org.onap.policy.common.endpoints.http.client.HttpClientFactoryInstance; @@ -70,6 +69,11 @@ public class XacmlPdpActivator extends ServiceManagerContainer { private final XacmlPdpParameterGroup xacmlPdpParameterGroup; /** + * POLICY-PDP-PAP client. + */ + private BidirectionalTopicClient topicClient; + + /** * Listens for messages on the topic, decodes them into a {@link PdpStatus} message, and then * dispatches them to appropriate listener. */ @@ -108,8 +112,11 @@ public class XacmlPdpActivator extends ServiceManagerContainer { this.xacmlPdpParameterGroup = xacmlPdpParameterGroup; this.msgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES); - sinkClient = new TopicSinkClient(TOPIC); - heartbeat = new XacmlPdpHearbeatPublisher(sinkClient, state); + topicClient = new BidirectionalTopicClient(TOPIC, TOPIC); + sinkClient = new TopicSinkClient(topicClient.getSink()); + + heartbeat = new XacmlPdpHearbeatPublisher(topicClient, + xacmlPdpParameterGroup.getProbeHeartbeatTopicSec() * 1000, state); /* * since the dispatcher isn't registered with the topic yet, we can go ahead @@ -123,7 +130,7 @@ public class XacmlPdpActivator extends ServiceManagerContainer { restServer = new XacmlPdpRestServer(xacmlPdpParameterGroup.getRestServerParameters(), XacmlPdpAafFilter.class, XacmlPdpRestController.class); - } catch (RuntimeException | TopicSinkClientException | HttpClientConfigException e) { + } catch (RuntimeException | HttpClientConfigException | BidirectionalTopicClientException e) { throw new PolicyXacmlPdpRuntimeException(e.getMessage(), e); } @@ -145,16 +152,11 @@ public class XacmlPdpActivator extends ServiceManagerContainer { addAction("Terminate PDP", () -> { }, () -> sendTerminateMessage(sinkClient, state)); - // initial heart beats act as registration messages addAction("Heartbeat Publisher", heartbeat::start, heartbeat::terminate); - addAction("REST Server", - restServer::start, - restServer::stop); - // @formatter:on } @@ -197,18 +199,14 @@ public class XacmlPdpActivator extends ServiceManagerContainer { * Registers the dispatcher with the topic source(s). */ private void registerMsgDispatcher() { - for (TopicSource source : TopicEndpointManager.getManager().getTopicSources(Arrays.asList(TOPIC))) { - source.register(msgDispatcher); - } + topicClient.getSource().register(msgDispatcher); } /** * Unregisters the dispatcher from the topic source(s). */ private void unregisterMsgDispatcher() { - for (TopicSource source : TopicEndpointManager.getManager().getTopicSources(Arrays.asList(TOPIC))) { - source.unregister(msgDispatcher); - } + topicClient.getSource().unregister(msgDispatcher); } /** diff --git a/main/src/test/java/org/onap/policy/pdpx/main/XacmlStateTest.java b/main/src/test/java/org/onap/policy/pdpx/main/XacmlStateTest.java index feaaf4f6..5ff3d5c7 100644 --- a/main/src/test/java/org/onap/policy/pdpx/main/XacmlStateTest.java +++ b/main/src/test/java/org/onap/policy/pdpx/main/XacmlStateTest.java @@ -26,6 +26,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import org.junit.AfterClass; @@ -129,10 +130,12 @@ public class XacmlStateTest { req.setState(PdpState.ACTIVE); status = state.updateInternalState(req); assertEquals(PdpState.ACTIVE, status.getState()); + verify(act).startXacmlRestController(); req.setState(PdpState.PASSIVE); status = state.updateInternalState(req); assertEquals(PdpState.PASSIVE, status.getState()); + verify(act).stopXacmlRestController(); } @Test diff --git a/main/src/test/java/org/onap/policy/pdpx/main/comm/XacmlPdpHearbeatPublisherTest.java b/main/src/test/java/org/onap/policy/pdpx/main/comm/XacmlPdpHearbeatPublisherTest.java index 7f902113..51689584 100644 --- a/main/src/test/java/org/onap/policy/pdpx/main/comm/XacmlPdpHearbeatPublisherTest.java +++ b/main/src/test/java/org/onap/policy/pdpx/main/comm/XacmlPdpHearbeatPublisherTest.java @@ -42,7 +42,9 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; -import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClient; +import org.onap.policy.common.endpoints.event.comm.TopicSink; +import org.onap.policy.common.endpoints.event.comm.client.BidirectionalTopicClient; +import org.onap.policy.common.utils.coder.CoderException; import org.onap.policy.models.pdp.concepts.PdpStatus; import org.onap.policy.pdpx.main.XacmlState; @@ -54,7 +56,10 @@ public class XacmlPdpHearbeatPublisherTest { private static final long INTERVAL_INVALID = 0; @Mock - private TopicSinkClient client; + private TopicSink sink; + + @Mock + private BidirectionalTopicClient checker; @Mock private XacmlState state; @@ -68,7 +73,6 @@ public class XacmlPdpHearbeatPublisherTest { @Mock private ScheduledFuture<?> timer2; - @Mock private PdpStatus status; private Queue<ScheduledFuture<?>> timers; @@ -81,13 +85,17 @@ public class XacmlPdpHearbeatPublisherTest { */ @Before public void setUp() { + when(sink.getTopic()).thenReturn("my-topic"); + when(checker.getSink()).thenReturn(sink); + when(checker.isReady()).thenReturn(true); when(state.genHeartbeat()).thenReturn(status); + status = new PdpStatus(); timers = new LinkedList<>(Arrays.asList(timer1, timer2)); when(executor.scheduleWithFixedDelay(any(), anyLong(), anyLong(), any())).thenAnswer(args -> timers.remove()); - publisher = new MyPublisher(client, state); + publisher = new MyPublisher(checker, 10, state); } @Test @@ -95,7 +103,75 @@ public class XacmlPdpHearbeatPublisherTest { publisher.run(); verify(state).genHeartbeat(); - verify(client).send(status); + verify(checker).send(any()); + } + + /** + * Tests the run() method when the probe is disabled. + */ + @Test + public void testRunNoProbe() throws CoderException { + publisher = new MyPublisher(checker, 0, state); + + publisher.run(); + + verify(checker, never()).isReady(); + verify(checker, never()).awaitReady(any(), anyLong()); + + verify(state).genHeartbeat(); + verify(checker).send(any()); + } + + /** + * Tests the run() method when the topic is not ready, and then becomes ready. + */ + @Test + public void testRunNotReady() throws CoderException { + // not ready yet + when(checker.isReady()).thenReturn(false); + when(checker.awaitReady(any(), anyLong())).thenReturn(false); + + publisher.run(); + verify(state, never()).genHeartbeat(); + verify(checker, never()).send(any()); + + // isReady is still false, but awaitReady is now true - should generate heartbeat + when(checker.awaitReady(any(), anyLong())).thenReturn(true); + + publisher.run(); + verify(state).genHeartbeat(); + verify(checker).send(any()); + + // now isReady is true, too - should not rerun awaitReady + when(checker.isReady()).thenReturn(true); + + publisher.run(); + verify(state, times(2)).genHeartbeat(); + verify(checker, times(2)).send(any()); + verify(checker, times(2)).awaitReady(any(), anyLong()); + } + + /** + * Tests the run() method when the checker throws an exception. + */ + @Test + public void testRunCheckerEx() throws CoderException { + // force it to call awaitReady + when(checker.isReady()).thenReturn(false); + + when(checker.awaitReady(any(), anyLong())) + .thenThrow(new CoderException("expected exception")) + .thenReturn(true); + + // exception thrown - should not generate heartbeat + publisher.run(); + verify(state, never()).genHeartbeat(); + verify(checker, never()).send(any()); + + // no exception this time - SHOULD generate heartbeat + publisher.run(); + verify(state).genHeartbeat(); + verify(checker).send(any()); } @Test @@ -103,6 +179,8 @@ public class XacmlPdpHearbeatPublisherTest { // not yet started publisher.terminate(); + verify(checker).stopWaiting(); + // now start it and then try again publisher.start(); @@ -156,7 +234,7 @@ public class XacmlPdpHearbeatPublisherTest { public void testStart() { publisher.start(); - verify(executor).scheduleWithFixedDelay(publisher, 0, XacmlPdpHearbeatPublisher.DEFAULT_INTERVAL_MS, + verify(executor).scheduleWithFixedDelay(publisher, 0, XacmlPdpHearbeatPublisher.DEFAULT_HB_INTERVAL_MS, TimeUnit.MILLISECONDS); // repeat - nothing more should happen @@ -168,7 +246,7 @@ public class XacmlPdpHearbeatPublisherTest { @Test public void testMakeTimerThread() { // create a plain listener to test the "real" makeTimer() method - publisher = new XacmlPdpHearbeatPublisher(client, state); + publisher = new XacmlPdpHearbeatPublisher(checker, 1, state); assertThatCode(() -> { publisher.start(); @@ -179,8 +257,8 @@ public class XacmlPdpHearbeatPublisherTest { private class MyPublisher extends XacmlPdpHearbeatPublisher { - public MyPublisher(TopicSinkClient topicSinkClient, XacmlState state) { - super(topicSinkClient, state); + public MyPublisher(BidirectionalTopicClient topicChecker, long probeHeartbeatTopicMs, XacmlState state) { + super(topicChecker, probeHeartbeatTopicMs, state); } @Override diff --git a/main/src/test/java/org/onap/policy/pdpx/main/startstop/TestXacmlPdpActivator.java b/main/src/test/java/org/onap/policy/pdpx/main/startstop/TestXacmlPdpActivator.java index 4286ccf5..c874761d 100644 --- a/main/src/test/java/org/onap/policy/pdpx/main/startstop/TestXacmlPdpActivator.java +++ b/main/src/test/java/org/onap/policy/pdpx/main/startstop/TestXacmlPdpActivator.java @@ -1,6 +1,6 @@ /*- * ============LICENSE_START======================================================= - * Copyright (C) 2019, 2021 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. * Modifications Copyright (C) 2019 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); @@ -35,6 +35,7 @@ import org.onap.policy.pdpx.main.PolicyXacmlPdpException; import org.onap.policy.pdpx.main.parameters.CommonTestData; import org.onap.policy.pdpx.main.parameters.XacmlPdpParameterGroup; import org.onap.policy.pdpx.main.parameters.XacmlPdpParameterHandler; +import org.powermock.reflect.Whitebox; /** @@ -42,6 +43,8 @@ import org.onap.policy.pdpx.main.parameters.XacmlPdpParameterHandler; * */ public class TestXacmlPdpActivator extends CommonRest { + private static final String PROBE_FIELD_NAME = "probeHeartbeatTopicSec"; + private static XacmlPdpParameterGroup parGroup; private XacmlPdpActivator activator = null; @@ -67,20 +70,10 @@ public class TestXacmlPdpActivator extends CommonRest { @Override @Before public void setUp() { + Whitebox.setInternalState(parGroup, PROBE_FIELD_NAME, 4); activator = new XacmlPdpActivator(parGroup); } - /** - * Teardown tests. - * @throws PolicyXacmlPdpException on termination errors - */ - @After - public void teardown() throws PolicyXacmlPdpException { - if (activator != null && activator.isAlive()) { - activator.stop(); - } - } - @Test public void testXacmlPdpActivator() throws Exception { assertFalse(activator.isAlive()); @@ -88,15 +81,25 @@ public class TestXacmlPdpActivator extends CommonRest { activator.start(); assertTrue(activator.isAlive()); + // XacmlPdp starts in PASSIVE state so the rest controller should not be alive + assertFalse(activator.isXacmlRestControllerAlive()); assertTrue(activator.getParameterGroup().isValid()); assertEquals(CommonTestData.PDPX_PARAMETER_GROUP_NAME, activator.getParameterGroup().getName()); assertEquals(CommonTestData.PDPX_GROUP, activator.getParameterGroup().getPdpGroup()); + activator.startXacmlRestController(); + assertTrue(activator.isXacmlRestControllerAlive()); + activator.stopXacmlRestController(); assertFalse(activator.isXacmlRestControllerAlive()); + } - activator.startXacmlRestController(); - assertTrue(activator.isXacmlRestControllerAlive()); + @Test + public void testXacmlPdpActivator_NoProbe() throws Exception { + Whitebox.setInternalState(parGroup, PROBE_FIELD_NAME, 0); + activator = new XacmlPdpActivator(parGroup); + activator.start(); + assertTrue(activator.isAlive()); } @Test @@ -111,4 +114,15 @@ public class TestXacmlPdpActivator extends CommonRest { activator.stop(); assertFalse(activator.isAlive()); } + + /** + * Teardown tests. + * @throws PolicyXacmlPdpException on termination errors + */ + @After + public void teardown() throws PolicyXacmlPdpException { + if (activator != null && activator.isAlive()) { + activator.stop(); + } + } } |