aboutsummaryrefslogtreecommitdiffstats
path: root/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java')
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java92
1 files changed, 54 insertions, 38 deletions
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java
index 01ee61ef..e32fa545 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java
@@ -25,7 +25,8 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.contains;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
@@ -40,7 +41,6 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -71,6 +71,7 @@ public class PoolingManagerImplTest {
protected static final long STD_IDENTIFICATION_MS = STD_REACTIVATE_WAIT_MS + 1;
protected static final long STD_ACTIVE_HEARTBEAT_MS = STD_IDENTIFICATION_MS + 1;
protected static final long STD_INTER_HEARTBEAT_MS = STD_ACTIVE_HEARTBEAT_MS + 1;
+ protected static final long STD_OFFLINE_PUB_WAIT_MS = STD_INTER_HEARTBEAT_MS + 1;
private static final String HOST2 = "other.host";
@@ -85,8 +86,8 @@ public class PoolingManagerImplTest {
private static final String REQUEST_ID = "my.request.id";
/**
- * Number of dmaap.publish() invocations that should be issued when the
- * manager is started.
+ * Number of dmaap.publish() invocations that should be issued when the manager is
+ * started.
*/
private static final int START_PUB = 1;
@@ -135,6 +136,7 @@ public class PoolingManagerImplTest {
when(poolProps.getIdentificationMs()).thenReturn(STD_IDENTIFICATION_MS);
when(poolProps.getActiveHeartbeatMs()).thenReturn(STD_ACTIVE_HEARTBEAT_MS);
when(poolProps.getInterHeartbeatMs()).thenReturn(STD_INTER_HEARTBEAT_MS);
+ when(poolProps.getOfflinePubWaitMs()).thenReturn(STD_OFFLINE_PUB_WAIT_MS);
futures = new LinkedList<>();
ser = new Serializer();
@@ -180,11 +182,6 @@ public class PoolingManagerImplTest {
mgr = new PoolingManagerImpl(controller, poolProps);
}
- @After
- public void tearDown() throws Exception {
-
- }
-
@Test
public void testPoolingManagerImpl() {
mgr = new PoolingManagerImpl(controller, poolProps);
@@ -199,8 +196,8 @@ public class PoolingManagerImplTest {
@Test
public void testPoolingManagerImpl_ClassEx() {
/*
- * this controller does not implement TopicListener, which should cause
- * a ClassCastException
+ * this controller does not implement TopicListener, which should cause a
+ * ClassCastException
*/
PolicyController ctlr = mock(PolicyController.class);
@@ -316,6 +313,7 @@ public class PoolingManagerImplTest {
verify(dmaap).stopConsumer(mgr);
verify(sched).shutdownNow();
+ verify(dmaap).publish(contains("offline"));
assertTrue(mgr.getCurrent() instanceof IdleState);
}
@@ -362,7 +360,7 @@ public class PoolingManagerImplTest {
mgr.afterStop();
verify(eventQueue).clear();
- verify(dmaap).stopPublisher();
+ verify(dmaap).stopPublisher(STD_OFFLINE_PUB_WAIT_MS);
}
@Test
@@ -376,7 +374,7 @@ public class PoolingManagerImplTest {
mgr.afterStop();
verify(eventQueue, never()).clear();
- verify(dmaap).stopPublisher();
+ verify(dmaap).stopPublisher(STD_OFFLINE_PUB_WAIT_MS);
}
@Test
@@ -511,7 +509,7 @@ public class PoolingManagerImplTest {
CountDownLatch latch = new CountDownLatch(1);
- mgr.schedule(STD_ACTIVE_HEARTBEAT_MS, xxx -> {
+ mgr.schedule(STD_ACTIVE_HEARTBEAT_MS, () -> {
latch.countDown();
return null;
});
@@ -539,7 +537,7 @@ public class PoolingManagerImplTest {
CountDownLatch latch = new CountDownLatch(1);
- mgr.scheduleWithFixedDelay(STD_HEARTBEAT_WAIT_MS, STD_ACTIVE_HEARTBEAT_MS, xxx -> {
+ mgr.scheduleWithFixedDelay(STD_HEARTBEAT_WAIT_MS, STD_ACTIVE_HEARTBEAT_MS, () -> {
latch.countDown();
return null;
});
@@ -608,8 +606,7 @@ public class PoolingManagerImplTest {
StartState st = (StartState) mgr.getCurrent();
/*
- * give it its heart beat, that should cause it to transition to the
- * Query state.
+ * give it its heart beat, that should cause it to transition to the Query state.
*/
Heartbeat hb = new Heartbeat(mgr.getHost(), st.getHbTimestampMs());
hb.setChannel(Message.ADMIN);
@@ -985,14 +982,35 @@ public class PoolingManagerImplTest {
}
@Test
+ public void testInject_Ex() throws Exception {
+ startMgr();
+
+ // route the message to this host
+ mgr.startDistributing(makeAssignments(true));
+
+ // generate RuntimeException when onTopicEvent() is invoked
+ doThrow(new IllegalArgumentException("expected")).when(controller).onTopicEvent(any(), any(), any());
+
+ CountDownLatch latch = catchRecursion(true);
+
+ Forward msg = new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID);
+ mgr.handle(msg);
+
+ verify(dmaap, times(START_PUB)).publish(any());
+ verify(controller).onTopicEvent(CommInfrastructure.UEB, TOPIC2, THE_EVENT);
+
+ // ensure we made it past both beforeXxx() methods
+ assertEquals(0, latch.getCount());
+ }
+
+ @Test
public void testHandleInternal() throws Exception {
startMgr();
StartState st = (StartState) mgr.getCurrent();
/*
- * give it its heart beat, that should cause it to transition to the
- * Query state.
+ * give it its heart beat, that should cause it to transition to the Query state.
*/
Heartbeat hb = new Heartbeat(mgr.getHost(), st.getHbTimestampMs());
hb.setChannel(Message.ADMIN);
@@ -1022,8 +1040,8 @@ public class PoolingManagerImplTest {
Heartbeat hb = new Heartbeat(mgr.getHost(), st.getHbTimestampMs());
/*
- * do NOT set the channel - this will cause the message to be invalid,
- * triggering an exception
+ * do NOT set the channel - this will cause the message to be invalid, triggering
+ * an exception
*/
String msg = ser.encodeMsg(hb);
@@ -1068,7 +1086,7 @@ public class PoolingManagerImplTest {
when(eventQueue.poll()).thenAnswer(args -> lst.poll());
// route the messages to this host
- mgr.startDistributing(makeAssignments(true));
+ assertTrue(mgr.startDistributing(makeAssignments(true)).await(2, TimeUnit.SECONDS));
// all of the events should have been processed locally
verify(dmaap, times(START_PUB)).publish(any());
@@ -1088,7 +1106,7 @@ public class PoolingManagerImplTest {
when(eventQueue.poll()).thenAnswer(args -> lst.poll());
// route the messages to the OTHER host
- mgr.startDistributing(makeAssignments(false));
+ assertTrue(mgr.startDistributing(makeAssignments(false)).await(2, TimeUnit.SECONDS));
// all of the events should have been forwarded
verify(dmaap, times(4)).publish(any());
@@ -1140,7 +1158,7 @@ public class PoolingManagerImplTest {
CountDownLatch latch = new CountDownLatch(1);
- mgr.schedule(STD_ACTIVE_HEARTBEAT_MS, xxx -> {
+ mgr.schedule(STD_ACTIVE_HEARTBEAT_MS, () -> {
latch.countDown();
return null;
});
@@ -1163,7 +1181,7 @@ public class PoolingManagerImplTest {
CountDownLatch latch = new CountDownLatch(1);
- mgr.schedule(STD_ACTIVE_HEARTBEAT_MS, xxx -> {
+ mgr.schedule(STD_ACTIVE_HEARTBEAT_MS, () -> {
latch.countDown();
return null;
});
@@ -1208,15 +1226,14 @@ public class PoolingManagerImplTest {
}
/**
- * Configure the mock controller to act like a real controller, invoking
- * beforeOffer and then beforeInsert, so we can make sure they pass through.
- * We'll keep count to ensure we don't get into infinite recursion.
+ * Configure the mock controller to act like a real controller, invoking beforeOffer
+ * and then beforeInsert, so we can make sure they pass through. We'll keep count to
+ * ensure we don't get into infinite recursion.
*
- * @param invokeBeforeInsert {@code true} if beforeInsert() should be
- * invoked, {@code false} if it should be skipped
+ * @param invokeBeforeInsert {@code true} if beforeInsert() should be invoked,
+ * {@code false} if it should be skipped
*
- * @return a latch that will be counted down if both beforeXxx() methods
- * return false
+ * @return a latch that will be counted down if both beforeXxx() methods return false
*/
private CountDownLatch catchRecursion(boolean invokeBeforeInsert) {
CountDownLatch recursion = new CountDownLatch(3);
@@ -1230,9 +1247,9 @@ public class PoolingManagerImplTest {
}
int iarg = 0;
- CommInfrastructure proto = args.getArgumentAt(iarg++, CommInfrastructure.class);
- String topic = args.getArgumentAt(iarg++, String.class);
- String event = args.getArgumentAt(iarg++, String.class);
+ CommInfrastructure proto = args.getArgument(iarg++);
+ String topic = args.getArgument(iarg++);
+ String event = args.getArgument(iarg++);
if (mgr.beforeOffer(proto, topic, event)) {
return null;
@@ -1253,9 +1270,8 @@ public class PoolingManagerImplTest {
/**
* Makes an assignment with two buckets.
*
- * @param sameHost {@code true} if the {@link #REQUEST_ID} should has to the
- * manager's bucket, {@code false} if it should hash to the other
- * host's bucket
+ * @param sameHost {@code true} if the {@link #REQUEST_ID} should hash to the
+ * manager's bucket, {@code false} if it should hash to the other host's bucket
* @return a new bucket assignment
*/
private BucketAssignments makeAssignments(boolean sameHost) {