diff options
Diffstat (limited to 'main')
5 files changed, 292 insertions, 1 deletions
diff --git a/main/pom.xml b/main/pom.xml index 63eb0ecc..6cf42d09 100644 --- a/main/pom.xml +++ b/main/pom.xml @@ -45,6 +45,11 @@ </dependency> <dependency> <groupId>org.onap.policy.common</groupId> + <artifactId>pdp-common</artifactId> + <version>${policy.common.version}</version> + </dependency> + <dependency> + <groupId>org.onap.policy.common</groupId> <artifactId>utils</artifactId> <version>${policy.common.version}</version> </dependency> diff --git a/main/src/main/java/org/onap/policy/pap/main/comm/MultiPdpStatusListener.java b/main/src/main/java/org/onap/policy/pap/main/comm/MultiPdpStatusListener.java new file mode 100644 index 00000000..38588d89 --- /dev/null +++ b/main/src/main/java/org/onap/policy/pap/main/comm/MultiPdpStatusListener.java @@ -0,0 +1,109 @@ +/* + * ============LICENSE_START======================================================= + * ONAP PAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.pap.main.comm; + +import java.util.Collection; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; +import org.onap.policy.common.endpoints.listeners.TypedMessageListener; +import org.onap.policy.pdp.common.models.PdpStatus; + +/** + * Listener for PDP Status messages expected to be received from multiple PDPs. The + * listener "completes" once a message has been seen from all of the PDPs. + */ +public class MultiPdpStatusListener implements TypedMessageListener<PdpStatus> { + + /** + * This is decremented once a message has been received from every PDP. + */ + private final CountDownLatch allSeen = new CountDownLatch(1); + + /** + * PDPs from which no message has been received yet. + */ + private final Set<String> unseenPdpNames = ConcurrentHashMap.newKeySet(); + + /** + * Constructs the object. + * + * @param pdpName name of the PDP for which to wait + */ + public MultiPdpStatusListener(String pdpName) { + unseenPdpNames.add(pdpName); + } + + /** + * Constructs the object. + * + * @param pdpNames names of the PDP for which to wait + */ + public MultiPdpStatusListener(Collection<String> pdpNames) { + if (pdpNames.isEmpty()) { + allSeen.countDown(); + + } else { + unseenPdpNames.addAll(pdpNames); + } + } + + /** + * Gets the set of names for which messages have not yet been received. + * + * @return the names of the PDPs that have not been seen yet + */ + public SortedSet<String> getUnseenPdpNames() { + return new TreeSet<>(unseenPdpNames); + } + + /** + * Waits for messages to be received for all PDPs, or until a timeout is reached. + * + * @param timeout the maximum time to wait + * @param unit the time unit of the timeout argument + * @return {@code true} if messages were received for all PDPs, {@code false} if the + * timeout was reached first + * @throws InterruptedException if the current thread is interrupted while waiting + */ + public boolean await(long timeout, TimeUnit unit) throws InterruptedException { + return allSeen.await(timeout, unit); + } + + /** + * Indicates that a message was received for a PDP. Triggers completion of + * {@link #await(long, TimeUnit)} if all PDPs have received a message. Threads may + * override this method to process a message. However, they should still invoke this + * method so that PDPs can be properly tracked. + */ + @Override + public void onTopicEvent(CommInfrastructure infra, String topic, PdpStatus message) { + unseenPdpNames.remove(message.getName()); + + if (unseenPdpNames.isEmpty()) { + allSeen.countDown(); + } + } +} diff --git a/main/src/main/java/org/onap/policy/pap/main/parameters/PapParameterHandler.java b/main/src/main/java/org/onap/policy/pap/main/parameters/PapParameterHandler.java index 489eb52d..0bcbb5cb 100644 --- a/main/src/main/java/org/onap/policy/pap/main/parameters/PapParameterHandler.java +++ b/main/src/main/java/org/onap/policy/pap/main/parameters/PapParameterHandler.java @@ -21,7 +21,6 @@ package org.onap.policy.pap.main.parameters; - import java.io.File; import org.onap.policy.common.parameters.GroupValidationResult; import org.onap.policy.common.utils.coder.Coder; diff --git a/main/src/test/java/org/onap/policy/pap/main/comm/MultiPdpStatusListenerTest.java b/main/src/test/java/org/onap/policy/pap/main/comm/MultiPdpStatusListenerTest.java new file mode 100644 index 00000000..2dc1aa2c --- /dev/null +++ b/main/src/test/java/org/onap/policy/pap/main/comm/MultiPdpStatusListenerTest.java @@ -0,0 +1,177 @@ +/* + * ============LICENSE_START======================================================= + * ONAP PAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.pap.main.comm; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import org.junit.Test; +import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; +import org.onap.policy.pap.main.comm.MultiPdpStatusListener; +import org.onap.policy.pdp.common.models.PdpStatus; + +public class MultiPdpStatusListenerTest { + private static final CommInfrastructure INFRA = CommInfrastructure.NOOP; + private static final String TOPIC = "my-topic"; + private static final String NAME1 = "pdp_1"; + private static final String NAME2 = "pdp_2"; + private static final List<String> NAME_LIST = Arrays.asList(NAME1, NAME2); + + private MultiPdpStatusListener listener; + private PdpStatus status; + + @Test + public void testMultiPdpStatusListenerString() throws Exception { + listener = new MultiPdpStatusListener(NAME1); + assertEquals(Arrays.asList(NAME1).toString(), listener.getUnseenPdpNames().toString()); + + // a name is in the queue - not done yet + assertFalse(doWait(0)); + } + + @Test + public void testMultiPdpStatusListenerCollectionOfString() throws Exception { + List<String> lst = NAME_LIST; + + listener = new MultiPdpStatusListener(lst); + assertEquals(lst.toString(), listener.getUnseenPdpNames().toString()); + + // a name is in the queue - not done yet + assertFalse(doWait(0)); + + /* + * Try with an empty list - should already be complete. + */ + listener = new MultiPdpStatusListener(new LinkedList<>()); + assertTrue(listener.getUnseenPdpNames().isEmpty()); + assertTrue(doWait(0)); + } + + @Test + public void testGetUnseenPdpNames() { + List<String> lst = NAME_LIST; + + listener = new MultiPdpStatusListener(lst); + assertEquals(lst.toString(), listener.getUnseenPdpNames().toString()); + + // receive message from one PDP + status = new PdpStatus(); + status.setName(NAME2); + listener.onTopicEvent(INFRA, TOPIC, status); + assertEquals(Arrays.asList(NAME1).toString(), listener.getUnseenPdpNames().toString()); + + // receive message from the other PDP + status = new PdpStatus(); + status.setName(NAME1); + listener.onTopicEvent(INFRA, TOPIC, status); + assertTrue(listener.getUnseenPdpNames().isEmpty()); + } + + @Test + public void testAwait() throws Exception { + // try with an empty list - should already be complete + listener = new MultiPdpStatusListener(new LinkedList<>()); + assertTrue(doWait(0)); + + // try it with something in the list + listener = new MultiPdpStatusListener(NAME_LIST); + assertFalse(doWait(0)); + + // process a message from one PDP - wait should block the entire time + status = new PdpStatus(); + status.setName(NAME1); + listener.onTopicEvent(INFRA, TOPIC, status); + long tbeg = System.currentTimeMillis(); + assertFalse(doWait(50)); + assertTrue(System.currentTimeMillis() - tbeg >= 49); + + // process a message from the other PDP - wait should NOT block + status = new PdpStatus(); + status.setName(NAME2); + listener.onTopicEvent(INFRA, TOPIC, status); + tbeg = System.currentTimeMillis(); + assertTrue(doWait(4000)); + assertTrue(System.currentTimeMillis() - tbeg < 3000); + } + + @Test + public void testOnTopicEvent() throws Exception { + listener = new MultiPdpStatusListener(NAME_LIST); + + // not done yet + assertFalse(doWait(0)); + + // process a message - still not done as have another name to go + status = new PdpStatus(); + status.setName(NAME1); + listener.onTopicEvent(INFRA, TOPIC, status); + assertFalse(doWait(0)); + + // process a message from the same PDP - still not done + status = new PdpStatus(); + status.setName(NAME1); + listener.onTopicEvent(INFRA, TOPIC, status); + assertFalse(doWait(0)); + + // process another message - now we're done + status = new PdpStatus(); + status.setName(NAME2); + listener.onTopicEvent(INFRA, TOPIC, status); + assertTrue(doWait(0)); + } + + /** + * Waits for the listener to complete. Spawns a background thread to do the waiting so + * we can limit how long we wait. + * + * @param millisec milliseconds to wait + * @return {@code true} if the wait completed successfully, {@code false} otherwise + * @throws InterruptedException if this thread is interrupted while waiting for the + * background thread to complete + */ + private boolean doWait(long millisec) throws InterruptedException { + AtomicBoolean done = new AtomicBoolean(false); + + Thread thread = new Thread() { + @Override + public void run() { + try { + done.set(listener.await(millisec, TimeUnit.MILLISECONDS)); + + } catch (InterruptedException expected) { + return; + } + } + }; + + thread.start(); + thread.join(5000); + thread.interrupt(); + + return done.get(); + } +} diff --git a/main/src/test/java/org/onap/policy/pap/main/comm/PdpClientExceptionTest.java b/main/src/test/java/org/onap/policy/pap/main/comm/PdpClientExceptionTest.java index dcaba782..aa927665 100644 --- a/main/src/test/java/org/onap/policy/pap/main/comm/PdpClientExceptionTest.java +++ b/main/src/test/java/org/onap/policy/pap/main/comm/PdpClientExceptionTest.java @@ -24,6 +24,7 @@ import static org.junit.Assert.assertEquals; import org.junit.Test; import org.onap.policy.common.utils.test.ExceptionsTester; +import org.onap.policy.pap.main.comm.PdpClientException; public class PdpClientExceptionTest { |