diff options
5 files changed, 379 insertions, 11 deletions
diff --git a/main/src/main/java/org/onap/policy/pap/main/comm/Publisher.java b/main/src/main/java/org/onap/policy/pap/main/comm/Publisher.java index ec54dd5a..6d2d8358 100644 --- a/main/src/main/java/org/onap/policy/pap/main/comm/Publisher.java +++ b/main/src/main/java/org/onap/policy/pap/main/comm/Publisher.java @@ -25,7 +25,6 @@ import java.util.concurrent.LinkedBlockingQueue; import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClient; import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClientException; import org.onap.policy.common.utils.coder.StandardCoder; -import org.onap.policy.models.pdp.concepts.PdpMessage; import org.onap.policy.pap.main.PolicyPapException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,8 +38,10 @@ import org.slf4j.LoggerFactory; * * <p>This class has not been tested for multiple threads invoking {@link #run()} * simultaneously. + * + * @param <T> type of message published by this publisher */ -public class Publisher implements Runnable { +public class Publisher<T> implements Runnable { private static final Logger logger = LoggerFactory.getLogger(Publisher.class); /** @@ -51,7 +52,7 @@ public class Publisher implements Runnable { /** * Request queue. The references may contain {@code null}. */ - private final BlockingQueue<QueueToken<PdpMessage>> queue = new LinkedBlockingQueue<>(); + private final BlockingQueue<QueueToken<T>> queue = new LinkedBlockingQueue<>(); /** * Set to {@code true} to cause the publisher to stop running. @@ -88,7 +89,7 @@ public class Publisher implements Runnable { * * @param ref reference to the message to be published */ - public void enqueue(QueueToken<PdpMessage> ref) { + public void enqueue(QueueToken<T> ref) { queue.add(ref); } @@ -98,7 +99,7 @@ public class Publisher implements Runnable { @Override public void run() { for (;;) { - QueueToken<PdpMessage> token = getNext(); + QueueToken<T> token = getNext(); if (stopNow) { // unblock any other publisher threads @@ -106,7 +107,7 @@ public class Publisher implements Runnable { break; } - PdpMessage data = token.replaceItem(null); + T data = token.replaceItem(null); if (data != null) { client.send(data); } @@ -120,7 +121,7 @@ public class Publisher implements Runnable { * @return the next item, or a reference containing {@code null} if this is * interrupted */ - private QueueToken<PdpMessage> getNext() { + private QueueToken<T> getNext() { try { return queue.take(); diff --git a/main/src/main/java/org/onap/policy/pap/main/notification/PolicyNotifier.java b/main/src/main/java/org/onap/policy/pap/main/notification/PolicyNotifier.java new file mode 100644 index 00000000..97a862be --- /dev/null +++ b/main/src/main/java/org/onap/policy/pap/main/notification/PolicyNotifier.java @@ -0,0 +1,153 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP PAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.pap.main.notification; + +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; +import org.onap.policy.models.pap.concepts.PolicyNotification; +import org.onap.policy.models.tosca.authorative.concepts.ToscaPolicyIdentifier; +import org.onap.policy.pap.main.comm.Publisher; +import org.onap.policy.pap.main.comm.QueueToken; + +/** + * Notifier for completion of policy updates. + */ +public class PolicyNotifier { + /** + * Notification publisher. + */ + private final Publisher<PolicyNotification> publisher; + + /** + * Deployment tracker. + */ + private final PolicyDeployTracker deployTracker = makeDeploymentTracker(); + + /** + * Undeployment tracker. + */ + private final PolicyUndeployTracker undeployTracker = makeUndeploymentTracker(); + + + /** + * Constructs the object. + * + * @param publisher notification publisher + */ + public PolicyNotifier(Publisher<PolicyNotification> publisher) { + this.publisher = publisher; + } + + /** + * Adds data to the deployment tracker. If a PDP appears within the undeployment + * tracker, then it's removed from there. + * + * @param data data to be added + */ + public synchronized void addDeploymentData(PolicyPdpNotificationData data) { + PolicyNotification notification = new PolicyNotification(); + + undeployTracker.removeData(data, notification.getDeleted()); + deployTracker.addData(data); + + publish(notification); + } + + /** + * Adds data to the undeployment tracker. If a PDP appears within the deployment + * tracker, then it's removed from there. + * + * @param data data to be added + */ + public synchronized void addUndeploymentData(PolicyPdpNotificationData data) { + PolicyNotification notification = new PolicyNotification(); + + deployTracker.removeData(data, notification.getAdded()); + undeployTracker.addData(data); + + publish(notification); + } + + /** + * Processes a response from a PDP. + * + * @param pdp PDP of interest + * @param activePolicies policies that are still active on the PDP, as specified in + * the response + */ + public synchronized void processResponse(String pdp, Collection<ToscaPolicyIdentifier> activePolicies) { + processResponse(pdp, new HashSet<>(activePolicies)); + } + + /** + * Processes a response from a PDP. + * + * @param pdp PDP of interest + * @param activePolicies policies that are still active on the PDP, as specified in + * the response + */ + public synchronized void processResponse(String pdp, Set<ToscaPolicyIdentifier> activePolicies) { + PolicyNotification notification = new PolicyNotification(); + + undeployTracker.processResponse(pdp, activePolicies, notification.getDeleted()); + deployTracker.processResponse(pdp, activePolicies, notification.getAdded()); + + publish(notification); + } + + /** + * Removes a PDP from any policies still awaiting responses from it, generating + * notifications for any of those policies that become complete as a result. + * + * @param pdp PDP to be removed + */ + public synchronized void removePdp(String pdp) { + PolicyNotification notification = new PolicyNotification(); + + undeployTracker.removePdp(pdp, notification.getDeleted()); + deployTracker.removePdp(pdp, notification.getAdded()); + + publish(notification); + } + + /** + * Publishes a notification, if it is not empty. + * + * @param notification notification to be published + */ + private void publish(PolicyNotification notification) { + if (!notification.isEmpty()) { + publisher.enqueue(new QueueToken<>(notification)); + } + } + + + // the following methods may be overridden by junit tests + + protected PolicyDeployTracker makeDeploymentTracker() { + return new PolicyDeployTracker(); + } + + protected PolicyUndeployTracker makeUndeploymentTracker() { + return new PolicyUndeployTracker(); + } +} diff --git a/main/src/test/java/org/onap/policy/pap/main/comm/CommonRequestBase.java b/main/src/test/java/org/onap/policy/pap/main/comm/CommonRequestBase.java index ceda1bab..0337161e 100644 --- a/main/src/test/java/org/onap/policy/pap/main/comm/CommonRequestBase.java +++ b/main/src/test/java/org/onap/policy/pap/main/comm/CommonRequestBase.java @@ -69,7 +69,7 @@ public class CommonRequestBase { protected static final PdpState DIFF_STATE = PdpState.TERMINATED; protected static final int RETRIES = 1; - protected Publisher publisher; + protected Publisher<PdpMessage> publisher; protected RequestIdDispatcher<PdpStatus> dispatcher; protected Object lock; protected TimerManager timers; diff --git a/main/src/test/java/org/onap/policy/pap/main/comm/PublisherTest.java b/main/src/test/java/org/onap/policy/pap/main/comm/PublisherTest.java index 4b9bef71..2c0479b1 100644 --- a/main/src/test/java/org/onap/policy/pap/main/comm/PublisherTest.java +++ b/main/src/test/java/org/onap/policy/pap/main/comm/PublisherTest.java @@ -75,7 +75,7 @@ public class PublisherTest extends Threaded { */ private static final long MAX_WAIT_MS = 5000; - private Publisher pub; + private Publisher<PdpMessage> pub; private MyListener listener; /** @@ -108,7 +108,7 @@ public class PublisherTest extends Threaded { public void setUp() throws Exception { super.setUp(); - pub = new Publisher(PapConstants.TOPIC_POLICY_PDP_PAP); + pub = new Publisher<>(PapConstants.TOPIC_POLICY_PDP_PAP); listener = new MyListener(); TopicEndpointManager.getManager().getNoopTopicSink(PapConstants.TOPIC_POLICY_PDP_PAP).register(listener); @@ -146,7 +146,7 @@ public class PublisherTest extends Threaded { @Test public void testPublisher_Ex() throws Exception { - assertThatThrownBy(() -> new Publisher("unknwon-topic")).isInstanceOf(PolicyPapException.class); + assertThatThrownBy(() -> new Publisher<>("unknwon-topic")).isInstanceOf(PolicyPapException.class); } @Test diff --git a/main/src/test/java/org/onap/policy/pap/main/notification/PolicyNotifierTest.java b/main/src/test/java/org/onap/policy/pap/main/notification/PolicyNotifierTest.java new file mode 100644 index 00000000..1c65dd10 --- /dev/null +++ b/main/src/test/java/org/onap/policy/pap/main/notification/PolicyNotifierTest.java @@ -0,0 +1,214 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP PAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.pap.main.notification; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; + +import java.util.Arrays; +import java.util.List; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.mockito.stubbing.Answer; +import org.onap.policy.models.pap.concepts.PolicyNotification; +import org.onap.policy.models.pap.concepts.PolicyStatus; +import org.onap.policy.models.tosca.authorative.concepts.ToscaPolicyIdentifier; +import org.onap.policy.pap.main.comm.Publisher; +import org.onap.policy.pap.main.comm.QueueToken; + +public class PolicyNotifierTest extends PolicyCommonSupport { + + @Mock + private Publisher<PolicyNotification> publisher; + + @Mock + private PolicyDeployTracker deploy; + + @Mock + private PolicyUndeployTracker undeploy; + + @Mock + private PolicyStatus status1; + + @Mock + private PolicyStatus status2; + + @Mock + private PolicyStatus status3; + + @Mock + private PolicyStatus status4; + + @Captor + ArgumentCaptor<QueueToken<PolicyNotification>> notifyCaptor; + + private MyNotifier notifier; + + /** + * Creates various objects, including {@link #notifier}. + */ + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); + + super.setUp(); + + notifier = new MyNotifier(publisher); + } + + @Test + public void testAddDeploymentData() { + doAnswer(addStatus(1, status1, status2)).when(undeploy).removeData(any(), any()); + + PolicyPdpNotificationData data = makeData(policy1, PDP1, PDP2); + notifier.addDeploymentData(data); + + verify(deploy).addData(data); + verify(undeploy).removeData(eq(data), any()); + + PolicyNotification notification = getNotification(); + assertEquals(Arrays.asList(status1, status2), notification.getDeleted()); + assertTrue(notification.getAdded().isEmpty()); + } + + @Test + public void testAddUndeploymentData() { + doAnswer(addStatus(1, status1, status2)).when(deploy).removeData(any(), any()); + + PolicyPdpNotificationData data = makeData(policy1, PDP1, PDP2); + notifier.addUndeploymentData(data); + + verify(undeploy).addData(data); + verify(deploy).removeData(eq(data), any()); + + PolicyNotification notification = getNotification(); + assertEquals(Arrays.asList(status1, status2), notification.getAdded()); + assertTrue(notification.getDeleted().isEmpty()); + } + + @Test + public void testProcessResponseString() { + doAnswer(addStatus(2, status1, status2)).when(deploy).processResponse(eq(PDP1), any(), any()); + doAnswer(addStatus(2, status3, status4)).when(undeploy).processResponse(eq(PDP1), any(), any()); + + List<ToscaPolicyIdentifier> activePolicies = Arrays.asList(policy1, policy2); + notifier.processResponse(PDP1, activePolicies); + + PolicyNotification notification = getNotification(); + assertEquals(Arrays.asList(status1, status2), notification.getAdded()); + assertEquals(Arrays.asList(status3, status4), notification.getDeleted()); + } + + @Test + public void testRemovePdp() { + doAnswer(addStatus(1, status1, status2)).when(deploy).removePdp(eq(PDP1), any()); + doAnswer(addStatus(1, status3, status4)).when(undeploy).removePdp(eq(PDP1), any()); + + notifier.removePdp(PDP1); + + PolicyNotification notification = getNotification(); + assertEquals(Arrays.asList(status1, status2), notification.getAdded()); + assertEquals(Arrays.asList(status3, status4), notification.getDeleted()); + } + + /** + * Tests publish(), when the notification is empty. + */ + @Test + public void testPublishEmpty() { + notifier.removePdp(PDP1); + + verify(publisher, never()).enqueue(any()); + } + + /** + * Tests publish(), when the notification is NOT empty. + */ + @Test + public void testPublishNotEmpty() { + doAnswer(addStatus(1, status1, status2)).when(deploy).removePdp(eq(PDP1), any()); + + notifier.removePdp(PDP1); + + verify(publisher).enqueue(any()); + } + + @Test + public void testMakeDeploymentTracker_testMakeUndeploymentTracker() { + // make real object, which will invoke the real makeXxx() methods + new PolicyNotifier(publisher).removePdp(PDP1); + + verify(publisher, never()).enqueue(any()); + } + + /** + * Creates an answer that adds status updates to a status list. + * + * @param listIndex index of the status list within the argument list + * @param status status updates to be added + * @return an answer that adds the given status updates + */ + private Answer<Void> addStatus(int listIndex, PolicyStatus... status) { + return invocation -> { + @SuppressWarnings("unchecked") + List<PolicyStatus> statusList = invocation.getArgumentAt(listIndex, List.class); + statusList.addAll(Arrays.asList(status)); + return null; + }; + } + + /** + * Gets the notification that was published. + * + * @return the notification that was published + */ + private PolicyNotification getNotification() { + verify(publisher).enqueue(notifyCaptor.capture()); + return notifyCaptor.getValue().get(); + } + + + private class MyNotifier extends PolicyNotifier { + + public MyNotifier(Publisher<PolicyNotification> publisher) { + super(publisher); + } + + @Override + protected PolicyDeployTracker makeDeploymentTracker() { + return deploy; + } + + @Override + protected PolicyUndeployTracker makeUndeploymentTracker() { + return undeploy; + } + } +} |