aboutsummaryrefslogtreecommitdiffstats
path: root/main
diff options
context:
space:
mode:
Diffstat (limited to 'main')
-rw-r--r--main/src/main/java/org/onap/policy/pdpx/main/comm/XacmlPdpHearbeatPublisher.java55
-rw-r--r--main/src/main/java/org/onap/policy/pdpx/main/parameters/XacmlPdpParameterGroup.java10
-rw-r--r--main/src/main/java/org/onap/policy/pdpx/main/startstop/XacmlPdpActivator.java27
-rw-r--r--main/src/test/java/org/onap/policy/pdpx/main/comm/XacmlPdpHearbeatPublisherTest.java96
-rw-r--r--main/src/test/java/org/onap/policy/pdpx/main/startstop/TestXacmlPdpActivator.java12
5 files changed, 168 insertions, 32 deletions
diff --git a/main/src/main/java/org/onap/policy/pdpx/main/comm/XacmlPdpHearbeatPublisher.java b/main/src/main/java/org/onap/policy/pdpx/main/comm/XacmlPdpHearbeatPublisher.java
index 3177c096..8c1f7928 100644
--- a/main/src/main/java/org/onap/policy/pdpx/main/comm/XacmlPdpHearbeatPublisher.java
+++ b/main/src/main/java/org/onap/policy/pdpx/main/comm/XacmlPdpHearbeatPublisher.java
@@ -1,6 +1,6 @@
/*-
* ============LICENSE_START=======================================================
- * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2019, 2021 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -25,18 +25,24 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import lombok.Getter;
-import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClient;
+import org.onap.policy.common.endpoints.event.comm.client.BidirectionalTopicClient;
+import org.onap.policy.common.utils.coder.Coder;
+import org.onap.policy.common.utils.coder.CoderException;
+import org.onap.policy.common.utils.coder.StandardCoder;
import org.onap.policy.models.pdp.concepts.PdpStatus;
+import org.onap.policy.models.pdp.concepts.PdpTopicCheck;
import org.onap.policy.pdpx.main.XacmlState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class XacmlPdpHearbeatPublisher implements Runnable {
- public static final int DEFAULT_INTERVAL_MS = 60000;
+ public static final int DEFAULT_HB_INTERVAL_MS = 60000;
private static final Logger LOGGER = LoggerFactory.getLogger(XacmlPdpHearbeatPublisher.class);
+ private static final Coder CODER = new StandardCoder();
- private final TopicSinkClient topicSinkClient;
+ private final BidirectionalTopicClient topicChecker;
+ private final long probeHeartbeatTopicMs;
/**
* Tracks the state of this PDP.
@@ -47,7 +53,7 @@ public class XacmlPdpHearbeatPublisher implements Runnable {
* Current timer interval, in milliseconds.
*/
@Getter
- private long intervalMs = DEFAULT_INTERVAL_MS;
+ private long intervalMs = DEFAULT_HB_INTERVAL_MS;
private ScheduledExecutorService timerThread;
@@ -57,26 +63,53 @@ public class XacmlPdpHearbeatPublisher implements Runnable {
/**
* Constructor for instantiating XacmlPdpPublisher.
*
- * @param topicSinkClient used to send heart beat message
+ * @param topicChecker used to check the topic before sending heart beat message
+ * @param probeHeartbeatTopicMs frequency, in milliseconds, with which to probe the
+ * heartbeat topic before sending the first heartbeat. Zero disables probing
* @param state tracks the state of this PDP
*/
- public XacmlPdpHearbeatPublisher(TopicSinkClient topicSinkClient, XacmlState state) {
- this.topicSinkClient = topicSinkClient;
+ public XacmlPdpHearbeatPublisher(BidirectionalTopicClient topicChecker, long probeHeartbeatTopicMs,
+ XacmlState state) {
+ LOGGER.info("heartbeat topic probe {}ms", probeHeartbeatTopicMs);
+ this.topicChecker = topicChecker;
+ this.probeHeartbeatTopicMs = probeHeartbeatTopicMs;
this.currentState = state;
}
@Override
public void run() {
- PdpStatus message = currentState.genHeartbeat();
- LOGGER.info("Sending Xacml PDP heartbeat to the PAP - {}", message);
+ try {
+ if (!isTopicReady()) {
+ return;
+ }
+
+ PdpStatus message = currentState.genHeartbeat();
+ LOGGER.info("Sending Xacml PDP heartbeat to the PAP - {}", message);
+
+ String json = CODER.encode(message);
+ topicChecker.send(json);
- topicSinkClient.send(message);
+ } catch (RuntimeException | CoderException e) {
+ LOGGER.warn("send to {} failed because of {}", topicChecker.getSink().getTopic(), e.getMessage(), e);
+ }
+ }
+
+ private boolean isTopicReady() throws CoderException {
+ if (probeHeartbeatTopicMs <= 0 || topicChecker.isReady()) {
+ return true;
+ }
+
+ var check = new PdpTopicCheck();
+ check.setName(XacmlState.PDP_NAME);
+ return topicChecker.awaitReady(check, probeHeartbeatTopicMs);
}
/**
* Method to terminate the heart beat.
*/
public synchronized void terminate() {
+ topicChecker.stopWaiting();
+
if (timerThread != null) {
timerThread.shutdownNow();
timerThread = null;
diff --git a/main/src/main/java/org/onap/policy/pdpx/main/parameters/XacmlPdpParameterGroup.java b/main/src/main/java/org/onap/policy/pdpx/main/parameters/XacmlPdpParameterGroup.java
index b994fe9e..a9332e90 100644
--- a/main/src/main/java/org/onap/policy/pdpx/main/parameters/XacmlPdpParameterGroup.java
+++ b/main/src/main/java/org/onap/policy/pdpx/main/parameters/XacmlPdpParameterGroup.java
@@ -22,6 +22,7 @@
package org.onap.policy.pdpx.main.parameters;
import lombok.Getter;
+import lombok.NoArgsConstructor;
import org.apache.commons.lang3.StringUtils;
import org.onap.policy.common.endpoints.parameters.RestClientParameters;
import org.onap.policy.common.endpoints.parameters.RestServerParameters;
@@ -29,6 +30,7 @@ import org.onap.policy.common.endpoints.parameters.TopicParameterGroup;
import org.onap.policy.common.parameters.BeanValidationResult;
import org.onap.policy.common.parameters.ParameterGroupImpl;
import org.onap.policy.common.parameters.ValidationStatus;
+import org.onap.policy.common.parameters.annotations.Min;
import org.onap.policy.common.parameters.annotations.NotBlank;
import org.onap.policy.common.parameters.annotations.NotNull;
import org.onap.policy.common.parameters.annotations.Valid;
@@ -41,6 +43,7 @@ import org.onap.policy.models.base.Validated;
@Getter
@NotNull
@NotBlank
+@NoArgsConstructor
public class XacmlPdpParameterGroup extends ParameterGroupImpl {
public static final String PARAM_POLICY_API = "policyApiParameters";
@@ -54,6 +57,13 @@ public class XacmlPdpParameterGroup extends ParameterGroupImpl {
private TopicParameterGroup topicParameterGroup;
@Valid
private XacmlApplicationParameters applicationParameters;
+ /**
+ * Frequency, in seconds, with which to probe the heartbeat topic before sending the
+ * first heartbeat. Set to zero to disable probing.
+ */
+ @Min(0)
+ private long probeHeartbeatTopicSec = 4;
+
/**
* Create the xacml pdp parameter group.
diff --git a/main/src/main/java/org/onap/policy/pdpx/main/startstop/XacmlPdpActivator.java b/main/src/main/java/org/onap/policy/pdpx/main/startstop/XacmlPdpActivator.java
index 4dd8a9b3..050d8b24 100644
--- a/main/src/main/java/org/onap/policy/pdpx/main/startstop/XacmlPdpActivator.java
+++ b/main/src/main/java/org/onap/policy/pdpx/main/startstop/XacmlPdpActivator.java
@@ -20,13 +20,12 @@
package org.onap.policy.pdpx.main.startstop;
-import java.util.Arrays;
import lombok.Getter;
import lombok.Setter;
import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
-import org.onap.policy.common.endpoints.event.comm.TopicSource;
+import org.onap.policy.common.endpoints.event.comm.client.BidirectionalTopicClient;
+import org.onap.policy.common.endpoints.event.comm.client.BidirectionalTopicClientException;
import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClient;
-import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClientException;
import org.onap.policy.common.endpoints.http.client.HttpClient;
import org.onap.policy.common.endpoints.http.client.HttpClientConfigException;
import org.onap.policy.common.endpoints.http.client.HttpClientFactoryInstance;
@@ -70,6 +69,11 @@ public class XacmlPdpActivator extends ServiceManagerContainer {
private final XacmlPdpParameterGroup xacmlPdpParameterGroup;
/**
+ * POLICY-PDP-PAP client.
+ */
+ private BidirectionalTopicClient topicClient;
+
+ /**
* Listens for messages on the topic, decodes them into a {@link PdpStatus} message, and then
* dispatches them to appropriate listener.
*/
@@ -108,8 +112,11 @@ public class XacmlPdpActivator extends ServiceManagerContainer {
this.xacmlPdpParameterGroup = xacmlPdpParameterGroup;
this.msgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES);
- sinkClient = new TopicSinkClient(TOPIC);
- heartbeat = new XacmlPdpHearbeatPublisher(sinkClient, state);
+ topicClient = new BidirectionalTopicClient(TOPIC, TOPIC);
+ sinkClient = new TopicSinkClient(topicClient.getSink());
+
+ heartbeat = new XacmlPdpHearbeatPublisher(topicClient,
+ xacmlPdpParameterGroup.getProbeHeartbeatTopicSec() * 1000, state);
/*
* since the dispatcher isn't registered with the topic yet, we can go ahead
@@ -123,7 +130,7 @@ public class XacmlPdpActivator extends ServiceManagerContainer {
restServer = new XacmlPdpRestServer(xacmlPdpParameterGroup.getRestServerParameters(),
XacmlPdpAafFilter.class, XacmlPdpRestController.class);
- } catch (RuntimeException | TopicSinkClientException | HttpClientConfigException e) {
+ } catch (RuntimeException | HttpClientConfigException | BidirectionalTopicClientException e) {
throw new PolicyXacmlPdpRuntimeException(e.getMessage(), e);
}
@@ -197,18 +204,14 @@ public class XacmlPdpActivator extends ServiceManagerContainer {
* Registers the dispatcher with the topic source(s).
*/
private void registerMsgDispatcher() {
- for (TopicSource source : TopicEndpointManager.getManager().getTopicSources(Arrays.asList(TOPIC))) {
- source.register(msgDispatcher);
- }
+ topicClient.getSource().register(msgDispatcher);
}
/**
* Unregisters the dispatcher from the topic source(s).
*/
private void unregisterMsgDispatcher() {
- for (TopicSource source : TopicEndpointManager.getManager().getTopicSources(Arrays.asList(TOPIC))) {
- source.unregister(msgDispatcher);
- }
+ topicClient.getSource().unregister(msgDispatcher);
}
/**
diff --git a/main/src/test/java/org/onap/policy/pdpx/main/comm/XacmlPdpHearbeatPublisherTest.java b/main/src/test/java/org/onap/policy/pdpx/main/comm/XacmlPdpHearbeatPublisherTest.java
index 7f902113..51689584 100644
--- a/main/src/test/java/org/onap/policy/pdpx/main/comm/XacmlPdpHearbeatPublisherTest.java
+++ b/main/src/test/java/org/onap/policy/pdpx/main/comm/XacmlPdpHearbeatPublisherTest.java
@@ -42,7 +42,9 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
-import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClient;
+import org.onap.policy.common.endpoints.event.comm.TopicSink;
+import org.onap.policy.common.endpoints.event.comm.client.BidirectionalTopicClient;
+import org.onap.policy.common.utils.coder.CoderException;
import org.onap.policy.models.pdp.concepts.PdpStatus;
import org.onap.policy.pdpx.main.XacmlState;
@@ -54,7 +56,10 @@ public class XacmlPdpHearbeatPublisherTest {
private static final long INTERVAL_INVALID = 0;
@Mock
- private TopicSinkClient client;
+ private TopicSink sink;
+
+ @Mock
+ private BidirectionalTopicClient checker;
@Mock
private XacmlState state;
@@ -68,7 +73,6 @@ public class XacmlPdpHearbeatPublisherTest {
@Mock
private ScheduledFuture<?> timer2;
- @Mock
private PdpStatus status;
private Queue<ScheduledFuture<?>> timers;
@@ -81,13 +85,17 @@ public class XacmlPdpHearbeatPublisherTest {
*/
@Before
public void setUp() {
+ when(sink.getTopic()).thenReturn("my-topic");
+ when(checker.getSink()).thenReturn(sink);
+ when(checker.isReady()).thenReturn(true);
when(state.genHeartbeat()).thenReturn(status);
+ status = new PdpStatus();
timers = new LinkedList<>(Arrays.asList(timer1, timer2));
when(executor.scheduleWithFixedDelay(any(), anyLong(), anyLong(), any())).thenAnswer(args -> timers.remove());
- publisher = new MyPublisher(client, state);
+ publisher = new MyPublisher(checker, 10, state);
}
@Test
@@ -95,7 +103,75 @@ public class XacmlPdpHearbeatPublisherTest {
publisher.run();
verify(state).genHeartbeat();
- verify(client).send(status);
+ verify(checker).send(any());
+ }
+
+ /**
+ * Tests the run() method when the probe is disabled.
+ */
+ @Test
+ public void testRunNoProbe() throws CoderException {
+ publisher = new MyPublisher(checker, 0, state);
+
+ publisher.run();
+
+ verify(checker, never()).isReady();
+ verify(checker, never()).awaitReady(any(), anyLong());
+
+ verify(state).genHeartbeat();
+ verify(checker).send(any());
+ }
+
+ /**
+ * Tests the run() method when the topic is not ready, and then becomes ready.
+ */
+ @Test
+ public void testRunNotReady() throws CoderException {
+ // not ready yet
+ when(checker.isReady()).thenReturn(false);
+ when(checker.awaitReady(any(), anyLong())).thenReturn(false);
+
+ publisher.run();
+ verify(state, never()).genHeartbeat();
+ verify(checker, never()).send(any());
+
+ // isReady is still false, but awaitReady is now true - should generate heartbeat
+ when(checker.awaitReady(any(), anyLong())).thenReturn(true);
+
+ publisher.run();
+ verify(state).genHeartbeat();
+ verify(checker).send(any());
+
+ // now isReady is true, too - should not rerun awaitReady
+ when(checker.isReady()).thenReturn(true);
+
+ publisher.run();
+ verify(state, times(2)).genHeartbeat();
+ verify(checker, times(2)).send(any());
+ verify(checker, times(2)).awaitReady(any(), anyLong());
+ }
+
+ /**
+ * Tests the run() method when the checker throws an exception.
+ */
+ @Test
+ public void testRunCheckerEx() throws CoderException {
+ // force it to call awaitReady
+ when(checker.isReady()).thenReturn(false);
+
+ when(checker.awaitReady(any(), anyLong()))
+ .thenThrow(new CoderException("expected exception"))
+ .thenReturn(true);
+
+ // exception thrown - should not generate heartbeat
+ publisher.run();
+ verify(state, never()).genHeartbeat();
+ verify(checker, never()).send(any());
+
+ // no exception this time - SHOULD generate heartbeat
+ publisher.run();
+ verify(state).genHeartbeat();
+ verify(checker).send(any());
}
@Test
@@ -103,6 +179,8 @@ public class XacmlPdpHearbeatPublisherTest {
// not yet started
publisher.terminate();
+ verify(checker).stopWaiting();
+
// now start it and then try again
publisher.start();
@@ -156,7 +234,7 @@ public class XacmlPdpHearbeatPublisherTest {
public void testStart() {
publisher.start();
- verify(executor).scheduleWithFixedDelay(publisher, 0, XacmlPdpHearbeatPublisher.DEFAULT_INTERVAL_MS,
+ verify(executor).scheduleWithFixedDelay(publisher, 0, XacmlPdpHearbeatPublisher.DEFAULT_HB_INTERVAL_MS,
TimeUnit.MILLISECONDS);
// repeat - nothing more should happen
@@ -168,7 +246,7 @@ public class XacmlPdpHearbeatPublisherTest {
@Test
public void testMakeTimerThread() {
// create a plain listener to test the "real" makeTimer() method
- publisher = new XacmlPdpHearbeatPublisher(client, state);
+ publisher = new XacmlPdpHearbeatPublisher(checker, 1, state);
assertThatCode(() -> {
publisher.start();
@@ -179,8 +257,8 @@ public class XacmlPdpHearbeatPublisherTest {
private class MyPublisher extends XacmlPdpHearbeatPublisher {
- public MyPublisher(TopicSinkClient topicSinkClient, XacmlState state) {
- super(topicSinkClient, state);
+ public MyPublisher(BidirectionalTopicClient topicChecker, long probeHeartbeatTopicMs, XacmlState state) {
+ super(topicChecker, probeHeartbeatTopicMs, state);
}
@Override
diff --git a/main/src/test/java/org/onap/policy/pdpx/main/startstop/TestXacmlPdpActivator.java b/main/src/test/java/org/onap/policy/pdpx/main/startstop/TestXacmlPdpActivator.java
index 4286ccf5..9025722c 100644
--- a/main/src/test/java/org/onap/policy/pdpx/main/startstop/TestXacmlPdpActivator.java
+++ b/main/src/test/java/org/onap/policy/pdpx/main/startstop/TestXacmlPdpActivator.java
@@ -35,6 +35,7 @@ import org.onap.policy.pdpx.main.PolicyXacmlPdpException;
import org.onap.policy.pdpx.main.parameters.CommonTestData;
import org.onap.policy.pdpx.main.parameters.XacmlPdpParameterGroup;
import org.onap.policy.pdpx.main.parameters.XacmlPdpParameterHandler;
+import org.powermock.reflect.Whitebox;
/**
@@ -42,6 +43,8 @@ import org.onap.policy.pdpx.main.parameters.XacmlPdpParameterHandler;
*
*/
public class TestXacmlPdpActivator extends CommonRest {
+ private static final String PROBE_FIELD_NAME = "probeHeartbeatTopicSec";
+
private static XacmlPdpParameterGroup parGroup;
private XacmlPdpActivator activator = null;
@@ -67,6 +70,7 @@ public class TestXacmlPdpActivator extends CommonRest {
@Override
@Before
public void setUp() {
+ Whitebox.setInternalState(parGroup, PROBE_FIELD_NAME, 4);
activator = new XacmlPdpActivator(parGroup);
}
@@ -100,6 +104,14 @@ public class TestXacmlPdpActivator extends CommonRest {
}
@Test
+ public void testXacmlPdpActivator_NoProbe() throws Exception {
+ Whitebox.setInternalState(parGroup, PROBE_FIELD_NAME, 0);
+ activator = new XacmlPdpActivator(parGroup);
+ activator.start();
+ assertTrue(activator.isAlive());
+ }
+
+ @Test
public void testGetCurrent_testSetCurrent() {
XacmlPdpActivator.setCurrent(activator);
assertSame(activator, XacmlPdpActivator.getCurrent());