diff options
Diffstat (limited to 'main/src')
-rw-r--r-- | main/src/main/java/org/onap/policy/pap/main/comm/MultiPdpStatusListener.java | 65 | ||||
-rw-r--r-- | main/src/test/java/org/onap/policy/pap/main/comm/MultiPdpStatusListenerTest.java | 112 |
2 files changed, 127 insertions, 50 deletions
diff --git a/main/src/main/java/org/onap/policy/pap/main/comm/MultiPdpStatusListener.java b/main/src/main/java/org/onap/policy/pap/main/comm/MultiPdpStatusListener.java index 38588d89..652756a4 100644 --- a/main/src/main/java/org/onap/policy/pap/main/comm/MultiPdpStatusListener.java +++ b/main/src/main/java/org/onap/policy/pap/main/comm/MultiPdpStatusListener.java @@ -30,12 +30,15 @@ import java.util.concurrent.TimeUnit; import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; import org.onap.policy.common.endpoints.listeners.TypedMessageListener; import org.onap.policy.pdp.common.models.PdpStatus; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Listener for PDP Status messages expected to be received from multiple PDPs. The * listener "completes" once a message has been seen from all of the PDPs. */ -public class MultiPdpStatusListener implements TypedMessageListener<PdpStatus> { +public abstract class MultiPdpStatusListener implements TypedMessageListener<PdpStatus> { + private static final Logger logger = LoggerFactory.getLogger(MultiPdpStatusListener.class); /** * This is decremented once a message has been received from every PDP. @@ -43,40 +46,40 @@ public class MultiPdpStatusListener implements TypedMessageListener<PdpStatus> { private final CountDownLatch allSeen = new CountDownLatch(1); /** - * PDPs from which no message has been received yet. + * IDs for which no message has been received yet. */ - private final Set<String> unseenPdpNames = ConcurrentHashMap.newKeySet(); + private final Set<String> unseenIds = ConcurrentHashMap.newKeySet(); /** * Constructs the object. * - * @param pdpName name of the PDP for which to wait + * @param id ID for which to wait */ - public MultiPdpStatusListener(String pdpName) { - unseenPdpNames.add(pdpName); + public MultiPdpStatusListener(String id) { + unseenIds.add(id); } /** * Constructs the object. * - * @param pdpNames names of the PDP for which to wait + * @param ids IDs for which to wait */ - public MultiPdpStatusListener(Collection<String> pdpNames) { - if (pdpNames.isEmpty()) { + public MultiPdpStatusListener(Collection<String> ids) { + if (ids.isEmpty()) { allSeen.countDown(); } else { - unseenPdpNames.addAll(pdpNames); + unseenIds.addAll(ids); } } /** - * Gets the set of names for which messages have not yet been received. + * Gets the set of IDs for which messages have not yet been received. * - * @return the names of the PDPs that have not been seen yet + * @return the IDs that have not been seen yet */ - public SortedSet<String> getUnseenPdpNames() { - return new TreeSet<>(unseenPdpNames); + public SortedSet<String> getUnseenIds() { + return new TreeSet<>(unseenIds); } /** @@ -93,17 +96,37 @@ public class MultiPdpStatusListener implements TypedMessageListener<PdpStatus> { } /** - * Indicates that a message was received for a PDP. Triggers completion of - * {@link #await(long, TimeUnit)} if all PDPs have received a message. Threads may - * override this method to process a message. However, they should still invoke this - * method so that PDPs can be properly tracked. + * After giving the event to the subclass via + * {@link #handleEvent(CommInfrastructure, String, PdpStatus)}, this triggers + * completion of {@link #await(long, TimeUnit)}, if all PDPs have received a message. */ @Override - public void onTopicEvent(CommInfrastructure infra, String topic, PdpStatus message) { - unseenPdpNames.remove(message.getName()); + public final void onTopicEvent(CommInfrastructure infra, String topic, PdpStatus message) { + String id = null; + try { + id = handleEvent(infra, topic, message); + } catch (RuntimeException e) { + logger.warn("handleEvent failed due to: {}", e.getMessage(), e); + } + + if (id == null) { + return; + } + + unseenIds.remove(id); - if (unseenPdpNames.isEmpty()) { + if (unseenIds.isEmpty()) { allSeen.countDown(); } } + + /** + * Indicates that a message was received for a PDP. + * + * @param infra infrastructure with which the message was received + * @param topic topic on which the message was received + * @param message message that was received + * @return the ID extracted from the message, or {@code null} if it cannot be extracted + */ + protected abstract String handleEvent(CommInfrastructure infra, String topic, PdpStatus message); } diff --git a/main/src/test/java/org/onap/policy/pap/main/comm/MultiPdpStatusListenerTest.java b/main/src/test/java/org/onap/policy/pap/main/comm/MultiPdpStatusListenerTest.java index 2dc1aa2c..d0dba808 100644 --- a/main/src/test/java/org/onap/policy/pap/main/comm/MultiPdpStatusListenerTest.java +++ b/main/src/test/java/org/onap/policy/pap/main/comm/MultiPdpStatusListenerTest.java @@ -32,78 +32,79 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.junit.Test; import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; import org.onap.policy.pap.main.comm.MultiPdpStatusListener; +import org.onap.policy.pdp.common.models.PdpResponseDetails; import org.onap.policy.pdp.common.models.PdpStatus; public class MultiPdpStatusListenerTest { private static final CommInfrastructure INFRA = CommInfrastructure.NOOP; private static final String TOPIC = "my-topic"; - private static final String NAME1 = "pdp_1"; - private static final String NAME2 = "pdp_2"; - private static final List<String> NAME_LIST = Arrays.asList(NAME1, NAME2); + private static final String ID1 = "request-1"; + private static final String ID2 = "request-2"; + private static final List<String> ID_LIST = Arrays.asList(ID1, ID2); private MultiPdpStatusListener listener; private PdpStatus status; @Test public void testMultiPdpStatusListenerString() throws Exception { - listener = new MultiPdpStatusListener(NAME1); - assertEquals(Arrays.asList(NAME1).toString(), listener.getUnseenPdpNames().toString()); + listener = new MyListener(ID1); + assertEquals(Arrays.asList(ID1).toString(), listener.getUnseenIds().toString()); - // a name is in the queue - not done yet + // an ID is in the queue - not done yet assertFalse(doWait(0)); } @Test public void testMultiPdpStatusListenerCollectionOfString() throws Exception { - List<String> lst = NAME_LIST; + List<String> lst = ID_LIST; - listener = new MultiPdpStatusListener(lst); - assertEquals(lst.toString(), listener.getUnseenPdpNames().toString()); + listener = new MyListener(lst); + assertEquals(lst.toString(), listener.getUnseenIds().toString()); - // a name is in the queue - not done yet + // an ID is in the queue - not done yet assertFalse(doWait(0)); /* * Try with an empty list - should already be complete. */ - listener = new MultiPdpStatusListener(new LinkedList<>()); - assertTrue(listener.getUnseenPdpNames().isEmpty()); + listener = new MyListener(new LinkedList<>()); + assertTrue(listener.getUnseenIds().isEmpty()); assertTrue(doWait(0)); } @Test - public void testGetUnseenPdpNames() { - List<String> lst = NAME_LIST; + public void testGetUnseenIds() { + List<String> lst = ID_LIST; - listener = new MultiPdpStatusListener(lst); - assertEquals(lst.toString(), listener.getUnseenPdpNames().toString()); + listener = new MyListener(lst); + assertEquals(lst.toString(), listener.getUnseenIds().toString()); // receive message from one PDP status = new PdpStatus(); - status.setName(NAME2); + status.setResponse(makeResponse(ID2)); listener.onTopicEvent(INFRA, TOPIC, status); - assertEquals(Arrays.asList(NAME1).toString(), listener.getUnseenPdpNames().toString()); + assertEquals(Arrays.asList(ID1).toString(), listener.getUnseenIds().toString()); // receive message from the other PDP status = new PdpStatus(); - status.setName(NAME1); + status.setResponse(makeResponse(ID1)); listener.onTopicEvent(INFRA, TOPIC, status); - assertTrue(listener.getUnseenPdpNames().isEmpty()); + assertTrue(listener.getUnseenIds().isEmpty()); } @Test public void testAwait() throws Exception { // try with an empty list - should already be complete - listener = new MultiPdpStatusListener(new LinkedList<>()); + listener = new MyListener(new LinkedList<>()); assertTrue(doWait(0)); // try it with something in the list - listener = new MultiPdpStatusListener(NAME_LIST); + listener = new MyListener(ID_LIST); assertFalse(doWait(0)); // process a message from one PDP - wait should block the entire time status = new PdpStatus(); - status.setName(NAME1); + status.setResponse(makeResponse(ID1)); listener.onTopicEvent(INFRA, TOPIC, status); long tbeg = System.currentTimeMillis(); assertFalse(doWait(50)); @@ -111,7 +112,7 @@ public class MultiPdpStatusListenerTest { // process a message from the other PDP - wait should NOT block status = new PdpStatus(); - status.setName(NAME2); + status.setResponse(makeResponse(ID2)); listener.onTopicEvent(INFRA, TOPIC, status); tbeg = System.currentTimeMillis(); assertTrue(doWait(4000)); @@ -120,28 +121,52 @@ public class MultiPdpStatusListenerTest { @Test public void testOnTopicEvent() throws Exception { - listener = new MultiPdpStatusListener(NAME_LIST); + listener = new MyListener(ID_LIST); // not done yet assertFalse(doWait(0)); - // process a message - still not done as have another name to go + // process a message - still not done as have another ID to go status = new PdpStatus(); - status.setName(NAME1); + status.setResponse(makeResponse(ID1)); listener.onTopicEvent(INFRA, TOPIC, status); assertFalse(doWait(0)); // process a message from the same PDP - still not done status = new PdpStatus(); - status.setName(NAME1); + status.setResponse(makeResponse(ID1)); listener.onTopicEvent(INFRA, TOPIC, status); assertFalse(doWait(0)); // process another message - now we're done status = new PdpStatus(); - status.setName(NAME2); + status.setResponse(makeResponse(ID2)); listener.onTopicEvent(INFRA, TOPIC, status); assertTrue(doWait(0)); + + // handleEvent throws an exception - doWait does not return true + listener = new MyListener(ID1) { + @Override + protected String handleEvent(CommInfrastructure infra, String topic, PdpStatus message) { + throw new RuntimeException("expected exception"); + } + }; + status = new PdpStatus(); + status.setResponse(makeResponse(ID1)); + listener.onTopicEvent(INFRA, TOPIC, status); + assertFalse(doWait(0)); + + // handleEvent returns null - doWait does not return true + listener = new MyListener(ID1) { + @Override + protected String handleEvent(CommInfrastructure infra, String topic, PdpStatus message) { + return null; + } + }; + status = new PdpStatus(); + status.setResponse(makeResponse(ID1)); + listener.onTopicEvent(INFRA, TOPIC, status); + assertFalse(doWait(0)); } /** @@ -174,4 +199,33 @@ public class MultiPdpStatusListenerTest { return done.get(); } + + /** + * Makes a response for the given request ID. + * + * @param id ID of the request + * @return a new response + */ + private PdpResponseDetails makeResponse(String id) { + PdpResponseDetails resp = new PdpResponseDetails(); + resp.setResponseTo(id); + + return resp; + } + + private static class MyListener extends MultiPdpStatusListener { + + public MyListener(String id) { + super(id); + } + + public MyListener(List<String> lst) { + super(lst); + } + + @Override + protected String handleEvent(CommInfrastructure infra, String topic, PdpStatus message) { + return (message.getResponse().getResponseTo()); + } + } } |