aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJim Hahn <jrh3@att.com>2019-10-25 09:25:59 -0400
committerJim Hahn <jrh3@att.com>2019-10-25 09:25:59 -0400
commit45c8e61888711a3d6f5913d1d7ddf640ff995b12 (patch)
tree5b812c70da20f1669c7566c008391845617fbbd9
parent1e896c1a2d599c02a4494e868cac5fe5399f819f (diff)
Add notifier for generating notifications
Also modified the Publisher class to make it generic so that it could be used to publish PdpMessage AND PolicyNotification. Issue-ID: POLICY-1841 Signed-off-by: Jim Hahn <jrh3@att.com> Change-Id: I305de21a4ef84730f163af63446bafadab11a809
-rw-r--r--main/src/main/java/org/onap/policy/pap/main/comm/Publisher.java15
-rw-r--r--main/src/main/java/org/onap/policy/pap/main/notification/PolicyNotifier.java153
-rw-r--r--main/src/test/java/org/onap/policy/pap/main/comm/CommonRequestBase.java2
-rw-r--r--main/src/test/java/org/onap/policy/pap/main/comm/PublisherTest.java6
-rw-r--r--main/src/test/java/org/onap/policy/pap/main/notification/PolicyNotifierTest.java214
5 files changed, 379 insertions, 11 deletions
diff --git a/main/src/main/java/org/onap/policy/pap/main/comm/Publisher.java b/main/src/main/java/org/onap/policy/pap/main/comm/Publisher.java
index ec54dd5a..6d2d8358 100644
--- a/main/src/main/java/org/onap/policy/pap/main/comm/Publisher.java
+++ b/main/src/main/java/org/onap/policy/pap/main/comm/Publisher.java
@@ -25,7 +25,6 @@ import java.util.concurrent.LinkedBlockingQueue;
import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClient;
import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClientException;
import org.onap.policy.common.utils.coder.StandardCoder;
-import org.onap.policy.models.pdp.concepts.PdpMessage;
import org.onap.policy.pap.main.PolicyPapException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,8 +38,10 @@ import org.slf4j.LoggerFactory;
*
* <p>This class has not been tested for multiple threads invoking {@link #run()}
* simultaneously.
+ *
+ * @param <T> type of message published by this publisher
*/
-public class Publisher implements Runnable {
+public class Publisher<T> implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(Publisher.class);
/**
@@ -51,7 +52,7 @@ public class Publisher implements Runnable {
/**
* Request queue. The references may contain {@code null}.
*/
- private final BlockingQueue<QueueToken<PdpMessage>> queue = new LinkedBlockingQueue<>();
+ private final BlockingQueue<QueueToken<T>> queue = new LinkedBlockingQueue<>();
/**
* Set to {@code true} to cause the publisher to stop running.
@@ -88,7 +89,7 @@ public class Publisher implements Runnable {
*
* @param ref reference to the message to be published
*/
- public void enqueue(QueueToken<PdpMessage> ref) {
+ public void enqueue(QueueToken<T> ref) {
queue.add(ref);
}
@@ -98,7 +99,7 @@ public class Publisher implements Runnable {
@Override
public void run() {
for (;;) {
- QueueToken<PdpMessage> token = getNext();
+ QueueToken<T> token = getNext();
if (stopNow) {
// unblock any other publisher threads
@@ -106,7 +107,7 @@ public class Publisher implements Runnable {
break;
}
- PdpMessage data = token.replaceItem(null);
+ T data = token.replaceItem(null);
if (data != null) {
client.send(data);
}
@@ -120,7 +121,7 @@ public class Publisher implements Runnable {
* @return the next item, or a reference containing {@code null} if this is
* interrupted
*/
- private QueueToken<PdpMessage> getNext() {
+ private QueueToken<T> getNext() {
try {
return queue.take();
diff --git a/main/src/main/java/org/onap/policy/pap/main/notification/PolicyNotifier.java b/main/src/main/java/org/onap/policy/pap/main/notification/PolicyNotifier.java
new file mode 100644
index 00000000..97a862be
--- /dev/null
+++ b/main/src/main/java/org/onap/policy/pap/main/notification/PolicyNotifier.java
@@ -0,0 +1,153 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP PAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.pap.main.notification;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import org.onap.policy.models.pap.concepts.PolicyNotification;
+import org.onap.policy.models.tosca.authorative.concepts.ToscaPolicyIdentifier;
+import org.onap.policy.pap.main.comm.Publisher;
+import org.onap.policy.pap.main.comm.QueueToken;
+
+/**
+ * Notifier for completion of policy updates.
+ */
+public class PolicyNotifier {
+ /**
+ * Notification publisher.
+ */
+ private final Publisher<PolicyNotification> publisher;
+
+ /**
+ * Deployment tracker.
+ */
+ private final PolicyDeployTracker deployTracker = makeDeploymentTracker();
+
+ /**
+ * Undeployment tracker.
+ */
+ private final PolicyUndeployTracker undeployTracker = makeUndeploymentTracker();
+
+
+ /**
+ * Constructs the object.
+ *
+ * @param publisher notification publisher
+ */
+ public PolicyNotifier(Publisher<PolicyNotification> publisher) {
+ this.publisher = publisher;
+ }
+
+ /**
+ * Adds data to the deployment tracker. If a PDP appears within the undeployment
+ * tracker, then it's removed from there.
+ *
+ * @param data data to be added
+ */
+ public synchronized void addDeploymentData(PolicyPdpNotificationData data) {
+ PolicyNotification notification = new PolicyNotification();
+
+ undeployTracker.removeData(data, notification.getDeleted());
+ deployTracker.addData(data);
+
+ publish(notification);
+ }
+
+ /**
+ * Adds data to the undeployment tracker. If a PDP appears within the deployment
+ * tracker, then it's removed from there.
+ *
+ * @param data data to be added
+ */
+ public synchronized void addUndeploymentData(PolicyPdpNotificationData data) {
+ PolicyNotification notification = new PolicyNotification();
+
+ deployTracker.removeData(data, notification.getAdded());
+ undeployTracker.addData(data);
+
+ publish(notification);
+ }
+
+ /**
+ * Processes a response from a PDP.
+ *
+ * @param pdp PDP of interest
+ * @param activePolicies policies that are still active on the PDP, as specified in
+ * the response
+ */
+ public synchronized void processResponse(String pdp, Collection<ToscaPolicyIdentifier> activePolicies) {
+ processResponse(pdp, new HashSet<>(activePolicies));
+ }
+
+ /**
+ * Processes a response from a PDP.
+ *
+ * @param pdp PDP of interest
+ * @param activePolicies policies that are still active on the PDP, as specified in
+ * the response
+ */
+ public synchronized void processResponse(String pdp, Set<ToscaPolicyIdentifier> activePolicies) {
+ PolicyNotification notification = new PolicyNotification();
+
+ undeployTracker.processResponse(pdp, activePolicies, notification.getDeleted());
+ deployTracker.processResponse(pdp, activePolicies, notification.getAdded());
+
+ publish(notification);
+ }
+
+ /**
+ * Removes a PDP from any policies still awaiting responses from it, generating
+ * notifications for any of those policies that become complete as a result.
+ *
+ * @param pdp PDP to be removed
+ */
+ public synchronized void removePdp(String pdp) {
+ PolicyNotification notification = new PolicyNotification();
+
+ undeployTracker.removePdp(pdp, notification.getDeleted());
+ deployTracker.removePdp(pdp, notification.getAdded());
+
+ publish(notification);
+ }
+
+ /**
+ * Publishes a notification, if it is not empty.
+ *
+ * @param notification notification to be published
+ */
+ private void publish(PolicyNotification notification) {
+ if (!notification.isEmpty()) {
+ publisher.enqueue(new QueueToken<>(notification));
+ }
+ }
+
+
+ // the following methods may be overridden by junit tests
+
+ protected PolicyDeployTracker makeDeploymentTracker() {
+ return new PolicyDeployTracker();
+ }
+
+ protected PolicyUndeployTracker makeUndeploymentTracker() {
+ return new PolicyUndeployTracker();
+ }
+}
diff --git a/main/src/test/java/org/onap/policy/pap/main/comm/CommonRequestBase.java b/main/src/test/java/org/onap/policy/pap/main/comm/CommonRequestBase.java
index ceda1bab..0337161e 100644
--- a/main/src/test/java/org/onap/policy/pap/main/comm/CommonRequestBase.java
+++ b/main/src/test/java/org/onap/policy/pap/main/comm/CommonRequestBase.java
@@ -69,7 +69,7 @@ public class CommonRequestBase {
protected static final PdpState DIFF_STATE = PdpState.TERMINATED;
protected static final int RETRIES = 1;
- protected Publisher publisher;
+ protected Publisher<PdpMessage> publisher;
protected RequestIdDispatcher<PdpStatus> dispatcher;
protected Object lock;
protected TimerManager timers;
diff --git a/main/src/test/java/org/onap/policy/pap/main/comm/PublisherTest.java b/main/src/test/java/org/onap/policy/pap/main/comm/PublisherTest.java
index 4b9bef71..2c0479b1 100644
--- a/main/src/test/java/org/onap/policy/pap/main/comm/PublisherTest.java
+++ b/main/src/test/java/org/onap/policy/pap/main/comm/PublisherTest.java
@@ -75,7 +75,7 @@ public class PublisherTest extends Threaded {
*/
private static final long MAX_WAIT_MS = 5000;
- private Publisher pub;
+ private Publisher<PdpMessage> pub;
private MyListener listener;
/**
@@ -108,7 +108,7 @@ public class PublisherTest extends Threaded {
public void setUp() throws Exception {
super.setUp();
- pub = new Publisher(PapConstants.TOPIC_POLICY_PDP_PAP);
+ pub = new Publisher<>(PapConstants.TOPIC_POLICY_PDP_PAP);
listener = new MyListener();
TopicEndpointManager.getManager().getNoopTopicSink(PapConstants.TOPIC_POLICY_PDP_PAP).register(listener);
@@ -146,7 +146,7 @@ public class PublisherTest extends Threaded {
@Test
public void testPublisher_Ex() throws Exception {
- assertThatThrownBy(() -> new Publisher("unknwon-topic")).isInstanceOf(PolicyPapException.class);
+ assertThatThrownBy(() -> new Publisher<>("unknwon-topic")).isInstanceOf(PolicyPapException.class);
}
@Test
diff --git a/main/src/test/java/org/onap/policy/pap/main/notification/PolicyNotifierTest.java b/main/src/test/java/org/onap/policy/pap/main/notification/PolicyNotifierTest.java
new file mode 100644
index 00000000..1c65dd10
--- /dev/null
+++ b/main/src/test/java/org/onap/policy/pap/main/notification/PolicyNotifierTest.java
@@ -0,0 +1,214 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP PAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.pap.main.notification;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+
+import java.util.Arrays;
+import java.util.List;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.mockito.stubbing.Answer;
+import org.onap.policy.models.pap.concepts.PolicyNotification;
+import org.onap.policy.models.pap.concepts.PolicyStatus;
+import org.onap.policy.models.tosca.authorative.concepts.ToscaPolicyIdentifier;
+import org.onap.policy.pap.main.comm.Publisher;
+import org.onap.policy.pap.main.comm.QueueToken;
+
+public class PolicyNotifierTest extends PolicyCommonSupport {
+
+ @Mock
+ private Publisher<PolicyNotification> publisher;
+
+ @Mock
+ private PolicyDeployTracker deploy;
+
+ @Mock
+ private PolicyUndeployTracker undeploy;
+
+ @Mock
+ private PolicyStatus status1;
+
+ @Mock
+ private PolicyStatus status2;
+
+ @Mock
+ private PolicyStatus status3;
+
+ @Mock
+ private PolicyStatus status4;
+
+ @Captor
+ ArgumentCaptor<QueueToken<PolicyNotification>> notifyCaptor;
+
+ private MyNotifier notifier;
+
+ /**
+ * Creates various objects, including {@link #notifier}.
+ */
+ @Before
+ public void setUp() {
+ MockitoAnnotations.initMocks(this);
+
+ super.setUp();
+
+ notifier = new MyNotifier(publisher);
+ }
+
+ @Test
+ public void testAddDeploymentData() {
+ doAnswer(addStatus(1, status1, status2)).when(undeploy).removeData(any(), any());
+
+ PolicyPdpNotificationData data = makeData(policy1, PDP1, PDP2);
+ notifier.addDeploymentData(data);
+
+ verify(deploy).addData(data);
+ verify(undeploy).removeData(eq(data), any());
+
+ PolicyNotification notification = getNotification();
+ assertEquals(Arrays.asList(status1, status2), notification.getDeleted());
+ assertTrue(notification.getAdded().isEmpty());
+ }
+
+ @Test
+ public void testAddUndeploymentData() {
+ doAnswer(addStatus(1, status1, status2)).when(deploy).removeData(any(), any());
+
+ PolicyPdpNotificationData data = makeData(policy1, PDP1, PDP2);
+ notifier.addUndeploymentData(data);
+
+ verify(undeploy).addData(data);
+ verify(deploy).removeData(eq(data), any());
+
+ PolicyNotification notification = getNotification();
+ assertEquals(Arrays.asList(status1, status2), notification.getAdded());
+ assertTrue(notification.getDeleted().isEmpty());
+ }
+
+ @Test
+ public void testProcessResponseString() {
+ doAnswer(addStatus(2, status1, status2)).when(deploy).processResponse(eq(PDP1), any(), any());
+ doAnswer(addStatus(2, status3, status4)).when(undeploy).processResponse(eq(PDP1), any(), any());
+
+ List<ToscaPolicyIdentifier> activePolicies = Arrays.asList(policy1, policy2);
+ notifier.processResponse(PDP1, activePolicies);
+
+ PolicyNotification notification = getNotification();
+ assertEquals(Arrays.asList(status1, status2), notification.getAdded());
+ assertEquals(Arrays.asList(status3, status4), notification.getDeleted());
+ }
+
+ @Test
+ public void testRemovePdp() {
+ doAnswer(addStatus(1, status1, status2)).when(deploy).removePdp(eq(PDP1), any());
+ doAnswer(addStatus(1, status3, status4)).when(undeploy).removePdp(eq(PDP1), any());
+
+ notifier.removePdp(PDP1);
+
+ PolicyNotification notification = getNotification();
+ assertEquals(Arrays.asList(status1, status2), notification.getAdded());
+ assertEquals(Arrays.asList(status3, status4), notification.getDeleted());
+ }
+
+ /**
+ * Tests publish(), when the notification is empty.
+ */
+ @Test
+ public void testPublishEmpty() {
+ notifier.removePdp(PDP1);
+
+ verify(publisher, never()).enqueue(any());
+ }
+
+ /**
+ * Tests publish(), when the notification is NOT empty.
+ */
+ @Test
+ public void testPublishNotEmpty() {
+ doAnswer(addStatus(1, status1, status2)).when(deploy).removePdp(eq(PDP1), any());
+
+ notifier.removePdp(PDP1);
+
+ verify(publisher).enqueue(any());
+ }
+
+ @Test
+ public void testMakeDeploymentTracker_testMakeUndeploymentTracker() {
+ // make real object, which will invoke the real makeXxx() methods
+ new PolicyNotifier(publisher).removePdp(PDP1);
+
+ verify(publisher, never()).enqueue(any());
+ }
+
+ /**
+ * Creates an answer that adds status updates to a status list.
+ *
+ * @param listIndex index of the status list within the argument list
+ * @param status status updates to be added
+ * @return an answer that adds the given status updates
+ */
+ private Answer<Void> addStatus(int listIndex, PolicyStatus... status) {
+ return invocation -> {
+ @SuppressWarnings("unchecked")
+ List<PolicyStatus> statusList = invocation.getArgumentAt(listIndex, List.class);
+ statusList.addAll(Arrays.asList(status));
+ return null;
+ };
+ }
+
+ /**
+ * Gets the notification that was published.
+ *
+ * @return the notification that was published
+ */
+ private PolicyNotification getNotification() {
+ verify(publisher).enqueue(notifyCaptor.capture());
+ return notifyCaptor.getValue().get();
+ }
+
+
+ private class MyNotifier extends PolicyNotifier {
+
+ public MyNotifier(Publisher<PolicyNotification> publisher) {
+ super(publisher);
+ }
+
+ @Override
+ protected PolicyDeployTracker makeDeploymentTracker() {
+ return deploy;
+ }
+
+ @Override
+ protected PolicyUndeployTracker makeUndeploymentTracker() {
+ return undeploy;
+ }
+ }
+}