summaryrefslogtreecommitdiffstats
path: root/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/DmaapManagerTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/DmaapManagerTest.java')
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/DmaapManagerTest.java93
1 files changed, 65 insertions, 28 deletions
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/DmaapManagerTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/DmaapManagerTest.java
index f68f2395..a91671fd 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/DmaapManagerTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/DmaapManagerTest.java
@@ -21,6 +21,8 @@
package org.onap.policy.drools.pooling;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doThrow;
@@ -211,11 +213,11 @@ public class DmaapManagerTest {
expectException("startPublisher,start", xxx -> mgr.startPublisher());
expectException("startPublisher,publish", xxx -> mgr.publish(MSG));
-
+
// allow it to succeed this time
reset(sink);
when(sink.send(any())).thenReturn(true);
-
+
mgr.startPublisher();
verify(sink).start();
@@ -227,29 +229,64 @@ public class DmaapManagerTest {
@Test
public void testStopPublisher() throws PoolingFeatureException {
// not publishing yet, so stopping should have no effect
- mgr.stopPublisher();
+ mgr.stopPublisher(0);
verify(sink, never()).stop();
-
+
// now start it
mgr.startPublisher();
-
+
// this time, stop should do something
- mgr.stopPublisher();
+ mgr.stopPublisher(0);
verify(sink).stop();
-
+
// re-stopping should have no effect
- mgr.stopPublisher();
+ mgr.stopPublisher(0);
verify(sink).stop();
}
@Test
+ public void testStopPublisher_WithDelay() throws PoolingFeatureException {
+
+ mgr.startPublisher();
+
+ long tbeg = System.currentTimeMillis();
+
+ mgr.stopPublisher(100L);
+
+ assertTrue(System.currentTimeMillis() >= tbeg + 100L);
+ }
+
+ @Test
+ public void testStopPublisher_WithDelayInterrupted() throws Exception {
+
+ mgr.startPublisher();
+
+ long minms = 2000L;
+
+ // tell the publisher to stop in minms + additional time
+ Thread thread = new Thread(() -> mgr.stopPublisher(minms + 3000L));
+ thread.start();
+
+ // give the thread a chance to start
+ Thread.sleep(50L);
+
+ // interrupt it - it should immediately finish its work
+ thread.interrupt();
+
+ // wait for it to stop, but only wait the minimum time
+ thread.join(minms);
+
+ assertFalse(thread.isAlive());
+ }
+
+ @Test
public void testStopPublisher_Exception() throws PoolingFeatureException {
mgr.startPublisher();
-
+
// force exception when it stops
doThrow(new IllegalStateException("expected")).when(sink).stop();
- mgr.stopPublisher();
+ mgr.stopPublisher(0);
}
@Test
@@ -270,14 +307,14 @@ public class DmaapManagerTest {
// not consuming yet, so stopping should have no effect
mgr.stopConsumer(listener);
verify(source, never()).unregister(any());
-
+
// now start it
mgr.startConsumer(listener);
-
+
// this time, stop should do something
mgr.stopConsumer(listener);
verify(source).unregister(listener);
-
+
// re-stopping should have no effect
mgr.stopConsumer(listener);
verify(source).unregister(listener);
@@ -292,7 +329,7 @@ public class DmaapManagerTest {
public void testSetFilter_Exception() throws PoolingFeatureException {
// force an error when setFilter() is called
doThrow(new UnsupportedOperationException("expected")).when(source).setFilter(any());
-
+
mgr.setFilter(FILTER);
}
@@ -300,41 +337,41 @@ public class DmaapManagerTest {
public void testPublish() throws PoolingFeatureException {
// cannot publish before starting
expectException("publish,pre", xxx -> mgr.publish(MSG));
-
+
mgr.startPublisher();
-
+
// publish several messages
mgr.publish(MSG);
verify(sink).send(MSG);
-
- mgr.publish(MSG+"a");
- verify(sink).send(MSG+"a");
-
- mgr.publish(MSG+"b");
- verify(sink).send(MSG+"b");
-
+
+ mgr.publish(MSG + "a");
+ verify(sink).send(MSG + "a");
+
+ mgr.publish(MSG + "b");
+ verify(sink).send(MSG + "b");
+
// stop and verify we can no longer publish
- mgr.stopPublisher();
+ mgr.stopPublisher(0);
expectException("publish,stopped", xxx -> mgr.publish(MSG));
}
@Test(expected = PoolingFeatureException.class)
public void testPublish_SendFailed() throws PoolingFeatureException {
mgr.startPublisher();
-
+
// arrange for send() to fail
when(sink.send(MSG)).thenReturn(false);
-
+
mgr.publish(MSG);
}
@Test(expected = PoolingFeatureException.class)
public void testPublish_SendEx() throws PoolingFeatureException {
mgr.startPublisher();
-
+
// arrange for send() to throw an exception
doThrow(new IllegalStateException("expected")).when(sink).send(MSG);
-
+
mgr.publish(MSG);
}