diff options
Diffstat (limited to 'feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/DmaapManagerTest.java')
-rw-r--r-- | feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/DmaapManagerTest.java | 93 |
1 files changed, 65 insertions, 28 deletions
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/DmaapManagerTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/DmaapManagerTest.java index f68f2395..a91671fd 100644 --- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/DmaapManagerTest.java +++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/DmaapManagerTest.java @@ -21,6 +21,8 @@ package org.onap.policy.drools.pooling; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Mockito.doThrow; @@ -211,11 +213,11 @@ public class DmaapManagerTest { expectException("startPublisher,start", xxx -> mgr.startPublisher()); expectException("startPublisher,publish", xxx -> mgr.publish(MSG)); - + // allow it to succeed this time reset(sink); when(sink.send(any())).thenReturn(true); - + mgr.startPublisher(); verify(sink).start(); @@ -227,29 +229,64 @@ public class DmaapManagerTest { @Test public void testStopPublisher() throws PoolingFeatureException { // not publishing yet, so stopping should have no effect - mgr.stopPublisher(); + mgr.stopPublisher(0); verify(sink, never()).stop(); - + // now start it mgr.startPublisher(); - + // this time, stop should do something - mgr.stopPublisher(); + mgr.stopPublisher(0); verify(sink).stop(); - + // re-stopping should have no effect - mgr.stopPublisher(); + mgr.stopPublisher(0); verify(sink).stop(); } @Test + public void testStopPublisher_WithDelay() throws PoolingFeatureException { + + mgr.startPublisher(); + + long tbeg = System.currentTimeMillis(); + + mgr.stopPublisher(100L); + + assertTrue(System.currentTimeMillis() >= tbeg + 100L); + } + + @Test + public void testStopPublisher_WithDelayInterrupted() throws Exception { + + mgr.startPublisher(); + + long minms = 2000L; + + // tell the publisher to stop in minms + additional time + Thread thread = new Thread(() -> mgr.stopPublisher(minms + 3000L)); + thread.start(); + + // give the thread a chance to start + Thread.sleep(50L); + + // interrupt it - it should immediately finish its work + thread.interrupt(); + + // wait for it to stop, but only wait the minimum time + thread.join(minms); + + assertFalse(thread.isAlive()); + } + + @Test public void testStopPublisher_Exception() throws PoolingFeatureException { mgr.startPublisher(); - + // force exception when it stops doThrow(new IllegalStateException("expected")).when(sink).stop(); - mgr.stopPublisher(); + mgr.stopPublisher(0); } @Test @@ -270,14 +307,14 @@ public class DmaapManagerTest { // not consuming yet, so stopping should have no effect mgr.stopConsumer(listener); verify(source, never()).unregister(any()); - + // now start it mgr.startConsumer(listener); - + // this time, stop should do something mgr.stopConsumer(listener); verify(source).unregister(listener); - + // re-stopping should have no effect mgr.stopConsumer(listener); verify(source).unregister(listener); @@ -292,7 +329,7 @@ public class DmaapManagerTest { public void testSetFilter_Exception() throws PoolingFeatureException { // force an error when setFilter() is called doThrow(new UnsupportedOperationException("expected")).when(source).setFilter(any()); - + mgr.setFilter(FILTER); } @@ -300,41 +337,41 @@ public class DmaapManagerTest { public void testPublish() throws PoolingFeatureException { // cannot publish before starting expectException("publish,pre", xxx -> mgr.publish(MSG)); - + mgr.startPublisher(); - + // publish several messages mgr.publish(MSG); verify(sink).send(MSG); - - mgr.publish(MSG+"a"); - verify(sink).send(MSG+"a"); - - mgr.publish(MSG+"b"); - verify(sink).send(MSG+"b"); - + + mgr.publish(MSG + "a"); + verify(sink).send(MSG + "a"); + + mgr.publish(MSG + "b"); + verify(sink).send(MSG + "b"); + // stop and verify we can no longer publish - mgr.stopPublisher(); + mgr.stopPublisher(0); expectException("publish,stopped", xxx -> mgr.publish(MSG)); } @Test(expected = PoolingFeatureException.class) public void testPublish_SendFailed() throws PoolingFeatureException { mgr.startPublisher(); - + // arrange for send() to fail when(sink.send(MSG)).thenReturn(false); - + mgr.publish(MSG); } @Test(expected = PoolingFeatureException.class) public void testPublish_SendEx() throws PoolingFeatureException { mgr.startPublisher(); - + // arrange for send() to throw an exception doThrow(new IllegalStateException("expected")).when(sink).send(MSG); - + mgr.publish(MSG); } |