diff options
Diffstat (limited to 'feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java')
-rw-r--r-- | feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java | 92 |
1 files changed, 54 insertions, 38 deletions
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java index 01ee61ef..e32fa545 100644 --- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java +++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java @@ -25,7 +25,8 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import static org.mockito.Matchers.any; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.contains; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; @@ -40,7 +41,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import org.junit.After; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; @@ -71,6 +71,7 @@ public class PoolingManagerImplTest { protected static final long STD_IDENTIFICATION_MS = STD_REACTIVATE_WAIT_MS + 1; protected static final long STD_ACTIVE_HEARTBEAT_MS = STD_IDENTIFICATION_MS + 1; protected static final long STD_INTER_HEARTBEAT_MS = STD_ACTIVE_HEARTBEAT_MS + 1; + protected static final long STD_OFFLINE_PUB_WAIT_MS = STD_INTER_HEARTBEAT_MS + 1; private static final String HOST2 = "other.host"; @@ -85,8 +86,8 @@ public class PoolingManagerImplTest { private static final String REQUEST_ID = "my.request.id"; /** - * Number of dmaap.publish() invocations that should be issued when the - * manager is started. + * Number of dmaap.publish() invocations that should be issued when the manager is + * started. */ private static final int START_PUB = 1; @@ -135,6 +136,7 @@ public class PoolingManagerImplTest { when(poolProps.getIdentificationMs()).thenReturn(STD_IDENTIFICATION_MS); when(poolProps.getActiveHeartbeatMs()).thenReturn(STD_ACTIVE_HEARTBEAT_MS); when(poolProps.getInterHeartbeatMs()).thenReturn(STD_INTER_HEARTBEAT_MS); + when(poolProps.getOfflinePubWaitMs()).thenReturn(STD_OFFLINE_PUB_WAIT_MS); futures = new LinkedList<>(); ser = new Serializer(); @@ -180,11 +182,6 @@ public class PoolingManagerImplTest { mgr = new PoolingManagerImpl(controller, poolProps); } - @After - public void tearDown() throws Exception { - - } - @Test public void testPoolingManagerImpl() { mgr = new PoolingManagerImpl(controller, poolProps); @@ -199,8 +196,8 @@ public class PoolingManagerImplTest { @Test public void testPoolingManagerImpl_ClassEx() { /* - * this controller does not implement TopicListener, which should cause - * a ClassCastException + * this controller does not implement TopicListener, which should cause a + * ClassCastException */ PolicyController ctlr = mock(PolicyController.class); @@ -316,6 +313,7 @@ public class PoolingManagerImplTest { verify(dmaap).stopConsumer(mgr); verify(sched).shutdownNow(); + verify(dmaap).publish(contains("offline")); assertTrue(mgr.getCurrent() instanceof IdleState); } @@ -362,7 +360,7 @@ public class PoolingManagerImplTest { mgr.afterStop(); verify(eventQueue).clear(); - verify(dmaap).stopPublisher(); + verify(dmaap).stopPublisher(STD_OFFLINE_PUB_WAIT_MS); } @Test @@ -376,7 +374,7 @@ public class PoolingManagerImplTest { mgr.afterStop(); verify(eventQueue, never()).clear(); - verify(dmaap).stopPublisher(); + verify(dmaap).stopPublisher(STD_OFFLINE_PUB_WAIT_MS); } @Test @@ -511,7 +509,7 @@ public class PoolingManagerImplTest { CountDownLatch latch = new CountDownLatch(1); - mgr.schedule(STD_ACTIVE_HEARTBEAT_MS, xxx -> { + mgr.schedule(STD_ACTIVE_HEARTBEAT_MS, () -> { latch.countDown(); return null; }); @@ -539,7 +537,7 @@ public class PoolingManagerImplTest { CountDownLatch latch = new CountDownLatch(1); - mgr.scheduleWithFixedDelay(STD_HEARTBEAT_WAIT_MS, STD_ACTIVE_HEARTBEAT_MS, xxx -> { + mgr.scheduleWithFixedDelay(STD_HEARTBEAT_WAIT_MS, STD_ACTIVE_HEARTBEAT_MS, () -> { latch.countDown(); return null; }); @@ -608,8 +606,7 @@ public class PoolingManagerImplTest { StartState st = (StartState) mgr.getCurrent(); /* - * give it its heart beat, that should cause it to transition to the - * Query state. + * give it its heart beat, that should cause it to transition to the Query state. */ Heartbeat hb = new Heartbeat(mgr.getHost(), st.getHbTimestampMs()); hb.setChannel(Message.ADMIN); @@ -985,14 +982,35 @@ public class PoolingManagerImplTest { } @Test + public void testInject_Ex() throws Exception { + startMgr(); + + // route the message to this host + mgr.startDistributing(makeAssignments(true)); + + // generate RuntimeException when onTopicEvent() is invoked + doThrow(new IllegalArgumentException("expected")).when(controller).onTopicEvent(any(), any(), any()); + + CountDownLatch latch = catchRecursion(true); + + Forward msg = new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID); + mgr.handle(msg); + + verify(dmaap, times(START_PUB)).publish(any()); + verify(controller).onTopicEvent(CommInfrastructure.UEB, TOPIC2, THE_EVENT); + + // ensure we made it past both beforeXxx() methods + assertEquals(0, latch.getCount()); + } + + @Test public void testHandleInternal() throws Exception { startMgr(); StartState st = (StartState) mgr.getCurrent(); /* - * give it its heart beat, that should cause it to transition to the - * Query state. + * give it its heart beat, that should cause it to transition to the Query state. */ Heartbeat hb = new Heartbeat(mgr.getHost(), st.getHbTimestampMs()); hb.setChannel(Message.ADMIN); @@ -1022,8 +1040,8 @@ public class PoolingManagerImplTest { Heartbeat hb = new Heartbeat(mgr.getHost(), st.getHbTimestampMs()); /* - * do NOT set the channel - this will cause the message to be invalid, - * triggering an exception + * do NOT set the channel - this will cause the message to be invalid, triggering + * an exception */ String msg = ser.encodeMsg(hb); @@ -1068,7 +1086,7 @@ public class PoolingManagerImplTest { when(eventQueue.poll()).thenAnswer(args -> lst.poll()); // route the messages to this host - mgr.startDistributing(makeAssignments(true)); + assertTrue(mgr.startDistributing(makeAssignments(true)).await(2, TimeUnit.SECONDS)); // all of the events should have been processed locally verify(dmaap, times(START_PUB)).publish(any()); @@ -1088,7 +1106,7 @@ public class PoolingManagerImplTest { when(eventQueue.poll()).thenAnswer(args -> lst.poll()); // route the messages to the OTHER host - mgr.startDistributing(makeAssignments(false)); + assertTrue(mgr.startDistributing(makeAssignments(false)).await(2, TimeUnit.SECONDS)); // all of the events should have been forwarded verify(dmaap, times(4)).publish(any()); @@ -1140,7 +1158,7 @@ public class PoolingManagerImplTest { CountDownLatch latch = new CountDownLatch(1); - mgr.schedule(STD_ACTIVE_HEARTBEAT_MS, xxx -> { + mgr.schedule(STD_ACTIVE_HEARTBEAT_MS, () -> { latch.countDown(); return null; }); @@ -1163,7 +1181,7 @@ public class PoolingManagerImplTest { CountDownLatch latch = new CountDownLatch(1); - mgr.schedule(STD_ACTIVE_HEARTBEAT_MS, xxx -> { + mgr.schedule(STD_ACTIVE_HEARTBEAT_MS, () -> { latch.countDown(); return null; }); @@ -1208,15 +1226,14 @@ public class PoolingManagerImplTest { } /** - * Configure the mock controller to act like a real controller, invoking - * beforeOffer and then beforeInsert, so we can make sure they pass through. - * We'll keep count to ensure we don't get into infinite recursion. + * Configure the mock controller to act like a real controller, invoking beforeOffer + * and then beforeInsert, so we can make sure they pass through. We'll keep count to + * ensure we don't get into infinite recursion. * - * @param invokeBeforeInsert {@code true} if beforeInsert() should be - * invoked, {@code false} if it should be skipped + * @param invokeBeforeInsert {@code true} if beforeInsert() should be invoked, + * {@code false} if it should be skipped * - * @return a latch that will be counted down if both beforeXxx() methods - * return false + * @return a latch that will be counted down if both beforeXxx() methods return false */ private CountDownLatch catchRecursion(boolean invokeBeforeInsert) { CountDownLatch recursion = new CountDownLatch(3); @@ -1230,9 +1247,9 @@ public class PoolingManagerImplTest { } int iarg = 0; - CommInfrastructure proto = args.getArgumentAt(iarg++, CommInfrastructure.class); - String topic = args.getArgumentAt(iarg++, String.class); - String event = args.getArgumentAt(iarg++, String.class); + CommInfrastructure proto = args.getArgument(iarg++); + String topic = args.getArgument(iarg++); + String event = args.getArgument(iarg++); if (mgr.beforeOffer(proto, topic, event)) { return null; @@ -1253,9 +1270,8 @@ public class PoolingManagerImplTest { /** * Makes an assignment with two buckets. * - * @param sameHost {@code true} if the {@link #REQUEST_ID} should has to the - * manager's bucket, {@code false} if it should hash to the other - * host's bucket + * @param sameHost {@code true} if the {@link #REQUEST_ID} should hash to the + * manager's bucket, {@code false} if it should hash to the other host's bucket * @return a new bucket assignment */ private BucketAssignments makeAssignments(boolean sameHost) { |