aboutsummaryrefslogtreecommitdiffstats
path: root/main
diff options
context:
space:
mode:
Diffstat (limited to 'main')
-rw-r--r--main/src/main/java/org/onap/policy/pap/main/comm/MultiPdpStatusListener.java65
-rw-r--r--main/src/test/java/org/onap/policy/pap/main/comm/MultiPdpStatusListenerTest.java112
2 files changed, 127 insertions, 50 deletions
diff --git a/main/src/main/java/org/onap/policy/pap/main/comm/MultiPdpStatusListener.java b/main/src/main/java/org/onap/policy/pap/main/comm/MultiPdpStatusListener.java
index 38588d89..652756a4 100644
--- a/main/src/main/java/org/onap/policy/pap/main/comm/MultiPdpStatusListener.java
+++ b/main/src/main/java/org/onap/policy/pap/main/comm/MultiPdpStatusListener.java
@@ -30,12 +30,15 @@ import java.util.concurrent.TimeUnit;
import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
import org.onap.policy.common.endpoints.listeners.TypedMessageListener;
import org.onap.policy.pdp.common.models.PdpStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Listener for PDP Status messages expected to be received from multiple PDPs. The
* listener "completes" once a message has been seen from all of the PDPs.
*/
-public class MultiPdpStatusListener implements TypedMessageListener<PdpStatus> {
+public abstract class MultiPdpStatusListener implements TypedMessageListener<PdpStatus> {
+ private static final Logger logger = LoggerFactory.getLogger(MultiPdpStatusListener.class);
/**
* This is decremented once a message has been received from every PDP.
@@ -43,40 +46,40 @@ public class MultiPdpStatusListener implements TypedMessageListener<PdpStatus> {
private final CountDownLatch allSeen = new CountDownLatch(1);
/**
- * PDPs from which no message has been received yet.
+ * IDs for which no message has been received yet.
*/
- private final Set<String> unseenPdpNames = ConcurrentHashMap.newKeySet();
+ private final Set<String> unseenIds = ConcurrentHashMap.newKeySet();
/**
* Constructs the object.
*
- * @param pdpName name of the PDP for which to wait
+ * @param id ID for which to wait
*/
- public MultiPdpStatusListener(String pdpName) {
- unseenPdpNames.add(pdpName);
+ public MultiPdpStatusListener(String id) {
+ unseenIds.add(id);
}
/**
* Constructs the object.
*
- * @param pdpNames names of the PDP for which to wait
+ * @param ids IDs for which to wait
*/
- public MultiPdpStatusListener(Collection<String> pdpNames) {
- if (pdpNames.isEmpty()) {
+ public MultiPdpStatusListener(Collection<String> ids) {
+ if (ids.isEmpty()) {
allSeen.countDown();
} else {
- unseenPdpNames.addAll(pdpNames);
+ unseenIds.addAll(ids);
}
}
/**
- * Gets the set of names for which messages have not yet been received.
+ * Gets the set of IDs for which messages have not yet been received.
*
- * @return the names of the PDPs that have not been seen yet
+ * @return the IDs that have not been seen yet
*/
- public SortedSet<String> getUnseenPdpNames() {
- return new TreeSet<>(unseenPdpNames);
+ public SortedSet<String> getUnseenIds() {
+ return new TreeSet<>(unseenIds);
}
/**
@@ -93,17 +96,37 @@ public class MultiPdpStatusListener implements TypedMessageListener<PdpStatus> {
}
/**
- * Indicates that a message was received for a PDP. Triggers completion of
- * {@link #await(long, TimeUnit)} if all PDPs have received a message. Threads may
- * override this method to process a message. However, they should still invoke this
- * method so that PDPs can be properly tracked.
+ * After giving the event to the subclass via
+ * {@link #handleEvent(CommInfrastructure, String, PdpStatus)}, this triggers
+ * completion of {@link #await(long, TimeUnit)}, if all PDPs have received a message.
*/
@Override
- public void onTopicEvent(CommInfrastructure infra, String topic, PdpStatus message) {
- unseenPdpNames.remove(message.getName());
+ public final void onTopicEvent(CommInfrastructure infra, String topic, PdpStatus message) {
+ String id = null;
+ try {
+ id = handleEvent(infra, topic, message);
+ } catch (RuntimeException e) {
+ logger.warn("handleEvent failed due to: {}", e.getMessage(), e);
+ }
+
+ if (id == null) {
+ return;
+ }
+
+ unseenIds.remove(id);
- if (unseenPdpNames.isEmpty()) {
+ if (unseenIds.isEmpty()) {
allSeen.countDown();
}
}
+
+ /**
+ * Indicates that a message was received for a PDP.
+ *
+ * @param infra infrastructure with which the message was received
+ * @param topic topic on which the message was received
+ * @param message message that was received
+ * @return the ID extracted from the message, or {@code null} if it cannot be extracted
+ */
+ protected abstract String handleEvent(CommInfrastructure infra, String topic, PdpStatus message);
}
diff --git a/main/src/test/java/org/onap/policy/pap/main/comm/MultiPdpStatusListenerTest.java b/main/src/test/java/org/onap/policy/pap/main/comm/MultiPdpStatusListenerTest.java
index 2dc1aa2c..d0dba808 100644
--- a/main/src/test/java/org/onap/policy/pap/main/comm/MultiPdpStatusListenerTest.java
+++ b/main/src/test/java/org/onap/policy/pap/main/comm/MultiPdpStatusListenerTest.java
@@ -32,78 +32,79 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.Test;
import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
import org.onap.policy.pap.main.comm.MultiPdpStatusListener;
+import org.onap.policy.pdp.common.models.PdpResponseDetails;
import org.onap.policy.pdp.common.models.PdpStatus;
public class MultiPdpStatusListenerTest {
private static final CommInfrastructure INFRA = CommInfrastructure.NOOP;
private static final String TOPIC = "my-topic";
- private static final String NAME1 = "pdp_1";
- private static final String NAME2 = "pdp_2";
- private static final List<String> NAME_LIST = Arrays.asList(NAME1, NAME2);
+ private static final String ID1 = "request-1";
+ private static final String ID2 = "request-2";
+ private static final List<String> ID_LIST = Arrays.asList(ID1, ID2);
private MultiPdpStatusListener listener;
private PdpStatus status;
@Test
public void testMultiPdpStatusListenerString() throws Exception {
- listener = new MultiPdpStatusListener(NAME1);
- assertEquals(Arrays.asList(NAME1).toString(), listener.getUnseenPdpNames().toString());
+ listener = new MyListener(ID1);
+ assertEquals(Arrays.asList(ID1).toString(), listener.getUnseenIds().toString());
- // a name is in the queue - not done yet
+ // an ID is in the queue - not done yet
assertFalse(doWait(0));
}
@Test
public void testMultiPdpStatusListenerCollectionOfString() throws Exception {
- List<String> lst = NAME_LIST;
+ List<String> lst = ID_LIST;
- listener = new MultiPdpStatusListener(lst);
- assertEquals(lst.toString(), listener.getUnseenPdpNames().toString());
+ listener = new MyListener(lst);
+ assertEquals(lst.toString(), listener.getUnseenIds().toString());
- // a name is in the queue - not done yet
+ // an ID is in the queue - not done yet
assertFalse(doWait(0));
/*
* Try with an empty list - should already be complete.
*/
- listener = new MultiPdpStatusListener(new LinkedList<>());
- assertTrue(listener.getUnseenPdpNames().isEmpty());
+ listener = new MyListener(new LinkedList<>());
+ assertTrue(listener.getUnseenIds().isEmpty());
assertTrue(doWait(0));
}
@Test
- public void testGetUnseenPdpNames() {
- List<String> lst = NAME_LIST;
+ public void testGetUnseenIds() {
+ List<String> lst = ID_LIST;
- listener = new MultiPdpStatusListener(lst);
- assertEquals(lst.toString(), listener.getUnseenPdpNames().toString());
+ listener = new MyListener(lst);
+ assertEquals(lst.toString(), listener.getUnseenIds().toString());
// receive message from one PDP
status = new PdpStatus();
- status.setName(NAME2);
+ status.setResponse(makeResponse(ID2));
listener.onTopicEvent(INFRA, TOPIC, status);
- assertEquals(Arrays.asList(NAME1).toString(), listener.getUnseenPdpNames().toString());
+ assertEquals(Arrays.asList(ID1).toString(), listener.getUnseenIds().toString());
// receive message from the other PDP
status = new PdpStatus();
- status.setName(NAME1);
+ status.setResponse(makeResponse(ID1));
listener.onTopicEvent(INFRA, TOPIC, status);
- assertTrue(listener.getUnseenPdpNames().isEmpty());
+ assertTrue(listener.getUnseenIds().isEmpty());
}
@Test
public void testAwait() throws Exception {
// try with an empty list - should already be complete
- listener = new MultiPdpStatusListener(new LinkedList<>());
+ listener = new MyListener(new LinkedList<>());
assertTrue(doWait(0));
// try it with something in the list
- listener = new MultiPdpStatusListener(NAME_LIST);
+ listener = new MyListener(ID_LIST);
assertFalse(doWait(0));
// process a message from one PDP - wait should block the entire time
status = new PdpStatus();
- status.setName(NAME1);
+ status.setResponse(makeResponse(ID1));
listener.onTopicEvent(INFRA, TOPIC, status);
long tbeg = System.currentTimeMillis();
assertFalse(doWait(50));
@@ -111,7 +112,7 @@ public class MultiPdpStatusListenerTest {
// process a message from the other PDP - wait should NOT block
status = new PdpStatus();
- status.setName(NAME2);
+ status.setResponse(makeResponse(ID2));
listener.onTopicEvent(INFRA, TOPIC, status);
tbeg = System.currentTimeMillis();
assertTrue(doWait(4000));
@@ -120,28 +121,52 @@ public class MultiPdpStatusListenerTest {
@Test
public void testOnTopicEvent() throws Exception {
- listener = new MultiPdpStatusListener(NAME_LIST);
+ listener = new MyListener(ID_LIST);
// not done yet
assertFalse(doWait(0));
- // process a message - still not done as have another name to go
+ // process a message - still not done as have another ID to go
status = new PdpStatus();
- status.setName(NAME1);
+ status.setResponse(makeResponse(ID1));
listener.onTopicEvent(INFRA, TOPIC, status);
assertFalse(doWait(0));
// process a message from the same PDP - still not done
status = new PdpStatus();
- status.setName(NAME1);
+ status.setResponse(makeResponse(ID1));
listener.onTopicEvent(INFRA, TOPIC, status);
assertFalse(doWait(0));
// process another message - now we're done
status = new PdpStatus();
- status.setName(NAME2);
+ status.setResponse(makeResponse(ID2));
listener.onTopicEvent(INFRA, TOPIC, status);
assertTrue(doWait(0));
+
+ // handleEvent throws an exception - doWait does not return true
+ listener = new MyListener(ID1) {
+ @Override
+ protected String handleEvent(CommInfrastructure infra, String topic, PdpStatus message) {
+ throw new RuntimeException("expected exception");
+ }
+ };
+ status = new PdpStatus();
+ status.setResponse(makeResponse(ID1));
+ listener.onTopicEvent(INFRA, TOPIC, status);
+ assertFalse(doWait(0));
+
+ // handleEvent returns null - doWait does not return true
+ listener = new MyListener(ID1) {
+ @Override
+ protected String handleEvent(CommInfrastructure infra, String topic, PdpStatus message) {
+ return null;
+ }
+ };
+ status = new PdpStatus();
+ status.setResponse(makeResponse(ID1));
+ listener.onTopicEvent(INFRA, TOPIC, status);
+ assertFalse(doWait(0));
}
/**
@@ -174,4 +199,33 @@ public class MultiPdpStatusListenerTest {
return done.get();
}
+
+ /**
+ * Makes a response for the given request ID.
+ *
+ * @param id ID of the request
+ * @return a new response
+ */
+ private PdpResponseDetails makeResponse(String id) {
+ PdpResponseDetails resp = new PdpResponseDetails();
+ resp.setResponseTo(id);
+
+ return resp;
+ }
+
+ private static class MyListener extends MultiPdpStatusListener {
+
+ public MyListener(String id) {
+ super(id);
+ }
+
+ public MyListener(List<String> lst) {
+ super(lst);
+ }
+
+ @Override
+ protected String handleEvent(CommInfrastructure infra, String topic, PdpStatus message) {
+ return (message.getResponse().getResponseTo());
+ }
+ }
}